Issue

I have an application based on Spring Boot, spring-kafka, and Kafka Streams. When the application starts up, it creates a Kafka Streams topology from a default list of topics. What I need is to edit/recreate the topology at runtime. For example, while the application is already running, a new topic name arrives and I want to add this topic to my topology. Currently I'm thinking of somehow deleting the existing topology, closing and cleaning up `KafkaStreams`, rerunning the logic that creates the topology but with the new topic name, and starting `KafkaStreams` again. I don't want to restart my application. Can someone suggest how to do this at runtime?

Solution

I found one solution. I extend `StreamsBuilderFactoryBean`:

```java
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

    @Override
    public boolean isSingleton() {
        // Report non-singleton so the container requests a fresh StreamsBuilder
        // after each stop/start cycle instead of reusing a cached one.
        return false;
    }

    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    @Override
    public synchronized void stop() {
        // Drop the cached builder so the next start() builds a new topology.
        instance = null;
        super.stop();
    }
}
```

And when I build the topology, instead of injecting a `StreamsBuilder` directly, I use `StreamsBuilderFactoryBean#getObject()`:

```java
public class DynamicStream {

    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    public DynamicStream(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
    }

    // getObject() declares a checked exception, so init() propagates it.
    public void init() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        // build topology
    }

    // Call this method when stream reconfiguration is needed.
    public void reinitialize() throws Exception {
        streamsBuilderFactoryBean.stop();
        init();
        streamsBuilderFactoryBean.start();
    }
}
```
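The core trick in the solution above is the reset-on-stop factory: one instance is cached and handed out until `stop()` clears the cache, so the next start builds a brand-new instance with the new topic list. A minimal plain-Java sketch of that pattern, with no Spring or Kafka dependencies (the names `InstanceFactory` and `Topology` are illustrative, not from spring-kafka):

```java
import java.util.ArrayList;
import java.util.List;

class InstanceFactory {

    // Stand-in for StreamsBuilder: just records which topics it was built with.
    static class Topology {
        final List<String> topics = new ArrayList<>();
    }

    private Topology instance;

    // Mirrors createInstance(): lazily create and cache a single instance,
    // returning the same object on every call until stop() is invoked.
    synchronized Topology getObject() {
        if (instance == null) {
            instance = new Topology();
        }
        return instance;
    }

    // Mirrors stop(): drop the cached instance so the next getObject()
    // returns a fresh, empty one that can be rebuilt with new topics.
    synchronized void stop() {
        instance = null;
    }
}
```

Calling `getObject()` twice yields the same object; after `stop()`, the next call yields a new one, which is exactly why `reinitialize()` can rebuild the topology without restarting the application.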
Answered By - Igor Dumchykov

Comment:

Hey! I have been trying Spring Cloud Stream with Kafka Streams and I am facing an issue whose root cause I am struggling to find. Basically I have the following topology:

    <tracking events topic> -> (branching events per type) -> <play events topic> -> (aggregate plays per id & time window) -> <play events store>

In order to achieve this with Spring Cloud Stream, I have two different `Function`s:

```java
@Bean
public Function<KStream<Integer, String>, KStream<Integer, String>[]> processVideoEvents() {
    // ...
}

@Bean
public Function<KStream<Integer, String>, KStream<Integer, WindowedVideoViewsCountMetric>> processVideoViewEvents() {
    // ...
}
```

```yaml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
  cloud:
    stream:
      function:
        definition: processVideoEvents;processVideoViewEvents
      bindings:
        processVideoEvents-in-0:
          destination: video.video.video-events.json
        processVideoEvents-out-0:
          destination: video.video.video-view-events.json
        processVideoViewEvents-in-0:
          destination: video.video.video-view-events.json
        processVideoViewEvents-out-0:
          destination: video.video.video-views-count.json
```

If I enable just one of them, everything works fine; however, when I enable both of them to build the data pipeline I want, I get the following error:

I don't get why Spring is trying to serialize the JSON string to the output parameter of the second function, when the input parameter of that function expects a `KStream<Integer, String>`. Any idea?
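Binding the first function's output and the second function's input to the same destination (`video.video.video-view-events.json`) effectively composes the two functions through a topic. A hedged plain-Java sketch of that composition, using `java.util.function.Function` on plain values instead of `KStream`s (all names here are illustrative, not from Spring Cloud Stream):

```java
import java.util.function.Function;

class PipelineSketch {

    // Stand-in for processVideoEvents: branch tracking events per type,
    // here reduced to keeping only events tagged "play:".
    static final Function<String, String> processVideoEvents =
        event -> event.startsWith("play:") ? event.substring(5) : null;

    // Stand-in for processVideoViewEvents: aggregate plays, here reduced to
    // wrapping the play in a count-like marker instead of a windowed count.
    static final Function<String, String> processVideoViewEvents =
        play -> "count(" + play + ")";

    // Wiring both bindings to the same intermediate destination is what
    // chains the two functions end to end, as this method does directly.
    static String pipeline(String event) {
        String play = processVideoEvents.apply(event);
        return play == null ? null : processVideoViewEvents.apply(play);
    }
}
```

In the real binder the handoff happens through Kafka, so the intermediate topic's records must deserialize into whatever the second function's input type declares.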