Event Sourcing & Audit Trails¶
Event sourcing patterns in FraiseQL: entity change logs, temporal queries, audit trails, and CQRS with event-driven architectures.
Overview¶
Event sourcing stores all changes to application state as a sequence of events. FraiseQL supports event sourcing through entity change logs, Debezium-style before/after snapshots, and temporal query capabilities.
Key Patterns: - Entity Change Log as event store - Before/after snapshots (Debezium pattern) - Event replay capabilities - Temporal queries (state at timestamp) - Audit trail patterns - CQRS with event sourcing
Entity Change Log¶
Schema Design¶
Complete audit log capturing all entity changes:
CREATE SCHEMA IF NOT EXISTS audit;
CREATE TABLE audit.entity_change_log (
id BIGSERIAL PRIMARY KEY,
entity_type TEXT NOT NULL,
entity_id UUID NOT NULL,
operation TEXT NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE')),
changed_by UUID, -- User who made the change
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
before_snapshot JSONB, -- State before change
after_snapshot JSONB, -- State after change
changed_fields JSONB, -- Only changed fields
metadata JSONB, -- Additional context
transaction_id BIGINT, -- Group related changes
correlation_id UUID, -- Trace across services
CONSTRAINT valid_snapshots CHECK (
(operation = 'INSERT' AND before_snapshot IS NULL) OR
(operation = 'DELETE' AND after_snapshot IS NULL) OR
(operation = 'UPDATE' AND before_snapshot IS NOT NULL AND after_snapshot IS NOT NULL)
)
);
-- Indexes for common queries
CREATE INDEX idx_entity_change_log_entity ON audit.entity_change_log(entity_type, entity_id, changed_at DESC);
CREATE INDEX idx_entity_change_log_user ON audit.entity_change_log(changed_by, changed_at DESC);
CREATE INDEX idx_entity_change_log_time ON audit.entity_change_log(changed_at DESC);
CREATE INDEX idx_entity_change_log_tx ON audit.entity_change_log(transaction_id);
CREATE INDEX idx_entity_change_log_correlation ON audit.entity_change_log(correlation_id);
-- GIN index for JSONB searches
CREATE INDEX idx_entity_change_log_before ON audit.entity_change_log USING GIN (before_snapshot);
CREATE INDEX idx_entity_change_log_after ON audit.entity_change_log USING GIN (after_snapshot);
Automatic Change Tracking¶
PostgreSQL trigger to automatically log changes:
CREATE OR REPLACE FUNCTION audit.log_entity_change()
RETURNS TRIGGER AS $$
DECLARE
v_changed_fields JSONB;
v_user_id UUID;
v_correlation_id UUID;
BEGIN
-- Extract user ID from session
v_user_id := NULLIF(current_setting('app.current_user_id', TRUE), '')::UUID;
v_correlation_id := NULLIF(current_setting('app.correlation_id', TRUE), '')::UUID;
-- Calculate changed fields for UPDATE
IF TG_OP = 'UPDATE' THEN
SELECT jsonb_object_agg(key, value)
INTO v_changed_fields
FROM jsonb_each(to_jsonb(NEW))
WHERE value IS DISTINCT FROM (to_jsonb(OLD) -> key);
END IF;
INSERT INTO audit.entity_change_log (
entity_type,
entity_id,
operation,
changed_by,
before_snapshot,
after_snapshot,
changed_fields,
transaction_id,
correlation_id
) VALUES (
TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME,
CASE
WHEN TG_OP = 'DELETE' THEN OLD.id
ELSE NEW.id
END,
TG_OP,
v_user_id,
CASE
WHEN TG_OP IN ('UPDATE', 'DELETE') THEN to_jsonb(OLD)
ELSE NULL
END,
CASE
WHEN TG_OP IN ('INSERT', 'UPDATE') THEN to_jsonb(NEW)
ELSE NULL
END,
v_changed_fields,
txid_current(),
v_correlation_id
);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Attach to tables
CREATE TRIGGER trg_orders_change_log
AFTER INSERT OR UPDATE OR DELETE ON orders.orders
FOR EACH ROW EXECUTE FUNCTION audit.log_entity_change();
CREATE TRIGGER trg_order_items_change_log
AFTER INSERT OR UPDATE OR DELETE ON orders.order_items
FOR EACH ROW EXECUTE FUNCTION audit.log_entity_change();
Change Log Repository¶
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@dataclass
class EntityChange:
"""Entity change event."""
id: int
entity_type: str
entity_id: str
operation: str
changed_by: str | None
changed_at: datetime
before_snapshot: dict[str, Any] | None
after_snapshot: dict[str, Any] | None
changed_fields: dict[str, Any] | None
metadata: dict[str, Any] | None
transaction_id: int
correlation_id: str | None
class EntityChangeLogRepository:
"""Repository for entity change logs."""
def __init__(self, db_pool):
self.db = db_pool
async def get_entity_history(
self,
entity_type: str,
entity_id: str,
limit: int = 100
) -> list[EntityChange]:
"""Get complete history for an entity."""
async with self.db.connection() as conn:
result = await conn.execute("""
SELECT * FROM audit.entity_change_log
WHERE entity_type = $1 AND entity_id = $2
ORDER BY changed_at DESC
LIMIT $3
""", entity_type, entity_id, limit)
return [
EntityChange(**row)
for row in await result.fetchall()
]
async def get_changes_by_user(
self,
user_id: str,
limit: int = 100
) -> list[EntityChange]:
"""Get all changes made by a user."""
async with self.db.connection() as conn:
result = await conn.execute("""
SELECT * FROM audit.entity_change_log
WHERE changed_by = $1
ORDER BY changed_at DESC
LIMIT $2
""", user_id, limit)
return [EntityChange(**row) for row in await result.fetchall()]
async def get_changes_in_transaction(
self,
transaction_id: int
) -> list[EntityChange]:
"""Get all changes in a transaction."""
async with self.db.connection() as conn:
result = await conn.execute("""
SELECT * FROM audit.entity_change_log
WHERE transaction_id = $1
ORDER BY id
""", transaction_id)
return [EntityChange(**row) for row in await result.fetchall()]
async def get_entity_at_time(
self,
entity_type: str,
entity_id: str,
at_time: datetime
) -> dict[str, Any] | None:
"""Get entity state at specific point in time."""
async with self.db.connection() as conn:
result = await conn.execute("""
SELECT after_snapshot
FROM audit.entity_change_log
WHERE entity_type = $1
AND entity_id = $2
AND changed_at <= $3
AND operation != 'DELETE'
ORDER BY changed_at DESC
LIMIT 1
""", entity_type, entity_id, at_time)
row = await result.fetchone()
return row["after_snapshot"] if row else None
Before/After Snapshots¶
Debezium-style change data capture:
GraphQL Queries for Audit¶
import fraiseql
@fraiseql.type_
class EntityChange:
id: int
entity_type: str
entity_id: str
operation: str
changed_by: str | None
changed_at: datetime
before_snapshot: dict | None
after_snapshot: dict | None
changed_fields: dict | None
@fraiseql.query
async def get_order_history(info, order_id: str) -> list[EntityChange]:
"""Get complete audit trail for an order."""
repo = EntityChangeLogRepository(get_db_pool())
return await repo.get_entity_history("orders.orders", order_id)
@fraiseql.query
async def get_order_at_time(info, order_id: str, at_time: datetime) -> dict | None:
"""Get order state at specific point in time."""
repo = EntityChangeLogRepository(get_db_pool())
return await repo.get_entity_at_time("orders.orders", order_id, at_time)
@fraiseql.query
async def get_user_activity(info, user_id: str, limit: int = 50) -> list[EntityChange]:
"""Get all changes made by a user."""
repo = EntityChangeLogRepository(get_db_pool())
return await repo.get_changes_by_user(user_id, limit)
Event Replay¶
Rebuild entity state from event log:
from datetime import datetime
from decimal import Decimal
from fraiseql.types import ID
class OrderEventReplayer:
"""Replay order events to rebuild state."""
@staticmethod
async def replay_to_state(
entity_id: str,
up_to_time: datetime | None = None
) -> dict:
"""Replay events to rebuild order state."""
repo = EntityChangeLogRepository(get_db_pool())
async with repo.db.connection() as conn:
query = """
SELECT operation, after_snapshot, changed_at
FROM audit.entity_change_log
WHERE entity_type = 'orders.orders'
AND entity_id = $1
"""
params = [entity_id]
if up_to_time:
query += " AND changed_at <= $2"
params.append(up_to_time)
query += " ORDER BY changed_at ASC"
result = await conn.execute(query, *params)
events = await result.fetchall()
if not events:
return None
# Start with first event (INSERT)
state = dict(events[0]["after_snapshot"])
# Apply subsequent changes
for event in events[1:]:
if event["operation"] == "UPDATE":
state.update(event["after_snapshot"])
elif event["operation"] == "DELETE":
return None # Entity deleted
return state
@staticmethod
async def rebuild_aggregate(entity_id: str) -> Order:
"""Rebuild complete Order aggregate from events."""
state = await OrderEventReplayer.replay_to_state(entity_id)
if not state:
return None
# Rebuild Order object
order = Order(
id=state["id"],
customer_id=state["customer_id"],
total=Decimal(str(state["total"])),
status=state["status"],
created_at=state["created_at"],
updated_at=state["updated_at"]
)
# Rebuild order items from their change logs
items_repo = EntityChangeLogRepository(get_db_pool())
async with items_repo.db.connection() as conn:
result = await conn.execute("""
SELECT DISTINCT entity_id
FROM audit.entity_change_log
WHERE entity_type = 'orders.order_items'
AND (after_snapshot->>'order_id')::UUID = $1
""", entity_id)
item_ids = [row["entity_id"] for row in await result.fetchall()]
for item_id in item_ids:
item_state = await OrderEventReplayer.replay_to_state(item_id)
if item_state: # Not deleted
order.items.append(OrderItem(**item_state))
return order
Temporal Queries¶
Query entity state at any point in time:
import fraiseql
@fraiseql.query
async def get_order_timeline(
info,
order_id: str,
from_time: datetime,
to_time: datetime
) -> list[dict]:
"""Get order state snapshots over time."""
repo = EntityChangeLogRepository(get_db_pool())
async with repo.db.connection() as conn:
result = await conn.execute("""
SELECT
changed_at,
operation,
after_snapshot,
changed_by
FROM audit.entity_change_log
WHERE entity_type = 'orders.orders'
AND entity_id = $1
AND changed_at BETWEEN $2 AND $3
ORDER BY changed_at ASC
""", order_id, from_time, to_time)
return [dict(row) for row in await result.fetchall()]
@fraiseql.query
async def compare_states(
info,
order_id: str,
time1: datetime,
time2: datetime
) -> dict:
"""Compare order state at two different times."""
repo = EntityChangeLogRepository(get_db_pool())
state1 = await repo.get_entity_at_time("orders.orders", order_id, time1)
state2 = await repo.get_entity_at_time("orders.orders", order_id, time2)
# Calculate diff
changes = {}
all_keys = set(state1.keys()) | set(state2.keys())
for key in all_keys:
val1 = state1.get(key)
val2 = state2.get(key)
if val1 != val2:
changes[key] = {"from": val1, "to": val2}
return {
"state_at_time1": state1,
"state_at_time2": state2,
"changes": changes
}
Audit Trails¶
Complete Audit Dashboard¶
import fraiseql
@fraiseql.type_
class AuditSummary:
total_changes: int
changes_by_operation: dict[str, int]
changes_by_user: dict[str, int]
recent_changes: list[EntityChange]
@fraiseql.query
@requires_role("auditor")
async def get_audit_summary(
info,
entity_type: str | None = None,
from_time: datetime | None = None,
to_time: datetime | None = None
) -> AuditSummary:
"""Get comprehensive audit summary."""
async with get_db_pool().connection() as conn:
# Total changes
result = await conn.execute("""
SELECT COUNT(*) as total
FROM audit.entity_change_log
WHERE ($1::TEXT IS NULL OR entity_type = $1)
AND ($2::TIMESTAMPTZ IS NULL OR changed_at >= $2)
AND ($3::TIMESTAMPTZ IS NULL OR changed_at <= $3)
""", entity_type, from_time, to_time)
total = (await result.fetchone())["total"]
# By operation
result = await conn.execute("""
SELECT operation, COUNT(*) as count
FROM audit.entity_change_log
WHERE ($1::TEXT IS NULL OR entity_type = $1)
AND ($2::TIMESTAMPTZ IS NULL OR changed_at >= $2)
AND ($3::TIMESTAMPTZ IS NULL OR changed_at <= $3)
GROUP BY operation
""", entity_type, from_time, to_time)
by_operation = {row["operation"]: row["count"] for row in await result.fetchall()}
# By user
result = await conn.execute("""
SELECT changed_by::TEXT, COUNT(*) as count
FROM audit.entity_change_log
WHERE changed_by IS NOT NULL
AND ($1::TEXT IS NULL OR entity_type = $1)
AND ($2::TIMESTAMPTZ IS NULL OR changed_at >= $2)
AND ($3::TIMESTAMPTZ IS NULL OR changed_at <= $3)
GROUP BY changed_by
ORDER BY count DESC
LIMIT 10
""", entity_type, from_time, to_time)
by_user = {row["changed_by"]: row["count"] for row in await result.fetchall()}
# Recent changes
result = await conn.execute("""
SELECT * FROM audit.entity_change_log
WHERE ($1::TEXT IS NULL OR entity_type = $1)
AND ($2::TIMESTAMPTZ IS NULL OR changed_at >= $2)
AND ($3::TIMESTAMPTZ IS NULL OR changed_at <= $3)
ORDER BY changed_at DESC
LIMIT 50
""", entity_type, from_time, to_time)
recent = [EntityChange(**row) for row in await result.fetchall()]
return AuditSummary(
total_changes=total,
changes_by_operation=by_operation,
changes_by_user=by_user,
recent_changes=recent
)
CQRS Pattern¶
CQRS (Command Query Responsibility Segregation) separates read and write models using event sourcing:
# Write Model (Command Side)
class OrderCommandHandler:
"""Handle order commands, generate events."""
async def create_order(self, customer_id: str) -> str:
"""Create order - generates OrderCreated event."""
order_id = str(uuid4())
async with get_db_pool().connection() as conn:
await conn.execute("""
INSERT INTO orders.orders (id, customer_id, total, status)
VALUES ($1, $2, 0, 'draft')
""", order_id, customer_id)
# Event automatically logged via trigger
return order_id
async def add_item(self, order_id: str, product_id: str, quantity: int, price: Decimal):
"""Add item - generates ItemAdded event."""
async with get_db_pool().connection() as conn:
await conn.execute("""
INSERT INTO orders.order_items (id, order_id, product_id, quantity, price, total)
VALUES ($1, $2, $3, $4, $5, $6)
""", str(uuid4()), order_id, product_id, quantity, price, price * quantity)
# Update order total
await conn.execute("""
UPDATE orders.orders
SET total = (
SELECT SUM(total) FROM orders.order_items WHERE order_id = $1
)
WHERE id = $1
""", order_id)
# Read Model (Query Side)
class OrderQueryModel:
"""Optimized read model for order queries."""
async def get_order_summary(self, order_id: str) -> dict:
"""Get denormalized order summary."""
async with get_db_pool().connection() as conn:
result = await conn.execute("""
SELECT
o.id,
o.customer_id,
o.total,
o.status,
o.created_at,
COUNT(oi.id) as item_count,
json_agg(
json_build_object(
'product_id', oi.product_id,
'quantity', oi.quantity,
'price', oi.price
)
) as items
FROM orders.orders o
LEFT JOIN orders.order_items oi ON oi.order_id = o.id
WHERE o.id = $1
GROUP BY o.id
""", order_id)
return dict(await result.fetchone())
Event Versioning¶
Handle event schema evolution:
@dataclass
class VersionedEvent:
"""Event with schema version."""
version: int
event_type: str
payload: dict
class EventUpgrader:
"""Upgrade old event schemas to current version."""
@staticmethod
def upgrade_order_created(event: dict, from_version: int) -> dict:
"""Upgrade OrderCreated event schema."""
if from_version == 1:
# v1 -> v2: Added customer_email
event["customer_email"] = None
from_version = 2
if from_version == 2:
# v2 -> v3: Added shipping_address
event["shipping_address"] = None
from_version = 3
return event
@staticmethod
def upgrade_event(event: EntityChange) -> dict:
"""Upgrade event to current schema version."""
current_version = 3
event_version = event.metadata.get("schema_version", 1) if event.metadata else 1
if event_version == current_version:
return event.after_snapshot
# Apply upgrades
upgraded = dict(event.after_snapshot)
if "OrderCreated" in event.entity_type:
upgraded = EventUpgrader.upgrade_order_created(upgraded, event_version)
return upgraded
Performance Optimization¶
Partitioning¶
Partition audit logs by time for better performance:
-- Partition by month
CREATE TABLE audit.entity_change_log (
id BIGSERIAL,
entity_type TEXT NOT NULL,
entity_id UUID NOT NULL,
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- ... other fields
) PARTITION BY RANGE (changed_at);
-- Create monthly partitions
CREATE TABLE audit.entity_change_log_2024_01 PARTITION OF audit.entity_change_log
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE audit.entity_change_log_2024_02 PARTITION OF audit.entity_change_log
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Auto-create partitions
CREATE OR REPLACE FUNCTION audit.create_monthly_partition(target_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := DATE_TRUNC('month', target_date);
end_date := start_date + INTERVAL '1 month';
partition_name := 'entity_change_log_' || TO_CHAR(start_date, 'YYYY_MM');
EXECUTE format(
'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.entity_change_log FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
END;
$$ LANGUAGE plpgsql;
Snapshot Strategy¶
Periodically snapshot aggregates to avoid full replay:
CREATE TABLE audit.entity_snapshots (
entity_type TEXT NOT NULL,
entity_id UUID NOT NULL,
snapshot_at TIMESTAMPTZ NOT NULL,
snapshot_data JSONB NOT NULL,
last_change_id BIGINT NOT NULL,
PRIMARY KEY (entity_type, entity_id, snapshot_at)
);
-- Create snapshot
INSERT INTO audit.entity_snapshots (entity_type, entity_id, snapshot_at, snapshot_data, last_change_id)
SELECT
entity_type,
entity_id,
NOW(),
after_snapshot,
id
FROM audit.entity_change_log
WHERE entity_type = 'orders.orders'
AND entity_id = '...'
AND operation != 'DELETE'
ORDER BY changed_at DESC
LIMIT 1;
Next Steps¶
- Bounded Contexts - Event-driven context integration
- CQRS - Command Query Responsibility Segregation
- Monitoring - Event sourcing metrics
- Performance - Audit log optimization