Skip to content

Commit f2fc0c2

Browse files
authored
chore(dataobj): column building (#15634)
This commit adds the ability to accumulate sequences of dataset.Value into a column, which is split up across multiple pages. Each page is broken down into two parts: * A bitmap-encoded sequence of booleans, where 1 indicates a row has a value and 0 indicates the row is NULL, and * the encoded sequence of non-NULL values, whose encoding is determined by the column options. The sequence of non-NULL values is then optionally compressed. This commit also includes initial support for reading these columns, starting with internal-only helper utilities for unit tests.
1 parent 672f91c commit f2fc0c2

File tree

13 files changed

+1519
-21
lines changed

13 files changed

+1519
-21
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package dataset
2+
3+
import "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
4+
5+
// Helper types.
type (
	// ColumnInfo describes a column: its identity, how its values are stored,
	// and aggregate size/row accounting across all of its pages.
	ColumnInfo struct {
		Name        string                    // Name of the column, if any.
		Type        datasetmd.ValueType       // Type of values in the column.
		Compression datasetmd.CompressionType // Compression used for the column.

		RowsCount        int // Total number of rows in the column, including NULLs.
		CompressedSize   int // Total size of all pages in the column after compression.
		UncompressedSize int // Total size of all pages in the column before compression.

		Statistics *datasetmd.Statistics // Optional statistics for the column; nil when not computed.
	}
)
20+
21+
// MemColumn holds a set of pages of a common type. A MemColumn is the
// in-memory result of flushing a ColumnBuilder.
type MemColumn struct {
	Info  ColumnInfo // Information about the column.
	Pages []*MemPage // The set of pages in the column.
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package dataset
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
7+
)
8+
9+
// BuilderOptions configures common settings for building pages.
type BuilderOptions struct {
	// PageSizeHint is the soft limit for the size of the page. Builders try to
	// fill pages as close to this size as possible, but the actual size may be
	// slightly larger or smaller.
	PageSizeHint int

	// Value is the value type of data to write.
	Value datasetmd.ValueType

	// Encoding is the encoding algorithm to use for values.
	Encoding datasetmd.EncodingType

	// Compression is the compression algorithm to use for values.
	// The presence bitmap is always stored uncompressed regardless of this
	// setting.
	Compression datasetmd.CompressionType
}
25+
26+
// A ColumnBuilder builds a sequence of [Value] entries of a common type into a
// column. Values are accumulated into a buffer and then flushed into
// [MemPage]s once the size of data exceeds a configurable limit.
type ColumnBuilder struct {
	name string         // Column name passed to NewColumnBuilder; may be empty.
	opts BuilderOptions // Options the builder (and its pageBuilder) were created with.

	rows int // Total number of rows in the column, including backfilled NULLs.

	pages   []*MemPage   // Pages cut so far; the current page is still in builder.
	builder *pageBuilder // Accumulates values for the page currently being built.
}
38+
39+
// NewColumnBuilder creates a new ColumnBuilder from the optional name and
40+
// provided options. NewColumnBuilder returns an error if the options are
41+
// invalid.
42+
func NewColumnBuilder(name string, opts BuilderOptions) (*ColumnBuilder, error) {
43+
builder, err := newPageBuilder(opts)
44+
if err != nil {
45+
return nil, fmt.Errorf("creating page builder: %w", err)
46+
}
47+
48+
return &ColumnBuilder{
49+
name: name,
50+
opts: opts,
51+
52+
builder: builder,
53+
}, nil
54+
}
55+
56+
// Append adds a new value into cb with the given zero-indexed row number. If
57+
// the row number is higher than the current number of rows in cb, null values
58+
// are added up to the new row.
59+
//
60+
// Append returns an error if the row number is out-of-order.
61+
func (cb *ColumnBuilder) Append(row int, value Value) error {
62+
if row < cb.rows {
63+
return fmt.Errorf("row %d is older than current row %d", row, cb.rows)
64+
}
65+
66+
// We give two attempts to append the data to the buffer; if the buffer is
67+
// full, we cut a page and then append to the newly reset buffer.
68+
//
69+
// The second iteration should never fail, as the buffer will always be empty
70+
// then.
71+
for range 2 {
72+
if cb.append(row, value) {
73+
cb.rows = row + 1
74+
return nil
75+
}
76+
77+
cb.flushPage()
78+
}
79+
80+
panic("ColumnBuilder.Append: failed to append value to fresh buffer")
81+
}
82+
83+
// Backfill adds NULLs into cb up to (but not including) the provided row
84+
// number. If values exist up to the provided row number, Backfill does
85+
// nothing.
86+
func (cb *ColumnBuilder) Backfill(row int) {
87+
// We give two attempts to append the data to the buffer; if the buffer is
88+
// full, we cut a page and then append again to the newly reset buffer.
89+
//
90+
// The second iteration should never fail, as the buffer will always be
91+
// empty.
92+
for range 2 {
93+
if cb.backfill(row) {
94+
return
95+
}
96+
cb.flushPage()
97+
}
98+
99+
panic("ColumnBuilder.Backfill: failed to backfill buffer")
100+
}
101+
102+
func (cb *ColumnBuilder) backfill(row int) bool {
103+
for row > cb.rows {
104+
if !cb.builder.AppendNull() {
105+
return false
106+
}
107+
cb.rows++
108+
}
109+
110+
return true
111+
}
112+
113+
func (cb *ColumnBuilder) append(row int, value Value) bool {
114+
// Backfill up to row.
115+
if !cb.backfill(row) {
116+
return false
117+
}
118+
return cb.builder.Append(value)
119+
}
120+
121+
// Flush converts data in cb into a [MemColumn]. Afterwards, cb is reset to a
122+
// fresh state and can be reused.
123+
func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
124+
cb.flushPage()
125+
126+
info := ColumnInfo{
127+
Name: cb.name,
128+
Type: cb.opts.Value,
129+
130+
Compression: cb.opts.Compression,
131+
}
132+
133+
// TODO(rfratto): Should we compute column-wide statistics if they're
134+
// available in pages?
135+
//
136+
// That would potentially work for min/max values, but not for count
137+
// distinct, unless we had a way to pass sketches around.
138+
139+
for _, page := range cb.pages {
140+
info.RowsCount += page.Info.RowCount
141+
info.CompressedSize += page.Info.CompressedSize
142+
info.UncompressedSize += page.Info.UncompressedSize
143+
}
144+
145+
column := &MemColumn{
146+
Info: info,
147+
Pages: cb.pages,
148+
}
149+
150+
cb.Reset()
151+
return column, nil
152+
}
153+
154+
func (cb *ColumnBuilder) flushPage() {
155+
if cb.builder.Rows() == 0 {
156+
return
157+
}
158+
159+
page, err := cb.builder.Flush()
160+
if err != nil {
161+
// Flush should only return an error when it's empty, which we already
162+
// ensure it's not in the lines above.
163+
panic(fmt.Sprintf("failed to flush page: %s", err))
164+
}
165+
cb.pages = append(cb.pages, page)
166+
}
167+
168+
// Reset clears all data in cb and resets it to a fresh state.
169+
func (cb *ColumnBuilder) Reset() {
170+
cb.rows = 0
171+
cb.pages = nil
172+
cb.builder.Reset()
173+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package dataset
2+
3+
import "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
4+
5+
func iterMemColumn(col *MemColumn) result.Seq[Value] {
6+
return result.Iter(func(yield func(Value) bool) error {
7+
for _, page := range col.Pages {
8+
for result := range iterMemPage(page, col.Info.Type, col.Info.Compression) {
9+
val, err := result.Value()
10+
if err != nil {
11+
return err
12+
} else if !yield(val) {
13+
return nil
14+
}
15+
}
16+
}
17+
18+
return nil
19+
})
20+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package dataset
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
9+
)
10+
11+
// TestColumnBuilder_ReadWrite round-trips a set of strings through a
// ColumnBuilder and iterMemColumn, checking the decoded values match the
// input and that the data was split across multiple pages.
func TestColumnBuilder_ReadWrite(t *testing.T) {
	in := []string{
		"hello, world!",
		"",
		"this is a test of the emergency broadcast system",
		"this is only a test",
		"if this were a real emergency, you would be instructed to panic",
		"but it's not, so don't",
		"",
		"this concludes the test",
		"thank you for your cooperation",
		"goodbye",
	}

	opts := BuilderOptions{
		// Set the size hint to 0 to force a page cut on every append, so each
		// page holds roughly one value; the assertion below only requires
		// that more than one page was produced.
		PageSizeHint: 0,
		Value:        datasetmd.VALUE_TYPE_STRING,
		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
	}
	b, err := NewColumnBuilder("", opts)
	require.NoError(t, err)

	for i, s := range in {
		require.NoError(t, b.Append(i, StringValue(s)))
	}

	col, err := b.Flush()
	require.NoError(t, err)
	require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
	require.Greater(t, len(col.Pages), 1)

	t.Log("Uncompressed size: ", col.Info.UncompressedSize)
	t.Log("Compressed size: ", col.Info.CompressedSize)
	t.Log("Pages: ", len(col.Pages))

	var actual []string
	for result := range iterMemColumn(col) {
		val, err := result.Value()
		require.NoError(t, err)

		// NOTE(review): empty input strings appear to come back as nil/zero
		// values rather than empty strings, so both are mapped to "" here —
		// confirm this matches the intended NULL-vs-empty semantics.
		if val.IsNil() || val.IsZero() {
			actual = append(actual, "")
		} else {
			require.Equal(t, datasetmd.VALUE_TYPE_STRING, val.Type())
			actual = append(actual, val.String())
		}
	}
	require.Equal(t, in, actual)
}

‎pkg/dataobj/internal/dataset/page.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package dataset
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"fmt"
7+
"hash/crc32"
8+
"io"
9+
10+
"github.com/golang/snappy"
11+
"github.com/klauspost/compress/zstd"
12+
13+
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
14+
)
15+
16+
// Helper types.
type (
	// PageData holds the raw data for a page. Data is formatted as:
	//
	//	<uvarint(presence-bitmap-size)> <presence-bitmap> <values-data>
	//
	// The presence-bitmap is a bitmap-encoded sequence of booleans, where values
	// describe which rows are present (1) or nil (0). The presence bitmap is
	// always stored uncompressed.
	//
	// values-data is then the encoded and optionally compressed sequence of
	// non-NULL values.
	PageData []byte

	// PageInfo describes a page.
	PageInfo struct {
		UncompressedSize int    // UncompressedSize is the size of a page before compression.
		CompressedSize   int    // CompressedSize is the size of a page after compression.
		CRC32            uint32 // CRC32 checksum of the page after encoding and compression.
		RowCount         int    // RowCount is the number of rows in the page, including NULLs.

		Encoding datasetmd.EncodingType // Encoding used for values in the page.
		Stats    *datasetmd.Statistics  // Optional statistics for the page; nil when not computed.
	}
)
41+
42+
// MemPage holds an encoded (and optionally compressed) sequence of [Value]
// entries of a common type. Use [ColumnBuilder] to construct sets of pages.
type MemPage struct {
	Info PageInfo // Information about the page.
	Data PageData // Data for the page.
}

