How to measure streaming time on each record?

Hello frens,
I’ve added some metrics to my cryptocurrency streaming application (it sources all cryptocurrency transactions from 60 markets and aggregates average prices over them for a visualisation), and the results worry me.

It takes about 20 ms on my old computer from the moment when service1 sends a message to Kafka to the moment when service2 consumes it.

But a similar measurement on Kafka Streams shows me about 150 ms latency!

Fragment of the code:

// service 1:
kStream.process(() -> new Processor<K, V>() {
...
@Override
public void process(K k, V v) {
    // how long after its event time the record arrives here (timerZero)
    timerZero.record(now() - v.getEventTime());
}
...
});
...
kStream.to("myTopic", Produced.with(myKeySerde, myValSerde));
kStream.process(() -> new Processor<K, V>() {
...
@Override
public void process(K k, V v) {
    // second measuring point on the producing side (timerStart)
    timerStart.record(now() - v.getEventTime());
}
...
});

// service 2:
streamsBuilder.stream("myTopic", Consumed.with(myKeySerde, myValSerce))
.process(() -> new Processor() {
...
@Override
public void process(K k, V v){
    timerStop.record(now() - v.getEventTime());
}
...
}
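
For context, here is a minimal sketch of how one of these timer processors could look, assuming Micrometer timers and a value type exposing getEventTime() (class and variable names are illustrative, not the exact code):

import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LatencyProcessor implements Processor<String, TracedRecord> {

    private final Timer timer;

    public LatencyProcessor(MeterRegistry registry, String timerName) {
        // one timer per measuring point, e.g. "timerZero", "timerStart", "timerStop"
        this.timer = Timer.builder(timerName).register(registry);
    }

    @Override
    public void init(ProcessorContext context) { }

    @Override
    public void process(String key, TracedRecord value) {
        // latency of this record relative to its embedded event time
        timer.record(System.currentTimeMillis() - value.getEventTime(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() { }
}

// attached to the stream, e.g.:
// kStream.process(() -> new LatencyProcessor(registry, "timerStop"));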

example results:

timerZero: 20 ms after eventTime
timerStart: 20 ms after eventTime
timerStop: 120 ms after eventTime!!!

Is this a good way to measure it?

I would take a look at GitHub - opentracing-contrib/java-kafka-client: OpenTracing Instrumentation for Apache Kafka Client.
This plus e.g. Jaeger gives you a very good visualization of where you “lose” time.

You can run Jaeger in a Docker container without external persistence for an easy start.
In my experience you can set this up locally in about 2 hours (including researching how exactly).
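
If I remember right, the easiest way to wire it in is via the interceptors that project ships, set through the normal client configs. A sketch (class names from memory, and it assumes a tracer has been registered via GlobalTracer):

import io.opentracing.contrib.kafka.TracingConsumerInterceptor;
import io.opentracing.contrib.kafka.TracingProducerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

// the interceptors open/close spans around send and poll and report them to the registered tracer
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());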


So you mean measuring times in the interceptors/callbacks of the consumer and producer?

Yes, imho this is an easy way to go. For sure it won’t give you details of the steps between one producer/consumer in your pipeline, but I assume you are not doing compute-intensive stuff there.
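
To make that concrete, here is a minimal sketch of a consumer interceptor that records consume latency against the record timestamp (class name and metrics backend are made up):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LatencyConsumerInterceptor implements ConsumerInterceptor<byte[], byte[]> {

    @Override
    public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
        long now = System.currentTimeMillis();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            // record.timestamp() is the event/producer timestamp, depending on the topic's timestamp type
            long latencyMs = now - record.timestamp();
            // report latencyMs to your metrics backend here (e.g. a Micrometer timer)
        }
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

// registered via the consumer config:
// props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, LatencyConsumerInterceptor.class.getName());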

In my experience, when it comes down to <100 milliseconds e2e latency in Kafka Streams you need to tune a lot to achieve this.
Each producer/consumer adds latency due to network, polling, …

Yeah, but assuming I have measured the times correctly, why does a normal Kafka write/read take 20 ms while Kafka Streams takes 150 ms…

Kafka Streams has some configs like commit.interval.ms which, depending on your topology, can slow things down in terms of latency in order to enable higher throughput.

I would have a look at linger.ms and commit.interval.ms as a first step.
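
For example (just a sketch, the values are for a latency experiment, not a general recommendation):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// how often Kafka Streams commits (and flushes/forwards cached results); lower = lower latency, more broker load
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// how long the embedded producer waits to batch records before sending them
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);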


This is the solution. After setting commit.interval.ms=0 it takes 10 ms now!

Just be careful with this setting. If you have higher throughput, you could put quite some load on the broker if you commit all the time.


@mjsax: I already had some challenges finding the “best fit” when optimizing latency vs. throughput and load in some topologies. Do you have some best-practice material about which parameters are relevant and where to be careful (trade-offs)? I think that would be quite interesting, e.g. a smaller commit.interval imho reduces throughput per partition and increases load on the broker.
I just did it based on my experience (incl. trial-and-error ^^) and on some Google research, and would be interested in some more insights on that topic.

I am not aware of any comprehensive single document off the top of my head.

Hmm… following Sir @an0r0c’s advice I have created my own Producer with a callback. The callback sends metrics, so after every message sent to Kafka the metrics are sent to Prometheus too.

class MyProducer implements Producer<String, TracedRecord> {
...
    @Override
    public Future<RecordMetadata> send(ProducerRecord<String, TracedRecord> producerRecord) {
        // delegate to send(record, callback); the callback records the latency once the broker acks
        return send(producerRecord, (recordMetadata, e) ->
            timer.record(clock.millis() - producerRecord.value().getBirthTimestamp(), TimeUnit.MILLISECONDS));
    }
...
}

To make Kafka Streams use it I must create a KafkaClientSupplier which returns this producer. But look at the producer’s types, it’s byte[]! The KafkaClientSupplier is not generic!

public interface KafkaClientSupplier {
...
    Producer<byte[], byte[]> getProducer(Map<String, Object> config);
...
}

And my Producer is Producer<String, TracedRecord>. It must be like this because I measure the diff between clock.millis() and tracedRecord.getBirthTimestamp().
Deserializing byte[] to TracedRecord in every callback (so for every message) would be overkill, frens.

Do you really need to deserialize it? It’s been a while since I last used it, but I think OpenTracing also just adds headers to the record, therefore deserialization shouldn’t be necessary.
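
If the birth timestamp were carried in a record header instead of only in the value, the callback could read it without any deserialization. Roughly like this (a sketch; the header name "birthTimestamp" and the helper are made up, and the logic would sit in the send() of the byte[] producer returned by your KafkaClientSupplier):

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderLatencyCallbacks {

    /** Sends the record and records the latency relative to the "birthTimestamp" header. */
    public static void sendWithLatencyMetric(Producer<byte[], byte[]> producer,
                                             ProducerRecord<byte[], byte[]> record,
                                             Timer timer) {
        // read the timestamp from the header, no value deserialization needed
        Header birth = record.headers().lastHeader("birthTimestamp");
        long birthTs = birth == null ? -1L : ByteBuffer.wrap(birth.value()).getLong();
        Callback callback = (metadata, exception) -> {
            if (birthTs >= 0 && exception == null) {
                timer.record(System.currentTimeMillis() - birthTs, TimeUnit.MILLISECONDS);
            }
        };
        producer.send(record, callback);
    }
}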
