Wide-Column Stores
Distributed column-oriented storage for petabyte-scale analytics and time-series
TL;DR
Wide-column stores (Cassandra, HBase) organize rows into column families partitioned across distributed nodes, and are optimized for time-series, metrics, and analytics. They deliver petabyte scale, tunable consistency (Cassandra), and write-optimized storage. Trade-offs: a complex operational model, eventual consistency by default, and a design aimed at append-heavy workloads.
Learning Objectives
- Understand column-family data model
- Design partition and clustering keys for distribution
- Recognize when wide-column stores outperform RDBMS
- Choose appropriate consistency levels
Motivating Scenario
Collecting metrics from 1M servers, 1,000 metrics each, at 10-second intervals = 100M samples/second. An RDBMS struggles at this scale. Cassandra partitions the data by metric_name and stores time-series rows, scaling linearly as nodes are added. Queries for the last 24 hours of one metric on one server read a single partition and complete in milliseconds.
Core Concepts
Partition and Clustering Keys
CREATE TABLE metrics (
    metric_name TEXT,      -- Partition key
    server_id TEXT,        -- Clustering key part 1
    timestamp TIMESTAMP,   -- Clustering key part 2
    value FLOAT,
    PRIMARY KEY ((metric_name), server_id, timestamp)
);
-- Partition key (metric_name) determines which node
-- Clustering keys (server_id, timestamp) order rows within partition
-- All data for one metric on same node
-- Time-series queries within metric are fast
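To make routing concrete, here is a simplified sketch of how a partition key chooses a node. Cassandra actually uses the Murmur3 partitioner over a virtual-node token ring; the md5-based ring below is an illustration only, not the real algorithm.

import hashlib
from bisect import bisect_right

# Simplified token ring (illustrative: Cassandra uses Murmur3 + vnodes)
NODES = ["node-a", "node-b", "node-c"]
RING = sorted((int(hashlib.md5(n.encode()).hexdigest(), 16), n) for n in NODES)

def node_for(partition_key: str) -> str:
    """The first node at or after the key's token owns the partition
    (wrapping around the end of the ring)."""
    token = int(hashlib.md5(partition_key.encode()).hexdigest(), 16)
    idx = bisect_right([t for t, _ in RING], token) % len(RING)
    return RING[idx][1]

# All rows sharing a partition key land on the same replica set, which
# is why time-range scans within one metric are fast.
print(node_for("cpu"), node_for("memory"))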
Practical Example
- Cassandra (Python)
- HBase (Python)
- Node.js + cassandra-driver
from cassandra.cluster import Cluster
from datetime import datetime, timedelta
cluster = Cluster(['localhost'])
session = cluster.connect('metrics')
# Create table
session.execute("""
CREATE TABLE IF NOT EXISTS metrics (
metric_name text,
server_id text,
timestamp timestamp,
value float,
PRIMARY KEY ((metric_name), server_id, timestamp)
) WITH CLUSTERING ORDER BY (server_id ASC, timestamp DESC)
""")
# Insert metrics
def insert_metric(metric_name, server_id, value):
    timestamp = datetime.utcnow()
    session.execute(
        "INSERT INTO metrics (metric_name, server_id, timestamp, value) VALUES (%s, %s, %s, %s)",
        (metric_name, server_id, timestamp, value)
    )
# Bulk insert: prepare one statement, then run many executions concurrently
from cassandra.concurrent import execute_concurrent_with_args

insert_stmt = session.prepare(
    "INSERT INTO metrics (metric_name, server_id, timestamp, value) "
    "VALUES (?, ?, ?, ?)"
)
params = [
    ('cpu', f'server-{i}', datetime.utcnow() - timedelta(seconds=j), 50 + (i * j) % 30)
    for i in range(10)
    for j in range(60)
]
execute_concurrent_with_args(session, insert_stmt, params)
# Query: last 24 hours of CPU metrics for one server.
# The partition key and clustering columns are fully restricted, so no
# ALLOW FILTERING is needed; CLUSTERING ORDER BY (timestamp DESC)
# returns rows newest-first.
query = session.prepare("""
    SELECT timestamp, value FROM metrics
    WHERE metric_name = ? AND server_id = ? AND timestamp >= ?
    LIMIT 1440
""")
result = session.execute(query, (
    'cpu',
    'server-1',
    datetime.utcnow() - timedelta(hours=24)
))
for row in result:
    print(f"{row.timestamp}: {row.value}%")
# Tunable consistency
from cassandra import ConsistencyLevel

# Read: require 2 replicas to respond before returning
read_query = session.prepare("""
    SELECT value FROM metrics WHERE metric_name = ? AND server_id = ? AND timestamp = ?
""")
read_query.consistency_level = ConsistencyLevel.TWO
# An exact-timestamp read only matches a row written with that precise
# timestamp, so capture the timestamp at write time and reuse it
ts = datetime.utcnow()
session.execute(
    "INSERT INTO metrics (metric_name, server_id, timestamp, value) VALUES (%s, %s, %s, %s)",
    ('cpu', 'server-1', ts, 42.0)
)
result = session.execute(read_query, ('cpu', 'server-1', ts))
import time
import happybase

# Connect to HBase (via the Thrift gateway)
connection = happybase.Connection('localhost')
table = connection.table('metrics')

# HBase keys and values are raw byte strings; encode everything explicitly
# Insert: row_key = metric:server:timestamp
# Zero-pad the timestamp so lexicographic key order matches time order
def insert_metric(metric, server, timestamp, value):
    row_key = f'{metric}:{server}:{timestamp:012d}'.encode('utf-8')
    table.put(row_key, {
        b'data:value': str(value).encode('utf-8'),
        b'data:unit': b'percent'
    })
# Insert metrics
now = int(time.time())
for i in range(100):
    insert_metric('cpu', f'server-{i}', now, 50)
# Scan: get metrics for a server in a time range
# (row_start is inclusive, row_stop is exclusive)
def get_metrics_range(metric, server, start_time, end_time):
    start_key = f'{metric}:{server}:{start_time:012d}'.encode('utf-8')
    stop_key = f'{metric}:{server}:{end_time:012d}'.encode('utf-8')
    results = []
    for key, data in table.scan(row_start=start_key, row_stop=stop_key):
        results.append(float(data[b'data:value']))
    return results
# Row-by-row scan with a key prefix
for key, data in table.scan(row_prefix=b'cpu:'):
    print(f"Key: {key}, Value: {data[b'data:value']}")
const cassandra = require('cassandra-driver');

const client = new cassandra.Client({
  contactPoints: ['localhost'],
  localDataCenter: 'datacenter1',
  keyspace: 'metrics'
});
async function insertMetric(metricName, serverId, value) {
  const timestamp = new Date();
  const query = `
    INSERT INTO metrics (metric_name, server_id, timestamp, value)
    VALUES (?, ?, ?, ?)
  `;
  await client.execute(query,
    [metricName, serverId, timestamp, value],
    { prepare: true }
  );
}
async function getMetricsLast24Hours(metricName, serverId) {
  const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
  // No ORDER BY clause: the table's CLUSTERING ORDER BY (timestamp DESC)
  // already returns rows newest-first within each server_id, and Cassandra
  // rejects ORDER BY on a non-leading clustering column.
  const query = `
    SELECT timestamp, value FROM metrics
    WHERE metric_name = ? AND server_id = ? AND timestamp >= ?
    LIMIT 1440
  `;
  const result = await client.execute(query,
    [metricName, serverId, twentyFourHoursAgo],
    { prepare: true }
  );
  return result.rows;
}
async function main() {
  await client.connect();

  // Insert metrics from multiple servers
  const insertPromises = [];
  for (let i = 1; i <= 100; i++) {
    insertPromises.push(
      insertMetric('cpu', `server-${i}`, 40 + Math.random() * 20)
    );
  }
  await Promise.all(insertPromises);

  // Query
  const metrics = await getMetricsLast24Hours('cpu', 'server-1');
  console.log(`Retrieved ${metrics.length} metrics`);

  await client.shutdown();
}

main().catch(console.error);
When to Use Wide-Column Stores
- Time-series data (metrics, events, logs)
- Petabyte-scale data volume
- Write-heavy workloads
- Linear horizontal scaling needed
- Geographically distributed deployments

When Not to Use
- Complex queries with JOINs
- Strong ACID guarantees required
- Dataset easily fits in under 100GB
- Complex reporting queries
- Data normalization is important
Patterns and Pitfalls
Design Review Checklist
- Partition key chosen to avoid hotspots
- Clustering key order matches query patterns
- Partition size within limits (e.g., <100GB)
- Time-bucketing strategy for time-series data (see the sketch after this checklist)
- Replication factor >= 3 for reliability
- Consistency level tuned for each query
- Compaction strategy configured
- Monitoring for hotspots and imbalance
- Backup and recovery procedures documented
- Schema versioning strategy planned
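On the time-bucketing item: a minimal sketch of a day-bucketed partition key. The metrics_bucketed table and helper functions below are illustrative, not part of the earlier examples.

from datetime import datetime, timedelta, timezone

# Illustrative schema: the day bucket joins the partition key so one
# metric's history is spread across many bounded partitions.
# CREATE TABLE metrics_bucketed (
#     metric_name text,
#     day text,             -- e.g. '2025-02-15'
#     server_id text,
#     timestamp timestamp,
#     value float,
#     PRIMARY KEY ((metric_name, day), server_id, timestamp)
# );

def day_bucket(ts: datetime) -> str:
    return ts.strftime('%Y-%m-%d')

def buckets_for_window(start: datetime, end: datetime) -> list[str]:
    """A time-range query must fan out to every day bucket it spans."""
    days, cur = [], start
    while cur.date() <= end.date():
        days.append(day_bucket(cur))
        cur += timedelta(days=1)
    return days

now = datetime.now(timezone.utc)
print(buckets_for_window(now - timedelta(hours=24), now))  # at most 2 buckets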
Advanced Concepts: Compaction and Repair
Wide-column stores use compaction to optimize storage and query performance:
Write Path in Cassandra (a toy sketch follows these steps):
1. Write to memtable (in-memory)
2. Write to commit log (persistent, for recovery)
3. Memtable gets full; flush to SSTables (immutable files on disk)
4. Multiple SSTables accumulate over time
5. Compaction: merge SSTables, remove deleted data, optimize for queries
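To see steps 1-5 end to end, here is a toy, single-process sketch of the memtable/SSTable idea. It omits the commit log, bloom filters, and real on-disk formats; it exists purely to show why writes are fast and why reads consult the newest data first.

import bisect

class ToyLSM:
    """Toy memtable + SSTable model: writes go to memory, full memtables
    are flushed as immutable sorted runs, reads check newest data first."""
    def __init__(self, memtable_limit=4):
        self.memtable = {}
        self.sstables = []          # list of sorted [(key, value)] runs
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value  # steps 1-2 (commit log omitted here)
        if len(self.memtable) >= self.memtable_limit:
            self.flush()            # step 3: flush to an immutable SSTable

    def flush(self):
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.sstables):   # newest SSTable wins
            i = bisect.bisect_left(run, (key,))
            if i < len(run) and run[i][0] == key:
                return run[i][1]
        return None

lsm = ToyLSM()
for i in range(10):
    lsm.put(f'k{i}', i)
print(lsm.get('k3'), len(lsm.sstables))  # value found in an older SSTable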
Compaction Strategies:
- SizeTieredCompactionStrategy: merges similar-sized SSTables; the default, suited to write-heavy workloads
- LeveledCompactionStrategy: organizes SSTables into levels (as in LevelDB); lower read latency at the cost of more compaction I/O
- TimeWindowCompactionStrategy: groups SSTables by time window (e.g., one day or week per window); ideal for time-series data
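Choosing a strategy is a per-table schema setting. A minimal sketch with the Python driver, reusing the metrics table from the earlier example; the one-day window is an assumption chosen to match daily query patterns, so verify the option names against your Cassandra version.

# Time-window compaction: SSTables are grouped into one-day windows, so
# old windows stop being recompacted and expire cleanly alongside TTLs.
session.execute("""
    ALTER TABLE metrics WITH compaction = {
        'class': 'TimeWindowCompactionStrategy',
        'compaction_window_unit': 'DAYS',
        'compaction_window_size': 1
    }
""")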
Tombstones mark deleted data:

-- Delete one row; Cassandra never removes data in place
DELETE FROM metrics
WHERE metric_name = 'cpu' AND server_id = 'server-1'
  AND timestamp = '2025-02-15 10:00:00';

-- What actually happens: a tombstone (deletion marker) is inserted.
-- The tombstone is kept for gc_grace_seconds (default 10 days); after
-- that, compaction may discard it along with the shadowed data.

-- Problem: a node that is down during the delete never receives the
-- tombstone. If it rejoins after gc_grace_seconds, the old data can
-- resurface (a "zombie" row). Solution: run repair to sync the nodes:

# nodetool repair takes an optional keyspace name (not a cluster name)
nodetool repair metrics

# Repair compares replicas' data and resyncs diverged rows.
# Cost: expensive I/O; schedule it during low-traffic periods.
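gc_grace_seconds is tunable per table. A minimal sketch with the Python driver, reusing the session from the earlier example; the 3-day value is illustrative, and shortening the window is only safe if repair reliably completes within it.

# Keep tombstones for 3 days instead of the 10-day default.
# Repair must run more often than this window, or deletes can resurface.
session.execute("ALTER TABLE metrics WITH gc_grace_seconds = 259200")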
Designing for Query Patterns
Wide-column stores don't support arbitrary queries; design tables for specific access patterns:
-- Anti-pattern: a single table for all queries.
-- With metric_name as the partition key, "all metrics for server X in
-- the last 24 hours" cannot be answered without scanning every partition.

-- Good: denormalize per query pattern
CREATE TABLE metrics_by_server (
    server_id TEXT,
    timestamp TIMESTAMP,
    metric_name TEXT,
    value FLOAT,
    PRIMARY KEY ((server_id), timestamp, metric_name)
) WITH CLUSTERING ORDER BY (timestamp DESC, metric_name ASC);

-- Query: all metrics for server-1 in the last 24 hours.
-- (CQL has no interval arithmetic like now() - '24h'; the client
-- computes the cutoff timestamp and binds or inlines it.)
SELECT * FROM metrics_by_server
WHERE server_id = 'server-1'
  AND timestamp >= '2025-02-14 10:00:00';

-- Alternative table for a different query
CREATE TABLE metrics_by_metric (
    metric_name TEXT,
    timestamp TIMESTAMP,
    server_id TEXT,
    value FLOAT,
    PRIMARY KEY ((metric_name), timestamp, server_id)
);

-- Query: all servers' CPU in the last hour
SELECT * FROM metrics_by_metric
WHERE metric_name = 'cpu'
  AND timestamp >= '2025-02-15 09:00:00';

-- Trade-off: the data is stored twice (denormalization)
-- Benefit: both queries are fast single-partition reads
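Keeping two denormalized tables in sync means every write goes to both. A minimal sketch using the Python driver's logged batch, assuming the two tables above exist and a connected session as in the earlier examples:

from datetime import datetime
from cassandra.query import BatchStatement

insert_by_server = session.prepare("""
    INSERT INTO metrics_by_server (server_id, timestamp, metric_name, value)
    VALUES (?, ?, ?, ?)
""")
insert_by_metric = session.prepare("""
    INSERT INTO metrics_by_metric (metric_name, timestamp, server_id, value)
    VALUES (?, ?, ?, ?)
""")

def record_metric(metric_name, server_id, value):
    ts = datetime.utcnow()
    batch = BatchStatement()  # logged batch: both inserts eventually apply
    batch.add(insert_by_server, (server_id, ts, metric_name, value))
    batch.add(insert_by_metric, (metric_name, ts, server_id, value))
    session.execute(batch)

record_metric('cpu', 'server-1', 57.5)

Logged batches add coordinator overhead, so reserve them for correctness across denormalized tables rather than for bulk loading.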
Consistency and Availability Trade-offs
Wide-column stores typically offer eventual consistency, but you can tune per-query:
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

# The Python driver sets consistency on the statement, not on execute().
# Write: require a quorum of replicas to acknowledge before returning.
write = SimpleStatement(
    "INSERT INTO metrics (metric_name, server_id, timestamp, value) "
    "VALUES (%s, %s, %s, %s)",
    consistency_level=ConsistencyLevel.QUORUM
)
session.execute(write, ('cpu', 'server-1', datetime.utcnow(), 42.0))
# QUORUM = (replication_factor // 2) + 1 replicas must acknowledge
# For RF=3: 2 nodes. Slower but safe: committed writes are unlikely to be lost.

fast_write = SimpleStatement(
    "INSERT INTO metrics (metric_name, server_id, timestamp, value) "
    "VALUES (%s, %s, %s, %s)",
    consistency_level=ConsistencyLevel.ONE
)
# Fast but risky: returns after 1 replica acknowledges.
# If that node dies before replicating, the write can be lost.

# Read: QUORUM waits for 2 of 3 replicas and returns the most recent value
read_query = session.prepare("""
    SELECT * FROM metrics
    WHERE metric_name = ? AND server_id = ? AND timestamp >= ?
""")
read_query.consistency_level = ConsistencyLevel.QUORUM
# Trade-off: slower, but QUORUM writes + QUORUM reads (W + R > RF)
# guarantee a read overlaps the latest acknowledged write.

read_query.consistency_level = ConsistencyLevel.ONE
# Fastest read, but most likely to return stale data.
# OK for metrics (small staleness acceptable); NOT OK for financial data.
Monitoring and Operational Excellence
Wide-column stores require proactive monitoring:
# Metrics to monitor
metrics = {
    "read_latency_p99": "target: < 10ms",
    "write_latency_p99": "target: < 5ms",
    "gc_pause_duration": "target: < 500ms",
    "compaction_pending_tasks": "target: < 10",
    "tombstone_count_per_sstable": "alert if > 100k",
    "heap_used": "alert if > 80% of max",
    "disk_space_used": "alert if > 80% of capacity",
}

# Common operational tasks
operations = {
    "nodetool status": "Check cluster health; shows each node's token range",
    "nodetool repair": "Sync data between nodes; run weekly",
    "nodetool compact": "Force compaction; use cautiously (I/O intensive)",
    "nodetool tpstats": "Show thread pool stats; detect stalls",
    "cqlsh": "Interactive shell for ad-hoc queries",
}
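Basic liveness can also be checked from application code. A minimal sketch using the Python driver's cluster metadata, reusing the cluster object from the earlier example:

# List the hosts the driver knows about and their current up/down state
for host in cluster.metadata.all_hosts():
    print(host.address, 'UP' if host.is_up else 'DOWN')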
Real-World Scenario: Metrics at 1M Servers
Scale: 1M servers * 1000 metrics/server * 10 measurements/min
= 10B metrics per minute
= ~167M metrics per second (huge!)
Design:
Cluster: RF=3 (three copies of every row); node count sized from per-node write throughput, not storage alone
Partition key: metric_name + date (e.g., "cpu:2025-02-15")
Clustering key: server_id, timestamp
Rationale:
- Daily partitions keep any single partition from growing without bound
- With RF=3, every row is written to 3 replicas, so the cluster must absorb roughly 3x the raw ingest rate (~500M row writes/second here); divide by realistic per-node write capacity to size the node count
- Queries for "last 24 hours of CPU across all servers" hit at most 2 daily partitions
Query examples:
SELECT value FROM metrics
WHERE metric_name = 'cpu:2025-02-15'
  AND server_id = 'server-123'
  AND timestamp >= '2025-02-14 10:00';
-- A 24-hour window that spans midnight also needs the previous day's
-- bucket: repeat the query with metric_name = 'cpu:2025-02-14'.
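Client-side, that fan-out looks like the following minimal sketch, assuming the daily-bucket key format above and a connected session as in the earlier examples:

from datetime import datetime, timedelta, timezone

range_query = session.prepare("""
    SELECT timestamp, value FROM metrics
    WHERE metric_name = ? AND server_id = ? AND timestamp >= ?
""")

def last_24h(metric, server_id):
    """Query every daily bucket the 24-hour window touches and merge."""
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=24)
    rows, day = [], start
    while day.date() <= end.date():
        bucket = f"{metric}:{day:%Y-%m-%d}"   # e.g. 'cpu:2025-02-15'
        rows.extend(session.execute(range_query, (bucket, server_id, start)))
        day += timedelta(days=1)
    return rows

print(len(last_24h('cpu', 'server-123')))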
Operations:
- Run repair weekly (replicas in a distributed system eventually diverge)
- Monitor the compaction backlog (pending tasks), especially with 7-10 days of retained time-series data per node
- Plan capacity: add nodes ahead of growth (e.g., every 3-4 months)
- Backup: snapshot the cluster daily; retain snapshots for 30 days
Self-Check
- What's the difference between partition key and clustering key?
- Why avoid hotspots in Cassandra?
- How would you design metrics storage for 1M servers?
- What is tunable consistency, and when would you use each level?
- What are tombstones and why do they matter?
- When would you denormalize and maintain multiple tables for the same data?
Wide-column stores are purpose-built for massive-scale time-series and analytical workloads, trading away complex queries and strong consistency for linear horizontal scaling and predictable latency. Use them for metrics, logs, and events when your write volume exceeds what an RDBMS can absorb (roughly 100K+ writes/sec).
Next Steps
- Explore Data Pipelines & Analytics for aggregating time-series
- Learn Sharding Strategies for distributed data design
- Study Materialized Views for pre-aggregated metrics
- Dive into Monitoring and Observability for operational insights
- Master Compaction Strategies for different workload patterns
References
- Apache Cassandra Documentation
- "HBase in Action" (Manning)
- "Designing Data-Intensive Applications" by Martin Kleppmann (O'Reilly)
- "Bigtable: A Distributed Storage System for Structured Data" (Google, 2006)
- "Cassandra: The Definitive Guide" by Eben Hewitt (O'Reilly)
- Cassandra best-practice posts on the DataStax blog