Skip to content

Commit 3c36ba9

Browse files
authored
feat: Implement owned streams calculation using Partition Ring (#14282)
1 parent 923a3e4 commit 3c36ba9

File tree

9 files changed

+235
-74
lines changed

9 files changed

+235
-74
lines changed

‎pkg/ingester/checkpoint_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestIngesterWAL(t *testing.T) {
7373

7474
readRingMock := mockReadRingWithOneActiveIngester()
7575

76-
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
76+
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
7777
require.NoError(t, err)
7878
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
7979
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -116,7 +116,7 @@ func TestIngesterWAL(t *testing.T) {
116116
expectCheckpoint(t, walDir, false, time.Second)
117117

118118
// restart the ingester
119-
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
119+
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
120120
require.NoError(t, err)
121121
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
122122
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -130,7 +130,7 @@ func TestIngesterWAL(t *testing.T) {
130130
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
131131

132132
// restart the ingester
133-
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
133+
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
134134
require.NoError(t, err)
135135
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
136136
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -155,7 +155,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
155155

156156
readRingMock := mockReadRingWithOneActiveIngester()
157157

158-
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
158+
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
159159
require.NoError(t, err)
160160
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
161161
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -201,7 +201,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
201201
require.NoError(t, err)
202202

203203
// restart the ingester
204-
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
204+
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
205205
require.NoError(t, err)
206206
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
207207
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -260,7 +260,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
260260

261261
readRingMock := mockReadRingWithOneActiveIngester()
262262

263-
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
263+
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
264264
require.NoError(t, err)
265265
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
266266
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -281,7 +281,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
281281
expectCheckpoint(t, walDir, false, time.Second)
282282

283283
// restart the ingester, ensuring we replayed from WAL.
284-
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
284+
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
285285
require.NoError(t, err)
286286
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
287287
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -304,7 +304,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
304304

305305
readRingMock := mockReadRingWithOneActiveIngester()
306306

307-
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
307+
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
308308
require.NoError(t, err)
309309
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
310310
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -325,7 +325,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
325325
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
326326

327327
// restart the ingester, ensuring we can replay from the checkpoint as well.
328-
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
328+
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
329329
require.NoError(t, err)
330330
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
331331
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -602,7 +602,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
602602

603603
readRingMock := mockReadRingWithOneActiveIngester()
604604

605-
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
605+
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
606606
require.NoError(t, err)
607607
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
608608
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -674,7 +674,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
674674
require.NoError(t, err)
675675

676676
// restart the ingester
677-
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
677+
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
678678
require.NoError(t, err)
679679
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
680680
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

‎pkg/ingester/flush_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
393393
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
394394
require.NoError(t, err)
395395

396-
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil, readRingMock)
396+
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil, readRingMock, nil)
397397
require.NoError(t, err)
398398
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
399399

‎pkg/ingester/ingester.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,15 +292,15 @@ type Ingester struct {
292292

293293
// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
294294
readRing ring.ReadRing
295-
recalculateOwnedStreams *recalculateOwnedStreams
295+
recalculateOwnedStreams *recalculateOwnedStreamsSvc
296296

297297
ingestPartitionID int32
298298
partitionRingLifecycler *ring.PartitionInstanceLifecycler
299299
partitionReader *partition.Reader
300300
}
301301

302302
// New makes a new Ingester.
303-
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing) (*Ingester, error) {
303+
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher) (*Ingester, error) {
304304
if cfg.ingesterClientFactory == nil {
305305
cfg.ingesterClientFactory = client.New
306306
}
@@ -408,7 +408,13 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
408408
i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
409409
}
410410

411-
i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)
411+
var ownedStreamsStrategy ownershipStrategy
412+
if i.cfg.KafkaIngestion.Enabled {
413+
ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, util_log.Logger)
414+
} else {
415+
ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
416+
}
417+
i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)
412418

413419
return i, nil
414420
}

‎pkg/ingester/ingester_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestPrepareShutdownMarkerPathNotSet(t *testing.T) {
6363

6464
mockRing := mockReadRingWithOneActiveIngester()
6565

66-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, mockRing)
66+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, mockRing, nil)
6767
require.NoError(t, err)
6868
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
6969

@@ -88,7 +88,7 @@ func TestPrepareShutdown(t *testing.T) {
8888

8989
readRingMock := mockReadRingWithOneActiveIngester()
9090

91-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
91+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
9292
require.NoError(t, err)
9393
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
9494

@@ -151,7 +151,7 @@ func TestIngester_GetStreamRates_Correctness(t *testing.T) {
151151

152152
readRingMock := mockReadRingWithOneActiveIngester()
153153

154-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
154+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
155155
require.NoError(t, err)
156156
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
157157

@@ -184,7 +184,7 @@ func BenchmarkGetStreamRatesAllocs(b *testing.B) {
184184
}
185185
readRingMock := mockReadRingWithOneActiveIngester()
186186

187-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
187+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
188188
require.NoError(b, err)
189189
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
190190

@@ -210,7 +210,7 @@ func TestIngester(t *testing.T) {
210210

211211
readRingMock := mockReadRingWithOneActiveIngester()
212212

213-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
213+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
214214
require.NoError(t, err)
215215
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
216216

@@ -397,7 +397,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {
397397

398398
readRingMock := mockReadRingWithOneActiveIngester()
399399

400-
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
400+
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
401401
require.NoError(t, err)
402402
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
403403

@@ -819,7 +819,7 @@ func Test_InMemoryLabels(t *testing.T) {
819819

820820
readRingMock := mockReadRingWithOneActiveIngester()
821821

822-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
822+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
823823
require.NoError(t, err)
824824
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
825825

@@ -874,7 +874,7 @@ func TestIngester_GetDetectedLabels(t *testing.T) {
874874
}
875875
readRingMock := mockReadRingWithOneActiveIngester()
876876

877-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
877+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
878878
require.NoError(t, err)
879879
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
880880

@@ -938,7 +938,7 @@ func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) {
938938
}
939939
readRingMock := mockReadRingWithOneActiveIngester()
940940

941-
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
941+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
942942
require.NoError(t, err)
943943
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
944944

@@ -1306,7 +1306,7 @@ func TestStats(t *testing.T) {
13061306
require.NoError(t, err)
13071307
readRingMock := mockReadRingWithOneActiveIngester()
13081308

1309-
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
1309+
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
13101310
require.NoError(t, err)
13111311

13121312
i.instances["test"] = defaultInstance(t)
@@ -1334,7 +1334,7 @@ func TestVolume(t *testing.T) {
13341334
require.NoError(t, err)
13351335
readRingMock := mockReadRingWithOneActiveIngester()
13361336

1337-
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
1337+
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
13381338
require.NoError(t, err)
13391339

13401340
i.instances["test"] = defaultInstance(t)
@@ -1414,7 +1414,7 @@ func createIngesterServer(t *testing.T, ingesterConfig Config) (ingesterClient,
14141414
require.NoError(t, err)
14151415
readRingMock := mockReadRingWithOneActiveIngester()
14161416

1417-
ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
1417+
ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
14181418
require.NoError(t, err)
14191419

14201420
listener := bufconn.Listen(1024 * 1024)
@@ -1631,7 +1631,7 @@ func TestUpdateOwnedStreams(t *testing.T) {
16311631
require.NoError(t, err)
16321632
readRingMock := mockReadRingWithOneActiveIngester()
16331633

1634-
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
1634+
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
16351635
require.NoError(t, err)
16361636

16371637
i.instances["test"] = defaultInstance(t)

0 commit comments

Comments
 (0)