// checksumTable is the CRC32 table (Castagnoli polynomial) used to validate
// page data integrity in MemPage.reader.
var checksumTable = crc32.MakeTable(crc32.Castagnoli)
50+
51+
// reader returns a reader for decompressed page data. Reader returns an error
52+
// if the CRC32 fails to validate.
53+
func (p *MemPage) reader(compression datasetmd.CompressionType) (presence io.Reader, values io.ReadCloser, err error) {
54+
if actual := crc32.Checksum(p.Data, checksumTable); p.Info.CRC32 != actual {
55+
return nil, nil, fmt.Errorf("invalid CRC32 checksum %x, expected %x", actual, p.Info.CRC32)
56+
}
57+
58+
bitmapSize, n := binary.Uvarint(p.Data)
59+
if n <= 0 {
60+
return nil, nil, fmt.Errorf("reading presence bitmap size: %w", err)
61+
}
62+
63+
var (
64+
bitmapReader = bytes.NewReader(p.Data[n : n+int(bitmapSize)])
65+
compressedDataReader = bytes.NewReader(p.Data[n+int(bitmapSize):])
66+
)
67+
68+
switch compression {
69+
case datasetmd.COMPRESSION_TYPE_UNSPECIFIED, datasetmd.COMPRESSION_TYPE_NONE:
70+
return bitmapReader, io.NopCloser(compressedDataReader), nil
71+
72+
case datasetmd.COMPRESSION_TYPE_SNAPPY:
73+
sr := snappy.NewReader(compressedDataReader)
74+
return bitmapReader, io.NopCloser(sr), nil
75+
76+
case datasetmd.COMPRESSION_TYPE_ZSTD:
77+
zr, err := zstd.NewReader(compressedDataReader)
78+
if err != nil {
79+
return nil, nil, fmt.Errorf("opening zstd reader: %w", err)
80+
}
81+
return bitmapReader, newZstdReader(zr), nil
82+
}
83+
84+
panic(fmt.Sprintf("dataset.MemPage.reader: unknown compression type %q", compression.String()))
85+
}
86+
87+
// zstdReader implements [io.ReadCloser] for a [zstd.Decoder]. The embedded
// Decoder provides Read; Close is adapted below because zstd.Decoder.Close
// returns nothing.
type zstdReader struct{ *zstd.Decoder }

// newZstdReader returns a new [io.ReadCloser] for a [zstd.Decoder].
func newZstdReader(dec *zstd.Decoder) io.ReadCloser {
	return &zstdReader{Decoder: dec}
}

// Close implements [io.Closer]. It releases the decoder's resources and
// always returns nil.
func (r *zstdReader) Close() error {
	r.Decoder.Close()
	return nil
}

0 commit comments

Comments
 (0)