Skip to content

chore(dataobj): predicate pushdown metadata label filters #16846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
9 changes: 9 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,15 @@ dataobj:
# CLI flag: -dataobj-querier-shard-factor
[shard_factor: <int> | default = 32]

predicate_pushdown:
# Apply line filter predicates on dataobj reader.
# CLI flag: -dataobj-querier.predicate-pushdown.enable-for-line-filters
[enable_for_line_filters: <boolean> | default = true]

# Apply metadata filter predicates on dataobj reader.
# CLI flag: -dataobj-querier.predicate-pushdown.enable-for-metadata-filters
[enable_for_metadata_filters: <boolean> | default = true]

# The prefix to use for the storage bucket.
# CLI flag: -dataobj-storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]
Expand Down
1 change: 1 addition & 0 deletions pkg/dataobj/internal/dataset/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
if !ok {
panic("checkPredicate: column not found")
}

return p.Keep(p.Column, row.Values[columnIndex])

default:
Expand Down
68 changes: 54 additions & 14 deletions pkg/dataobj/logs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ type LogsReader struct {

buf []dataset.Row

reader *dataset.Reader
columns []dataset.Column
columnDesc []*logsmd.ColumnDesc
reader *dataset.Reader
columns []dataset.Column

sectionInfo *filemd.SectionInfo
columnDesc []*logsmd.ColumnDesc
}

// NewLogsReader creates a new LogsReader that reads from the logs section of
Expand Down Expand Up @@ -136,31 +138,64 @@ func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
return n, nil
}

func (r *LogsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.LogsDecoder()
sec, err := r.findSection(ctx)
// Columns returns the column descriptions for the logs section.
func (r *LogsReader) Columns(ctx context.Context) ([]*logsmd.ColumnDesc, error) {
if r.columnDesc != nil {
return r.columnDesc, nil
}

si, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
return nil, fmt.Errorf("finding section: %w", err)
}

columnDescs, err := dec.Columns(ctx, sec)
columnDesc, err := r.obj.dec.LogsDecoder().Columns(ctx, si)
if err != nil {
return fmt.Errorf("reading columns: %w", err)
return nil, fmt.Errorf("reading columns: %w", err)
}

dset := encoding.LogsDataset(dec, sec)
r.sectionInfo = si
r.columnDesc = columnDesc

return r.columnDesc, nil
}

func (r *LogsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.LogsDecoder()

// sectionInfo and columnDesc could be populated by a previous call to Columns method.
// avoid re-reading them.
if r.sectionInfo == nil {
si, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
}

r.sectionInfo = si
}

if r.columnDesc == nil {
columnDescs, err := dec.Columns(ctx, r.sectionInfo)
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}

r.columnDesc = columnDescs
}

dset := encoding.LogsDataset(dec, r.sectionInfo)
columns, err := result.Collect(dset.ListColumns(ctx))
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}

// r.predicate doesn't contain mappings of stream IDs; we need to build
// that as a separate predicate and AND them together.
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs)
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, r.columnDesc)
if r.predicate != nil {
predicate = dataset.AndPredicate{
Left: predicate,
Right: translateLogsPredicate(r.predicate, columns, columnDescs),
Right: translateLogsPredicate(r.predicate, columns, r.columnDesc),
}
}

Expand All @@ -178,7 +213,6 @@ func (r *LogsReader) initReader(ctx context.Context) error {
r.reader.Reset(readerOpts)
}

