Add repair for checkpoint/WAL #2105

Merged: 5 commits merged into cortexproject:master from wal-repair on Mar 5, 2020

Conversation

@codesome (Contributor) commented Feb 10, 2020

@gouthamve suggested that I add this to the release, and that v0.6.2 would be cut with it.

Things changed during checkpointing: if the old checkpoint was X and the new one is Y, we used to delete the WAL segments before Y. In this PR, I have changed it to delete the WAL segments before X, so that we can still recover from checkpoint X even if checkpoint Y gets corrupted.
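
To make the change concrete, here is a minimal sketch (not the PR's actual code) of the new truncation rule; walTruncator and truncateAfterCheckpoint are illustrative names, and the only point is which segment index the truncation uses:

package walrepairsketch

// walTruncator stands in for the WAL implementation; Truncate drops all
// WAL segments with an index lower than the given one.
type walTruncator interface {
	Truncate(segmentIdx int) error
}

// truncateAfterCheckpoint is called once the new checkpoint Y has been written
// successfully. prevIdx is the segment index of the previous checkpoint X,
// newIdx the corresponding index for Y.
func truncateAfterCheckpoint(w walTruncator, prevIdx, newIdx int) error {
	// Before this PR (roughly): w.Truncate(newIdx), i.e. the segments before Y
	// were deleted, so a corrupt Y left nothing to recover from.
	// After this PR: only drop segments before X, keeping X usable as a
	// fallback if Y turns out to be corrupt.
	return w.Truncate(prevIdx)
}

The trade-off, under this reading, is that roughly one extra checkpoint's worth of WAL segments stays on disk between checkpoints in exchange for a usable fallback.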

How the replay works now with repair (see the sketch after this list):

  1. Attempt to replay checkpoint Y. If checkpoint Y is corrupt, delete it and recover from checkpoint X instead.
  2. If checkpoint X is also corrupt, it is a hard failure for now.
  3. Depending on which checkpoint was recovered, we start replaying the WAL from either segment X or segment Y.
  4. If the WAL is corrupt, we do the usual Prometheus repair, discarding everything after the corrupt record. If the repair fails, it is again a hard error.
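
A rough sketch of that replay order (ignoring the 0-or-1-checkpoint cases discussed later in this thread); every name below (replayCheckpoint, deleteCheckpoint, replayWALFrom, repairWAL) is an illustrative placeholder, not a function from this PR:

package walrepairsketch

// walRecovery wires together the pieces of the replay; every field is a
// placeholder for logic that lives elsewhere in the real code, not an API
// introduced by this PR.
type walRecovery struct {
	replayCheckpoint func(dir string) error       // load series records from a checkpoint directory
	deleteCheckpoint func(dir string) error       // remove a corrupt checkpoint from disk
	replayWALFrom    func(segment int) error      // replay WAL segments >= segment
	repairWAL        func(corruptErr error) error // Prometheus-style repair: truncate after the corrupt record
}

func (r *walRecovery) run(checkpointX, checkpointY string, segX, segY int) error {
	startSeg := segY

	if err := r.replayCheckpoint(checkpointY); err != nil {
		// 1. Checkpoint Y is corrupt: delete it and fall back to checkpoint X.
		if err := r.deleteCheckpoint(checkpointY); err != nil {
			return err
		}
		if err := r.replayCheckpoint(checkpointX); err != nil {
			// 2. Checkpoint X is corrupt too: hard failure.
			return err
		}
		startSeg = segX
	}

	// 3. Replay the WAL starting at the segment of whichever checkpoint was used.
	if err := r.replayWALFrom(startSeg); err != nil {
		// 4. Try the usual repair; if that also fails, it is again a hard error.
		return r.repairWAL(err)
	}
	return nil
}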

I will be adding a test for repair now.

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]
@codesome codesome marked this pull request as ready for review February 11, 2020 11:34
@codesome codesome force-pushed the wal-repair branch 2 times, most recently from 76c6088 to fbfc3d7 Compare February 12, 2020 11:23
@codesome codesome changed the base branch from release-0.6 to master February 12, 2020 11:23
@codesome (Contributor, Author):

I have rebased this PR and it is now pointing to master (following the discussion on Slack).

@codesome codesome force-pushed the wal-repair branch 2 times, most recently from a07998b to 930e545 Compare February 13, 2020 07:37
@codesome (Contributor, Author):

Following @pracucci's comments about having only one or no checkpoint on disk, and after fixing some code, I have now added the cases of 0 and 1 checkpoints to the unit test.

@gouthamve (Contributor) left a comment:


LGTM with a nit. Will approve after the comments are addressed.

Ganesh Vernekar added 2 commits March 2, 2020 20:19
@codesome (Contributor, Author) commented Mar 2, 2020:

@gouthamve done

sandeepsukhani added a commit to grafana/cortex that referenced this pull request Mar 3, 2020
@pracucci (Contributor) left a comment:


LGTM! It's a bit tricky to me, so my personal confidence level is not super high, but all in all LGTM.

stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) (*userStates, int, error) {

	// Use a local userStates, so we don't need to worry about locking.
	userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics)
Contributor:

Isn't this going to affect real ingester metrics? Is that a problem?

Contributor Author:

I will check on that, good catch.

Contributor Author:

Cross-checked with how transfer handles them:

// TransferChunks receives all the chunks from another ingester.
func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error {
	fromIngesterID := ""
	seriesReceived := 0

	xfer := func() error {
		userStates := newUserStates(i.limiter, i.cfg, i.metrics)

		for {
			wireSeries, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return errors.Wrap(err, "TransferChunks: Recv")
			}

			// We can't send "extra" fields with a streaming call, so we repeat
			// wireSeries.FromIngesterId and assume it is the same every time
			// round this loop.
			if fromIngesterID == "" {
				fromIngesterID = wireSeries.FromIngesterId
				level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

				// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
				err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
				if err != nil {
					return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
				}
			}
			descs, err := fromWireChunks(wireSeries.Chunks)
			if err != nil {
				return errors.Wrap(err, "TransferChunks: fromWireChunks")
			}

			state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels, nil)
			if err != nil {
				return errors.Wrapf(err, "TransferChunks: getOrCreateSeries: user %s series %s", wireSeries.UserId, wireSeries.Labels)
			}
			prevNumChunks := len(series.chunkDescs)

			err = series.setChunks(descs)
			state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries
			if err != nil {
				return errors.Wrapf(err, "TransferChunks: setChunks: user %s series %s", wireSeries.UserId, wireSeries.Labels)
			}

			seriesReceived++
			memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
			receivedChunks.Add(float64(len(descs)))
		}

		if seriesReceived == 0 {
			level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
			return fmt.Errorf("TransferChunks: no series")
		}

		if fromIngesterID == "" {
			level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
			return fmt.Errorf("no ingester id")
		}

		if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
			return errors.Wrap(err, "TransferChunks: ClaimTokensFor")
		}

		i.userStatesMtx.Lock()
		defer i.userStatesMtx.Unlock()
		i.userStates = userStates

		return nil
	}

	if err := i.transfer(stream.Context(), xfer); err != nil {
		return err
	}

	// Close the stream last, as this is what tells the "from" ingester that
	// it's OK to shut down.
	if err := stream.SendAndClose(&client.TransferChunksResponse{}); err != nil {
		level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
		return err
	}
	level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)

	return nil
}

Looks like it's the same here too (including the memory chunks metrics). So I guess all good here.

@gouthamve (Contributor):

Thanks Ganesh!

@gouthamve gouthamve merged commit 28362da into cortexproject:master Mar 5, 2020
@codesome codesome deleted the wal-repair branch March 5, 2020 10:16
@sandeepsukhani sandeepsukhani mentioned this pull request Mar 5, 2020