Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lerenn committed Sep 11, 2023
1 parent d174e43 commit 629ff5f
Show file tree
Hide file tree
Showing 23 changed files with 90 additions and 3,253 deletions.
3 changes: 0 additions & 3 deletions .gitmodules

This file was deleted.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ lint: ## Lint the code
clean: __examples/clean brokers/down ## Clean up the project

.PHONY: check
check: generate lint examples test ## Check that everything is ready for commit
check: clean generate lint examples test ## Check that everything is ready for commit

.PHONY: __examples/clean
__examples/clean:
Expand Down
26 changes: 0 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,29 +549,6 @@ ctrl, _ := NewAppController(brokers.NewNATSController(nc))
ctrl.SetLogger(SimpleLogger{})
```

### Use of queue groups and queue name customization

Queues are used under the hood by implemented brokers to have replicates of a
same service that automatically load-balance request between instances. By
default, the queue name used by implemented brokers is `asyncapi`.

However, it is possible to set a custom queue name if you want to have multiple
applications which uses code generated by `asyncapi-codegen` but on different
queues:

```golang
import(
"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers"
/* ... */
)

// Generate a new NATS controller
ctrl := brokers.NewNATSController(nc)

// Set queue name on the NATS controller
ctrl.SetQueueName("my-custom-queue-name")
```

### Implementing your own broker controller

In order to connect your application and your client to your broker, we need to
Expand All @@ -591,9 +568,6 @@ type BrokerController interface {

// Subscribe to messages from the broker
Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error)

// SetQueueName sets the name of the queue that will be used by the broker
SetQueueName(name string)
}
```

Expand Down
10 changes: 6 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ services:
kafka:
image: 'bitnami/kafka:latest'
ports:
- 9092:9092
# Listening on a different port to avoid collision with 9092 internal listener
# Prefering to keep 9092 for internal listener as it is the port for examples
- 9094:9094
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
Expand Down
2 changes: 2 additions & 0 deletions examples/ping/kafka/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares"
)

