Kafka Connect version (output of the Connect REST API root endpoint):
sh-4.4# curl -X GET localhost:8000
{"version":"7.5.0-ccs","commit":"ff3c201baa948d97889dc26c99d7cdc23d038f2e","kafka_cluster_id":"3eDK72ZES8S_t_jbTv_5Mw"}
We are running Debezium on Kafka Connect with Schema Registry. Over the past few weeks, we have detected data loss whenever Debezium restarts, and the sequence of events is always the same:
- The Debezium pod running the connector is churned.
- A new pod is bootstrapped.
- The connector attempts to resume from the last committed LSN.
- A `SocketTimeoutException` occurs when Kafka Connect tries to connect to Schema Registry.
- The system then resumes normal operation.
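Because a `SocketTimeoutException` appears every time we identify data loss, we suspect that Kafka Connect may be dropping records. The stack trace below shows the timeout is raised while the Protobuf converter registers a schema during record conversion, inside `RetryWithToleranceOperator`. We therefore wonder whether the connector's error-handling settings (e.g. `errors.tolerance`) cause the affected records to be skipped rather than retried or failing the task.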
Below are the logs:
[2024-10-26 00:27:27,928] ERROR Failed to send HTTP request to endpoint: http://<schema-registry>/subjects/cdc.<redacted>.public.cdc_audit_heartbeat-key/versions?normalize=false (io.confluent.kafka.schemaregistry.client.rest.RestService)
java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.Socket.connect(Socket.java:615)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:277)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:376)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:397)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1258)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1192)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1086)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1020)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1372)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1347)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:303)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:408)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:588)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:576)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:311)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:427)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:404)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.registerWithResponse(AbstractKafkaSchemaSerDe.java:512)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer.serializeImpl(AbstractKafkaProtobufSerializer.java:121)
at io.confluent.connect.protobuf.ProtobufConverter$Serializer.serialize(ProtobufConverter.java:182)
at io.confluent.connect.protobuf.ProtobufConverter.fromConnectData(ProtobufConverter.java:98)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$5(AbstractWorkerSourceTask.java:484)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:484)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:396)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:361)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)