Spooldir - Programmatically Building Schema

Hi All,

I’m trying to use the SpoolDir source connector and want to pass in the “value.schema” property as shown in the example, i.e.

value.schema={\n  \"name\" : \"com.example.users.User\",\n  \"type\" : \"STRUCT\",\n  \"isOptional\" : false,\n  \"fieldSchemas\" : {\n    \"id\" : {\n      \"type\" : \"INT64\",\n      \"isOptional\" : false\n    },\n    \"first_name\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"last_name\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"email\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"gender\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"ip_address\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"last_login\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"account_balance\" : {\n      \"name\" : \"org.apache.kafka.connect.data.Decimal\",\n      \"type\" : \"BYTES\",\n      \"version\" : 1,\n      \"parameters\" : {\n        \"scale\" : \"2\"\n      },\n      \"isOptional\" : true\n    },\n    \"country\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    },\n    \"favorite_color\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : true\n    }\n  }\n}

However, I want to build the schema programmatically using something like your org.apache.kafka.connect.data.SchemaBuilder, but I can’t figure out how to get the JSON string from the SchemaBuilder/Schema objects so that I can pass it into the connector config as shown above.

Any idea how I can do that, or what’s the best way to programmatically build a schema to then get the required Kafka Connect JSON string to pass to the spooldir connector as the value.schema property?

Regards

GP

Hello Pronzato,

Welcome to the forum!

You can POST connector configs using the HTTP API (or PUT to update an existing one), so you could use an HTTP client in the application that creates the schema and update the connector config from there. You can find more info on the REST API here: Connect REST Interface | Confluent Documentation
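As a rough illustration of the two calls, here is a minimal sketch using the JDK's built-in java.net.http client (Java 11+). The class name ConnectRestRequests, the connector name csv-source and the worker address http://localhost:8083 are assumptions made up for the example; the request bodies are placeholders for your real config.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) Connect REST requests.
class ConnectRestRequests {

    // POST /connectors creates a connector; body is {"name": ..., "config": {...}}.
    static HttpRequest createConnector(String baseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // PUT /connectors/{name}/config creates or updates; body is the config map only.
    static HttpRequest updateConfig(String baseUrl, String name, String configJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configJson))
                .build();
    }

    public static void main(String[] args) {
        // Assumed worker address and connector name; adjust for your cluster.
        HttpRequest request = updateConfig("http://localhost:8083", "csv-source", "{}");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Because value.schema is itself a JSON document carried inside a JSON string, it has to be escaped when it is placed into the config body.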

I hope that helps,
Dave

Hi Dave,

Thanks for getting back to me. Yes, I’m already using the Connect REST API to configure my connectors. The question is how to create the schema string that I add to the config when creating the spooldir connector.

I don’t want to create that string by hand. I was hoping to use a Java object like your SchemaBuilder to build my CSV schema programmatically, then get the string version of it, add that to the config, and use the REST API to create the connector. However, SchemaBuilder.toString() doesn’t produce the desired output.

Is there something that I can use to generate that JSON string from a Schema object?

Regards

GP

I haven’t had time to try this out, but you could try using the JsonConverter’s asJsonSchema method to convert the schema built with SchemaBuilder to a JSON ObjectNode and then get the string from that. I’m not sure it will produce a JSON string in the format the connector expects, but it’s worth a try.

Thank you Dave. I just tried that but unfortunately it doesn’t seem to produce the same JSON string shown in the Spooldir examples.

I tried:
JsonConverter converter = new JsonConverter();
converter.configure(Collections.emptyMap(), false);
String schemaString = converter.asJsonSchema(schema).toString();

but it produces:

{"type":"struct","fields":[{"type":"int64","optional":true,"field":"id"},…

whereas all the Spooldir examples online show the schema string looking like this:

{\n "name" : "com.example.users.User",\n "type" : "STRUCT",\n "isOptional" : false,\n "fieldSchemas" : {\n "id" : {\n "type" : "INT64",…

Regards

GP
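
The two outputs differ in shape: asJsonSchema emits a "fields" array with lowercase types and "optional", while the Spooldir examples use a "fieldSchemas" object keyed by field name, uppercase types and "isOptional". One possible workaround is to emit the second shape directly. The following is only a minimal sketch, not the connector's own serializer: SpoolDirSchemaJson and FieldSpec are hypothetical names, it uses no Kafka classes, it handles only flat structs of primitive types, and it does not cover logical types such as the Decimal field in the example (which also needs "name", "version" and "parameters"). With a real Schema, the map could presumably be filled by iterating schema.fields() and reading each field's name(), schema().type() and schema().isOptional().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that writes the Spooldir-style schema JSON by hand.
class SpoolDirSchemaJson {

    /** Hypothetical minimal field description: Connect type name plus optionality. */
    static final class FieldSpec {
        final String type;
        final boolean optional;

        FieldSpec(String type, boolean optional) {
            this.type = type;
            this.optional = optional;
        }
    }

    // Emits {"name":..,"type":"STRUCT","isOptional":false,"fieldSchemas":{..}}.
    // The top-level struct is assumed to be non-optional, as in the example config.
    static String toJson(String schemaName, Map<String, FieldSpec> fields) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"name\":\"").append(escape(schemaName)).append("\",")
          .append("\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{");
        boolean first = true;
        for (Map.Entry<String, FieldSpec> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":{")
              .append("\"type\":\"").append(escape(e.getValue().type)).append("\",")
              .append("\"isOptional\":").append(e.getValue().optional)
              .append('}');
        }
        return sb.append("}}").toString();
    }

    // Escapes backslashes and double quotes so names stay valid JSON strings.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the fields in insertion (column) order.
        Map<String, FieldSpec> fields = new LinkedHashMap<>();
        fields.put("id", new FieldSpec("INT64", false));
        fields.put("first_name", new FieldSpec("STRING", true));
        System.out.println(toJson("com.example.users.User", fields));
    }
}
```

The output is compact rather than pretty-printed; the \n sequences in the examples are just whitespace and should not matter to a JSON parser. When the string is placed into the connector config sent over REST, it still has to be escaped as a JSON string value.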