I am using kafka streams processor API in my applicaton. after forwarding the event to sink topic I am seeing a delay of more than 1ms for the next event to come .
Is there a way to tune streams configuration to prevent the delay?
I am using kafka streams processor API in my applicaton. after forwarding the event to sink topic I am seeing a delay of more than 1ms for the next event to come .
Is there a way to tune streams configuration to prevent the delay?
What do you exactly mean? What (and how?) do you exactly measure? – Is your application lagging?
Hello mjsax,
I’m setting the ‘completeTime’ property wit the current time in millis after forwarding the event to sink topic in the topology.
this.context.forward(cEvent.getDeviceId(), cEvent, To.child(sinkTopic));
System.setProperty("completeTime",System.currentTimeMillis()+"");
and in the source processor I’m calculating the diff of the ‘completeTime’ property and the current time using which ‘avg’ is computed. The following are the statements:
long currTime = System.currentTimeMillis();
long endTime = Long.getLong("completeTime");
long diffTime = currTime - endTime;
totalTime = totalTime + diffTime;
totalCount++;
avg = totalTime/totalCount;
I observed that the ‘avg’ is more than 1ms which is reducing the no.of events processed in 1sec. I am running the streams application with default configuration. Also, when I am concurrently producing events @1000 events/sec into the source topic of the topology the value of ‘avg’ is more than 3ms. Is there any way using which I can reduce this delay?
The following is the Configuration of source topic:
Topic: fm.preprocessor_event_correlator PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824,retention.ms=3600000,max.message.bytes=10485880,segment.ms=3000000
Topic: fm.preprocessor_event_correlator Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Thanks
Shrenik
Using the avg
is not a good measurement. For example, if there is no data buffered and the consumer needs to fetch data from the brokers, it will take some time giving you a latency spike that will drive up the average significantly. You should use a percentile to get better insight on the latency distribution instead.
You might also want to produce data (like 100K messages) before you start you application if you want to measure the read path. Otherwise, there might also be latency because no data is available yet.
Last: I am wondering why you are interesting in this particular latency?
Thanks for the reply
I had started streams application when the LAG was more than 200k for the consumer group and I am continuously producing @1000 events/sec into the source topic even then I was seeing delay of more than 1ms for the next coming record.
I am using the following configuration:
StreamsConfig.cache.max.bytes.buffering=104857600
StreamsConfig.topic.max.message.bytes=10000000
StreamsConfig.max.poll.records=5000
StreamsConfig.buffered.records.per.partition=5000
StreamsConfig.max.partition.fetch.bytes=10485760
I had even set the buffered.records.per.partition to 5000 and and max.poll.records to 5000 but that did not help.
I am interested in this latency because it had reduced the number events processed in 1sec significantly. I had even checked the time application is taking to process 1 event and forward it to sink topic and it is around 1.2ms .
I am having few queries
Not 100% sure. The Consumer should send fetch request in the background to reduce latency when poll()
is called. Maybe you can dig into consumer metrics and logs to see how often a fetch request is sent, what the latency of it is, and how much data it returns.
Thank you @mjsax your advice helped.
I see that the process-latency-avg is constant but the commit-latency-avg is around 22ms for some runs of the application and othertimes it goes to 630+ms because of which the process-rate decreases. I have set the commit rate at 1/sec. Any idea why this happens as the process-latency-avg is same in both cases?
Below are the details of kafka platform:
##Apache Kafka® 2.3.0