Kafka 0.9.0 brings with it a bevy of goodness. Many of those features are aimed at operations, like security. Some are developer features, like a new Consumer API. This post will focus on the new features of 0.9.0 for developers.

New Consumer API

The most notable change is a brand-new consumer API. Unfortunately, there isn’t a good way to distinguish between the 0.7.x/0.8.x consumer and the new 0.9.x consumer. We’ve started to call the 0.9.0 consumer the “new” consumer and the 0.7.x/0.8.x consumer the “old” consumer.

The new consumer is a great replacement for the SimpleConsumer and the old consumer. IMHO, it gives the best of the both worlds. It has the relative simplicity of the old consumer while adding the power user features of the SimpleConsumer.

HelloWorld Consumer

The JavaDocs for the new consumer have all of the sample code. You’ll want to look there for code samples.

For now, here’s what a very simple new consumer looks like:

KafkaConsumer<String, String> consumer;

public void createConsumer() {
  String topic = "hello_topic";

  Properties props = new Properties();
  // Configure initial location bootstrap servers
  props.put("bootstrap.servers", "broker1:9092");
  // Configure consumer group
  props.put("group.id", "group1");
  // Configure key and value deserializers
  props.put("key.deserializer", 
      "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", 
      "org.apache.kafka.common.serialization.StringDeserializer");

  // Create the consumer and subscribe to the topic
  consumer = new KafkaConsumer<String, String>(props);
  consumer.subscribe(Arrays.asList(topic));

  while (true) {
    // Poll for ConsumerRecords for a certain amount of time
    ConsumerRecords<String, String> records = consumer.poll(100);

    // Process the ConsumerRecords, if any, that came back
    for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            String value = record.value();

            // Do something with message
    }
  }
}

There are some notable changes here. One big one is the consumer.poll() instead of the old it.hasNext(). The poll will only block for a specified amount of time while the hasNext blocks until a message comes in.

Also missing is the cumbersome Map to subscribe to topics and the number of streams/threads to consume them with. There’s a much cleaner subscribe to a List of topics.

Old Versus New Consumer

As of right now, the new consumer doesn’t have as many production miles on it. The old consumer has vastly more usage. The old consumer isn’t available in the 0.9.0 JAR. However, old consumers can still access the 0.9.0 brokers.

The 0.9.0 consumer assumes all offsets are stored in Kafka. If you have an existing old consumer, you’ll need to move your offsets to Kafka from ZooKeeper by setting offsets.storage=kafka and dual.commit.enabled=true in your 0.8.2 consumer code. Once you’ve moved the offsets to Kafka, you can start using a new consumer at the same place. Another option is to save out the current old consumer’s offset location and start the new consumer at the same offset.

Advanced Consumers

The SimpleConsumer used to be the only way to do advanced consumption. The new consumer changes that. The new consumer can easily consume a single partition. It can manage and change its offset location too. This was a common ask in the community. Check out the JavaDocs to see these examples.

Kafka Connect

Another notable improvement for developers is not having to develop. With Kafka Connect, we get a framework for moving data in and out of Kafka. This way, we can configure data movement instead writing code.

The community has taken up this framework and are creating many new connectors. Confluent has a page keeping track of the ones that have been created so far.