Source code for covata.delta.keystore

#   Copyright 2017 Covata Limited or its affiliates
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from __future__ import absolute_import

from abc import ABCMeta, abstractmethod

import six

import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey

from . import utils


@six.add_metaclass(ABCMeta)
[docs]class DeltaKeyStore(object): @abstractmethod @utils.check_id("identity_id")
[docs] def store_keys(self, identity_id, private_signing_key, private_encryption_key): """ Stores the signing and encryption key pairs under a given identity id. :param str identity_id: the identity id of the key owner :param private_signing_key: the private signing key object :type private_signing_key: :class:`RSAPrivateKey` :param private_encryption_key: the private cryptographic key object :type private_encryption_key: :class:`RSAPrivateKey` """
@abstractmethod @utils.check_id("identity_id")
[docs] def get_private_signing_key(self, identity_id): """ Loads a private signing key instance for the given identity id. :param str identity_id: the identity id of the key owner :return: the signing private key object """
@abstractmethod @utils.check_id("identity_id")
[docs] def get_private_encryption_key(self, identity_id): """ Loads a private encryption key instance for the given identity id. :param str identity_id: the identity id of the key owner :return: the cryptographic private key object """
[docs]class FileSystemKeyStore(DeltaKeyStore): def __init__(self, key_store_path, key_store_passphrase): """ Constructs a new Filesystem-backed :class:`~.DeltaKeyStore` with the given configuration. :param str key_store_path: the path to the private key store :param str key_store_passphrase: the passphrase to decrypt the keys """ self.key_store_path = os.path.expanduser(key_store_path) self.__key_store_passphrase = str(key_store_passphrase).encode('utf-8') def store_keys(self, identity_id, private_signing_key, private_encryption_key): super(FileSystemKeyStore, self).store_keys( identity_id, private_signing_key, private_encryption_key) self.__save(private_signing_key, "{}.signing.pem".format(identity_id)) self.__save(private_encryption_key, "{}.crypto.pem".format(identity_id)) def get_private_signing_key(self, identity_id): super(FileSystemKeyStore, self).get_private_signing_key(identity_id) return self.__load("{}.signing.pem".format(identity_id)) def get_private_encryption_key(self, identity_id): super(FileSystemKeyStore, self).get_private_encryption_key(identity_id) return self.__load("{}.crypto.pem".format(identity_id)) def __save(self, private_key, file_name): if not isinstance(private_key, RSAPrivateKey): raise TypeError("private_key must be an instance of RSAPrivateKey, " "actual: {}".format(type(private_key).__name__)) pem = private_key.private_bytes( serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.BestAvailableEncryption(self.__key_store_passphrase)) file_path = os.path.join(self.key_store_path, file_name) if not os.path.isdir(self.key_store_path): os.makedirs(self.key_store_path) if os.path.isfile(file_path): msg = "Save failed: A key with name [{}] already exists".format( file_name) raise IOError(msg) with open(file_path, 'w') as f: f.write(pem.decode(encoding='utf8')) def __load(self, file_name): # type: (str) -> RSAPrivateKey file_path = os.path.join(self.key_store_path, file_name) with(open(file_path, 'r')) as f: return serialization.load_pem_private_key( f.read().encode('utf-8'), password=self.__key_store_passphrase, backend=default_backend())