Consume messages from the last N days using the Python client

Hello, I am building a service in Python to consume messages from the last N days from Kafka.

I looked into the Consumer.offsets_for_times function, but I’m confused that it accepts timestamps in the TopicPartition.offset field.

How is an offset equivalent to a timestamp?


As per the kafka-python client, offsets_for_times looks up the offsets for the given partitions by timestamp. When we query with a timestamp, the returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
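To illustrate the lookup rule described above, here is a small pure-Python sketch that does not talk to Kafka at all. It assumes a partition's log can be modeled as a list of message timestamps in non-decreasing order, indexed by offset (a simplification, since real logs need not have perfectly ordered timestamps). The function name earliest_offset_at_or_after is made up for this example.

```python
from bisect import bisect_left


def earliest_offset_at_or_after(timestamps_ms, target_ms):
    """Return the first offset whose timestamp is >= target_ms, or None.

    timestamps_ms[i] is the timestamp of the message at offset i and is
    assumed to be sorted in non-decreasing order (a simplification).
    """
    offset = bisect_left(timestamps_ms, target_ms)
    return offset if offset < len(timestamps_ms) else None


# Toy partition log: offsets 0..3 with these timestamps (ms since epoch).
log = [1000, 2000, 2000, 5000]
print(earliest_offset_at_or_after(log, 2000))  # 1
print(earliest_offset_at_or_after(log, 2001))  # 3
print(earliest_offset_at_or_after(log, 9000))  # None
```

So the timestamp is only the search key; what comes back is an ordinary offset that can be used as a starting position.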

I ran a test with the client to look up the offset for a given timestamp:

{TopicPartition(topic='FOO', partition=0): OffsetAndTimestamp(offset=149, timestamp=1645238993441)}

Here is my code snippet:

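from datetime import datetime

dtstmp = datetime(2022, 2, 18)  # naive datetime, interpreted in local time
s = int(dtstmp.timestamp() * 1000)  # milliseconds since the epoch
# tp (TopicPartition) and consumer (KafkaConsumer) are created elsewhere
msg_from = consumer.offsets_for_times({tp: s})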

I thought offsets_for_times accepts TopicPartition, but you are passing a dict to it. Are you sure it is supported?

Yes, it does. Please check the kafka-python documentation:

timestamps (dict) – {TopicPartition: int} mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC))
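Since the lookup expects milliseconds since the epoch, the "last N days" cutoff can be computed with the standard library alone. This is a minimal sketch: the helper name ms_since_epoch_days_ago is hypothetical, and it uses a timezone-aware UTC datetime so the result does not depend on the machine's local timezone.

```python
from datetime import datetime, timedelta, timezone


def ms_since_epoch_days_ago(days, now=None):
    """Return the epoch timestamp in milliseconds for `days` days before `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    return int(cutoff.timestamp() * 1000)


# A fixed "now" makes the result reproducible.
fixed_now = datetime(2022, 2, 19, tzinfo=timezone.utc)
print(ms_since_epoch_days_ago(1, now=fixed_now))  # 1645142400000
```

The resulting integer is the value to pass as the timestamp for each partition.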

I am talking about the official client, confluent_kafka (see the confluent_kafka API page in the confluent-kafka 1.7.0 documentation).

Gotcha, here is what I would do with the confluent_kafka module:

from datetime import datetime
import confluent_kafka

dtstmp = datetime(2022, 2, 18)
s = int(dtstmp.timestamp() * 1000)
# topic+partitions with timestamps in the TopicPartition.offset field
# (topic and consumer are defined elsewhere; partitions 0-4 are assumed to exist)
tp = [confluent_kafka.TopicPartition(topic, p, s) for p in range(5)]
print(consumer.offsets_for_times(tp))
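Putting the pieces together for the original question, a rough sketch of consuming everything from the last N days with confluent_kafka might look like the following. The function names start_offsets_since and consume_last_n_days are made up for illustration, the partitions are discovered via list_topics rather than assumed, and the loop simply stops at the first empty poll, which a real service would likely not do. I believe a partition with no message at or after the cutoff comes back with an end-of-partition offset, but please verify that against the client documentation.

```python
from datetime import datetime, timedelta, timezone


def start_offsets_since(consumer, topic, days, timeout=10.0):
    """Return TopicPartitions positioned at the first message of the last `days` days."""
    # Imported lazily so this file can be loaded without the client installed.
    from confluent_kafka import TopicPartition

    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    cutoff_ms = int(cutoff.timestamp() * 1000)

    # Discover the topic's partitions instead of hard-coding their count.
    metadata = consumer.list_topics(topic, timeout=timeout)
    partition_ids = metadata.topics[topic].partitions.keys()

    # The timestamp goes into the offset field of each query entry.
    query = [TopicPartition(topic, p, cutoff_ms) for p in partition_ids]
    # Each returned entry carries the resolved offset in that same field.
    return consumer.offsets_for_times(query, timeout=timeout)


def consume_last_n_days(conf, topic, days):
    """Print messages from `topic` starting at the cutoff (illustrative only)."""
    from confluent_kafka import Consumer

    consumer = Consumer(conf)
    try:
        # Assigning with explicit offsets starts each partition at the cutoff.
        consumer.assign(start_offsets_since(consumer, topic, days))
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                break  # nothing arrived within the timeout; stop for this sketch
            if msg.error():
                raise RuntimeError(msg.error())
            print(msg.partition(), msg.offset(), msg.value())
    finally:
        consumer.close()
```

For example, consume_last_n_days({'bootstrap.servers': 'localhost:9092', 'group.id': 'last-n-days-demo'}, 'FOO', 7) would read topic FOO from seven days ago onward; the broker address and group id here are placeholders.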
