Search Offloading & Aggregations

Use specialized systems for search and analytics

TL;DR

Databases excel at transactional operations but struggle with complex searches (full-text, facets) and analytics (massive aggregations). Offload to specialized systems: Elasticsearch/Solr for search, BigQuery/Snowflake for analytics. Primary database (PostgreSQL, MySQL) handles transactional operations; specialized systems handle search/analytics via denormalized copies. Synchronize via ETL pipelines or CDC (Change Data Capture). This separation provides optimal performance and scaling for each workload.

Learning Objectives

By the end of this article, you will understand:

  • Why specialized systems outperform general-purpose databases for specific tasks
  • Search engine architecture and use cases
  • Data warehouse design for analytics
  • Denormalization strategies for different workloads
  • Synchronization patterns (ETL, CDC, streaming)
  • Trade-offs between consistency and freshness in offloading
  • Operational considerations and monitoring

Motivating Scenario

Your e-commerce platform has 10 million products in PostgreSQL. Users expect to search by keywords, filter by price/category (facets), and see results in < 500ms. A full-text search query on PostgreSQL takes 3+ seconds—too slow. Separately, the finance team needs daily analytics: revenue by region, top products, customer segments. These aggregations scan billions of rows—prohibitively expensive to run on the transactional database. Solution: PostgreSQL for CRUD operations; Elasticsearch for search; BigQuery for analytics.

Core Concepts

Three Database Types

OLTP (Online Transactional Processing):

  • Primary database (PostgreSQL, MySQL)
  • Optimized for: single row lookups, fast inserts/updates
  • Normalized schema (reduce redundancy)
  • Strong consistency
  • Example: Order creation, user profile updates
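
A minimal sketch of the kind of transactional write the OLTP side is optimized for, using psycopg2 against the orders table from the normalized schema shown later in this article (the column values are illustrative):

import psycopg2

conn = psycopg2.connect("postgresql://user:pwd@localhost/ecommerce")
with conn:                      # commits on success, rolls back on exception
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
            (42, 129.99),       # illustrative values
        )
        order_id = cur.fetchone()[0]
print(f"Created order {order_id}")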

Search:

  • Elasticsearch, Solr, Meilisearch
  • Optimized for: full-text search, facets, relevance ranking
  • Inverted indexes (fast keyword search)
  • Denormalized data (single document per searchable entity)
  • Example: "blue shoes under $100" → milliseconds

OLAP (Online Analytical Processing):

  • Data warehouse (BigQuery, Snowflake, Redshift)
  • Optimized for: complex aggregations across billions of rows
  • Star schema (fact tables + dimension tables)
  • Columnar storage with compression (similar values in a column compress well together)
  • Eventually consistent (freshness minutes to hours)
  • Example: Revenue by region/product/month

Data Flow: OLTP to Specialized Systems

The primary OLTP database stays the source of truth; denormalized copies flow to the search index and the data warehouse through ETL, CDC, or streaming pipelines (detailed below).

Denormalization Strategies

Denormalization duplicates data for performance, but each system stores a copy shaped for its own workload:

OLTP (Normalized):

users (id, name, email)
addresses (id, user_id, street, city)
orders (id, user_id, total)
products (id, name, description, price, category_id, brand_id)
categories (id, name)
brands (id, name)

Search (Denormalized):

{
  "product_id": 123,
  "name": "Blue Running Shoes",
  "description": "...",
  "price": 89.99,
  "category": "shoes",
  "category_name": "Shoes",   // Denormalized from categories table
  "brand": "Nike",            // Denormalized from brands table
  "tags": ["running", "nike", "blue"]
}
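
The facet filters used later in this article (category.keyword, brand.keyword) rely on keyword sub-fields. Below is a sketch of an explicit index mapping for the document above; Elasticsearch's default dynamic mapping would create similar .keyword sub-fields automatically, so treat the field types here as one reasonable choice rather than the required setup:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])
es.indices.create(
    index="products",
    body={
        "mappings": {
            "properties": {
                "product_id": {"type": "integer"},
                "name": {"type": "text"},
                "description": {"type": "text"},
                "price": {"type": "float"},
                # text for full-text matching, keyword sub-field for exact facets
                "category": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                "brand": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                "tags": {"type": "keyword"},
                "indexed_at": {"type": "date"}
            }
        }
    }
)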

Analytics (Star Schema):

Fact: orders (order_id, user_id, product_id, date, amount, quantity)
Dim: users (user_id, country, signup_date)
Dim: products (product_id, category, brand)
Dim: dates (date, month, quarter, year)
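
To make the star schema concrete, here is a sketch of the "revenue by country, category, and month" aggregation it is built for, submitted through the BigQuery Python client. The dataset and table names (warehouse.fact_orders, warehouse.dim_*) are assumptions chosen to mirror the layout above:

from google.cloud import bigquery

client = bigquery.Client()  # uses application default credentials
sql = """
    SELECT d.year, d.month, u.country, p.category,
           SUM(o.amount) AS revenue
    FROM `warehouse.fact_orders` o
    JOIN `warehouse.dim_dates` d    ON o.date = d.date
    JOIN `warehouse.dim_users` u    ON o.user_id = u.user_id
    JOIN `warehouse.dim_products` p ON o.product_id = p.product_id
    GROUP BY d.year, d.month, u.country, p.category
    ORDER BY revenue DESC
"""
for row in client.query(sql).result():
    print(row.year, row.month, row.country, row.category, row.revenue)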

Practical Example

Building Synchronized OLTP + Search + Analytics

import psycopg2
from elasticsearch import Elasticsearch
from datetime import datetime

class DataSyncPipeline:
    def __init__(self, pg_conn_str, es_hosts):
        self.pg_conn = psycopg2.connect(pg_conn_str)
        self.es = Elasticsearch(es_hosts)

    def sync_products_to_search(self):
        """ETL: PostgreSQL → Elasticsearch"""
        cursor = self.pg_conn.cursor()
        cursor.execute("""
            SELECT p.id, p.name, p.description, p.price,
                   c.name AS category_name, b.name AS brand_name,
                   ARRAY_REMOVE(ARRAY_AGG(t.tag), NULL) AS tags  -- drop NULL when a product has no tags
            FROM products p
            LEFT JOIN categories c ON p.category_id = c.id
            LEFT JOIN brands b ON p.brand_id = b.id
            LEFT JOIN product_tags pt ON p.id = pt.product_id
            LEFT JOIN tags t ON pt.tag_id = t.id
            GROUP BY p.id, p.name, c.name, b.name
        """)

        bulk_actions = []
        for product in cursor.fetchall():
            product_id, name, description, price, category, brand, tags = product

            doc = {
                "product_id": product_id,
                "name": name,
                "description": description,
                "price": price,
                "category": category,
                "brand": brand,
                "tags": tags or [],
                "indexed_at": datetime.now().isoformat()
            }

            # The bulk API expects an action line followed by the document source
            bulk_actions.append({
                "index": {"_index": "products", "_id": product_id}
            })
            bulk_actions.append(doc)

        # Bulk index in Elasticsearch
        if bulk_actions:
            self.es.bulk(body=bulk_actions)
            print(f"Indexed {len(bulk_actions) // 2} products to Elasticsearch")

    def search_products(self, query_text, filters=None):
        """Full-text search with facets"""
        query_body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": query_text,
                                "fields": ["name^2", "description", "brand", "tags"]
                            }
                        }
                    ],
                    "filter": []
                }
            },
            "aggs": {
                "categories": {"terms": {"field": "category.keyword"}},
                "brands": {"terms": {"field": "brand.keyword"}},
                "price_ranges": {
                    "range": {
                        "field": "price",
                        "ranges": [
                            {"to": 50},
                            {"from": 50, "to": 100},
                            {"from": 100, "to": 200},
                            {"from": 200}
                        ]
                    }
                }
            }
        }

        # Apply filters
        if filters:
            if "category" in filters:
                query_body["query"]["bool"]["filter"].append({
                    "term": {"category.keyword": filters["category"]}
                })
            if "brand" in filters:
                query_body["query"]["bool"]["filter"].append({
                    "term": {"brand.keyword": filters["brand"]}
                })
            if "max_price" in filters:
                query_body["query"]["bool"]["filter"].append({
                    "range": {"price": {"lte": filters["max_price"]}}
                })

        results = self.es.search(index="products", body=query_body)

        return {
            "hits": [
                {
                    "product_id": hit["_source"]["product_id"],
                    "name": hit["_source"]["name"],
                    "price": hit["_source"]["price"],
                    "score": hit["_score"]
                }
                for hit in results["hits"]["hits"]
            ],
            "facets": {
                "categories": [
                    {"name": b["key"], "count": b["doc_count"]}
                    for b in results["aggregations"]["categories"]["buckets"]
                ],
                "brands": [
                    {"name": b["key"], "count": b["doc_count"]}
                    for b in results["aggregations"]["brands"]["buckets"]
                ]
            },
            "total": results["hits"]["total"]["value"]
        }

# Usage
pipeline = DataSyncPipeline(
    pg_conn_str="postgresql://user:pwd@localhost/ecommerce",
    es_hosts=["http://localhost:9200"]
)

# Sync to Elasticsearch (run hourly)
pipeline.sync_products_to_search()

# Search API
results = pipeline.search_products(
    query_text="blue running shoes",
    filters={"max_price": 150, "brand": "Nike"}
)

print(f"Found {results['total']} products")
for hit in results['hits'][:5]:
    print(f" - {hit['name']} (${hit['price']})")

When to Use / When Not to Use

Use Search Engine (Elasticsearch) When:
  1. Need full-text search (keyword matching, fuzzy matching)
  2. Require faceted navigation (filter by category, price, etc.)
  3. Need relevance ranking (best matches first)
  4. High volume of search queries
  5. User-facing search feature (sub-second latency expected)

Use Data Warehouse (BigQuery) When:
  1. Complex aggregations across billions of rows
  2. Historical analytics (trend analysis, cohorts)
  3. Cross-organizational reporting
  4. Data freshness: hours or days acceptable
  5. Complex joins across many dimension tables

Patterns and Pitfalls

  • Search engine offload: Full-text search, relevance scoring, and faceted navigation via inverted indexes. Denormalize product data (embed category and brand names). Sync via ETL pipelines or CDC. Eventual consistency: a lag of seconds to minutes between a database update and search visibility is acceptable.
  • Data warehouse offload: Massive analytical queries over billions of rows. Columnar storage compresses similar data. Star schema: fact tables (transactions) join with dimension tables (attributes). Load via batch ETL jobs. Freshness of hours to a day is acceptable for historical analytics.
  • Streaming pipelines: Kafka or Kinesis moves data from OLTP to the warehouse with seconds of lag, enabling near-real-time dashboards. More complex than batch ETL and requires exactly-once (or idempotent) processing.
  • Choosing a sync strategy: Batch ETL (nightly), CDC (continuous), or streaming (real-time). Choose based on the freshness requirement: batch is simple but has a staleness window; CDC and streaming stay current at the cost of more moving parts (see the CDC sketch after this list).
  • Denormalization trade-off: OLTP stays normalized (update once, strong consistency); search and analytics copies are denormalized (updated in multiple systems, eventually consistent). The document design for the search engine differs from the star schema for the warehouse.
  • Consistency expectations: OLTP offers strong consistency (updates are immediately visible); search and analytics are eventually consistent (lag between update and visibility). Communicate this to users, e.g. "Search index refreshes hourly."
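
For the CDC option, the sketch below consumes change events and applies them to the search index. It assumes Debezium-style events (a payload with op and before/after images) arriving on a Kafka topic named products.changes; the topic name, consumer group, and connection details are assumptions:

import json
from kafka import KafkaConsumer          # kafka-python
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])
consumer = KafkaConsumer(
    "products.changes",
    bootstrap_servers=["localhost:9092"],
    group_id="search-indexer",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")) if m is not None else None,
    enable_auto_commit=False,            # commit offsets only after the index write succeeds
)

for message in consumer:
    if message.value is None:            # tombstone record, nothing to apply
        consumer.commit()
        continue
    event = message.value["payload"]
    if event["op"] in ("c", "u", "r"):   # create, update, snapshot read
        row = event["after"]
        es.index(index="products", id=row["id"], body=row)
    elif event["op"] == "d":             # delete
        es.delete(index="products", id=event["before"]["id"], ignore=[404])
    consumer.commit()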

Operational Considerations

1. Monitoring Sync Lag

# Track lag between source (OLTP) and destination (search/warehouse)
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

def monitor_sync_lag(pg_conn, es_client):
    cursor = pg_conn.cursor()
    cursor.execute("SELECT MAX(updated_at) FROM products")
    latest_oltp = cursor.fetchone()[0]

    latest_search = es_client.search(
        index="products",
        body={"size": 0, "aggs": {"max_timestamp": {"max": {"field": "indexed_at"}}}}
    )
    # The max aggregation returns epoch milliseconds; convert back to a datetime
    latest_search_ms = latest_search["aggregations"]["max_timestamp"]["value"]
    latest_search_at = datetime.fromtimestamp(latest_search_ms / 1000)

    lag_seconds = (latest_oltp - latest_search_at).total_seconds()
    if lag_seconds > 3600:  # Alert if > 1 hour
        logger.warning(f"Search sync lag: {lag_seconds}s")

2. Handling Failed Syncs

# Retry with exponential backoff
import logging
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry_sync(max_attempts=3, backoff=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"Sync failed after {max_attempts} attempts: {e}")
                        raise
                    wait_time = backoff ** attempt
                    logger.warning(f"Sync failed, retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
        return wrapper
    return decorator

@retry_sync(max_attempts=3)
def sync_to_elasticsearch():
    # ETL logic here
    pass

Design Review Checklist

  • Identified queries better served by specialized systems (search, analytics)
  • Measured: current latency, acceptable latency, volume
  • Chose appropriate systems (Elasticsearch for search, BigQuery for analytics)
  • Designed denormalization strategy (different for each system)
  • Selected synchronization method (batch ETL, CDC, streaming)
  • Defined acceptable freshness lag for each system
  • Implemented monitoring for sync lag and failures
  • Set up retry logic with exponential backoff
  • Documented data model in each system (search document, warehouse schema)
  • Planned capacity: storage, compute, network bandwidth
  • Tested failover: if Elasticsearch is down, does the search feature degrade gracefully?
  • Verified cost-benefit: operational overhead vs performance gains

Self-Check Questions

  1. When would you use a search engine rather than the primary database?

    • Search engine: full-text search, facets, relevance ranking. Database: single record lookups, transactional consistency.
  2. What is a star schema, and why use it for analytics?

    • Central fact table (transactions) with dimension tables (attributes). Enables fast aggregations; standard for data warehouses.
  3. How do you keep search index synchronized with database?

    • ETL pipelines (batch), CDC (continuous), or streaming. Choose based on acceptable lag.
  4. Why denormalize for analytics but normalize for OLTP?

    • OLTP: normalized = update once, consistency. Analytics: denormalized = pre-joined, fast aggregations.

Next Steps

  1. Audit queries: Identify slow searches and analytics
  2. Estimate cost-benefit: Do the performance gains justify the operational overhead of a specialized system?
  3. Pilot with one system: Start with Elasticsearch for search (most common)
  4. Build sync pipeline: Batch ETL initially (simpler than CDC)
  5. Monitor sync lag: Set up alerts, establish SLA
  6. Extend to analytics: Add data warehouse for complex reporting
  7. Optimize denormalization: Balance consistency and performance

References