Hello, I am building a service in Python to consume messages from the last N days from Kafka.
I looked into the Consumer.offsets_for_times function, but I'm confused that it accepts timestamps in the TopicPartition.offset field.
How is an offset equivalent to a timestamp?
Thanks.
As per the kafka-python client documentation, offsets_for_times looks up the offsets for the given partitions by timestamp. When you query with a timestamp, the returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in that partition. So the timestamp is only the lookup key; what comes back is a real offset.
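To make that lookup rule concrete, here is a toy, pure-Python model of a single partition. It only mimics the documented semantics (earliest offset with timestamp >= target); the broker itself uses its time index rather than a linear scan, and the partition contents below are made up for illustration.

```python
def offset_for_time(log, target_ms):
    """Toy model of a timestamp lookup on a single partition.

    `log` is a list of (offset, timestamp_ms) pairs in offset order.
    Returns the earliest offset whose timestamp is >= target_ms,
    or None if no message is that recent.
    """
    for offset, ts in log:
        if ts >= target_ms:
            return offset
    return None


# Hypothetical partition contents: the timestamp is just the lookup key,
# the answer is always one of the real offsets.
partition_log = [(147, 1645238000000), (148, 1645238500000), (149, 1645238993441)]
print(offset_for_time(partition_log, 1645238600000))  # -> 149
```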
I ran a quick test with the client to get the offset for a given timestamp:
{TopicPartition(topic='FOO', partition=0): OffsetAndTimestamp(offset=149, timestamp=1645238993441)}
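Here is my code snippet:
from datetime import datetime

dtstmp = datetime(2022, 2, 18)
s = int(dtstmp.timestamp() * 1000)  # milliseconds since the epoch (naive datetime is read as local time)
msg_from = consumer.offsets_for_times({tp: s})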
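Getting the offsets is only half of the job; the consumer still has to start reading there. One way to do that with kafka-python is to assign the partitions manually and seek each one. This is a minimal sketch assuming `consumer` is a `kafka.KafkaConsumer` that has not called `subscribe()` (kafka-python does not allow mixing `assign()` and `subscribe()`), and `offsets` is the dict returned by `offsets_for_times`; the helper name `seek_to_times` is hypothetical.

```python
def seek_to_times(consumer, offsets):
    """Position a kafka-python consumer at the offsets found by offsets_for_times.

    `offsets` is assumed to be the {TopicPartition: OffsetAndTimestamp or None}
    dict returned by consumer.offsets_for_times(...).
    """
    # Manual assignment, so the seek positions are not overridden by a group rebalance.
    consumer.assign(list(offsets))
    for tp, found in offsets.items():
        if found is None:
            # No message at or after the timestamp: start at the end (new messages only).
            consumer.seek_to_end(tp)
        else:
            consumer.seek(tp, found.offset)
```

Usage would look like `seek_to_times(consumer, consumer.offsets_for_times({tp: s}))` followed by the usual `for msg in consumer:` loop.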
I thought offsets_for_times accepts a list of TopicPartition objects, but you are passing a dict to it. Are you sure that is supported?
Yes, it is in kafka-python; please check its documentation:
Parameters:
timestamps (dict) – {TopicPartition: int} mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC))
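Since the client wants milliseconds since the epoch in UTC, it helps to be explicit about time zones: `datetime(2022, 2, 18).timestamp()` treats a naive datetime as local time, so the result shifts by your UTC offset. For the original goal (messages from the last N days), here is a small sketch of conversion helpers. The function names are hypothetical, and treating naive datetimes as UTC is an assumption of this sketch, not something either client does.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt):
    """Milliseconds since 1970-01-01 UTC, the unit offsets_for_times expects."""
    if dt.tzinfo is None:
        # Assumption of this sketch: a naive datetime is interpreted as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # timedelta // timedelta gives an exact integer, no float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def days_ago_ms(n, now=None):
    """Epoch milliseconds for `n` days before `now` (defaults to the current UTC time)."""
    now = now or datetime.now(timezone.utc)
    return to_epoch_ms(now - timedelta(days=n))


def from_epoch_ms(ms):
    """Decode an epoch-milliseconds timestamp (e.g. from OffsetAndTimestamp) to UTC."""
    return EPOCH + timedelta(milliseconds=ms)
```

For example, with kafka-python the lookup could be `consumer.offsets_for_times({tp: days_ago_ms(7)})`, and `from_epoch_ms(1645238993441)` decodes the timestamp from the test result above to 2022-02-19 02:49:53.441 UTC.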
Gotcha. Here is what I would do with the confluent_kafka module:
from datetime import datetime
import confluent_kafka

dtstmp = datetime(2022, 2, 18)
s = int(dtstmp.timestamp() * 1000)
# topic+partitions with timestamps in the TopicPartition.offset field (partitions 0-4 here)
tp = [confluent_kafka.TopicPartition(topic, p, s) for p in range(0, 5)]
consumer.subscribe([topic])
print(consumer.offsets_for_times(tp))
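The lookup itself does not need `subscribe()`. To actually start reading at the returned offsets, one straightforward option with confluent_kafka is to pass the result of `offsets_for_times` to `assign()`, since each returned TopicPartition already carries the offset to start from (confluent_kafka returns -1, i.e. OFFSET_END, when the timestamp is past the last message). A minimal sketch, assuming `consumer` is a `confluent_kafka.Consumer` with default `enable.partition.eof` (false) and `tps` is a list built like the one above; `consume_since` and its parameters are hypothetical names.

```python
def consume_since(consumer, tps, max_messages, lookup_timeout=10.0, poll_timeout=1.0):
    """Seek each partition to the first offset at/after its timestamp, then poll.

    `tps` holds TopicPartition objects whose .offset is a timestamp in ms.
    """
    # The broker replaces each timestamp with the earliest matching offset.
    offsets = consumer.offsets_for_times(tps, timeout=lookup_timeout)
    # assign() instead of subscribe(), so the explicit start offsets are used.
    consumer.assign(offsets)
    messages = []
    while len(messages) < max_messages:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            break  # simplification: stop at the first empty poll
        if msg.error():
            raise RuntimeError(str(msg.error()))
        messages.append(msg)
    return messages
```

Stopping at the first empty poll keeps the sketch from blocking forever; a long-running service would instead keep polling in a loop.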