Skip to content

Commit

Permalink
Receiver update (#16)
Browse files Browse the repository at this point in the history
* Add url validation

* Add termination on invalid api token

* Add termination on invalid api token

* Move errors

* Add api response validation

* Stop collector if unable to authenticate with api token

* Fix typos

* Add health_check extension
  • Loading branch information
julgircast committed Jul 13, 2023
1 parent c6fd0d0 commit b8ffec7
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 14 deletions.
27 changes: 23 additions & 4 deletions auditlogsreceiver/audit_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"sync"
"syscall"
"time"

"github.com/go-resty/resty/v2"
Expand Down Expand Up @@ -36,6 +37,7 @@ type auditLogsReceiver struct {

func (a *auditLogsReceiver) Start(ctx context.Context, _ component.Host) error {
a.logger.Debug("starting audit logs receiver")

a.wg.Add(1)
go a.startPolling(ctx)

Expand All @@ -53,11 +55,13 @@ func (a *auditLogsReceiver) Shutdown(_ context.Context) error {
func (a *auditLogsReceiver) startPolling(ctx context.Context) {
defer a.wg.Done()

ctx, cancel := context.WithCancel(ctx)

t := time.NewTicker(a.pollInterval)
defer t.Stop()

for {
err := a.poll(ctx)
err := a.poll(ctx, cancel)
if err != nil {
a.logger.Error("there was an error during the poll", zap.Error(err))
}
Expand All @@ -73,7 +77,7 @@ func (a *auditLogsReceiver) startPolling(ctx context.Context) {
}
}

func (a *auditLogsReceiver) poll(ctx context.Context) error {
func (a *auditLogsReceiver) poll(ctx context.Context, cancel context.CancelFunc) error {
// It is OK to have long durations (to - from) as backend will handle it through pagination & page limit.
fromDate := a.store.GetFromDate()
toDate := time.Now()
Expand All @@ -95,11 +99,26 @@ func (a *auditLogsReceiver) poll(ctx context.Context) error {
if err != nil {
return err
}
if resp.StatusCode()/100 != 2 { // nolint:gomnd
return fmt.Errorf("unexpected response from audit logs api: code=%d, payload='%v'", resp.StatusCode(), string(resp.Body()))
if resp.StatusCode() > 399 {
switch resp.StatusCode() {
case 401, 403:
// Shutdown collector if unable to authenticate to the api.
cancel()
err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
if err != nil {
return err
}
return fmt.Errorf("invalid api token, response code: %d", resp.StatusCode())
default:
a.logger.Warn("unexpected response from audit logs api:", zap.Any("response_code", resp.StatusCode()))
return fmt.Errorf("got non 200 status code %d", resp.StatusCode())
}
}

auditLogsMap, err := a.processResponseBody(ctx, resp.Body(), toDate)
if err != nil {
return err
}
c, ok := auditLogsMap["nextCursor"]
if !ok {
// Cursor data is not provided, so it is the last page.
Expand Down
10 changes: 9 additions & 1 deletion auditlogsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package auditlogs

import (
"errors"
"net/url"

"go.opentelemetry.io/collector/component"
)
Expand All @@ -24,7 +25,14 @@ func newDefaultConfig() component.Config {
}

func (c Config) Validate() error {
// TODO: Validate URL and trim last '/' if present
if c.Url == "" {
return errors.New("api url must be specified")
}

_, err := url.ParseRequestURI(c.Url)
if err != nil {
return errors.New("api url must be in the form of <scheme>://<hostname>:<port>")
}

if c.Token == "" {
return errors.New("api token cannot be empty")
Expand Down
23 changes: 14 additions & 9 deletions auditlogsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package auditlogs
import (
"context"
"errors"
"github.com/castai/otel-receivers/audit-logs/storage"
"strings"
"sync"
"time"

"github.com/castai/otel-receivers/audit-logs/storage"

"github.com/go-resty/resty/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -36,13 +38,6 @@ func CreateAuditLogsReceiver(
return nil, errInvalidConfig
}

rest := resty.New()
rest.SetBaseURL(cfg.Url + "/v1/audit")
rest.SetHeader("X-API-Key", cfg.Token)
rest.SetHeader("Content-Type", "application/json")
rest.SetRetryCount(1)
rest.SetTimeout(time.Second * 10)

// TODO: introduce possibility to use Persistent Store based on configuration.
store := storage.NewEphemeralStore(time.Now().Add(-1 * time.Duration(cfg.PollIntervalSec) * time.Second))

Expand All @@ -54,7 +49,17 @@ func CreateAuditLogsReceiver(
wg: &sync.WaitGroup{},
doneChan: make(chan bool),
store: store,
rest: rest,
rest: newRestyClient(cfg),
consumer: consumer,
}, nil
}

func newRestyClient(cfg *Config) *resty.Client {
client := resty.New().
SetHeader("Content-Type", "application/json").
SetRetryCount(1).
SetTimeout(time.Second*10).
SetBaseURL(strings.TrimSuffix(cfg.Url, "/")+"/v1/audit").
SetHeader("X-API-Key", cfg.Token)
return client
}
7 changes: 7 additions & 0 deletions builder-config-console.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ receivers:
name: "auditlogsreceiver"
path: "./auditlogsreceiver/"

extensions:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.80.0

processors:

exporters:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter v0.80.0

replaces:
# Override Lokiexporter dependencies.
- google.golang.org/genproto => google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc
4 changes: 4 additions & 0 deletions collector-console-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ exporters:

processors:

extensions:
health_check:

service:
extensions: [health_check]
telemetry:
logs:
level: "debug"
Expand Down
3 changes: 3 additions & 0 deletions examples/loki/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ receivers:
name: "auditlogsreceiver"
path: "./auditlogsreceiver/"

extensions:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.80.0

processors:
- gomod: "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.80.0"

Expand Down
4 changes: 4 additions & 0 deletions examples/loki/collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ processors:
key: loki.attribute.labels
value: log.file.name

extensions:
health_check:

service:
extensions: [health_check]
telemetry:
logs:
level: "debug"
Expand Down

0 comments on commit b8ffec7

Please sign in to comment.