KTable transformValues

Hi all. I am creating a live stock price streaming application and I am having some issues implementing the KTable.transformValues() method.

I have two KTables. The first KTable contains every stock and stock option ticker that I am monitoring, and the associated values are the latest bid and ask prices for that ticker. The topic's schema is below:

{
  "subjectTickerName": "AAPL US Equity",
  "bid": 152.16,
  "mid": 152.165,
  "ask": 152.17
}

The second KTable is a static data table (the data gets sent to the table just once, when the application starts up). It contains a list of 3 stock and option tickers, and the latest prices of those 3 tickers are used to calculate a particular value. At any given time, I need to calculate this value with the latest market data, which is stored in the first KTable. The second KTable has this schema:

{
  "revConPackageID": ,
  "underlyingTickerName": "AAPL US Equity",
  "subjectTickerName": "AAPL US 09/16/22 C9 Equity",
  "oppositeOptionTickerName": "AAPL US 09/16/22 P9 Equity"
}

The way I want to calculate this value is to use transformValues() on the second KTable and, in the transform() method, use the first KTable's state store to look up each ticker and get the latest values.
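
As a rough sketch of that lookup (not the actual application code), the logic could look like the following. It assumes a plain Map stands in for the first KTable's state store, and the class name RevConLookupSketch, the method latestQuotes and the simplified Ticker and RevConPackage types are hypothetical. The calculation itself is left out because it is not described here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RevConLookupSketch {

    /** Latest quote for one ticker, mirroring the first topic's schema. */
    public static final class Ticker {
        public final String subjectTickerName;
        public final double bid;
        public final double mid;
        public final double ask;

        public Ticker(String subjectTickerName, double bid, double mid, double ask) {
            this.subjectTickerName = subjectTickerName;
            this.bid = bid;
            this.mid = mid;
            this.ask = ask;
        }
    }

    /** The three tickers of one package, mirroring the second topic's schema. */
    public static final class RevConPackage {
        public final String underlyingTickerName;
        public final String subjectTickerName;
        public final String oppositeOptionTickerName;

        public RevConPackage(String underlying, String subject, String opposite) {
            this.underlyingTickerName = underlying;
            this.subjectTickerName = subject;
            this.oppositeOptionTickerName = opposite;
        }
    }

    /**
     * Looks up the latest quote for all three tickers of a package.
     * The Map is an assumption standing in for the ticker state store.
     * Returns empty if any ticker has no quote yet.
     */
    public static Optional<Ticker[]> latestQuotes(Map<String, Ticker> tickerStore, RevConPackage pkg) {
        Ticker underlying = tickerStore.get(pkg.underlyingTickerName);
        Ticker subject = tickerStore.get(pkg.subjectTickerName);
        Ticker opposite = tickerStore.get(pkg.oppositeOptionTickerName);
        if (underlying == null || subject == null || opposite == null) {
            return Optional.empty();
        }
        return Optional.of(new Ticker[] {underlying, subject, opposite});
    }

    public static void main(String[] args) {
        Map<String, Ticker> store = new HashMap<>();
        store.put("AAPL US Equity", new Ticker("AAPL US Equity", 152.16, 152.165, 152.17));
        RevConPackage pkg = new RevConPackage(
                "AAPL US Equity", "AAPL US 09/16/22 C9 Equity", "AAPL US 09/16/22 P9 Equity");
        // Only the underlying has a quote so far, so the lookup is empty.
        System.out.println(latestQuotes(store, pkg).isPresent());
    }
}
```

Returning an empty result when a quote is missing keeps the null handling in one place, so the calculation only runs once all three prices are available.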

I have created a ValueTransformerWithKeySupplier. When I run the script it works, but after 30 seconds or so the script just stops, and it doesn't seem like the ValueTransformer is working.

The init() method of the ValueTransformer works, as I have the code printing to the console, but it doesn't seem like the script ever gets to the transform() method in the class.

Below is the class:

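import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class RevConPackageTransformer implements ValueTransformerWithKeySupplier<String, RevConPackage, RevConPackageCalculatedPrices> {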
    
    public String revConCalculatedPricesStoreName;
    public String tickerStoreName;

    public RevConPackageTransformer(String revConCalculatedPricesStoreName, String tickerStoreName) {
        this.revConCalculatedPricesStoreName = revConCalculatedPricesStoreName;
        this.tickerStoreName = tickerStoreName;
        System.out.println("dfdf" + this.tickerStoreName);
    }

    @Override
    public ValueTransformerWithKey<String, RevConPackage, RevConPackageCalculatedPrices> get() {
        return new ValueTransformerWithKey<String, RevConPackage, RevConPackageCalculatedPrices>(){

            private KeyValueStore<String,RevConPackageCalculatedPrices> kvStore;
            private KeyValueStore<String, Ticker> tickerStore;
            
            @Override
            public void init(ProcessorContext context) {
                
                kvStore = context.getStateStore(revConCalculatedPricesStoreName);
                // use the store name passed to the constructor
                tickerStore = context.getStateStore(tickerStoreName);
                //context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, (Punctuator) punctuate()); 
            }
            
            @Override
            public RevConPackageCalculatedPrices transform(String readOnlyKey, RevConPackage value) {
                System.out.println("HEEHRre");
                RevConPackageCalculatedPrices t = new RevConPackageCalculatedPrices(readOnlyKey);
                return t;
            }

            @Override
            public void close() {
                // can access this.state
            }
            
        };
        
    }

}

Below is how I create the new KTable:

KTable<String, RevConPackageCalculatedPrices> liveRevConPrices = revConPackagTable.transformValues(new RevConPackageTransformer(revConPriceStore, tickerTable.queryableStoreName()), revConPriceStore);

Does anyone see anything wrong?

Hi @krash11554 ,

Maybe try it out with cache.max.bytes.buffering set to zero.
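
A minimal sketch of that setting, assuming the application builds its configuration as a plain Properties object. The class name StreamsCacheConfig and the application.id and bootstrap.servers values are hypothetical placeholders.

```java
import java.util.Properties;

public class StreamsCacheConfig {

    /** Builds Kafka Streams properties with record caching switched off. */
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical values; replace with the application's own.
        props.setProperty("application.id", "revcon-pricing-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // A cache size of zero disables record caching, so KTable updates
        // are forwarded downstream right away instead of being held in the
        // cache until the next commit or until the cache fills up.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("cache.max.bytes.buffering"));
    }
}
```

The resulting Properties would then be passed to the KafkaStreams constructor together with the topology.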

Thanks,
Bill