Unit testing your Kafka code is incredibly important. It’s transporting your most important data. As of 0.9.0 there’s a new way to unit test with mock objects.

Refactoring Your Producer

First of all, you’ll need to be able to change your Producer at runtime. Instead of using the KafkaProducer object directly, you’ll use the Producer interface.

public Producer<String, String> producer;

You can use whichever method for dependency injection, but I’m making the Producer public so I can change it from the unit test.

Next, you’ll want to refactor the code for creating your KafkaProducer. The creation of the KafkaProducer should be in separate method that won’t get called by your production Producer code.

You’ll also need to refactor the code that sends to the data to the Producer object. This code will need to be callable from the unit test.

Unit Testing Your Producer

Kafka unit tests of the Producer code use MockProducer object. The @Before will initialize the MockProducer before each test.

MockProducer<String, String> producer;

@Before
public void setUp() {
    producer = new MockProducer<String, String>(
            true, new StringSerializer(), new StringSerializer());
}

Have you been searching for the best data engineering training? You’ve found it. Sign up for my list so you can get my Professional Data Engineering course.

Once we’ve set the objects up, we can start testing.

@Test
public void testProducer() throws IOException {
    MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
    myTestKafkaProducer.producer = producer;

    myTestKafkaProducer.send();

    List<ProducerRecord<String, String>> history = producer.history();

    List<ProducerRecord<String, String>> expected = Arrays.asList(
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));

    Assert.assertEquals("Sent didn't match expected", expected, history);
}

We start off by instantiating the Producer we’re wanting to test. We inject our MockProducer into the Producer. We send some data with the Producer. All of the data sent by the Producer can be accessed with the history() method. We create a list of the ProducerRecords we expected. Finally, we can assert that the two lists match each other.

In a future post, I’ll show you how to unit test a KafkaConsumer.