type ServerSubscriber struct {
Expand Down Expand Up @@ -46,6 +47,7 @@ func main() {
// Attach a logger (optional)
logger := loggers.NewECS()
ctrl.SetLogger(logger)
ctrl.AddMiddlewares(middlewares.Logging(logger))

// Subscribe to all (we could also have just listened on the ping request channel)
sub := ServerSubscriber{Controller: ctrl}
Expand Down
2 changes: 2 additions & 0 deletions examples/ping/nats/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/loggers"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares"
"github.com/nats-io/nats.go"
)

Expand Down Expand Up @@ -48,6 +49,7 @@ func main() {
// Attach a logger (optional)
logger := loggers.NewECS()
ctrl.SetLogger(logger)
ctrl.AddMiddlewares(middlewares.Logging(logger))

// Subscribe to all (we could also have just listened on the ping request channel)
sub := ServerSubscriber{Controller: ctrl}
Expand Down
3 changes: 0 additions & 3 deletions pkg/extensions/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,4 @@ type BrokerController interface {

// Subscribe to messages from the broker
Subscribe(ctx context.Context, channel string) (msgs chan BrokerMessage, stop chan interface{}, err error)

// SetQueueName sets the name of the queue that will be used by the broker
SetQueueName(name string)
}
6 changes: 6 additions & 0 deletions pkg/extensions/brokers/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package brokers

const (
// DefaultQueueGroupID is the default queue name used by brokers
DefaultQueueGroupID = "asyncapi"
)
41 changes: 25 additions & 16 deletions pkg/extensions/brokers/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package brokers

import (
"context"
"errors"
"fmt"
"time"

"github.com/lerenn/asyncapi-codegen/pkg/extensions"
"github.com/segmentio/kafka-go"
Expand All @@ -11,9 +13,8 @@ import (
// KafkaController is the Kafka implementation for asyncapi-codegen
type KafkaController struct {
logger extensions.Logger
queueName string
hosts []string
groupID string
hosts []string
partition int
maxBytes int
}
Expand All @@ -23,7 +24,8 @@ type KafkaControllerOption func(controller *KafkaController)
// NewKafkaController creates a new KafkaController that fulfill the BrokerLinker interface
func NewKafkaController(hosts []string, options ...KafkaControllerOption) *KafkaController {
controller := &KafkaController{
queueName: "asyncapi",
logger: extensions.DummyLogger{},
groupID: DefaultQueueGroupID,
hosts: hosts,
partition: 0,
maxBytes: 10e6, // 10MB
Expand Down Expand Up @@ -52,14 +54,6 @@ func WithMaxBytes(maxBytes int) KafkaControllerOption {
}
}

// SetQueueName sets a custom queue name for channel subscription
//
// It can be used for multiple applications listening one the same channel but
// wants to listen on different queues.
func (c *KafkaController) SetQueueName(name string) {
c.queueName = name
}

// SetLogger set a custom logger that will log operations on broker controller
func (c *KafkaController) SetLogger(logger extensions.Logger) {
c.logger = logger
Expand All @@ -84,12 +78,26 @@ func (c *KafkaController) Publish(ctx context.Context, channel string, um extens
msg.Headers = append(msg.Headers, kafka.Header{Key: k, Value: v})
}

// Publish message
if err := w.WriteMessages(ctx, msg); err != nil {
for {
// Publish message
err := w.WriteMessages(ctx, msg)

// If there is no error then return
if err == nil {
return nil
}

// Create topic if not exists, then it means that the topic is being
// created, so let's retry
if errors.Is(err, kafka.UnknownTopicOrPartition) {
c.logger.Warning(ctx, fmt.Sprintf("Topic %s does not exists: waiting for creation and retry", channel))
time.Sleep(100 * time.Millisecond)
continue
}

// Unexpected error
return err
}

return nil
}

// Subscribe to messages from the broker
Expand All @@ -99,6 +107,7 @@ func (c *KafkaController) Subscribe(ctx context.Context, channel string) (msgs c
Topic: channel,
Partition: c.partition,
MaxBytes: c.maxBytes,
GroupID: c.groupID,
})

// Handle events
Expand Down Expand Up @@ -128,7 +137,7 @@ func (c *KafkaController) Subscribe(ctx context.Context, channel string) (msgs c
go func() {
// Handle closure request from function caller
for range stop {
fmt.Print("Stopping subscriber")
c.logger.Info(ctx, "Stopping subscriber")
if err := r.Close(); err != nil && c.logger != nil {
c.logger.Error(ctx, err.Error())
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/extensions/brokers/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ import (
type NATSController struct {
connection *nats.Conn
logger extensions.Logger
queueName string
queueGroup string
}

// NewNATSController creates a new NATS that fulfill the BrokerLinker interface
func NewNATSController(connection *nats.Conn) *NATSController {
return &NATSController{
connection: connection,
queueName: "asyncapi",
queueGroup: DefaultQueueGroupID,
logger: extensions.DummyLogger{},
}
}

// SetQueueName sets a custom queue name for channel subscription
// SetQueueGroup sets a custom queue group name for channel subscription
//
// It can be used for multiple applications listening one the same channel but
// wants to listen on different queues.
func (c *NATSController) SetQueueName(name string) {
c.queueName = name
func (c *NATSController) SetQueueGroup(name string) {
c.queueGroup = name
}

// SetLogger set a custom logger that will log operations on broker controller
Expand Down Expand Up @@ -59,7 +60,7 @@ func (c *NATSController) Publish(_ context.Context, channel string, bm extension
func (c *NATSController) Subscribe(ctx context.Context, channel string) (msgs chan extensions.BrokerMessage, stop chan interface{}, err error) {
// Subscribe to channel
natsMsgs := make(chan *nats.Msg, 64)
sub, err := c.connection.QueueSubscribeSyncWithChan(channel, c.queueName, natsMsgs)
sub, err := c.connection.QueueSubscribeSyncWithChan(channel, c.queueGroup, natsMsgs)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/extensions/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ type Logger interface {
// Info logs information based on a message and key-value elements
Info(ctx context.Context, msg string, info ...LogInfo)

// Warning logs information based on a message and key-value elements
// This levels indicates a non-expected state but that does not prevent the
// application to work properly
Warning(ctx context.Context, msg string, info ...LogInfo)

// Error logs error based on a message and key-value elements
Error(ctx context.Context, msg string, info ...LogInfo)
}
Expand All @@ -24,5 +29,8 @@ type DummyLogger struct {
// Info logs information based on a message and key-value elements
func (dl DummyLogger) Info(_ context.Context, _ string, _ ...LogInfo) {}

// Warning logs information based on a message and key-value elements
func (dl DummyLogger) Warning(_ context.Context, _ string, _ ...LogInfo) {}

// Error logs error based on a message and key-value elements
func (dl DummyLogger) Error(_ context.Context, _ string, _ ...LogInfo) {}
19 changes: 11 additions & 8 deletions pkg/extensions/loggers/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"

Expand Down Expand Up @@ -96,18 +95,22 @@ func (logger ECS) formatLog(ctx context.Context, msg string, info ...extensions.
return string(b)
}

func (logger ECS) Info(ctx context.Context, msg string, info ...extensions.LogInfo) {
func (logger ECS) logWithLevel(ctx context.Context, level string, msg string, info ...extensions.LogInfo) {
// Add additional keys
info = append(info, extensions.LogInfo{Key: "log.level", Value: "info"})

// Print log
log.Print(logger.formatLog(ctx, msg, info...))
fmt.Println(logger.formatLog(ctx, msg, info...))
}

func (logger ECS) Error(ctx context.Context, msg string, info ...extensions.LogInfo) {
// Add additional keys
info = append(info, extensions.LogInfo{Key: "log.level", Value: "error"})
func (logger ECS) Info(ctx context.Context, msg string, info ...extensions.LogInfo) {
logger.logWithLevel(ctx, "info", msg, info...)
}

// Print log
log.Print(logger.formatLog(ctx, msg, info...))
func (logger ECS) Warning(ctx context.Context, msg string, info ...extensions.LogInfo) {
logger.logWithLevel(ctx, "warning", msg, info...)
}

func (logger ECS) Error(ctx context.Context, msg string, info ...extensions.LogInfo) {
logger.logWithLevel(ctx, "error", msg, info...)
}
2 changes: 1 addition & 1 deletion test/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func BrokerControllers(t *testing.T) []extensions.BrokerController {
natsController := brokers.NewNATSController(nc)

// Kafka
kafkaController := brokers.NewKafkaController([]string{"127.0.0.1:9092"})
kafkaController := brokers.NewKafkaController([]string{"localhost:9094"})

return []extensions.BrokerController{
natsController,
Expand Down
18 changes: 16 additions & 2 deletions test/issues/74/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,33 @@ func (suite *Suite) TestHeaders() {

// Check what the app receive and translate
var recvMsg TestMessage
suite.app.SubscribeTestChannel(context.Background(), func(_ context.Context, msg TestMessage, _ bool) {
err := suite.app.SubscribeTestChannel(context.Background(), func(_ context.Context, msg TestMessage, _ bool) {
recvMsg = msg
wg.Done()
})
suite.Require().NoError(err)
wg.Add(1)

// Publish the message
err := suite.client.PublishTestChannel(context.Background(), sent)
err = suite.client.PublishTestChannel(context.Background(), sent)
suite.Require().NoError(err)

// Wait for the message to be received by the app
wg.Wait()

// Check received message
suite.Require().Equal(sent, recvMsg)

// Check sent message to broker
bMsg := <-suite.interceptor

// Check that version is in the header
version, exists := bMsg.Headers["version"]
suite.Require().True(exists)
suite.Require().Equal([]byte("1.0.0"), version)

// Check that datetime is in the header
datetime, exists := bMsg.Headers["dateTime"]
suite.Require().True(exists)
suite.Require().Equal([]byte("2020-01-01T00:00:00Z"), datetime)
}
5 changes: 5 additions & 0 deletions test/issues/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- [anyof](https://github.com/asyncapi/spec/blob/master/examples/anyof.yml)
- [correlation-id](https://github.com/asyncapi/spec/blob/master/examples/correlation-id.yml)
- [oneof](https://github.com/asyncapi/spec/blob/master/examples/oneof.yml)
- [rpc-server](https://github.com/asyncapi/spec/blob/master/examples/rpc-server.yml)
- [simple](https://github.com/asyncapi/spec/blob/master/examples/simple.yml)
Loading

0 comments on commit 629ff5f

Please sign in to comment.