Skip to content

Commit ec95ed1

Browse files
authored
fix: Ensure successive WAL replays don't overwrite each other (#14848)
1 parent 947a66f commit ec95ed1

File tree

4 files changed

+42
-8
lines changed

4 files changed

+42
-8
lines changed

‎pkg/ingester/recovery.go

-3
Original file line number | Diff line number | Diff line change
@@ -196,9 +196,6 @@ func (r *ingesterRecoverer) Close() {
196196
s.chunkMtx.Lock()
197197
defer s.chunkMtx.Unlock()
198198

199-
// reset all the incrementing stream counters after a successful WAL replay.
200-
s.resetCounter()
201-
202199
if len(s.chunks) == 0 {
203200
inst.removeStream(s)
204201
return nil

‎pkg/ingester/recovery_test.go

+40
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/prometheus/prometheus/model/labels"
1515
"github.com/prometheus/prometheus/tsdb/chunks"
1616
"github.com/prometheus/prometheus/tsdb/record"
17+
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
1819

1920
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
@@ -302,3 +303,42 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
302303
}
303304
require.Equal(t, expected, result.resps[0].Streams)
304305
}
306+
307+
func TestRecoveryWritesContinuesEntryCountAfterWALReplay(t *testing.T) {
308+
ingesterConfig := defaultIngesterTestConfig(t)
309+
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
310+
require.NoError(t, err)
311+
312+
store := &mockStore{
313+
chunks: map[string][]chunk.Chunk{},
314+
}
315+
316+
readRingMock := mockReadRingWithOneActiveIngester()
317+
318+
i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
319+
require.NoError(t, err)
320+
321+
var (
322+
users = 10
323+
streamsCt = 1000
324+
entriesPerStream = 50
325+
)
326+
327+
reader, _ := buildMemoryReader(users, streamsCt, entriesPerStream, true)
328+
329+
recoverer := newIngesterRecoverer(i)
330+
331+
err = RecoverWAL(context.Background(), reader, recoverer)
332+
require.NoError(t, err)
333+
334+
recoverer.Close()
335+
336+
// Check that the entry count continues counting from the last WAL entry to avoid overwriting existing entries in the WAL on future replays.
337+
for _, inst := range i.getInstances() {
338+
err := inst.forAllStreams(context.Background(), func(s *stream) error {
339+
assert.Equal(t, int64(entriesPerStream), s.entryCt)
340+
return nil
341+
})
342+
require.NoError(t, err)
343+
}
344+
}

‎pkg/ingester/stream.go

-4
Original file line number | Diff line number | Diff line change
@@ -649,10 +649,6 @@ func (s *stream) addTailer(t *tailer) {
649649
s.tailers[t.getID()] = t
650650
}
651651

652-
func (s *stream) resetCounter() {
653-
s.entryCt = 0
654-
}
655-
656652
func headBlockType(chunkfmt byte, unorderedWrites bool) chunkenc.HeadBlockFmt {
657653
if unorderedWrites {
658654
if chunkfmt >= chunkenc.ChunkFormatV3 {

‎pkg/ingester/wal.go

+2 -1
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/go-kit/log/level"
10+
"github.com/pkg/errors"
1011
"github.com/prometheus/client_golang/prometheus"
1112
"github.com/prometheus/prometheus/tsdb/wlog"
1213

@@ -108,7 +109,7 @@ func (w *walWrapper) Log(record *wal.Record) error {
108109
}
109110
select {
110111
case <-w.quit:
111-
return nil
112+
return errors.New("wal is stopped")
112113
default:
113114
buf := recordPool.GetBytes()
114115
defer func() {

0 commit comments

Comments
 (0)