Skip to content

hupe1980/cryptoshredding

Repository files navigation

CryptoShredding

Latest Version Supported Python Versions ci

Crypto shredding is the practice of 'deleting' data through the destruction of the cryptographic keys protecting the data.

You can find the source on GitHub.

Getting Started

Required Prerequisites

  • Python 3.6+

Installation

Note

If you have not already installed cryptography, you might need to install additional prerequisites as detailed in the cryptography installation guide for your operating system.

$ pip install cryptoshredding

Usage

KeyStore

import boto3
from cryptoshredding import DynamodbKeyStore
from dynamodb_encryption_sdk.material_providers.aws_kms import AwsKmsCryptographicMaterialsProvider

aws_cmk_id = "arn:aws:kms:YOUR_KEY"
aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=aws_cmk_id)

table = boto3.resource("dynamodb").Table("key_store_table")
key_store = DynamodbKeyStore(table=table, materials_provider=aws_kms_cmp)

key_id = "key4711"
key_store.create_main_key(key_id)

main_key = key_store.get_main_key(key_id)

key_store.delete_main_key(key_id)  # shredding

MainKey

import boto3
from cryptoshredding import MainKey

main_key = key_store.get_main_key(key_id)
data_key, encrypted_data_key = main_key.generate_data_key()

decrypted_data_key = main_key.decrypt(encrypted_data_key)
assert data_key == decrypted_data_key

Dynamodb

import boto3
from cryptoshredding.dynamodb import CryptoTable

table = boto3.resource("dynamodb").Table("data_table")
crypto_table = CryptoTable(
   table=table,
   key_store=key_store,
)

crypto_table.put_item(
   CSEKeyId=key_id,
   Item=plaintext_item
)

index_key = {"id": "foo"}
encrypted_item = table.get_item(Key=index_key)["Item"]
decrypted_item = crypto_table.get_item(Key=index_key)["Item"]

encrypted_items = table.scan()["Items"]
decrypted_items = crypto_table.scan()["Items"]

assert len(encrypted_items) == 1
assert len(decrypted_items) == 1

key_store.delete_main_key(key_id)  # shredding

encrypted_items = table.scan()["Items"]
decrypted_items = crypto_table.scan()["Items"]

assert len(encrypted_items) == 1
assert len(decrypted_items) == 0  # the key was shredded, so the item can no longer be decrypted

S3

import boto3
from cryptoshredding.s3 import CryptoClient

s3 = boto3.client("s3", region_name="us-east-1")
crypto_s3 = CryptoClient(
   client=s3,
   key_store=key_store,
)

crypto_s3.put_object(
   CSEKeyId=key_id,
   Bucket=bucket.name,
   Key="object",
   Body="foo bar",
)

encrypted_obj = s3.get_object(
   Bucket=bucket.name,
   Key="object",
)

decrypted_obj = crypto_s3.get_object(
   Bucket=bucket.name,
   Key="object",
)

File

from cryptoshredding.raw import CryptoFile

crypto_file = CryptoFile(
   key_store=key_store,
)

crypto_file.encrypt(
   key_id=key_id,
   plaintext_filename="plain.txt",
   ciphertext_filename="cipher.txt"
)

crypto_file.decrypt(
   ciphertext_filename="cipher.txt",
   plaintext_filename="decrypt.txt",
)

Bytes

from cryptoshredding.raw import CryptoBytes

crypto_bytes = CryptoBytes(
   key_store=key_store,
)

encrypted, encrypted_header = crypto_bytes.encrypt(
   key_id=key_id,
   data=plain,
)

decrypted, decrypted_header = crypto_bytes.decrypt(
   data=encrypted,
)

Kinesis

import boto3
from cryptoshredding.kinesis import CryptoClient

kinesis = boto3.client("kinesis", region_name="us-east-1")
crypto_kinesis = CryptoClient(
    client=kinesis,
    key_store=key_store,
)

data = b"foo bar"

crypto_kinesis.put_record(
    CSEKeyId=key_id,
    StreamName=stream_name,
    Data=data,
    PartitionKey="key1",
)

response = crypto_kinesis.describe_stream(
    StreamName=stream_name,
)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]

response = crypto_kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)
shard_iterator = response["ShardIterator"]

encrypted_response = kinesis.get_records(ShardIterator=shard_iterator)
decrypted_response = crypto_kinesis.get_records(ShardIterator=shard_iterator)

assert len(encrypted_response["Records"]) == 1
assert data != encrypted_response["Records"][0]["Data"]

assert len(decrypted_response["Records"]) == 1
assert data == decrypted_response["Records"][0]["Data"]

Mongodb

Sqlalchemy