diff --git a/Makefile b/Makefile index 87f821c0..c6a91441 100644 --- a/Makefile +++ b/Makefile @@ -30,9 +30,7 @@ examples: brokers/up ## Perform examples .PHONY: test test: brokers/up ## Perform tests - @go test ./... -coverprofile cover.out -v -timeout=30s - @go tool cover -func cover.out - @rm cover.out + @go test ./... -timeout=30s .PHONY: generate generate: ## Generate files diff --git a/README.md b/README.md index a7955bbc..f7ebab18 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ type AppController struct // NewAppController will create a new App Controller and will connect the // BrokerController that you pass in argument to subscription and publication method. -func NewAppController(bs BrokerController) *AppController +func NewAppController(bs BrokerController, options ...ControllerOption) *AppController // Close function will clean up all resources and subscriptions left in the // application controller. This should be call right after NewAppController @@ -166,15 +166,12 @@ code with NATS (you can also find it [here](./examples/helloworld/nats/app/main. ```go import( - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" /* ... */ ) -// Connect to NATS -nc, _ := nats.Connect("nats://nats:4222") - // Create a new application controller -ctrl, _ := NewAppController(brokers.NewNATSController(nc)) +ctrl, _ := NewAppController(nats.NewController("nats://nats:4222")) defer ctrl.Close(context.Background()) // Subscribe to HelloWorld messages @@ -201,7 +198,7 @@ type UserController struct // NewUserController will create a new User Controller and will connect the // BrokerController that you pass in argument to subscription and publication method. -func NewUserController(bs BrokerController) *UserController +func NewUserController(bs BrokerController, options ...ControllerOption) *UserController // Close function will clean up all resources and subscriptions left in the // application controller. This should be call right after NewAppController @@ -218,15 +215,12 @@ code with NATS (you can also find it [here](./examples/helloworld/nats/app/main. ```go import( - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" /* ... */ ) -// Connect to NATS -nc, _ := nats.Connect("nats://nats:4222") - -// Create a new application controller -ctrl, _ := NewUserController(brokers.NewNATSController(nc)) +// Create a new user controller +ctrl, _ := NewUserController(nats.NewController("nats://nats:4222")) defer ctrl.Close(context.Background()) // Send HelloWorld @@ -273,16 +267,11 @@ use `ping.gen.go`. #### Application ```golang -import( - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - /* ... */ -) - -type ServerSubscriber struct { +type Subscriber struct { Controller *AppController } -func (s ServerSubscriber) Ping(req PingMessage, _ bool) { +func (s Subscriber) Ping(req PingMessage, _ bool) { // Generate a pong message, set as a response of the request resp := NewPongMessage() resp.SetAsResponseFrom(&req) @@ -297,11 +286,11 @@ func main() { /* ... */ // Create a new application controller - ctrl, _ := NewAppController(brokers.NewNATSController(nc)) + ctrl, _ := NewAppController(/* Add corresponding broker controller */) defer ctrl.Close(context.Background()) // Subscribe to all (we could also have just listened on the ping request channel) - sub := ServerSubscriber{Controller: ctrl} + sub := AppSubscriber{Controller: ctrl} ctrl.SubscribeAll(context.Background(), sub) // Process messages until interruption signal @@ -357,20 +346,12 @@ Here are the universal parts that you can generate: ### Middlewares You can use middlewares that will be executing when receiving and publishing -messages. You can add one or multiple middlewares using the following function -on a controller: +messages. You can add one or multiple middlewares using the `WithMiddlewares` +function in the initialization of the App or User controller: ```golang -import( - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - /* ... */ -) - -// Create a new app controller with a NATS controller for example -ctrl, _ := NewAppController(brokers.NewNATSController(nc)) - -// Add middleware -ctrl.AddMiddlewares(myMiddleware1, myMiddleware2 /*, ... */) +// Create a new app controller with middlewares +ctrl, _ := NewAppController(/* Broker of your choice */, WithMiddlewares(myMiddleware1, myMiddleware2 /*, ... */)) ``` Here the function signature that should be satisfied: @@ -460,7 +441,7 @@ You can have 2 types of logging: #### Controller logging To log internal operation of the controller, the only thing you have to do is -to set a logger to your controller with the function `SetLogger()`: +to initialize the controller with a logger, with the function `WithLogger()`: ```golang import( @@ -468,13 +449,8 @@ import( /* ... */ ) -// Create a new app controller with a NATS controller for example -ctrl, _ := NewAppController(brokers.NewNATSController(nc)) - -// Attach a logger (optional) -// You can find loggers in `github.com/lerenn/asyncapi-codegen/pkg/log` or create your own -logger := log.NewECS() -ctrl.SetLogger(logger) +// Create a new app controller with an Elastic Common Schema compatible logger +ctrl, _ := NewAppController(/* Broker of your choice */, WithLogger(log.NewECS())) ``` You can find all loggers in the directory `pkg/log`. @@ -490,11 +466,9 @@ import( /* ... */ ) -// Create a new app controller with a NATS controller for example -ctrl, _ := NewAppController(brokers.NewNATSController(nc)) - -// Add middleware -ctrl.AddMiddlewares(middleware.Logging(log.NewECS())) +// Create a new app controller with a middleware for logging incoming/outgoing messages +loggingMiddleware := middleware.Logging(log.NewECS()) +ctrl, _ := NewAppController(/* Broker of your choice */, WithMiddlewares(loggingMiddleware)) ``` #### Custom logging @@ -537,16 +511,12 @@ func (logger SimpleLogger) Error(ctx log.Context, msg string, info ...log.Additi You can then create a controller with a logger using similar lines: ```golang -import( - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - /* ... */ +// Create a new app controller with the custom logger +ctrl, _ := NewAppController( + /* Broker of your choice */, + WithLogger(SimpleLogger{}), /* Use on as internal logger */ + WithMiddleware(middleware.Logging(SimpleLogger{})), /* Use to log incoming/outgoing messages */ ) - -// Create a new app controller with a NATS controller for example -ctrl, _ := NewAppController(brokers.NewNATSController(nc)) - -// Set a logger -ctrl.SetLogger(SimpleLogger{}) ``` ### Implementing your own broker controller @@ -559,10 +529,7 @@ import( "github.com/lerenn/asyncapi-codegen/pkg/extensions" ) -type BrokerController interface { - // SetLogger set a logger that will log operations on broker controller - SetLogger(logger extensions.Logger) - +type BrokerController interface { // Publish a message to the broker Publish(ctx context.Context, channel string, mw extensions.BrokerMessage) error diff --git a/examples/helloworld/nats/app/app.gen.go b/examples/helloworld/nats/app/app.gen.go index 98250428..6cc1196c 100644 --- a/examples/helloworld/nats/app/app.gen.go +++ b/examples/helloworld/nats/app/app.gen.go @@ -14,48 +14,37 @@ import ( // AppSubscriber represents all handlers that are expecting messages for App type AppSubscriber interface { - // Hello + // Hello subscribes to messages placed on the 'hello' channel Hello(ctx context.Context, msg HelloMessage, done bool) } // AppController is the structure that provides publishing capabilities to the // developer and and connect the broker with the App type AppController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewAppController links the App to the broker -func NewAppController(bc extensions.BrokerController) (*AppController, error) { +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &AppController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *AppController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *AppController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &AppController{controller: controller}, nil } func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -155,7 +144,7 @@ func (c *AppController) SubscribeHello(ctx context.Context, fn func(ctx context. } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -247,6 +236,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/examples/helloworld/nats/app/main.go b/examples/helloworld/nats/app/main.go index 90ed24dd..84bba120 100644 --- a/examples/helloworld/nats/app/main.go +++ b/examples/helloworld/nats/app/main.go @@ -8,19 +8,12 @@ import ( "os" "os/signal" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - "github.com/nats-io/nats.go" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" ) func main() { - // Connect to NATS - nc, err := nats.Connect("nats://nats:4222") - if err != nil { - panic(err) - } - // Create a new application controller - ctrl, err := NewAppController(brokers.NewNATSController(nc)) + ctrl, err := NewAppController(nats.NewController("nats://nats:4222")) if err != nil { panic(err) } diff --git a/examples/helloworld/nats/user/main.go b/examples/helloworld/nats/user/main.go index a2521f2f..675cca33 100644 --- a/examples/helloworld/nats/user/main.go +++ b/examples/helloworld/nats/user/main.go @@ -6,19 +6,12 @@ import ( "context" "log" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - "github.com/nats-io/nats.go" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" ) func main() { - // Connect to NATS - nc, err := nats.Connect("nats://nats:4222") - if err != nil { - panic(err) - } - // Create a new user controller - ctrl, err := NewUserController(brokers.NewNATSController(nc)) + ctrl, err := NewUserController(nats.NewController("nats://nats:4222")) if err != nil { panic(err) } diff --git a/examples/helloworld/nats/user/user.gen.go b/examples/helloworld/nats/user/user.gen.go index 5fc8a9cc..3e70ba4a 100644 --- a/examples/helloworld/nats/user/user.gen.go +++ b/examples/helloworld/nats/user/user.gen.go @@ -15,41 +15,30 @@ import ( // UserController is the structure that provides publishing capabilities to the // developer and and connect the broker with the User type UserController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewUserController links the User to the broker -func NewUserController(bc extensions.BrokerController) (*UserController, error) { +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &UserController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *UserController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *UserController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &UserController{controller: controller}, nil } func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -121,7 +110,7 @@ func (c *UserController) PublishHello(ctx context.Context, msg HelloMessage) err // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -152,6 +141,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/examples/ping/kafka/app/app.gen.go b/examples/ping/kafka/app/app.gen.go index 4addf691..be73d509 100644 --- a/examples/ping/kafka/app/app.gen.go +++ b/examples/ping/kafka/app/app.gen.go @@ -17,48 +17,37 @@ import ( // AppSubscriber represents all handlers that are expecting messages for App type AppSubscriber interface { - // Ping + // Ping subscribes to messages placed on the 'ping' channel Ping(ctx context.Context, msg PingMessage, done bool) } // AppController is the structure that provides publishing capabilities to the // developer and and connect the broker with the App type AppController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewAppController links the App to the broker -func NewAppController(bc extensions.BrokerController) (*AppController, error) { +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &AppController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *AppController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *AppController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &AppController{controller: controller}, nil } func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -158,7 +147,7 @@ func (c *AppController) SubscribePing(ctx context.Context, fn func(ctx context.C } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -258,7 +247,7 @@ func (c *AppController) PublishPong(ctx context.Context, msg PongMessage) error // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -289,6 +278,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/examples/ping/kafka/app/main.go b/examples/ping/kafka/app/main.go index 213a24b7..1aac0261 100644 --- a/examples/ping/kafka/app/main.go +++ b/examples/ping/kafka/app/main.go @@ -8,16 +8,16 @@ import ( "os/signal" "time" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" ) -type ServerSubscriber struct { +type Subscriber struct { Controller *AppController } -func (s ServerSubscriber) Ping(ctx context.Context, req PingMessage, _ bool) { +func (s Subscriber) Ping(ctx context.Context, req PingMessage, _ bool) { // Generate a pong message, set as a response of the request resp := NewPongMessage() resp.SetAsResponseFrom(&req) @@ -33,24 +33,26 @@ func (s ServerSubscriber) Ping(ctx context.Context, req PingMessage, _ bool) { } func main() { - time.Sleep(5 * time.Second) + // Instanciate a Kafka controller with a logger + logger := loggers.NewECS() + broker := kafka.NewController( + []string{"kafka:9092"}, // List of hosts + kafka.WithLogger(logger), // Attach an internal logger + kafka.WithGroupID("ping-apps"), // Change group id + ) - // Create a new user controller - host := "kafka:9092" // Create a new app controller - ctrl, err := NewAppController(brokers.NewKafkaController([]string{host})) + ctrl, err := NewAppController( + broker, // Attach the kafka controller + WithLogger(logger), // Attach an internal logger + WithMiddlewares(middlewares.Logging(logger))) // Attach a middleware to log messages if err != nil { panic(err) } defer ctrl.Close(context.Background()) - // Attach a logger (optional) - logger := loggers.NewECS() - ctrl.SetLogger(logger) - ctrl.AddMiddlewares(middlewares.Logging(logger)) - // Subscribe to all (we could also have just listened on the ping request channel) - sub := ServerSubscriber{Controller: ctrl} + sub := Subscriber{Controller: ctrl} if err := ctrl.SubscribeAll(context.Background(), sub); err != nil { panic(err) } diff --git a/examples/ping/kafka/user/main.go b/examples/ping/kafka/user/main.go index 8cbc7b4b..9a7cb94e 100644 --- a/examples/ping/kafka/user/main.go +++ b/examples/ping/kafka/user/main.go @@ -6,27 +6,30 @@ import ( "context" "time" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" ) func main() { - time.Sleep(6 * time.Second) + // Instanciate a Kafka controller with a logger + logger := loggers.NewECS() + broker := kafka.NewController( + []string{"kafka:9092"}, // List of hosts + kafka.WithLogger(logger), // Attach an internal logger + kafka.WithGroupID("ping-users"), // Change group id + ) // Create a new user controller - host := "kafka:9092" - ctrl, err := NewUserController(brokers.NewKafkaController([]string{host})) + ctrl, err := NewUserController( + broker, // Attach the kafka controller + WithLogger(logger), // Attach an internal logger + WithMiddlewares(middlewares.Logging(logger))) // Attach a middleware to log messages if err != nil { panic(err) } defer ctrl.Close(context.Background()) - // Attach a logger (optional) - logger := loggers.NewECS() - ctrl.SetLogger(logger) - ctrl.AddMiddlewares(middlewares.Logging(logger)) - // Make a new ping message req := NewPingMessage() req.Payload = "ping" diff --git a/examples/ping/kafka/user/user.gen.go b/examples/ping/kafka/user/user.gen.go index b1883bde..c780d36e 100644 --- a/examples/ping/kafka/user/user.gen.go +++ b/examples/ping/kafka/user/user.gen.go @@ -17,48 +17,37 @@ import ( // UserSubscriber represents all handlers that are expecting messages for User type UserSubscriber interface { - // Pong + // Pong subscribes to messages placed on the 'pong' channel Pong(ctx context.Context, msg PongMessage, done bool) } // UserController is the structure that provides publishing capabilities to the // developer and and connect the broker with the User type UserController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewUserController links the User to the broker -func NewUserController(bc extensions.BrokerController) (*UserController, error) { +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &UserController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *UserController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *UserController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &UserController{controller: controller}, nil } func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -158,7 +147,7 @@ func (c *UserController) SubscribePong(ctx context.Context, fn func(ctx context. } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -258,7 +247,7 @@ func (c *UserController) PublishPing(ctx context.Context, msg PingMessage) error // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -277,7 +266,7 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit ctx = addUserContextValues(ctx, path) // Subscribe to broker channel - msgs, stop, err := cc.brokerController.Subscribe(ctx, path) + msgs, stop, err := cc.broker.Subscribe(ctx, path) if err != nil { cc.logger.Error(ctx, err.Error()) return PongMessage{}, err @@ -357,6 +346,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/examples/ping/nats/app/app.gen.go b/examples/ping/nats/app/app.gen.go index 4addf691..be73d509 100644 --- a/examples/ping/nats/app/app.gen.go +++ b/examples/ping/nats/app/app.gen.go @@ -17,48 +17,37 @@ import ( // AppSubscriber represents all handlers that are expecting messages for App type AppSubscriber interface { - // Ping + // Ping subscribes to messages placed on the 'ping' channel Ping(ctx context.Context, msg PingMessage, done bool) } // AppController is the structure that provides publishing capabilities to the // developer and and connect the broker with the App type AppController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewAppController links the App to the broker -func NewAppController(bc extensions.BrokerController) (*AppController, error) { +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &AppController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *AppController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *AppController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &AppController{controller: controller}, nil } func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -158,7 +147,7 @@ func (c *AppController) SubscribePing(ctx context.Context, fn func(ctx context.C } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -258,7 +247,7 @@ func (c *AppController) PublishPong(ctx context.Context, msg PongMessage) error // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -289,6 +278,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/examples/ping/nats/app/main.go b/examples/ping/nats/app/main.go index bc7afd97..e5f1f018 100644 --- a/examples/ping/nats/app/main.go +++ b/examples/ping/nats/app/main.go @@ -8,10 +8,9 @@ import ( "os/signal" "time" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" - "github.com/nats-io/nats.go" ) type ServerSubscriber struct { @@ -34,23 +33,20 @@ func (s ServerSubscriber) Ping(ctx context.Context, req PingMessage, _ bool) { } func main() { - nc, err := nats.Connect("nats://nats:4222") - if err != nil { - panic(err) - } + // Instanciate a NATS controller with a logger + logger := loggers.NewECS() + broker := nats.NewController("nats://nats:4222", nats.WithLogger(logger)) // Create a new app controller - ctrl, err := NewAppController(brokers.NewNATSController(nc)) + ctrl, err := NewAppController( + broker, // Attach the NATS controller + WithLogger(logger), // Attach an internal logger + WithMiddlewares(middlewares.Logging(logger))) // Attach a middleware to log messages if err != nil { panic(err) } defer ctrl.Close(context.Background()) - // Attach a logger (optional) - logger := loggers.NewECS() - ctrl.SetLogger(logger) - ctrl.AddMiddlewares(middlewares.Logging(logger)) - // Subscribe to all (we could also have just listened on the ping request channel) sub := ServerSubscriber{Controller: ctrl} if err := ctrl.SubscribeAll(context.Background(), sub); err != nil { diff --git a/examples/ping/nats/user/main.go b/examples/ping/nats/user/main.go index e8d881c3..5bb7c8dd 100644 --- a/examples/ping/nats/user/main.go +++ b/examples/ping/nats/user/main.go @@ -6,31 +6,26 @@ import ( "context" "time" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" "github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" - - "github.com/nats-io/nats.go" ) func main() { - nc, err := nats.Connect("nats://nats:4222") - if err != nil { - panic(err) - } + // Instanciate a NATS controller with a logger + logger := loggers.NewECS() + broker := nats.NewController("nats://nats:4222", nats.WithLogger(logger)) // Create a new user controller - ctrl, err := NewUserController(brokers.NewNATSController(nc)) + ctrl, err := NewUserController( + broker, // Attach the NATS controller + WithLogger(logger), // Attach an internal logger + WithMiddlewares(middlewares.Logging(logger))) // Attach a middleware to log messages if err != nil { panic(err) } defer ctrl.Close(context.Background()) - // Attach a logger (optional) - logger := loggers.NewECS() - ctrl.SetLogger(logger) - ctrl.AddMiddlewares(middlewares.Logging(logger)) - // Make a new ping message req := NewPingMessage() req.Payload = "ping" diff --git a/examples/ping/nats/user/user.gen.go b/examples/ping/nats/user/user.gen.go index b1883bde..c780d36e 100644 --- a/examples/ping/nats/user/user.gen.go +++ b/examples/ping/nats/user/user.gen.go @@ -17,48 +17,37 @@ import ( // UserSubscriber represents all handlers that are expecting messages for User type UserSubscriber interface { - // Pong + // Pong subscribes to messages placed on the 'pong' channel Pong(ctx context.Context, msg PongMessage, done bool) } // UserController is the structure that provides publishing capabilities to the // developer and and connect the broker with the User type UserController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewUserController links the User to the broker -func NewUserController(bc extensions.BrokerController) (*UserController, error) { +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &UserController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *UserController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *UserController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &UserController{controller: controller}, nil } func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -158,7 +147,7 @@ func (c *UserController) SubscribePong(ctx context.Context, fn func(ctx context. } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -258,7 +247,7 @@ func (c *UserController) PublishPing(ctx context.Context, msg PingMessage) error // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -277,7 +266,7 @@ func (cc *UserController) WaitForPong(ctx context.Context, publishMsg MessageWit ctx = addUserContextValues(ctx, path) // Subscribe to broker channel - msgs, stop, err := cc.brokerController.Subscribe(ctx, path) + msgs, stop, err := cc.broker.Subscribe(ctx, path) if err != nil { cc.logger.Error(ctx, err.Error()) return PongMessage{}, err @@ -357,6 +346,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/pkg/codegen/generators/templates/controller.tmpl b/pkg/codegen/generators/templates/controller.tmpl index 3cc43197..d2239b3c 100644 --- a/pkg/codegen/generators/templates/controller.tmpl +++ b/pkg/codegen/generators/templates/controller.tmpl @@ -1,41 +1,30 @@ // {{ .Prefix }}Controller is the structure that provides publishing capabilities to the // developer and and connect the broker with the {{ .Prefix }} type {{ .Prefix }}Controller struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // New{{ .Prefix }}Controller links the {{ .Prefix }} to the broker -func New{{ .Prefix }}Controller(bc extensions.BrokerController) (*{{ .Prefix }}Controller, error) { +func New{{ .Prefix }}Controller(bc extensions.BrokerController, options ...ControllerOption) (*{{ .Prefix }}Controller, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &{{ .Prefix }}Controller{ - brokerController: bc, + // Create default controller + controller := controller{ + broker: bc, stopSubscribers: make(map[string]chan interface{}), logger: extensions.DummyLogger{}, middlewares: make([]extensions.Middleware, 0), - }, nil -} - -// SetLogger attaches a logger that will log operations on controller -func (c *{{ .Prefix }}Controller) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + } + + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *{{ .Prefix }}Controller) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &{{ .Prefix }}Controller{controller: controller}, nil } func (c {{ .Prefix }}Controller) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -152,7 +141,7 @@ func (c *{{ $.Prefix }}Controller) Subscribe{{namify $key}}(ctx context.Context, } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -270,7 +259,7 @@ func (c *{{ $.Prefix }}Controller) Publish{{namify $key}}(ctx context.Context, m // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -297,7 +286,7 @@ func (cc *UserController) WaitFor{{namify $key}}(ctx context.Context, publishMsg ctx = add{{ $.Prefix }}ContextValues(ctx, path) // Subscribe to broker channel - msgs, stop, err := cc.brokerController.Subscribe(ctx, path) + msgs, stop, err := cc.broker.Subscribe(ctx, path) if err != nil { cc.logger.Error(ctx, err.Error()) return {{channelToMessageTypeName $value}}{}, err diff --git a/pkg/codegen/generators/templates/subscriber.tmpl b/pkg/codegen/generators/templates/subscriber.tmpl index d2634e4b..526459d7 100644 --- a/pkg/codegen/generators/templates/subscriber.tmpl +++ b/pkg/codegen/generators/templates/subscriber.tmpl @@ -2,7 +2,7 @@ // {{ .Prefix }}Subscriber represents all handlers that are expecting messages for {{ .Prefix }} type {{ .Prefix }}Subscriber interface { {{- range $key, $value := .Channels}} - // {{namify $key}} + // {{namify $key}} subscribes to messages placed on the '{{ $key }}' channel {{namify $key}}(ctx context.Context, msg {{channelToMessageTypeName $value}}, done bool) {{end}} } diff --git a/pkg/codegen/generators/templates/types.tmpl b/pkg/codegen/generators/templates/types.tmpl index 80fb6afd..14634938 100644 --- a/pkg/codegen/generators/templates/types.tmpl +++ b/pkg/codegen/generators/templates/types.tmpl @@ -22,6 +22,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/pkg/extensions/broker.go b/pkg/extensions/broker.go index 76b4bbe6..6038a2d6 100644 --- a/pkg/extensions/broker.go +++ b/pkg/extensions/broker.go @@ -13,9 +13,6 @@ type BrokerMessage struct { // BrokerController represents the functions that should be implemented to connect // the broker to the application or the user type BrokerController interface { - // SetLogger set a logger that will log operations on broker controller - SetLogger(logger Logger) - // Publish a message to the broker Publish(ctx context.Context, channel string, mw BrokerMessage) error diff --git a/pkg/extensions/brokers/kafka.go b/pkg/extensions/brokers/kafka/kafka.go similarity index 63% rename from pkg/extensions/brokers/kafka.go rename to pkg/extensions/brokers/kafka/kafka.go index bc1b3425..8de28547 100644 --- a/pkg/extensions/brokers/kafka.go +++ b/pkg/extensions/brokers/kafka/kafka.go @@ -1,4 +1,4 @@ -package brokers +package kafka import ( "context" @@ -7,11 +7,12 @@ import ( "time" "github.com/lerenn/asyncapi-codegen/pkg/extensions" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" "github.com/segmentio/kafka-go" ) -// KafkaController is the Kafka implementation for asyncapi-codegen -type KafkaController struct { +// Controller is the Kafka implementation for asyncapi-codegen +type Controller struct { logger extensions.Logger groupID string hosts []string @@ -19,48 +20,57 @@ type KafkaController struct { maxBytes int } -type KafkaControllerOption func(controller *KafkaController) +type ControllerOption func(controller *Controller) -// NewKafkaController creates a new KafkaController that fulfill the BrokerLinker interface -func NewKafkaController(hosts []string, options ...KafkaControllerOption) *KafkaController { - controller := &KafkaController{ +// NewController creates a new KafkaController that fulfill the BrokerLinker interface +func NewController(hosts []string, options ...ControllerOption) *Controller { + // Create default controller + controller := &Controller{ logger: extensions.DummyLogger{}, - groupID: DefaultQueueGroupID, + groupID: brokers.DefaultQueueGroupID, hosts: hosts, partition: 0, maxBytes: 10e6, // 10MB } + + // Execute options for _, option := range options { option(controller) } + return controller } -func WithGroupID(groupID string) KafkaControllerOption { - return func(controller *KafkaController) { +// WithGroupID set a custom group ID for channel subscription +func WithGroupID(groupID string) ControllerOption { + return func(controller *Controller) { controller.groupID = groupID } } -func WithPartition(partition int) KafkaControllerOption { - return func(controller *KafkaController) { +// WithPartition set the partition to use for the topic +func WithPartition(partition int) ControllerOption { + return func(controller *Controller) { controller.partition = partition } } -func WithMaxBytes(maxBytes int) KafkaControllerOption { - return func(controller *KafkaController) { +// WithMaxBytes set the maximum size of a message +func WithMaxBytes(maxBytes int) ControllerOption { + return func(controller *Controller) { controller.maxBytes = maxBytes } } -// SetLogger set a custom logger that will log operations on broker controller -func (c *KafkaController) SetLogger(logger extensions.Logger) { - c.logger = logger +// WithLogger set a custom logger that will log operations on broker controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *Controller) { + controller.logger = logger + } } // Publish a message to the broker -func (c *KafkaController) Publish(ctx context.Context, channel string, um extensions.BrokerMessage) error { +func (c *Controller) Publish(ctx context.Context, channel string, um extensions.BrokerMessage) error { w := &kafka.Writer{ Addr: kafka.TCP(c.hosts...), Topic: channel, @@ -101,7 +111,7 @@ func (c *KafkaController) Publish(ctx context.Context, channel string, um extens } // Subscribe to messages from the broker -func (c *KafkaController) Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error) { +func (c *Controller) Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error) { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: c.hosts, Topic: channel, diff --git a/pkg/extensions/brokers/nats.go b/pkg/extensions/brokers/nats/nats.go similarity index 57% rename from pkg/extensions/brokers/nats.go rename to pkg/extensions/brokers/nats/nats.go index 9ca3f7ef..6bfb03b8 100644 --- a/pkg/extensions/brokers/nats.go +++ b/pkg/extensions/brokers/nats/nats.go @@ -1,44 +1,62 @@ -package brokers +package nats import ( "context" "errors" "github.com/lerenn/asyncapi-codegen/pkg/extensions" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" "github.com/nats-io/nats.go" ) -// NATSController is the NATSController implementation for asyncapi-codegen -type NATSController struct { +// Controller is the Controller implementation for asyncapi-codegen +type Controller struct { connection *nats.Conn logger extensions.Logger queueGroup string } -// NewNATSController creates a new NATS that fulfill the BrokerLinker interface -func NewNATSController(connection *nats.Conn) *NATSController { - return &NATSController{ - connection: connection, - queueGroup: DefaultQueueGroupID, +type ControllerOption func(controller *Controller) + +// NewController creates a new NATS controller +func NewController(url string, options ...ControllerOption) *Controller { + // Connect to NATS + nc, err := nats.Connect(url) + if err != nil { + panic(err) + } + + // Creates default controller + controller := &Controller{ + connection: nc, + queueGroup: brokers.DefaultQueueGroupID, logger: extensions.DummyLogger{}, } + + // Execute options + for _, option := range options { + option(controller) + } + + return controller } -// SetQueueGroup sets a custom queue group name for channel subscription -// -// It can be used for multiple applications listening one the same channel but -// wants to listen on different queues. -func (c *NATSController) SetQueueGroup(name string) { - c.queueGroup = name +// WithQueueGroup set a custom queue group for channel subscription +func WithQueueGroup(name string) ControllerOption { + return func(controller *Controller) { + controller.queueGroup = name + } } -// SetLogger set a custom logger that will log operations on broker controller -func (c *NATSController) SetLogger(logger extensions.Logger) { - c.logger = logger +// WithLogger set a custom logger that will log operations on broker controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *Controller) { + controller.logger = logger + } } // Publish a message to the broker -func (c *NATSController) Publish(_ context.Context, channel string, bm extensions.BrokerMessage) error { +func (c *Controller) Publish(_ context.Context, channel string, bm extensions.BrokerMessage) error { msg := nats.NewMsg(channel) // Set message headers and content @@ -57,7 +75,7 @@ func (c *NATSController) Publish(_ context.Context, channel string, bm extension } // Subscribe to messages from the broker -func (c *NATSController) Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error) { +func (c *Controller) Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error) { // Subscribe to channel natsMsgs := make(chan *nats.Msg, 64) sub, err := c.connection.QueueSubscribeSyncWithChan(channel, c.queueGroup, natsMsgs) diff --git a/test/brokers.go b/test/brokers.go index b1739468..2282ea6e 100644 --- a/test/brokers.go +++ b/test/brokers.go @@ -4,25 +4,15 @@ import ( "testing" "github.com/lerenn/asyncapi-codegen/pkg/extensions" - "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers" - "github.com/nats-io/nats.go" - - "github.com/stretchr/testify/assert" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" + "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" ) // BrokerControllers returns a list of BrokerController to test based on the // docker-compose file of the project. func BrokerControllers(t *testing.T) []extensions.BrokerController { - // NATS - nc, err := nats.Connect("nats://localhost:4222") - assert.NoError(t, err) - natsController := brokers.NewNATSController(nc) - - // Kafka - kafkaController := brokers.NewKafkaController([]string{"localhost:9094"}) - return []extensions.BrokerController{ - natsController, - kafkaController, + nats.NewController("nats://localhost:4222"), + kafka.NewController([]string{"localhost:9094"}), } } diff --git a/test/issues/49/asyncapi.gen.go b/test/issues/49/asyncapi.gen.go index 3e0164b1..95bdcf85 100644 --- a/test/issues/49/asyncapi.gen.go +++ b/test/issues/49/asyncapi.gen.go @@ -14,48 +14,37 @@ import ( // AppSubscriber represents all handlers that are expecting messages for App type AppSubscriber interface { - // Chat + // Chat subscribes to messages placed on the '/chat' channel Chat(ctx context.Context, msg ChatMessage, done bool) } // AppController is the structure that provides publishing capabilities to the // developer and and connect the broker with the App type AppController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewAppController links the App to the broker -func NewAppController(bc extensions.BrokerController) (*AppController, error) { +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &AppController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *AppController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *AppController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &AppController{controller: controller}, nil } func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -155,7 +144,7 @@ func (c *AppController) SubscribeChat(ctx context.Context, fn func(ctx context.C } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -244,7 +233,7 @@ func (c *AppController) PublishStatus(ctx context.Context, msg StatusMessage) er // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -253,51 +242,40 @@ func (c *AppController) PublishStatus(ctx context.Context, msg StatusMessage) er // UserSubscriber represents all handlers that are expecting messages for User type UserSubscriber interface { - // Chat + // Chat subscribes to messages placed on the '/chat' channel Chat(ctx context.Context, msg ChatMessage, done bool) - // Status + // Status subscribes to messages placed on the '/status' channel Status(ctx context.Context, msg StatusMessage, done bool) } // UserController is the structure that provides publishing capabilities to the // developer and and connect the broker with the User type UserController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewUserController links the User to the broker -func NewUserController(bc extensions.BrokerController) (*UserController, error) { +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &UserController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *UserController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *UserController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &UserController{controller: controller}, nil } func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -397,7 +375,7 @@ func (c *UserController) SubscribeStatus(ctx context.Context, fn func(ctx contex } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -486,7 +464,7 @@ func (c *UserController) PublishChat(ctx context.Context, msg ChatMessage) error // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -517,6 +495,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/test/issues/74/asyncapi.gen.go b/test/issues/74/asyncapi.gen.go index 108f6536..b157edd5 100644 --- a/test/issues/74/asyncapi.gen.go +++ b/test/issues/74/asyncapi.gen.go @@ -15,48 +15,37 @@ import ( // AppSubscriber represents all handlers that are expecting messages for App type AppSubscriber interface { - // TestChannel + // TestChannel subscribes to messages placed on the 'testChannel' channel TestChannel(ctx context.Context, msg TestMessage, done bool) } // AppController is the structure that provides publishing capabilities to the // developer and and connect the broker with the App type AppController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewAppController links the App to the broker -func NewAppController(bc extensions.BrokerController) (*AppController, error) { +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &AppController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *AppController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *AppController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &AppController{controller: controller}, nil } func (c AppController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -156,7 +145,7 @@ func (c *AppController) SubscribeTestChannel(ctx context.Context, fn func(ctx co } // Subscribe to broker channel - msgs, stop, err := c.brokerController.Subscribe(ctx, path) + msgs, stop, err := c.broker.Subscribe(ctx, path) if err != nil { c.logger.Error(ctx, err.Error()) return err @@ -227,41 +216,30 @@ func (c *AppController) UnsubscribeTestChannel(ctx context.Context) { // UserController is the structure that provides publishing capabilities to the // developer and and connect the broker with the User type UserController struct { - // brokerController is the broker controller that will be used to communicate - brokerController extensions.BrokerController - // stopSubscribers is a map of stop channels for each subscribed channel - stopSubscribers map[string]chan interface{} - // logger is the logger that will be used to log operations on controller - logger extensions.Logger - // middlewares are the middlewares that will be executed when sending or - // receiving messages - middlewares []extensions.Middleware + controller } // NewUserController links the User to the broker -func NewUserController(bc extensions.BrokerController) (*UserController, error) { +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided if bc == nil { return nil, ErrNilBrokerController } - return &UserController{ - brokerController: bc, - stopSubscribers: make(map[string]chan interface{}), - logger: extensions.DummyLogger{}, - middlewares: make([]extensions.Middleware, 0), - }, nil -} + // Create default controller + controller := controller{ + broker: bc, + stopSubscribers: make(map[string]chan interface{}), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + } -// SetLogger attaches a logger that will log operations on controller -func (c *UserController) SetLogger(logger extensions.Logger) { - c.logger = logger - c.brokerController.SetLogger(logger) -} + // Apply options + for _, option := range options { + option(&controller) + } -// AddMiddlewares attaches middlewares that will be executed when sending or -// receiving messages -func (c *UserController) AddMiddlewares(middleware ...extensions.Middleware) { - c.middlewares = append(c.middlewares, middleware...) + return &UserController{controller: controller}, nil } func (c UserController) wrapMiddlewares(middlewares []extensions.Middleware, last extensions.NextMiddleware) func(ctx context.Context) { @@ -333,7 +311,7 @@ func (c *UserController) PublishTestChannel(ctx context.Context, msg TestMessage // Publish the message on event-broker through middlewares c.executeMiddlewares(ctx, func(ctx context.Context) { - err = c.brokerController.Publish(ctx, path, bMsg) + err = c.broker.Publish(ctx, path, bMsg) }) // Return error from publication on broker @@ -364,6 +342,38 @@ var ( ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI) ) +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // stopSubscribers is a map of stop channels for each subscribed channel + stopSubscribers map[string]chan interface{} + // logger is the logger that will be used to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + type MessageWithCorrelationID interface { CorrelationID() string SetCorrelationID(id string) diff --git a/test/issues/74/suite_test.go b/test/issues/74/suite_test.go index 4aa7c85b..0085d307 100644 --- a/test/issues/74/suite_test.go +++ b/test/issues/74/suite_test.go @@ -38,20 +38,19 @@ func NewSuite(broker extensions.BrokerController) *Suite { } func (suite *Suite) SetupSuite() { + // Create a channel to intercept message before sending to broker and after + // reception from broker + suite.interceptor = make(chan extensions.BrokerMessage, 8) + // Create app - app, err := NewAppController(suite.broker) + app, err := NewAppController(suite.broker, WithMiddlewares(middlewares.Intercepter(suite.interceptor))) suite.Require().NoError(err) suite.app = app // Create user - user, err := NewUserController(suite.broker) + user, err := NewUserController(suite.broker, WithMiddlewares(middlewares.Intercepter(suite.interceptor))) suite.Require().NoError(err) suite.user = user - - // Add interceptor - suite.interceptor = make(chan extensions.BrokerMessage, 8) - app.AddMiddlewares(middlewares.Intercepter(suite.interceptor)) - user.AddMiddlewares(middlewares.Intercepter(suite.interceptor)) } func (suite *Suite) TestHeaders() {