Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Use context propagation to use the same ingester in GetChunksID
  • Loading branch information
cyriltovena committed Nov 29, 2024
commit 650512386411a3beba071a3ada3924e0b179c2e2
100 changes: 87 additions & 13 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"slices"
"strings"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -82,10 +83,74 @@ func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring
return &iq, nil
}

type ctxKeyType string

const (
partitionCtxKey ctxKeyType = "partitionCtx"
)

type PartitionContext struct {
isPartitioned bool
ingestersUsed []PartitionIngesterUsed
mtx sync.Mutex
}

type PartitionIngesterUsed struct {
client logproto.QuerierClient
addr string
}

func (p *PartitionContext) AddClient(client logproto.QuerierClient, addr string) {
p.mtx.Lock()
defer p.mtx.Unlock()
if !p.isPartitioned {
return
}
p.ingestersUsed = append(p.ingestersUsed, PartitionIngesterUsed{client: client, addr: addr})
}

func (p *PartitionContext) SetIsPartitioned(isPartitioned bool) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.isPartitioned = isPartitioned
}

func (p *PartitionContext) IsPartitioned() bool {
return p.isPartitioned
}

func (p *PartitionContext) forQueriedIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

return concurrency.ForEachJobMergeResults(ctx, p.ingestersUsed, 0, func(ctx context.Context, job PartitionIngesterUsed) ([]responseFromIngesters, error) {
resp, err := f(ctx, job.client)
if err != nil {
return nil, err
}
return []responseFromIngesters{{addr: job.addr, response: resp}}, nil
})
}

// NewPartitionContext creates a new partition context
// This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries
func NewPartitionContext(ctx context.Context) context.Context {
return context.WithValue(ctx, partitionCtxKey, &PartitionContext{})
}

func FromPartitionContext(ctx context.Context) *PartitionContext {
v, ok := ctx.Value(partitionCtxKey).(*PartitionContext)
if !ok {
return &PartitionContext{}
}
return v
}

// forAllIngesters runs f, in parallel, for all ingesters
// waitForAllResponses param can be used to require results from all ingesters in the replication set. If this is set to false, the call will return as soon as we have a quorum by zone. Only valid for partition-ingesters.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllResponses bool, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
FromPartitionContext(ctx).SetIsPartitioned(true)
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -135,7 +200,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
}

FromPartitionContext(ctx).AddClient(client.(logproto.QuerierClient), ingester.Addr)
resp, err := f(ctx, client.(logproto.QuerierClient))
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
Expand All @@ -145,7 +210,6 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
}, func(responseFromIngesters) {
// Nothing to do
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -325,15 +389,28 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
}

func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
// We must wait for all responses when using partition-ingesters to avoid a race between Query and GetChunkIDs calls.
// This occurs if call Query on an ingester after a recent flush then call GetChunkIDs on a different, unflushed ingester in the same partition.
resps, err := q.forAllIngesters(ctx, q.querierConfig.QueryPartitionIngesters, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
Matchers: convertMatchersToString(matchers),
Start: from.Time(),
End: through.Time(),
partitionCtx := FromPartitionContext(ctx)
var resps []responseFromIngesters
var err error
if partitionCtx.IsPartitioned() {
// We need to query the same ingesters as the previous query
resps, err = partitionCtx.forQueriedIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
Matchers: convertMatchersToString(matchers),
Start: from.Time(),
End: through.Time(),
})
})
})
} else {
resps, err = q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
Matchers: convertMatchersToString(matchers),
Start: from.Time(),
End: through.Time(),
})
})
}

if err != nil {
return nil, err
}
Expand All @@ -354,7 +431,6 @@ func (q *IngesterQuerier) Stats(ctx context.Context, _ string, from, through mod
Matchers: syntax.MatchersString(matchers),
})
})

if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older ingesters gracefully
Expand Down Expand Up @@ -388,7 +464,6 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
AggregateBy: aggregateBy,
})
})

if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older ingesters gracefully
Expand All @@ -410,7 +485,6 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec
ingesterResponses, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.GetDetectedLabels(ctx, req)
})

if err != nil {
level.Error(q.logger).Log("msg", "error getting detected labels", "err", err)
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func New(cfg Config, store Store, ingesterQuerier *IngesterQuerier, limits Limit

// Select Implements logql.Querier which select logs via matchers and regex filters.
func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
// Create a new partition context for the query
// This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries
ctx = NewPartitionContext(ctx)
var err error
params.Start, params.End, err = q.validateQueryRequest(ctx, params)
if err != nil {
Expand Down Expand Up @@ -211,6 +214,9 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
}

func (q *SingleTenantQuerier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
// Create a new partition context for the query
// This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries
ctx = NewPartitionContext(ctx)
var err error
params.Start, params.End, err = q.validateQueryRequest(ctx, params)
if err != nil {
Expand Down
Loading