@@ -11,12 +11,14 @@ import (
11
11
"github.com/thanos-io/objstore"
12
12
"golang.org/x/sync/errgroup"
13
13
14
+ "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
14
15
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
15
16
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
16
17
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
17
18
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
18
19
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
19
20
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
21
+ "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
20
22
)
21
23
22
24
type FileMetadata struct {
@@ -68,6 +70,9 @@ type SectionMetadata struct {
68
70
TotalUncompressedSize uint64 `json:"totalUncompressedSize"`
69
71
ColumnCount int `json:"columnCount"`
70
72
Columns []ColumnWithPages `json:"columns"`
73
+ Distribution []uint64 `json:"distribution"`
74
+ MinTimestamp time.Time `json:"minTimestamp"`
75
+ MaxTimestamp time.Time `json:"maxTimestamp"`
71
76
}
72
77
73
78
func (s * Service ) handleInspect (w http.ResponseWriter , r * http.Request ) {
@@ -90,6 +95,10 @@ func (s *Service) handleInspect(w http.ResponseWriter, r *http.Request) {
90
95
91
96
metadata := inspectFile (r .Context (), s .bucket , filename )
92
97
metadata .LastModified = attrs .LastModified .UTC ()
98
+ for _ , section := range metadata .Sections {
99
+ section .MinTimestamp = section .MinTimestamp .UTC ().Truncate (time .Second )
100
+ section .MaxTimestamp = section .MaxTimestamp .UTC ().Truncate (time .Second )
101
+ }
93
102
94
103
w .Header ().Set ("Content-Type" , "application/json" )
95
104
if err := json .NewEncoder (w ).Encode (metadata ); err != nil {
@@ -147,14 +156,16 @@ func inspectFile(ctx context.Context, bucket objstore.BucketReader, path string)
147
156
sectionMeta , err = inspectLogsSection (ctx , reader , section )
148
157
if err != nil {
149
158
return FileMetadata {
150
- Error : fmt .Sprintf ("failed to inspect logs section: %v" , err ),
159
+ Sections : make ([]SectionMetadata , 0 , len (sections )),
160
+ Error : fmt .Sprintf ("failed to inspect logs section: %v" , err ),
151
161
}
152
162
}
153
163
case filemd .SECTION_TYPE_STREAMS :
154
164
sectionMeta , err = inspectStreamsSection (ctx , reader , section )
155
165
if err != nil {
156
166
return FileMetadata {
157
- Error : fmt .Sprintf ("failed to inspect streams section: %v" , err ),
167
+ Sections : make ([]SectionMetadata , 0 , len (sections )),
168
+ Error : fmt .Sprintf ("failed to inspect streams section: %v" , err ),
158
169
}
159
170
}
160
171
}
@@ -254,8 +265,10 @@ func inspectStreamsSection(ctx context.Context, reader encoding.Decoder, section
254
265
meta .ColumnCount = len (cols )
255
266
256
267
// Create error group for parallel execution
257
- g , ctx := errgroup .WithContext (ctx )
268
+ g , _ := errgroup .WithContext (ctx )
258
269
270
+ globalMaxTimestamp := time.Time {}
271
+ globalMinTimestamp := time.Time {}
259
272
// Process each column in parallel
260
273
for i , col := range cols {
261
274
meta .TotalCompressedSize += col .Info .CompressedSize
@@ -268,6 +281,18 @@ func inspectStreamsSection(ctx context.Context, reader encoding.Decoder, section
268
281
return err
269
282
}
270
283
284
+ if col .Type == streamsmd .COLUMN_TYPE_MAX_TIMESTAMP && col .Info .Statistics != nil {
285
+ var ts dataset.Value
286
+ _ = ts .UnmarshalBinary (col .Info .Statistics .MaxValue )
287
+ globalMaxTimestamp = time .Unix (0 , ts .Int64 ()).UTC ()
288
+ }
289
+
290
+ if col .Type == streamsmd .COLUMN_TYPE_MIN_TIMESTAMP && col .Info .Statistics != nil {
291
+ var ts dataset.Value
292
+ _ = ts .UnmarshalBinary (col .Info .Statistics .MinValue )
293
+ globalMinTimestamp = time .Unix (0 , ts .Int64 ()).UTC ()
294
+ }
295
+
271
296
var pageInfos []PageInfo
272
297
for _ , pages := range pageSets {
273
298
for _ , page := range pages {
@@ -309,5 +334,27 @@ func inspectStreamsSection(ctx context.Context, reader encoding.Decoder, section
309
334
return meta , err
310
335
}
311
336
337
+ if globalMaxTimestamp .IsZero () || globalMinTimestamp .IsZero () {
338
+ // Short circuit if we don't have any timestamps
339
+ return meta , nil
340
+ }
341
+
342
+ width := int (globalMaxTimestamp .Add (1 * time .Hour ).Truncate (time .Hour ).Sub (globalMinTimestamp .Truncate (time .Hour )).Hours ())
343
+ counts := make ([]uint64 , width )
344
+ for streamVal := range streams .Iter (ctx , reader ) {
345
+ stream , err := streamVal .Value ()
346
+ if err != nil {
347
+ return meta , err
348
+ }
349
+ for i := stream .MinTimestamp ; ! i .After (stream .MaxTimestamp ); i = i .Add (time .Hour ) {
350
+ hoursBeforeMax := int (globalMaxTimestamp .Sub (i ).Hours ())
351
+ counts [hoursBeforeMax ]++
352
+ }
353
+ }
354
+
355
+ meta .MinTimestamp = globalMinTimestamp
356
+ meta .MaxTimestamp = globalMaxTimestamp
357
+ meta .Distribution = counts
358
+
312
359
return meta , nil
313
360
}
0 commit comments