66 "fmt"
77 "math"
88 "net/http"
9+ "slices"
910 "sort"
1011 "strconv"
1112 "strings"
@@ -58,6 +59,8 @@ const (
5859 ringKey = "distributor"
5960
6061 ringAutoForgetUnhealthyPeriods = 2
62+
63+ timeShardLabel = "__time_shard__"
6164)
6265
6366var (
@@ -120,6 +123,7 @@ type Distributor struct {
120123 services.Service
121124
122125 cfg Config
126+ ingesterCfg ingester.Config
123127 logger log.Logger
124128 clientCfg client.Config
125129 tenantConfigs * runtime.TenantConfigs
@@ -175,6 +179,7 @@ type Distributor struct {
175179// New creates a distributor.
176180func New (
177181 cfg Config ,
182+ ingesterCfg ingester.Config ,
178183 clientCfg client.Config ,
179184 configs * runtime.TenantConfigs ,
180185 ingestersRing ring.ReadRing ,
@@ -233,6 +238,7 @@ func New(
233238
234239 d := & Distributor {
235240 cfg : cfg ,
241+ ingesterCfg : ingesterCfg ,
236242 logger : logger ,
237243 clientCfg : clientCfg ,
238244 tenantConfigs : configs ,
@@ -434,10 +440,42 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
434440 validatedLineCount := 0
435441
436442 var validationErrors util.GroupedErrors
437- validationContext := d .validator .getValidationContextForTime (time .Now (), tenantID )
443+
444+ now := time .Now ()
445+ validationContext := d .validator .getValidationContextForTime (now , tenantID )
438446 levelDetector := newLevelDetector (validationContext )
439447 shouldDiscoverLevels := levelDetector .shouldDiscoverLogLevels ()
440448
449+ shardStreamsCfg := d .validator .Limits .ShardStreams (tenantID )
450+ maybeShardByRate := func (stream logproto.Stream , pushSize int ) {
451+ if shardStreamsCfg .Enabled {
452+ streams = append (streams , d .shardStream (stream , pushSize , tenantID )... )
453+ return
454+ }
455+ streams = append (streams , KeyedStream {
456+ HashKey : lokiring .TokenFor (tenantID , stream .Labels ),
457+ Stream : stream ,
458+ })
459+ }
460+
461+ maybeShardStreams := func (stream logproto.Stream , labels labels.Labels , pushSize int ) {
462+ if ! shardStreamsCfg .TimeShardingEnabled {
463+ maybeShardByRate (stream , pushSize )
464+ return
465+ }
466+
467+ ignoreRecentFrom := now .Add (- shardStreamsCfg .TimeShardingIgnoreRecent )
468+ streamsByTime , ok := shardStreamByTime (stream , labels , d .ingesterCfg .MaxChunkAge / 2 , ignoreRecentFrom )
469+ if ! ok {
470+ maybeShardByRate (stream , pushSize )
471+ return
472+ }
473+
474+ for _ , ts := range streamsByTime {
475+ maybeShardByRate (ts .Stream , ts .linesTotalLen )
476+ }
477+ }
478+
441479 func () {
442480 sp := opentracing .SpanFromContext (ctx )
443481 if sp != nil {
@@ -446,6 +484,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
446484 sp .LogKV ("event" , "finished to validate request" )
447485 }()
448486 }
487+
449488 for _ , stream := range req .Streams {
450489 // Return early if stream does not contain any entries
451490 if len (stream .Entries ) == 0 {
@@ -512,15 +551,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
512551 continue
513552 }
514553
515- shardStreamsCfg := d .validator .Limits .ShardStreams (tenantID )
516- if shardStreamsCfg .Enabled {
517- streams = append (streams , d .shardStream (stream , pushSize , tenantID )... )
518- } else {
519- streams = append (streams , KeyedStream {
520- HashKey : lokiring .TokenFor (tenantID , stream .Labels ),
521- Stream : stream ,
522- })
523- }
554+ maybeShardStreams (stream , lbs , pushSize )
524555 }
525556 }()
526557
@@ -534,8 +565,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
534565 return & logproto.PushResponse {}, validationErr
535566 }
536567
537- now := time .Now ()
538-
539568 if block , until , retStatusCode := d .validator .ShouldBlockIngestion (validationContext , now ); block {
540569 d .trackDiscardedData (ctx , req , validationContext , tenantID , validatedLineCount , validatedLineSize , validation .BlockedIngestion )
541570
@@ -690,6 +719,90 @@ func (d *Distributor) trackDiscardedData(
690719 }
691720}
692721
722+ type streamWithTimeShard struct {
723+ logproto.Stream
724+ linesTotalLen int
725+ }
726+
727+ // This should shard the stream into multiple sub-streams based on the log
728+ // timestamps, but with no new alocations for the log entries. It will sort them
729+ // in-place in the given stream object (so it may modify it!) and reference
730+ // sub-slices of the same stream.Entries slice.
731+ //
732+ // If the second result is false, it means that either there were no logs in the
733+ // stream, or all of the logs in the stream occurred after the given value of
734+ // ignoreLogsFrom, so there was no need to shard - the original `streams` value
735+ // can be used. However, due to the in-place logs sorting by their timestamp, it
736+ // might still have been reordered.
737+ func shardStreamByTime (stream logproto.Stream , lbls labels.Labels , timeShardLen time.Duration , ignoreLogsFrom time.Time ) ([]streamWithTimeShard , bool ) {
738+ entries := stream .Entries
739+ entriesLen := len (entries )
740+ if entriesLen == 0 {
741+ return nil , false
742+ }
743+
744+ slices .SortStableFunc (entries , func (a , b logproto.Entry ) int { return a .Timestamp .Compare (b .Timestamp ) })
745+
746+ // Shortcut to do no work if all of the logs are recent
747+ if entries [0 ].Timestamp .After (ignoreLogsFrom ) {
748+ return nil , false
749+ }
750+
751+ result := make ([]streamWithTimeShard , 0 , (entries [entriesLen - 1 ].Timestamp .Sub (entries [0 ].Timestamp )/ timeShardLen )+ 1 )
752+ labelBuilder := labels .NewBuilder (lbls )
753+
754+ startIdx := 0
755+ for startIdx < entriesLen && entries [startIdx ].Timestamp .Before (ignoreLogsFrom ) /* the index is changed below */ {
756+ timeShardStart := entries [startIdx ].Timestamp .Truncate (timeShardLen )
757+ timeShardEnd := timeShardStart .Add (timeShardLen )
758+
759+ timeShardCutoff := timeShardEnd
760+ if timeShardCutoff .After (ignoreLogsFrom ) {
761+ // If the time_sharding_ignore_recent is in the middle of this
762+ // shard, we need to cut off the logs at that point.
763+ timeShardCutoff = ignoreLogsFrom
764+ }
765+
766+ endIdx := startIdx + 1
767+ linesTotalLen := len (entries [startIdx ].Line )
768+ for ; endIdx < entriesLen && entries [endIdx ].Timestamp .Before (timeShardCutoff ); endIdx ++ {
769+ linesTotalLen += len (entries [endIdx ].Line )
770+ }
771+
772+ shardLbls := labelBuilder .Set (timeShardLabel , fmt .Sprintf ("%d_%d" , timeShardStart .Unix (), timeShardEnd .Unix ())).Labels ()
773+ result = append (result , streamWithTimeShard {
774+ Stream : logproto.Stream {
775+ Labels : shardLbls .String (),
776+ Hash : shardLbls .Hash (),
777+ Entries : stream .Entries [startIdx :endIdx ],
778+ },
779+ linesTotalLen : linesTotalLen ,
780+ })
781+
782+ startIdx = endIdx
783+ }
784+
785+ if startIdx == entriesLen {
786+ // We do not have any remaining entries
787+ return result , true
788+ }
789+
790+ // Append one last shard with all of the logs without a time shard
791+ logsWithoutTimeShardLen := 0
792+ for i := startIdx ; i < entriesLen ; i ++ {
793+ logsWithoutTimeShardLen += len (entries [i ].Line )
794+ }
795+
796+ return append (result , streamWithTimeShard {
797+ Stream : logproto.Stream {
798+ Labels : stream .Labels ,
799+ Hash : stream .Hash ,
800+ Entries : stream .Entries [startIdx :entriesLen ],
801+ },
802+ linesTotalLen : logsWithoutTimeShardLen ,
803+ }), true
804+ }
805+
693806// shardStream shards (divides) the given stream into N smaller streams, where
694807// N is the sharding size for the given stream. shardSteam returns the smaller
695808// streams and their associated keys for hashing to ingesters.
0 commit comments