
The Transactional Outbox Pattern with PostgreSQL and RabbitMQ

Hugo · 7 min read

How do you write to two different systems, for example RabbitMQ and PostgreSQL?

You might say it's pretty straightforward.

kotlin
@Transactional   
fun doSomething() {
    ... 
    // save in database through JPA
    myRepo.save(myEntity)
    // post message
    broker.publishEvent(event)
    ...
}

But what happens if the SQL transaction fails, for example at commit time, after the message has already been sent? You end up with a published message for an operation that never actually took place.

Two-phase commit transactions

This is a relatively common problem, and the usual answer is two-phase commit (2PC) transactions. The idea is to wrap every operation on an external system, including RabbitMQ here, in a single distributed transaction.

It's a somewhat complex mechanism: it requires an additional coordinator to make sure every participant is ready before the write is committed across all systems.

2 phase commit workflow

But there are several problems.

The first is that RabbitMQ doesn't support 2PC transactions (also called XA transactions). Which, you'll agree, is already a relatively significant problem in itself.

excerpt from documentation confirming absence of XA transactions with RabbitMQ

We could use ActiveMQ which supports 2PC transactions. But on the other hand, it would be a shame to write an article about PostgreSQL and RabbitMQ just to conclude that you need to use ActiveMQ instead, right?

The second problem is that, in any case, a 2PC transaction penalizes overall system performance. Because two or more systems have to be synchronized, the total operation time can never be less than the time taken by the slowest system, and you add the cost of coordination on top.

The last problem is that you degrade overall availability. When an operation needs every system to be up, its availability is the product of the availabilities of each system. For example, two systems at 99% availability give a combined availability of about 98%.

$0.99 \times 0.99 = 0.9801$
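
More generally, assuming the systems fail independently and the operation needs all of them (a simplifying assumption, not a measurement), the same product extends to $n$ systems:

$A_{\text{total}} = \prod_{i=1}^{n} A_i$

For instance, adding a third system at 99% to the same transaction would give:

$0.99 \times 0.99 \times 0.99 = 0.970299$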

And that was basically the state of my knowledge until yesterday. I had already used XA transactions, I had already dealt with distributed systems issues and I've worked on plenty of mitigation scenarios for this type of problem.

Yesterday I added RabbitMQ to Writizzy's stack and I used another solution: the Transactional Outbox pattern.

The transactional outbox pattern

While the underlying concept isn't necessarily new, this specific pattern was popularized by Chris Richardson between 2014 and 2016.

The concept is "simple":

  • write the event to an outbox table in the database, in the same transaction as the business data (step 1)
  • read that table in an asynchronous job (step 2)
  • publish to the message queue from the job (step 3)

The code becomes

kotlin
@Transactional   
fun doSomething() {
    ... 
    // save in database through JPA
    myRepo.save(myEntity)
    // save in database
    outboxRepo.save(event)
    ...
}

We now have only a single transaction, and the guarantee that the event is recorded only if the transaction succeeds. The event itself will be sent to RabbitMQ later.
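
To make that guarantee concrete, here is a minimal sketch in plain Kotlin, with no Spring or JPA involved. `InMemoryDb`, `Tx` and `placeOrder` are hypothetical names invented for the illustration, and the `OutboxEvent` fields simply mirror the ones used by the publisher code in this article; the real entity may well look different.

```kotlin
import java.time.Instant
import java.util.UUID

// Hypothetical shape of an outbox row (assumption: the real entity may differ).
data class OutboxEvent(
    val id: String = UUID.randomUUID().toString(),
    val eventType: String,
    val exchange: String,
    val routingKey: String,
    val payload: String,
    val createdAt: Instant = Instant.now(),
)

// Writes staged inside one transaction.
class Tx {
    val stagedEntities = mutableListOf<String>()
    val stagedEvents = mutableListOf<OutboxEvent>()
}

// Toy stand-in for PostgreSQL: staged writes only become visible on commit.
class InMemoryDb {
    val entities = mutableListOf<String>()
    val outbox = mutableListOf<OutboxEvent>()

    fun transaction(block: (Tx) -> Unit) {
        val tx = Tx()
        block(tx)                      // an exception here skips the commit below
        entities += tx.stagedEntities  // "commit": both tables change together
        outbox += tx.stagedEvents
    }
}

// Business write and outbox write share the same transaction.
fun placeOrder(db: InMemoryDb, orderId: String, fail: Boolean = false) {
    db.transaction { tx ->
        tx.stagedEntities += orderId
        tx.stagedEvents += OutboxEvent(
            eventType = "OrderPlaced",
            exchange = "orders",
            routingKey = "order.placed",
            payload = """{"orderId":"$orderId"}""",
        )
        if (fail) throw IllegalStateException("simulated failure before commit")
    }
}
```

If `placeOrder` throws, neither the order nor the event is stored, so nothing can be published for an operation that never happened.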

Now, to actually send to RabbitMQ, you need a job to read the table:

kotlin
@Scheduled(fixedDelay = 500)
@SchedulerLock(name = "outbox_publisher", lockAtMostFor = "PT30S", lockAtLeastFor = "PT0S")
fun run() {
    outboxPublisherService.publishPending()
}

Note the use of SchedulerLock (from the ShedLock library) here: it makes schedulers safe in a multi-node context by preventing several application instances from reading the table at the same time. It replaces Quartz, which I frequently used until now.

And you need the publication service code

kotlin

    fun publishPending() {
        val events = outboxEventRepository.findTop20ByOrderByCreatedAtAsc()
        events.forEach { event -> publishEvent(event) }
    }

    private fun publishEvent(event: OutboxEvent) {
        try {
            rabbitTemplate.invoke { t ->
                t.convertAndSend(event.exchange, event.routingKey, event.payload) { message ->
                    message.messageProperties.messageId = event.id
                    message.messageProperties.headers[HEADER_ORIGINAL_EXCHANGE] = event.exchange
                    message.messageProperties.headers[HEADER_ORIGINAL_ROUTING_KEY] = event.routingKey
                    message
                }
                t.waitForConfirmsOrDie(5_000)
            }
            outboxEventRepository.delete(event)
        } catch (e: Exception) {
            logger.error("Failed to publish outbox event ${event.id} (${event.eventType})", e)
        }
    }

Note that we've enabled RabbitMQ's publisher confirms mode and we're waiting for write confirmation: t.waitForConfirmsOrDie(5_000)

But you might object: "you have the problem of writing to two systems again".

And... you're not wrong.

But it's simpler here. There are only 3 cases:

  • Everything goes well: nothing to say, it's perfect.
  • RabbitMQ fails => we go into the exception, we log, we exit. The message stays in the table and will be retried.
  • The PostgreSQL delete fails. We go into the exception, but the message has already been sent! The row stays in the table, so the same message will be sent again on the next run.

The 3rd case is important: it will happen, so you need to handle it. It means each consumer must be idempotent, that is, able to receive the same event twice without incident.
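
The three cases can be replayed with a small simulation. This is only a sketch: `FakeBroker`, `FakeOutbox` and `PendingEvent` are hypothetical in-memory stand-ins for RabbitMQ and the outbox table, and the loop keeps the same publish-then-delete order as the service above.

```kotlin
data class PendingEvent(val id: String, val payload: String)

// Hypothetical stand-in for RabbitMQ: records what was delivered.
class FakeBroker(var available: Boolean = true) {
    val delivered = mutableListOf<String>()
    fun publish(event: PendingEvent) {
        if (!available) throw IllegalStateException("broker unavailable")
        delivered += event.id
    }
}

// Hypothetical stand-in for the outbox table.
class FakeOutbox(var failDeletes: Boolean = false) {
    val rows = mutableListOf<PendingEvent>()
    fun delete(event: PendingEvent) {
        if (failDeletes) throw IllegalStateException("delete failed")
        rows.remove(event)
    }
}

fun publishPending(outbox: FakeOutbox, broker: FakeBroker) {
    for (event in outbox.rows.toList()) {  // iterate over a copy, rows shrink as we delete
        try {
            broker.publish(event)          // case 2 fails here: nothing was sent
            outbox.delete(event)           // case 3 fails here: already sent
        } catch (e: Exception) {
            // In both failure cases the row stays and is retried on the next run.
        }
    }
}
```

Running it with the broker down, then with deletes failing, then with everything healthy delivers the same event twice: the delivery is at-least-once, not exactly-once.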

Several strategies are possible here:

  • read the message ID and store it somewhere to check that we don't do the operation twice, ideal for calls to external services
  • trigger idempotent operations (Example: set status = pending, even if we do it twice, it doesn't pose a problem)

etc...
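
The first strategy could look like the sketch below. `IdempotentConsumer` is a hypothetical class, and the in-memory set is an assumption made to keep the example self-contained: with several consumer instances you would more likely store processed IDs in a table with a unique constraint, written in the same transaction as the operation itself.

```kotlin
// Hypothetical consumer wrapper that skips messages it has already handled.
// Not thread-safe: a sketch of the idea, not production code.
class IdempotentConsumer(private val handler: (String) -> Unit) {
    private val processedIds = mutableSetOf<String>()

    /** Returns true if the message was handled, false if it was a duplicate. */
    fun onMessage(messageId: String, payload: String): Boolean {
        if (messageId in processedIds) return false  // duplicate delivery: ignore it
        handler(payload)                             // may throw: the ID is then NOT recorded
        processedIds += messageId                    // recorded only after success
        return true
    }
}
```

The ID is recorded only after the handler succeeds, so a failed attempt can still be retried when the message comes back.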

And error handling?

But there's an issue: if RabbitMQ goes down, is unreachable, or has a full disk, we log an exception for every event, and with a batch running every 500 ms I'll let you imagine the astronomical amount of logs that produces. So we need a circuit breaker. Conveniently, we can use resilience4j.

With a circuit breaker, we'll cut off sending to Rabbit in case of error and wait a bit before retrying. I'll leave the code here but won't detail it, that would be the subject of another article.

kotlin

init {
    circuitBreakerRegistry.circuitBreaker("rabbitmq-outbox").eventPublisher
        .onStateTransition { event ->
            logger.warn("RabbitMQ circuit breaker: ${event.stateTransition}")
        }
}

@CircuitBreaker(name = "rabbitmq-outbox", fallbackMethod = "skipOnOpenCircuit")
fun publishPending() {
    val events = outboxEventRepository.findTop20ByOrderByCreatedAtAsc()
    for (event in events) {
        publishEvent(event)
    }
}

private fun skipOnOpenCircuit(e: CallNotPermittedException) {
    // Circuit open — state transition already logged via eventPublisher
}

private fun publishEvent(event: OutboxEvent) {
    try {
        rabbitTemplate.invoke { t ->
            t.convertAndSend(event.exchange, event.routingKey, event.payload) { message ->
                message.messageProperties.messageId = event.id
                message.messageProperties.headers[HEADER_ORIGINAL_EXCHANGE] = event.exchange
                message.messageProperties.headers[HEADER_ORIGINAL_ROUTING_KEY] = event.routingKey
                message
            }
            t.waitForConfirmsOrDie(5_000)
        }
        outboxEventRepository.delete(event)
    } catch (e: AmqpException) {
        throw e  // broker error → propagates to circuit breaker
    } catch (e: Exception) {
        logger.error("Failed to publish outbox event ${event.id} (${event.eventType})", e)
    }
}
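
For readers who haven't met the pattern before, here is a deliberately simplified, hand-rolled circuit breaker in plain Kotlin. It is not how resilience4j is implemented (which relies on sliding windows and failure rates); `SimpleCircuitBreaker` and its parameters are hypothetical and only illustrate the "cut off, wait, then retry" behavior.

```kotlin
// Toy circuit breaker: opens after N consecutive failures, then allows
// a single trial call once the wait duration has elapsed.
class SimpleCircuitBreaker(
    private val failureThreshold: Int,
    private val openMillis: Long,
    private val clock: () -> Long = { System.currentTimeMillis() },
) {
    private var consecutiveFailures = 0
    private var openedAt: Long? = null

    /** Runs [action] unless the circuit is open; returns false when the call was skipped. */
    fun tryRun(action: () -> Unit): Boolean {
        val opened = openedAt
        if (opened != null && clock() - opened < openMillis) {
            return false  // open: skip without touching the broker (and without logging)
        }
        try {
            action()      // closed, or trial call after the wait
            consecutiveFailures = 0
            openedAt = null
        } catch (e: Exception) {
            consecutiveFailures++
            // A failed trial call, or too many failures in a row, (re)opens the circuit.
            if (opened != null || consecutiveFailures >= failureThreshold) {
                openedAt = clock()
            }
        }
        return true
    }
}
```

With a threshold of 2 and a 1-second wait, two failed batches open the circuit, the following runs are skipped silently, and the first run after the wait acts as a probe.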

Challenging your own developer beliefs with AI

As I said earlier, I was well acquainted with the XA transaction mechanism and the state of my knowledge for solving this problem stopped there.

I was curious whether Claude would propose a more elegant implementation, and I was rather surprised. This kind of case was the perfect opportunity to try learning with AI instead of just putting up with code we don't understand.

Unsurprisingly, letting AI write code without oversight rarely ends well. You need an expert eye: the code produced remains our responsibility, and we must be able to understand it. So you need to find a middle ground between letting the AI do its thing and micro-managing it.

Once I had given it my constraints (stemming from my experience), it was the agent that proposed the Transactional Outbox pattern. I was a bit skeptical at first, so I asked a lot of questions to understand each part of the code and make it my own. The code wasn't perfect; these exchanges led to the following changes:

  • adding publisher confirms
  • adding resilience with resilience4j
  • adding the scheduler lock to avoid multi-node issues
  • adding the message ID to each message (for deduplication)
  • removing the state management on messages that Claude had added and that wasn't relevant

So no, it wasn't all smooth sailing, but it was a good way to force myself to get up to date, do some research and, in short, learn new things.

Beyond the technical aspect of this post, I mainly wanted to illustrate the method I use to code with an agent, which allows me to combine productivity AND craft.
