diff --git a/auditlogsreceiver/audit_logs.go b/auditlogsreceiver/audit_logs.go index 8c5a4ec..7b0534b 100644 --- a/auditlogsreceiver/audit_logs.go +++ b/auditlogsreceiver/audit_logs.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "sync" + "syscall" "time" "github.com/go-resty/resty/v2" @@ -36,6 +37,7 @@ type auditLogsReceiver struct { func (a *auditLogsReceiver) Start(ctx context.Context, _ component.Host) error { a.logger.Debug("starting audit logs receiver") + a.wg.Add(1) go a.startPolling(ctx) @@ -53,11 +55,13 @@ func (a *auditLogsReceiver) Shutdown(_ context.Context) error { func (a *auditLogsReceiver) startPolling(ctx context.Context) { defer a.wg.Done() + ctx, cancel := context.WithCancel(ctx) + t := time.NewTicker(a.pollInterval) defer t.Stop() for { - err := a.poll(ctx) + err := a.poll(ctx, cancel) if err != nil { a.logger.Error("there was an error during the poll", zap.Error(err)) } @@ -73,7 +77,7 @@ func (a *auditLogsReceiver) startPolling(ctx context.Context) { } } -func (a *auditLogsReceiver) poll(ctx context.Context) error { +func (a *auditLogsReceiver) poll(ctx context.Context, cancel context.CancelFunc) error { // It is OK to have long durations (to - from) as backend will handle it through pagination & page limit. fromDate := a.store.GetFromDate() toDate := time.Now() @@ -95,11 +99,26 @@ func (a *auditLogsReceiver) poll(ctx context.Context) error { if err != nil { return err } - if resp.StatusCode()/100 != 2 { // nolint:gomnd - return fmt.Errorf("unexpected response from audit logs api: code=%d, payload='%v'", resp.StatusCode(), string(resp.Body())) + if resp.StatusCode() > 399 { + switch resp.StatusCode() { + case 401, 403: + // Shutdown collector if unable to authenticate to the api. + cancel() + err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + if err != nil { + return err + } + return fmt.Errorf("invalid api token, response code: %d", resp.StatusCode()) + default: + a.logger.Warn("unexpected response from audit logs api:", zap.Any("response_code", resp.StatusCode())) + return fmt.Errorf("got non 200 status code %d", resp.StatusCode()) + } } auditLogsMap, err := a.processResponseBody(ctx, resp.Body(), toDate) + if err != nil { + return err + } c, ok := auditLogsMap["nextCursor"] if !ok { // Cursor data is not provided, so it is the last page. diff --git a/auditlogsreceiver/config.go b/auditlogsreceiver/config.go index d37d33d..e6ed8be 100644 --- a/auditlogsreceiver/config.go +++ b/auditlogsreceiver/config.go @@ -2,6 +2,7 @@ package auditlogs import ( "errors" + "net/url" "go.opentelemetry.io/collector/component" ) @@ -24,7 +25,14 @@ func newDefaultConfig() component.Config { } func (c Config) Validate() error { - // TODO: Validate URL and trim last '/' if present + if c.Url == "" { + return errors.New("api url must be specified") + } + + _, err := url.ParseRequestURI(c.Url) + if err != nil { + return errors.New("api url must be in the form of ://:") + } if c.Token == "" { return errors.New("api token cannot be empty") diff --git a/auditlogsreceiver/factory.go b/auditlogsreceiver/factory.go index 9b18d6f..793f8e7 100644 --- a/auditlogsreceiver/factory.go +++ b/auditlogsreceiver/factory.go @@ -3,10 +3,12 @@ package auditlogs import ( "context" "errors" - "github.com/castai/otel-receivers/audit-logs/storage" + "strings" "sync" "time" + "github.com/castai/otel-receivers/audit-logs/storage" + "github.com/go-resty/resty/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -36,13 +38,6 @@ func CreateAuditLogsReceiver( return nil, errInvalidConfig } - rest := resty.New() - rest.SetBaseURL(cfg.Url + "/v1/audit") - rest.SetHeader("X-API-Key", cfg.Token) - rest.SetHeader("Content-Type", "application/json") - rest.SetRetryCount(1) - rest.SetTimeout(time.Second * 10) - // TODO: introduce possibility to use Persistent Store based on configuration. store := storage.NewEphemeralStore(time.Now().Add(-1 * time.Duration(cfg.PollIntervalSec) * time.Second)) @@ -54,7 +49,17 @@ func CreateAuditLogsReceiver( wg: &sync.WaitGroup{}, doneChan: make(chan bool), store: store, - rest: rest, + rest: newRestyClient(cfg), consumer: consumer, }, nil } + +func newRestyClient(cfg *Config) *resty.Client { + client := resty.New(). + SetHeader("Content-Type", "application/json"). + SetRetryCount(1). + SetTimeout(time.Second*10). + SetBaseURL(strings.TrimSuffix(cfg.Url, "/")+"/v1/audit"). + SetHeader("X-API-Key", cfg.Token) + return client +} diff --git a/builder-config-console.yaml b/builder-config-console.yaml index 45db7b4..5d9d452 100644 --- a/builder-config-console.yaml +++ b/builder-config-console.yaml @@ -10,7 +10,14 @@ receivers: name: "auditlogsreceiver" path: "./auditlogsreceiver/" +extensions: + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.80.0 + processors: exporters: - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter v0.80.0 + +replaces: + # Override Lokiexporter dependencies. + - google.golang.org/genproto => google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc diff --git a/collector-console-config.yaml b/collector-console-config.yaml index ef55211..346497d 100644 --- a/collector-console-config.yaml +++ b/collector-console-config.yaml @@ -12,7 +12,11 @@ exporters: processors: +extensions: + health_check: + service: + extensions: [health_check] telemetry: logs: level: "debug" diff --git a/examples/loki/builder-config.yaml b/examples/loki/builder-config.yaml index 33762dd..536e594 100644 --- a/examples/loki/builder-config.yaml +++ b/examples/loki/builder-config.yaml @@ -10,6 +10,9 @@ receivers: name: "auditlogsreceiver" path: "./auditlogsreceiver/" +extensions: + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.80.0 + processors: - gomod: "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.80.0" diff --git a/examples/loki/collector-config.yaml b/examples/loki/collector-config.yaml index b0b547d..ec6acad 100644 --- a/examples/loki/collector-config.yaml +++ b/examples/loki/collector-config.yaml @@ -16,7 +16,11 @@ processors: key: loki.attribute.labels value: log.file.name +extensions: + health_check: + service: + extensions: [health_check] telemetry: logs: level: "debug"