Kafka Custom Deserializer

Hi Folks,

So far, I’ve been able to create a KStream with the help of a topic.

KStream<String, Object> testqa2 = builder.stream("testqa2", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    System.out.println(value);
                    return value;
                });

It doesn’t print anything, so on debbuging - I realized I am just creating my KStream. There is no data in it.

I am having a litte trouble creating serializer/deserializer for worker class.

package com.copart.mwa.Avro;

public class Worker {

private static String WorkerActivityName;
private static String WorkerSid;
private static String WorkerPreviousActivityName;
private static String WorkerPreviousActivitySid;

public String getWorkerActivityName() {
    return WorkerActivityName;
}

public void setWorkerActivityName(String workerActivityName) {
    WorkerActivityName = workerActivityName;
}

public static String getWorkerSid() {
    return WorkerSid;
}

public void setWorkerSid(String workerSid) {
    WorkerSid = workerSid;
}

public String getWorkerPreviousActivityName() {
    return WorkerPreviousActivityName;
}

public void setWorkerPreviousActivityName(String workerPreviousActivityName) {
    WorkerPreviousActivityName = workerPreviousActivityName;
}

public String getWorkerPreviousActivitySid() {
    return WorkerPreviousActivitySid;
}

public void setWorkerPreviousActivitySid(String workerPreviousActivitySid) {
    WorkerPreviousActivitySid = workerPreviousActivitySid;
}

@Override
public String toString() {
    return "Worker(" + WorkerSid + ", " + WorkerActivityName + ")";
} }

And the message from the producer to the consumer is a JSON (converted to String):

  {
"WorkerActivityName": "Available",
"EventType": "worker.activity.update",
"ResourceType": "worker",
"WorkerTimeInPreviousActivityMs": "237",
"Timestamp": "1626114642",
"WorkerActivitySid": "WAc9030ef021bc1786d3ae11544f4d9883",
"WorkerPreviousActivitySid": "WAf4feb231e97c1878fecc58b26fdb95f3",
"WorkerTimeInPreviousActivity": "0",
"AccountSid": "AC8c5cd8c9ba538090da104b26d68a12ec",
"WorkerName": "Dorothy.Finegan@Copart.Com",
"Sid": "EV284c8a8bc27480e40865263f0b42e5cf",
"TimestampMs": "1626114642204",
"P": "WKe638256376188fab2a98cccb3c803d67",
"WorkspaceSid": "WS38b10d521442ecb74fcc263d5a4d726e",
"WorkspaceName": "Copart-MiPhone",
"WorkerPreviousActivityName": "Unavailable(RNA)",
"EventDescription": "Worker Dorothy.Finegan@Copart.Com updated to Available Activity",
"ResourceSid": "WKe638256376188fab2a98cccb3c803d67",
"WorkerAttributes": "{\"miphone_dept\":[\"USA_YRD_OPS\"],\"languages\":[\"en\"],\"home_region\":\"GL\",\"roles\":[\"supervisor\"],\"miphone_yards\":[\"81\"],\"miphone_enabled\":true,\"miphone_states\":[\"IL\"],\"home_state\":\"IL\",\"skills\":[\"YD_SELLER\",\"YD_TITLE\"],\"home_division\":\"Northern\",\"miphone_divisions\":[\"Northern\"],\"miphone_functions\":[\"outbound_only\"],\"full_name\":\"Dorothy Finegan\",\"miphone_regions\":[\"GL\"],\"home_country\":\"USA\",\"copart_user_id\":\"USA3204\",\"home_yard\":\"81\",\"home_dept\":\"USA_YRD_OPS\",\"email\":\"dorothy.finegan@copart.com\",\"home_dept_category\":\"OPS\",\"contact_uri\":\"client:Dorothy_2EFinegan_40Copart_2ECom\",\"queue_activity\":\"Available\",\"teams\":[],\"remote_employee\":false,\"miphone_call_center_units\":[\"USA_YRD_OPS|81\"],\"miphone_call_center_teams\":[]}"
}

I want to implemenet a customer deserializer where

“WorkspaceSid”: “WS38b10d521442ecb74fcc263d5a4d726e”, is the key and the remaining values of the other attributes act as the value for the key-value pair.

Thanks,
Anmol

I want to implemenet a customer deserializer where

“WorkspaceSid”: “ WS38b10d521442ecb74fcc263d5a4d726e ”, is the key and the remaining values of the other attributes act as the value for the key-value pair.

In Kafka, a (de)serializer is always for the key or the value (if you want to use the same data format for both key and value, you would specify the same (de)serializer for each). Thus, there is not such thing as a (de)serializer that sets a value and key at the same time.

As your producer seems to write JSON, you might want use the provide JSON Serdes (cf https://kafka.apache.org/28/documentation/streams/developer-guide/datatypes.html#json) when reading the data (or just read the value as String and use mapValues() to parse the String into a JSON). It seems, that the producer does not set the key, but leaves it empty (ie, null). For this case, after reading the topic as a KStream, you can apply selectKey() to set the key as desired.

Something like this:

// use `Void` as key type, because there is no key
KStream<Void, JSON> input = builder.stream(..., Consumed.with(Serdes.Void(), Serdes.from(...));

KStream<String, JSON> keyedStream = input.selectKey(/* extract WorkspaceSid from the value */);

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.