Skip to content

Commit

Permalink
style: go fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
thobianchi committed May 27, 2021
1 parent 777d352 commit bd7a21d
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 137 deletions.
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
repos:
- repo: git://github.com/dnephin/pre-commit-golang
rev: master
hooks:
- id: go-fmt
- id: go-mod-tidy
- id: go-unit-tests
# - id: go-vet
# - id: go-lint
# - id: go-imports
# - id: go-cyclo
# args: [-over=15]
# - id: validate-toml
# - id: no-go-testing
# - id: golangci-lint
# - id: go-critic
# - id: go-build
189 changes: 94 additions & 95 deletions batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (
//lint:file-ignore U1000 TDD

const (
TIMEOUT = 30
TIMEOUT = 30
)

type timeseries_sender func(proc *processor, tenant string, timeseries []prompb.TimeSeries) (code int, body []byte, err error)

type Worker struct {
batchsize int
timeout int
tenant string
buffer []prompb.TimeSeries
sender timeseries_sender
proc *processor
batchsize int
timeout int
tenant string
buffer []prompb.TimeSeries
sender timeseries_sender
proc *processor
}

func marshal(wr *prompb.WriteRequest) (bufOut []byte, err error) {
Expand All @@ -50,102 +50,101 @@ func marshal(wr *prompb.WriteRequest) (bufOut []byte, err error) {
return snappy.Encode(nil, b), nil
}


func send_timeseries(proc *processor, tenant string, timeseries []prompb.TimeSeries) (int, []byte, error) {
for c := 0; c < 10; c++ {
var code int
var body []byte
var err error
req := fh.AcquireRequest()
resp := fh.AcquireResponse()

defer func() {
fh.ReleaseRequest(req)
fh.ReleaseResponse(resp)
}()

wr := prompb.WriteRequest{
Timeseries: timeseries,
}

buf, err := marshal(&wr)
if err != nil {
return code, body, err
}

req.Header.SetMethod("POST")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set(proc.cfg.Tenant.Header, tenant)
req.SetRequestURI(proc.cfg.Target)
req.SetBody(buf)
if err = proc.cli.DoTimeout(req, resp, proc.cfg.Timeout); err != nil {
log.Warnf("Error on do timeout: %s", err)
return code, body, err
}

code = resp.Header.StatusCode()
body = make([]byte, len(resp.Body()))
copy(body, resp.Body())

if code == 429 {
// resp.Header.VisitAll(func (key, value []byte) {
// log.Printf("Headers: %v: %v", string(key), string(value))
// })
time.Sleep(500)
continue
}
if code != fh.StatusOK {
log.Errorf("Error on senting writerequest to Cortex: code %d Body %s", code, body)
return code, body, err
}
if code == fh.StatusOK {
return code, body, err
}
}
return 0, nil, errors.New("finished retry attemps")
var code int
var body []byte
var err error
req := fh.AcquireRequest()
resp := fh.AcquireResponse()

defer func() {
fh.ReleaseRequest(req)
fh.ReleaseResponse(resp)
}()

wr := prompb.WriteRequest{
Timeseries: timeseries,
}

buf, err := marshal(&wr)
if err != nil {
return code, body, err
}

req.Header.SetMethod("POST")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set(proc.cfg.Tenant.Header, tenant)
req.SetRequestURI(proc.cfg.Target)
req.SetBody(buf)
if err = proc.cli.DoTimeout(req, resp, proc.cfg.Timeout); err != nil {
log.Warnf("Error on do timeout: %s", err)
return code, body, err
}

code = resp.Header.StatusCode()
body = make([]byte, len(resp.Body()))
copy(body, resp.Body())

if code == 429 {
// resp.Header.VisitAll(func (key, value []byte) {
// log.Printf("Headers: %v: %v", string(key), string(value))
// })
time.Sleep(500)
continue
}

if code != fh.StatusOK {
log.Errorf("Error on senting writerequest to Cortex: code %d Body %s", code, body)
return code, body, err
}
if code == fh.StatusOK {
return code, body, err
}
}
return 0, nil, errors.New("finished retry attemps")
}

func createWorker(tenant string, proc *processor) *Worker{
return &Worker{
batchsize: proc.cfg.Tenant.BatchSize,
timeout: TIMEOUT,
tenant: tenant,
buffer: make([]prompb.TimeSeries, 0, proc.cfg.Tenant.BatchSize),
sender: send_timeseries,
proc: proc,
}
func createWorker(tenant string, proc *processor) *Worker {
return &Worker{
batchsize: proc.cfg.Tenant.BatchSize,
timeout: TIMEOUT,
tenant: tenant,
buffer: make([]prompb.TimeSeries, 0, proc.cfg.Tenant.BatchSize),
sender: send_timeseries,
proc: proc,
}
}

func (w *Worker) flush_buffer() {
cpy := make([]prompb.TimeSeries, len(w.buffer))
copy(cpy, w.buffer)
log.Debugf("flushing batcher for tenant: %s", w.tenant)
go w.sender(w.proc, w.tenant, cpy)
w.buffer = w.buffer[:0]
cpy := make([]prompb.TimeSeries, len(w.buffer))
copy(cpy, w.buffer)
log.Debugf("flushing batcher for tenant: %s", w.tenant)
go w.sender(w.proc, w.tenant, cpy)
w.buffer = w.buffer[:0]
}

func (w *Worker) run(tschan <-chan prompb.TimeSeries){
// quando le timeseries sono tot o è passato tot tempo le manda
for {
select {
case ts, more := <-tschan:
if !more {
w.flush_buffer()
return
}
w.buffer = append(w.buffer, ts)
if len(w.buffer) == w.batchsize {
w.flush_buffer()
}
case <-time.After(time.Duration(w.timeout) * time.Second):
if len(w.buffer) != 0 {
w.flush_buffer()
}
}
}
func (w *Worker) run(tschan <-chan prompb.TimeSeries) {
// quando le timeseries sono tot o è passato tot tempo le manda
for {
select {
case ts, more := <-tschan:
if !more {
w.flush_buffer()
return
}
w.buffer = append(w.buffer, ts)
if len(w.buffer) == w.batchsize {
w.flush_buffer()
}
case <-time.After(time.Duration(w.timeout) * time.Second):
if len(w.buffer) != 0 {
w.flush_buffer()
}
}
}
}

// func k8spoller(){
Expand All @@ -158,4 +157,4 @@ func (w *Worker) run(tschan <-chan prompb.TimeSeries){
// lancia in background il tenantAdmin
// lancia in background server http che lancia handle per ogni chiamata che gli arriva
// aspetta per sempre
// }
// }
1 change: 0 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"gopkg.in/yaml.v2"
)


func configParse(b []byte) (*config, error) {
cfg := &config{}
if err := yaml.UnmarshalStrict(b, cfg); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions config_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ type config struct {
TimeoutShutdown time.Duration `yaml:"timeout_shutdown"`

Tenant struct {
Label string `yaml:"label,omitempty"`
LabelRemove bool `yaml:"label_remove,omitempty"`
Label string `yaml:"label,omitempty"`
LabelRemove bool `yaml:"label_remove,omitempty"`
NamespaceLabel string `yaml:"namespace_label,omitempty"`
BatchSize int `yaml:"batch_size,omitempty"`
QueryInterval int `yaml:"query_interval,omitempty"`
Header string
Default string
BatchSize int `yaml:"batch_size,omitempty"`
QueryInterval int `yaml:"query_interval,omitempty"`
Header string
Default string
}

pipeIn *fhu.InmemoryListener
pipeOut *fhu.InmemoryListener
}
}
27 changes: 13 additions & 14 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,22 @@ import (
"k8s.io/client-go/rest"
)


type dispatcher struct {
clientset *kubernetes.Clientset
nstenant map[string]string // namespace_name: tenant_name
nstschan map[string]chan prompb.TimeSeries
nstenant map[string]string // namespace_name: tenant_name
nstschan map[string]chan prompb.TimeSeries
labelName string
interval int
proc *processor
interval int
proc *processor
}

func newdispatcher(labelName string, interval int, proc *processor) (*dispatcher, error) {
k := &dispatcher{
nstenant: make(map[string]string),
nstschan: make(map[string]chan prompb.TimeSeries),
nstenant: make(map[string]string),
nstschan: make(map[string]chan prompb.TimeSeries),
labelName: labelName,
proc: proc,
interval: interval,
proc: proc,
interval: interval,
}
config, err := rest.InClusterConfig()
if err != nil {
Expand All @@ -41,16 +40,16 @@ func newdispatcher(labelName string, interval int, proc *processor) (*dispatcher

return k, nil
}

func (d *dispatcher) updateMap() (err error) {
nsList, err := d.clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return(err)
return (err)
}

for _, ns := range nsList.Items {
if ns.ObjectMeta.Labels[d.labelName] != "" {
d.nstenant[ns.ObjectMeta.Name] = ns.ObjectMeta.Labels[d.labelName]
d.nstenant[ns.ObjectMeta.Name] = ns.ObjectMeta.Labels[d.labelName]
} else {
delete(d.nstenant, ns.ObjectMeta.Name)
}
Expand Down Expand Up @@ -87,4 +86,4 @@ func (d *dispatcher) run() {
}
d.updateBatchers()
}
}
}
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.2
github.com/google/uuid v1.1.5
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/go-multierror v1.1.1
github.com/klauspost/compress v1.11.7 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/prometheus v1.8.2-0.20210120113717-82330b96ee74
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.6.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.19.0
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
golang.org/x/text v0.3.5 // indirect
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.20.1
k8s.io/client-go v0.20.1
Expand Down
Loading

0 comments on commit bd7a21d

Please sign in to comment.