Azure Event Hubs source connector with Kafka Connect: applying an SMT transformation is not working

Hello all, I am using Azure Event Hubs as a source with Kafka Connect and writing the data to a Kafka topic, which works fine. My requirement now is to apply SMTs and write the transformed data to the Kafka topic, using a chain of transformations such as ValueToKey and ExtractField. I have tried many things but it is not working, so I need urgent help.

When I apply the transformations I do not get any error, but no data arrives in the topic when I look at that topic in Lenses.

[2022-01-18 18:06:23,868] INFO WorkerSourceTask{id=kafka-eventhub-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2022-01-18 18:07:14,015] INFO WorkerSourceTask{id=kafka-eventhub-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2022-01-18 18:07:14,016] INFO WorkerSourceTask{id=kafka-eventhub-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2022-01-18 18:08:13,951] INFO WorkerSourceTask{id=kafka-eventhub-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2022-01-18 18:08:13,952] INFO WorkerSourceTask{id=kafka-eventhub-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)

![Screenshot 2022-01-18 at 11.56.16 PM|690x258](upload://6gGXDgGqHoV7joQROwlF3LeMXxx.png)

This is the data without any transformation applied, as it arrives from Azure Event Hubs via Kafka Connect in the topic:
[{"value":{"sensorDetails":{"sensorStatus":{"reading":"Not Active","mappedReading":null,"units":"NA","unResolvedReading":""},"uniqueId":"US|2482|E|1|EM|-1079246548|15|INPUT|DIGITAL|1642410686361","cc":"US","storeNo":2482,"vendorType":"E","rackIndex":1,"rackLabel":"01-HVAC","rackStatus":"ACTIVE","isRackInactive":null,"networkType":"EM","isHvac":"TRUE","modStatus":"ACTIVE","modIndex":-1079246548,"modType":"Sensor DV","modTypeID":96,"modLabel":"DIM 1 UNPLUGGD","resTS":"2022-01-17T09:11:26","reqTS":"2022-01-17T09:11:26","sensorIndex":15,"sensorLabel":"LOGIC IN3","sensorReadingType":"DIGITAL","sensorIOType":"INPUT"},"normalizationDetails":{"category":"none","isNormalized":true},"msgSchema":{"msgVersion":"1.0"},"id":"US|2482|E|1|EM|-1079246548|15|INPUT|DIGITAL|1642410686361","partitionKey":"US|2482|E|1|EM|-1079246548|15|INPUT|DIGITAL","PartitionId":15},"metadata":{"offset":43382,"partition":0,"timestamp":1642410695232,"__keysize":0,"__valsize":15205},"rownum":0,"headers":{"x-opt-partition-key":"","x-opt-enqueued-time":"2022-01-17T09:11:35.232Z","x-opt-sequence-number":"523654654","x-opt-offset":"116200367535648","properties":"{}","azure.eventhubs.namespace":"we-sb-dev-event-hub","system.properties":"{}","azure.eventhubs.hub.name":"sensoreventhub","azure.eventhubs.partition.id":"15"},"id":0},{"key":"VVN8NDY1fE58Mg==","value":{"normalizationDetails":{"rackTemp":"low temp","rackName":"LTA","rackSatTemp":"-12","reversed":false,"assetType":"rack","rackCallLetter":"A","serviceChannelId":1964301,"category":"rack","sensorDescription":"rack phase loss alarm","isNormalized":true},"sensorDetails":{"rackLabel":"RACK LTA (-12)","modTypeID":0,"rackStatus":"ACTIVE","modLabel":"","modType":"","modVersion":"0.0","modStatus":"ACTIVE","sensorLabel":"RACK PHASE LOSS","sensorStatus":{"reading":"OK","mappedReading":"0","units":null},"isHvac":"FALSE","uniqueId":"US|465|N|2|EP2|9999|9999|OUTPUT|DIGITAL|1642412977310","cc":"US","storeNo":465,"reqTS":"2022-01-17T09:49:37","resTS":"2022-01-17T09:49:37","rackIndex":2,"modIndex":9999,"sensorIndex":9999,"sensorReadingType":"DIGITAL","sensorIOType":"OUTPUT","networkType":"EP2","vendorType":"N"},"msgSchema":{"msgVersion":"1"},"partitionKey":"US|465|N|2|EP2|9999|9999|OUTPUT|DIGITAL","id":"US|465|N|2|EP2|9999|9999|OUTPUT|DIGITAL|1642412977310"},"metadata":{"offset":43383,"partition":0,"timestamp":1642412977459,"__keysize":10,"__valsize":955},"rownum":1,"headers":{"x-opt-partition-key":"US|465|N|2","x-opt-enqueued-time":"2022-01-17T09:49:37.459Z","x-opt-sequence-number":"306720240","x-opt-offset":"61602736348392","properties":"{}","azure.eventhubs.namespace":"we-sb-dev-event-hub","system.properties":"{}","azure.eventhubs.hub.name":"sensoreventhub","azure.eventhubs.partition.id":"2"},"id":1}]

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
​
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
​
  <groupId>com.xyz.dip</groupId>
  <artifactId>dip-kcass-iothub-source-connector</artifactId>
  <version>2.0</version>
​
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Kafka Connect 3.0.0 (see kafkaconnect.version) requires Java 8 or newer, so build for 1.8 rather than 1.7. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <kcaas-archetype-version>2.0</kcaas-archetype-version>
    <!-- Single version shared by the org.apache.kafka Connect artifacts declared in <dependencies>. -->
    <kafkaconnect.version>3.0.0</kafkaconnect.version>
  </properties>
​
  <dependencies>
    <!--Add all required additional dependencies here. For example - transformers, converters, connectors etc.-->
    <!-- Connector implementation: default (compile) scope, so the shade plugin bundles it into the uber jar. -->
    <dependency>
      <groupId>com.xyz.dip</groupId>
      <artifactId>kcaaseventhub</artifactId>
      <version>2.0</version>
    </dependency>
    <!-- Artifacts that ship with the Kafka Connect runtime are `provided` so they are not shaded into the plugin jar. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>${kafkaconnect.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Built-in SMTs (ValueToKey, ExtractField, ...): now `provided`, matching connect-api and connect-json. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-transforms</artifactId>
      <version>${kafkaconnect.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- NOTE(review): `provided` keeps this SMT library out of the uber jar; confirm the runtime image
         already has it on the plugin path, otherwise its transformations cannot be loaded. -->
    <dependency>
      <groupId>com.github.jcustenborder.kafka.connect</groupId>
      <artifactId>kafka-connect-transform-common</artifactId>
      <version>0.1.0.12</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-json</artifactId>
      <version>${kafkaconnect.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- NOTE(review): also `provided`, so not bundled; confirm these coordinates and that the runtime supplies the jar. -->
    <dependency>
      <groupId>io.confluent.connect-transformations-dependencies</groupId>
      <artifactId>connect-transforms</artifactId>
      <version>1.4.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
​
  <build>
    <plugins>
      <!-- Pinned versions of the default-lifecycle plugins, so the build does not depend on the Maven installation's defaults. -->
      <plugin>
        <artifactId>maven-clean-plugin</artifactId>
        <version>3.1.0</version>
      </plugin>
      <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.0.2</version>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.22.1</version>
      </plugin>
      <plugin>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.0.2</version>
      </plugin>
      <plugin>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
      </plugin>
      <plugin>
        <artifactId>maven-deploy-plugin</artifactId>
        <version>2.8.2</version>
      </plugin>
      <!--Shade plugin-->
      <!-- Builds connectors-uber.jar during `package`; the version is pinned like every other plugin in this build. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <id>default</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <!-- Artifacts that must never end up inside the uber jar. -->
              <artifactSet>
                <excludes>
                  <exclude>com.xyz.streaming:kc_config_resolver</exclude>
                  <exclude>com.xyz.streaming:kc-build-tool</exclude>
                  <exclude>com.xyz.streaming:kc_proxy_service</exclude>
                </excludes>
              </artifactSet>
              <transformers>
                <!-- Merges META-INF/services files from all bundled jars instead of letting one overwrite another. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>io.xyz.streaming.App</mainClass>
                </transformer>
              </transformers>
              <finalName>connectors-uber</finalName>
              <!-- Strip signature files from every bundled artifact. -->
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <shadedArtifactAttached>true</shadedArtifactAttached>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
​
  <repositories>
    <!-- Confluent artifacts are not on Maven Central. HTTPS is required: Maven 3.8.1+ blocks plain-HTTP
         repositories by default, and HTTP downloads can be tampered with in transit. -->
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
​
  <distributionManagement>
    <!-- Deployment targets: one repository for release versions, one for snapshot versions. -->
    <repository>
      <id>pangaea_releases</id>
      <url>https://repository.xyz.com/content/repositories/pangaea_releases</url>
      <!-- NOTE(review): Maven 3 is documented to ignore uniqueVersion; confirm whether this setting is still needed. -->
      <uniqueVersion>true</uniqueVersion>
    </repository>
    <snapshotRepository>
      <id>pangaea_snapshots</id>
      <url>https://repository.xyz.com/content/repositories/pangaea_snapshots</url>
      <uniqueVersion>false</uniqueVersion>
    </snapshotRepository>
  </distributionManagement>
​
</project>

Dockerfile


# set user
# All following instructions run as the non-root UID 10000.
USER 10000

# copy kcaas config files
# The trailing slash makes the destination unambiguously a directory, matching the jar COPY below;
# without it Docker may treat the destination as a single file name.
COPY --chown=10000:10001 pom.xml $KAFKA_HOME/
COPY --chown=10000:10001 kitt.yml $KAFKA_HOME/
COPY --chown=10000:10001 kc_config.yaml $KAFKA_HOME/
COPY --chown=10000:10001 env_properties.yaml $KAFKA_HOME/

# copy connectors uber jar
# This is the shaded jar produced by the Maven build (finalName connectors-uber).
COPY --chown=10000:10001 target/connectors-uber.jar $PLUGIN_PATH/

kc_config.yaml

worker:
  # Worker-level default converters; the connector config below overrides both for its own records.
  key.converter: org.apache.kafka.connect.storage.StringConverter
  value.converter: org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable: false
  value.converter.schemas.enable: false
  # The internal.key/value.converter.schemas.enable entries were dropped: Kafka Connect 3.0 removed the
  # internal converter properties (KIP-738), so setting them has no effect beyond a startup warning.
  # NOTE(review): schema.compatibility is not a standard Connect worker property - confirm the platform reads it.
  schema.compatibility: NONE
  group.id: iothub-kafkaconnector-test
  #offset.flush.interval.ms: 10000
  #offset.flush.timeout.ms: 5000

connectors:
  - name: kafka-eventhub-source-connector
    config:
      connector.class: io.confluent.connect.azure.eventhubs.EventHubsSourceConnector
      # With tolerance `all`, a record that fails in a transformation or converter is skipped instead of
      # failing the task. Error logging is enabled so skipped records show up in the worker log rather
      # than vanishing silently (the "no error, but no data in the topic" symptom).
      errors.tolerance: all
      errors.log.enable: true
      tasks.max: 1
      max.events: 1
      # NOTE(review): values are handled as raw bytes here. Field-based SMTs such as ValueToKey or
      # ExtractField need a Struct or Map value - confirm what the connector emits before chaining them.
      value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
      value.converter.schemas.enable: false
      key.converter: org.apache.kafka.connect.storage.StringConverter
      key.converter.schemas.enable: false
      azure.eventhubs.transport.type: AMQP
      azure.eventhubs.offset.type: OFFSET
      azure.eventhubs.partition.starting.position: END_OF_STREAM
      confluent.topic.replication.factor: 1
# Per-environment overrides for the connector named kafka-eventhub-source-connector.
kafka-eventhub-source-connector:
  dev:
    #topics: RET_DIP_RAW_SensorData_Test
    confluent.topic.bootstrap.servers: kafka_broker:9092
    azure.eventhubs.sas.keyname: dip-kcaas-listener
    # The SAS key is no longer inlined in plain text; it is read through a secret reference like the
    # SSL passwords in the worker section. The key that was previously inlined has been exposed and
    # should be rotated.
    # NOTE(review): the entry name after '#' is a placeholder - confirm the real secret name.
    azure.eventhubs.sas.key: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#eventhubs-sas-key
    azure.eventhubs.namespace: we-sb-dev-event-hub
    azure.eventhubs.hub.name: sensoreventhub
    azure.eventhubs.consumer.group: dip-kcaas
    reporter.result.topic.replication.factor: 1
    reporter.error.topic.replication.factor: 1
    kafka.topic: RET_DIP_RAW_SensorData_Test
    max.interval: 1000
    # Single-step SMT chain: copy the `id` field of the record value into the record key.
    transforms: copyIdToKey
    transforms.copyIdToKey.type: org.apache.kafka.connect.transforms.ValueToKey
    transforms.copyIdToKey.fields: id
# Dev-environment worker settings: broker address, Connect storage topics and SSL material.
# NOTE(review): a top-level `worker:` key also appears earlier in this listing; if both are in the same
# YAML document this is a duplicate key - presumably this part belongs to a separate file; confirm.
worker:
  dev:
    bootstrap.servers: kafka-btt:9093
    config.storage.topic: redip-eventhub-config-test
    status.storage.topic: redip-iothub-source-status-test
    offset.storage.topic: redip-iothub-source-offset-test
    auto.offset.reset: latest
    offset.storage.partitions: 1

    # Un-prefixed SSL settings; key/trust stores and passwords are supplied as secret references.
    security.protocol: SSL
    ssl.truststore.type: JKS
    ssl.truststore.location: secret.ref://ret-dip-dev-core/re-dip/dev/kafkaconnect#truststore.jks?encoding=base64
    ssl.truststore.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password
    ssl.keystore.type: JKS
    ssl.keystore.location: secret.ref://ret-dip-dev-core/re-dip/dev/kafkaconnect#keystore.jks?encoding=base64
    ssl.keystore.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password
    ssl.key.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password

    # The same SSL settings repeated with the `producer.` prefix.
    producer.security.protocol: SSL
    producer.ssl.truststore.type: JKS
    producer.ssl.truststore.location: secret.ref://ret-dip-dev-core/re-dip/dev/kafkaconnect#truststore.jks?encoding=base64
    producer.ssl.truststore.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password
    producer.ssl.keystore.type: JKS
    producer.ssl.keystore.location: secret.ref://ret-dip-dev-core/re-dip/dev/kafkaconnect#keystore.jks?encoding=base64
    producer.ssl.keystore.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password
    producer.ssl.key.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password

    # The same SSL settings repeated with the `confluent.topic.` prefix.
    confluent.topic.security.protocol: SSL
    confluent.topic.ssl.truststore.type: JKS
    confluent.topic.ssl.truststore.location: secret.ref://ret-dip-dev-core/re-dip/dev/kafkaconnect#truststore.jks?encoding=base64
    confluent.topic.ssl.truststore.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password
    confluent.topic.ssl.keystore.type: JKS
    confluent.topic.ssl.keystore.location: secret.ref://ret-dip-dev-core/re-dip/dev/kafkaconnect#keystore.jks?encoding=base64
    confluent.topic.ssl.keystore.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password
    confluent.topic.ssl.key.password: secret.value://ret-dip-dev-core/re-dip/dev/kafkaconnect#password

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.