How to post messages to a Kafka Cassandra sink

Hi, I am trying to post this message to a topic that a Kafka Cassandra sink connector is listening on. What am I doing wrong? Here is the command, the record I entered, and the error it throws:

confluent-6.1.1/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic cassandra-sink-topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"language","type":"string"},{"name":"users_count","type":"string"}]}'

{"language":"LISP","users_count":"4000"}

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"myrecord","fields":[{"name":"language","type":"string"},{"name":"users_count","type":"string"}]}

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005

at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)

at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)

at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498)

at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:489)

at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:462)

at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:214)

at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:276)

at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:252)

at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:84)

at io.confluent.kafka.formatter.AvroMessageReader$AvroMessageSerializer.serialize(AvroMessageReader.java:168)

at io.confluent.kafka.formatter.SchemaMessageReader.readMessage(SchemaMessageReader.java:317)

at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:51)

at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala

The issue was resolved when I also supplied a key, that is, when I enabled key parsing, set a key separator, and added a key schema, like this:

confluent-6.1.1/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic cassandra-sink-topic --property "parse.key=true" --property "key.separator=|" --property key.schema='{"name":"key","type":"string"}' --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"language","type":"string"},{"name":"users_count","type":"string"}]}'

"B"|{"language":"LISP","users_count":"4000"}
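
For anyone reading this later, here is a rough sketch of how that input line breaks down once parse.key=true and key.separator=| are set. It is plain shell, not the producer's own code, and it assumes the split happens at the first separator. The variable names `record`, `key`, and `value` are made up for the illustration.

```shell
# Illustration only: mimic how one input line is divided into key and value
# when parse.key=true and key.separator=| (assumes a split at the first "|").
record='"B"|{"language":"LISP","users_count":"4000"}'

key="${record%%|*}"    # everything before the first "|"
value="${record#*|}"   # everything after the first "|"

printf 'key:   %s\nvalue: %s\n' "$key" "$value"
# key:   "B"
# value: {"language":"LISP","users_count":"4000"}
```

The key part is a JSON string in quotes because it is read against the key schema (type "string"), and the value part is the same record as before.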

@kkhambadkone, Glad you were able to resolve it over the weekend :slight_smile:
