Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
reviews + cleanups
  • Loading branch information
mstrandboge committed Jul 10, 2024
commit 9c850838d56785faf8eaa7be8be5558c8fab7cb8
4 changes: 4 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ type Accumulator interface {
type TrackingID uint64

type TrackingData interface {
// ID is the TrackingID
ID() TrackingID

// RefCount is the number of tracking metrics still persistent and referencing this tracking ID
RefCount() int32
}

// DeliveryInfo provides the results of a delivered metric group.
Expand Down
13 changes: 10 additions & 3 deletions metric/deserialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

// storage for tracking data that can't be serialized to disk
var (
// todo need some way to empty this map out when done with a tracking ID.
// grouped tracking metrics means that ID->Data association is not one to one,
// many metrics could be associated with one tracking ID so we cannot just
// clear this every time in FromBytes.
Expand Down Expand Up @@ -67,11 +66,19 @@ func FromBytes(b []byte) (telegraf.Metric, error) {
if sm.TID != 0 {
mu.Lock()
td := trackingStore[sm.TID]
mu.Unlock()

if td == nil {
mu.Unlock()
return nil, ErrSkipTracking
}
rc := td.RefCount()
if rc <= 1 {
// only 1 metric left referencing this tracking ID, we can remove here since no subsequent metrics
// read can use this ID. If another metric in a metric group with this ID gets added later, it will
// simply be added back into the tracking store again.
trackingStore[sm.TID] = nil
}
mu.Unlock()

m = rebuildTrackingMetric(m, td)
}
return m, nil
Expand Down
4 changes: 4 additions & 0 deletions metric/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (d *trackingData) incr() {
atomic.AddInt32(&d.Rc, 1)
}

func (d *trackingData) RefCount() int32 {
return d.Rc
}

func (d *trackingData) decr() int32 {
return atomic.AddInt32(&d.Rc, -1)
}
Expand Down
41 changes: 13 additions & 28 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@ func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, er
if err != nil {
return nil, fmt.Errorf("failed to open wal file: %w", err)
}
return &DiskBuffer{
buf := &DiskBuffer{
BufferStats: stats,
file: walFile,
path: filePath,
}, nil
}
if buf.length() > 0 {
buf.originalEnd = buf.writeIndex()
}
return buf, nil
}

func (b *DiskBuffer) Len() int {
Expand Down Expand Up @@ -78,16 +82,15 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {

dropped := 0
for _, m := range metrics {
if !b.addSingle(m) {
if !b.addSingleMetric(m) {
dropped++
}
}
b.BufferSize.Set(int64(b.length()))
return dropped
// todo implement batched writes
}

func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool {
data, err := metric.ToBytes(m)
if err != nil {
panic(err)
Expand All @@ -100,26 +103,6 @@ func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
return false
}

//nolint:unused // to be implemented in the future
func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int {
written := 0
batch := new(wal.Batch)
for _, m := range metrics {
data, err := metric.ToBytes(m)
if err != nil {
panic(err)
}
batch.Write(b.writeIndex(), data)
b.metricAdded()
written++
}
err := b.file.WriteBatch(batch)
if err != nil {
return 0 // todo error handle, test if a partial write occur
}
return written
}

func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
defer b.Unlock()
Expand Down Expand Up @@ -177,6 +160,7 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
b.resetWalFile()
} else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
b.file.ClearCache()
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -208,9 +192,10 @@ func (b *DiskBuffer) resetBatch() {
b.batchSize = 0
}

// todo This is very messy and not ideal, but serves as the only way I can find currently
// todo to actually clear the walfile completely if needed, since Truncate() calls require
// todo at least one entry remains in them otherwise they return an error.
// This is very messy and not ideal, but serves as the only way I can find currently
// to actually clear the walfile completely if needed, since Truncate() calls require
// that at least one entry remains in them otherwise they return an error.
// Related issue: https://github.com/tidwall/wal/issues/20
func (b *DiskBuffer) resetWalFile() {
b.file.Close()
os.Remove(b.path)
Expand Down
103 changes: 59 additions & 44 deletions models/buffer_disk_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package models

import (
"fmt"
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tidwall/wal"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func newTestDiskBuffer(t testing.TB) Buffer {
Expand Down Expand Up @@ -41,55 +42,69 @@ func TestBuffer_RetainsTrackingInformation(t *testing.T) {
require.Equal(t, 1, delivered)
}

// WAL file tested here was written as:
// 1: Metric()
// 2: Metric()
// 3: Metric()
// 4: metric.WithTracking(Metric())
// 5: Metric()
//
// Expected to drop the 4th metric, as tracking metrics from
// previous instances are dropped when the wal file is reopened.
func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) {
// copy the testdata so we do not destroy the testdata wal file
path, err := os.MkdirTemp("", "*-buffer-test")
require.NoError(t, err)
f, err := os.Create(path + "/00000000000000000001")
require.NoError(t, err)
f1, err := os.Open("testdata/testwal/00000000000000000001")
require.NoError(t, err)
written, err := io.Copy(f, f1)
walfile, err := wal.Open(path, nil)
require.NoError(t, err)
fmt.Println(written)

tm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) {})

metrics := []telegraf.Metric{
// Basic metric with 1 field, 0 timestamp
Metric(),
// Basic metric with 1 field, different timestamp
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 20.0,
},
time.Now(),
),
// Metric with a field
metric.New(
"cpu",
map[string]string{
"x": "y",
},
map[string]interface{}{
"value": 18.0,
},
time.Now(),
),
// Tracking metric
tm,
// Metric with lots of tag types
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value_f64": 20.0,
"value_uint64": uint64(10),
"value_int16": int16(5),
"value_string": "foo",
"value_boolean": true,
"value_byte_array": []byte{1, 2, 3, 4, 5},
},
time.Now(),
),
}

// call manually so that we can properly use metric.ToBytes() without having initialized a buffer
registerGob()

for i, m := range metrics {
data, err := metric.ToBytes(m)
require.NoError(t, err)
require.NoError(t, walfile.Write(uint64(i+1), data))
}

b := newTestDiskBufferWithPath(t, filepath.Base(path), filepath.Dir(path))
batch := b.Batch(4)
// expected skips the tracking metric
expected := []telegraf.Metric{
Metric(), Metric(), Metric(), Metric(),
metrics[0], metrics[1], metrics[2], metrics[4],
}
testutil.RequireMetricsEqual(t, expected, batch)
}

/*
// Function used to create the test data used in the test above
func Test_CreateTestData(t *testing.T) {
metric.Init()
walfile, _ := wal.Open("testdata/testwal", nil)
data, err := metric.ToBytes(Metric())
require.NoError(t, err)
require.NoError(t, walfile.Write(1, data))
data, err = metric.ToBytes(Metric())
require.NoError(t, err)
require.NoError(t, walfile.Write(2, data))
data, err = metric.ToBytes(Metric())
require.NoError(t, err)
require.NoError(t, walfile.Write(3, data))
m, _ := metric.WithTracking(Metric(), func(di telegraf.DeliveryInfo) {})
data, err = metric.ToBytes(m)
require.NoError(t, err)
require.NoError(t, walfile.Write(4, data))
data, err = metric.ToBytes(Metric())
require.NoError(t, err)
require.NoError(t, walfile.Write(5, data))
}
*/
Binary file removed models/testdata/testwal/00000000000000000001
Binary file not shown.