Hi,
I'm trying to run the JDBC Sink connector for SQL Server with upsert mode on. I have tried various values of max.poll.records, but I still get this error:
[Consumer clientId=connector-consumer-sink-azure-jdbc-sepa-gze-sql-pr-0, groupId=connect-sink-azure-jdbc-sepa-gze-sql-pr] Member connector-consumer-sink-azure-jdbc-sepa-gze-sql-pr-0-b085e209-ced8-46db-a397-8a5a84910b63 sending LeaveGroup request to coordinator broker***:9092 (id: 2147483544 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) -
Here is my connector configuration:
{
  "consumer.override.heartbeat.interval.ms": "90000",
  "consumer.override.session.timeout.ms": "120000",
  "consumer.override.max.poll.records": "1000",
  "confluent.topic.bootstrap.servers": "",
  "consumer.override.max.poll.interval.ms": "300000",
  "transforms.InsertSource.timestamp.field": "kafka_timestamp",
  "key.converter.schemas.enable": "true",
  "value.converter.schema.registry.url": "https://:8081",
  "value.converter.schemas.enable": "true",
  "key.converter.schema.registry.url": "https://:8081",
  "name": "sink-azure-jdbc-sepa-gze-sql-pr",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "2",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": [
    "RenameField",
    "InsertField"
  ],
  "topics": [
    "topic-tgz-coremandate"
  ],
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": [
    "SEQ_TYPE:SEQUENCE_TYPE",
    "LAST_PROCESSING_REF:LAST_PROCESSING_REFERENCE",
    "LAST_PROCESSED_E2E_REF:LAST_PROCESSED_E2E_REFERENCE",
    "LAST_PROCESSED_LOC_INS:LAST_PROCESSED_LOCAL_INSTRUMENT",
    "COUNT_NO:COUNT_NUMBER"
  ],
  "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertField.timestamp.field": "MF_DATA_SYNCED_TIMESTAMP",
  "connection.url": "",
  "insert.mode": "upsert",
  "batch.size": "6000",
  "delete.enabled": "true",
  "table.name.format": "sdd_core_mandate",
  "pk.mode": "record_key",
  "pk.fields": [
    "IBAN_PART",
    "DEB_ACC",
    "CREDITOR_ALIAS",
    "MANDATE_ALIAS"
  ],
  "auto.create": "false",
  "consumer.override.batch.size": "32768"
}
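
To sanity-check what the error message implies for these settings, here is a rough Python sketch of the arithmetic. It is only a back-of-the-envelope check, not a diagnosis. The function names are made up for illustration. The 1/3 ratio is the guideline from the Kafka consumer documentation for heartbeat.interval.ms relative to session.timeout.ms. I am also assuming the worst case, where every poll returns the full max.poll.records.

```python
# Back-of-the-envelope check of the consumer timing settings from the config above.
# Function names are hypothetical; the values are copied from the connector config.

def per_record_budget_ms(max_poll_interval_ms: int, max_poll_records: int) -> float:
    """Average time available per record before the poll timeout expires,
    assuming a poll returns the full max.poll.records."""
    return max_poll_interval_ms / max_poll_records


def heartbeat_within_guideline(heartbeat_interval_ms: int, session_timeout_ms: int) -> bool:
    """Kafka's documented guideline: the heartbeat interval should typically be
    no higher than one third of the session timeout."""
    return heartbeat_interval_ms * 3 <= session_timeout_ms


max_poll_interval_ms = 300000
max_poll_records = 1000
heartbeat_interval_ms = 90000
session_timeout_ms = 120000

budget = per_record_budget_ms(max_poll_interval_ms, max_poll_records)
print(f"Time budget per record: {budget} ms")

ok = heartbeat_within_guideline(heartbeat_interval_ms, session_timeout_ms)
print(f"Heartbeat interval within 1/3 of session timeout: {ok}")
```

If this is right, each upsert (including the transforms and the round trip to SQL Server) has to average under 300 ms for a full batch of 1000 records to finish within max.poll.interval.ms. The heartbeat interval of 90000 ms is also above one third of the 120000 ms session timeout.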