Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple StreamsRegistry in same application #3

Open
AntoineDuComptoirDesPharmacies opened this issue Aug 11, 2022 · 1 comment
Open

Comments

@AntoineDuComptoirDesPharmacies

Hi,

We are currently using StreamsRegistry to monitor a KafkaStream in our application.
We would like to add a new KafkaStream in the same application and, for separation of concerns, we tried to create a new StreamsRegistry for this second stream. However, the registry crash on register with the following error :

application-akka.actor.default-dispatcher-19 - [error] - akka.dispatch.TaskInvocation - Attempting to call unbound fn: #'io.operatr.kpow
.agent/init-registry
java.lang.IllegalStateException: Attempting to call unbound fn: #'io.operatr.kpow.agent/init-registry
        at clojure.lang.Var$Unbound.throwArity(Var.java:45)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:384)
        at io.operatr.kpow.StreamsRegistry.<init>(StreamsRegistry.java:83)
        at events.KafkaStreamEventProcessor.run(KafkaStreamEventProcessor.java:176)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[ERROR] [08/11/2022 07:55:20.654] [application-akka.actor.kafka-consumers-dispatcher-15] [TaskInvocation] Attempting to call unbound fn:
 #'io.operatr.kpow.agent/init-registry
java.lang.IllegalStateException: Attempting to call unbound fn: #'io.operatr.kpow.agent/init-registry
        at clojure.lang.Var$Unbound.throwArity(Var.java:45)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:384)
        at io.operatr.kpow.StreamsRegistry.<init>(StreamsRegistry.java:83)
        at events.KafkaStreamEventProcessor.run(KafkaStreamEventProcessor.java:176)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

We are wondering if StreamsRegistry is meant to be instantiated only one time per application or if we did a mistake while declaring/using it ?

Thanks in advance for your answer !
Yours faithfully,
LCDP

@wavejumper
Copy link
Contributor

wavejumper commented Aug 11, 2022

Hey there @AntoineDuComptoirDesPharmacies,

That seems like a bug to me! You should be able to have multiple instances of the streams agent configured.

There are two patterns for using the streams agent:


Pattern 1: using a single StreamsRegistry instance (recommended)

You should be able to use a single StreamsRegistry instance and register multiple streams against it (example below). This has the advantage of using less resources (threads, kafka producers etc).

import io.operatr.kpow.StreamsRegistry;

// Your Kafka Streams topology
Topology topologyA = createMyTopology(); 
Topology topologyB = createMyOtherTopology(); 


// Your Kafka Streams config
Properties props = new createMyStreamProperties();
 
// Your Kafka Streams instance
KafkaStreams streamsA = new KafkaStreams(topology, props); 
KafkaStreams streamsA = new KafkaStreams(topology, props); 


// Create a kPow StreamsRegistry
StreamsRegistry registry = new StreamsRegistry(props);

// Register your KafkaStreams and Topology instances with the StreamsRegistry
registry.register(streamsA, topologyA); 
registry.register(streamsB, topologyB); 

// Start your Kafka Streams application
streams.start();

Pattern 2: creating multiple StreamsRegistry instances

import io.operatr.kpow.StreamsRegistry;

// Your Kafka Streams topology
Topology topologyA = createMyTopology(); 
Topology topologyB = createMyOtherTopology(); 


// Your Kafka Streams config
Properties props = new createMyStreamProperties();
 
// Your Kafka Streams instance
KafkaStreams streamsA = new KafkaStreams(topology, props); 
KafkaStreams streamsA = new KafkaStreams(topology, props); 


// Create a kPow StreamsRegistry
StreamsRegistry registryA = new StreamsRegistry(props);
StreamsRegistry registryB = new StreamsRegistry(props);


// Register your KafkaStreams and Topology instances with the StreamsRegistry
registryA.register(streamsA, topologyA); 
registryB.register(streamsB, topologyB); 

// Start your Kafka Streams application
streams.start();

This pattern should also be supported, but is looking to cause a runtime exception for you. We will try and reproduce and get a fix for you.

In the meantime, if you can try and re-usue a single instance of StreamsRegistry, that would be the preferred way of using the streams agent (for the performance reasons outlined).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants