Whatsupbros, Robin,
Thank you for trying to help.
Okay, this is what I get back from the HTTP Source Connector:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"doc":"HTTP Record Value","field":"value"},{"type":"string","optional":true,"doc":"HTTP Record Key","field":"key"},{"type":"int64","optional":true,"doc":"HTTP Record Timestamp","field":"timestamp"}],"optional":false,"name":"com.github.castorm.kafka.connect.http.Value","doc":"Message Value"},"payload":{"value":"{"PM2.5":"9","Temperature":"22.6","Relative Humidity":"31.9","Air Pressure":"1020.8","TVOC":"927","CO2":"468","CO":"0","Ozone":"7.5","NO2":"18.9","virusIndex":3,"serialNumber":"<serialNumber>","Timestamp":1616418626,"DateTime":"2021-03-22 13:10"}","key":"045915bc-a2a0-3ea0-9e1d-800a035d0c8d","timestamp":1616418624013}}
This uses the following connector config (usernames, passwords and the URL have been removed):
name=Test-HTTP07
kafka.topic=Test-http01
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialnumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
Kafka records this with a schema whose value is a single string field. What I want is for Kafka to record the payload with a proper schema.
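To show what I mean by a single string, here is a small Python sketch of how I read the record above. It uses a shortened sample of my payload, and it assumes the inner quotes are escaped on the wire (the paste above does not show that). It is only an illustration of the shape, not anything the connector runs.

```python
import json

# Shortened sample of the record above. Assumption: "value" is a JSON
# string, so its inner quotes are escaped when the envelope is serialized.
inner = {"PM2.5": "9", "Temperature": "22.6", "Timestamp": 1616418626}
envelope_text = json.dumps({
    "schema": {"type": "struct",
               "name": "com.github.castorm.kafka.connect.http.Value"},
    "payload": {
        "value": json.dumps(inner),
        "key": "045915bc-a2a0-3ea0-9e1d-800a035d0c8d",
        "timestamp": 1616418624013,
    },
})

record = json.loads(envelope_text)
value = record["payload"]["value"]
print(type(value).__name__)    # the whole reading arrives as one string

reading = json.loads(value)    # a second parse is needed to reach the fields
print(reading["Temperature"])
```

So a consumer has to parse the value a second time to reach individual fields such as Temperature, which is what I am trying to avoid.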
So that is why I then tried this config:
name=Test-HTTP07
kafka.topic=Test-http02
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialNumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
transforms=Expand
transforms.Expand.type=com.redhat.insights.expandjsonsmt.ExpandJSON$Value
transforms.Expand.sourceFields=value (or payload)
ExpandJSON returns an error very similar to this:
ExpandJSON fields missing from record: SourceRecord{sourcePartition={},
sourceOffset={key=72b9488a-3a3e-3bf5-8ba0-3b2164e3292e}} ConnectRecord{topic='Test-http02', kafkaPartition=null, key=72b9488a-3a3e-3bf5-8ba0-3b2164e3292e, keySchema=Schema{STRING}, value={"PM2.5":"9","Temperature":"24.9","Relative Humidity":"24.1","Air Pressure":"1025.2","TVOC":"626","CO2":"559","CO":"0","Ozone":"7.9","NO2":"36.7","serialNumber":"<serialNumber>","Virus Index":6,"Timestamp":1618578256,"DateTime":"2021-04-16 14:04"}, valueSchema=Schema{STRING}, timestamp=1618578288690, headers=ConnectHeaders(headers=)} (com.redhat.insights.expandjsonsmt.ExpandJSON:96)
And in Kafka the data is still recorded as a string.
Next I tried this config:
name=Test-HTTP07
kafka.topic=Test-http02
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialNumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Kafka records this, but without a schema:
{"PM2.5":"10","Temperature":"24.9","Relative Humidity":"23.9","Air Pressure":"1025.2","TVOC":"674","CO2":"532","CO":"0","Ozone":"8.2","NO2":"41.8","serialNumber":"<serialNumber>","Virus Index":6,"Timestamp":1618579096,"DateTime":"2021-04-16 14:18"}
So finally I tried this config:
name=Test-HTTP07
kafka.topic=Test-http02
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialNumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
transforms=HoistValue,Expand
transforms.HoistValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistValue.field=values
transforms.Expand.type=com.redhat.insights.expandjsonsmt.ExpandJSON$Value
transforms.Expand.sourceFields=values
Here I thought I would get the JSON I initially reported, but that is not what happens.
Now I get the "ExpandJSON fields missing from record" error, and this is recorded in Kafka:
Struct{values=Struct{PM2.5=9,Temperature=24.9,Relative Humidity=23.9,Air Pressure=1025.1,TVOC=682,CO2=515,CO=0,Ozone=8.3,NO2=43.7,serialNumber=<serialNumber>,Virus Index=6,Timestamp=1618579396,DateTime=2021-04-16 14:23}}
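To make clear what I expected from the HoistValue and Expand chain, here is a rough Python simulation of the two steps. The functions `hoist_field` and `expand_json` are hypothetical stand-ins that I wrote for illustration. They are not the SMTs' actual code, and the struct check in `expand_json` is my assumption about why a plain string value is rejected.

```python
import json

def hoist_field(value, field):
    """Stand-in for HoistField$Value: wrap the whole value in one named field."""
    return {field: value}

def expand_json(value, source_fields):
    """Stand-in for ExpandJSON: parse the named string fields into nested structs.

    Assumption: the transform needs a struct-like value, so a bare string
    is rejected before any field is looked up.
    """
    if not isinstance(value, dict):
        raise TypeError("value is not a struct")
    out = dict(value)
    for name in source_fields:
        if isinstance(out.get(name), str):
            out[name] = json.loads(out[name])
    return out

# Shortened sample of the string value the connector produces.
raw = '{"PM2.5":"9","Temperature":"24.9","Virus Index":6}'

hoisted = hoist_field(raw, "values")          # {"values": "<json string>"}
expanded = expand_json(hoisted, ["values"])   # {"values": {...parsed fields...}}
print(expanded)
```

The nested result has the same shape as the Struct above, with everything sitting under `values`.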
Between each test I made sure I deleted and recreated the topic and any schemas that might have been created.
I know this is a long post, but I thought I had better provide as much detail as possible.
Thank you,
Raoul.