Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat(thanos): add new metric to track status codes
  • Loading branch information
ashwanthgoli committed Nov 15, 2024
commit 062710e2fa871659c0fa20f20500ea95d8b202e0
8 changes: 4 additions & 4 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/thanos-io/objstore/providers/azure"
)

func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig)
func NewBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
return newBucketClient(cfg, name, logger, wrapRT, azure.NewBucketWithConfig)
}

func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, func(http.RoundTripper) http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
func newBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper, factory func(log.Logger, azure.Config, string, func(http.RoundTripper) http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
// Start with default config to make sure that all parameters are set to sensible values, especially
// HTTP Config field.
bucketConfig := azure.DefaultConfig
Expand All @@ -29,5 +29,5 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo
bucketConfig.Endpoint = cfg.Endpoint
}

return factory(logger, bucketConfig, name, nil)
return factory(logger, bucketConfig, name, wrapRT)
}
47 changes: 42 additions & 5 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"regexp"
"strconv"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -46,9 +47,24 @@ var (
ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")

metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
metrics *objstore.Metrics

// added to track the status codes by method
bucketRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "loki",
Name: "objstore_bucket_transport_requests_total",
Help: "Total number of HTTP transport requests made to the bucket backend by status code and method.",
},
[]string{"status_code", "method"},
)
)

func init() {
prometheus.MustRegister(bucketRequestsTotal)
metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
}

// StorageBackendConfig holds configuration for accessing long-term storage.
type StorageBackendConfig struct {
// Backends
Expand Down Expand Up @@ -176,13 +192,13 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log
// TODO: add support for other backends that loki already supports
switch backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
client, err = s3.NewBucketClient(cfg.S3, name, logger, instrumentTransport())
case GCS:
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger, instrumentTransport())
case Azure:
client, err = azure.NewBucketClient(cfg.Azure, name, logger)
client, err = azure.NewBucketClient(cfg.Azure, name, logger, instrumentTransport())
case Swift:
client, err = swift.NewBucketClient(cfg.Swift, name, logger)
client, err = swift.NewBucketClient(cfg.Swift, name, logger, instrumentTransport())
case Filesystem:
client, err = filesystem.NewBucketClient(cfg.Filesystem)
default:
Expand All @@ -209,3 +225,24 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log

return instrumentedClient, nil
}

type instrumentedRoundTripper struct {
next http.RoundTripper
}

func instrumentTransport() func(http.RoundTripper) http.RoundTripper {
return func(rt http.RoundTripper) http.RoundTripper {
return &instrumentedRoundTripper{next: rt}
}
}

func (i *instrumentedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := i.next.RoundTrip(req)
if err != nil {
return resp, err
}

// Record status code and method metrics
bucketRequestsTotal.WithLabelValues(strconv.Itoa(resp.StatusCode), req.Method).Inc()
return resp, nil
}
5 changes: 3 additions & 2 deletions pkg/storage/bucket/gcs/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package gcs

import (
"context"
"net/http"

"github.com/go-kit/log"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/gcs"
)

// NewBucketClient creates a new GCS bucket client
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
// start with default http configs
bucketConfig := gcs.DefaultConfig
bucketConfig.Bucket = cfg.BucketName
Expand All @@ -18,5 +19,5 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.HTTPConfig.Transport = cfg.Transport

return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil)
return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, wrapRT)
}
10 changes: 6 additions & 4 deletions pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package s3

import (
"net/http"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
Expand All @@ -14,23 +16,23 @@ const (
)

// NewBucketClient creates a new S3 bucket client
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
s3Cfg, err := newS3Config(cfg)
if err != nil {
return nil, err
}

return s3.NewBucketWithConfig(logger, s3Cfg, name, nil)
return s3.NewBucketWithConfig(logger, s3Cfg, name, wrapRT)
}

// NewBucketReaderClient creates a new S3 bucket client
func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore.BucketReader, error) {
func NewBucketReaderClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.BucketReader, error) {
s3Cfg, err := newS3Config(cfg)
if err != nil {
return nil, err
}

return s3.NewBucketWithConfig(logger, s3Cfg, name, nil)
return s3.NewBucketWithConfig(logger, s3Cfg, name, wrapRT)
}

func newS3Config(cfg Config) (s3.Config, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
Insecure: true,
}

s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger())
s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger(), nil)
require.NoError(t, err)

// Configure the config provider with NO KMS key ID.
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package swift

import (
"net/http"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
Expand All @@ -9,7 +11,7 @@ import (
)

// NewBucketClient creates a new Swift bucket client
func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(cfg Config, _ string, logger log.Logger, wrapper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
bucketConfig := swift.Config{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Expand Down Expand Up @@ -37,5 +39,5 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket,
}
bucketConfig.HTTPConfig.Transport = cfg.Transport

return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
return swift.NewContainerFromConfig(logger, &bucketConfig, false, wrapper)
}
Loading