Skip to content

Commit 0253954

Browse files
authored
database_observability: add function to build loki entry with timestamp (#3469)
1 parent 5dc495d commit 0253954

File tree

5 files changed

+29
-10
lines changed

5 files changed

+29
-10
lines changed

‎CHANGELOG.md‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Main (unreleased)
3434
- make tidbparser the default choice (@cristiangreco)
3535
- `query_sample`: better handling of timer overflows (@fridgepoet)
3636
- collect metrics on enabled `performance_schema.setup_consumers` (@fridgepoet)
37+
- `query_sample`: base log entries on calculated timestamp from rows, not now() (@fridgepoet)
3738

3839
- Mixin dashboards improvements: added minimum cluster size to Cluster Overview dashboard, fixed units in OpenTelemetry dashboard, fixed slow components evaluation time units in Controller dashboard and updated Prometheus dashboard to correctly aggregate across instances. (@thampiotr)
3940

‎internal/component/database_observability/mysql/collector/loki_entry.go‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,20 @@ import (
1212
"github.com/grafana/alloy/internal/runtime/logging"
1313
)
1414

15-
func buildLokiEntry(level logging.Level, op, instanceKey, line string) loki.Entry {
15+
func buildLokiEntryWithTimestamp(level logging.Level, op, instanceKey, line string, timestamp int64) loki.Entry {
1616
return loki.Entry{
1717
Labels: model.LabelSet{
1818
"job": database_observability.JobName,
1919
"op": model.LabelValue(op),
2020
"instance": model.LabelValue(instanceKey),
2121
},
2222
Entry: logproto.Entry{
23-
Timestamp: time.Unix(0, time.Now().UnixNano()),
23+
Timestamp: time.Unix(0, timestamp),
2424
Line: fmt.Sprintf(`level="%s" %s`, level, line),
2525
},
2626
}
2727
}
28+
29+
func buildLokiEntry(level logging.Level, op, instanceKey, line string) loki.Entry {
30+
return buildLokiEntryWithTimestamp(level, op, instanceKey, line, time.Now().UnixNano())
31+
}

‎internal/component/database_observability/mysql/collector/loki_entry_test.go‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package collector
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/require"
78

@@ -18,3 +19,10 @@ func TestBuildLokiEntry(t *testing.T) {
1819
require.Equal(t, "test-instance", string(entry.Labels["instance"]))
1920
require.Equal(t, `level="debug" This is a test log line`, entry.Line)
2021
}
22+
23+
func TestBuildLokiEntryWithTimestamp(t *testing.T) {
24+
entry := buildLokiEntryWithTimestamp(logging.LevelInfo, "test-operation", "test-instance", "This is a test log line", 5)
25+
26+
require.Equal(t, int64(5), entry.Entry.Timestamp.UnixNano())
27+
require.Equal(t, time.Unix(0, 5), entry.Entry.Timestamp)
28+
}

‎internal/component/database_observability/mysql/collector/query_sample.go‎

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
204204

205205
// sample time
206206
TimerEndPicoseconds sql.NullFloat64
207-
TimestampMilliseconds uint64
207+
TimestampMilliseconds float64
208208
ElapsedTimePicoseconds sql.NullFloat64
209209
CPUTime float64
210210

@@ -286,11 +286,12 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
286286
logMessage += fmt.Sprintf(` sql_text="%s"`, row.SQLText.String)
287287
}
288288

289-
c.entryHandler.Chan() <- buildLokiEntry(
289+
c.entryHandler.Chan() <- buildLokiEntryWithTimestamp(
290290
logging.LevelInfo,
291291
OP_QUERY_SAMPLE,
292292
c.instanceKey,
293293
logMessage,
294+
int64(millisecondsToNanoseconds(row.TimestampMilliseconds)),
294295
)
295296
}
296297

@@ -302,7 +303,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
302303
return nil
303304
}
304305

305-
func (c *QuerySample) calculateWallTime(serverStartTime, timer float64) uint64 {
306+
func (c *QuerySample) calculateWallTime(serverStartTime, timer float64) float64 {
306307
// timer indicates event timing since server startup.
307308
// The timer value is in picoseconds with a column type of bigint unsigned. This value can overflow after about ~213 days.
308309
// We need to account for this overflow when calculating the timestamp.
@@ -358,6 +359,7 @@ const (
358359
picosecondsPerSecond float64 = 1e12
359360
millisecondsPerSecond float64 = 1e3
360361
millisecondsPerPicosecond float64 = 1e9
362+
nanosecondsPerMillisecond float64 = 1e6
361363
)
362364

363365
func picosecondsToSeconds(picoseconds float64) float64 {
@@ -368,10 +370,14 @@ func picosecondsToMilliseconds(picoseconds float64) float64 {
368370
return picoseconds / millisecondsPerPicosecond
369371
}
370372

373+
func millisecondsToNanoseconds(milliseconds float64) float64 {
374+
return milliseconds * nanosecondsPerMillisecond
375+
}
376+
371377
func secondsToPicoseconds(seconds float64) float64 {
372378
return seconds * picosecondsPerSecond
373379
}
374380

375-
func secondsToMilliseconds(seconds float64) uint64 {
376-
return uint64(seconds * millisecondsPerSecond)
381+
func secondsToMilliseconds(seconds float64) float64 {
382+
return seconds * millisecondsPerSecond
377383
}

‎internal/component/database_observability/mysql/collector/query_sample_test.go‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,7 +1482,7 @@ func TestQuerySample_calculateWallTime(t *testing.T) {
14821482
timer := 2e12 // Timer indicates event timing, counted since server startup. 2 seconds in picoseconds
14831483

14841484
result := c.calculateWallTime(serverStartTime, timer)
1485-
assert.Equalf(t, uint64(4000), result, "got %d, want 4000", result)
1485+
assert.Equalf(t, float64(4000), result, "got %f, want 4000", result)
14861486
})
14871487

14881488
t.Run("calculates the timestamp, taking into account the overflows", func(t *testing.T) {
@@ -1492,7 +1492,7 @@ func TestQuerySample_calculateWallTime(t *testing.T) {
14921492

14931493
result := c.calculateWallTime(serverStartTime, timer)
14941494

1495-
assert.Equalf(t, uint64(18446749073), result, "got %d, want 18446749073", result)
1495+
assert.Equalf(t, 18446749073.709553, result, "got %f, want 18446749073.709553", result)
14961496
})
14971497

14981498
t.Run("calculates another timestamp when timer approaches overflow", func(t *testing.T) {
@@ -1502,7 +1502,7 @@ func TestQuerySample_calculateWallTime(t *testing.T) {
15021502

15031503
result := c.calculateWallTime(serverStartTime, timer)
15041504

1505-
assert.Equalf(t, uint64(36893491147), result, "got %d, want 36893491147", result)
1505+
assert.Equalf(t, 3.6893491147419106e+10, result, "got %f, want 3.6893491147419106e+10", result)
15061506
})
15071507
}
15081508

0 commit comments

Comments
 (0)