Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor
  • Loading branch information
ashwanthgoli committed Nov 12, 2024
commit 9d6940444acac5db0844bdc06268dbc5bdf689ad
8 changes: 2 additions & 6 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo
bucketConfig.ContainerName = cfg.ContainerName
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.UserAssignedID = cfg.UserAssignedID
bucketConfig.HTTPConfig.Transport = cfg.Transport

if cfg.Endpoint != "" {
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
bucketConfig.Endpoint = cfg.Endpoint
}

return factory(logger, bucketConfig, name, func(rt http.RoundTripper) http.RoundTripper {
if cfg.Transport != nil {
rt = cfg.Transport
}
return rt
})
return factory(logger, bucketConfig, name, nil)
}
60 changes: 49 additions & 11 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@ package bucket

import (
"context"
"fmt"
"io"
"net/http"
"slices"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

type ObjectClientAdapter struct {
Expand All @@ -21,9 +27,27 @@ type ObjectClientAdapter struct {
isRetryableErr func(err error) bool
}

func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter {
if hedgedBucket == nil {
hedgedBucket = bucket
func NewObjectClient(ctx context.Context, backend string, cfg Config, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) {
bucket, err := NewClient(ctx, backend, cfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}

hedgedBucket := bucket
if hedgingCfg.At != 0 {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, fmt.Errorf("create hedged transport: %w", err)
}

if err := configureTransport(backend, hedgedTrasport, &cfg); err != nil {
return nil, err
}

bucket, err = NewClient(ctx, backend, cfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}
}

o := &ObjectClientAdapter{
Expand All @@ -37,19 +61,33 @@ func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Log
},
}

for _, opt := range opts {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC we are removing this because we removed the ClientOptions right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. It was being used to apply retry strategy from the provider specific adapters, but that bit is moved to the init func

opt(o)
switch backend {
case GCS:
o.isRetryableErr = gcp.IsRetryableErr
case S3:
o.isRetryableErr = aws.IsRetryableErr
}

return o
return o, nil
}

type ClientOptions func(*ObjectClientAdapter)

func WithRetryableErrFunc(f func(err error) bool) ClientOptions {
return func(o *ObjectClientAdapter) {
o.isRetryableErr = f
func configureTransport(backend string, rt http.RoundTripper, cfg *Config) error {
switch backend {
case S3:
cfg.S3.HTTP.Transport = rt
case GCS:
cfg.GCS.Transport = rt
case Azure:
cfg.Azure.Transport = rt
case Swift:
cfg.Swift.Transport = rt
case Filesystem:
// do nothing
default:
return fmt.Errorf("hedging not supported for backend: %s", backend)
}

return nil
}

func (o *ObjectClientAdapter) Stop() {
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/bucket/object_client_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"sort"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

func TestObjectClientAdapter_List(t *testing.T) {
Expand Down Expand Up @@ -95,8 +97,12 @@ func TestObjectClientAdapter_List(t *testing.T) {
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

client := NewObjectClientAdapter(newBucket, nil, nil)
client.bucket = newBucket
client, err := NewObjectClient(context.Background(), "filesystem", Config{
StorageBackendConfig: StorageBackendConfig{
Filesystem: config,
},
}, "test", hedging.Config{}, false, log.NewNopLogger())
require.NoError(t, err)

storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
Expand Down
13 changes: 4 additions & 9 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/exthttp"
"github.com/thanos-io/objstore/providers/swift"
yaml "gopkg.in/yaml.v2"
)

// NewBucketClient creates a new Swift bucket client
Expand Down Expand Up @@ -33,14 +33,9 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket,
// Hard-coded defaults.
ChunkSize: swift.DefaultConfig.ChunkSize,
UseDynamicLargeObjects: false,
HTTPConfig: exthttp.DefaultHTTPConfig,
}
bucketConfig.HTTPConfig.Transport = cfg.Transport

// Thanos currently doesn't support passing the config as is, but expects a YAML,
// so we're going to serialize it.
serialized, err := yaml.Marshal(bucketConfig)
if err != nil {
return nil, err
}

return swift.NewContainer(logger, serialized, nil)
return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
}
4 changes: 4 additions & 0 deletions pkg/storage/bucket/swift/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package swift

import (
"flag"
"net/http"
"time"
)

Expand All @@ -26,6 +27,9 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
}

// RegisterFlags registers the flags for Swift storage
Expand Down
44 changes: 0 additions & 44 deletions pkg/storage/chunk/client/aws/s3_thanos_object_client.go

This file was deleted.

This file was deleted.

44 changes: 0 additions & 44 deletions pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go

This file was deleted.

19 changes: 5 additions & 14 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,16 @@ func (c *ClientMetrics) Unregister() {

// NewObjectClient makes a new StorageClient with the prefix in the front.
func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
if cfg.UseThanosObjstore {
return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, cfg.CongestionControl.Enabled, util_log.Logger)
}

actual, err := internalNewObjectClient(name, component, cfg, clientMetrics)
if err != nil {
return nil, err
}

if cfg.UseThanosObjstore || cfg.ObjectPrefix == "" {
if cfg.ObjectPrefix == "" {
return actual, nil
} else {
prefix := strings.Trim(cfg.ObjectPrefix, "/") + "/"
Expand Down Expand Up @@ -659,9 +663,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr
s3Cfg.BackoffConfig.MaxRetries = 1
}

if cfg.UseThanosObjstore {
return aws.NewS3ThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging)
}
return aws.NewS3ObjectClient(s3Cfg, cfg.Hedging)

case types.StorageTypeAlibabaCloud:
Expand Down Expand Up @@ -691,9 +692,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr
if cfg.CongestionControl.Enabled {
gcsCfg.EnableRetries = false
}
if cfg.UseThanosObjstore {
return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging)
}
return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging)

case types.StorageTypeAzure:
Expand All @@ -705,9 +703,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr
}
azureCfg = (azure.BlobStorageConfig)(nsCfg)
}
if cfg.UseThanosObjstore {
return azure.NewBlobStorageThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging)
}
return azure.NewBlobStorage(&azureCfg, clientMetrics.AzureMetrics, cfg.Hedging)

case types.StorageTypeSwift:
Expand Down Expand Up @@ -757,10 +752,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr
return ibmcloud.NewCOSObjectClient(cosCfg, cfg.Hedging)

default:
if cfg.UseThanosObjstore {
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %s", storeName, strings.Join(cfg.ObjectStore.SupportedBackends(), ", "))
}

return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v, %v, %v, %v", storeName, types.StorageTypeAWS, types.StorageTypeS3, types.StorageTypeGCS, types.StorageTypeAzure, types.StorageTypeAlibabaCloud, types.StorageTypeSwift, types.StorageTypeBOS, types.StorageTypeCOS, types.StorageTypeFileSystem)
}
}