Kinesis Streams with KCLv2
About
A stream processor component using the Kinesis Client Library (KCL). It is written in Python, and uses the amazon-kclpy Python SDK for KCL (GitHub).
What’s Inside
Publishing and subscribing to Kinesis streams, using Python.
Setup
Create a Kinesis stream, and set up a Python sandbox for connecting to it using KCL v2.
This section reflects configuration settings stored in record_processor.properties.
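As a rough illustration of what such a settings file holds, the sketch below parses simple `key = value` lines into a dictionary. The excerpt and its key names (`streamName`, `applicationName`, `regionName`) are assumptions modeled on the sample properties that ship with amazon-kclpy; the actual record_processor.properties in the repository may differ. The parser is deliberately minimal and ignores Java properties features such as `:` separators and line continuations.

```python
def parse_properties(text: str) -> dict:
    """Parse simple `key = value` lines, skipping blank lines and comments."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


# Hypothetical excerpt; the real file may use other keys and values.
EXAMPLE = """
# Name of the Kinesis stream to consume
streamName = testdrive-stream
# By default, KCL also uses this as the name of the DynamoDB leases table
applicationName = stream-demo
regionName = us-east-1
"""

settings = parse_properties(EXAMPLE)
print(settings["streamName"], settings["applicationName"], settings["regionName"])
```

With these values, the stream and table names line up with the example ARNs used in the AWS section.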
AWS
Configure a Kinesis stream and an IAM policy. The KCL application creates and maintains a “leases table” in DynamoDB, so it needs the corresponding permissions.
Create a Kinesis stream called testdrive-stream, using the Kinesis Console.
Create an IAM Policy and User, applying the permissions outlined on this page. Two example ARNs that address the relevant resources in Kinesis and DynamoDB are:
arn:aws:kinesis:us-east-1:841394475918:stream/testdrive-stream
arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo
The leases table in DynamoDB will be automatically created when the first stream consumer (the KCL application) becomes active.
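To show how the two example ARNs are put together and where they would appear in a policy, here is a minimal sketch. The `arn` helper is hypothetical, and the action lists are assumptions that are intentionally incomplete; the authoritative set of permissions is the one on the page referenced above.

```python
import json


def arn(service: str, region: str, account_id: str, resource: str) -> str:
    """Assemble an ARN of the form arn:aws:<service>:<region>:<account>:<resource>."""
    return f"arn:aws:{service}:{region}:{account_id}:{resource}"


REGION = "us-east-1"
ACCOUNT_ID = "841394475918"

stream_arn = arn("kinesis", REGION, ACCOUNT_ID, "stream/testdrive-stream")
table_arn = arn("dynamodb", REGION, ACCOUNT_ID, "table/stream-demo")

# Illustrative policy skeleton only. The actions below are an assumed,
# incomplete subset; consult the AWS documentation for the full list.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
            ],
            "Resource": stream_arn,
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:Scan",
            ],
            "Resource": table_arn,
        },
    ],
}

print(json.dumps(policy, indent=2))
```

Scoping each statement to a single resource ARN keeps the consumer's permissions limited to the one stream and the one leases table it works with.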
KCL Stream Processor
Acquire sources and initialize sandbox.
git clone https://github.com/daq-tools/lorrystream
cd lorrystream
python3 -m venv .venv
source .venv/bin/activate
pip install --editable='.[carabas]'
Install dependencies, mainly the amazon-kclpy package.
cd lorrystream/spike/kcl_kinesis
pip install wheel
pip install --verbose -r requirements.txt
Note that the first installation of the amazon-kclpy package on your machine will take a while, because it will download a bunch of JAR files, defined by a traditional pom.xml recipe, before embedding them into the Python package.
On subsequent installations, as long as you don’t switch versions, that package will install from your local package cache, so it will be much faster.
Alternative: Use a ready-made wheel package. Note to self: This still needs to be provided to colleagues.
pip install ./dist/amazon_kclpy-2.1.5-py3-none-any.whl
Usage
You will need multiple terminal windows. Within each of them, activate the virtualenv in the top-level directory. Then, navigate to the playground directory, and seed AWS credentials.
source .venv/bin/activate
cd lorrystream/spike/kcl_kinesis
export AWS_ACCESS_KEY=...
export AWS_SECRET_ACCESS_KEY=...
Launch the stream processor, subscribing to the stream.
$(sh launch.sh record_processor.properties)
Watch actions of the record processor.
tail -F record_processor.log
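What a record processor does with an incoming batch can be sketched roughly as follows. This is a duck-typed stand-in, not the processor from the repository: it does not import amazon-kclpy, and the attribute names it relies on (`records`, `binary_data`, `sequence_number`, `checkpointer.checkpoint`, `shard_id`) are assumptions modeled on the amazon-kclpy sample application. The fake checkpointer and batch exist only to drive the example.

```python
from types import SimpleNamespace


class DemoRecordProcessor:
    """Stand-in processor; a real one would subclass amazon-kclpy's base class."""

    def __init__(self):
        self.shard_id = None
        self.seen = []

    def initialize(self, initialize_input):
        # Called once before any records of the assigned shard are delivered.
        self.shard_id = initialize_input.shard_id

    def process_records(self, process_records_input):
        last_sequence_number = None
        for record in process_records_input.records:
            # Assumed: the payload arrives as raw bytes.
            self.seen.append(record.binary_data.decode("utf-8"))
            last_sequence_number = record.sequence_number
        # Checkpoint once per batch so a restart resumes after the last record.
        if last_sequence_number is not None:
            process_records_input.checkpointer.checkpoint(last_sequence_number)


class FakeCheckpointer:
    """Hypothetical test double that records checkpoint calls."""

    def __init__(self):
        self.checkpoints = []

    def checkpoint(self, sequence_number=None, sub_sequence_number=None):
        self.checkpoints.append(sequence_number)


checkpointer = FakeCheckpointer()
batch = SimpleNamespace(
    records=[
        SimpleNamespace(binary_data=b'{"id": 1}', sequence_number="101"),
        SimpleNamespace(binary_data=b'{"id": 2}', sequence_number="102"),
    ],
    checkpointer=checkpointer,
)

processor = DemoRecordProcessor()
processor.initialize(SimpleNamespace(shard_id="shardId-000000000000"))
processor.process_records(batch)
print(processor.seen, checkpointer.checkpoints)
```

Checkpointing after the whole batch rather than after each record trades a little reprocessing on restart for fewer writes to the leases table.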
Publish a demo message to the stream.
python publish.py
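For orientation, a publisher along these lines could look like the sketch below. It is not the content of publish.py: the `make_client` and `publish` helpers, the message shape, and the partition key are assumptions. It uses boto3's Kinesis `put_record` call, with boto3 imported lazily so that the dry run at the bottom works without AWS access.

```python
import json


def make_client(region_name: str = "us-east-1"):
    """Create a Kinesis client. Assumes boto3 is installed and credentials are set."""
    import boto3  # imported lazily so the dry run below needs no AWS setup

    return boto3.client("kinesis", region_name=region_name)


def publish(client, stream_name: str, message: dict, partition_key: str = "demo"):
    """Serialize `message` as JSON and put it onto the stream as a single record."""
    return client.put_record(
        StreamName=stream_name,
        Data=json.dumps(message).encode("utf-8"),
        PartitionKey=partition_key,
    )


class RecordingClient:
    """Hypothetical stand-in for the boto3 client, used for a dry run."""

    def __init__(self):
        self.calls = []

    def put_record(self, **kwargs):
        self.calls.append(kwargs)
        return {"ShardId": "shardId-000000000000", "SequenceNumber": "1"}


# Dry run without touching AWS; swap in make_client() to publish for real.
dry_run_client = RecordingClient()
publish(dry_run_client, "testdrive-stream", {"id": 1})
print(dry_run_client.calls)
```

The partition key determines which shard receives the record, so a fixed key as used here sends every message to the same shard.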