diff --git a/Makefile b/Makefile index 2df6b691..bdaae3d2 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,15 @@ endif ci: ## Run the CI @${DAGGER_COMMAND} all +.PHONY: dev/up +dev/up: ## Start the development environment + @go run ./tools/generate-certs + @docker-compose up -d + +.PHONY: dev/down +dev/down: ## Stop the development environment + @docker-compose down + .PHONY: lint lint: ## Lint the code @${DAGGER_COMMAND} linter diff --git a/build/ci/dagger.go b/build/ci/dagger.go index 0b86c62d..2d26f652 100755 --- a/build/ci/dagger.go +++ b/build/ci/dagger.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "strings" "sync" "dagger.io/dagger" @@ -21,11 +20,6 @@ var ( testFlag string brokers map[string]*dagger.Service - - generator func(context.Context) error - linter *dagger.Container - examples map[string]*dagger.Container - tests map[string]*dagger.Container ) var rootCmd = &cobra.Command{ @@ -42,12 +36,6 @@ var rootCmd = &cobra.Command{ // Create services brokers = ci.Brokers(client) - // Create containers - generator = ci.Generator(client) - linter = ci.Linter(client) - examples = ci.Examples(client, brokers) - tests = ci.Tests(client, brokers) - return nil }, } @@ -57,9 +45,10 @@ var allCmd = &cobra.Command{ Aliases: []string{"a"}, Short: "Execute all CI", Run: func(cmd *cobra.Command, args []string) { - execute(context.Background(), generator) - executeContainers(context.Background(), []*dagger.Container{linter}) - executeContainers(context.Background(), utils.MapToList(tests), utils.MapToList(examples)) + execute(context.Background(), ci.Generator(client)) + executeContainers(context.Background(), ci.Linter(client)) + executeContainers(context.Background(), ci.Tests(client, brokers, "./...")) + executeContainers(context.Background(), ci.Tests(client, brokers, "./...")) }, } @@ -68,14 +57,16 @@ var examplesCmd = &cobra.Command{ Aliases: []string{"g"}, Short: "Execute examples step of the CI", Run: func(cmd *cobra.Command, args []string) { + examples := ci.Examples(client, brokers) + if exampleFlag != "" { _, exists := examples[exampleFlag] if !exists { panic(fmt.Errorf("example %q doesn't exist", exampleFlag)) } - executeContainers(context.Background(), []*dagger.Container{examples[exampleFlag]}) + executeContainers(context.Background(), examples[exampleFlag]) } else { - executeContainers(context.Background(), utils.MapToList(examples)) + executeContainers(context.Background(), utils.MapToList(examples)...) } }, } @@ -85,7 +76,7 @@ var generatorCmd = &cobra.Command{ Aliases: []string{"g"}, Short: "Execute generator step of the CI", Run: func(cmd *cobra.Command, args []string) { - execute(context.Background(), generator) + execute(context.Background(), ci.Generator(client)) }, } @@ -94,7 +85,7 @@ var linterCmd = &cobra.Command{ Aliases: []string{"g"}, Short: "Execute linter step of the CI", Run: func(cmd *cobra.Command, args []string) { - executeContainers(context.Background(), []*dagger.Container{linter}) + executeContainers(context.Background(), ci.Linter(client)) }, } @@ -112,19 +103,11 @@ var testCmd = &cobra.Command{ Aliases: []string{"g"}, Short: "Execute tests step of the CI", Run: func(cmd *cobra.Command, args []string) { - if testFlag != "" { - if !strings.HasPrefix(testFlag, "./") { - testFlag = "./" + testFlag - } - - _, exists := tests[testFlag] - if !exists { - panic(fmt.Errorf("test %q doesn't exist in %+v", testFlag, tests)) - } - executeContainers(context.Background(), []*dagger.Container{tests[testFlag]}) - } else { - executeContainers(context.Background(), utils.MapToList(tests)) + if testFlag == "" { + testFlag = "./..." } + + executeContainers(context.Background(), ci.Tests(client, brokers, testFlag)) }, } @@ -150,24 +133,22 @@ func main() { } } -func executeContainers(ctx context.Context, containers ...[]*dagger.Container) { +func executeContainers(ctx context.Context, containers ...*dagger.Container) { // Regroup arg funcs := make([]func(context.Context) error, 0) for _, l1 := range containers { - for _, l2 := range l1 { - if l2 == nil { - continue - } + if l1 == nil { + continue + } - // Note: create a new local variable to store value of actual l2 - callback := l2 + // Note: create a new local variable to store value of actual l2 + callback := l1 - fn := func(ctx context.Context) error { - _, err := callback.Stderr(ctx) - return err - } - funcs = append(funcs, fn) + fn := func(ctx context.Context) error { + _, err := callback.Stderr(ctx) + return err } + funcs = append(funcs, fn) } execute(ctx, funcs...) diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 00000000..3b88b65f --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,122 @@ +services: + # NATS variants + nats: + image: nats:latest + ports: + - 4222:4222 + nats-tls: + image: nats:latest + ports: + - "4223:4222" + command: [ + "--tls", + "--tlscert", "/certs/server-cert.pem", + "--tlskey", "/certs/server-key.pem", + ] + volumes: + - ./tmp/certs/nats:/certs + nats-tls-basic-auth: + image: nats:latest + ports: + - 4224:4222 + command: [ + "--tls", + "--tlscert", "/certs/server-cert.pem", + "--tlskey", "/certs/server-key.pem", + "--user", "user", + "--pass", "password", + ] + volumes: + - ./tmp/certs/nats:/certs + + # NATS Jetstream variants + nats-jetstream: + image: nats:latest + ports: + - 4225:4222 + command: [ + "-js", + ] + nats-jetstream-tls: + image: nats:latest + ports: + - 4226:4222 + command: [ + "-js", + "--tls", + "--tlscert", "/certs/server-cert.pem", + "--tlskey", "/certs/server-key.pem", + ] + volumes: + - ./tmp/certs/nats:/certs + nats-jetstream-tls-basic-auth: + image: nats:latest + ports: + - 4227:4222 + command: [ + "-js", + "--tls", + "--tlscert", "/certs/server-cert.pem", + "--tlskey", "/certs/server-key.pem", + "--user", "user", + "--pass", "password", + ] + volumes: + - ./tmp/certs/nats:/certs + + # Kafka variants + kafka: + image: bitnami/kafka:3.5.1 + ports: + - 9092:9092 + - 9093:9093 + environment: + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://localhost:9092 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9093 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL + kafka-tls: + image: bitnami/kafka:3.5.1 + ports: + - 9094:9094 + - 9095:9095 + environment: + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CONTROLLER://:9095 + - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:SSL + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9095 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL + - KAFKA_TLS_TYPE=PEM + - KAFKA_TLS_CLIENT_AUTH=none + volumes: + - ./tmp/certs/kafka:/bitnami/kafka/config/certs/ + kafka-tls-basic-auth: + image: bitnami/kafka:3.5.1 + ports: + - 9096:9096 + - 9097:9097 + environment: + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_LISTENERS=INTERNAL://:9096,CONTROLLER://:9097 + - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://localhost:9096 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:SASL_SSL + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9097 + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL + - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-512 + - KAFKA_TLS_TYPE=PEM + - KAFKA_TLS_CLIENT_AUTH=none + - KAFKA_INTER_BROKER_USER=user + - KAFKA_INTER_BROKER_PASSWORD=password + volumes: + - ./tmp/certs/kafka:/bitnami/kafka/config/certs/ + + \ No newline at end of file diff --git a/pkg/asyncapi/parse_test.go b/pkg/asyncapi/parse_test.go index 504a37cc..6005e2e5 100644 --- a/pkg/asyncapi/parse_test.go +++ b/pkg/asyncapi/parse_test.go @@ -24,7 +24,7 @@ func (suite *ParseSuite) TestCorrectVersions() { suite.Require().Equal(len(correctVersions), len(SupportedVersions)) for _, v := range correctVersions { - b := []byte(fmt.Sprintf("{\"version\":\"%s\"}", v)) + b := []byte(fmt.Sprintf("{\"asyncapi\":\"%s\"}", v)) _, err := FromJSON(b) suite.Require().NoError(err) } @@ -37,7 +37,7 @@ func (suite *ParseSuite) TestIncorrectVersions() { } for _, v := range correctVersions { - b := []byte(fmt.Sprintf("{\"version\":\"%s\"}", v)) + b := []byte(fmt.Sprintf("{\"asyncapi\":\"%s\"}", v)) _, err := FromJSON(b) suite.Require().Error(err) suite.Require().ErrorIs(err, ErrInvalidVersion) diff --git a/pkg/ci/brokers.go b/pkg/ci/brokers.go index 65e9db37..95b8254c 100644 --- a/pkg/ci/brokers.go +++ b/pkg/ci/brokers.go @@ -1,25 +1,24 @@ package ci import ( - "crypto/rand" - "crypto/rsa" - "crypto/x509" - "crypto/x509/pkix" - "encoding/pem" "fmt" - "math/big" - "time" "dagger.io/dagger" + + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" ) // BindBrokers is used as a helper to bind brokers to a container. func BindBrokers(brokers map[string]*dagger.Service) func(r *dagger.Container) *dagger.Container { return func(r *dagger.Container) *dagger.Container { + // Bind all brokers to the container. for n, b := range brokers { r = r.WithServiceBinding(n, b) } - return r + + // Set environment variable to indicate that the application is running + // in a dockerized environment. + return r.WithEnvVariable("ASYNCAPI_DOCKERIZED", "true") } } @@ -27,12 +26,17 @@ func BindBrokers(brokers map[string]*dagger.Service) func(r *dagger.Container) * func Brokers(client *dagger.Client) map[string]*dagger.Service { brokers := make(map[string]*dagger.Service) - brokers["kafka"] = BrokerRedPandaAsKafka(client) + // Kafka + brokers["kafka"] = BrokerKafka(client) brokers["kafka-tls"] = BrokerKafkaSecure(client) brokers["kafka-tls-basic-auth"] = BrokerKafkaSecureBasicAuth(client) + + // NATS brokers["nats"] = BrokerNATS(client) brokers["nats-tls"] = BrokerNATSSecure(client) brokers["nats-tls-basic-auth"] = BrokerNATSSecureBasicAuth(client) + + // NATS Jetstream brokers["nats-jetstream"] = BrokerNATSJetstream(client) brokers["nats-jetstream-tls"] = BrokerNATSJetstreamSecure(client) brokers["nats-jetstream-tls-basic-auth"] = BrokerNATSJetstreamSecureBasicAuth(client) @@ -40,25 +44,26 @@ func Brokers(client *dagger.Client) map[string]*dagger.Service { return brokers } -func BrokerRedPandaAsKafka(client *dagger.Client) *dagger.Service { +// BrokerKafka returns a service for the Kafka broker. +func BrokerKafka(client *dagger.Client) *dagger.Service { return client.Container(). // Set container image - From(RedPandaImage). + From(KafkaImage). - // Add Command - WithExec([]string{ - "redpanda", - "start", - "--kafka-addr internal://0.0.0.0:9092,controller://0.0.0.0:9093", - "--advertise-kafka-addr internal://kafka:9092,external://localhost:9093", - // Mode dev-container uses well-known configuration properties - // for development in containers. - "--mode dev-container", - // Tells Seastar (the framework Redpanda uses under the hood) to - // use 1 core on the system. - "--smp 1", - "--default-log-level=info", - }). + // Add environment variables + WithEnvVariable("KAFKA_CFG_NODE_ID", "0"). + WithEnvVariable("KAFKA_CFG_PROCESS_ROLES", "controller,broker"). + WithEnvVariable("KAFKA_CFG_LISTENERS", "INTERNAL://:9092,CONTROLLER://:9093"). + WithEnvVariable("KAFKA_CFG_ADVERTISED_LISTENERS", "INTERNAL://kafka:9092"). + WithEnvVariable("KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP", + "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"). + WithEnvVariable("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS", "0@:9093"). + WithEnvVariable("KAFKA_CFG_CONTROLLER_LISTENER_NAMES", "CONTROLLER"). + WithEnvVariable("KAFKA_CFG_INTER_BROKER_LISTENER_NAME", "INTERNAL"). + + // Add exposed ports + WithExposedPort(9092). + WithExposedPort(9093). // Return container as a service AsService() @@ -66,7 +71,7 @@ func BrokerRedPandaAsKafka(client *dagger.Client) *dagger.Service { // BrokerKafkaSecure returns a service for the Kafka broker secured with TLS. func BrokerKafkaSecure(client *dagger.Client) *dagger.Service { - key, cert, cacert, err := generateSelfSignedCertificateWithCA("kafka-tls") + key, cert, cacert, err := testutil.GenerateSelfSignedCertificateWithCA("kafka-tls") if err != nil { panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) } @@ -109,7 +114,7 @@ func BrokerKafkaSecure(client *dagger.Client) *dagger.Service { // BrokerKafkaSecureBasicAuth returns a service for the Kafka broker secured with TLS and basic auth. func BrokerKafkaSecureBasicAuth(client *dagger.Client) *dagger.Service { - key, cert, cacert, err := generateSelfSignedCertificateWithCA("kafka-tls-basic-auth") + key, cert, cacert, err := testutil.GenerateSelfSignedCertificateWithCA("kafka-tls-basic-auth") if err != nil { panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) } @@ -170,7 +175,7 @@ func BrokerNATS(client *dagger.Client) *dagger.Service { // BrokerNATSSecure returns a service for the NATS broker secured with TLS. func BrokerNATSSecure(client *dagger.Client) *dagger.Service { - key, cert, err := generateSelfSignedCertificate("nats-tls") + key, cert, err := testutil.GenerateSelfSignedCertificate("nats-tls") if err != nil { panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) } @@ -192,7 +197,7 @@ func BrokerNATSSecure(client *dagger.Client) *dagger.Service { // BrokerNATSSecureBasicAuth returns a service for the NATS broker secured with TLS // and basic auth user: user password: password. func BrokerNATSSecureBasicAuth(client *dagger.Client) *dagger.Service { - key, cert, err := generateSelfSignedCertificate("nats-tls-basic-auth") + key, cert, err := testutil.GenerateSelfSignedCertificate("nats-tls-basic-auth") if err != nil { panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) } @@ -206,7 +211,12 @@ func BrokerNATSSecureBasicAuth(client *dagger.Client) *dagger.Service { // Add server cert and key directory WithDirectory("./tls", tlsDir). // Start NATS with tls and credentials - WithExec([]string{"--tls", "--tlscert=/tls/server-cert.pem", "--tlskey=/tls/server-key.pem", "--user", "user", "--pass", "password"}). //nolint:lll + WithExec([]string{ + "--tls", + "--tlscert=/tls/server-cert.pem", + "--tlskey=/tls/server-key.pem", + "--user", "user", + "--pass", "password"}). // Return container as a service AsService() } @@ -226,7 +236,7 @@ func BrokerNATSJetstream(client *dagger.Client) *dagger.Service { // BrokerNATSJetstreamSecure returns a service for the NATS broker secured with TLS. func BrokerNATSJetstreamSecure(client *dagger.Client) *dagger.Service { - key, cert, err := generateSelfSignedCertificate("nats-jetstream-tls-basic-auth") + key, cert, err := testutil.GenerateSelfSignedCertificate("nats-jetstream-tls-basic-auth") if err != nil { panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) } @@ -248,7 +258,7 @@ func BrokerNATSJetstreamSecure(client *dagger.Client) *dagger.Service { // BrokerNATSJetstreamSecureBasicAuth returns a service for the NATS broker secured with TLS // and basic auth user: user password: password. func BrokerNATSJetstreamSecureBasicAuth(client *dagger.Client) *dagger.Service { - key, cert, err := generateSelfSignedCertificate("nats-jetstream-tls") + key, cert, err := testutil.GenerateSelfSignedCertificate("nats-jetstream-tls") if err != nil { panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) } @@ -266,105 +276,3 @@ func BrokerNATSJetstreamSecureBasicAuth(client *dagger.Client) *dagger.Service { // Return container as a service AsService() } - -func certificateTemplateForHost(name string) x509.Certificate { - return x509.Certificate{ - SerialNumber: big.NewInt(1), - Subject: pkix.Name{ - Organization: []string{"asyncapi-codegen"}, - OrganizationalUnit: []string{"localtest"}, - CommonName: name, - }, - NotBefore: time.Now(), - NotAfter: time.Now().AddDate(0, 0, 1), // Valid for 1 day - KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, - BasicConstraintsValid: true, - DNSNames: []string{name, "localhost", "127.0.0.1"}, - } -} - -func generateSelfSignedCertificate(name string) ([]byte, []byte, error) { - // Generate private key - privateKey, err := rsa.GenerateKey(rand.Reader, 4096) - if err != nil { - return nil, nil, err - } - - template := certificateTemplateForHost(name) - - // Generate self-signed certificate - derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey) - if err != nil { - return nil, nil, err - } - - // Encode private key to PEM format - keyBytes := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)}) - - // Encode certificate to PEM format - certBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) - - return keyBytes, certBytes, nil -} - -func generateSelfSignedCertificateWithCA(name string) ([]byte, []byte, []byte, error) { - // Generate private key for CA - caPrivateKey, err := rsa.GenerateKey(rand.Reader, 4096) - if err != nil { - return nil, nil, nil, err - } - - // Create a self-signed CA certificate - caCertTemplate := x509.Certificate{ - SerialNumber: big.NewInt(2), // Use a different serial number for the CA certificate - Subject: pkix.Name{ - Organization: []string{"asyncapi-codegen"}, - CommonName: "CA asyncapi-codegen", - }, - NotBefore: time.Now(), - NotAfter: time.Now().AddDate(10, 0, 0), // Valid for 10 years - KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign, - IsCA: true, - BasicConstraintsValid: true, - } - - // Generate self-signed CA certificate - caDERBytes, err := x509.CreateCertificate(rand.Reader, &caCertTemplate, &caCertTemplate, - &caPrivateKey.PublicKey, caPrivateKey) - if err != nil { - return nil, nil, nil, err - } - - // Generate private key for server - privateKey, err := rsa.GenerateKey(rand.Reader, 4096) - if err != nil { - return nil, nil, nil, err - } - - // Create server certificate signed by CA - certTemplate := certificateTemplateForHost(name) - - derBytes, err := x509.CreateCertificate(rand.Reader, &certTemplate, &caCertTemplate, - &privateKey.PublicKey, caPrivateKey) - if err != nil { - return nil, nil, nil, err - } - - // Convert private key to PKCS #8 - privatKeyPKC8Bytes, err := x509.MarshalPKCS8PrivateKey(privateKey) - if err != nil { - return nil, nil, nil, err - } - - // Encode server private key to PEM format - keyBytes := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: privatKeyPKC8Bytes}) - - // Encode server certificate to PEM format - certBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) - - // Encode CA certificate to PEM format - caCertBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caDERBytes}) - - return keyBytes, certBytes, caCertBytes, nil -} diff --git a/pkg/ci/constants.go b/pkg/ci/constants.go index c9e56cd0..4a6cf4c1 100644 --- a/pkg/ci/constants.go +++ b/pkg/ci/constants.go @@ -1,8 +1,6 @@ package ci const ( - // RedPandaImage is the image used for redpanda. - RedPandaImage = "docker.redpanda.com/redpandadata/redpanda:v23.3.11" // KafkaImage is the image used for kafka. KafkaImage = "bitnami/kafka:3.5.1" // GolangImage is the image used for golang execution. diff --git a/pkg/ci/linter.go b/pkg/ci/linter.go index ea7504e1..8c604d60 100644 --- a/pkg/ci/linter.go +++ b/pkg/ci/linter.go @@ -14,5 +14,5 @@ func Linter(client *dagger.Client) *dagger.Container { // Add golangci-lint cache WithMountedCache("/root/.cache/golangci-lint", client.CacheVolume("golangci-lint")). // Add command - WithExec([]string{"golangci-lint", "run"}) + WithExec([]string{"golangci-lint", "run", "--timeout=10m"}) } diff --git a/pkg/ci/tests.go b/pkg/ci/tests.go index 68cb6b27..e61551c8 100644 --- a/pkg/ci/tests.go +++ b/pkg/ci/tests.go @@ -5,24 +5,14 @@ import ( ) // Tests returns containers for all tests. -func Tests(client *dagger.Client, brokers map[string]*dagger.Service) map[string]*dagger.Container { - containers := make(map[string]*dagger.Container, 0) - - // Set examples - for _, p := range directoriesAtSublevel(2, "./test") { - t := client.Container(). - // Add base image - From(GolangImage). - // Add source code as work directory - With(sourceAsWorkdir(client)). - // Set brokers as dependencies of app and user - With(BindBrokers(brokers)). - // Execute command - WithExec([]string{"go", "test", p}) - - // Add user containers to containers - containers[p] = t - } - - return containers +func Tests(client *dagger.Client, brokers map[string]*dagger.Service, path string) *dagger.Container { + return client.Container(). + // Add base image + From(GolangImage). + // Add source code as work directory + With(sourceAsWorkdir(client)). + // Set brokers as dependencies of app and user + With(BindBrokers(brokers)). + // Execute command + WithExec([]string{"go", "test", path}) } diff --git a/pkg/extensions/brokers/kafka/kafka_test.go b/pkg/extensions/brokers/kafka/kafka_test.go index bf4751a4..fedf40f1 100644 --- a/pkg/extensions/brokers/kafka/kafka_test.go +++ b/pkg/extensions/brokers/kafka/kafka_test.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "testing" + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" "github.com/segmentio/kafka-go/sasl/scram" "github.com/stretchr/testify/assert" ) @@ -13,13 +14,27 @@ func TestSecureConnectionToKafka(t *testing.T) { tlsConfig := &tls.Config{InsecureSkipVerify: true} t.Run("test connection is not successfully to TLS secured kafka broker without TLS config", func(t *testing.T) { - _, err := NewController([]string{"kafka-tls:9092"}, + _, err := NewController( + []string{ + testutil.BrokerAddress(testutil.BrokerAddressParams{ + DockerizedAddr: "kafka-tls", + DockerizedPort: "9092", + LocalPort: "9094", + }), + }, WithGroupID("secureConnectTestWithoutTLS")) assert.Error(t, err, "new connection to TLS secured kafka broker without TLS config should return a error") }) t.Run("test connection is successfully to TLS secured kafka broker with TLS config", func(t *testing.T) { - _, err := NewController([]string{"kafka-tls:9092"}, + _, err := NewController( + []string{ + testutil.BrokerAddress(testutil.BrokerAddressParams{ + DockerizedAddr: "kafka-tls", + DockerizedPort: "9092", + LocalPort: "9094", + }), + }, WithGroupID("secureConnectTestWithTLS"), WithTLS(tlsConfig), ) @@ -28,7 +43,14 @@ func TestSecureConnectionToKafka(t *testing.T) { t.Run("test connection is not successfully to TLS secured kafka broker with TLS config and missing credentials", func(t *testing.T) { - _, err := NewController([]string{"kafka-tls-basic-auth:9092"}, + _, err := NewController( + []string{ + testutil.BrokerAddress(testutil.BrokerAddressParams{ + DockerizedAddr: "kafka-tls-basic-auth", + DockerizedPort: "9092", + LocalPort: "9096", + }), + }, WithGroupID("secureConnectTestWithTLSAndWithoutCredentials"), WithTLS(tlsConfig), ) @@ -40,7 +62,14 @@ func TestSecureConnectionToKafka(t *testing.T) { sha512Mechanism, err := scram.Mechanism(scram.SHA512, "user", "password") assert.NoError(t, err, "new scram.SHA512 should not return a error") - _, err = NewController([]string{"kafka-tls-basic-auth:9092"}, + _, err = NewController( + []string{ + testutil.BrokerAddress(testutil.BrokerAddressParams{ + DockerizedAddr: "kafka-tls-basic-auth", + DockerizedPort: "9092", + LocalPort: "9096", + }), + }, WithGroupID("secureConnectTestWithTLSAndWithCredentials"), WithTLS(tlsConfig), WithSasl(sha512Mechanism), diff --git a/pkg/extensions/brokers/nats/nats_test.go b/pkg/extensions/brokers/nats/nats_test.go index b5579717..33c4e44f 100644 --- a/pkg/extensions/brokers/nats/nats_test.go +++ b/pkg/extensions/brokers/nats/nats_test.go @@ -6,13 +6,20 @@ import ( "sync" "testing" + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" ) func TestValidateAckMechanism(t *testing.T) { subj := "CoreNatsValidateAckMechanism" - nb, err := NewController("nats://nats:4222", WithQueueGroup(subj)) + nb, err := NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats", + Port: "4222", + }), + WithQueueGroup(subj)) assert.NoError(t, err, "new controller should not return error") t.Run("validate ack is not supported in core NATS", func(t *testing.T) { @@ -70,13 +77,25 @@ func TestSecureConnectionToNATSCore(t *testing.T) { tlsConfig := &tls.Config{InsecureSkipVerify: true} t.Run("test connection is not successfully to TLS secured core NATS broker without TLS config", func(t *testing.T) { - _, err := NewController("nats://nats-tls:4222", + _, err := NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-tls", + DockerizedPort: "4222", + LocalPort: "4223", + }), WithQueueGroup("secureConnectTest")) assert.Error(t, err, "new connection to TLS secured NATS broker without TLS config should return a error") }) t.Run("test connection is successfully to TLS secured core NATS broker with TLS config", func(t *testing.T) { - nb, err := NewController("nats://nats-tls:4222", + nb, err := NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-tls", + DockerizedPort: "4222", + LocalPort: "4223", + }), WithQueueGroup("secureConnectTest"), WithConnectionOpts(nats.Secure(tlsConfig))) assert.NoError(t, err, "new connection to TLS secured NATS broker with TLS config should return no error") @@ -85,7 +104,13 @@ func TestSecureConnectionToNATSCore(t *testing.T) { t.Run("test connection is not successfully to TLS secured core NATS broker with TLS config and missing credentials", func(t *testing.T) { - _, err := NewController("nats://nats-tls-basic-auth:4222", + _, err := NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-tls-basic-auth", + DockerizedPort: "4222", + LocalPort: "4224", + }), WithQueueGroup("secureConnectTest"), WithConnectionOpts(nats.Secure(tlsConfig)), ) @@ -94,7 +119,13 @@ func TestSecureConnectionToNATSCore(t *testing.T) { t.Run("test connection is successfully to TLS secured core NATS broker with TLS config and credentials", func(t *testing.T) { - nb, err := NewController("nats://nats-tls-basic-auth:4222", + nb, err := NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-tls-basic-auth", + DockerizedPort: "4222", + LocalPort: "4224", + }), WithQueueGroup("secureConnectTest"), WithConnectionOpts( nats.Secure(tlsConfig), diff --git a/pkg/extensions/brokers/natsjetstream/natsjetstream_test.go b/pkg/extensions/brokers/natsjetstream/natsjetstream_test.go index 889e5778..2e52dc4a 100644 --- a/pkg/extensions/brokers/natsjetstream/natsjetstream_test.go +++ b/pkg/extensions/brokers/natsjetstream/natsjetstream_test.go @@ -6,6 +6,7 @@ import ( "sync" "testing" + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/assert" @@ -16,7 +17,12 @@ func TestValidateAckMechanism(t *testing.T) { subj := "NatsJetstreamValidateAckMechanism" broker, err := NewController( - "nats://nats-jetstream:4222", + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-jetstream", + DockerizedPort: "4222", + LocalPort: "4225", + }), WithStreamConfig(jetstream.StreamConfig{ Name: subj, Subjects: []string{subj}, @@ -87,7 +93,12 @@ func TestSecureConnectionToNATSJetstream(t *testing.T) { subj := "secureConnectTestWithoutTLSConfig" _, err := NewController( - "nats://nats-jetstream-tls:4222", + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-jetstream-tls", + DockerizedPort: "4222", + LocalPort: "4226", + }), WithStreamConfig(jetstream.StreamConfig{ Name: subj, Subjects: []string{subj}, @@ -101,7 +112,12 @@ func TestSecureConnectionToNATSJetstream(t *testing.T) { subj := "secureConnectTestWithTLSConfig" jc, err := NewController( - "nats://nats-jetstream-tls:4222", + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-jetstream-tls", + DockerizedPort: "4222", + LocalPort: "4226", + }), WithStreamConfig(jetstream.StreamConfig{ Name: subj, Subjects: []string{subj}, @@ -119,7 +135,12 @@ func TestSecureConnectionToNATSJetstream(t *testing.T) { subj := "secureConnectTestWithTLSConfigAndWithoutCredentials" _, err := NewController( - "nats://nats-jetstream-tls-basic-auth:4222", + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-jetstream-tls-basic-auth", + DockerizedPort: "4222", + LocalPort: "4227", + }), WithStreamConfig(jetstream.StreamConfig{ Name: subj, Subjects: []string{subj}, @@ -136,7 +157,12 @@ func TestSecureConnectionToNATSJetstream(t *testing.T) { subj := "secureConnectTestWithTLSConfigAndCredentials" jc, err := NewController( - "nats://nats-jetstream-tls-basic-auth:4222", + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-jetstream-tls-basic-auth", + DockerizedPort: "4222", + LocalPort: "4227", + }), WithStreamConfig(jetstream.StreamConfig{ Name: subj, Subjects: []string{subj}, diff --git a/pkg/utils/test/brokers.go b/pkg/utils/test/brokers.go new file mode 100644 index 00000000..c0a276d5 --- /dev/null +++ b/pkg/utils/test/brokers.go @@ -0,0 +1,55 @@ +package test + +import ( + "os" +) + +type BrokerAddressParams struct { + Schema string + Port string + + DockerizedAddr string + DockerizedPort string + + LocalAddr string + LocalPort string +} + +// BrokerAddress returns the broker address based on the environment. +// If the environment variable ASYNCAPI_DOCKERIZED is set, it returns +// the dockerized address. +func BrokerAddress(params BrokerAddressParams) string { + var url string + + // Set schema if not empty + if params.Schema != "" { + url = params.Schema + "://" + } + + // Set address based on environment + if os.Getenv("ASYNCAPI_DOCKERIZED") != "" { + url += params.DockerizedAddr + + // Set port if not empty + if params.DockerizedPort != "" { + url += ":" + params.DockerizedPort + } else if params.Port != "" { + url += ":" + params.Port + } + } else { + if params.LocalAddr != "" { + url += params.LocalAddr + } else { + url += "localhost" + } + + // Set port if not empty + if params.LocalPort != "" { + url += ":" + params.LocalPort + } else if params.Port != "" { + url += ":" + params.Port + } + } + + return url +} diff --git a/pkg/utils/test/certs.go b/pkg/utils/test/certs.go new file mode 100644 index 00000000..012dc4f0 --- /dev/null +++ b/pkg/utils/test/certs.go @@ -0,0 +1,113 @@ +package test + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "time" +) + +func GenerateSelfSignedCertificate(name string) ([]byte, []byte, error) { + // Generate private key + privateKey, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return nil, nil, err + } + + template := certificateTemplateForHost(name) + + // Generate self-signed certificate + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey) + if err != nil { + return nil, nil, err + } + + // Encode private key to PEM format + keyBytes := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)}) + + // Encode certificate to PEM format + certBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + return keyBytes, certBytes, nil +} + +func GenerateSelfSignedCertificateWithCA(name string) ([]byte, []byte, []byte, error) { + // Generate private key for CA + caPrivateKey, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return nil, nil, nil, err + } + + // Create a self-signed CA certificate + caCertTemplate := x509.Certificate{ + SerialNumber: big.NewInt(2), // Use a different serial number for the CA certificate + Subject: pkix.Name{ + Organization: []string{"asyncapi-codegen"}, + CommonName: "CA asyncapi-codegen", + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(10, 0, 0), // Valid for 10 years + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign, + IsCA: true, + BasicConstraintsValid: true, + } + + // Generate self-signed CA certificate + caDERBytes, err := x509.CreateCertificate(rand.Reader, &caCertTemplate, &caCertTemplate, + &caPrivateKey.PublicKey, caPrivateKey) + if err != nil { + return nil, nil, nil, err + } + + // Generate private key for server + privateKey, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return nil, nil, nil, err + } + + // Create server certificate signed by CA + certTemplate := certificateTemplateForHost(name) + + derBytes, err := x509.CreateCertificate(rand.Reader, &certTemplate, &caCertTemplate, + &privateKey.PublicKey, caPrivateKey) + if err != nil { + return nil, nil, nil, err + } + + // Convert private key to PKCS #8 + privatKeyPKC8Bytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + return nil, nil, nil, err + } + + // Encode server private key to PEM format + keyBytes := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: privatKeyPKC8Bytes}) + + // Encode server certificate to PEM format + certBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + + // Encode CA certificate to PEM format + caCertBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caDERBytes}) + + return keyBytes, certBytes, caCertBytes, nil +} + +func certificateTemplateForHost(name string) x509.Certificate { + return x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"asyncapi-codegen"}, + OrganizationalUnit: []string{"localtest"}, + CommonName: name, + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), // Valid for 1 day + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + DNSNames: []string{name, "localhost", "127.0.0.1"}, + } +} diff --git a/test/brokers.go b/test/brokers.go index 6d77d574..2b32d107 100644 --- a/test/brokers.go +++ b/test/brokers.go @@ -1,43 +1,65 @@ -package asyncapi_test +package test import ( "fmt" "testing" - "time" "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" ) +type BrokerAddressParams struct { + Schema string + Port string + + DockerizedAddr string + DockerizedPort string + + LocalAddr string + LocalPort string +} + // BrokerControllers returns a list of BrokerController to test based on the // docker-compose file of the project. -func BrokerControllers(t *testing.T) (brokers []extensions.BrokerController, cleanup func()) { +func BrokerControllers(t *testing.T) ([]extensions.BrokerController, func()) { t.Helper() // Set this function as a helper - // Initialize returned values - brokers = make([]extensions.BrokerController, 0) - // Set a specific queueGroupID to avoid collision between tests - queueGroupID := fmt.Sprintf("test-%d", time.Now().UnixNano()) + queueGroupID := fmt.Sprintf("test-%s", t.Name()) + fmt.Println(queueGroupID) // Add NATS broker - nb, err := nats.NewController("nats://nats:4222", nats.WithQueueGroup(queueGroupID)) + natsController, err := nats.NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats", + Port: "4222", + }), + nats.WithQueueGroup(queueGroupID)) if err != nil { panic(err) } - brokers = append(brokers, nb) - // Add kafka broker - kb, err := kafka.NewController([]string{"kafka:9092"}, kafka.WithGroupID(queueGroupID)) + // Add Kafka broker + kafkaController, err := kafka.NewController( + []string{ + testutil.BrokerAddress(testutil.BrokerAddressParams{ + DockerizedAddr: "kafka", + Port: "9092", + }), + }, + kafka.WithGroupID(queueGroupID)) if err != nil { panic(err) } - brokers = append(brokers, kb) // Return brokers with their cleanup functions - return brokers, func() { - // Clean up NATS - nb.Close() - } + return []extensions.BrokerController{ + natsController, + kafkaController, + }, func() { + natsController.Close() + } } diff --git a/test/v2/issues/101/suite_test.go b/test/v2/issues/101/suite_test.go index 00786c45..dcef290a 100644 --- a/test/v2/issues/101/suite_test.go +++ b/test/v2/issues/101/suite_test.go @@ -8,12 +8,12 @@ import ( "testing" "github.com/lerenn/asyncapi-codegen/pkg/extensions" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() // Only do it with one broker as this is not testing the broker @@ -70,7 +70,6 @@ func (suite *Suite) TestAddingHeader() { } // Check what the app receive - wg.Add(1) err := suite.app.SubscribeV2Issue101Test( context.Background(), func(_ context.Context, msg V2Issue101TestMessage) error { @@ -80,6 +79,7 @@ func (suite *Suite) TestAddingHeader() { suite.Require().NoError(err) // Publish the message + wg.Add(1) err = suite.user.PublishV2Issue101Test(context.Background(), sent) suite.Require().NoError(err) diff --git a/test/v2/issues/122/asyncapi.yaml b/test/v2/issues/122/asyncapi.yaml index 8ea58249..386775aa 100644 --- a/test/v2/issues/122/asyncapi.yaml +++ b/test/v2/issues/122/asyncapi.yaml @@ -8,10 +8,6 @@ info: version: 1.0.0 channels: v2.issue122.msg: # channel that has the two operations - subscribe: - message: - payload: - type: string publish: message: payload: diff --git a/test/v2/issues/122/suite_test.go b/test/v2/issues/122/suite_test.go index fb37e83f..55c11b70 100644 --- a/test/v2/issues/122/suite_test.go +++ b/test/v2/issues/122/suite_test.go @@ -10,7 +10,7 @@ import ( "testing" "github.com/lerenn/asyncapi-codegen/pkg/extensions" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -18,7 +18,7 @@ import ( var errTest = errors.New("some test error") func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() // Only do it with one broker as this is not testing the broker @@ -76,10 +76,10 @@ func (suite *Suite) TestErrorHandlerForApp() { return errTest }) suite.Require().NoError(err) - - suite.wg.Add(1) + defer suite.app.UnsubscribeV2Issue122Msg(context.Background()) // Publish the message + suite.wg.Add(1) err = suite.user.PublishV2Issue122Msg(context.Background(), sent) suite.Require().NoError(err) @@ -100,10 +100,10 @@ func (suite *Suite) TestErrorHandlerForUser() { return errTest }) suite.Require().NoError(err) - - suite.wg.Add(1) + defer suite.user.UnsubscribeV2Issue122Msg(context.Background()) // Publish the message + suite.wg.Add(1) err = suite.user.PublishV2Issue122Msg(context.Background(), sent) suite.Require().NoError(err) diff --git a/test/v2/issues/129/suite_test.go b/test/v2/issues/129/suite_test.go index 72f779ac..22233559 100644 --- a/test/v2/issues/129/suite_test.go +++ b/test/v2/issues/129/suite_test.go @@ -12,7 +12,7 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/lerenn/asyncapi-codegen/test/v2/issues/129/camel" "github.com/lerenn/asyncapi-codegen/test/v2/issues/129/kebab" "github.com/lerenn/asyncapi-codegen/test/v2/issues/129/none" @@ -21,7 +21,7 @@ import ( ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { diff --git a/test/v2/issues/164/suite_test.go b/test/v2/issues/164/suite_test.go index fb68ff8c..583e32fd 100644 --- a/test/v2/issues/164/suite_test.go +++ b/test/v2/issues/164/suite_test.go @@ -10,12 +10,12 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { @@ -74,7 +74,6 @@ func (suite *Suite) TestAdditionalProperties() { // Check what the app receive and translate var recvMsg TestMapMessage - wg.Add(1) err := suite.app.SubscribeV2Issue164TestMap( context.Background(), func(_ context.Context, msg TestMapMessage) error { @@ -85,6 +84,7 @@ func (suite *Suite) TestAdditionalProperties() { suite.Require().NoError(err) // Send the message + wg.Add(1) err = suite.user.PublishV2Issue164TestMap(context.Background(), sent) suite.Require().NoError(err) diff --git a/test/v2/issues/169/asyncapi.gen.go b/test/v2/issues/169/asyncapi.gen.go index 8a758665..9bffaa1e 100644 --- a/test/v2/issues/169/asyncapi.gen.go +++ b/test/v2/issues/169/asyncapi.gen.go @@ -13,7 +13,7 @@ import ( // AppSubscriber represents all handlers that are expecting messages for App type AppSubscriber interface { // Issue169Msg subscribes to messages placed on the 'issue169.msg' channel - Issue169Msg(ctx context.Context, msg Issue169MsgSubscribeMessage) error + Issue169Msg(ctx context.Context, msg Issue169MsgMessage) error } // AppController is the structure that provides publishing capabilities to the @@ -141,7 +141,7 @@ func (c *AppController) UnsubscribeAll(ctx context.Context) { // Callback function 'fn' will be called each time a new message is received. func (c *AppController) SubscribeIssue169Msg( ctx context.Context, - fn func(ctx context.Context, msg Issue169MsgSubscribeMessage) error, + fn func(ctx context.Context, msg Issue169MsgMessage) error, ) error { // Get channel path path := "issue169.msg" @@ -184,7 +184,7 @@ func (c *AppController) SubscribeIssue169Msg( // Execute middlewares before handling the message if err := c.executeMiddlewares(ctx, &acknowledgeableBrokerMessage.BrokerMessage, func(ctx context.Context) error { // Process message - msg, err := newIssue169MsgSubscribeMessageFromBrokerMessage(acknowledgeableBrokerMessage.BrokerMessage) + msg, err := newIssue169MsgMessageFromBrokerMessage(acknowledgeableBrokerMessage.BrokerMessage) if err != nil { return err } @@ -236,39 +236,6 @@ func (c *AppController) UnsubscribeIssue169Msg(ctx context.Context) { c.logger.Info(ctx, "Unsubscribed from channel") } -// PublishIssue169Msg will publish messages to 'issue169.msg' channel -func (c *AppController) PublishIssue169Msg( - ctx context.Context, - msg Issue169MsgPublishMessage, -) error { - // Get channel path - path := "issue169.msg" - - // Set context - ctx = addAppContextValues(ctx, path) - ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "publication") - - // Convert to BrokerMessage - brokerMsg, err := msg.toBrokerMessage() - if err != nil { - return err - } - - // Set broker message to context - ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, brokerMsg.String()) - - // Publish the message on event-broker through middlewares - return c.executeMiddlewares(ctx, &brokerMsg, func(ctx context.Context) error { - return c.broker.Publish(ctx, path, brokerMsg) - }) -} - -// UserSubscriber represents all handlers that are expecting messages for User -type UserSubscriber interface { - // Issue169Msg subscribes to messages placed on the 'issue169.msg' channel - Issue169Msg(ctx context.Context, msg Issue169MsgSubscribeMessage) error -} - // UserController is the structure that provides publishing capabilities to the // developer and and connect the broker with the User type UserController struct { @@ -365,134 +332,12 @@ func addUserContextValues(ctx context.Context, path string) context.Context { // Close will clean up any existing resources on the controller func (c *UserController) Close(ctx context.Context) { // Unsubscribing remaining channels - c.UnsubscribeAll(ctx) - - c.logger.Info(ctx, "Closed user controller") -} - -// SubscribeAll will subscribe to channels without parameters on which the app is expecting messages. -// For channels with parameters, they should be subscribed independently. -func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { - if as == nil { - return extensions.ErrNilUserSubscriber - } - - if err := c.SubscribeIssue169Msg(ctx, as.Issue169Msg); err != nil { - return err - } - - return nil -} - -// UnsubscribeAll will unsubscribe all remaining subscribed channels -func (c *UserController) UnsubscribeAll(ctx context.Context) { - c.UnsubscribeIssue169Msg(ctx) -} - -// SubscribeIssue169Msg will subscribe to new messages from 'issue169.msg' channel. -// -// Callback function 'fn' will be called each time a new message is received. -func (c *UserController) SubscribeIssue169Msg( - ctx context.Context, - fn func(ctx context.Context, msg Issue169MsgSubscribeMessage) error, -) error { - // Get channel path - path := "issue169.msg" - - // Set context - ctx = addUserContextValues(ctx, path) - ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "reception") - - // Check if there is already a subscription - _, exists := c.subscriptions[path] - if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) - c.logger.Error(ctx, err.Error()) - return err - } - - // Subscribe to broker channel - sub, err := c.broker.Subscribe(ctx, path) - if err != nil { - c.logger.Error(ctx, err.Error()) - return err - } - c.logger.Info(ctx, "Subscribed to channel") - - // Asynchronously listen to new messages and pass them to app subscriber - go func() { - for { - // Wait for next message - acknowledgeableBrokerMessage, open := <-sub.MessagesChannel() - - // If subscription is closed and there is no more message - // (i.e. uninitialized message), then exit the function - if !open && acknowledgeableBrokerMessage.IsUninitialized() { - return - } - - // Set broker message to context - ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, acknowledgeableBrokerMessage.String()) - - // Execute middlewares before handling the message - if err := c.executeMiddlewares(ctx, &acknowledgeableBrokerMessage.BrokerMessage, func(ctx context.Context) error { - // Process message - msg, err := newIssue169MsgSubscribeMessageFromBrokerMessage(acknowledgeableBrokerMessage.BrokerMessage) - if err != nil { - return err - } - - // Execute the subscription function - if err := fn(ctx, msg); err != nil { - return err - } - - acknowledgeableBrokerMessage.Ack() - - return nil - }); err != nil { - c.errorHandler(ctx, path, &acknowledgeableBrokerMessage, err) - // On error execute the acknowledgeableBrokerMessage nack() function and - // let the BrokerAcknowledgment decide what is the right nack behavior for the broker - acknowledgeableBrokerMessage.Nak() - } - } - }() - - // Add the cancel channel to the inside map - c.subscriptions[path] = sub - - return nil -} - -// UnsubscribeIssue169Msg will unsubscribe messages from 'issue169.msg' channel. -// A timeout can be set in context to avoid blocking operation, if needed. -func (c *UserController) UnsubscribeIssue169Msg(ctx context.Context) { - // Get channel path - path := "issue169.msg" - - // Check if there subscribers for this channel - sub, exists := c.subscriptions[path] - if !exists { - return - } - - // Set context - ctx = addUserContextValues(ctx, path) - - // Stop the subscription - sub.Cancel(ctx) - - // Remove if from the subscribers - delete(c.subscriptions, path) - - c.logger.Info(ctx, "Unsubscribed from channel") } // PublishIssue169Msg will publish messages to 'issue169.msg' channel func (c *UserController) PublishIssue169Msg( ctx context.Context, - msg Issue169MsgPublishMessage, + msg Issue169MsgMessage, ) error { // Get channel path path := "issue169.msg" @@ -574,62 +419,21 @@ func (e *Error) Error() string { return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) } -// Issue169MsgSubscribeMessage is the message expected for 'Issue169MsgSubscribeMessage' channel. -type Issue169MsgSubscribeMessage struct { - // Payload will be inserted in the message payload - Payload string -} - -func NewIssue169MsgSubscribeMessage() Issue169MsgSubscribeMessage { - var msg Issue169MsgSubscribeMessage - - return msg -} - -// newIssue169MsgSubscribeMessageFromBrokerMessage will fill a new Issue169MsgSubscribeMessage with data from generic broker message -func newIssue169MsgSubscribeMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (Issue169MsgSubscribeMessage, error) { - var msg Issue169MsgSubscribeMessage - - // Convert to string - payload := string(bMsg.Payload) - msg.Payload = payload // No need for type conversion to reference - - // TODO: run checks on msg type - - return msg, nil -} - -// toBrokerMessage will generate a generic broker message from Issue169MsgSubscribeMessage data -func (msg Issue169MsgSubscribeMessage) toBrokerMessage() (extensions.BrokerMessage, error) { - // TODO: implement checks on message - - // Convert to []byte - payload := []byte(msg.Payload) - - // There is no headers here - headers := make(map[string][]byte, 0) - - return extensions.BrokerMessage{ - Headers: headers, - Payload: payload, - }, nil -} - -// Issue169MsgPublishMessage is the message expected for 'Issue169MsgPublishMessage' channel. -type Issue169MsgPublishMessage struct { +// Issue169MsgMessage is the message expected for 'Issue169MsgMessage' channel. +type Issue169MsgMessage struct { // Payload will be inserted in the message payload Payload string } -func NewIssue169MsgPublishMessage() Issue169MsgPublishMessage { - var msg Issue169MsgPublishMessage +func NewIssue169MsgMessage() Issue169MsgMessage { + var msg Issue169MsgMessage return msg } -// newIssue169MsgPublishMessageFromBrokerMessage will fill a new Issue169MsgPublishMessage with data from generic broker message -func newIssue169MsgPublishMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (Issue169MsgPublishMessage, error) { - var msg Issue169MsgPublishMessage +// newIssue169MsgMessageFromBrokerMessage will fill a new Issue169MsgMessage with data from generic broker message +func newIssue169MsgMessageFromBrokerMessage(bMsg extensions.BrokerMessage) (Issue169MsgMessage, error) { + var msg Issue169MsgMessage // Convert to string payload := string(bMsg.Payload) @@ -640,8 +444,8 @@ func newIssue169MsgPublishMessageFromBrokerMessage(bMsg extensions.BrokerMessage return msg, nil } -// toBrokerMessage will generate a generic broker message from Issue169MsgPublishMessage data -func (msg Issue169MsgPublishMessage) toBrokerMessage() (extensions.BrokerMessage, error) { +// toBrokerMessage will generate a generic broker message from Issue169MsgMessage data +func (msg Issue169MsgMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message // Convert to []byte diff --git a/test/v2/issues/169/asyncapi.yaml b/test/v2/issues/169/asyncapi.yaml index f94ebf3b..5757c2a8 100644 --- a/test/v2/issues/169/asyncapi.yaml +++ b/test/v2/issues/169/asyncapi.yaml @@ -7,11 +7,7 @@ info: title: Sample App version: 1.0.0 channels: - issue169.msg: # channel that has the two operations - subscribe: - message: - payload: - type: string + issue169.msg: publish: message: payload: diff --git a/test/v2/issues/169/suite_test.go b/test/v2/issues/169/suite_test.go index 46115fc6..85fe7d75 100644 --- a/test/v2/issues/169/suite_test.go +++ b/test/v2/issues/169/suite_test.go @@ -12,6 +12,7 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/kafka" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/nats" "github.com/lerenn/asyncapi-codegen/pkg/extensions/brokers/natsjetstream" + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" natsio "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/segmentio/kafka-go/sasl/scram" @@ -22,8 +23,15 @@ import ( func TestSuite(t *testing.T) { name := "issue169" - // core nats with TLS and basic auth - natsBrokerTLSBasicAuth, err := nats.NewController("nats://nats-tls-basic-auth:4222", nats.WithQueueGroup(name), + // NATS Core with TLS and basic auth + natsBrokerTLSBasicAuth, err := nats.NewController( + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-tls-basic-auth", + DockerizedPort: "4222", + LocalPort: "4224", + }), + nats.WithQueueGroup(name), nats.WithConnectionOpts(natsio.Secure(&tls.Config{InsecureSkipVerify: true}), natsio.UserInfo("user", "password"), ), @@ -32,9 +40,14 @@ func TestSuite(t *testing.T) { defer natsBrokerTLSBasicAuth.Close() suite.Run(t, NewSuite(natsBrokerTLSBasicAuth)) - // nats jetstream with TLS and basic auth + // NATS jetstream with TLS and basic auth natsJSBrokerTLSBasicAuth, err := natsjetstream.NewController( - "nats://nats-jetstream-tls-basic-auth:4222", + testutil.BrokerAddress(testutil.BrokerAddressParams{ + Schema: "nats", + DockerizedAddr: "nats-jetstream-tls-basic-auth", + DockerizedPort: "4222", + LocalPort: "4227", + }), natsjetstream.WithStreamConfig(jetstream.StreamConfig{ Name: name, Subjects: ChannelsPaths, @@ -48,10 +61,17 @@ func TestSuite(t *testing.T) { defer natsJSBrokerTLSBasicAuth.Close() suite.Run(t, NewSuite(natsJSBrokerTLSBasicAuth)) - // kafka with TLS and basic auth + // Kafka with TLS and basic auth sha512Mechanism, err := scram.Mechanism(scram.SHA512, "user", "password") assert.NoError(t, err, "new scram.SHA512 should not return a error") - kafkaBrokerTLSBasicAuth, err := kafka.NewController([]string{"kafka-tls-basic-auth:9092"}, + kafkaBrokerTLSBasicAuth, err := kafka.NewController( + []string{ + testutil.BrokerAddress(testutil.BrokerAddressParams{ + DockerizedAddr: "kafka-tls-basic-auth", + DockerizedPort: "9092", + LocalPort: "9096", + }), + }, kafka.WithGroupID(name), kafka.WithTLS(&tls.Config{InsecureSkipVerify: true}), kafka.WithSasl(sha512Mechanism), @@ -65,8 +85,6 @@ type Suite struct { app *AppController user *UserController suite.Suite - - wg sync.WaitGroup } func NewSuite(broker extensions.BrokerController) *Suite { @@ -92,52 +110,28 @@ func (suite *Suite) TearDownTest() { suite.user.Close(context.Background()) } -func (suite *Suite) TestIssue169App() { - // Test message - sent := Issue169MsgPublishMessage{ - Payload: "some test msg", - } - - // validate msg - err := suite.app.SubscribeIssue169Msg(context.Background(), - func(ctx context.Context, msg Issue169MsgSubscribeMessage) error { - defer suite.wg.Done() - suite.app.UnsubscribeIssue169Msg(ctx) - suite.Require().Equal(sent.Payload, msg.Payload) - return nil - }) - suite.Require().NoError(err) - - suite.wg.Add(1) - - // Publish the message - err = suite.app.PublishIssue169Msg(context.Background(), sent) - suite.Require().NoError(err) - - suite.wg.Wait() -} +func (suite *Suite) TestIssue169() { + var wg sync.WaitGroup -func (suite *Suite) TestIssue169User() { // Test message - sent := Issue169MsgPublishMessage{ + sent := Issue169MsgMessage{ Payload: "some test msg", } - // validate message - err := suite.user.SubscribeIssue169Msg(context.Background(), - func(ctx context.Context, msg Issue169MsgSubscribeMessage) error { - defer suite.wg.Done() - suite.user.UnsubscribeIssue169Msg(ctx) + // Validate msg + err := suite.app.SubscribeIssue169Msg(context.Background(), + func(ctx context.Context, msg Issue169MsgMessage) error { suite.Require().Equal(sent.Payload, msg.Payload) + wg.Done() return nil }) suite.Require().NoError(err) - - suite.wg.Add(1) + defer suite.app.UnsubscribeIssue169Msg(context.Background()) // Publish the message + wg.Add(1) err = suite.user.PublishIssue169Msg(context.Background(), sent) suite.Require().NoError(err) - suite.wg.Wait() + wg.Wait() } diff --git a/test/v2/issues/65/suite_test.go b/test/v2/issues/65/suite_test.go index bcea80fe..6f4b6152 100644 --- a/test/v2/issues/65/suite_test.go +++ b/test/v2/issues/65/suite_test.go @@ -1,4 +1,4 @@ -package asyncapi_test +package testutil import ( "regexp" diff --git a/test/v2/issues/73/suite_test.go b/test/v2/issues/73/suite_test.go index cb7a47ff..cc3c1414 100644 --- a/test/v2/issues/73/suite_test.go +++ b/test/v2/issues/73/suite_test.go @@ -14,14 +14,14 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" "github.com/lerenn/asyncapi-codegen/pkg/extensions/versioning" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" v1 "github.com/lerenn/asyncapi-codegen/test/v2/issues/73/v1" v2 "github.com/lerenn/asyncapi-codegen/test/v2/issues/73/v2" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { @@ -97,7 +97,6 @@ func (suite *Suite) TestV1Reception() { // Check what the app receive and translate var recvMsg v1.V2Issue73HelloMessage - wg.Add(1) err := suite.v1.app.SubscribeV2Issue73Hello( context.Background(), func(_ context.Context, msg v1.V2Issue73HelloMessage) error { @@ -117,6 +116,7 @@ func (suite *Suite) TestV1Reception() { suite.Require().NoError(err) // Publish the message + wg.Add(1) err = suite.v1.user.PublishV2Issue73Hello(context.Background(), sent) suite.Require().NoError(err) @@ -158,7 +158,6 @@ func (suite *Suite) TestV2Reception() { // Check what the app receive and translate var recvMsg v2.V2Issue73HelloMessage - wg.Add(1) err = suite.v2.app.SubscribeV2Issue73Hello( context.Background(), func(_ context.Context, msg v2.V2Issue73HelloMessage) error { @@ -169,6 +168,7 @@ func (suite *Suite) TestV2Reception() { suite.Require().NoError(err) // Publish the message + wg.Add(1) err = suite.v2.user.PublishV2Issue73Hello(context.Background(), sent) suite.Require().NoError(err) diff --git a/test/v2/issues/74/suite_test.go b/test/v2/issues/74/suite_test.go index f12a317b..a7d50ab1 100644 --- a/test/v2/issues/74/suite_test.go +++ b/test/v2/issues/74/suite_test.go @@ -11,12 +11,12 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { @@ -73,7 +73,6 @@ func (suite *Suite) TestHeaders() { // Check what the app receive and translate var recvMsg TestMessage - wg.Add(1) err := suite.app.SubscribeV2Issue74TestChannel(context.Background(), func(_ context.Context, msg TestMessage) error { recvMsg = msg wg.Done() @@ -82,6 +81,7 @@ func (suite *Suite) TestHeaders() { suite.Require().NoError(err) // Publish the message + wg.Add(1) err = suite.user.PublishV2Issue74TestChannel(context.Background(), sent) suite.Require().NoError(err) diff --git a/test/v2/issues/99/suite_test.go b/test/v2/issues/99/suite_test.go index 76e61e39..863d4614 100644 --- a/test/v2/issues/99/suite_test.go +++ b/test/v2/issues/99/suite_test.go @@ -9,12 +9,12 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { @@ -76,7 +76,6 @@ func (suite *Suite) TestAddingHeader() { // Check what the app receive and translate var recvMsg V2Issue99TestMessage - wg.Add(1) err := suite.app.SubscribeV2Issue99Test(context.Background(), func(_ context.Context, msg V2Issue99TestMessage) error { recvMsg = msg wg.Done() @@ -85,6 +84,7 @@ func (suite *Suite) TestAddingHeader() { suite.Require().NoError(err) // Publish the message + wg.Add(1) err = suite.user.PublishV2Issue99Test(context.Background(), sent) suite.Require().NoError(err) diff --git a/test/v3/issues/129/suite_test.go b/test/v3/issues/129/suite_test.go index 9ab268f8..7dc6b76a 100644 --- a/test/v3/issues/129/suite_test.go +++ b/test/v3/issues/129/suite_test.go @@ -12,7 +12,7 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/lerenn/asyncapi-codegen/test/v3/issues/129/camel" "github.com/lerenn/asyncapi-codegen/test/v3/issues/129/kebab" "github.com/lerenn/asyncapi-codegen/test/v3/issues/129/none" @@ -21,7 +21,7 @@ import ( ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { diff --git a/test/v3/issues/130/decoupling/suite.go b/test/v3/issues/130/decoupling/suite.go index cc6852e5..a2b5c59b 100644 --- a/test/v3/issues/130/decoupling/suite.go +++ b/test/v3/issues/130/decoupling/suite.go @@ -56,13 +56,13 @@ func (suite *Suite) TestSendReceive() { }) suite.Require().NoError(err) defer suite.app.UnsubscribeFromConsumeUserSignupOperation(context.Background()) - wg.Add(1) // Set a new message var msg UserMessage msg.Payload.DisplayName = utils.ToPointer("testing") // Send the new message + wg.Add(1) err = suite.user.SendToConsumeUserSignupOperation(context.Background(), msg) suite.Require().NoError(err) diff --git a/test/v3/issues/130/parameters/suite.go b/test/v3/issues/130/parameters/suite.go index 125a645a..ac097e2e 100644 --- a/test/v3/issues/130/parameters/suite.go +++ b/test/v3/issues/130/parameters/suite.go @@ -62,13 +62,13 @@ func (suite *Suite) TestParameter() { }) suite.Require().NoError(err) defer suite.app.UnsubscribeFromReceiveUserSignedUpOperation(context.Background(), params) - wg.Add(1) // Set a new message var msg UserMessage msg.Payload.Name = utils.ToPointer("testing") // Send the new message + wg.Add(1) err = suite.user.SendToReceiveUserSignedUpOperation(context.Background(), params, msg) suite.Require().NoError(err) diff --git a/test/v3/issues/130/suite_test.go b/test/v3/issues/130/suite_test.go index 48475fbb..e443c844 100644 --- a/test/v3/issues/130/suite_test.go +++ b/test/v3/issues/130/suite_test.go @@ -5,7 +5,7 @@ package issue130 import ( "testing" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/lerenn/asyncapi-codegen/test/v3/issues/130/decoupling" "github.com/lerenn/asyncapi-codegen/test/v3/issues/130/parameters" "github.com/lerenn/asyncapi-codegen/test/v3/issues/130/requestreply" @@ -14,7 +14,7 @@ import ( ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { diff --git a/test/v3/issues/145/suite_test.go b/test/v3/issues/145/suite_test.go index 5c91ef4d..d2480e85 100644 --- a/test/v3/issues/145/suite_test.go +++ b/test/v3/issues/145/suite_test.go @@ -9,12 +9,12 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { @@ -108,7 +108,6 @@ func (suite *Suite) TestRequestReplyOnRawChannel() { wg.Done() }() - wg.Add(1) // Set a new ping var msg PingMessage @@ -116,6 +115,7 @@ func (suite *Suite) TestRequestReplyOnRawChannel() { msg.Headers.ReplyTo = utils.ToPointer("issue145.pong.2345") // Send a request + wg.Add(1) err = suite.user.SendToPingRequestOperation(context.Background(), msg) suite.Require().NoError(err) diff --git a/test/v3/issues/164/suite_test.go b/test/v3/issues/164/suite_test.go index 4960b6be..97b99a61 100644 --- a/test/v3/issues/164/suite_test.go +++ b/test/v3/issues/164/suite_test.go @@ -10,12 +10,12 @@ import ( "github.com/lerenn/asyncapi-codegen/pkg/extensions" "github.com/lerenn/asyncapi-codegen/pkg/extensions/middlewares" "github.com/lerenn/asyncapi-codegen/pkg/utils" - asyncapi_test "github.com/lerenn/asyncapi-codegen/test" + testutil "github.com/lerenn/asyncapi-codegen/test" "github.com/stretchr/testify/suite" ) func TestSuite(t *testing.T) { - brokers, cleanup := asyncapi_test.BrokerControllers(t) + brokers, cleanup := testutil.BrokerControllers(t) defer cleanup() for _, b := range brokers { @@ -74,7 +74,6 @@ func (suite *Suite) TestAdditionalProperties() { // Check what the app receive and translate var recvMsg TestMapMessage - wg.Add(1) err := suite.app.SubscribeToTestMapOperation( context.Background(), func(_ context.Context, msg TestMapMessage) error { @@ -85,6 +84,7 @@ func (suite *Suite) TestAdditionalProperties() { suite.Require().NoError(err) // Send the message + wg.Add(1) err = suite.user.SendToTestMapOperation(context.Background(), sent) suite.Require().NoError(err) diff --git a/tools/generate-certs/main.go b/tools/generate-certs/main.go new file mode 100644 index 00000000..c2c7f0b3 --- /dev/null +++ b/tools/generate-certs/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + + testutil "github.com/lerenn/asyncapi-codegen/pkg/utils/test" +) + +func main() { + createKafkaCerts() + createNATSCerts() +} + +func createNATSCerts() { + // Create NATS certs + key, cert, err := testutil.GenerateSelfSignedCertificate("localhost") + if err != nil { + panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) + } + + // Export NATS certs + basePath := filepath.Join(".", "tmp", "certs", "nats") + if err := os.MkdirAll(basePath, os.ModePerm); err != nil { + panic(err) + } + os.WriteFile(filepath.Join(basePath, "server-key.pem"), key, os.ModePerm) + os.WriteFile(filepath.Join(basePath, "server-cert.pem"), cert, os.ModePerm) +} + +func createKafkaCerts() { + // Create Kafka certs + key, cert, cacert, err := testutil.GenerateSelfSignedCertificateWithCA("localhost") + if err != nil { + panic(fmt.Errorf("failed to generate self signed certificate: %w", err)) + } + + // Export Kafka certs + basePath := filepath.Join(".", "tmp", "certs", "kafka") + if err := os.MkdirAll(basePath, os.ModePerm); err != nil { + panic(err) + } + os.WriteFile(filepath.Join(basePath, "kafka.keystore.key"), key, os.ModePerm) + os.WriteFile(filepath.Join(basePath, "kafka.keystore.pem"), cert, os.ModePerm) + os.WriteFile(filepath.Join(basePath, "kafka.truststore.pem"), cacert, os.ModePerm) +}