Data Replication, Sharding, and Partitioning
Scale data storage horizontally by distributing data across multiple databases and replicas.
TL;DR
Single databases don't scale indefinitely. Three techniques distribute data: replication (same data on multiple servers for availability and read scaling), sharding (split data by key across servers to distribute load), and partitioning (split data within a database for manageability). Use replication for high availability and read-heavy workloads. Use sharding when one database can't handle the write volume. Sharding introduces complexity: routing logic (how to find the right shard?), rebalancing (what happens when you add a new shard?), and cross-shard queries (expensive and hard). Most systems use all three: primary/secondary replication for availability, sharding by user ID for horizontal scaling, and logical partitions within each shard for manageability.
Learning Objectives
- Understand replication strategies and consistency tradeoffs
- Design sharding keys and partitioning schemes
- Implement shard routing logic
- Handle shard rebalancing and hotspots
- Deal with cross-shard queries and transactions
- Monitor and manage replicas at scale
Motivating Scenario
A service's database grows to terabytes. Queries slow down. Writes can't keep up. Vertical scaling (bigger servers) has limits. Replication helps with reads but not writes. Sharding distributes writes across multiple databases. But how do you route requests to the right shard? What if one shard gets too much traffic? How do you join data across shards?
Core Concepts
Replication Patterns
Leader-Follower: writes go to the leader and are replicated, usually asynchronously, to followers; followers serve reads. Tradeoff: followers lag behind the leader (eventual consistency) in exchange for read scaling and high availability. Multi-Leader: writes can go to any leader, and changes are synchronized between leaders. Tradeoff: higher write availability, but more complexity and the need for conflict resolution when the same data changes in two places.
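The routing consequence of leader-follower replication fits in a few lines. A minimal Python sketch, assuming a hypothetical Database connection object like the one used in the practical example below: all writes go to the leader, reads normally go to a follower, and a caller that must see its own write can explicitly ask for the leader.

import random

class ReplicatedStore:
    """Leader-follower routing sketch; Database objects are placeholders."""
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers

    def write(self, table, row):
        # All writes go to the leader; followers apply them asynchronously.
        self.leader.insert(table, row)

    def read(self, query, require_fresh=False):
        # Followers may lag the leader (eventual consistency); callers that
        # need read-your-writes consistency read from the leader instead.
        db = self.leader if require_fresh else random.choice(self.followers)
        return db.query(query)

In practice the replication itself is handled by the database (for example MySQL or PostgreSQL streaming replication); the application only decides where each read and write is routed.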
Sharding
Partition data by a key (user ID, order ID) across multiple databases so that each database owns a subset of the data. Writes are spread across shards, which raises total write throughput. Tradeoff: queries that span multiple shards are expensive, and transactions that span shards are hard to get right.
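The key decision is which column to shard on. A minimal sketch, assuming orders are keyed by user_id and a fixed shard count: hashing the key keeps all of one user's rows on a single shard, so single-user queries stay cheap, while queries keyed by anything else must fan out. The practical example below expands this idea into a full router with replicas.

import zlib

NUM_SHARDS = 4  # hypothetical fixed shard count

def shard_for(user_id: str) -> int:
    # A stable hash maps the same user_id to the same shard on every node,
    # co-locating all of that user's orders on one database.
    return zlib.crc32(user_id.encode()) % NUM_SHARDS

# Queries not keyed by user_id (e.g. "all orders containing product X")
# have no single owning shard and must be sent to every shard.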
Partitioning
Divide large tables into smaller partitions (by date range or hash range) within a single database. Queries that touch only the relevant partitions run faster, and old partitions can be dropped cheaply. This is not true horizontal scaling, but it keeps very large tables manageable.
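As a concrete illustration, most relational databases support declarative partitioning. A sketch assuming PostgreSQL range partitioning by month, driven from Python with a psycopg2-style connection (table and column names are hypothetical):

# Assumes a PostgreSQL 10+ connection, e.g. conn = psycopg2.connect(...)
CREATE_PARTITIONED_ORDERS = """
CREATE TABLE orders (
    id         bigint      NOT NULL,
    user_id    bigint      NOT NULL,
    created_at timestamptz NOT NULL
) PARTITION BY RANGE (created_at);

CREATE TABLE orders_2024_01 PARTITION OF orders
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
"""

def create_partitioned_orders(conn):
    # Queries filtered on created_at scan only the matching partitions;
    # dropping orders_2024_01 later removes that month's data instantly.
    with conn.cursor() as cur:
        cur.execute(CREATE_PARTITIONED_ORDERS)
    conn.commit()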
Hotspots and Rebalancing
Some shards receive far more traffic than others (hotspots). The remedy is to add shards and rebalance: gradually move keys from existing shards to new ones. Rebalancing is complex, requires routing-logic updates, and can affect availability while data moves.
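Part of the rebalancing pain comes from modulo routing itself: changing hash(key) % N to hash(key) % (N+1) reassigns almost every key. Consistent hashing limits the damage, because adding a shard only moves the keys between it and its neighbor on the ring. A minimal sketch, with hypothetical shard names and no virtual nodes or replication:

import bisect
import hashlib

class ConsistentHashRing:
    """Minimal hash ring; real systems add many virtual nodes per shard."""
    def __init__(self, shard_names):
        self.ring = sorted((self._hash(n), n) for n in shard_names)

    @staticmethod
    def _hash(value):
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def shard_for(self, key):
        # Walk clockwise to the first shard at or after the key's position.
        idx = bisect.bisect_left(self.ring, (self._hash(key), ""))
        return self.ring[idx % len(self.ring)][1]

    def add_shard(self, name):
        # Only keys between the new shard and its predecessor need to move.
        bisect.insort(self.ring, (self._hash(name), name))

Virtual nodes (placing each shard at many points on the ring) are what keep load even and hotspots manageable; this sketch omits them for brevity.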
Practical Example
Python
# ❌ POOR - Single database bottleneck
class OrderService:
    def __init__(self, db):
        self.db = db

    def create_order(self, user_id, items):
        order = Order(user_id=user_id, items=items)
        self.db.insert('orders', order)  # Single database bottleneck
        return order

# ✅ EXCELLENT - Sharded data with routing
import random
import zlib

class ShardRouter:
    def __init__(self, num_shards):
        self.num_shards = num_shards
        self.shards = {i: Shard(i) for i in range(num_shards)}

    def get_shard(self, key):
        """Determine which shard owns this key"""
        # Use a stable hash: Python's built-in hash() is randomized per
        # process and would route the same key differently on each node.
        shard_id = zlib.crc32(str(key).encode()) % self.num_shards
        return self.shards[shard_id]

    def route(self, key, operation, *args):
        """Route operation to appropriate shard"""
        shard = self.get_shard(key)
        return getattr(shard, operation)(*args)

class Shard:
    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.leader = Database(f"leader-{shard_id}")
        self.followers = [Database(f"follower-{shard_id}-{i}") for i in range(2)]

    def write(self, table, data):
        """Write to leader"""
        self.leader.insert(table, data)
        # Asynchronously replicate to followers
        self._replicate_to_followers(table, data)

    def read(self, table, conditions):
        """Read from follower for load distribution"""
        follower = random.choice(self.followers)
        return follower.query(f"SELECT * FROM {table} WHERE {conditions}")

    def _replicate_to_followers(self, table, data):
        for follower in self.followers:
            # Placeholder helper: applies the write on a follower without blocking
            async_replicate(follower, table, data)

class OrderService:
    def __init__(self, router):
        self.router = router

    def create_order(self, user_id, items):
        order = Order(user_id=user_id, items=items)
        # Route to the shard that owns this user_id
        self.router.route(user_id, 'write', 'orders', order)
        return order

    def get_user_orders(self, user_id):
        # Route to the owning shard, read from a follower
        # (condition string is illustrative; use parameterized queries in real code)
        return self.router.route(user_id, 'read', 'orders', f"user_id = {user_id}")

    def get_all_orders(self):
        # Cross-shard query: expensive and slow
        all_orders = []
        for shard_id in range(self.router.num_shards):
            shard = self.router.shards[shard_id]
            all_orders.extend(shard.read('orders', '1=1'))
        return all_orders
Go
// ❌ POOR - Single database
func (s *OrderService) CreateOrder(ctx context.Context, userID string, items []Item) error {
    order := &Order{UserID: userID, Items: items}
    return s.db.Insert(ctx, order) // Single database bottleneck
}

// ✅ EXCELLENT - Sharded architecture
type ShardRouter struct {
    numShards int
    shards    []*Shard
}

func NewShardRouter(numShards int) *ShardRouter {
    shards := make([]*Shard, numShards)
    for i := 0; i < numShards; i++ {
        shards[i] = NewShard(i)
    }
    return &ShardRouter{numShards: numShards, shards: shards}
}

func (sr *ShardRouter) GetShard(key string) *Shard {
    // FNV is a stable hash, so every service instance routes a key identically
    hasher := fnv.New32a()
    hasher.Write([]byte(key))
    return sr.shards[hasher.Sum32()%uint32(sr.numShards)]
}

type Shard struct {
    id        int
    leader    *Database
    followers []*Database
}

func NewShard(id int) *Shard {
    return &Shard{
        id:     id,
        leader: NewDatabase(fmt.Sprintf("leader-%d", id)),
        followers: []*Database{
            NewDatabase(fmt.Sprintf("follower-%d-0", id)),
            NewDatabase(fmt.Sprintf("follower-%d-1", id)),
        },
    }
}

func (s *Shard) Write(ctx context.Context, order *Order) error {
    // Write to leader
    if err := s.leader.Insert(ctx, order); err != nil {
        return err
    }
    // Replicate to followers asynchronously
    go s.replicateToFollowers(context.Background(), order)
    return nil
}

func (s *Shard) Read(ctx context.Context, userID string) ([]Order, error) {
    // Read from follower for load distribution
    follower := s.followers[rand.Intn(len(s.followers))]
    return follower.Query(ctx, "SELECT * FROM orders WHERE user_id = ?", userID)
}

func (s *Shard) ReadAll(ctx context.Context) ([]Order, error) {
    // Full-shard scan used by cross-shard queries
    follower := s.followers[rand.Intn(len(s.followers))]
    return follower.Query(ctx, "SELECT * FROM orders")
}

type OrderService struct {
    router *ShardRouter
}

func (os *OrderService) CreateOrder(ctx context.Context, userID string, items []Item) error {
    order := &Order{UserID: userID, Items: items}
    shard := os.router.GetShard(userID)
    return shard.Write(ctx, order)
}

func (os *OrderService) GetUserOrders(ctx context.Context, userID string) ([]Order, error) {
    shard := os.router.GetShard(userID)
    return shard.Read(ctx, userID)
}

func (os *OrderService) GetAllOrders(ctx context.Context) ([]Order, error) {
    // Cross-shard query: expensive, touches every shard
    var allOrders []Order
    for _, shard := range os.router.shards {
        orders, err := shard.ReadAll(ctx)
        if err != nil {
            return nil, err
        }
        allOrders = append(allOrders, orders...)
    }
    return allOrders, nil
}
Node.js
// ❌ POOR - Single database
class OrderService {
  constructor(db) {
    this.db = db;
  }

  async createOrder(userId, items) {
    return await this.db.insert('orders', { userId, items });
  }
}

// ✅ EXCELLENT - Sharded architecture
class Shard {
  constructor(id, leader, followers) {
    this.id = id;
    this.leader = leader;
    this.followers = followers;
  }

  async write(table, data) {
    await this.leader.insert(table, data);
    // Replicate asynchronously
    this.followers.forEach(follower => {
      setImmediate(() => this.replicate(follower, table, data));
    });
  }

  async read(table, conditions) {
    const follower = this.followers[Math.floor(Math.random() * this.followers.length)];
    return await follower.query(`SELECT * FROM ${table} WHERE ${conditions}`);
  }

  async replicate(follower, table, data) {
    try {
      await follower.insert(table, data);
    } catch (error) {
      console.error('Replication failed:', error);
    }
  }
}

class ShardRouter {
  constructor(numShards) {
    this.numShards = numShards;
    this.shards = {};
    for (let i = 0; i < numShards; i++) {
      const leader = new Database(`leader-${i}`);
      const followers = [
        new Database(`follower-${i}-0`),
        new Database(`follower-${i}-1`)
      ];
      this.shards[i] = new Shard(i, leader, followers);
    }
  }

  getShard(key) {
    const hash = this.hashKey(key);
    return this.shards[hash % this.numShards];
  }

  hashKey(key) {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      const char = key.charCodeAt(i);
      hash = ((hash << 5) - hash) + char;
      hash = hash & hash; // Convert to 32-bit integer
    }
    return Math.abs(hash);
  }
}

class OrderService {
  constructor(router) {
    this.router = router;
  }

  async createOrder(userId, items) {
    const shard = this.router.getShard(userId);
    await shard.write('orders', { userId, items });
  }

  async getUserOrders(userId) {
    const shard = this.router.getShard(userId);
    return await shard.read('orders', `user_id = '${userId}'`);
  }

  async getAllOrders() {
    // Cross-shard query: slow
    const allOrders = [];
    for (let i = 0; i < this.router.numShards; i++) {
      const orders = await this.router.shards[i].read('orders', '1=1');
      allOrders.push(...orders);
    }
    return allOrders;
  }
}
When to Use / When Not to Use
Use replication when:
- Workloads are read-heavy (distribute reads across replicas)
- High availability is required (survive server failures)
- Data must be served from multiple regions (replicas in different regions)
- The system handles important production data (which is almost always)
Use sharding when:
- Write volume exceeds what a single database can handle
- The dataset is too large to fit on one server
- Geographic partitioning is natural (EU data in EU, US data in US)
- Most queries access a single shard
Hold off on sharding while a single database plus replicas still meets your write volume: routing, rebalancing, and cross-shard queries add complexity that is hard to remove later.
Patterns and Pitfalls
Design Review Checklist
- Replication strategy (leader-follower, multi-leader) is clearly documented
- Replica lag is measured and acceptable for your consistency requirements
- Sharding key is chosen to distribute data and load evenly
- Routing logic correctly maps keys to shards
- Cross-shard queries are identified and marked as expensive operations
- Shard rebalancing strategy exists for adding/removing shards
- Monitoring and alerting cover replica lag, shard balance, and hot shards (a replica-lag probe is sketched after this checklist)
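For replica lag specifically, a common approach is a heartbeat probe, sketched below with hypothetical leader/follower connection objects: write the current time to the leader on a schedule and see how far each follower trails it. Most databases also expose lag directly (for example MySQL's SHOW REPLICA STATUS or PostgreSQL's pg_last_xact_replay_timestamp()), which is usually preferable in production.

import time

class ReplicaLagProbe:
    """Heartbeat-based lag probe; leader/followers are placeholder connections."""
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers

    def record_heartbeat(self):
        # Periodically write "now" to a single-row heartbeat table on the leader.
        self.leader.execute(
            "UPDATE heartbeat SET written_at = %s WHERE id = 1", (time.time(),))

    def lag_seconds(self):
        # Each follower's lag is how far its copy of the heartbeat trails now.
        now = time.time()
        return [now - f.query_value("SELECT written_at FROM heartbeat WHERE id = 1")
                for f in self.followers]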
Self-Check
- What's the tradeoff between replication and consistency?
- How do you choose a sharding key?
- What challenges arise from cross-shard queries?
Replication solves availability and read scaling; sharding solves write scaling. Use one or both depending on where your bottleneck actually is, but start simple and add each layer only when you need it.
Next Steps
- Implement replication with heartbeat monitoring and failover
- Design sharding key selection for your domain
- Build shard router with consistent hashing for rebalancing
- Create tools to monitor replica lag and shard imbalance
References
- Martin Kleppmann, Designing Data-Intensive Applications (O'Reilly)
- MySQL 8.0 Reference Manual: Replication
- Designing Sharded Database Systems