max.request.size is set. The problem stays the same
strange, does the value show up if you check it via a REST call?
Interesting - how do I do that?
with both curl commands mentioned above
Oh sorry, I didn't understand at first
[kafka@1c-kf-tt kafka3.0]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq
{
"name": "SqlServer-SQL-TT",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.dbname": "Dev01_Tessa",
"database.user": "kafka",
"database.hostname": "TESSA35-SQL-TT",
"database.password": "*******",
"database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
"database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
"name": "SqlServer-SQL-TT",
"database.server.name": "TESSA35-SQL-TT",
"errors.log.enable": "true",
"database.port": "1433"
},
"tasks": [
{
"connector": "SqlServer-SQL-TT",
"task": 0
}
],
"type": "source"
}
[kafka@1c-kf-tt kafka3.0]$ curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq
[
{
"id": {
"connector": "SqlServer-SQL-TT",
"task": 0
},
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.dbname": "Dev01_Tessa",
"database.user": "kafka",
"task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
"database.hostname": "TESSA35-SQL-TT",
"database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
"database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
"database.password": "*******",
"name": "SqlServer-SQL-TT",
"database.server.name": "TESSA35-SQL-TT",
"errors.log.enable": "true",
"database.port": "1433"
}
}
]
ok, my mistake - I adapted the wrong config
create a connector.json file (or adapt yours) with the following:
{
"name": "SqlServer-SQL-TT",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.dbname": "Dev01_Tessa",
"database.user": "kafka",
"database.hostname": "TESSA35-SQL-TT",
"database.password": "*******",
"database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
"database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
"name": "SqlServer-SQL-TT",
"database.server.name": "TESSA35-SQL-TT",
"errors.log.enable": "true",
"database.port": "1433",
"max.request.size": "20971520"
}
}
load the config with
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SqlServer-SQL-TT/config
[kafka@1c-kf-tt data]$ curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SqlServer-SQL-TT/config
{"error_code":500,"message":"Cannot deserialize value of type `java.lang.String` from Object value (token `JsonToken.START_OBJECT`)\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 43] (through reference chain: java.util.LinkedHashMap[\"config\"])"
I have added the parameter via the interface instead
curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/tasks | jq
[
{
"id": {
"connector": "SqlServer-SQL-TT",
"task": 0
},
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"max.request.size": "20971520",
"database.dbname": "Dev01_Tessa",
"database.user": "kafka",
"task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
"database.hostname": "TESSA35-SQL-TT.",
"database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
"database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
"database.password": "******",
"name": "SqlServer-SQL-TT",
"database.server.name": "TESSA35-SQL-TT",
"database.port": "1433"
}
}
]
curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT | jq
{
"name": "SqlServer-SQL-TT",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"max.request.size": "20971520",
"database.dbname": "Dev01_Tessa",
"database.user": "kafka",
"database.hostname": "TESSA35-SQL-TT",
"database.password": "******",
"database.history.kafka.bootstrap.servers": "1c-kf-tt.ipa:9092",
"database.history.kafka.topic": "TESSA35-SQL-TT-HISTORY",
"name": "SqlServer-SQL-TT",
"database.server.name": "TESSA35-SQL-TT",
"database.port": "1433"
},
"tasks": [
{
"connector": "SqlServer-SQL-TT",
"task": 0
}
],
"type": "source"
But the error is still the same:
curl -X GET http://localhost:8083/connectors/SqlServer-SQL-TT/status | jq
{
"name": "SqlServer-SQL-TT",
"connector": {
"state": "RUNNING",
"worker_id": "10.21.254.102:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.21.254.102:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:294)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1371709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"
}
],
"type": "source"
}
hmm pretty strange
need to test it locally myself
nothing new from my side, but maybe @whatsupbros could help
seems he knows the connector quite well
@whatsupbros any help is appreciated
Hey @mmuehlbeyer! In fact, I don't really have lots of experience with Debezium, and no experience at all with pairing it with MSSQL Server
But the issue doesn't seem to be connected to Debezium/MSSQL in particular; it is much more general, so let's see what we can do…
The first question to @Administor is a bit out of the box - do you know why you have such big messages there? Is this a table with 5 thousand columns? Do you really need to sync all table columns to the Kafka topic? Because if the answer is that you could reduce the amount of data to be synced to Kafka, this is what you should always do.
As far as I know, Kafka is not optimized for storing really big messages, and that is why the default maximum message size is 1 MB…
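If reducing is an option, Debezium lets you filter what gets captured directly in the connector config. A sketch, added to the `config` map shown above - the column name here is purely hypothetical, and depending on your Debezium version the properties may still go by the older names `table.whitelist`/`column.blacklist`:

```json
"table.include.list": "dbo.FileContent",
"column.exclude.list": "dbo.FileContent.SomeBigBlobColumn"
```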
If reducing the size of a single message is not an option for you, then please read further…
There is unfortunately neither a `max.request.size` nor a `message.max.bytes` configuration property on the connector level, as I can see here:
- Debezium connector for SQL Server :: Debezium Documentation
- Kafka Connect Source Configuration Properties for Confluent Platform | Confluent Documentation
However, `max.request.size` is a valid Producer configuration property, and it should be possible to override the connector's internal producer's `max.request.size`, so you can try it.
It can be done on the Connect Worker level using the `producer.` prefix (in `connect.properties` - then it will affect all your Source connectors), or on the level of a particular Source connector by using the `producer.override.` prefix (meaning your property would look like `producer.override.max.request.size`). Mind the fact that Worker configuration overriding must be allowed for connectors in this case - see the sketch below.
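A minimal sketch of both variants (file and property names as discussed in this thread; `connector.client.config.override.policy` is the standard Worker setting that permits such per-connector overrides):

```properties
# connect.properties (Worker level) - affects all source connectors on this worker
producer.max.request.size=20971520

# needed for the per-connector variant: allow connectors to override client configs
connector.client.config.override.policy=All
```

For the per-connector variant, the entry `"producer.override.max.request.size": "20971520"` would then go into the connector's `config` map instead of the plain `max.request.size` used above.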
However, I think that the problem is still the maximum message size, which is set for your target topic.
Here I want to mention that the property names are confusing: the default max message size is set by the `message.max.bytes` Broker config, but this default value may be overridden with the `max.message.bytes` Topic config parameter.
So, specifying `message.max.bytes=20000000` in `server.properties` should be enough if you create a new topic with default values (and you already did this, as I can see).
However, as I can also see here, your topic was already created, and perhaps it was created before you changed the default Broker value.
And just to be sure, I would recommend adjusting the `max.message.bytes` property explicitly for your target topic `TESSA35-SQL-TT.dbo.FileContent`. I see that you have Control Center, so you should be able to do it there easily.
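If the CLI is handier than Control Center, the stock `kafka-configs.sh` tool can set the same topic override (broker address taken from the bootstrap servers mentioned in this thread - adjust to your environment):

```bash
bin/kafka-configs.sh --bootstrap-server 1c-kf-tt.ipa:9092 \
  --alter --entity-type topics --entity-name TESSA35-SQL-TT.dbo.FileContent \
  --add-config max.message.bytes=20971520
```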