Unlock the CorrelationId Pattern: Handle Duplicate Messages and Ensure Idempotency

In the previous article about the Outbox Pattern, we built a reliable system for publishing events. But there's a critical question we need to address: What happens when the same event is processed multiple times?
This is where idempotency and message deduplication become essential—especially in high-throughput environments with multiple consumers racing to process the same messages.
The Challenge: Why Duplicates Are Inevitable
When using the Outbox Pattern with retries, duplicate processing is not just possible—it's guaranteed to happen. Here's why:
Scenario 1: Network Timeout
1. Outbox Processor publishes event to Service Bus ✅
2. Service Bus accepts the message ✅
3. Acknowledgment gets lost in network timeout ❌
4. Processor thinks it failed, retries ➡️ Duplicate!
Scenario 2: Processor Crash
1. Outbox Processor reads message from database ✅
2. Publishes to Service Bus ✅
3. Processor crashes before marking as processed ❌
4. On restart, processes same message again ➡️ Duplicate!
Scenario 3: Multiple Consumers
1. Consumer A receives event, starts processing ✅
2. Processing takes 90 seconds ⏱️
3. Service Bus lock expires before the message is completed (default: 1 minute) ⏰
4. Consumer B receives same event ➡️ Duplicate!
In high-throughput systems processing millions of events, these scenarios happen constantly. We need multiple layers of defense.
Understanding Idempotency
Idempotency means that performing an operation multiple times produces the same result as performing it once.
Idempotent vs Non-Idempotent Operations

    // ❌ NOT idempotent: running this twice charges the customer twice!
    public async Task ProcessOrderPaymentAsync(Guid orderId)
    {
        var order = await _orderRepository.GetByIdAsync(orderId);
        await _paymentGateway.ChargeAsync(order.CustomerId, order.Total);
    }

    // ✅ Idempotent: safe to run multiple times
    public async Task ProcessOrderPaymentAsync(Guid orderId)
    {
        var order = await _orderRepository.GetByIdAsync(orderId);

        // Check if already charged
        var existingCharge = await _paymentRepository
            .GetChargeByOrderIdAsync(orderId);

        if (existingCharge != null)
        {
            _logger.LogInformation("Order {OrderId} already charged", orderId);
            return; // Skip - already processed
        }

        var charge = await _paymentGateway.ChargeAsync(order.CustomerId, order.Total);
        await _paymentRepository.SaveChargeAsync(orderId, charge);
    }

Natural Idempotency: Design Events Wisely
Some operations are naturally idempotent:

    // ❌ NOT naturally idempotent
    public record PriceAdjustmentEvent
    {
        public Guid ProductId { get; init; }
        public decimal Adjustment { get; init; } // Add $5 - running twice = wrong!
    }

    // ✅ Naturally idempotent
    public record PriceSetEvent
    {
        public Guid ProductId { get; init; }
        public decimal NewPrice { get; init; } // Set to $25 - running twice = same result!
    }

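To make the difference concrete, here is a minimal sketch that applies each kind of event twice. The in-memory `prices` dictionary and the `PriceHandlers` class are hypothetical, and the records use positional syntax purely to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory price store, used only for this illustration.
var prices = new Dictionary<Guid, decimal>();
var productId = Guid.NewGuid();
prices[productId] = 20m;

// Delivering the relative adjustment twice applies it twice.
var adjustment = new PriceAdjustmentEvent(productId, 5m);
PriceHandlers.Apply(prices, adjustment);
PriceHandlers.Apply(prices, adjustment);
Console.WriteLine(prices[productId]); // 30: the duplicate changed the result

// Delivering the absolute price twice leaves the same final state.
var priceSet = new PriceSetEvent(productId, 25m);
PriceHandlers.Apply(prices, priceSet);
PriceHandlers.Apply(prices, priceSet);
Console.WriteLine(prices[productId]); // 25: the duplicate was harmless

public record PriceAdjustmentEvent(Guid ProductId, decimal Adjustment);
public record PriceSetEvent(Guid ProductId, decimal NewPrice);

public static class PriceHandlers
{
    // Relative change: the result depends on how many times it runs.
    public static void Apply(Dictionary<Guid, decimal> prices, PriceAdjustmentEvent e)
        => prices[e.ProductId] += e.Adjustment;

    // Absolute change: the result is the same no matter how many times it runs.
    public static void Apply(Dictionary<Guid, decimal> prices, PriceSetEvent e)
        => prices[e.ProductId] = e.NewPrice;
}
```

Whenever an event can carry the resulting state instead of the delta, the handler needs no extra bookkeeping to be safe under redelivery.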
Understanding Message Deduplication
Message deduplication prevents the same message from being delivered multiple times by detecting and discarding duplicates.
Two Levels of Deduplication
- Publisher-side: Prevent duplicate messages from entering the system
- Consumer-side: Detect and skip messages already processed
Let's implement both.
The CorrelationId Pattern: Your Primary Defense
The CorrelationId pattern is the cornerstone of idempotency in distributed systems. It uses a unique identifier (in this article, each event's EventId) to track whether an event has already been processed.
How It Works
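The flow is: look up the (EventId, ConsumerName) pair, skip if it is already recorded, otherwise run the handler and record the pair. The sketch below models that flow in memory so it can run on its own. `IdempotentDispatcher` is a hypothetical name, and the `HashSet` stands in for the ProcessedEvents table introduced in the next steps.

```csharp
using System;
using System.Collections.Generic;

var dispatcher = new IdempotentDispatcher();
var eventId = Guid.NewGuid();
var emailsSent = 0;

// The same event is delivered three times to the same consumer.
for (var delivery = 1; delivery <= 3; delivery++)
{
    var handled = dispatcher.Dispatch(
        eventId, "OrderCreatedEmailHandler", () => emailsSent++);
    Console.WriteLine($"Delivery {delivery}: {(handled ? "processed" : "skipped")}");
}

// A different consumer still processes the same event once.
var inventoryHandled = dispatcher.Dispatch(eventId, "InventoryHandler", () => { });
Console.WriteLine($"Emails sent: {emailsSent}, inventory handled: {inventoryHandled}");

public sealed class IdempotentDispatcher
{
    // Stand-in for the ProcessedEvents table, keyed like its composite key.
    private readonly HashSet<(Guid EventId, string ConsumerName)> _processed = new();

    // Returns true if the handler ran, false if the event was a duplicate.
    public bool Dispatch(Guid eventId, string consumerName, Action handler)
    {
        if (_processed.Contains((eventId, consumerName)))
        {
            return false; // Already processed by this consumer: skip
        }

        handler();                                // Business logic
        _processed.Add((eventId, consumerName));  // Record completion
        return true;
    }
}
```

This version is single-threaded on purpose; it only illustrates the lookup, handle, record sequence.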

Implementation: Step-by-Step
Step 1: Define the ProcessedEvents Table

    // Domain/Events/ProcessedEvent.cs
    public class ProcessedEvent
    {
        public Guid EventId { get; set; }
        public string EventType { get; set; } = string.Empty;
        public string ConsumerName { get; set; } = string.Empty;
        public DateTime ProcessedAt { get; set; }
    }

Step 2: Configure EF Core

    // Infrastructure/Persistence/ApplicationDbContext.cs
    public DbSet<ProcessedEvent> ProcessedEvents => Set<ProcessedEvent>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<ProcessedEvent>(entity =>
        {
            entity.ToTable("ProcessedEvents");

            // Composite key: EventId + ConsumerName
            // This allows different consumers to process the same event independently
            entity.HasKey(e => new { e.EventId, e.ConsumerName });

            entity.Property(e => e.EventType).IsRequired().HasMaxLength(200);
            entity.Property(e => e.ConsumerName).IsRequired().HasMaxLength(200);

            // Index for fast lookups
            entity.HasIndex(e => e.EventId);
            entity.HasIndex(e => e.ProcessedAt);
        });
    }

Step 3: Implement Idempotent Event Handler

    // Application/Events/OrderCreatedEmailHandler.cs
    public class OrderCreatedEmailHandler : IEventHandler<OrderCreatedEvent>
    {
        private const string ConsumerName = nameof(OrderCreatedEmailHandler);

        private readonly IEmailService _emailService;
        private readonly ApplicationDbContext _dbContext;
        private readonly ILogger<OrderCreatedEmailHandler> _logger;

        // Dependencies are supplied by the DI container
        public OrderCreatedEmailHandler(
            IEmailService emailService,
            ApplicationDbContext dbContext,
            ILogger<OrderCreatedEmailHandler> logger)
        {
            _emailService = emailService;
            _dbContext = dbContext;
            _logger = logger;
        }

        public async Task HandleAsync(
            OrderCreatedEvent domainEvent,
            CancellationToken cancellationToken = default)
        {
            // ✅ STEP 1: Check if already processed (CorrelationId pattern)
            var alreadyProcessed = await _dbContext.ProcessedEvents
                .AnyAsync(e => e.EventId == domainEvent.EventId
                            && e.ConsumerName == ConsumerName,
                    cancellationToken);

            if (alreadyProcessed)
            {
                _logger.LogInformation(
                    "Event {EventId} already processed by {ConsumerName}, skipping",
                    domainEvent.EventId, ConsumerName);
                return; // 🛑 Stop here - already done!
            }

            // ✅ STEP 2: Use transaction for atomicity of the database writes
            await using var transaction = await _dbContext.Database
                .BeginTransactionAsync(cancellationToken);

            try
            {
                // ✅ STEP 3: Perform business logic
                // Note: the email is an external side effect; a rollback cannot unsend it
                await _emailService.SendOrderConfirmationAsync(
                    domainEvent.OrderId,
                    domainEvent.CustomerId,
                    cancellationToken);

                // ✅ STEP 4: Record that we processed this event
                _dbContext.ProcessedEvents.Add(new ProcessedEvent
                {
                    EventId = domainEvent.EventId,
                    EventType = nameof(OrderCreatedEvent),
                    ConsumerName = ConsumerName,
                    ProcessedAt = DateTime.UtcNow
                });

                await _dbContext.SaveChangesAsync(cancellationToken);
                await transaction.CommitAsync(cancellationToken);

                _logger.LogInformation(
                    "Successfully processed event {EventId} by {ConsumerName}",
                    domainEvent.EventId, ConsumerName);
            }
            catch (Exception ex)
            {
                await transaction.RollbackAsync(cancellationToken);
                _logger.LogError(ex,
                    "Failed to process event {EventId} by {ConsumerName}",
                    domainEvent.EventId, ConsumerName);
                throw; // Let retry mechanism handle it
            }
        }
    }

Why This Works
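Atomicity: The database transaction ensures that the ProcessedEvent row is committed only when the business logic completes without throwing:
- ✅ Email sent AND ProcessedEvent recorded
- ❌ Exception thrown: nothing is recorded, so the event can be retried safely
The transaction covers database writes only. An email cannot be rolled back, so a crash between sending the email and committing the transaction still produces a second email on retry. For external side effects, this narrows the duplicate window without closing it completely.
Race Conditions: The composite key (EventId, ConsumerName) guarantees that at most one ProcessedEvent row exists per event and consumer, so a concurrent duplicate fails when it tries to insert. Because the check and the insert are separate steps, two deliveries can both pass the check before either commits; the key then rejects the second insert, but only after its business logic has already run.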
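A unique key protects against concurrent duplicates most effectively when the claim is written before the side effect runs. The sketch below is one possible way to express that "claim first" ordering, not the handler shown above: `ClaimStore` is a hypothetical class, and `ConcurrentDictionary.TryAdd` stands in for an insert that fails on a primary key violation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var claims = new ClaimStore();
var eventId = Guid.NewGuid();
var emailsSent = 0;

// Simulate eight consumers receiving the same event at the same time.
Parallel.For(0, 8, _ =>
{
    claims.RunOnce(eventId, "OrderCreatedEmailHandler",
        () => Interlocked.Increment(ref emailsSent));
});

Console.WriteLine($"Emails sent: {emailsSent}"); // Emails sent: 1

public sealed class ClaimStore
{
    // Stand-in for the ProcessedEvents table and its composite primary key.
    private readonly ConcurrentDictionary<(Guid EventId, string ConsumerName), byte> _claims = new();

    // Returns true if this caller won the claim and ran the handler.
    public bool RunOnce(Guid eventId, string consumerName, Action handler)
    {
        var key = (eventId, consumerName);

        // Atomic claim: only one caller can add the key, like a unique insert.
        if (!_claims.TryAdd(key, 0))
        {
            return false; // Someone else already claimed this event
        }

        try
        {
            handler();
            return true;
        }
        catch
        {
            // Release the claim so a retry can process the event.
            _claims.TryRemove(key, out _);
            throw;
        }
    }
}
```

With a real database, the equivalent would be to insert the ProcessedEvent row first inside the transaction and treat a key violation as "already processed". The trade-off is that a crash after the claim commits but before the work finishes can lose the side effect, so the ordering should follow whichever failure is cheaper for the business case.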
Azure Service Bus Duplicate Detection
Azure Service Bus provides built-in duplicate detection at the infrastructure level.
Configure Service Bus with Duplicate Detection

    // Infrastructure/Configuration/ServiceBusSetup.cs
    public static class ServiceBusSetup
    {
        public static async Task ConfigureTopicsAsync(
            ServiceBusAdministrationClient adminClient,
            IConfiguration configuration)
        {
            var topicName = "ordercreated";

            var topicOptions = new CreateTopicOptions(topicName)
            {
                // ✅ Enable duplicate detection
                RequiresDuplicateDetection = true,

                // Detection window: 10 minutes
                // Keep small for high-throughput scenarios
                DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),

                // Enable partitioning for better throughput
                EnablePartitioning = true
            };

            await adminClient.CreateTopicAsync(topicOptions);
        }
    }

Use EventId as MessageId

    // Infrastructure/Messaging/AzureServiceBusPublisher.cs
    public async Task PublishAsync(
        object domainEvent,
        Type eventType,
        CancellationToken cancellationToken = default)
    {
        var topicName = eventType.Name.ToLowerInvariant().Replace("event", "");
        var sender = _serviceBusClient.CreateSender(topicName);

        try
        {
            var eventData = JsonSerializer.Serialize(domainEvent);

            // ✅ Extract EventId from domain event
            var eventId = GetEventId(domainEvent);

            var message = new ServiceBusMessage(eventData)
            {
                // 🔑 KEY: Use EventId as MessageId for deduplication
                MessageId = eventId.ToString(),
                ContentType = "application/json",
                Subject = eventType.Name
            };

            await sender.SendMessageAsync(message, cancellationToken);
        }
        finally
        {
            await sender.DisposeAsync();
        }
    }

    private Guid GetEventId(object domainEvent)
    {
        var eventIdProperty = domainEvent.GetType().GetProperty("EventId");

        if (eventIdProperty?.PropertyType == typeof(Guid))
        {
            return (Guid)eventIdProperty.GetValue(domainEvent)!;
        }

        throw new InvalidOperationException(
            $"Event {domainEvent.GetType().Name} must have an EventId property");
    }

How Service Bus Deduplication Works:
- Tracks all MessageId values within the detection window (10 minutes)
- If a message with the same MessageId arrives again, Service Bus silently drops it
- No duplicate reaches your consumers
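The window-based behaviour described above can be modelled in a few lines. This is a simplified in-memory model for illustration only, not how the broker is implemented; `DuplicateDetectionWindow` and `TryAccept` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

var window = new DuplicateDetectionWindow(TimeSpan.FromMinutes(10));
var start = DateTimeOffset.UtcNow;
var messageId = Guid.NewGuid().ToString();

Console.WriteLine(window.TryAccept(messageId, start));                // True: first arrival
Console.WriteLine(window.TryAccept(messageId, start.AddMinutes(5)));  // False: inside the window
Console.WriteLine(window.TryAccept(messageId, start.AddMinutes(11))); // True: window has passed

public sealed class DuplicateDetectionWindow
{
    private readonly TimeSpan _window;

    // MessageId -> time the accepted copy arrived.
    private readonly Dictionary<string, DateTimeOffset> _firstSeen = new();

    public DuplicateDetectionWindow(TimeSpan window) => _window = window;

    // Returns true if the message is accepted, false if it is dropped as a duplicate.
    public bool TryAccept(string messageId, DateTimeOffset now)
    {
        if (_firstSeen.TryGetValue(messageId, out var seenAt) && now - seenAt < _window)
        {
            return false; // Same MessageId inside the window: drop it
        }

        _firstSeen[messageId] = now; // New, or old enough to be treated as new
        return true;
    }
}
```

The third call is the important one: a retry that arrives after the window has passed is accepted again, which is why the consumer-side check is still needed.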
High-Throughput Optimization: Distributed Cache
For systems processing thousands of events per second, database lookups become a bottleneck. Use Redis for fast deduplication checks.
Add Redis Caching Layer

    // Infrastructure/Events/CachedIdempotentEventHandler.cs
    public class CachedIdempotentEventHandler<TEvent> : IEventHandler<TEvent>
        where TEvent : IDomainEvent
    {
        // Cache entries expire after 24 hours; the database remains the durable record
        private static readonly DistributedCacheEntryOptions CacheOptions = new()
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
        };

        private readonly IEventHandler<TEvent> _innerHandler;
        private readonly IDistributedCache _cache;
        private readonly ApplicationDbContext _dbContext;
        private readonly ILogger<CachedIdempotentEventHandler<TEvent>> _logger;
        private readonly string _consumerName;

        public CachedIdempotentEventHandler(
            IEventHandler<TEvent> innerHandler,
            IDistributedCache cache,
            ApplicationDbContext dbContext,
            ILogger<CachedIdempotentEventHandler<TEvent>> logger)
        {
            _innerHandler = innerHandler;
            _cache = cache;
            _dbContext = dbContext;
            _logger = logger;
            _consumerName = innerHandler.GetType().Name;
        }

        public async Task HandleAsync(TEvent domainEvent, CancellationToken cancellationToken = default)
        {
            var cacheKey = $"processed:{_consumerName}:{domainEvent.EventId}";

            // ✅ FAST PATH: Check Redis first (~1ms)
            var cachedValue = await _cache.GetStringAsync(cacheKey, cancellationToken);
            if (cachedValue != null)
            {
                _logger.LogInformation(
                    "Event {EventId} already processed (cache hit), skipping",
                    domainEvent.EventId);
                return;
            }

            // ✅ FALLBACK: Check database (~10-50ms)
            var alreadyProcessed = await _dbContext.ProcessedEvents
                .AnyAsync(e => e.EventId == domainEvent.EventId
                            && e.ConsumerName == _consumerName,
                    cancellationToken);

            if (alreadyProcessed)
            {
                // Found in DB but not cache - warm the cache
                await _cache.SetStringAsync(cacheKey, "1", CacheOptions, cancellationToken);

                _logger.LogInformation(
                    "Event {EventId} already processed (db hit), skipping",
                    domainEvent.EventId);
                return;
            }

            // Process the event
            await _innerHandler.HandleAsync(domainEvent, cancellationToken);

            // ✅ Cache the result
            await _cache.SetStringAsync(cacheKey, "1", CacheOptions, cancellationToken);
        }
    }

Register with DI

    // Program.cs
    builder.Services.AddStackExchangeRedisCache(options =>
    {
        options.Configuration = builder.Configuration["Redis:ConnectionString"];
        options.InstanceName = "EventProcessing:";
    });

    // Decorate handlers with caching
    // (Decorate is an extension method from the Scrutor package)
    builder.Services.AddScoped<IEventHandler<OrderCreatedEvent>, OrderCreatedEmailHandler>();
    builder.Services.Decorate<IEventHandler<OrderCreatedEvent>, CachedIdempotentEventHandler<OrderCreatedEvent>>();

Performance Impact:
- ✅ Redis lookup: ~1ms
- ❌ Database lookup: ~10-50ms
- 📈 10-50x faster for duplicate checks!
Complete Defense Strategy: Three Layers
| Layer | Mechanism | Prevents | Performance Impact |
|---|---|---|---|
| 1. Consumer CorrelationId | ProcessedEvents table + transaction | Consumer-side duplicates | Medium (~10-50ms per event) |
| 2. Service Bus Deduplication | MessageId + RequiresDuplicateDetection | Publisher-side duplicates | None (infrastructure) |
| 3. Redis Cache | Distributed cache lookup | Redundant DB queries | Low (~1ms per event) |
Why All Three Layers?
Layer 1 (CorrelationId + DB): Provides permanent record of processed events—no time window limits.
Layer 2 (Service Bus): Stops duplicates from even reaching consumers—but has a time window limit (10 minutes).
Layer 3 (Redis): Makes Layer 1 fast enough for high-throughput scenarios without sacrificing durability, because the database remains the permanent record.
Going further - Cleanup: Preventing Unbounded Growth
The ProcessedEvents table will grow indefinitely—clean it up periodically.

    // Infrastructure/BackgroundServices/ProcessedEventsCleanupService.cs
    public class ProcessedEventsCleanupService : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<ProcessedEventsCleanupService> _logger;
        private readonly TimeSpan _interval = TimeSpan.FromHours(6);
        private readonly TimeSpan _retentionPeriod = TimeSpan.FromDays(7);

        // Dependencies are supplied by the DI container
        public ProcessedEventsCleanupService(
            IServiceProvider serviceProvider,
            ILogger<ProcessedEventsCleanupService> logger)
        {
            _serviceProvider = serviceProvider;
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await CleanupAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error during cleanup");
                }

                await Task.Delay(_interval, stoppingToken);
            }
        }

        private async Task CleanupAsync(CancellationToken cancellationToken)
        {
            // A BackgroundService is a singleton, so resolve the scoped DbContext per run
            await using var scope = _serviceProvider.CreateAsyncScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

            var cutoffDate = DateTime.UtcNow.Subtract(_retentionPeriod);

            // ExecuteDeleteAsync (EF Core 7+) deletes in the database without loading entities
            var deletedCount = await dbContext.ProcessedEvents
                .Where(e => e.ProcessedAt < cutoffDate)
                .ExecuteDeleteAsync(cancellationToken);

            if (deletedCount > 0)
            {
                _logger.LogInformation(
                    "Cleaned up {Count} processed events older than {Days} days",
                    deletedCount, _retentionPeriod.Days);
            }
        }
    }

Why 7 days?
- Matches typical retry windows
- Covers weekends and holidays
- Balances storage vs safety
The Bottom Line
In high-throughput distributed systems, duplicates will happen. The CorrelationId pattern combined with Azure Service Bus deduplication creates a robust defense that keeps your system consistent, even under sustained high event volumes.
Start with the CorrelationId pattern (Layer 1). Add Redis caching (Layer 3) when throughput demands it. Let Service Bus deduplication (Layer 2) handle the easy cases automatically.
Your customers won't get charged twice. Your inventory won't be double-decremented. Your system will be reliable.
Read more at www.devskillsunlock.com
