Skip to content

Commit 947a66f

Browse files
authored
feat(thanos): disable retries when congestion control is enabled (#14867)
1 parent ac2e21f commit 947a66f

20 files changed

+125
-182
lines changed

‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ require (
137137
github.com/richardartoul/molecule v1.0.0
138138
github.com/schollz/progressbar/v3 v3.17.0
139139
github.com/shirou/gopsutil/v4 v4.24.10
140-
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13
140+
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
141141
github.com/twmb/franz-go v1.17.1
142142
github.com/twmb/franz-go/pkg/kadm v1.13.0
143143
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015013301-cea7aa5d8037

‎go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -2585,8 +2585,8 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.480/go.mod
25852585
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm v1.0.480/go.mod h1:zaBIuDDs+rC74X8Aog+LSu91GFtHYRYDC196RGTm2jk=
25862586
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
25872587
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
2588-
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13 h1:PQd6xZs18KGoCZJgL9eyYsrRGzzRwYCr4iXuehZm++w=
2589-
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
2588+
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
2589+
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
25902590
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
25912591
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
25922592
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=

‎pkg/storage/bucket/azure/bucket_client.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,12 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo
2222
bucketConfig.ContainerName = cfg.ContainerName
2323
bucketConfig.MaxRetries = cfg.MaxRetries
2424
bucketConfig.UserAssignedID = cfg.UserAssignedID
25+
bucketConfig.HTTPConfig.Transport = cfg.Transport
2526

2627
if cfg.Endpoint != "" {
2728
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
2829
bucketConfig.Endpoint = cfg.Endpoint
2930
}
3031

31-
return factory(logger, bucketConfig, name, func(rt http.RoundTripper) http.RoundTripper {
32-
if cfg.Transport != nil {
33-
rt = cfg.Transport
34-
}
35-
return rt
36-
})
32+
return factory(logger, bucketConfig, name, nil)
3733
}

‎pkg/storage/bucket/client.go

+40
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"errors"
66
"flag"
7+
"fmt"
8+
"net/http"
79
"regexp"
810

911
"github.com/go-kit/log"
@@ -126,6 +128,44 @@ func (cfg *Config) Validate() error {
126128
return cfg.StorageBackendConfig.Validate()
127129
}
128130

131+
func (cfg *Config) disableRetries(backend string) error {
132+
switch backend {
133+
case S3:
134+
cfg.S3.MaxRetries = 1
135+
case GCS:
136+
cfg.GCS.MaxRetries = 1
137+
case Azure:
138+
cfg.Azure.MaxRetries = 1
139+
case Swift:
140+
cfg.Swift.MaxRetries = 1
141+
case Filesystem:
142+
// do nothing
143+
default:
144+
return fmt.Errorf("cannot disable retries for backend: %s", backend)
145+
}
146+
147+
return nil
148+
}
149+
150+
func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) error {
151+
switch backend {
152+
case S3:
153+
cfg.S3.HTTP.Transport = rt
154+
case GCS:
155+
cfg.GCS.Transport = rt
156+
case Azure:
157+
cfg.Azure.Transport = rt
158+
case Swift:
159+
cfg.Swift.Transport = rt
160+
case Filesystem:
161+
// do nothing
162+
default:
163+
return fmt.Errorf("cannot configure transport for backend: %s", backend)
164+
}
165+
166+
return nil
167+
}
168+
129169
// NewClient creates a new bucket client based on the configured backend
130170
func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) {
131171
var (

‎pkg/storage/bucket/gcs/bucket_client.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
1515
bucketConfig.Bucket = cfg.BucketName
1616
bucketConfig.ServiceAccount = cfg.ServiceAccount.String()
1717
bucketConfig.ChunkSizeBytes = cfg.ChunkBufferSize
18+
bucketConfig.MaxRetries = cfg.MaxRetries
1819
bucketConfig.HTTPConfig.Transport = cfg.Transport
1920

2021
return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil)

‎pkg/storage/bucket/gcs/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Config struct {
1212
BucketName string `yaml:"bucket_name"`
1313
ServiceAccount flagext.Secret `yaml:"service_account" doc:"description_method=GCSServiceAccountLongDescription"`
1414
ChunkBufferSize int `yaml:"chunk_buffer_size"`
15+
MaxRetries int `yaml:"max_retries"`
1516

1617
// Allow upstream callers to inject a round tripper
1718
Transport http.RoundTripper `yaml:"-"`
@@ -27,6 +28,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
2728
f.StringVar(&cfg.BucketName, prefix+"gcs.bucket-name", "", "GCS bucket name")
2829
f.Var(&cfg.ServiceAccount, prefix+"gcs.service-account", cfg.GCSServiceAccountShortDescription())
2930
f.IntVar(&cfg.ChunkBufferSize, prefix+"gcs.chunk-buffer-size", 0, "The maximum size of the buffer that GCS client for a single PUT request. 0 to disable buffering.")
31+
f.IntVar(&cfg.MaxRetries, prefix+"gcs.max-retries", 10, "The maximum number of retries for idempotent operations. Overrides the default gcs storage client behavior if this value is greater than 0. Set this to 1 to disable retries.")
3032
}
3133

3234
func (cfg *Config) GCSServiceAccountShortDescription() string {

‎pkg/storage/bucket/object_client_adapter.go

+38-14
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@ package bucket
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"slices"
78
"strings"
89

910
"github.com/go-kit/log"
1011
"github.com/go-kit/log/level"
1112
"github.com/pkg/errors"
13+
"github.com/prometheus/client_golang/prometheus"
1214
"github.com/thanos-io/objstore"
1315

1416
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
17+
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
18+
"github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp"
19+
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
1520
)
1621

1722
type ObjectClientAdapter struct {
@@ -21,9 +26,33 @@ type ObjectClientAdapter struct {
2126
isRetryableErr func(err error) bool
2227
}
2328

24-
func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter {
25-
if hedgedBucket == nil {
26-
hedgedBucket = bucket
29+
func NewObjectClient(ctx context.Context, backend string, cfg Config, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) {
30+
if disableRetries {
31+
if err := cfg.disableRetries(backend); err != nil {
32+
return nil, fmt.Errorf("create bucket: %w", err)
33+
}
34+
}
35+
36+
bucket, err := NewClient(ctx, backend, cfg, component, logger)
37+
if err != nil {
38+
return nil, fmt.Errorf("create bucket: %w", err)
39+
}
40+
41+
hedgedBucket := bucket
42+
if hedgingCfg.At != 0 {
43+
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
44+
if err != nil {
45+
return nil, fmt.Errorf("create hedged transport: %w", err)
46+
}
47+
48+
if err := cfg.configureTransport(backend, hedgedTrasport); err != nil {
49+
return nil, fmt.Errorf("create hedged bucket: %w", err)
50+
}
51+
52+
hedgedBucket, err = NewClient(ctx, backend, cfg, component, logger)
53+
if err != nil {
54+
return nil, fmt.Errorf("create hedged bucket: %w", err)
55+
}
2756
}
2857

2958
o := &ObjectClientAdapter{
@@ -37,19 +66,14 @@ func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Log
3766
},
3867
}
3968

40-
for _, opt := range opts {
41-
opt(o)
69+
switch backend {
70+
case GCS:
71+
o.isRetryableErr = gcp.IsRetryableErr
72+
case S3:
73+
o.isRetryableErr = aws.IsRetryableErr
4274
}
4375

44-
return o
45-
}
46-
47-
type ClientOptions func(*ObjectClientAdapter)
48-
49-
func WithRetryableErrFunc(f func(err error) bool) ClientOptions {
50-
return func(o *ObjectClientAdapter) {
51-
o.isRetryableErr = f
52-
}
76+
return o, nil
5377
}
5478

5579
func (o *ObjectClientAdapter) Stop() {

‎pkg/storage/bucket/object_client_adapter_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"sort"
77
"testing"
88

9+
"github.com/go-kit/log"
910
"github.com/stretchr/testify/require"
1011

1112
"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
1213
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
14+
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
1315
)
1416

1517
func TestObjectClientAdapter_List(t *testing.T) {
@@ -95,8 +97,12 @@ func TestObjectClientAdapter_List(t *testing.T) {
9597
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
9698
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))
9799

98-
client := NewObjectClientAdapter(newBucket, nil, nil)
99-
client.bucket = newBucket
100+
client, err := NewObjectClient(context.Background(), "filesystem", Config{
101+
StorageBackendConfig: StorageBackendConfig{
102+
Filesystem: config,
103+
},
104+
}, "test", hedging.Config{}, false, log.NewNopLogger())
105+
require.NoError(t, err)
100106

101107
storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
102108
if tt.wantErr != nil {

‎pkg/storage/bucket/s3/bucket_client.go

+1
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,6 @@ func newS3Config(cfg Config) (s3.Config, error) {
8282
Enable: cfg.TraceConfig.Enabled,
8383
},
8484
STSEndpoint: cfg.STSEndpoint,
85+
MaxRetries: cfg.MaxRetries,
8586
}, nil
8687
}

‎pkg/storage/bucket/s3/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ type Config struct {
118118
PartSize uint64 `yaml:"part_size" category:"experimental"`
119119
SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"`
120120
STSEndpoint string `yaml:"sts_endpoint"`
121+
MaxRetries int `yaml:"max_retries"`
121122

122123
SSE SSEConfig `yaml:"sse"`
123124
HTTP HTTPConfig `yaml:"http"`
@@ -146,6 +147,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
146147
f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", ")))
147148
f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.")
148149
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
150+
f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.")
149151
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
150152
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
151153
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)

