Add a new time_sharding_ignore_recent option to ignore recent logs
na-- committed Nov 6, 2024
commit 778773528be3b804c50567414df57673ac17daf1
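For context, a minimal sketch of the behaviour this option introduces (the standalone program and variable names below are illustrative only, not code from the Loki repository): entries older than `now - time_sharding_ignore_recent` are eligible for time sharding, while more recent entries keep their original stream labels and fall through to the rate-based sharding path.

```go
package main

// Illustrative sketch of the new cutoff: with the 40m default added in this
// commit, only entries older than now-40m are time-sharded; more recent
// entries are left for the ordinary rate-based sharding path.
import (
	"fmt"
	"time"
)

func main() {
	timeShardingIgnoreRecent := 40 * time.Minute // default introduced by this commit
	now := time.Now()
	ignoreRecentFrom := now.Add(-timeShardingIgnoreRecent)

	entryTimestamps := []time.Time{
		now.Add(-3 * time.Hour),   // old enough to be time-sharded
		now.Add(-5 * time.Minute), // too recent, keeps its original labels
	}
	for _, ts := range entryTimestamps {
		fmt.Printf("%s time-sharded: %t\n", ts.Format(time.RFC3339), ts.Before(ignoreRecentFrom))
	}
}
```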
64 changes: 54 additions & 10 deletions pkg/distributor/distributor.go
@@ -452,7 +452,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedLineCount := 0

var validationErrors util.GroupedErrors
validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)

now := time.Now()
validationContext := d.validator.getValidationContextForTime(now, tenantID)

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
@@ -472,7 +474,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return
}

streamsByTime := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2)
ignoreRecentFrom := now.Add(-shardStreamsCfg.TimeShardingIgnoreRecent)
streamsByTime, ok := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2, ignoreRecentFrom)
if !ok {
maybeShardByRate(stream, pushSize)
return
}

for _, ts := range streamsByTime {
maybeShardByRate(ts.Stream, ts.linesTotalLen)
}
@@ -579,8 +587,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

now := time.Now()

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)

@@ -753,25 +759,44 @@ type streamWithTimeShard struct {
// timestamps, but with no new allocations for the log entries. It will sort them
// in-place in the given stream object (so it may modify it!) and reference
// sub-slices of the same stream.Entries slice.
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration) []streamWithTimeShard {
//
// If the second result is false, it means that either there were no logs in the
// stream, or all of the logs in the stream occurred after the given value of
// ignoreLogsFrom, so there was no need to shard and the original `stream` value
// can be used. However, due to the in-place sorting of the logs by timestamp, it
// might still have been reordered.
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration, ignoreLogsFrom time.Time) ([]streamWithTimeShard, bool) {
entries := stream.Entries
entriesLen := len(entries)
if entriesLen == 0 {
return nil
return nil, false
}

slices.SortStableFunc(entries, func(a, b logproto.Entry) int { return a.Timestamp.Compare(b.Timestamp) })

result := make([]streamWithTimeShard, 0, entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen)
// Shortcut to do no work if all of the logs are recent
if entries[0].Timestamp.After(ignoreLogsFrom) {
return nil, false
}

result := make([]streamWithTimeShard, 0, (entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen)+1)
labelBuilder := labels.NewBuilder(lbls)

for startIdx := 0; startIdx < entriesLen; /* the index is changed below */ {
startIdx := 0
for startIdx < entriesLen && entries[startIdx].Timestamp.Before(ignoreLogsFrom) /* the index is changed below */ {
timeShardStart := entries[startIdx].Timestamp.Truncate(timeShardLen)
timeShardEnd := timeShardStart.Add(timeShardLen)

timeShardCutoff := timeShardEnd
if timeShardCutoff.After(ignoreLogsFrom) {
// If the time_sharding_ignore_recent cutoff falls in the middle of this
// shard, we need to cut off the logs at that point.
timeShardCutoff = ignoreLogsFrom
}

endIdx := startIdx + 1
linesTotalLen := len(entries[startIdx].Line)
for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardEnd); endIdx++ {
for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardCutoff); endIdx++ {
linesTotalLen += len(entries[endIdx].Line)
}

@@ -787,7 +812,26 @@ func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen

startIdx = endIdx
}
return result

if startIdx == entriesLen {
// We do not have any remaining entries
return result, true
}

// Append one last stream containing all of the remaining (recent) logs, without a __time_shard__ label
logsWithoutTimeShardLen := 0
for i := startIdx; i < entriesLen; i++ {
logsWithoutTimeShardLen += len(entries[i].Line)
}

return append(result, streamWithTimeShard{
Stream: logproto.Stream{
Labels: stream.Labels,
Hash: stream.Hash,
Entries: stream.Entries[startIdx:entriesLen],
},
linesTotalLen: logsWithoutTimeShardLen,
}), true
}

// shardStream shards (divides) the given stream into N smaller streams, where
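The `__time_shard__` label values asserted in the tests below (e.g. `1730376000_1730379600` for one-hour shards) appear to be the unix-second bounds of each shard. A minimal sketch of that derivation, assuming this encoding (the helper below is illustrative, not the distributor's actual implementation):

```go
package main

// Sketch of deriving a __time_shard__ label value for an entry timestamp,
// assuming the value is "<shardStartUnix>_<shardEndUnix>" as the test
// fixtures suggest. Illustrative only.
import (
	"fmt"
	"time"
)

func timeShardValue(ts time.Time, timeShardLen time.Duration) string {
	start := ts.Truncate(timeShardLen)
	end := start.Add(timeShardLen)
	return fmt.Sprintf("%d_%d", start.Unix(), end.Unix())
}

func main() {
	// An arbitrary timestamp inside the 2024-10-31 12:00-13:00 UTC hour.
	ts := time.Unix(1730377628, 0).UTC()
	fmt.Println(timeShardValue(ts, time.Hour)) // prints 1730376000_1730379600
}
```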
97 changes: 95 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -805,6 +805,7 @@ func TestStreamShardByTime(t *testing.T) {
labels string
entries []logproto.Entry
timeShardLen time.Duration
ignoreFrom time.Time
expResult []streamWithTimeShard
}{
{
@@ -819,15 +820,25 @@ func TestStreamShardByTime(t *testing.T) {
labels: `{app="myapp"}`,
entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
//{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(1 * time.Second),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
}}, linesTotalLen: 3},
},
},
{
test: "one entry that is ignored",
labels: `{app="myapp"}`,
entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(-10 * time.Minute),
expResult: nil,
},
{
test: "single shard with two entries",
labels: `{app="myapp"}`,
@@ -836,13 +847,32 @@ func TestStreamShardByTime(t *testing.T) {
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(2 * time.Second),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
}}, linesTotalLen: 6},
},
},
{
test: "one shard and another stream with original labels",
labels: `{app="myapp"}`,
entries: []logproto.Entry{
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
{Timestamp: baseTimestamp, Line: "foo"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(1 * time.Second),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
}}, linesTotalLen: 3},
{Stream: logproto.Stream{Labels: `{app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
}}, linesTotalLen: 3},
},
},
{
test: "single shard with two entries reversed",
labels: `{app="myapp"}`,
@@ -851,6 +881,7 @@ func TestStreamShardByTime(t *testing.T) {
{Timestamp: baseTimestamp, Line: "foo"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(2 * time.Second),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
@@ -867,6 +898,7 @@ func TestStreamShardByTime(t *testing.T) {
{Timestamp: baseTimestamp.Add(time.Hour), Line: "baz"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(2 * time.Hour),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
@@ -886,6 +918,7 @@ func TestStreamShardByTime(t *testing.T) {
{Timestamp: baseTimestamp.Add(4 * time.Hour), Line: "baz"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(5 * time.Hour),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
@@ -905,6 +938,7 @@ func TestStreamShardByTime(t *testing.T) {
{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
},
timeShardLen: 24 * time.Hour,
ignoreFrom: baseTimestamp.Add(7 * time.Hour),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730332800_1730419200", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
Expand All @@ -913,6 +947,26 @@ func TestStreamShardByTime(t *testing.T) {
}}, linesTotalLen: 9},
},
},
{
test: "bigger shard len with some unsharded",
labels: `{app="myapp"}`,
entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
},
timeShardLen: 24 * time.Hour,
ignoreFrom: baseTimestamp.Add(5 * time.Hour),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730332800_1730419200", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp, Line: "foo"},
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
}}, linesTotalLen: 6},
{Stream: logproto.Stream{Labels: `{app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
}}, linesTotalLen: 3},
},
},
{
test: "longer messy gaps",
labels: `{app="myapp"}`,
Expand All @@ -927,6 +981,7 @@ func TestStreamShardByTime(t *testing.T) {
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Add(7 * time.Hour),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
@@ -946,6 +1001,38 @@ func TestStreamShardByTime(t *testing.T) {
}}, linesTotalLen: 2},
},
},
{
test: "longer messy with a couple ofc unsharded",
labels: `{app="myapp"}`,
entries: []logproto.Entry{
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
{Timestamp: baseTimestamp, Line: "13"},
{Timestamp: baseTimestamp, Line: "14"},
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "12"},
{Timestamp: baseTimestamp.Add(2 * time.Hour), Line: "32"},
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(2 * time.Hour), Line: "31"},
{Timestamp: baseTimestamp.Add(5 * time.Hour), Line: "41"},
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
},
timeShardLen: time.Hour,
ignoreFrom: baseTimestamp.Truncate(time.Hour).Add(1*time.Hour + 35*time.Minute),
expResult: []streamWithTimeShard{
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "12"},
{Timestamp: baseTimestamp, Line: "13"},
{Timestamp: baseTimestamp, Line: "14"},
}}, linesTotalLen: 8},
{Stream: logproto.Stream{Labels: `{__time_shard__="1730379600_1730383200", app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
}}, linesTotalLen: 2},
{Stream: logproto.Stream{Labels: `{app="myapp"}`, Entries: []logproto.Entry{
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(2 * time.Hour), Line: "31"},
{Timestamp: baseTimestamp.Add(2 * time.Hour), Line: "32"},
{Timestamp: baseTimestamp.Add(5 * time.Hour), Line: "41"},
}}, linesTotalLen: 6},
},
},
} {
t.Run(tc.test, func(t *testing.T) {
lbls, err := syntax.ParseLabels(tc.labels)
@@ -956,7 +1043,13 @@ func TestStreamShardByTime(t *testing.T) {
Entries: tc.entries,
}

shardedStreams := shardStreamByTime(stream, lbls, tc.timeShardLen)
shardedStreams, ok := shardStreamByTime(stream, lbls, tc.timeShardLen, tc.ignoreFrom)
if tc.expResult == nil {
assert.False(t, ok)
assert.Nil(t, shardedStreams)
return
}
require.True(t, ok)
require.Len(t, shardedStreams, len(tc.expResult))

for i, ss := range shardedStreams {
4 changes: 4 additions & 0 deletions pkg/distributor/shardstreams/config.go
@@ -2,6 +2,7 @@ package shardstreams

import (
"flag"
"time"

"github.com/grafana/loki/v3/pkg/util/flagext"
)
@@ -11,6 +12,8 @@ type Config struct {

TimeShardingEnabled bool `yaml:"time_sharding_enabled" json:"time_sharding_enabled" doc:"description=Automatically shard streams by adding a __time_shard__ label, with values calculated from the log timestamps divided by MaxChunkAge/2. This allows the out-of-order ingestion of very old logs. If both flags are enabled, time-based sharding will happen before rate-based sharding."`

TimeShardingIgnoreRecent time.Duration `yaml:"time_sharding_ignore_recent" json:"time_sharding_ignore_recent" doc:"description=Logs with timestamps that are newer than this value will not be time-sharded."`

LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled" doc:"description=Whether to log sharding streams behavior or not. Not recommended for production environments."`

// DesiredRate is the threshold used to shard the stream into smaller pieces.
@@ -21,6 +24,7 @@ type Config struct {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
fs.BoolVar(&cfg.Enabled, prefix+".enabled", true, "Automatically shard streams to keep them under the per-stream rate limit")
fs.BoolVar(&cfg.TimeShardingEnabled, prefix+".time-sharding-enabled", false, "Automatically shard streams by time (in MaxChunkAge/2 buckets), to allow out-of-order ingestion of very old logs.")
fs.DurationVar(&cfg.TimeShardingIgnoreRecent, prefix+".time-sharding-ignore-recent", 40*time.Minute, "Logs with timestamps that are newer than this value will not be time-sharded.")
fs.BoolVar(&cfg.LoggingEnabled, prefix+".logging-enabled", false, "Enable logging when sharding streams")
cfg.DesiredRate.Set("1536KB") //nolint:errcheck
fs.Var(&cfg.DesiredRate, prefix+".desired-rate", "threshold used to cut a new shard. Default (1536KB) means if a rate is above 1536KB/s, it will be sharded.")
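A quick sketch of exercising the new flag added above (the flag prefix and the standalone program are assumptions for illustration; Loki wires these flags up elsewhere in its config plumbing):

```go
package main

// Illustrative only: registers the shardstreams flags under an assumed
// prefix and overrides the new 40m default from the command line.
import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
)

func main() {
	var cfg shardstreams.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("distributor.shard-streams", fs)

	if err := fs.Parse([]string{"-distributor.shard-streams.time-sharding-ignore-recent=1h"}); err != nil {
		panic(err)
	}
	fmt.Println("ignore recent logs newer than:", cfg.TimeShardingIgnoreRecent) // 1h0m0s
}
```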