Skip to content

Commit

Permalink
add options
Browse files Browse the repository at this point in the history
  • Loading branch information
lerenn committed Sep 20, 2023
1 parent ad03a98 commit dd01f58
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 28 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -652,10 +652,30 @@ err := appV2.SubscribeHello(context.Background(), func(_ context.Context, msg v2

That way, you can support multiple different versions with the same broker.

#### Version tagging

**Important**: this feature will add an `application-version` header to each
message in order to have the correction version of the application on each of
message in order to have the correct version of the application on each of
them.

##### Non-tagged messages

If messages can have no `application-version`, you can use the option `WithDefaultVersion`
to add a default version to non-tagged messages.

```golang
vw := versioning.NewWrapper(broker, versioning.WithDefaultVersion("1.1.4"))
```

##### Change header key for application version

Also, if you don't want to use this header as a recipient to the application version,
you can specify your own header with the option `WithVersionHeaderKey`.

```golang
vw := versioning.NewWrapper(broker, versioning.WithVersionHeaderKey("my-version-key"))
```

### AsyncAPI Extensions

#### Schema Object extensions
Expand Down
5 changes: 3 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.5'

services:
kafka:
image: 'bitnami/kafka:latest'
image: 'bitnami/kafka:3.5.1'
ports:
# Listening on a different port to avoid collision with 9092 internal listener
# Prefering to keep 9092 for internal listener as it is the port for examples
Expand All @@ -19,7 +19,8 @@ services:
networks:
- brokers
nats:
image: nats:alpine
image: nats:2.10
command: ["-V"]
ports:
- 4222:4222
networks:
Expand Down
5 changes: 4 additions & 1 deletion examples/helloworld/nats/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
)

func main() {
// Create a new broker
broker := nats.NewController("nats://nats:4222", nats.WithQueueGroup("helloworld-apps"))

// Create a new application controller
ctrl, err := NewAppController(nats.NewController("nats://nats:4222"))
ctrl, err := NewAppController(broker)
if err != nil {
panic(err)
}
Expand Down
5 changes: 4 additions & 1 deletion examples/helloworld/nats/user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
)

func main() {
// Create a new broker
broker := nats.NewController("nats://nats:4222", nats.WithQueueGroup("helloworld-users"))

// Create a new user controller
ctrl, err := NewUserController(nats.NewController("nats://nats:4222"))
ctrl, err := NewUserController(broker)
if err != nil {
panic(err)
}
Expand Down
6 changes: 5 additions & 1 deletion examples/ping/nats/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ func (s ServerSubscriber) Ping(ctx context.Context, req PingMessage, _ bool) {
func main() {
// Instanciate a NATS controller with a logger
logger := loggers.NewText()
broker := nats.NewController("nats://nats:4222", nats.WithLogger(logger))
broker := nats.NewController(
"nats://nats:4222", // Set URL to broker
nats.WithLogger(logger), // Attach an internal logger
nats.WithQueueGroup("ping-apps"), // Set a specific queue group to avoid collisions
)

// Create a new app controller
ctrl, err := NewAppController(
Expand Down
6 changes: 5 additions & 1 deletion examples/ping/nats/user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import (
func main() {
// Instanciate a NATS controller with a logger
logger := loggers.NewText()
broker := nats.NewController("nats://nats:4222", nats.WithLogger(logger))
broker := nats.NewController(
"nats://nats:4222", // Set URL to broker
nats.WithLogger(logger), // Attach an internal logger
nats.WithQueueGroup("ping-users"), // Set a specific queue group to avoid collisions
)

// Create a new user controller
ctrl, err := NewUserController(
Expand Down
29 changes: 23 additions & 6 deletions pkg/extensions/versioning/broker_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ type brokerSubscription struct {
}

func newBrokerSubscription(
channel string,
channelName string,
messages chan extensions.BrokerMessage,
cancel chan any,
parent *Wrapper,
) brokerSubscription {
return brokerSubscription{
channelName: channel,
channelName: channelName,
messages: messages,
cancel: cancel,
versionsChannels: make(map[string]versionSubcription),
parent: parent,
versionsChannels: make(map[string]versionSubcription),
}
}

Expand Down Expand Up @@ -89,23 +89,40 @@ func (bs *brokerSubscription) launchListener(ctx context.Context) {
go func() {
for {
// Wait for new messages
msg := <-bs.messages
msg, open := <-bs.messages
if !open {
break
}

// Get the version from the message
version := string(msg.Headers[VersionField])
bVersion, exists := msg.Headers[bs.parent.versionHeaderKey]
version := string(bVersion)

// Add default version if none is specified
if !exists || version == "" {
// If there is a default version activated, then go on with it
if bs.parent.defaultVersion != nil {
version = *bs.parent.defaultVersion
} else {
ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, msg)
bs.parent.logger.Error(ctx, "no version in the message and no default version")
continue
}
}

// Lock the versions to avoid conflict
bs.versionsMutex.Lock()

// Get the correct channel based on the version
ch, exists := bs.versionsChannels[version]
if !exists {
// Set contextextensions.
// Set context
ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, msg)
ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, version)

// Log the error
bs.parent.logger.Error(ctx, fmt.Sprintf("version %q is not registered", version))
continue
}

// Unlock the versions
Expand Down
45 changes: 32 additions & 13 deletions pkg/extensions/versioning/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

var _ extensions.BrokerController = (*Wrapper)(nil)

// VersionField is the field that will be added to a message to get the version.
const VersionField = "application-version"
// DefaultVersionHeaderKey is the field that will be added to a message to get the version.
const DefaultVersionHeaderKey = "application-version"

var (
// ErrNoVersion happens when there is no version in the context or the message.
Expand All @@ -21,8 +21,10 @@ var (
// Wrapper allows to use multiple version of the same App/User Controllers
// on one Broker Controller in order to handle migrations.
type Wrapper struct {
broker extensions.BrokerController
logger extensions.Logger
broker extensions.BrokerController
logger extensions.Logger
defaultVersion *string
versionHeaderKey string

channels map[string]*brokerSubscription
channelsMutex sync.Mutex
Expand All @@ -34,32 +36,49 @@ type WrapperOption func(versionWrapper *Wrapper)
// NewWrapper creates a Version Wrapper around a Broker Controller.
func NewWrapper(broker extensions.BrokerController, options ...WrapperOption) *Wrapper {
// Create version Wrapper
vw := Wrapper{
broker: broker,
channels: make(map[string]*brokerSubscription),
logger: extensions.DummyLogger{},
w := Wrapper{
broker: broker,
channels: make(map[string]*brokerSubscription),
logger: extensions.DummyLogger{},
versionHeaderKey: DefaultVersionHeaderKey,
}

// Execute options
for _, option := range options {
option(&vw)
option(&w)
}

return &vw
return &w
}

// WithLogger lets add a logger to the Wrapper struct.
func WithLogger(logger extensions.Logger) WrapperOption {
return func(versionWrapper *Wrapper) {
versionWrapper.logger = logger
return func(wrapper *Wrapper) {
wrapper.logger = logger
}
}

// WithDefaultVersion lets add a default version to tag messages that don't have
// versions tagged.
func WithDefaultVersion(version string) WrapperOption {
return func(wrapper *Wrapper) {
wrapper.defaultVersion = &version
}
}

// WithVersionHeaderKey lets use a different version header key to add application
// version to published messages and retrieve it from received messages.
func WithVersionHeaderKey(headerKey string) WrapperOption {
return func(wrapper *Wrapper) {
wrapper.versionHeaderKey = headerKey
}
}

// Publish a message to the broker.
func (w *Wrapper) Publish(ctx context.Context, channel string, mw extensions.BrokerMessage) error {
// Add version to message
extensions.IfContextSetWith(ctx, extensions.ContextKeyIsVersion, func(version string) {
mw.Headers[VersionField] = []byte(version)
mw.Headers[w.versionHeaderKey] = []byte(version)
})

// Send message
Expand Down
9 changes: 7 additions & 2 deletions test/brokers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package asyncapi_test

import (
"fmt"
"testing"
"time"

"github.com/lerenn/asyncapi-codegen/pkg/extensions"
"github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka"
Expand All @@ -13,8 +15,11 @@ import (
func BrokerControllers(t *testing.T) map[string]extensions.BrokerController {
t.Helper() // Set this function as a helper

// Set a specific queueGroupeID to avoid collision between tests
queueGroupID := fmt.Sprintf("test-%d", time.Now().UnixNano())

return map[string]extensions.BrokerController{
"NATS": nats.NewController("nats://localhost:4222"),
"Kafka": kafka.NewController([]string{"localhost:9094"}),
"NATS": nats.NewController("nats://localhost:4222", nats.WithQueueGroup(queueGroupID)),
"Kafka": kafka.NewController([]string{"localhost:9094"}, kafka.WithGroupID(queueGroupID)),
}
}

0 comments on commit dd01f58

Please sign in to comment.