Skip to content

Commit

Permalink
move reply to request message type
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Mason <mimason@equinix.com>
  • Loading branch information
mikemrm committed Aug 3, 2023
1 parent cc189e5 commit 29a1c2c
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 45 deletions.
2 changes: 1 addition & 1 deletion events/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Publisher interface {
// AuthRelationshipSubscriber specifies the auth relationship subscriber methods.
type AuthRelationshipSubscriber interface {
// SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message.
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error)
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
}

// AuthRelationshipPublisher specifies the auth relationship publisher methods.
Expand Down
12 changes: 8 additions & 4 deletions events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,18 @@ type Message[T any] interface {
// Error returns any error encountered while decoding the message
Error() error

// ReplyAuthRelationshipRequest publishes an AuthRelationshipResponse message.
// An error is returned if the message is not an AuthRelationshipRequest.
ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error)

// Source returns the underlying message object.
Source() any
}

// Request extends Message by allowing replies to be sent for the received message.
type Request[TRequest, TResponse any] interface {
Message[TRequest]

// Reply publishes a response to the received message.
Reply(ctx context.Context, message TResponse) (Message[TResponse], error)
}

// ChangeType represents the possible event types for a ChangeMessage
type ChangeType string

Expand Down
93 changes: 58 additions & 35 deletions events/nats_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ func natsSubscriptionMessageChan[T any](conn *NatsConnection, batchSize int, nat
return msgCh
}

func natsSubscriptionAuthRelationshipRequestChan(conn *NatsConnection, batchSize int, natsCh <-chan *nats.Msg) chan Request[AuthRelationshipRequest, AuthRelationshipResponse] {
msgCh := make(chan Request[AuthRelationshipRequest, AuthRelationshipResponse], batchSize)

go func() {
for nMsg := range natsCh {
msg := natsDecodeMessage[AuthRelationshipRequest](conn, nMsg)

msgCh <- &NatsAuthRelationshipRequest{NatsMessage: msg.(*NatsMessage[AuthRelationshipRequest])}
}

close(msgCh)
}()

return msgCh
}

