Data Quality, Lineage & Governance
Maintain data integrity and track data flow
TL;DR
Data quality: validate schemas, enforce null constraints, and detect anomalies at pipeline ingestion. Lineage: track table A → B → C dependencies for debugging. Data catalog: a metadata repository (tables, columns, owners, tags). Governance: enforce PII tracking, retention policies, access control, and data contracts. Modern tools: dbt (lineage), Great Expectations (quality), Apache Atlas (catalog), Collibra (governance). Invest in observability: monitor the freshness, completeness, and accuracy of critical datasets.
Learning Objectives
By the end of this article, you will understand:
- Data quality checks and where to implement them
- Data lineage concepts and tools
- Data catalog structure and benefits
- Governance policies: PII, retention, access
- Data contracts and schema validation
- Observability metrics for data pipelines
- Operational best practices
Motivating Scenario
Your finance team runs daily revenue reports. One week, reported revenue drops 50% unexpectedly. Investigation reveals that a pipeline misconfigured nullable fields and silently dropped transactions with a NULL product_id (which represents gift cards). The root cause took three days to track down. With proper governance in place: data quality checks would have caught the missing product IDs immediately; lineage would have shown where the rows were dropped; the catalog would have documented that product_id is required; and data contracts would have enforced the schema upstream.
Core Concepts
1. Data Quality
Schema validation, null/uniqueness constraints, anomaly detection:
# Great Expectations: quality checks (GX 1.x-style imports)
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.expectations import (
    ExpectColumnValuesToBeBetween,
    ExpectColumnValuesToNotBeNull,
    ExpectTableRowCountToBeBetween,
)

expectation_suite = ExpectationSuite(name="transactions")
expectation_suite.add_expectation(
    ExpectColumnValuesToNotBeNull(column="transaction_id")
)
expectation_suite.add_expectation(
    ExpectColumnValuesToBeBetween(column="amount", min_value=0, max_value=1000000)
)
expectation_suite.add_expectation(
    ExpectTableRowCountToBeBetween(min_value=1000, max_value=10000000)
)
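Hard constraints catch outright violations; anomaly detection catches silent drift that still satisfies the bounds. A minimal sketch in plain pandas (not tied to any framework; the function name and thresholds are illustrative) that flags the latest day's row count when it strays more than three standard deviations from the trailing week:

import pandas as pd

def detect_volume_anomaly(daily_counts: pd.Series, window: int = 7, threshold: float = 3.0) -> bool:
    """Flag the latest value if it deviates more than `threshold` standard
    deviations from the mean of the preceding `window` values."""
    history = daily_counts.iloc[-(window + 1):-1]  # trailing window, excluding the latest day
    mean, std = history.mean(), history.std()
    latest = daily_counts.iloc[-1]
    if std == 0:  # flat history: any change counts as an anomaly
        return latest != mean
    return abs(latest - mean) / std > threshold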
2. Lineage
Tracks data dependencies: source → transform → target
raw_events (source)
↓
events_parsed (cleaned)
↓
user_events_daily (aggregated)
↓
user_dashboards (reporting)
Tools: dbt (SQL transformations), Apache Atlas, OpenMetadata
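However lineage is captured (dbt infers it from ref() calls between models; the sketch below assumes a plain dict mapping each target to its direct sources), debugging reduces to a graph walk. For example, when user_dashboards shows a bad number, list every upstream table that could be the culprit:

# Hypothetical lineage store: target table -> its direct sources
LINEAGE = {
    "events_parsed": ["raw_events"],
    "user_events_daily": ["events_parsed"],
    "user_dashboards": ["user_events_daily"],
}

def upstream(table, lineage):
    """Return all transitive upstream dependencies of `table`."""
    seen = set()
    stack = list(lineage.get(table, []))
    while stack:
        source = stack.pop()
        if source not in seen:
            seen.add(source)
            stack.extend(lineage.get(source, []))
    return seen

print(upstream("user_dashboards", LINEAGE))
# -> {'user_events_daily', 'events_parsed', 'raw_events'}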
3. Catalog
Metadata repository: what data exists, owner, freshness, SLA
{
"table": "user_events_daily",
"owner": "analytics@company.com",
"description": "Daily user event aggregations",
"tags": ["pii:email", "pii:user_id", "critical"],
"schema": {...},
"retention": "90 days",
"refresh_sla": "daily at 3 AM UTC",
"last_updated": "2025-02-14T03:05:00Z"
}
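A catalog pays off once it is queryable. Assuming entries shaped like the JSON above (the pii: tag convention follows that example), a sketch of the question governance teams ask constantly, "which tables carry PII, and which fields?":

def tables_with_pii(catalog_entries):
    """Map table name -> its pii:* tags, for every table carrying PII."""
    return {
        entry["table"]: [t for t in entry.get("tags", []) if t.startswith("pii:")]
        for entry in catalog_entries
        if any(t.startswith("pii:") for t in entry.get("tags", []))
    }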
Practical Example
Data Quality & Governance Implementation
- Python (Great Expectations)
- Go (Data Contracts)
- Node.js (Catalog API)
from datetime import datetime
import logging

import pandas as pd
import great_expectations as gx
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.expectations import (
    ExpectColumnValuesToBeOfType,
    ExpectColumnValuesToMatchRegex,
    ExpectColumnValuesToNotBeNull,
)

class DataGovernance:
    def __init__(self):
        self.quality_suite = ExpectationSuite(name="production")
        self.catalog = {}
        self.lineage = {}

    def define_quality_contract(self, table_name, schema):
        """Define the expected data contract as an expectation suite"""
        expectations = ExpectationSuite(name=table_name)
        for column, dtype in schema.items():
            # Every contracted column must be non-null
            expectations.add_expectation(
                ExpectColumnValuesToNotBeNull(column=column)
            )
            # Type checks
            if dtype == "integer":
                expectations.add_expectation(
                    ExpectColumnValuesToBeOfType(column=column, type_="int64")
                )
            elif dtype == "email":
                expectations.add_expectation(
                    ExpectColumnValuesToMatchRegex(
                        column=column,
                        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
                    )
                )
        self.quality_suite = expectations
        return expectations

    def validate_data(self, df, expectations):
        """Run quality checks against a pandas DataFrame
        (GX 1.x pattern: wrap the frame in a batch, then validate the suite)"""
        context = gx.get_context()
        batch = context.data_sources.pandas_default.read_dataframe(dataframe=df)
        validation_result = batch.validate(expectations)
        if not validation_result.success:
            failed = [r for r in validation_result.results if not r.success]
            logging.error("Data quality check failed: %s", failed)
            raise ValueError(f"Data validation failed: {len(failed)} expectation(s) unmet")
        return True

    def register_catalog_entry(self, table_name, metadata):
        """Register table in catalog"""
        self.catalog[table_name] = {
            "name": table_name,
            "owner": metadata.get("owner"),
            "description": metadata.get("description"),
            "tags": metadata.get("tags", []),
            "freshness_sla_hours": metadata.get("freshness_hours", 24),
            "pii_fields": metadata.get("pii_fields", []),
            "retention_days": metadata.get("retention_days", 90),
            "created_at": datetime.now().isoformat()
        }

    def track_lineage(self, source_tables, target_table, transformation):
        """Track data flow"""
        self.lineage[target_table] = {
            "sources": source_tables,
            "transformation": transformation,
            "created_at": datetime.now().isoformat()
        }

    def user_has_pii_access(self, user_email):
        """Stub: integrate with your IAM / access-control system here"""
        return False

    def validate_pii_access(self, user_email, table_name):
        """Check if user can access PII fields"""
        table_meta = self.catalog.get(table_name)
        if not table_meta:
            raise ValueError(f"Table {table_name} not in catalog")
        pii_fields = table_meta.get("pii_fields", [])
        if pii_fields and not self.user_has_pii_access(user_email):
            logging.warning(f"User {user_email} denied access to PII in {table_name}")
            raise PermissionError(f"No PII access for {user_email}")
        return True

    def check_freshness(self, table_name, last_update_time):
        """Monitor data freshness against the catalog SLA"""
        table_meta = self.catalog.get(table_name)
        if not table_meta:
            raise ValueError(f"Table {table_name} not in catalog")
        sla_hours = table_meta.get("freshness_sla_hours", 24)
        age_hours = (datetime.now() - last_update_time).total_seconds() / 3600
        if age_hours > sla_hours:
            logging.error(f"{table_name} is stale: {age_hours:.1f}h > SLA {sla_hours}h")
            raise ValueError(f"Data freshness SLA violated for {table_name}")
        return True

# Usage
gov = DataGovernance()

# Define contract
schema = {
    "user_id": "integer",
    "email": "email",
    "created_at": "timestamp",
    "purchase_amount": "float"
}
gov.define_quality_contract("transactions", schema)

# Register in catalog
gov.register_catalog_entry("transactions", {
    "owner": "finance@company.com",
    "description": "Daily transaction log",
    "tags": ["pii:email", "financial", "critical"],
    "pii_fields": ["user_id", "email"],
    "freshness_hours": 4,
    "retention_days": 365
})

# Track lineage
gov.track_lineage(
    source_tables=["raw_events"],
    target_table="transactions",
    transformation="Clean, deduplicate, enrich with user data"
)

# Validate incoming data
df = pd.read_csv("transactions.csv")
gov.validate_data(df, gov.quality_suite)
package main

import (
    "fmt"
    "log"
    "time"
)

type CatalogEntry struct {
    TableName      string            `json:"table_name"`
    Owner          string            `json:"owner"`
    Description    string            `json:"description"`
    Tags           []string          `json:"tags"`
    PIIFields      []string          `json:"pii_fields"`
    FreshnessHours int               `json:"freshness_hours"`
    RetentionDays  int               `json:"retention_days"`
    Schema         map[string]string `json:"schema"`
    LastUpdated    time.Time         `json:"last_updated"`
}

type DataGovernance struct {
    catalog map[string]*CatalogEntry
    lineage map[string][]string // target -> sources
}

func NewDataGovernance() *DataGovernance {
    return &DataGovernance{
        catalog: make(map[string]*CatalogEntry),
        lineage: make(map[string][]string),
    }
}

func (dg *DataGovernance) RegisterTable(entry *CatalogEntry) {
    dg.catalog[entry.TableName] = entry
    log.Printf("Registered table: %s", entry.TableName)
}

func (dg *DataGovernance) TrackLineage(target string, sources []string) {
    dg.lineage[target] = sources
    log.Printf("Tracked lineage: %v -> %s", sources, target)
}

func (dg *DataGovernance) ValidateFreshness(tableName string, lastUpdate time.Time) error {
    entry, ok := dg.catalog[tableName]
    if !ok {
        return fmt.Errorf("table not in catalog: %s", tableName)
    }
    age := time.Since(lastUpdate).Hours()
    if age > float64(entry.FreshnessHours) {
        return fmt.Errorf("freshness SLA violated: %s is %.1f hours old (SLA: %d hours)",
            tableName, age, entry.FreshnessHours)
    }
    return nil
}

func (dg *DataGovernance) CheckRetention(tableName string, recordAge time.Duration) bool {
    entry, ok := dg.catalog[tableName]
    if !ok {
        return true // unknown table: keep the record, flag it separately
    }
    retention := time.Duration(entry.RetentionDays) * 24 * time.Hour
    if recordAge > retention {
        log.Printf("Record exceeds retention: %s > %d days", recordAge, entry.RetentionDays)
        return false // mark for deletion
    }
    return true
}

func (dg *DataGovernance) GetLineage(tableName string) []string {
    return dg.lineage[tableName]
}

func main() {
    dg := NewDataGovernance()

    // Register tables
    dg.RegisterTable(&CatalogEntry{
        TableName:      "transactions",
        Owner:          "finance@company.com",
        Description:    "Daily transaction log",
        Tags:           []string{"pii:email", "financial", "critical"},
        PIIFields:      []string{"user_id", "email"},
        FreshnessHours: 4,
        RetentionDays:  365,
        Schema: map[string]string{
            "user_id": "INTEGER",
            "email":   "VARCHAR(255)",
            "amount":  "DECIMAL(10,2)",
        },
        LastUpdated: time.Now(),
    })

    // Track lineage
    dg.TrackLineage("transactions", []string{"raw_events", "users"})

    // Check freshness
    if err := dg.ValidateFreshness("transactions", time.Now().Add(-2*time.Hour)); err != nil {
        log.Printf("Freshness check failed: %v", err)
    }

    // Check retention
    recordAge := 100 * 24 * time.Hour // 100 days old
    dg.CheckRetention("transactions", recordAge)

    // Get lineage
    sources := dg.GetLineage("transactions")
    log.Printf("Transaction lineage: %v", sources)
}
const pg = require('pg');

class DataCatalog {
  constructor(dbConnectionString) {
    this.client = new pg.Client(dbConnectionString);
    this.catalog = {};
  }

  async connect() {
    await this.client.connect();
  }

  async registerTable(metadata) {
    const {
      tableName, owner, description, tags,
      piiFields, freshnessHours, retentionDays, schema
    } = metadata;
    this.catalog[tableName] = {
      tableName, owner, description, tags,
      piiFields, freshnessHours, retentionDays, schema,
      registeredAt: new Date().toISOString()
    };
    console.log(`Registered table: ${tableName}`);
  }

  async trackLineage(targetTable, sourceTables, transformation) {
    // Persist the lineage edge in the database
    await this.client.query(
      `INSERT INTO lineage (target_table, source_tables, transformation, created_at)
       VALUES ($1, $2, $3, NOW())`,
      [targetTable, JSON.stringify(sourceTables), transformation]
    );
  }

  async validateQuality(tableName, rows) {
    const schema = this.catalog[tableName].schema;
    // Check required fields for NULLs
    for (const column of Object.keys(schema)) {
      const hasNull = rows.some(row => row[column] == null);
      if (hasNull) {
        console.error(`Quality check failed: NULL in ${tableName}.${column}`);
        throw new Error(`Data quality violation in ${column}`);
      }
    }
    return true;
  }

  async checkFreshness(tableName) {
    const entry = this.catalog[tableName];
    // Table names come from the catalog, never user input: identifiers
    // cannot be bound as query parameters
    const { rows } = await this.client.query(
      `SELECT MAX(updated_at) AS last_update FROM ${tableName}`
    );
    const lastUpdate = new Date(rows[0].last_update);
    const ageHours = (Date.now() - lastUpdate.getTime()) / (1000 * 60 * 60);
    if (ageHours > entry.freshnessHours) {
      console.warn(
        `Freshness SLA violated: ${tableName} is ${ageHours.toFixed(1)}h old (SLA: ${entry.freshnessHours}h)`
      );
    }
    return ageHours <= entry.freshnessHours;
  }

  async enforceRetention(tableName) {
    const entry = this.catalog[tableName];
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - entry.retentionDays);
    await this.client.query(
      `DELETE FROM ${tableName} WHERE created_at < $1`,
      [cutoffDate]
    );
    console.log(`Retention enforced: deleted records older than ${cutoffDate.toISOString()}`);
  }

  async getCatalog(tableName) {
    return this.catalog[tableName] || null;
  }
}

// Usage
(async () => {
  const catalog = new DataCatalog('postgresql://user:pwd@localhost/prod');
  await catalog.connect();

  // Register table
  await catalog.registerTable({
    tableName: 'transactions',
    owner: 'finance@company.com',
    description: 'Daily transaction log',
    tags: ['pii:email', 'financial', 'critical'],
    piiFields: ['user_id', 'email'],
    freshnessHours: 4,
    retentionDays: 365,
    schema: {
      user_id: 'INTEGER',
      email: 'VARCHAR(255)',
      amount: 'DECIMAL(10,2)'
    }
  });

  // Track lineage
  await catalog.trackLineage(
    'transactions',
    ['raw_events', 'users'],
    'Clean, deduplicate, enrich'
  );

  // Validate freshness
  const isFresh = await catalog.checkFreshness('transactions');
  console.log('Freshness check:', isFresh);

  // Enforce retention
  await catalog.enforceRetention('transactions');

  await catalog.client.end();
})();
Patterns and Pitfalls
Design Review Checklist
- Identified all critical datasets requiring governance
- Defined quality checks: schema, nulls, anomalies
- Implemented data quality framework (Great Expectations or custom)
- Mapped data lineage: sources → transforms → targets
- Built or adopted data catalog (tools: Collibra, Atlas, custom)
- Classified all PII fields; enforced access controls
- Defined data contracts (schema + SLA) with upstream producers
- Set up freshness, completeness, accuracy monitoring
- Established retention policies; automated deletion
- Created runbook for handling data quality failures
- Documented catalog: how to register, tag, search tables
- Scheduled regular audits: who accesses PII? Are contracts being met?
Next Steps
- Audit current data: Identify critical datasets
- Implement quality checks: Start with nulls, uniqueness
- Build simple catalog: Spreadsheet or tool (Collibra/Atlas)
- Track lineage: Manually initially, automate later (dbt)
- Classify PII: Tag sensitive fields
- Monitor freshness: Set SLA alerts
- Enforce contracts: Validate schema on ingestion (a minimal sketch follows this list)
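As a starting point before adopting a contract framework, a hand-rolled sketch (the function name and the expected-schema dict are illustrative, not a standard) that rejects a batch whose columns or dtypes drift from the agreed contract:

import pandas as pd

# Hypothetical contract agreed with the upstream producer
CONTRACT = {"user_id": "int64", "email": "object", "purchase_amount": "float64"}

def enforce_contract(df: pd.DataFrame, contract: dict) -> None:
    """Raise on missing columns or dtype drift before the batch enters the warehouse."""
    missing = set(contract) - set(df.columns)
    if missing:
        raise ValueError(f"Contract violation: missing columns {sorted(missing)}")
    drifted = {c: str(df[c].dtype) for c in contract if str(df[c].dtype) != contract[c]}
    if drifted:
        raise ValueError(f"Contract violation: dtype drift {drifted}")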
References
- Great Expectations: Data Quality Framework
- dbt Docs: Lineage & Transformation
- Apache Atlas: Data Governance