Skip to main content

Repositories

Abstract persistence logic using the repository pattern.

TL;DR

Repository: An abstraction for persisting and retrieving aggregates. It looks like an in-memory collection (repo.save(order), repo.get(order_id)) and hides database details. Use one repository per aggregate root. Repositories load and store whole aggregates through their root; they never expose the entities or value objects inside an aggregate separately. Avoid anemic repositories that offer only generic CRUD. Instead, provide intention-revealing queries (find orders by status, date, etc.). Use in-memory repositories for testing and swap in a real database implementation in production. Never leak SQL or other database details into domain logic.

Learning Objectives

  • Understand repository pattern and its purpose
  • Implement repositories for aggregates
  • Design rich query methods
  • Use repositories in application services
  • Test repositories with in-memory implementations
  • Avoid N+1 queries
  • Implement efficient pagination
  • Choose between repository and query object patterns

Motivating Scenario

A service calls SQL directly: SELECT * FROM orders WHERE status = 'pending' AND created_at > NOW() - INTERVAL '30 days'. The service is tightly coupled to the database and hard to test, since every test needs a database. Every query change also forces a service change. Switch to a repository: repo.find_pending_orders_last_30_days(). Now the service doesn't know any SQL, tests run against an in-memory repository, and a query change touches only the repository.

Core Concepts

Repository vs. DAO vs. Query Objects

| Pattern | Scope | Methods | Use case |
|---|---|---|---|
| Repository | Aggregate | save(agg), get(id), find_by(spec) | DDD, rich domain model |
| DAO | Entity (table) | insert(), update(), delete(), select() | Anemic domain model |
| Query Object | Complex queries | get_high_value_customers() | CQRS, read models |

One Repository Per Aggregate

Aggregate: Order
├── Entity: OrderItem (part of aggregate, not retrieved separately)
├── Value Object: Money (part of aggregate)
└── Repository: OrderRepository (manages Order aggregate)

NOT:
├── OrderRepository (for Order)
├── OrderItemRepository (NO! OrderItem is part of aggregate)
├── MoneyRepository (NO! Value object, not persisted separately)

Query Methods

# ✓ GOOD: Domain language
repo.find_pending_orders()
repo.find_orders_by_customer(customer_id)
repo.find_orders_over_amount(1000)
repo.find_high_value_customers(min_lifetime_value=10000)

# ❌ BAD: SQL-like
repo.find(status='pending')
repo.find_where({'status': 'pending', 'created_at': '2024-01-01'})
repo.execute_sql("SELECT * FROM orders WHERE ...")

Implementation

import copy
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

# Domain models
@dataclass(frozen=True)
class OrderId:
    """Identity of an Order aggregate (immutable value object)."""

    value: str


@dataclass(frozen=True)
class CustomerId:
    """Identity of the customer who placed an order (immutable value object)."""

    value: str


@dataclass(frozen=True)
class Money:
    """An amount in a given currency (immutable value object).

    NOTE(review): ``amount`` is a binary float, which cannot represent most
    decimal cent values exactly; consider ``decimal.Decimal`` for real money.
    """

    amount: float
    currency: str


class Order:
    """Order aggregate root: owns its items and controls its status changes."""

    def __init__(self, order_id: OrderId, customer_id: CustomerId,
                 items: List['OrderItem'], total: Money,
                 status: str, created_at: datetime):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items
        self.total = total
        self.status = status
        self.created_at = created_at

    @classmethod
    def create(cls, customer_id: CustomerId, items: List['OrderItem'],
               total: Money) -> 'Order':
        """Start a new pending order with a freshly generated id.

        Factory used by command handlers, e.g.
        ``Order.create(command.customer_id, command.items, command.total)``.
        The id is a random UUID4 string and ``created_at`` is ``datetime.now()``.
        """
        return cls(
            order_id=OrderId(str(uuid.uuid4())),
            customer_id=customer_id,
            # Copy so later changes to the caller's list don't leak into the aggregate.
            items=list(items),
            total=total,
            status='pending',
            created_at=datetime.now(),
        )

    def is_pending(self) -> bool:
        """Return True while the order is still awaiting confirmation."""
        return self.status == 'pending'

    def confirm(self) -> None:
        """Mark the order as confirmed."""
        self.status = 'confirmed'

    def cancel(self) -> None:
        """Mark the order as cancelled."""
        self.status = 'cancelled'


