Skip to content
Prev Previous commit
Next Next commit
mvs existing files to builder pkg
  • Loading branch information
owen-d committed Dec 2, 2024
commit 22059c29d146421f459be12cf442a97e69a1761e
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package blockbuilder
package builder

import (
"context"
"fmt"
"time"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/dskit/backoff"
Expand All @@ -31,7 +33,7 @@ type PartitionController interface {
// so it's advised to not buffer the channel for natural backpressure.
// As a convenience, it returns the last seen offset, which matches
// the final record sent on the channel.
Process(context.Context, Offsets, chan<- []AppendInput) (int64, error)
Process(context.Context, types.Offsets, chan<- []AppendInput) (int64, error)

Close() error
}
Expand Down Expand Up @@ -106,7 +108,7 @@ func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (i
)
}

func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
l.part.SetOffsetForConsumption(offsets.Min)

var (
Expand Down Expand Up @@ -167,7 +169,7 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c

// LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition
// Returns whether an applicable job exists, the job, and an error
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *Job, error) {
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *types.Job, error) {
// Read the most recent committed offset
committedOffset, err := l.HighestCommittedOffset(ctx)
if err != nil {
Expand All @@ -193,12 +195,13 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *Job, error
}

// Create the job with the calculated offsets
offsets := Offsets{
offsets := types.Offsets{
Min: startOffset,
Max: min(startOffset+l.stepLen, highestOffset),
}

job := NewJob(l.part.Partition(), offsets)
// Convert partition from int32 to int
job := types.NewJob(int(l.part.Partition()), offsets)
return true, job, nil
}

Expand Down Expand Up @@ -249,7 +252,7 @@ func (d *dummyPartitionController) Commit(_ context.Context, offset int64) error
return nil
}

func (d *dummyPartitionController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
func (d *dummyPartitionController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
for i := int(offsets.Min); i < int(offsets.Max); i++ {
batch := d.createBatch(i)
select {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"github.com/prometheus/client_golang/prometheus"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"os"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockbuilder
package builder

import (
"bytes"
Expand Down