Kinesis Streams with KCLv2

About

A stream processor component using the Kinesis Client Library (KCL). It is written in Python, and uses the amazon-kclpy Python SDK for KCL (GitHub).

What’s Inside

  • Publishing and subscribing to Kinesis streams, using Python.

Setup

Create a Kinesis stream, and set up a Python sandbox for connecting to it using KCL v2.

This section reflects configuration settings stored in record_processor.properties.
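For orientation, the settings in such a file might look like the sketch below, wrapped in a short Python snippet that parses them. The key names follow the sample properties file shipped with amazon-kclpy. The values are assumptions for illustration, not a copy of the repository's file: the executable name `record_processor.py` is hypothetical, and the application name `stream-demo` is inferred from the DynamoDB table ARN used in this document.

```python
# Hypothetical excerpt; the real record_processor.properties may differ.
EXAMPLE_PROPERTIES = """
# Command the MultiLangDaemon starts as record processor (assumed file name).
executableName = python record_processor.py
streamName = testdrive-stream
# KCL names its DynamoDB leases table after the application name.
applicationName = stream-demo
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
initialPositionInStream = TRIM_HORIZON
regionName = us-east-1
"""


def parse_properties(text: str) -> dict:
    """Parse simple `key = value` lines, skipping blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


settings = parse_properties(EXAMPLE_PROPERTIES)
print(settings["streamName"], settings["applicationName"])
```

The parser only handles the plain `key = value` subset of the Java properties format, which is enough to inspect a file like this one.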

AWS

Configure a Kinesis stream and an IAM policy. KCL creates and maintains a “leases table” in DynamoDB, so it needs the corresponding permissions.

  • Create a Kinesis stream called testdrive-stream, per Kinesis Console.

  • Create an IAM Policy and User, applying the permissions outlined on this page. Two example ARNs that address the relevant resources in Kinesis and DynamoDB are:

    arn:aws:kinesis:us-east-1:841394475918:stream/testdrive-stream
    arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo
    
  • The leases table in DynamoDB will be automatically created when the first stream consumer (the KCL application) becomes active.
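
As a rough illustration of the IAM policy step, the following sketch assembles a policy document for the two example ARNs and prints it as JSON. The lists of actions are an assumption about what a KCL consumer and a demo publisher typically need. They are not taken from the permissions page referenced above, which remains the authoritative source.

```python
import json

# ARNs as given in the examples above.
STREAM_ARN = "arn:aws:kinesis:us-east-1:841394475918:stream/testdrive-stream"
TABLE_ARN = "arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo"

# Assumed action lists; verify them against the official KCL permissions page.
KINESIS_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:DescribeStreamSummary",
    "kinesis:GetRecords",
    "kinesis:GetShardIterator",
    "kinesis:ListShards",
    "kinesis:PutRecord",
]
DYNAMODB_ACTIONS = [
    "dynamodb:CreateTable",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Scan",
]


def build_policy(stream_arn: str, table_arn: str) -> dict:
    """Return an IAM policy document scoped to one stream and one leases table."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": KINESIS_ACTIONS, "Resource": stream_arn},
            {"Effect": "Allow", "Action": DYNAMODB_ACTIONS, "Resource": table_arn},
        ],
    }


policy = build_policy(STREAM_ARN, TABLE_ARN)
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the policy editor of the IAM console. Scoping each statement to a single ARN keeps the permissions limited to the demo resources.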

KCL Stream Processor

Acquire sources and initialize sandbox.

git clone https://github.com/daq-tools/lorrystream
cd lorrystream
python3 -m venv .venv
source .venv/bin/activate
pip install --editable='.[carabas]'

Install dependencies, mainly the amazon-kclpy package.

cd lorrystream/spike/kcl_kinesis
pip install wheel
pip install --verbose -r requirements.txt

Note that the first installation of the amazon-kclpy package on your machine will take a while, because it will download a bunch of JAR files, defined by a traditional pom.xml recipe, before embedding them into the Python package.

On subsequent installations, as long as you don’t switch versions, that package will install from your local package cache, so it will be much faster.

Alternative: Use ready-made wheel package. Note to self: Need to provide this to the colleagues.

pip install ./dist/amazon_kclpy-2.1.5-py3-none-any.whl

Usage

You will need two terminal windows. In both of them, activate the virtualenv in the top-level directory. Then, navigate to the playground directory, and seed AWS credentials.

source .venv/bin/activate
cd lorrystream/spike/kcl_kinesis
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...

Launch the stream processor, subscribing to the stream.

$(sh launch.sh record_processor.properties)

Watch actions of the record processor.

tail -F record_processor.log

Publish a demo message to the stream.

python publish.py
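
To give an idea of what publishing involves, here is a minimal sketch that sends one JSON message to the stream with boto3. It is not the repository's publish.py. The payload, the partition key, and the helper names `build_put_record_args` and `publish` are made up for illustration, and it assumes boto3 is installed and credentials are available in the environment.

```python
import json

STREAM_NAME = "testdrive-stream"
REGION_NAME = "us-east-1"


def build_put_record_args(payload: dict, partition_key: str = "demo") -> dict:
    """Build keyword arguments for the Kinesis PutRecord call."""
    return {
        "StreamName": STREAM_NAME,
        # Kinesis expects the record data as bytes.
        "Data": json.dumps(payload).encode("utf-8"),
        # The partition key decides which shard receives the record.
        "PartitionKey": partition_key,
    }


def publish(payload: dict) -> dict:
    """Send one record to the stream; needs boto3 and AWS credentials."""
    import boto3  # imported lazily, so the helper above works without boto3

    client = boto3.client("kinesis", region_name=REGION_NAME)
    return client.put_record(**build_put_record_args(payload))


# Hypothetical demo payload.
args = build_put_record_args({"device": "foo", "temperature": 42.42})
print(args)
```

Calling `publish({"device": "foo", "temperature": 42.42})` would submit the record. With the stream processor running, it should then show up in record_processor.log.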

Documentation

Resources