Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'main' into thanos-named-stores
  • Loading branch information
ashwanthgoli committed Nov 12, 2024
commit f35035471f3bb7cf48c8d6d5f9181c195846e017
8 changes: 6 additions & 2 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package azure

Check failure on line 1 in pkg/storage/bucket/azure/bucket_client.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

: # github.com/grafana/loki/v3/pkg/storage/bucket/azure

import (
"net/http"
Expand All @@ -8,7 +8,11 @@
"github.com/thanos-io/objstore/providers/azure"
)

func NewBucketClient(cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) {
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig)
}

func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, func(http.RoundTripper) http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
// Start with default config to make sure that all parameters are set to sensible values, especially
// HTTP Config field.
bucketConfig := azure.DefaultConfig
Expand All @@ -25,5 +29,5 @@
bucketConfig.Endpoint = cfg.Endpoint
}

return azure.NewBucketWithConfig(logger, bucketConfig, name, rt)
return factory(logger, bucketConfig, name, nil)
}
1 change: 1 addition & 0 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"regexp"

Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/bucket/gcs/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gcs

Check failure on line 1 in pkg/storage/bucket/gcs/bucket_client.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

: # github.com/grafana/loki/v3/pkg/storage/bucket/gcs

import (
"context"
Expand All @@ -16,6 +16,8 @@
bucketConfig.Bucket = cfg.BucketName
bucketConfig.ServiceAccount = cfg.ServiceAccount.String()
bucketConfig.ChunkSizeBytes = cfg.ChunkBufferSize
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.HTTPConfig.Transport = cfg.Transport

return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, rt)
}
5 changes: 5 additions & 0 deletions pkg/storage/bucket/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gcs

import (
"flag"
"net/http"

"github.com/grafana/dskit/flagext"
)
Expand All @@ -11,6 +12,10 @@ type Config struct {
BucketName string `yaml:"bucket_name"`
ServiceAccount flagext.Secret `yaml:"service_account" doc:"description_method=GCSServiceAccountLongDescription"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
MaxRetries int `yaml:"max_retries"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
}

// RegisterFlags registers the flags for GCS storage
Expand Down
16 changes: 13 additions & 3 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ObjectClientAdapter struct {
isRetryableErr func(err error) bool
}

func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedStores, component string, hedgingCfg hedging.Config, logger log.Logger) (*ObjectClientAdapter, error) {
func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedStores, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) {
var (
storeType = backend
storeCfg = cfg.Config
Expand All @@ -40,7 +40,13 @@ func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedSto
}
}

bucket, err := NewClient(ctx, storeType, storeCfg, component, logger, nil)
if disableRetries {
if err := cfg.disableRetries(storeType); err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}
}

bucket, err := NewClient(ctx, storeType, storeCfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}
Expand All @@ -52,7 +58,11 @@ func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedSto
return nil, fmt.Errorf("create hedged transport: %w", err)
}

bucket, err = NewClient(ctx, storeType, storeCfg, component, logger, hedgedTrasport)
if err := cfg.configureTransport(storeType, hedgedTrasport); err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}

hedgedBucket, err = NewClient(ctx, storeType, storeCfg, component, logger)
if err != nil {
return nil, fmt.Errorf("create hedged bucket: %w", err)
}
Expand Down
9 changes: 1 addition & 8 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,5 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger, rt http.RoundTripp
}
bucketConfig.HTTPConfig.Transport = cfg.Transport

// Thanos currently doesn't support passing the config as is, but expects a YAML,
// so we're going to serialize it.
serialized, err := yaml.Marshal(bucketConfig)
if err != nil {
return nil, err
}

return swift.NewContainer(logger, serialized, rt)
return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
}
2 changes: 1 addition & 1 deletion pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func (c *ClientMetrics) Unregister() {
// NewObjectClient makes a new StorageClient with the prefix in the front.
func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
if cfg.UseThanosObjstore {
return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, util_log.Logger)
return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, cfg.CongestionControl.Enabled, util_log.Logger)
}

actual, err := internalNewObjectClient(name, component, cfg, clientMetrics)
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.