Confluent Kafka Queue Length

The documentation for the Python Confluent Kafka module lists a len function on the producer. I wanted to use it because we’re getting a number of duplicated messages on the client, and I was trying to isolate the cause. Unfortunately, calling producer.len() failed with an error indicating there’s no len() method. Running dir(producer) confirmed that, no, there isn’t a len() method.

I realized today that the documentation is telling me I can pass a producer to Python’s built-in len() function, as in len(producer), to get the queue length: the number of messages still awaiting delivery.
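The reason producer.len() fails while len(producer) works is Python's length protocol: the built-in len() calls an object's __len__ special method, which is not exposed as a plain len attribute. Here is a minimal sketch of that behavior. FakeProducer is a hypothetical stand-in written for illustration, not the confluent_kafka Producer, and its list-backed queue is an assumption.

```python
class FakeProducer:
    """Hypothetical stand-in for a producer; not the confluent_kafka class."""

    def __init__(self):
        # Assumed internal queue of messages not yet delivered.
        self._queue = []

    def produce(self, value):
        # Enqueue a message instead of sending it anywhere.
        self._queue.append(value)

    def __len__(self):
        # The built-in len() calls this special method.
        return len(self._queue)


producer = FakeProducer()
producer.produce(b"one")
producer.produce(b"two")

queue_length = len(producer)                 # works through __len__
has_len_method = hasattr(producer, "len")    # there is no .len attribute
has_dunder_len = "__len__" in dir(producer)  # but dir() does list __len__

print(queue_length, has_len_method, has_dunder_len)
```

So when dir(producer) shows no len entry, it is worth looking for __len__ in the same listing; if it is there, len(producer) should work.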

Code:

print(f"Before produce there are {len(producer)} messages awaiting delivery")
producer.produce(topic, key=str(int(cs.timestamp)).encode('utf8'), value=cs.SerializeToString())
print(f"After produce there are {len(producer)} messages awaiting delivery")
producer.poll(0) # Per https://github.com/confluentinc/confluent-kafka-python/issues/16 for queue full error
print(f"After poll0 there are {len(producer)} messages awaiting delivery")

Output:

Before produce there are 160 messages awaiting delivery
After produce there are 161 messages awaiting delivery
After poll0 there are 155 messages awaiting delivery
