Debezium SQL Server Connector - Failed

Hello.
I installed confluent-7.0.1 following the instructions in Quick Start for Confluent Platform | Confluent Documentation.

Installed the connector from the Debezium team:
confluent-hub install debezium/debezium-connector-sqlserver:1.7.1

and registered the connector with this configuration:

{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.server.name": "TESSA35-SQL-TT",
    "database.dbname": "Dev01_Tessa",
    "database.hostname": "TESSA35-SQL-TT",
    "database.port": "1433",
    "database.user": "kafka",
    "database.password": "********",
    "database.history.kafka.bootstrap.servers": "kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY"
  }
}
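
For reference, a config in this shape can be registered through the Connect REST API. A minimal sketch, assuming the worker listens on localhost:8083 and the JSON above is saved as sqlserver-sql-tt.json (the file name is just a placeholder):

curl -X POST -H "Content-Type: application/json" \
  --data @sqlserver-sql-tt.json \
  http://localhost:8083/connectors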

After a few minutes, the connector goes into FAILED status. The logs show the following:

[2022-02-02 10:40:22,489] ERROR [SqlServer-SQL-TT|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:31)
io.debezium.DebeziumException: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java:526)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:149)
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1088)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3153)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java:523)
        ... 8 more
[2022-02-02 10:40:22,492] INFO [SqlServer-SQL-TT|task-0] Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:70)
[2022-02-02 10:40:26,990] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-02 10:41:26,990] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-02 10:42:26,991] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-02 10:43:26,992] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-02 10:44:26,994] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)

Please help, what’s wrong?

hey @Administor

did you check the SQL Server log files?
it seems to be related to SQL Server or a firewall.

Hello.
Thanks for the answer.
The firewall is disabled on the SQL Server host, and ports 8081, 8082, 8083, 9092, and 2181 are open on the Kafka server.
I found this error:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371686 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

These parameters are set in the configuration files:
server.properties

message.max.bytes=20000000
replica.fetch.max.bytes=20000000
max.requested.size=20000000
fetch.message.max.bytes=20000000

producer.properties
max.requested.size=20000000

But the error is still the same:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371686 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

What should I do?

Hi @Administor

the correct setting is max.request.size instead of max.requested.size, afaik.

which Kafka/Confluent version are you running?

Best,
Michael

Sorry, my mistake. This is how it is actually written:

server.properties

message.max.bytes=20000000
replica.fetch.max.bytes=20000000
max.request.size=20000000
fetch.message.max.bytes=20000000

producer.properties
max.request.size=20000000

version confluent-7.0.1

ok I see.
did you adapt the setting for the topic also?

I am installing this for the first time.
Can you elaborate?

it’s possible to set the value mentioned above also on topic level e.g.

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic yourtesttopic --partitions 1 --replication-factor 1 --config max.message.bytes=20971520
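
For an already existing topic, the same setting could be changed with kafka-configs, sth like

kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name yourtesttopic --alter --add-config max.message.bytes=20971520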

The topic is created and here are its parameters.

General settings

name = testtopic
partitions = 1
compression.type = producer
confluent.value.schema.validation = false
leader.replication.throttled.replicas =
confluent.key.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicNameStrategy
message.downconversion.enable = true
min.insync.replicas = 1
segment.jitter.ms = 0
cleanup.policy = delete
flush.ms = 9223372036854775807
confluent.tier.local.hotset.ms = 86400000
follower.replication.throttled.replicas =
confluent.tier.local.hotset.bytes = -1
confluent.value.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicNameStrategy
segment.bytes = 1073741824
retention.ms = 604800000
flush.messages = 9223372036854775807
confluent.tier.enable = false
confluent.tier.segment.hotset.roll.min.bytes = 104857600
confluent.segment.speculative.prefetch.enable = false
message.format.version = 3.0-IV1
max.compaction.lag.ms = 9223372036854775807
file.delete.delay.ms = 60000
max.message.bytes = 20971520
min.compaction.lag.ms = 0
message.timestamp.type = CreateTime
preallocate = false
confluent.placement.constraints =
min.cleanable.dirty.ratio = 0.5
index.interval.bytes = 4096
unclean.leader.election.enable = false
retention.bytes = -1
delete.retention.ms = 86400000
confluent.prefer.tier.fetch.ms = -1
confluent.key.schema.validation = false
segment.ms = 604800000
message.timestamp.difference.max.ms = 9223372036854775807
segment.index.bytes = 10485760

But here is the automatically created topic; it also has max.message.bytes = 20000000.

General settings

name = TESSA35-SQL-TT.dbo.FileContent
partitions = 1
compression.type = producer
confluent.value.schema.validation = false
leader.replication.throttled.replicas =
confluent.key.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicNameStrategy
message.downconversion.enable = true
min.insync.replicas = 1
segment.jitter.ms = 0
cleanup.policy = delete
flush.ms = 9223372036854775807
confluent.tier.local.hotset.ms = 86400000
follower.replication.throttled.replicas =
confluent.tier.local.hotset.bytes = -1
confluent.value.subject.name.strategy = io.confluent.kafka.serializers.subject.TopicNameStrategy
segment.bytes = 1073741824
retention.ms = 604800000
flush.messages = 9223372036854775807
confluent.tier.enable = false
confluent.tier.segment.hotset.roll.min.bytes = 104857600
confluent.segment.speculative.prefetch.enable = false
message.format.version = 3.0-IV1
max.compaction.lag.ms = 9223372036854775807
file.delete.delay.ms = 60000
max.message.bytes = 20000000
min.compaction.lag.ms = 0
message.timestamp.type = CreateTime
preallocate = false
confluent.placement.constraints =
min.cleanable.dirty.ratio = 0.5
index.interval.bytes = 4096
unclean.leader.election.enable = false
retention.bytes = -1
delete.retention.ms = 86400000
confluent.prefer.tier.fetch.ms = -1
confluent.key.schema.validation = false
segment.ms = 604800000
message.timestamp.difference.max.ms = 9223372036854775807
segment.index.bytes = 10485760

hmm ok I see
is there more information in the logs?

best
michael

is there more information in the logs?

Which information do you mean?

some more details around the error above?
the error is in the Kafka Connect logfile, right?

Here is a slightly extended log. Yes, it is taken from Connect.log.

[2022-02-09 11:23:59,527] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.DynamicRoles' using select statement: 'SELECT [ID], [Name], [SqlText], [SchedulingTypeID], [CronScheduling], [PeriodScheduling], [LastErrorDate], [LastErrorText], [LastSuccessfulRecalcDate], [ScheduleAtLaunch] FROM [Dev01_Tessa].[dbo].[DynamicRoles]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:23:59,539] INFO [SqlServer-SQL-TT|task-0]         Finished exporting 8 records for table 'Dev01_Tessa.dbo.DynamicRoles'; total duration '00:00:00.044' (io.debezium.relational.RelationalSnapshotChangeEventSource:393)
[2022-02-09 11:23:59,539] INFO [SqlServer-SQL-TT|task-0] Exporting data from table 'Dev01_Tessa.dbo.Errors' (66 of 428 tables) (io.debezium.relational.RelationalSnapshotChangeEventSource:340)
[2022-02-09 11:23:59,541] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.Errors' using select statement: 'SELECT [ID], [ActionID], [TypeID], [TypeCaption], [CardID], [CardDigest], [Request], [Category], [Text], [Modified], [ModifiedByID], [ModifiedByName] FROM [Dev01_Tessa].[dbo].[Errors]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:23:59,543] INFO [SqlServer-SQL-TT|task-0]         Finished exporting 0 records for table 'Dev01_Tessa.dbo.Errors'; total duration '00:00:00.004' (io.debezium.relational.RelationalSnapshotChangeEventSource:393)
[2022-02-09 11:23:59,543] INFO [SqlServer-SQL-TT|task-0] Exporting data from table 'Dev01_Tessa.dbo.FileCategories' (67 of 428 tables) (io.debezium.relational.RelationalSnapshotChangeEventSource:340)
[2022-02-09 11:23:59,544] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.FileCategories' using select statement: 'SELECT [ID], [Name] FROM [Dev01_Tessa].[dbo].[FileCategories]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:23:59,545] INFO [SqlServer-SQL-TT|task-0]         Finished exporting 0 records for table 'Dev01_Tessa.dbo.FileCategories'; total duration '00:00:00.002' (io.debezium.relational.RelationalSnapshotChangeEventSource:393)
[2022-02-09 11:23:59,546] INFO [SqlServer-SQL-TT|task-0] Exporting data from table 'Dev01_Tessa.dbo.FileContent' (68 of 428 tables) (io.debezium.relational.RelationalSnapshotChangeEventSource:340)
[2022-02-09 11:23:59,549] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.FileContent' using select statement: 'SELECT [VersionRowID], [Content], [Ext] FROM [Dev01_Tessa].[dbo].[FileContent]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:24:01,789] ERROR [SqlServer-SQL-TT|task-0] WorkerSourceTask{id=SqlServer-SQL-TT-0} failed to send record to TESSA35-SQL-TT.dbo.FileContent:  (org.apache.kafka.connect.runtime.WorkerSourceTask:384)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2022-02-09 11:24:04,485] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:24:06,883] INFO [SqlServer-SQL-TT|task-0] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:24:06,933] ERROR [SqlServer-SQL-TT|task-0] WorkerSourceTask{id=SqlServer-SQL-TT-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:206)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
        at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2022-02-09 11:24:06,987] INFO [SqlServer-SQL-TT|task-0] Stopping down connector (io.debezium.connector.common.BaseSourceTask:241)
[2022-02-09 11:24:51,954] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:25:04,625] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:25:04,625] WARN [SqlServer-SQL-TT|task-0|offsets] Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:292)
[2022-02-09 11:25:37,014] WARN [SqlServer-SQL-TT|task-0] Coordinator didn't stop in the expected time, shutting down executor now (io.debezium.pipeline.ChangeEventSourceCoordinator:189)
[2022-02-09 11:25:51,965] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:26:04,629] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:26:04,629] WARN [SqlServer-SQL-TT|task-0|offsets] Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:292)
[2022-02-09 11:26:51,965] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:27:04,629] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:27:04,630] WARN [SqlServer-SQL-TT|task-0|offsets] Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:292)
[2022-02-09 11:27:07,057] INFO [SqlServer-SQL-TT|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:965)
[2022-02-09 11:27:07,069] INFO [SqlServer-SQL-TT|task-0] [Producer clientId=TESSA35-SQL-TT-dbhistory] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
[2022-02-09 11:27:07,102] INFO [SqlServer-SQL-TT|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:676)
[2022-02-09 11:27:07,103] INFO [SqlServer-SQL-TT|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:680)
[2022-02-09 11:27:07,103] INFO [SqlServer-SQL-TT|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:686)
[2022-02-09 11:27:07,112] INFO [SqlServer-SQL-TT|task-0] App info kafka.producer for TESSA35-SQL-TT-dbhistory unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2022-02-09 11:27:07,117] INFO [SqlServer-SQL-TT|task-0] [Producer clientId=connector-producer-SqlServer-SQL-TT-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
[2022-02-09 11:27:07,123] INFO [SqlServer-SQL-TT|task-0] Publish thread interrupted for client_id=connector-producer-SqlServer-SQL-TT-0 client_type=PRODUCER session= cluster=EfR3zRWUSWGKxQMocOo-7w (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:285)
[2022-02-09 11:27:07,124] INFO [SqlServer-SQL-TT|task-0] Publishing Monitoring Metrics stopped for client_id=connector-producer-SqlServer-SQL-TT-0 client_type=PRODUCER session= cluster=EfR3zRWUSWGKxQMocOo-7w (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:297)
[2022-02-09 11:27:07,124] INFO [SqlServer-SQL-TT|task-0] [Producer clientId=confluent.monitoring.interceptor.connector-producer-SqlServer-SQL-TT-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
[2022-02-09 11:27:07,130] INFO [SqlServer-SQL-TT|task-0] Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:82)
[2022-02-09 11:27:07,144] INFO [SqlServer-SQL-TT|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:676)
[2022-02-09 11:27:07,145] INFO [SqlServer-SQL-TT|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:680)
[2022-02-09 11:27:07,145] INFO [SqlServer-SQL-TT|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:686)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] App info kafka.producer for confluent.monitoring.interceptor.connector-producer-SqlServer-SQL-TT-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Closed monitoring interceptor for client_id=connector-producer-SqlServer-SQL-TT-0 client_type=PRODUCER session= cluster=EfR3zRWUSWGKxQMocOo-7w (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:320)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:676)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:680)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:686)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] App info kafka.producer for connector-producer-SqlServer-SQL-TT-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2022-02-09 11:27:07,718] INFO [SqlServer-SQL-TT|task-0] Removing locking timeout (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:244)
[2022-02-09 11:27:07,721] ERROR [SqlServer-SQL-TT|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:31)
io.debezium.DebeziumException: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java:526)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:149)
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1088)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3153)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java:523)
        ... 8 more
[2022-02-09 11:27:07,732] INFO [SqlServer-SQL-TT|task-0] Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:70)
[2022-02-09 11:27:51,967] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)

ok, it seems there is still an issue with the connector config.
could you share the current config and status?

you could get it with sth like

curl -X GET http://<yourconnectcluster>:8083/connectors/SqlServer-SQL-TT/status | jq

hth

This is what it shows

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1352  100  1352    0     0   220k      0 --:--:-- --:--:-- --:--:--  220k
{
  "name": "SqlServer-SQL-TT",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.21.254.102:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.21.254.102:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"
    }
  ],
  "type": "source"
}

thx. could you also execute the following?

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq

and

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq
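
And a side note: as the connect log said, the failed task "will not recover until manually restarted", so once the root cause is fixed it needs a restart via the REST API, sth like

curl -X POST http://localhost:8083/connectors/SqlServer-SQL-TT/tasks/0/restart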

[kafka@1c-kf-tt bin]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   554  100   554    0     0  32588      0 --:--:-- --:--:-- --:--:-- 32588
{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "*******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "errors.log.enable": "true",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    }
  ],
  "type": "source"
}
[kafka@1c-kf-tt bin]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   579  100   579    0     0   1659      0 --:--:-- --:--:-- --:--:--  1659
[
  {
    "id": {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "database.dbname": "Dev01_Tessa",
      "database.user": "kafka",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "database.hostname": "TESSA35-SQL-TT",
      "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
      "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
      "database.password": "*******",
      "name": "SqlServer-SQL-TT",
      "database.server.name": "TESSA35-SQL-TT",
      "errors.log.enable": "true",
      "database.port": "1433"
    }
  }
]

ok, what do the connect properties files look like?

best,
michael

[root@1c-kf-tt kafka]# cat connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/usr/local/kafka/kafka3.0/confluent-7.0.1/share/confluent-hub-components

[root@1c-kf-tt kafka]# cat connect-distributed.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/usr/local/kafka/kafka3.0/confluent-7.0.1/share/confluent-hub-components

[root@1c-kf-tt ksqldb]# cat /usr/local/kafka/kafka3.0/etc/ksqldb/connect.properties

# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=ksql-connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses

# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5

# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false

# fill this configuration in to use custom connectors
# plugin.path=

so I would recommend setting max.request.size here, on the Connect worker side, as well.
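
A minimal sketch of what that could look like (the size is just a placeholder; the file names are from your listings above). In the worker properties (connect-distributed.properties, connect-standalone.properties, or the ksqlDB connect.properties, depending on which worker actually runs the connector), producer settings are passed through with the producer. prefix:

producer.max.request.size=20971520

Alternatively, if the worker is started with connector.client.config.override.policy=All, the producer can be overridden per connector in the connector config itself:

"producer.override.max.request.size": "20971520"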