Skip to content

Commit 5b70be6

Browse files
committed
feat: add per-tenant time sharding for long out-of-order ingestion
1 parent cc496c6 commit 5b70be6

File tree

5 files changed

+262
-10
lines changed

5 files changed

+262
-10
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3707,6 +3707,13 @@ shard_streams:
37073707
# CLI flag: -shard-streams.enabled
37083708
[enabled: <boolean> | default = true]
37093709

3710+
# Automatically shard streams by adding a __time_shard__ label, with values
3711+
# calculated from the log timestamps divided by MaxChunkAge/2. This allows the
3712+
# out-of-order ingestion of very old logs. If both flags are enabled,
3713+
# time-based sharding will happen before rate-based sharding.
3714+
# CLI flag: -shard-streams.time-sharding-enabled
3715+
[time_sharding_enabled: <boolean> | default = false]
3716+
37103717
# Whether to log sharding streams behavior or not. Not recommended for
37113718
# production environments.
37123719
# CLI flag: -shard-streams.logging-enabled

‎pkg/distributor/distributor.go

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math"
99
"net/http"
10+
"slices"
1011
"sort"
1112
"strconv"
1213
"strings"
@@ -64,6 +65,8 @@ const (
6465
ringKey = "distributor"
6566

6667
ringAutoForgetUnhealthyPeriods = 2
68+
69+
timeShardLabel = "__time_shard__"
6770
)
6871

6972
var (
@@ -132,6 +135,7 @@ type Distributor struct {
132135
services.Service
133136

134137
cfg Config
138+
ingesterCfg ingester.Config
135139
logger log.Logger
136140
clientCfg client.Config
137141
tenantConfigs *runtime.TenantConfigs
@@ -187,6 +191,7 @@ type Distributor struct {
187191
// New a distributor creates.
188192
func New(
189193
cfg Config,
194+
ingesterCfg ingester.Config,
190195
clientCfg client.Config,
191196
configs *runtime.TenantConfigs,
192197
ingestersRing ring.ReadRing,
@@ -245,6 +250,7 @@ func New(
245250

246251
d := &Distributor{
247252
cfg: cfg,
253+
ingesterCfg: ingesterCfg,
248254
logger: logger,
249255
clientCfg: clientCfg,
250256
tenantConfigs: configs,
@@ -448,6 +454,28 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
448454
var validationErrors util.GroupedErrors
449455
validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)
450456

457+
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
458+
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
459+
if shardStreamsCfg.Enabled {
460+
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
461+
} else {
462+
streams = append(streams, KeyedStream{
463+
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
464+
Stream: stream,
465+
})
466+
}
467+
}
468+
maybeShardByTime := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
469+
if shardStreamsCfg.TimeShardingEnabled {
470+
streamsByTime := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2)
471+
for _, ts := range streamsByTime {
472+
maybeShardByRate(ts.Stream, ts.linesTotalLen)
473+
}
474+
} else {
475+
maybeShardByRate(stream, pushSize)
476+
}
477+
}
478+
451479
func() {
452480
sp := opentracing.SpanFromContext(ctx)
453481
if sp != nil {
@@ -456,6 +484,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
456484
sp.LogKV("event", "finished to validate request")
457485
}()
458486
}
487+
459488
for _, stream := range req.Streams {
460489
// Return early if stream does not contain any entries
461490
if len(stream.Entries) == 0 {
@@ -534,15 +563,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
534563
continue
535564
}
536565

537-
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
538-
if shardStreamsCfg.Enabled {
539-
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
540-
} else {
541-
streams = append(streams, KeyedStream{
542-
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
543-
Stream: stream,
544-
})
545-
}
566+
maybeShardByTime(stream, lbs, pushSize)
546567
}
547568
}()
548569

@@ -721,6 +742,52 @@ func hasAnyLevelLabels(l labels.Labels) (string, bool) {
721742
return "", false
722743
}
723744

745+
type streamWithTimeShard struct {
746+
logproto.Stream
747+
linesTotalLen int
748+
}
749+
750+
// This should shard the stream into multiple sub-streams based on the log
751+
// timestamps, but with no new alocations for the log entries. It will sort them
752+
// in-place in the given stream object (so it may modify it!) and reference
753+
// sub-slices of the same stream.Entries slice.
754+
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration) []streamWithTimeShard {
755+
entries := stream.Entries
756+
entriesLen := len(entries)
757+
if entriesLen == 0 {
758+
return nil
759+
}
760+
761+
slices.SortStableFunc(entries, func(a, b logproto.Entry) int { return a.Timestamp.Compare(b.Timestamp) })
762+
763+
result := make([]streamWithTimeShard, 0, entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen)
764+
labelBuilder := labels.NewBuilder(lbls)
765+
766+
for startIdx := 0; startIdx < entriesLen; /* the index is changed below */ {
767+
timeShardStart := entries[startIdx].Timestamp.Truncate(timeShardLen)
768+
timeShardEnd := timeShardStart.Add(timeShardLen)
769+
770+
endIdx := startIdx + 1
771+
linesTotalLen := len(entries[startIdx].Line)
772+
for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardEnd); endIdx++ {
773+
linesTotalLen += len(entries[endIdx].Line)
774+
}
775+
776+
shardLbls := labelBuilder.Set(timeShardLabel, fmt.Sprintf("%d_%d", timeShardStart.Unix(), timeShardEnd.Unix())).Labels()
777+
result = append(result, streamWithTimeShard{
778+
Stream: logproto.Stream{
779+
Labels: shardLbls.String(),
780+
Hash: shardLbls.Hash(),
781+
Entries: stream.Entries[startIdx:endIdx],
782+
},
783+
linesTotalLen: linesTotalLen,
784+
})
785+
786+
startIdx = endIdx
787+
}
788+
return result
789+
}
790+
724791
// shardStream shards (divides) the given stream into N smaller streams, where
725792
// N is the sharding size for the given stream. shardStream returns the smaller
726793
// streams and their associated keys for hashing to ingesters.

