kstreams is a library/micro-framework for use with Kafka. It provides a simple Kafka streams implementation that gives certain guarantees; see below.
Documentation: https://kpn.github.io/kstreams/
    pip install kstreams

You will need a worker; we recommend aiorun:

    pip install aiorun

Usage:

    import aiorun
    from kstreams import create_engine, ConsumerRecord

    stream_engine = create_engine(title="my-stream-engine")


    # Consume from the same topic that produce() sends to.
    @stream_engine.stream("local--kstreams")
    async def consume(cr: ConsumerRecord):
        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


    async def produce():
        payload = b'{"message": "Hello world!"}'

        # Send five events and print the metadata returned for each one.
        for _ in range(5):
            metadata = await stream_engine.send("local--kstreams", value=payload)
            print(f"Message sent: {metadata}")


    async def start():
        await stream_engine.start()
        await produce()


    async def shutdown(loop):
        await stream_engine.stop()


    if __name__ == "__main__":
        aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)

Features:

- Produce events
- Consume events with Streams
- Subscribe to topics by pattern
- Prometheus metrics and custom monitoring
- TestClient
- Custom Serialization and Deserialization
- Easy to integrate with any async framework; not tied to any library
- Yield events from streams
- OpenTelemetry Instrumentation
- Middlewares
- Hooks (on_startup, on_stop, after_startup, after_stop)
- Store (kafka streams pattern)
- Stream Join
- Windowing
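
As a small illustration of the "Custom Serialization and Deserialization" item above: event values travel as raw bytes, so a payload is typically encoded before sending and decoded after consuming. The sketch below uses only the standard library. The names `serialize` and `deserialize` are hypothetical helpers for illustration and are not part of the kstreams API; see the documentation for how serializers are actually registered.

```python
import json


def serialize(payload: dict) -> bytes:
    """Encode a dict as compact UTF-8 JSON bytes, usable as an event value."""
    return json.dumps(payload, separators=(",", ":")).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    """Decode UTF-8 JSON bytes (for example, a consumed value) back into a dict."""
    return json.loads(raw.decode("utf-8"))


event = {"message": "Hello world!"}
raw = serialize(event)

# A round trip should return the original payload unchanged.
assert deserialize(raw) == event
```

Keeping the encoding and decoding in one matching pair of functions makes it easier to guarantee that producers and consumers agree on the wire format.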
This repo requires the use of poetry instead of pip.
Note: If you want the virtualenv to live in the same path as the project, first run:

    poetry config --local virtualenvs.in-project true

To install the dependencies just execute:

    poetry install

Then you can activate the virtualenv with:

    poetry shell

Run the tests:

    ./scripts/test

Run code formatting with ruff:

    ./scripts/format

We use conventional commits for commit messages. The use of commitizen is recommended; it is part of the dev dependencies:

    cz commit