Pipes & Filters / Pipeline Architecture

Chain independent processing stages with data flowing through pipes

TL;DR

Pipes & Filters architecture chains independent processing stages (filters) connected by data streams (pipes). Each filter takes input, transforms it, and passes output to the next filter. Inspired by Unix philosophy (cat | grep | sort). Excellent for ETL, data processing, and streaming workloads. Simplicity is its strength; ordering is its constraint.

Learning Objectives

  • Understand filters (processing stages) and pipes (data streams)
  • Design reusable, composable filters
  • Implement linear data pipelines
  • Handle data format compatibility between filters
  • Know when to use the pattern and when pipelines become unwieldy

Motivating Scenario

Your analytics system ingests user event logs and processes them through multiple stages: parse JSON → filter invalid events → enrich with user data → aggregate by hour → send to the warehouse. Each stage is independent: parsing doesn't need to know about enrichment, and aggregation doesn't care how filtering works. Pipes & Filters matches this naturally: each stage is a filter, and data flows between stages through pipes.

Core Concepts

Pipes & Filters follows the Unix philosophy: small, focused programs chained together.

Filter: A processing stage that reads input, transforms it, and writes output. Single responsibility.

Pipe: A connection between filters. Carries data in a specific format (JSON, CSV, objects).

(Figure: Pipes and Filters architecture)
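
To see the Unix analogy in code, here is a minimal sketch (the file name and pattern are made up): each generator function is a filter, and handing one generator to the next is the pipe, mirroring cat | grep | sort.

def read_lines(path):
    """Source filter: emit lines from a file (the 'cat' stage)."""
    with open(path) as f:
        for line in f:
            yield line.rstrip('\n')

def grep(lines, pattern):
    """Filter: keep only lines containing pattern."""
    for line in lines:
        if pattern in line:
            yield line

def sort_lines(lines):
    """Filter: sorting needs the whole input, so it buffers before emitting."""
    yield from sorted(lines)

# The pipes are the generator handoffs: cat access.log | grep ERROR | sort
for line in sort_lines(grep(read_lines('access.log'), 'ERROR')):
    print(line)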

Key Characteristics

Linear Flow: Data flows in one direction through the pipeline.

Independent Filters: Each filter knows nothing about others; can test, deploy, scale independently.

Composability: Add, remove, or reorder filters without modifying existing ones.

Streaming: Can process unbounded data streams, not just batches.

Simplicity: Easy to understand, debug, and reason about.
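
To make composability concrete, here is a small sketch (build_pipeline is an illustrative helper, not part of the example below): because every filter maps one iterator to another, a pipeline is just a fold over a list of filters, and adding, removing, or reordering stages only changes that list.

from functools import reduce

def build_pipeline(source, *filters):
    """Compose filters left to right: each filter's output iterator feeds the next."""
    return reduce(lambda stream, f: f(stream), filters, source)

# Example (using filters from the Practical Example below):
# output = build_pipeline(raw_lines, parse_events, validate_events, format_output)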

Practical Example

import json

# Pipeline using Python generators (the pipes are just the generator handoffs)

def parse_events(raw_data):
    """Filter: Parse JSON events."""
    for line in raw_data:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue  # Skip invalid JSON

def validate_events(events):
    """Filter: Validate required fields."""
    for event in events:
        if 'user_id' in event and 'action' in event and 'timestamp' in event:
            yield event

def enrich_events(events, user_db):
    """Filter: Enrich with user data."""
    for event in events:
        user = user_db.get(event['user_id'], {})
        event['user_segment'] = user.get('segment', 'unknown')
        event['user_tier'] = user.get('tier', 'free')
        yield event

def aggregate_events(events):
    """Filter: Aggregate by hour. This stage must consume the whole stream
    before emitting one record per hourly bucket."""
    hourly_buckets = {}
    for event in events:
        hour_key = event['timestamp'][:13]  # YYYY-MM-DD HH
        bucket = hourly_buckets.setdefault(hour_key, {'hour': hour_key, 'count': 0, 'actions': {}})
        bucket['count'] += 1
        action = event['action']
        bucket['actions'][action] = bucket['actions'].get(action, 0) + 1
    yield from hourly_buckets.values()

def format_output(events):
    """Filter: Format as JSON lines for the warehouse."""
    for event in events:
        yield json.dumps(event)

# Compose the pipeline
def run_pipeline(raw_data, user_db):
    parsed = parse_events(raw_data)
    validated = validate_events(parsed)
    enriched = enrich_events(validated, user_db)
    aggregated = aggregate_events(enriched)
    formatted = format_output(aggregated)
    return formatted

# Usage (load_user_database() and warehouse are assumed to exist in the surrounding application)
with open('events.jsonl') as f:
    user_db = load_user_database()
    results = run_pipeline(f, user_db)
    for line in results:
        warehouse.insert(json.loads(line))
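
Because filters only consume and produce iterables, each one can be tested in isolation with in-memory data and no pipeline around it; a minimal sketch (the sample records are made up):

def test_validate_events_drops_incomplete_records():
    events = [
        {'user_id': 'u1', 'action': 'click', 'timestamp': '2024-01-01 10:00:00'},
        {'user_id': 'u2'},  # missing 'action' and 'timestamp'
    ]
    assert list(validate_events(events)) == [events[0]]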

When to Use / When Not to Use

Use Pipes & Filters When:
  1. Processing linear data streams (logs, events, files)
  2. Transformation stages are independent and reusable
  3. Building ETL (Extract, Transform, Load) pipelines
  4. Each stage has a single, well-defined responsibility
  5. You want simplicity and ease of understanding
  6. The workflow can be expressed as a sequence of transforms
Avoid Pipes & Filters When:
  1. You need branching or conditional logic (if-then-else)
  2. Stages have complex interdependencies
  3. Data flow is bidirectional or non-linear
  4. Stages need multiple inputs or outputs
  5. The pipeline structure must change dynamically at runtime

Patterns and Pitfalls

  • Bottleneck filter: One slow filter (e.g., a network call in the enrichment stage) delays the entire pipeline. Profile each filter, parallelize the slow ones, and use connection pooling and caching.
  • Format mismatch: Filter A outputs objects while Filter B expects JSON strings, forcing an intermediate transformation. Define clear format contracts between filters and document each input/output schema.
  • Leaky state: A filter that maintains state (a counter, a cache) can leak it into the next record's output. Keep filters stateless per record; use explicit state only when necessary, and document it.
  • Reusable filters: Design filters that work with any input matching the interface. Build generic filters (Parse, Filter, Map, Reduce) and compose them for specific use cases.
  • Invalid data: Bad records will reach the pipeline. Decide up front whether to skip, log, or route them to a dead-letter queue, and make that error handling explicit in each filter.
  • Branching: Processing sometimes splits into multiple paths (e.g., valid vs. invalid events). Use a Splitter filter that yields to multiple outputs, or fan out to parallel pipelines, as shown in the sketch after this list.
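
Here is a sketch of the Splitter idea in the same generator style (route names and handlers are illustrative): the filter tags each record with a route instead of forcing one linear path, and a dead-letter sink is just one more route.

def split_events(events):
    """Splitter: tag each record with a route instead of a single output stream."""
    for event in events:
        if 'user_id' in event and 'action' in event:
            yield ('valid', event)
        else:
            yield ('dead_letter', event)

def fan_out(tagged_events, routes):
    """Dispatch each (route, event) pair to the handler registered for that route."""
    for route, event in tagged_events:
        routes[route](event)

# Usage: route valid events onward, park the rest for inspection.
# fan_out(split_events(parsed), {'valid': handle_valid, 'dead_letter': dead_letters.append})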

Design Review Checklist

  • Is each filter focused on a single transformation?
  • Can filters be tested in isolation with mock input/output?
  • Is the data format consistent between filters?
  • Are filters stateless (or is state explicitly documented)?
  • Can you reorder filters without breaking the pipeline?
  • Is error handling clear (skip, log, fail fast)?
  • Are there performance bottlenecks (profile filters)?
  • Is the pipeline ordering intuitive (matches logical flow)?
  • Can new filters be added without modifying existing ones?
  • Is the pipeline testable end-to-end with realistic data?

Self-Check

  1. What's the main benefit of Pipes & Filters architecture? Simplicity, reusability, and composability: each filter is independent, so it's easy to test, modify, and reuse in different pipelines.
  2. When would branching break the Pipes & Filters model? When you need different paths (e.g., valid vs. invalid events), you're no longer in a purely linear flow; consider event-driven architecture for conditional routing.
  3. How do you handle a filter that's a bottleneck? Profile it. If it's I/O bound (e.g., a network call), use connection pooling or caching, as in the sketch below; if it's CPU bound, parallelize it or optimize the algorithm.
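
For an I/O-bound enrichment filter, caching repeat lookups is often the cheapest fix. A sketch using functools.lru_cache; USER_STORE stands in for the real user-store client:

from functools import lru_cache

USER_STORE = {}  # placeholder for the real user-store client or database

@lru_cache(maxsize=10_000)
def lookup_user(user_id):
    """Stand-in for the remote lookup; repeat user_ids are served from the cache."""
    return USER_STORE.get(user_id)

def enrich_events_cached(events):
    """Filter: enrich with cached user lookups instead of one remote call per event."""
    for event in events:
        user = lookup_user(event['user_id']) or {}
        event['user_segment'] = user.get('segment', 'unknown')
        yield event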

One Takeaway: Pipes & Filters is the Unix philosophy applied to system architecture. Use it for linear, data-intensive workflows. When logic becomes conditional or branching, consider event-driven architecture.

Next Steps

  • Stream Processing Frameworks: Kafka Streams, Apache Flink, Beam
  • Event-Driven Architecture: When you need non-linear, conditional workflows
  • Parallelization Strategies: Process multiple pipelines concurrently
  • Monitoring Data Pipelines: Lag, throughput, error rates
  • Backpressure Handling: Managing fast producers and slow consumers
