Connector Debezium SQL - Failed

is there more information in the logs?
which?

could you share some more details around the error above?
the error is in the Kafka Connect logfile, right?

Here is a small extended log. Yes, taken from Connect.log

[2022-02-09 11:23:59,527] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.DynamicRoles' using select statement: 'SELECT [ID], [Name], [SqlText], [SchedulingTypeID], [CronScheduling], [PeriodScheduling], [LastErrorDate], [LastErrorText], [LastSuccessfulRecalcDate], [ScheduleAtLaunch] FROM [Dev01_Tessa].[dbo].[DynamicRoles]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:23:59,539] INFO [SqlServer-SQL-TT|task-0]         Finished exporting 8 records for table 'Dev01_Tessa.dbo.DynamicRoles'; total duration '00:00:00.044' (io.debezium.relational.RelationalSnapshotChangeEventSource:393)
[2022-02-09 11:23:59,539] INFO [SqlServer-SQL-TT|task-0] Exporting data from table 'Dev01_Tessa.dbo.Errors' (66 of 428 tables) (io.debezium.relational.RelationalSnapshotChangeEventSource:340)
[2022-02-09 11:23:59,541] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.Errors' using select statement: 'SELECT [ID], [ActionID], [TypeID], [TypeCaption], [CardID], [CardDigest], [Request], [Category], [Text], [Modified], [ModifiedByID], [ModifiedByName] FROM [Dev01_Tessa].[dbo].[Errors]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:23:59,543] INFO [SqlServer-SQL-TT|task-0]         Finished exporting 0 records for table 'Dev01_Tessa.dbo.Errors'; total duration '00:00:00.004' (io.debezium.relational.RelationalSnapshotChangeEventSource:393)
[2022-02-09 11:23:59,543] INFO [SqlServer-SQL-TT|task-0] Exporting data from table 'Dev01_Tessa.dbo.FileCategories' (67 of 428 tables) (io.debezium.relational.RelationalSnapshotChangeEventSource:340)
[2022-02-09 11:23:59,544] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.FileCategories' using select statement: 'SELECT [ID], [Name] FROM [Dev01_Tessa].[dbo].[FileCategories]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:23:59,545] INFO [SqlServer-SQL-TT|task-0]         Finished exporting 0 records for table 'Dev01_Tessa.dbo.FileCategories'; total duration '00:00:00.002' (io.debezium.relational.RelationalSnapshotChangeEventSource:393)
[2022-02-09 11:23:59,546] INFO [SqlServer-SQL-TT|task-0] Exporting data from table 'Dev01_Tessa.dbo.FileContent' (68 of 428 tables) (io.debezium.relational.RelationalSnapshotChangeEventSource:340)
[2022-02-09 11:23:59,549] INFO [SqlServer-SQL-TT|task-0]         For table 'Dev01_Tessa.dbo.FileContent' using select statement: 'SELECT [VersionRowID], [Content], [Ext] FROM [Dev01_Tessa].[dbo].[FileContent]' (io.debezium.relational.RelationalSnapshotChangeEventSource:348)
[2022-02-09 11:24:01,789] ERROR [SqlServer-SQL-TT|task-0] WorkerSourceTask{id=SqlServer-SQL-TT-0} failed to send record to TESSA35-SQL-TT.dbo.FileContent:  (org.apache.kafka.connect.runtime.WorkerSourceTask:384)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2022-02-09 11:24:04,485] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:24:06,883] INFO [SqlServer-SQL-TT|task-0] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:24:06,933] ERROR [SqlServer-SQL-TT|task-0] WorkerSourceTask{id=SqlServer-SQL-TT-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:206)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
        at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2022-02-09 11:24:06,987] INFO [SqlServer-SQL-TT|task-0] Stopping down connector (io.debezium.connector.common.BaseSourceTask:241)
[2022-02-09 11:24:51,954] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:25:04,625] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:25:04,625] WARN [SqlServer-SQL-TT|task-0|offsets] Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:292)
[2022-02-09 11:25:37,014] WARN [SqlServer-SQL-TT|task-0] Coordinator didn't stop in the expected time, shutting down executor now (io.debezium.pipeline.ChangeEventSourceCoordinator:189)
[2022-02-09 11:25:51,965] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:26:04,629] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:26:04,629] WARN [SqlServer-SQL-TT|task-0|offsets] Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:292)
[2022-02-09 11:26:51,965] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:27:04,629] INFO [SqlServer-SQL-TT|task-0|offsets] WorkerSourceTask{id=SqlServer-SQL-TT-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)
[2022-02-09 11:27:04,630] WARN [SqlServer-SQL-TT|task-0|offsets] Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:292)
[2022-02-09 11:27:07,057] INFO [SqlServer-SQL-TT|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:965)
[2022-02-09 11:27:07,069] INFO [SqlServer-SQL-TT|task-0] [Producer clientId=TESSA35-SQL-TT-dbhistory] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
[2022-02-09 11:27:07,102] INFO [SqlServer-SQL-TT|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:676)
[2022-02-09 11:27:07,103] INFO [SqlServer-SQL-TT|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:680)
[2022-02-09 11:27:07,103] INFO [SqlServer-SQL-TT|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:686)
[2022-02-09 11:27:07,112] INFO [SqlServer-SQL-TT|task-0] App info kafka.producer for TESSA35-SQL-TT-dbhistory unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2022-02-09 11:27:07,117] INFO [SqlServer-SQL-TT|task-0] [Producer clientId=connector-producer-SqlServer-SQL-TT-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
[2022-02-09 11:27:07,123] INFO [SqlServer-SQL-TT|task-0] Publish thread interrupted for client_id=connector-producer-SqlServer-SQL-TT-0 client_type=PRODUCER session= cluster=EfR3zRWUSWGKxQMocOo-7w (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:285)
[2022-02-09 11:27:07,124] INFO [SqlServer-SQL-TT|task-0] Publishing Monitoring Metrics stopped for client_id=connector-producer-SqlServer-SQL-TT-0 client_type=PRODUCER session= cluster=EfR3zRWUSWGKxQMocOo-7w (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:297)
[2022-02-09 11:27:07,124] INFO [SqlServer-SQL-TT|task-0] [Producer clientId=confluent.monitoring.interceptor.connector-producer-SqlServer-SQL-TT-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)
[2022-02-09 11:27:07,130] INFO [SqlServer-SQL-TT|task-0] Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:82)
[2022-02-09 11:27:07,144] INFO [SqlServer-SQL-TT|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:676)
[2022-02-09 11:27:07,145] INFO [SqlServer-SQL-TT|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:680)
[2022-02-09 11:27:07,145] INFO [SqlServer-SQL-TT|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:686)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] App info kafka.producer for confluent.monitoring.interceptor.connector-producer-SqlServer-SQL-TT-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Closed monitoring interceptor for client_id=connector-producer-SqlServer-SQL-TT-0 client_type=PRODUCER session= cluster=EfR3zRWUSWGKxQMocOo-7w (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:320)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:676)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:680)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:686)
[2022-02-09 11:27:07,146] INFO [SqlServer-SQL-TT|task-0] App info kafka.producer for connector-producer-SqlServer-SQL-TT-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2022-02-09 11:27:07,718] INFO [SqlServer-SQL-TT|task-0] Removing locking timeout (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:244)
[2022-02-09 11:27:07,721] ERROR [SqlServer-SQL-TT|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:31)
io.debezium.DebeziumException: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java:526)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:149)
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1088)
        at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3153)
        at io.debezium.relational.RelationalSnapshotChangeEventSource.rollbackTransaction(RelationalSnapshotChangeEventSource.java:523)
        ... 8 more
[2022-02-09 11:27:07,732] INFO [SqlServer-SQL-TT|task-0] Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:70)
[2022-02-09 11:27:51,967] INFO [SqlServerConnectorConnector_1|task-0|offsets] WorkerSourceTask{id=SqlServerConnectorConnector_1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)

ok, it seems there is still an issue with the connector config
could you share the current config?

you could get it with something like

curl -X GET http://<yourconnectcluster>:8083/connectors/SqlServer-SQL-TT/status | jq

hth

This is what it shows

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1352  100  1352    0     0   220k      0 --:--:-- --:--:-- --:--:--  220k
{
  "name": "SqlServer-SQL-TT",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.21.254.102:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.21.254.102:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"
    }
  ],
  "type": "source"
}

thx, could you also execute the following?

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq

and

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq

[kafka@1c-kf-tt bin]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   554  100   554    0     0  32588      0 --:--:-- --:--:-- --:--:-- 32588
{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "*******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "errors.log.enable": "true",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    }
  ],
  "type": "source"
}
[kafka@1c-kf-tt bin]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   579  100   579    0     0   1659      0 --:--:-- --:--:-- --:--:--  1659
[
  {
    "id": {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "database.dbname": "Dev01_Tessa",
      "database.user": "kafka",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "database.hostname": "TESSA35-SQL-TT",
      "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
      "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
      "database.password": "*******",
      "name": "SqlServer-SQL-TT",
      "database.server.name": "TESSA35-SQL-TT",
      "errors.log.enable": "true",
      "database.port": "1433"
    }
  }
]

ok, what does the connect.properties look like?

best,
michael

[root@1c-kf-tt kafka]# cat connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/usr/local/kafka/kafka3.0/confluent-7.0.1/share/confluent-hub-components

[root@1c-kf-tt kafka]# cat connect-distributed.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/usr/local/kafka/kafka3.0/confluent-7.0.1/share/confluent-hub-components

[root@1c-kf-tt ksqldb]# cat /usr/local/kafka/kafka3.0/etc/ksqldb/connect.properties

# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=ksql-connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses

# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5

# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false

# fill this configuration in to use custom connectors
# plugin.path=

so I would recommend setting max.request.size here as well
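If it helps, a minimal sketch of how that could look on the worker side (note the producer. prefix - the internal producer of a source connector only picks the setting up with that prefix; ~20 MB is assumed here as a value with enough headroom for the failing record):

# connect-distributed.properties (or connect-standalone.properties)
# passed through to the internal producers of all source connectors on this worker
producer.max.request.size=20971520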

max.request.size is set. The problem stays the same.

strange, does the value show up if you check it via a REST call?

Interesting, how do I do that?

with the two curl commands mentioned above :wink:

Oh sorry, I didn't understand at first :smiling_face:

[kafka@1c-kf-tt kafka3.0]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq

 % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   554  100   554    0     0    714      0 --:--:-- --:--:-- --:--:--   713
{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "*******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "errors.log.enable": "true",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    }
  ],
  "type": "source"
}

[kafka@1c-kf-tt kafka3.0]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   579  100   579    0     0   2010      0 --:--:-- --:--:-- --:--:--  2010
[
  {
    "id": {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "database.dbname": "Dev01_Tessa",
      "database.user": "kafka",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "database.hostname": "TESSA35-SQL-TT",
      "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
      "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
      "database.password": "*******",
      "name": "SqlServer-SQL-TT",
      "database.server.name": "TESSA35-SQL-TT",
      "errors.log.enable": "true",
      "database.port": "1433"
    }
  }
]

ok, my mistake, I adapted the wrong config

create (or adapt) a connector.json file with the following

 {
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "*******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "errors.log.enable": "true",
    "database.port": "1433",
    "max.request.size": "20971520"
  }
}

load the config with

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SqlServer-SQL-TT/config
[kafka@1c-kf-tt data]$ curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SqlServer-SQL-TT/config
{"error_code":500,"message":"Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 43] (through reference chain: java.util.LinkedHashMap[\"config\"])"

I have now added the parameter via the interface:

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   582  100   582    0     0  25304      0 --:--:-- --:--:-- --:--:-- 25304
[
  {
    "id": {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "max.request.size": "20971520",
      "database.dbname": "Dev01_Tessa",
      "database.user": "kafka",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "database.hostname": "TESSA35-SQL-TT.",
      "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
      "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
      "database.password": "******",
      "name": "SqlServer-SQL-TT",
      "database.server.name": "TESSA35-SQL-TT",
      "database.port": "1433"
    }
  }
]

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   557  100   557    0     0  92833      0 --:--:-- --:--:-- --:--:--  108k
{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "max.request.size": "20971520",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    }
  ],
  "type": "source"

But the error is still the same:

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1352  100  1352    0     0   264k      0 --:--:-- --:--:-- --:--:--  264k
{
  "name": "SqlServer-SQL-TT",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.21.254.102:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.21.254.102:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"
    }
  ],
  "type": "source"
}

hmm pretty strange
need to test it locally myself

nothing new from my side, but maybe @whatsupbros could help :wink:
seems he knows the connector quite well

@whatsupbros any help is appreciated :slight_smile:

Hey @mmuehlbeyer! In fact, I don't have a lot of experience with Debezium, and no experience at all with pairing it with MSSQL Server :sweat_smile:

But the issue doesn't seem to be connected to Debezium/MSSQL in particular; it is much more general, so let's see what we can do…

The first question to @Administor is an out-of-the-box one: do you know why you have such big messages there? Is this a table with 5 thousand columns? Do you really need to sync all table columns to the Kafka topic? Because if the answer is that you could reduce the amount of data to be synced to Kafka, that is what you should always do.

As far as I know, Kafka is not optimized for storing really big messages, and that is why 1 MB is the default maximum message size…

If reducing the size of a single message is not an option for you, then please read further…

There is unfortunately neither a max.request.size nor a message.max.size configuration property at the connector level, as far as I can see here:

However, max.request.size is a valid Producer configuration property. And it should be possible to override the connector's internal producer's max.request.size, so you can try it.

It can be done at the Connect Worker level using the producer. prefix (in connect.properties - then it will affect all your Source connectors), or for a particular Source connector by using the producer.override. prefix (meaning your config key would look like producer.override.max.request.size). Mind the fact that Worker configuration overriding must be allowed for connectors in this case.
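A minimal sketch of the per-connector variant, assuming the ~20 MB value used earlier in the thread (connector.client.config.override.policy is the Worker property that allows such per-connector overrides):

# worker properties - allow connectors to override client configs
connector.client.config.override.policy=All

# then, inside the connector config itself (JSON):
#   "producer.override.max.request.size": "20971520"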

However, I think that the problem is still the maximum message size that is set for your target topic.

Here I wanted to mention that the property names are confusing. The default max message size is set by the message.max.bytes Broker config, but the default value may be overridden with the max.message.bytes Topic config parameter.

So, specifying message.max.bytes=20000000 in server.properties should be enough if you create a new topic with default values (and you already did this, as I can see).
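For reference, a minimal sketch of the broker side (assuming ~20 MB; replica.fetch.max.bytes is often raised alongside it so that follower fetches comfortably accommodate the larger batches):

# server.properties (broker) - becomes the default for newly created topics
message.max.bytes=20971520
# often increased together with the message size limit
replica.fetch.max.bytes=20971520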

However, as I can also see here, your topic was already created, and perhaps it was created before you changed the default Broker value.

And just to be sure, I would recommend adjusting the max.message.bytes property specifically for your target topic TESSA35-SQL-TT.dbo.FileContent. I see that you have Control Center, so you should be able to do it there easily.
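If you prefer the command line over Control Center, a sketch with the kafka-configs tool (again assuming ~20 MB):

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name TESSA35-SQL-TT.dbo.FileContent \
  --alter --add-config max.message.bytes=20971520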
