@@ -796,6 +796,178 @@ func TestStreamShardAcrossCalls(t *testing.T) {
796796 })
797797}
798798
799+ func TestStreamShardByTime (t * testing.T ) {
800+ baseTimestamp := time .Date (2024 , 10 , 31 , 12 , 34 , 56 , 0 , time .UTC )
801+ t .Logf ("Base timestamp: %s (unix %d)" , baseTimestamp .Format (time .RFC3339Nano ), baseTimestamp .Unix ())
802+
803+ for _ , tc := range []struct {
804+ test string
805+ labels string
806+ entries []logproto.Entry
807+ timeShardLen time.Duration
808+ expResult []streamWithTimeShard
809+ }{
810+ {
811+ test : "zero shard because no entries" ,
812+ labels : "{app='myapp'}" ,
813+ entries : nil ,
814+ timeShardLen : time .Hour ,
815+ expResult : nil ,
816+ },
817+ {
818+ test : "single shard with one entry" ,
819+ labels : `{app="myapp"}` ,
820+ entries : []logproto.Entry {
821+ {Timestamp : baseTimestamp , Line : "foo" },
822+ //{Timestamp: baseTimestamp.Add(time.Second), Line: "bar"},
823+ },
824+ timeShardLen : time .Hour ,
825+ expResult : []streamWithTimeShard {
826+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730376000_1730379600", app="myapp"}` , Entries : []logproto.Entry {
827+ {Timestamp : baseTimestamp , Line : "foo" },
828+ }}, linesTotalLen : 3 },
829+ },
830+ },
831+ {
832+ test : "single shard with two entries" ,
833+ labels : `{app="myapp"}` ,
834+ entries : []logproto.Entry {
835+ {Timestamp : baseTimestamp , Line : "foo" },
836+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
837+ },
838+ timeShardLen : time .Hour ,
839+ expResult : []streamWithTimeShard {
840+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730376000_1730379600", app="myapp"}` , Entries : []logproto.Entry {
841+ {Timestamp : baseTimestamp , Line : "foo" },
842+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
843+ }}, linesTotalLen : 6 },
844+ },
845+ },
846+ {
847+ test : "single shard with two entries reversed" ,
848+ labels : `{app="myapp"}` ,
849+ entries : []logproto.Entry {
850+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
851+ {Timestamp : baseTimestamp , Line : "foo" },
852+ },
853+ timeShardLen : time .Hour ,
854+ expResult : []streamWithTimeShard {
855+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730376000_1730379600", app="myapp"}` , Entries : []logproto.Entry {
856+ {Timestamp : baseTimestamp , Line : "foo" },
857+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
858+ }}, linesTotalLen : 6 },
859+ },
860+ },
861+ {
862+ test : "two shards without a gap" ,
863+ labels : `{app="myapp"}` ,
864+ entries : []logproto.Entry {
865+ {Timestamp : baseTimestamp , Line : "foo" },
866+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
867+ {Timestamp : baseTimestamp .Add (time .Hour ), Line : "baz" },
868+ },
869+ timeShardLen : time .Hour ,
870+ expResult : []streamWithTimeShard {
871+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730376000_1730379600", app="myapp"}` , Entries : []logproto.Entry {
872+ {Timestamp : baseTimestamp , Line : "foo" },
873+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
874+ }}, linesTotalLen : 6 },
875+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730379600_1730383200", app="myapp"}` , Entries : []logproto.Entry {
876+ {Timestamp : baseTimestamp .Add (time .Hour ), Line : "baz" },
877+ }}, linesTotalLen : 3 },
878+ },
879+ },
880+ {
881+ test : "two shards with a gap" ,
882+ labels : `{app="myapp"}` ,
883+ entries : []logproto.Entry {
884+ {Timestamp : baseTimestamp , Line : "foo" },
885+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
886+ {Timestamp : baseTimestamp .Add (4 * time .Hour ), Line : "baz" },
887+ },
888+ timeShardLen : time .Hour ,
889+ expResult : []streamWithTimeShard {
890+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730376000_1730379600", app="myapp"}` , Entries : []logproto.Entry {
891+ {Timestamp : baseTimestamp , Line : "foo" },
892+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
893+ }}, linesTotalLen : 6 },
894+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730390400_1730394000", app="myapp"}` , Entries : []logproto.Entry {
895+ {Timestamp : baseTimestamp .Add (4 * time .Hour ), Line : "baz" },
896+ }}, linesTotalLen : 3 },
897+ },
898+ },
899+ {
900+ test : "bigger shard len" ,
901+ labels : `{app="myapp"}` ,
902+ entries : []logproto.Entry {
903+ {Timestamp : baseTimestamp , Line : "foo" },
904+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
905+ {Timestamp : baseTimestamp .Add (6 * time .Hour ), Line : "baz" },
906+ },
907+ timeShardLen : 24 * time .Hour ,
908+ expResult : []streamWithTimeShard {
909+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730332800_1730419200", app="myapp"}` , Entries : []logproto.Entry {
910+ {Timestamp : baseTimestamp , Line : "foo" },
911+ {Timestamp : baseTimestamp .Add (time .Second ), Line : "bar" },
912+ {Timestamp : baseTimestamp .Add (6 * time .Hour ), Line : "baz" },
913+ }}, linesTotalLen : 9 },
914+ },
915+ },
916+ {
917+ test : "longer messy gaps" ,
918+ labels : `{app="myapp"}` ,
919+ entries : []logproto.Entry {
920+ {Timestamp : baseTimestamp .Truncate (time .Hour ), Line : "11" },
921+ {Timestamp : baseTimestamp , Line : "13" },
922+ {Timestamp : baseTimestamp , Line : "14" },
923+ {Timestamp : baseTimestamp .Truncate (time .Hour ), Line : "12" },
924+ {Timestamp : baseTimestamp .Add (2 * time .Hour ), Line : "32" },
925+ {Timestamp : baseTimestamp .Truncate (time .Hour ).Add (2 * time .Hour ), Line : "31" },
926+ {Timestamp : baseTimestamp .Add (5 * time .Hour ), Line : "41" },
927+ {Timestamp : baseTimestamp .Truncate (time .Hour ).Add (time .Hour ), Line : "21" },
928+ },
929+ timeShardLen : time .Hour ,
930+ expResult : []streamWithTimeShard {
931+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730376000_1730379600", app="myapp"}` , Entries : []logproto.Entry {
932+ {Timestamp : baseTimestamp .Truncate (time .Hour ), Line : "11" },
933+ {Timestamp : baseTimestamp .Truncate (time .Hour ), Line : "12" },
934+ {Timestamp : baseTimestamp , Line : "13" },
935+ {Timestamp : baseTimestamp , Line : "14" },
936+ }}, linesTotalLen : 8 },
937+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730379600_1730383200", app="myapp"}` , Entries : []logproto.Entry {
938+ {Timestamp : baseTimestamp .Truncate (time .Hour ).Add (time .Hour ), Line : "21" },
939+ }}, linesTotalLen : 2 },
940+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730383200_1730386800", app="myapp"}` , Entries : []logproto.Entry {
941+ {Timestamp : baseTimestamp .Truncate (time .Hour ).Add (2 * time .Hour ), Line : "31" },
942+ {Timestamp : baseTimestamp .Add (2 * time .Hour ), Line : "32" },
943+ }}, linesTotalLen : 4 },
944+ {Stream : logproto.Stream {Labels : `{__time_shard__="1730394000_1730397600", app="myapp"}` , Entries : []logproto.Entry {
945+ {Timestamp : baseTimestamp .Add (5 * time .Hour ), Line : "41" },
946+ }}, linesTotalLen : 2 },
947+ },
948+ },
949+ } {
950+ t .Run (tc .test , func (t * testing.T ) {
951+ lbls , err := syntax .ParseLabels (tc .labels )
952+ require .NoError (t , err )
953+ stream := logproto.Stream {
954+ Labels : tc .labels ,
955+ Hash : lbls .Hash (),
956+ Entries : tc .entries ,
957+ }
958+
959+ shardedStreams := shardStreamByTime (stream , lbls , tc .timeShardLen )
960+ require .Len (t , shardedStreams , len (tc .expResult ))
961+
962+ for i , ss := range shardedStreams {
963+ assert .Equal (t , tc .expResult [i ].linesTotalLen , ss .linesTotalLen )
964+ assert .Equal (t , tc .expResult [i ].Labels , ss .Labels )
965+ assert .EqualValues (t , tc .expResult [i ].Entries , ss .Entries )
966+ }
967+ })
968+ }
969+ }
970+
799971func generateEntries (n int ) []logproto.Entry {
800972 var entries []logproto.Entry
801973 for i := 0 ; i < n ; i ++ {
@@ -1386,7 +1558,9 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
13861558 overrides , err := validation .NewOverrides (* limits , nil )
13871559 require .NoError (t , err )
13881560
1389- d , err := New (distributorConfig , clientConfig , runtime .DefaultTenantConfigs (), ingestersRing , partitionRingReader , overrides , prometheus .NewPedanticRegistry (), constants .Loki , nil , nil , log .NewNopLogger ())
1561+ ingesterConfig := ingester.Config {MaxChunkAge : 2 * time .Hour }
1562+
1563+ d , err := New (distributorConfig , ingesterConfig , clientConfig , runtime .DefaultTenantConfigs (), ingestersRing , partitionRingReader , overrides , prometheus .NewPedanticRegistry (), constants .Loki , nil , nil , log .NewNopLogger ())
13901564 require .NoError (t , err )
13911565 require .NoError (t , services .StartAndAwaitRunning (context .Background (), d ))
13921566 distributors [i ] = d
0 commit comments