Question regarding thread-local variables within a stream processing topology

Greetings,

I am relatively new to both Kafka and Java, and I am reading Mastering Kafka Streams and ksqlDB, provided by Confluent, as a starting point.

In the crypto sentiment app developed throughout chapter 3, one of the steps in the stream processing topology uses KStream.flatMapValues to perform sentiment analysis on the input records. My implementation of the book’s project looks like this:

    // Merge the English and Translated streams
    KStream<byte[], Tweet> merged = ...

    // Create a new Enriched stream of sentiment analyzed tweets (EntitySentiment records)
    KStream<byte[], EntitySentiment> enriched = merged.flatMapValues(
      (tweet) -> {
        // Get the sentiment analyzed records
        return sentimentClient.getEntitiesSentiment(tweet);
      }
    );

In both my implementation and the book’s sample project, a new instance of sentimentClient is created in the Topology’s build method (in the book it’s called languageClient and also provides translation). The book’s GcpClient implementation of this language/sentiment client uses a ThreadLocal variable of type LanguageServiceClient (from Google’s NLP client library), which performs the sentiment analysis inside the getEntitiesSentiment method called in flatMapValues (above).

My question is, why does the LanguageServiceClient variable (named nlpClients in the book’s implementation) have to be thread-local? I understand there may be multiple instances of the Topology running in parallel, but since a new sentiment/language client is instantiated in the Topology’s build method, isn’t there a separate client in each thread already? Does the variable actually have to be thread-local?

Are you referring to mastering-kafka-streams-and-ksqldb/CryptoTopology.java at 924bc71b394baf3284c21dedc498b8f5e98898b9 · mitch-seymour/mastering-kafka-streams-and-ksqldb · GitHub?

If yes, note that the build method is called only once. It builds the Topology, which is a logical representation of the program. This topology is given to the KafkaStreams client once, and it is translated into an internal representation called the “processor topology”. There will be multiple instances of the “processor topology” (one per task, not per thread), but they all share the same GcpClient, because there is only one such object.
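To make this concrete, here is a minimal plain-Java sketch with no Kafka dependency. All names in it (BuildOnceDemo, Client, build, clientsCreatedFor) are hypothetical stand-ins and not the book's code: build plays the role of the topology's build method, and the returned Function plays the role of the lambda passed to flatMapValues. It only illustrates that a lambda created once captures one object, no matter how many times the lambda is later applied.

```java
import java.util.function.Function;

public class BuildOnceDemo {
    // Hypothetical stand-in for GcpClient; counts how many instances were created.
    static class Client {
        static int instances = 0;
        Client() { instances++; }
        String analyze(String text) { return text.toUpperCase(); }
    }

    // Stand-in for the topology's build method: runs once and returns the logic.
    public static Function<String, String> build() {
        Client client = new Client();        // created exactly once per build() call
        return text -> client.analyze(text); // the lambda captures that single client
    }

    // Simulates several tasks reusing the logic produced by one build() call
    // and reports how many clients that created.
    public static int clientsCreatedFor(int tasks) {
        int before = Client.instances;
        Function<String, String> logic = build();
        for (int task = 0; task < tasks; task++) {
            logic.apply("tweet " + task);
        }
        return Client.instances - before;
    }

    public static void main(String[] args) {
        System.out.println(clientsCreatedFor(4)); // prints 1
    }
}
```

Applying the logic four times still leaves a single Client, which mirrors why every task ends up using the same object.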

Thus, GcpClient needs to be thread-safe because a single KafkaStreams client might run multiple StreamThreads (as configured via num.stream.threads).
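As a rough illustration of what the ThreadLocal buys you in that situation, the sketch below shares one object across several plain Java threads while each thread lazily gets its own inner client. FakeNlpClient and SharedClient are hypothetical stand-ins for LanguageServiceClient and GcpClient, and the plain Thread objects only approximate StreamThreads; this is not the book's implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedClientDemo {
    // Hypothetical stand-in for a resource we do not want to share between threads.
    static class FakeNlpClient {
        private static final AtomicInteger COUNTER = new AtomicInteger();
        final int id = COUNTER.incrementAndGet();
    }

    // Hypothetical stand-in for GcpClient: one instance, used by every thread.
    static class SharedClient {
        private final ThreadLocal<FakeNlpClient> nlpClients =
            ThreadLocal.withInitial(FakeNlpClient::new);

        int clientIdForCurrentThread() {
            return nlpClients.get().id; // each thread sees its own FakeNlpClient
        }
    }

    // Starts numThreads threads against ONE SharedClient and collects the ids
    // of the per-thread clients they ended up using.
    public static Set<Integer> run(int numThreads) {
        SharedClient shared = new SharedClient(); // created once, as in build()
        Set<Integer> ids = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                ids.add(shared.clientIdForCurrentThread());
                ids.add(shared.clientIdForCurrentThread()); // same thread, same client
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(run(3).size()); // prints 3: one inner client per thread
    }
}
```

Without the ThreadLocal, all three threads would go through the same inner client, which is only acceptable if that client is itself thread-safe.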

I see, thank you. I was under the impression the build method was called once per thread.
