Commit 395030b

feat(dataobj executor): project node (#17312)
1 parent: 3fe0a21

File tree: 5 files changed (+271, -17 lines)


‎pkg/engine/executor/executor.go

Lines changed: 7 additions & 1 deletion
```diff
@@ -31,6 +31,7 @@ func Run(ctx context.Context, cfg Config, plan *physical.Plan) Pipeline {
 type Context struct {
 	batchSize int64
 	plan      *physical.Plan
+	evaluator expressionEvaluator
 }
 
 func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
@@ -98,12 +99,17 @@ func (c *Context) executeProjection(_ context.Context, proj *physical.Projection
 	}
 
 	if len(inputs) > 1 {
+		// unsupported for now
 		return errorPipeline(fmt.Errorf("projection expects exactly one input, got %d", len(inputs)))
 	}
 
 	if len(proj.Columns) == 0 {
 		return errorPipeline(fmt.Errorf("projection expects at least one column, got 0"))
 	}
 
-	return errorPipeline(errNotImplemented)
+	p, err := NewProjectPipeline(inputs[0], proj.Columns, &c.evaluator)
+	if err != nil {
+		return errorPipeline(err)
+	}
+	return p
 }
```
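
With the `errNotImplemented` stub gone, `executeProjection` now wires its single input into `NewProjectPipeline` using the context's evaluator. Below is a minimal, hypothetical sketch of exercising this path directly; it assumes the test helpers defined in the files further down (`CSVToArrow`, `NewBufferedPipeline`, `createColumnRef`) are in scope, and the test name itself is not part of the commit.

```go
// Hypothetical sketch (not part of this commit): drive executeProjection with
// one buffered input, mirroring the patterns in the tests below.
func TestExecuteProjection_Sketch(t *testing.T) {
	fields := []arrow.Field{{Name: "name", Type: arrow.BinaryTypes.String}}
	record, err := CSVToArrow(fields, "Alice\nBob")
	require.NoError(t, err)
	defer record.Release()

	c := &Context{} // evaluator is the zero value, as in the existing tests
	proj := &physical.Projection{
		Columns: []physical.ColumnExpression{
			&physical.ColumnExpr{Ref: createColumnRef("name")},
		},
	}

	// The projection node now returns a working pipeline instead of errNotImplemented.
	pipeline := c.executeProjection(context.TODO(), proj, []Pipeline{NewBufferedPipeline(record)})
	require.NoError(t, pipeline.Read())
}
```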

‎pkg/engine/executor/executor_test.go

Lines changed: 0 additions & 16 deletions
```diff
@@ -6,7 +6,6 @@ import (
 
 	"github.com/stretchr/testify/require"
 
-	"github.com/grafana/loki/v3/pkg/engine/internal/types"
 	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
 )
 
@@ -104,21 +103,6 @@ func TestExecutor_Projection(t *testing.T) {
 		require.ErrorContains(t, err, "projection expects at least one column, got 0")
 	})
 
-	t.Run("is not implemented", func(t *testing.T) {
-		cols := []physical.ColumnExpression{
-			&physical.ColumnExpr{
-				Ref: types.ColumnRef{
-					Column: "a",
-					Type:   types.ColumnTypeBuiltin,
-				},
-			},
-		}
-		c := &Context{}
-		pipeline := c.executeProjection(context.TODO(), &physical.Projection{Columns: cols}, []Pipeline{emptyPipeline()})
-		err := pipeline.Read()
-		require.ErrorContains(t, err, errNotImplemented.Error())
-	})
-
 	t.Run("multiple inputs result in error", func(t *testing.T) {
 		c := &Context{}
 		pipeline := c.executeProjection(context.TODO(), &physical.Projection{}, []Pipeline{emptyPipeline(), emptyPipeline()})
```

‎pkg/engine/executor/pipeline_test.go

Lines changed: 3 additions & 0 deletions
```diff
@@ -20,6 +20,9 @@ func CSVToArrow(fields []arrow.Field, csvData string) (arrow.Record, error) {
 // CSVToArrowWithAllocator converts a CSV string to an Arrow record based on the provided schema
 // using the specified memory allocator. It reads all rows from the CSV into a single record.
 func CSVToArrowWithAllocator(allocator memory.Allocator, fields []arrow.Field, csvData string) (arrow.Record, error) {
+	// first, trim the csvData to remove any preceding and trailing whitespace/line breaks
+	csvData = strings.TrimSpace(csvData)
+
 	// Create schema
 	schema := arrow.NewSchema(fields, nil)
```
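
The new `TrimSpace` call lets tests pass multi-line raw string literals, whose backticks introduce a leading and trailing newline, without producing empty CSV rows. A small sketch of the intended effect follows; it assumes `CSVToArrow` delegates to `CSVToArrowWithAllocator`, and the field set and test name are illustrative (the same pattern appears in the "multiple input batches" test below).

```go
// Hypothetical sketch: multi-line raw string literals now work as CSV input,
// assuming CSVToArrow delegates to CSVToArrowWithAllocator.
func TestCSVToArrow_TrimsSurroundingWhitespace_Sketch(t *testing.T) {
	fields := []arrow.Field{
		{Name: "name", Type: arrow.BinaryTypes.String},
		{Name: "age", Type: arrow.PrimitiveTypes.Int32},
	}

	// The backticks add a leading and a trailing newline around the rows.
	csv := `
Alice,30
Bob,25
`

	// TrimSpace strips the surrounding newlines before the CSV reader sees the
	// data, so only the two data rows end up in the record.
	record, err := CSVToArrow(fields, csv)
	require.NoError(t, err)
	defer record.Release()
	require.Equal(t, int64(2), record.NumRows())
}
```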

‎pkg/engine/executor/project.go

Lines changed: 55 additions & 0 deletions
```go
package executor

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"

	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)

func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, evaluator *expressionEvaluator) (*GenericPipeline, error) {
	// Get the column names from the projection expressions
	columnNames := make([]string, len(columns))
	for i, col := range columns {
		if colExpr, ok := col.(*physical.ColumnExpr); ok {
			columnNames[i] = colExpr.Ref.Column
		} else {
			return nil, fmt.Errorf("projection column %d is not a column expression", i)
		}
	}

	return newGenericPipeline(Local, func(inputs []Pipeline) state {
		// Pull the next item from the input pipeline
		input := inputs[0]
		err := input.Read()
		if err != nil {
			return failureState(err)
		}

		batch, err := input.Value()
		if err != nil {
			return failureState(err)
		}

		projected := make([]arrow.Array, 0, len(columns))
		fields := make([]arrow.Field, 0, len(columns))

		for i := range columns {
			vec, err := evaluator.eval(columns[i], batch)
			if err != nil {
				return failureState(err)
			}
			fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type()})
			projected = append(projected, vec.ToArray())
		}

		schema := arrow.NewSchema(fields, nil)
		// Create a new record with only the projected columns:
		// retain the projected columns in a new batch, then release the original record.
		projectedRecord := array.NewRecord(schema, projected, batch.NumRows())
		batch.Release()
		return successState(projectedRecord)
	}, input), nil
}
```
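
The pipeline produces one projected record per input record: each `Read` pulls a batch from the input, evaluates every column expression against it, assembles a new Arrow record containing only those columns, and releases the original batch. Below is a hypothetical consumption sketch, under the assumption (not spelled out in this diff) that `Read` reports end-of-input or failure as an error; `drainProjected` is not part of the commit.

```go
// drainProjected is a hypothetical helper showing how a caller might consume
// the pipeline returned by NewProjectPipeline.
func drainProjected(input Pipeline, columns []physical.ColumnExpression) error {
	p, err := NewProjectPipeline(input, columns, &expressionEvaluator{})
	if err != nil {
		return err
	}
	for {
		if err := p.Read(); err != nil {
			// Assumption: an error here signals either end-of-input or a
			// failure surfaced via failureState; real callers would
			// distinguish the two.
			return err
		}
		batch, err := p.Value()
		if err != nil {
			return err
		}
		// batch holds only the projected columns, in projection order.
		batch.Release()
	}
}
```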

‎pkg/engine/executor/project_test.go

Lines changed: 206 additions & 0 deletions
```go
package executor

import (
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)

func TestNewProjectPipeline(t *testing.T) {
	fields := []arrow.Field{
		{Name: "name", Type: arrow.BinaryTypes.String},
		{Name: "age", Type: arrow.PrimitiveTypes.Int32},
		{Name: "city", Type: arrow.BinaryTypes.String},
	}

	t.Run("project single column", func(t *testing.T) {
		// Create input data
		inputCSV := "Alice,30,New York\nBob,25,Boston\nCharlie,35,Seattle"
		inputRecord, err := CSVToArrow(fields, inputCSV)
		require.NoError(t, err)
		defer inputRecord.Release()

		// Create input pipeline
		inputPipeline := NewBufferedPipeline(inputRecord)

		// Create projection columns (just the "name" column)
		columns := []physical.ColumnExpression{
			&physical.ColumnExpr{
				Ref: createColumnRef("name"),
			},
		}

		// Create project pipeline
		projectPipeline, err := NewProjectPipeline(inputPipeline, columns, &expressionEvaluator{})
		require.NoError(t, err)

		// Create expected output
		expectedCSV := "Alice\nBob\nCharlie"
		expectedFields := []arrow.Field{
			{Name: "name", Type: arrow.BinaryTypes.String},
		}
		expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
		require.NoError(t, err)
		defer expectedRecord.Release()

		expectedPipeline := NewBufferedPipeline(expectedRecord)

		// Assert that the pipelines produce equal results
		AssertPipelinesEqual(t, projectPipeline, expectedPipeline)
	})

	t.Run("project multiple columns", func(t *testing.T) {
		// Create input data
		inputCSV := "Alice,30,New York\nBob,25,Boston\nCharlie,35,Seattle"
		inputRecord, err := CSVToArrow(fields, inputCSV)
		require.NoError(t, err)
		defer inputRecord.Release()

		// Create input pipeline
		inputPipeline := NewBufferedPipeline(inputRecord)

		// Create projection columns (both "name" and "city" columns)
		columns := []physical.ColumnExpression{
			&physical.ColumnExpr{
				Ref: createColumnRef("name"),
			},
			&physical.ColumnExpr{
				Ref: createColumnRef("city"),
			},
		}

		// Create project pipeline
		projectPipeline, err := NewProjectPipeline(inputPipeline, columns, &expressionEvaluator{})
		require.NoError(t, err)

		// Create expected output
		expectedCSV := "Alice,New York\nBob,Boston\nCharlie,Seattle"
		expectedFields := []arrow.Field{
			{Name: "name", Type: arrow.BinaryTypes.String},
			{Name: "city", Type: arrow.BinaryTypes.String},
		}
		expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
		require.NoError(t, err)
		defer expectedRecord.Release()

		expectedPipeline := NewBufferedPipeline(expectedRecord)

		// Assert that the pipelines produce equal results
		AssertPipelinesEqual(t, projectPipeline, expectedPipeline)
	})

	t.Run("project columns in different order", func(t *testing.T) {
		// Create input data
		inputCSV := "Alice,30,New York\nBob,25,Boston\nCharlie,35,Seattle"
		inputRecord, err := CSVToArrow(fields, inputCSV)
		require.NoError(t, err)
		defer inputRecord.Release()

		// Create input pipeline
		inputPipeline := NewBufferedPipeline(inputRecord)

		// Create projection columns (reordering columns)
		columns := []physical.ColumnExpression{
			&physical.ColumnExpr{
				Ref: createColumnRef("city"),
			},
			&physical.ColumnExpr{
				Ref: createColumnRef("age"),
			},
			&physical.ColumnExpr{
				Ref: createColumnRef("name"),
			},
		}

		// Create project pipeline
		projectPipeline, err := NewProjectPipeline(inputPipeline, columns, &expressionEvaluator{})
		require.NoError(t, err)

		// Create expected output
		expectedCSV := "New York,30,Alice\nBoston,25,Bob\nSeattle,35,Charlie"
		expectedFields := []arrow.Field{
			{Name: "city", Type: arrow.BinaryTypes.String},
			{Name: "age", Type: arrow.PrimitiveTypes.Int32},
			{Name: "name", Type: arrow.BinaryTypes.String},
		}
		expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
		require.NoError(t, err)
		defer expectedRecord.Release()

		expectedPipeline := NewBufferedPipeline(expectedRecord)

		// Assert that the pipelines produce equal results
		AssertPipelinesEqual(t, projectPipeline, expectedPipeline)
	})

	t.Run("project with multiple input batches", func(t *testing.T) {
		// Create input data split across multiple records
		inputCSV1 := "Alice,30,New York\nBob,25,Boston"
		inputCSV2 := "Charlie,35,Seattle\nDave,40,Portland"

		inputRecord1, err := CSVToArrow(fields, inputCSV1)
		require.NoError(t, err)
		defer inputRecord1.Release()

		inputRecord2, err := CSVToArrow(fields, inputCSV2)
		require.NoError(t, err)
		defer inputRecord2.Release()

		// Create input pipeline with multiple batches
		inputPipeline := NewBufferedPipeline(inputRecord1, inputRecord2)

		// Create projection columns
		columns := []physical.ColumnExpression{
			&physical.ColumnExpr{
				Ref: createColumnRef("name"),
			},
			&physical.ColumnExpr{
				Ref: createColumnRef("age"),
			},
		}

		// Create project pipeline
		projectPipeline, err := NewProjectPipeline(inputPipeline, columns, &expressionEvaluator{})
		require.NoError(t, err)

		// Create expected output also split across multiple records
		expectedFields := []arrow.Field{
			{Name: "name", Type: arrow.BinaryTypes.String},
			{Name: "age", Type: arrow.PrimitiveTypes.Int32},
		}

		expected := `
Alice,30
Bob,25
Charlie,35
Dave,40
`

		expectedRecord, err := CSVToArrow(expectedFields, expected)
		require.NoError(t, err)
		defer expectedRecord.Release()

		expectedPipeline := NewBufferedPipeline(expectedRecord)

		// Assert that the pipelines produce equal results
		AssertPipelinesEqual(t, projectPipeline, expectedPipeline)
	})
}

// Helper to create a column reference
func createColumnRef(name string) types.ColumnRef {
	return types.ColumnRef{
		Column: name,
		Type:   types.ColumnTypeBuiltin,
	}
}

// Helper to create a literal value
func createLiteral(val any) types.Literal {
	return types.Literal{Value: val}
}
```
