Apache Kafka Series | Session 11 | Consumer Assign & Seek

As we already discussed, a consumer can be assigned to many different topics / partitions in Kafka. If we want to replay messages that were sent to a particular topic partition, we can use assign() and seek(). We can also specify the offset from which the replay should begin.

We can’t use assign() and seek() without a topic and a partition. Also, we can’t call seek() without a prior assign(); otherwise it throws an exception.

It is not possible to combine manual partition assignment with assign(Collection) and group assignment with subscribe(Collection, ConsumerRebalanceListener) on the same consumer.
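A quick sketch that trips this restriction; the class name and group id are illustrative, and the IllegalStateException is raised client-side before any broker round trip:

package kafka.tutorial.basics.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class AssignVsSubscribeDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Group assignment first...
            consumer.subscribe(Collections.singletonList("first_topic"));
            // ...then manual assignment on the same consumer is rejected
            consumer.assign(Collections.singletonList(new TopicPartition("first_topic", 0)));
        } catch (IllegalStateException e) {
            System.out.println("As expected: " + e.getMessage());
        }
    }
}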

seek() overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Let’s see an example of assign() and seek().

package kafka.tutorial.basics.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemoAssignAndSeek {
    public static void main(String[] args) {
        final String topic = "first_topic";
        Logger logger = LoggerFactory.getLogger(ConsumerDemoAssignAndSeek.class.getName());
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Manually assign partition 1 of the topic to this consumer (no consumer group involved)
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // Start reading from offset 15 on that partition
        consumer.seek(partitionToReadFrom, 15L);

        int totalNumberOfMessagesToRead = 5;
        int numberOfMessagesReadSoFar = 0;
        boolean keepOnReading = true;

        while (keepOnReading) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                numberOfMessagesReadSoFar += 1;
                logger.info("Key: " + record.key() + "\n" +
                        "Value: " + record.value() + "\n" +
                        "Partition: " + record.partition() + "\n" +
                        "Offset: " + record.offset()
                );
                if (numberOfMessagesReadSoFar >= totalNumberOfMessagesToRead) {
                    keepOnReading = false;
                    break;
                }
            }
        }
        logger.info("Application has exited");
    }
}

As you can see, we’ve removed group.id from the consumer, since manual assignment and group management are mutually exclusive. We can assign any number of partitions as a list, but here only one partition is given. Finally, the consumer seeks to offset 15 on that partition, and we can replay messages from that offset onwards.

We’ve also added a conditional break statement to stop the otherwise infinite while loop once five messages have been read.

Apache Kafka Series | Session 10 | Consumer re-balancing

We’ve already created a topic named tweet, with 3 partitions and a replication factor of 1. This consumer is part of the consumer group kafka-tweet-group. Let’s run a Java consumer and check what Kafka writes to the console. We’re using the same consumer that we used in our last example.
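For reference, the tweet topic was created with a command of this form (assuming the same local setup used elsewhere in this series):

kafka-topics --zookeeper localhost:2181 --topic tweet --create --partitions 3 --replication-factor 1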

1. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 30: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0, tweet-1, tweet-2])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 30
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0, tweet-1, tweet-2])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0, tweet-2, tweet-1

It’s clear from the above log that this consumer is joining / rejoining the consumer group kafka-tweet-group. It also shows that this consumer will consume all messages sent to the partitions [tweet-0, tweet-1, tweet-2].

Cool, now let’s run one more consumer, keeping the existing one running, and see what happens in the console.

If you face issues running the same class in parallel in IntelliJ, follow these steps:

1. Select the class that you want to run in parallel from the configuration drop-down at the top right of the IDE.
2. Click the Run menu.
3. Select the Edit Configurations option.
4. In the pop-up window, select the "Allow parallel run" option.
5. Click OK. The configuration is now saved.

We received the following logs in the console for consumer-1 and consumer-2:

consumer-1
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 31: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0, tweet-1]), consumer-kafka-tweet-group-1-9b8b10aa-7174-4af3-ae3e-6ad4964aeca4=Assignment(partitions=[tweet-2])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 31
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0, tweet-1])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0, tweet-1
consumer-2
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 31
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-2])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-2

It’s clear that the joining / rejoining happens whenever a new consumer joins the same consumer group. This is called consumer rebalancing. The logs also show that consumer-1 will now consume all messages sent to the partitions [tweet-0, tweet-1], and consumer-2 will consume messages sent to [tweet-2].
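Incidentally, if you want to react to these rebalances from your own code rather than reading the client logs, subscribe() also accepts a ConsumerRebalanceListener. A minimal sketch (the class name and printed messages are illustrative; it assumes a consumer configured like our ConsumerDemo):

package kafka.basics.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;

public class RebalanceAwareSubscriber {
    public static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("tweet"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Invoked before a rebalance takes partitions away from this consumer
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Invoked after a rebalance hands partitions to this consumer
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}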

Let’s run one more instance of the consumer, keeping the existing instances running. This is what we received for each consumer instance:

consumer-1
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 32: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0]), consumer-kafka-tweet-group-1-21a062ff-aeaf-46d3-ad06-fc3334d4c44e=Assignment(partitions=[tweet-1]), consumer-kafka-tweet-group-1-9b8b10aa-7174-4af3-ae3e-6ad4964aeca4=Assignment(partitions=[tweet-2])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 32
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0
consumer-2
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 32
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-2])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-2
consumer-3
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 32
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-1])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-1

Again, rebalancing happens between the consumers. All of them rejoin the same group, and now each consumer has its own partition: consumer-1 will consume from [tweet-0], consumer-2 from [tweet-2], and consumer-3 from [tweet-1].

Now we can try one more scenario. We have 3 partitions for the topic, and all three were distributed evenly among 3 consumers. Let’s find out what happens if we run one more consumer.

consumer-1
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Finished assignment for group at generation 33: {consumer-kafka-tweet-group-1-0049dca3-c916-46d2-8da4-6422fe17ed70=Assignment(partitions=[tweet-0]), consumer-kafka-tweet-group-1-21a062ff-aeaf-46d3-ad06-fc3334d4c44e=Assignment(partitions=[tweet-2]),  consumer-kafka-tweet-group-1-9b8b10aa-7174-4af3-ae3e-6ad4964aeca4=Assignment(partitions=[]), consumer-kafka-tweet-group-1-00563a69-8d00-4318-af7c-8128f93b3ea9=Assignment(partitions=[tweet-1])}
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-0])
5. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-0
consumer-2
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-2])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-2
consumer-3
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: 
consumer-4
1. (Re-)joining group
2. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Successfully joined group with generation 33
3. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Notifying assignor about the new Assignment(partitions=[tweet-1])
4. [Consumer clientId=consumer-kafka-tweet-group-1, groupId=kafka-tweet-group] Adding newly assigned partitions: tweet-1

This is quite interesting: the joining / rejoining process and rebalancing happened again, and 3 partitions were distributed among 4 consumers. Consumer-1 will consume from [tweet-0], consumer-2 from [tweet-2], and consumer-4 from [tweet-1], but consumer-3 will not consume any messages, as no partition is allocated to it. If we start more and more consumers, the same thing will keep happening: some consumers will be allocated partitions, and some will sit idle.
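By the way, you can verify who owns which partition at any time from the command line (assuming the same local setup as the rest of this series):

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group kafka-tweet-group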

Apache Kafka Series | Session 9 | Java Consumer

Now we’re going to learn how a Java consumer for Kafka is created. Here we define the bootstrap server address and the key and value deserializer classes, just as we did in the producer class. For a producer we defined key and value serializer classes; here they are deserializer classes, and that’s the difference.

If you haven’t read the Java Producer article before, please have a look at it first.

Apart from the above-mentioned properties, we’ll have to add two more. The first one is the group ID, which we’ve already explained in one of our previous posts. The group ID is mandatory for a Java consumer that uses subscribe(); otherwise we’ll get an exception like this:

Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

The second one is the auto.offset.reset property, which accepts three values of type String. The property defines what to do when there is no initial offset in Kafka, or when the current offset no longer exists on the server (e.g. because that data has been deleted).

It accepts the following values:

  • earliest: automatically reset the offset to the earliest offset (equivalent to --from-beginning)
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group

If we pass anything else, it will throw an exception.
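Here’s a quick sketch to see that for yourself; the class name and the invalid value are illustrative, and the consumer constructor is expected to fail during config validation:

package kafka.basics.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class OffsetResetValidationDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-tweet-group");
        // "beginning" is not a valid value; only earliest, latest and none are accepted
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "beginning");
        try {
            new KafkaConsumer<String, String>(properties).close();
        } catch (Exception e) {
            System.out.println(e); // expect a ConfigException about the invalid value
        }
    }
}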

This is our consumer class:

package kafka.basics.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-tweet-group");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic; partitions are assigned via the group protocol
        consumer.subscribe(Arrays.asList("tweet"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + "\n" +
                        "Value: " + record.value() + "\n" +
                        "Partition: " + record.partition() + "\n" +
                        "Offset: " + record.offset()
                );
            }
        }
    }
}

Run the consumer class first. Whenever we send a message to the topic tweet using the Java producer that we’ve already discussed, this consumer consumes it and prints output like this:

Key: null
Value: {"created_at":"Mon Nov 30 12:26:49 +0000 2020","id":1358987003086311427,"text":"mi ne aaj ke ipl maich mein csk ko chaar viket se haraaya","favorited":false,"retweeted":false,"lang":"hi","timestamp_ms":"1606755809325"}
Partition: 2
Offset: 3

Apache Kafka Series | Session 8 | Java Producer with Keys

Here we’re going to see how a key is added to a ProducerRecord before it is sent, and what the significance of a key is.

For this example, I’ve created a new topic named kafka_callback_topic with 7 partitions.

kafka-topics --zookeeper localhost:2181 --topic kafka_callback_topic --create --partitions 7 --replication-factor 1

Then I created a Java producer in which records are sent within a loop. I’ve added a get() call at the end of send() to make it synchronous. But bear in mind, you should not do this in production, as blocking on every send directly affects the performance of your application.

package kafka.basics.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDemoWithCallback {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        final String topic = "kafka_callback_topic";
        for (int i = 0; i < 10; i++) {
            final String key = "id_" + Integer.toString(i);
            final String value = "value_" + Integer.toString(i);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
            System.out.println("key: " + i + ", value: " + value);
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null == e) {
                        logger.info("Received new metadata. \n" +
                                "Topic: " + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp() + "\n" +
                                "Offset: " + recordMetadata.offset()
                        );
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            }).get(); // blocks until the broker acknowledges the record (demo only)
        }
        producer.flush();
        producer.close();
    }
}

If I don’t make it synchronous, System.out.println() prints all the key-value pairs consecutively, and only after that do we see the messages being sent. That’s why I made it synchronous.
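If blocking on every send is too costly but you still want to confirm every record, one possible middle ground is to fire all the sends asynchronously and wait only once at the end. This gives up the interleaved console output but keeps the throughput. A sketch, assuming the same producer, topic, and logger as above, plus imports of java.util.List, java.util.ArrayList, and java.util.concurrent.Future:

        // Sketch: send everything asynchronously, then block once at the end
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(topic, "id_" + i, "value_" + i);
            futures.add(producer.send(record)); // returns immediately
        }
        for (Future<RecordMetadata> future : futures) {
            RecordMetadata metadata = future.get(); // waits for the broker acknowledgement
            logger.info("Partition: " + metadata.partition() + ", Offset: " + metadata.offset());
        }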

See how the record is now being created: a new parameter, the key, is passed to the ProducerRecord constructor before sending to Kafka. And this is the output I received from the callback method:

key: 0, value: value_0
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: _prRXd9bSnarqvHdH04m0A
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 2
Timestamp: 1620406097319
Offset: 20
key: 1, value: value_1
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 4
Timestamp: 1620406097355
Offset: 30
key: 2, value: value_2
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 5
Timestamp: 1620406097359
Offset: 30
key: 3, value: value_3
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 0
Timestamp: 1620406097361
Offset: 20
key: 4, value: value_4
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 3
Timestamp: 1620406097363
Offset: 20
key: 5, value: value_5
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 2
Timestamp: 1620406097366
Offset: 21
key: 6, value: value_6
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 5
Timestamp: 1620406097367
Offset: 31
key: 7, value: value_7
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 0
Timestamp: 1620406097369
Offset: 21
key: 8, value: value_8
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 3
Timestamp: 1620406097372
Offset: 21
key: 9, value: value_9
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 
Topic: kafka_callback_topic
Partition: 4
Timestamp: 1620406097374
Offset: 31

And this is what I received at the consumer:

value_0 //Partition 2
value_1 //Partition 4
value_2 //Partition 5
value_3 //Partition 0
value_6 //Partition 5
value_4 //Partition 3
value_5 //Partition 2
value_9 //Partition 4
value_7 //Partition 0
value_8 //Partition 3

So when I run the same code again, messages with the same key go to the same partition, every single time. That is the significance of the key.
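That stickiness comes from the hash the default partitioner applies to keyed records. Assuming the murmur2-based DefaultPartitioner that ships with the Java client (an assumption about client internals, not something shown in our logs), you can reproduce the key-to-partition mapping with a sketch like this:

package kafka.basics.producer;

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class KeyToPartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 7; // kafka_callback_topic has 7 partitions
        for (int i = 0; i < 10; i++) {
            String key = "id_" + i;
            // StringSerializer turns the key into UTF-8 bytes before hashing
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            // The formula the default partitioner uses for records with a key
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println(key + " -> partition " + partition);
        }
    }
}

This also explains why adding partitions to a topic later breaks the key-to-partition stickiness: the modulo changes.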

If you remember, we’ve already discussed a real-world example in the first post of this Kafka series: a fleet of trucks, each sending its position to Kafka every 20 seconds, with the same data consumed and monitored by a mobile application. Messages from different trucks may well land on the same partition, but when we use a key, say truck_id, along with the message, it ensures that the messages sent by a particular truck arrive at the consumer in order, which helps the consumer draw the path of a given truck.

If we don’t use a key like truck_id, the messages from the same truck may go to different partitions, and there’s a chance that the second, third, or fourth message is consumed before the first.
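A sketch of how the key might be attached in that scenario; the topic name, payload, and variable names are hypothetical, and it assumes a producer configured like the ones above:

        // Hypothetical truck-position record: using truck_id as the key keeps
        // all of one truck's updates on a single partition, hence in order
        String truckId = "truck_42";
        String positionJson = "{\"lat\": 9.93, \"lon\": 76.26, \"ts\": 1606755809325}";
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("truck_positions", truckId, positionJson);
        producer.send(record);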

Coming back to our output: the messages received at the consumer are not in global order, but the messages consumed from a particular partition are always in order. That means the 1st and 6th messages (value_0 and value_5) went to the same partition (partition-2), and if I run the application again and again, the 1st and 6th messages will land on the same partition every time, because their keys never change. There’s no rule that it had to be partition-2, though; we don’t choose which partition a key hashes to, and it could just as well have been partition-3, partition-6, etc.

Apache Kafka Series | Session 7 | Java Producer Callback

We’ve already seen how to create a Kafka producer using Java in our last post. Now we are going to see what the producer callback is.

Whenever a message is sent to Kafka, the broker returns metadata about that record (RecordMetadata) to the producer. A Callback is the mechanism through which the producer hands us this metadata: it is invoked once the send completes, either successfully or with an error.

Inside the KafkaProducer class we can see a method like this, which means we can pass a callback as the second parameter of the send() method.

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}

Now we can add this callback to our example. I’ve created a new topic for this, namely "callback_topic".

kafka-topics --zookeeper localhost:2181 --topic callback_topic --create --partitions 3 --replication-factor 1

The topic has been created with 3 partitions and a replication factor of 1. Now we’re going to create a Java producer with callback functionality.

package kafka.basics.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerDemoWithCallback {

    public static void main(String[] args) {
        final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("callback_topic",
                "kafka diy for callback");

        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null == e) {
                    logger.info("Received new metadata. \n" +
                            "Topic: " + recordMetadata.topic() + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp() + "\n" +
                            "Offset: " + recordMetadata.offset()
                    );
                } else {
                    logger.error("Error while producing", e);
                }
            }
        });

        producer.flush();
        producer.close();
    }
}

When the above code was executed, we received the following output from the logger:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1620330297493
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: _prRXd9bSnarqvHdH04m0A
[kafka-producer-network-thread | producer-1] INFO kafka.basics.producer.ProducerDemoWithCallback - Received new metadata. 

And here’s the actual metadata that was logged. It’s clear that the message was written at offset 0 of partition 1.

Received new metadata. 
Topic: callback_topic
Partition: 1
Timestamp: 1620330298037
Offset: 0