Real-time data processing in Python - technology evaluation
In this blog post, we’ll take a look at the state (heh) of stateful stream processing in Python. Data stream processing is useful in a wide range of applications, such as Internet of Things solutions, where low latency and high-throughput processing is required. Turns out that Python APIs of different stream processing technologies are in varying levels of readiness, so it pays to take a moment and look at the differences between top candidates (personal opinion!).
Here, we’ll implement the same stream processing workflow with multiple open-source technologies, using Python, and make notes of the developer experience. If your time from data to actionable insights is too long, keep reading 😎
You’ll find all the code in our repository.
Concepts
Let’s first introduce some concepts. If you’re already experienced in stream processing, feel free to skim or skip these.
Stream processing is a method of processing data as it is produced, allowing for real-time analysis and decision-making. It takes data as it flows and immediately starts working with it.
Stream processing often complements batch processing workflows where a group of data is processed at once. While stream processors tend to run continuously, batch jobs are commonly executed on schedule. You’d typically use batch processing for things like training machine learning models or computing analytics that are not time-critical. Stream processing is used to feed dashboards with fresh data, run machine learning models with real-time features and detect anomalies as they happen.
Stateful stream processing, then, is needed when the processing of a data stream requires the maintenance of information (the state) between events. This state is used to make decisions and perform computations based on the entire history or a window of events seen in the data stream, rather than just individual events. Stateful stream processing allows for more comprehensive analysis of data streams, leading to improved decision-making and real-time processing capabilities.
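To make the distinction concrete, here is a minimal sketch in plain Python, with no streaming framework involved. The event shape, the threshold and the names is_spike and RunningTotal are made up for illustration.

```python
from collections import defaultdict


def is_spike(event, threshold=100):
    """Stateless: the decision depends on this single event only."""
    return event["value"] > threshold


class RunningTotal:
    """Stateful: keeps a per-sensor total that survives between events."""

    def __init__(self):
        self.totals = defaultdict(int)

    def update(self, event):
        self.totals[event["id"]] += event["value"]
        return self.totals[event["id"]]


events = [
    {"id": 1, "value": 5},
    {"id": 2, "value": 7},
    {"id": 1, "value": 3},
]
tracker = RunningTotal()
running = [tracker.update(event) for event in events]
print(running)  # [5, 7, 8]
```

The stateless function could run on any machine at any time, while the stateful one must see every event of a sensor and keep its totals safe across restarts. That is the part a stream processing framework takes care of.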
There are some additional technicalities explained nicely in the Flink blog.
When evaluating streaming technologies, throughput and latency often come up. Latency is the time it takes the stream processing engine to start processing a new event. Throughput is the number of events the engine can process in a given time. Characteristically, batch processing achieves high throughput but has high latency. Stream processing achieves low latency but also lower throughput, since fault tolerance and other housekeeping are commonly done on an event-by-event basis instead of once per batch of data.
Stateful processing of real-time data streams
The stateful streaming open-source scene has long been dominated by JVM-based technologies, such as Apache Spark, Apache Flink and, more recently, Apache Kafka Streams. However, there’s a new contender in town: Bytewax, a Python framework based on the Rust library Timely Dataflow. Each Python API of these tools takes a different approach to constructing streams and connecting them to data sources and sinks. Let’s have a look!
We’ll implement the following stateful stream processing flow:
Simulate multiple sensors that emit a number each second. The value emulates a count of events recorded by the sensors.
The sensor data is sent to a Kafka cluster, a distributed event streaming platform.
Stream processing technologies consume the Kafka stream as input.
The stream processor keeps track of the events from each sensor in a time window (the state).
In each time window (a Tumbling Window), the values from each sensor are added together.
The stream processor outputs the sum of values for each sensor, for every time window.
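Before bringing in any framework, the windowing logic of this flow can be sketched in plain Python. This is only an illustration: it works on a finite list instead of an endless stream, and the function name and sample events are assumptions. The field names id, time and value follow the sensor payload used later in this post.

```python
from collections import defaultdict


def tumbling_window_sums(events, window_seconds):
    """Sum values per (window start, sensor id).

    Each event is a dict with "id", "time" (epoch seconds) and "value".
    """
    sums = defaultdict(int)
    for event in events:
        # Align the event time down to the start of its window
        window_start = int(event["time"] // window_seconds) * window_seconds
        sums[(window_start, event["id"])] += event["value"]
    return dict(sums)


events = [
    {"id": 1, "time": 0.5, "value": 2},
    {"id": 1, "time": 3.2, "value": 4},
    {"id": 2, "time": 4.9, "value": 1},
    {"id": 1, "time": 5.0, "value": 7},
]
result = tumbling_window_sums(events, window_seconds=5)
print(result)  # {(0, 1): 6, (0, 2): 1, (5, 1): 7}
```

Tumbling windows do not overlap, so every event belongs to exactly one window, and an event landing exactly on a boundary (time 5.0 here) opens the next one.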
But wait. That sounds rather straightforward. Implementing such a data flow can indeed be quite simple. The complexities arise when you think about questions such as
Event sourcing
Fault tolerance and data recovery
Scalability and load balancing
Real-time processing and low latency
Support for event ordering and exactly-once guarantees (read more in the Flink documentation)
Handling of late/out-of-order events
Manageability and monitoring
Support for multiple data sources and data formats…
You wouldn’t want to implement all this from the ground up if you can find a suitable existing solution. For example, the tumbling window in our data flow, which sums values from each sensor in a given time frame, sounds simple. Behind the scenes, though, the stream processing software waits for events that arrive late, checkpoints the current state so that the system can recover from errors, and manages timestamp comparisons for us.
Luckily, such solutions exist, both open-source and commercially available. The open-source technologies we’ll use this time to implement our stream processing are:
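To give a feel for what waiting for late events involves, here is a rough plain-Python sketch of a watermark. It is a simplification and not how any of the frameworks below implement it: the class name, the max_delay_seconds parameter and the drop-when-late policy are assumptions, and checkpointing is left out entirely.

```python
from collections import defaultdict


class WatermarkedWindowSum:
    """Tumbling-window sum that tolerates events arriving a bit out of order."""

    def __init__(self, window_seconds, max_delay_seconds):
        self.window = window_seconds
        self.max_delay = max_delay_seconds
        self.open_windows = defaultdict(int)  # (window_start, sensor_id) -> sum
        self.watermark = float("-inf")
        self.dropped = 0

    def process(self, event):
        """Feed one event, return (window_start, sensor_id, total) for closed windows."""
        start = int(event["time"] // self.window) * self.window
        if start + self.window <= self.watermark:
            # The window this event belongs to has already been emitted
            self.dropped += 1
            return []
        self.open_windows[(start, event["id"])] += event["value"]
        # The watermark trails the newest event time by the allowed delay
        self.watermark = max(self.watermark, event["time"] - self.max_delay)
        closed = sorted(
            key for key in self.open_windows if key[0] + self.window <= self.watermark
        )
        return [(s, sensor, self.open_windows.pop((s, sensor))) for s, sensor in closed]


stream = [
    {"id": 1, "time": 1.0, "value": 2},
    {"id": 1, "time": 5.5, "value": 3},
    {"id": 1, "time": 4.8, "value": 4},  # out of order, but within the allowed delay
    {"id": 1, "time": 6.2, "value": 1},  # pushes the watermark past the first window
    {"id": 1, "time": 4.9, "value": 9},  # too late, its window has closed
]
summer = WatermarkedWindowSum(window_seconds=5, max_delay_seconds=1)
emitted = [result for event in stream for result in summer.process(event)]
print(emitted, summer.dropped)  # [(0, 1, 6)] 1
```

Even this toy version has to decide how long to wait, what to do with stragglers and when to release results, and it still loses all state if the process dies.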
Spark
Flink
Bytewax
These all have out-of-the-box Kafka input support for ingesting our sensor data.
Let’s go!
Spark
🥇Documentation and versatility
Apache Spark is the giant of distributed, big data computing. It’s efficient and reliable even under heavy computation. It's always a spark when you run a Spark job (credit to ChatGPT for the punchline)! While Spark originally focused on batch computations on large-scale data, it also has a stream processing engine. Well, a few actually, read on...
We use Spark Structured Streaming, the second-generation stream engine, which replaces the older “Spark Streaming” engine. Under the hood, Structured Streaming uses micro-batches. Nothing wrong with that, it gets the job done very well. Micro-batching guarantees that the data processing flow achieves exactly-once processing. However, the Spark team is working on a continuous processing model, which can bring processing latency down from 100ms to 1ms, at the cost of weaker, at-least-once processing guarantees. So you’ll trade exactness for lower latency. Choose carefully!
Structured Streaming has a mature Python API and it seems all functionality of the Java / Scala interfaces is available. The documentation is extensive and there are plenty of examples to get going. We cannot ignore the camelCase in function names though; it is not our preference for Python code.
Installing is easy, just use pip install pyspark. This will give you the spark-submit binary we need. Python up to 3.11 is supported, hooray! The cluster (or your local machines) needs Java 8 or later. To run our stream processing code, we need to instruct Spark to link additional packages for the Kafka integration. This is done by defining the necessary packages when submitting the Spark job. We run the stream processing with
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 iot.py
In this command, iot.py contains our stream processing code. You should make sure that the package matches both your Spark and Scala versions. Here, 2.12 is my Scala version, and 3.3.1 is the Spark version. You can find these packages in the Maven repository, for example. Just open the correct looking artifact and stare at the page until you come up with the correct way of forming the package name. Happy browsing :)
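If staring at the Maven page gets old, the naming pattern from the command above can be captured in a tiny helper. This is just a sketch: the function name is made up, and the pattern is inferred from the single package name used in this post, so double-check it for other Spark releases.

```python
def kafka_connector_package(spark_version, scala_version):
    """Build the --packages coordinate for Spark's Kafka source."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


package = kafka_connector_package(spark_version="3.3.1", scala_version="2.12")
command = ["spark-submit", "--packages", package, "iot.py"]
print(" ".join(command))
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 iot.py
```

The Spark version of a pip installation is available as pyspark.__version__, and running spark-submit --version prints the Scala version Spark was built with.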
There are a few implementation details which stand out when compared to our other stateful streaming technologies. When we’re reading events from Kafka to Spark DataFrames, a schema is needed. This can be specified in a simple string or in a more elaborate manner. The other frameworks don’t need to know the schema.
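For reference, this is roughly what one sensor message looks like and what the schema string "id INT, time DOUBLE, value INT" in the Spark code expresses, written out in plain Python. The field names come from that schema. The helper names, the value range and the SCHEMA dictionary are assumptions for illustration; the actual producer lives in the repository.

```python
import json
import random
import time


def make_event(sensor_id, now=None):
    """Build one sensor reading as the JSON bytes sent to Kafka."""
    event = {
        "id": sensor_id,
        "time": time.time() if now is None else now,  # epoch seconds
        "value": random.randint(0, 10),  # assumed range for the event count
    }
    return json.dumps(event).encode("utf-8")


# Python-side counterpart of the schema string "id INT, time DOUBLE, value INT"
SCHEMA = {"id": int, "time": float, "value": int}


def parse_event(payload):
    """Decode a Kafka message value and cast each field to its schema type."""
    raw = json.loads(payload.decode("utf-8"))
    return {name: cast(raw[name]) for name, cast in SCHEMA.items()}


payload = make_event(sensor_id=3, now=1700000000.0)
event = parse_event(payload)
print(event["id"], event["time"])  # 3 1700000000.0
```

Spark wants this information up front because it maps every message onto DataFrame columns, whereas the other two frameworks hand you the raw message and let your own Python code do the parsing.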
With regard to windowing our data for the stateful computations, Spark is the only framework of the three that requires user-specified timestamps in the streamed data. By default, the other technologies figure out time windows from system / cluster time, or possibly from the time of the Kafka cluster (the input source).
Beef of the Spark implementation:
# spark, addr, topic, data_col, ts_col and win are assumed to be defined earlier in the script
from pyspark.sql.functions import col, from_json, from_unixtime, to_timestamp, window

# This is the raw Kafka payload with Kafka metadata attached
stream_df = (
    spark.readStream.format("kafka")
    .option("startingOffsets", "latest")
    .option("kafka.bootstrap.servers", addr)
    .option("subscribe", topic)
    .load()
).select(
    # Here we make a new DataFrame with just the JSON payload in one field
    from_json(col("value").cast("string"), "id INT, time DOUBLE, value INT").alias(
        data_col
    )
).select(
    # Unpack the JSON, transform event time to correct format
    col(f"{data_col}.id"),
    col(f"{data_col}.value"),
    to_timestamp(from_unixtime(col(f"{data_col}.time"))).alias(ts_col),
)
# Tumbling window and grouping
# Without the watermark we cannot use the append mode, only update and complete
windowed_sum_df = (
    stream_df.withWatermark(ts_col, f"{win} seconds").groupBy(
        window(ts_col, f"{win} seconds"), stream_df.id
    )
).sum("value")
On the one hand, the code may feel a bit verbose due to the SQL and table / DataFrame semantics. On the other hand, you may find the table semantics familiar and much to your liking. Overall, writing stream processing with Spark is nice, thanks to great docs and extensive functionality.
Flink
🥇Stream-native, extensive features
Apache Flink has been designed for stream processing from the start. Fault tolerance, state management and forever-running processing are its bread and butter. There is a batch processing mode as well, but the Flink approach is that batch is a special case of streaming.
We use PyFlink, Flink’s Python API. Flink is Java software, so the Java API gets the latest features first. There also is a now deprecated Scala API. Flink documentation is very in-depth and the interested reader will learn a lot about streaming and the computer science behind it. In the docs, you’ll occasionally run into a “not yet implemented in the Python API” message, but for the most part even advanced functionality is available in the Python API.
With Flink, you get to choose how you define your stream processing logic - either with the “Table API” which resembles SQL (and Spark), or with the “DataStream API”. There is also a “DataSet API”, which is deprecated. All APIs can be used for batch or stream processing, and the Python interface is available for all APIs. The DataStream API gives the lowest level control.
Installation happens with pip install apache-flink. The documentation states that Python is supported only up to 3.8, which is a bummer. I was able to run my stream processing with Python 3.9 but it’s hard to say if all functionality would work as intended. Installing on 3.10 fails at building the pemja library. Java 11 or later is required to run Flink.
After installation you’ll need to fetch some jar-files to enable the Kafka integration, namely “flink-connector-kafka” and “kafka-clients”. You need to specify the path to these jars either in code or alternatively place the files in pyflink’s lib directory. As I ran these examples locally, I placed the jars in the library directory. The path would be …/python/site-packages/pyflink/lib/. When you deploy in the cloud it’s probably better to give the jar path in the code or as a parameter. Again, the Maven repository is your friend for finding the jar packages. Now you’re ready to ingest Kafka events!
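If you go the route of giving the jar paths in code, a small helper like the following may be handy. It is a sketch under assumptions: the helper name and the jars directory are made up, and the commented-out usage relies on the add_jars method of PyFlink's StreamExecutionEnvironment, which takes file:// URLs.

```python
from pathlib import Path


def jar_uris(jar_dir):
    """Return sorted file:// URIs for every .jar file directly inside jar_dir."""
    return sorted(path.resolve().as_uri() for path in Path(jar_dir).glob("*.jar"))


# Usage sketch (needs PyFlink installed, hence commented out):
# from pyflink.datastream import StreamExecutionEnvironment
# env = StreamExecutionEnvironment.get_execution_environment()
# env.add_jars(*jar_uris("jars"))
```

Keeping the jars in a project directory rather than inside site-packages also means they survive when the virtual environment is recreated.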
Writing PyFlink code with the DataStream API feels very powerful, you get a lot done with simple operations:
# Kafka source will run in unbounded mode by default
source = (
    KafkaSource.builder()
    .set_bootstrap_servers(addr)
    .set_topics(topic)
    .set_group_id("iot-flink")
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(SimpleStringSchema(charset="utf-8"))
    .build()
)
(
    # Ingest from our Kafka source
    env.from_source(
        source,
        WatermarkStrategy.no_watermarks(),
        "Kafka Source",
    )
    # Read the json-strings into dictionaries
    .map(json.loads)
    # Assign watermarks here, since we can access the event time from our
    # json-data. Watermarks are needed for event-time windowing below.
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(1))
        # Don't wait too long for idle data sources
        .with_idleness(Duration.of_millis(500))
        .with_timestamp_assigner(SensorTimestampAssigner())
    )
    # Map each dictionary into (key, value) tuples
    .map(lambda x: (x["id"], x["value"]))
    # Group by unique keys
    .key_by(lambda x: x[0])
    # ...and collect each key into their Tumbling windows
    .window(TumblingEventTimeWindows.of(Time.seconds(win)))
    # ...which are reduced to the sum of value-fields inside each window
    .reduce(lambda x, y: (x[0], x[1] + y[1])).print()
)
At times you’ll peek into the Java code to figure out necessary configuration methods, but the documentation is comprehensive overall. You’re going to be creating a lot of objects for configuration purposes and often initializing them with factory functions, which may not always result in the most readable code. You’ll also frequently subclass those configuration objects to override specific functionality. The end result of your stream definition may not be the most pythonic, but there is a certain clarity to the structure, and it’s easy to understand the processing logic at a glance.
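The SensorTimestampAssigner used in the PyFlink code above is an example of such subclassing. Its definition is not shown in this post, so the following is only a guess at what it could look like. It assumes the time field holds epoch seconds, as the Spark schema suggests, and converts it to the epoch milliseconds Flink works with. The fallback base class exists only so that the sketch runs without PyFlink installed.

```python
try:
    from pyflink.common.watermark_strategy import TimestampAssigner
except ImportError:
    # Stand-in base class so the sketch also runs without PyFlink
    class TimestampAssigner:
        pass


class SensorTimestampAssigner(TimestampAssigner):
    """Tell Flink where to find the event time inside each record."""

    def extract_timestamp(self, value, record_timestamp):
        # Assumption: value is the dict produced by json.loads and "time" is
        # epoch seconds; Flink expects event time in epoch milliseconds
        return int(value["time"] * 1000)


assigner = SensorTimestampAssigner()
timestamp_ms = assigner.extract_timestamp({"id": 1, "time": 1700000000.5, "value": 4}, 0)
print(timestamp_ms)  # 1700000000500
```

The second argument, record_timestamp, is whatever timestamp was previously attached to the record; this sketch ignores it in favour of the time carried in the payload.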
Bytewax
🥇Development experience, clarity, productivity
Bytewax is a relatively new project. The library is free and open source with a permissive license, while the development is backed by Bytewax the company. The project is structured so that it’s easy to develop stream processing flows locally and then deploy them to a Kubernetes cluster, using a tool called waxctl. Bytewax is based on a Rust engine that implements the timely dataflow computational model, giving Bytewax its distributed and parallelizable data processing capabilities.
What intrigues us about Bytewax is that it is a Python-first framework for stream processing. Given the momentum Python has in the data and machine learning community, this can potentially be a huge asset. All features are designed with Python in mind, rather than as wrappers to Java methods. Though, it must be noted that a fair bit of Bytewax development involves writing Rust. If that puts you off, too bad. In the 2022 Stack Overflow developer survey, Rust was loved by 87% of developers, topping the list for the seventh year in a row! For comparison, Java scored 46% and Python 67%. So the momentum is with Bytewax here :)
Installing is easy, just pip install bytewax. The necessary Rust components will be installed as executables containing all necessary dependencies, so you don’t need any extra runtimes on your machine. Neat! For now, Python 3.10 is the latest supported version, but 3.11 should be available very soon.
Writing Bytewax stream processing code is very efficient. You add processing steps to the flow in sequential order, much like defining neural networks in Keras (which became hugely popular due to its great usability). The resulting code is very clean and we liked how intuitive it is to configure each step of the flow. Out of the technologies we show in this blog post, we spent the least amount of time configuring the Kafka connector and event time evaluation with Bytewax, thanks to the thought-out APIs.
# Initialize a flow
flow = Dataflow()
# Input is our Kafka stream
flow.input(
    "input",
    KafkaInputConfig(brokers=[addr], topic=topic, tail=True, starting_offset="end"),
)
# Extract dictionaries from JSON messages
flow.map(deserialize)
# Extract (key, value) pairs with the data we want to operate on as the
# value
flow.map(lambda x: (x[0], x[1]))
# reduce each key according to our reducer function, bytewax will pass only
# the values to the reduce function. Since we output dicts from the
# previous map, we need to output dicts from the reducer
flow.reduce_window(
    "sum", clock_config, window_config, lambda x, y: {"value": x["value"] + y["value"]}
)
flow.capture(StdOutputConfig())
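The deserialize function in the flow above is not shown in this post. A plausible version, runnable without Bytewax, could look like the sketch below. It assumes that the Kafka input hands each message to the flow as a (key bytes, payload bytes) tuple and that keys for the windowed reduce should be strings; both are our reading of the Bytewax version used here, so verify them against the docs of your version. add_values is simply the reducer lambda with a name.

```python
import json
from functools import reduce


def deserialize(key_bytes__payload_bytes):
    """Turn one Kafka message into a (sensor id as string, payload dict) pair."""
    _key_bytes, payload_bytes = key_bytes__payload_bytes
    data = json.loads(payload_bytes.decode("utf-8"))
    return str(data["id"]), data


def add_values(x, y):
    """Same logic as the reducer lambda in the flow above."""
    return {"value": x["value"] + y["value"]}


messages = [
    (None, b'{"id": 1, "time": 0.5, "value": 2}'),
    (None, b'{"id": 1, "time": 1.5, "value": 4}'),
]
pairs = [deserialize(message) for message in messages]
total = reduce(add_values, (value for _key, value in pairs))
print(pairs[0][0], total)  # 1 {'value': 6}
```

Since both helpers are plain Python functions, they can be unit tested like this without a running Kafka cluster or dataflow.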
It must be said that feature-wise, Bytewax is still in an active development phase. For example, different ways of time-based windowing are on the roadmap, while Spark and Flink already have them implemented. The documentation is great though, and there are plenty of examples and interesting blog posts with real-world use cases implemented in Bytewax. Their community Slack is also an excellent source for more information. Overall, we're excited about this new piece of technology!
Conclusions
We introduced three great open-source libraries with Python APIs, and showed how to implement a stateful stream processing flow with each one. When choosing one for your real-time processing needs, consider
Features your application requires (e.g. types of time windows, latency)
Documentation
Developer experience, maintainability, productivity
In follow-up posts, we’ll take a look at the deployment experience. All of the above technologies can be run in a Kubernetes cluster for scalable processing, which we will describe. We may also add some performance comparisons. Moreover, there are features like user interfaces and monitoring the processing clusters, which we did not discuss yet. So stay tuned, there’s a lot to cover 😅
Remember to check out the code here!
If you are curious to see an example of stream processing solutions applied in enterprise-grade software products, go check out our YOKOT.AI Private Enterprise Generative AI solution.
And as always, please leave a note if you'd like to talk data, cloud or software architecture: