fix(promtail): remove flaky TestFileTarget_StopsTailersCleanly #16473

Merged (2 commits) on Feb 26, 2025

6 changes: 2 additions & 4 deletions clients/pkg/promtail/targets/file/filetarget.go
@@ -50,15 +50,15 @@ type WatchConfig struct {
MaxPollFrequency time.Duration `mapstructure:"max_poll_frequency" yaml:"max_poll_frequency"`
}

-var DefaultWatchConig = WatchConfig{
+var DefaultWatchConfig = WatchConfig{
MinPollFrequency: 250 * time.Millisecond,
MaxPollFrequency: 250 * time.Millisecond,
}

// RegisterFlags with prefix registers flags where every name is prefixed by
// prefix. If prefix is a non-empty string, prefix should end with a period.
func (cfg *WatchConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
-d := DefaultWatchConig
+d := DefaultWatchConfig

f.DurationVar(&cfg.MinPollFrequency, prefix+"min_poll_frequency", d.MinPollFrequency, "Minimum period to poll for file changes")
f.DurationVar(&cfg.MaxPollFrequency, prefix+"max_poll_frequency", d.MaxPollFrequency, "Maximum period to poll for file changes")
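As an aside for readers of this diff: the doc comment above describes the prefixing contract of RegisterFlagsWithPrefix. Below is a minimal, self-contained sketch of how a caller might wire these flags up; the "file-watch." prefix, the flag set name, and the parsed value are illustrative assumptions, not taken from promtail.

```go
// Hypothetical caller sketch, not part of this PR. WatchConfig here mirrors
// the struct shown in the diff so the example stays self-contained.
package main

import (
	"flag"
	"fmt"
	"time"
)

type WatchConfig struct {
	MinPollFrequency time.Duration
	MaxPollFrequency time.Duration
}

// RegisterFlagsWithPrefix registers both poll-frequency flags under the given
// prefix; per the doc comment, a non-empty prefix should end with a period.
func (cfg *WatchConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.DurationVar(&cfg.MinPollFrequency, prefix+"min_poll_frequency", 250*time.Millisecond, "Minimum period to poll for file changes")
	f.DurationVar(&cfg.MaxPollFrequency, prefix+"max_poll_frequency", 250*time.Millisecond, "Maximum period to poll for file changes")
}

func main() {
	var cfg WatchConfig
	fs := flag.NewFlagSet("promtail", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("file-watch.", fs)

	// Flags end up named file-watch.min_poll_frequency and file-watch.max_poll_frequency.
	_ = fs.Parse([]string{"-file-watch.min_poll_frequency=500ms"})
	fmt.Println(cfg.MinPollFrequency, cfg.MaxPollFrequency) // 500ms 250ms
}
```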
@@ -247,7 +247,6 @@ func (t *FileTarget) sync() error {
} else {
// Gets current list of files to tail.
matches, err = doublestar.FilepathGlob(t.path)

if err != nil {
return errors.Wrap(err, "filetarget.sync.filepath.Glob")
}
@@ -257,7 +256,6 @@ func (t *FileTarget) sync() error {
matchesExcluded = []string{t.pathExclude}
} else {
matchesExcluded, err = doublestar.FilepathGlob(t.pathExclude)

if err != nil {
return errors.Wrap(err, "filetarget.sync.filepathexclude.Glob")
}
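The sync() hunks above glob for files to tail and for files to exclude using doublestar.FilepathGlob. Here is a minimal sketch of that call, assuming the github.com/bmatcuk/doublestar/v4 module; the import path and the example patterns are assumptions and this is not promtail's code.

```go
// Minimal sketch of the glob matching pattern used in sync() above.
package main

import (
	"fmt"

	"github.com/bmatcuk/doublestar/v4"
)

func main() {
	// Files to tail (illustrative pattern).
	matches, err := doublestar.FilepathGlob("/var/log/**/*.log")
	if err != nil {
		panic(err)
	}

	// Files to exclude from tailing (illustrative pattern).
	excluded, err := doublestar.FilepathGlob("/var/log/**/*.gz")
	if err != nil {
		panic(err)
	}

	fmt.Println(len(matches), len(excluded))
}
```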
118 changes: 13 additions & 105 deletions clients/pkg/promtail/targets/file/filetarget_test.go
@@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"sort"
-"sync"
"testing"
"time"

@@ -72,7 +71,7 @@ func TestFileTargetSync(t *testing.T) {
path := logDir1 + "/*.log"
target, err := NewFileTarget(metrics, logger, client, ps, path, "", nil, nil, &Config{
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
-}, DefaultWatchConig, nil, fakeHandler, "", nil)
+}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Start with nothing watched.
@@ -84,7 +83,7 @@
}

// Create the base dir, still nothing watched.
-err = os.MkdirAll(logDir1, 0750)
+err = os.MkdirAll(logDir1, 0o750)
assert.NoError(t, err)

err = target.sync()
@@ -191,7 +190,7 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
-}, DefaultWatchConig, nil, fakeHandler, "", nil)
+}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

_, err = os.Create(logFile)
@@ -247,95 +246,6 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
`), "promtail_files_active_total"))
}

-func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
-w := log.NewSyncWriter(os.Stderr)
-logger := log.NewLogfmtLogger(w)
-
-tempDir := t.TempDir()
-positionsFileName := filepath.Join(tempDir, "positions.yml")
-
-ps, err := positions.New(logger, positions.Config{
-SyncPeriod: 10 * time.Millisecond,
-PositionsFile: positionsFileName,
-})
-require.NoError(t, err)
-
-client := fake.New(func() {})
-defer client.Stop()
-
-pathToWatch := filepath.Join(tempDir, "*.log")
-registry := prometheus.NewRegistry()
-metrics := NewMetrics(registry)
-
-// Increase this to several thousand to make the test more likely to fail when debugging a race condition
-iterations := 500
-fakeHandler := make(chan fileTargetEvent, 10*iterations)
-for i := 0; i < iterations; i++ {
-logFile := filepath.Join(tempDir, fmt.Sprintf("test_%d.log", i))
-
-target, err := NewFileTarget(metrics, logger, client, ps, pathToWatch, "", nil, nil, &Config{
-SyncPeriod: 10 * time.Millisecond,
-}, DefaultWatchConig, nil, fakeHandler, "", nil)
-assert.NoError(t, err)
-
-file, err := os.Create(logFile)
-assert.NoError(t, err)
-
-// Write some data to the file
-for j := 0; j < 5; j++ {
-_, _ = file.WriteString(fmt.Sprintf("test %d\n", j))
-}
-require.NoError(t, file.Close())
-
-requireEventually(t, func() bool {
-return testutil.CollectAndCount(registry, "promtail_read_lines_total") == 1
-}, "expected 1 read_lines_total metric")
-
-requireEventually(t, func() bool {
-return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 1
-}, "expected 1 read_bytes_total metric")
-
-requireEventually(t, func() bool {
-return testutil.ToFloat64(metrics.readLines) == 5
-}, "expected 5 read_lines_total")
-
-requireEventually(t, func() bool {
-return testutil.ToFloat64(metrics.totalBytes) == 35
-}, "expected 35 total_bytes")
-
-requireEventually(t, func() bool {
-return testutil.ToFloat64(metrics.readBytes) == 35
-}, "expected 35 read_bytes")
-
-// Concurrently stop the target and remove the file
-wg := sync.WaitGroup{}
-wg.Add(2)
-go func() {
-sleepRandomDuration(time.Millisecond * 10)
-target.Stop()
-wg.Done()
-
-}()
-go func() {
-sleepRandomDuration(time.Millisecond * 10)
-_ = os.Remove(logFile)
-wg.Done()
-}()
-
-wg.Wait()
-
-requireEventually(t, func() bool {
-return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 0
-}, "expected read_bytes_total metric to be cleaned up")
-
-requireEventually(t, func() bool {
-return testutil.CollectAndCount(registry, "promtail_file_bytes_total") == 0
-}, "expected file_bytes_total metric to be cleaned up")
-}
-
-ps.Stop()
-}

// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send.
func TestFileTarget_StopAbruptly(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
@@ -367,11 +277,11 @@ func TestFileTarget_StopAbruptly(t *testing.T) {
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
-}, DefaultWatchConig, nil, fakeHandler, "", nil)
+}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Create a directory, still nothing is watched.
-err = os.MkdirAll(logDir1, 0750)
+err = os.MkdirAll(logDir1, 0o750)
assert.NoError(t, err)
_, err = os.Create(logfile1)
assert.NoError(t, err)
@@ -392,12 +302,12 @@ func TestFileTarget_StopAbruptly(t *testing.T) {

// Create two directories - one more than the buffer of fakeHandler,
// so that the file target hangs until we call Stop().
-err = os.MkdirAll(logDir2, 0750)
+err = os.MkdirAll(logDir2, 0o750)
assert.NoError(t, err)
_, err = os.Create(logfile2)
assert.NoError(t, err)

-err = os.MkdirAll(logDir3, 0750)
+err = os.MkdirAll(logDir3, 0o750)
assert.NoError(t, err)
_, err = os.Create(logfile3)
assert.NoError(t, err)
@@ -479,7 +389,7 @@ func TestFileTargetPathExclusion(t *testing.T) {
pathExclude := filepath.Join(dirName, "log3", "*.log")
target, err := NewFileTarget(metrics, logger, client, ps, path, pathExclude, nil, nil, &Config{
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
-}, DefaultWatchConig, nil, fakeHandler, "", nil)
+}, DefaultWatchConfig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Start with nothing watched.
@@ -491,11 +401,11 @@
}

// Create the base directories, still nothing watched.
-err = os.MkdirAll(logDir1, 0750)
+err = os.MkdirAll(logDir1, 0o750)
assert.NoError(t, err)
-err = os.MkdirAll(logDir2, 0750)
+err = os.MkdirAll(logDir2, 0o750)
assert.NoError(t, err)
-err = os.MkdirAll(logDir3, 0750)
+err = os.MkdirAll(logDir3, 0o750)
assert.NoError(t, err)

err = target.sync()
@@ -571,7 +481,7 @@ func TestHandleFileCreationEvent(t *testing.T) {
logFile := filepath.Join(logDir, "test1.log")
logFileIgnored := filepath.Join(logDir, "test.donot.log")

-if err := os.MkdirAll(logDir, 0750); err != nil {
+if err := os.MkdirAll(logDir, 0o750); err != nil {
t.Fatal(err)
}

@@ -610,7 +520,7 @@ func TestHandleFileCreationEvent(t *testing.T) {
target, err := NewFileTarget(metrics, logger, client, ps, path, pathExclude, nil, nil, &Config{
// To handle file creation event from channel, set enough long time as sync period
SyncPeriod: 10 * time.Minute,
-}, DefaultWatchConig, fakeFileHandler, fakeTargetHandler, "", nil)
+}, DefaultWatchConfig, fakeFileHandler, fakeTargetHandler, "", nil)
if err != nil {
t.Fatal(err)
}
@@ -653,7 +563,6 @@ func TestToStopTailing(t *testing.T) {
t.Error("Results mismatch, expected", expected[i], "got", st[i])
}
}

}

func BenchmarkToStopTailing(b *testing.B) {
@@ -717,7 +626,6 @@ func TestMissing(t *testing.T) {
if _, ok := c["str3"]; !ok {
t.Error("Expected the set to contain str3 but it did not")
}

}

func requireEventually(t *testing.T, f func() bool, msg string) {
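The requireEventually helper used throughout these tests has its body collapsed in this view. As a hedged sketch only, and not the actual promtail implementation, such a helper is typically a thin wrapper around testify's require.Eventually (package name, timeout, and tick below are assumptions):

```go
package file

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// requireEventually polls f until it returns true or the (illustrative)
// timeout elapses, failing the test with msg otherwise.
func requireEventually(t *testing.T, f func() bool, msg string) {
	t.Helper()
	require.Eventually(t, f, 5*time.Second, 20*time.Millisecond, msg)
}
```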
11 changes: 5 additions & 6 deletions clients/pkg/promtail/targets/file/filetargetmanager_test.go
@@ -26,7 +26,7 @@ import (
func newTestLogDirectories(t *testing.T) string {
tmpDir := t.TempDir()
logFileDir := filepath.Join(tmpDir, "logs")
-err := os.MkdirAll(logFileDir, 0750)
+err := os.MkdirAll(logFileDir, 0o750)
assert.NoError(t, err)
return logFileDir
}
@@ -71,7 +71,7 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi
}

metrics := NewMetrics(nil)
-ftm, err := NewFileTargetManager(metrics, logger, positions, client, []scrapeconfig.Config{sc}, tc, DefaultWatchConig)
+ftm, err := NewFileTargetManager(metrics, logger, positions, client, []scrapeconfig.Config{sc}, tc, DefaultWatchConfig)
if err != nil {
return nil, err
}
@@ -492,7 +492,7 @@ func TestDeadlockStartWatchingDuringSync(t *testing.T) {
go func() {
for i := 0; i < 10; i++ {
dir := filepath.Join(newLogDir, fmt.Sprintf("%d", i))
-err := os.MkdirAll(dir, 0750)
+err := os.MkdirAll(dir, 0o750)
assert.NoError(t, err)
time.Sleep(1 * time.Millisecond)
for j := 0; j < 100; j++ {
@@ -551,13 +551,13 @@ func TestLabelSetUpdate(t *testing.T) {
},
}

-var target = model.LabelSet{
+target := model.LabelSet{
hostLabel: "localhost",
pathLabel: "baz",
"job": "foo",
}

-var target2 = model.LabelSet{
+target2 := model.LabelSet{
hostLabel: "localhost",
pathLabel: "baz",
"job": "foo2",
@@ -593,7 +593,6 @@
}, targetEventHandler)
require.Equal(t, 0, len(syncer.targets))
require.Equal(t, 0, len(syncer.fileEventWatchers))

}

func TestFulfillKubePodSelector(t *testing.T) {
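Several assertions in the test files above compare gathered Prometheus metrics against expected exposition text (for example around promtail_files_active_total). For readers unfamiliar with that pattern, here is a minimal, self-contained sketch using client_golang's testutil package; the gauge, its help string, and the expected value are illustrative assumptions, not promtail's real metric definitions.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	registry := prometheus.NewRegistry()

	// Illustrative gauge standing in for promtail's real metric.
	activeFiles := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "promtail_files_active_total",
		Help: "Number of active files.",
	})
	registry.MustRegister(activeFiles)
	activeFiles.Set(0)

	expected := `
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 0
`
	// GatherAndCompare returns a non-nil error if the gathered metrics do not
	// match the expected exposition-format text for the named metric.
	err := testutil.GatherAndCompare(registry, strings.NewReader(expected), "promtail_files_active_total")
	fmt.Println(err) // <nil> when the metric matches
}
```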