Writing a Kafka Producer and High Level Consumer in Clojure

14 Jan 2015


Kafka is a platform for handling real-time data feeds. In some ways it is like a database that exposes semantics of a messaging system.

The Kafka documentation provides an excellent overview which I have provided an extract from:

Kafka is a distributed, partitioned, replicated commit log service. It provides the
functionality of a messaging system, but with a unique design.

* Kafka maintains feeds of messages in categories called topics.
* We'll call processes that publish messages to a Kafka topic producers.
* We'll call processes that subscribe to topics and process the feed of published
messages consumers.
* Kafka is run as a cluster comprised of one or more servers each of which is called
a broker.

In this post we’ll use Clojure to write a producer that periodically writes random integers to a Kafka topic, and a High Level Consumer that reads them back. I am using Kafka and assume it and ZooKeeper are running on localhost. A quickstart is available that can walk you through downloading and starting the services.

The source code for this project is available on GitHub.

Create a Project

We’ll be using Leiningen to build and run our app.

Create a file called project.clj with the following contents:

(defproject hello-world-kafka "0.1.0"
  :description "Create a kafka producer and high level consumer"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.apache.kafka/kafka_2.9.2 "" :exclusions [javax.jms/jms
  :aot [hello-world-kafka.core]
  :main hello-world-kafka.core)

Next create a file /src/hello_world_kafka/core.clj with these contents:

(ns hello-world-kafka.core
  (:import (kafka.consumer Consumer ConsumerConfig KafkaStream)
           (kafka.producer KeyedMessage ProducerConfig)
           (kafka.javaapi.producer Producer)
           (java.util Properties)
           (java.util.concurrent Executors))

Producer Code

(defn- create-producer
  "Creates a producer that can be used to send a message to Kafka"
  (let [props (Properties.)]
    (doto props
      (.put "metadata.broker.list" brokers)
      (.put "serializer.class" "kafka.serializer.StringEncoder")
      (.put "request.required.acks" "1"))
    (Producer. (ProducerConfig. props))))

(defn- send-to-producer
  "Send a string message to Kafka"
  [producer topic message]
  (let [data (KeyedMessage. topic nil message)]
    (.send producer data)))

Creating a producer and sending a message is pretty straightforward. Call the create-producer function with a list of Kafka brokers, and when you want to send a message pass the producer to the send-to-producer method along with the name of the topic and the message.

Consumer Code

(defrecord KafkaMessage [topic offset partition key value-bytes])

(defn- create-consumer-config
  "Returns a configuration for a Kafka client."
  (let [props (Properties.)]
    (doto props
      (.put "zookeeper.connect" "")
      (.put "group.id" "group1")
      (.put "zookeeper.session.timeout.ms" "400")
      (.put "zookeeper.sync.time.ms" "200")
      (.put "auto.commit.interval.ms" "1000"))
    (ConsumerConfig. props)))

(defn- consume-messages
  "Continually consume messages from a Kafka topic and write message value to stdout."
  [stream thread-num]
  (let [it (.iterator ^KafkaStream stream)]
    (println (str "Starting thread " thread-num))
    (while (.hasNext it)
      (as-> (.next it) msg
            (KafkaMessage. (.topic msg) (.offset msg) (.partition msg) (.key msg) (.message msg))
            (println (str "Received on thread " thread-num ": " (String. (:value-bytes msg))))))
    (println (str "Stopping thread " thread-num))))

(defn- start-consumer-threads
  "Start a thread for each stream."
  [thread-pool kafka-streams]
  (loop [streams kafka-streams
         index 0]
    (when (seq streams)
      (.submit thread-pool (cast Callable #(consume-messages (first streams) index)))
      (recur (rest streams) (inc index)))))

A few things to take note of:

  • The message is stored in Kafka as bytes, so in this case we need to turn the bytes into a String.

  • In consume-messages the call to the KafkaStream iterator .hasNext function is reading from a single partition of it’s topic and will block until a message is received. So to read from multiple partitions we will need to run multiple threads of the consume-messages function.

  • The threads will run in a thread pool we later create with a java.util.concurrent.Executors

  • Clojure functions implement both Runnable and Callable, but since the executor’s submit function is overloaded and can accept either, we must explicitly cast the function to a Callable.

Application code

(defn -main
  "Pull messages from a Kafka topic using the High Level Consumer"
  [topic num-threads]
  (let [consumer (Consumer/createJavaConsumerConnector (create-consumer-config))
        consumer-map (.createMessageStreams consumer {topic (Integer/parseInt num-threads)})
        kafka-streams (.get consumer-map topic)
        thread-pool (Executors/newFixedThreadPool (Integer/parseInt num-threads))
        producer (create-producer "")]

    ;; Clean up on a SIGTERM or Ctrl-C
    (.addShutdownHook (Runtime/getRuntime)
                      (Thread. #(do (.shutdown consumer)
                                    (.shutdown thread-pool))))

    ;; Connect and start listening for messages on Kafka
    (start-consumer-threads thread-pool kafka-streams)

    ;; Send a random int to Kafka every 500 milliseconds
    (loop []
      (let [num (str (rand-int 1000))]
        (println (str "Sending to Kafka topic " topic ": " num))
        (send-to-producer producer topic num)
        (Thread/sleep 500)

The above code sets up our thread pools, creates and starts some consumers, and then sends a random integer between 0 and 999 to a topic every 500 milliseconds.

Running from the command line expects arguments like this:

$ lein trampoline run <topic> <num consumer threads>

We use lein trampoline so we can catch a SIGTERM or Control-C and clean up prior to shutting down.

If we want to send and read messages from a topic called random_numbers and use 2 threads for the consumers we can start the app like this:

$ lein trampoline run random_numbers 2

You should see output that looks something like this:

Starting thread 0
Starting thread 1
Sending to Kafka topic random_numbers: 753
Received on thread 1: 753
Sending to Kafka topic random_numbers: 971
Received on thread 1: 971
Sending to Kafka topic random_numbers: 56
Received on thread 1: 56
Sending to Kafka topic random_numbers: 536
Received on thread 1: 536
Sending to Kafka topic random_numbers: 589
Received on thread 1: 589
Stopping thread 0
Stopping thread 1
comments powered by Disqus