HTTP Sink not working properly

I have created a connector for handling Avro data. I am able to publish data into the input topic, but no data arrives in the output topic. I have checked the connector and REST Proxy logs and no errors are shown. Please find the Connect API request below:

{
    "name": "sink-elastic_avro_2_topic",
    "config": {
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "headers": "Content-Type:application/vnd.kafka.json.v2+json|Accept:application/vnd.kafka.v2+json",
        "batch.max.size": "3000",
        "confluent.topic.bootstrap.servers": "broker:9092",
        "tasks.max": "3",
        "http.api.url": "http://xxx.xx.xxx/topics/avro_output_topic",
        "topics": "avro_input_topic",
        "request.method": "POST",
        "reporter.bootstrap.servers": "broker:9092",
        "regex.patterns": "^~$",
        "regex.separator": "~",
        "reporter.error.topic.name": "error-responses",
        "regex.replacements": "{\"key\" : \"${key}\" ,\"value\":~}",
        "reporter.result.topic.name": "success-responses",
        "batch.prefix": "{\"records\":[",
        "reporter.error.topic.replication.factor": "1",
        "consumer.override.auto.offset.reset": "latest",
        "confluent.topic.replication.factor": "1",
        "value.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "batch.suffix": "]}",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "reporter.result.topic.replication.factor": "1"
    }
}

Just to clarify: you’re running an HTTP Sink connector to read data from avro_input_topic, which you’re writing to with the REST Proxy, is that right?

Or you’re saying that when you use the Connector config with the Kafka Connect REST API to create the connector, it doesn’t work?

Thanks for the reply. I am using the HTTP Sink Connector to read data from `avro_input_topic`.

In the meantime I have created a Spring Boot API and mapped it to “http.api.url”. When I publish data into the input topic, I get an error in my Spring Boot application console. Please find it below:

w.s.m.s.DefaultHandlerExceptionResolver :
Resolved [org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false'); 
nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (PushbackInputStream); line: 1, column: 8]]

The presence of “Struct” reminds me of cases where I’ve seen the HTTP Sink Connector try to publish an Avro message as a String. It calls toString() on the record value, which results in something that begins with “Struct{” and is not valid JSON.
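To illustrate why the receiving endpoint rejects the body, here is a minimal sketch in Python. It uses Python's `json` module as a stand-in for Jackson on the Spring Boot side, and the two payloads (including the field names `id` and `name`) are made up for illustration, not taken from the actual topic. It only shows that a `toString()`-style rendering of a Struct is not parseable as JSON, whereas the same record serialized as JSON is.

```python
import json

# Hypothetical payloads (field names are assumptions for illustration):
# roughly what a Connect Struct looks like when rendered via toString(),
# versus the same record serialized as JSON.
struct_body = "Struct{id=1,name=alice}"
json_body = '{"id": 1, "name": "alice"}'


def is_valid_json(body: str) -> bool:
    """Return True if body parses as JSON, False otherwise."""
    try:
        json.loads(body)
        return True
    except json.JSONDecodeError:
        return False


print(is_valid_json(struct_body))  # False
print(is_valid_json(json_body))    # True
```

Wrapping the Struct rendering in the batch prefix and suffix from the config does not help, because the `value` part of each record is still not JSON.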

I wonder if you need to add

    "request.body.format": "json",

to your connector config.

Another idea would be to try dropping your use of regex to see if that’s a factor.
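Both suggestions can be combined into a stripped-down config for a debugging run. The sketch below is only an assumption about how one might derive it: the helper name `simplify_for_debugging` is hypothetical, the sample dict is a subset of the config posted above, and the `batch.prefix` / `batch.suffix` settings are deliberately left untouched since it is not clear from this thread whether they should stay. The resulting dict would still need to be submitted to the Kafka Connect REST API, which is not shown here.

```python
def simplify_for_debugging(config: dict) -> dict:
    """Return a copy of a connector config without regex.* keys
    and with request.body.format set to json."""
    # Drop every regex.* setting to rule the regex rewriting out as a factor.
    trimmed = {k: v for k, v in config.items() if not k.startswith("regex.")}
    # Ask the connector to serialize record values as JSON instead of strings.
    trimmed["request.body.format"] = "json"
    return trimmed


# Subset of the config from the original post, for illustration only.
original = {
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "topics": "avro_input_topic",
    "regex.patterns": "^~$",
    "regex.separator": "~",
    "regex.replacements": "{\"key\" : \"${key}\" ,\"value\":~}",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
}

debug_config = simplify_for_debugging(original)
for key in sorted(debug_config):
    print(key, "=", debug_config[key])
```

The helper returns a new dict rather than modifying the original, so the full config stays available for comparison once the cause is found.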