Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixes from review:
* extra word typo in readme
* initialize timestamp field in init(), update tests
* always use string arrow type for tags
  • Loading branch information
powersj committed Jul 23, 2024
commit 98399b63733f2b6ad4cd4459cfa9215b2f7040f5
2 changes: 1 addition & 1 deletion plugins/outputs/parquet/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Parquet Output Plugin

This plugin sends writes metrics to parquet files. By default, the parquet
This plugin writes metrics to parquet files. By default, the parquet
output groups metrics by metric name and write those metrics all to the same
file. If a metric schema does not match then metrics are dropped.

Expand Down
19 changes: 8 additions & 11 deletions plugins/outputs/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
//go:embed sample.conf
var sampleConfig string

var defaultTimestampFieldName = "timestamp"

type metricGroup struct {
filename string
builder *array.RecordBuilder
Expand All @@ -47,11 +49,6 @@ func (p *Parquet) Init() error {
p.Directory = "."
}

if p.TimestampFieldName == nil {
timestampFieldName := "timestamp"
p.TimestampFieldName = &timestampFieldName
}

stat, err := os.Stat(p.Directory)
if os.IsNotExist(err) {
if err := os.MkdirAll(p.Directory, 0750); err != nil {
Expand Down Expand Up @@ -255,11 +252,7 @@ func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error)
}
for _, tag := range metric.TagList() {
if _, ok := rawFields[tag.Key]; !ok {
arrowType, err := goToArrowType(tag.Value)
if err != nil {
return nil, fmt.Errorf("error converting '%s=%s' tag to arrow type: %w", tag.Key, tag.Value, err)
}
rawFields[tag.Key] = arrowType
rawFields[tag.Key] = arrow.BinaryTypes.String
}
}
}
Expand Down Expand Up @@ -335,5 +328,9 @@ func goToArrowType(value interface{}) (arrow.DataType, error) {
}

func init() {
outputs.Add("parquet", func() telegraf.Output { return &Parquet{} })
outputs.Add("parquet", func() telegraf.Output {
return &Parquet{
TimestampFieldName: &defaultTimestampFieldName,
}
})
}
12 changes: 6 additions & 6 deletions plugins/outputs/parquet/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func TestCases(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
testDir := t.TempDir()
plugin := &Parquet{
Directory: testDir,
Directory: testDir,
TimestampFieldName: &defaultTimestampFieldName,
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand Down Expand Up @@ -161,8 +162,9 @@ func TestRotation(t *testing.T) {

testDir := t.TempDir()
plugin := &Parquet{
Directory: testDir,
RotationInterval: config.Duration(1 * time.Second),
Directory: testDir,
RotationInterval: config.Duration(1 * time.Second),
TimestampFieldName: &defaultTimestampFieldName,
}

require.NoError(t, plugin.Init())
Expand All @@ -188,11 +190,9 @@ func TestOmitTimestamp(t *testing.T) {
),
}

emptyTimestamp := ""
testDir := t.TempDir()
plugin := &Parquet{
Directory: testDir,
TimestampFieldName: &emptyTimestamp,
Directory: testDir,
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
Expand Down