Hi,
I’m facing an issue when running a simple query trying to consume a normal topic with Avro data.
The query is select * from sdm.dk.sales.sales-orders.v1
Below you can see the show create table result.
CREATE TABLE `cloud-foundation-kafka-test`.`test-cluster`.`sdm.dk.sales.sales-orders.v1` (
`OmsNumber` VARCHAR(2147483647) NOT NULL COMMENT ‘The OMS number is the identifier of the entity in OMS system. The structure of the OMS number is described on Azure DevOps Services | Sign In’,
`SalesOrder` ROW<`OmsNumber` VARCHAR(2147483647) NOT NULL, `CompanyCode` INT NOT NULL, `OrderNumber` INT NOT NULL, `RequisitionNumber` BIGINT, `BuildingSite` VARCHAR(2147483647), `CustomerReference` VARCHAR(2147483647), `SalesHandlerId` VARCHAR(2147483647) NOT NULL, `StoreId` INT NOT NULL, `DeductionPercentage` DECIMAL(7, 3) NOT NULL, `DeliveryNotes` VARCHAR(2147483647), `PickUpStoreId` INT NOT NULL, `DeliveryCode` VARCHAR(2147483647), `DeliveryDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `CreditNote` BOOLEAN NOT NULL, `OrderDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `CountryCode` VARCHAR(2147483647), `PaymentType` VARCHAR(2147483647) NOT NULL, `TotalPriceInclVat` DECIMAL(11, 2) NOT NULL, `InvoiceNumber` BIGINT, `CustomerNumber` INT, `MdmCustomerNumber` VARCHAR(2147483647), `IsInternalTrade` BOOLEAN NOT NULL, `OrderStatus` VARCHAR(2147483647) NOT NULL, `IsDiscountApplied` BOOLEAN NOT NULL, `OrderNote` VARCHAR(2147483647), `DeliveryContactInfo` ROW<`ContactPhoneNumber` VARCHAR(2147483647), `EmailAddress` VARCHAR(2147483647)>, `ValidatedDeliveryAddress` ROW<`Country` VARCHAR(2147483647) NOT NULL, `PostalCode` VARCHAR(2147483647) NOT NULL, `FirstName` VARCHAR(2147483647), `LastName` VARCHAR(2147483647), `AdditionalInformation` VARCHAR(2147483647), `City` VARCHAR(2147483647), `Company` VARCHAR(2147483647), `StreetNumber` VARCHAR(2147483647), `PoBox` VARCHAR(2147483647), `StreetName` VARCHAR(2147483647), `Attention` VARCHAR(2147483647)>, `DeliveryAddress` ROW<`RouteId` INT, `CustomerName` VARCHAR(2147483647), `StreetAndNumber` VARCHAR(2147483647), `PostalCodeAndCity` VARCHAR(2147483647), `Address4` VARCHAR(2147483647)> NOT NULL, `TermsOfPayment` INT, `ShippingCondition` VARCHAR(2147483647), `CashPayment` BOOLEAN NOT NULL, `SalesPersonNumber` VARCHAR(2147483647) NOT NULL, `ManualCaseNumber` BIGINT NOT NULL, `EditCode` VARCHAR(2147483647), `OrderLockId` VARCHAR(2147483647), `PointOfSaleMarking` VARCHAR(2147483647), `NoDeliveryFee` BOOLEAN NOT NULL, `NoPackagingFee` BOOLEAN NOT NULL, `NoDispatchFee` BOOLEAN NOT NULL, `IsDeleted` BOOLEAN NOT NULL, `PickingInstruction` VARCHAR(2147483647), `MarkingInstruction` VARCHAR(2147483647), `DeliveryInstruction` VARCHAR(2147483647), `ReturnReasonCode` VARCHAR(2147483647), `IsReadyForPicking` BOOLEAN NOT NULL, `DeliveryMethod` VARCHAR(2147483647) NOT NULL, `OriginCode` VARCHAR(2147483647), `IsRental` BOOLEAN NOT NULL, `WebOrderId` BIGINT, `CashRegisterId` VARCHAR(2147483647), `BatchNumber` BIGINT, `InvoiceDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `ProjectSalesNumber` INT, `ReservationStatus` VARCHAR(2147483647) NOT NULL, `DeliveryWeek` INT, `IsSpecialOrder` BOOLEAN NOT NULL, `OrderLines` ARRAY<ROW<`IsProjectSale` BOOLEAN, `HandlingUnitOfMeasure` VARCHAR(2147483647), `ItemGroup` VARCHAR(2147483647), `ItemNumber` VARCHAR(2147483647), `Aspect4MasterDataItemNumber` VARCHAR(2147483647), `OrderLineNumber` INT NOT NULL, `PriceUnitOfMeasure` VARCHAR(2147483647), `QuantityInHandlingUnitOfMeasure` DECIMAL(11, 4), `QuantityInPriceUnitOfMeasure` DECIMAL(11, 4) NOT NULL, `RequisitionNumber` BIGINT NOT NULL, `ProductName` ARRAY<VARCHAR(2147483647) NOT NULL> NOT NULL, `ProductType` VARCHAR(2147483647), `BasePriceForPriceUnitOfMeasureExclVat` DECIMAL(11, 2) NOT NULL, `DiscountPercentage1` DECIMAL(7, 3) NOT NULL, `DiscountPercentage2` DECIMAL(7, 3) NOT NULL, `CostPrice` DECIMAL(11, 2) NOT NULL, `EditingCode` VARCHAR(2147483647), `PlaceOfPicking` INT NOT NULL, `IsPriceAdjusted` BOOLEAN NOT NULL, `OrderLineVatAmount` DECIMAL(11, 2) NOT NULL, `OrderLinePriceExclVat` DECIMAL(11, 2) NOT NULL, `OriginalLineNumber` INT NOT NULL, `RequisitionLineNumber` INT NOT NULL, `DispatchQuantity` DECIMAL(11, 3) NOT NULL, `VatPercentage` DECIMAL(5, 2) NOT NULL, `VatCode` INT, `IsDeleted` BOOLEAN NOT NULL, `CampaignId` BIGINT NOT NULL, `IsFreeOfCharge` BOOLEAN NOT NULL, `OrderLineStatus` VARCHAR(2147483647), `ProductId` VARCHAR(2147483647), `SupplierProductId` VARCHAR(2147483647), `IsComment` BOOLEAN NOT NULL, `StockBearing` BOOLEAN NOT NULL, `StockControl` VARCHAR(2147483647), `QuantityInStockKeepingUnit` DECIMAL(11, 3), `ItemType` VARCHAR(2147483647), `ItemLength` VARCHAR(2147483647)> NOT NULL> NOT NULL, `FikNumber` VARCHAR(2147483647), `DocumentReferences` ARRAY<ROW<`System` VARCHAR(2147483647) NOT NULL, `DocumentType` VARCHAR(2147483647) NOT NULL, `Id` VARCHAR(2147483647) NOT NULL> NOT NULL> NOT NULL, `IsUnknownCustomer` BOOLEAN NOT NULL, `ValueDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `ClosedBeforeSnapshot` BOOLEAN NOT NULL, `RouteId` INT, `SequenceNumber` INT, `BranchType` VARCHAR(2147483647) NOT NULL, `PaymentTransactions` ARRAY<ROW<`WorkStationId` VARCHAR(2147483647), `TransactionDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `TransactionType` VARCHAR(2147483647) NOT NULL, `SalesPersonId` INT NOT NULL, `CustomerNumber` INT NOT NULL, `InvoiceNumber` BIGINT NOT NULL, `TotalAmount` DECIMAL(11, 4) NOT NULL, `SplitAmount` DECIMAL(11, 4) NOT NULL, `IsSplitPayment` BOOLEAN NOT NULL, `SalesType` VARCHAR(2147483647), `DepositNumber` BIGINT NOT NULL, `IsDeleted` BOOLEAN> NOT NULL> NOT NULL, `CustomerCardNumber` BIGINT, `CreatedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `ModifiedDate` TIMESTAMP(3) WITH LOCAL TIME ZONE, `CreatedBy` VARCHAR(2147483647), `ModifiedBy` VARCHAR(2147483647)> NOT NULL,
`MainChanges` ROW<`ChangeType` VARCHAR(2147483647) NOT NULL, `SalesOrderUpdated` ROW<`FieldChanges` ARRAY<VARCHAR(2147483647) NOT NULL>>, `SalesOrderOpenStatusUpdated` ROW<`OldOrderStatus` VARCHAR(2147483647) NOT NULL, `NewOrderStatus` VARCHAR(2147483647) NOT NULL, `OldReservationStatus` VARCHAR(2147483647) NOT NULL, `NewReservationStatus` VARCHAR(2147483647) NOT NULL>, `PickUpStoreUpdated` ROW<`OldPickUpStoreId` INT NOT NULL, `NewPickUpStoreId` INT NOT NULL>, `PlaceOfPickingUpdated` ROW<`OldPlaceOfPicking` INT NOT NULL, `NewPlaceOfPicking` INT NOT NULL, `OriginalLineNumber` INT NOT NULL>, `GoodsIssuedUpdated` ROW<`GoodsIssuedLines` ARRAY<ROW<`OriginalOrderLineNumber` INT NOT NULL, `DeliveredQuantity` DECIMAL(11, 3) NOT NULL, `TotalDeliveredQuantity` DECIMAL(11, 3) NOT NULL, `LineClosed` BOOLEAN NOT NULL> NOT NULL> NOT NULL>> NOT NULL,
CONSTRAINT `PRIMARY` PRIMARY KEY (`OmsNumber`) NOT ENFORCED
)
DISTRIBUTED BY HASH(`OmsNumber`) INTO 5 BUCKETS
WITH (
‘changelog.mode’ = ‘upsert’,
‘connector’ = ‘confluent’,
‘kafka.cleanup-policy’ = ‘compact’,
‘kafka.max-message-size’ = ‘2097164 bytes’,
‘kafka.retention.size’ = ‘0 bytes’,
‘kafka.retention.time’ = ‘0 ms’,
‘key.format’ = ‘avro-registry’,
‘scan.bounded.mode’ = ‘unbounded’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘value.format’ = ‘avro-registry’
)
The error being returned is the following:
Failed to deserialize Avro record.
Suppressed: Cannot read AVRO record.
Suppressed: Failed to deserialize AVRO record.
There is no other information which can actually help.
We can see the messages normally in the Confluent Cloud interface, no issue there, and we even have consumers getting messages from that topic.
But for some reason Flink can’t process the data.
Can anyone help?