Use TSDB's WAL for writes. #1103
Merged
Commits (79)
7228c1c
Update vendored libraries:
tomwilkie 831080f
Use TSDB's WAL for writes.
tomwilkie 8710a22
Merge remote-tracking branch 'upstream/master' into wal
0802aa6
Merge pull request #23 from codesome/wal
codesome 2a6fc2d
Fix merge conflicts
bc25534
Merge remote-tracking branch 'grafana/wal' into wal
f023ddb
Fix creation of user states in WAL recover
5c022da
Merge remote-tracking branch 'upstream/master' into wal
ac98b13
Remove WAL recovery on start-up and allow nil Record
b84b02e
Fix types
587b8ad
Merge remote-tracking branch 'upstream/master' into wal
05127b0
WAL compression always enabled
787fbb8
Change checkpoint logic to be more like prometheus's tsdb
9845a5e
Add metrics for checkpoint and name changes
5bb950f
Merge remote-tracking branch 'upstream/master' into wal
f21f814
Initial attempt for flushing chunks from WAL [WIP]
46eaf36
Combine checkpoint and WAL chunks before flushing
2b17103
Bring back recovery and tests
935b73e
Fix race in the test
a7844c7
Merge remote-tracking branch 'upstream/master' into wal
538f407
Recover on startup
ff35948
Dont remove the last segment in truncation
2913b99
Always read WAL and remove recover-only mode
e691e01
Timer for WAL recovery time
dbd336e
More rigorous test for WAL
0e1577a
More profiling in debug mode
be2ccbf
Merge remote-tracking branch 'upstream/master' into wal
eeaae15
Fix race in test
214b32e
No limits on number of series during recovery
43dbc9e
No rate limiting of checkpoint
7a9fec0
Change segment deletion logic
2d85a98
Process WAL records in parallel.
50d22c8
Added comments and some refactoring and not returning on no-series fo…
7282ee8
Process checkpoint series in parallel
6cd9940
Merge remote-tracking branch 'upstream/master' into wal
b8a19d2
Merge remote-tracking branch 'upstream/master' into wal
370dcea
Merge remote-tracking branch 'upstream/master' into wal
959cf20
Fix race in processing WAL
8eb90c9
Merge remote-tracking branch 'upstream/master' into wal
d954865
Small enhancements
fb75c9c
Cache the user states and series when processing samples
51beb93
Enhancement in the user state cache and fix in samples buffer
47a8434
Cache user states and series right from the checkpoint
248566a
Small enhancements
f147570
Fix the flag
bc8a194
Remove test files
f6ddbdf
Use tsdb from prometheus/prometheus repo
ad3476e
Avoid flushing on shutdown
737c415
Merge remote-tracking branch 'upstream/master' into wal
251bde5
Fix after rebase
88009b9
Fix bug of resetting userStates
5dfe853
Fix review comments
d661843
Merge remote-tracking branch 'upstream/master' into wal
c55cedc
Small enhancements
02c3fd6
Merge remote-tracking branch 'upstream/master' into wal
902917f
Update comments
4b0d578
Remove ingester<->WAL circular dependancy
d2fb739
Merge remote-tracking branch 'upstream/master' into wal
d799951
Change segment size of the WAL
69e0a48
Use same directory for temporary tokens file
9758838
Merge branch 'fix-tokens-on-file' into wal
14218bf
Disble transfer out when WAL is enabled
3f31526
Flush on shutdown endpoint irrespective of WAL
db679b1
Merge remote-tracking branch 'upstream/master' into wal
7e6f5de
Use sync.Pool for the records
60637c1
Merge remote-tracking branch 'upstream/master' into wal
f2d0b29
Fix Goutham's comments
4dd4ce6
Merge remote-tracking branch 'upstream/master' into wal
ce6a244
Fix Goutham's comments
3fc8e31
Fix possible data corruption, goroutine deadlock and memory leak
e618463
Fix review comments
037d4b3
memoryChunks counter fix, metics updated, small cleanup
27c6f85
Merge remote-tracking branch 'upstream/master' into wal
3fb38ca
Update config file and argument doc
92c1149
Add guide to run/migrate-to WAL in ingesters
30c71a1
Merge remote-tracking branch 'upstream/master' into wal
8b55cdb
Fix review comments
aaa2aa2
Merge remote-tracking branch 'upstream/master' into wal
fe3cc08
Fix review comments
@@ -0,0 +1,76 @@
---
title: "Ingesters with WAL"
linkTitle: "Ingesters with WAL"
weight: 5
slug: ingesters-with-wal
---
Currently, ingesters running in the chunks storage mode store all their data in memory. If there is a crash, that data can be lost. The WAL helps fill this gap in reliability.

To use the WAL, some changes need to be made to the deployment.
## Changes to deployment

1. Since ingesters need to have the same persistent volume across restarts/rollouts, all the ingesters should be run as a [statefulset](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) with fixed volumes.
2. The following flags need to be set (see the example below):
    * `--ingester.wal-dir` to the directory where the WAL data should be stored and/or recovered from. Note that this should be on the mounted volume.
    * `--ingester.wal-enabled` to `true`, which enables writing to the WAL during ingestion.
    * `--ingester.checkpoint-enabled` to `true` to enable checkpointing of in-memory chunks to disk. This is optional and helps speed up the replay process.
    * `--ingester.checkpoint-duration` to the interval at which checkpoints should be created. The default is `30m`; it can be brought down to `15m` if there are fewer series per ingester (say 1M).
    * `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if the WAL is disabled, as long as this is set to `true`. The WAL dir needs to be set for this.
      * If you are going to enable the WAL, it is advisable to always set this to `true`.
    * `--ingester.tokens-file-path` to the file path where the tokens should be stored. Note that this should be on the mounted volume. Why this is required is described below.
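For illustration only, here is a minimal sketch of how these flags might be combined, assuming the usual single-binary invocation with `--target=ingester` and a persistent volume mounted at `/data` (the binary name, target flag, and paths are assumptions for this example, not part of this change):

```sh
# Sketch: WAL-related flags on an ingester; adjust the paths to your mounted volume.
cortex --target=ingester \
  --ingester.wal-enabled=true \
  --ingester.wal-dir=/data/wal \
  --ingester.checkpoint-enabled=true \
  --ingester.checkpoint-duration=30m \
  --ingester.recover-from-wal=true \
  --ingester.tokens-file-path=/data/tokens
```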
## Changes in lifecycle when WAL is enabled

1. Flushing of data to the chunk store during rollouts or scale-downs is disabled. This is because, during a rollout of a statefulset, no ingesters are simultaneously leaving and joining; rather, the same ingester is shut down and brought back again with an updated config. Hence flushing is skipped and the data is recovered from the WAL.

2. As there are no transfers between ingesters, the tokens are stored on and recovered from disk across rollouts/restarts. This is [not a new thing](https://github.com/cortexproject/cortex/pull/1750), but it is effective when using statefulsets.
## Migrating from stateless deployments

The ingester _deployment without WAL_ and the _statefulset with WAL_ should be scaled down and up respectively, in sync and without any transfer of data between them, to ensure that ingestion is reliable immediately after the migration.

Let's take an example of 4 ingesters. The migration would look something like this:
1. Bring up one stateful ingester `ingester-0` and wait till it's ready (accepting read and write requests).
2. Scale down the old ingester deployment to 3 and wait till the leaving ingester flushes all its data to the chunk store.
3. Once that ingester has disappeared from `kubectl get pods ...`, add another stateful ingester and wait till it's ready. This ensures no transfer takes place. Now you have `ingester-0 ingester-1`.
4. Repeat step 2 to remove another ingester from the old deployment.
5. Repeat step 3 to add another stateful ingester. Now you have `ingester-0 ingester-1 ingester-2`.
6. Repeat steps 4 and 5 until you finally have `ingester-0 ingester-1 ingester-2 ingester-3`. One round of this process is sketched below.
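To make the ordering concrete, here is a hypothetical sketch of one round of the migration (steps 2 and 3), assuming the old deployment and the new statefulset are both named `ingester` and live in a `cortex` namespace (names, namespace, and replica counts are assumptions for this example):

```sh
# Add one stateful ingester and wait until it is ready (step 1/3/5).
kubectl -n cortex scale statefulset ingester --replicas=1
kubectl -n cortex rollout status statefulset ingester

# Remove one ingester from the old deployment (here: 4 -> 3) and let it flush
# its data to the chunk store (step 2/4).
kubectl -n cortex scale deployment ingester --replicas=3

# Wait until the leaving pod has disappeared before adding the next stateful ingester.
kubectl -n cortex get pods -w
```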
## How to scale up/down

### Scale up

Scaling up is the same as what you would do without the WAL or statefulsets. Nothing changes here.
### Scale down

Since Kubernetes doesn't differentiate between a rollout and a scale-down when sending the shutdown signal, the flushing of chunks is disabled by default. Hence the only thing to take care of during a scale-down is the flushing of chunks.

There are two ways to do it, the latter being a fallback option.
**First option**

Consider you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters. According to statefulset rules, the ingesters that will be shut down are `ingester-3` and then `ingester-2`.

Hence, before actually scaling down in Kubernetes, port-forward to those ingesters and hit the [`/shutdown`](https://github.com/cortexproject/cortex/pull/1746) endpoint. This will flush the chunks and shut the ingester down (while also removing it from the ring).

After hitting the endpoint for `ingester-2` and `ingester-3`, scale down the ingesters to 2.

PS: Given you have to scale down 1 ingester at a time, you can pipeline the shutdown and scale-down process instead of hitting the shutdown endpoint for all to-be-scaled-down ingesters at the same time, as sketched below.
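As an illustrative sketch only: the commands below assume the statefulset is named `ingester`, that the ingester HTTP server listens on port 80, and that a plain HTTP request to `/shutdown` is enough to trigger the flush; verify the port and HTTP method against your own deployment before using anything like this.

```sh
# Drain the pods that the statefulset will remove, one at a time.
for pod in ingester-3 ingester-2; do
  kubectl port-forward "pod/$pod" 8080:80 &    # forward the ingester HTTP port locally
  pf_pid=$!
  sleep 2                                      # give the port-forward a moment to establish
  curl -X POST http://localhost:8080/shutdown  # flushes chunks, removes the ingester from the ring
  kill "$pf_pid"
done

# Only now reduce the number of replicas.
kubectl scale statefulset ingester --replicas=2
```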
**Fallback option**

There is a [flush mode ingester](https://github.com/cortexproject/cortex/pull/1747) in progress, and following recent discussions there will be a separate target called `flusher` in its place.

You can run it as a Kubernetes job which will:

* Attach to the volume of the scaled-down ingester,
* Recover from the WAL, and
* Flush all the chunks.

This job should be run for every ingester for which you missed hitting the shutdown endpoint in the first option.

More info about the flusher target will be added once it's upstream.
@@ -35,8 +35,14 @@ const (
    queryStreamBatchSize = 128
)

var (
    // This is initialised if the WAL is enabled and the records are fetched from this pool.
    recordPool sync.Pool
)

// Config for an Ingester.
type Config struct {
    WALConfig        WALConfig
    LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

    // Config for transferring chunks. Zero or negative = no retries.
@@ -70,6 +76,7 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
    cfg.LifecyclerConfig.RegisterFlags(f)
    cfg.WALConfig.RegisterFlags(f)

    f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over.")
    f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
@@ -109,6 +116,9 @@ type Ingester struct {
    flushQueues     []*util.PriorityQueue
    flushQueuesDone sync.WaitGroup

    // This should never be nil.
    wal WAL

    // Hook for injecting behaviour from tests.
    preFlushUserSeries func()
@@ -131,6 +141,19 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
        return NewV2(cfg, clientConfig, limits, registerer)
    }

    if cfg.WALConfig.walEnabled {
        // If WAL is enabled, we don't transfer out the data to any ingester.
        // Either the next ingester which takes its place should recover from WAL
        // or the data has to be flushed during scaledown.
        cfg.MaxTransferRetries = 0

        recordPool = sync.Pool{
            New: func() interface{} {
                return &Record{}
            },
        }
    }

    i := &Ingester{
        cfg:          cfg,
        clientConfig: clientConfig,
@@ -142,14 +165,36 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
    }

    var err error
    i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey)
    // During WAL recovery, it will create new user states, which requires the limiter.
    // Hence initialise the limiter before creating the WAL.
    // The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled.
    i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled)
    if err != nil {
        return nil, err
    }

    // Init the limiter and instantiate the user states which depend on it
    i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
    i.userStates = newUserStates(i.limiter, cfg, i.metrics)

    if cfg.WALConfig.recover {
        level.Info(util.Logger).Log("msg", "recovering from WAL")
        start := time.Now()
        if err := recoverFromWAL(i); err != nil {
            level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String())
            return nil, err
        }
        elapsed := time.Since(start)
        level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
        i.metrics.walReplayDuration.Set(elapsed.Seconds())
    }

> Review comment on the recovery timing above: Can we also make this a metric? So that we can compare the duration changes over releases and also correlate it with the number of series, etc.

    // If the WAL recover happened, then the userStates would already be set.
    if i.userStates == nil {
        i.userStates = newUserStates(i.limiter, cfg, i.metrics)
    }

    i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp)
    if err != nil {
        return nil, err
    }

    // Now that user states have been created, we can start the lifecycler
    i.lifecycler.Start()
@@ -200,6 +245,8 @@ func (i *Ingester) Shutdown() {
    close(i.quit)
    i.done.Wait()

    i.wal.Stop()

    // Next initiate our graceful exit from the ring.
    i.lifecycler.Shutdown()
}
@@ -209,7 +256,11 @@ func (i *Ingester) Shutdown() {
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
    originalState := i.lifecycler.FlushOnShutdown()
    // We want to flush the chunks if transfer fails irrespective of original flag.
    i.lifecycler.SetFlushOnShutdown(true)
    i.Shutdown()
    i.lifecycler.SetFlushOnShutdown(originalState)
    w.WriteHeader(http.StatusNoContent)
}
@@ -232,11 +283,25 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
    if err != nil {
        return nil, fmt.Errorf("no user id")
    }

    var lastPartialErr *validationError
    var record *Record
    if i.cfg.WALConfig.walEnabled {
        record = recordPool.Get().(*Record)
        record.UserId = userID
        // Assuming there is not much churn in most cases, there is no use
        // keeping the record.Labels slice hanging around.
        record.Labels = nil
        if cap(record.Samples) < len(req.Timeseries) {
            record.Samples = make([]Sample, 0, len(req.Timeseries))
        } else {
            record.Samples = record.Samples[:0]
        }
    }

    for _, ts := range req.Timeseries {
        for _, s := range ts.Samples {
            err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
            err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
            if err == nil {
                continue
            }
@@ -254,10 +319,19 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
    if lastPartialErr != nil {
        return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
    }

    if record != nil {
        // Log the record only if there was no error in ingestion.
        if err := i.wal.Log(record); err != nil {
            return nil, err
        }
        recordPool.Put(record)
    }

    return &client.WriteResponse{}, nil
}

func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
    labels.removeBlanks()

    var (
@@ -274,7 +348,8 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
    if i.stopped {
        return fmt.Errorf("ingester stopping")
    }
    state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)

    state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
    if err != nil {
        if ve, ok := err.(*validationError); ok {
            state.discardedSamples.WithLabelValues(ve.errorType).Inc()
@@ -310,6 +385,14 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
        return err
    }

    if record != nil {
        record.Samples = append(record.Samples, Sample{
            Fingerprint: uint64(fp),
            Timestamp:   uint64(timestamp),
            Value:       float64(value),
        })
    }

    memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
    i.metrics.ingestedSamples.Inc()
    switch source {
@@ -430,7 +513,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
    }

    numSeries++
    wireChunks, err := toWireChunks(chunks)
    wireChunks, err := toWireChunks(chunks, nil)
    if err != nil {
        return err
    }