Skip to content

Commit

Permalink
Small improvements to events package for nats publishing/subscription (
Browse files Browse the repository at this point in the history
…#87)

- Adds small helps for setting up your subscriber/publisher configs with
NATS
- Adds missing definitions for event message event types
- Adjusts PublishEventMessage to publish to similar subject name as we
expect with PublishChangeMessage
- Adjusts NATS to use `DurableCalculator` instead of `QueuePrefix` per
NATS/watermill recommendations
- This will now use a small function to calculate the name of the
consumer instead of just using the queue/topic name. This was causing
issues with overlapping consumer names for multiple instances/apps that
were listening on the same subject. This now essentially will take the
queue name that is provided by the application
(`--event-subscriber-queuegroup`) and then concatenates it with a hex
encoded string of the full topic name (prefix+calculated topic) to give
us a consumer name that is easy to calculate.

* Using the DurablePrefix gives us the added benefit of when a member of
the queue group drops or rejoins, it will be able to pick back up where
the last consume left off vs potentially starting fresh if all members
of that group had previously dropped.

---------

Signed-off-by: Tyler Auerbeck <tylerauerbeck@users.noreply.github.com>
Signed-off-by: Tyler Auerbeck <tauerbeck@equinix.com>
Co-authored-by: Tyler Auerbeck <tylerauerbeck@users.noreply.github.com>
Co-authored-by: Matt Siwiec <rizzza@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 5, 2023
1 parent cb717fb commit 67c1a1e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
32 changes: 25 additions & 7 deletions events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package events

import (
"crypto/md5"
"encoding/hex"

"github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/garsue/watermillzap"
Expand Down Expand Up @@ -56,14 +59,29 @@ func newNATSSubscriber(cfg SubscriberConfig, logger *zap.SugaredLogger) (message
nc.Timeout(cfg.Timeout),
}

switch {
case cfg.NATSConfig.CredsFile != "":
options = append(options, nc.UserCredentials(cfg.NATSConfig.CredsFile))
case cfg.NATSConfig.Token != "":
options = append(options, nc.Token(cfg.NATSConfig.Token))
}

jsConfig := nats.JetStreamConfig{
Disabled: false,
AutoProvision: false,
ConnectOptions: nil,
PublishOptions: nil,
TrackMsgId: false,
AckAsync: false,
DurablePrefix: "",
Disabled: false,
AutoProvision: false,
ConnectOptions: nil,
PublishOptions: nil,
SubscribeOptions: nil,
TrackMsgId: false,
AckAsync: false,
DurablePrefix: "",
}

if cfg.QueueGroup != "" {
jsConfig.DurableCalculator = func(_ string, topic string) string {
hash := md5.Sum([]byte(topic))
return cfg.QueueGroup + hex.EncodeToString(hash[:])
}
}

sub, err := nats.NewSubscriber(
Expand Down
9 changes: 7 additions & 2 deletions events/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ func (p *Publisher) PublishChange(ctx context.Context, subjectType string, chang
}

// PublishEvent will publish an EventMessage to the proper topic for that event
func (p *Publisher) PublishEvent(_ context.Context, event EventMessage) error {
func (p *Publisher) PublishEvent(_ context.Context, subjectType string, event EventMessage) error {
if event.EventType == "" {
return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "events", event.EventType}, ".")
topic := strings.Join([]string{p.prefix, "events", subjectType, event.EventType}, ".")

event.Source = p.source

Expand All @@ -107,3 +107,8 @@ func (p *Publisher) PublishEvent(_ context.Context, event EventMessage) error {

return p.publisher.Publish(topic, msg)
}

// Close will close the publisher
func (p *Publisher) Close() error {
return p.publisher.Close()
}
5 changes: 5 additions & 0 deletions events/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,8 @@ func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan

return s.subscriber.Subscribe(ctx, topic)
}

// Close will close the subscriber
func (s *Subscriber) Close() error {
return s.subscriber.Close()
}

0 comments on commit 67c1a1e

Please sign in to comment.