Replication and Partition factor

Hi,

We have created a connector with the following configuration:

{
“name”: “sql-connector-perf_pri”,
“config”: {
“connector.class”: “io.debezium.connector.sqlserver.SqlServerConnector”,
“topic.creation.default.partitions”: “1”,
“topic.creation.default.replication.factor”: “2”,
“auto.create.topics.enable”: “true”,
“transforms”: “route”,
“transforms.route.type”: “org.apache.kafka.connect.transforms.RegexRouter”,
“value.converter.replace.null.with.default”: “false”,
“topic.prefix”: “perf_pri”,
“transforms.route.regex”: “.",
“decimal.handling.mode”: “string”,
“schema.history.internal.kafka.topic”: “dbhistory.perf_pri”,
“transforms.unwrap.drop.tombstones”: “false”,
“poll.interval.ms”: “10000”,
“transforms.unwrap.type”: “io.debezium.transforms.UnwrapFromEnvelope”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“transforms.route.replacement”: “perf_pri.pbc_pri.TxTopic”,
“snapshot.lock.timeout.ms”: “10000”,
“topic.creation.default.compression.type”: “snappy”,
“topic.creation.default.cleanup.policy”: “delete”,
“time.precision.mode”: “connect”,
“schema.history.internal.kafka.bootstrap.servers”: “xyz1:9092,xyz2:9092,xyz3:9092”,
“snapshot.isolation.mode”: “read_committed”,
“topic.creation.default.retention.ms”: “1296000000”,
“schema.ignore”: “true”,
“table.exclude.list”: "^(dbo\.)?sre.log., dbo.sre_idpool, dbo.sre_rule, dbo.sre_event, dbo.jts_job_run_log, dbo.event_log, cdc.
, audit.*”,
“database.hostname”: “abcde”,
“database.port”: “1234”,
“database.user”: “username”,
“database.password”: “pwd”,
“database.names”: “DB_name”,
“database.encrypt”: “false”,
“value.converter.schemas.enable”: “true”,
“name”: “sql-connector-perf_pri”,
“snapshot.mode”: “schema_only”
}
}

We have specified:

“topic.creation.default.partitions”: “1”,
“topic.creation.default.replication.factor”: “2”

However, after some days, when we checked the topic description, we found:

PartitionCount : 3
ReplicationFactor : 3

When I checked the Kafka logs, I saw the following:

