Skip to content

Commit cfc3819

Browse files
feat(thanos): make use of the new function IterWithAttributes (#14793)
1 parent ab2721d commit cfc3819

File tree

19 files changed

+414
-59
lines changed

19 files changed

+414
-59
lines changed

‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ require (
137137
github.com/richardartoul/molecule v1.0.0
138138
github.com/schollz/progressbar/v3 v3.17.0
139139
github.com/shirou/gopsutil/v4 v4.24.10
140-
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d
140+
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13
141141
github.com/twmb/franz-go v1.17.1
142142
github.com/twmb/franz-go/pkg/kadm v1.13.0
143143
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015013301-cea7aa5d8037

‎go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -2585,8 +2585,8 @@ github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.480/go.mod
25852585
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm v1.0.480/go.mod h1:zaBIuDDs+rC74X8Aog+LSu91GFtHYRYDC196RGTm2jk=
25862586
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
25872587
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
2588-
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d h1:k+SLTP1mjNqXxsCiq4UYeKCe07le0ieffyuHm/YfmH8=
2589-
github.com/thanos-io/objstore v0.0.0-20241028150459-cfdd0e50390d/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
2588+
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13 h1:PQd6xZs18KGoCZJgL9eyYsrRGzzRwYCr4iXuehZm++w=
2589+
github.com/thanos-io/objstore v0.0.0-20241105144332-b598dceacb13/go.mod h1:/ZMUxFcp/nT6oYV5WslH9k07NU/+86+aibgZRmMMr/4=
25902590
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
25912591
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
25922592
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=

‎pkg/ruler/rulestore/bucketclient/bucket_client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (b *BucketRuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rul
119119
Name: group,
120120
})
121121
return nil
122-
}, objstore.WithRecursiveIter)
122+
}, objstore.WithRecursiveIter())
123123

124124
if err != nil {
125125
return nil, err
@@ -156,7 +156,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
156156
Name: group,
157157
})
158158
return nil
159-
}, objstore.WithRecursiveIter)
159+
}, objstore.WithRecursiveIter())
160160
if err != nil {
161161
return nil, err
162162
}

‎pkg/storage/bucket/object_client_adapter.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bucket
33
import (
44
"context"
55
"io"
6+
"slices"
67
"strings"
78

89
"github.com/go-kit/log"
@@ -16,6 +17,7 @@ import (
1617
type ObjectClientAdapter struct {
1718
bucket, hedgedBucket objstore.Bucket
1819
logger log.Logger
20+
supportsUpdatedAt bool
1921
isRetryableErr func(err error) bool
2022
}
2123

@@ -25,9 +27,10 @@ func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Log
2527
}
2628

2729
o := &ObjectClientAdapter{
28-
bucket: bucket,
29-
hedgedBucket: hedgedBucket,
30-
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
30+
bucket: bucket,
31+
hedgedBucket: hedgedBucket,
32+
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
33+
supportsUpdatedAt: slices.Contains(bucket.SupportedIterOptions(), objstore.UpdatedAt),
3134
// default to no retryable errors. Override with WithRetryableErrFunc
3235
isRetryableErr: func(_ error) bool {
3336
return false
@@ -103,26 +106,39 @@ func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string
103106

104107
// If delimiter is empty we want to list all files
105108
if delimiter == "" {
106-
iterParams = append(iterParams, objstore.WithRecursiveIter)
109+
iterParams = append(iterParams, objstore.WithRecursiveIter())
107110
}
108111

109-
err := o.bucket.Iter(ctx, prefix, func(objectKey string) error {
112+
if o.supportsUpdatedAt {
113+
iterParams = append(iterParams, objstore.WithUpdatedAt())
114+
}
115+
116+
err := o.bucket.IterWithAttributes(ctx, prefix, func(attrs objstore.IterObjectAttributes) error {
110117
// CommonPrefixes are keys that have the prefix and have the delimiter
111118
// as a suffix
119+
objectKey := attrs.Name
112120
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
113121
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
114122
return nil
115123
}
116124

117-
// TODO: remove this once thanos support IterWithAttributes
118-
attr, err := o.bucket.Attributes(ctx, objectKey)
119-
if err != nil {
120-
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
125+
lastModified, ok := attrs.LastModified()
126+
if o.supportsUpdatedAt && !ok {
127+
return errors.Errorf("failed to get lastModified for %s", objectKey)
128+
}
129+
// Some providers do not support supports UpdatedAt option. For those we need
130+
// to make an additional request to get the last modified time.
131+
if !o.supportsUpdatedAt {
132+
attr, err := o.bucket.Attributes(ctx, objectKey)
133+
if err != nil {
134+
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
135+
}
136+
lastModified = attr.LastModified
121137
}
122138

123139
storageObjects = append(storageObjects, client.StorageObject{
124140
Key: objectKey,
125-
ModifiedAt: attr.LastModified,
141+
ModifiedAt: lastModified,
126142
})
127143

128144
return nil

‎pkg/storage/bucket/prefixed_bucket_client.go

+17
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error {
4444
// Name returns the bucket name for the provider.
4545
func (b *PrefixedBucketClient) Name() string { return b.bucket.Name() }
4646

47+
// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
48+
func (b *PrefixedBucketClient) SupportedIterOptions() []objstore.IterOptionType {
49+
return b.bucket.SupportedIterOptions()
50+
}
51+
4752
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
4853
// object name including the prefix of the inspected directory. The configured prefix will be stripped
4954
// before supplied function is applied.
@@ -53,6 +58,18 @@ func (b *PrefixedBucketClient) Iter(ctx context.Context, dir string, f func(stri
5358
}, options...)
5459
}
5560

61+
// IterWithAttributes calls f for each entry in the given directory similar to Iter.
62+
// In addition to Name, it also includes requested object attributes in the argument to f.
63+
//
64+
// Attributes can be requested using IterOption.
65+
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
66+
func (b *PrefixedBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
67+
return b.bucket.IterWithAttributes(ctx, b.fullName(dir), func(attrs objstore.IterObjectAttributes) error {
68+
attrs.Name = strings.TrimPrefix(attrs.Name, b.prefix+objstore.DirDelim)
69+
return f(attrs)
70+
}, options...)
71+
}
72+
5673
// Get returns a reader for the given object name.
5774
func (b *PrefixedBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
5875
return b.bucket.Get(ctx, b.fullName(name))

‎pkg/storage/bucket/sse_bucket_client.go

+14
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,25 @@ func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
9494
return sse, nil
9595
}
9696

97+
// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
98+
func (b *SSEBucketClient) SupportedIterOptions() []objstore.IterOptionType {
99+
return b.bucket.SupportedIterOptions()
100+
}
101+
97102
// Iter implements objstore.Bucket.
98103
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
99104
return b.bucket.Iter(ctx, dir, f, options...)
100105
}
101106

107+
// IterWithAttributes calls f for each entry in the given directory similar to Iter.
108+
// In addition to Name, it also includes requested object attributes in the argument to f.
109+
//
110+
// Attributes can be requested using IterOption.
111+
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
112+
func (b *SSEBucketClient) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
113+
return b.bucket.IterWithAttributes(ctx, dir, f, options...)
114+
}
115+
102116
// Get implements objstore.Bucket.
103117
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
104118
return b.bucket.Get(ctx, name)

‎vendor/github.com/thanos-io/objstore/CHANGELOG.md

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎vendor/github.com/thanos-io/objstore/README.md

+19-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎vendor/github.com/thanos-io/objstore/inmem.go

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)