Skip to content

Commit 9000de1

Browse files
fix(push): Add guard clauses to prevent negative counter values (#17056)
Signed-off-by: Jordan Rushing <rushing.jordan@gmail.com> Co-authored-by: Trevor Whitney <trevorjwhitney@gmail.com>
1 parent e297c51 commit 9000de1

File tree

3 files changed

+109
-6
lines changed

3 files changed

+109
-6
lines changed

‎pkg/distributor/http.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
6868
// Add empty values for retention_hours and policy labels since we don't have
6969
// that information for request body too large errors
7070
validation.DiscardedBytes.WithLabelValues(validation.RequestBodyTooLarge, tenantID, "", "").Add(float64(r.ContentLength))
71+
} else {
72+
level.Error(logger).Log(
73+
"msg", "negative content length observed",
74+
"tenantID", tenantID,
75+
"contentLength", r.ContentLength)
7176
}
7277
errorWriter(w, err.Error(), http.StatusRequestEntityTooLarge, logger)
7378
return

‎pkg/loghttp/push/push.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,19 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
159159
for policyName, retentionToSizeMapping := range pushStats.LogLinesBytes {
160160
for retentionPeriod, size := range retentionToSizeMapping {
161161
retentionHours := RetentionPeriodToString(retentionPeriod)
162-
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
163-
bytesReceivedStats.Inc(size)
162+
// Add guard clause to prevent negative values from being passed to Prometheus counters
163+
if size > 0 {
164+
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
165+
bytesReceivedStats.Inc(size)
166+
} else {
167+
level.Error(logger).Log(
168+
"msg", "negative log lines bytes received",
169+
"userID", userID,
170+
"retentionHours", retentionHours,
171+
"isAggregatedMetric", isAggregatedMetric,
172+
"policyName", policyName,
173+
"size", size)
174+
}
164175
entriesSize += size
165176
}
166177
}
@@ -169,10 +180,21 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
169180
for retentionPeriod, size := range retentionToSizeMapping {
170181
retentionHours := RetentionPeriodToString(retentionPeriod)
171182

172-
structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
173-
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
174-
bytesReceivedStats.Inc(size)
175-
structuredMetadataBytesReceivedStats.Inc(size)
183+
// Add guard clause to prevent negative values from being passed to Prometheus counters
184+
if size > 0 {
185+
structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
186+
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
187+
bytesReceivedStats.Inc(size)
188+
structuredMetadataBytesReceivedStats.Inc(size)
189+
} else {
190+
level.Error(logger).Log(
191+
"msg", "negative structured metadata bytes received",
192+
"userID", userID,
193+
"retentionHours", retentionHours,
194+
"isAggregatedMetric", isAggregatedMetric,
195+
"policyName", policyName,
196+
"size", size)
197+
}
176198

177199
entriesSize += size
178200
structuredMetadataSize += size

‎pkg/loghttp/push/push_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"testing"
1515
"time"
1616

17+
kitlog "github.com/go-kit/log"
1718
"github.com/prometheus/client_golang/prometheus/testutil"
1819
"github.com/prometheus/prometheus/model/labels"
1920
"github.com/stretchr/testify/assert"
@@ -23,6 +24,7 @@ import (
2324

2425
"github.com/grafana/dskit/flagext"
2526

27+
"github.com/grafana/loki/v3/pkg/logproto"
2628
util_log "github.com/grafana/loki/v3/pkg/util/log"
2729
)
2830

@@ -573,6 +575,80 @@ func TestRetentionPeriodToString(t *testing.T) {
573575
}
574576
}
575577

578+
// TestNegativeSizeHandling tests that the code handles negative size values
579+
// properly without causing a panic when incrementing Prometheus counters.
580+
func TestNegativeSizeHandling(t *testing.T) {
581+
// Reset metrics for accurate testing
582+
structuredMetadataBytesIngested.Reset()
583+
bytesIngested.Reset()
584+
linesIngested.Reset()
585+
586+
// Create a custom request parser that will generate negative sizes
587+
var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ int, _ UsageTracker, _ StreamResolver, _ bool, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) {
588+
// Create a minimal valid request
589+
req := &logproto.PushRequest{
590+
Streams: []logproto.Stream{
591+
{
592+
Labels: `{foo="bar"}`,
593+
Entries: []logproto.Entry{
594+
{
595+
Timestamp: time.Now(),
596+
Line: "test line",
597+
},
598+
},
599+
},
600+
},
601+
}
602+
603+
// Create stats with negative sizes to test our guard clauses
604+
stats := NewPushStats()
605+
policy := ""
606+
retention := time.Hour
607+
608+
// Set up negative sizes in both maps
609+
stats.LogLinesBytes[policy] = make(map[time.Duration]int64)
610+
stats.LogLinesBytes[policy][retention] = -100
611+
612+
stats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64)
613+
stats.StructuredMetadataBytes[policy][retention] = -200
614+
615+
return req, stats, nil
616+
}
617+
618+
// Create a mock request
619+
request := httptest.NewRequest("POST", "/loki/api/v1/push", strings.NewReader("{}"))
620+
request.Header.Add("Content-Type", "application/json")
621+
622+
// Use a mock stream resolver to ensure consistent results
623+
streamResolver := newMockStreamResolver("fake", &fakeLimits{})
624+
625+
// This should not panic with our guard clauses in place
626+
_, err := ParseRequest(
627+
util_log.Logger,
628+
"fake",
629+
100<<20,
630+
request,
631+
&fakeLimits{},
632+
mockParser,
633+
NewMockTracker(),
634+
streamResolver,
635+
false,
636+
)
637+
638+
// No error should be returned
639+
require.NoError(t, err)
640+
641+
// Check that the metrics were not incremented for negative values
642+
userID := "fake"
643+
isAggregatedMetric := "false"
644+
policy := ""
645+
646+
// Verify no counters were incremented since all sizes were negative
647+
// This test passes if no panic occurred and the counters remain at 0
648+
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues(userID, "1", isAggregatedMetric, policy)))
649+
require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues(userID, "1", isAggregatedMetric, policy)))
650+
}
651+
576652
type fakeLimits struct {
577653
enabled bool
578654
labels []string

0 commit comments

Comments
 (0)