controller.log.2024-07-10-03:[2024-07-10 03:04:48,244] INFO [QuorumController id=1001] Replayed TopicRecord for topic perf_pri.pbc_pri.TxTopic with topic ID bDqdDtvdTWauzZMu97kJUQ. (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-10-03:[2024-07-10 03:04:48,244] INFO [QuorumController id=1001] Replayed ConfigRecord for ConfigResource(type=TOPIC, name='perf_pri.pbc_pri.TxTopic') which set configuration compression.type to snappy (org.apache.kafka.controller.ConfigurationControlManager)
controller.log.2024-07-10-03:[2024-07-10 03:04:48,244] INFO [QuorumController id=1001] Replayed ConfigRecord for ConfigResource(type=TOPIC, name='perf_pri.pbc_pri.TxTopic') which set configuration cleanup.policy to delete (org.apache.kafka.controller.ConfigurationControlManager)
controller.log.2024-07-10-03:[2024-07-10 03:04:48,244] INFO [QuorumController id=1001] Replayed ConfigRecord for ConfigResource(type=TOPIC, name='perf_pri.pbc_pri.TxTopic') which set configuration retention.ms to 1296000000 (org.apache.kafka.controller.ConfigurationControlManager)
controller.log.2024-07-10-03:[2024-07-10 03:04:48,244] INFO [QuorumController id=1001] Replayed PartitionRecord for new partition perf_pri.pbc_pri.TxTopic-0 with topic ID bDqdDtvdTWauzZMu97kJUQ and PartitionRegistration(replicas=[2002, 2003], isr=[2002, 2003], removingReplicas=[], addingReplicas=[], leader=2002, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-10-07:[2024-07-10 07:00:49,400] INFO [QuorumController id=1001] Replayed TopicRecord for topic perf_pri.pbc_pri.TxTopic with topic ID bDqdDtvdTWauzZMu97kJUQ. (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-10-07:[2024-07-10 07:00:49,400] INFO [QuorumController id=1001] Replayed PartitionRecord for new partition perf_pri.pbc_pri.TxTopic-0 with topic ID bDqdDtvdTWauzZMu97kJUQ and PartitionRegistration(replicas=[2002, 2003], isr=[2002, 2003], removingReplicas=[], addingReplicas=[], leader=2002, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-10-07:[2024-07-10 07:00:49,413] INFO [QuorumController id=1001] Replayed ConfigRecord for ConfigResource(type=TOPIC, name='perf_pri.pbc_pri.TxTopic') which set configuration compression.type to snappy (org.apache.kafka.controller.ConfigurationControlManager)
controller.log.2024-07-10-07:[2024-07-10 07:00:49,413] INFO [QuorumController id=1001] Replayed ConfigRecord for ConfigResource(type=TOPIC, name='perf_pri.pbc_pri.TxTopic') which set configuration cleanup.policy to delete (org.apache.kafka.controller.ConfigurationControlManager)
controller.log.2024-07-10-07:[2024-07-10 07:00:49,413] INFO [QuorumController id=1001] Replayed ConfigRecord for ConfigResource(type=TOPIC, name='perf_pri.pbc_pri.TxTopic') which set configuration retention.ms to 1296000000 (org.apache.kafka.controller.ConfigurationControlManager)
controller.log.2024-07-15-10:[2024-07-15 10:05:14,609] INFO [QuorumController id=1001] Replayed RemoveTopicRecord for topic perf_pri.pbc_pri.TxTopic with ID bDqdDtvdTWauzZMu97kJUQ. (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-15-10:[2024-07-15 10:05:52,584] INFO [QuorumController id=1001] Replayed TopicRecord for topic perf_pri.pbc_pri.TxTopic with topic ID 2fbYHOWlTcKwYPxTDnb0tA. (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-15-10:[2024-07-15 10:05:52,584] INFO [QuorumController id=1001] Replayed PartitionRecord for new partition perf_pri.pbc_pri.TxTopic-0 with topic ID 2fbYHOWlTcKwYPxTDnb0tA and PartitionRegistration(replicas=[2003, 2001, 2002], isr=[2003, 2001, 2002], removingReplicas=[], addingReplicas=[], leader=2003, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
controller.log.2024-07-15-10:[2024-07-15 10:05:52,584] INFO [QuorumController id=1001] Replayed PartitionRecord for new partition perf_pri.pbc_pri.TxTopic-1 with topic ID 2fbYHOWlTcKwYPxTDnb0tA and PartitionRegistration(replicas=[2001, 2002, 2003], isr=[2001, 2002, 2003], removingReplicas=[], addingReplicas=[], leader=2001, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
controller.log.

2024-07-15-10:[2024-07-15 10:05:52,584] INFO [QuorumController id=1001] Replayed PartitionRecord for new partition perf_pri.pbc_pri.TxTopic-2 with topic ID 2fbYHOWlTcKwYPxTDnb0tA and PartitionRegistration(replicas=[2002, 2003, 2001], isr=[2002, 2003, 2001], removingReplicas=[], addingReplicas=[], leader=2002, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
server.log.2024-07-10-02:[2024-07-10 03:04:47,860] INFO Sent auto-creation request for Set(perf_pri.pbc_pri.TxTopi ) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
server.log.2024-07-10-02:[2024-07-10 03:04:47,960] INFO Sent auto-creation request for Set(perf_pri.pbc_pri.TxTopi ) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
server.log.2024-07-10-02:[2024-07-10 03:04:48,061] INFO Sent auto-creation request for Set(perf_pri.pbc_pri.TxTopi ) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
server.log.2024-07-10-02:[2024-07-10 03:04:48,162] INFO Sent auto-creation request for Set(perf_pri.pbc_pri.TxTopi ) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
server.log.2024-07-10-02:[2024-07-10 03:04:48,242] INFO [DynamicConfigPublisher broker id=2001] Updating topic perf_pri.pbc_pri.TxTopic with new configuration: compression.type -> snappy, cleanup.policy -> delete, retention.ms -> 1296000000 (kafka.server.metadata.DynamicConfigPublisher)
state-change.log.2024-07-15-10:[2024-07-15 10:05:52,577] INFO [Broker id=2001] Creating new partition perf_pri.pbc_pri.TxTopic-1 with topic id 2fbYHOWlTcKwYPxTDnb0tA. (state.change.logger)
state-change.log.2024-07-15-10:[2024-07-15 10:05:52,579] INFO [Broker id=2001] Leader perf_pri.pbc_pri.TxTopic-1 with topic id Some(2fbYHOWlTcKwYPxTDnb0tA) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [2001,2002,2003], adding replicas [] and removing replicas []. Previous leader None and previous leader epoch was -1. (state.change.logger)
state-change.log.2024-07-15-10:[2024-07-15 10:05:52,585] INFO [Broker id=2001] Creating new partition perf_pri.pbc_pri.TxTopic-2 with topic id 2fbYHOWlTcKwYPxTDnb0tA. (state.change.logger)
state-change.log.2024-07-15-10:[2024-07-15 10:05:52,586] INFO [Broker id=2001] Follower perf_pri.pbc_pri.TxTopic-2 starts at leader epoch 0 from offset 0 with partition epoch 0 and high watermark 0. Current leader is 2002. Previous leader Some(2002) and previous leader epoch was 0. (state.change.logger)
state-change.log.2024-07-15-10:[2024-07-15 10:05:52,586] INFO [Broker id=2001] Creating new partition perf_pri.pbc_pri.TxTopic-0 with topic id 2fbYHOWlTcKwYPxTDnb0tA. (state.change.logger)
state-change.log.2024-07-15-10:[2024-07-15 10:05:52,586] INFO [Broker id=2001] Follower perf_pri.pbc_pri.TxTopic-0 starts at leader epoch 0 from offset 0 with partition epoch 0 and high watermark 0. Current leader is 2003. Previous leader Some(2003) and previous leader epoch was 0. (state.change.logger)

From this point, the topic was assigned PartitionCount: 3 and ReplicationFactor: 3 . Can anyone help us understand why the topic configuration changed to these values?

hey,

which kafka version are running?
and which cp version?

Best,
Michael

Hi @mmuehlbeyer,

we are using confluent 7.6.0 version

and did you check the replication factor and partition count right after the creation?

yes, we checked the replication factor and partition count by using describe topic after topic created. It is having replication factor as 3 and partition as 1

so clusters default right after creation I assume?

yes kafka clusters are same, we did not change anything

sorry
I meant clusters default replication factor and partition settings?

Yeah… broker properties have replication factor as 3 and partition as 3 but in connector, we configured replication factor as 3 and partition as 1

topic.creation.group not set I think?

we are using auto.create.topics.enable property in connector to create the topics automatically.

understood but the group is not set, right?

Hi @mmuehlbeyer,
We did not use that property in the connector configuration, if we are not using any specific group then we thought that it will go to default group. Let us know if our understanding is wrong.

will it be a issue if we are not specifying the group w.r.t partition and replication factor?

hi @vraghu

afaik you should end up in the default group if not specifying any.
so I think your understanding is right.

though the connector states if the group is not found it will create the topics with the broker defaults.

I think it’s worth to try to specify a group and set your desired config like this:

"topic.creation.group":"$yourgroup",
“topic.creation.$yourgroup.partitions”: “1”,
“topic.creation.$yourgroup.replication.factor”: “2”,

@mmuehlbeyer - we will add that property in connector. we will validate and confirm if it is working

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.