Domain Events
Record important business occurrences that others should know about
TL;DR
Domain Events are immutable, time-stamped records of something significant that happened in the domain. Aggregates emit events when their state changes meaningfully (OrderConfirmed, PaymentProcessed, CustomerOnboarded). Other aggregates, services, and bounded contexts subscribe to these events and react asynchronously, which enables loose coupling and eventual consistency. Events also support event sourcing for audit trails and temporal queries, and they make workflows explicit. Prefer events to direct service calls when coordinating across aggregates and bounded contexts.
Learning Objectives
- Model domain events using business language
- Emit events from aggregates when significant changes occur
- Implement asynchronous event handlers with eventual consistency
- Distinguish domain events from technical infrastructure events
- Use events for inter-aggregate and inter-context communication
- Handle event versioning for long-term evolution
- Implement event sourcing patterns
Motivating Scenario
You're building an order management system. When an order is confirmed, multiple things must happen: payment must be processed, inventory must be reserved, shipping must be arranged, and the customer must be notified. If you implement this with direct service calls from Order to the Payment, Inventory, Shipping, and Notification services, you create tight coupling. Every change to the order process requires modifying the Order aggregate or an orchestration service.
Instead, the Order aggregate emits an OrderConfirmed event. Multiple services independently subscribe to this event and react: Payment Service charges the customer, Inventory Service reserves stock, Shipping Service arranges logistics, and Notification Service sends confirmation emails. Each service is decoupled. If Notification Service goes down, orders still get confirmed. New services can be added later without changing the Order aggregate.
Core Concepts
What Makes a Good Domain Event:
- Named in past tense: OrderConfirmed, PaymentProcessed, CustomerOnboarded (not "ConfirmOrder")
- Domain-significant: Represents something the business cares about
- Immutable: Once created, never changes
- Self-contained: Contains all data needed by subscribers
- Versioned: Includes version number for evolution
Practical Example
The example has three parts: event definitions, an aggregate that emits events, and event handlers.
# --- Event definitions ---
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List, Tuple, Union
from uuid import UUID, uuid4


def utc_now() -> datetime:
    """Timezone-aware timestamp (datetime.utcnow() is deprecated)."""
    return datetime.now(timezone.utc)


@dataclass(frozen=True)  # Immutable
class OrderLine:
    """Line-item data carried inside the event so it stays self-contained."""
    product_id: UUID
    quantity: int


@dataclass(frozen=True)
class OrderConfirmed:
    """Domain Event: Order was confirmed and payment authorized.

    Published when a customer's order is confirmed.
    Multiple services react to this event independently.
    """
    # Required fields come first: a dataclass rejects a field without a
    # default that is declared after a field that has one.
    event_id: UUID
    order_id: UUID
    customer_id: UUID
    order_total: float
    items: Tuple[OrderLine, ...]
    currency: str = "USD"
    event_version: int = 1  # For future evolution
    occurred_at: datetime = field(default_factory=utc_now)


@dataclass(frozen=True)
class OrderShipped:
    """Domain Event: Order was shipped and is in transit."""
    event_id: UUID
    order_id: UUID
    customer_id: UUID  # Lets subscribers notify the customer without a lookup
    carrier_name: str
    tracking_number: str
    estimated_delivery: datetime
    event_version: int = 1
    occurred_at: datetime = field(default_factory=utc_now)


@dataclass(frozen=True)
class OrderCancelled:
    """Domain Event: Order was cancelled before shipment."""
    event_id: UUID
    order_id: UUID
    reason: str
    refund_amount: float
    event_version: int = 1
    occurred_at: datetime = field(default_factory=utc_now)


# --- Emitting events from the aggregate ---
# AggregateRoot, OrderItem, OrderStatus, and the exception classes are
# assumed to be defined elsewhere in the domain model. OrderItem is assumed
# to expose product_id and quantity attributes.
DomainEvent = Union[OrderConfirmed, OrderShipped, OrderCancelled]


class Order(AggregateRoot):
    """Order aggregate that publishes domain events."""

    def __init__(self, order_id: UUID, customer_id: UUID):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items: List[OrderItem] = []
        self.status = OrderStatus.DRAFT
        self.total = 0.0
        self.currency = "USD"
        self.domain_events: List[DomainEvent] = []  # Collect events

    def add_item(self, product_id: UUID, quantity: int, price: float):
        """Add item to order."""
        if self.status != OrderStatus.DRAFT:
            raise OrderCannotBeModified(f"Cannot add items to {self.status} order")
        self.items.append(OrderItem(product_id, quantity, price))
        self.total += quantity * price  # Keep the running total current

    def confirm_with_payment(self, total: float):
        """Confirm order after payment authorization.

        Emits OrderConfirmed event.
        """
        if self.status != OrderStatus.DRAFT:
            raise OrderAlreadyConfirmed()
        if not self.items:
            raise OrderMustHaveItems()

        # Apply state change
        self.status = OrderStatus.CONFIRMED
        self.total = total

        # Record what happened
        self.domain_events.append(OrderConfirmed(
            event_id=uuid4(),
            order_id=self.order_id,
            customer_id=self.customer_id,
            order_total=total,
            items=tuple(OrderLine(i.product_id, i.quantity) for i in self.items),
            currency=self.currency,
        ))

    def ship(self, tracking_number: str, carrier: str):
        """Ship the order.

        Emits OrderShipped event.
        """
        if self.status != OrderStatus.CONFIRMED:
            raise CannotShipUnconfirmedOrder()

        self.status = OrderStatus.SHIPPED
        self.domain_events.append(OrderShipped(
            event_id=uuid4(),
            order_id=self.order_id,
            customer_id=self.customer_id,
            carrier_name=carrier,
            tracking_number=tracking_number,
            # Placeholder estimate: three days from now
            estimated_delivery=utc_now() + timedelta(days=3),
        ))

    def cancel(self, reason: str):
        """Cancel the order.

        Emits OrderCancelled event.
        """
        if self.status in [OrderStatus.SHIPPED, OrderStatus.DELIVERED]:
            raise CannotCancelShippedOrder()

        # Only a confirmed order has a payment to refund
        refund = self.total if self.status == OrderStatus.CONFIRMED else 0.0
        self.status = OrderStatus.CANCELLED
        self.domain_events.append(OrderCancelled(
            event_id=uuid4(),
            order_id=self.order_id,
            reason=reason,
            refund_amount=refund,
        ))

    def get_uncommitted_events(self) -> List[DomainEvent]:
        """Return events that haven't been published yet."""
        return self.domain_events.copy()

    def clear_uncommitted_events(self):
        """Clear events after they've been published."""
        self.domain_events.clear()
# --- Event handlers ---
# DomainEvent, the event classes, PaymentFailed, PaymentFailedError, and the
# service and repository types (PaymentService, InventoryService,
# EmailService, OrderRepository, EventBus) are assumed to be defined elsewhere.
from abc import ABC, abstractmethod
from uuid import UUID


class EventHandler(ABC):
    """Base class for event handlers."""

    @abstractmethod
    def handle(self, event: DomainEvent):
        pass


class PaymentEventHandler(EventHandler):
    """Reacts to order events by charging customer."""

    def __init__(self, payment_service: PaymentService):
        self.payment_service = payment_service

    def handle(self, event: OrderConfirmed):
        """When order is confirmed, charge the customer."""
        # Runs asynchronously, after the order has already been confirmed
        try:
            self.payment_service.charge_card(
                customer_id=event.customer_id,
                amount=event.order_total,
                currency=event.currency,
                order_id=event.order_id,
            )
            # Payment service emits its own PaymentProcessed event,
            # which Order Service listens to
        except PaymentFailedError as exc:
            # Order Service listens for PaymentFailed and cancels the order
            self.payment_service.emit(PaymentFailed(
                order_id=event.order_id,
                reason=str(exc),
            ))


class InventoryEventHandler(EventHandler):
    """Reacts to order events by reserving stock."""

    def __init__(self, inventory_service: InventoryService):
        self.inventory_service = inventory_service

    def handle(self, event: OrderConfirmed):
        """When order is confirmed, reserve items from inventory."""
        # Runs asynchronously; the event must carry its line items
        # (an `items` field) for this handler to be self-sufficient
        for item in event.items:
            self.inventory_service.reserve_stock(
                product_id=item.product_id,
                quantity=item.quantity,
                order_id=event.order_id,
            )


class NotificationEventHandler(EventHandler):
    """Reacts to order events by notifying customer."""

    def __init__(self, email_service: EmailService):
        self.email_service = email_service

    def handle(self, event: DomainEvent):
        # Python has no method overloading: a second `handle` definition
        # would silently replace the first, so dispatch on the event type.
        if isinstance(event, OrderConfirmed):
            self._on_order_confirmed(event)
        elif isinstance(event, OrderShipped):
            self._on_order_shipped(event)

    def _on_order_confirmed(self, event: OrderConfirmed):
        """When order is confirmed, send confirmation email."""
        self.email_service.send_order_confirmation(
            customer_id=event.customer_id,
            order_id=event.order_id,
            total=event.order_total,
        )

    def _on_order_shipped(self, event: OrderShipped):
        """When order is shipped, send tracking email."""
        # Requires OrderShipped to carry customer_id
        self.email_service.send_shipment_notification(
            customer_id=event.customer_id,
            tracking_number=event.tracking_number,
            carrier=event.carrier_name,
        )


class OrderService:
    """Application service that orchestrates order operations."""

    def __init__(self,
                 order_repo: OrderRepository,
                 event_bus: EventBus,
                 payment_service: PaymentService):
        self.order_repo = order_repo
        self.event_bus = event_bus
        self.payment_service = payment_service

    def confirm_order(self, order_id: UUID, payment_token: str):
        """Confirm an order and publish events."""
        # Load order
        order = self.order_repo.get_by_id(order_id)

        # Authorize payment synchronously (critical path)
        payment_result = self.payment_service.authorize(
            payment_token, order.total
        )
        if not payment_result.success:
            raise PaymentAuthorizationFailed()

        # Update order state
        order.confirm_with_payment(order.total)

        # Save order
        self.order_repo.save(order)

        # Publish events; subscribers react asynchronously.
        # Saving and publishing are separate steps here; a transactional
        # outbox is a common way to make them atomic.
        for event in order.get_uncommitted_events():
            self.event_bus.publish(event)
        order.clear_uncommitted_events()
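The `EventBus` used by `OrderService` is not defined in the example. As a rough sketch of what it might look like, the block below routes each published event to the handlers registered for its type. `InMemoryEventBus` and its `subscribe`/`publish` methods are hypothetical names chosen for illustration, the event class is a cut-down stand-in, and dispatch is synchronous only to keep the sketch short; a production setup would hand events to a message broker.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Type


@dataclass(frozen=True)
class OrderConfirmed:
    """Cut-down stand-in for the event defined earlier."""
    order_id: str
    order_total: float


class InMemoryEventBus:
    """Hypothetical in-process bus: routes events to handlers by event type."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: Type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # The publisher knows nothing about who is listening
        for handler in self._handlers[type(event)]:
            handler(event)


bus = InMemoryEventBus()
emails: List[str] = []
reservations: List[str] = []

# Two independent subscribers; adding a third needs no change to the publisher
bus.subscribe(OrderConfirmed, lambda e: emails.append(f"confirmation for {e.order_id}"))
bus.subscribe(OrderConfirmed, lambda e: reservations.append(e.order_id))

bus.publish(OrderConfirmed(order_id="o-1", order_total=42.0))
```

After the `publish` call, both lists contain one entry for order `o-1`, even though the publishing code never referenced either subscriber.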
Event-Driven Order Flow
Timeline (timings are illustrative):
--------
T0: Customer clicks "Confirm Order"
    └─ POST /orders/{id}/confirm
        └─ OrderService.confirm_order() authorizes payment and saves the order
T0+10ms: OrderConfirmed event published
    ├─ PaymentService (subscribes)
    ├─ InventoryService (subscribes)
    ├─ ShippingService (subscribes)
    ├─ NotificationService (subscribes)
    └─ Response to user: "Order confirmed"
T0+50ms: Payment charged (PaymentService processes)
    └─ Emits PaymentProcessed event
T0+100ms: Inventory reserved (InventoryService processes)
    └─ Emits InventoryReserved event
T0+150ms: Shipping arranged (ShippingService processes)
    └─ Emits ShippingArranged event
T0+200ms: Confirmation email sent (NotificationService processes)
Key properties:
- Order confirmed immediately (not waiting for all side effects)
- Each service handles its responsibility independently
- A failing subscriber does not block order confirmation
- New services can be added by subscribing to OrderConfirmed
Patterns and Pitfalls
Pitfall: infrastructure events posing as domain events. Emitting infrastructure-level events like UserLoggedIn, RowInserted, or DatabaseConnected. These are not domain events because they carry no business significance.
Fix: Emit only business-meaningful events: OrderConfirmed, PaymentProcessed, CustomerOnboarded. Events should make sense to domain experts.
Pitfall: synchronous handling. Subscribing to events and handling them synchronously blocks the original operation. This creates tight temporal coupling and slow responses.
Fix: Publish events asynchronously to a message queue. Handlers consume and process independently, so the aggregate operation can complete immediately.
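To make the fix concrete, here is a minimal sketch that uses the standard library's `queue.Queue` and a worker thread as a stand-in for a real message queue. `QueuedEventBus`, `drain`, and the string event are hypothetical, and a broker such as Kafka or RabbitMQ would replace the in-process queue in practice.

```python
import queue
import threading
from typing import Callable, List


class QueuedEventBus:
    """Hypothetical bus: publish() only enqueues; a worker thread runs handlers."""

    def __init__(self) -> None:
        self._queue: queue.Queue = queue.Queue()
        self._handlers: List[Callable[[object], None]] = []
        self.errors: List[Exception] = []
        threading.Thread(target=self._run, daemon=True).start()

    def subscribe(self, handler: Callable[[object], None]) -> None:
        self._handlers.append(handler)

    def publish(self, event: object) -> None:
        # Returns immediately; the caller never waits for handlers
        self._queue.put(event)

    def _run(self) -> None:
        while True:
            event = self._queue.get()
            for handler in self._handlers:
                try:
                    handler(event)
                except Exception as exc:  # One failing handler must not stop the rest
                    self.errors.append(exc)
            self._queue.task_done()

    def drain(self) -> None:
        """Block until every queued event has been handled (useful in tests)."""
        self._queue.join()


bus = QueuedEventBus()
handled: List[object] = []
bus.subscribe(handled.append)

bus.publish("OrderConfirmed:o-1")  # Does not block on the handler
bus.drain()                        # Wait only so the result can be inspected
```

The design choice to note is that `publish` does nothing except enqueue, so a slow or failing handler cannot delay the operation that raised the event.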
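Pitfall: non-idempotent handlers. Event handlers often call external APIs or modify shared state. If a handler fails partway through, or the same event is delivered twice, the result is inconsistent state.
Fix: Make handlers idempotent. Processing the same event twice should produce the same result as processing it once, so retries are safe.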
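One common way to get idempotency is to remember the IDs of events that have already been processed. The sketch below assumes each event carries a unique `event_id`; the handler class, the in-memory set, and the simplified event are hypothetical. In a real system the processed-ID record would likely be persisted together with the side effect.

```python
from dataclasses import dataclass
from typing import Dict, Set
from uuid import UUID, uuid4


@dataclass(frozen=True)
class OrderConfirmed:
    """Simplified event for illustration."""
    event_id: UUID
    product_id: str
    quantity: int


class IdempotentInventoryHandler:
    """Hypothetical handler that ignores events it has already processed."""

    def __init__(self) -> None:
        self.reserved: Dict[str, int] = {}
        self._processed: Set[UUID] = set()

    def handle(self, event: OrderConfirmed) -> None:
        if event.event_id in self._processed:
            return  # Duplicate delivery: do nothing
        self.reserved[event.product_id] = (
            self.reserved.get(event.product_id, 0) + event.quantity
        )
        self._processed.add(event.event_id)


handler = IdempotentInventoryHandler()
event = OrderConfirmed(event_id=uuid4(), product_id="sku-1", quantity=2)

handler.handle(event)
handler.handle(event)  # Redelivery of the same event, e.g. after a retry
```

After both calls, `handler.reserved` holds 2 units of `sku-1` rather than 4.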
Pitfall: overly fine-grained events. Emitting events for every small change (ItemAdded, PriceChanged, TaxCalculated) creates noise and makes workflows hard to follow.
Fix: Emit events only for business-significant changes. ItemAdded can be justified if another part of the business reacts to it. TaxCalculated is probably too fine-grained.
Pattern: Event Sourcing. Store events as the immutable source of truth. Aggregate state is derived by replaying events, which provides a complete audit trail and temporal queries.
How to Use: Treat the event store as the source of truth. Use snapshots as a performance optimization for large event logs: load the latest snapshot, then replay only the events recorded after it to reconstruct state.
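The replay idea can be sketched with a small inventory example. The event types, the `Snapshot` shape, and the `replay` function below are hypothetical and kept deliberately small; a real event store would also handle persistence and concurrency.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Union


@dataclass(frozen=True)
class StockReceived:
    quantity: int


@dataclass(frozen=True)
class StockReserved:
    quantity: int


InventoryEvent = Union[StockReceived, StockReserved]


@dataclass(frozen=True)
class Snapshot:
    version: int  # Number of events already folded into on_hand
    on_hand: int


def apply(on_hand: int, event: InventoryEvent) -> int:
    """Pure state transition: current state + one event -> next state."""
    if isinstance(event, StockReceived):
        return on_hand + event.quantity
    if isinstance(event, StockReserved):
        return on_hand - event.quantity
    raise TypeError(f"Unknown event: {event!r}")


def replay(events: Sequence[InventoryEvent], snapshot: Optional[Snapshot] = None) -> int:
    """Rebuild state from the log, skipping events the snapshot already covers."""
    start = snapshot or Snapshot(version=0, on_hand=0)
    on_hand = start.on_hand
    for event in events[start.version:]:
        on_hand = apply(on_hand, event)
    return on_hand


log = [StockReceived(10), StockReserved(3), StockReceived(5), StockReserved(4)]

full = replay(log)                                   # 10 - 3 + 5 - 4 = 8
snap = Snapshot(version=2, on_hand=replay(log[:2]))  # State after two events: 7
from_snapshot = replay(log, snap)                    # 7 + 5 - 4 = 8
as_of_third_event = replay(log[:3])                  # Temporal query: 12
```

Replaying a prefix of the log answers "what was the state at that point?", which is the temporal query the pattern promises.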
Pattern: Saga. A saga is a process that listens to events, makes decisions, and emits commands to coordinate changes across multiple aggregates. It is useful for workflows that span multiple aggregates.
How to Use: An OrderSaga listens for the PaymentFailed event and emits a CancelOrder command. This orchestrates multi-aggregate workflows without tight coupling.
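A minimal sketch of the OrderSaga described above might look like the following. The field names on `PaymentFailed` and `CancelOrder`, and the `send_command` callable standing in for a command bus, are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class PaymentFailed:
    """Event: something that already happened (past tense)."""
    order_id: str
    reason: str


@dataclass(frozen=True)
class CancelOrder:
    """Command: a request for something to happen (imperative)."""
    order_id: str
    reason: str


class OrderSaga:
    """Hypothetical saga: turns a PaymentFailed event into a CancelOrder command."""

    def __init__(self, send_command: Callable[[CancelOrder], None]) -> None:
        self._send_command = send_command

    def on_payment_failed(self, event: PaymentFailed) -> None:
        self._send_command(CancelOrder(
            order_id=event.order_id,
            reason=f"Payment failed: {event.reason}",
        ))


sent: List[CancelOrder] = []
saga = OrderSaga(send_command=sent.append)  # A list stands in for the command bus
saga.on_payment_failed(PaymentFailed(order_id="o-1", reason="Card declined"))
```

The saga never touches the Order aggregate directly. It only emits a command, and the Order side decides how to carry it out.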
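Pattern: Event versioning. Include a version in every event to support evolution. Old versions can be migrated to new formats, and consumers can handle multiple versions.
How to Use: When the OrderConfirmed schema changes, bump it to event_version = 2 and include upconversion logic that translates stored version 1 events into the new format.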
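Upconversion can be sketched as a chain of small functions, each lifting a stored payload by one version. The example assumes events are stored as dictionaries and invents a version 2 of OrderConfirmed that adds a `sales_channel` field; both the field and the function names are hypothetical.

```python
from typing import Any, Callable, Dict

Payload = Dict[str, Any]
CURRENT_VERSION = 2


def upcast_v1_to_v2(payload: Payload) -> Payload:
    """Hypothetical change: version 2 adds sales_channel, defaulting to 'web'."""
    upgraded = dict(payload)  # Never mutate the stored event
    upgraded["sales_channel"] = "web"
    upgraded["event_version"] = 2
    return upgraded


# One upcaster per source version; new versions extend this table
UPCASTERS: Dict[int, Callable[[Payload], Payload]] = {1: upcast_v1_to_v2}


def upcast(payload: Payload) -> Payload:
    """Apply upcasters in sequence until the payload is at the current version."""
    version = payload.get("event_version", 1)
    while version < CURRENT_VERSION:
        payload = UPCASTERS[version](payload)
        version = payload["event_version"]
    return payload


stored_v1 = {"event_version": 1, "order_id": "o-1", "order_total": 42.0}
current = upcast(stored_v1)
```

With this approach handlers only ever see the current format, while the stored events remain untouched.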
Pattern: Dead letter queue (DLQ). Messages that repeatedly fail processing are sent to a dead letter queue for investigation and manual handling.
How to Use: Route failed events to the DLQ, monitor it, and replay the events once the underlying problem is fixed.
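Retries and the dead letter queue usually work together. The sketch below retries a handler with exponential backoff and parks the event in a list standing in for the DLQ once the attempts are exhausted. The function name, parameters, and default values are hypothetical, and the `sleep` argument is injectable only so the example runs without real delays.

```python
import time
from typing import Callable, List, Tuple


def process_with_retry(
    event: object,
    handler: Callable[[object], None],
    dead_letters: List[Tuple[object, str]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True if the handler succeeded, False if the event was dead-lettered."""
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception as exc:
            if attempt == max_attempts:
                # Out of retries: park the event with the error for investigation
                dead_letters.append((event, str(exc)))
                return False
            # Exponential backoff: base, 2x base, 4x base, ...
            sleep(base_delay * 2 ** (attempt - 1))
    return False


calls = {"count": 0}


def flaky_handler(event: object) -> None:
    """Fails twice, then succeeds (simulates a temporary outage)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary outage")


def broken_handler(event: object) -> None:
    raise ValueError("bad payload")


dlq: List[Tuple[object, str]] = []
delays: List[float] = []  # Records requested delays instead of sleeping

recovered = process_with_retry("evt-1", flaky_handler, dlq, base_delay=1.0, sleep=delays.append)
parked = process_with_retry("evt-2", broken_handler, dlq, base_delay=1.0, sleep=delays.append)
```

The flaky handler recovers on its third attempt, while the broken one ends up in `dlq` with its error message. Because retries redeliver the same event, this only works safely with idempotent handlers.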
Design Review Checklist
- Are events named using past tense (Confirmed, Processed, Delivered)?
- Do events represent domain-significant business occurrences?
- Are events immutable (frozen/final classes)?
- Do aggregates emit events when state changes significantly?
- Can event handlers be added without modifying aggregates?
- Are event handlers idempotent (same event processed twice = same result)?
- Are events consumed asynchronously via message queue?
- Can events be processed out of order without breaking invariants?
- Is the event bus/message broker tested?
- Can you trace important business workflows through events?
- Are events versioned for long-term evolution?
- Is there a strategy for handling failed event processing?
Self-Check
- What makes a good domain event? It records something important that happened in the business, something other parts of the system care about, and it is expressed in ubiquitous language (not technical jargon). Examples: OrderConfirmed, PaymentProcessed, InventoryReserved.
- Should event handlers process sequentially or in parallel? Handlers can process in parallel if they don't depend on each other. If handler B depends on handler A completing, use the Saga pattern to coordinate. Generally, prefer parallel processing with eventual consistency.
- How do you handle event versioning? Include a version field in each event. Subscribers can handle multiple versions, or old versions can be upconverted to the new format. Example: when OrderConfirmed moves to event_version = 2, an upconverter lets current code read stored version 1 events.
- What if an event handler fails? Retry with exponential backoff. If it still fails after the retries, send the event to a dead letter queue. Monitor the DLQ, then investigate and replay once the problem is fixed. Handlers should be idempotent so that retries are safe.
- When should you use Event Sourcing? When you need a complete audit trail, temporal queries ("what was the inventory on this date?"), or debugging support ("replay the order process to find the bug"). Not all systems need event sourcing, so use it when the value is clear.
One Takeaway: Domain Events decouple aggregates and services, enable event sourcing, and make workflows explicit. Emit events for important business occurrences in past tense. Let handlers react independently and asynchronously. This creates resilient, scalable, and auditable systems.
Next Steps
- Event Sourcing: Dive deeper into storing events as source of truth
- Sagas: Learn long-running transaction coordination with events
- Message Brokers: Understand Kafka, RabbitMQ, and other event platforms
- CQRS: See how events enable read model updates in CQRS patterns