Skip to content

Commit

Permalink
hide channels in subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
lerenn committed Sep 22, 2023
1 parent 17a0fb7 commit 534c87f
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 158 deletions.
7 changes: 3 additions & 4 deletions examples/helloworld/nats/app/app.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions examples/ping/kafka/app/app.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions examples/ping/kafka/user/user.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions examples/ping/nats/app/app.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions examples/ping/nats/user/user.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions pkg/codegen/generators/templates/controller.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c *{{ $.Prefix }}Controller) Subscribe{{namify $key}}(ctx context.Context,
go func() {
for {
// Wait for next message
bMsg, open := <-sub.Messages
bMsg, open := <-sub.MessagesChannel()

// If subscription is closed and there is no more message
// (i.e. uninitialized message), then exit the function
Expand Down Expand Up @@ -204,9 +204,8 @@ func (c *{{ $.Prefix }}Controller) Unsubscribe{{namify $key}}(ctx context.Contex
// Set context
ctx = add{{ $.Prefix }}ContextValues(ctx, path)

// Stop the subscription and wait for its closure to be complete
sub.Cancel <- true
<- sub.Cancel
// Stop the subscription
sub.Cancel()

// Remove if from the subscribers
delete(c.subscriptions, path)
Expand Down Expand Up @@ -287,9 +286,8 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg

// Close subscriber on leave
defer func(){
// Stop the subscription and wait for its closure to be complete
sub.Cancel <- true
<- sub.Cancel
// Stop the subscription
sub.Cancel()

// Logging unsubscribing
cc.logger.Info(ctx, "Unsubscribed from channel")
Expand All @@ -303,7 +301,7 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg
// Wait for corresponding response
for {
select {
case bMsg, open := <-sub.Messages:
case bMsg, open := <-sub.MessagesChannel():
// If subscription is closed and there is no more message
// (i.e. uninitialized message), then the subscription ended before
// receiving the expected message
Expand Down
56 changes: 54 additions & 2 deletions pkg/extensions/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,60 @@ import (
// BrokerChannelSubscription is a struct that contains every returned structures
// when subscribing a channel.
type BrokerChannelSubscription struct {
Messages chan BrokerMessage
Cancel chan any
messages chan BrokerMessage
cancel chan any
}

// NewBrokerChannelSubscription creates a new broker channel subscription based
// on the channels used to receive message and cancel the subscription.
func NewBrokerChannelSubscription(messages chan BrokerMessage, cancel chan any) BrokerChannelSubscription {
return BrokerChannelSubscription{
messages: messages,
cancel: cancel,
}
}

// TransmitReceivedMessage should only be used by the broker to transmit the
// new received messages to the user.
func (bcs BrokerChannelSubscription) TransmitReceivedMessage(msg BrokerMessage) {
bcs.messages <- msg
}

// MessagesChannel returns the channel that will get the received messages from
// broker and from which the user should listen.
func (bcs BrokerChannelSubscription) MessagesChannel() <-chan BrokerMessage {
return bcs.messages
}

// WaitForCancellationAsync should be used by the broker only to wait for user request
// for cancellation. As it is asynchronous, it will return immediately after the call.
func (bcs BrokerChannelSubscription) WaitForCancellationAsync(cleanup func()) {
go func() {
// Wait for cancel request
<-bcs.cancel

// Execute cleanup function
cleanup()

// Close messages in order to avoid new messages
close(bcs.messages)

// Close cancel to let listeners know that the cancellation is complete
close(bcs.cancel)
}()
}

// Cancel cancels the subscription from user perspective. It will ask for clean
// up on broker, which will return when finished to avoid dangling resources, such
// as non-existent queue listeners on (broker) server side.
func (bcs BrokerChannelSubscription) Cancel() {
// Send a cancellation request
bcs.cancel <- true

// Wait for the cancellation to be effective
for open := true; open; {
_, open = <-bcs.cancel
}
}

// BrokerMessage is a wrapper that will contain all information regarding a message.
Expand Down
72 changes: 31 additions & 41 deletions pkg/extensions/brokers/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,57 +133,47 @@ func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.
GroupID: c.groupID,
})

// Handle events
messages := make(chan extensions.BrokerMessage, brokers.BrokerMessagesQueueSize)
cancel := make(chan any, 1)
go func() {
for {
c.messagesHandler(ctx, r, messages)
}
}()
// Create subscription
sub := extensions.NewBrokerChannelSubscription(
make(chan extensions.BrokerMessage, brokers.BrokerMessagesQueueSize),
make(chan any, 1),
)

go func() {
// Wait for cancel request
<-cancel
// Handle events
go c.messagesHandler(ctx, r, sub)

// Stopping the Kafka listener
// Wait for cancellation and stop the kafka listener when it happens
sub.WaitForCancellationAsync(func() {
if err := r.Close(); err != nil {
c.logger.Error(ctx, err.Error())
}
})

// Close messages in order to avoid new messages
close(messages)

// Close cancel to let listeners know that the cancellation is complete
close(cancel)
}()

return extensions.BrokerChannelSubscription{
Messages: messages,
Cancel: cancel,
}, nil
return sub, nil
}

func (c *Controller) messagesHandler(ctx context.Context, r *kafka.Reader, messages chan extensions.BrokerMessage) {
msg, err := r.ReadMessage(ctx)
if err != nil {
// If the error is not io.EOF, then it is a real error
if !errors.Is(err, io.EOF) {
c.logger.Warning(ctx, fmt.Sprintf("Error when reading message: %q", err.Error()))
func (c *Controller) messagesHandler(ctx context.Context, r *kafka.Reader, sub extensions.BrokerChannelSubscription) {
for {
msg, err := r.ReadMessage(ctx)
if err != nil {
// If the error is not io.EOF, then it is a real error
if !errors.Is(err, io.EOF) {
c.logger.Warning(ctx, fmt.Sprintf("Error when reading message: %q", err.Error()))
}

return
}

return
}

// Get headers
headers := make(map[string][]byte, len(msg.Headers))
for _, header := range msg.Headers {
headers[header.Key] = header.Value
}
// Get headers
headers := make(map[string][]byte, len(msg.Headers))
for _, header := range msg.Headers {
headers[header.Key] = header.Value
}

// Create message
messages <- extensions.BrokerMessage{
Headers: headers,
Payload: msg.Value,
// Send received message
sub.TransmitReceivedMessage(extensions.BrokerMessage{
Headers: headers,
Payload: msg.Value,
})
}
}
Loading

0 comments on commit 534c87f

Please sign in to comment.