func natsDecodeMessage[T any](conn *NatsConnection, nMsg *nats.Msg) Message[T] {
msg := &NatsMessage[T]{
conn: conn,
Expand Down Expand Up @@ -130,13 +146,49 @@ func (m *NatsMessage[T]) Error() error {
return nil
}

// ReplyAuthRelationshipRequest responds to an AuthRelationshipRequest with an AuthRelationshipResponse
func (m *NatsMessage[T]) ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) {
ctx, span := m.conn.tracer.Start(ctx, "events.ReplyAuthRelationshipRequest")
// Source returns the underlying nats message.
func (m *NatsMessage[T]) Source() any {
return m.source
}

defer span.End()
func (m *NatsMessage[T]) publish() error {
return m.conn.conn.PublishMsg(m.source)
}

func (m *NatsMessage[T]) request(ctx context.Context) (Message[AuthRelationshipResponse], error) {
if m.source.Reply == "" {
m.source.Reply = m.conn.conn.NewRespInbox()
}

nMsg, err := m.conn.conn.RequestMsgWithContext(ctx, m.source)
if err != nil {
// ensure we wrap no responder errors with ErrAuthRelationshipNoResponders.
if errors.Is(err, nats.ErrNoResponders) {
return nil, fmt.Errorf("%w: %w", ErrAuthRelationshipNoResponders, err)
}

return nil, err
}

respMsg := natsDecodeMessage[AuthRelationshipResponse](m.conn, nMsg)

return respMsg, nil
}

var _ Request[AuthRelationshipRequest, AuthRelationshipResponse] = (*NatsAuthRelationshipRequest)(nil)

// NatsAuthRelationshipRequest implements Request for AuthRelationshipRequest / AuthRelationshipResponse
type NatsAuthRelationshipRequest struct {
*NatsMessage[AuthRelationshipRequest]
}

// Reply responds to an AuthRelationshipRequest with an AuthRelationshipResponse.
func (r *NatsAuthRelationshipRequest) Reply(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) {
ctx, span := r.conn.tracer.Start(ctx, "events.Reply")

defer span.End()

if r.source.Reply == "" {
span.RecordError(ErrNatsMessageNoReplySubject)
span.SetStatus(codes.Error, ErrNatsMessageNoReplySubject.Error())

Expand All @@ -157,43 +209,14 @@ func (m *NatsMessage[T]) ReplyAuthRelationshipRequest(ctx context.Context, messa

message.TraceContext = mapCarrier

respMsg, err := newNatsMessage(m.conn, m.source.Reply, message)
respMsg, err := newNatsMessage(r.conn, r.source.Reply, message)
if err != nil {
return nil, err
}

if err := m.source.RespondMsg(respMsg.source); err != nil {
if err := r.source.RespondMsg(respMsg.source); err != nil {
return respMsg, err
}

return respMsg, nil
}

// Source returns the underlying nats message.
func (m *NatsMessage[T]) Source() any {
return m.source
}

func (m *NatsMessage[T]) publish() error {
return m.conn.conn.PublishMsg(m.source)
}

func (m *NatsMessage[T]) request(ctx context.Context) (Message[AuthRelationshipResponse], error) {
if m.source.Reply == "" {
m.source.Reply = m.conn.conn.NewRespInbox()
}

nMsg, err := m.conn.conn.RequestMsgWithContext(ctx, m.source)
if err != nil {
// ensure we wrap no responder errors with ErrAuthRelationshipNoResponders.
if errors.Is(err, nats.ErrNoResponders) {
return nil, fmt.Errorf("%w: %w", ErrAuthRelationshipNoResponders, err)
}

return nil, err
}

respMsg := natsDecodeMessage[AuthRelationshipResponse](m.conn, nMsg)

return respMsg, nil
}
4 changes: 2 additions & 2 deletions events/nats_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *NatsConnection) nextMessage(ctx context.Context, sub *nats.Subscription
}

// SubscribeAuthRelationshipRequests creates a new pull subscription parsing incoming messages as AuthRelationshipRequest messages and returning a new Message channel.
func (c *NatsConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error) {
func (c *NatsConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error) {
topic = c.buildSubscribeSubject("auth", "relationships", topic)

natsCh, err := c.coreSubscribe(ctx, topic)
Expand All @@ -148,7 +148,7 @@ func (c *NatsConnection) SubscribeAuthRelationshipRequests(ctx context.Context,

c.logger.Debugf("subscribing to auth relation request message on topic %s", topic)

return natsSubscriptionMessageChan[AuthRelationshipRequest](c, c.cfg.SubscriberFetchBatchSize, natsCh), nil
return natsSubscriptionAuthRelationshipRequestChan(c, c.cfg.SubscriberFetchBatchSize, natsCh), nil
}

// SubscribeChanges creates a new pull subscription parsing incoming messages as ChangeMessage messages and returning a new Message channel.
Expand Down
2 changes: 1 addition & 1 deletion events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestNatsRequestReply(t *testing.T) {

reqGot <- reqMsg

respMsg, err := reqMsg.ReplyAuthRelationshipRequest(ctx, authResponse)
respMsg, err := reqMsg.Reply(ctx, authResponse)
assert.NoError(t, err)
assert.NotNil(t, respMsg)
case <-time.After(time.Second * 2):
Expand Down
4 changes: 2 additions & 2 deletions testing/eventtools/mock_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func (c *MockConnection) Source() any {
}

// SubscribeAuthRelationshipRequests implements events.Connection
func (c *MockConnection) SubscribeAuthRelationshipRequests(_ context.Context, topic string) (<-chan events.Message[events.AuthRelationshipRequest], error) {
func (c *MockConnection) SubscribeAuthRelationshipRequests(_ context.Context, topic string) (<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], error) {
args := c.Called(topic)

return args.Get(0).(<-chan events.Message[events.AuthRelationshipRequest]), args.Error(1)
return args.Get(0).(<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]), args.Error(1)
}

// SubscribeChanges implements events.Connection
Expand Down

0 comments on commit 29a1c2c

Please sign in to comment.