Skip to content
Prev Previous commit
Next Next commit
chore(engine): set physical node ULID during plan construction
When creating a physical plan, each plan node will now have a unique
ULID. The Clone method has been updated to generate a new ULID for the
resulting cloned node.

Workflows, for the time being, will reuse some node ULIDs when a node is
found across multiple sharded tasks.
  • Loading branch information
rfratto committed Nov 1, 2025
commit 66883ad7eff4d3195065fcc6181217f163f51a6c
7 changes: 3 additions & 4 deletions pkg/engine/internal/planner/physical/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ type ColumnCompat struct {
// Returns a string that uniquely identifies the node in the plan.
func (m *ColumnCompat) ID() string { return m.NodeID.String() }

return m.id
}

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (m *ColumnCompat) Clone() Node {
return &ColumnCompat{
NodeID: ulid.Make(),

Source: m.Source,
Destination: m.Destination,
Collision: m.Collision,
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/dataobjscan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ type DataObjScan struct {
// Returns a string that uniquely identifies the node in the plan.
func (s *DataObjScan) ID() string { return s.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (s *DataObjScan) Clone() Node {
return &DataObjScan{
NodeID: ulid.Make(),

Location: s.Location,
Section: s.Section,
StreamIDs: slices.Clone(s.StreamIDs),
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ type Filter struct {
// Returns a string that uniquely identifies the node in the plan.
func (f *Filter) ID() string { return f.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (f *Filter) Clone() Node {
return &Filter{
NodeID: ulid.Make(),

Predicates: cloneExpressions(f.Predicates),
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/engine/internal/planner/physical/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ type Join struct {
// Returns a string that uniquely identifies the node in the plan.
func (f *Join) ID() string { return f.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (f *Join) Clone() Node {
return &Join{}
return &Join{
NodeID: ulid.Make(),
}
}

// Type implements the [Node] interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ type Limit struct {
// Returns a string that uniquely identifies the node in the plan.
func (l *Limit) ID() string { return l.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (l *Limit) Clone() Node {
return &Limit{
NodeID: ulid.Make(),

Skip: l.Skip,
Fetch: l.Fetch,
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/engine/internal/planner/physical/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ type Parallelize struct {
// ID returns a string that uniquely identifies the node in the plan.
func (p *Parallelize) ID() string { return p.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (p *Parallelize) Clone() Node {
return &Parallelize{ /* nothing to clone */ }
return &Parallelize{
NodeID: ulid.Make(),
}
}

// Type returns [NodeTypeParallelize].
Expand Down
34 changes: 31 additions & 3 deletions pkg/engine/internal/planner/physical/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,14 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) (Node, e

// Scan work can be parallelized across multiple workers, so we wrap
// everything into a single Parallelize node.
var parallelize Node = &Parallelize{}
var parallelize Node = &Parallelize{
NodeID: ulid.Make(),
}
p.plan.graph.Add(parallelize)

scanSet := &ScanSet{}
scanSet := &ScanSet{
NodeID: ulid.Make(),
}
p.plan.graph.Add(scanSet)

for _, desc := range filteredShardDescriptors {
Expand All @@ -212,6 +216,8 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) (Node, e
Type: ScanTypeDataObject,

DataObject: &DataObjScan{
NodeID: ulid.Make(),

Location: desc.Location,
StreamIDs: desc.Streams,
Section: section,
Expand All @@ -224,6 +230,8 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) (Node, e

if p.context.v1Compatible {
compat := &ColumnCompat{
NodeID: ulid.Make(),

Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
Collision: types.ColumnTypeLabel,
Expand All @@ -245,6 +253,8 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) (Node, e
// Convert [logical.Select] into one [Filter] node.
func (p *Planner) processSelect(lp *logical.Select, ctx *Context) (Node, error) {
node := &Filter{
NodeID: ulid.Make(),

Predicates: []Expression{p.convertPredicate(lp.Predicate)},
}
p.plan.graph.Add(node)
Expand All @@ -266,6 +276,8 @@ func (p *Planner) processSort(lp *logical.Sort, ctx *Context) (Node, error) {
}

node := &TopK{
NodeID: ulid.Make(),

SortBy: &ColumnExpr{Ref: lp.Column.Ref},
Ascending: order == ASC,
NullsFirst: false,
Expand Down Expand Up @@ -303,6 +315,8 @@ func (p *Planner) processProjection(lp *logical.Projection, ctx *Context) (Node,
}

var node Node = &Projection{
NodeID: ulid.Make(),

Expressions: expressions,
All: lp.All,
Expand: lp.Expand,
Expand All @@ -320,6 +334,8 @@ func (p *Planner) processProjection(lp *logical.Projection, ctx *Context) (Node,

if needsCompat && p.context.v1Compatible {
compat := &ColumnCompat{
NodeID: ulid.Make(),

Source: types.ColumnTypeParsed,
Destination: types.ColumnTypeParsed,
Collision: types.ColumnTypeLabel,
Expand All @@ -337,6 +353,8 @@ func (p *Planner) processProjection(lp *logical.Projection, ctx *Context) (Node,
// Convert [logical.Limit] into one [Limit] node.
func (p *Planner) processLimit(lp *logical.Limit, ctx *Context) (Node, error) {
node := &Limit{
NodeID: ulid.Make(),

Skip: lp.Skip,
Fetch: lp.Fetch,
}
Expand All @@ -358,6 +376,8 @@ func (p *Planner) processRangeAggregation(r *logical.RangeAggregation, ctx *Cont
}

node := &RangeAggregation{
NodeID: ulid.Make(),

PartitionBy: partitionBy,
Operation: r.Operation,
Start: r.Start,
Expand Down Expand Up @@ -386,6 +406,8 @@ func (p *Planner) processVectorAggregation(lp *logical.VectorAggregation, ctx *C
}

node := &VectorAggregation{
NodeID: ulid.Make(),

GroupBy: groupBy,
Operation: lp.Operation,
}
Expand Down Expand Up @@ -440,10 +462,12 @@ func (p *Planner) collapseMathExpressions(lp logical.Value, rootNode bool, ctx *
}

// Insert an InnerJoin on timestamp before Projection
join := &Join{}
join := &Join{NodeID: ulid.Make()}
p.plan.graph.Add(join)

projection := &Projection{
NodeID: ulid.Make(),

Expressions: []Expression{
&BinaryExpr{
Left: leftChild,
Expand Down Expand Up @@ -492,6 +516,8 @@ func (p *Planner) collapseMathExpressions(lp logical.Value, rootNode bool, ctx *
// we have to stop here and produce a Node, otherwise keep collapsing
if rootNode {
projection := &Projection{
NodeID: ulid.Make(),

Expressions: []Expression{expr},
All: true,
Expand: true,
Expand All @@ -518,6 +544,8 @@ func (p *Planner) collapseMathExpressions(lp logical.Value, rootNode bool, ctx *
}
if rootNode {
projection := &Projection{
NodeID: ulid.Make(),

Expressions: []Expression{expr},
All: true,
Expand: true,
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ type Projection struct {
// Returns a string that uniquely identifies the node in the plan.
func (p *Projection) ID() string { return p.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (p *Projection) Clone() Node {
return &Projection{
NodeID: ulid.Make(),

Expressions: cloneExpressions(p.Expressions),
All: p.All,
Expand: p.Expand,
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/range_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type RangeAggregation struct {
// ID returns a string that uniquely identifies the node in the plan.
func (r *RangeAggregation) ID() string { return r.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (r *RangeAggregation) Clone() Node {
return &RangeAggregation{
NodeID: ulid.Make(),

PartitionBy: cloneExpressions(r.PartitionBy),

Operation: r.Operation,
Expand Down
8 changes: 6 additions & 2 deletions pkg/engine/internal/planner/physical/scanset.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ type ScanSet struct {
// ID returns a string that uniquely identifies the node in the plan.
func (s *ScanSet) ID() string { return s.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (s *ScanSet) Clone() Node {
newTargets := make([]*ScanTarget, 0, len(s.Targets))
for _, target := range s.Targets {
newTargets = append(newTargets, target.Clone())
}

return &ScanSet{Targets: newTargets}
return &ScanSet{
NodeID: ulid.Make(),

Targets: newTargets,
}
}

// Type returns [NodeTypeScanSet].
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ type TopK struct {
// Returns a string that uniquely identifies the node in the plan.
func (t *TopK) ID() string { return t.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (t *TopK) Clone() Node {
return &TopK{
NodeID: ulid.Make(),

SortBy: t.SortBy.Clone().(ColumnExpression),
Ascending: t.Ascending,
NullsFirst: t.NullsFirst,
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/internal/planner/physical/vector_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ type VectorAggregation struct {
// Returns a string that uniquely identifies the node in the plan.
func (v *VectorAggregation) ID() string { return v.NodeID.String() }

// Clone returns a deep copy of the node (minus its ID).
// Clone returns a deep copy of the node with a new unique ID.
func (v *VectorAggregation) Clone() Node {
return &VectorAggregation{
NodeID: ulid.Make(),

GroupBy: cloneExpressions(v.GroupBy),
Operation: v.Operation,
}
Expand Down