‎pkg/distributor/distributor_test.go

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,178 @@ func TestStreamShardAcrossCalls(t *testing.T) {
796796
})
797797
}
798798

799+
func TestStreamShardByTime(t *testing.T) {
800+
baseTimestamp := time.Date(2024, 10, 31, 12, 34, 56, 0, time.UTC)
801+
t.Logf("Base timestamp: %s (unix %d)", baseTimestamp.Format(time.RFC3339Nano), baseTimestamp.Unix())
802+
803+
for _, tc := range []struct {
804+
test string
805+
labels string
806+
entries []logproto.Entry
807+
timeShardLen time.Duration
808+
expResult []streamWithTimeShard
809+
}{
810+
{
811+
test: "zero shard because no entries",
812+
labels: "{app='myapp'}",
813+
entries: nil,
814+
timeShardLen: time.Hour,
815+
expResult: nil,
816+
},
817+
{
818+
test: "single shard with one entry",
819+
labels: `{app="myapp"}`,
820+
entries: []logproto.Entry{
821+
{Timestamp: baseTimestamp, Line: "foo"},
822+
//{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
823+
},
824+
timeShardLen: time.Hour,
825+
expResult: []streamWithTimeShard{
826+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
827+
{Timestamp: baseTimestamp, Line: "foo"},
828+
}}, linesTotalLen: 3},
829+
},
830+
},
831+
{
832+
test: "single shard with two entries",
833+
labels: `{app="myapp"}`,
834+
entries: []logproto.Entry{
835+
{Timestamp: baseTimestamp, Line: "foo"},
836+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
837+
},
838+
timeShardLen: time.Hour,
839+
expResult: []streamWithTimeShard{
840+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
841+
{Timestamp: baseTimestamp, Line: "foo"},
842+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
843+
}}, linesTotalLen: 6},
844+
},
845+
},
846+
{
847+
test: "single shard with two entries reversed",
848+
labels: `{app="myapp"}`,
849+
entries: []logproto.Entry{
850+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
851+
{Timestamp: baseTimestamp, Line: "foo"},
852+
},
853+
timeShardLen: time.Hour,
854+
expResult: []streamWithTimeShard{
855+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
856+
{Timestamp: baseTimestamp, Line: "foo"},
857+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
858+
}}, linesTotalLen: 6},
859+
},
860+
},
861+
{
862+
test: "two shards without a gap",
863+
labels: `{app="myapp"}`,
864+
entries: []logproto.Entry{
865+
{Timestamp: baseTimestamp, Line: "foo"},
866+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
867+
{Timestamp: baseTimestamp.Add(time.Hour), Line: "baz"},
868+
},
869+
timeShardLen: time.Hour,
870+
expResult: []streamWithTimeShard{
871+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
872+
{Timestamp: baseTimestamp, Line: "foo"},
873+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
874+
}}, linesTotalLen: 6},
875+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730379600_1730383200", app="myapp"}`, Entries: []logproto.Entry{
876+
{Timestamp: baseTimestamp.Add(time.Hour), Line: "baz"},
877+
}}, linesTotalLen: 3},
878+
},
879+
},
880+
{
881+
test: "two shards with a gap",
882+
labels: `{app="myapp"}`,
883+
entries: []logproto.Entry{
884+
{Timestamp: baseTimestamp, Line: "foo"},
885+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
886+
{Timestamp: baseTimestamp.Add(4 * time.Hour), Line: "baz"},
887+
},
888+
timeShardLen: time.Hour,
889+
expResult: []streamWithTimeShard{
890+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
891+
{Timestamp: baseTimestamp, Line: "foo"},
892+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
893+
}}, linesTotalLen: 6},
894+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730390400_1730394000", app="myapp"}`, Entries: []logproto.Entry{
895+
{Timestamp: baseTimestamp.Add(4 * time.Hour), Line: "baz"},
896+
}}, linesTotalLen: 3},
897+
},
898+
},
899+
{
900+
test: "bigger shard len",
901+
labels: `{app="myapp"}`,
902+
entries: []logproto.Entry{
903+
{Timestamp: baseTimestamp, Line: "foo"},
904+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
905+
{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
906+
},
907+
timeShardLen: 24 * time.Hour,
908+
expResult: []streamWithTimeShard{
909+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730332800_1730419200", app="myapp"}`, Entries: []logproto.Entry{
910+
{Timestamp: baseTimestamp, Line: "foo"},
911+
{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
912+
{Timestamp: baseTimestamp.Add(6 * time.Hour), Line: "baz"},
913+
}}, linesTotalLen: 9},
914+
},
915+
},
916+
{
917+
test: "longer messy gaps",
918+
labels: `{app="myapp"}`,
919+
entries: []logproto.Entry{
920+
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
921+
{Timestamp: baseTimestamp, Line: "13"},
922+
{Timestamp: baseTimestamp, Line: "14"},
923+
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "12"},
924+
{Timestamp: baseTimestamp.Add(2 * time.Hour), Line: "32"},
925+
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(2 * time.Hour), Line: "31"},
926+
{Timestamp: baseTimestamp.Add(5 * time.Hour), Line: "41"},
927+
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
928+
},
929+
timeShardLen: time.Hour,
930+
expResult: []streamWithTimeShard{
931+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730376000_1730379600", app="myapp"}`, Entries: []logproto.Entry{
932+
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "11"},
933+
{Timestamp: baseTimestamp.Truncate(time.Hour), Line: "12"},
934+
{Timestamp: baseTimestamp, Line: "13"},
935+
{Timestamp: baseTimestamp, Line: "14"},
936+
}}, linesTotalLen: 8},
937+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730379600_1730383200", app="myapp"}`, Entries: []logproto.Entry{
938+
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(time.Hour), Line: "21"},
939+
}}, linesTotalLen: 2},
940+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730383200_1730386800", app="myapp"}`, Entries: []logproto.Entry{
941+
{Timestamp: baseTimestamp.Truncate(time.Hour).Add(2 * time.Hour), Line: "31"},
942+
{Timestamp: baseTimestamp.Add(2 * time.Hour), Line: "32"},
943+
}}, linesTotalLen: 4},
944+
{Stream: logproto.Stream{Labels: `{__time_shard__="1730394000_1730397600", app="myapp"}`, Entries: []logproto.Entry{
945+
{Timestamp: baseTimestamp.Add(5 * time.Hour), Line: "41"},
946+
}}, linesTotalLen: 2},
947+
},
948+
},
949+
} {
950+
t.Run(tc.test, func(t *testing.T) {
951+
lbls, err := syntax.ParseLabels(tc.labels)
952+
require.NoError(t, err)
953+
stream := logproto.Stream{
954+
Labels: tc.labels,
955+
Hash: lbls.Hash(),
956+
Entries: tc.entries,
957+
}
958+
959+
shardedStreams := shardStreamByTime(stream, lbls, tc.timeShardLen)
960+
require.Len(t, shardedStreams, len(tc.expResult))
961+
962+
for i, ss := range shardedStreams {
963+
assert.Equal(t, tc.expResult[i].linesTotalLen, ss.linesTotalLen)
964+
assert.Equal(t, tc.expResult[i].Labels, ss.Labels)
965+
assert.EqualValues(t, tc.expResult[i].Entries, ss.Entries)
966+
}
967+
})
968+
}
969+
}
970+
799971
func generateEntries(n int) []logproto.Entry {
800972
var entries []logproto.Entry
801973
for i := 0; i < n; i++ {
@@ -1386,7 +1558,9 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
13861558
overrides, err := validation.NewOverrides(*limits, nil)
13871559
require.NoError(t, err)
13881560

1389-
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
1561+
ingesterConfig := ingester.Config{MaxChunkAge: 2 * time.Hour}
1562+
1563+
d, err := New(distributorConfig, ingesterConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
13901564
require.NoError(t, err)
13911565
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
13921566
distributors[i] = d

‎pkg/distributor/shardstreams/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
type Config struct {
1010
Enabled bool `yaml:"enabled" json:"enabled" doc:"description=Automatically shard streams to keep them under the per-stream rate limit. Sharding is dictated by the desired rate."`
1111

12+
TimeShardingEnabled bool `yaml:"time_sharding_enabled" json:"time_sharding_enabled" doc:"description=Automatically shard streams by adding a __time_shard__ label, with values calculated from the log timestamps divided by MaxChunkAge/2. This allows the out-of-order ingestion of very old logs. If both flags are enabled, time-based sharding will happen before rate-based sharding."`
13+
1214
LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled" doc:"description=Whether to log sharding streams behavior or not. Not recommended for production environments."`
1315

1416
// DesiredRate is the threshold used to shard the stream into smaller pieces.
@@ -18,6 +20,7 @@ type Config struct {
1820

1921
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
2022
fs.BoolVar(&cfg.Enabled, prefix+".enabled", true, "Automatically shard streams to keep them under the per-stream rate limit")
23+
fs.BoolVar(&cfg.TimeShardingEnabled, prefix+".time-sharding-enabled", false, "Automatically shard streams by time (in MaxChunkAge/2 buckets), to allow out-of-order ingestion of very old logs.")
2124
fs.BoolVar(&cfg.LoggingEnabled, prefix+".logging-enabled", false, "Enable logging when sharding streams")
2225
cfg.DesiredRate.Set("1536KB") //nolint:errcheck
2326
fs.Var(&cfg.DesiredRate, prefix+".desired-rate", "threshold used to cut a new shard. Default (1536KB) means if a rate is above 1536KB/s, it will be sharded.")

‎pkg/loki/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
330330
logger := log.With(util_log.Logger, "component", "distributor")
331331
t.distributor, err = distributor.New(
332332
t.Cfg.Distributor,
333+
t.Cfg.Ingester,
333334
t.Cfg.IngesterClient,
334335
t.tenantConfigs,
335336
t.ring,

0 commit comments

Comments
 (0)