Error creating bean with name defaultKafkaStreamsBuilder defined in class path resource

Issue

I have an application based on Spring Boot, spring-kafka, and Kafka Streams. On startup, the application creates a Kafka Streams topology from a default list of topics. What I need is to edit/recreate the topology at runtime. For example, while the application is already running, a new topic name arrives and I want to add that topic to my topology. My current idea is to somehow delete the existing topology, close and clean up KafkaStreams, rerun the logic that creates the topology (this time with the new topic name), and start KafkaStreams again. I don't want to restart my application. Can someone suggest how to do this at runtime?


Solution

I found one solution: extend StreamsBuilderFactoryBean:

```java
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    @Override
    public synchronized void stop() {
        instance = null;
        super.stop();
    }
}
```

And when I build the topology, instead of using an injected StreamsBuilder, I use StreamsBuilderFactoryBean#getObject():

```java
@Component
public class DynamicStream {

    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    public DynamicStream(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
    }

    // getObject() is declared to throw Exception, hence the throws clause
    public void init() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        // build topology
    }

    // call this method when stream reconfiguration is needed
    public void reinitialize() throws Exception {
        streamsBuilderFactoryBean.stop();
        init();
        streamsBuilderFactoryBean.start();
    }
}
```
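To make the stop → rebuild → start sequence concrete, here is a framework-free sketch of the lifecycle that reinitialize() relies on. This is plain Java under stated assumptions: TopicRegistry, onStop, onStart, and buildTopology are hypothetical stand-ins (not Spring or Kafka Streams APIs) whose hooks mirror the StreamsBuilderFactoryBean calls.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the runtime-reconfiguration lifecycle:
// stop the streams, rebuild the topology from the updated topic set, start again.
class TopicRegistry {

    private final Set<String> topics = new LinkedHashSet<>();
    private final Runnable onStop;                 // stands in for StreamsBuilderFactoryBean#stop()
    private final Runnable onStart;                // stands in for StreamsBuilderFactoryBean#start()
    private final Consumer<Set<String>> buildTopology; // stands in for init()

    TopicRegistry(Runnable onStop, Runnable onStart, Consumer<Set<String>> buildTopology) {
        this.onStop = onStop;
        this.onStart = onStart;
        this.buildTopology = buildTopology;
    }

    // Mirrors DynamicStream#reinitialize, triggered when a new topic name arrives.
    synchronized void addTopic(String topic) {
        topics.add(topic);
        onStop.run();
        buildTopology.accept(topics);
        onStart.run();
    }

    Set<String> topics() {
        return topics;
    }
}
```

The point of the sketch is the ordering: the factory bean must be stopped (which discards the cached StreamsBuilder) before the topology is rebuilt, otherwise the new topic would be added to the stale builder.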

Answered By - Igor Dumchykov
Answer Checked By - David Marino (JavaFixing Volunteer)

Hey!

I have been trying Spring Cloud Stream with Kafka Streams, and I am facing an issue whose root cause I am struggling to find.

Basically I have the following topology: <tracking events topic> -> (branching events per type) -> <play events topic> -> (aggregate plays per id & time window) -> <play events store>

To achieve this with Spring Cloud Stream, I have two different Functions.

```java
@Bean
public Function<KStream<Integer, String>, KStream<Integer, String>[]> processVideoEvents() {
    // branches tracking events per type (body omitted in the original post)
}

@Bean
public Function<KStream<Integer, String>, KStream<Integer, WindowedVideoViewsCountMetric>> processVideoViewEvents() {
    // aggregates plays per id & time window (body omitted in the original post)
}
```

And the configuration:

```yaml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
  cloud:
    stream:
      function:
        definition: processVideoEvents;processVideoViewEvents
      bindings:
        processVideoEvents-in-0:
          destination: video.video.video-events.json
        processVideoEvents-out-0:
          destination: video.video.video-view-events.json
        processVideoViewEvents-in-0:
          destination: video.video.video-view-events.json
        processVideoViewEvents-out-0:
          destination: video.video.video-views-count.json
```

If I enable just one of them, everything works fine. However, when I enable both to build the data pipeline I want, I get the following error:

```
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic video.video.video-view-events.json for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Can't serialize data [{"referer":"https://preview.xyz.com/","device":null,"app":"web","appVersion":null,"extraFields":null,"version":"0.1.0","event_type":"active_view","user_id":0,"user_urn":"urn:xyz:users:user:0","video_id":1,"video_urn":"urn:xyz:videos:video:4","account_key":"<some key>","site_section":"homepage","user_agent":"PostmanRuntime/7.26.1","client_timestamp":"2000-01-01 00:00:00.000 +0000","server_timestamp":"2020-07-08 14:40:30.549 +0000"}] for topic [video.video.video-view-events.json]
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167) ~[kafka-streams-2.6.0.jar:na]
	...
Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [{"referer":"https://preview.xing.com/","device":null,"app":"web","appVersion":null,"extraFields":null,"version":"0.1.0","event_type":"active_view","user_id":0,"user_urn":"urn:x-xing:users:user:0","video_id":1,"video_urn":"urn:x-xing:videos:video:4","account_key":"52a3176c8ae6e7af0099476aeccf4de0","site_section":"startpage","user_agent":"PostmanRuntime/7.26.1","client_timestamp":"2000-01-01 00:00:00.000 +0000","server_timestamp":"2020-07-08 14:40:30.549 +0000"}] for topic [video.video.video-view-events.json]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Incompatible types: declared root type ([simple type, class com.xyz.video.model.video.metrics.count.WindowedVideoViewsCountMetric]) vs `java.lang.String`
```

I don't get why Spring tries to serialize the JSON string using the output type of the second function, when the input of that function expects a KStream<Integer, String>.
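One thing that may be worth checking, as a hedged guess rather than a confirmed fix: the Kafka Streams binder infers serdes per binding, and with two functions sharing the video.video.video-view-events.json topic it is possible the wrong inferred serde gets applied. Pinning the value serde explicitly on the affected bindings would rule that out. This sketch assumes the Spring Cloud Stream Kafka Streams binder's per-binding valueSerde properties; adjust the binding names to your setup.

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            processVideoEvents-out-0:
              producer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
            processVideoViewEvents-in-0:
              consumer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
```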

Any idea?