Skip to content

Commit 2ab63d2

Browse files
authored
fix(promtail): remove flaky TestFileTarget_StopsTailersCleanly (#16473)
1 parent 5335a21 commit 2ab63d2

File tree

3 files changed

+20
-115
lines changed

3 files changed

+20
-115
lines changed

‎clients/pkg/promtail/targets/file/filetarget.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ type WatchConfig struct {
5050
MaxPollFrequency time.Duration `mapstructure:"max_poll_frequency" yaml:"max_poll_frequency"`
5151
}
5252

53-
var DefaultWatchConig = WatchConfig{
53+
var DefaultWatchConfig = WatchConfig{
5454
MinPollFrequency: 250 * time.Millisecond,
5555
MaxPollFrequency: 250 * time.Millisecond,
5656
}
5757

5858
// RegisterFlags with prefix registers flags where every name is prefixed by
5959
// prefix. If prefix is a non-empty string, prefix should end with a period.
6060
func (cfg *WatchConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
61-
d := DefaultWatchConig
61+
d := DefaultWatchConfig
6262

6363
f.DurationVar(&cfg.MinPollFrequency, prefix+"min_poll_frequency", d.MinPollFrequency, "Minimum period to poll for file changes")
6464
f.DurationVar(&cfg.MaxPollFrequency, prefix+"max_poll_frequency", d.MaxPollFrequency, "Maximum period to poll for file changes")
@@ -247,7 +247,6 @@ func (t *FileTarget) sync() error {
247247
} else {
248248
// Gets current list of files to tail.
249249
matches, err = doublestar.FilepathGlob(t.path)
250-
251250
if err != nil {
252251
return errors.Wrap(err, "filetarget.sync.filepath.Glob")
253252
}
@@ -257,7 +256,6 @@ func (t *FileTarget) sync() error {
257256
matchesExcluded = []string{t.pathExclude}
258257
} else {
259258
matchesExcluded, err = doublestar.FilepathGlob(t.pathExclude)
260-
261259
if err != nil {
262260
return errors.Wrap(err, "filetarget.sync.filepathexclude.Glob")
263261
}

‎clients/pkg/promtail/targets/file/filetarget_test.go

Lines changed: 13 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"os"
99
"path/filepath"
1010
"sort"
11-
"sync"
1211
"testing"
1312
"time"
1413

@@ -72,7 +71,7 @@ func TestFileTargetSync(t *testing.T) {
7271
path := logDir1 + "/*.log"
7372
target, err := NewFileTarget(metrics, logger, client, ps, path, "", nil, nil, &Config{
7473
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
75-
}, DefaultWatchConig, nil, fakeHandler, "", nil)
74+
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
7675
assert.NoError(t, err)
7776

7877
// Start with nothing watched.
@@ -84,7 +83,7 @@ func TestFileTargetSync(t *testing.T) {
8483
}
8584

8685
// Create the base dir, still nothing watched.
87-
err = os.MkdirAll(logDir1, 0750)
86+
err = os.MkdirAll(logDir1, 0o750)
8887
assert.NoError(t, err)
8988

9089
err = target.sync()
@@ -191,7 +190,7 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
191190
registry := prometheus.NewRegistry()
192191
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
193192
SyncPeriod: 10 * time.Millisecond,
194-
}, DefaultWatchConig, nil, fakeHandler, "", nil)
193+
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
195194
assert.NoError(t, err)
196195

197196
_, err = os.Create(logFile)
@@ -247,95 +246,6 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
247246
`), "promtail_files_active_total"))
248247
}
249248

250-
func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
251-
w := log.NewSyncWriter(os.Stderr)
252-
logger := log.NewLogfmtLogger(w)
253-
254-
tempDir := t.TempDir()
255-
positionsFileName := filepath.Join(tempDir, "positions.yml")
256-
257-
ps, err := positions.New(logger, positions.Config{
258-
SyncPeriod: 10 * time.Millisecond,
259-
PositionsFile: positionsFileName,
260-
})
261-
require.NoError(t, err)
262-
263-
client := fake.New(func() {})
264-
defer client.Stop()
265-
266-
pathToWatch := filepath.Join(tempDir, "*.log")
267-
registry := prometheus.NewRegistry()
268-
metrics := NewMetrics(registry)
269-
270-
// Increase this to several thousand to make the test more likely to fail when debugging a race condition
271-
iterations := 500
272-
fakeHandler := make(chan fileTargetEvent, 10*iterations)
273-
for i := 0; i < iterations; i++ {
274-
logFile := filepath.Join(tempDir, fmt.Sprintf("test_%d.log", i))
275-
276-
target, err := NewFileTarget(metrics, logger, client, ps, pathToWatch, "", nil, nil, &Config{
277-
SyncPeriod: 10 * time.Millisecond,
278-
}, DefaultWatchConig, nil, fakeHandler, "", nil)
279-
assert.NoError(t, err)
280-
281-
file, err := os.Create(logFile)
282-
assert.NoError(t, err)
283-
284-
// Write some data to the file
285-
for j := 0; j < 5; j++ {
286-
_, _ = file.WriteString(fmt.Sprintf("test %d\n", j))
287-
}
288-
require.NoError(t, file.Close())
289-
290-
requireEventually(t, func() bool {
291-
return testutil.CollectAndCount(registry, "promtail_read_lines_total") == 1
292-
}, "expected 1 read_lines_total metric")
293-
294-
requireEventually(t, func() bool {
295-
return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 1
296-
}, "expected 1 read_bytes_total metric")
297-
298-
requireEventually(t, func() bool {
299-
return testutil.ToFloat64(metrics.readLines) == 5
300-
}, "expected 5 read_lines_total")
301-
302-
requireEventually(t, func() bool {
303-
return testutil.ToFloat64(metrics.totalBytes) == 35
304-
}, "expected 35 total_bytes")
305-
306-
requireEventually(t, func() bool {
307-
return testutil.ToFloat64(metrics.readBytes) == 35
308-
}, "expected 35 read_bytes")
309-
310-
// Concurrently stop the target and remove the file
311-
wg := sync.WaitGroup{}
312-
wg.Add(2)
313-
go func() {
314-
sleepRandomDuration(time.Millisecond * 10)
315-
target.Stop()
316-
wg.Done()
317-
318-
}()
319-
go func() {
320-
sleepRandomDuration(time.Millisecond * 10)
321-
_ = os.Remove(logFile)
322-
wg.Done()
323-
}()
324-
325-
wg.Wait()
326-
327-
requireEventually(t, func() bool {
328-
return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 0
329-
}, "expected read_bytes_total metric to be cleaned up")
330-
331-
requireEventually(t, func() bool {
332-
return testutil.CollectAndCount(registry, "promtail_file_bytes_total") == 0
333-
}, "expected file_bytes_total metric to be cleaned up")
334-
}
335-
336-
ps.Stop()
337-
}
338-
339249
// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send.
340250
func TestFileTarget_StopAbruptly(t *testing.T) {
341251
w := log.NewSyncWriter(os.Stderr)
@@ -367,11 +277,11 @@ func TestFileTarget_StopAbruptly(t *testing.T) {
367277
registry := prometheus.NewRegistry()
368278
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
369279
SyncPeriod: 10 * time.Millisecond,
370-
}, DefaultWatchConig, nil, fakeHandler, "", nil)
280+
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
371281
assert.NoError(t, err)
372282

373283
// Create a directory, still nothing is watched.
374-
err = os.MkdirAll(logDir1, 0750)
284+
err = os.MkdirAll(logDir1, 0o750)
375285
assert.NoError(t, err)
376286
_, err = os.Create(logfile1)
377287
assert.NoError(t, err)
@@ -392,12 +302,12 @@ func TestFileTarget_StopAbruptly(t *testing.T) {
392302

393303
// Create two directories - one more than the buffer of fakeHandler,
394304
// so that the file target hands until we call Stop().
395-
err = os.MkdirAll(logDir2, 0750)
305+
err = os.MkdirAll(logDir2, 0o750)
396306
assert.NoError(t, err)
397307
_, err = os.Create(logfile2)
398308
assert.NoError(t, err)
399309

400-
err = os.MkdirAll(logDir3, 0750)
310+
err = os.MkdirAll(logDir3, 0o750)
401311
assert.NoError(t, err)
402312
_, err = os.Create(logfile3)
403313
assert.NoError(t, err)
@@ -479,7 +389,7 @@ func TestFileTargetPathExclusion(t *testing.T) {
479389
pathExclude := filepath.Join(dirName, "log3", "*.log")
480390
target, err := NewFileTarget(metrics, logger, client, ps, path, pathExclude, nil, nil, &Config{
481391
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
482-
}, DefaultWatchConig, nil, fakeHandler, "", nil)
392+
}, DefaultWatchConfig, nil, fakeHandler, "", nil)
483393
assert.NoError(t, err)
484394

485395
// Start with nothing watched.
@@ -491,11 +401,11 @@ func TestFileTargetPathExclusion(t *testing.T) {
491401
}
492402

493403
// Create the base directories, still nothing watched.
494-
err = os.MkdirAll(logDir1, 0750)
404+
err = os.MkdirAll(logDir1, 0o750)
495405
assert.NoError(t, err)
496-
err = os.MkdirAll(logDir2, 0750)
406+
err = os.MkdirAll(logDir2, 0o750)
497407
assert.NoError(t, err)
498-
err = os.MkdirAll(logDir3, 0750)
408+
err = os.MkdirAll(logDir3, 0o750)
499409
assert.NoError(t, err)
500410

501411
err = target.sync()
@@ -571,7 +481,7 @@ func TestHandleFileCreationEvent(t *testing.T) {
571481
logFile := filepath.Join(logDir, "test1.log")
572482
logFileIgnored := filepath.Join(logDir, "test.donot.log")
573483

574-
if err := os.MkdirAll(logDir, 0750); err != nil {
484+
if err := os.MkdirAll(logDir, 0o750); err != nil {
575485
t.Fatal(err)
576486
}
577487

@@ -610,7 +520,7 @@ func TestHandleFileCreationEvent(t *testing.T) {
610520
target, err := NewFileTarget(metrics, logger, client, ps, path, pathExclude, nil, nil, &Config{
611521
// To handle file creation event from channel, set enough long time as sync period
612522
SyncPeriod: 10 * time.Minute,
613-
}, DefaultWatchConig, fakeFileHandler, fakeTargetHandler, "", nil)
523+
}, DefaultWatchConfig, fakeFileHandler, fakeTargetHandler, "", nil)
614524
if err != nil {
615525
t.Fatal(err)
616526
}
@@ -653,7 +563,6 @@ func TestToStopTailing(t *testing.T) {
653563
t.Error("Results mismatch, expected", expected[i], "got", st[i])
654564
}
655565
}
656-
657566
}
658567

659568
func BenchmarkToStopTailing(b *testing.B) {
@@ -717,7 +626,6 @@ func TestMissing(t *testing.T) {
717626
if _, ok := c["str3"]; !ok {
718627
t.Error("Expected the set to contain str3 but it did not")
719628
}
720-
721629
}
722630

723631
func requireEventually(t *testing.T, f func() bool, msg string) {

‎clients/pkg/promtail/targets/file/filetargetmanager_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
func newTestLogDirectories(t *testing.T) string {
2727
tmpDir := t.TempDir()
2828
logFileDir := filepath.Join(tmpDir, "logs")
29-
err := os.MkdirAll(logFileDir, 0750)
29+
err := os.MkdirAll(logFileDir, 0o750)
3030
assert.NoError(t, err)
3131
return logFileDir
3232
}
@@ -71,7 +71,7 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi
7171
}
7272

7373
metrics := NewMetrics(nil)
74-
ftm, err := NewFileTargetManager(metrics, logger, positions, client, []scrapeconfig.Config{sc}, tc, DefaultWatchConig)
74+
ftm, err := NewFileTargetManager(metrics, logger, positions, client, []scrapeconfig.Config{sc}, tc, DefaultWatchConfig)
7575
if err != nil {
7676
return nil, err
7777
}
@@ -492,7 +492,7 @@ func TestDeadlockStartWatchingDuringSync(t *testing.T) {
492492
go func() {
493493
for i := 0; i < 10; i++ {
494494
dir := filepath.Join(newLogDir, fmt.Sprintf("%d", i))
495-
err := os.MkdirAll(dir, 0750)
495+
err := os.MkdirAll(dir, 0o750)
496496
assert.NoError(t, err)
497497
time.Sleep(1 * time.Millisecond)
498498
for j := 0; j < 100; j++ {
@@ -551,13 +551,13 @@ func TestLabelSetUpdate(t *testing.T) {
551551
},
552552
}
553553

554-
var target = model.LabelSet{
554+
target := model.LabelSet{
555555
hostLabel: "localhost",
556556
pathLabel: "baz",
557557
"job": "foo",
558558
}
559559

560-
var target2 = model.LabelSet{
560+
target2 := model.LabelSet{
561561
hostLabel: "localhost",
562562
pathLabel: "baz",
563563
"job": "foo2",
@@ -593,7 +593,6 @@ func TestLabelSetUpdate(t *testing.T) {
593593
}, targetEventHandler)
594594
require.Equal(t, 0, len(syncer.targets))
595595
require.Equal(t, 0, len(syncer.fileEventWatchers))
596-
597596
}
598597

599598
func TestFulfillKubePodSelector(t *testing.T) {

0 commit comments

Comments
 (0)