Is there an SMT to change field names?

I am using the HTTP Source Connector to pull data from an external REST API.
The data is returned as a string, and I am using the Expand JSON SMT to build the schema.
The problem is that the external source has field names that do not conform to what is allowed, e.g. they contain a dot or a space (field names such as EX2.2 or Relative Humidity).
When running the connector I get this error, which is expected, since Avro field names may only contain letters, digits, and underscores: Caused by: org.apache.avro.SchemaParseException: Illegal character in: EX2.5

The way it works for me is that the data is received as a string, which I then convert to JSON using the HoistField and Expand JSON SMTs.

I believe the problem occurs during the Expand JSON SMT, which is understandable.

So, my question is: is there a way to perform a string replace on the value before it is passed to the Expand JSON SMT?

Thank you,
Raoul.

Hi, @rvaneynd!

If I understood you correctly, you are looking for this SMT: ReplaceField | Confluent Documentation

Whatsupbros,

I have tried that, but the problem is that I need to replace it before it is passed to the Expand JSON SMT. At that point it is not yet recognised as a field (it is still a simple string), so the ReplaceField SMT does nothing.

Raoul.

Can you share a sample of the source data as you have it before all transformations (before the ExpandJSON SMT), what you get after applying them, and the desired resulting data you would like to see?

It would probably be easier to assist you then…

whatsupbros,

Thank you. I will do this tomorrow or Saturday.

Raoul

Can you also share the connector config that you’re using, please?

Whatsupbros, Robin,

Thank you for trying to help.

Okay, this is what I get back from the HTTP Source Connector:

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"doc":"HTTP Record Value","field":"value"},{"type":"string","optional":true,"doc":"HTTP Record Key","field":"key"},{"type":"int64","optional":true,"doc":"HTTP Record Timestamp","field":"timestamp"}],"optional":false,"name":"com.github.castorm.kafka.connect.http.Value","doc":"Message Value"},"payload":{"value":"{"PM2.5":"9","Temperature":"22.6","Relative Humidity":"31.9","Air Pressure":"1020.8","TVOC":"927","CO2":"468","CO":"0","Ozone":"7.5","NO2":"18.9","virusIndex":3,"serialNumber":"<serialNumber>","Timestamp":1616418626,"DateTime":"2021-03-22 13:10"}","key":"045915bc-a2a0-3ea0-9e1d-800a035d0c8d","timestamp":1616418624013}}

This uses the following connector config (usernames, passwords & URL have been removed):

name=Test-HTTP07
kafka.topic=Test-http01
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialnumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser

Kafka records this with a single string schema, but what I want is a proper schema for the payload.
So that is why I then try this config:

name=Test-HTTP07
kafka.topic=Test-http02
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialNumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser

transforms=Expand
transforms.Expand.type=com.redhat.insights.expandjsonsmt.ExpandJSON$Value
transforms.Expand.sourceFields=value (or payload)

ExpandJSON returns an error very similar to this:

ExpandJSON fields missing from record: SourceRecord{sourcePartition={}, 
sourceOffset={key=72b9488a-3a3e-3bf5-8ba0-3b2164e3292e}} ConnectRecord{topic='Test-http02', kafkaPartition=null, key=72b9488a-3a3e-3bf5-8ba0-3b2164e3292e, keySchema=Schema{STRING}, value={"PM2.5":"9","Temperature":"24.9","Relative Humidity":"24.1","Air Pressure":"1025.2","TVOC":"626","CO2":"559","CO":"0","Ozone":"7.9","NO2":"36.7","serialNumber":"<serialNumber>","Virus Index":6,"Timestamp":1618578256,"DateTime":"2021-04-16 14:04"}, valueSchema=Schema{STRING}, timestamp=1618578288690, headers=ConnectHeaders(headers=)} (com.redhat.insights.expandjsonsmt.ExpandJSON:96)

And in Kafka the data is recorded as a string.

Then I try this config:

name=Test-HTTP07
kafka.topic=Test-http02
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialNumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Kafka records this, but without a schema:
{"PM2.5":"10","Temperature":"24.9","Relative Humidity":"23.9","Air Pressure":"1025.2","TVOC":"674","CO2":"532","CO":"0","Ozone":"8.2","NO2":"41.8","serialNumber":"<serialNumber>","Virus Index":6,"Timestamp":1618579096,"DateTime":"2021-04-16 14:18"}

So finally I try this config:

name=Test-HTTP07
kafka.topic=Test-http02
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://<url>
http.request.headers=Content-Type:application/json
http.request.method=POST
http.request.body={"username":"<username>","password":"<password>","serialNumber":"<serialNumber>"}
http.timer.interval.millis=30000
http.timer.catchup.interval.millis=1000
http.response.record.mapper=com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper
http.response.parser=com.github.castorm.kafka.connect.http.response.KvHttpResponseParser

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

transforms=HoistValue,Expand
transforms.HoistValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistValue.field=values

transforms.Expand.type=com.redhat.insights.expandjsonsmt.ExpandJSON$Value
transforms.Expand.sourceFields=values

And here I thought I would get the JSON I initially reported, but no.
Instead I get the ExpandJSON fields missing error again, and this is recorded in Kafka:

Struct{values=Struct{PM2.5=9,Temperature=24.9,Relative Humidity=23.9,Air Pressure=1025.1,TVOC=682,CO2=515,CO=0,Ozone=8.3,NO2=43.7,serialNumber=<serialNumber>,Virus Index=6,Timestamp=1618579396,DateTime=2021-04-16 14:23}}

Between each test I made sure to delete and recreate the topic and any schemas that might have been created.
I know this is a long post, but I thought I had better provide as much detail as possible.

Thank you,
Raoul.

Heh, this is a fun one. You’re hitting up against the issue that JSON has a schema when you eyeball it, but the converters in Kafka Connect deal in explicitly declared schemas.

So the HTTP Source connector returns a single field (which happens to hold a JSON object). You can’t process that with any SMT that deals in schema fields, because those fields within the JSON aren’t really there (it’s just a single field holding what happens to be a lump of JSON).

How to deal with it? You need one of the following:

  • The connector to actually create the schema (which AFAIK it can’t)
  • A Single Message Transform to apply the schema (either dynamically derive it or apply a statically defined one) - I’m not aware of an SMT that does this, although it would be cool; the renaming half of that idea is sketched below
  • Stream processing to apply a schema to the data. This is a pattern I’ve used and written about using ksqlDB here and here (the second example actually uses the same HTTP source connector that you are using).
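To make that sketch concrete, here is the field-renaming logic in plain Python. This is an illustration only, not a Connect SMT (a real SMT would have to be written in Java against the Connect API), and the underscore-replacement rule is just an assumption:

import json
import re

# Parse a JSON string and rewrite keys that Avro would reject.
# Avro field names must match [A-Za-z_][A-Za-z0-9_]*, so characters such as
# the dot in "PM2.5" or the space in "Relative Humidity" become underscores.
def sanitize_keys(raw_json):
    record = json.loads(raw_json)
    return {re.sub(r"[^A-Za-z0-9_]", "_", key): value
            for key, value in record.items()}

raw = '{"PM2.5":"9","Relative Humidity":"31.9","CO2":"468"}'
print(sanitize_keys(raw))
# prints: {'PM2_5': '9', 'Relative_Humidity': '31.9', 'CO2': '468'}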

Robin,

Thank you for the quick response.
I thought this was the case… :slight_smile:

So what I have done is just to consume it as a plain string into Kafka, where it has a single string schema.
We use Azure Databricks downstream. Using Confluent’s example as the basis for our Azure Databricks job, I have written code that basically disassembles the ReadStream DataFrame from Kafka and builds a new DataFrame, along these lines:
from pyspark.sql.functions import split

# First split: take the "key":"value" pair for PM2.5 out of the raw string
newDf = kafkaDf \
  .withColumn('PM25', split(kafkaDf['parsedValue'], ',').getItem(0)) \
  .select('*')

# Second split: keep only the value part after the colon
newDf = newDf \
  .withColumn('PM25', split(newDf['PM25'], ':').getItem(1)) \
  .select('*')

And that for all relevant fields.
Not pretty but it works.
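(A tidier variant of the same idea would be to parse the whole string in one go with from_json and rename afterwards. Just a sketch — it assumes the raw value is in a parsedValue string column as above, and the schema covers only a few of the fields:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Declare the schema explicitly; Spark tolerates dots and spaces in field names here
sensor_schema = StructType([
    StructField("PM2.5", StringType()),
    StructField("Temperature", StringType()),
    StructField("Relative Humidity", StringType()),
])

newDf = (kafkaDf
         .withColumn("parsed", from_json(col("parsedValue"), sensor_schema))
         .select("parsed.*")
         .withColumnRenamed("PM2.5", "PM25")
         .withColumnRenamed("Relative Humidity", "RelativeHumidity"))

The renames are needed because the dots and spaces survive the parse; withColumnRenamed matches the literal column name, so no backtick quoting is required.)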

Thank you for your feedback. At least I now know not to spend any more time on the Source Connector.

Have a good weekend.
Raoul.


Hi @rvaneynd! Have you tried to do what you need in two steps? I think it should work to add a Flatten transformation and then apply the ReplaceField one.

I mean, the transforms should look something like:

transforms=HoistValue,Expand,Flatten,RenameFields

transforms.HoistValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistValue.field=values

transforms.Expand.type=com.redhat.insights.expandjsonsmt.ExpandJSON$Value
transforms.Expand.sourceFields=values

transforms.Flatten.type=org.apache.kafka.connect.transforms.Flatten$Value

transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameFields.renames=values."PM2.5":PM_2_5,values.Temperature:TEMPERATURE,...

This does not derive the schema from the original HTTP response, so I am still not sure whether this is exactly what you’re trying to achieve, but it probably adds some food for thought.


Thank you. It looks like what I am after, and I will give it a try later.

Once again, thank you for trying to help.

I will report back ASAP.

Raoul.

@whatsupbros,

Well, some very interesting results.
I set it up this way:

transforms=HoistValue,Expand,Flatten,RenameFields

transforms.HoistValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistValue.field=values

transforms.Expand.type=com.redhat.insights.expandjsonsmt.ExpandJSON$Value
transforms.Expand.sourceFields=values

transforms.Flatten.type=org.apache.kafka.connect.transforms.Flatten$Value

transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameFields.renames=values.PM2.5:PM25,values.Temperature:Temperature,values.Relative Humidity:RelativeHumidity,values.Air Pressure:AirPressure,values.TVOC:TVOC,values.CO2:CO2,values.CO:CO,values.Ozone:Ozone,values.NO2:NO2,values.serialNumber:serialNumber,values.Virus Index:VirusIndex,values.Timestamp:Timestamp,values.DateTime:DateTime

I had to apply the renames to all field names, even those that were already correct, as otherwise I got errors.
But, IT WORKED!!! :smiley: :smiley: :smiley: :smiley:

Data is recorded in Kafka and I have got a schema that matches…

Thank you so much…

I can now apply the same principle to the other data I need to retrieve using the same HTTP REST API and this connector.

This has made my life so much easier.


That’s cool - nice job!! :+1:

I’m glad I could be of help!

Though, I’m kind of surprised that values.PM2.5:PM25 worked without quotes :sweat_smile:

Same here. But hey, it works, and I am really happy and extremely grateful for your help.

Raoul
