Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 138 out of bounds for length 2

Hi all,

I am using Flink with the Kafka Schema Registry and I am getting an ArrayIndexOutOfBoundsException. When I consume the data with a plain Kafka consumer it works fine, but when I consume it through Flink I get this exception. My setup is Flink, an Avro schema, and the Java Kafka client; please find the code and logs below. The out-of-bounds index is different on every run: this time it was 138, and the next run it is some other number. I generated my POJO from my .avsc file, so I expected an Avro exception if the data and schema did not match, but instead I get this AIOOBE.
//my code
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<My_pojo> kafkaSource = KafkaSource.<My_pojo>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("testtopic")
                .setGroupId("group1")
                .setStartingOffsets(OffsetsInitializer.latest())
                // Deserialize the record value into my Avro-generated POJO,
                // resolving the writer schema against the Schema Registry.
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(
                        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                My_pojo.class, "http://127.0.0.1:80")))
                .build();

        // Flink DataStream built from the Kafka source
        DataStream<My_pojo> records = env.fromSource(
                kafkaSource, WatermarkStrategy.noWatermarks(), "transactions");

        records.print(); // this is where I get the AIOOBE

        env.execute("kddi-converter");
    }
}
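
From what I can tell, an ArrayIndexOutOfBoundsException at Symbol$Alternative.getSymbol means the Avro decoder read a union branch index that the schema does not have ("length 2" would be a two-branch union such as ["null", "long"], while the decoded index is 138), i.e. it is resolving bytes against a schema that does not match what was written; that would also explain why the index is a different number on every run. Below is a minimal check I can run on a raw record value (a sketch, assuming the producer writes the Confluent wire format of magic byte 0x00, a 4-byte schema ID, then the Avro payload; WireFormatCheck is just an illustrative name):

import java.nio.ByteBuffer;

public class WireFormatCheck {
    /** Returns the Schema Registry ID embedded in a Confluent wire-format value. */
    public static int schemaId(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();
        if (magic != 0) {
            // Not Confluent wire format: an Avro decoder pointed at these raw
            // bytes would read garbage as a union index and can throw AIOOBE.
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        return buf.getInt(); // big-endian 4-byte schema ID
    }
}

With the printed ID I can fetch the exact writer schema from the registry (GET /schemas/ids/<id>) and diff it against the .avsc my POJO was generated from.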
Logs:

[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=group1-3, groupId=group1] Cluster ID: 0K1PrikxQGaDOoj65baL3w
0Vendor_ID====1
1 Report_UUID====[2, -54, 123, -70, -12, 19, -21, 67, -4, -125, 50, 93, -54, 114, 67, -57]
[Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader.
[Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=group1-3, groupId=group1] Resetting generation and member id due to: consumer pro-actively leaving the group
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=group1-3, groupId=group1] Request joining group due to: consumer pro-actively leaving the group
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=group1-3, groupId=group1] Node 1 sent an invalid full fetch response with extraIds=(SaZBxfGsTFaNHQ9gzHMznw), response=()
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for group1-3 unregistered
[Source Data Fetcher for Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
[Source: transactions → Sink: Print to Std. Out (4/8)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: transactions → Sink: Print to Std. Out (4/8)#0 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 138 out of bounds for length 2
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:109)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
… 14 more
[Source: transactions → Sink: Print to Std. Out (4/8)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: transactions → Sink: Print to Std. Out (4/8)#0 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_0).
[flink-pekko.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: transactions → Sink: Print to Std. Out (4/8)#0 d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_0.
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: transactions → Sink: Print to Std. Out (4/8) (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_0) switched from RUNNING to FAILED on 432ba4e6-9cd2-4adb-a922-d702097781fb @ 127.0.0.1 (dataPort=-1).
java.io.IOException: Failed to deserialize consumer record due to
	[same stack trace as above, ending in the same ArrayIndexOutOfBoundsException: Index 138 out of bounds for length 2]
[flink-pekko.actor.default-dispatcher-4] INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager - Received resource requirements from job a2c15917bdc64b84176a97afa1674d07: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=7}]
[SourceCoordinator-Source: transactions] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Removing registered reader after failure for subtask 3 (#0) of source Source: transactions.
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.jobmaster.JobMaster - 1 tasks will be restarted to recover the failed task d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_0.
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job kddi-converter (a2c15917bdc64b84176a97afa1674d07) switched from state RUNNING to RESTARTING.
[flink-pekko.actor.default-dispatcher-6] WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job kddi-converter (a2c15917bdc64b84176a97afa1674d07) switched from state RESTARTING to RUNNING.
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No checkpoint found during restore.
[SourceCoordinator-Source: transactions] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Recovering subtask 3 to checkpoint -1 for source Source: transactions to checkpoint.
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: transactions → Sink: Print to Std. Out (4/8) (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from CREATED to SCHEDULED.
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager - Received resource requirements from job a2c15917bdc64b84176a97afa1674d07: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=8}]
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: transactions → Sink: Print to Std. Out (4/8) (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from SCHEDULED to DEPLOYING.
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: transactions → Sink: Print to Std. Out (4/8) (attempt #1) with attempt id d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_3 to 432ba4e6-9cd2-4adb-a922-d702097781fb @ 127.0.0.1 (dataPort=-1) with allocation id cad949571eeec85bee7e07c151bf324e
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot cad949571eeec85bee7e07c151bf324e.
[flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: transactions → Sink: Print to Std. Out (4/8)#1 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1), deploy into slot with allocation id cad949571eeec85bee7e07c151bf324e.
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: transactions → Sink: Print to Std. Out (4/8)#1 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from CREATED to DEPLOYING.
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: transactions → Sink: Print to Std. Out (4/8)#1 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) [DEPLOYING].
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@209fb2e6
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Checkpoint storage is set to ‘jobmanager’
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: transactions → Sink: Print to Std. Out (4/8)#1 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from DEPLOYING to INITIALIZING.
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: transactions → Sink: Print to Std. Out (4/8) (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from DEPLOYING to INITIALIZING.
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@6dcc7fc2
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4d30e76e
[SourceCoordinator-Source: transactions] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: transactions registering reader for parallel task 3 (#1) @
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: transactions → Sink: Print to Std. Out (4/8)#1 (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from INITIALIZING to RUNNING.
[SourceCoordinator-Source: transactions] INFO org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator - Assigning splits to readers {3=[[Partition: avads_enb-0, StartingOffset: 71829976, StoppingOffset: -9223372036854775808]]}
[flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: transactions → Sink: Print to Std. Out (4/8) (d0ac2e7ead86354ec981c68f4c6da815_cbc357ccb763df2852fee8c4fc7d55f2_3_1) switched from INITIALIZING to RUNNING.
[Source: transactions → Sink: Print to Std. Out (4/8)#1] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [[Partition: avads_enb-0, StartingOffset: 71829976, StoppingOffset: -9223372036854775808]]

Please help me.

Thanks,
Venkat

Well, you should post this in a Flink channel, not a Kafka Streams one. Kafka Streams is Kafka's native stream-processing Java library, similar to Flink, and your stack trace is entirely inside Flink's Kafka connector and the Avro decoder.
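
That said, one quick sanity check regardless of where you ask: compare the schema compiled into your generated POJO with the latest schema the producer actually registered. A rough sketch (assuming the default TopicNameStrategy, so the value subject is testtopic-value; SchemaCompare is just an illustrative name):

import java.io.InputStream;
import java.net.URL;
import org.apache.avro.Schema;

public class SchemaCompare {
    public static void main(String[] args) throws Exception {
        // Schema baked into the Avro-generated class (what the consumer decodes with)
        Schema reader = My_pojo.getClassSchema();

        // Latest registered writer schema, equivalent to:
        //   curl http://127.0.0.1:80/subjects/testtopic-value/versions/latest
        try (InputStream in = new URL(
                "http://127.0.0.1:80/subjects/testtopic-value/versions/latest").openStream()) {
            System.out.println("registry: " + new String(in.readAllBytes()));
        }
        System.out.println("reader:   " + reader);
    }
}

If the two differ (for example, a field whose union has more branches on the producer side), the ResolvingDecoder can read a branch index that the reader schema does not have, which shows up exactly as this kind of ArrayIndexOutOfBoundsException. You could also try ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl) with the registry's schema text to rule out a stale generated POJO.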