Summary
I wanted to practice setting up Kafka producers and consumers in Kotlin. Things have changed quite a bit since I last touched Kafka, so I thought I'd share my experience setting it up with Ktor.
Docker setup
ZooKeeper is no longer needed; its support has been removed as of Kafka 4.0, which makes our docker-compose file a bit more lightweight.
We need to generate a CLUSTER_ID: a 22-character, base64-encoded string. You can craft anything fitting here, but the recommended way is to use Kafka's own script. You can run it locally using: docker run --rm apache/kafka:4.1.0 /opt/kafka/bin/kafka-storage.sh random-uuid
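If you already have the kafka-clients library on your classpath, you can also generate an ID in the same format from Kotlin; this is just an alternative to the docker command above:

import org.apache.kafka.common.Uuid

// Prints a 22-character, base64-encoded UUID, the same format
// that kafka-storage.sh random-uuid produces.
fun main() {
    println(Uuid.randomUuid())
}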
Armed with a generated CLUSTER_ID and a simplified configuration for single node Kafka setup, we have something like this:
services:
  kafka:
    image: apache/kafka:4.1.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,CONTROLLER://:29093,PLAINTEXT_HOST://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      # Single-node config
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      CLUSTER_ID: 'FKj3xc5KS72JF9yU9DNdrA'
    healthcheck:
      test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s

  producer:
    build: ./ktor-producer
    container_name: ktor-producer
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8080:8080"
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:19092

  consumer:
    build: ./ktor-consumer
    container_name: ktor-consumer
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:19092
In production you will typically have multiple brokers, but for our dev setup this will suffice.
Set up the Producer
Let’s create a simple producer. After creating an empty Ktor app with some sane defaults, we can configure a KafkaService like this:
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class KafkaService {
    private val producer: KafkaProducer<String, String>

    init {
        val kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS") ?: "localhost:9092"
        val props = Properties().apply {
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
            put(ProducerConfig.CLIENT_ID_CONFIG, "ktor-kafka-producer") // identifier for this producer, helps with logs/metrics
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.ACKS_CONFIG, "all") // how many brokers need to acknowledge a write before success
            put(ProducerConfig.RETRIES_CONFIG, 3)
            put(ProducerConfig.LINGER_MS_CONFIG, 500) // how long to wait before sending a batch even if it isn't full
            put(ProducerConfig.BATCH_SIZE_CONFIG, 8192) // max batch size in bytes
        }
        producer = KafkaProducer(props)
    }

    fun sendMessageFireAndForget(topic: String, key: String, message: String) {
        val record = ProducerRecord(topic, key, message)
        producer.send(record)
    }

    fun close() = producer.close()
}
Every use case will require tweaking these settings differently, especially ACKS_CONFIG, LINGER_MS_CONFIG and BATCH_SIZE_CONFIG.
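As a rough illustration of the trade-off (the exact numbers are made up, not recommendations), a latency-sensitive producer and a throughput-oriented one might diverge like this:

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

// Hypothetical configurations; tune against your own workload.
val lowLatencyProps = Properties().apply {
    put(ProducerConfig.ACKS_CONFIG, "1")         // only the partition leader has to acknowledge
    put(ProducerConfig.LINGER_MS_CONFIG, 0)      // send immediately, don't wait to fill a batch
}

val highThroughputProps = Properties().apply {
    put(ProducerConfig.ACKS_CONFIG, "all")       // safest, but slowest acknowledgement
    put(ProducerConfig.LINGER_MS_CONFIG, 50)     // give batches time to fill
    put(ProducerConfig.BATCH_SIZE_CONFIG, 65536) // larger batches, fewer requests
}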
Wait for future?
In the sendMessageFireAndForget method I simply call producer.send(record) without a callback or waiting for the result. After all, Kafka uses its own background thread to batch and send messages, so we can keep using the current thread for other work.
However, if it's important for you to wait for the acknowledgement from the broker, you can take advantage of the callback overload of the send method. I created the following extension function to convert the unpleasant callback code into a coroutine.
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

suspend fun <K : Any, V : Any> KafkaProducer<K, V>.dispatch(
    record: ProducerRecord<K, V>
): RecordMetadata = suspendCancellableCoroutine { continuation ->
    val callback = Callback { metadata, exception ->
        when {
            exception != null -> continuation.resumeWith(Result.failure(exception))
            metadata != null -> continuation.resumeWith(Result.success(metadata))
            else -> continuation.resumeWith(Result.failure(RuntimeException("Unknown error occurred")))
        }
    }
    val future = this.send(record, callback)
    continuation.invokeOnCancellation {
        future.cancel(true)
    }
}
We can then use this extension function like this:
suspend fun sendMessage(topic: String, key: String, message: String): RecordMetadata {
    val record = ProducerRecord(topic, key, message)
    return producer.dispatch(record)
}
Note: Because we are waiting for the acknowledgement here, this method will take much longer to return (at least the LINGER_MS_CONFIG time). If you have to send a bunch of these messages, it's a good idea to send them concurrently.
suspend fun sendMultipleMessages(topic: String, key: String, message: String): List<RecordMetadata> = coroutineScope {
    val routines = List(5) {
        val record = ProducerRecord(topic, key, message)
        async { producer.dispatch(record) }
    }
    routines.awaitAll()
}
Here are processing times of each request:
| Method | Time (average of 3) |
| --- | --- |
| sendMessageFireAndForget | 14ms |
| sendMessage | 527ms |
| sendMultipleMessages | 528ms |
As expected, sendMessage with acknowledgement takes much longer, but because the producer batches records, awaiting the sends concurrently lets all five complete in roughly the same time as one.
Set up the Consumer
Now, to set up a consumer, I created another small server and a KafkaService with this configuration:
import kotlinx.coroutines.CoroutineScope
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.io.BufferedWriter
import java.io.File
import java.util.Properties

class KafkaService(private val scope: CoroutineScope) {
    private val consumer: KafkaConsumer<String, String>
    private val outputFile = File("kafka_messages.log")
    private var fileWriter: BufferedWriter? = null
    private var isRunning = false // controls the poll loop in startConsuming below

    init {
        val kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS") ?: "localhost:9092"
        val props = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
            put(ConsumerConfig.GROUP_ID_CONFIG, "ktor-consumer-group")
            put(ConsumerConfig.CLIENT_ID_CONFIG, "ktor-consumer")
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100) // max number of records returned by one poll
        }
        consumer = KafkaConsumer(props)
    }
}
MAX_POLL_RECORDS_CONFIG is worth noting here: if the broker is being flooded with messages, our consumer can still work through them at its own pace. Just be careful in these cases, since unconsumed messages will pile up on the broker (backpressure).
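If the consumer itself starts falling behind, the consumer API also lets you temporarily stop fetching without leaving the group, via pause and resume. A minimal sketch, assuming a hypothetical bounded buffer that your processing pipeline drains:

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical throttling helper: pause() stops fetching on the assigned partitions
// while poll() keeps the consumer alive in its group; resume() starts fetching again.
fun throttle(consumer: KafkaConsumer<String, String>, buffer: ArrayBlockingQueue<String>) {
    val capacity = buffer.size + buffer.remainingCapacity()
    when {
        buffer.remainingCapacity() == 0 -> consumer.pause(consumer.assignment())
        buffer.size < capacity / 2 -> consumer.resume(consumer.assignment())
    }
}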
To start consuming messages, we can add a method that looks something like this:
fun startConsuming(topic: String) {
    if (isRunning) return
    scope.launch {
        isRunning = true
        consumer.subscribe(listOf(topic))
        fileWriter = BufferedWriter(FileWriter(outputFile, true))
        try {
            while (isActive && isRunning) {
                val records = consumer.poll(Duration.ofMillis(100))
                if (records.count() > 0) {
                    processRecords(records)
                }
            }
        } catch (e: Exception) {
            println("Error while consuming messages: ${e.message}")
        } finally {
            consumer.close()
            fileWriter?.close()
            isRunning = false
        }
    }
}
I've added a fileWriter here to write the consumed messages to a log. We keep the consumer loop running until isActive becomes false (this comes from the current coroutine; if it's cancelled for any reason we stop consuming) or until isRunning is set to false from somewhere else in our code.
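A small stop method is one way to flip isRunning from elsewhere; a sketch, assuming the class from above. consumer.wakeup() is the thread-safe way to interrupt a blocking poll:

// Hypothetical companion to startConsuming: wakeup() makes a blocking poll()
// throw a WakeupException, which the catch/finally above already handles.
fun stopConsuming() {
    isRunning = false
    consumer.wakeup()
}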
Processing messages
Since we are consuming batches of messages here, let’s process each one of them. It’s really up to you what kind of processing you do, but I’m just writing each message to a file.
private fun processRecords(records: ConsumerRecords<String, String>) {
    records.forEach { record ->
        try {
            processMessage(record)
        } catch (e: Exception) {
            println("Error while processing message: ${e.message}")
        }
    }
    consumer.commitAsync { offsets, exception ->
        if (exception != null) {
            println("Error while committing offsets: ${exception.message}")
        } else {
            println("Offsets committed successfully, count: ${offsets.count()}")
        }
    }
}
Once we've processed the batch, we tell the broker that it is finished using consumer.commitAsync.
This is because I've set ENABLE_AUTO_COMMIT_CONFIG to false in the configuration. Technically, the code is simpler if we set this flag to true, but we then lose control over when messages are marked as processed, and some data loss can occur (although if some data loss is acceptable, auto-commit can increase throughput).
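If you would rather block until the commit succeeds (simpler failure handling, at the cost of some latency), commitSync is the synchronous counterpart. A sketch of what the commit step could look like with it:

import org.apache.kafka.clients.consumer.KafkaConsumer

// Alternative to commitAsync: commitSync blocks until the broker confirms
// the commit (or throws), so a failed commit can't go unnoticed.
fun commitBlocking(consumer: KafkaConsumer<String, String>) {
    try {
        consumer.commitSync()
    } catch (e: Exception) {
        println("Error while committing offsets: ${e.message}")
    }
}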
Lastly, we write to file like this:
private fun processMessage(record: ConsumerRecord<String, String>) {
    val eventsMessage = Json.decodeFromString<EventMessage>(record.value())
    val currentTime = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
    val logEntry = """
        $currentTime | Topic: ${record.topic()} | Partition: ${record.partition()} | Offset: ${record.offset()} | Key: ${record.key() ?: "null"} | Message: {
            id=${eventsMessage.id},
            timestamp=${eventsMessage.timestamp},
            data="${eventsMessage.data}",
            source="${eventsMessage.source}"
        }
    """.trimIndent()
    fileWriter?.let { writer ->
        writer.write(logEntry)
        writer.newLine()
        writer.flush()
    }
    println("Message processed and logged: ${eventsMessage.id}")
}
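The EventMessage type isn't shown in this post; a minimal kotlinx.serialization definition matching the fields logged above could look like this (the field types are my assumption):

import kotlinx.serialization.Serializable

// Assumed payload shape, inferred from the fields written to the log above.
@Serializable
data class EventMessage(
    val id: String,
    val timestamp: Long,
    val data: String,
    val source: String,
)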
Further improvements
This approach works well for testing, but there are still some optimizations that can be made. For example, processMessage writes only one message at a time; depending on your I/O, batched or concurrent writes should be possible here.
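As a sketch of the batching idea (the helper name is illustrative): instead of one write and one flush per record, the whole poll batch could be turned into a single flush.

import java.io.BufferedWriter

// Hypothetical batched variant: write every entry from the poll batch first,
// then flush once, instead of flushing per record as processMessage does.
fun writeBatch(writer: BufferedWriter, logEntries: List<String>) {
    logEntries.forEach { entry ->
        writer.write(entry)
        writer.newLine()
    }
    writer.flush()
}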