Hello frens,
I’ve added some metrics to my cryptocurrency streaming application (it sources all cryptocurrency transactions from 60 markets and aggregates average prices on them to drive a visualisation), and the results worry me.
It takes about 20 ms on my old computer from the moment service1 sends a message to Kafka to the moment service2 consumes it,
but a similar measurement on Kafka Streams shows me about 150 ms latency!
Fragment of the code:
// service 1:
kStream.process(() -> new Processor<K, V>() {
    ...
    @Override
    public void process(K k, V v) {
        // how far behind eventTime the record is at this point
        timerZero.record(now() - v.getEventTime());
    }
    ...
});
...
kStream.to("myTopic", Produced.with(myKeySerde, myValSerde));
kStream.process(() -> new Processor<K, V>() {
    ...
    @Override
    public void process(K k, V v) {
        // same measurement again, next to the write to "myTopic"
        timerStart.record(now() - v.getEventTime());
    }
    ...
});
// service 2:
streamsBuilder.stream("myTopic", Consumed.with(myKeySerde, myValSerde))
    .process(() -> new Processor<K, V>() {
        ...
        @Override
        public void process(K k, V v) {
            // how far behind eventTime the record is when service 2 reads it from "myTopic"
            timerStop.record(now() - v.getEventTime());
        }
        ...
    });
example results:
timerZero: 20 ms after eventTime
timerStart: 20 ms after eventTime
timerStop: 120 ms after eventTime!!!
You can run Jaeger in a Docker container without external persistence for an easy start.
In my experience you can set this up locally in about 2 hours (incl. research on how exactly to do it).
Yes, this is imho an easy way to go. For sure it won’t give you details on the steps between one producer/consumer in your pipeline, but I assume you are not doing compute-intensive stuff there.
In my experience, when it comes down to <100 ms end-to-end latency in Kafka Streams, you need to tune a lot to achieve it.
Each producer/consumer hop adds latency due to the network, polling, …
Kafka Streams has some configs like commit.interval.ms which, depending on your topology, can slow things down in terms of latency in order to enable higher throughput.
I would have a look at linger.ms and commit.interval.ms as a first step.
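For reference, a minimal sketch of where those knobs live in a Streams config (the application id, bootstrap servers and the concrete values are just placeholders to experiment with, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// how often Kafka Streams commits and flushes; the default is 30000 ms
// (100 ms with exactly-once), so lowering it trades throughput for latency
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10L);
// how long the internal producer waits to batch records before sending;
// iirc Kafka Streams defaults this to 100 ms, 0 sends as soon as possible
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);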
@mjsax: I already had some challenges in finding the “best fit” in terms of optimizing latency vs. throughput and load in some topologies. Do you have some best-practice material about which parameters are relevant at all and where to be careful (trade-offs)? I think that would be quite interesting. E.g. a smaller commit.interval.ms imho reduces throughput per partition and increases load on the broker.
I just did it based on my experience (incl. trial-and-error ^^) and on some Google research, and would be interested in some more insights on that topic.
hmm… following Sir @an0r0c's advice I have created my own Producer with a callback. The callback sends metrics, so after every message sent to Kafka the metrics are also pushed to Prometheus.
To make Kafka Streams use it I must create a KafkaClientSupplier which will return this producer. But look at the types of that producer: it's byte[]! The KafkaClientSupplier is not generic!
And my Producer is Producer<String, TracedRecord>. It must be like this because I measure the diff between clock.millis() and tradeRecord.getBirthTimestamp().
Deserializing byte[] back to TracedRecord in every callback (so for every message) would be overkill, frens.
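Roughly what I mean (MyClientSupplier is just a placeholder name here for a KafkaClientSupplier that returns the instrumented producer):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

final Topology topology = streamsBuilder.build();
// the third constructor argument makes Kafka Streams create its clients through my supplier
final KafkaStreams streams = new KafkaStreams(topology, props, new MyClientSupplier());
streams.start();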
Do you really need to deserialize it? It's been a while since I last used it, but I think OpenTracing also just adds headers to the record, therefore deserialization shouldn't be necessary.
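If you only need the timestamp, a ProducerInterceptor (instead of a whole KafkaClientSupplier) can read it from a header on the already-serialized record. A sketch only, assuming service 1 puts the birth timestamp into a hypothetical "birth-ts" header (e.g. via context.headers() in a processor) and that Micrometer is used for the metrics:

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;

public class BirthTsInterceptor implements ProducerInterceptor<byte[], byte[]> {

    private final Timer timer = Metrics.timer("record.birth.to.send.latency");

    @Override
    public ProducerRecord<byte[], byte[]> onSend(final ProducerRecord<byte[], byte[]> record) {
        // the header carries the raw timestamp (assumed 8-byte long), so the value is never deserialized
        final Header h = record.headers().lastHeader("birth-ts");
        if (h != null) {
            final long birthTs = ByteBuffer.wrap(h.value()).getLong();
            timer.record(System.currentTimeMillis() - birthTs, TimeUnit.MILLISECONDS);
        }
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(final RecordMetadata metadata, final Exception exception) {
        // the record itself is not available here, so nothing to measure without extra correlation
    }

    @Override
    public void close() { }

    @Override
    public void configure(final Map<String, ?> configs) { }
}

You could then register it on the Streams-internal producer via props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), BirthTsInterceptor.class.getName()) instead of writing a full KafkaClientSupplier. But again, that's an idea to try, not something I have verified for your setup.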