# Copyright 2017 Covata Limited or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from base64 import b64encode, b64decode
from . import crypto, apiclient
from collections import namedtuple
from datetime import datetime
[docs]class Client:
"""
The main entry point for the Delta SDK.
An instance of this class will provide an interface to work and interact
with the Delta API. The core domain objects (Identity, Secret and
Event) are returned from method calls to this class, and themselves provide
fluent interface that can be used to continue interactive with the Delta
API. Consumers of this SDK can therefore choose whether they wish to
construct all the calls from base values (i.e. id strings such as
identity_id, secret_id, etc) or via the fluent interfaces (or a mixture of
both).
"""
def __init__(self, key_store, api_client_factory=apiclient.ApiClient):
"""
Creates a new DeltaClient instance from the provided configuration.
:param key_store: the key store
:type key_store: :class:`~.DeltaKeyStore`
:param api_client_factory: the API client factory
:type api_client_factory:
(:class:`~.DeltaKeyStore`) -> :class:`~.ApiClient`
"""
self.__key_store = key_store
self.__api_client = api_client_factory(key_store)
@property
def key_store(self):
return self.__key_store
@property
def api_client(self):
return self.__api_client
[docs] def create_identity(self, external_id=None, metadata=None):
"""
Creates a new identity in Delta.
:param external_id: the external id to associate with the identity
:type external_id: str | None
:param metadata: the metadata to associate with the identity
:type metadata: dict[str, str] | None
:return: the identity
:rtype: :class:`~.Identity`
"""
private_signing_key = crypto.generate_private_key()
private_encryption_key = crypto.generate_private_key()
public_signing_key = crypto.serialize_public_key(
private_signing_key.public_key())
public_encryption_key = crypto.serialize_public_key(
private_encryption_key.public_key())
identity_id = self.api_client.register_identity(public_encryption_key,
public_signing_key,
external_id, metadata)
self.key_store.store_keys(identity_id=identity_id,
private_signing_key=private_signing_key,
private_encryption_key=private_encryption_key)
return Identity(self, identity_id, public_encryption_key,
external_id, metadata)
[docs] def get_identity(self, identity_id, identity_to_retrieve=None):
"""
Gets the identity matching the given identity id.
:param str identity_id: the authenticating identity id
:type identity_to_retrieve: str | None
:return: the identity
:rtype: :class:`~.Identity`
"""
response = self.api_client.get_identity(
identity_id,
identity_to_retrieve if identity_to_retrieve else identity_id)
return Identity(self,
response["id"],
response["cryptoPublicKey"],
response.get("externalId"),
response.get("metadata"))
[docs] def get_events(self, identity_id, secret_id=None, rsa_key_owner_id=None):
"""
Gets a list of events associated filtered by secret id or RSA key owner
or both secret id and RSA key owner.
:param str identity_id: the authenticating identity id
:param secret_id: the secret id of interest
:type secret_id: str | None
:param rsa_key_owner_id: the rsa key owner id of interest
:type rsa_key_owner_id: str | None
:return: a generator of audit events
:rtype: generator of :class:`~.Event`
"""
events = self.api_client.get_events(
identity_id, secret_id, rsa_key_owner_id)
for event in events:
details = event["eventDetails"]
timestamp = event["timestamp"]
yield Event(
event_details=EventDetails(
base_secret_id=details.get("baseSecretId"),
requestor_id=details.get("requesterId"),
rsa_key_owner_id=details.get("rsaKeyOwnerId"),
secret_id=details.get("secretId"),
secret_owner_id=details.get("secretOwnerId")
),
host=event["host"],
event_id=event["id"],
source_ip=event["sourceIp"],
timestamp=datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ"),
event_type=event["type"]
)
[docs] def create_secret(self, identity_id, content):
"""
Creates a new secret in Delta with the given byte contents.
:param str identity_id: the authenticating identity id
:param bytes content: the secret contents
:return: the secret
:rtype: :class:`~.Secret`
"""
secret_key = crypto.generate_secret_key()
iv = crypto.generate_initialisation_vector()
public_key = self.key_store.get_private_encryption_key(
identity_id).public_key()
encrypted_key = crypto.encrypt_key_with_public_key(secret_key,
public_key)
cipher_text, tag = crypto.encrypt(content, secret_key, iv)
response = self.api_client.create_secret(
requestor_id=identity_id,
content=b64encode(cipher_text + tag).decode('utf-8'),
encryption_details=dict(
symmetricKey=b64encode(encrypted_key).decode('utf-8'),
initialisationVector=b64encode(iv).decode('utf-8')
))
return self.get_secret(identity_id, response["id"])
[docs] def get_secret(self, identity_id, secret_id):
"""
Gets the given secret by id.
:param str identity_id: the authenticating identity id
:param str secret_id: the id of the secret to retrieve
:return: the secret
:rtype: :class:`~.Secret`
"""
response = self.api_client.get_secret(identity_id, secret_id)
return Secret(self,
response["id"],
response["created"],
response["rsaKeyOwner"],
response["createdBy"],
EncryptionDetails(
response["encryptionDetails"]["symmetricKey"],
response["encryptionDetails"]["initialisationVector"]
),
response.get("baseSecretId"))
[docs] def get_secrets(self,
identity_id,
base_secret_id=None,
created_by=None,
rsa_key_owner_id=None,
metadata=None,
lookup_type=apiclient.SecretLookupType.any,
page=None,
page_size=None):
"""
Gets a list of secrets based on the query parameters, bound by the
pagination parameters.
:param str identity_id: the authenticating identity id
:param base_secret_id: the id of the base secret
:type base_secret_id: str | None
:param created_by: the id of the secret creator
:type created_by: str | None
:param rsa_key_owner_id: the id of the RSA key owner
:type rsa_key_owner_id: str | None
:param metadata: the metadata associated with the secret
:type metadata: dict[str, str] | None
:param lookup_type: the type of the lookup query
:type lookup_type: :class:`~.SecretLookupType`
:param page: the page number
:type page: int | None
:param page_size: the page size
:type page_size: int | None
:return: a generator of secrets satisfying the search criteria
:rtype: generator of :class:`~.Secret`
"""
secrets = self.api_client.get_secrets(
identity_id, base_secret_id, created_by, rsa_key_owner_id,
metadata, lookup_type, page, page_size)
for secret in secrets:
yield Secret(self,
secret_id=secret["id"],
created=secret["created"],
rsa_key_owner=secret.get("rsaKeyOwner"),
created_by=secret["createdBy"],
encryption_details=None,
base_secret_id=secret.get("baseSecret"))
[docs] def get_secret_content_encrypted(self, identity_id, secret_id):
"""
Gets the base64 encoded encrypted content given the secret id.
Note that the returned encrypted content when decoded from base64 has a
trailing 16 byte GCM authentication tag appended (i.e. the cipher text
is the byte range [:-16] and the authentication tag is the remaining
[-16:] bytes).
:param str identity_id: the authenticating identity id
:param str secret_id: the secret id
:return: the encrypted content encoded in base64
:rtype: str
"""
return self.api_client.get_secret_content(identity_id, secret_id)
[docs] def get_secret_content(self, identity_id, secret_id, symmetric_key,
initialisation_vector):
"""
Gets the plaintext content, given the symmetric key and
initialisation vector used for encryption.
:param str identity_id: the authenticating identity id
:param str secret_id: the secret id
:param str symmetric_key:
the symmetric key used for encryption encoded in base64
:param str initialisation_vector:
the initialisation vector encoded in base64
:return: the plaintext content of the secret
:rtype: bytes
"""
encrypted_content = b64decode(
self.get_secret_content_encrypted(identity_id, secret_id))
decrypted_key = crypto.decrypt_with_private_key(
b64decode(symmetric_key),
self.key_store.get_private_encryption_key(identity_id))
return crypto.decrypt(encrypted_content[:-16],
encrypted_content[-16:],
decrypted_key,
b64decode(initialisation_vector))
[docs] def share_secret(self, identity_id, recipient_id, secret_id):
"""
Shares the base secret with the specified recipient. The contents will
be encrypted with the public encryption key of the RSA key owner, and a
new secret key and initialisation vector will be generated. This call
will result in a new derived secret being created and returned.
:param str identity_id: the authenticating identity id
:param str recipient_id: the target identity id to share the base secret
:param str secret_id: the base secret id
:return: the derived secret
:rtype: :class:`~.Secret`
"""
recipient = self.get_identity(identity_id, recipient_id)
secret = self.get_secret(identity_id, secret_id)
secret_key = crypto.generate_secret_key()
iv = crypto.generate_initialisation_vector()
public_key = crypto.deserialize_public_key(
recipient.public_encryption_key)
encrypted_key = crypto.encrypt_key_with_public_key(secret_key,
public_key)
cipher_text, tag = crypto.encrypt(secret.get_content(), secret_key, iv)
response = self.api_client.share_secret(
requestor_id=identity_id,
content=b64encode(cipher_text + tag).decode('utf-8'),
encryption_details=dict(
symmetricKey=b64encode(encrypted_key).decode('utf-8'),
initialisationVector=b64encode(iv).decode('utf-8')),
base_secret_id=secret.secret_id,
rsa_key_owner_id=recipient.identity_id)
return self.get_secret(recipient.identity_id, response["id"])
[docs] def delete_secret(self, identity_id, secret_id):
"""
Deletes the secret with the given secret id.
:param str identity_id: the authenticating identity id
:param str secret_id: the secret id
"""
self.api_client.delete_secret(identity_id, secret_id)
[docs]class Identity:
"""
An instance of this class encapsulates an identity in Covata Delta. An
identity can be a user, application, device or any other identifiable
entity that can create secrets and/or be target recipient of a secret.
An has two sets of asymmetric keys, for encryption and for signing of
requests. Identities may also have optional, public, searchable metadata
and a reference to an identifier in an external system.
"""
def __init__(self, parent, identity_id, public_encryption_key,
external_id, metadata):
"""
Creates a new identity in Delta with the provided metadata
and external id.
:param parent: the Delta client that constructed this instance
:type parent: :class:`~.Client`
:param identity_id: the id of the identity
:param str public_encryption_key: the public signing key of the identity
:param external_id: the external id of the identity
:type external_id: str | None
:param metadata: the metadata belonging to the identity
:type metadata: dict[str, str] | None
"""
self.__parent = parent
self.__identity_id = identity_id
self.__public_encryption_key = public_encryption_key
self.__external_id = external_id
self.__metadata = metadata
@property
def parent(self):
return self.__parent
@property
def identity_id(self):
return self.__identity_id
@property
def public_encryption_key(self):
return self.__public_encryption_key
@property
def external_id(self):
return self.__external_id
@property
def metadata(self):
return self.__metadata
[docs] def get_identity(self, identity_to_retrieve=None):
"""
Gets the identity matching the given identity id.
:type identity_to_retrieve: str | None
:return: the identity
:rtype: :class:`~.Identity`
"""
return self.parent.get_identity(self.identity_id, identity_to_retrieve)
[docs] def get_events(self, secret_id=None, rsa_key_owner_id=None):
"""
Gets a list of events associated filtered by secret id or RSA key owner
or both secret id and RSA key owner.
:param secret_id: the secret id of interest
:type secret_id: str | None
:param rsa_key_owner_id: the rsa key owner id of interest
:type rsa_key_owner_id: str | None
:return: a generator of audit events
:rtype: generator of :class:`~.Event`
"""
return self.parent.get_events(
self.identity_id, secret_id, rsa_key_owner_id)
[docs] def get_secrets(self,
base_secret_id=None,
created_by=None,
rsa_key_owner_id=None,
metadata=None,
lookup_type=apiclient.SecretLookupType.any,
page=None,
page_size=None):
"""
Gets a list of secrets based on the query parameters, bound by the
pagination parameters.
:param base_secret_id: the id of the base secret
:type base_secret_id: str | None
:param created_by: the id of the secret creator
:type created_by: str | None
:param rsa_key_owner_id: the id of the RSA key owner
:type rsa_key_owner_id: str | None
:param metadata: the metadata associated with the secret
:type metadata: dict[str, str] | None
:param lookup_type: the type of the lookup query
:type lookup_type: :class:`~.SecretLookupType`
:param page: the page number
:type page: int | None
:param page_size: the page size
:type page_size: int | None
:return: a generator of secrets satisfying the search criteria
:rtype: generator of :class:`~.Secret`
"""
return self.parent.get_secrets(
self.identity_id, base_secret_id, created_by, rsa_key_owner_id,
metadata, lookup_type, page, page_size)
[docs] def create_secret(self, content):
"""
Creates a new secret in Delta with the given contents.
:param bytes content: the secret content
:return: the secret
:rtype: :class:`~.Secret`
"""
return self.parent.create_secret(self.identity_id, content)
[docs] def retrieve_secret(self, secret_id):
"""
Retrieves a secret with this identity.
:param str secret_id: the secret id
:return: the secret
:rtype: :class:`~.Secret`
"""
return self.parent.get_secret(self.identity_id, secret_id)
[docs] def delete_secret(self, secret_id):
"""
Deletes the secret with the given secret id.
:param str secret_id: the secret id
"""
self.parent.delete_secret(self.identity_id, secret_id)
def __repr__(self):
return "{cls}(identity_id={identity_id})".format(
cls=self.__class__.__name__, identity_id=self.identity_id)
[docs]class Secret:
"""
An instance of this class encapsulates a secret in Covata Delta. A
secret has contents, which is encrypted by a symmetric key algorithm as
defined in the immutable EncryptionDetails class, holding information such
as the symmetric (secret) key, initialisation vector and algorithm. The
symmetric key is encrypted with the public encryption key of the RSA key
owner. This class will return the decrypted contents and symmetric key if
returned as a result of Client.
"""
def __init__(self, parent, secret_id, created, rsa_key_owner, created_by,
encryption_details, base_secret_id=None):
"""
Creates a new secret with the given parameters.
:param parent: the Delta client that constructed this instance
:type parent: :class:`~.Client`
:param str secret_id: the id of the secret
:param str created: the created date
:param str rsa_key_owner: the identity id of the RSA key owner
:param str created_by: the identity id of the secret creator
:param encryption_details: the encryption details of the secret
:type encryption_details: :class:`~.EncryptionDetails`
"""
self.__parent = parent
self.__secret_id = secret_id
self.__created = created
self.__rsa_key_owner = rsa_key_owner
self.__created_by = created_by
self.__encryption_details = encryption_details
self.__base_secret_id = base_secret_id
@property
def parent(self):
return self.__parent
@property
def secret_id(self):
return self.__secret_id
@property
def created(self):
return self.__created
@property
def rsa_key_owner(self):
return self.__rsa_key_owner
@property
def created_by(self):
return self.__created_by
@property
def encryption_details(self):
return self.__encryption_details
@property
def base_secret_id(self):
return self.__base_secret_id
[docs] def get_content(self):
"""
Gets the content of a secret, encrypted with the details defined in the
encryption_details of this secret and encoded in base64.
:return: the content of the secret encoded in base64
:rtype: str
"""
return self.parent.get_secret_content(
self.rsa_key_owner,
self.secret_id,
self.encryption_details.symmetric_key,
self.encryption_details.initialisation_vector)
[docs] def share_with(self, identity_id):
"""
Shares this secret with the target recipient identity. This action
will create a new (derived) secret in Covata Delta, and the new
secret will be returned to the caller.
The credentials of the RSA key owner must be present in the local
key store.
:param str identity_id: the recipient identity id
:return: the derived secret
:rtype: :class:`~.Secret`
"""
return self.parent.share_secret(
self.created_by,
identity_id,
self.secret_id)
[docs] def get_events(self, rsa_key_owner_id=None):
"""
Gets a list of events associated filtered by this secret id or
both this secret id and RSA key owner.
The credentials of the secret creator must be present in the local
key store.
:param rsa_key_owner_id: the rsa key owner id of interest
:type rsa_key_owner_id: str | None
:return: a generator of audit events
:rtype: generator of :class:`~.Event`
"""
return self.parent.get_events(self.created_by, self.secret_id,
rsa_key_owner_id)
[docs] def get_derived_secrets(self, page=None, page_size=None):
"""
Gets a list of secrets derived from this secret, bound by the pagination
parameters.
The credentials of the secret creator be present in the local
key store.
:param page: the page number
:type page: int | None
:param page_size: the page size
:type page_size: int | None
:return: a generator of secrets
:rtype: generator of :class:`~.Secret`
"""
self.parent.get_secrets(requestor_id=self.created_by,
base_secret_id=self.secret_id,
page=page,
page_size=page_size)
def __repr__(self):
return "{cls}(secret_id={secret_id})".format(
cls=self.__class__.__name__, secret_id=self.secret_id)
[docs]class EncryptionDetails:
"""
This class holds the necessary key materials required to decrypt a
particular secret. The symmetric key itself is protected by a public
encryption key belonging to an identity.
"""
def __init__(self, symmetric_key, initialisation_vector):
"""
Creates a new encryption details with the given parameters.
:param str symmetric_key: the symmetric key
:param str initialisation_vector: the initialisation vector
"""
self.__symmetric_key = symmetric_key
self.__initialisation_vector = initialisation_vector
@property
def symmetric_key(self):
return self.__symmetric_key
@property
def initialisation_vector(self):
return self.__initialisation_vector
[docs]class EventDetails(namedtuple("EventDetails", [
"base_secret_id", "requestor_id", "rsa_key_owner_id", "secret_id",
"secret_owner_id"
])):
"""
This class describes the details of an event related to a secret.
Information includes the secret id, the owner identity id of the secret, and
the identity id triggering the event.
Additional information such as base secret id and
RSA key owner id are also available for derived secrets.
"""
def __init__(self, base_secret_id, requestor_id, rsa_key_owner_id,
secret_id, secret_owner_id):
"""
Creates an instance of event details.
:param str base_secret_id: the id of the base secret
:param str requestor_id: the id of the requesting identity
:param str rsa_key_owner_id: the id of the RSA key owner
:param str secret_id: the id of the secret
:param str secret_owner_id: the id of the secret owner
"""
super(EventDetails, self).__init__()
[docs]class Event:
"""
An instance of this class encapsulates an event in Covata Delta. An
event is an audit entry representing an action undertaken by an
identity on a secret.
"""
def __init__(self,
event_details,
host,
event_id,
source_ip,
timestamp,
event_type):
"""
Creates a new :class:`~.Event` with the given parameters.
:param event_details: details of the audit event.
:type event_details: :class:`~.EventDetails`
:param str host: the host address
:param str event_id: the identifier of the event object
:param str source_ip: the source IP address
:param timestamp: the timestamp of the event
:type timestamp: datetime
:param str event_type: the type of the event
"""
self.__event_details = event_details
self.__host = host
self.__event_id = event_id
self.__source_ip = source_ip
self.__timestamp = timestamp
self.__event_type = event_type
@property
def event_details(self):
return self.__event_details
@property
def host(self):
return self.__host
@property
def event_id(self):
return self.__event_id
@property
def source_ip(self):
return self.__source_ip
@property
def timestamp(self):
return self.__timestamp
@property
def event_type(self):
return self.__event_type
def __repr__(self):
return "{cls}(event_id={event_id})".format(
cls=self.__class__.__name__, event_id=self.event_id)