Kafka Connect SocketTimeoutException while connecting to Schema Registry potentially causing data loss

Version:

sh-4.4# curl -X GET localhost:8000
{"version":"7.5.0-ccs","commit":"ff3c201baa948d97889dc26c99d7cdc23d038f2e","kafka_cluster_id":"3eDK72ZES8S_t_jbTv_5Mw"}

We are running Debezium on Kafka Connect with Schema Registry. Over the past few weeks, we have been detecting data loss whenever Debezium restarts, and the sequence of events is always the same:

  1. The Debezium pod running the connector is churned.
  2. A new pod is bootstrapped.
  3. The connector attempts to resume from the last committed LSN.
  4. A SocketTimeoutException occurs when Kafka Connect tries to connect to Schema Registry.
  5. The system then resumes normal operation.

Since a SocketTimeoutException appears every time we identify data loss, we are wondering whether Kafka Connect is somehow dropping records when the Schema Registry call fails.
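
For context, the failing conversion step in the stack trace in the logs below runs inside Kafka Connect's RetryWithToleranceOperator, whose behavior is governed by the connector-level errors.* properties. The fragment below lists those properties with their documented default values. It is only a sketch of what to compare against, not a copy of the connector configuration used in this deployment.

```properties
# Kafka Connect connector-level error handling.
# Values are the documented defaults, shown for illustration only;
# they are NOT taken from the deployment described in this post.
errors.tolerance=none
errors.retry.timeout=0
errors.retry.delay.max.ms=60000
errors.log.enable=false
errors.log.include.messages=false
```

If errors.tolerance were set to all, a record whose conversion fails could be skipped instead of failing the task, so that setting may be worth ruling out first.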

The logs are below:

[2024-10-26 00:27:27,928] ERROR Failed to send HTTP request to endpoint: http://<schema-registry>/subjects/cdc.<redacted>.public.cdc_audit_heartbeat-key/versions?normalize=false (io.confluent.kafka.schemaregistry.client.rest.RestService)
java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.Socket.connect(Socket.java:615)
	at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
	at java.base/sun.net.www.http.HttpClient.<init>(HttpClient.java:277)
	at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:376)
	at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:397)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1258)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1192)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1086)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1020)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1372)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1347)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:303)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:408)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:588)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:576)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:311)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:427)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:404)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.registerWithResponse(AbstractKafkaSchemaSerDe.java:512)
	at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer.serializeImpl(AbstractKafkaProtobufSerializer.java:121)
	at io.confluent.connect.protobuf.ProtobufConverter$Serializer.serialize(ProtobufConverter.java:182)
	at io.confluent.connect.protobuf.ProtobufConverter.fromConnectData(ProtobufConverter.java:98)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$5(AbstractWorkerSourceTask.java:484)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:484)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:396)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:361)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
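
To line these errors up with pod restarts, the timestamps of the affected log entries can be pulled out of the worker log. The following is a minimal sketch that assumes the log format shown above (a bracketed timestamp, a level, then the message, with the exception on the next line). The name find_timeouts is hypothetical and not part of any Kafka tooling.

```python
import re
from datetime import datetime

# Matches the log header seen above, e.g.
# "[2024-10-26 00:27:27,928] ERROR Failed to send HTTP request ..."
LOG_HEADER = re.compile(
    r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] (\w+) (.*)$"
)


def find_timeouts(lines):
    """Return (timestamp, message) for each ERROR entry that is followed,
    before the next log header, by a java.net.SocketTimeoutException line."""
    hits = []
    header = None  # most recent parsed log header, if any
    for line in lines:
        match = LOG_HEADER.match(line)
        if match:
            ts = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            header = (ts, match.group(2), match.group(3))
        elif header and line.startswith("java.net.SocketTimeoutException"):
            if header[1] == "ERROR":
                hits.append((header[0], header[2]))
            header = None  # count each log entry at most once
    return hits
```

Calling find_timeouts on the lines of a worker log file (for example, `find_timeouts(open(path).read().splitlines())`, where path is wherever the log is stored) gives a list of timestamps that can be compared against pod restart times and the LSN gaps seen downstream.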