Event-Driven Architecture at Scale: Patterns and Implementation

Event-driven architecture has become the backbone of modern distributed systems. After implementing event-driven patterns in systems processing millions of events per second, I've learned that success lies not just in choosing the right technology, but in designing for eventual consistency, handling failures gracefully, and maintaining event schemas over time.

The Foundation of Event-Driven Systems

Understanding Events, Commands, and Messages

The distinction between events, commands, and messages is crucial for designing effective event-driven systems: an event is an immutable record of something that already happened, a command expresses intent for something to happen, and a message is the envelope that carries either of them between services:

// Event - Something that happened in the past
interface OrderCreatedEvent {
  eventId: string;
  aggregateId: string;
  eventType: 'OrderCreated';
  timestamp: Date;
  version: number;
  data: {
    orderId: string;
    customerId: string;
    items: OrderItem[];
    totalAmount: number;
  };
}

// Command - Intent to do something
interface CreateOrderCommand {
  commandId: string;
  customerId: string;
  items: OrderItem[];
  paymentMethod: string;
}

// Message - Communication between services
interface OrderProcessingMessage {
  messageId: string;
  correlationId: string;
  messageType: 'ProcessOrder';
  payload: {
    orderId: string;
    priority: 'high' | 'normal' | 'low';
  };
  metadata: {
    source: string;
    timestamp: Date;
    retryCount: number;
  };
}
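
The order payloads above reference an OrderItem type that isn't defined elsewhere in this post; a minimal shape might look like the following (the field names are illustrative, not prescribed):

interface OrderItem {
  productId: string;
  quantity: number;
  unitPrice: number;
}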

Event Store Implementation

class EventStore {
  private eventStreams: Map<string, DomainEvent[]> = new Map();
  private eventBus: EventBus;
  private snapshotStore: SnapshotStore;
  private schemaRegistry: SchemaRegistry;
  
  constructor(
    eventBus: EventBus,
    snapshotStore: SnapshotStore,
    schemaRegistry: SchemaRegistry
  ) {
    this.eventBus = eventBus;
    this.snapshotStore = snapshotStore;
    this.schemaRegistry = schemaRegistry;
  }
  
  async appendEvents(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    // Optimistic concurrency control
    const currentEvents = this.eventStreams.get(streamId) || [];
    if (currentEvents.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but stream is at version ${currentEvents.length}`
      );
    }
    
    // Validate events
    await this.validateEvents(events);
    
    // Append events atomically
    const updatedEvents = [...currentEvents, ...events];
    this.eventStreams.set(streamId, updatedEvents);
    
    // Publish events to event bus
    await this.publishEvents(events);
    
    // Create snapshot if threshold reached
    if (updatedEvents.length % 100 === 0) {
      await this.createSnapshot(streamId, updatedEvents);
    }
  }
  
  async getEvents(
    streamId: string,
    fromVersion?: number,
    toVersion?: number
  ): Promise<DomainEvent[]> {
    const events = this.eventStreams.get(streamId) || [];
    const start = fromVersion ?? 0;
    const end = toVersion ?? events.length;
    
    return events.slice(start, end);
  }
  
  async getEventsFromSnapshot(streamId: string): Promise<{
    events: DomainEvent[];
    snapshotVersion: number;
  }> {
    const snapshot = await this.snapshotStore.getSnapshot(streamId);
    
    if (snapshot) {
      const eventsFromSnapshot = await this.getEvents(
        streamId,
        snapshot.version
      );
      return {
        events: eventsFromSnapshot,
        snapshotVersion: snapshot.version
      };
    }
    
    return {
      events: await this.getEvents(streamId),
      snapshotVersion: 0
    };
  }
  
  private async validateEvents(events: DomainEvent[]): Promise<void> {
    for (const event of events) {
      // Validate event schema
      const isValid = await this.schemaRegistry.validate(
        event.eventType,
        event.data
      );
      
      if (!isValid) {
        throw new InvalidEventError(`Invalid event schema for ${event.eventType}`);
      }
      
      // Check for duplicate events
      if (await this.isDuplicateEvent(event)) {
        throw new DuplicateEventError(`Duplicate event: ${event.eventId}`);
      }
    }
  }
}
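
In practice the store backs an aggregate repository: load the stream, replay it to rebuild state, then append new events with the version you loaded. A minimal sketch, assuming a hypothetical OrderAggregate with apply(), create(), and an uncommittedEvents list:

async function placeOrder(store: EventStore, command: CreateOrderCommand): Promise<void> {
  const streamId = `order-${command.commandId}`;
  
  // Rebuild current state (empty history for a brand-new aggregate)
  const history = await store.getEvents(streamId);
  const order = new OrderAggregate(streamId); // hypothetical aggregate
  history.forEach(event => order.apply(event));
  
  // Decide and record what happened
  order.create(command.customerId, command.items); // emits an OrderCreated event internally
  
  // expectedVersion is the stream length we loaded; a concurrent writer
  // triggers ConcurrencyError and the command can simply be retried
  await store.appendEvents(streamId, order.uncommittedEvents, history.length);
}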

Apache Kafka Implementation Patterns

High-Throughput Producer Configuration

class HighThroughputKafkaProducer {
  private producer: kafka.Producer;
  private batchBuffer: Map<string, ProducerRecord[]> = new Map();
  private batchTimer: NodeJS.Timeout | null = null;
  
  constructor(private kafkaClient: kafka.Kafka, private config: ProducerConfig) {
    this.producer = this.kafkaClient.producer({
      // Optimize for throughput without giving up ordering guarantees
      maxInFlightRequests: 5,
      idempotent: true,
      transactionTimeout: 30000,
      
      // Retry configuration with exponential backoff
      retry: {
        initialRetryTime: 100,
        retries: 5,
        maxRetryTime: 30000,
        multiplier: 2
      }
    });
    // Byte-level batching (batch.size=16384, linger.ms=10) and compression are
    // not producer-constructor options in kafkajs: batching is handled by this
    // class's own buffer, and compression is passed per sendBatch call below.
  }
  
  async publishEvent(topic: string, event: DomainEvent): Promise<void> {
    const record: ProducerRecord = {
      topic,
      key: event.aggregateId,
      value: JSON.stringify(event),
      headers: {
        eventType: event.eventType,
        version: event.version.toString(),
        timestamp: event.timestamp.toISOString()
      },
      partition: this.calculatePartition(event.aggregateId, topic)
    };
    
    // Add to batch buffer
    if (!this.batchBuffer.has(topic)) {
      this.batchBuffer.set(topic, []);
    }
    
    this.batchBuffer.get(topic)!.push(record);
    
    // Flush if batch size reached
    if (this.batchBuffer.get(topic)!.length >= this.config.batchSize) {
      await this.flushBatch(topic);
    } else {
      // Set timer for batch flush
      this.scheduleBatchFlush();
    }
  }
  
  private async flushBatch(topic: string): Promise<void> {
    const batch = this.batchBuffer.get(topic);
    if (!batch || batch.length === 0) return;
    
    try {
      await this.producer.sendBatch({
        compression: CompressionTypes.Snappy,
        topicMessages: [{
          topic,
          messages: batch.map(record => ({
            key: record.key,
            value: record.value,
            headers: record.headers,
            partition: record.partition
          }))
        }]
      });
      
      // Clear batch after successful send
      this.batchBuffer.set(topic, []);
      
    } catch (error) {
      // Handle batch failure - could implement retry logic
      console.error(`Failed to send batch for topic ${topic}:`, error);
      throw error;
    }
  }
  
  private calculatePartition(key: string, topic: string): number {
    // Consistent hashing for partition assignment
    const hash = this.hash(key);
    const partitionCount = this.getPartitionCount(topic);
    return hash % partitionCount;
  }
  
  private scheduleBatchFlush(): void {
    if (this.batchTimer) return;
    
    this.batchTimer = setTimeout(async () => {
      for (const topic of this.batchBuffer.keys()) {
        await this.flushBatch(topic);
      }
      this.batchTimer = null;
    }, this.config.batchTimeoutMs);
  }
}
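
Wiring the producer together might look like the following. This sketch assumes kafkajs as the client library; the broker list, topic name, and the ProducerConfig fields (batchSize as a record count, batchTimeoutMs) are illustrative assumptions rather than fixed values:

import { Kafka } from 'kafkajs';

const kafkaClient = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092']
});

const producer = new HighThroughputKafkaProducer(kafkaClient, {
  batchSize: 500,      // flush once 500 records are buffered for a topic
  batchTimeoutMs: 50   // ...or after 50ms, whichever comes first
});

await producer.publishEvent('orders.events', {
  eventId: 'evt-1001',
  aggregateId: 'order-42',
  eventType: 'OrderCreated',
  timestamp: new Date(),
  version: 1,
  data: { orderId: 'order-42', customerId: 'cust-7', items: [], totalAmount: 99.5 }
});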

Consumer Group with Error Handling

class ResilientKafkaConsumer {
  private consumer: kafka.Consumer;
  private deadLetterProducer: kafka.Producer;
  private retryTopics: Map<string, string> = new Map();
  
  constructor(
    private kafkaClient: kafka.Kafka,
    private groupId: string,
    private config: ConsumerConfig
  ) {
    this.consumer = this.kafkaClient.consumer({
      groupId: this.groupId,
      sessionTimeout: 30000,
      rebalanceTimeout: 60000,
      heartbeatInterval: 3000,
      
      // Optimize for reliability
      maxBytesPerPartition: 1048576, // 1MB
      minBytes: 1,
      maxBytes: 10485760, // 10MB
      maxWaitTimeInMs: 5000
    });
    
    this.deadLetterProducer = this.kafkaClient.producer();
    this.setupRetryTopics();
  }
  
  async subscribe(topics: string[], messageHandler: MessageHandler): Promise<void> {
    await this.consumer.subscribe({ topics, fromBeginning: false });
    
    await this.consumer.run({
      // Auto-commit disabled: offsets are committed manually after successful processing
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        const startTime = Date.now();
        let processingResult: ProcessingResult;
        
        try {
          // Deserialize message
          const event = this.deserializeMessage(message);
          
          // Process message with timeout
          processingResult = await Promise.race([
            messageHandler(event),
            this.createTimeoutPromise(this.config.processingTimeoutMs)
          ]);
          
          // Commit offset on successful processing
          if (processingResult.success) {
            await this.consumer.commitOffsets([{
              topic,
              partition,
              offset: (parseInt(message.offset) + 1).toString()
            }]);
          } else {
            await this.handleProcessingFailure(
              topic,
              message,
              processingResult.error
            );
          }
          
        } catch (error) {
          await this.handleProcessingFailure(topic, message, error);
        } finally {
          // Record processing metrics
          this.recordProcessingMetrics(topic, Date.now() - startTime);
        }
      }
    });
  }
  
  private async handleProcessingFailure(
    topic: string,
    message: kafka.KafkaMessage,
    error: Error
  ): Promise<void> {
    const retryCount = this.getRetryCount(message);
    const maxRetries = this.config.maxRetries;
    
    if (retryCount < maxRetries) {
      // Send to retry topic
      await this.sendToRetryTopic(topic, message, retryCount + 1);
    } else {
      // Send to dead letter queue
      await this.sendToDeadLetterQueue(topic, message, error);
    }
  }
  
  private async sendToRetryTopic(
    originalTopic: string,
    message: kafka.KafkaMessage,
    retryCount: number
  ): Promise<void> {
    const retryTopic = this.getRetryTopic(originalTopic);
    const delayMs = this.calculateRetryDelay(retryCount);
    
    await this.deadLetterProducer.send({
      topic: retryTopic,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          ...message.headers,
          'retry-count': retryCount.toString(),
          'original-topic': originalTopic,
          'retry-after': (Date.now() + delayMs).toString()
        }
      }]
    });
  }
  
  private async sendToDeadLetterQueue(
    topic: string,
    message: kafka.KafkaMessage,
    error: Error
  ): Promise<void> {
    const dlqTopic = `${topic}.dlq`;
    
    await this.deadLetterProducer.send({
      topic: dlqTopic,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          ...message.headers,
          'error-message': error.message,
          'error-stack': error.stack || '',
          'failed-at': new Date().toISOString(),
          'original-topic': topic
        }
      }]
    });
  }
  
  private calculateRetryDelay(retryCount: number): number {
    // Exponential backoff with jitter
    const baseDelay = 1000; // 1 second
    const maxDelay = 300000; // 5 minutes
    const exponentialDelay = baseDelay * Math.pow(2, retryCount - 1);
    const jitter = Math.random() * 0.1 * exponentialDelay;
    
    return Math.min(exponentialDelay + jitter, maxDelay);
  }
}
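
A handler plugged into this consumer reports expected business failures through its return value rather than by throwing, so the consumer can decide between retry and dead-letter routing. The sketch below assumes a ProcessingResult of { success: boolean; error?: Error } and a hypothetical orderProjection read model:

const consumer = new ResilientKafkaConsumer(kafkaClient, 'order-processing', {
  processingTimeoutMs: 10000,
  maxRetries: 3
});

await consumer.subscribe(['orders.events'], async (event: DomainEvent): Promise<ProcessingResult> => {
  try {
    await orderProjection.apply(event); // hypothetical read-model update
    return { success: true };
  } catch (error) {
    return { success: false, error: error as Error };
  }
});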

Event Schema Evolution

Schema Registry Implementation

class EventSchemaRegistry {
  private schemas: Map<string, SchemaVersion[]> = new Map();
  private compatibilityChecker: CompatibilityChecker;
  
  constructor() {
    this.compatibilityChecker = new CompatibilityChecker();
  }
  
  async registerSchema(
    eventType: string,
    schema: JSONSchema,
    version: number
  ): Promise<void> {
    const existingVersions = this.schemas.get(eventType) || [];
    
    // Check backward compatibility
    if (existingVersions.length > 0) {
      const latestVersion = existingVersions[existingVersions.length - 1];
      const isCompatible = await this.compatibilityChecker.isBackwardCompatible(
        latestVersion.schema,
        schema
      );
      
      if (!isCompatible) {
        throw new SchemaCompatibilityError(
          `Schema version ${version} for ${eventType} is not backward compatible`
        );
      }
    }
    
    const schemaVersion: SchemaVersion = {
      version,
      schema,
      registeredAt: new Date(),
      deprecated: false
    };
    
    existingVersions.push(schemaVersion);
    this.schemas.set(eventType, existingVersions);
  }
  
  async validateEvent(eventType: string, eventData: any): Promise<boolean> {
    const versions = this.schemas.get(eventType);
    if (!versions || versions.length === 0) {
      throw new SchemaNotFoundError(`No schema found for event type: ${eventType}`);
    }
    
    // Try to validate against all versions, newest first (copy before
    // reversing so the registry's stored order is not mutated)
    for (const version of [...versions].reverse()) {
      if (version.deprecated) continue;
      
      const isValid = await this.validateAgainstSchema(version.schema, eventData);
      if (isValid) {
        return true;
      }
    }
    
    return false;
  }
  
  async evolveSchema(
    eventType: string,
    newSchema: JSONSchema,
    evolutionStrategy: EvolutionStrategy
  ): Promise<void> {
    switch (evolutionStrategy) {
      case EvolutionStrategy.BACKWARD_COMPATIBLE:
        await this.addBackwardCompatibleFields(eventType, newSchema);
        break;
        
      case EvolutionStrategy.FORWARD_COMPATIBLE:
        await this.addForwardCompatibleFields(eventType, newSchema);
        break;
        
      case EvolutionStrategy.FULL_COMPATIBLE:
        await this.addFullCompatibleFields(eventType, newSchema);
        break;
        
      case EvolutionStrategy.BREAKING_CHANGE:
        await this.handleBreakingChange(eventType, newSchema);
        break;
    }
  }
  
  private async handleBreakingChange(
    eventType: string,
    newSchema: JSONSchema
  ): Promise<void> {
    // Create new event type with version suffix
    const newEventType = `${eventType}_v2`;
    
    // Register new schema
    await this.registerSchema(newEventType, newSchema, 1);
    
    // Mark old schema as deprecated
    const oldVersions = this.schemas.get(eventType) || [];
    oldVersions.forEach(version => {
      version.deprecated = true;
      version.deprecatedAt = new Date();
    });
    
    // Set up event transformation pipeline
    await this.setupEventTransformation(eventType, newEventType);
  }
}
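
Registering an initial schema and then a backward-compatible evolution (adding an optional field) might look like this; the JSON Schema payloads are illustrative:

const registry = new EventSchemaRegistry();

// v1: the original OrderCreated payload
await registry.registerSchema('OrderCreated', {
  type: 'object',
  required: ['orderId', 'customerId', 'items', 'totalAmount'],
  properties: {
    orderId: { type: 'string' },
    customerId: { type: 'string' },
    items: { type: 'array' },
    totalAmount: { type: 'number' }
  }
}, 1);

// v2 adds an optional currency field; existing events still validate,
// so the backward-compatibility check passes
await registry.registerSchema('OrderCreated', {
  type: 'object',
  required: ['orderId', 'customerId', 'items', 'totalAmount'],
  properties: {
    orderId: { type: 'string' },
    customerId: { type: 'string' },
    items: { type: 'array' },
    totalAmount: { type: 'number' },
    currency: { type: 'string' }
  }
}, 2);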

Saga Pattern for Complex Workflows

Distributed Transaction Management

class SagaOrchestrator {
  private sagaDefinitions: Map<string, SagaDefinition> = new Map();
  private sagaInstances: Map<string, SagaInstance> = new Map();
  private eventBus: EventBus;
  
  constructor(eventBus: EventBus) {
    this.eventBus = eventBus;
    this.setupEventHandlers();
  }
  
  defineSaga(sagaType: string, definition: SagaDefinition): void {
    this.sagaDefinitions.set(sagaType, definition);
  }
  
  async startSaga(sagaType: string, sagaId: string, initialData: any): Promise<void> {
    const definition = this.sagaDefinitions.get(sagaType);
    if (!definition) {
      throw new Error(`Saga definition not found: ${sagaType}`);
    }
    
    const sagaInstance: SagaInstance = {
      sagaId,
      sagaType,
      state: SagaState.STARTED,
      currentStep: 0,
      data: initialData,
      executedSteps: [],
      createdAt: new Date(),
      updatedAt: new Date()
    };
    
    this.sagaInstances.set(sagaId, sagaInstance);
    
    // Start executing the saga
    await this.executeNextStep(sagaInstance);
  }
  
  private async executeNextStep(sagaInstance: SagaInstance): Promise<void> {
    const definition = this.sagaDefinitions.get(sagaInstance.sagaType)!;
    
    if (sagaInstance.currentStep >= definition.steps.length) {
      await this.completeSaga(sagaInstance);
      return;
    }
    
    const step = definition.steps[sagaInstance.currentStep];
    sagaInstance.state = SagaState.EXECUTING;
    sagaInstance.updatedAt = new Date();
    
    try {
      // Execute step command
      const command = this.buildCommand(step, sagaInstance.data);
      await this.eventBus.sendCommand(command);
      
      // Wait for step completion event
      await this.waitForStepCompletion(sagaInstance, step);
      
    } catch (error) {
      await this.handleStepFailure(sagaInstance, error);
    }
  }
  
  private async handleStepCompletion(
    sagaInstance: SagaInstance,
    stepResult: StepResult
  ): Promise<void> {
    const definition = this.sagaDefinitions.get(sagaInstance.sagaType)!;
    const step = definition.steps[sagaInstance.currentStep];
    
    // Update saga data with step result
    sagaInstance.data = { ...sagaInstance.data, ...stepResult.data };
    sagaInstance.executedSteps.push({
      stepIndex: sagaInstance.currentStep,
      stepName: step.name,
      executedAt: new Date(),
      result: stepResult
    });
    
    // Move to next step
    sagaInstance.currentStep++;
    sagaInstance.updatedAt = new Date();
    
    // Continue with next step
    await this.executeNextStep(sagaInstance);
  }
  
  private async handleStepFailure(
    sagaInstance: SagaInstance,
    error: Error
  ): Promise<void> {
    sagaInstance.state = SagaState.COMPENSATING;
    sagaInstance.updatedAt = new Date();
    
    // Execute compensation in reverse order
    for (let i = sagaInstance.executedSteps.length - 1; i >= 0; i--) {
      const executedStep = sagaInstance.executedSteps[i];
      const definition = this.sagaDefinitions.get(sagaInstance.sagaType)!;
      const stepDefinition = definition.steps[executedStep.stepIndex];
      
      if (stepDefinition.compensationCommand) {
        try {
          const compensationCommand = this.buildCommand(
            { ...stepDefinition, command: stepDefinition.compensationCommand },
            sagaInstance.data
          );
          
          await this.eventBus.sendCommand(compensationCommand);
          
        } catch (compensationError) {
          // Log compensation failure but continue
          console.error(
            `Compensation failed for step ${stepDefinition.name}:`,
            compensationError
          );
        }
      }
    }
    
    sagaInstance.state = SagaState.COMPENSATED;
    sagaInstance.updatedAt = new Date();
  }
  
  private async completeSaga(sagaInstance: SagaInstance): Promise<void> {
    sagaInstance.state = SagaState.COMPLETED;
    sagaInstance.completedAt = new Date();
    sagaInstance.updatedAt = new Date();
    
    // Publish saga completion event
    const completionEvent: SagaCompletedEvent = {
      eventId: generateId(),
      sagaId: sagaInstance.sagaId,
      sagaType: sagaInstance.sagaType,
      eventType: 'SagaCompleted',
      timestamp: new Date(),
      data: sagaInstance.data
    };
    
    await this.eventBus.publishEvent(completionEvent);
  }
}
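
A concrete saga definition ties the orchestrator to business steps. The order-fulfillment example below is a sketch; the step, command, and compensation names are assumptions about the SagaDefinition shape, not a fixed API:

const orchestrator = new SagaOrchestrator(eventBus);

orchestrator.defineSaga('OrderFulfillment', {
  steps: [
    { name: 'ReserveInventory', command: 'ReserveInventoryCommand', compensationCommand: 'ReleaseInventoryCommand' },
    { name: 'ChargePayment', command: 'ChargePaymentCommand', compensationCommand: 'RefundPaymentCommand' },
    { name: 'ScheduleShipment', command: 'ScheduleShipmentCommand', compensationCommand: 'CancelShipmentCommand' }
  ]
});

// One saga instance per order; if ChargePayment fails, the orchestrator
// sends ReleaseInventoryCommand during compensation automatically.
await orchestrator.startSaga('OrderFulfillment', 'saga-order-42', {
  orderId: 'order-42',
  customerId: 'cust-7',
  totalAmount: 99.5
});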

Monitoring and Observability

Event Flow Tracing

class EventFlowTracer {
  private traces: Map<string, EventTrace> = new Map();
  private metricsCollector: MetricsCollector;
  
  constructor(metricsCollector: MetricsCollector) {
    this.metricsCollector = metricsCollector;
  }
  
  startTrace(correlationId: string, initiatingEvent: DomainEvent): void {
    const trace: EventTrace = {
      correlationId,
      initiatingEvent,
      events: [initiatingEvent],
      startTime: Date.now(),
      services: new Set([initiatingEvent.source]),
      status: TraceStatus.ACTIVE
    };
    
    this.traces.set(correlationId, trace);
  }
  
  addEvent(correlationId: string, event: DomainEvent): void {
    const trace = this.traces.get(correlationId);
    if (!trace) return;
    
    trace.events.push(event);
    trace.services.add(event.source);
    trace.lastEventTime = Date.now();
    
    // Check for completion patterns
    if (this.isTraceComplete(trace)) {
      this.completeTrace(trace);
    }
  }
  
  private completeTrace(trace: EventTrace): void {
    trace.status = TraceStatus.COMPLETED;
    trace.endTime = Date.now();
    trace.duration = trace.endTime - trace.startTime;
    
    // Record metrics
    this.metricsCollector.recordEventFlowDuration(
      trace.initiatingEvent.eventType,
      trace.duration
    );
    
    this.metricsCollector.recordEventFlowHops(
      trace.initiatingEvent.eventType,
      trace.events.length
    );
    
    this.metricsCollector.recordEventFlowServices(
      trace.initiatingEvent.eventType,
      trace.services.size
    );
    
    // Generate flow visualization data
    this.generateFlowVisualization(trace);
  }
  
  private generateFlowVisualization(trace: EventTrace): FlowVisualization {
    const nodes = Array.from(trace.services).map(service => ({
      id: service,
      label: service,
      type: 'service'
    }));
    
    const edges = [];
    for (let i = 1; i < trace.events.length; i++) {
      const prevEvent = trace.events[i - 1];
      const currentEvent = trace.events[i];
      
      edges.push({
        source: prevEvent.source,
        target: currentEvent.source,
        label: currentEvent.eventType,
        timestamp: currentEvent.timestamp
      });
    }
    
    return { nodes, edges, metadata: trace };
  }
}
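
In practice the tracer hooks into the consume path: open a trace when the initiating event arrives and append every downstream event that carries the same correlationId. A minimal sketch, assuming events expose a correlationId field and a metricsCollector instance is available:

const tracer = new EventFlowTracer(metricsCollector);

function onEventConsumed(event: DomainEvent & { correlationId: string }): void {
  if (event.eventType === 'OrderCreated') {
    // The first event in the flow opens the trace
    tracer.startTrace(event.correlationId, event);
  } else {
    // Downstream events are appended to the existing trace
    tracer.addEvent(event.correlationId, event);
  }
}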

Conclusion

Event-driven architecture at scale requires careful consideration of consistency models, failure handling, and schema evolution. The patterns shared here have been proven in production systems processing millions of events daily.

Key takeaways for 2025:

  1. Design for Eventual Consistency - Embrace the asynchronous nature
  2. Implement Comprehensive Error Handling - Plan for failures at every level
  3. Manage Schema Evolution - Maintain backward compatibility
  4. Monitor Event Flows - Observability is crucial for debugging
  5. Use Sagas for Complex Workflows - Coordinate distributed transactions

The future of distributed systems is event-driven, and mastering these patterns will be essential for building scalable, resilient applications.


These patterns have been refined through building event-driven systems handling millions of events per second across e-commerce, fintech, and IoT platforms.