Backlog
Iteration 1
[x] Data model: Channel and Packet
[x] Make basic example with MQTT and CrateDB work
[x] Docs: Command line vs. library use
[x] Rename to LorryStream
[x] Add software tests
Iteration 2
[x] Docs: Improve README
[o] Use stdout as sink when <sink> argument is omitted
[o] Provide Docker Compose file for running auxiliary services
[o] Sink: SQLite
[o] Docs: Provide full example using curl: MQTT to SQLite
[o] Make essential parameters configurable
[o] Examples: Make more example programs work
[o] Examples: Add example to manipulate data
[o] Add more transformers
[o] Provide replacement for amqp-to-mqtt
https://testcontainers-python.readthedocs.io/en/latest/rabbitmq/README.html
amqp-consume --queue=secpi-on_off cat
echo '{"action": "shutdown"}' | amqp-publish --routing-key=secpi-op-m
[o] Provide replacement for PutsReq
[o] Examples: Add appsink example
[o] Improve inline docs
[o] Release 0.1.0
[o] Excel & ODF: https://github.com/dimastbk/python-calamine
Iteration 3
[o] Polars also offers streaming and sinking.
[o] Docs
[o] Docs: curl
[o] Development: Code reloading
[o] Fallback-based content decoder, Kotori-style; see funcy
[o] “First sample” hook to hand over to graphing subsystem
[o] TLS to brokers
[o] Formats: JSON, NDJSON, CSV, Muon
[o] How to inspect the queue and flush all remaining items?
on shutdown
periodically
[o] Custom decoder in Python and JavaScript
[o] Source: Kafka; see https://streamz.readthedocs.io/en/latest/api.html#streamz.from_kafka_batched
[o] How to provide custom DDL statements, in order to account for sharding and partitioning?
[o] How to invoke on/with Dask Distributed?
[o] Tap: Decode the topic and use corresponding details within the database addressing scheme, see “Kotori DAQ topology strategies”: https://github.com/daq-tools/kotori/tree/main/kotori/daq/strategy
[o] More __future__ examples for README
[o] Re-use data loaders from other frameworks
[o] Source: Google Cloud PubSub (see testcontainers)
[o] Source: Network / ZMQ / ZeroMQ
[o] Launch multiple channels per config file
[o] Grafana subsystem example
[o] Source/Sink: MySQL and S3
[o] Sink: Enable writing data into multiple sinks at the same time
[o] Logo
[o] Jupyter notebook examples
[o] Audit log
[o] Adapters to Airflow and Flink?
Iteration 4
[o] Async decoder?
[o] Emit with metadata
[o] Source: Filesystem, using Stream.from_textfile()
[o] Source: Periodic
[o] Sink: to_mqtt, to_websocket, to_kafka, to_textfile
[o] --describe pipeline
[o] Decoder and transformer subsystem
[o] Run with dask-distributed
[o] Source: How to capture streams from different CDC interfaces?
[o] Stream data from Linux subsystems
Unix sockets: https://github.com/Kixunil/ws-unix-framed-bridge
Linux IIO
[o] Source: Redis, Apache IoTDB
[o] Docs: https://github.com/jackersson/gst-python-tutorials
[o] Source: https://gstreamer.freedesktop.org/documentation/soup/souphttpsrc.html
[o] Source: universal_pathlib
[o] Bus: AMQP 1.0 / STOMP (over WebSocket)
[o] Bus: NATS
[o] Integrate with Tinybird - https://github.com/localstack/verdin
https://www.kubeflow.org/docs/components/pipelines/v2/components/
XML via JsonML?
https://en.wikipedia.org/wiki/JsonML
https://github.com/stleary/JSON-java/blob/master/src/main/java/org/json/JSONML.java
http://www.jsonml.org/
https://github.com/sasano8/jsonml
https://github.com/sasano8/jsonast