Skip to content

Commit 89f0ed7

Browse files
authored
feat(logical planning): Planner playground (#16403)
1 parent 475d25f commit 89f0ed7

19 files changed

+2747
-0
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package logical
2+
3+
import (
4+
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
5+
"github.com/grafana/loki/v3/pkg/dataobj/planner/schema"
6+
)
7+
8+
// AggregateOp represents the type of aggregation operation to perform.
9+
// It is a string-based enum that identifies different aggregation functions
10+
// that can be applied to expressions.
11+
type AggregateOp string
12+
13+
const (
14+
// AggregateOpSum represents a sum aggregation
15+
AggregateOpSum AggregateOp = "sum"
16+
// AggregateOpAvg represents an average aggregation
17+
AggregateOpAvg AggregateOp = "avg"
18+
// AggregateOpMin represents a minimum value aggregation
19+
AggregateOpMin AggregateOp = "min"
20+
// AggregateOpMax represents a maximum value aggregation
21+
AggregateOpMax AggregateOp = "max"
22+
// AggregateOpCount represents a count aggregation
23+
AggregateOpCount AggregateOp = "count"
24+
)
25+
26+
// Convenience constructors for each aggregate operation
27+
var (
28+
// Sum creates a sum aggregation expression
29+
Sum = newAggregateExprConstructor(AggregateOpSum)
30+
// Avg creates an average aggregation expression
31+
Avg = newAggregateExprConstructor(AggregateOpAvg)
32+
// Min creates a minimum value aggregation expression
33+
Min = newAggregateExprConstructor(AggregateOpMin)
34+
// Max creates a maximum value aggregation expression
35+
Max = newAggregateExprConstructor(AggregateOpMax)
36+
// Count creates a count aggregation expression
37+
Count = newAggregateExprConstructor(AggregateOpCount)
38+
)
39+
40+
// AggregateExpr represents an aggregation operation on an expression.
41+
// It encapsulates the operation to perform (sum, avg, etc.), the expression
42+
// to aggregate, and a name for the result.
43+
type AggregateExpr struct {
44+
// name is the identifier for this aggregation
45+
name string
46+
// op specifies which aggregation operation to perform
47+
op AggregateOp
48+
// expr is the expression to aggregate
49+
expr Expr
50+
}
51+
52+
// newAggregateExprConstructor creates a constructor function for a specific aggregate operation.
53+
// This is a higher-order function that returns a function for creating aggregate expressions
54+
// with a specific operation type.
55+
func newAggregateExprConstructor(op AggregateOp) func(name string, expr Expr) AggregateExpr {
56+
return func(name string, expr Expr) AggregateExpr {
57+
return AggregateExpr{
58+
name: name,
59+
op: op,
60+
expr: expr,
61+
}
62+
}
63+
}
64+
65+
// Type returns the type of the expression.
66+
// For aggregate expressions, this is always ExprTypeAggregate.
67+
func (a AggregateExpr) Type() ExprType {
68+
return ExprTypeAggregate
69+
}
70+
71+
// Name returns the name of the aggregation.
72+
// This is used as the column name in the output schema.
73+
func (a AggregateExpr) Name() string {
74+
return a.name
75+
}
76+
77+
// Op returns the aggregation operation.
78+
// This identifies which aggregation function to apply.
79+
func (a AggregateExpr) Op() AggregateOp {
80+
return a.op
81+
}
82+
83+
// SubExpr returns the expression being aggregated.
84+
// This is the input to the aggregation function.
85+
func (a AggregateExpr) SubExpr() Expr {
86+
return a.expr
87+
}
88+
89+
// ToField converts the aggregation expression to a column schema.
90+
// It determines the output type based on the input expression and
91+
// the aggregation operation.
92+
func (a AggregateExpr) ToField(p Plan) schema.ColumnSchema {
93+
// Get the input field schema
94+
95+
return schema.ColumnSchema{
96+
Name: a.name,
97+
// Aggregations typically result in numeric types
98+
Type: determineAggregationTypeFromFieldType(a.expr.ToField(p).Type, a.op),
99+
}
100+
}
101+
102+
// determineAggregationTypeFromFieldType calculates the output type of an aggregation
103+
// based on the input field type and the aggregation operation.
104+
// Currently, this is a placeholder that always returns int64, but it should be
105+
// implemented to handle different input types and operations correctly.
106+
func determineAggregationTypeFromFieldType(_ datasetmd.ValueType, _ AggregateOp) datasetmd.ValueType {
107+
// TODO: implement
108+
return datasetmd.VALUE_TYPE_INT64
109+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package logical
2+
3+
import (
4+
"github.com/grafana/loki/v3/pkg/dataobj/planner/schema"
5+
)
6+
7+
// Aggregate represents a plan node that performs aggregation operations.
8+
// The output schema is organized with grouping columns followed by aggregate expressions.
9+
// It corresponds to the GROUP BY clause in SQL and is used to compute aggregate
10+
// functions like SUM, AVG, MIN, MAX, and COUNT over groups of rows.
11+
type Aggregate struct {
12+
// input is the child plan node providing data to aggregate
13+
input Plan
14+
// groupExprs are the expressions to group by
15+
groupExprs []Expr
16+
// aggExprs are the aggregate expressions to compute
17+
aggExprs []AggregateExpr
18+
}
19+
20+
// NewAggregate creates a new Aggregate plan node.
21+
// The Aggregate logical plan calculates aggregates of underlying data such as
22+
// calculating minimum, maximum, averages, and sums of data. Aggregates are often
23+
// grouped by other columns (or expressions).
24+
// A simple example would be SELECT region, SUM(sales) FROM orders GROUP BY region.
25+
func newAggregate(input Plan, groupExprs []Expr, aggExprs []AggregateExpr) *Aggregate {
26+
return &Aggregate{
27+
input: input,
28+
groupExprs: groupExprs,
29+
aggExprs: aggExprs,
30+
}
31+
}
32+
33+
// Schema returns the schema of the data produced by this aggregate.
34+
// The schema consists of group-by expressions followed by aggregate expressions.
35+
// This ordering is important for downstream operations that expect group columns
36+
// to come before aggregate columns.
37+
func (a *Aggregate) Schema() schema.Schema {
38+
var columns []schema.ColumnSchema
39+
40+
// Group expressions come first
41+
for _, expr := range a.groupExprs {
42+
columns = append(columns, expr.ToField(a.input))
43+
}
44+
45+
// Followed by aggregate expressions
46+
for _, expr := range a.aggExprs {
47+
columns = append(columns, expr.ToField(a.input))
48+
}
49+
50+
return schema.FromColumns(columns)
51+
}
52+
53+
// Type implements the ast interface
54+
func (a *Aggregate) Type() PlanType {
55+
return PlanTypeAggregate
56+
}
57+
58+
// GroupExprs returns the list of expressions to group by.
59+
// These expressions define the grouping keys for the aggregation.
60+
func (a *Aggregate) GroupExprs() []Expr {
61+
return a.groupExprs
62+
}
63+
64+
// AggregateExprs returns the list of aggregate expressions to compute.
65+
// These expressions define the aggregate functions to apply to each group.
66+
func (a *Aggregate) AggregateExprs() []AggregateExpr {
67+
return a.aggExprs
68+
}
69+
70+
// Child returns the input plan.
71+
// This is a convenience method for accessing the child plan.
72+
func (a *Aggregate) Child() Plan {
73+
return a.input
74+
}

0 commit comments

Comments
 (0)