‎pkg/storage/bucket/swift/bucket_client.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"github.com/go-kit/log"
55
"github.com/prometheus/common/model"
66
"github.com/thanos-io/objstore"
7+
"github.com/thanos-io/objstore/exthttp"
78
"github.com/thanos-io/objstore/providers/swift"
8-
yaml "gopkg.in/yaml.v2"
99
)
1010

1111
// NewBucketClient creates a new Swift bucket client
@@ -33,14 +33,9 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket,
3333
// Hard-coded defaults.
3434
ChunkSize: swift.DefaultConfig.ChunkSize,
3535
UseDynamicLargeObjects: false,
36+
HTTPConfig: exthttp.DefaultHTTPConfig,
3637
}
38+
bucketConfig.HTTPConfig.Transport = cfg.Transport
3739

38-
// Thanos currently doesn't support passing the config as is, but expects a YAML,
39-
// so we're going to serialize it.
40-
serialized, err := yaml.Marshal(bucketConfig)
41-
if err != nil {
42-
return nil, err
43-
}
44-
45-
return swift.NewContainer(logger, serialized, nil)
40+
return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
4641
}

‎pkg/storage/bucket/swift/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package swift
22

33
import (
44
"flag"
5+
"net/http"
56
"time"
67
)
78

@@ -26,6 +27,9 @@ type Config struct {
2627
MaxRetries int `yaml:"max_retries"`
2728
ConnectTimeout time.Duration `yaml:"connect_timeout"`
2829
RequestTimeout time.Duration `yaml:"request_timeout"`
30+
31+
// Allow upstream callers to inject a round tripper
32+
Transport http.RoundTripper `yaml:"-"`
2933
}
3034

3135
// RegisterFlags registers the flags for Swift storage

‎pkg/storage/chunk/client/aws/s3_thanos_object_client.go

-44
This file was deleted.

‎pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go

-44
This file was deleted.

0 commit comments

Comments
 (0)