Apache Kafka to MongoDB: weird behaviour when measuring write time

I have a problem when measuring the write time from Kafka to MongoDB.
This is my docker-compose file:

version: "3.6"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    networks:
      - localnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      KAFKA_JMX_PORT: 35000

  broker:
    image: confluentinc/cp-kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    networks:
      - localnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://broker:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_1:PLAINTEXT,LISTENER_2:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_JMX_PORT: 35000
      
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    networks:
      - localnet
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081    
          
  connect:
    build:
      context: .
      dockerfile: connect.Dockerfile
    ports:
      - "35000:35000"
      - "8083:8083"
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    networks:
      - localnet
    environment:
      KAFKA_JMX_PORT: 35000
      KAFKA_JMX_HOSTNAME: localhost
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
      CONNECT_METADATA_MAX_AGE_MS: 180000
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

  mongo1:
    image: mongo:latest
    container_name: mongo1
    command: --replSet rs0 --oplogSize 128
    ports:
      - "35001:27017"
    depends_on:
      - zookeeper
      - broker
      - connect
    networks:
      - localnet
    restart: always

  mongo1-setup:
    image: mongo:latest
    container_name: mongo1-setup
    volumes: 
      - ./config-replica.js:/config-replica.js
    depends_on:
      - mongo1
    networks:
      - localnet
    entrypoint:
      [
        "bash",
        "-c",
        "sleep 10 && mongosh --host mongo1:27017 config-replica.js && sleep 10",
      ]
    restart: "no"
    
    
  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021 
    networks:
      - localnet

networks:
  localnet:
    attachable: true

And this is the connect.Dockerfile referenced above:
FROM confluentinc/cp-kafka-connect:latest

RUN confluent-hub install --no-prompt --verbose mongodb/kafka-connect-mongodb:latest


ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

And this is the connector configuration:

{
    "name": "mongo-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "IoT",
        "connection.uri": "mongodb://mongo1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "database": "IoT",
        "value.converter.schemas.enable": "false",
        "writemodel.strategy": "org.example.klas",
        "collection": "devices",
        "transforms": "InsertField",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.timestamp.field": "insertedToKafka",
        "timeseries.timefield": "measureTime",
        "timeseries.timefield.auto.convert": "true",
        "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
    }
}
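
For completeness, the connector can be registered by POSTing the JSON above to the Kafka Connect REST API on port 8083, which the compose file exposes. A minimal Java sketch, assuming the JSON is saved as mongo-sink.json next to the program (the file name and class name are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class RegisterMongoSink {
    public static void main(String[] args) throws Exception {
        // Read the connector JSON shown above (assumed to be saved as mongo-sink.json).
        String connectorJson = Files.readString(Path.of("mongo-sink.json"));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // 201 Created means the Connect worker accepted the connector.
        System.out.println(response.statusCode() + " " + response.body());
    }
}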

And my custom write strategy:

package org.example;

import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;

public class klas implements WriteModelStrategy {

    private static final String ID_FIELD_NAME = "_id";
    private static final String EVENT_TYPE_FIELD_NAME = "eventType";

    @Override
    public WriteModel<BsonDocument> createWriteModel(SinkDocument sinkDocument) {
        BsonDocument vd = sinkDocument.getValueDoc().orElseThrow(() ->
                new RuntimeException("Could not build the WriteModel, the value document was missing unexpectedly"));
        // Stamp the document with the Connect worker's wall-clock time at write time.
        vd = vd.append("insertTime", new BsonDateTime(System.currentTimeMillis()));
        return new InsertOneModel<>(vd);
    }
}
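
To sanity-check that the strategy really appends insertTime before I package the JAR onto the Connect plugin path, I can call it directly. A quick local smoke-test sketch (the test class name and the sample JSON are made up):

package org.example;

import com.mongodb.client.model.InsertOneModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.bson.BsonDocument;

public class KlasSmokeTest {
    public static void main(String[] args) {
        // Hypothetical value document resembling one IoT record from the topic.
        BsonDocument value = BsonDocument.parse(
                "{\"deviceName\": \"TERMOMETER\", \"value\": 91.4}");

        // The sink connector would normally build the SinkDocument from the Kafka record.
        SinkDocument sinkDocument = new SinkDocument(new BsonDocument(), value);

        InsertOneModel<BsonDocument> model =
                (InsertOneModel<BsonDocument>) new klas().createWriteModel(sinkDocument);

        // Expect the original fields plus an appended insertTime BsonDateTime.
        System.out.println(model.getDocument().toJson());
    }
}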

Generally, I would like to measure how long it takes to consume data from Apache Kafka and save it in MongoDB.
To capture the time a record was put into Kafka, I used this part of the connector configuration:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.timestamp.field": "insertedToKafka"

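If I understand the InsertField transform correctly, it copies the Kafka record timestamp into the value, and with the default CreateTime setting that timestamp is taken from the producer's clock when the record is sent. A small sketch (class name and payload are made up) that sets the record timestamp explicitly, so there is no doubt which clock ends up in insertedToKafka:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExplicitTimestampProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // With message.timestamp.type=CreateTime (the default), this value is stored
            // as the record timestamp and should end up in "insertedToKafka".
            long sendTime = System.currentTimeMillis();
            producer.send(new ProducerRecord<>("IoT", null, sendTime, "1",
                    "{\"deviceName\": \"TERMOMETER\", \"value\": 91.4}"));
        }
    }
}
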
A document in MongoDB looks like this:

{
    measureTime: ISODate("2023-07-15T19:39:15.977Z"),
    _id: ObjectId("64b3df9edcf32746323e15fe"),
    value: 91.40457465058628,
    insertedToKafka: Long("1689509790584"),
    externalId: Long("4355"),
    deviceId: '0d036333-83aa-4565-80ad-d4db4238a910',
    deviceName: 'TERMOMETER',
    insertTime: ISODate("2023-07-16T12:16:30.587Z")
}

To record the time when MongoDB saves the data, I used the write model strategy shown above (the insertTime field in the document).
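
For the sample document above the two timestamps are only about 3 ms apart (insertTime 2023-07-16T12:16:30.587Z is 1689509790587 in epoch millis, minus insertedToKafka 1689509790584). To compute this across the whole collection I can read both fields back with the MongoDB Java driver; a rough sketch, assuming the 35001:27017 port mapping from the compose file (class name and connection options are guesses):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class LatencyCheck {
    public static void main(String[] args) {
        // mongo1 is published on host port 35001 in the compose file.
        try (MongoClient client = MongoClients.create(
                "mongodb://localhost:35001/?directConnection=true")) {
            MongoCollection<Document> devices =
                    client.getDatabase("IoT").getCollection("devices");

            for (Document doc : devices.find()) {
                long insertedToKafka = doc.getLong("insertedToKafka");
                long insertTime = doc.getDate("insertTime").getTime();
                // Positive = the Mongo insert happened after the Kafka record timestamp.
                System.out.println(doc.get("_id") + " -> " + (insertTime - insertedToKafka) + " ms");
            }
        }
    }
}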

I used a Java program to send data to Kafka. It works quite well, but when I slow down the while loop that puts data into Kafka (using Thread.sleep(20)), sometimes insertTime > insertedToKafka.
Is this a problem with Kafka or with Thread.sleep()? How can I avoid it?

This is the code I have been using to send data to Kafka:

public static void main(String[] args) throws IOException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    List<String> anotherList = new ArrayList<>();

    BufferedReader reader = new BufferedReader(new FileReader("data.txt"));
    String topic = "IoT";
    String line;
    String kafkaIndex = "0";
    Producer<String, String> producer = new KafkaProducer<>(properties);

    while ((line = reader.readLine()) != null) {
        // The record key encodes the device type found in the line.
        if (line.contains("TERMOMETER")) {
            kafkaIndex = "1";
        }
        if (line.contains("HIGROMETER")) {
            kafkaIndex = "3";
        }
        if (line.contains("BAROMETER")) {
            kafkaIndex = "4";
        }

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, kafkaIndex, line);
        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                // The record was successfully sent; keep its record timestamp.
                anotherList.add(String.valueOf(recordMetadata.timestamp()));
            } else {
                System.out.println("Error while producing: " + e);
            }
        });

        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.flush();
    }

    reader.close();
    producer.close();

    // Helper (not shown here) that writes the collected timestamps to a CSV file.
    givenDataArray_whenConvertToCSV_thenOutputCreated(anotherList, "timeFromKafka.csv");
}

When I add Thread.sleep(20), the difference between insertTime and insertedToKafka sometimes lands anywhere between -50,000 ms and +50,000 ms.
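
One idea I have been considering is pacing the sends with a scheduler instead of calling Thread.sleep() and flush() inside the loop; a rough, untested sketch (all names are made up):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PacedProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new KafkaProducer<>(props);
        BufferedReader reader = new BufferedReader(new FileReader("data.txt"));
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Send one line every 20 ms; the producer's own batching handles delivery,
        // so there is no flush() between records.
        scheduler.scheduleAtFixedRate(() -> {
            try {
                String line = reader.readLine();
                if (line == null) {
                    scheduler.shutdown();
                    return;
                }
                producer.send(new ProducerRecord<>("IoT", line));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 20, TimeUnit.MILLISECONDS);

        scheduler.awaitTermination(1, TimeUnit.HOURS);
        producer.close();
        reader.close();
    }
}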

I have been looking for a solution or a tip but can't find anything useful.
