DynamoDB CDC to CrateDB using Kinesis¶
Introduction¶
DynamoDB Streams captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours.
Applications can access this log and view the data items as they appeared before and after they were modified, in near-real time.
About¶
This is a change data capture (CDC) pipeline: a DynamoDB egress CDC processor sinks data into the CrateDB OLAP database, using Kinesis as the transport.
Kinesis Data Streams for DynamoDB captures item-level modifications in any DynamoDB table and replicates them to a Kinesis data stream.
What’s Inside¶
Entirely within AWS, a process relays CDC data from a DynamoDB table to a Kinesis stream. It is configured through the AWS APIs.
In a compute environment of your choice that supports Python, a traditional KCL v2 application subscribes to the Kinesis stream to receive the published CDC opslog messages.
On the egress side, the application re-materializes the items of the operations log into any database with SQLAlchemy support.
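To illustrate the re-materialization step, here is a minimal sketch of how one CDC event could be translated into a SQL statement with bind parameters. It is not the processor's actual implementation: the helper names `unwrap` and `to_sql` are hypothetical, the single `data` column follows the `transactions` table used in this guide, and the naive unwrapping only handles flat items and keeps numbers as strings.

```python
from typing import Any, Dict, Tuple

TABLE = "transactions"  # destination table name used in this guide


def unwrap(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Naively strip the type descriptor from each attribute of a flat item.

    Assumption: no nested maps, lists, or sets; numbers stay strings.
    """
    return {name: next(iter(typed.values())) for name, typed in image.items()}


def to_sql(event: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Translate one CDC event into a SQL statement plus bind parameters."""
    record = event["dynamodb"]
    keys = unwrap(record["Keys"])
    # Match the row through the primary key attributes stored inside `data`.
    where = " AND ".join(f"data['{key}'] = :{key}" for key in keys)
    event_name = event["eventName"]
    if event_name == "INSERT":
        return (
            f"INSERT INTO {TABLE} (data) VALUES (:data)",
            {"data": unwrap(record["NewImage"])},
        )
    if event_name == "MODIFY":
        return (
            f"UPDATE {TABLE} SET data = :data WHERE {where}",
            {"data": unwrap(record["NewImage"]), **keys},
        )
    if event_name == "REMOVE":
        return f"DELETE FROM {TABLE} WHERE {where}", keys
    raise ValueError(f"Unknown CDC event type: {event_name}")
```

The returned pair uses named `:parameter` placeholders, so it could be handed to a database layer that supports that style, such as SQLAlchemy's `text()` construct.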
Setup¶
Create a database table in DynamoDB, and enable a Kinesis Stream on its operations log.
This section reflects configuration settings stored in dynamodb_cdc_processor.properties.
We recommend running through the setup procedure of Kinesis Streams with KCLv2 beforehand, because it covers the IAM policies that are required to permit Kinesis access to DynamoDB for storing a “lease table”.
DynamoDB Table¶
# Optionally, drop the table.
aws dynamodb delete-table \
--table-name table-testdrive
# Create table (DDL).
# - It defines a composite primary key.
# - "device" is the partition key
# - "timestamp" is the sort key
# - It does not define auxiliary field names,
# they can be added dynamically.
aws dynamodb create-table \
--table-name table-testdrive \
--key-schema \
AttributeName=device,KeyType=HASH \
AttributeName=timestamp,KeyType=RANGE \
--attribute-definitions \
AttributeName=device,AttributeType=S \
AttributeName=timestamp,AttributeType=S \
--provisioned-throughput \
ReadCapacityUnits=1,WriteCapacityUnits=1 \
--table-class STANDARD
# Display all table names on DynamoDB.
aws dynamodb list-tables
# Check table status.
aws dynamodb describe-table --table-name table-testdrive | grep TableStatus
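For readers who prefer to script this step, the same table definition could be expressed for boto3. This is a sketch under assumptions: boto3 is installed separately, AWS credentials are configured, and the helper names `table_definition` and `create_table` are made up for illustration.

```python
from typing import Any, Dict


def table_definition(name: str = "table-testdrive") -> Dict[str, Any]:
    """Build keyword arguments mirroring the `aws dynamodb create-table` call."""
    return {
        "TableName": name,
        # Composite primary key: partition key plus sort key.
        "KeySchema": [
            {"AttributeName": "device", "KeyType": "HASH"},
            {"AttributeName": "timestamp", "KeyType": "RANGE"},
        ],
        # Only key attributes need a declared type; others are added dynamically.
        "AttributeDefinitions": [
            {"AttributeName": "device", "AttributeType": "S"},
            {"AttributeName": "timestamp", "AttributeType": "S"},
        ],
        "ProvisionedThroughput": {"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
        "TableClass": "STANDARD",
    }


def create_table(definition: Dict[str, Any]) -> None:
    """Create the table and block until DynamoDB reports that it exists."""
    import boto3  # assumption: installed separately, credentials configured

    client = boto3.client("dynamodb")
    client.create_table(**definition)
    client.get_waiter("table_exists").wait(TableName=definition["TableName"])
```

Calling `create_table(table_definition())` would then replace the CLI invocation and the manual status check.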
CrateDB Table¶
The destination table name in CrateDB is currently hard-coded. Use this
command to create the transactions table, into which the CDC record
processor will re-materialize CDC events.
crash -c "CREATE TABLE transactions (data OBJECT(DYNAMIC));"
Kinesis Stream¶
Capture DynamoDB table operations and relay them to a Kinesis stream.
# Create a Kinesis Data Stream.
aws kinesis create-stream --stream-name dynamodb-cdc --shard-count 4
# Check that the Kinesis stream is active.
aws kinesis describe-stream --stream-name dynamodb-cdc
# Enable Kinesis streaming on the DynamoDB table.
# The `stream-arn` value is read from the output of
# `describe-stream` using `jq`.
STREAM_ARN=$(aws kinesis describe-stream --stream-name dynamodb-cdc | jq -r .StreamDescription.StreamARN)
aws dynamodb enable-kinesis-streaming-destination \
--table-name table-testdrive \
--stream-arn "${STREAM_ARN}" \
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
# Check if Kinesis streaming is active on the table.
aws dynamodb describe-kinesis-streaming-destination --table-name table-testdrive
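The JSON printed by the status command can also be checked programmatically. The following sketch assumes the response has already been parsed into a dictionary, for example with `json.loads` on the CLI output. The helper name `streaming_is_active` is hypothetical.

```python
from typing import Any, Dict


def streaming_is_active(response: Dict[str, Any], stream_arn: str) -> bool:
    """Report whether the given stream is an ACTIVE destination of the table.

    `response` is the parsed output of
    `aws dynamodb describe-kinesis-streaming-destination`.
    """
    for destination in response.get("KinesisDataStreamDestinations", []):
        if destination.get("StreamArn") == stream_arn:
            # Other states include ENABLING, DISABLING, and DISABLED.
            return destination.get("DestinationStatus") == "ACTIVE"
    return False
```

A setup script could poll this function until it returns `True` before starting the stream processor.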
Note that you need to re-run the linking procedure after dropping and re-creating the DynamoDB table.
# Optionally, list all Kinesis streams, and delete the stream.
aws kinesis list-streams
aws kinesis delete-stream --stream-name dynamodb-cdc --enforce-consumer-deletion
KCL Stream Processor¶
Acquire sources and initialize sandbox.
git clone https://github.com/daq-tools/lorrystream
cd lorrystream
python3 -m venv .venv
source .venv/bin/activate
pip install --editable='.[carabas]'
Install dependencies, mainly the amazon-kclpy package.
cd lorrystream/spike/kcl_dynamodb
pip install wheel
pip install --verbose -r requirements.txt
Usage¶
You will need multiple terminal windows. Within each of them, activate the virtualenv in the top-level directory. Then, navigate to the playground directory, and seed AWS credentials.
source .venv/bin/activate
cd lorrystream/spike/kcl_dynamodb
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
Launch the stream processor, subscribing to the DynamoDB CDC operations feed over a Kinesis stream.
sh launch.sh dynamodb_cdc_processor.properties
Watch actions of the CDC processor.
tail -F dynamodb_cdc_processor.log
Insert record into database table.
READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}"
aws dynamodb execute-statement --statement \
"INSERT INTO \"table-testdrive\" VALUE ${READING_SQL};"
Query database table.
aws dynamodb execute-statement --statement \
"SELECT * FROM \"table-testdrive\";"
Run UPDATE and DELETE statements, in order to sample the two other DML operations.
aws dynamodb execute-statement --statement \
"UPDATE \"table-testdrive\" SET temperature=55.55 WHERE \"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42';"
aws dynamodb execute-statement --statement \
"DELETE FROM \"table-testdrive\" WHERE \"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42';"
Alternative for displaying table contents.
aws dynamodb scan --table-name table-testdrive
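The PartiQL statements in this section embed their values as literals. When values come from a program, a parameterized statement avoids quoting problems. The sketch below builds the arguments for an `ExecuteStatement` request with `?` placeholders. The helper names `encode` and `insert_request` are hypothetical, only scalar values are handled, and table names are assumed not to contain double quotes.

```python
from decimal import Decimal
from typing import Any, Dict


def encode(value: Any) -> Dict[str, Any]:
    """Wrap a scalar Python value in its DynamoDB type descriptor."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # check before int, since bool subclasses int
        return {"BOOL": value}
    if isinstance(value, (int, float, Decimal)):
        # Numbers travel as strings; str() is adequate for simple readings.
        return {"N": str(value)}
    if isinstance(value, str):
        return {"S": value}
    raise TypeError(f"Unsupported value type: {type(value).__name__}")


def insert_request(table: str, item: Dict[str, Any]) -> Dict[str, Any]:
    """Build a parameterized PartiQL INSERT for the given item."""
    # Attribute names are single-quoted; embedded quotes are doubled.
    fields = ", ".join("'" + name.replace("'", "''") + "': ?" for name in item)
    return {
        "Statement": f'INSERT INTO "{table}" VALUE {{{fields}}}',
        "Parameters": [encode(value) for value in item.values()],
    }
```

With boto3 installed, the result could be passed on as `client.execute_statement(**insert_request(...))`.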
Software Tests¶
pytest
Appendix¶
DynamoDB data types¶
The following is a complete list of DynamoDB data type descriptors:
S – String
N – Number
B – Binary
BOOL – Boolean
NULL – Null
M – Map
L – List
SS – String Set
NS – Number Set
BS – Binary Set
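As an illustration of how these descriptors appear in CDC messages, the following sketch converts a typed item image into plain Python values. The function names `decode_value` and `decode_item` are hypothetical, and binary values are assumed to arrive base64-encoded, as is usual for the JSON representation. boto3 ships a comparable `TypeDeserializer` class in `boto3.dynamodb.types`.

```python
import base64
from decimal import Decimal
from typing import Any, Dict


def decode_value(typed: Dict[str, Any]) -> Any:
    """Convert one `{"<descriptor>": <value>}` pair into a Python value."""
    ((descriptor, value),) = typed.items()
    if descriptor == "S":
        return value
    if descriptor == "N":
        # Numbers are transmitted as strings; Decimal preserves precision.
        return Decimal(value)
    if descriptor == "B":
        return base64.b64decode(value)
    if descriptor == "BOOL":
        return bool(value)
    if descriptor == "NULL":
        return None
    if descriptor == "M":
        return {name: decode_value(item) for name, item in value.items()}
    if descriptor == "L":
        return [decode_value(item) for item in value]
    if descriptor == "SS":
        return set(value)
    if descriptor == "NS":
        return {Decimal(item) for item in value}
    if descriptor == "BS":
        return {base64.b64decode(item) for item in value}
    raise ValueError(f"Unknown DynamoDB type descriptor: {descriptor}")


def decode_item(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Decode a whole item image, such as `NewImage` or `OldImage`."""
    return {name: decode_value(typed) for name, typed in image.items()}
```

Applied to the `NewImage` of a CDC event, `decode_item` yields a flat dictionary of attribute names and values.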
Opslog processor samples¶
01:25:17.632 [dynamodb_cdc_processor] INFO process_record - {"awsRegion":"us-east-1","eventID":"b015b5f0-c095-4b50-8ad0-4279aa3d88c6","eventName":"INSERT","userIdentity":null,"recordFormat":"application/json","tableName":"table-testdrive","dynamodb":{"ApproximateCreationDateTime":1720740233012995,"Keys":{"device":{"S":"qux"},"timestamp":{"S":"2024-07-12T01:17:42"}},"NewImage":{"humidity":{"N":"84.84"},"temperature":{"N":"42.42"},"device":{"S":"qux"},"timestamp":{"S":"2024-07-12T01:17:42"}},"SizeBytes":99,"ApproximateCreationDateTimePrecision":"MICROSECOND"},"eventSource":"aws:dynamodb"}
01:58:22.371 [dynamodb_cdc_processor] INFO process_record - {"awsRegion":"us-east-1","eventID":"24757579-ebfd-480a-956d-a1287d2ef707","eventName":"MODIFY","userIdentity":null,"recordFormat":"application/json","tableName":"table-testdrive","dynamodb":{"ApproximateCreationDateTime":1720742302233719,"Keys":{"device":{"S":"foo"},"timestamp":{"S":"2024-07-12T01:17:42"}},"NewImage":{"humidity":{"N":"84.84"},"temperature":{"N":"55.66"},"device":{"S":"foo"},"timestamp":{"S":"2024-07-12T01:17:42"}},"OldImage":{"humidity":{"N":"84.84"},"temperature":{"N":"42.42"},"device":{"S":"foo"},"timestamp":{"S":"2024-07-12T01:17:42"}},"SizeBytes":161,"ApproximateCreationDateTimePrecision":"MICROSECOND"},"eventSource":"aws:dynamodb"}
01:58:42.510 [dynamodb_cdc_processor] INFO process_record - {"awsRegion":"us-east-1","eventID":"ff4e68ab-0820-4a0c-80b2-38753e8e00e5","eventName":"REMOVE","userIdentity":null,"recordFormat":"application/json","tableName":"table-testdrive","dynamodb":{"ApproximateCreationDateTime":1720742321848352,"Keys":{"device":{"S":"foo"},"timestamp":{"S":"2024-07-12T01:17:42"}},"OldImage":{"humidity":{"N":"84.84"},"temperature":{"N":"55.66"},"device":{"S":"foo"},"timestamp":{"S":"2024-07-12T01:17:42"}},"SizeBytes":99,"ApproximateCreationDateTimePrecision":"MICROSECOND"},"eventSource":"aws:dynamodb"}
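Log lines like the ones above can be picked apart with a few lines of Python. This sketch splits off the JSON payload and converts `ApproximateCreationDateTime` into a timezone-aware datetime. The helper names are hypothetical, and treating the value as epoch milliseconds when the precision is not `MICROSECOND` is an assumption.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_log_line(line: str) -> Dict[str, Any]:
    """Extract the JSON payload following the first ' - ' separator."""
    _, _, payload = line.partition(" - ")
    return json.loads(payload)


def creation_time(event: Dict[str, Any]) -> datetime:
    """Convert the approximate creation time into a UTC datetime."""
    record = event["dynamodb"]
    value = record["ApproximateCreationDateTime"]
    precision = record.get("ApproximateCreationDateTimePrecision", "MILLISECOND")
    if precision == "MICROSECOND":
        # Integer arithmetic avoids float rounding on large epoch values.
        return EPOCH + timedelta(microseconds=value)
    # Assumption: any other precision means epoch milliseconds.
    return EPOCH + timedelta(milliseconds=value)
```

The log prefix carries local wall-clock time, while the converted value is in UTC, so the two may differ by the local UTC offset.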
Documentation¶
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds_gettingstarted.html
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/streamsmain.html
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_CreateTable.html
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.update.html
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html