Scalable Algorithm Design
Learn how to design algorithms that handle large-scale data efficiently on parallel and distributed systems.
Distributed Algorithms
Distributed Sorting
// Distributed merge sort using MapReduce pattern
class DistributedMergeSort {
public:
// Map phase: Sort local data
vector<int> mapPhase(vector<int>& localData) {
sort(localData.begin(), localData.end());
return localData;
}
// Reduce phase: Merge sorted data from multiple nodes
vector<int> reducePhase(vector<vector<int>>& sortedChunks) {
return mergeKSortedArrays(sortedChunks);
}
private:
vector<int> mergeKSortedArrays(vector<vector<int>>& arrays) {
priority_queue<pair<int, pair<int, int>>, vector<pair<int, pair<int, int>>>, greater<>> pq;
// Initialize heap with first element from each array
for (int i = 0; i < arrays.size(); i++) {
if (!arrays[i].empty()) {
pq.push({arrays[i][0], {i, 0}});
}
}
vector<int> result;
while (!pq.empty()) {
auto [value, indices] = pq.top();
pq.pop();
result.push_back(value);
int arrayIndex = indices.first;
int elementIndex = indices.second;
if (elementIndex + 1 < arrays[arrayIndex].size()) {
pq.push({arrays[arrayIndex][elementIndex + 1], {arrayIndex, elementIndex + 1}});
}
}
return result;
}
};
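In a real deployment the map phase would run on separate worker nodes and the sorted chunks would be shipped back over the network. The sketch below simulates that flow in a single process; it assumes the class above is compiled with the usual headers (<vector>, <queue>, <algorithm>, <iostream>) and using namespace std.
int main() {
    DistributedMergeSort sorter;
    // Pretend each inner vector lives on a different worker node
    vector<vector<int>> nodeData = {{5, 1, 9}, {3, 7}, {8, 2, 6, 4}};

    // Map phase: each node sorts its local chunk independently
    vector<vector<int>> sortedChunks;
    for (auto& chunk : nodeData) {
        sortedChunks.push_back(sorter.mapPhase(chunk));
    }

    // Reduce phase: a single node merges the sorted chunks
    vector<int> result = sorter.reducePhase(sortedChunks);
    for (int x : result) cout << x << ' ';  // 1 2 3 4 5 6 7 8 9
    cout << '\n';
}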
Distributed Graph Algorithms
// Distributed BFS (message passing between nodes is simulated here with a local queue)
class DistributedBFS {
private:
unordered_map<int, vector<int>> graph;
unordered_map<int, int> distances;
queue<int> messageQueue;
public:
void initializeGraph(const vector<pair<int, int>>& edges) {
for (const auto& edge : edges) {
graph[edge.first].push_back(edge.second);
graph[edge.second].push_back(edge.first);
}
}
void bfs(int startNode) {
distances[startNode] = 0;
messageQueue.push(startNode);
while (!messageQueue.empty()) {
int currentNode = messageQueue.front();
messageQueue.pop();
// Send distance updates to neighbors
for (int neighbor : graph[currentNode]) {
if (distances.find(neighbor) == distances.end()) {
distances[neighbor] = distances[currentNode] + 1;
messageQueue.push(neighbor);
}
}
}
}
int getDistance(int node) {
// Return -1 for nodes never reached instead of inserting a default entry
auto it = distances.find(node);
return it == distances.end() ? -1 : it->second;
}
};
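A minimal driver for the class above (same header assumptions as before): build a small undirected graph, run BFS from node 0, and print hop distances; unknown or unreachable nodes report -1 via getDistance.
int main() {
    DistributedBFS bfs;
    bfs.initializeGraph({{0, 1}, {1, 2}, {2, 3}, {0, 4}});
    bfs.bfs(0);
    for (int node : {0, 1, 2, 3, 4, 5}) {
        cout << "node " << node << ": " << bfs.getDistance(node) << '\n';
    }
    // node 5 is not in the graph, so its distance prints as -1
}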
Consensus Algorithms
// Simplified Raft consensus algorithm
class RaftConsensus {
private:
enum State { FOLLOWER, CANDIDATE, LEADER };
State currentState;
int currentTerm;
int votedFor;
int commitIndex;
int lastApplied;
public:
RaftConsensus() : currentState(FOLLOWER), currentTerm(0), votedFor(-1),
commitIndex(0), lastApplied(0) {}
// Handle a vote request from a candidate (the log up-to-date check on
// lastLogIndex/lastLogTerm is omitted in this simplified version)
bool requestVote(int candidateId, int term, int lastLogIndex, int lastLogTerm) {
if (term > currentTerm) {
currentTerm = term;
currentState = FOLLOWER;
votedFor = candidateId;
return true;
}
return false;
}
// Append entries from leader
bool appendEntries(int leaderId, int term, int prevLogIndex, int prevLogTerm,
const vector<string>& entries, int leaderCommit) {
if (term >= currentTerm) {
currentTerm = term;
currentState = FOLLOWER;
// Apply entries and update commit index
if (leaderCommit > commitIndex) {
commitIndex = min(leaderCommit, (int)entries.size());
}
return true;
}
return false;
}
// Start election
void startElection() {
currentState = CANDIDATE;
currentTerm++;
votedFor = getNodeId();
// Request votes from other nodes
int votes = 1; // Vote for self
for (int nodeId : getOtherNodes()) {
if (requestVoteFromNode(nodeId)) {
votes++;
}
}
if (votes >= getMajority()) { // getMajority() returns the minimum votes needed
currentState = LEADER;
startHeartbeat();
}
}
private:
int getNodeId() { return 1; } // Simplified
vector<int> getOtherNodes() { return {2, 3, 4, 5}; } // Simplified
bool requestVoteFromNode(int nodeId) { return true; } // Simplified
int getMajority() { return 3; } // Simplified
void startHeartbeat() {} // Simplified
};
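With the hard-coded helpers above (five nodes, every vote granted), a single startElection call is enough to win leadership; the snippet below also shows a follower accepting an appendEntries call from that leader. This only illustrates the call flow of the simplified class, not a working cluster.
int main() {
    RaftConsensus candidate;
    candidate.startElection();  // wins because requestVoteFromNode() always returns true

    RaftConsensus follower;
    // Leader (id 1, term 1) replicates one entry; accepted because term >= currentTerm
    bool ok = follower.appendEntries(1, 1, 0, 0, {"set x=1"}, 1);
    cout << (ok ? "entry accepted" : "entry rejected") << '\n';
}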
Parallel Processing
Parallel Merge Sort
#include <vector>
#include <thread>
#include <future>
class ParallelMergeSort {
public:
void parallelSort(vector<int>& arr) {
if (arr.empty()) return; // Guard against size() - 1 underflow on an empty input
parallelSortRecursive(arr, 0, (int)arr.size() - 1, 0);
}
private:
void parallelSortRecursive(vector<int>& arr, int left, int right, int depth) {
if (left >= right) return;
int mid = left + (right - left) / 2;
if (depth < maxDepth) {
// Create threads for parallel execution
auto future1 = async(launch::async, [&]() {
parallelSortRecursive(arr, left, mid, depth + 1);
});
auto future2 = async(launch::async, [&]() {
parallelSortRecursive(arr, mid + 1, right, depth + 1);
});
future1.wait();
future2.wait();
} else {
// Sequential execution for deep recursion
parallelSortRecursive(arr, left, mid, depth + 1);
parallelSortRecursive(arr, mid + 1, right, depth + 1);
}
merge(arr, left, mid, right);
}
void merge(vector<int>& arr, int left, int mid, int right) {
vector<int> temp(right - left + 1);
int i = left, j = mid + 1, k = 0;
while (i <= mid && j <= right) {
if (arr[i] <= arr[j]) {
temp[k++] = arr[i++];
} else {
temp[k++] = arr[j++];
}
}
while (i <= mid) temp[k++] = arr[i++];
while (j <= right) temp[k++] = arr[j++];
for (int i = 0; i < k; i++) {
arr[left + i] = temp[i];
}
}
static const int maxDepth = 4; // Limit thread creation depth
};
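A quick sanity check for the parallel sort, assuming <random>, <algorithm>, <cassert>, and <iostream> are also included: fill a vector with pseudo-random values, sort it in parallel, and verify the result.
int main() {
    vector<int> data(100000);
    mt19937 rng(42);
    for (int& x : data) x = rng() % 1000000;

    ParallelMergeSort sorter;
    sorter.parallelSort(data);

    assert(is_sorted(data.begin(), data.end()));
    cout << "sorted " << data.size() << " elements\n";
}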
Parallel Matrix Multiplication
class ParallelMatrixMultiplication {
public:
vector<vector<int>> multiply(const vector<vector<int>>& A,
const vector<vector<int>>& B) {
int m = A.size(); // A is m x p
int n = B[0].size(); // B is p x n, so the result is m x n
int p = B.size();
vector<vector<int>> result(m, vector<int>(n, 0));
// Divide work among threads
int numThreads = max(1u, thread::hardware_concurrency()); // hardware_concurrency() may return 0
vector<thread> threads;
for (int t = 0; t < numThreads; t++) {
threads.emplace_back([&, t, numThreads, m, n, p]() {
for (int i = t; i < m; i += numThreads) {
for (int j = 0; j < n; j++) {
for (int k = 0; k < p; k++) {
result[i][j] += A[i][k] * B[k][j];
}
}
}
});
}
for (auto& worker : threads) {
worker.join();
}
return result;
}
};
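A small worked example, assuming the class above: multiplying a 2x3 matrix by a 3x2 matrix. Each thread handles a strided subset of rows, so the result matches the sequential triple loop.
int main() {
    ParallelMatrixMultiplication pmm;
    vector<vector<int>> A = {{1, 2, 3},
                             {4, 5, 6}};
    vector<vector<int>> B = {{7, 8},
                             {9, 10},
                             {11, 12}};
    vector<vector<int>> C = pmm.multiply(A, B);  // {{58, 64}, {139, 154}}
    for (const auto& row : C) {
        for (int x : row) cout << x << ' ';
        cout << '\n';
    }
}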
MapReduce Patterns
Word Count MapReduce
class WordCountMapReduce {
public:
// Map function: emit (word, 1) for each word
vector<pair<string, int>> map(const string& document) {
vector<pair<string, int>> result;
stringstream ss(document);
string word;
while (ss >> word) {
// Clean word (remove punctuation, convert to lowercase)
word = cleanWord(word);
if (!word.empty()) {
result.push_back({word, 1});
}
}
return result;
}
// Reduce function: sum counts for each word
pair<string, int> reduce(const string& word, const vector<int>& counts) {
int totalCount = 0;
for (int count : counts) {
totalCount += count;
}
return {word, totalCount};
}
private:
string cleanWord(const string& word) {
string cleaned;
// Iterate as unsigned char so isalpha/tolower are well-defined for non-ASCII bytes
for (unsigned char c : word) {
if (isalpha(c)) {
cleaned += tolower(c);
}
}
return cleaned;
}
};
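The class above only covers map and reduce; a complete MapReduce run also needs a shuffle step that groups the intermediate pairs by key before reduce is called. The sketch below simulates all three phases in one process (it assumes <map> and the other usual headers, plus using namespace std).
int main() {
    WordCountMapReduce wc;
    vector<string> documents = {"the quick brown fox", "the lazy dog", "The Fox"};

    // Shuffle: group the (word, 1) pairs emitted by map by their key
    map<string, vector<int>> grouped;
    for (const string& doc : documents) {
        for (const auto& [word, count] : wc.map(doc)) {
            grouped[word].push_back(count);
        }
    }

    // Reduce: sum the counts for each word
    for (const auto& [word, counts] : grouped) {
        auto [w, total] = wc.reduce(word, counts);
        cout << w << ": " << total << '\n';  // e.g. "the: 3", "fox: 2"
    }
}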
Inverted Index MapReduce
class InvertedIndexMapReduce {
public:
// Map function: emit (word, documentId) for each word
vector<pair<string, int>> map(const string& document, int documentId) {
vector<pair<string, int>> result;
stringstream ss(document);
string word;
while (ss >> word) {
word = cleanWord(word);
if (!word.empty()) {
result.push_back({word, documentId});
}
}
return result;
}
// Reduce function: collect document IDs for each word
pair<string, vector<int>> reduce(const string& word, const vector<int>& documentIds) {
set<int> uniqueDocs(documentIds.begin(), documentIds.end());
return {word, vector<int>(uniqueDocs.begin(), uniqueDocs.end())};
}
private:
string cleanWord(const string& word) {
string cleaned;
// Iterate as unsigned char so isalpha/tolower are well-defined for non-ASCII bytes
for (unsigned char c : word) {
if (isalpha(c)) {
cleaned += tolower(c);
}
}
return cleaned;
}
};
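The same shuffle idea applies here, except the grouped values are document IDs and reduce deduplicates them into a posting list. A sketch, under the same assumptions as the word-count driver:
int main() {
    InvertedIndexMapReduce idx;
    vector<string> documents = {"distributed systems", "distributed sorting", "graph systems"};

    // Shuffle: group emitted (word, documentId) pairs by word
    map<string, vector<int>> grouped;
    for (int docId = 0; docId < (int)documents.size(); docId++) {
        for (const auto& [word, id] : idx.map(documents[docId], docId)) {
            grouped[word].push_back(id);
        }
    }

    // Reduce: one sorted, deduplicated posting list per word
    for (const auto& [word, docIds] : grouped) {
        auto [w, postings] = idx.reduce(word, docIds);
        cout << w << ":";
        for (int id : postings) cout << ' ' << id;
        cout << '\n';  // e.g. "distributed: 0 1", "systems: 0 2"
    }
}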
Stream Processing
Sliding Window Average
class SlidingWindowAverage {
private:
queue<double> window;
double sum;
int windowSize;
public:
SlidingWindowAverage(int size) : sum(0), windowSize(size) {}
double addValue(double value) {
window.push(value);
sum += value;
if (window.size() > windowSize) {
sum -= window.front();
window.pop();
}
return sum / window.size();
}
double getAverage() const {
return window.empty() ? 0 : sum / window.size();
}
};
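For example, with a window of size 3 (assuming the class above), each call returns the average of the most recent values:
int main() {
    SlidingWindowAverage avg(3);
    cout << avg.addValue(10) << '\n';  // 10  (window: 10)
    cout << avg.addValue(20) << '\n';  // 15  (window: 10, 20)
    cout << avg.addValue(30) << '\n';  // 20  (window: 10, 20, 30)
    cout << avg.addValue(40) << '\n';  // 30  (window: 20, 30, 40)
}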
Top K Elements in Stream
class TopKElements {
private:
priority_queue<int, vector<int>, greater<>> minHeap;
int k;
public:
TopKElements(int k) : k(k) {}
void add(int value) {
if (minHeap.size() < k) {
minHeap.push(value);
} else if (value > minHeap.top()) {
minHeap.pop();
minHeap.push(value);
}
}
vector<int> getTopK() {
vector<int> result;
while (!minHeap.empty()) {
result.push_back(minHeap.top());
minHeap.pop();
}
reverse(result.begin(), result.end());
return result;
}
};
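The min-heap keeps only the k largest values seen so far, so memory stays O(k) no matter how long the stream runs. Note that getTopK() drains the heap, so call it once at the end. A short example with k = 3, assuming the class above:
int main() {
    TopKElements topK(3);
    for (int value : {5, 1, 9, 3, 7, 8, 2}) {
        topK.add(value);
    }
    for (int value : topK.getTopK()) {
        cout << value << ' ';  // 9 8 7
    }
    cout << '\n';
}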
Stream Deduplication
class StreamDeduplication {
private:
unordered_set<string> seen;
queue<string> window;
int windowSize;
public:
StreamDeduplication(int size) : windowSize(size) {}
bool isDuplicate(const string& value) {
if (seen.find(value) != seen.end()) {
return true;
}
seen.insert(value);
window.push(value);
if (window.size() > windowSize) {
string oldValue = window.front();
window.pop();
seen.erase(oldValue);
}
return false;
}
};
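A value only counts as a duplicate while it remains among the most recently admitted windowSize values; once it slides out of the window, it can appear again. For example, with a window of size 2 (assuming the class above):
int main() {
    StreamDeduplication dedup(2);
    cout << dedup.isDuplicate("a") << '\n';  // 0 (first time seen)
    cout << dedup.isDuplicate("a") << '\n';  // 1 (still in the window)
    cout << dedup.isDuplicate("b") << '\n';  // 0
    cout << dedup.isDuplicate("c") << '\n';  // 0 ("a" slides out of the window here)
    cout << dedup.isDuplicate("a") << '\n';  // 0 (no longer in the window)
}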
Caching Strategies
LRU Cache
class LRUCache {
private:
int capacity;
list<pair<int, int>> items;
unordered_map<int, list<pair<int, int>>::iterator> cache;
public:
LRUCache(int cap) : capacity(cap) {}
int get(int key) {
auto it = cache.find(key);
if (it == cache.end()) {
return -1;
}
// Move to front (most recently used)
items.splice(items.begin(), items, it->second);
return it->second->second;
}
void put(int key, int value) {
auto it = cache.find(key);
if (it != cache.end()) {
// Update existing key
it->second->second = value;
items.splice(items.begin(), items, it->second);
} else {
// Add new key
if (cache.size() >= capacity) {
// Remove least recently used
int lruKey = items.back().first;
items.pop_back();
cache.erase(lruKey);
}
items.push_front({key, value});
cache[key] = items.begin();
}
}
};
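The classic access pattern with a capacity of 2, assuming the class above: reading a key refreshes its recency, and inserting into a full cache evicts the least recently used entry.
int main() {
    LRUCache cache(2);
    cache.put(1, 100);
    cache.put(2, 200);
    cout << cache.get(1) << '\n';  // 100, key 1 becomes most recently used
    cache.put(3, 300);             // evicts key 2 (least recently used)
    cout << cache.get(2) << '\n';  // -1
    cout << cache.get(3) << '\n';  // 300
}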
LFU Cache
class LFUCache {
private:
int capacity;
unordered_map<int, int> values;
unordered_map<int, int> frequencies;
unordered_map<int, list<int>> freqLists;
unordered_map<int, list<int>::iterator> iterators;
int minFreq;
public:
LFUCache(int cap) : capacity(cap), minFreq(0) {}
int get(int key) {
if (values.find(key) == values.end()) {
return -1;
}
// Update frequency
updateFrequency(key);
return values[key];
}
void put(int key, int value) {
if (capacity <= 0) return;
if (values.find(key) != values.end()) {
// Update existing key
values[key] = value;
updateFrequency(key);
} else {
// Add new key
if (values.size() >= capacity) {
// Remove least frequently used
int lfuKey = freqLists[minFreq].back();
freqLists[minFreq].pop_back();
values.erase(lfuKey);
frequencies.erase(lfuKey);
iterators.erase(lfuKey);
}
values[key] = value;
frequencies[key] = 1;
freqLists[1].push_front(key);
iterators[key] = freqLists[1].begin();
minFreq = 1;
}
}
private:
void updateFrequency(int key) {
int freq = frequencies[key];
freqLists[freq].erase(iterators[key]);
if (freqLists[freq].empty() && freq == minFreq) {
minFreq++;
}
frequencies[key]++;
freqLists[freq + 1].push_front(key);
iterators[key] = freqLists[freq + 1].begin();
}
};
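LFU evicts by access frequency rather than recency, with ties inside a frequency bucket broken by recency. Assuming the class above, with capacity 2:
int main() {
    LFUCache cache(2);
    cache.put(1, 100);
    cache.put(2, 200);
    cout << cache.get(1) << '\n';  // 100, key 1 now has frequency 2
    cache.put(3, 300);             // evicts key 2 (frequency 1 < frequency 2)
    cout << cache.get(2) << '\n';  // -1
    cout << cache.get(1) << '\n';  // 100
    cout << cache.get(3) << '\n';  // 300
}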
Performance Analysis
Time Complexity
- Distributed sorting: O((n log n) / p) for the local sorts across p processors, plus the final k-way merge
- Parallel matrix multiplication: O(n³ / p) with p threads splitting the row loop
- MapReduce word count: O(n / p) per node for the map phase, O(n) total for the reduce phase
- Stream processing: O(1) per element for the sliding window, O(log k) per element for top-k
- Caching: O(1) average for get/put in both LRU and LFU
Space Complexity
- Distributed algorithms: O(n / p) data per node
- Parallel merge sort: O(n) auxiliary space for the merge buffers
- MapReduce: O(n) for intermediate key-value pairs
- Stream processing: O(k) where k is the window size
- Caching: O(capacity)
Common Patterns
- Divide and conquer for parallel processing
- MapReduce for distributed computing
- Sliding window for stream processing
- Caching for performance optimization
- Consensus for distributed systems
Applications
- Big data processing: Hadoop, Spark
- Distributed systems: Microservices, cloud computing
- Real-time analytics: Stream processing
- Search engines: Inverted indexes
- Caching systems: Redis, Memcached