From bd7a21dc69b9d43bbd19a56591297b8ded68ca2a Mon Sep 17 00:00:00 2001 From: Thomas Bianchi Date: Thu, 27 May 2021 10:05:15 +0200 Subject: [PATCH] style: go fmt --- .pre-commit-config.yaml | 17 ++++ batcher.go | 189 ++++++++++++++++++++-------------------- config.go | 1 - config_schema.go | 14 +-- dispatcher.go | 27 +++--- go.mod | 11 ++- go.sum | 23 +++-- main.go | 2 +- processor.go | 13 ++- 9 files changed, 160 insertions(+), 137 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..1671a4d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,17 @@ +repos: +- repo: git://github.com/dnephin/pre-commit-golang + rev: master + hooks: + - id: go-fmt + - id: go-mod-tidy + - id: go-unit-tests + # - id: go-vet + # - id: go-lint + # - id: go-imports + # - id: go-cyclo + # args: [-over=15] + # - id: validate-toml + # - id: no-go-testing + # - id: golangci-lint + # - id: go-critic + # - id: go-build diff --git a/batcher.go b/batcher.go index 9536663..2b7d919 100644 --- a/batcher.go +++ b/batcher.go @@ -26,18 +26,18 @@ import ( //lint:file-ignore U1000 TDD const ( - TIMEOUT = 30 + TIMEOUT = 30 ) type timeseries_sender func(proc *processor, tenant string, timeseries []prompb.TimeSeries) (code int, body []byte, err error) type Worker struct { - batchsize int - timeout int - tenant string - buffer []prompb.TimeSeries - sender timeseries_sender - proc *processor + batchsize int + timeout int + tenant string + buffer []prompb.TimeSeries + sender timeseries_sender + proc *processor } func marshal(wr *prompb.WriteRequest) (bufOut []byte, err error) { @@ -50,102 +50,101 @@ func marshal(wr *prompb.WriteRequest) (bufOut []byte, err error) { return snappy.Encode(nil, b), nil } - func send_timeseries(proc *processor, tenant string, timeseries []prompb.TimeSeries) (int, []byte, error) { for c := 0; c < 10; c++ { - var code int - var body []byte - var err error - req := fh.AcquireRequest() - resp := fh.AcquireResponse() - - defer func() { - fh.ReleaseRequest(req) - fh.ReleaseResponse(resp) - }() - - wr := prompb.WriteRequest{ - Timeseries: timeseries, - } - - buf, err := marshal(&wr) - if err != nil { - return code, body, err - } - - req.Header.SetMethod("POST") - req.Header.Set("Content-Encoding", "snappy") - req.Header.Set("Content-Type", "application/x-protobuf") - req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - req.Header.Set(proc.cfg.Tenant.Header, tenant) - req.SetRequestURI(proc.cfg.Target) - req.SetBody(buf) - if err = proc.cli.DoTimeout(req, resp, proc.cfg.Timeout); err != nil { - log.Warnf("Error on do timeout: %s", err) - return code, body, err - } - - code = resp.Header.StatusCode() - body = make([]byte, len(resp.Body())) - copy(body, resp.Body()) - - if code == 429 { - // resp.Header.VisitAll(func (key, value []byte) { - // log.Printf("Headers: %v: %v", string(key), string(value)) - // }) - time.Sleep(500) - continue - } - - if code != fh.StatusOK { - log.Errorf("Error on senting writerequest to Cortex: code %d Body %s", code, body) - return code, body, err - } - if code == fh.StatusOK { - return code, body, err - } - } - return 0, nil, errors.New("finished retry attemps") + var code int + var body []byte + var err error + req := fh.AcquireRequest() + resp := fh.AcquireResponse() + + defer func() { + fh.ReleaseRequest(req) + fh.ReleaseResponse(resp) + }() + + wr := prompb.WriteRequest{ + Timeseries: timeseries, + } + + buf, err := marshal(&wr) + if err != nil { + return code, body, err + } + + req.Header.SetMethod("POST") + req.Header.Set("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + req.Header.Set(proc.cfg.Tenant.Header, tenant) + req.SetRequestURI(proc.cfg.Target) + req.SetBody(buf) + if err = proc.cli.DoTimeout(req, resp, proc.cfg.Timeout); err != nil { + log.Warnf("Error on do timeout: %s", err) + return code, body, err + } + + code = resp.Header.StatusCode() + body = make([]byte, len(resp.Body())) + copy(body, resp.Body()) + + if code == 429 { + // resp.Header.VisitAll(func (key, value []byte) { + // log.Printf("Headers: %v: %v", string(key), string(value)) + // }) + time.Sleep(500) + continue + } + + if code != fh.StatusOK { + log.Errorf("Error on senting writerequest to Cortex: code %d Body %s", code, body) + return code, body, err + } + if code == fh.StatusOK { + return code, body, err + } + } + return 0, nil, errors.New("finished retry attemps") } -func createWorker(tenant string, proc *processor) *Worker{ - return &Worker{ - batchsize: proc.cfg.Tenant.BatchSize, - timeout: TIMEOUT, - tenant: tenant, - buffer: make([]prompb.TimeSeries, 0, proc.cfg.Tenant.BatchSize), - sender: send_timeseries, - proc: proc, - } +func createWorker(tenant string, proc *processor) *Worker { + return &Worker{ + batchsize: proc.cfg.Tenant.BatchSize, + timeout: TIMEOUT, + tenant: tenant, + buffer: make([]prompb.TimeSeries, 0, proc.cfg.Tenant.BatchSize), + sender: send_timeseries, + proc: proc, + } } func (w *Worker) flush_buffer() { - cpy := make([]prompb.TimeSeries, len(w.buffer)) - copy(cpy, w.buffer) - log.Debugf("flushing batcher for tenant: %s", w.tenant) - go w.sender(w.proc, w.tenant, cpy) - w.buffer = w.buffer[:0] + cpy := make([]prompb.TimeSeries, len(w.buffer)) + copy(cpy, w.buffer) + log.Debugf("flushing batcher for tenant: %s", w.tenant) + go w.sender(w.proc, w.tenant, cpy) + w.buffer = w.buffer[:0] } -func (w *Worker) run(tschan <-chan prompb.TimeSeries){ - // quando le timeseries sono tot o è passato tot tempo le manda - for { - select { - case ts, more := <-tschan: - if !more { - w.flush_buffer() - return - } - w.buffer = append(w.buffer, ts) - if len(w.buffer) == w.batchsize { - w.flush_buffer() - } - case <-time.After(time.Duration(w.timeout) * time.Second): - if len(w.buffer) != 0 { - w.flush_buffer() - } - } - } +func (w *Worker) run(tschan <-chan prompb.TimeSeries) { + // quando le timeseries sono tot o è passato tot tempo le manda + for { + select { + case ts, more := <-tschan: + if !more { + w.flush_buffer() + return + } + w.buffer = append(w.buffer, ts) + if len(w.buffer) == w.batchsize { + w.flush_buffer() + } + case <-time.After(time.Duration(w.timeout) * time.Second): + if len(w.buffer) != 0 { + w.flush_buffer() + } + } + } } // func k8spoller(){ @@ -158,4 +157,4 @@ func (w *Worker) run(tschan <-chan prompb.TimeSeries){ // lancia in background il tenantAdmin // lancia in background server http che lancia handle per ogni chiamata che gli arriva // aspetta per sempre -// } \ No newline at end of file +// } diff --git a/config.go b/config.go index 8b94172..4cbcc00 100644 --- a/config.go +++ b/config.go @@ -12,7 +12,6 @@ import ( "gopkg.in/yaml.v2" ) - func configParse(b []byte) (*config, error) { cfg := &config{} if err := yaml.UnmarshalStrict(b, cfg); err != nil { diff --git a/config_schema.go b/config_schema.go index 25c5730..a6f585b 100644 --- a/config_schema.go +++ b/config_schema.go @@ -17,15 +17,15 @@ type config struct { TimeoutShutdown time.Duration `yaml:"timeout_shutdown"` Tenant struct { - Label string `yaml:"label,omitempty"` - LabelRemove bool `yaml:"label_remove,omitempty"` + Label string `yaml:"label,omitempty"` + LabelRemove bool `yaml:"label_remove,omitempty"` NamespaceLabel string `yaml:"namespace_label,omitempty"` - BatchSize int `yaml:"batch_size,omitempty"` - QueryInterval int `yaml:"query_interval,omitempty"` - Header string - Default string + BatchSize int `yaml:"batch_size,omitempty"` + QueryInterval int `yaml:"query_interval,omitempty"` + Header string + Default string } pipeIn *fhu.InmemoryListener pipeOut *fhu.InmemoryListener -} \ No newline at end of file +} diff --git a/dispatcher.go b/dispatcher.go index 5c307ce..3c882a3 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -11,23 +11,22 @@ import ( "k8s.io/client-go/rest" ) - type dispatcher struct { clientset *kubernetes.Clientset - nstenant map[string]string // namespace_name: tenant_name - nstschan map[string]chan prompb.TimeSeries + nstenant map[string]string // namespace_name: tenant_name + nstschan map[string]chan prompb.TimeSeries labelName string - interval int - proc *processor + interval int + proc *processor } func newdispatcher(labelName string, interval int, proc *processor) (*dispatcher, error) { k := &dispatcher{ - nstenant: make(map[string]string), - nstschan: make(map[string]chan prompb.TimeSeries), + nstenant: make(map[string]string), + nstschan: make(map[string]chan prompb.TimeSeries), labelName: labelName, - proc: proc, - interval: interval, + proc: proc, + interval: interval, } config, err := rest.InClusterConfig() if err != nil { @@ -41,16 +40,16 @@ func newdispatcher(labelName string, interval int, proc *processor) (*dispatcher return k, nil } - + func (d *dispatcher) updateMap() (err error) { nsList, err := d.clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) if err != nil { - return(err) + return (err) } - + for _, ns := range nsList.Items { if ns.ObjectMeta.Labels[d.labelName] != "" { - d.nstenant[ns.ObjectMeta.Name] = ns.ObjectMeta.Labels[d.labelName] + d.nstenant[ns.ObjectMeta.Name] = ns.ObjectMeta.Labels[d.labelName] } else { delete(d.nstenant, ns.ObjectMeta.Name) } @@ -87,4 +86,4 @@ func (d *dispatcher) run() { } d.updateBatchers() } -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index dee9ffb..24d9df9 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,17 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.2 github.com/google/uuid v1.1.5 - github.com/hashicorp/go-multierror v1.1.0 + github.com/hashicorp/go-multierror v1.1.1 github.com/klauspost/compress v1.11.7 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/prometheus v1.8.2-0.20210120113717-82330b96ee74 - github.com/sirupsen/logrus v1.7.0 - github.com/stretchr/testify v1.6.1 + github.com/sirupsen/logrus v1.8.1 + github.com/stretchr/testify v1.7.0 github.com/valyala/fasthttp v1.19.0 - golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect + golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect + golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect + golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect + golang.org/x/text v0.3.5 // indirect gopkg.in/yaml.v2 v2.4.0 k8s.io/apimachinery v0.20.1 k8s.io/client-go v0.20.1 diff --git a/go.sum b/go.sum index 13bc541..f2abe97 100644 --- a/go.sum +++ b/go.sum @@ -397,8 +397,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= -github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8= @@ -657,8 +658,9 @@ github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= @@ -683,8 +685,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -751,8 +754,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9 h1:sYNJzB4J8toYPQTM6pAkcmBRgw9SnQKP9oXCHfgy604= golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -830,8 +834,9 @@ golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -914,8 +919,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201223074533-0d417f636930/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -924,8 +930,9 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/main.go b/main.go index 16de9e8..39a84af 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { log.SetLevel(lvl) } - + var disp *dispatcher proc := newProcessor(*cfg, disp) diff --git a/processor.go b/processor.go index fca0b02..a20b352 100644 --- a/processor.go +++ b/processor.go @@ -26,7 +26,7 @@ type result struct { } type processor struct { - cfg config + cfg config disp *dispatcher srv *fh.Server @@ -40,7 +40,7 @@ type processor struct { func newProcessor(c config, disp *dispatcher) *processor { p := &processor{ cfg: c, - disp: disp, + disp: disp, Logger: logger.NewSimpleLogger("proc"), } @@ -126,7 +126,6 @@ func (p *processor) handle(ctx *fh.RequestCtx) { log.Debugf("incoming timeseries numbers: %d", len(wrReqIn.Timeseries)) - if p.cfg.Tenant.NamespaceLabel != "" { for _, ts := range wrReqIn.Timeseries { tenant, err := p.processTimeseries(&ts) @@ -141,7 +140,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) { p.disp.nstschan[tenant] <- ts } ctx.SetStatusCode(fh.StatusOK) - return + return } clientIP := ctx.RemoteAddr() reqID, _ := uuid.NewRandom() @@ -256,7 +255,7 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err break } } - } else { + } else { idx = 0 for i, l := range ts.Labels { if l.Name == p.cfg.Tenant.Label { @@ -265,7 +264,7 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err } } } - + if tenant == "" { if p.cfg.Tenant.Default == "" { return "", fmt.Errorf("label '%s' not found", p.cfg.Tenant.Label) @@ -275,7 +274,7 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err } // Remove label if label_remove = true - if p.cfg.Tenant.LabelRemove && p.cfg.Tenant.NamespaceLabel == ""{ + if p.cfg.Tenant.LabelRemove && p.cfg.Tenant.NamespaceLabel == "" { l := len(ts.Labels) ts.Labels[idx] = ts.Labels[l-1] ts.Labels = ts.Labels[:l-1]