Skip to content

Commit fc90a63

Browse files
authored
feat: implement IsRetryableErr for S3ObjectClient (#14174)
**What this PR does / why we need it**: Adds support for `IsRetryableErr`in the `S3ObjectClient`, similar to what is found in the `GCSObjectClient`
1 parent 2840d48 commit fc90a63

File tree

2 files changed

+165
-2
lines changed

2 files changed

+165
-2
lines changed

‎pkg/storage/chunk/client/aws/s3_storage_client.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ import (
2121
"github.com/aws/aws-sdk-go/service/s3"
2222
"github.com/aws/aws-sdk-go/service/s3/s3iface"
2323
awscommon "github.com/grafana/dskit/aws"
24+
2425
"github.com/grafana/dskit/backoff"
2526
"github.com/grafana/dskit/flagext"
2627
"github.com/grafana/dskit/instrument"
2728
"github.com/pkg/errors"
2829
"github.com/prometheus/client_golang/prometheus"
2930

31+
amnet "k8s.io/apimachinery/pkg/util/net"
32+
3033
bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3"
3134
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
3235
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
@@ -532,5 +535,61 @@ func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool {
532535
return false
533536
}
534537

535-
// TODO(dannyk): implement for client
536-
func (a *S3ObjectClient) IsRetryableErr(error) bool { return false }
538+
func isTimeoutError(err error) bool {
539+
var netErr net.Error
540+
return errors.As(err, &netErr) && netErr.Timeout()
541+
}
542+
543+
func isContextErr(err error) bool {
544+
return errors.Is(err, context.DeadlineExceeded) ||
545+
errors.Is(err, context.Canceled)
546+
}
547+
548+
// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts.
549+
func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool {
550+
// TODO(dannyk): move these out to be generic
551+
// context errors are all client-side
552+
if isContextErr(err) {
553+
return false
554+
}
555+
556+
// connection misconfiguration, or writing on a closed connection
557+
// do NOT retry; this is not a server-side issue
558+
if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) {
559+
return false
560+
}
561+
562+
// this is a server-side timeout
563+
if isTimeoutError(err) {
564+
return true
565+
}
566+
567+
// connection closed (closed before established) or reset (closed after established)
568+
// this is a server-side issue
569+
if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) {
570+
return true
571+
}
572+
573+
if rerr, ok := err.(awserr.RequestFailure); ok {
574+
// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
575+
return rerr.StatusCode() == http.StatusRequestTimeout ||
576+
rerr.StatusCode() == http.StatusGatewayTimeout
577+
}
578+
579+
return false
580+
}
581+
582+
// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling.
583+
func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool {
584+
if rerr, ok := err.(awserr.RequestFailure); ok {
585+
586+
// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
587+
return rerr.StatusCode() == http.StatusTooManyRequests ||
588+
(rerr.StatusCode()/100 == 5) // all 5xx errors are retryable
589+
}
590+
591+
return false
592+
}
593+
func (a *S3ObjectClient) IsRetryableErr(err error) bool {
594+
return a.IsStorageTimeoutErr(err) || a.IsStorageThrottledErr(err)
595+
}

‎pkg/storage/chunk/client/aws/s3_storage_client_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"net"
910
"net/http"
1011
"net/http/httptest"
1112
"strings"
13+
"syscall"
1214
"testing"
1315
"time"
1416

@@ -73,6 +75,108 @@ func TestIsObjectNotFoundErr(t *testing.T) {
7375
}
7476
}
7577

78+
func TestIsRetryableErr(t *testing.T) {
79+
tests := []struct {
80+
err error
81+
expected bool
82+
name string
83+
}{
84+
{
85+
name: "IsStorageThrottledErr - Too Many Requests",
86+
err: awserr.NewRequestFailure(
87+
awserr.New("TooManyRequests", "TooManyRequests", nil), 429, "reqId",
88+
),
89+
expected: true,
90+
},
91+
{
92+
name: "IsStorageThrottledErr - 500",
93+
err: awserr.NewRequestFailure(
94+
awserr.New("500", "500", nil), 500, "reqId",
95+
),
96+
expected: true,
97+
},
98+
{
99+
name: "IsStorageThrottledErr - 5xx",
100+
err: awserr.NewRequestFailure(
101+
awserr.New("501", "501", nil), 501, "reqId",
102+
),
103+
expected: true,
104+
},
105+
{
106+
name: "IsStorageTimeoutErr - Request Timeout",
107+
err: awserr.NewRequestFailure(
108+
awserr.New("Request Timeout", "Request Timeout", nil), 408, "reqId",
109+
),
110+
expected: true,
111+
},
112+
{
113+
name: "IsStorageTimeoutErr - Gateway Timeout",
114+
err: awserr.NewRequestFailure(
115+
awserr.New("Gateway Timeout", "Gateway Timeout", nil), 504, "reqId",
116+
),
117+
expected: true,
118+
},
119+
{
120+
name: "IsStorageTimeoutErr - EOF",
121+
err: io.EOF,
122+
expected: true,
123+
},
124+
{
125+
name: "IsStorageTimeoutErr - Connection Reset",
126+
err: syscall.ECONNRESET,
127+
expected: true,
128+
},
129+
{
130+
name: "IsStorageTimeoutErr - Timeout Error",
131+
err: awserr.NewRequestFailure(
132+
awserr.New("RequestCanceled", "request canceled due to timeout", nil), 408, "request-id",
133+
),
134+
expected: true,
135+
},
136+
{
137+
name: "IsStorageTimeoutErr - Closed",
138+
err: net.ErrClosed,
139+
expected: false,
140+
},
141+
{
142+
name: "IsStorageTimeoutErr - Connection Refused",
143+
err: syscall.ECONNREFUSED,
144+
expected: false,
145+
},
146+
{
147+
name: "IsStorageTimeoutErr - Context Deadline Exceeded",
148+
err: context.DeadlineExceeded,
149+
expected: false,
150+
},
151+
{
152+
name: "IsStorageTimeoutErr - Context Canceled",
153+
err: context.Canceled,
154+
expected: false,
155+
},
156+
{
157+
name: "Not a retryable error",
158+
err: syscall.EINVAL,
159+
expected: false,
160+
},
161+
{
162+
name: "Not found 404",
163+
err: awserr.NewRequestFailure(
164+
awserr.New("404", "404", nil), 404, "reqId",
165+
),
166+
expected: false,
167+
},
168+
}
169+
170+
for _, tt := range tests {
171+
t.Run(tt.name, func(t *testing.T) {
172+
client, err := NewS3ObjectClient(S3Config{BucketNames: "mybucket"}, hedging.Config{})
173+
require.NoError(t, err)
174+
175+
require.Equal(t, tt.expected, client.IsRetryableErr(tt.err))
176+
})
177+
}
178+
}
179+
76180
func TestRequestMiddleware(t *testing.T) {
77181
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
78182
fmt.Fprintln(w, r.Header.Get("echo-me"))

0 commit comments

Comments
 (0)