diff --git a/Makefile b/Makefile index a3aff42a..c8e4cd3e 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ lint/fix: ## Fix what can be fixed regarding the linter clean: __examples/clean brokers/down ## Clean up the project .PHONY: check -check: clean generate lint examples test ## Check that everything is ready for commit +check: generate lint clean examples test ## Check that everything is ready for commit .PHONY: __examples/clean __examples/clean: @@ -34,7 +34,7 @@ examples: brokers/up ## Perform examples .PHONY: test test: brokers/up ## Perform tests - @go test ./... -timeout=30s + @go test ./... -p 1 -timeout=1m .PHONY: generate generate: ## Generate files diff --git a/README.md b/README.md index 366221c9..543cd3c8 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,11 @@ Generate Go application and user boilerplate from AsyncAPI specifications. * [Custom broker](#custom-broker) * [CLI options](#cli-options) * [Advanced topics](#advanced-topics) + * [Middlewares](#middlewares) + * [Context](#context) + * [Logging](#logging) + * [Versioning](#versioning) + * [AsyncAPI Extensions](#asyncapi-extensions) * [Contributing and support](#contributing-and-support) ## Supported functionalities @@ -37,6 +42,8 @@ Generate Go application and user boilerplate from AsyncAPI specifications. * Elastic Common Schema (JSON) * Text (Humand readable) * Custom +* Others: + * Versioning support ## Usage @@ -172,22 +179,24 @@ code with NATS (you can also find it [here](./examples/helloworld/nats/app/main. ```go import( "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" - /* ... */ + // ... ) -// Create a new application controller -ctrl, _ := NewAppController(nats.NewController("nats://nats:4222")) -defer ctrl.Close(context.Background()) - -// Subscribe to HelloWorld messages -// Note: it will indefinitely wait for messages as context has no timeout -log.Println("Subscribe to hello world...") -ctrl.SubscribeHello(context.Background(), func(_ context.Context, msg HelloMessage, _ bool) { - log.Println("Received message:", msg.Payload) -}) - -// Process messages until interruption signal -/* ... */ +func main() { + // Create a new application controller + ctrl, _ := NewAppController(nats.NewController("nats://nats:4222")) + defer ctrl.Close(context.Background()) + + // Subscribe to HelloWorld messages + // Note: it will indefinitely wait for messages as context has no timeout + log.Println("Subscribe to hello world...") + ctrl.SubscribeHello(context.Background(), func(_ context.Context, msg HelloMessage, _ bool) { + log.Println("Received message:", msg.Payload) + }) + + // Process messages until interruption signal + // ... +} ``` #### User @@ -221,16 +230,20 @@ code with NATS (you can also find it [here](./examples/helloworld/nats/app/main. ```go import( "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" - /* ... */ + // ... ) -// Create a new user controller -ctrl, _ := NewUserController(nats.NewController("nats://nats:4222")) -defer ctrl.Close(context.Background()) +func main() { + // Create a new user controller + ctrl, _ := NewUserController(nats.NewController("nats://nats:4222")) + defer ctrl.Close(context.Background()) -// Send HelloWorld -log.Println("Publishing 'hello world' message") -ctrl.PublishHello(context.Background(), HelloMessage{Payload: "HelloWorld!"}) + // Send HelloWorld + log.Println("Publishing 'hello world' message") + ctrl.PublishHello(context.Background(), HelloMessage{Payload: "HelloWorld!"}) + + // ... +} ``` #### Types @@ -288,7 +301,7 @@ func (s Subscriber) Ping(req PingMessage, _ bool) { } func main() { - /* ... */ + // ... // Create a new application controller ctrl, _ := NewAppController(/* Add corresponding broker controller */) @@ -299,7 +312,7 @@ func main() { ctrl.SubscribeAll(context.Background(), sub) // Process messages until interruption signal - /* ... */ + // ... } ``` @@ -379,7 +392,7 @@ type BrokerController interface { Publish(ctx context.Context, channel string, mw extensions.BrokerMessage) error // Subscribe to messages from the broker - Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error) + Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan any, err error) } ``` @@ -435,7 +448,7 @@ If you want to target specific messages, you can use the context passed in argum ```golang import( "github.com/lerenn/asyncapi-codegen/pkg/extensions" - /* ... */ + // ... ) func myMiddleware(ctx context.Context, _ middleware.Next) context.Context { @@ -462,7 +475,7 @@ Here is an example: ```golang import( "github.com/lerenn/asyncapi-codegen/pkg/extensions" - /* ... */ + // ... ) func surroundingMiddleware(ctx context.Context, next extensions.NextMiddleware) context.Context { @@ -511,11 +524,15 @@ to initialize the controller with a logger, with the function `WithLogger()`: ```golang import( "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - /* ... */ + // ... ) -// Create a new app controller with an Elastic Common Schema JSON compatible logger -ctrl, _ := NewAppController(/* Broker of your choice */, WithLogger(log.NewECS())) +func main() { + // Create a new app controller with an Elastic Common Schema JSON compatible logger + ctrl, _ := NewAppController(/* Broker of your choice */, WithLogger(log.NewECS())) + + // ... +} ``` You can find all loggers in the directory `pkg/log`. @@ -528,12 +545,16 @@ in order to execute it on every published and received messages: ```golang import( "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - /* ... */ + // ... ) -// Create a new app controller with a middleware for logging incoming/outgoing messages -loggingMiddleware := middleware.Logging(log.NewECS()) -ctrl, _ := NewAppController(/* Broker of your choice */, WithMiddlewares(loggingMiddleware)) +func main() { + // Create a new app controller with a middleware for logging incoming/outgoing messages + loggingMiddleware := middleware.Logging(log.NewECS()) + ctrl, _ := NewAppController(/* Broker of your choice */, WithMiddlewares(loggingMiddleware)) + + // ... +} ``` #### Custom logging @@ -584,7 +605,78 @@ ctrl, _ := NewAppController( ) ``` -### Extensions +### Versioning + +If you are in need to do a migration or support multiple versions of your +AsyncAPI specifications, you can use the `versioning` package: + +```golang + +import ( + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/versioning" + v1 "path/to/asyncapi/spec/version/1" + v2 "path/to/asyncapi/spec/version/2" +) + +func main() { + // Create a broker (here from NATS) + broker := nats.NewController("nats://nats:4222")) + + // Add a version wrapper to the broker + vw := versioning.NewWrapper(broker) + + // Create application for version 1 + appV1, _ := v1.NewAppController(vw, /* controller options */) + defer appV1.Close(context.Background()) + + // Create v2 app + appV2, _ := v2.NewAppController(vw, /* controller options */) + defer appV2.Close(context.Background()) + + // ... +} +``` + +Then you can use each application independently: + +```golang +err := appV1.SubscribeHello(context.Background(), func(_ context.Context, msg v1.HelloMessage, _ bool) { + // Stuff for version 1 +}) + +err := appV2.SubscribeHello(context.Background(), func(_ context.Context, msg v2.HelloMessage, _ bool) { + // Stuff for version 2 +}) +``` + +That way, you can support multiple different versions with the same broker. + +#### Version tagging + +**Important**: this feature will add an `application-version` header to each +message in order to have the correct version of the application on each of +them. + +##### Non-tagged messages + +If messages can have no `application-version`, you can use the option `WithDefaultVersion` +to add a default version to non-tagged messages. + +```golang +vw := versioning.NewWrapper(broker, versioning.WithDefaultVersion("1.1.4")) +``` + +##### Change header key for application version + +Also, if you don't want to use this header as a recipient to the application version, +you can specify your own header with the option `WithVersionHeaderKey`. + +```golang +vw := versioning.NewWrapper(broker, versioning.WithVersionHeaderKey("my-version-key")) +``` + +### AsyncAPI Extensions #### Schema Object extensions diff --git a/docker-compose.yaml b/docker-compose.yaml index c90d5cb0..8b00c2da 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,7 +2,7 @@ version: '3.5' services: kafka: - image: 'bitnami/kafka:latest' + image: 'bitnami/kafka:3.5.1' ports: # Listening on a different port to avoid collision with 9092 internal listener # Prefering to keep 9092 for internal listener as it is the port for examples @@ -19,7 +19,8 @@ services: networks: - brokers nats: - image: nats:alpine + image: nats:2.10 + command: ["-V"] ports: - 4222:4222 networks: diff --git a/examples/helloworld/nats/app/app.gen.go b/examples/helloworld/nats/app/app.gen.go index 6cc1196c..354a16c2 100644 --- a/examples/helloworld/nats/app/app.gen.go +++ b/examples/helloworld/nats/app/app.gen.go @@ -5,8 +5,6 @@ package main import ( "context" - "encoding/json" - "errors" "fmt" "github.com/lerenn/asyncapi-codegen/pkg/extensions" @@ -28,15 +26,15 @@ type AppController struct { func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -86,6 +84,7 @@ func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx } func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "0.1.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -94,6 +93,7 @@ func addAppContextValues(ctx context.Context, path string) context.Context { func (c *AppController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed app controller") } @@ -101,7 +101,7 @@ func (c *AppController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { if as == nil { - return ErrNilAppSubscriber + return extensions.ErrNilAppSubscriber } if err := c.SubscribeHello(ctx, as.Hello); err != nil { @@ -113,14 +113,7 @@ func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) erro // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *AppController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribeHello(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribeHello will subscribe to new messages from 'hello' channel. @@ -136,15 +129,15 @@ func (c *AppController) SubscribeHello(ctx context.Context, fn func(ctx context. ctx = addAppContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -185,8 +178,8 @@ func (c *AppController) SubscribeHello(ctx context.Context, fn func(ctx context. } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -196,54 +189,33 @@ func (c *AppController) UnsubscribeHello(ctx context.Context) { // Get channel path path := "hello" - // Set context - ctx = addAppContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) - - c.logger.Info(ctx, "Unsubscribed from channel") -} - -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) + // Set context + ctx = addAppContextValues(ctx, path) - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) + // Remove if from the subscribers + delete(c.cancelChannels, path) - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) + c.logger.Info(ctx, "Unsubscribed from channel") +} // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -298,11 +270,8 @@ func NewHelloMessage() HelloMessage { func newHelloMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (HelloMessage, error) { var msg HelloMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // TODO: run checks on msg type @@ -313,11 +282,8 @@ func newHelloMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (HelloMessa func (msg HelloMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // There is no headers here headers := make(map[string][]byte, 0) diff --git a/examples/helloworld/nats/app/main.go b/examples/helloworld/nats/app/main.go index 84bba120..6df78f76 100644 --- a/examples/helloworld/nats/app/main.go +++ b/examples/helloworld/nats/app/main.go @@ -12,8 +12,11 @@ import ( ) func main() { + // Create a new broker + broker := nats.NewController("nats://nats:4222", nats.WithQueueGroup("helloworld-apps")) + // Create a new application controller - ctrl, err := NewAppController(nats.NewController("nats://nats:4222")) + ctrl, err := NewAppController(broker) if err != nil { panic(err) } diff --git a/examples/helloworld/nats/user/main.go b/examples/helloworld/nats/user/main.go index 675cca33..8a63644e 100644 --- a/examples/helloworld/nats/user/main.go +++ b/examples/helloworld/nats/user/main.go @@ -10,8 +10,11 @@ import ( ) func main() { + // Create a new broker + broker := nats.NewController("nats://nats:4222", nats.WithQueueGroup("helloworld-users")) + // Create a new user controller - ctrl, err := NewUserController(nats.NewController("nats://nats:4222")) + ctrl, err := NewUserController(broker) if err != nil { panic(err) } diff --git a/examples/helloworld/nats/user/user.gen.go b/examples/helloworld/nats/user/user.gen.go index 3e70ba4a..6046aeb7 100644 --- a/examples/helloworld/nats/user/user.gen.go +++ b/examples/helloworld/nats/user/user.gen.go @@ -5,8 +5,6 @@ package main import ( "context" - "encoding/json" - "errors" "fmt" "github.com/lerenn/asyncapi-codegen/pkg/extensions" @@ -22,15 +20,15 @@ type UserController struct { func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -80,6 +78,7 @@ func (c UserController) executeMiddlewares(ctx context.Context, callback func(ct } func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "0.1.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -117,38 +116,14 @@ func (c *UserController) PublishHello(ctx context.Context, msg HelloMessage) err return err } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -203,11 +178,8 @@ func NewHelloMessage() HelloMessage { func newHelloMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (HelloMessage, error) { var msg HelloMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // TODO: run checks on msg type @@ -218,11 +190,8 @@ func newHelloMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (HelloMessa func (msg HelloMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // There is no headers here headers := make(map[string][]byte, 0) diff --git a/examples/ping/kafka/app/app.gen.go b/examples/ping/kafka/app/app.gen.go index be73d509..dca77c20 100644 --- a/examples/ping/kafka/app/app.gen.go +++ b/examples/ping/kafka/app/app.gen.go @@ -6,7 +6,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "time" @@ -31,15 +30,15 @@ type AppController struct { func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -89,6 +88,7 @@ func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx } func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -97,6 +97,7 @@ func addAppContextValues(ctx context.Context, path string) context.Context { func (c *AppController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed app controller") } @@ -104,7 +105,7 @@ func (c *AppController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { if as == nil { - return ErrNilAppSubscriber + return extensions.ErrNilAppSubscriber } if err := c.SubscribePing(ctx, as.Ping); err != nil { @@ -116,14 +117,7 @@ func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) erro // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *AppController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribePing(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribePing will subscribe to new messages from 'ping' channel. @@ -139,15 +133,15 @@ func (c *AppController) SubscribePing(ctx context.Context, fn func(ctx context.C ctx = addAppContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -193,8 +187,8 @@ func (c *AppController) SubscribePing(ctx context.Context, fn func(ctx context.C } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -204,18 +198,21 @@ func (c *AppController) UnsubscribePing(ctx context.Context) { // Get channel path path := "ping" - // Set context - ctx = addAppContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addAppContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -254,38 +251,14 @@ func (c *AppController) PublishPong(ctx context.Context, msg PongMessage) error return err } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -350,11 +323,8 @@ func NewPingMessage() PingMessage { func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage, error) { var msg PingMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // Get each headers from broker message for k, v := range bMsg.Headers { @@ -376,11 +346,8 @@ func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage func (msg PingMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // Add each headers to broker message headers := make(map[string][]byte, 1) diff --git a/examples/ping/kafka/user/main.go b/examples/ping/kafka/user/main.go index fdd5bd4a..55f8c65f 100644 --- a/examples/ping/kafka/user/main.go +++ b/examples/ping/kafka/user/main.go @@ -4,7 +4,6 @@ package main import ( "context" - "time" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers" @@ -53,7 +52,4 @@ func main() { if err != nil { panic(err) } - - // Wait for the message to be received - time.Sleep(time.Second) } diff --git a/examples/ping/kafka/user/user.gen.go b/examples/ping/kafka/user/user.gen.go index c780d36e..8aba9f00 100644 --- a/examples/ping/kafka/user/user.gen.go +++ b/examples/ping/kafka/user/user.gen.go @@ -6,7 +6,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "time" @@ -31,15 +30,15 @@ type UserController struct { func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -89,6 +88,7 @@ func (c UserController) executeMiddlewares(ctx context.Context, callback func(ct } func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -97,6 +97,7 @@ func addUserContextValues(ctx context.Context, path string) context.Context { func (c *UserController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed user controller") } @@ -104,7 +105,7 @@ func (c *UserController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { if as == nil { - return ErrNilUserSubscriber + return extensions.ErrNilUserSubscriber } if err := c.SubscribePong(ctx, as.Pong); err != nil { @@ -116,14 +117,7 @@ func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) er // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *UserController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribePong(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribePong will subscribe to new messages from 'pong' channel. @@ -139,15 +133,15 @@ func (c *UserController) SubscribePong(ctx context.Context, fn func(ctx context. ctx = addUserContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -193,8 +187,8 @@ func (c *UserController) SubscribePong(ctx context.Context, fn func(ctx context. } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -204,18 +198,21 @@ func (c *UserController) UnsubscribePong(ctx context.Context) { // Get channel path path := "pong" - // Set context - ctx = addUserContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addUserContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -266,7 +263,7 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit ctx = addUserContextValues(ctx, path) // Subscribe to broker channel - msgs, stop, err := cc.broker.Subscribe(ctx, path) + messages, cancel, err := cc.broker.Subscribe(ctx, path) if err != nil { cc.logger.Error(ctx, err.Error()) return PongMessage{}, err @@ -275,8 +272,9 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit // Close subscriber on leave defer func() { - // Unsubscribe - stop <- true + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel // Logging unsubscribing cc.logger.Info(ctx, "Unsubscribed from channel") @@ -290,7 +288,7 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit // Wait for corresponding response for { select { - case bMsg, open := <-msgs: + case bMsg, open := <-messages: // Get new message msg, err := newPongMessageFromBrokerMessage(bMsg) if err != nil { @@ -313,47 +311,23 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit return msg, nil } else if !open { // If message is invalid or not corresponding and the subscription is closed, then set corresponding error cc.logger.Error(ctx, "Channel closed before getting message") - return PongMessage{}, ErrSubscriptionCanceled + return PongMessage{}, extensions.ErrSubscriptionCanceled } case <-ctx.Done(): // Set corrsponding error if context is done cc.logger.Error(ctx, "Context done before getting message") - return PongMessage{}, ErrContextCanceled + return PongMessage{}, extensions.ErrContextCanceled } } } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -418,11 +392,8 @@ func NewPingMessage() PingMessage { func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage, error) { var msg PingMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // Get each headers from broker message for k, v := range bMsg.Headers { @@ -444,11 +415,8 @@ func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage func (msg PingMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // Add each headers to broker message headers := make(map[string][]byte, 1) diff --git a/examples/ping/nats/app/app.gen.go b/examples/ping/nats/app/app.gen.go index be73d509..dca77c20 100644 --- a/examples/ping/nats/app/app.gen.go +++ b/examples/ping/nats/app/app.gen.go @@ -6,7 +6,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "time" @@ -31,15 +30,15 @@ type AppController struct { func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -89,6 +88,7 @@ func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx } func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -97,6 +97,7 @@ func addAppContextValues(ctx context.Context, path string) context.Context { func (c *AppController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed app controller") } @@ -104,7 +105,7 @@ func (c *AppController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { if as == nil { - return ErrNilAppSubscriber + return extensions.ErrNilAppSubscriber } if err := c.SubscribePing(ctx, as.Ping); err != nil { @@ -116,14 +117,7 @@ func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) erro // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *AppController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribePing(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribePing will subscribe to new messages from 'ping' channel. @@ -139,15 +133,15 @@ func (c *AppController) SubscribePing(ctx context.Context, fn func(ctx context.C ctx = addAppContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -193,8 +187,8 @@ func (c *AppController) SubscribePing(ctx context.Context, fn func(ctx context.C } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -204,18 +198,21 @@ func (c *AppController) UnsubscribePing(ctx context.Context) { // Get channel path path := "ping" - // Set context - ctx = addAppContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addAppContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -254,38 +251,14 @@ func (c *AppController) PublishPong(ctx context.Context, msg PongMessage) error return err } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -350,11 +323,8 @@ func NewPingMessage() PingMessage { func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage, error) { var msg PingMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // Get each headers from broker message for k, v := range bMsg.Headers { @@ -376,11 +346,8 @@ func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage func (msg PingMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // Add each headers to broker message headers := make(map[string][]byte, 1) diff --git a/examples/ping/nats/app/main.go b/examples/ping/nats/app/main.go index 745fbe25..b4ff168a 100644 --- a/examples/ping/nats/app/main.go +++ b/examples/ping/nats/app/main.go @@ -35,7 +35,11 @@ func (s ServerSubscriber) Ping(ctx context.Context, req PingMessage, _ bool) { func main() { // Instanciate a NATS controller with a logger logger := loggers.NewText() - broker := nats.NewController("nats://nats:4222", nats.WithLogger(logger)) + broker := nats.NewController( + "nats://nats:4222", // Set URL to broker + nats.WithLogger(logger), // Attach an internal logger + nats.WithQueueGroup("ping-apps"), // Set a specific queue group to avoid collisions + ) // Create a new app controller ctrl, err := NewAppController( diff --git a/examples/ping/nats/user/main.go b/examples/ping/nats/user/main.go index 60002627..50165447 100644 --- a/examples/ping/nats/user/main.go +++ b/examples/ping/nats/user/main.go @@ -4,7 +4,6 @@ package main import ( "context" - "time" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers" @@ -14,7 +13,11 @@ import ( func main() { // Instanciate a NATS controller with a logger logger := loggers.NewText() - broker := nats.NewController("nats://nats:4222", nats.WithLogger(logger)) + broker := nats.NewController( + "nats://nats:4222", // Set URL to broker + nats.WithLogger(logger), // Attach an internal logger + nats.WithQueueGroup("ping-users"), // Set a specific queue group to avoid collisions + ) // Create a new user controller ctrl, err := NewUserController( @@ -49,7 +52,4 @@ func main() { if err != nil { panic(err) } - - // Wait for the message to be received - time.Sleep(time.Second) } diff --git a/examples/ping/nats/user/user.gen.go b/examples/ping/nats/user/user.gen.go index c780d36e..8aba9f00 100644 --- a/examples/ping/nats/user/user.gen.go +++ b/examples/ping/nats/user/user.gen.go @@ -6,7 +6,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "time" @@ -31,15 +30,15 @@ type UserController struct { func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -89,6 +88,7 @@ func (c UserController) executeMiddlewares(ctx context.Context, callback func(ct } func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -97,6 +97,7 @@ func addUserContextValues(ctx context.Context, path string) context.Context { func (c *UserController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed user controller") } @@ -104,7 +105,7 @@ func (c *UserController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { if as == nil { - return ErrNilUserSubscriber + return extensions.ErrNilUserSubscriber } if err := c.SubscribePong(ctx, as.Pong); err != nil { @@ -116,14 +117,7 @@ func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) er // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *UserController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribePong(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribePong will subscribe to new messages from 'pong' channel. @@ -139,15 +133,15 @@ func (c *UserController) SubscribePong(ctx context.Context, fn func(ctx context. ctx = addUserContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -193,8 +187,8 @@ func (c *UserController) SubscribePong(ctx context.Context, fn func(ctx context. } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -204,18 +198,21 @@ func (c *UserController) UnsubscribePong(ctx context.Context) { // Get channel path path := "pong" - // Set context - ctx = addUserContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addUserContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -266,7 +263,7 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit ctx = addUserContextValues(ctx, path) // Subscribe to broker channel - msgs, stop, err := cc.broker.Subscribe(ctx, path) + messages, cancel, err := cc.broker.Subscribe(ctx, path) if err != nil { cc.logger.Error(ctx, err.Error()) return PongMessage{}, err @@ -275,8 +272,9 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit // Close subscriber on leave defer func() { - // Unsubscribe - stop <- true + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel // Logging unsubscribing cc.logger.Info(ctx, "Unsubscribed from channel") @@ -290,7 +288,7 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit // Wait for corresponding response for { select { - case bMsg, open := <-msgs: + case bMsg, open := <-messages: // Get new message msg, err := newPongMessageFromBrokerMessage(bMsg) if err != nil { @@ -313,47 +311,23 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit return msg, nil } else if !open { // If message is invalid or not corresponding and the subscription is closed, then set corresponding error cc.logger.Error(ctx, "Channel closed before getting message") - return PongMessage{}, ErrSubscriptionCanceled + return PongMessage{}, extensions.ErrSubscriptionCanceled } case <-ctx.Done(): // Set corrsponding error if context is done cc.logger.Error(ctx, "Context done before getting message") - return PongMessage{}, ErrContextCanceled + return PongMessage{}, extensions.ErrContextCanceled } } } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -418,11 +392,8 @@ func NewPingMessage() PingMessage { func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage, error) { var msg PingMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // Get each headers from broker message for k, v := range bMsg.Headers { @@ -444,11 +415,8 @@ func newPingMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (PingMessage func (msg PingMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // Add each headers to broker message headers := make(map[string][]byte, 1) diff --git a/pkg/asyncapi/message.go b/pkg/asyncapi/message.go index 88612e92..402c9fb6 100644 --- a/pkg/asyncapi/message.go +++ b/pkg/asyncapi/message.go @@ -147,7 +147,7 @@ func downToCorrelationID(path []string, child *Schema) (correlationIDParent *Sch return correlationIDParent } -func (msg *Message) referenceFrom(ref []string) interface{} { +func (msg *Message) referenceFrom(ref []string) any { if len(ref) == 0 { return msg } diff --git a/pkg/asyncapi/specification.go b/pkg/asyncapi/specification.go index 242126de..6dd64f84 100644 --- a/pkg/asyncapi/specification.go +++ b/pkg/asyncapi/specification.go @@ -58,7 +58,7 @@ func (s Specification) ReferenceSchema(ref string) *Schema { return msg } -func (s Specification) reference(ref string) interface{} { +func (s Specification) reference(ref string) any { refPath := strings.Split(ref, "/")[1:] if refPath[0] == "components" { diff --git a/pkg/codegen/errors.go b/pkg/codegen/errors.go index d8223242..959c13cf 100644 --- a/pkg/codegen/errors.go +++ b/pkg/codegen/errors.go @@ -1,11 +1,15 @@ package codegen -import "errors" +import ( + "fmt" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) var ( // ErrInvalidBroker is an error raised when using an unknown broker. - ErrInvalidBroker = errors.New("invalid broker") + ErrInvalidBroker = fmt.Errorf("%w: invalid broker", extensions.ErrAsyncAPI) // ErrInvalidFileFormat is returned when using an invalid format for AsyncAPI specification. - ErrInvalidFileFormat = errors.New("invalid file format") + ErrInvalidFileFormat = fmt.Errorf("%w: invalid file format", extensions.ErrAsyncAPI) ) diff --git a/pkg/codegen/generators/controller.go b/pkg/codegen/generators/controller.go index 3394dd18..49ef9023 100644 --- a/pkg/codegen/generators/controller.go +++ b/pkg/codegen/generators/controller.go @@ -13,6 +13,7 @@ type ControllerGenerator struct { SubscribeChannels map[string]*asyncapi.Channel PublishChannels map[string]*asyncapi.Channel Prefix string + Version string } // NewControllerGenerator will create a new controller code generator. @@ -49,6 +50,9 @@ func NewControllerGenerator(side Side, spec asyncapi.Specification) ControllerGe gen.Prefix = "User" } + // Set version + gen.Version = spec.Info.Version + return gen } diff --git a/pkg/codegen/generators/templates/controller.tmpl b/pkg/codegen/generators/templates/controller.tmpl index d2239b3c..bdb98fbf 100644 --- a/pkg/codegen/generators/templates/controller.tmpl +++ b/pkg/codegen/generators/templates/controller.tmpl @@ -8,17 +8,17 @@ type {{ .Prefix }}Controller struct { func New{{ .Prefix }}Controller(bc extensions.BrokerController, options ...ControllerOption) (*{{ .Prefix }}Controller, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } - + // Apply options for _, option := range options { option(&controller) @@ -29,7 +29,7 @@ func New{{ .Prefix }}Controller(bc extensions.BrokerController, options ...Contr func (c {{ .Prefix }}Controller) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { var called bool - + // If there is no more middleware if len(middlewares) == 0 { return func(ctx context.Context) { @@ -66,6 +66,7 @@ func (c {{ .Prefix }}Controller) executeMiddlewares(ctx context.Context, callbac } func add{{ .Prefix }}ContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "{{ .Version }}") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "{{ snakeCase .Prefix }}") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -75,6 +76,7 @@ func (c *{{ .Prefix }}Controller) Close(ctx context.Context) { // Unsubscribing remaining channels {{if .MethodCount -}} c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed {{ snakeCase .Prefix }} controller") {{end -}} } @@ -84,7 +86,7 @@ func (c *{{ .Prefix }}Controller) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *{{ .Prefix }}Controller) SubscribeAll(ctx context.Context, as {{ .Prefix }}Subscriber) error { if as == nil { - return ErrNil{{ .Prefix }}Subscriber + return extensions.ErrNil{{ .Prefix }}Subscriber } {{range $key, $value := .SubscribeChannels -}} @@ -100,18 +102,11 @@ func (c *{{ .Prefix }}Controller) SubscribeAll(ctx context.Context, as {{ .Prefi // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *{{ .Prefix }}Controller) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) {{- range $key, $value := .SubscribeChannels}} {{- if not .Parameters}} c.Unsubscribe{{namify $key}}(ctx) {{- end}} {{- end}} - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } {{- end}} @@ -131,17 +126,17 @@ func (c *{{ $.Prefix }}Controller) Subscribe{{namify $key}}(ctx context.Context, // Set context ctx = add{{ $.Prefix }}ContextValues(ctx, path) - + // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -168,14 +163,14 @@ func (c *{{ $.Prefix }}Controller) Subscribe{{namify $key}}(ctx context.Context, // Add context msgCtx := context.WithValue(ctx, extensions.ContextKeyIsMessage, msg) msgCtx = context.WithValue(msgCtx, extensions.ContextKeyIsMessageDirection, "reception") - + {{if ne $value.GetChannelMessage.CorrelationIDLocation "" -}} // Add correlation ID to context if it exists if id := msg.CorrelationID(); id != "" { ctx = context.WithValue(ctx, extensions.ContextKeyIsCorrelationID, id) } {{- end}} - + // Process message if no error and still open if err == nil && open { // Execute middlewares with the callback @@ -191,8 +186,8 @@ func (c *{{ $.Prefix }}Controller) Subscribe{{namify $key}}(ctx context.Context, } } () - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -206,18 +201,21 @@ func (c *{{ $.Prefix }}Controller) Unsubscribe{{namify $key}}(ctx context.Contex // Get channel path path := {{ generateChannelPath $value }} - // Set context - ctx = add{{ $.Prefix }}ContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = add{{ $.Prefix }}ContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <- cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -286,7 +284,7 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg ctx = add{{ $.Prefix }}ContextValues(ctx, path) // Subscribe to broker channel - msgs, stop, err := cc.broker.Subscribe(ctx, path) + messages, cancel, err := cc.broker.Subscribe(ctx, path) if err != nil { cc.logger.Error(ctx, err.Error()) return {{channelToMessageTypeName $value}}{}, err @@ -295,8 +293,9 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg // Close subscriber on leave defer func(){ - // Unsubscribe - stop <- true + // Stop the subscription and wait for its closure to be complete + cancel <- true + <- cancel // Logging unsubscribing cc.logger.Info(ctx, "Unsubscribed from channel") @@ -310,7 +309,7 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg // Wait for corresponding response for { select { - case bMsg, open := <-msgs: + case bMsg, open := <-messages: // Get new message msg, err := new{{channelToMessageTypeName $value}}FromBrokerMessage(bMsg) if err != nil { @@ -333,11 +332,11 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg return msg, nil } else if !open { // If message is invalid or not corresponding and the subscription is closed, then set corresponding error cc.logger.Error(ctx, "Channel closed before getting message") - return {{channelToMessageTypeName $value}}{}, ErrSubscriptionCanceled + return {{channelToMessageTypeName $value}}{}, extensions.ErrSubscriptionCanceled } case <-ctx.Done(): // Set corrsponding error if context is done cc.logger.Error(ctx, "Context done before getting message") - return {{channelToMessageTypeName $value}}{}, ErrContextCanceled + return {{channelToMessageTypeName $value}}{}, extensions.ErrContextCanceled } } } diff --git a/pkg/codegen/generators/templates/helpers.go b/pkg/codegen/generators/templates/helpers.go index 1585787b..940f3b25 100644 --- a/pkg/codegen/generators/templates/helpers.go +++ b/pkg/codegen/generators/templates/helpers.go @@ -94,7 +94,7 @@ func ReferenceToStructAttributePath(ref string) string { } // HasField will check if a struct has a field with the given name. -func HasField(v interface{}, name string) bool { +func HasField(v any, name string) bool { rv := reflect.ValueOf(v) if rv.Kind() == reflect.Ptr { rv = rv.Elem() diff --git a/pkg/codegen/generators/templates/imports.tmpl b/pkg/codegen/generators/templates/imports.tmpl index 5c846995..41fba115 100644 --- a/pkg/codegen/generators/templates/imports.tmpl +++ b/pkg/codegen/generators/templates/imports.tmpl @@ -9,7 +9,9 @@ import ( "errors" "fmt" "context" - + "encoding/binary" + "math" + "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/google/uuid" diff --git a/pkg/codegen/generators/templates/message.tmpl b/pkg/codegen/generators/templates/message.tmpl index 996394b5..010eb2e4 100644 --- a/pkg/codegen/generators/templates/message.tmpl +++ b/pkg/codegen/generators/templates/message.tmpl @@ -26,20 +26,47 @@ func New{{namify .Name}}Message() {{namify .Name}}Message { u := uuid.New().String() msg.{{referenceToStructAttributePath $.CorrelationIDLocation}} = {{if not $.CorrelationIDRequired}}&{{end}}u {{- end}} - + return msg } // new{{namify .Name}}MessageFromBrokerMessage will fill a new {{namify .Name}}Message with data from generic broker message func new{{namify .Name}}MessageFromBrokerMessage(bMsg extensions.BrokerMessage) ({{namify .Name}}Message, error) { var msg {{namify .Name}}Message - - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } - + + {{if eq .Payload.Type "string" -}} + // Convert to string + {{- if and .Payload.Format (or (eq .Payload.Format "date") (eq .Payload.Format "date-time"))}} + t, err := time.Parse(time.RFC3339, string(bMsg.Payload)) + if err != nil { + return {{namify .Name}}Message{}, err + } + msg.Payload = t + {{- else}} + msg.Payload = string(bMsg.Payload) + {{- end}} + {{else if eq .Payload.Type "integer" -}} + // Convert to integer + {{- if and .Payload.Format (eq .Payload.Format "int32")}} + msg.Payload = int32(binary.LittleEndian.Uint32(bMsg.Payload)) + {{- else}} + msg.Payload = int64(binary.LittleEndian.Uint64(bMsg.Payload)) + {{- end}} + {{else if eq .Payload.Type "number" -}} + // Convert to float + {{- if and .Payload.Format (eq .Payload.Format "float") -}} + msg.Payload = math.Float32frombits(binary.LittleEndian.Uint32(bMsg.Payload)) + {{- else}} + msg.Payload = math.Float64frombits(binary.LittleEndian.Uint64(bMsg.Payload)) + {{- end}} + {{else -}} + // Unmarshal payload to expected message payload format + err := json.Unmarshal(bMsg.Payload, &msg.Payload) + if err != nil { + return msg, err + } + {{- end}} + {{ if .Headers -}} // Get each headers from broker message for k, v := range bMsg.Headers { @@ -88,7 +115,7 @@ func new{{namify .Name}}MessageFromBrokerMessage(bMsg extensions.BrokerMessage) } } {{- end}} - + // TODO: run checks on msg type return msg, nil @@ -98,11 +125,37 @@ func new{{namify .Name}}MessageFromBrokerMessage(bMsg extensions.BrokerMessage) func (msg {{namify .Name}}Message) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + {{if eq .Payload.Type "object" -}} + // Marshal payload to JSON + payload, err := json.Marshal(msg.Payload) + if err != nil { + return extensions.BrokerMessage{}, err + } + {{else if eq .Payload.Type "integer" -}} + // Convert to []byte{} + {{- if and .Payload.Format (eq .Payload.Format "int32")}} + payload := make([]byte, 4) + binary.BigEndian.PutUint32(payload, uint32(msg.Payload)) + {{- else}} + payload := make([]byte, 8) + binary.BigEndian.PutUint64(payload, uint64(msg.Payload)) + {{- end}} + {{else if eq .Payload.Type "number" -}} + // Convert to []byte{} + {{- if and .Payload.Format (eq .Payload.Format "float")}} + payload := make([]byte, 4) + binary.BigEndian.PutUint32(payload, math.Float32bits(msg.Payload)) + {{- else}} + payload := make([]byte, 8) + binary.BigEndian.PutUint64(payload, math.Float64bits(msg.Payload)) + {{- end}} + {{else if and (eq .Payload.Type "string") (and .Payload.Format (or (eq .Payload.Format "date") (eq .Payload.Format "date-time"))) -}} + // Convert to RFC3339 and to []byte + payload := []byte(msg.Payload.Format(time.RFC3339)) + {{else -}} + // Convert to []byte + payload := []byte(msg.Payload) + {{- end}} {{ if .Headers -}} // Add each headers to broker message diff --git a/pkg/codegen/generators/templates/schema.tmpl b/pkg/codegen/generators/templates/schema.tmpl index 8be759d9..d37c3eb0 100644 --- a/pkg/codegen/generators/templates/schema.tmpl +++ b/pkg/codegen/generators/templates/schema.tmpl @@ -24,7 +24,7 @@ bool {{- /* Type String */ -}} {{- else if eq .Type "string" -}} -{{- if or (eq .Format "date") (eq .Format "date-time") -}} +{{- if and .Format (or (eq .Format "date") (eq .Format "date-time")) -}} time.Time {{- else -}} string diff --git a/pkg/codegen/generators/templates/types.tmpl b/pkg/codegen/generators/templates/types.tmpl index 0c5cb4ff..c9d4d650 100644 --- a/pkg/codegen/generators/templates/types.tmpl +++ b/pkg/codegen/generators/templates/types.tmpl @@ -1,35 +1,11 @@ -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -116,5 +92,5 @@ func (t *{{namify $key}}Schema) UnmarshalJSON(data []byte) error { *t = {{namify $key}}Schema(timeFormat) return nil } -{{- end -}} +{{- end -}} {{end}} diff --git a/pkg/extensions/broker.go b/pkg/extensions/broker.go index c15127c1..ec9ca83c 100644 --- a/pkg/extensions/broker.go +++ b/pkg/extensions/broker.go @@ -17,5 +17,5 @@ type BrokerController interface { Publish(ctx context.Context, channel string, mw BrokerMessage) error // Subscribe to messages from the broker - Subscribe(ctx context.Context, channel string) (msgs chan BrokerMessage, stop chan interface{}, err error) + Subscribe(ctx context.Context, channel string) (messages chan BrokerMessage, cancel chan any, err error) } diff --git a/pkg/extensions/brokers/common.go b/pkg/extensions/brokers/common.go index 4de9321a..683bc264 100644 --- a/pkg/extensions/brokers/common.go +++ b/pkg/extensions/brokers/common.go @@ -3,4 +3,8 @@ package brokers const ( // DefaultQueueGroupID is the default queue name used by brokers. DefaultQueueGroupID = "asyncapi" + + // BrokerMessagesQueueSize is the size of the broker messages queue that + // will hold the messages processed from the broker to the universal format. + BrokerMessagesQueueSize = 64 ) diff --git a/pkg/extensions/brokers/kafka/kafka.go b/pkg/extensions/brokers/kafka/kafka.go index 600b396f..eca4c1c7 100644 --- a/pkg/extensions/brokers/kafka/kafka.go +++ b/pkg/extensions/brokers/kafka/kafka.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "time" "github.com/lerenn/asyncapi-codegen/pkg/extensions" @@ -13,11 +14,14 @@ import ( // Controller is the Kafka implementation for asyncapi-codegen. type Controller struct { - logger extensions.Logger - groupID string hosts []string partition int maxBytes int + + // Reception only + groupID string + + logger extensions.Logger } // ControllerOption is a function that can be used to configure a Kafka controller @@ -107,7 +111,7 @@ func (c *Controller) Publish(ctx context.Context, channel string, um extensions. // created, so let's retry if errors.Is(err, kafka.UnknownTopicOrPartition) { c.logger.Warning(ctx, fmt.Sprintf("Topic %s does not exists: waiting for creation and retry", channel)) - time.Sleep(100 * time.Millisecond) + time.Sleep(time.Second) continue } @@ -118,8 +122,8 @@ func (c *Controller) Publish(ctx context.Context, channel string, um extensions. // Subscribe to messages from the broker. func (c *Controller) Subscribe(ctx context.Context, channel string) ( - msgs chan extensions.BrokerMessage, - stop chan interface{}, + messages chan extensions.BrokerMessage, + cancel chan any, err error, ) { r := kafka.NewReader(kafka.ReaderConfig{ @@ -131,39 +135,53 @@ func (c *Controller) Subscribe(ctx context.Context, channel string) ( }) // Handle events - msgs = make(chan extensions.BrokerMessage, 64) - stop = make(chan interface{}, 1) + messages = make(chan extensions.BrokerMessage, brokers.BrokerMessagesQueueSize) + cancel = make(chan any, 1) go func() { for { - msg, err := r.ReadMessage(ctx) - if err != nil { - break - } - - // Get headers - headers := make(map[string][]byte, len(msg.Headers)) - for _, header := range msg.Headers { - headers[header.Key] = header.Value - } - - // Create message - msgs <- extensions.BrokerMessage{ - Headers: headers, - Payload: msg.Value, - } + c.messagesHandler(ctx, r, messages) } }() go func() { - // Handle closure request from function caller - for range stop { - c.logger.Info(ctx, "Stopping subscriber") - if err := r.Close(); err != nil && c.logger != nil { - c.logger.Error(ctx, err.Error()) - } - close(msgs) + // Wait for cancel request + <-cancel + + // Stopping the Kafka listener + if err := r.Close(); err != nil { + c.logger.Error(ctx, err.Error()) } + + // Close messages in order to avoid new messages + close(messages) + + // Close cancel to let listeners know that the cancellation is complete + close(cancel) }() - return msgs, stop, nil + return messages, cancel, nil +} + +func (c *Controller) messagesHandler(ctx context.Context, r *kafka.Reader, messages chan extensions.BrokerMessage) { + msg, err := r.ReadMessage(ctx) + if err != nil { + // If the error is not io.EOF, then it is a real error + if !errors.Is(err, io.EOF) { + c.logger.Warning(ctx, fmt.Sprintf("Error when reading message: %q", err.Error())) + } + + return + } + + // Get headers + headers := make(map[string][]byte, len(msg.Headers)) + for _, header := range msg.Headers { + headers[header.Key] = header.Value + } + + // Create message + messages <- extensions.BrokerMessage{ + Headers: headers, + Payload: msg.Value, + } } diff --git a/pkg/extensions/brokers/nats/nats.go b/pkg/extensions/brokers/nats/nats.go index 298c5395..c6e843eb 100644 --- a/pkg/extensions/brokers/nats/nats.go +++ b/pkg/extensions/brokers/nats/nats.go @@ -2,7 +2,6 @@ package nats import ( "context" - "errors" "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" @@ -78,55 +77,53 @@ func (c *Controller) Publish(_ context.Context, channel string, bm extensions.Br // Subscribe to messages from the broker. func (c *Controller) Subscribe(ctx context.Context, channel string) ( - receivedBrokerMessages chan extensions.BrokerMessage, - cancelSubscription chan interface{}, + messages chan extensions.BrokerMessage, + cancel chan any, err error, ) { - // Subscribe to channel - natsChan := make(chan *nats.Msg, 64) - sub, err := c.connection.QueueSubscribeSyncWithChan(channel, c.queueGroup, natsChan) + // Initialize channels + messages = make(chan extensions.BrokerMessage, brokers.BrokerMessagesQueueSize) + cancel = make(chan any, 1) + + // Subscribe on subject + sub, err := c.connection.QueueSubscribe(channel, c.queueGroup, messagesHandler(messages)) if err != nil { return nil, nil, err } - // Handle events - receivedBrokerMessages = make(chan extensions.BrokerMessage, 64) - cancelSubscription = make(chan interface{}, 1) go func() { - for stop := false; !stop; { - select { - // If its a new message, then handle it - case msg := <-natsChan: - transferMessageToBroker(receivedBrokerMessages, msg) - // If its a closure request from function caller, then stop everything - case <-cancelSubscription: - stop = true - } - } + // Wait for cancel request + <-cancel - // Unsubscribe from channel - if err := sub.Unsubscribe(); err != nil && !errors.Is(err, nats.ErrConnectionClosed) && c.logger != nil { + // Drain the NATS subscription + if err := sub.Drain(); err != nil { c.logger.Error(ctx, err.Error()) } - close(receivedBrokerMessages) + // Close messages in order to avoid new messages + close(messages) + + // Close cancel to let listeners know that the cancellation is complete + close(cancel) }() - return receivedBrokerMessages, cancelSubscription, nil + return messages, cancel, nil } -func transferMessageToBroker(receivedBrokerMessages chan extensions.BrokerMessage, msg *nats.Msg) { - // Get headers - headers := make(map[string][]byte, len(msg.Header)) - for k, v := range msg.Header { - if len(v) > 0 { - headers[k] = []byte(v[0]) +func messagesHandler(messages chan extensions.BrokerMessage) nats.MsgHandler { + return func(msg *nats.Msg) { + // Get headers + headers := make(map[string][]byte, len(msg.Header)) + for k, v := range msg.Header { + if len(v) > 0 { + headers[k] = []byte(v[0]) + } } - } - // Create and transmit message to user - receivedBrokerMessages <- extensions.BrokerMessage{ - Headers: headers, - Payload: msg.Data, + // Create and transmit message to user + messages <- extensions.BrokerMessage{ + Headers: headers, + Payload: msg.Data, + } } } diff --git a/pkg/extensions/context.go b/pkg/extensions/context.go index 01c9d73f..76adf92c 100644 --- a/pkg/extensions/context.go +++ b/pkg/extensions/context.go @@ -10,6 +10,8 @@ const Prefix = "asyncapi-" type ContextKey string const ( + // ContextKeyIsVersion is the AsyncAPI specification version. + ContextKeyIsVersion ContextKey = Prefix + "version" // ContextKeyIsProvider is the name of the provider this data is coming from. // When coming from a generated user, it is `asyncapi`. ContextKeyIsProvider ContextKey = Prefix + "provider" @@ -31,7 +33,7 @@ func (k ContextKey) String() string { return string(k) } -// IfContextSetWith executes the function if the key is set in the context as a string. +// IfContextSetWith executes the function if the key is set in the context. func IfContextSetWith[T any](ctx context.Context, key ContextKey, fn func(value T)) { // Get value value := ctx.Value(key) @@ -45,6 +47,15 @@ func IfContextSetWith[T any](ctx context.Context, key ContextKey, fn func(value } } +// IfContextNotSetWith executes the function if the key is not set in the context. +func IfContextNotSetWith[T any](ctx context.Context, key ContextKey, fn func()) { + // Get value + value := ctx.Value(key) + if value == nil { + fn() + } +} + // IfContextValueEquals executes the function if the key is set in the context // as a given type and the value is equal to the expected value. func IfContextValueEquals[T comparable](ctx context.Context, key ContextKey, expected T, fn func()) { diff --git a/pkg/extensions/errors.go b/pkg/extensions/errors.go new file mode 100644 index 00000000..724b1606 --- /dev/null +++ b/pkg/extensions/errors.go @@ -0,0 +1,30 @@ +package extensions + +import ( + "errors" + "fmt" +) + +var ( + // ErrAsyncAPI is the generic error for AsyncAPI generated code. + ErrAsyncAPI = errors.New("error when using AsyncAPI") + + // ErrContextCanceled is given when a given context is canceled. + ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) + + // ErrNilBrokerController is raised when a nil broker controller is user. + ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) + + // ErrNilAppSubscriber is raised when a nil app subscriber is user. + ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) + + // ErrNilUserSubscriber is raised when a nil user subscriber is user. + ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) + + // ErrAlreadySubscribedChannel is raised when a subscription is done twice + // or more without unsubscribing. + ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) + + // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens. + ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) +) diff --git a/pkg/extensions/logger.go b/pkg/extensions/logger.go index 2fbf27cf..31d1b721 100644 --- a/pkg/extensions/logger.go +++ b/pkg/extensions/logger.go @@ -5,7 +5,7 @@ import "context" // LogInfo is a key-value pair that will be added to the log. type LogInfo struct { Key string - Value interface{} + Value any } // Logger is the interface that must be implemented by a logger. diff --git a/pkg/extensions/versioning/broker_subscription.go b/pkg/extensions/versioning/broker_subscription.go new file mode 100644 index 00000000..27be961d --- /dev/null +++ b/pkg/extensions/versioning/broker_subscription.go @@ -0,0 +1,135 @@ +package versioning + +import ( + "context" + "fmt" + "sync" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +type brokerSubscription struct { + channelName string + messages chan extensions.BrokerMessage + cancel chan any + parent *Wrapper + + versionsChannels map[string]versionSubcription + versionsMutex sync.Mutex +} + +func newBrokerSubscription( + channelName string, + messages chan extensions.BrokerMessage, + cancel chan any, + parent *Wrapper, +) brokerSubscription { + return brokerSubscription{ + channelName: channelName, + messages: messages, + cancel: cancel, + parent: parent, + versionsChannels: make(map[string]versionSubcription), + } +} + +func (bs *brokerSubscription) createVersionListener(version string) (versionSubcription, error) { + // Lock the versions to avoid conflict + bs.versionsMutex.Lock() + defer bs.versionsMutex.Unlock() + + // Check if the version doesn't exist already + _, exists := bs.versionsChannels[version] + if exists { + return versionSubcription{}, extensions.ErrAlreadySubscribedChannel + } + + // Create the channels necessary + cbv := newVersionSubscription(version, bs) + bs.versionsChannels[version] = cbv + defer cbv.launchListener() + + return cbv, nil +} + +func (bs *brokerSubscription) removeVersionListener(vs *versionSubcription) { + // Lock the versions to avoid conflict + bs.versionsMutex.Lock() + defer bs.versionsMutex.Unlock() + + // Cleanup the channelsByVersion when leaving + // + // NOTE: this is important to make it cleanup at the end of this function as + // it should be cleanup AFTER the broker have been stopped (in case it was + // the last version listener), in order to let the caller knows that everything + // was cleaned up properly. + defer vs.closeChannels() + + // Remove the version from the channelsByBroker + delete(bs.versionsChannels, vs.version) + + // Lock the channels to avoid conflict + bs.parent.channelsMutex.Lock() + defer bs.parent.channelsMutex.Unlock() + + // If there is still version channels, do nothing + if len(bs.versionsChannels) > 0 { + return + } + + // Otherwise cancel the broker listener and wait for its closure + bs.cancel <- true + <-bs.cancel + + // Then delete the channelsByBroker from the Version Switch Wrapper + delete(bs.parent.channels, bs.channelName) +} + +func (bs *brokerSubscription) launchListener(ctx context.Context) { + go func() { + for { + // Wait for new messages + msg, open := <-bs.messages + if !open { + break + } + + // Get the version from the message + bVersion, exists := msg.Headers[bs.parent.versionHeaderKey] + version := string(bVersion) + + // Add default version if none is specified + if !exists || version == "" { + // If there is a default version activated, then go on with it + if bs.parent.defaultVersion != nil { + version = *bs.parent.defaultVersion + } else { + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, msg) + bs.parent.logger.Error(ctx, "no version in the message and no default version") + continue + } + } + + // Lock the versions to avoid conflict + bs.versionsMutex.Lock() + + // Get the correct channel based on the version + ch, exists := bs.versionsChannels[version] + if !exists { + // Set context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, msg) + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, version) + + // Log the error + bs.parent.logger.Error(ctx, fmt.Sprintf("version %q is not registered", version)) + continue + } + + // Unlock the versions + bs.versionsMutex.Unlock() + + // Send the message to the correct channel + ch.messages <- msg + } + }() +} diff --git a/pkg/extensions/versioning/version_subscription.go b/pkg/extensions/versioning/version_subscription.go new file mode 100644 index 00000000..8795aeb8 --- /dev/null +++ b/pkg/extensions/versioning/version_subscription.go @@ -0,0 +1,40 @@ +package versioning + +import ( + "github.com/lerenn/asyncapi-codegen/pkg/extensions" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" +) + +type versionSubcription struct { + version string + messages chan extensions.BrokerMessage + cancel chan any + parent *brokerSubscription +} + +func newVersionSubscription(version string, parent *brokerSubscription) versionSubcription { + return versionSubcription{ + version: version, + messages: make(chan extensions.BrokerMessage, brokers.BrokerMessagesQueueSize), + cancel: make(chan any, 1), + parent: parent, + } +} + +func (vs *versionSubcription) launchListener() { + go func() { + // Wait to receive cancel + <-vs.cancel + + // When cancel is received, then remove version listener + vs.parent.removeVersionListener(vs) + }() +} + +func (vs *versionSubcription) closeChannels() { + // Receiving no more messages + close(vs.messages) + + // Closing cancel channel to let caller knows that everything is cleaned up + close(vs.cancel) +} diff --git a/pkg/extensions/versioning/wrapper.go b/pkg/extensions/versioning/wrapper.go new file mode 100644 index 00000000..8798f29a --- /dev/null +++ b/pkg/extensions/versioning/wrapper.go @@ -0,0 +1,138 @@ +package versioning + +import ( + "context" + "fmt" + "sync" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +var _ extensions.BrokerController = (*Wrapper)(nil) + +// DefaultVersionHeaderKey is the field that will be added to a message to get the version. +const DefaultVersionHeaderKey = "application-version" + +var ( + // ErrNoVersion happens when there is no version in the context or the message. + ErrNoVersion = fmt.Errorf("%w: no version present", extensions.ErrAsyncAPI) +) + +// Wrapper allows to use multiple version of the same App/User Controllers +// on one Broker Controller in order to handle migrations. +type Wrapper struct { + broker extensions.BrokerController + logger extensions.Logger + defaultVersion *string + versionHeaderKey string + + channels map[string]*brokerSubscription + channelsMutex sync.Mutex +} + +// WrapperOption adds an option to Version Wrapper. +type WrapperOption func(versionWrapper *Wrapper) + +// NewWrapper creates a Version Wrapper around a Broker Controller. +func NewWrapper(broker extensions.BrokerController, options ...WrapperOption) *Wrapper { + // Create version Wrapper + w := Wrapper{ + broker: broker, + channels: make(map[string]*brokerSubscription), + logger: extensions.DummyLogger{}, + versionHeaderKey: DefaultVersionHeaderKey, + } + + // Execute options + for _, option := range options { + option(&w) + } + + return &w +} + +// WithLogger lets add a logger to the Wrapper struct. +func WithLogger(logger extensions.Logger) WrapperOption { + return func(wrapper *Wrapper) { + wrapper.logger = logger + } +} + +// WithDefaultVersion lets add a default version to tag messages that don't have +// versions tagged. +func WithDefaultVersion(version string) WrapperOption { + return func(wrapper *Wrapper) { + wrapper.defaultVersion = &version + } +} + +// WithVersionHeaderKey lets use a different version header key to add application +// version to published messages and retrieve it from received messages. +func WithVersionHeaderKey(headerKey string) WrapperOption { + return func(wrapper *Wrapper) { + wrapper.versionHeaderKey = headerKey + } +} + +// Publish a message to the broker. +func (w *Wrapper) Publish(ctx context.Context, channel string, mw extensions.BrokerMessage) error { + // Add version to message + extensions.IfContextSetWith(ctx, extensions.ContextKeyIsVersion, func(version string) { + mw.Headers[w.versionHeaderKey] = []byte(version) + }) + + // Send message + return w.broker.Publish(ctx, channel, mw) +} + +// Subscribe to messages from the broker. +func (w *Wrapper) Subscribe(ctx context.Context, channel string) ( + messages chan extensions.BrokerMessage, + cancel chan any, + err error, +) { + // Set context + ctx = context.WithValue(ctx, extensions.ContextKeyIsMessageDirection, "reception") + ctx = context.WithValue(ctx, extensions.ContextKeyIsChannel, channel) + + // Get version + var version string + extensions.IfContextSetWith(ctx, extensions.ContextKeyIsVersion, func(v string) { version = v }) + if version == "" { + return nil, nil, ErrNoVersion + } + + // Lock the channels to avoid conflict + w.channelsMutex.Lock() + defer w.channelsMutex.Unlock() + + // Check if the broker channel already exists + brokerChannel, exists := w.channels[channel] + if !exists { + cbb, err := w.createBrokerChannels(ctx, channel) + if err != nil { + return nil, nil, err + } + defer cbb.launchListener(ctx) + brokerChannel = cbb + } + + // Check if the version already exists + cbv, err := brokerChannel.createVersionListener(version) + + return cbv.messages, cbv.cancel, err +} + +func (w *Wrapper) createBrokerChannels(ctx context.Context, channel string) (*brokerSubscription, error) { + // Subscribe to broker + messages, cancel, err := w.broker.Subscribe(ctx, channel) + if err != nil { + return nil, err + } + + // Add channels from broker to brokerChannels + cbb := newBrokerSubscription(channel, messages, cancel, w) + w.channels[channel] = &cbb // Already locked in parent function + + return &cbb, nil +} diff --git a/test/brokers.go b/test/brokers.go index 54398746..f79793e1 100644 --- a/test/brokers.go +++ b/test/brokers.go @@ -1,7 +1,9 @@ package asyncapi_test import ( + "fmt" "testing" + "time" "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" @@ -10,11 +12,14 @@ import ( // BrokerControllers returns a list of BrokerController to test based on the // docker-compose file of the project. -func BrokerControllers(t *testing.T) []extensions.BrokerController { +func BrokerControllers(t *testing.T) map[string]extensions.BrokerController { t.Helper() // Set this function as a helper - return []extensions.BrokerController{ - nats.NewController("nats://localhost:4222"), - kafka.NewController([]string{"localhost:9094"}), + // Set a specific queueGroupeID to avoid collision between tests + queueGroupID := fmt.Sprintf("test-%d", time.Now().UnixNano()) + + return map[string]extensions.BrokerController{ + "NATS": nats.NewController("nats://localhost:4222", nats.WithQueueGroup(queueGroupID)), + "Kafka": kafka.NewController([]string{"localhost:9094"}, kafka.WithGroupID(queueGroupID)), } } diff --git a/test/issues/49/asyncapi.gen.go b/test/issues/49/asyncapi.gen.go index ee6a1383..074f340c 100644 --- a/test/issues/49/asyncapi.gen.go +++ b/test/issues/49/asyncapi.gen.go @@ -5,8 +5,6 @@ package issue49 import ( "context" - "encoding/json" - "errors" "fmt" "github.com/lerenn/asyncapi-codegen/pkg/extensions" @@ -28,15 +26,15 @@ type AppController struct { func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -86,6 +84,7 @@ func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx } func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -94,6 +93,7 @@ func addAppContextValues(ctx context.Context, path string) context.Context { func (c *AppController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed app controller") } @@ -101,7 +101,7 @@ func (c *AppController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { if as == nil { - return ErrNilAppSubscriber + return extensions.ErrNilAppSubscriber } if err := c.SubscribeChat(ctx, as.Chat); err != nil { @@ -113,14 +113,7 @@ func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) erro // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *AppController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribeChat(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribeChat will subscribe to new messages from '/chat' channel. @@ -136,15 +129,15 @@ func (c *AppController) SubscribeChat(ctx context.Context, fn func(ctx context.C ctx = addAppContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -185,8 +178,8 @@ func (c *AppController) SubscribeChat(ctx context.Context, fn func(ctx context.C } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -196,18 +189,21 @@ func (c *AppController) UnsubscribeChat(ctx context.Context) { // Get channel path path := "/chat" - // Set context - ctx = addAppContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addAppContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -287,15 +283,15 @@ type UserController struct { func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -345,6 +341,7 @@ func (c UserController) executeMiddlewares(ctx context.Context, callback func(ct } func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -353,6 +350,7 @@ func addUserContextValues(ctx context.Context, path string) context.Context { func (c *UserController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed user controller") } @@ -360,7 +358,7 @@ func (c *UserController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { if as == nil { - return ErrNilUserSubscriber + return extensions.ErrNilUserSubscriber } if err := c.SubscribeChat(ctx, as.Chat); err != nil { @@ -375,15 +373,8 @@ func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) er // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *UserController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribeChat(ctx) c.UnsubscribeStatus(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribeChat will subscribe to new messages from '/chat' channel. @@ -399,15 +390,15 @@ func (c *UserController) SubscribeChat(ctx context.Context, fn func(ctx context. ctx = addUserContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -448,8 +439,8 @@ func (c *UserController) SubscribeChat(ctx context.Context, fn func(ctx context. } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -459,18 +450,21 @@ func (c *UserController) UnsubscribeChat(ctx context.Context) { // Get channel path path := "/chat" - // Set context - ctx = addUserContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addUserContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } // SubscribeStatus will subscribe to new messages from '/status' channel. @@ -485,15 +479,15 @@ func (c *UserController) SubscribeStatus(ctx context.Context, fn func(ctx contex ctx = addUserContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -534,8 +528,8 @@ func (c *UserController) SubscribeStatus(ctx context.Context, fn func(ctx contex } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -545,18 +539,21 @@ func (c *UserController) UnsubscribeStatus(ctx context.Context) { // Get channel path path := "/status" - // Set context - ctx = addUserContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addUserContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -589,38 +586,14 @@ func (c *UserController) PublishChat(ctx context.Context, msg ChatMessage) error return err } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages @@ -675,11 +648,8 @@ func NewChatMessage() ChatMessage { func newChatMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (ChatMessage, error) { var msg ChatMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // TODO: run checks on msg type @@ -690,11 +660,8 @@ func newChatMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (ChatMessage func (msg ChatMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // There is no headers here headers := make(map[string][]byte, 0) @@ -721,11 +688,8 @@ func NewMessage() Message { func newMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (Message, error) { var msg Message - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // TODO: run checks on msg type @@ -736,11 +700,8 @@ func newMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (Message, error) func (msg Message) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // There is no headers here headers := make(map[string][]byte, 0) @@ -767,11 +728,8 @@ func NewStatusMessage() StatusMessage { func newStatusMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (StatusMessage, error) { var msg StatusMessage - // Unmarshal payload to expected message payload format - err := json.Unmarshal(bMsg.Payload, &msg.Payload) - if err != nil { - return msg, err - } + // Convert to string + msg.Payload = string(bMsg.Payload) // TODO: run checks on msg type @@ -782,11 +740,8 @@ func newStatusMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (StatusMes func (msg StatusMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message - // Marshal payload to JSON - payload, err := json.Marshal(msg.Payload) - if err != nil { - return extensions.BrokerMessage{}, err - } + // Convert to []byte + payload := []byte(msg.Payload) // There is no headers here headers := make(map[string][]byte, 0) diff --git a/test/issues/73/asyncapi-1.0.0.yaml b/test/issues/73/asyncapi-1.0.0.yaml new file mode 100644 index 00000000..a5ff3bef --- /dev/null +++ b/test/issues/73/asyncapi-1.0.0.yaml @@ -0,0 +1,10 @@ +asyncapi: 2.6.0 +info: + title: Hello world application V1 + version: '1.0.0' +channels: + hello: + publish: + message: + payload: + type: string diff --git a/test/issues/73/asyncapi-2.0.0.yaml b/test/issues/73/asyncapi-2.0.0.yaml new file mode 100644 index 00000000..9d76f133 --- /dev/null +++ b/test/issues/73/asyncapi-2.0.0.yaml @@ -0,0 +1,21 @@ +asyncapi: 2.6.0 +info: + title: Hello world application V2 + version: '2.0.0' +channels: + hello: + publish: + message: + payload: + type: object + required: + - timestamp + - message + properties: + timestamp: + type: string + format: date-time + example: '2018-11-21T15:00:00Z' + message: + type: string + example: Hello world! diff --git a/test/issues/73/suite_test.go b/test/issues/73/suite_test.go new file mode 100644 index 00000000..55ab26e1 --- /dev/null +++ b/test/issues/73/suite_test.go @@ -0,0 +1,181 @@ +//go:generate go run ../../../cmd/asyncapi-codegen -p v1 -i ./asyncapi-1.0.0.yaml -o ./v1/asyncapi.gen.go +//go:generate go run ../../../cmd/asyncapi-codegen -p v2 -i ./asyncapi-2.0.0.yaml -o ./v2/asyncapi.gen.go + +package issue73 + +import ( + "context" + "encoding/json" + "sync" + "testing" + "time" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/versioning" + "github.com/lerenn/asyncapi-codegen/pkg/utils" + asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + v1 "github.com/lerenn/asyncapi-codegen/test/issues/73/v1" + v2 "github.com/lerenn/asyncapi-codegen/test/issues/73/v2" + "github.com/stretchr/testify/suite" +) + +func TestSuite(t *testing.T) { + brokers := asyncapi_test.BrokerControllers(t) + + for _, b := range brokers { + suite.Run(t, NewSuite(b)) + } +} + +type Suite struct { + broker extensions.BrokerController + v1 struct { + app *v1.AppController + user *v1.UserController + } + v2 struct { + app *v2.AppController + user *v2.UserController + } + interceptor chan extensions.BrokerMessage + + suite.Suite +} + +func NewSuite(broker extensions.BrokerController) *Suite { + return &Suite{ + broker: broker, + } +} + +func (suite *Suite) SetupTest() { + // Create a channel to intercept message before sending to broker and after + // reception from broker + suite.interceptor = make(chan extensions.BrokerMessage, 8) + + // Add a version wrapper to the broker + vw := versioning.NewWrapper(suite.broker) + + // Create v1 appV1 + appV1, err := v1.NewAppController(vw, v1.WithMiddlewares(middlewares.Intercepter(suite.interceptor))) + suite.Require().NoError(err) + suite.v1.app = appV1 + + // Create v1 userV1 + userV1, err := v1.NewUserController(vw, v1.WithMiddlewares(middlewares.Intercepter(suite.interceptor))) + suite.Require().NoError(err) + suite.v1.user = userV1 + + // Create v2 app + appV2, err := v2.NewAppController(vw, v2.WithMiddlewares(middlewares.Intercepter(suite.interceptor))) + suite.Require().NoError(err) + suite.v2.app = appV2 + + // Create v2 user + userV2, err := v2.NewUserController(vw, v2.WithMiddlewares(middlewares.Intercepter(suite.interceptor))) + suite.Require().NoError(err) + suite.v2.user = userV2 +} + +func (suite *Suite) TearDownTest() { + suite.v1.app.Close(context.Background()) + suite.v1.user.Close(context.Background()) + suite.v2.app.Close(context.Background()) + suite.v2.user.Close(context.Background()) + close(suite.interceptor) +} + +func (suite *Suite) TestV1Reception() { + var wg sync.WaitGroup + + // Expected message + sent := v1.HelloMessage{ + Payload: "HelloWord!", + } + + // Check what the app receive and translate + var recvMsg v1.HelloMessage + wg.Add(1) + err := suite.v1.app.SubscribeHello(context.Background(), func(_ context.Context, msg v1.HelloMessage, _ bool) { + recvMsg = msg + wg.Done() + }) + suite.Require().NoError(err) + + // Check that the other app doesn't receive + err = suite.v2.app.SubscribeHello(context.Background(), func(_ context.Context, _ v2.HelloMessage, _ bool) { + suite.Require().FailNow("this should not happen") + }) + suite.Require().NoError(err) + + // Publish the message + err = suite.v1.user.PublishHello(context.Background(), sent) + suite.Require().NoError(err) + + // Wait for the message to be received by the app + wg.Wait() + + // Check received message + suite.Require().Equal(sent, recvMsg) + + // Check sent message to broker + bMsg := <-suite.interceptor + + // Check payload + suite.Require().Equal([]byte("HelloWord!"), bMsg.Payload) +} + +func (suite *Suite) TestV2Reception() { + var wg sync.WaitGroup + + // Expected message + sent := v2.HelloMessage{ + Payload: struct { + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + }{ + Message: "HelloWord!", + Timestamp: utils.Must(time.Parse(time.RFC3339, "2020-01-01T00:00:00Z")).UTC(), + }, + } + + // Check that the other app doesn't receive + err := suite.v1.app.SubscribeHello(context.Background(), func(_ context.Context, _ v1.HelloMessage, _ bool) { + suite.Require().FailNow("this should not happen") + }) + suite.Require().NoError(err) + + // Check what the app receive and translate + var recvMsg v2.HelloMessage + wg.Add(1) + err = suite.v2.app.SubscribeHello(context.Background(), func(_ context.Context, msg v2.HelloMessage, _ bool) { + recvMsg = msg + wg.Done() + }) + suite.Require().NoError(err) + + // Publish the message + err = suite.v2.user.PublishHello(context.Background(), sent) + suite.Require().NoError(err) + + // Wait for the message to be received by the app + wg.Wait() + + // Check received message + suite.Require().Equal(sent, recvMsg) + + // Check sent message to broker + bMsg := <-suite.interceptor + + // Check payload + var p struct { + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + } + suite.Require().NoError(json.Unmarshal(bMsg.Payload, &p)) + suite.Require().Equal("HelloWord!", p.Message) + + expected := utils.Must(time.Parse(time.RFC3339, "2020-01-01T00:00:00Z")).UTC() + suite.Require().WithinDuration(expected, p.Timestamp, time.Millisecond) +} diff --git a/test/issues/73/v1/asyncapi.gen.go b/test/issues/73/v1/asyncapi.gen.go new file mode 100644 index 00000000..d81ef0bc --- /dev/null +++ b/test/issues/73/v1/asyncapi.gen.go @@ -0,0 +1,401 @@ +// Package "v1" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package v1 + +import ( + "context" + "fmt" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppSubscriber represents all handlers that are expecting messages for App +type AppSubscriber interface { + // Hello subscribes to messages placed on the 'hello' channel + Hello(ctx context.Context, msg HelloMessage, done bool) +} + +// AppController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context) { + if !called { + called = true + last(ctx) + } + } + } + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + next := c.wrapMiddlewares(middlewares[1:], last) + return func(ctx context.Context) { + // Call the middleware and the following if it has not been done already + if !called { + called = true + ctx = middlewares[0](ctx, next) + + // If next has already been called in middleware, it should not be + // executed again + next(ctx) + } + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx context.Context)) { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + wrapped(ctx) +} + +func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels + c.UnsubscribeAll(ctx) + + c.logger.Info(ctx, "Closed app controller") +} + +// SubscribeAll will subscribe to channels without parameters on which the app is expecting messages. +// For channels with parameters, they should be subscribed independently. +func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { + if as == nil { + return extensions.ErrNilAppSubscriber + } + + if err := c.SubscribeHello(ctx, as.Hello); err != nil { + return err + } + + return nil +} + +// UnsubscribeAll will unsubscribe all remaining subscribed channels +func (c *AppController) UnsubscribeAll(ctx context.Context) { + c.UnsubscribeHello(ctx) +} + +// SubscribeHello will subscribe to new messages from 'hello' channel. +// +// Callback function 'fn' will be called each time a new message is received. +// The 'done' argument indicates when the subscription is canceled and can be +// used to clean up resources. +func (c *AppController) SubscribeHello(ctx context.Context, fn func(ctx context.Context, msg HelloMessage, done bool)) error { + // Get channel path + path := "hello" + + // Set context + ctx = addAppContextValues(ctx, path) + + // Check if there is already a subscription + _, exists := c.cancelChannels[path] + if exists { + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) + c.logger.Error(ctx, err.Error()) + return err + } + + // Subscribe to broker channel + msgs, cancel, err := c.broker.Subscribe(ctx, path) + if err != nil { + c.logger.Error(ctx, err.Error()) + return err + } + c.logger.Info(ctx, "Subscribed to channel") + + // Asynchronously listen to new messages and pass them to app subscriber + go func() { + for { + // Wait for next message + bMsg, open := <-msgs + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, bMsg) + + // Process message + msg, err := newHelloMessageFromBrokerMessage(bMsg) + if err != nil { + c.logger.Error(ctx, err.Error()) + } + + // Add context + msgCtx := context.WithValue(ctx, extensions.ContextKeyIsMessage, msg) + msgCtx = context.WithValue(msgCtx, extensions.ContextKeyIsMessageDirection, "reception") + + // Process message if no error and still open + if err == nil && open { + // Execute middlewares with the callback + c.executeMiddlewares(msgCtx, func(ctx context.Context) { + fn(ctx, msg, !open) + }) + } + + // If subscription is closed, then exit the function + if !open { + return + } + } + }() + + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel + + return nil +} + +// UnsubscribeHello will unsubscribe messages from 'hello' channel +func (c *AppController) UnsubscribeHello(ctx context.Context) { + // Get channel path + path := "hello" + + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] + if !exists { + return + } + + // Set context + ctx = addAppContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) + + c.logger.Info(ctx, "Unsubscribed from channel") +} + +// UserController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context) { + if !called { + called = true + last(ctx) + } + } + } + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + next := c.wrapMiddlewares(middlewares[1:], last) + return func(ctx context.Context) { + // Call the middleware and the following if it has not been done already + if !called { + called = true + ctx = middlewares[0](ctx, next) + + // If next has already been called in middleware, it should not be + // executed again + next(ctx) + } + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, callback func(ctx context.Context)) { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + wrapped(ctx) +} + +func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// PublishHello will publish messages to 'hello' channel +func (c *UserController) PublishHello(ctx context.Context, msg HelloMessage) error { + // Get channel path + path := "hello" + + // Set context + ctx = addUserContextValues(ctx, path) + ctx = context.WithValue(ctx, extensions.ContextKeyIsMessage, msg) + ctx = context.WithValue(ctx, extensions.ContextKeyIsMessageDirection, "publication") + + // Convert to BrokerMessage + bMsg, err := msg.toBrokerMessage() + if err != nil { + return err + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, bMsg) + + // Publish the message on event-broker through middlewares + c.executeMiddlewares(ctx, func(ctx context.Context) { + err = c.broker.Publish(ctx, path, bMsg) + }) + + // Return error from publication on broker + return err +} + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// HelloMessage is the message expected for 'Hello' channel +type HelloMessage struct { + // Payload will be inserted in the message payload + Payload string +} + +func NewHelloMessage() HelloMessage { + var msg HelloMessage + + return msg +} + +// newHelloMessageFromBrokerMessage will fill a new HelloMessage with data from generic broker message +func newHelloMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (HelloMessage, error) { + var msg HelloMessage + + // Convert to string + msg.Payload = string(bMsg.Payload) + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from HelloMessage data +func (msg HelloMessage) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Convert to []byte + payload := []byte(msg.Payload) + + // There is no headers here + headers := make(map[string][]byte, 0) + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} diff --git a/test/issues/73/v2/asyncapi.gen.go b/test/issues/73/v2/asyncapi.gen.go new file mode 100644 index 00000000..050b24f2 --- /dev/null +++ b/test/issues/73/v2/asyncapi.gen.go @@ -0,0 +1,412 @@ +// Package "v2" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package v2 + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppSubscriber represents all handlers that are expecting messages for App +type AppSubscriber interface { + // Hello subscribes to messages placed on the 'hello' channel + Hello(ctx context.Context, msg HelloMessage, done bool) +} + +// AppController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context) { + if !called { + called = true + last(ctx) + } + } + } + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + next := c.wrapMiddlewares(middlewares[1:], last) + return func(ctx context.Context) { + // Call the middleware and the following if it has not been done already + if !called { + called = true + ctx = middlewares[0](ctx, next) + + // If next has already been called in middleware, it should not be + // executed again + next(ctx) + } + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx context.Context)) { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + wrapped(ctx) +} + +func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "2.0.0") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels + c.UnsubscribeAll(ctx) + + c.logger.Info(ctx, "Closed app controller") +} + +// SubscribeAll will subscribe to channels without parameters on which the app is expecting messages. +// For channels with parameters, they should be subscribed independently. +func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { + if as == nil { + return extensions.ErrNilAppSubscriber + } + + if err := c.SubscribeHello(ctx, as.Hello); err != nil { + return err + } + + return nil +} + +// UnsubscribeAll will unsubscribe all remaining subscribed channels +func (c *AppController) UnsubscribeAll(ctx context.Context) { + c.UnsubscribeHello(ctx) +} + +// SubscribeHello will subscribe to new messages from 'hello' channel. +// +// Callback function 'fn' will be called each time a new message is received. +// The 'done' argument indicates when the subscription is canceled and can be +// used to clean up resources. +func (c *AppController) SubscribeHello(ctx context.Context, fn func(ctx context.Context, msg HelloMessage, done bool)) error { + // Get channel path + path := "hello" + + // Set context + ctx = addAppContextValues(ctx, path) + + // Check if there is already a subscription + _, exists := c.cancelChannels[path] + if exists { + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) + c.logger.Error(ctx, err.Error()) + return err + } + + // Subscribe to broker channel + msgs, cancel, err := c.broker.Subscribe(ctx, path) + if err != nil { + c.logger.Error(ctx, err.Error()) + return err + } + c.logger.Info(ctx, "Subscribed to channel") + + // Asynchronously listen to new messages and pass them to app subscriber + go func() { + for { + // Wait for next message + bMsg, open := <-msgs + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, bMsg) + + // Process message + msg, err := newHelloMessageFromBrokerMessage(bMsg) + if err != nil { + c.logger.Error(ctx, err.Error()) + } + + // Add context + msgCtx := context.WithValue(ctx, extensions.ContextKeyIsMessage, msg) + msgCtx = context.WithValue(msgCtx, extensions.ContextKeyIsMessageDirection, "reception") + + // Process message if no error and still open + if err == nil && open { + // Execute middlewares with the callback + c.executeMiddlewares(msgCtx, func(ctx context.Context) { + fn(ctx, msg, !open) + }) + } + + // If subscription is closed, then exit the function + if !open { + return + } + } + }() + + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel + + return nil +} + +// UnsubscribeHello will unsubscribe messages from 'hello' channel +func (c *AppController) UnsubscribeHello(ctx context.Context) { + // Get channel path + path := "hello" + + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] + if !exists { + return + } + + // Set context + ctx = addAppContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) + + c.logger.Info(ctx, "Unsubscribed from channel") +} + +// UserController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context) { + if !called { + called = true + last(ctx) + } + } + } + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + next := c.wrapMiddlewares(middlewares[1:], last) + return func(ctx context.Context) { + // Call the middleware and the following if it has not been done already + if !called { + called = true + ctx = middlewares[0](ctx, next) + + // If next has already been called in middleware, it should not be + // executed again + next(ctx) + } + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, callback func(ctx context.Context)) { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + wrapped(ctx) +} + +func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "2.0.0") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// PublishHello will publish messages to 'hello' channel +func (c *UserController) PublishHello(ctx context.Context, msg HelloMessage) error { + // Get channel path + path := "hello" + + // Set context + ctx = addUserContextValues(ctx, path) + ctx = context.WithValue(ctx, extensions.ContextKeyIsMessage, msg) + ctx = context.WithValue(ctx, extensions.ContextKeyIsMessageDirection, "publication") + + // Convert to BrokerMessage + bMsg, err := msg.toBrokerMessage() + if err != nil { + return err + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, bMsg) + + // Publish the message on event-broker through middlewares + c.executeMiddlewares(ctx, func(ctx context.Context) { + err = c.broker.Publish(ctx, path, bMsg) + }) + + // Return error from publication on broker + return err +} + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// HelloMessage is the message expected for 'Hello' channel +type HelloMessage struct { + // Payload will be inserted in the message payload + Payload struct { + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + } +} + +func NewHelloMessage() HelloMessage { + var msg HelloMessage + + return msg +} + +// newHelloMessageFromBrokerMessage will fill a new HelloMessage with data from generic broker message +func newHelloMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (HelloMessage, error) { + var msg HelloMessage + + // Unmarshal payload to expected message payload format + err := json.Unmarshal(bMsg.Payload, &msg.Payload) + if err != nil { + return msg, err + } + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from HelloMessage data +func (msg HelloMessage) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Marshal payload to JSON + payload, err := json.Marshal(msg.Payload) + if err != nil { + return extensions.BrokerMessage{}, err + } + + // There is no headers here + headers := make(map[string][]byte, 0) + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} diff --git a/test/issues/74/asyncapi.gen.go b/test/issues/74/asyncapi.gen.go index b157edd5..7dd0c538 100644 --- a/test/issues/74/asyncapi.gen.go +++ b/test/issues/74/asyncapi.gen.go @@ -6,7 +6,6 @@ package issue74 import ( "context" "encoding/json" - "errors" "fmt" "time" @@ -29,15 +28,15 @@ type AppController struct { func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -87,6 +86,7 @@ func (c AppController) executeMiddlewares(ctx context.Context, callback func(ctx } func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -95,6 +95,7 @@ func addAppContextValues(ctx context.Context, path string) context.Context { func (c *AppController) Close(ctx context.Context) { // Unsubscribing remaining channels c.UnsubscribeAll(ctx) + c.logger.Info(ctx, "Closed app controller") } @@ -102,7 +103,7 @@ func (c *AppController) Close(ctx context.Context) { // For channels with parameters, they should be subscribed independently. func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) error { if as == nil { - return ErrNilAppSubscriber + return extensions.ErrNilAppSubscriber } if err := c.SubscribeTestChannel(ctx, as.TestChannel); err != nil { @@ -114,14 +115,7 @@ func (c *AppController) SubscribeAll(ctx context.Context, as AppSubscriber) erro // UnsubscribeAll will unsubscribe all remaining subscribed channels func (c *AppController) UnsubscribeAll(ctx context.Context) { - // Unsubscribe channels with no parameters (if any) c.UnsubscribeTestChannel(ctx) - - // Unsubscribe remaining channels - for n, stopChan := range c.stopSubscribers { - stopChan <- true - delete(c.stopSubscribers, n) - } } // SubscribeTestChannel will subscribe to new messages from 'testChannel' channel. @@ -137,15 +131,15 @@ func (c *AppController) SubscribeTestChannel(ctx context.Context, fn func(ctx co ctx = addAppContextValues(ctx, path) // Check if there is already a subscription - _, exists := c.stopSubscribers[path] + _, exists := c.cancelChannels[path] if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", ErrAlreadySubscribedChannel, path) + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) c.logger.Error(ctx, err.Error()) return err } // Subscribe to broker channel - msgs, stop, err := c.broker.Subscribe(ctx, path) + msgs, cancel, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -186,8 +180,8 @@ func (c *AppController) SubscribeTestChannel(ctx context.Context, fn func(ctx co } }() - // Add the stop channel to the inside map - c.stopSubscribers[path] = stop + // Add the cancel channel to the inside map + c.cancelChannels[path] = cancel return nil } @@ -197,18 +191,21 @@ func (c *AppController) UnsubscribeTestChannel(ctx context.Context) { // Get channel path path := "testChannel" - // Set context - ctx = addAppContextValues(ctx, path) - - // Get stop channel - stopChan, exists := c.stopSubscribers[path] + // Check if there subscribers for this channel + cancel, exists := c.cancelChannels[path] if !exists { return } - // Stop the channel and remove the entry - stopChan <- true - delete(c.stopSubscribers, path) + // Set context + ctx = addAppContextValues(ctx, path) + + // Stop the subscription and wait for its closure to be complete + cancel <- true + <-cancel + + // Remove if from the subscribers + delete(c.cancelChannels, path) c.logger.Info(ctx, "Unsubscribed from channel") } @@ -223,15 +220,15 @@ type UserController struct { func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { // Check if broker controller has been provided if bc == nil { - return nil, ErrNilBrokerController + return nil, extensions.ErrNilBrokerController } // Create default controller controller := controller{ - broker: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), + broker: bc, + cancelChannels: make(map[string]chan any), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), } // Apply options @@ -281,6 +278,7 @@ func (c UserController) executeMiddlewares(ctx context.Context, callback func(ct } func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) } @@ -318,38 +316,14 @@ func (c *UserController) PublishTestChannel(ctx context.Context, msg TestMessage return err } -var ( - // Generic error for AsyncAPI generated code - ErrAsyncAPI = errors.New("error when using AsyncAPI") - - // ErrContextCanceled is given when a given context is canceled - ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI) - - // ErrNilBrokerController is raised when a nil broker controller is user - ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI) - - // ErrNilAppSubscriber is raised when a nil app subscriber is user - ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI) - - // ErrNilUserSubscriber is raised when a nil user subscriber is user - ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI) - - // ErrAlreadySubscribedChannel is raised when a subscription is done twice - // or more without unsubscribing - ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI) - - // ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens - ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) -) - // controller is the controller that will be used to communicate with the broker // It will be used internally by AppController and UserController type controller struct { // broker is the broker controller that will be used to communicate broker extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller + // cancelChannels is a map of cancel channels for each subscribed channel + cancelChannels map[string]chan any + // logger is the logger that will be used² to log operations on controller logger extensions.Logger // middlewares are the middlewares that will be executed when sending or // receiving messages diff --git a/test/issues/74/asyncapi.yaml b/test/issues/74/asyncapi.yaml index 0a853501..6468450b 100644 --- a/test/issues/74/asyncapi.yaml +++ b/test/issues/74/asyncapi.yaml @@ -19,8 +19,8 @@ components: Test: description: test message headers: - $ref: '#/components/schemas/Header' - payload: + $ref: '#/components/schemas/Header' + payload: oneOf: - $ref: '#/components/schemas/TestSchema' schemas: diff --git a/test/issues/74/suite_test.go b/test/issues/74/suite_test.go index 0085d307..e531f091 100644 --- a/test/issues/74/suite_test.go +++ b/test/issues/74/suite_test.go @@ -37,7 +37,7 @@ func NewSuite(broker extensions.BrokerController) *Suite { } } -func (suite *Suite) SetupSuite() { +func (suite *Suite) SetupTest() { // Create a channel to intercept message before sending to broker and after // reception from broker suite.interceptor = make(chan extensions.BrokerMessage, 8) @@ -53,6 +53,12 @@ func (suite *Suite) SetupSuite() { suite.user = user } +func (suite *Suite) TearDownTest() { + suite.app.Close(context.Background()) + suite.user.Close(context.Background()) + close(suite.interceptor) +} + func (suite *Suite) TestHeaders() { var wg sync.WaitGroup @@ -66,12 +72,12 @@ func (suite *Suite) TestHeaders() { // Check what the app receive and translate var recvMsg TestMessage + wg.Add(1) err := suite.app.SubscribeTestChannel(context.Background(), func(_ context.Context, msg TestMessage, _ bool) { recvMsg = msg wg.Done() }) suite.Require().NoError(err) - wg.Add(1) // Publish the message err = suite.user.PublishTestChannel(context.Background(), sent)