Skip to content

feat(logical planning): Planner playground #16403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Mar 6, 2025
Merged
109 changes: 109 additions & 0 deletions pkg/dataobj/planner/logical/aggregate_expr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package logical

import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/planner/schema"
)

// AggregateOp represents the type of aggregation operation to perform.
// It is a string-based enum that identifies different aggregation functions
// that can be applied to expressions.
type AggregateOp string

const (
// AggregateOpSum represents a sum aggregation
AggregateOpSum AggregateOp = "sum"
// AggregateOpAvg represents an average aggregation
AggregateOpAvg AggregateOp = "avg"
// AggregateOpMin represents a minimum value aggregation
AggregateOpMin AggregateOp = "min"
// AggregateOpMax represents a maximum value aggregation
AggregateOpMax AggregateOp = "max"
// AggregateOpCount represents a count aggregation
AggregateOpCount AggregateOp = "count"
)
Comment on lines +8 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the same enum pattern (int type + iota + String() function) as in the rest of the PR.


// Convenience constructors for each aggregate operation
var (
// Sum creates a sum aggregation expression
Sum = newAggregateExprConstructor(AggregateOpSum)
// Avg creates an average aggregation expression
Avg = newAggregateExprConstructor(AggregateOpAvg)
// Min creates a minimum value aggregation expression
Min = newAggregateExprConstructor(AggregateOpMin)
// Max creates a maximum value aggregation expression
Max = newAggregateExprConstructor(AggregateOpMax)
// Count creates a count aggregation expression
Count = newAggregateExprConstructor(AggregateOpCount)
)

// AggregateExpr represents an aggregation operation on an expression.
// It encapsulates the operation to perform (sum, avg, etc.), the expression
// to aggregate, and a name for the result.
type AggregateExpr struct {
// name is the identifier for this aggregation
name string
// op specifies which aggregation operation to perform
op AggregateOp
// expr is the expression to aggregate
expr Expr
}

// newAggregateExprConstructor creates a constructor function for a specific aggregate operation.
// This is a higher-order function that returns a function for creating aggregate expressions
// with a specific operation type.
func newAggregateExprConstructor(op AggregateOp) func(name string, expr Expr) AggregateExpr {
return func(name string, expr Expr) AggregateExpr {
return AggregateExpr{
name: name,
op: op,
expr: expr,
}
}
}

// Type returns the type of the expression.
// For aggregate expressions, this is always ExprTypeAggregate.
func (a AggregateExpr) Type() ExprType {
return ExprTypeAggregate
}

// Name returns the name of the aggregation.
// This is used as the column name in the output schema.
func (a AggregateExpr) Name() string {
return a.name
}

// Op returns the aggregation operation.
// This identifies which aggregation function to apply.
func (a AggregateExpr) Op() AggregateOp {
return a.op
}

// SubExpr returns the expression being aggregated.
// This is the input to the aggregation function.
func (a AggregateExpr) SubExpr() Expr {
return a.expr
}

// ToField converts the aggregation expression to a column schema.
// It determines the output type based on the input expression and
// the aggregation operation.
func (a AggregateExpr) ToField(p Plan) schema.ColumnSchema {
// Get the input field schema

return schema.ColumnSchema{
Name: a.name,
// Aggregations typically result in numeric types
Type: determineAggregationTypeFromFieldType(a.expr.ToField(p).Type, a.op),
}
}

// determineAggregationTypeFromFieldType calculates the output type of an aggregation
// based on the input field type and the aggregation operation.
// Currently, this is a placeholder that always returns int64, but it should be
// implemented to handle different input types and operations correctly.
func determineAggregationTypeFromFieldType(_ datasetmd.ValueType, _ AggregateOp) datasetmd.ValueType {
// TODO: implement
return datasetmd.VALUE_TYPE_INT64
}
74 changes: 74 additions & 0 deletions pkg/dataobj/planner/logical/aggregate_plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package logical

import (
"github.com/grafana/loki/v3/pkg/dataobj/planner/schema"
)

// Aggregate represents a plan node that performs aggregation operations.
// The output schema is organized with grouping columns followed by aggregate expressions.
// It corresponds to the GROUP BY clause in SQL and is used to compute aggregate
// functions like SUM, AVG, MIN, MAX, and COUNT over groups of rows.
type Aggregate struct {
// input is the child plan node providing data to aggregate
input Plan
// groupExprs are the expressions to group by
groupExprs []Expr
// aggExprs are the aggregate expressions to compute
aggExprs []AggregateExpr
}

// NewAggregate creates a new Aggregate plan node.
// The Aggregate logical plan calculates aggregates of underlying data such as
// calculating minimum, maximum, averages, and sums of data. Aggregates are often
// grouped by other columns (or expressions).
// A simple example would be SELECT region, SUM(sales) FROM orders GROUP BY region.
func newAggregate(input Plan, groupExprs []Expr, aggExprs []AggregateExpr) *Aggregate {
return &Aggregate{
input: input,
groupExprs: groupExprs,
aggExprs: aggExprs,
}
}

// Schema returns the schema of the data produced by this aggregate.
// The schema consists of group-by expressions followed by aggregate expressions.
// This ordering is important for downstream operations that expect group columns
// to come before aggregate columns.
func (a *Aggregate) Schema() schema.Schema {
var columns []schema.ColumnSchema

// Group expressions come first
for _, expr := range a.groupExprs {
columns = append(columns, expr.ToField(a.input))
}

// Followed by aggregate expressions
for _, expr := range a.aggExprs {
columns = append(columns, expr.ToField(a.input))
}

return schema.FromColumns(columns)
}

// Type implements the ast interface
func (a *Aggregate) Type() PlanType {
return PlanTypeAggregate
}

// GroupExprs returns the list of expressions to group by.
// These expressions define the grouping keys for the aggregation.
func (a *Aggregate) GroupExprs() []Expr {
return a.groupExprs
}

// AggregateExprs returns the list of aggregate expressions to compute.
// These expressions define the aggregate functions to apply to each group.
func (a *Aggregate) AggregateExprs() []AggregateExpr {
return a.aggExprs
}

// Child returns the input plan.
// This is a convenience method for accessing the child plan.
func (a *Aggregate) Child() Plan {
return a.input
}
Loading