feat: add per-tenant time sharding for long out-of-order ingestion #14711
@@ -7,6 +7,7 @@ import (
	"fmt"
	"math"
	"net/http"
	"slices"
	"sort"
	"strconv"
	"strings"
@@ -64,6 +65,8 @@ const (
	ringKey = "distributor"

	ringAutoForgetUnhealthyPeriods = 2

	timeShardLabel = "__time_shard__"
)

var (
@@ -132,6 +135,7 @@ type Distributor struct {
	services.Service

	cfg           Config
	ingesterCfg   ingester.Config
	logger        log.Logger
	clientCfg     client.Config
	tenantConfigs *runtime.TenantConfigs
@@ -187,6 +191,7 @@ type Distributor struct {
// New a distributor creates.
func New(
	cfg Config,
	ingesterCfg ingester.Config,
	clientCfg client.Config,
	configs *runtime.TenantConfigs,
	ingestersRing ring.ReadRing,
@@ -245,6 +250,7 @@ func New(

	d := &Distributor{
		cfg:           cfg,
		ingesterCfg:   ingesterCfg,
		logger:        logger,
		clientCfg:     clientCfg,
		tenantConfigs: configs,
@@ -448,6 +454,28 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
	var validationErrors util.GroupedErrors
	validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)

	shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
	maybeShardByRate := func(stream logproto.Stream, pushSize int) {
		if shardStreamsCfg.Enabled {
			streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
		} else {
			streams = append(streams, KeyedStream{
				HashKey: lokiring.TokenFor(tenantID, stream.Labels),
				Stream:  stream,
			})
		}
	}
	maybeShardByTime := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
		if shardStreamsCfg.TimeShardingEnabled {
			streamsByTime := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2)
Review thread on the shardStreamByTime call:

- I would apply sharding by time only if the logs are older than [...]
- I considered this, but I'm somewhat worried that it might cause more issues than it solves: [...] So, because this is a per-tenant config, I think the tradeoffs are slightly better if we always inject the [...]
- I do not think so. With the changes that I propose, we would not create new buckets for [...]. In general, I would try to guess whether the data will be rejected by the ingester or not (maybe with some safe margin, 15m might be enough), because then we would not create new streams every hour for data that is fresh enough... We already see that it almost doubles the stream count in cases where a lot of fresh data is ingested. It also affects the chunks, because we get more underutilized chunks that are flushed due to the reason [...]. Also, this change does not give us any drawbacks in terms of ingesting out-of-order old logs...
- The question is how the streams are going to be distributed over time. Because in your example, if you keep getting more and more logs that have [...] But if these out-of-order logs arrive only occasionally, we'd be much better off with what you suggest, for sure. So I think I'll add another per-tenant config option called something like [...]
- Yep, it would be ideal.
- This should now be resolved by the most recent commit, PTAL: 7787735
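The direction this thread converges on (time-shard only when a stream actually contains entries old enough to be at risk of rejection) could look roughly like the sketch below. This is a hedged illustration, not the code from commit 7787735: the helper name and the ignoreRecent cutoff are assumptions, as is how it would be wired into the per-tenant shard_streams limits.

```go
// Sketch only: decide whether time sharding is worth applying at all.
// ignoreRecent is an assumed per-tenant setting; entries newer than
// now-ignoreRecent are considered fresh and safe to ingest as-is.
func shouldTimeShard(entries []logproto.Entry, ignoreRecent time.Duration, now time.Time) bool {
	cutoff := now.Add(-ignoreRecent)
	for _, e := range entries {
		// A single entry older than the cutoff is enough to shard the whole
		// stream, since that entry could otherwise be rejected as too old.
		if e.Timestamp.Before(cutoff) {
			return true
		}
	}
	return false
}
```

maybeShardByTime would then fall back to maybeShardByRate directly whenever such a check returns false, so pushes containing only fresh data never get the extra __time_shard__ streams the reviewer is worried about.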
			for _, ts := range streamsByTime {
				maybeShardByRate(ts.Stream, ts.linesTotalLen)
			}
		} else {
			maybeShardByRate(stream, pushSize)
		}
	}
	func() {
		sp := opentracing.SpanFromContext(ctx)
		if sp != nil {

@@ -456,6 +484,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
				sp.LogKV("event", "finished to validate request")
			}()
		}

		for _, stream := range req.Streams {
			// Return early if stream does not contain any entries
			if len(stream.Entries) == 0 {
@@ -534,15 +563,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
				continue
			}

			shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
			if shardStreamsCfg.Enabled {
				streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
			} else {
				streams = append(streams, KeyedStream{
					HashKey: lokiring.TokenFor(tenantID, stream.Labels),
					Stream:  stream,
				})
			}
			maybeShardByTime(stream, lbs, pushSize)
		}
	}()
@@ -721,6 +742,52 @@ func hasAnyLevelLabels(l labels.Labels) (string, bool) {
	return "", false
}

type streamWithTimeShard struct {
	logproto.Stream
	linesTotalLen int
}
// This should shard the stream into multiple sub-streams based on the log
// timestamps, but with no new allocations for the log entries. It will sort them
// in-place in the given stream object (so it may modify it!) and reference
// sub-slices of the same stream.Entries slice.
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration) []streamWithTimeShard {
	entries := stream.Entries
	entriesLen := len(entries)
	if entriesLen == 0 {
		return nil
	}

	slices.SortStableFunc(entries, func(a, b logproto.Entry) int { return a.Timestamp.Compare(b.Timestamp) })

	result := make([]streamWithTimeShard, 0, entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen)
	labelBuilder := labels.NewBuilder(lbls)

	for startIdx := 0; startIdx < entriesLen; /* the index is changed below */ {
		timeShardStart := entries[startIdx].Timestamp.Truncate(timeShardLen)
		timeShardEnd := timeShardStart.Add(timeShardLen)

		endIdx := startIdx + 1
		linesTotalLen := len(entries[startIdx].Line)
		for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardEnd); endIdx++ {
			linesTotalLen += len(entries[endIdx].Line)
		}

		shardLbls := labelBuilder.Set(timeShardLabel, fmt.Sprintf("%d_%d", timeShardStart.Unix(), timeShardEnd.Unix())).Labels()
		result = append(result, streamWithTimeShard{
			Stream: logproto.Stream{
				Labels:  shardLbls.String(),
				Hash:    shardLbls.Hash(),
				Entries: stream.Entries[startIdx:endIdx],
			},
			linesTotalLen: linesTotalLen,
		})

		startIdx = endIdx
	}
	return result
}
// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardSteam returns the smaller
// streams and their associated keys for hashing to ingesters.
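To make the resulting stream shapes concrete, here is a small usage sketch of shardStreamByTime added above. The labels, timestamps, and the 1h shard length (i.e. an assumed 2h max_chunk_age) are made up for illustration, and the snippet assumes the imports already used in this file.

```go
lbls := labels.FromStrings("app", "example")
stream := logproto.Stream{
	Labels: lbls.String(),
	Entries: []logproto.Entry{
		{Timestamp: time.Unix(1728000300, 0), Line: "a"}, // 00:05:00 UTC
		{Timestamp: time.Unix(1728002700, 0), Line: "b"}, // 00:45:00 UTC
		{Timestamp: time.Unix(1728004800, 0), Line: "c"}, // 01:20:00 UTC
	},
}

// Split into hourly buckets: the first two entries share one bucket, the third
// lands in the next, so two sub-streams come back, each referencing a
// sub-slice of the original Entries slice.
for _, s := range shardStreamByTime(stream, lbls, time.Hour) {
	fmt.Println(s.Stream.Labels, len(s.Stream.Entries), s.linesTotalLen)
}
// Prints something like:
//   {__time_shard__="1728000000_1728003600", app="example"} 2 2
//   {__time_shard__="1728003600_1728007200", app="example"} 1 1
```

The __time_shard__ label value is "<bucket start unix>_<bucket end unix>", so every entry in a sub-stream falls inside a single timeShardLen-wide bucket.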
@@ -330,6 +330,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
	logger := log.With(util_log.Logger, "component", "distributor")
	t.distributor, err = distributor.New(
		t.Cfg.Distributor,
		t.Cfg.Ingester,
Review thread on passing t.Cfg.Ingester into the distributor:

- This is somewhat risky because, in microservices mode, the distributor and ingester can be run with a different [...]
- Yes, we cannot prevent it even if it happens, because the CLI flag can be assigned to the ingesters only and the distributors will be out of sync for this flag...
		t.Cfg.IngesterClient,
		t.tenantConfigs,
		t.ring,