From 67c1a1e705ac3695c31140fd9044ed301fd6c93e Mon Sep 17 00:00:00 2001 From: Tyler Auerbeck Date: Mon, 5 Jun 2023 14:09:22 -0400 Subject: [PATCH] Small improvements to events package for nats publishing/subscription (#87) - Adds small helps for setting up your subscriber/publisher configs with NATS - Adds missing definitions for event message event types - Adjusts PublishEventMessage to publish to similar subject name as we expect with PublishChangeMessage - Adjusts NATS to use `DurableCalculator` instead of `QueuePrefix` per NATS/watermill recommendations - This will now use a small function to calculate the name of the consumer instead of just using the queue/topic name. This was causing issues with overlapping consumer names for multiple instances/apps that were listening on the same subject. This now essentially will take the queue name that is provided by the application (`--event-subscriber-queuegroup`) and then concatenates it with a hex encoded string of the full topic name (prefix+calculated topic) to give us a consumer name that is easy to calculate. * Using the DurablePrefix gives us the added benefit of when a member of the queue group drops or rejoins, it will be able to pick back up where the last consume left off vs potentially starting fresh if all members of that group had previously dropped. --------- Signed-off-by: Tyler Auerbeck Signed-off-by: Tyler Auerbeck Co-authored-by: Tyler Auerbeck Co-authored-by: Matt Siwiec --- events/nats.go | 32 +++++++++++++++++++++++++------- events/publisher.go | 9 +++++++-- events/subscriber.go | 5 +++++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/events/nats.go b/events/nats.go index ff20860..34abc45 100644 --- a/events/nats.go +++ b/events/nats.go @@ -5,6 +5,9 @@ package events import ( + "crypto/md5" + "encoding/hex" + "github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats" "github.com/ThreeDotsLabs/watermill/message" "github.com/garsue/watermillzap" @@ -56,14 +59,29 @@ func newNATSSubscriber(cfg SubscriberConfig, logger *zap.SugaredLogger) (message nc.Timeout(cfg.Timeout), } + switch { + case cfg.NATSConfig.CredsFile != "": + options = append(options, nc.UserCredentials(cfg.NATSConfig.CredsFile)) + case cfg.NATSConfig.Token != "": + options = append(options, nc.Token(cfg.NATSConfig.Token)) + } + jsConfig := nats.JetStreamConfig{ - Disabled: false, - AutoProvision: false, - ConnectOptions: nil, - PublishOptions: nil, - TrackMsgId: false, - AckAsync: false, - DurablePrefix: "", + Disabled: false, + AutoProvision: false, + ConnectOptions: nil, + PublishOptions: nil, + SubscribeOptions: nil, + TrackMsgId: false, + AckAsync: false, + DurablePrefix: "", + } + + if cfg.QueueGroup != "" { + jsConfig.DurableCalculator = func(_ string, topic string) string { + hash := md5.Sum([]byte(topic)) + return cfg.QueueGroup + hex.EncodeToString(hash[:]) + } } sub, err := nats.NewSubscriber( diff --git a/events/publisher.go b/events/publisher.go index 399ffbc..069dc7b 100644 --- a/events/publisher.go +++ b/events/publisher.go @@ -89,12 +89,12 @@ func (p *Publisher) PublishChange(ctx context.Context, subjectType string, chang } // PublishEvent will publish an EventMessage to the proper topic for that event -func (p *Publisher) PublishEvent(_ context.Context, event EventMessage) error { +func (p *Publisher) PublishEvent(_ context.Context, subjectType string, event EventMessage) error { if event.EventType == "" { return ErrMissingEventType } - topic := strings.Join([]string{p.prefix, "events", event.EventType}, ".") + topic := strings.Join([]string{p.prefix, "events", subjectType, event.EventType}, ".") event.Source = p.source @@ -107,3 +107,8 @@ func (p *Publisher) PublishEvent(_ context.Context, event EventMessage) error { return p.publisher.Publish(topic, msg) } + +// Close will close the publisher +func (p *Publisher) Close() error { + return p.publisher.Close() +} diff --git a/events/subscriber.go b/events/subscriber.go index 712c002..468b354 100644 --- a/events/subscriber.go +++ b/events/subscriber.go @@ -61,3 +61,8 @@ func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan return s.subscriber.Subscribe(ctx, topic) } + +// Close will close the subscriber +func (s *Subscriber) Close() error { + return s.subscriber.Close() +}