@dataclass
class OrderItem:
    """A line within an Order; persisted only as part of its aggregate."""

    sku: str
    quantity: int
    price: Money

# Repository abstraction (interface)
class OrderRepository(ABC):
    """Collection-like port for loading and storing Order aggregates.

    Application services depend only on this interface; concrete subclasses
    decide where the aggregates actually live.
    """

    @abstractmethod
    def save(self, order: Order) -> None:
        """Persist the ``order`` aggregate."""
        ...

    @abstractmethod
    def get(self, order_id: OrderId) -> Optional[Order]:
        """Return the order identified by ``order_id``, or None if absent."""
        ...

    @abstractmethod
    def find_pending_orders(self) -> List[Order]:
        """Return every order that is still pending."""
        ...

    @abstractmethod
    def find_by_customer(self, customer_id: CustomerId) -> List[Order]:
        """Return every order placed by ``customer_id``."""
        ...

    @abstractmethod
    def find_orders_over_amount(self, amount: float) -> List[Order]:
        """Return orders whose total exceeds ``amount``."""
        ...

    @abstractmethod
    def find_recent_orders(self, days: int) -> List[Order]:
        """Return orders created within the last ``days`` days."""
        ...

# In-memory implementation (for testing)
class InMemoryOrderRepository(OrderRepository):
    """Dict-backed OrderRepository for tests and prototyping.

    Aggregates are stored and handed out as deep copies, mimicking a real
    database. Changes to a loaded Order are invisible to the repository until
    ``save`` is called. Without the copies, a test would still pass when the
    code under test forgot to save.
    """

    def __init__(self):
        # order_id.value -> stored snapshot of the Order
        self.orders = {}

    def save(self, order: Order) -> None:
        """Store a snapshot of ``order``, replacing any previous version."""
        self.orders[order.order_id.value] = copy.deepcopy(order)

    def get(self, order_id: OrderId) -> Optional[Order]:
        """Return a copy of the stored order, or None if it was never saved."""
        stored = self.orders.get(order_id.value)
        return None if stored is None else copy.deepcopy(stored)

    def _matching(self, predicate) -> List[Order]:
        """Return copies of all stored orders for which ``predicate`` is true."""
        return [copy.deepcopy(o) for o in self.orders.values() if predicate(o)]

    def find_pending_orders(self) -> List[Order]:
        """Return copies of all pending orders."""
        return self._matching(lambda o: o.is_pending())

    def find_by_customer(self, customer_id: CustomerId) -> List[Order]:
        """Return copies of all orders placed by ``customer_id``."""
        return self._matching(lambda o: o.customer_id == customer_id)

    def find_orders_over_amount(self, amount: float) -> List[Order]:
        """Return copies of orders whose total amount is strictly above ``amount``."""
        return self._matching(lambda o: o.total.amount > amount)

    def find_recent_orders(self, days: int) -> List[Order]:
        """Return copies of orders created after ``now - days``."""
        # Computed once so every order is compared against the same instant.
        cutoff = datetime.now() - timedelta(days=days)
        return self._matching(lambda o: o.created_at > cutoff)

