
Commit 9e68fb0

feat(dataobj): Add query stats collection to the dataobj readers (#17128)
1 parent: 69aeda1

14 files changed: +1092 −106 lines

pkg/dataobj/internal/dataset/column_reader.go

Lines changed: 5 additions & 0 deletions

@@ -8,6 +8,7 @@ import (
 	"sort"

 	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
+	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 )

 type columnReader struct {
@@ -41,6 +42,7 @@ func (cr *columnReader) Read(ctx context.Context, v []Value) (n int, err error)
 			return 0, err
 		}
 	}
+	statistics := stats.FromContext(ctx)

 	for n < len(v) {
 		// Make sure our reader is initialized to the right page for the row we
@@ -52,6 +54,9 @@ func (cr *columnReader) Read(ctx context.Context, v []Value) (n int, err error)
 		if err != nil {
 			return n, err
 		}
+		if pageIndex != cr.pageIndex {
+			statistics.AddPagesScanned(1)
+		}

 		switch cr.reader {
 		case nil:
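
Note the guard in the last hunk: AddPagesScanned fires only when the page index computed for the next row differs from cr.pageIndex, so the counter advances once per page boundary the reader actually crosses, not once per value read; pages the reader never visits are not counted.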

pkg/dataobj/internal/dataset/dataset.go

Lines changed: 8 additions & 0 deletions

@@ -87,3 +87,11 @@ type Row struct {
 	Index  int     // Index of the row in the dataset.
 	Values []Value // Values for the row, one per [Column].
 }
+
+func (r Row) Size() int64 {
+	var size int64
+	for _, v := range r.Values {
+		size += int64(v.Size())
+	}
+	return size
+}
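
Row.Size reports the logical payload of a row by summing Value.Size over its values (Value.Size is added to value.go later in this commit). For example, a row holding one int64 timestamp and a 20-byte log line reports 8 + 20 = 28 bytes: decoded value widths, not Go slice or struct overhead, which is what the decompressed-bytes counters in reader.go are meant to capture.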

pkg/dataobj/internal/dataset/reader.go

Lines changed: 23 additions & 1 deletion

@@ -10,6 +10,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bitmask"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
+	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 )

 // ReaderOptions configures how a [Reader] will read [Row]s.
@@ -130,17 +131,22 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
 		return 0, io.EOF
 	}

+	var totalSizeBefore int64
+	var totalSizePostPredicate int64
+	var totalSizeAfterFill int64
 	var passCount int // passCount tracks how many rows pass the predicate.
 	for i := range count {
+		size := s[i].Size()
+		totalSizeBefore += size
 		if !checkPredicate(r.opts.Predicate, r.origColumnLookup, s[i]) {
 			continue
 		}
-
 		// We move s[i] to s[passCount] by *swapping* the rows. Copying would
 		// result in the Row.Values slice existing in two places in the buffer,
 		// which causes memory corruption when filling in rows.
 		s[passCount], s[i] = s[i], s[passCount]
 		passCount++
+		totalSizePostPredicate += size
 	}

 	if secondary := r.dl.SecondaryColumns(); len(secondary) > 0 && passCount > 0 {
@@ -161,10 +167,19 @@
 		} else if count != passCount {
 			return n, fmt.Errorf("failed to fill rows: expected %d, got %d", n, count)
 		}
+		for i := range count {
+			totalSizeAfterFill += s[i].Size()
+		}
 	}

 	n += passCount

+	statistics := stats.FromContext(ctx)
+	statistics.AddPrePredicateDecompressedRows(int64(count))
+	statistics.AddPrePredicateDecompressedBytes(totalSizeBefore)
+	statistics.AddPostPredicateRows(int64(passCount))
+	statistics.AddPostPredicateDecompressedBytes(totalSizeAfterFill - totalSizePostPredicate)
+
 	// We only advance r.row after we successfully read and filled rows. This
 	// allows the caller to retry reading rows if a sporadic error occurs.
 	r.row += int64(count)
@@ -417,6 +432,13 @@ func (r *Reader) initDownloader(ctx context.Context) error {
 	r.dl.SetDatasetRanges(ranges)
 	r.ranges = ranges

+	var rowsCount uint64
+	for _, column := range r.dl.AllColumns() {
+		rowsCount = max(rowsCount, uint64(column.ColumnInfo().RowsCount))
+	}
+	statistics := stats.FromContext(ctx)
+	statistics.AddTotalRowsAvailable(int64(rowsCount))
+
 	return nil
 }
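
To unpack the accounting in Read: totalSizeBefore is the decompressed size of every row materialized from the primary columns; totalSizePostPredicate is that same primary-column size restricted to rows that survive the predicate; totalSizeAfterFill re-measures the survivors once the secondary columns have been filled in. The difference totalSizeAfterFill - totalSizePostPredicate passed to AddPostPredicateDecompressedBytes is therefore just the secondary-column bytes, which are only ever decompressed for surviving rows. A worked example with made-up numbers: 100 rows at 50 primary bytes each, 10 survivors, and 200 secondary bytes filled per survivor gives totalSizeBefore = 5,000, totalSizePostPredicate = 500, totalSizeAfterFill = 10 × (50 + 200) = 2,500, so 2,000 secondary bytes are recorded post-predicate.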

pkg/dataobj/internal/dataset/reader_downloader.go

Lines changed: 5 additions & 0 deletions

@@ -5,6 +5,7 @@ import (

 	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
+	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 )

 // readerDownloader is a utility for downloading pages in bulk from a
@@ -302,6 +303,10 @@ func (dl *readerDownloader) buildDownloadBatch(ctx context.Context, requestor *r
 		batchSize += pageSize
 	}

+	statistics := stats.FromContext(ctx)
+	statistics.AddPageBatches(1)
+	statistics.AddPagesDownloaded(int64(len(pageBatch)))
+	statistics.AddPagesDownloadedBytes(int64(batchSize))
 	return pageBatch, nil
 }
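
These counters describe I/O rather than decoding: each call to buildDownloadBatch records one batch, the number of pages coalesced into it, and their summed size (batchSize accumulates each page's pageSize, presumably the stored size, so PagesDownloadedBytes measures bytes fetched from object storage rather than the decompressed bytes tracked in reader.go). Together with PagesScanned from column_reader.go, this separates pages fetched from pages actually iterated.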

pkg/dataobj/internal/dataset/value.go

Lines changed: 15 additions & 0 deletions

@@ -283,3 +283,18 @@ func CompareValues(a, b Value) int {
 		panic(fmt.Sprintf("page.CompareValues: unsupported type %s", a.Type()))
 	}
 }
+
+func (v Value) Size() int {
+	switch v.Type() {
+	case datasetmd.VALUE_TYPE_INT64:
+		return int(unsafe.Sizeof(int64(0)))
+	case datasetmd.VALUE_TYPE_UINT64:
+		return int(unsafe.Sizeof(uint64(0)))
+	case datasetmd.VALUE_TYPE_BYTE_ARRAY:
+		return int(v.num)
+	case datasetmd.VALUE_TYPE_UNSPECIFIED:
+		return 0
+	default:
+		panic(fmt.Sprintf("dataset.Value.Size: unsupported type %s", v.Type()))
+	}
+}
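
Size returns the width of the decoded value: both integer types report a fixed 8 bytes via unsafe.Sizeof, byte arrays report their length (which the Value representation evidently keeps in v.num), and a null (VALUE_TYPE_UNSPECIFIED) value reports zero. This relies on value.go already importing unsafe and datasetmd, which its existing Value plumbing uses.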

pkg/dataobj/querier/iter.go

Lines changed: 2 additions & 6 deletions

@@ -71,10 +71,8 @@ func newEntryIterator(ctx context.Context,
 		streamExtractor log.StreamPipeline
 		streamHash      uint64
 		top             = newTopK(int(req.Limit), req.Direction)
+		statistics      = stats.FromContext(ctx)
 	)
-	statistics := stats.FromContext(ctx)
-	// For dataobjs, this maps to sections downloaded
-	statistics.AddChunksDownloaded(1)

 	for {
 		n, err := reader.Read(ctx, buf)
@@ -98,12 +96,11 @@
 		}

 		timestamp := record.Timestamp.UnixNano()
-		statistics.AddDecompressedLines(1)
 		line, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...)
 		if !ok {
 			continue
 		}
-		statistics.AddPostFilterLines(1)
+		statistics.AddPostFilterRows(1)

 		top.Add(entryWithLabels{
 			Labels: parsedLabels.String(),
@@ -338,7 +335,6 @@ func newSampleIterator(ctx context.Context,
 		// TODO(twhitney): when iterating over multiple extractors, we need a way to pre-process as much of the line as possible
 		// In the case of multi-variant expressions, the only difference between the multiple extractors should be the final value, with all
 		// other filters and processing already done.
-		statistics.AddDecompressedLines(1)
 		value, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...)
 		if !ok {
 			continue
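
These deletions appear to remove double counting: the dataset Reader now records decompressed rows and pages itself (reader.go above), so the iterator drops the chunk-oriented AddDecompressedLines and AddChunksDownloaded calls, and its post-filter tally moves to the new dataobj-specific AddPostFilterRows, which ComputeSummary folds into Summary.TotalPostFilterLines (see stats/context.go below).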

pkg/logql/bench/bench_test.go

Lines changed: 2 additions & 2 deletions

@@ -253,9 +253,9 @@ func BenchmarkLogQL(b *testing.B) {
 			for i := 0; i < b.N; i++ {
 				r, err := q.Exec(ctx)
 				require.NoError(b, err)
-				b.ReportMetric(float64(r.Statistics.TotalDecompressedLines()), "linesScanned")
-				b.ReportMetric(float64(r.Statistics.TotalChunksDownloaded()), "chunks/dataobjSections")
+				b.ReportMetric(float64(r.Statistics.Summary.TotalLinesProcessed), "linesProcessed")
 				b.ReportMetric(float64(r.Statistics.Summary.TotalPostFilterLines), "postFilterLines")
+				b.ReportMetric(float64(r.Statistics.Summary.TotalBytesProcessed)/1024, "kilobytesProcessed")
 			}
 		})
 	}
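
With the reader-local helpers gone, the benchmark reports from the computed summary instead: linesProcessed and postFilterLines now include the Dataobj contributions once ComputeSummary folds them in, and kilobytesProcessed is Summary.TotalBytesProcessed divided by 1024.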

pkg/logqlmodel/stats/context.go

Lines changed: 64 additions & 4 deletions

@@ -181,12 +181,16 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
 // ComputeSummary compute the summary of the statistics.
 func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) {
 	r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
-		r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
+		r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes +
+		r.Querier.Store.Dataobj.PrePredicateDecompressedBytes + r.Querier.Store.Dataobj.PostPredicateDecompressedBytes
 	r.Summary.TotalStructuredMetadataBytesProcessed = r.Querier.Store.Chunk.DecompressedStructuredMetadataBytes + r.Querier.Store.Chunk.HeadChunkStructuredMetadataBytes +
-		r.Ingester.Store.Chunk.DecompressedStructuredMetadataBytes + r.Ingester.Store.Chunk.HeadChunkStructuredMetadataBytes
+		r.Ingester.Store.Chunk.DecompressedStructuredMetadataBytes + r.Ingester.Store.Chunk.HeadChunkStructuredMetadataBytes +
+		r.Querier.Store.Dataobj.PrePredicateDecompressedStructuredMetadataBytes + r.Querier.Store.Dataobj.PostPredicateStructuredMetadataBytes
 	r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
-		r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines
-	r.Summary.TotalPostFilterLines = r.Querier.Store.Chunk.PostFilterLines + r.Ingester.Store.Chunk.PostFilterLines
+		r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines +
+		r.Querier.Store.Dataobj.PrePredicateDecompressedRows
+	r.Summary.TotalPostFilterLines = r.Querier.Store.Chunk.PostFilterLines + r.Ingester.Store.Chunk.PostFilterLines +
+		r.Querier.Store.Dataobj.PostFilterRows
 	r.Summary.ExecTime = execTime.Seconds()
 	if execTime != 0 {
 		r.Summary.BytesProcessedPerSecond = int64(float64(r.Summary.TotalBytesProcessed) /
@@ -217,6 +221,18 @@ func (s *Store) Merge(m Store) {
 	s.Chunk.CompressedBytes += m.Chunk.CompressedBytes
 	s.Chunk.TotalDuplicates += m.Chunk.TotalDuplicates
 	s.Chunk.PostFilterLines += m.Chunk.PostFilterLines
+	s.Dataobj.PrePredicateDecompressedRows += m.Dataobj.PrePredicateDecompressedRows
+	s.Dataobj.PrePredicateDecompressedBytes += m.Dataobj.PrePredicateDecompressedBytes
+	s.Dataobj.PrePredicateDecompressedStructuredMetadataBytes += m.Dataobj.PrePredicateDecompressedStructuredMetadataBytes
+	s.Dataobj.PostPredicateDecompressedBytes += m.Dataobj.PostPredicateDecompressedBytes
+	s.Dataobj.PostPredicateRows += m.Dataobj.PostPredicateRows
+	s.Dataobj.PostPredicateStructuredMetadataBytes += m.Dataobj.PostPredicateStructuredMetadataBytes
+	s.Dataobj.PostFilterRows += m.Dataobj.PostFilterRows
+	s.Dataobj.PagesScanned += m.Dataobj.PagesScanned
+	s.Dataobj.PagesDownloaded += m.Dataobj.PagesDownloaded
+	s.Dataobj.PagesDownloadedBytes += m.Dataobj.PagesDownloadedBytes
+	s.Dataobj.PageBatches += m.Dataobj.PageBatches
+	s.Dataobj.TotalRowsAvailable += m.Dataobj.TotalRowsAvailable
 	if m.QueryReferencedStructured {
 		s.QueryReferencedStructured = true
 	}
@@ -513,6 +529,50 @@ func (c *Context) AddSplitQueries(num int64) {
 	atomic.AddInt64(&c.result.Summary.Splits, num)
 }

+func (c *Context) AddPrePredicateDecompressedRows(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PrePredicateDecompressedRows, i)
+}
+
+func (c *Context) AddPrePredicateDecompressedBytes(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PrePredicateDecompressedBytes, i)
+}
+
+func (c *Context) AddPostPredicateDecompressedBytes(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PostPredicateDecompressedBytes, i)
+}
+
+func (c *Context) AddPostPredicateRows(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PostPredicateRows, i)
+}
+
+func (c *Context) AddPostPredicateStructuredMetadataBytes(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PostPredicateStructuredMetadataBytes, i)
+}
+
+func (c *Context) AddPostFilterRows(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PostFilterRows, i)
+}
+
+func (c *Context) AddPagesScanned(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PagesScanned, i)
+}
+
+func (c *Context) AddPagesDownloaded(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PagesDownloaded, i)
+}
+
+func (c *Context) AddPagesDownloadedBytes(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PagesDownloadedBytes, i)
+}
+
+func (c *Context) AddPageBatches(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.PageBatches, i)
+}
+
+func (c *Context) AddTotalRowsAvailable(i int64) {
+	atomic.AddInt64(&c.store.Dataobj.TotalRowsAvailable, i)
+}
+
 func (c *Context) SetQueryReferencedStructuredMetadata() {
 	c.store.QueryReferencedStructured = true
 }
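
Putting the pieces together, here is a minimal sketch of how the new counters surface in a query result. It assumes stats.NewContext and Context.Result keep their existing signatures (a *Context plus a derived context.Context, and a Result method that invokes ComputeSummary); the counter values are made up for illustration:

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
	)

	func main() {
		// Attach a stats collector to the context; readers deeper in the
		// call tree retrieve it with stats.FromContext.
		statsCtx, ctx := stats.NewContext(context.Background())

		// Simulate what the dataobj dataset reader records during a read.
		s := stats.FromContext(ctx)
		s.AddPrePredicateDecompressedRows(100)
		s.AddPrePredicateDecompressedBytes(5000)
		s.AddPostPredicateRows(10)
		s.AddPostFilterRows(10)
		s.AddPagesScanned(3)

		// Result merges the collected store into the result and calls
		// ComputeSummary, which now folds the Dataobj counters into the
		// totals alongside the chunk-based ones.
		r := statsCtx.Result(250*time.Millisecond, 0, 10)
		fmt.Println(r.Summary.TotalLinesProcessed)  // includes the 100 dataobj rows
		fmt.Println(r.Summary.TotalBytesProcessed)  // includes the 5000 dataobj bytes
		fmt.Println(r.Summary.TotalPostFilterLines) // includes the 10 post-filter rows
	}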
