From a7b127e4402353ef046db2ff347f47e56bda7bd7 Mon Sep 17 00:00:00 2001 From: Mike Mason Date: Thu, 3 Aug 2023 15:57:51 +0000 Subject: [PATCH] move reply to request message type Signed-off-by: Mike Mason --- events/connection.go | 2 +- events/message.go | 12 ++-- events/nats_message.go | 93 +++++++++++++++++---------- events/nats_subscribe.go | 4 +- events/nats_test.go | 2 +- testing/eventtools/mock_connection.go | 4 +- 6 files changed, 72 insertions(+), 45 deletions(-) diff --git a/events/connection.go b/events/connection.go index 6d553311..c51b55f4 100644 --- a/events/connection.go +++ b/events/connection.go @@ -40,7 +40,7 @@ type Publisher interface { // AuthRelationshipSubscriber specifies the auth relationship subscriber methods. type AuthRelationshipSubscriber interface { // SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message. - SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error) + SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error) } // AuthRelationshipPublisher specifies the auth relationship publisher methods. diff --git a/events/message.go b/events/message.go index 2fc9f5fa..15a7be00 100644 --- a/events/message.go +++ b/events/message.go @@ -53,14 +53,18 @@ type Message[T any] interface { // Error returns any error encountered while decoding the message Error() error - // ReplyAuthRelationshipRequest publishes an AuthRelationshipResponse message. - // An error is returned if the message is not an AuthRelationshipRequest. - ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) - // Source returns the underlying message object. Source() any } +// Request extends Message by allowing replies to be sent for the received message. +type Request[TRequest, TResponse any] interface { + Message[TRequest] + + // Reply publishes a response to the received message. + Reply(ctx context.Context, message TResponse) (Message[TResponse], error) +} + // ChangeType represents the possible event types for a ChangeMessage type ChangeType string diff --git a/events/nats_message.go b/events/nats_message.go index 26484d60..1df903eb 100644 --- a/events/nats_message.go +++ b/events/nats_message.go @@ -30,6 +30,22 @@ func natsSubscriptionMessageChan[T any](conn *NatsConnection, batchSize int, nat return msgCh } +func natsSubscriptionAuthRelationshipRequestChan(conn *NatsConnection, batchSize int, natsCh <-chan *nats.Msg) chan Request[AuthRelationshipRequest, AuthRelationshipResponse] { + msgCh := make(chan Request[AuthRelationshipRequest, AuthRelationshipResponse], batchSize) + + go func() { + for nMsg := range natsCh { + msg := natsDecodeMessage[AuthRelationshipRequest](conn, nMsg) + + msgCh <- &NatsAuthRelationshipRequest{NatsMessage: msg.(*NatsMessage[AuthRelationshipRequest])} + } + + close(msgCh) + }() + + return msgCh +} + func natsDecodeMessage[T any](conn *NatsConnection, nMsg *nats.Msg) Message[T] { msg := &NatsMessage[T]{ conn: conn, @@ -125,13 +141,49 @@ func (m *NatsMessage[T]) Error() error { return nil } -// ReplyAuthRelationshipRequest responds to an AuthRelationshipRequest with an AuthRelationshipResponse -func (m *NatsMessage[T]) ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) { - ctx, span := m.conn.tracer.Start(ctx, "events.ReplyAuthRelationshipRequest") +// Source returns the underlying nats message. +func (m *NatsMessage[T]) Source() any { + return m.source +} - defer span.End() +func (m *NatsMessage[T]) publish() error { + return m.conn.conn.PublishMsg(m.source) +} +func (m *NatsMessage[T]) request(ctx context.Context) (Message[AuthRelationshipResponse], error) { if m.source.Reply == "" { + m.source.Reply = m.conn.conn.NewRespInbox() + } + + nMsg, err := m.conn.conn.RequestMsgWithContext(ctx, m.source) + if err != nil { + // ensure we wrap no responder errors with ErrRequestNoResponders. + if errors.Is(err, nats.ErrNoResponders) { + return nil, fmt.Errorf("%w: %w", ErrRequestNoResponders, err) + } + + return nil, err + } + + respMsg := natsDecodeMessage[AuthRelationshipResponse](m.conn, nMsg) + + return respMsg, nil +} + +var _ Request[AuthRelationshipRequest, AuthRelationshipResponse] = (*NatsAuthRelationshipRequest)(nil) + +// NatsAuthRelationshipRequest implements Request for AuthRelationshipRequest / AuthRelationshipResponse +type NatsAuthRelationshipRequest struct { + *NatsMessage[AuthRelationshipRequest] +} + +// Reply responds to an AuthRelationshipRequest with an AuthRelationshipResponse. +func (r *NatsAuthRelationshipRequest) Reply(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) { + ctx, span := r.conn.tracer.Start(ctx, "events.Reply") + + defer span.End() + + if r.source.Reply == "" { span.RecordError(ErrNatsMessageNoReplySubject) span.SetStatus(codes.Error, ErrNatsMessageNoReplySubject.Error()) @@ -152,43 +204,14 @@ func (m *NatsMessage[T]) ReplyAuthRelationshipRequest(ctx context.Context, messa message.TraceContext = mapCarrier - respMsg, err := newNatsMessage(m.conn, m.source.Reply, message) + respMsg, err := newNatsMessage(r.conn, r.source.Reply, message) if err != nil { return nil, err } - if err := m.source.RespondMsg(respMsg.source); err != nil { + if err := r.source.RespondMsg(respMsg.source); err != nil { return respMsg, err } return respMsg, nil } - -// Source returns the underlying nats message. -func (m *NatsMessage[T]) Source() any { - return m.source -} - -func (m *NatsMessage[T]) publish() error { - return m.conn.conn.PublishMsg(m.source) -} - -func (m *NatsMessage[T]) request(ctx context.Context) (Message[AuthRelationshipResponse], error) { - if m.source.Reply == "" { - m.source.Reply = m.conn.conn.NewRespInbox() - } - - nMsg, err := m.conn.conn.RequestMsgWithContext(ctx, m.source) - if err != nil { - // ensure we wrap no responder errors with ErrRequestNoResponders. - if errors.Is(err, nats.ErrNoResponders) { - return nil, fmt.Errorf("%w: %w", ErrRequestNoResponders, err) - } - - return nil, err - } - - respMsg := natsDecodeMessage[AuthRelationshipResponse](m.conn, nMsg) - - return respMsg, nil -} diff --git a/events/nats_subscribe.go b/events/nats_subscribe.go index 2b440b61..8b82447d 100644 --- a/events/nats_subscribe.go +++ b/events/nats_subscribe.go @@ -138,7 +138,7 @@ func (c *NatsConnection) nextMessage(ctx context.Context, sub *nats.Subscription } // SubscribeAuthRelationshipRequests creates a new pull subscription parsing incoming messages as AuthRelationshipRequest messages and returning a new Message channel. -func (c *NatsConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error) { +func (c *NatsConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error) { topic = c.buildSubscribeSubject("auth", "relationships", topic) natsCh, err := c.coreSubscribe(ctx, topic) @@ -148,7 +148,7 @@ func (c *NatsConnection) SubscribeAuthRelationshipRequests(ctx context.Context, c.logger.Debugf("subscribing to auth relation request message on topic %s", topic) - return natsSubscriptionMessageChan[AuthRelationshipRequest](c, c.cfg.SubscriberFetchBatchSize, natsCh), nil + return natsSubscriptionAuthRelationshipRequestChan(c, c.cfg.SubscriberFetchBatchSize, natsCh), nil } // SubscribeChanges creates a new pull subscription parsing incoming messages as ChangeMessage messages and returning a new Message channel. diff --git a/events/nats_test.go b/events/nats_test.go index f3fed869..ad829736 100644 --- a/events/nats_test.go +++ b/events/nats_test.go @@ -130,7 +130,7 @@ func TestNatsRequestReply(t *testing.T) { reqGot <- reqMsg - respMsg, err := reqMsg.ReplyAuthRelationshipRequest(ctx, authResponse) + respMsg, err := reqMsg.Reply(ctx, authResponse) assert.NoError(t, err) assert.NotNil(t, respMsg) case <-time.After(time.Second * 2): diff --git a/testing/eventtools/mock_connection.go b/testing/eventtools/mock_connection.go index 0e068dc0..8f213fb6 100644 --- a/testing/eventtools/mock_connection.go +++ b/testing/eventtools/mock_connection.go @@ -51,10 +51,10 @@ func (c *MockConnection) Source() any { } // SubscribeAuthRelationshipRequests implements events.Connection -func (c *MockConnection) SubscribeAuthRelationshipRequests(_ context.Context, topic string) (<-chan events.Message[events.AuthRelationshipRequest], error) { +func (c *MockConnection) SubscribeAuthRelationshipRequests(_ context.Context, topic string) (<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], error) { args := c.Called(topic) - return args.Get(0).(<-chan events.Message[events.AuthRelationshipRequest]), args.Error(1) + return args.Get(0).(<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]), args.Error(1) } // SubscribeChanges implements events.Connection