Skip to content

Commit

Permalink
Allow setting a connection name. (#8)
Browse files Browse the repository at this point in the history
* Allow setting a connection name.

This can be helpful to identify specific connections in RabbitMQ, for
debugging or tracing purposes.

* Move constants to constants.go

* Initialized CHANGELOG.md

---------

Co-authored-by: Alex <alex.khoury.perso@gmail.com>
  • Loading branch information
rubenv and m3talux committed May 7, 2024
1 parent 3115f2f commit c40e20b
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# 1.1.0

- Allow setting a connection name ([PR](https://github.com/KardinalAI/gorabbit/pull/8))
1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func newClientFromOptions(options *ClientOptions) MQTTClient {
client.connectionManager = newConnectionManager(
client.ctx,
dialURL,
options.ConnectionName,
options.KeepAlive,
options.RetryDelay,
options.MaxRetry,
Expand Down
10 changes: 10 additions & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type ClientOptions struct {
// UseTLS defines whether we use amqp or amqps protocol.
UseTLS bool

// ConnectionName is the client connection name passed on to the RabbitMQ server.
ConnectionName string

// KeepAlive will determine whether the re-connection and retry mechanisms should be triggered.
KeepAlive bool

Expand Down Expand Up @@ -142,6 +145,13 @@ func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions {
return c
}

// SetConnectionName will assign the ConnectionName.
func (c *ClientOptions) SetConnectionName(connectionName string) *ClientOptions {
c.ConnectionName = connectionName

return c
}

// SetKeepAlive will assign the KeepAlive status.
func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions {
c.KeepAlive = keepAlive
Expand Down
45 changes: 35 additions & 10 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type amqpConnection struct {
// uri represents the connection string to the RabbitMQ server.
uri string

// connectionName is the client connection name passed on to the RabbitMQ server.
connectionName string

// keepAlive is the flag that will define whether active guards and re-connections are enabled or not.
keepAlive bool

Expand Down Expand Up @@ -50,16 +53,18 @@ type amqpConnection struct {
// newConsumerConnection initializes a new consumer amqpConnection with given arguments.
// - ctx is the parent context.
// - uri is the connection string.
// - connectionName is the connection name.
// - keepAlive will keep the connection alive if true.
// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true.
// - logger is the parent logger.
func newConsumerConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger) *amqpConnection {
return newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypeConsumer)
func newConsumerConnection(ctx context.Context, uri, connectionName string, keepAlive bool, retryDelay time.Duration, logger logger) *amqpConnection {
return newConnection(ctx, uri, connectionName, keepAlive, retryDelay, logger, connectionTypeConsumer)
}

// newPublishingConnection initializes a new publisher amqpConnection with given arguments.
// - ctx is the parent context.
// - uri is the connection string.
// - connectionName is the connection name.
// - keepAlive will keep the connection alive if true.
// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true.
// - maxRetry defines the publishing max retry header.
Expand All @@ -69,14 +74,15 @@ func newConsumerConnection(ctx context.Context, uri string, keepAlive bool, retr
func newPublishingConnection(
ctx context.Context,
uri string,
connectionName string,
keepAlive bool,
retryDelay time.Duration,
maxRetry uint,
publishingCacheSize uint64,
publishingCacheTTL time.Duration,
logger logger,
) *amqpConnection {
conn := newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypePublisher)
conn := newConnection(ctx, uri, connectionName, keepAlive, retryDelay, logger, connectionTypePublisher)

conn.maxRetry = maxRetry
conn.publishingCacheSize = publishingCacheSize
Expand All @@ -88,16 +94,26 @@ func newPublishingConnection(
// newConnection initializes a new amqpConnection with given arguments.
// - ctx is the parent context.
// - uri is the connection string.
// - connectionName is the connection name.
// - keepAlive will keep the connection alive if true.
// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true.
// - logger is the parent logger.
func newConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger, connectionType connectionType) *amqpConnection {
func newConnection(
ctx context.Context,
uri string,
connectionName string,
keepAlive bool,
retryDelay time.Duration,
logger logger,
connectionType connectionType,
) *amqpConnection {
conn := &amqpConnection{
ctx: ctx,
uri: uri,
keepAlive: keepAlive,
retryDelay: retryDelay,
channels: make(amqpChannels, 0),
ctx: ctx,
uri: uri,
connectionName: connectionName,
keepAlive: keepAlive,
retryDelay: retryDelay,
channels: make(amqpChannels, 0),
logger: inheritLogger(logger, map[string]interface{}{
"context": "connection",
"type": connectionType,
Expand Down Expand Up @@ -127,8 +143,17 @@ func (a *amqpConnection) open() error {

a.logger.Debug("Connecting to RabbitMQ server", logField{Key: "uri", Value: a.uriForLog()})

props := amqp.NewConnectionProperties()
if a.connectionName != "" {
props.SetClientConnectionName(a.connectionName)
}

// We request a connection from the RabbitMQ server.
conn, err := amqp.Dial(a.uri)
conn, err := amqp.DialConfig(a.uri, amqp.Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
Properties: props,
})
if err != nil {
a.logger.Error(err, "Connection failed")

Expand Down
5 changes: 3 additions & 2 deletions connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type connectionManager struct {
func newConnectionManager(
ctx context.Context,
uri string,
connectionName string,
keepAlive bool,
retryDelay time.Duration,
maxRetry uint,
Expand All @@ -26,8 +27,8 @@ func newConnectionManager(
logger logger,
) *connectionManager {
c := &connectionManager{
consumerConnection: newConsumerConnection(ctx, uri, keepAlive, retryDelay, logger),
publisherConnection: newPublishingConnection(ctx, uri, keepAlive, retryDelay, maxRetry, publishingCacheSize, publishingCacheTTL, logger),
consumerConnection: newConsumerConnection(ctx, uri, connectionName, keepAlive, retryDelay, logger),
publisherConnection: newPublishingConnection(ctx, uri, connectionName, keepAlive, retryDelay, maxRetry, publishingCacheSize, publishingCacheTTL, logger),
}

return c
Expand Down
6 changes: 6 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ const (
defaultMode = Release
)

// Default values for the amqp Config.
const (
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
)

const (
xDeathCountHeader = "x-death-count"
)
Expand Down

0 comments on commit c40e20b

Please sign in to comment.