Replica-placement for topic creation in Java

Can anyone give an example of how the config should look when specifying replica-placement while creating a topic from Java? I can only find CLI examples of this being used.

I have tried adding it as a config item the same way I would add cleanup.policy or retention.ms for example but it throws this error when I try that:
org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: replica-placement

Here is how my failed config looks (replicaPlacementObj is a JSONObject):

configs.put("retention.ms", String.valueOf(retentionPeriod));
configs.put("replica-placement", replicaPlacementObj.toString());
configs.put("min.insync.replicas", "1");

Is the functionality you’re looking for called replica-placement or replica-assignment?

By looking into Kafka’s CLI API, I was able to find the replica-placement parameter:

If that is the case, then according to the code, they use Kafka’s Admin API to handle this:

If this is a Confluent-specific feature, though, I am not sure the Admin API will recognize replica-placement when it is passed in your topic config, since it is not a native configuration property. You would need to check in Confluent’s code how this is handled.

@riferrei

Thanks riferrei. It could be that it translates across as replicaAssignment in the Admin API. The CLI example in the docs uses replica-placement (see below), which is why I tried that name in my code first. I will have a deeper look into replicaAssignment as you’ve suggested.

https://docs.confluent.io/platform/current/multi-dc-deployments/multi-region.html#replica-placement

kafka-topics --create \
  --bootstrap-server kafka-west-1:9092 \
  --topic testing-observers \
  --partitions 3 \
  --replica-placement /etc/confluent/testing-observers.json \
  --config min.insync.replicas=2

I know the post is old, but I had to do this recently with Confluent Platform. See the code below:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

// getDate() and readResourceFile() are helper methods not shown here.
String todaysDatePostFix = getDate(); // date formatted as yyyy-MM-dd-HH-mm-ss

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Placement constraints JSON, read from a file in resources
String replicaPlacementJson = readResourceFile("/stretched.json");

Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("min.insync.replicas", "3");
topicProperties.put("confluent.placement.constraints", replicaPlacementJson);

String prefix = "stretched-";
String topicName = "replica-placement-" + prefix + todaysDatePostFix;

// Create the AdminClient; try-with-resources closes it afterwards
try (AdminClient adminClient = AdminClient.create(properties)) {
    // We are using replica placement, so as per the docs set replicationFactor to -1
    NewTopic newTopic = new NewTopic(topicName, 3, (short) -1);
    newTopic.configs(topicProperties);

    CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
    createTopicsResult.all().get(); // block until topic creation completes or fails

    System.out.println("Topic created successfully with custom replica placement. Topic " + topicName);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

The property is “confluent.placement.constraints”. Also make sure the replication factor is set to -1.
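
For anyone without a placement file to hand, here is a minimal sketch of building the value for that property in code rather than reading it from resources. It has no Kafka dependency, so it only shows the shape of the config map. The class and method names are made up for illustration, and the JSON layout (a "version" field plus a "replicas" list with "count" and "rack" constraints) is my understanding of the format in the Confluent multi-region docs linked above, so check it against the docs for your platform version.

```java
import java.util.HashMap;
import java.util.Map;

public class PlacementConfigSketch {

    // Hypothetical helper: builds a placement-constraints JSON document asking for
    // `replicaCount` replicas on brokers whose broker.rack matches `rack`.
    // The field names are an assumption based on the Confluent docs.
    public static String placementJson(int replicaCount, String rack) {
        return "{"
            + "\"version\": 1, "
            + "\"replicas\": [{\"count\": " + replicaCount + ", "
            + "\"constraints\": {\"rack\": \"" + rack + "\"}}]"
            + "}";
    }

    // Builds the topic-level config map that would be passed to NewTopic.configs(...).
    public static Map<String, String> topicConfigs(String placementJson, int minInsyncReplicas) {
        Map<String, String> configs = new HashMap<>();
        configs.put("min.insync.replicas", String.valueOf(minInsyncReplicas));
        configs.put("confluent.placement.constraints", placementJson);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> configs = topicConfigs(placementJson(3, "us-west"), 2);
        configs.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting map takes the place of topicProperties in the code above. The rack value has to match the broker.rack setting of the brokers you want the replicas placed on.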