# SQL implementation (production)
class SQLOrderRepository(OrderRepository):
    """OrderRepository backed by a SQL database.

    ``db_connection`` must provide ``execute(sql, params)``,
    ``fetchone(sql, params)`` and ``fetchall(sql, params)``. Rows are read by
    column name. Statements use ``%s`` placeholders and a PostgreSQL-style
    ``ON CONFLICT ... DO UPDATE`` upsert.

    NOTE(review): only ``Money.amount`` is stored, and every amount is read
    back as 'USD', so non-USD orders lose their currency. Add a currency
    column if more than one currency is possible.
    """

    def __init__(self, db_connection):
        self.db = db_connection

    def save(self, order: Order) -> None:
        """Upsert the order row and replace its item rows.

        NOTE(review): the statements are not wrapped in a transaction here.
        Call save() inside the connection's transaction so that a failure
        cannot leave a half-written aggregate.
        """
        sql = """
        INSERT INTO orders (id, customer_id, status, total, created_at)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT(id) DO UPDATE SET
        status=EXCLUDED.status, total=EXCLUDED.total
        """
        self.db.execute(sql, [
            order.order_id.value,
            order.customer_id.value,
            order.status,
            order.total.amount,
            order.created_at,
        ])

        # Items have no identity outside the aggregate, so replace them
        # wholesale. Otherwise every re-save of an existing order (e.g. after
        # confirm()) would insert a duplicate copy of each item.
        self.db.execute(
            "DELETE FROM order_items WHERE order_id = %s",
            [order.order_id.value],
        )

        item_sql = """
        INSERT INTO order_items (order_id, sku, quantity, price)
        VALUES (%s, %s, %s, %s)
        """
        for item in order.items:
            self.db.execute(item_sql, [
                order.order_id.value,
                item.sku,
                item.quantity,
                item.price.amount,
            ])

    def get(self, order_id: OrderId) -> Optional[Order]:
        """Reconstruct one aggregate; return None if no such order exists."""
        order_row = self.db.fetchone(
            "SELECT * FROM orders WHERE id = %s", [order_id.value]
        )
        if not order_row:
            return None
        items_by_order = self._load_items([order_row['id']])
        return self._to_order(order_row, items_by_order)

    def find_pending_orders(self) -> List[Order]:
        """Return every order whose status is 'pending'."""
        return self._find_where("status = 'pending'", [])

    def find_by_customer(self, customer_id: CustomerId) -> List[Order]:
        """Return every order placed by ``customer_id``."""
        return self._find_where("customer_id = %s", [customer_id.value])

    def find_orders_over_amount(self, amount: float) -> List[Order]:
        """Return orders whose stored total is strictly greater than ``amount``."""
        return self._find_where("total > %s", [amount])

    def find_recent_orders(self, days: int) -> List[Order]:
        """Return orders created after ``datetime.now() - days``.

        The cutoff is computed in Python rather than with ``INTERVAL %s DAY``,
        which is MySQL-only syntax (invalid in PostgreSQL, whose ON CONFLICT
        upsert save() uses). This also matches the in-memory repository's
        cutoff.
        NOTE(review): assumes created_at is stored in the same naive local
        time base as datetime.now(); confirm.
        """
        cutoff = datetime.now() - timedelta(days=days)
        return self._find_where("created_at > %s", [cutoff])

    def _find_where(self, condition: str, params: list) -> List[Order]:
        """Load all orders matching ``condition`` with at most two queries.

        ``condition`` is always a fixed SQL fragment written in this class.
        Never pass user input here; values belong in ``params``.
        """
        rows = self.db.fetchall(f"SELECT * FROM orders WHERE {condition}", params)
        items_by_order = self._load_items([row['id'] for row in rows])
        return [self._to_order(row, items_by_order) for row in rows]

    def _load_items(self, order_ids: list) -> dict:
        """Load the items of all ``order_ids`` in one query, keyed by order id.

        NOTE(review): a very large id list produces a long IN clause; batch
        the ids if that becomes a problem.
        """
        items_by_order = {}
        if not order_ids:
            return items_by_order
        # One generated placeholder per id; the values themselves stay parameterized.
        placeholders = ", ".join(["%s"] * len(order_ids))
        rows = self.db.fetchall(
            f"SELECT * FROM order_items WHERE order_id IN ({placeholders})",
            list(order_ids),
        )
        for row in rows:
            items_by_order.setdefault(row['order_id'], []).append(OrderItem(
                sku=row['sku'],
                quantity=row['quantity'],
                price=Money(row['price'], 'USD'),
            ))
        return items_by_order

    def _to_order(self, row, items_by_order: dict) -> Order:
        """Build an Order from an ``orders`` row and the pre-loaded items map."""
        return Order(
            order_id=OrderId(row['id']),
            customer_id=CustomerId(row['customer_id']),
            items=items_by_order.get(row['id'], []),
            total=Money(row['total'], 'USD'),
            status=row['status'],
            created_at=row['created_at'],
        )

# Application service (uses repository abstraction)
class OrderNotFound(Exception):
    """Raised when no order exists for the requested id."""


class OrderAlreadyConfirmed(Exception):
    """Raised when confirming an order that is not pending (confirmed or cancelled)."""


class ConfirmOrderService:
    """Application service: confirm a pending order and persist the change."""

    def __init__(self, order_repo: OrderRepository):
        self.order_repo = order_repo

    def execute(self, order_id: OrderId) -> None:
        """Confirm the order identified by ``order_id``.

        Raises:
            OrderNotFound: if the repository has no such order.
            OrderAlreadyConfirmed: if the order is no longer pending.
        """
        # Fetch aggregate
        order = self.order_repo.get(order_id)
        if order is None:
            raise OrderNotFound(order_id)

        # Business rule: only pending orders can be confirmed.
        if not order.is_pending():
            raise OrderAlreadyConfirmed(order_id, order.status)

        order.confirm()

        # Write the aggregate back; the repository may hand out copies.
        self.order_repo.save(order)

# Testable service
class TestConfirmOrder:
    """Service-level test that runs against the in-memory repository."""

    def test_confirm_pending_order(self):
        # Arrange: a single pending order in a fresh in-memory repository.
        repository = InMemoryOrderRepository()
        pending = Order(
            order_id=OrderId('123'),
            customer_id=CustomerId('cust-1'),
            items=[OrderItem('SKU-1', 1, Money(99.99, 'USD'))],
            total=Money(99.99, 'USD'),
            status='pending',
            created_at=datetime.now(),
        )
        repository.save(pending)

        # Act: confirm it through the application service.
        ConfirmOrderService(repository).execute(OrderId('123'))

        # Assert: the persisted order now carries the confirmed status.
        assert repository.get(OrderId('123')).status == 'confirmed'

# Pagination support
class OrderRepository(ABC):
    """Pagination extension of the order repository interface.

    Shown on its own for brevity. In real code this abstract method is added
    to the OrderRepository interface declared earlier, rather than
    redefining the class.
    """

    @abstractmethod
    def find_pending_orders_paginated(self, page: int, page_size: int) -> dict:
        """Return one page of pending orders plus paging metadata (``page`` is 1-based)."""
        ...

class SQLOrderRepository(OrderRepository):
    """SQL implementation of paginated pending-order lookup.

    Shown on its own for brevity. In real code this method lives on the
    SQLOrderRepository defined earlier, whose ``self.db`` and ``get`` it uses.
    """

    def find_pending_orders_paginated(self, page: int, page_size: int) -> dict:
        """Return page ``page`` (1-based) of pending orders, newest first.

        Returns a dict with ``orders``, ``total`` (pending orders overall),
        ``page``, ``page_size`` and ``total_pages``.

        Raises:
            ValueError: if ``page`` or ``page_size`` is less than 1. A page
                below 1 would give a negative OFFSET, and a page size of 0
                would divide by zero.
        """
        if page < 1:
            raise ValueError(f"page must be >= 1, got {page}")
        if page_size < 1:
            raise ValueError(f"page_size must be >= 1, got {page_size}")
        offset = (page - 1) * page_size

        # Count total
        count_sql = "SELECT COUNT(*) as total FROM orders WHERE status = 'pending'"
        total = self.db.fetchone(count_sql, [])['total']

        # Fetch page. `id` breaks ties between equal timestamps so rows cannot
        # repeat or vanish between consecutive pages.
        sql = """
        SELECT id FROM orders WHERE status = 'pending'
        ORDER BY created_at DESC, id DESC
        LIMIT %s OFFSET %s
        """
        rows = self.db.fetchall(sql, [page_size, offset])

        # NOTE(review): each get() issues its own queries (N+1 per page);
        # batch-load the items if pages are large.
        loaded = [self.get(OrderId(row['id'])) for row in rows]
        # An order deleted between the id query and get() comes back as None.
        orders = [order for order in loaded if order is not None]

        return {
            'orders': orders,
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size,
        }

Real-World Examples

Example 1: Rich Query Methods

Instead of:

repo.find({'status': 'pending', 'created_at': {'$gt': yesterday}})

Do:

repo.find_pending_orders_created_after(days=1)

Benefits:

  • Domain language (readable)
  • Testable (can mock)
  • Implementation hidden (SQL → MongoDB → cache)

Example 2: Specification Pattern

