I have a problem when measuring the write time from Kafka to MongoDB.
This is my docker-compose file:
version: "3.6"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    networks:
      - localnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      KAFKA_JMX_PORT: 35000
  broker:
    image: confluentinc/cp-kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    networks:
      - localnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://broker:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_1://broker:29092,LISTENER_2://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_1:PLAINTEXT,LISTENER_2:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_JMX_PORT: 35000
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    networks:
      - localnet
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  connect:
    build:
      context: .
      dockerfile: connect.Dockerfile
    ports:
      - "35000:35000"
      - "8083:8083"
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    networks:
      - localnet
    environment:
      KAFKA_JMX_PORT: 35000
      KAFKA_JMX_HOSTNAME: localhost
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
      CONNECT_METADATA_MAX_AGE_MS: 180000
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  mongo1:
    image: mongo:latest
    container_name: mongo1
    command: --replSet rs0 --oplogSize 128
    ports:
      - "35001:27017"
    depends_on:
      - zookeeper
      - broker
      - connect
    networks:
      - localnet
    restart: always
  mongo1-setup:
    image: mongo:latest
    container_name: mongo1-setup
    volumes:
      - ./config-replica.js:/config-replica.js
    depends_on:
      - mongo1
    networks:
      - localnet
    entrypoint:
      [
        "bash",
        "-c",
        "sleep 10 && mongosh --host mongo1:27017 config-replica.js && sleep 10",
      ]
    restart: "no"
  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    networks:
      - localnet
networks:
  localnet:
    attachable: true
And the connect.Dockerfile used to build the connect service:
FROM confluentinc/cp-kafka-connect:latest
RUN confluent-hub install --no-prompt --verbose mongodb/kafka-connect-mongodb:latest
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
And the connector configuration:
{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "IoT",
    "connection.uri": "mongodb://mongo1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database": "IoT",
    "value.converter.schemas.enable": "false",
    "writemodel.strategy": "org.example.klas",
    "collection": "devices",
    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.timestamp.field": "insertedToKafka",
    "timeseries.timefield": "measureTime",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}
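I register this configuration through the Kafka Connect REST API that the compose file exposes on port 8083. A minimal sketch of that call with java.net.http, assuming the JSON above is saved locally as mongo-sink.json (the file name is just an assumption for this example):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // Read the connector definition shown above from a local file.
        String json = Files.readString(Paths.get("mongo-sink.json"));

        // POST it to the Connect worker's REST endpoint.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}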
And my custom write strategy:
package org.example;

import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;

public class klas implements WriteModelStrategy {

    @Override
    public WriteModel<BsonDocument> createWriteModel(SinkDocument sinkDocument) {
        BsonDocument vd = sinkDocument.getValueDoc().orElseThrow(() ->
                new RuntimeException("Could not build the WriteModel, the value document was missing unexpectedly"));
        // Stamp the document with the sink task's wall-clock time just before the insert.
        vd = vd.append("insertTime", new BsonDateTime(System.currentTimeMillis()));
        return new InsertOneModel<>(vd);
    }
}
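To check that the strategy really appends insertTime before the insert, I can run it standalone. This is only a hedged sketch; it assumes the SinkDocument(keyDoc, valueDoc) constructor that the connector itself uses, and a throwaway payload:

package org.example;

import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import org.bson.BsonDocument;
import org.bson.BsonDouble;

public class KlasSmokeTest {
    public static void main(String[] args) {
        // Build a fake sink record value and run it through the strategy.
        BsonDocument value = new BsonDocument("value", new BsonDouble(91.4));
        SinkDocument sinkDocument = new SinkDocument(null, value);

        WriteModel<BsonDocument> model = new klas().createWriteModel(sinkDocument);
        BsonDocument written = ((InsertOneModel<BsonDocument>) model).getDocument();

        // Expect the extra insertTime field stamped by the strategy.
        System.out.println("insertTime present: " + written.containsKey("insertTime"));
    }
}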
Generally, I would like to measure how much time it takes to consume data from Apache Kafka and save it in MongoDB.
To capture the time when data is put into Kafka, I used this part of the configuration file:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.timestamp.field": "insertedToKafka"
A document in MongoDB looks like this:
{
  measureTime: ISODate("2023-07-15T19:39:15.977Z"),
  _id: ObjectId("64b3df9edcf32746323e15fe"),
  value: 91.40457465058628,
  insertedToKafka: Long("1689509790584"),
  externalId: Long("4355"),
  deviceId: '0d036333-83aa-4565-80ad-d4db4238a910',
  deviceName: 'TERMOMETER',
  insertTime: ISODate("2023-07-16T12:16:30.587Z")
}
To record the time when MongoDB saves the data, I used the write model strategy shown above (the insertTime field in the document).
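The latency I am after is then just insertTime minus insertedToKafka; for the document above that is 1689509790587 - 1689509790584 = 3 ms. A minimal sketch that computes it for every stored document with the MongoDB Java sync driver (it assumes the mongodb-driver-sync dependency and the 35001:27017 port mapping from the compose file):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class KafkaToMongoLatency {
    public static void main(String[] args) {
        // directConnection avoids replica-set discovery of the internal hostname "mongo1".
        try (MongoClient client =
                     MongoClients.create("mongodb://localhost:35001/?directConnection=true")) {
            MongoCollection<Document> devices =
                    client.getDatabase("IoT").getCollection("devices");
            for (Document doc : devices.find()) {
                long insertedToKafka = doc.getLong("insertedToKafka");
                long insertTime = doc.getDate("insertTime").getTime();
                System.out.println(doc.get("_id") + " latency ms: " + (insertTime - insertedToKafka));
            }
        }
    }
}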
I used a Java program to send data to Kafka. It works quite well, but when I slow down the while loop that puts data into Kafka (using Thread.sleep(20)), sometimes insertTime > insertedToKafka.
Is this a problem with Kafka or with Thread.sleep()? How can I avoid it?
This is the code I have been using to send data to Kafka:
public static void main(String[] args) throws IOException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    List<String> anotherList = new ArrayList<>();
    BufferedReader reader = new BufferedReader(new FileReader("data.txt"));
    String topic = "IoT";
    String line;
    String kafkaIndex = "0";

    Producer<String, String> producer = new KafkaProducer<>(properties);
    while ((line = reader.readLine()) != null) {
        // Pick the record key based on the device type found in the line.
        if (line.contains("TERMOMETER")) {
            kafkaIndex = "1";
        }
        if (line.contains("HIGROMETER")) {
            kafkaIndex = "3";
        }
        if (line.contains("BAROMETER")) {
            kafkaIndex = "4";
        }

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, kafkaIndex, line);
        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                // The record was successfully sent; remember its timestamp from the metadata.
                anotherList.add(String.valueOf(recordMetadata.timestamp()));
            } else {
                System.out.println("Error while producing: " + e);
            }
        });

        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.flush();
    }
    reader.close();
    producer.close();
    givenDataArray_whenConvertToCSV_thenOutputCreated(anotherList, "timeFromKafka.csv");
}
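The CSV helper givenDataArray_whenConvertToCSV_thenOutputCreated is not shown here; a minimal version that writes one collected timestamp per line could look like this (just a sketch, not my exact code):

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class CsvHelper {
    // Minimal sketch: write each collected value on its own line.
    public static void givenDataArray_whenConvertToCSV_thenOutputCreated(
            List<String> rows, String fileName) throws IOException {
        try (PrintWriter writer = new PrintWriter(Files.newBufferedWriter(Paths.get(fileName)))) {
            rows.forEach(writer::println);
        }
    }
}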
When I add Thread.sleep(20), the difference between insertTime and insertedToKafka sometimes ranges from -50,000 ms to +50,000 ms.
I have been looking for any solution or tip but can't find anything useful.