Skip to content

Commit b5ac6a0

Browse files
authored
feat: Limit to block ingestion until configured date (#13958)
1 parent 5cedb19 commit b5ac6a0

File tree

7 files changed

+117
-0
lines changed

7 files changed

+117
-0
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4057,6 +4057,17 @@ otlp_config:
40574057
# Configuration for log attributes to store them as Structured Metadata or
40584058
# drop them altogether
40594059
[log_attributes: <list of attributes_configs>]
4060+
4061+
# Block ingestion until the configured date. The time should be in RFC3339
4062+
# format.
4063+
# CLI flag: -limits.block-ingestion-until
4064+
[block_ingestion_until: <time> | default = 0]
4065+
4066+
# HTTP status code to return when ingestion is blocked. If 200, the ingestion
4067+
# will be blocked without returning an error to the client. By Default, a custom
4068+
# status code (260) is returned to the client along with an error message.
4069+
# CLI flag: -limits.block-ingestion-status-code
4070+
[block_ingestion_status_code: <int> | default = 260]
40604071
```
40614072
40624073
### local_storage_config

‎pkg/distributor/distributor.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,23 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
454454
}
455455

456456
now := time.Now()
457+
458+
if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
459+
validation.DiscardedSamples.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineCount))
460+
validation.DiscardedBytes.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineSize))
461+
462+
err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
463+
d.writeFailuresManager.Log(tenantID, err)
464+
465+
// If the status code is 200, return success.
466+
// Note that we still log the error and increment the metrics.
467+
if retStatusCode == http.StatusOK {
468+
return &logproto.PushResponse{}, nil
469+
}
470+
471+
return nil, httpgrpc.Errorf(retStatusCode, err.Error())
472+
}
473+
457474
if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
458475
// Return a 429 to indicate to the client they are being rate limited
459476
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))

‎pkg/distributor/distributor_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,6 +1171,60 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
11711171
}
11721172
}
11731173

1174+
func TestDistributor_PushIngestionBlocked(t *testing.T) {
1175+
for _, tc := range []struct {
1176+
name string
1177+
blockUntil time.Time
1178+
blockStatusCode int
1179+
expectError bool
1180+
expectedStatusCode int
1181+
}{
1182+
{
1183+
name: "not configured",
1184+
expectedStatusCode: http.StatusOK,
1185+
},
1186+
{
1187+
name: "not blocked",
1188+
blockUntil: time.Now().Add(-1 * time.Hour),
1189+
expectedStatusCode: http.StatusOK,
1190+
},
1191+
{
1192+
name: "blocked",
1193+
blockUntil: time.Now().Add(1 * time.Hour),
1194+
blockStatusCode: 456,
1195+
expectError: true,
1196+
expectedStatusCode: 456,
1197+
},
1198+
{
1199+
name: "blocked with status code 200",
1200+
blockUntil: time.Now().Add(1 * time.Hour),
1201+
blockStatusCode: http.StatusOK,
1202+
expectError: false,
1203+
expectedStatusCode: http.StatusOK,
1204+
},
1205+
} {
1206+
t.Run(tc.name, func(t *testing.T) {
1207+
limits := &validation.Limits{}
1208+
flagext.DefaultValues(limits)
1209+
limits.BlockIngestionUntil = flagext.Time(tc.blockUntil)
1210+
limits.BlockIngestionStatusCode = tc.blockStatusCode
1211+
1212+
distributors, _ := prepare(t, 1, 5, limits, nil)
1213+
request := makeWriteRequest(1, 1024)
1214+
response, err := distributors[0].Push(ctx, request)
1215+
1216+
if tc.expectError {
1217+
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
1218+
require.ErrorContains(t, err, expectedErr)
1219+
require.Nil(t, response)
1220+
} else {
1221+
require.NoError(t, err)
1222+
require.Equal(t, success, response)
1223+
}
1224+
})
1225+
}
1226+
}
1227+
11741228
func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation.Limits, factory func(addr string) (ring_client.PoolClient, error)) ([]*Distributor, []mockIngester) {
11751229
t.Helper()
11761230

‎pkg/distributor/limits.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ type Limits interface {
3333
MaxStructuredMetadataSize(userID string) int
3434
MaxStructuredMetadataCount(userID string) int
3535
OTLPConfig(userID string) push.OTLPConfig
36+
37+
BlockIngestionUntil(userID string) time.Time
38+
BlockIngestionStatusCode(userID string) int
3639
}

‎pkg/distributor/validator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ type validationContext struct {
5050
maxStructuredMetadataSize int
5151
maxStructuredMetadataCount int
5252

53+
blockIngestionUntil time.Time
54+
blockIngestionStatusCode int
55+
5356
userID string
5457
}
5558

@@ -70,6 +73,8 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
7073
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
7174
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
7275
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
76+
blockIngestionUntil: v.BlockIngestionUntil(userID),
77+
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
7378
}
7479
}
7580

@@ -192,6 +197,15 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
192197
return nil
193198
}
194199

200+
// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code.
201+
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) {
202+
if ctx.blockIngestionUntil.IsZero() {
203+
return false, time.Time{}, 0
204+
}
205+
206+
return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode
207+
}
208+
195209
func updateMetrics(reason, userID string, stream logproto.Stream) {
196210
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
197211
bytes := 0

‎pkg/validation/limits.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ const (
6161
defaultMaxStructuredMetadataCount = 128
6262
defaultBloomCompactorMaxBlockSize = "200MB"
6363
defaultBloomCompactorMaxBloomSize = "128MB"
64+
65+
defaultBlockedIngestionStatusCode = 260 // 260 is a custom status code to indicate blocked ingestion
6466
)
6567

6668
// Limits describe all the limits for users; can be used to describe global default
@@ -222,6 +224,9 @@ type Limits struct {
222224
MaxStructuredMetadataEntriesCount int `yaml:"max_structured_metadata_entries_count" json:"max_structured_metadata_entries_count" doc:"description=Maximum number of structured metadata entries per log line."`
223225
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
224226
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`
227+
228+
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
229+
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
225230
}
226231

