Skip to content

Commit

Permalink
add context timeout on cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
lerenn committed Sep 22, 2023
1 parent 534c87f commit 7c60eb6
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 46 deletions.
5 changes: 3 additions & 2 deletions examples/helloworld/nats/app/app.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions examples/ping/kafka/app/app.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions examples/ping/kafka/user/user.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions examples/ping/nats/app/app.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions examples/ping/nats/user/user.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions pkg/codegen/generators/templates/controller.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func (c *{{ $.Prefix }}Controller) Subscribe{{namify $key}}(ctx context.Context,
return nil
}

// Unsubscribe{{namify $key}} will unsubscribe messages from '{{$key}}' channel
// Unsubscribe{{namify $key}} will unsubscribe messages from '{{$key}}' channel.
// A timeout can be set in context to avoid blocking operation, if needed.
{{- if .Parameters}}
func (c *{{ $.Prefix }}Controller) Unsubscribe{{namify $key}}(ctx context.Context, params {{namify $key}}Parameters) {
{{- else}}
Expand All @@ -205,7 +206,7 @@ func (c *{{ $.Prefix }}Controller) Unsubscribe{{namify $key}}(ctx context.Contex
ctx = add{{ $.Prefix }}ContextValues(ctx, path)

// Stop the subscription
sub.Cancel()
sub.Cancel(ctx)

// Remove if from the subscribers
delete(c.subscriptions, path)
Expand Down Expand Up @@ -261,10 +262,12 @@ func (c *{{ $.Prefix }}Controller) Publish{{namify $key}}(ctx context.Context, m
{{if eq .Prefix "User" -}}
{{- range $key, $value := .SubscribeChannels -}}
{{- if ne $value.Subscribe.Message.CorrelationIDLocation ""}}
// WaitFor{{namify $key}} will wait for a specific message by its correlation ID
// WaitFor{{namify $key}} will wait for a specific message by its correlation ID.
//
// The pub function is the publication function that should be used to send the message
// It will be called after subscribing to the channel to avoid race condition, and potentially loose the message
// The pub function is the publication function that should be used to send the message.
// It will be called after subscribing to the channel to avoid race condition, and potentially loose the message.
//
// A timeout can be set in context to avoid blocking operation, if needed.
{{- if .Parameters}}
func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, params {{namify $key}}Parameters, publishMsg MessageWithCorrelationID, pub func(ctx context.Context) error) ({{channelToMessageTypeName $value}}, error) {
{{- else}}
Expand All @@ -287,7 +290,7 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg
// Close subscriber on leave
defer func(){
// Stop the subscription
sub.Cancel()
sub.Cancel(ctx)

// Logging unsubscribing
cc.logger.Info(ctx, "Unsubscribed from channel")
Expand Down
7 changes: 4 additions & 3 deletions pkg/extensions/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ func (bcs BrokerChannelSubscription) WaitForCancellationAsync(cleanup func()) {
// Cancel cancels the subscription from user perspective. It will ask for clean
// up on broker, which will return when finished to avoid dangling resources, such
// as non-existent queue listeners on (broker) server side.
func (bcs BrokerChannelSubscription) Cancel() {
func (bcs BrokerChannelSubscription) Cancel(ctx context.Context) {
// Send a cancellation request
bcs.cancel <- true

// Wait for the cancellation to be effective
for open := true; open; {
_, open = <-bcs.cancel
select {
case <-bcs.cancel:
case <-ctx.Done():
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/extensions/versioning/broker_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newBrokerSubscription(
}
}

func (bs *brokerSubscription) createVersionListener(version string) (versionSubcription, error) {
func (bs *brokerSubscription) createVersionListener(ctx context.Context, version string) (versionSubcription, error) {
// Lock the versions to avoid conflict
bs.versionsMutex.Lock()
defer bs.versionsMutex.Unlock()
Expand All @@ -44,12 +44,12 @@ func (bs *brokerSubscription) createVersionListener(version string) (versionSubc
// Create the channels necessary
cbv := newVersionSubscription(version, bs)
bs.versionsChannels[version] = cbv
defer cbv.launchListener()
defer cbv.launchListener(ctx)

return cbv, nil
}

func (bs *brokerSubscription) removeVersionListener(vs *versionSubcription) {
func (bs *brokerSubscription) removeVersionListener(ctx context.Context, vs *versionSubcription) {
// Lock the versions to avoid conflict
bs.versionsMutex.Lock()
defer bs.versionsMutex.Unlock()
Expand All @@ -67,7 +67,7 @@ func (bs *brokerSubscription) removeVersionListener(vs *versionSubcription) {
}

// Otherwise cancel the broker listener
bs.subscription.Cancel()
bs.subscription.Cancel(ctx)

// Then delete the channelsByBroker from the Version Switch Wrapper
delete(bs.parent.channels, bs.channel)
Expand Down
12 changes: 10 additions & 2 deletions pkg/extensions/versioning/version_subscription.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package versioning

import (
"context"
"time"

"github.com/lerenn/asyncapi-codegen/pkg/extensions"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers"
)
Expand All @@ -22,9 +25,14 @@ func newVersionSubscription(version string, parent *brokerSubscription) versionS
}
}

func (vs *versionSubcription) launchListener() {
func (vs *versionSubcription) launchListener(ctx context.Context) {
// Wait for cancellation and remove version listener when it happens
vs.subscription.WaitForCancellationAsync(func() {
vs.parent.removeVersionListener(vs)
// Create cancel function in case there is a problem with broker removal
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// Remove the version listener
vs.parent.removeVersionListener(ctx, vs)
})
}
2 changes: 1 addition & 1 deletion pkg/extensions/versioning/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (w *Wrapper) Subscribe(ctx context.Context, channel string) (extensions.Bro
}

// Check if the version already exists
cbv, err := brokerChannel.createVersionListener(version)
cbv, err := brokerChannel.createVersionListener(ctx, version)

return cbv.subscription, err
}
Expand Down
15 changes: 9 additions & 6 deletions test/issues/49/asyncapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions test/issues/73/v1/asyncapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions test/issues/73/v2/asyncapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions test/issues/74/asyncapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7c60eb6

Please sign in to comment.