If you've ever built a system where you need to save something to a database and publish an event to a message broker, you've probably run into what's known as the dual write problem. It's one of those things that works fine in development, passes all your tests, and then blows up in production at the worst possible time.
The problem is simple: you save an Order to the database, then publish a StockReservationRequested event to your message broker. But what happens if the database save succeeds and the broker publish fails? Now you have an order in the database but no event was ever published. Stock was never reserved. The customer thinks everything is fine, but the warehouse has no idea.
Or the reverse: the event gets published, but the database transaction rolls back. Now downstream services are reacting to an order that doesn't exist.
This is the dual write problem, and the Transactional Outbox Pattern is one of the cleanest ways to solve it.
What is the Outbox Pattern?
The idea is straightforward. Instead of trying to write to two different systems (database + message broker) in one operation, you only write to the database. You store both your business data and the events that need to be published in the same database transaction. Then a separate process reads those stored events and publishes them to the message broker asynchronously.
This gives you atomicity. Either both the order and the event record are saved, or neither is. There's no split-brain scenario. The event relay process can retry as many times as needed because the events are safely persisted.
The pattern has three parts:
- Write - Save business data and outbox messages in one transaction
- Relay - A background process reads unprocessed outbox messages and publishes them
- Consume - Downstream services handle events idempotently, since the relay might publish duplicates
How It Connects to Domain Events and Unit of Work
If you've read my articles on Domain Events in NestJS and the Unit of Work Pattern with NestJS and TypeORM, you'll see how the outbox fits in naturally.
In the Domain Events article, we saw how aggregates collect domain events internally and these events get dispatched after the aggregate is saved. The problem is that dispatching those events over a message broker is unreliable: it's a network call that can fail.
In the Unit of Work article, we wrapped multiple repository operations in a single database transaction. The outbox pattern extends this by adding one more write to that transaction: persisting the domain events as outbox messages.
So the flow becomes:
- Aggregate collects domain events (as we discussed in the Domain-Driven Design article)
- The Unit of Work opens a transaction
- Business data is saved
- Domain events are written to the outbox table
- Transaction is committed
- A separate relay process picks up outbox messages and publishes them
The domain stays completely unaware of the outbox. It just raises events like it always did. The infrastructure layer takes care of persisting them.
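As a rough sketch, an aggregate base class along the lines of the Domain Events article might look like this. The names (`raise`, `pullDomainEvents`) are illustrative, and the `DomainEvent` shape mirrors the one used for the outbox writer below.

```typescript
// Illustrative sketch of an aggregate base class that buffers domain
// events until the Unit of Work pulls them at commit time.
interface DomainEvent {
  readonly aggregateId: string;
  readonly aggregateType: string;
  readonly eventType: string;
  readonly payload: Record<string, unknown>;
  readonly occurredAt: Date;
}

abstract class AggregateRoot {
  private domainEvents: Array<DomainEvent> = [];

  protected raise(event: DomainEvent): void {
    this.domainEvents.push(event);
  }

  // The Unit of Work calls this right before commit. Pulling also
  // clears the buffer so the same events are never persisted twice.
  public pullDomainEvents(): Array<DomainEvent> {
    const events = this.domainEvents;
    this.domainEvents = [];
    return events;
  }
}
```

The Unit of Work's event-collection step then reduces to calling `pullDomainEvents()` on every aggregate touched by the operation.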
Creating the Outbox Entity
First, we need a TypeORM entity for the outbox table. This table stores every event that needs to be published.
// src/infrastructure/persistence/entities/outbox-message.entity.ts
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, Index } from 'typeorm';

@Entity('outbox_messages')
export class OutboxMessageEntity {
  @PrimaryGeneratedColumn('uuid')
  public id: string;

  @Column({ type: 'varchar', length: 255 })
  public aggregateType: string;

  @Column({ type: 'uuid' })
  @Index()
  public aggregateId: string;

  @Column({ type: 'varchar', length: 255 })
  public eventType: string;

  @Column({ type: 'jsonb' })
  public payload: Record<string, unknown>;

  @CreateDateColumn()
  public createdAt: Date;

  @Column({ type: 'timestamp', nullable: true })
  @Index()
  public processedAt: Date | null;
}
A few things to note here. The aggregateType and aggregateId tell you which entity this event belongs to. The eventType is the name of the domain event. The payload stores the serialized event data. And processedAt starts as null and gets set once the relay has successfully published the message. We index it because the relay queries on it constantly.
Storing Events in the Outbox During UoW Commit
Now we need to modify our Unit of Work to write domain events to the outbox table as part of the same transaction. Here's the interface for the outbox writer:
// src/infrastructure/persistence/interfaces/outbox-writer.interface.ts
import { EntityManager } from 'typeorm';

export interface DomainEvent {
  readonly aggregateId: string;
  readonly aggregateType: string;
  readonly eventType: string;
  readonly payload: Record<string, unknown>;
  readonly occurredAt: Date;
}

export interface OutboxWriter {
  persistEvents(
    transactionManager: EntityManager,
    domainEvents: ReadonlyArray<DomainEvent>
  ): Promise<void>;
}
And the implementation:
// src/infrastructure/persistence/outbox-writer.service.ts
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { OutboxMessageEntity } from './entities/outbox-message.entity';
import { DomainEvent, OutboxWriter } from './interfaces/outbox-writer.interface';

@Injectable()
export class OutboxWriterService implements OutboxWriter {
  public async persistEvents(
    transactionManager: EntityManager,
    domainEvents: ReadonlyArray<DomainEvent>
  ): Promise<void> {
    if (domainEvents.length === 0) {
      return;
    }

    const outboxMessages: Array<OutboxMessageEntity> = domainEvents.map(
      (domainEvent: DomainEvent): OutboxMessageEntity => {
        const outboxMessage = new OutboxMessageEntity();
        outboxMessage.aggregateType = domainEvent.aggregateType;
        outboxMessage.aggregateId = domainEvent.aggregateId;
        outboxMessage.eventType = domainEvent.eventType;
        outboxMessage.payload = domainEvent.payload;
        outboxMessage.processedAt = null;
        return outboxMessage;
      }
    );

    await transactionManager.save(OutboxMessageEntity, outboxMessages);
  }
}
Now, in our Unit of Work's commit phase, we collect domain events from all aggregates and pass them through the outbox writer. Here's how the modified withTransaction method looks:
// src/infrastructure/persistence/order-unit-of-work.ts
import { Injectable } from '@nestjs/common';
import { DataSource, EntityManager, Repository } from 'typeorm';
import { OutboxWriterService } from './outbox-writer.service';
import { DomainEvent } from './interfaces/outbox-writer.interface';
// OrderEntity and OrderItemEntity come from the Unit of Work article;
// these import paths are illustrative.
import { OrderEntity } from './entities/order.entity';
import { OrderItemEntity } from './entities/order-item.entity';

interface OrderRepositories {
  readonly order: Repository<OrderEntity>;
  readonly orderItem: Repository<OrderItemEntity>;
}

@Injectable()
export class OrderUnitOfWork {
  private readonly dataSource: DataSource;
  private readonly outboxWriter: OutboxWriterService;

  public constructor(
    dataSource: DataSource,
    outboxWriter: OutboxWriterService
  ) {
    this.dataSource = dataSource;
    this.outboxWriter = outboxWriter;
  }

  public async withTransaction<TResult>(
    operation: (repositories: OrderRepositories) => Promise<TResult>
  ): Promise<TResult> {
    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();

    try {
      const transactionManager: EntityManager = queryRunner.manager;
      const repositories: OrderRepositories = {
        order: transactionManager.getRepository(OrderEntity),
        orderItem: transactionManager.getRepository(OrderItemEntity),
      };

      const result: TResult = await operation(repositories);

      // Collect domain events from all aggregates involved in this operation
      const domainEvents: Array<DomainEvent> = this.collectDomainEvents();

      // Write domain events to the outbox in the same transaction
      await this.outboxWriter.persistEvents(transactionManager, domainEvents);

      await queryRunner.commitTransaction();
      return result;
    } catch (error: unknown) {
      await queryRunner.rollbackTransaction();
      throw error;
    } finally {
      await queryRunner.release();
    }
  }

  private collectDomainEvents(): Array<DomainEvent> {
    // In practice, your aggregates expose their collected events
    // and you gather them here before commit
    return [];
  }
}
The key point is that outboxWriter.persistEvents runs inside the same transaction as all the business data writes. If anything fails, everything rolls back together. No orphaned events, no missing data.
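To make that atomicity concrete, here's a toy in-memory model, deliberately not TypeORM, showing why a single commit point removes the failure window. All names here are illustrative.

```typescript
// Toy illustration: business data and outbox rows share one commit point.
interface OutboxRow {
  eventType: string;
  payload: Record<string, unknown>;
}

// Buffers writes until commit, like a database transaction.
class InMemoryTx {
  public orders: Array<string> = [];
  public outbox: Array<OutboxRow> = [];
}

class InMemoryStore {
  public orders: Array<string> = [];
  public outbox: Array<OutboxRow> = [];

  // Both buffers are applied together after `work` succeeds;
  // a thrown error means neither is applied.
  public async withTransaction(
    work: (tx: InMemoryTx) => Promise<void>
  ): Promise<void> {
    const tx = new InMemoryTx();
    await work(tx); // may throw -> nothing below runs
    this.orders.push(...tx.orders);
    this.outbox.push(...tx.outbox);
  }
}
```

A real implementation gets the same effect from the database: the transaction commit applies the order rows and the outbox rows together, and a failure before commit discards both.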
The Outbox Relay (Poller)
Now we need something that reads unprocessed outbox messages and publishes them to the message broker. This is a scheduled service that runs on a regular interval.
// src/infrastructure/messaging/outbox-relay.service.ts
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, IsNull } from 'typeorm';
import { OutboxMessageEntity } from '../persistence/entities/outbox-message.entity';

// Interfaces are erased at runtime, so they can't serve as NestJS
// injection tokens; bind the concrete publisher to an explicit token.
export const EVENT_PUBLISHER = Symbol('EventPublisher');

export interface EventPublisher {
  publish(eventType: string, payload: Record<string, unknown>): Promise<void>;
}

@Injectable()
export class OutboxRelayService {
  private static readonly BATCH_SIZE: number = 50;
  private readonly logger: Logger = new Logger(OutboxRelayService.name);
  private readonly outboxRepository: Repository<OutboxMessageEntity>;
  private readonly eventPublisher: EventPublisher;

  public constructor(
    @InjectRepository(OutboxMessageEntity)
    outboxRepository: Repository<OutboxMessageEntity>,
    @Inject(EVENT_PUBLISHER) eventPublisher: EventPublisher
  ) {
    this.outboxRepository = outboxRepository;
    this.eventPublisher = eventPublisher;
  }

  @Cron(CronExpression.EVERY_5_SECONDS)
  public async processOutboxMessages(): Promise<void> {
    const unprocessedMessages: Array<OutboxMessageEntity> =
      await this.outboxRepository.find({
        where: { processedAt: IsNull() },
        order: { createdAt: 'ASC' },
        take: OutboxRelayService.BATCH_SIZE,
      });

    if (unprocessedMessages.length === 0) {
      return;
    }

    this.logger.log(
      `Processing ${unprocessedMessages.length} outbox messages`
    );

    for (const outboxMessage of unprocessedMessages) {
      try {
        await this.eventPublisher.publish(
          outboxMessage.eventType,
          outboxMessage.payload
        );
        outboxMessage.processedAt = new Date();
        await this.outboxRepository.save(outboxMessage);
        this.logger.debug(
          `Published outbox message ${outboxMessage.id} (${outboxMessage.eventType})`
        );
      } catch (error: unknown) {
        this.logger.error(
          `Failed to publish outbox message ${outboxMessage.id}`,
          error instanceof Error ? error.stack : String(error)
        );
        // Don't mark as processed - it will be retried on the next poll
        break;
      }
    }
  }
}
This service runs every 5 seconds, picks up a batch of unprocessed messages, and publishes them one by one. If publishing fails for a message, we stop processing the batch and try again on the next cycle. This preserves ordering within an aggregate.
Notice we process messages in createdAt order. This is important because events for the same aggregate should be published in the order they occurred.
Idempotent Consumers
Since the relay might publish the same event more than once (think: it published the event but crashed before marking it as processed), consumers must be idempotent. This means handling the same event twice should produce the same result as handling it once.
The simplest approach is a processed events table on the consumer side:
// src/infrastructure/persistence/entities/processed-event.entity.ts
import { Entity, PrimaryColumn, CreateDateColumn } from 'typeorm';

@Entity('processed_events')
export class ProcessedEventEntity {
  @PrimaryColumn('uuid')
  public eventId: string;

  @CreateDateColumn()
  public processedAt: Date;
}
And then in the consumer handler:
// src/modules/stock/application/handlers/stock-reservation.handler.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { ProcessedEventEntity } from '../../../../infrastructure/persistence/entities/processed-event.entity';
// StockService belongs to the stock module; this import path is illustrative.
import { StockService } from '../services/stock.service';

interface StockReservationRequestedEvent {
  readonly eventId: string;
  readonly orderId: string;
  readonly items: ReadonlyArray<{
    readonly productId: string;
    readonly quantity: number;
  }>;
}

@Injectable()
export class StockReservationHandler {
  private readonly logger: Logger = new Logger(StockReservationHandler.name);
  private readonly processedEventRepository: Repository<ProcessedEventEntity>;
  private readonly stockService: StockService;

  public constructor(
    @InjectRepository(ProcessedEventEntity)
    processedEventRepository: Repository<ProcessedEventEntity>,
    stockService: StockService
  ) {
    this.processedEventRepository = processedEventRepository;
    this.stockService = stockService;
  }

  public async handle(event: StockReservationRequestedEvent): Promise<void> {
    const existingEvent: ProcessedEventEntity | null =
      await this.processedEventRepository.findOneBy({
        eventId: event.eventId,
      });

    if (existingEvent !== null) {
      this.logger.warn(
        `Event ${event.eventId} has already been processed, skipping`
      );
      return;
    }

    await this.stockService.reserveStock(event.orderId, event.items);

    const processedEvent = new ProcessedEventEntity();
    processedEvent.eventId = event.eventId;
    await this.processedEventRepository.save(processedEvent);

    this.logger.log(
      `Successfully processed stock reservation for order ${event.orderId}`
    );
  }
}
Before doing any work, the handler checks if it has already processed this event. If it has, it skips. If not, it does the work and records the event ID. This way, even if the same event arrives three times, the stock is only reserved once.
One thing to keep in mind: the check-and-save should ideally happen in the same transaction as the business logic to avoid race conditions. In high-throughput scenarios, you might want a unique constraint on the eventId column and catch the duplicate key error instead of doing a separate lookup.
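Sketching that alternative: Postgres reports a duplicate key as error code 23505 (unique_violation), and TypeORM surfaces the underlying driver error on QueryFailedError's driverError property. A small helper like the one below (an illustrative sketch, not library code) lets the handler attempt the insert first and treat a duplicate-key failure as "already processed".

```typescript
// Hedged sketch: detect a Postgres unique-constraint violation from a
// caught error. '23505' is the Postgres unique_violation error code;
// TypeORM's QueryFailedError carries the raw error as `driverError`.
interface PgDriverError {
  readonly code?: string;
}

function isUniqueViolation(error: unknown): boolean {
  const driverError: PgDriverError | undefined = (
    error as { driverError?: PgDriverError }
  )?.driverError;
  return driverError?.code === '23505';
}
```

In the handler, you would insert the ProcessedEventEntity row inside the same transaction as the stock reservation, catch the error, and return early when isUniqueViolation(error) is true.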
Production Concerns
The basic pattern is simple, but running it in production requires a few more things.
Outbox cleanup. The outbox table will grow forever if you don't clean it up. Run a scheduled job that deletes messages older than a certain threshold (say 7 days) where processedAt is not null. Don't delete unprocessed messages; those still need to be published.
// src/infrastructure/persistence/outbox-cleanup.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, LessThan } from 'typeorm';
import { OutboxMessageEntity } from './entities/outbox-message.entity';

@Injectable()
export class OutboxCleanupService {
  private static readonly RETENTION_DAYS: number = 7;
  private readonly logger: Logger = new Logger(OutboxCleanupService.name);
  private readonly outboxRepository: Repository<OutboxMessageEntity>;

  public constructor(
    @InjectRepository(OutboxMessageEntity)
    outboxRepository: Repository<OutboxMessageEntity>
  ) {
    this.outboxRepository = outboxRepository;
  }

  @Cron(CronExpression.EVERY_DAY_AT_3AM)
  public async cleanupProcessedMessages(): Promise<void> {
    const cutoffDate: Date = new Date();
    cutoffDate.setDate(
      cutoffDate.getDate() - OutboxCleanupService.RETENTION_DAYS
    );

    // LessThan never matches NULL in SQL, so unprocessed messages
    // (processedAt IS NULL) are left untouched.
    const result = await this.outboxRepository.delete({
      processedAt: LessThan(cutoffDate),
    });

    this.logger.log(
      `Cleaned up ${result.affected ?? 0} processed outbox messages`
    );
  }
}
Retry with backoff. If an event keeps failing to publish, you don't want to hammer the broker every 5 seconds forever. Add a retryCount and nextRetryAt column to the outbox table. On each failure, increment the count and push the next retry further into the future using exponential backoff.
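A minimal sketch of the scheduling arithmetic, with an illustrative base delay and cap (the retryCount and nextRetryAt names match the columns described above):

```typescript
// Exponential backoff for outbox retries: delay doubles with each
// failed attempt, capped at one hour. Base delay and cap are
// illustrative choices, not prescribed values.
const BASE_DELAY_MS: number = 5_000;        // first retry after 5 seconds
const MAX_DELAY_MS: number = 60 * 60_000;   // cap at 1 hour

function nextRetryAt(retryCount: number, now: Date = new Date()): Date {
  const delayMs: number = Math.min(
    BASE_DELAY_MS * 2 ** retryCount,
    MAX_DELAY_MS
  );
  return new Date(now.getTime() + delayMs);
}
```

The relay's query then also filters on nextRetryAt being in the past, so a repeatedly failing message stops monopolizing every poll cycle.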
Monitoring. Keep an eye on the number of unprocessed outbox messages. If that number keeps growing, something is wrong with your relay or your broker. Set up alerts when unprocessed messages exceed a threshold or when any single message has been unprocessed for longer than a few minutes.
Dead letter handling. After a certain number of retries (say 10), stop retrying and move the message to a dead letter state. Flag it for manual review. You don't want a single poison message blocking the entire relay queue.
Ordering guarantees. The current implementation processes messages in order by createdAt. If you need strict ordering per aggregate, you'll need to stop processing when a message for a specific aggregate fails, while continuing to process messages for other aggregates. This adds complexity but is sometimes necessary.
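One way to sketch that per-aggregate skipping (publishBatch and OutboxMsg are illustrative names, and the message shape mirrors the outbox entity):

```typescript
// Sketch: publish a batch while preserving per-aggregate ordering.
// When a message fails, skip the rest of that aggregate's messages
// but keep publishing for other aggregates.
interface OutboxMsg {
  id: string;
  aggregateId: string;
  eventType: string;
  payload: Record<string, unknown>;
}

async function publishBatch(
  messages: ReadonlyArray<OutboxMsg>,  // assumed sorted by createdAt
  publish: (message: OutboxMsg) => Promise<void>
): Promise<Array<string>> {
  const failedAggregates = new Set<string>();
  const publishedIds: Array<string> = [];

  for (const message of messages) {
    if (failedAggregates.has(message.aggregateId)) {
      continue; // never publish out of order within one aggregate
    }
    try {
      await publish(message);
      publishedIds.push(message.id);
    } catch {
      failedAggregates.add(message.aggregateId);
    }
  }

  return publishedIds;
}
```

The returned IDs are the messages the relay would then mark as processed; everything else stays unprocessed and gets picked up on a later poll.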
Clean Architecture Alignment
One thing I want to highlight is how this pattern fits into Clean Architecture principles, which we discussed in the Domain-Driven Design article.
The outbox is purely an infrastructure concern. Your domain layer knows nothing about it. Your Order aggregate raises a StockReservationRequested event the same way it always did. It doesn't know or care that the event ends up in a database table before reaching a message broker.
The Unit of Work, which lives in the infrastructure layer, is responsible for intercepting those domain events and writing them to the outbox. The relay service is also infrastructure. The consumer's idempotency check is infrastructure.
Your domain stays pure. It deals with business rules, raises events, and that's it. The infrastructure layer handles the messy reality of reliable event delivery. This separation means you could swap out the outbox implementation (maybe switch from polling to change data capture) without touching a single line of domain code.
Conclusion
The Transactional Outbox Pattern solves a real and common problem: reliably publishing events when your business data changes. By storing events in the same transaction as your business data and relaying them asynchronously, you eliminate the dual write problem entirely.
Combined with Domain Events for clean aggregate communication and the Unit of Work pattern for transactional consistency, the outbox pattern gives you a solid foundation for building event-driven systems that actually work in production. The domain stays clean, the infrastructure handles reliability, and your events always get delivered, even if it takes a retry or two.
Start simple with the polling relay and idempotent consumers. Add retry backoff, monitoring, and cleanup as your system grows. That's the pragmatic path to reliable event-driven architecture in NestJS.
