Skip to content

Commit

Permalink
[Fix] GRPC rate limit and add exponential backoff for CD (#2125)
Browse files Browse the repository at this point in the history
* update rate limit to use new method

Signed-off-by: pxp928 <parth.psu@gmail.com>

* udpate grpc rate limit package

Signed-off-by: pxp928 <parth.psu@gmail.com>

* udpate grpc rate limit unit tests

Signed-off-by: pxp928 <parth.psu@gmail.com>

* update deps.dev unit test

Signed-off-by: pxp928 <parth.psu@gmail.com>

* update deps.dev to check already queried map

Signed-off-by: pxp928 <parth.psu@gmail.com>

* update grpc rate limit unit test

Signed-off-by: pxp928 <parth.psu@gmail.com>

* add expoential backoff for CD

Signed-off-by: pxp928 <parth.psu@gmail.com>

* update deps.dev unit test

Signed-off-by: pxp928 <parth.psu@gmail.com>

---------

Signed-off-by: pxp928 <parth.psu@gmail.com>
  • Loading branch information
pxp928 committed Sep 11, 2024
1 parent 2b018e2 commit 9c7f881
Show file tree
Hide file tree
Showing 13 changed files with 249 additions and 405 deletions.
2 changes: 1 addition & 1 deletion cmd/guaccollect/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ you have access to read and write to the respective blob store.`,
os.Exit(1)
}
// Register collector
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency, nil)
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency)
if err != nil {
logger.Fatalf("unable to register oci collector: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion cmd/guaccollect/cmd/osv.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ func initializeNATsandCertifier(ctx context.Context, blobAddr, pubsubAddr string
// Collect
errHandler := func(err error) bool {
if err == nil {
logger.Info("certifier ended gracefully")
return true
}
logger.Errorf("certifier ended with error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/guacone/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var depsDevCmd = &cobra.Command{
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

// Register collector
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency, nil)
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency)
if err != nil {
logger.Fatalf("unable to register depsdev collector: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion cmd/guacone/cmd/osv.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ var osvCmd = &cobra.Command{
// Collect
errHandler := func(err error) bool {
if err == nil {
logger.Info("certifier ended gracefully")
return true
}
logger.Errorf("certifier ended with error: %v", err)
Expand Down
1 change: 0 additions & 1 deletion cmd/guacone/cmd/scorecard.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ var scorecardCmd = &cobra.Command{
// Collect
errHandler := func(err error) bool {
if err == nil {
logger.Info("certifier ended gracefully")
return true
}
logger.Errorf("certifier ended with error: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bombsimon/logrusr/v2 v2.0.1 // indirect
github.com/bradleyfalzon/ghinstallation/v2 v2.8.0 // indirect
Expand Down Expand Up @@ -332,6 +333,7 @@ require (
github.com/tikv/client-go/v2 v2.0.8-0.20231115083414-7c96dfd783fb
github.com/vektah/gqlparser/v2 v2.5.16
go.uber.org/mock v0.4.0
go.uber.org/ratelimit v0.3.1
gocloud.dev v0.39.0
gocloud.dev/pubsub/kafkapubsub v0.37.0
gocloud.dev/pubsub/rabbitpubsub v0.39.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3Co
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
Expand Down Expand Up @@ -872,6 +874,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
Expand Down
7 changes: 4 additions & 3 deletions pkg/certifier/certify/certify.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func Certify(ctx context.Context, query certifier.QueryComponents, emitter certi
}

go func() {
wrappedOperation := retryWithBackoff(backoffOperation)
wrappedOperation := retryWithBackoff(ctx, backoffOperation)
errChan <- wrappedOperation()
}()

Expand Down Expand Up @@ -191,7 +191,8 @@ func generateDocuments(ctx context.Context, collectedComponent interface{}, emit
type retryFunc func() error

// retryWithBackoff retries the given operation with exponential backoff
func retryWithBackoff(operation retryFunc) retryFunc {
func retryWithBackoff(ctx context.Context, operation retryFunc) retryFunc {
logger := logging.FromContext(ctx)
return func() error {
var lastError error
var urlErr *url.Error
Expand All @@ -203,7 +204,7 @@ func retryWithBackoff(operation retryFunc) retryFunc {
}
if errors.As(err, &urlErr) {
secRetry := math.Pow(2, float64(i))
fmt.Printf("Retrying operation in %f seconds\n", secRetry)
logger.Infof("Retrying operation in %f seconds\n", secRetry)
delay := time.Duration(secRetry) * baseDelay
time.Sleep(delay)
lastError = err
Expand Down
68 changes: 45 additions & 23 deletions pkg/certifier/clearlydefined/clearlydefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -49,6 +50,8 @@ var rateLimitInterval = 30 * time.Second
const (
PRODUCER_ID string = "guacsec/guac"
CDCollector string = "clearlydefined"
maxRetries = 10
baseDelay = 1 * time.Second
)

var ErrComponentTypeMismatch error = errors.New("rootComponent type is not []*root_package.PackageNode")
Expand All @@ -72,7 +75,7 @@ func NewClearlyDefinedHTTPClient(limiter *rate.Limiter) *http.Client {
}

// getDefinitions uses the coordinates to query clearly defined for license definition
func getDefinitions(_ context.Context, client *http.Client, purls []string, coordinates []string) (map[string]*attestation.Definition, error) {
func getDefinitions(ctx context.Context, client *http.Client, purls []string, coordinates []string) (map[string]*attestation.Definition, error) {

coordinateToPurl := make(map[string]string)
for i, purl := range purls {
Expand All @@ -87,30 +90,17 @@ func getDefinitions(_ context.Context, client *http.Client, purls []string, coor
return nil, fmt.Errorf("error marshalling coordinates: %w", err)
}

// retries if a 429 is encountered. This could occur even with the rate limiting
// as multiple services may be hitting it.
var resp *http.Response
maxRetries := 5
for retries := 0; retries < maxRetries; retries++ {
// Make the POST request
resp, err = client.Post("https://api.clearlydefined.io/definitions", "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return nil, fmt.Errorf("error making POST request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
if resp.StatusCode != http.StatusTooManyRequests {
// otherwise return an error
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
} else {
break
}
// retries if a 429 is encountered
backoffOperation := func() (*http.Response, error) {
return client.Post("https://api.clearlydefined.io/definitions", "application/json", bytes.NewBuffer(jsonData))
}

// Retry after a delay if status code is 429
time.Sleep(5 * time.Second)
wrappedOperation := retryWithBackoff(ctx, backoffOperation)
resp, err := wrappedOperation()
if err != nil {
return nil, fmt.Errorf("clearly defined POST request failed with error: %w", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -300,3 +290,35 @@ func createAttestation(purl string, definition *attestation.Definition, currentT

return attestation
}

// retryFunc is a function that can be retried
type retryFunc func() (*http.Response, error)

// retryWithBackoff retries the given operation with exponential backoff
func retryWithBackoff(ctx context.Context, operation retryFunc) retryFunc {
logger := logging.FromContext(ctx)
return func() (*http.Response, error) {
var collectedResp *http.Response
for i := 0; i < maxRetries; i++ {
resp, err := operation()
if err != nil {
return nil, fmt.Errorf("error making POST request: %w", err)
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode != http.StatusTooManyRequests {
// otherwise return an error
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
} else {
secRetry := math.Pow(2, float64(i))
logger.Infof("Retrying operation in %f seconds\n", secRetry)
delay := time.Duration(secRetry) * baseDelay
time.Sleep(delay)
}
} else {
collectedResp = resp
break
}
}
return collectedResp, nil
}
}
68 changes: 23 additions & 45 deletions pkg/clients/grpcRateLimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,39 @@ package clients
import (
"context"

grpc_ratelimit "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit"
"github.com/guacsec/guac/pkg/logging"

"golang.org/x/time/rate"
"go.uber.org/ratelimit"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// RateLimitedClient is a wrapper around grpc.ClientConn that adds rate limiting
// functionality to gRPC calls. It uses a rate.Limiter to control the rate of
// outgoing requests.
type RateLimitedClient struct {
ClientConn *grpc.ClientConn
Limiter *rate.Limiter
type limiter struct {
ratelimit.Limiter
}

// Invoke performs a gRPC call on the wrapped grpc.ClientConn, applying
// rate limiting before making the call. If the rate limit is exceeded, it waits
// until the limiter allows the request.
func (c *RateLimitedClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
logger := logging.FromContext(ctx)
if !c.Limiter.Allow() {
logger.Debugf("Rate limit exceeded for method: %s", method)
if err := c.Limiter.Wait(ctx); err != nil {
return err
}
}
return c.ClientConn.Invoke(ctx, method, args, reply, opts...)
// Limit blocks to ensure that RPS is met
func (l *limiter) Limit() bool {
l.Take()
return false
}

// NewStream creates a new stream on the wrapped grpc.ClientConn, applying rate
// limiting before creating the stream. If the rate limit is exceeded, it waits
// until the limiter allows the request.
func (c *RateLimitedClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
logger := logging.FromContext(ctx)
if !c.Limiter.Allow() {
logger.Debugf("Rate limit exceeded for method: %s", method)
if err := c.Limiter.Wait(ctx); err != nil {
return nil, err
}
// NewLimiter return new go-grpc Limiter, specified the number of requests you want to limit as a counts per second.
func NewLimiter(count int) grpc_ratelimit.Limiter {
return &limiter{
Limiter: ratelimit.New(count),
}
return c.ClientConn.NewStream(ctx, desc, method, opts...)
}

// NewRateLimitedClient creates a new RateLimitedClient that wraps the provided
// grpc.ClientConn and uses the provided rate.Limiter to control the rate of
// outgoing requests. It returns a grpc.ClientConnInterface that can be used
// wherever a grpc.ClientConn is expected.
//
// Parameters:
// - conn: The underlying grpc.ClientConn to wrap. This is typically an instance
// of grpc.ClientConn created using grpc.NewClient or any custom implementation of
// grpc.ClientConnInterface.
// - limiter: The rate.Limiter to use for controlling the rate of outgoing requests.
func NewRateLimitedClient(conn *grpc.ClientConn, limiter *rate.Limiter) grpc.ClientConnInterface {
return &RateLimitedClient{
ClientConn: conn,
Limiter: limiter,
// UnaryClientInterceptor return server unary interceptor that limit requests.
func UnaryClientInterceptor(limiter grpc_ratelimit.Limiter) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
logger := logging.FromContext(ctx)
if limiter.Limit() {
logger.Infof("Rate limit exceeded for method: %s", method)
return status.Errorf(codes.ResourceExhausted, "%s have been rejected by rate limiting.", method)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}
Loading

0 comments on commit 9c7f881

Please sign in to comment.