Source code for core_aws.ciphers.kms_cipher
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk import EncryptionSDKClient
from aws_encryption_sdk import StrictAwsKmsMasterKeyProvider
from botocore.session import Session
from core_ciphers.base import ICipher
[docs]
class KMSCipher(ICipher):
""" It uses the AWS Encryption SDK for Python """
[docs]
def __init__(
self,
key_arn: str,
botocore_session: Optional[Session] = None,
**kwargs
) -> None:
super().__init__(**kwargs)
if not key_arn or not isinstance(key_arn, str):
raise ValueError("key_arn must be a non-empty string")
if not key_arn.startswith("arn:aws:kms:"):
raise ValueError(
"key_arn must be a valid AWS KMS ARN starting "
"with 'arn:aws:kms:'"
)
self.session = botocore_session
self.key_arn = key_arn
self.client = EncryptionSDKClient(
commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)
kms_kwargs = {"key_ids": [self.key_arn]}
if self.session:
kms_kwargs["botocore_session"] = self.session # type: ignore
self.master_key_provider = StrictAwsKmsMasterKeyProvider(**kms_kwargs)
[docs]
def encrypt(self, data: bytes, *args, **kwargs) -> bytes:
if not isinstance(data, bytes):
raise TypeError("data must be bytes")
ciphertext, _ = self.client.encrypt(
source=data,
key_provider=self.master_key_provider)
return ciphertext
[docs]
def decrypt(self, data: bytes, *args, **kwargs) -> str:
if not isinstance(data, bytes):
raise TypeError("data must be bytes")
plaintext, _ = self.client.decrypt(
source=data,
key_provider=self.master_key_provider)
return plaintext.decode(encoding=self.encoding)