r.columnDesc = columnDescs
r.columns = columns
r.ready = true
return nil
Expand Down Expand Up @@ -230,6 +264,7 @@ func (r *LogsReader) Reset(obj *Object, sectionIndex int) {

r.columns = nil
r.columnDesc = nil
r.sectionInfo = nil

// We leave r.reader as-is to avoid reallocating; it'll be reset on the first
// call to Read.
Expand Down Expand Up @@ -316,7 +351,9 @@ func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDes
return dataset.FuncPredicate{
Column: messageColumn,
Keep: func(_ dataset.Column, value dataset.Value) bool {
if value.Type() == datasetmd.VALUE_TYPE_STRING {
if value.IsNil() {
return p.Keep(nil)
} else if value.Type() == datasetmd.VALUE_TYPE_STRING {
// To handle older dataobjs that still use string type for message column. This can be removed in future.
return p.Keep([]byte(value.String()))
}
Expand Down Expand Up @@ -347,6 +384,9 @@ func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDes
return dataset.FuncPredicate{
Column: metadataColumn,
Keep: func(_ dataset.Column, value dataset.Value) bool {
if value.IsNil() {
return p.Keep(p.Key, "")
}
return p.Keep(p.Key, valueToString(value))
},
}
Expand Down
110 changes: 110 additions & 0 deletions pkg/dataobj/predicate.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package dataobj

import (
"fmt"
"strings"
"time"
)

type (
// Predicate is an expression used to filter entries in a data object.
Predicate interface {
isPredicate()
String() string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

general question: do we want to make String() string a required method on every Predicate? Or may some of them just happen to implement String?

}

// StreamsPredicate is a [Predicate] that can be used to filter streams in a
Expand Down Expand Up @@ -62,13 +65,15 @@ type (
// predicates.
LabelFilterPredicate struct {
Name string
Desc string // Description of the filter for debugging.
Keep func(name, value string) bool
}

// A LogMessageFilterPredicate is a [LogsPredicate] that requires the log message
// of the entry to pass a Keep function.
LogMessageFilterPredicate struct {
Keep func(line []byte) bool
Desc string // Description of the filter for debugging.
Comment on lines 75 to +76
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for consistency with the other function predicates, should Desc be before Keep here?

}

// A MetadataMatcherPredicate is a [LogsPredicate] that requires a metadata
Expand All @@ -86,6 +91,7 @@ type (
// predicates.
MetadataFilterPredicate struct {
Key string
Desc string // Description of the filter for debugging.
Keep func(key, value string) bool
}
)
Expand All @@ -109,3 +115,107 @@ func (LabelFilterPredicate) predicateKind(StreamsPredicate) {}
func (MetadataMatcherPredicate) predicateKind(LogsPredicate) {}
func (MetadataFilterPredicate) predicateKind(LogsPredicate) {}
func (LogMessageFilterPredicate) predicateKind(LogsPredicate) {}

func (p AndPredicate[P]) String() string {
var sb strings.Builder
sb.WriteString("(")
sb.WriteString(p.Left.String())
sb.WriteString(" AND ")
sb.WriteString(p.Right.String())
sb.WriteString(")")
return sb.String()
}

func (p OrPredicate[P]) String() string {
var sb strings.Builder
sb.WriteString("(")
sb.WriteString(p.Left.String())
sb.WriteString(" OR ")
sb.WriteString(p.Right.String())
sb.WriteString(")")
return sb.String()
}

func (p NotPredicate[P]) String() string {
var sb strings.Builder
sb.WriteString("NOT(")
sb.WriteString(p.Inner.String())
sb.WriteString(")")
return sb.String()
}

func (p TimeRangePredicate[P]) String() string {
var sb strings.Builder

// Use standard mathematical interval notation
sb.WriteString("TimeRange")
if p.IncludeStart {
sb.WriteString("[")
} else {
sb.WriteString("(")
}

sb.WriteString(p.StartTime.Format(time.RFC3339))
sb.WriteString(", ")
sb.WriteString(p.EndTime.Format(time.RFC3339))

if p.IncludeEnd {
sb.WriteString("]")
} else {
sb.WriteString(")")
}

return sb.String()
}

func (p LabelMatcherPredicate) String() string {
var sb strings.Builder
sb.WriteString("Label(")
sb.WriteString(p.Name)
sb.WriteString("=")
sb.WriteString(p.Value)
sb.WriteString(")")
return sb.String()
}

func (p LabelFilterPredicate) String() string {
var sb strings.Builder
sb.WriteString("LabelFilter(")
sb.WriteString(p.Name)
if p.Desc != "" {
sb.WriteString(fmt.Sprintf(", description=%q", p.Desc))
}
sb.WriteString(")")
return sb.String()
}

func (p LogMessageFilterPredicate) String() string {
var sb strings.Builder
sb.WriteString("LogMessageFilter(")
if p.Desc != "" {
sb.WriteString(fmt.Sprintf("description=%q", p.Desc))
}
sb.WriteString(")")
return sb.String()
}

func (p MetadataMatcherPredicate) String() string {
var sb strings.Builder
sb.WriteString("Metadata(")
sb.WriteString(p.Key)
sb.WriteString("=")
sb.WriteString(p.Value)
sb.WriteString(")")
return sb.String()
Comment on lines +203 to +209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would

return fmt.Sprintf("Metadata(%s=%s)", p.Key, p.Value)

here be simpler?

(Similar comment for other String methods that don't have if branches)

}

func (p MetadataFilterPredicate) String() string {
var sb strings.Builder
sb.WriteString("MetadataFilter(")
sb.WriteString(p.Key)
if p.Desc != "" {
sb.WriteString(fmt.Sprintf(", description=%q", p.Desc))
}
sb.WriteString(")")
return sb.String()
}
Loading