I have deployed Confluent Platform (confluent-7.4.1) as described in the manual. All components run on the same machine with 8 Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz cores and 16 GB RAM, so it is a single-node environment with one broker. Network and physical storage performance is sufficient.
All components are started using "systemd" units based on the ones provided in the distribution under "lib/systemd/system". Development-only commands such as "confluent local ..." are not used.
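For reference, the services are managed roughly as sketched below; the unit names follow the stock files from "lib/systemd/system" and may differ in a customized setup:

# start the stack via systemd; unit names are the stock ones shipped with the platform
sudo systemctl enable --now confluent-zookeeper confluent-server
sudo systemctl enable --now confluent-schema-registry confluent-kafka-connect
# confirm that the Connect worker is up
systemctl status confluent-kafka-connect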
The purpose of the installation is to track changes in Oracle tables, produce messages to Kafka, and then deliver them to several destinations such as MongoDB, Postgres, and so on. I have configured the required source and sink connectors and achieved the desired results with data transfer.
The problem is heavy CPU load every time any change is made in a source Oracle table, and also during the initial transfer of table rows into Kafka. The situation is the same whether I use "io.confluent.connect.jdbc.JdbcSourceConnector" or "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector". The CPU is consumed by the process named "ConnectDistributed", i.e. the Kafka Connect worker. I have tried several infrastructure component versions:
- Ubuntu x86_64 with the 5.15.0-76-generic kernel, and Oracle Linux 9
- OpenJDK 11, 18, and 19
- ojdbc8-21.7.0.0.jar and ojdbc8-19.7.0.0.jar
And the results were exactly the same. While the rows of a table are being transferred from Oracle into a Kafka topic, one core sits at 100% load. Adding another table results in one more fully loaded core, and so on. On a freshly deployed Confluent Platform I just add one source connector (even without a sink), the data transfer starts, and one core per table is completely utilized. In the "idle state" the CPU load is very low, but updating just one row in the source table results in 5-7 seconds of 100% load, and a latency of about 5-10 seconds between the update being committed in Oracle and the message being produced to Kafka.
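For what it is worth, the busy threads inside the ConnectDistributed JVM can be inspected like this (a quick sketch; looking up the pid by process name is just one way to do it):

# find the Connect worker JVM and list its busiest threads
pid=$(pgrep -f ConnectDistributed)
top -H -p "$pid" -b -n 1 | head -n 25
# dump Java thread stacks; the hex nid values can be matched against the busy thread ids reported by top
jstack "$pid" > /tmp/connect-threads.txt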
Source connector config (many of the optional parameters can be removed entirely without changing the described behaviour):
{
  "name": "some_name",
  "config":
  {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "some_name",
    "tasks.max": 4,
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "oracle.server": "some_ip_or_fqdn",
    "oracle.port": 1521,
    "oracle.sid": "some_sid",
    "oracle.username": "some_user",
    "oracle.password": "some_password",
    "start.from": "snapshot",
    "table.inclusion.regex": "some_sid\\.some_user\\.(some_table|another_table)",
    "table.exclusion.regex": "",
    "table.topic.name.template": "${fullyQualifiedTableName}",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "redo.log.consumer.bootstrap.servers": "localhost:9092",
    "topic.creation.enable": "true",
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic",
    "topic.creation.redo.replication.factor": 1,
    "topic.creation.redo.partitions": 4,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 8,
    "topic.creation.default.cleanup.policy": "delete",
    "emit.tombstone.on.delete": "true",
    "output.scn.field": "",
    "output.row.id.field": "",
    "output.op.ts.field": "",
    "output.current.ts.field": "",
    "output.table.name.field": "",
    "output.op.type.field": "",
    "output.username.field": "",
    "validate.non.null": "false",
    "batch.max.rows": "4096",
    "numeric.mapping": "best_fit"
  }
}
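The connector is registered through the Connect REST API, roughly like this (default REST port 8083 assumed; the file name holding the JSON above is just an example):

curl -s -X POST -H "Content-Type: application/json" \
  --data @oracle-cdc-source.json \
  http://localhost:8083/connectors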
/etc/kafka/connect-distributed.properties:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/opt/confluent/confluent-7.4.1/share/confluent-hub-components
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
All other configs are left at almost default values, with just a few necessary changes such as host names.
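For completeness, connector and task state can be checked through the REST API while the load is high (connector name and default REST port 8083 as above):

curl -s http://localhost:8083/connectors/some_name/status
curl -s http://localhost:8083/connectors/some_name/tasks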