Dataflow & Stream Processing
Dataflow programming is a paradigm that models a program as a directed graph of data flowing between operations. Stream processing applies this model to unbounded, continuous streams of data, treating data not as a static collection but as a series of events in motion. It's the foundation for real-time analytics, large-scale event processing, and reactive systems.
"The world is a stream of events. Why should our data architecture be any different?" — Jay Kreps
Core ideas
- Streams: Unbounded, time-ordered sequences of immutable event records.
- Operators: Functions that transform streams. Common operators include `map`, `filter`, `join`, and `aggregate`.
- Directed Acyclic Graph (DAG): A pipeline is modeled as a DAG where streams are edges and operators are vertices.
- Windowing: Grouping infinite stream elements into finite "windows" based on time or count, allowing for aggregations over bounded sets.
- State Management: Many stream operations (like windowing or joins) are stateful. The system must provide fault-tolerant state storage.
- Time Semantics: Distinguishing between Event Time (when the event occurred at the source) and Processing Time (when the event is observed by the processing engine).
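The event-time vs. processing-time distinction can be made concrete with a minimal sketch (toy timestamps and a 60-second tumbling window are assumptions for illustration): bucketing by the timestamp embedded in the event keeps records in the correct window regardless of the order in which the engine happens to observe them.

```python
# Sketch: the same two events land in the correct event-time window
# no matter what order the processing engine observes them in.
WINDOW = 60  # assumed tumbling 60-second windows

def event_time_window(event):
    # Assign by the 'ts' embedded in the event (event time),
    # not by the wall clock at observation (processing time).
    return (event["ts"] // WINDOW) * WINDOW

# Events arriving out of order relative to their event time:
late_first = [{"ts": 125, "value": 2}, {"ts": 61, "value": 1}]
print([event_time_window(e) for e in late_first])  # [120, 60]
```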
Examples
The examples show a simple, conceptual windowed aggregation. Real-world stream processors like Flink or Kafka Streams provide robust, fault-tolerant implementations of these concepts.
- Python (Conceptual)
- Go (Conceptual)
- Node.js (Streams)
conceptual_pipeline.py
# Conceptual example of windowed aggregation
from collections import defaultdict
def windowed_sum(events, window_seconds):
    """
    Aggregates event values into time-based windows.
    `events` is an iterator of dictionaries like
    {'ts': 1672531205, 'key': 'A', 'value': 10}
    """
    buckets = defaultdict(int)
    for event in events:
        # Integer division creates discrete time buckets
        bucket_timestamp = (event["ts"] // window_seconds) * window_seconds
        key = (event["key"], bucket_timestamp)
        buckets[key] += event["value"]
    return buckets
# Example usage:
# stream = get_event_stream()
# aggregated_windows = windowed_sum(stream, 300) # 5-minute windows
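A quick self-contained check of the aggregation above (the function is repeated here so the snippet runs on its own; the two-event stream is an assumed toy dataset): both events fall into the same 5-minute bucket.

```python
from collections import defaultdict

def windowed_sum(events, window_seconds):
    buckets = defaultdict(int)
    for event in events:
        bucket_timestamp = (event["ts"] // window_seconds) * window_seconds
        buckets[(event["key"], bucket_timestamp)] += event["value"]
    return buckets

stream = [
    {"ts": 1672531205, "key": "A", "value": 10},
    {"ts": 1672531215, "key": "A", "value": 5},
]
result = windowed_sum(stream, 300)  # 5-minute windows
print(dict(result))  # {('A', 1672531200): 15}
```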
conceptual_pipeline.go
package main
import "fmt"
type Event struct {
	Timestamp int64
	Key       string
	Value     int
}

// WindowedSum aggregates events into time-based windows.
func WindowedSum(events <-chan Event, windowSeconds int64) map[string]int {
	buckets := make(map[string]int)
	for e := range events {
		bucketTimestamp := (e.Timestamp / windowSeconds) * windowSeconds
		key := fmt.Sprintf("%s:%d", e.Key, bucketTimestamp)
		buckets[key] += e.Value
	}
	return buckets
}

func main() {
	// In a real app, events would come from a source like Kafka.
	eventChan := make(chan Event, 2)
	eventChan <- Event{1672531205, "A", 10}
	eventChan <- Event{1672531215, "A", 5}
	close(eventChan)

	results := WindowedSum(eventChan, 300)
	fmt.Println(results) // map[A:1672531200:15]
}
pipeline.mjs
import { Transform } from 'node:stream';
// A transform stream to perform windowed aggregation.
export function createWindowedAggregator(windowMs) {
  const buckets = new Map(); // Stores aggregated values for each window
  return new Transform({
    objectMode: true,
    transform(event, _encoding, callback) {
      // { ts: 1672531205, key: 'A', value: 10 }
      const bucketTimestamp = Math.floor(event.ts / windowMs) * windowMs;
      const bucketKey = `${event.key}:${bucketTimestamp}`;
      const currentSum = buckets.get(bucketKey) || 0;
      buckets.set(bucketKey, currentSum + event.value);
      // In a real system, you'd emit based on watermarks or timers.
      // For simplicity, we emit the updated aggregate immediately.
      this.push({ bucketKey, sum: buckets.get(bucketKey) });
      callback();
    },
  });
}
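The Node.js example notes that real systems emit windows based on watermarks rather than on every update. One minimal way to sketch that idea (the watermark heuristic here, max event time seen minus a fixed allowed lateness, is an assumption; production engines track watermarks per source partition):

```python
# Sketch: emit a window only once the watermark has passed its end.
WINDOW = 60            # assumed tumbling 60-second windows
ALLOWED_LATENESS = 10  # assumed fixed lateness budget

def run(events):
    buckets = {}
    max_ts = 0
    emitted = []
    for e in events:
        start = (e["ts"] // WINDOW) * WINDOW
        buckets[start] = buckets.get(start, 0) + e["value"]
        max_ts = max(max_ts, e["ts"])
        watermark = max_ts - ALLOWED_LATENESS
        # Close (emit and drop) every window whose end precedes the watermark.
        for s in sorted(buckets):
            if s + WINDOW <= watermark:
                emitted.append((s, buckets.pop(s)))
    return emitted, buckets

emitted, open_windows = run([
    {"ts": 5, "value": 1},
    {"ts": 50, "value": 2},
    {"ts": 75, "value": 4},  # watermark reaches 65: window [0, 60) closes
])
print(emitted)       # [(0, 3)]
print(open_windows)  # {60: 4}
```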
When to Use vs. When to Reconsider
When to Use
- Real-time monitoring and analytics: Dashboards, alerting systems, and anomaly detection that require insights with sub-second latency.
- Event-driven architectures: As the processing engine that reacts to, transforms, and enriches event streams from services.
- Large-scale data transformation (ETL): For continuous, low-latency ETL jobs, replacing traditional batch processing.
When to Reconsider
- Batch-oriented, request-response workloads: If you only need to process data on a fixed schedule (e.g., nightly reports), traditional batch tools may be simpler.
- Small-scale or simple tasks: The operational overhead of a distributed stream processing engine can be substantial. Simpler message queues or scripts might suffice.
- Systems requiring strong transactional updates: While some engines offer exactly-once semantics, coordinating transactions across a stream and an external database is highly complex.
Operational Considerations
State & Fault Tolerance
Stateful operators must checkpoint their state to durable storage. In case of failure, the operator can restore its state and resume processing, ensuring correctness.
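As a rough sketch of the checkpoint/restore cycle (a local JSON file stands in for a durable state backend, which is an assumption made for illustration), the key detail is the write-then-rename step so a crash mid-write never leaves a corrupt checkpoint:

```python
import json
import os
import tempfile

def checkpoint(state, path):
    # Write-then-rename: the rename is atomic, so readers only ever
    # see a complete checkpoint file.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)

def restore(path):
    if not os.path.exists(path):
        return {}  # cold start: no prior state
    with open(path) as f:
        return json.load(f)

path = os.path.join(tempfile.gettempdir(), "operator.ckpt")
state = restore(path)                     # resume from last checkpoint
state["count"] = state.get("count", 0) + 1  # do some stateful work
checkpoint(state, path)                   # persist before acknowledging input
```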
Backpressure
A fast source can overwhelm a slow operator. A proper stream processing system provides automatic backpressure, where downstream operators signal upstream to slow down.
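In a single process, the same signal can be sketched with a bounded queue (the sizes and sleep are illustrative assumptions): when the consumer falls behind, the producer's `put()` blocks, which is exactly the "slow down" signal propagating upstream.

```python
import queue
import threading
import time

q = queue.Queue(maxsize=2)  # bounded buffer = implicit backpressure
produced, consumed = [], []

def producer():
    for i in range(6):
        q.put(i)         # blocks while the queue is full
        produced.append(i)
    q.put(None)          # sentinel: end of stream

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        time.sleep(0.01)  # simulate a slow operator
        consumed.append(item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start(); t1.join(); t2.join()
print(consumed)  # [0, 1, 2, 3, 4, 5]
```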
Schema Evolution
Data schemas change. Use a schema registry together with a serialization format that supports compatible evolution (such as Avro or Protobuf) to manage schema changes without breaking the pipeline.
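Inside an operator, the complementary habit is the tolerant reader: accept both old and new event shapes by supplying defaults for fields added later. A minimal sketch (the `region` field and its versions are assumptions for illustration):

```python
def normalize(event):
    # 'region' was assumed to be added in schema v2; older (v1) events
    # lack it, so fall back to a default rather than failing.
    return {
        "key": event["key"],
        "value": event["value"],
        "region": event.get("region", "unknown"),
    }

v1 = {"key": "A", "value": 10}                 # old schema
v2 = {"key": "B", "value": 5, "region": "eu"}  # new schema
print([normalize(e) for e in (v1, v2)])
```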
Reprocessing & Replays
You may need to reprocess data if your business logic changes. Design your system to easily replay events from a specific point in time from your message broker.
Security Considerations
Data Encryption
Ensure data is encrypted both in transit (e.g., using TLS for streams between nodes and brokers) and at rest (e.g., for state backends and durable logs).
Authentication & Authorization
Processing pipelines should authenticate with data sources and sinks. Use mechanisms like mTLS, OAuth, or SASL. Enforce authorization to control which components can read from or write to specific streams.
Data Privacy and Governance
For sensitive data, implement data masking or tokenization within your stream operators. Be aware of data residency requirements and ensure your processing topology complies with them.
Observability
Metrics
Monitor key performance indicators (KPIs) like message throughput, latency (end-to-end, processing time), consumer lag, and watermark progress. Expose these via a monitoring system like Prometheus.
Logging
Implement structured logging within operators. Include correlation IDs to trace a single event or transaction as it flows through the entire DAG.
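One way this might look inside an operator, sketched with the standard library (the field names and the `req-123` correlation ID are illustrative assumptions): every log line is JSON and carries the same correlation ID, so the event's path through the DAG can be stitched together later.

```python
import json
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
log = logging.getLogger("pipeline")

def process(event, operator):
    # Emit one structured log line per operator, keyed by correlation ID.
    log.info(json.dumps({
        "operator": operator,
        "correlation_id": event["correlation_id"],
        "key": event["key"],
    }))
    return event

event = {"correlation_id": "req-123", "key": "A", "value": 10}
for op in ("parse", "enrich", "aggregate"):
    process(event, op)
```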
Tracing
Use distributed tracing to visualize the complete lifecycle of an event across multiple operators and services, helping to pinpoint bottlenecks and errors.
Design Review Checklist
- Is the distinction between Event Time and Processing Time understood and handled correctly?
- Is the windowing strategy (tumbling, sliding, session) appropriate for the business problem?
- How is late-arriving data handled (e.g., allowed lateness, side outputs)?
- Are all stateful operations configured with fault-tolerant state backends?
- Is the end-to-end delivery guarantee (at-least-once vs. exactly-once) defined and implemented correctly?
- Is there a strategy for monitoring pipeline health, including consumer lag and watermarks?