Kafka 0.9.0 brings with it a bevy of goodness. Many of those features are aimed at operations, like security. Some are developer features, like a new Consumer API. This post will focus on the new features of 0.9.0 for developers.
New Consumer API
The most notable change is a brand-new consumer API. Unfortunately, there isn’t a good way to distinguish between the 0.7.x/0.8.x consumer and the new 0.9.x consumer. We’ve started to call the 0.9.0 consumer the “new” consumer and the 0.7.x/0.8.x consumer the “old” consumer.
The new consumer is a great replacement for the SimpleConsumer and the old consumer. IMHO, it gives the best of both worlds. It has the relative simplicity of the old consumer while adding the power-user features of the SimpleConsumer.
HelloWorld Consumer
The JavaDocs for the new consumer include plenty of sample code, so look there for more complete examples.
For now, here’s what a very simple new consumer looks like:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// The wrapper class name is illustrative
public class HelloWorldConsumer {
    KafkaConsumer<String, String> consumer;

    public void createConsumer() {
        String topic = "hello_topic";
        Properties props = new Properties();
        // Configure the bootstrap servers used for the initial connection
        props.put("bootstrap.servers", "broker1:9092");
        // Configure consumer group
        props.put("group.id", "group1");
        // Configure key and value deserializers
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer and subscribe to the topic
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                // Poll for ConsumerRecords, waiting at most 100 ms
                ConsumerRecords<String, String> records = consumer.poll(100);
                // Process the ConsumerRecords, if any, that came back
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    // Do something with message
                }
            }
        } finally {
            // Release the consumer's resources if the loop exits with an exception
            consumer.close();
        }
    }
}
There are some notable changes here. A big one is the call to consumer.poll() instead of the old it.hasNext(). The poll blocks only for the specified timeout (given in milliseconds), while hasNext, by default, blocks until a message comes in.
Also gone is the cumbersome Map of topics to the number of streams/threads to consume them with. Instead, there’s a much cleaner subscribe that takes a List of topics.
Old Versus New Consumer
As of right now, the new consumer doesn’t have as many production miles on it; the old consumer has vastly more usage. The old consumer isn’t part of the 0.9.0 kafka-clients JAR, where the new consumer lives, though it still ships in the core Kafka JAR. Old consumers can also still access 0.9.0 brokers.
The 0.9.0 consumer assumes all offsets are stored in Kafka. If you have an existing old consumer, you’ll need to move your offsets from ZooKeeper to Kafka by setting offsets.storage=kafka and dual.commit.enabled=true in your 0.8.2 consumer configuration. Once you’ve moved the offsets to Kafka, you can start a new consumer at the same place. Another option is to save the old consumer’s current offsets and start the new consumer at those same offsets.
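As a rough sketch of the migration settings described above, the old consumer’s configuration could be built like this. The class name, method name, ZooKeeper address, and group name are all placeholders invented for illustration; only the four property keys come from the old consumer’s configuration.

```java
import java.util.Properties;

// Sketch: settings for an old (0.8.2) consumer that is migrating its
// offsets from ZooKeeper to Kafka. Class and method names are hypothetical.
final class OffsetMigrationConfig {

    static Properties build(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        // The old consumer connects through ZooKeeper
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("group.id", groupId);
        // Commit offsets to Kafka instead of ZooKeeper
        props.setProperty("offsets.storage", "kafka");
        // Keep committing to ZooKeeper as well while the migration is in progress
        props.setProperty("dual.commit.enabled", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host and group; substitute your own values
        Properties props = build("zookeeper1:2181", "group1");
        props.list(System.out);
    }
}
```

These properties would then be handed to the old consumer’s ConsumerConfig as usual. Once every consumer in the group is committing to Kafka, dual.commit.enabled can reportedly be switched back to false.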
Advanced Consumers
The SimpleConsumer used to be the only way to do advanced consumption. The new consumer changes that. It can easily consume a single partition (via assign), and it can manage and change its offset location too (via seek). This was a common ask in the community. Check out the JavaDocs to see these examples.
Kafka Connect
Another notable improvement for developers is not having to develop. With Kafka Connect, we get a framework for moving data in and out of Kafka. This way, we can configure data movement instead of writing code.
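To give a feel for what “configure instead of code” means, here is a minimal sketch of the settings for the file source connector that ships with Kafka. A connector is normally described in a plain .properties file; the Java below just builds the same key/value pairs and prints them in that form. The class name, connector name, file name, and topic are placeholders chosen for illustration.

```java
import java.io.IOException;
import java.util.Properties;

// Sketch: key/value settings for Kafka's bundled file source connector.
// Class name and the values passed in main are hypothetical.
final class FileSourceConnectorConfig {

    static Properties build(String file, String topic) {
        Properties props = new Properties();
        // A unique name for this connector instance
        props.setProperty("name", "local-file-source");
        // The connector implementation to run
        props.setProperty("connector.class",
                "org.apache.kafka.connect.file.FileStreamSourceConnector");
        // Upper bound on the number of tasks for this connector
        props.setProperty("tasks.max", "1");
        // File to read lines from, and topic to write them to
        props.setProperty("file", file);
        props.setProperty("topic", topic);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Print the settings in .properties format
        build("test.txt", "connect-test").store(System.out, null);
    }
}
```

Saved to a file, settings like these are what a standalone Connect worker takes as its connector configuration; no consumer or producer code is involved.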
The community has taken up this framework and is creating many new connectors. Confluent has a page keeping track of the ones that have been created so far.