227232
type StreamRetention struct {
@@ -411,6 +416,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
411416
f.Var(&l.MaxStructuredMetadataSize, "limits.max-structured-metadata-size", "Maximum size accepted for structured metadata per entry. Default: 64 kb. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.")
412417
f.IntVar(&l.MaxStructuredMetadataEntriesCount, "limits.max-structured-metadata-entries-count", defaultMaxStructuredMetadataCount, "Maximum number of structured metadata entries per log line. Default: 128. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.")
413418
f.BoolVar(&l.VolumeEnabled, "limits.volume-enabled", true, "Enable log volume endpoint.")
419+
420+
f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.")
421+
f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.")
414422
}
415423

416424
// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@@ -1051,6 +1059,14 @@ func (o *Overrides) OTLPConfig(userID string) push.OTLPConfig {
10511059
return o.getOverridesForUser(userID).OTLPConfig
10521060
}
10531061

1062+
func (o *Overrides) BlockIngestionUntil(userID string) time.Time {
1063+
return time.Time(o.getOverridesForUser(userID).BlockIngestionUntil)
1064+
}
1065+
1066+
func (o *Overrides) BlockIngestionStatusCode(userID string) int {
1067+
return o.getOverridesForUser(userID).BlockIngestionStatusCode
1068+
}
1069+
10541070
func (o *Overrides) getOverridesForUser(userID string) *Limits {
10551071
if o.tenantLimits != nil {
10561072
l := o.tenantLimits.TenantLimits(userID)

‎pkg/validation/validate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ const (
6767
StructuredMetadataTooLargeErrorMsg = "stream '%s' has structured metadata too large: '%d' bytes, limit: '%d' bytes. Please see `limits_config.max_structured_metadata_size` or contact your Loki administrator to increase it."
6868
StructuredMetadataTooMany = "structured_metadata_too_many"
6969
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
70+
BlockedIngestion = "blocked_ingestion"
71+
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
7072
)
7173

7274
type ErrStreamRateLimit struct {

0 commit comments

Comments
 (0)