Add repair for checkpoint/WAL #2105
Conversation
Force-pushed from 76c6088 to fbfc3d7 (compare)
I have rebased this PR and it is now pointing to
Force-pushed from a07998b to 930e545 (compare)
Following @pracucci's comments on having only one or no checkpoint on disk, and after fixing some code, I have now added the case of having 0 or 1 checkpoints to the unit test.
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
LGTM with nit. Will approve after comments are fixed.
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
@gouthamve done
LGTM! It's a bit tricky to me, so my personal confidence level is not super high, but all in all LGTM.
pkg/ingester/wal.go (outdated)

```go
	stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) (*userStates, int, error) {

	// Use a local userStates, so we don't need to worry about locking.
	userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics)
```
Isn't this going to affect real ingester metrics? Is that a problem?
I will check on that, good catch.
Cross-checked with how transfer handles them:
cortex/pkg/ingester/transfer.go
Lines 65 to 151 in 5d7b05c
```go
// TransferChunks receives all the chunks from another ingester.
func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error {
	fromIngesterID := ""
	seriesReceived := 0

	xfer := func() error {
		userStates := newUserStates(i.limiter, i.cfg, i.metrics)

		for {
			wireSeries, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return errors.Wrap(err, "TransferChunks: Recv")
			}

			// We can't send "extra" fields with a streaming call, so we repeat
			// wireSeries.FromIngesterId and assume it is the same every time
			// round this loop.
			if fromIngesterID == "" {
				fromIngesterID = wireSeries.FromIngesterId
				level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

				// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
				err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
				if err != nil {
					return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
				}
			}

			descs, err := fromWireChunks(wireSeries.Chunks)
			if err != nil {
				return errors.Wrap(err, "TransferChunks: fromWireChunks")
			}

			state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels, nil)
			if err != nil {
				return errors.Wrapf(err, "TransferChunks: getOrCreateSeries: user %s series %s", wireSeries.UserId, wireSeries.Labels)
			}
			prevNumChunks := len(series.chunkDescs)

			err = series.setChunks(descs)
			state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries
			if err != nil {
				return errors.Wrapf(err, "TransferChunks: setChunks: user %s series %s", wireSeries.UserId, wireSeries.Labels)
			}

			seriesReceived++
			memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
			receivedChunks.Add(float64(len(descs)))
		}

		if seriesReceived == 0 {
			level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
			return fmt.Errorf("TransferChunks: no series")
		}

		if fromIngesterID == "" {
			level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
			return fmt.Errorf("no ingester id")
		}

		if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
			return errors.Wrap(err, "TransferChunks: ClaimTokensFor")
		}

		i.userStatesMtx.Lock()
		defer i.userStatesMtx.Unlock()
		i.userStates = userStates

		return nil
	}

	if err := i.transfer(stream.Context(), xfer); err != nil {
		return err
	}

	// Close the stream last, as this is what tells the "from" ingester that
	// it's OK to shut down.
	if err := stream.SendAndClose(&client.TransferChunksResponse{}); err != nil {
		level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
		return err
	}
	level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
	return nil
}
```
Looks like it's the same here too (including the memory chunks metrics). So I guess all good here.
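The pattern used in both places — build a fresh state object locally with no locking, then swap it in under the mutex only at the very end — can be sketched generically. All names below are illustrative, not Cortex's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// store holds live state guarded by a mutex, mimicking how the ingester
// guards userStates with userStatesMtx.
type store struct {
	mtx   sync.Mutex
	state map[string]int
}

// rebuild constructs a replacement state locally. No locks are needed while
// filling it, because nothing else can see it yet; the only critical section
// is the final pointer swap.
func (s *store) rebuild(entries map[string]int) {
	fresh := make(map[string]int, len(entries))
	for k, v := range entries {
		fresh[k] = v
	}
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.state = fresh // single brief critical section
}

func main() {
	s := &store{state: map[string]int{}}
	s.rebuild(map[string]int{"a": 1, "b": 2})
	fmt.Println(len(s.state)) // prints 2
}
```

The design choice is that a long rebuild (replaying a WAL, receiving a transfer stream) never blocks readers of the old state; they only contend on the instant of the swap.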
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
Thanks Ganesh!
@gouthamve suggested adding this to the release, and that v0.6.2 would be cut with this.

Things changed during checkpointing: if the old checkpoint was X and the new one is Y, we were previously deleting the WAL segments before Y. In this PR, I have changed it to delete the WAL segments before X, so that we can still recover from checkpoint X even if checkpoint Y gets corrupted.
How the replay works now with repair:
I will be adding a test for repair now.
Checklist
- [ ] Documentation added
- [ ] CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]