Skip to content

Commit f2bff20

Browse files
fix: Determine when all logs have been filtered (#16073)
1 parent 91ff737 commit f2bff20

File tree

3 files changed

+80
-26
lines changed

3 files changed

+80
-26
lines changed

‎pkg/distributor/http.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package distributor
22

33
import (
4+
"errors"
45
"fmt"
56
"net/http"
67
"strings"
@@ -42,16 +43,26 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
4243
logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
4344
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams)
4445
if err != nil {
46+
if !errors.Is(err, push.ErrAllLogsFiltered) {
47+
if d.tenantConfigs.LogPushRequest(tenantID) {
48+
level.Debug(logger).Log(
49+
"msg", "push request failed",
50+
"code", http.StatusBadRequest,
51+
"err", err,
52+
)
53+
}
54+
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))
55+
56+
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
57+
return
58+
}
59+
4560
if d.tenantConfigs.LogPushRequest(tenantID) {
4661
level.Debug(logger).Log(
47-
"msg", "push request failed",
48-
"code", http.StatusBadRequest,
49-
"err", err,
62+
"msg", "successful push request filtered all lines",
5063
)
5164
}
52-
d.writeFailuresManager.Log(tenantID, fmt.Errorf("couldn't parse push request: %w", err))
53-
54-
errorWriter(w, err.Error(), http.StatusBadRequest, logger)
65+
w.WriteHeader(http.StatusNoContent)
5566
return
5667
}
5768

‎pkg/distributor/http_test.go

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,27 +63,66 @@ func TestDistributorRingHandler(t *testing.T) {
6363
}
6464

6565
func TestRequestParserWrapping(t *testing.T) {
66-
limits := &validation.Limits{}
67-
flagext.DefaultValues(limits)
68-
limits.RejectOldSamples = false
69-
distributors, _ := prepare(t, 1, 3, limits, nil)
66+
t.Run("it calls the parser wrapper if there is one", func(t *testing.T) {
67+
limits := &validation.Limits{}
68+
flagext.DefaultValues(limits)
69+
limits.RejectOldSamples = false
70+
distributors, _ := prepare(t, 1, 3, limits, nil)
7071

71-
var called bool
72-
distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
73-
called = true
74-
return requestParser
75-
}
72+
var called bool
73+
distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
74+
called = true
75+
return requestParser
76+
}
77+
78+
ctx := user.InjectOrgID(context.Background(), "test-user")
79+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
80+
require.NoError(t, err)
81+
82+
rec := httptest.NewRecorder()
83+
distributors[0].pushHandler(rec, req, newFakeParser().parseRequest, push.HTTPError)
7684

77-
ctx := user.InjectOrgID(context.Background(), "test-user")
78-
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
79-
require.NoError(t, err)
85+
// unprocessable code because there are no streams in the request.
86+
require.Equal(t, http.StatusUnprocessableEntity, rec.Code)
87+
require.True(t, called)
88+
})
89+
90+
t.Run("it returns 204 when the parser wrapper filteres all log lines", func(t *testing.T) {
91+
limits := &validation.Limits{}
92+
flagext.DefaultValues(limits)
93+
limits.RejectOldSamples = false
94+
distributors, _ := prepare(t, 1, 3, limits, nil)
8095

81-
distributors[0].pushHandler(httptest.NewRecorder(), req, stubParser, push.HTTPError)
96+
var called bool
97+
distributors[0].RequestParserWrapper = func(requestParser push.RequestParser) push.RequestParser {
98+
called = true
99+
return requestParser
100+
}
101+
102+
ctx := user.InjectOrgID(context.Background(), "test-user")
103+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "fake-path", nil)
104+
require.NoError(t, err)
105+
106+
parser := newFakeParser()
107+
parser.parseErr = push.ErrAllLogsFiltered
108+
109+
rec := httptest.NewRecorder()
110+
distributors[0].pushHandler(rec, req, parser.parseRequest, push.HTTPError)
111+
112+
require.True(t, called)
113+
require.Equal(t, http.StatusNoContent, rec.Code)
114+
})
115+
}
116+
117+
type fakeParser struct {
118+
parseErr error
119+
}
82120

83-
require.True(t, called)
121+
func newFakeParser() *fakeParser {
122+
return &fakeParser{}
84123
}
85124

86-
func stubParser(
125+
func (p *fakeParser) parseRequest(
87126
_ string,
88127
_ *http.Request,
89128
_ push.TenantsRetention,
@@ -92,5 +131,5 @@ func stubParser(
92131
_ bool,
93132
_ log.Logger,
94133
) (*logproto.PushRequest, *push.Stats, error) {
95-
return &logproto.PushRequest{}, &push.Stats{}, nil
134+
return &logproto.PushRequest{}, &push.Stats{}, p.parseErr
96135
}

‎pkg/loghttp/push/push.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@ import (
1111
"time"
1212

1313
"github.com/go-kit/log/level"
14+
"github.com/pkg/errors"
1415

1516
"github.com/grafana/loki/pkg/push"
1617

18+
"google.golang.org/grpc/codes"
19+
grpcstatus "google.golang.org/grpc/status"
20+
1721
"github.com/dustin/go-humanize"
1822
"github.com/go-kit/log"
1923
"github.com/gogo/protobuf/proto"
2024
"github.com/prometheus/client_golang/prometheus"
2125
"github.com/prometheus/client_golang/prometheus/promauto"
2226
"github.com/prometheus/prometheus/model/labels"
23-
"google.golang.org/grpc/codes"
24-
grpcstatus "google.golang.org/grpc/status"
2527

2628
"github.com/grafana/loki/v3/pkg/analytics"
2729
"github.com/grafana/loki/v3/pkg/loghttp"
@@ -66,6 +68,8 @@ const (
6668
AggregatedMetricLabel = "__aggregated_metric__"
6769
)
6870

71+
var ErrAllLogsFiltered = errors.New("all logs lines filtered during parsing")
72+
6973
type TenantsRetention interface {
7074
RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
7175
}
@@ -111,7 +115,7 @@ type Stats struct {
111115

112116
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
113117
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
114-
if err != nil {
118+
if err != nil && !errors.Is(err, ErrAllLogsFiltered) {
115119
return nil, err
116120
}
117121

@@ -164,7 +168,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
164168
logValues = append(logValues, pushStats.Extra...)
165169
level.Debug(logger).Log(logValues...)
166170

167-
return req, nil
171+
return req, err
168172
}
169173

170174
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {

0 commit comments

Comments
 (0)