Exactly-Once Event Processing Made Easy with Apache Kafka and DBOS

Exactly-once Apache Kafka message processing made easy with DBOS Transact
Peter Kraft
September 10, 2024

Many applications built on event streaming systems like Apache Kafka need exactly-once semantics for data integrity. In other words, they need to guarantee that events are processed exactly one time and are never under- or over-counted. However, exactly-once semantics are notoriously hard to obtain in distributed systems, so many event streaming platforms default to weaker guarantees. For example, Kafka natively supports exactly-once semantics only for stream processing, not for general applications. As a result, developers who need exactly-once semantics must invent complicated architectures to obtain it.

In this blog post, we’ll show you how the open-source TypeScript framework DBOS Transact dramatically simplifies exactly-once event processing. We’ll also demonstrate how to write a Kafka consumer that processes every message sent to a topic exactly one time, regardless of interruptions, crashes, or failures.

How exactly-once Kafka message processing works with DBOS Transact

Implementing exactly-once semantics in a distributed system is challenging because exactly-once delivery is theoretically impossible: the sender of a message cannot always tell whether an unresponsive receiver failed to process the message or is just slow to acknowledge receipt. To get around this problem, a common strategy is to deliver a message multiple times, but design the receiver to process each message idempotently so that duplicate deliveries have no effect. However, developing idempotent applications is also notoriously difficult!
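To make the idea concrete, here is a minimal in-memory sketch of an idempotent receiver. It is not how DBOS Transact is implemented; the class name and the `messageId` parameter are hypothetical, and a real system would need to store the set of seen IDs durably and update it atomically with the application state.

```typescript
// Minimal in-memory sketch of an idempotent receiver (illustration only).
// `messageId` is a hypothetical unique identifier attached to each message.
class IdempotentCounter {
  private readonly seen = new Set<string>();
  private total = 0;

  // Apply the message's effect only the first time its ID is seen.
  // Returns true if the message was processed, false if it was a duplicate.
  handle(messageId: string, amount: number): boolean {
    if (this.seen.has(messageId)) {
      return false; // duplicate delivery: no effect
    }
    this.seen.add(messageId);
    this.total += amount;
    return true;
  }

  get value(): number {
    return this.total;
  }
}

const counter = new IdempotentCounter();
counter.handle("msg-1", 5);
counter.handle("msg-1", 5); // redelivered after a lost acknowledgment
counter.handle("msg-2", 3);
console.log(counter.value); // 8
```

The second delivery of `msg-1` is ignored, so the total reflects each message exactly once even though one of them arrived twice.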

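DBOS Transact makes exactly-once event processing easy by making idempotency easy. You can process Kafka events exactly once with two different types of operations:

  • Transactions, which process a message with a single database transaction.
  • Workflows, which coordinate multiple operations to process a message.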

When your consumer receives a message from Kafka, DBOS Transact constructs an idempotency key for that message from the message’s topic, partition, and offset, a combination that is guaranteed to be unique for a Kafka cluster. What happens next depends on whether you’re executing a transaction or a workflow.
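As a rough illustration, a key of this kind could be derived as follows. The helper name, the interface, and the key format are assumptions made for this sketch; the post does not specify the format DBOS Transact uses internally.

```typescript
// Hypothetical helper: the key format is an assumption for illustration,
// not the format DBOS Transact uses internally.
interface MessageCoordinates {
  topic: string;
  partition: number;
  offset: string; // KafkaJS exposes message offsets as strings
}

function buildIdempotencyKey(coords: MessageCoordinates): string {
  // Serializing as a JSON array keeps the three components unambiguous,
  // even if the topic name itself contains separators such as "-" or "/".
  return JSON.stringify([coords.topic, coords.partition, coords.offset]);
}

const first = buildIdempotencyKey({ topic: "example-topic", partition: 0, offset: "42" });
const redelivered = buildIdempotencyKey({ topic: "example-topic", partition: 0, offset: "42" });
const next = buildIdempotencyKey({ topic: "example-topic", partition: 0, offset: "43" });
console.log(first === redelivered, first === next); // true false
```

A redelivered message always maps to the same key, while any other message in the cluster maps to a different one, which is what lets the receiver detect duplicates.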

Exactly-once transactions

If you’re processing the message with a single transaction, DBOS Transact synchronously executes the transaction, records its idempotency key as part of that transaction, then commits the message’s offset to Kafka. This guarantees exactly-once processing because:

  • If transaction execution fails, the offset isn’t committed, so Kafka retries the message later.
  • If a duplicate message arrives, DBOS Transact checks the idempotency key and doesn’t re-execute the transaction. (The transaction logic also handles corner cases where the message arrives twice simultaneously, a subject for a future blog post.)
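The two cases above can be sketched with a small in-memory simulation. Everything here (`FakeDatabase`, `FakeConsumer`, the `crashBeforeCommit` flag, the key format) is hypothetical and only mimics the ordering described in this section; it is not DBOS Transact code and it ignores concurrency.

```typescript
// In-memory simulation of the transaction path; all names are hypothetical.
class FakeDatabase {
  readonly processedKeys = new Set<string>();
  readonly rows: string[] = [];

  // Runs `body` and records the idempotency key together:
  // if `body` throws, neither the rows nor the key are kept.
  runOnce(key: string, body: (staged: string[]) => void): boolean {
    if (this.processedKeys.has(key)) return false; // duplicate: skip
    const staged: string[] = [];
    body(staged); // may throw, leaving the database untouched
    this.rows.push(...staged);
    this.processedKeys.add(key);
    return true;
  }
}

class FakeConsumer {
  committedOffset = -1;
  private readonly db: FakeDatabase;

  constructor(db: FakeDatabase) {
    this.db = db;
  }

  // crashBeforeCommit simulates the process dying after the database
  // transaction commits but before the offset is committed to Kafka.
  deliver(offset: number, value: string, crashBeforeCommit = false): void {
    this.db.runOnce(`example-topic/0/${offset}`, (staged) => staged.push(value));
    if (crashBeforeCommit) return;
    this.committedOffset = offset;
  }
}

const db = new FakeDatabase();
const consumer = new FakeConsumer(db);
consumer.deliver(0, "order-created", true); // processed, offset not committed
consumer.deliver(0, "order-created");       // redelivered; transaction is skipped
console.log(db.rows.length, consumer.committedOffset); // 1 0
```

Because the key is recorded in the same step as the transaction’s writes, a redelivery after a crash finds the key and skips the work, and only then is the offset committed.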

Exactly-once workflows

If you’re processing the message with a workflow, DBOS Transact synchronously records the idempotency key, commits the message’s offset to Kafka, then asynchronously processes the workflow. This guarantees exactly-once processing because:

  • If recording the idempotency key fails, the offset isn’t committed, so Kafka will send the message again later.
  • If the workflow fails, it will always recover and resume from where it left off because DBOS workflows are reliable.
  • If a duplicate message arrives, DBOS Transact checks the idempotency key and doesn’t start a new workflow.
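The workflow path can be sketched the same way. The `FakeWorkflowStore` class and its methods are hypothetical, and the sketch simplifies recovery: it reruns a pending workflow from the start, whereas the post describes DBOS workflows resuming from where they left off.

```typescript
// In-memory simulation of the workflow path; all names are hypothetical.
type WorkflowStatus = "PENDING" | "SUCCESS";

class FakeWorkflowStore {
  readonly status = new Map<string, WorkflowStatus>();
  executions = 0;

  // Step 1: record the idempotency key. Returns false for a duplicate message.
  // (Step 2, committing the offset to Kafka, would happen right after this.)
  enqueue(key: string): boolean {
    if (this.status.has(key)) return false;
    this.status.set(key, "PENDING");
    return true;
  }

  // Step 3, also used for recovery: run every workflow that has not finished.
  runPending(): void {
    this.status.forEach((status, key) => {
      if (status === "PENDING") {
        this.executions += 1; // stand-in for the real workflow body
        this.status.set(key, "SUCCESS");
      }
    });
  }
}

const store = new FakeWorkflowStore();
store.enqueue("example-topic/0/7"); // key recorded, offset committed
// ...the process crashes here, before the workflow runs...
store.enqueue("example-topic/0/7"); // duplicate delivery: no new workflow
store.runPending();                 // recovery completes the pending workflow
store.runPending();                 // nothing left to run
console.log(store.executions); // 1
```

Recording the key before committing the offset means the message can never be lost, and checking the key on every delivery means it can never start a second workflow.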

Getting Started with Kafka and DBOS

Now, let’s see what this looks like in code. The first step to exactly-once semantics is to write your event processing code in TypeScript using DBOS Transact. You can write a single database transaction or a workflow coordinating multiple operations. For now, let’s keep it simple:

import { KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext } from '@dbos-inc/dbos-sdk';

class KafkaExample {
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    // Log the message payload; value may be null, hence the optional chaining.
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

Next, annotate your method with a @KafkaConsume decorator specifying which topic to consume events from. Additionally, annotate your class with a @Kafka decorator defining which brokers to connect to. That’s all there is to it! When you start your application, DBOS Transact will invoke your method exactly once for each message sent to the topic.

import { KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext, Kafka, KafkaConsume } from '@dbos-inc/dbos-sdk';

@Kafka({ brokers: ['localhost:9092']})
class KafkaExample{
  @KafkaConsume("example-topic")
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`) 
  }
}

If you need more control, you can pass KafkaJS configuration objects to both the class-level @Kafka decorator and the method-level @KafkaConsume decorator. For example, you can specify a custom consumer group ID:

@KafkaConsume("example-topic", { groupId: "custom-group-id" })
@Workflow()
static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) { 
  ctxt.logger.info(`Message received: ${message.value?.toString()}`)
}

To get started with DBOS Transact, check out the quickstart and docs. For more information on using Kafka with DBOS Transact, check out this docs page.

© DBOS, Inc. 2024