Can anyone give an example of how the config should look when specifying replica-placement when creating a topic using Java? I can only find cli examples of this being used.
I have tried adding it as a config item the same way I would add cleanup.policy or retention.ms for example but it throws this error when I try that:
"org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: replica-placement"
Here is how my failed config looks (replicaPlacementObj is a JSONObject):
```java
configs.put("retention.ms", String.valueOf(retentionPeriod));
configs.put("replica-placement", replicaPlacementObj.toString());
configs.put("min.insync.replicas", "1");
```
Is the functionality you're looking for called `replica-placement` or `replica-assignment`?
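If manual replica assignment is what you need, the plain Apache Kafka `AdminClient` supports it directly: `NewTopic` has a constructor that takes a partition-to-replicas map instead of a partition count and replication factor, so no topic config entry is involved at all. A minimal sketch, assuming a cluster with broker ids 1, 2 and 3 (the topic name and broker ids are illustrative):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ManualAssignmentExample {

    // Build a partition -> replica-broker-ids map; the first broker in each
    // list becomes the preferred leader for that partition.
    static Map<Integer, List<Integer>> buildAssignment() {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Arrays.asList(1, 2)); // partition 0: replicas on brokers 1 and 2
        assignment.put(1, Arrays.asList(2, 3)); // partition 1: replicas on brokers 2 and 3
        assignment.put(2, Arrays.asList(3, 1)); // partition 2: replicas on brokers 3 and 1
        return assignment;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // This NewTopic constructor takes the explicit assignment instead of
            // numPartitions + replicationFactor; the counts are implied by the map.
            NewTopic topic = new NewTopic("manually-assigned-topic", buildAssignment());
            adminClient.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```

Note this is the Admin API equivalent of the CLI's `--replica-assignment` flag, which may or may not be what `--replica-placement` does under the hood.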
Looking into Kafka's CLI code, I was able to find the `replica-assignment` parameter:
```scala
private val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
    "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.")
  .withRequiredArg
  .describedAs("# of partitions")
  .ofType(classOf[java.lang.Integer])
private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.")
  .withRequiredArg
  .describedAs("replication factor")
  .ofType(classOf[java.lang.Integer])
private val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created or altered.")
  .withRequiredArg
  .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
    "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
  .ofType(classOf[String])
private val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
  "if set when describing topics, only show under replicated partitions")
private val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
  "if set when describing topics, only show partitions whose leader is not available")
private val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
  "if set when describing topics, only show partitions whose isr count is less than the configured minimum.")
```
If that is the case, then according to the code, they use Kafka’s Admin API to handle this:
```scala
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
if (topics.nonEmpty) {
  val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).topicNameValues()
  val newPartitions = topics.map { topicName =>
    if (topic.hasReplicaAssignment) {
      val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
      val newAssignment = {
        val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
        new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
      }
      topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
    } else {
      topicName -> NewPartitions.increaseTo(topic.partitions.get)
    }
  }.toMap
  adminClient.createPartitions(newPartitions.asJava,
    new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get()
}
```
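For completeness, the same `createPartitions` call the CLI makes above can be issued from your own Java code. A sketch, assuming an existing topic named `my-topic` with one partition and brokers 1-3 (names and ids are illustrative, not from the original post):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Grow "my-topic" from 1 to 3 partitions. The nested lists give the
            // replica broker ids for each *new* partition only (partitions 1 and 2).
            List<List<Integer>> newAssignments = Arrays.asList(
                Arrays.asList(1, 2),  // new partition 1: replicas on brokers 1 and 2
                Arrays.asList(2, 3)); // new partition 2: replicas on brokers 2 and 3
            Map<String, NewPartitions> request = Collections.singletonMap(
                "my-topic", NewPartitions.increaseTo(3, newAssignments));
            adminClient.createPartitions(request).all().get();
        }
    }
}
```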
Now, if this is a Confluent-specific feature, I am not sure the Admin API will recognize the `replica-placement` parameter passed in your config, since it is not a native Kafka configuration property. You would need to check Confluent's code to see how it is handled.
— @riferrei
Thanks riferrei. It could be that it translates to `replica-assignment` in the Admin API. In the CLI, the example given uses `replica-placement` (see below), which is why I tried that name in my code first. I will take a deeper look into `replica-assignment` as you've suggested.
https://docs.confluent.io/platform/current/multi-dc-deployments/multi-region.html#replica-placement
```
kafka-topics --create \
  --bootstrap-server kafka-west-1:9092 \
  --topic testing-observers \
  --partitions 3 \
  --replica-placement /etc/confluent/testing-observers.json \
  --config min.insync.replicas=2
```
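For reference, the file passed to `--replica-placement` is a JSON description of the placement constraints. A sketch of what such a file might look like (the rack names and counts here are illustrative; check the Confluent multi-region docs for the exact schema and version number):

```json
{
  "version": 1,
  "replicas": [
    {"count": 2, "constraints": {"rack": "west-1"}},
    {"count": 2, "constraints": {"rack": "east-1"}}
  ],
  "observers": [
    {"count": 1, "constraints": {"rack": "east-2"}}
  ]
}
```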
I know the post is old, but I had to do this recently with Confluent Platform; see the code below:
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

String todaysDatePostFix = getDate(); // helper: gets the date as yyyy-MM-dd-HH-mm-ss
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

String replicaPlacementJson = readResourceFile("/stretched.json"); // helper: reads the placement JSON from resources
String prefix = "stretched-";

HashMap<String, String> topicProperties = new HashMap<>();
topicProperties.put("min.insync.replicas", "3");
topicProperties.put("confluent.placement.constraints", replicaPlacementJson);

String topicName = "replica-placement-" + prefix + todaysDatePostFix;

// Create the AdminClient
try (AdminClient adminClient = AdminClient.create(properties)) {
    // We are using replica placement, so per the docs set the replication factor to -1
    NewTopic newTopic = new NewTopic(topicName, 3, (short) -1);
    newTopic.configs(topicProperties);
    CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
    createTopicsResult.all().get();
    System.out.println("Topic created successfully with custom replica placement. Topic " + topicName);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```
The property is `confluent.placement.constraints`; also ensure the replication factor is set to -1.
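If you want to confirm the placement constraints actually took effect, you can read the topic's config back with `describeConfigs`. A small sketch (the topic name is illustrative):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class VerifyPlacementExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            ConfigResource resource =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-stretched-topic");
            Config config = adminClient.describeConfigs(
                Collections.singleton(resource)).all().get().get(resource);
            // Print the placement constraints the broker actually stored for the topic.
            System.out.println(config.get("confluent.placement.constraints").value());
        }
    }
}
```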