Connector Debezium SQL - Failed

max.request.size installed. Problem stay the same

strange, does the value show up if you check it via rest call?

Interesting how to do it?

the both curl commands mentioned above :wink:

Oh sorry, I didnā€™t understand at first :smiling_face:

[kafka@1c-kf-tt kafka3.0]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq

 % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   554  100   554    0     0    714      0 --:--:-- --:--:-- --:--:--   713
{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "*******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "errors.log.enable": "true",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    }
  ],
  "type": "source"
}

[kafka@1c-kf-tt kafka3.0]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   579  100   579    0     0   2010      0 --:--:-- --:--:-- --:--:--  2010
[
  {
    "id": {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "database.dbname": "Dev01_Tessa",
      "database.user": "kafka",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "database.hostname": "TESSA35-SQL-TT",
      "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
      "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
      "database.password": "*******",
      "name": "SqlServer-SQL-TT",
      "database.server.name": "TESSA35-SQL-TT",
      "errors.log.enable": "true",
      "database.port": "1433"
    }
  }
]

ok my mistake adapted the wrong config

create a connector.json file (or adapt) with the following

 {
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "*******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "errors.log.enable": "true",
    "database.port": "1433",
    "max.request.size": "20971520"
  }

load the config with

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SqlServer-SQL-TT/config```
[kafka@1c-kf-tt data]$ curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SqlServer-SQL-TT/config
{"error_code":500,"message":"Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 43] (through reference chain: java.util.LinkedHashMap[\"config\"])"

I have added a parameter to the interface

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   582  100   582    0     0  25304      0 --:--:-- --:--:-- --:--:-- 25304
[
  {
    "id": {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "max.request.size": "20971520",
      "database.dbname": "Dev01_Tessa",
      "database.user": "kafka",
      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
      "database.hostname": "TESSA35-SQL-TT.",
      "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
      "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
      "database.password": "******",
      "name": "SqlServer-SQL-TT",
      "database.server.name": "TESSA35-SQL-TT",
      "database.port": "1433"
    }
  }
]

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   557  100   557    0     0  92833      0 --:--:-- --:--:-- --:--:--  108k
{
  "name": "SqlServer-SQL-TT",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "max.request.size": "20971520",
    "database.dbname": "Dev01_Tessa",
    "database.user": "kafka",
    "database.hostname": "TESSA35-SQL-TT",
    "database.password": "******",
    "database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
    "database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
    "name": "SqlServer-SQL-TT",
    "database.server.name": "TESSA35-SQL-TT",
    "database.port": "1433"
  },
  "tasks": [
    {
      "connector": "SqlServer-SQL-TT",
      "task": 0
    }
  ],
  "type": "source"

But the error is still the same:

curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1352  100  1352    0     0   264k      0 --:--:-- --:--:-- --:--:--  264k
{
  "name": "SqlServer-SQL-TT",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.21.254.102:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.21.254.102:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"
    }
  ],
  "type": "source"
}

hmm pretty strange
need to test it locally by myself

nothing new from my side, but maybe @whatsupbros could help :wink:
seems he knows the connector quite well

@whatsupbros any help is appreciated :slight_smile:

Hey @mmuehlbeyer! in fact, not really lots of experience with Debezium, and no experience at all with pairing it with MSSQL Server :sweat_smile:

But the issue doesnā€™t seem to be connected to Debezium/MSSQL in particular, it is much more general, so letā€™s see what we can doā€¦

The first question to @Administor is a question of of the box - do you know why you have such big messages there? Is this a table with 5 thousand columns? Do you really need to sync all table columns to the Kafka topic? Because if the answer is that you could reduce the amount of data to be synced to Kafka, this is what you always should do.

As far as I know, Kafka is not optimized for storing really big messages, and that is why 1MB is the default message sizeā€¦

If reducing the size of a single message is not an option for you, then please read furtherā€¦

There is unfortunately neither max.request.size nor message.max.size configuration property on connector level, as I can see here:

However, max.request.size is a valid Producer configuration property. And it should be possible to override the connectorā€™s internal producerā€™s max.request.size, so you can try it.

It can be done on Connect Worker level using producer. prefix (in connect.properties - then it will affect all your Source connectors), or on a particular Source connector level by using producer.override. prefix (means your config would look producer.override.max.request.size). Mind the fact, that Worker configuration overriding must be allowed for connectors in this case.

However, I think, that the problem is still in the maximum message size, which is set for your target topic.

Here I wanted to mention, that the properties names are confusing. The default max message size is set by message.max.bytes Broker config, but the default value may be overriden with max.message.bytes Topic config parameter.

So, specifying message.max.bytes=20000000 in server.properties should be enough, if you create a new topic with default values (and you already did this, as I can see).

However, as I can also see here, your topic was already created, and perhaps it was created before you changed the default Broker value.

And just to be sure, I would recommend to adjust the max.message.bytes property exactly for your target topic TESSA35-SQL-TT.dbo.FileContent. I see that you have Control Center, so you should be able to do it there easily.

1 Like