Event-Driven Architecture at Scale: Patterns and Implementation
Event-driven architecture has become the backbone of modern distributed systems. After implementing event-driven patterns in systems processing millions of events per second, I've learned that success lies not just in choosing the right technology, but in designing for eventual consistency, handling failures gracefully, and evolving event schemas safely over time.
The Foundation of Event-Driven Systems
Understanding Events vs Messages
The distinction between events, commands, and messages is crucial for designing effective event-driven systems: events record facts that have already happened, commands express intent to change state, and messages are the transport envelopes that carry either of them between services:
// Event - Something that happened in the past
interface OrderCreatedEvent {
eventId: string;
aggregateId: string;
eventType: 'OrderCreated';
timestamp: Date;
version: number;
data: {
orderId: string;
customerId: string;
items: OrderItem[];
totalAmount: number;
};
}
// Command - Intent to do something
interface CreateOrderCommand {
commandId: string;
customerId: string;
items: OrderItem[];
paymentMethod: string;
}
// Message - Communication between services
interface OrderProcessingMessage {
messageId: string;
correlationId: string;
messageType: 'ProcessOrder';
payload: {
orderId: string;
priority: 'high' | 'normal' | 'low';
};
metadata: {
source: string;
timestamp: Date;
retryCount: number;
};
}
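For reference, the OrderItem and DomainEvent shapes used by these interfaces and by the event store below are left implicit; a minimal sketch, with field names that are assumptions rather than a prescribed contract, could look like this:

// Minimal sketch of supporting types assumed throughout this post.
interface OrderItem {
  productId: string;
  quantity: number;
  unitPrice: number;
}

interface DomainEvent {
  eventId: string;
  aggregateId: string;
  eventType: string;
  timestamp: Date;
  version: number;
  source?: string; // emitting service; used later by the flow tracer
  data: Record<string, unknown>;
}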
Event Store Implementation
class EventStore {
private eventStreams: Map<string, DomainEvent[]> = new Map();
private eventBus: EventBus;
private snapshotStore: SnapshotStore;
private schemaRegistry: SchemaRegistry; // needed by validateEvents() below
constructor(eventBus: EventBus, snapshotStore: SnapshotStore, schemaRegistry: SchemaRegistry) {
this.eventBus = eventBus;
this.snapshotStore = snapshotStore;
this.schemaRegistry = schemaRegistry;
}
async appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
// Optimistic concurrency control
const currentEvents = this.eventStreams.get(streamId) || [];
if (currentEvents.length !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but stream is at version ${currentEvents.length}`
);
}
// Validate events
await this.validateEvents(events);
// Append events atomically
const updatedEvents = [...currentEvents, ...events];
this.eventStreams.set(streamId, updatedEvents);
// Publish events to event bus
await this.publishEvents(events);
// Create snapshot if threshold reached
if (updatedEvents.length % 100 === 0) {
await this.createSnapshot(streamId, updatedEvents);
}
}
async getEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]> {
const events = this.eventStreams.get(streamId) || [];
const start = fromVersion || 0;
const end = toVersion || events.length;
return events.slice(start, end);
}
async getEventsFromSnapshot(streamId: string): Promise<{
events: DomainEvent[];
snapshotVersion: number;
}> {
const snapshot = await this.snapshotStore.getSnapshot(streamId);
if (snapshot) {
const eventsFromSnapshot = await this.getEvents(
streamId,
snapshot.version
);
return {
events: eventsFromSnapshot,
snapshotVersion: snapshot.version
};
}
return {
events: await this.getEvents(streamId),
snapshotVersion: 0
};
}
private async validateEvents(events: DomainEvent[]): Promise<void> {
for (const event of events) {
// Validate event schema
const isValid = await this.schemaRegistry.validate(
event.eventType,
event.data
);
if (!isValid) {
throw new InvalidEventError(`Invalid event schema for ${event.eventType}`);
}
// Check for duplicate events
if (await this.isDuplicateEvent(event)) {
throw new DuplicateEventError(`Duplicate event: ${event.eventId}`);
}
}
}
}
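A typical call site for the store might look like the sketch below (the publishEvents, createSnapshot, and isDuplicateEvent helpers referenced above are omitted): it reads the current stream length as the expected version and retries once if another writer wins the race. The recordOrderCreated function and its arguments are illustrative, not part of the store's API.

// Hypothetical usage: append with optimistic concurrency, retry once on conflict.
async function recordOrderCreated(
  store: EventStore,
  orderCreatedEvent: DomainEvent
): Promise<void> {
  const streamId = orderCreatedEvent.aggregateId;
  for (let attempt = 0; attempt < 2; attempt++) {
    // Expected version = number of events currently in the stream
    const expectedVersion = (await store.getEvents(streamId)).length;
    try {
      await store.appendEvents(streamId, [orderCreatedEvent], expectedVersion);
      return;
    } catch (err) {
      if (!(err instanceof ConcurrencyError) || attempt === 1) throw err;
      // Another writer appended first; re-read the version and try again.
    }
  }
}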
Apache Kafka Implementation Patterns
High-Throughput Producer Configuration
class HighThroughputKafkaProducer {
private producer: kafka.Producer;
private batchBuffer: Map<string, ProducerRecord[]> = new Map();
private batchTimer: NodeJS.Timeout | null = null;
constructor(private kafkaClient: kafka.Kafka, private config: ProducerConfig) {
this.producer = this.kafkaClient.producer({
// Optimize for throughput
maxInFlightRequests: 5,
idempotent: true,
transactionTimeout: 30000,
// Note: kafkajs does not expose the Java client's batch.size / linger.ms
// settings, and compression is passed per send()/sendBatch() call rather
// than on the producer constructor; batching is therefore handled manually below.
// Retry configuration (kafkajs applies exponential backoff between attempts)
retry: {
initialRetryTime: 100,
retries: 5,
maxRetryTime: 30000,
multiplier: 2
}
});
}
async publishEvent(topic: string, event: DomainEvent): Promise<void> {
const record: ProducerRecord = {
topic,
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
eventType: event.eventType,
version: event.version.toString(),
timestamp: event.timestamp.toISOString()
},
partition: this.calculatePartition(event.aggregateId, topic)
};
// Add to batch buffer
if (!this.batchBuffer.has(topic)) {
this.batchBuffer.set(topic, []);
}
this.batchBuffer.get(topic)!.push(record);
// Flush if batch size reached
if (this.batchBuffer.get(topic)!.length >= this.config.batchSize) {
await this.flushBatch(topic);
} else {
// Set timer for batch flush
this.scheduleBatchFlush();
}
}
private async flushBatch(topic: string): Promise<void> {
const batch = this.batchBuffer.get(topic);
if (!batch || batch.length === 0) return;
try {
await this.producer.sendBatch({
topicMessages: [{
topic,
messages: batch.map(record => ({
key: record.key,
value: record.value,
headers: record.headers,
partition: record.partition
}))
}]
});
// Clear batch after successful send
this.batchBuffer.set(topic, []);
} catch (error) {
// Handle batch failure - could implement retry logic
console.error(`Failed to send batch for topic ${topic}:`, error);
throw error;
}
}
private calculatePartition(key: string, topic: string): number {
// Consistent hashing for partition assignment
const hash = this.hash(key);
const partitionCount = this.getPartitionCount(topic);
return hash % partitionCount;
}
private scheduleBatchFlush(): void {
if (this.batchTimer) return;
this.batchTimer = setTimeout(async () => {
for (const topic of this.batchBuffer.keys()) {
await this.flushBatch(topic);
}
this.batchTimer = null;
}, this.config.batchTimeoutMs);
}
}
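The hash and getPartitionCount helpers used by calculatePartition are not shown above. One way to fill them in, sketched below as additions inside HighThroughputKafkaProducer, is a simple FNV-1a string hash plus partition counts fetched once through the kafkajs admin client and cached. Note that kafkajs's default partitioner already hashes keys (murmur2, Java-client compatible), so overriding partition assignment like this is a deliberate design choice rather than a requirement; the helper names and caching strategy are assumptions.

// Sketch: helpers added inside HighThroughputKafkaProducer (assumed names).
private partitionCounts: Map<string, number> = new Map();

private hash(key: string): number {
  // FNV-1a: cheap, deterministic string hash (illustrative choice)
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

private getPartitionCount(topic: string): number {
  // Fall back to a single partition if metadata has not been loaded yet
  return this.partitionCounts.get(topic) ?? 1;
}

async loadPartitionCounts(topics: string[]): Promise<void> {
  const admin = this.kafkaClient.admin();
  await admin.connect();
  const metadata = await admin.fetchTopicMetadata({ topics });
  for (const t of metadata.topics) {
    this.partitionCounts.set(t.name, t.partitions.length);
  }
  await admin.disconnect();
}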
Consumer Group with Error Handling
class ResilientKafkaConsumer {
private consumer: kafka.Consumer;
private deadLetterProducer: kafka.Producer;
private retryTopics: Map<string, string> = new Map();
constructor(
private kafkaClient: kafka.Kafka,
private groupId: string,
private config: ConsumerConfig
) {
this.consumer = this.kafkaClient.consumer({
groupId: this.groupId,
sessionTimeout: 30000,
rebalanceTimeout: 60000,
heartbeatInterval: 3000,
// Optimize for reliability
maxBytesPerPartition: 1048576, // 1MB
minBytes: 1,
maxBytes: 10485760, // 10MB
maxWaitTimeInMs: 5000,
// Note: auto-commit is controlled per consumer.run() call in kafkajs
// (see below), not in the consumer constructor; offsets are committed
// manually after successful processing for better control.
});
this.deadLetterProducer = this.kafkaClient.producer();
this.setupRetryTopics();
}
async subscribe(topics: string[], messageHandler: MessageHandler): Promise<void> {
await this.consumer.subscribe({ topics, fromBeginning: false });
await this.consumer.run({
autoCommit: false, // offsets are committed manually after successful processing
eachMessage: async ({ topic, partition, message }) => {
const startTime = Date.now();
let processingResult: ProcessingResult;
try {
// Deserialize message
const event = this.deserializeMessage(message);
// Process message with timeout
processingResult = await Promise.race([
messageHandler(event),
this.createTimeoutPromise(this.config.processingTimeoutMs)
]);
// Commit offset on successful processing
if (processingResult.success) {
await this.consumer.commitOffsets([{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString()
}]);
} else {
await this.handleProcessingFailure(
topic,
message,
processingResult.error
);
}
} catch (error) {
await this.handleProcessingFailure(topic, message, error);
} finally {
// Record processing metrics
this.recordProcessingMetrics(topic, Date.now() - startTime);
}
}
});
}
private async handleProcessingFailure(
topic: string,
message: kafka.KafkaMessage,
error: Error
): Promise<void> {
const retryCount = this.getRetryCount(message);
const maxRetries = this.config.maxRetries;
if (retryCount < maxRetries) {
// Send to retry topic
await this.sendToRetryTopic(topic, message, retryCount + 1);
} else {
// Send to dead letter queue
await this.sendToDeadLetterQueue(topic, message, error);
}
}
private async sendToRetryTopic(
originalTopic: string,
message: kafka.KafkaMessage,
retryCount: number
): Promise<void> {
const retryTopic = this.getRetryTopic(originalTopic);
const delayMs = this.calculateRetryDelay(retryCount);
await this.deadLetterProducer.send({
topic: retryTopic,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'retry-count': retryCount.toString(),
'original-topic': originalTopic,
'retry-after': (Date.now() + delayMs).toString()
}
}]
});
}
private async sendToDeadLetterQueue(
topic: string,
message: kafka.KafkaMessage,
error: Error
): Promise<void> {
const dlqTopic = `${topic}.dlq`;
await this.deadLetterProducer.send({
topic: dlqTopic,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'error-message': error.message,
'error-stack': error.stack || '',
'failed-at': new Date().toISOString(),
'original-topic': topic
}
}]
});
}
private calculateRetryDelay(retryCount: number): number {
// Exponential backoff with jitter
const baseDelay = 1000; // 1 second
const maxDelay = 300000; // 5 minutes
const exponentialDelay = baseDelay * Math.pow(2, retryCount - 1);
const jitter = Math.random() * 0.1 * exponentialDelay;
return Math.min(exponentialDelay + jitter, maxDelay);
}
}
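Several helpers referenced above (getRetryCount, getRetryTopic, deserializeMessage, createTimeoutPromise) are omitted; a minimal sketch of how they might be implemented inside ResilientKafkaConsumer follows. The 'retry-count' header name matches what sendToRetryTopic writes; everything else here is an assumption.

// Sketch: helpers added inside ResilientKafkaConsumer (assumed implementations).
private getRetryCount(message: kafka.KafkaMessage): number {
  const raw = message.headers?.['retry-count'];
  return raw ? parseInt(raw.toString(), 10) : 0;
}

private getRetryTopic(originalTopic: string): string {
  // One retry topic per source topic, e.g. "orders" -> "orders.retry"
  return this.retryTopics.get(originalTopic) ?? `${originalTopic}.retry`;
}

private deserializeMessage(message: kafka.KafkaMessage): DomainEvent {
  return JSON.parse(message.value?.toString() ?? '{}') as DomainEvent;
}

private createTimeoutPromise(timeoutMs: number): Promise<ProcessingResult> {
  return new Promise((_, reject) =>
    setTimeout(
      () => reject(new Error(`Processing timed out after ${timeoutMs}ms`)),
      timeoutMs
    )
  );
}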
Event Schema Evolution
Schema Registry Implementation
class EventSchemaRegistry {
private schemas: Map<string, SchemaVersion[]> = new Map();
private compatibilityChecker: CompatibilityChecker;
constructor() {
this.compatibilityChecker = new CompatibilityChecker();
}
async registerSchema(
eventType: string,
schema: JSONSchema,
version: number
): Promise<void> {
const existingVersions = this.schemas.get(eventType) || [];
// Check backward compatibility
if (existingVersions.length > 0) {
const latestVersion = existingVersions[existingVersions.length - 1];
const isCompatible = await this.compatibilityChecker.isBackwardCompatible(
latestVersion.schema,
schema
);
if (!isCompatible) {
throw new SchemaCompatibilityError(
`Schema version ${version} for ${eventType} is not backward compatible`
);
}
}
const schemaVersion: SchemaVersion = {
version,
schema,
registeredAt: new Date(),
deprecated: false
};
existingVersions.push(schemaVersion);
this.schemas.set(eventType, existingVersions);
}
async validateEvent(eventType: string, eventData: any): Promise<boolean> {
const versions = this.schemas.get(eventType);
if (!versions || versions.length === 0) {
throw new SchemaNotFoundError(`No schema found for event type: ${eventType}`);
}
// Try to validate against all versions (newest first); copy before
// reversing so the registered version order is not mutated
for (const version of [...versions].reverse()) {
if (version.deprecated) continue;
const isValid = await this.validateAgainstSchema(version.schema, eventData);
if (isValid) {
return true;
}
}
return false;
}
async evolveSchema(
eventType: string,
newSchema: JSONSchema,
evolutionStrategy: EvolutionStrategy
): Promise<void> {
switch (evolutionStrategy) {
case EvolutionStrategy.BACKWARD_COMPATIBLE:
await this.addBackwardCompatibleFields(eventType, newSchema);
break;
case EvolutionStrategy.FORWARD_COMPATIBLE:
await this.addForwardCompatibleFields(eventType, newSchema);
break;
case EvolutionStrategy.FULL_COMPATIBLE:
await this.addFullCompatibleFields(eventType, newSchema);
break;
case EvolutionStrategy.BREAKING_CHANGE:
await this.handleBreakingChange(eventType, newSchema);
break;
}
}
private async handleBreakingChange(
eventType: string,
newSchema: JSONSchema
): Promise<void> {
// Create new event type with version suffix
const newEventType = `${eventType}_v2`;
// Register new schema
await this.registerSchema(newEventType, newSchema, 1);
// Mark old schema as deprecated
const oldVersions = this.schemas.get(eventType) || [];
oldVersions.forEach(version => {
version.deprecated = true;
version.deprecatedAt = new Date();
});
// Set up event transformation pipeline
await this.setupEventTransformation(eventType, newEventType);
}
}
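To make these rules concrete, a hypothetical registration sequence is shown below: version 2 of OrderCreated adds an optional currency field, which passes the backward-compatibility check, while dropping a required field would be rejected. The JSON Schema shapes and field names are illustrative.

// Hypothetical usage: evolving OrderCreated with a backward-compatible change.
const registry = new EventSchemaRegistry();

const orderCreatedV1: JSONSchema = {
  type: 'object',
  required: ['orderId', 'customerId', 'items', 'totalAmount'],
  properties: {
    orderId: { type: 'string' },
    customerId: { type: 'string' },
    items: { type: 'array' },
    totalAmount: { type: 'number' }
  }
};

// v2 only adds an optional field, so events written against v1 still validate.
const orderCreatedV2: JSONSchema = {
  ...orderCreatedV1,
  properties: { ...orderCreatedV1.properties, currency: { type: 'string' } }
};

await registry.registerSchema('OrderCreated', orderCreatedV1, 1);
await registry.registerSchema('OrderCreated', orderCreatedV2, 2);

const isValid = await registry.validateEvent('OrderCreated', {
  orderId: 'o-1',
  customerId: 'c-1',
  items: [],
  totalAmount: 99.5
});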
Saga Pattern for Complex Workflows
Distributed Transaction Management
class SagaOrchestrator {
private sagaDefinitions: Map<string, SagaDefinition> = new Map();
private sagaInstances: Map<string, SagaInstance> = new Map();
private eventBus: EventBus;
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
this.setupEventHandlers();
}
defineSaga(sagaType: string, definition: SagaDefinition): void {
this.sagaDefinitions.set(sagaType, definition);
}
async startSaga(sagaType: string, sagaId: string, initialData: any): Promise<void> {
const definition = this.sagaDefinitions.get(sagaType);
if (!definition) {
throw new Error(`Saga definition not found: ${sagaType}`);
}
const sagaInstance: SagaInstance = {
sagaId,
sagaType,
state: SagaState.STARTED,
currentStep: 0,
data: initialData,
executedSteps: [],
createdAt: new Date(),
updatedAt: new Date()
};
this.sagaInstances.set(sagaId, sagaInstance);
// Start executing the saga
await this.executeNextStep(sagaInstance);
}
private async executeNextStep(sagaInstance: SagaInstance): Promise<void> {
const definition = this.sagaDefinitions.get(sagaInstance.sagaType)!;
if (sagaInstance.currentStep >= definition.steps.length) {
await this.completeSaga(sagaInstance);
return;
}
const step = definition.steps[sagaInstance.currentStep];
sagaInstance.state = SagaState.EXECUTING;
sagaInstance.updatedAt = new Date();
try {
// Execute step command
const command = this.buildCommand(step, sagaInstance.data);
await this.eventBus.sendCommand(command);
// Wait for step completion event
await this.waitForStepCompletion(sagaInstance, step);
} catch (error) {
await this.handleStepFailure(sagaInstance, error);
}
}
private async handleStepCompletion(
sagaInstance: SagaInstance,
stepResult: StepResult
): Promise<void> {
const definition = this.sagaDefinitions.get(sagaInstance.sagaType)!;
const step = definition.steps[sagaInstance.currentStep];
// Update saga data with step result
sagaInstance.data = { ...sagaInstance.data, ...stepResult.data };
sagaInstance.executedSteps.push({
stepIndex: sagaInstance.currentStep,
stepName: step.name,
executedAt: new Date(),
result: stepResult
});
// Move to next step
sagaInstance.currentStep++;
sagaInstance.updatedAt = new Date();
// Continue with next step
await this.executeNextStep(sagaInstance);
}
private async handleStepFailure(
sagaInstance: SagaInstance,
error: Error
): Promise<void> {
sagaInstance.state = SagaState.COMPENSATING;
sagaInstance.updatedAt = new Date();
// Execute compensation in reverse order
for (let i = sagaInstance.executedSteps.length - 1; i >= 0; i--) {
const executedStep = sagaInstance.executedSteps[i];
const definition = this.sagaDefinitions.get(sagaInstance.sagaType)!;
const stepDefinition = definition.steps[executedStep.stepIndex];
if (stepDefinition.compensationCommand) {
try {
const compensationCommand = this.buildCommand(
{ ...stepDefinition, command: stepDefinition.compensationCommand },
sagaInstance.data
);
await this.eventBus.sendCommand(compensationCommand);
} catch (compensationError) {
// Log compensation failure but continue
console.error(
`Compensation failed for step ${stepDefinition.name}:`,
compensationError
);
}
}
}
sagaInstance.state = SagaState.COMPENSATED;
sagaInstance.updatedAt = new Date();
}
private async completeSaga(sagaInstance: SagaInstance): Promise<void> {
sagaInstance.state = SagaState.COMPLETED;
sagaInstance.completedAt = new Date();
sagaInstance.updatedAt = new Date();
// Publish saga completion event
const completionEvent: SagaCompletedEvent = {
eventId: generateId(),
sagaId: sagaInstance.sagaId,
sagaType: sagaInstance.sagaType,
eventType: 'SagaCompleted',
timestamp: new Date(),
data: sagaInstance.data
};
await this.eventBus.publishEvent(completionEvent);
}
}
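As a concrete example, an order-fulfillment saga could be defined as below. The step and command names are illustrative, and the SagaDefinition shape is assumed to pair each forward command with the compensation issued if a later step fails.

// Hypothetical order-fulfillment saga: each step names its compensation.
const orderFulfillmentSaga: SagaDefinition = {
  steps: [
    {
      name: 'ReserveInventory',
      command: 'ReserveInventoryCommand',
      compensationCommand: 'ReleaseInventoryCommand'
    },
    {
      name: 'ChargePayment',
      command: 'ChargePaymentCommand',
      compensationCommand: 'RefundPaymentCommand'
    },
    {
      name: 'ScheduleShipment',
      command: 'ScheduleShipmentCommand'
      // Final step: nothing runs after it, so no compensation is defined
    }
  ]
};

orchestrator.defineSaga('OrderFulfillment', orderFulfillmentSaga);
await orchestrator.startSaga('OrderFulfillment', generateId(), {
  orderId: 'o-1',
  customerId: 'c-1'
});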
Monitoring and Observability
Event Flow Tracing
class EventFlowTracer {
private traces: Map<string, EventTrace> = new Map();
private metricsCollector: MetricsCollector;
constructor(metricsCollector: MetricsCollector) {
this.metricsCollector = metricsCollector;
}
startTrace(correlationId: string, initiatingEvent: DomainEvent): void {
const trace: EventTrace = {
correlationId,
initiatingEvent,
events: [initiatingEvent],
startTime: Date.now(),
services: new Set([initiatingEvent.source]),
status: TraceStatus.ACTIVE
};
this.traces.set(correlationId, trace);
}
addEvent(correlationId: string, event: DomainEvent): void {
const trace = this.traces.get(correlationId);
if (!trace) return;
trace.events.push(event);
trace.services.add(event.source);
trace.lastEventTime = Date.now();
// Check for completion patterns
if (this.isTraceComplete(trace)) {
this.completeTrace(trace);
}
}
private completeTrace(trace: EventTrace): void {
trace.status = TraceStatus.COMPLETED;
trace.endTime = Date.now();
trace.duration = trace.endTime - trace.startTime;
// Record metrics
this.metricsCollector.recordEventFlowDuration(
trace.initiatingEvent.eventType,
trace.duration
);
this.metricsCollector.recordEventFlowHops(
trace.initiatingEvent.eventType,
trace.events.length
);
this.metricsCollector.recordEventFlowServices(
trace.initiatingEvent.eventType,
trace.services.size
);
// Generate flow visualization data
this.generateFlowVisualization(trace);
}
private generateFlowVisualization(trace: EventTrace): FlowVisualization {
const nodes = Array.from(trace.services).map(service => ({
id: service,
label: service,
type: 'service'
}));
const edges = [];
for (let i = 1; i < trace.events.length; i++) {
const prevEvent = trace.events[i - 1];
const currentEvent = trace.events[i];
edges.push({
source: prevEvent.source,
target: currentEvent.source,
label: currentEvent.eventType,
timestamp: currentEvent.timestamp
});
}
return { nodes, edges, metadata: trace };
}
}
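The EventTrace and TraceStatus types, and the isTraceComplete heuristic, are not shown above. One possible shape is sketched below, using a terminal-event-name check for completion; that heuristic is an assumption, and real systems often combine it with an inactivity timeout.

// Sketch of the tracer's supporting types (assumed shapes).
enum TraceStatus {
  ACTIVE = 'active',
  COMPLETED = 'completed'
}

interface EventTrace {
  correlationId: string;
  initiatingEvent: DomainEvent;
  events: DomainEvent[];
  services: Set<string>;
  startTime: number;
  lastEventTime?: number;
  endTime?: number;
  duration?: number;
  status: TraceStatus;
}

// Inside EventFlowTracer: a trace is considered complete when a terminal
// event type is observed (illustrative heuristic only).
private isTraceComplete(trace: EventTrace): boolean {
  const lastEvent = trace.events[trace.events.length - 1];
  return (
    lastEvent.eventType.endsWith('Completed') ||
    lastEvent.eventType.endsWith('Failed')
  );
}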
Conclusion
Event-driven architecture at scale requires careful consideration of consistency models, failure handling, and schema evolution. The patterns shared here have been proven in production systems processing millions of events daily.
Key takeaways for 2025:
- Design for Eventual Consistency: embrace the asynchronous nature of event flows
- Implement Comprehensive Error Handling: plan for failures at every level
- Manage Schema Evolution: maintain backward compatibility
- Monitor Event Flows: observability is crucial for debugging
- Use Sagas for Complex Workflows: coordinate distributed transactions
The future of distributed systems is event-driven, and mastering these patterns will be essential for building scalable, resilient applications.
These patterns have been refined through building event-driven systems handling millions of events per second across e-commerce, fintech, and IoT platforms.