class OrderSpecification:
    """Base class for predicates that select orders."""

    def is_satisfied_by(self, order):
        """Return True if ``order`` meets this specification.

        Subclasses must override this. The base raises instead of silently
        returning None, which would read as "never matches".
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement is_satisfied_by()"
        )


class PendingOrdersSpecification(OrderSpecification):
    """Matches orders whose status is 'pending'."""

    def is_satisfied_by(self, order):
        return order.status == 'pending'

# Use in repository
repo.find_by_spec(PendingOrdersSpecification())

Common Mistakes

Mistake 1: Leaking Database Details

# ❌ BAD: SQL leaks to service
orders = repo.find_where({'status': 'pending'})

# ✓ GOOD: Domain language
orders = repo.find_pending_orders()

Mistake 2: Repositories for Everything

# ❌ BAD: Repository for each entity
UserRepository, AddressRepository, PhoneRepository

# ✓ GOOD: One repository per aggregate
UserRepository (manages User + Address + Phone)

Mistake 3: N+1 Queries

# ❌ BAD: Separate query per item
orders = repo.find_all() # 1 query
for order in orders:
items = repo.find_items(order.id) # N queries

# ✓ GOOD: Aggregate loading
orders = repo.find_all_with_items() # 1 query with JOIN

Design Checklist

  • One repository per aggregate root?
  • Rich query methods (domain language)?
  • In-memory implementation for testing?
  • No N+1 queries?
  • Pagination support?
  • Transaction management?
  • Error handling clear?
  • Database details hidden?
  • Application services use abstraction?
  • No entity repositories?
  • Specifications implemented?
  • Performance optimized (indexes, eager loading)?

Next Steps

  1. Identify aggregate roots
  2. Define repository interface
  3. Implement in-memory version
  4. Write repository tests
  5. Implement SQL version
  6. Optimize queries (N+1, indexes)
  7. Add pagination support

References

Advanced Repository Patterns

Specification Pattern

Complex queries can be represented as specifications:

class Specification(ABC):
    """A composable business rule that a candidate item either satisfies or not."""

    @abstractmethod
    def is_satisfied_by(self, item):
        """Return True if ``item`` satisfies this rule."""


class PendingOrdersOlderThan(Specification):
    """Pending orders created more than ``days`` days ago."""

    def __init__(self, days):
        self.days = days

    def is_satisfied_by(self, order):
        # Recomputed on each call so a long-lived spec keeps tracking "now".
        cutoff = datetime.now() - timedelta(days=self.days)
        return order.status == 'pending' and order.created_at < cutoff


class HighValueOrdersFrom(Specification):
    """Orders from one customer whose total amount exceeds ``min_value``.

    ``customer_id`` may be a CustomerId or its raw string value. Orders hold
    CustomerId objects, while callers often pass plain strings such as
    ``'cust-1'``; a plain ``==`` between the two is always False.
    """

    def __init__(self, customer_id, min_value):
        self.customer_id = customer_id
        self.min_value = min_value

    @staticmethod
    def _id_value(customer_id):
        """Unwrap an id value object (anything with ``.value``) to its raw value."""
        return getattr(customer_id, 'value', customer_id)

    def is_satisfied_by(self, order):
        same_customer = self._id_value(order.customer_id) == self._id_value(self.customer_id)
        return same_customer and order.total.amount > self.min_value


# Composite specification
class AndSpecification(Specification):
    """Satisfied only when both ``left`` and ``right`` are; stops after ``left`` fails."""

    def __init__(self, left, right):
        self.left = left
        self.right = right

    def is_satisfied_by(self, item):
        return self.left.is_satisfied_by(item) and self.right.is_satisfied_by(item)

# Usage
spec = AndSpecification(
PendingOrdersOlderThan(7),
HighValueOrdersFrom('cust-1', 1000)
)

orders = repo.find_by_spec(spec)
# Returns: pending orders > 7 days old AND from customer AND > $1000

Query Objects

Separate read-heavy queries from repositories:

class OrderQueries:
    """Read-only reporting queries over the orders table (no aggregates built).

    Each method returns whatever ``db.execute(sql, params)`` returns.
    NOTE(review): the repositories in this module read rows via
    ``fetchone``/``fetchall``; confirm that ``execute`` returns rows on
    this connection type.
    """

    def __init__(self, db):
        self.db = db

    def revenue_by_date(self, start_date, end_date):
        """Report: revenue per calendar day, ``start_date`` to ``end_date`` inclusive.

        Both bounds are calendar ``date`` objects. The upper bound is sent as
        the day after ``end_date`` and compared with ``<``. ``BETWEEN start
        AND end`` on a timestamp column would drop every order placed on
        ``end_date`` after midnight.
        """
        sql = """
        SELECT DATE(created_at), SUM(total)
        FROM orders
        WHERE created_at >= %s AND created_at < %s
        GROUP BY DATE(created_at)
        ORDER BY DATE(created_at)
        """
        return self.db.execute(sql, [start_date, end_date + timedelta(days=1)])

    def top_customers(self, limit=10):
        """Report: the ``limit`` customers with the highest summed order totals."""
        sql = """
        SELECT customer_id, SUM(total) as revenue
        FROM orders
        GROUP BY customer_id
        ORDER BY revenue DESC
        LIMIT %s
        """
        return self.db.execute(sql, [limit])

    def orders_by_status(self, status):
        """Report: number of orders with the given ``status``."""
        sql = "SELECT COUNT(*) FROM orders WHERE status = %s"
        return self.db.execute(sql, [status])

# Usage
queries = OrderQueries(db)
revenue = queries.revenue_by_date(date(2024,1,1), date(2024,12,31))
top_10 = queries.top_customers(10)

CQRS (Command Query Responsibility Segregation)

Separate write (command) and read (query) models:

# Commands (writes)
class CreateOrderCommand:
    """Write-side request to place a new order (a plain data carrier)."""

    def __init__(self, customer_id, items, total):
        # Stored exactly as given; the handler decides how to build the Order.
        self.customer_id, self.items, self.total = customer_id, items, total

class CreateOrderCommandHandler:
    """Handles CreateOrderCommand: build the aggregate, persist it, announce it."""

    def __init__(self, order_repo, event_bus):
        self.order_repo = order_repo
        self.event_bus = event_bus

    def handle(self, command):
        """Create and save a new Order from ``command``; return its ``order_id``.

        NOTE(review): relies on an ``Order.create`` factory and an
        ``OrderCreatedEvent`` type; confirm both exist. The event is
        published after ``save`` with no retry or outbox. If ``publish``
        raises, the order is already saved but the read model never hears
        about it.
        """
        created = Order.create(command.customer_id, command.items, command.total)
        self.order_repo.save(created)

        # Let the read-model projections catch up with the new order.
        self.event_bus.publish(OrderCreatedEvent(created))

        return created.order_id

# Queries (reads) - separate read model
class OrderReadModel:
    """Query side of CQRS: reads denormalized tables that events keep up to date."""

    def __init__(self, db):
        self.db = db

    def _select_for(self, sql, customer_id):
        """Run a single-parameter lookup against the read store."""
        return self.db.execute(sql, [customer_id])

    def get_orders_by_customer(self, customer_id):
        """Rows for ``customer_id`` from the flat read table (no joins needed)."""
        return self._select_for(
            "SELECT * FROM orders_read WHERE customer_id = %s", customer_id
        )

    def get_customer_summary(self, customer_id):
        """Pre-aggregated summary for ``customer_id``."""
        return self._select_for(
            "SELECT * FROM customer_summary WHERE id = %s", customer_id
        )

# When OrderCreatedEvent published:
# - Update orders_read table
# - Update customer_summary aggregates

Benefits:

  • Write model optimized for consistency
  • Read model optimized for queries
  • Independent scaling

Pagination Patterns

Handle large result sets efficiently:

class OrderRepository:
    """Pagination techniques for large order tables (illustrative snippet).

    Expects ``self.db`` to provide ``fetchone(sql, params)`` and
    ``fetchall(sql, params)`` returning rows addressable by column name,
    the same interface SQLOrderRepository uses.
    NOTE(review): this snippet has no ``__init__``; ``self.db`` must be set
    by the class these methods are merged into.
    """

    def find_all_paginated(self, page=1, page_size=20):
        """Offset pagination: return page ``page`` (1-based) of all orders.

        Supports jumping to any page, but the database still has to skip
        ``offset`` rows, so deep pages get slower.

        Raises:
            ValueError: if ``page`` or ``page_size`` is less than 1.
        """
        if page < 1:
            raise ValueError(f"page must be >= 1, got {page}")
        if page_size < 1:
            raise ValueError(f"page_size must be >= 1, got {page_size}")
        offset = (page - 1) * page_size

        # Read the count as a number; arithmetic on a raw result set fails.
        total = self.db.fetchone("SELECT COUNT(*) AS total FROM orders", [])['total']

        # A deterministic ORDER BY is required for LIMIT/OFFSET pages to be consistent.
        items = self.db.fetchall(
            "SELECT * FROM orders ORDER BY id LIMIT %s OFFSET %s",
            [page_size, offset],
        )

        return {
            'items': items,
            'total': total,
            'page': page,
            'page_size': page_size,
            'pages': (total + page_size - 1) // page_size,
        }

    def find_all_keyset_pagination(self, last_id=None, limit=20):
        """Keyset pagination (more efficient for large tables).

        Pass the previous response's ``next_cursor`` as ``last_id`` to get
        the next page; omit it for the first page. Rows are ordered by ``id``
        so the cursor is meaningful, and the query can be served from an
        index on ``id``.

        Raises:
            ValueError: if ``limit`` is less than 1.
        """
        if limit < 1:
            raise ValueError(f"limit must be >= 1, got {limit}")

        # Fetch one extra row to learn whether another page exists. The first
        # page has no lower bound; a numeric sentinel such as 0 would not
        # compare correctly with string ids.
        if last_id is None:
            items = self.db.fetchall(
                "SELECT * FROM orders ORDER BY id LIMIT %s", [limit + 1]
            )
        else:
            items = self.db.fetchall(
                "SELECT * FROM orders WHERE id > %s ORDER BY id LIMIT %s",
                [last_id, limit + 1],
            )

        has_more = len(items) > limit
        items = items[:limit]
        last_item_id = items[-1]['id'] if items else None

        return {
            'items': items,
            'has_more': has_more,
            'next_cursor': last_item_id,
        }

Performance Optimization

N+1 Query Prevention

Eager Loading:

# ❌ N+1 problem
orders = repo.find_all() # 1 query
for order in orders:
for item in order.items: # N queries!
print(item.sku)

# ✅ Eager loading with joins
sql = """
SELECT o.*, oi.* FROM orders o
JOIN order_items oi ON o.id = oi.order_id
"""
orders = self._reconstruct_aggregates(sql)

Caching Layers

class CachedOrderRepository:
    """Read-through cache in front of another order repository.

    ``cache`` must provide ``get(key)``, ``set(key, value, ttl=...)`` and
    ``delete(key)``. Only ``get`` and ``save`` are wrapped.
    NOTE(review): with an in-process cache every caller receives the same
    cached Order object; mutating it without saving changes what others see.
    """

    def __init__(self, db_repo, cache, ttl=3600):
        self.db_repo = db_repo
        self.cache = cache
        # Seconds a cached order stays valid (default: 1 hour).
        self.ttl = ttl

    @staticmethod
    def _key(order_id):
        """Cache key for an order id; accepts an OrderId or its raw value."""
        # Use the raw value so the key doesn't depend on the id object's repr,
        # and so OrderId('1') and '1' map to the same entry.
        return f"order:{getattr(order_id, 'value', order_id)}"

    def get(self, order_id):
        """Return the order from cache, falling back to the wrapped repository."""
        key = self._key(order_id)
        cached = self.cache.get(key)
        if cached is not None:
            return cached

        order = self.db_repo.get(order_id)
        # Misses are not cached: a stored None would read as a miss anyway.
        if order is not None:
            self.cache.set(key, order, ttl=self.ttl)
        return order

    def save(self, order):
        """Save through to the wrapped repository, then drop the stale cache entry."""
        self.db_repo.save(order)
        self.cache.delete(self._key(order.order_id))

Conclusion

Repositories are a powerful abstraction for persistence:

  • Service doesn't know database details
  • Easy to test (in-memory implementation)
  • Easy to change (SQL → NoSQL)
  • Rich domain language (find_pending_orders)

Key principles:

  • One repository per aggregate root
  • No N+1 queries (eager load)
  • Specifications for complex queries
  • Separate read-heavy queries (Query Objects)

Don't overuse them: simple CRUD apps rarely need repositories.