Skip to content

Commit 4f962ef

Browse files
trevorwhitney, svennergr, ashwanthgoli, JordanRushing
authored
fix: enable service detection for otlp endpoint (#14036)
Co-authored-by: Sven Grossmann <Svennergr@gmail.com> Co-authored-by: Ashwanth <iamashwanth@gmail.com> Co-authored-by: JordanRushing <rushing.jordan@gmail.com>
1 parent aa1ac99 commit 4f962ef

File tree

5 files changed

+160
-15
lines changed

5 files changed

+160
-15
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3680,7 +3680,7 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
36803680
# list to service_name. If none of the configured labels exist in the stream,
36813681
# label is set to unknown_service. Empty list disables setting the label.
36823682
# CLI flag: -validation.discover-service-name
3683-
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]
3683+
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name k8s_container_name component workload job k8s_job_name]]
36843684

36853685
# Discover and add log levels during ingestion, if not present already. Levels
36863686
# would be added to Structured Metadata with name

‎pkg/loghttp/push/otlp.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
4747
return nil, nil, err
4848
}
4949

50-
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats)
50+
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
5151
return req, stats, nil
5252
}
5353

@@ -98,7 +98,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
9898
return req.Logs(), nil
9999
}
100100

101-
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
101+
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
102102
if ld.LogRecordCount() == 0 {
103103
return &logproto.PushRequest{}
104104
}
@@ -111,12 +111,14 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
111111
res := rls.At(i).Resource()
112112
resAttrs := res.Attributes()
113113

114-
if v, ok := resAttrs.Get(attrServiceName); !ok || v.AsString() == "" {
115-
resAttrs.PutStr(attrServiceName, "unknown_service")
116-
}
117114
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
118115
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size
119116

117+
shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric
118+
hasServiceName := false
119+
if v, ok := resAttrs.Get(attrServiceName); ok && v.AsString() != "" {
120+
hasServiceName = true
121+
}
120122
resAttrs.Range(func(k string, v pcommon.Value) bool {
121123
action := otlpConfig.ActionForResourceAttribute(k)
122124
if action == Drop {
@@ -127,6 +129,16 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
127129
if action == IndexLabel {
128130
for _, lbl := range attributeAsLabels {
129131
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
132+
133+
if !hasServiceName && shouldDiscoverServiceName {
134+
for _, labelName := range discoverServiceName {
135+
if lbl.Name == labelName {
136+
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(lbl.Value)
137+
hasServiceName = true
138+
break
139+
}
140+
}
141+
}
130142
}
131143
} else if action == StructuredMetadata {
132144
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
@@ -135,6 +147,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
135147
return true
136148
})
137149

150+
if !hasServiceName && shouldDiscoverServiceName {
151+
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
152+
}
153+
138154
if err := streamLabels.Validate(); err != nil {
139155
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
140156
continue

‎pkg/loghttp/push/otlp_test.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@ import (
2020

2121
func TestOTLPToLokiPushRequest(t *testing.T) {
2222
now := time.Unix(0, time.Now().UnixNano())
23+
defaultServiceDetection := []string{
24+
"service",
25+
"app",
26+
"application",
27+
"name",
28+
"app_kubernetes_io_name",
29+
"container",
30+
"container_name",
31+
"k8s_container_name",
32+
"component",
33+
"workload",
34+
"job",
35+
"k8s_job_name",
36+
}
2337

2438
for _, tc := range []struct {
2539
name string
@@ -346,7 +360,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
346360
{
347361
Action: IndexLabel,
348362
Attributes: []string{"pod.name"},
349-
}, {
363+
},
364+
{
350365
Action: IndexLabel,
351366
Regex: relabel.MustNewRegexp("service.*"),
352367
},
@@ -493,7 +508,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
493508
t.Run(tc.name, func(t *testing.T) {
494509
stats := newPushStats()
495510
tracker := NewMockTracker()
496-
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats)
511+
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats)
497512
require.Equal(t, tc.expectedPushRequest, *pushReq)
498513
require.Equal(t, tc.expectedStats, *stats)
499514

@@ -592,7 +607,6 @@ func TestOTLPLogToPushEntry(t *testing.T) {
592607
require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig(defaultGlobalOTLPConfig)))
593608
})
594609
}
595-
596610
}
597611

598612
func TestAttributesToLabels(t *testing.T) {

‎pkg/loghttp/push/push_test.go

Lines changed: 119 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"compress/gzip"
77
"context"
88
"fmt"
9+
"io"
910
"log"
11+
"net/http"
1012
"net/http/httptest"
1113
"strings"
1214
"testing"
@@ -16,6 +18,10 @@ import (
1618
"github.com/prometheus/prometheus/model/labels"
1719
"github.com/stretchr/testify/assert"
1820
"github.com/stretchr/testify/require"
21+
"go.opentelemetry.io/collector/pdata/pcommon"
22+
"go.opentelemetry.io/collector/pdata/plog"
23+
24+
"github.com/grafana/dskit/flagext"
1925

2026
util_log "github.com/grafana/loki/v3/pkg/util/log"
2127
)
@@ -256,7 +262,7 @@ func TestParseRequest(t *testing.T) {
256262
}
257263

258264
tracker := NewMockTracker()
259-
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker)
265+
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)
260266

261267
structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
262268
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
@@ -314,19 +320,124 @@ func TestParseRequest(t *testing.T) {
314320
}
315321
}
316322

323+
func Test_ServiceDetection(t *testing.T) {
324+
tracker := NewMockTracker()
325+
326+
createOtlpLogs := func(labels ...string) []byte {
327+
now := time.Unix(0, time.Now().UnixNano())
328+
ld := plog.NewLogs()
329+
for i := 0; i < len(labels); i += 2 {
330+
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr(labels[i], labels[i+1])
331+
}
332+
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body")
333+
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano()))
334+
335+
jsonMarshaller := plog.JSONMarshaler{}
336+
body, err := jsonMarshaller.MarshalLogs(ld)
337+
338+
require.NoError(t, err)
339+
return body
340+
}
341+
342+
createRequest := func(path string, body io.Reader) *http.Request {
343+
request := httptest.NewRequest(
344+
"POST",
345+
path,
346+
body,
347+
)
348+
request.Header.Add("Content-Type", "application/json")
349+
350+
return request
351+
}
352+
353+
t.Run("detects servce from loki push requests", func(t *testing.T) {
354+
body := `{"streams": [{ "stream": { "foo": "bar" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`
355+
request := createRequest("/loki/api/v1/push", strings.NewReader(body))
356+
357+
limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
358+
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)
359+
360+
require.NoError(t, err)
361+
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
362+
})
363+
364+
t.Run("detects servce from OTLP push requests using default indexing", func(t *testing.T) {
365+
body := createOtlpLogs("k8s.job.name", "bar")
366+
request := createRequest("/otlp/v1/push", bytes.NewReader(body))
367+
368+
limits := &fakeLimits{enabled: true}
369+
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
370+
require.NoError(t, err)
371+
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
372+
})
373+
374+
t.Run("detects servce from OTLP push requests using custom indexing", func(t *testing.T) {
375+
body := createOtlpLogs("special", "sauce")
376+
request := createRequest("/otlp/v1/push", bytes.NewReader(body))
377+
378+
limits := &fakeLimits{
379+
enabled: true,
380+
labels: []string{"special"},
381+
indexAttributes: []string{"special"},
382+
}
383+
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
384+
require.NoError(t, err)
385+
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
386+
})
387+
388+
t.Run("only detects custom service label from indexed labels", func(t *testing.T) {
389+
body := createOtlpLogs("special", "sauce")
390+
request := createRequest("/otlp/v1/push", bytes.NewReader(body))
391+
392+
limits := &fakeLimits{
393+
enabled: true,
394+
labels: []string{"special"},
395+
indexAttributes: []string{},
396+
}
397+
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
398+
require.NoError(t, err)
399+
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
400+
})
401+
}
402+
317403
type fakeLimits struct {
318-
enabled bool
404+
enabled bool
405+
labels []string
406+
indexAttributes []string
319407
}
320408

321-
func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig {
322-
return OTLPConfig{}
409+
func (f *fakeLimits) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration {
410+
return time.Hour
323411
}
324412

325-
func (l *fakeLimits) DiscoverServiceName(_ string) []string {
326-
if !l.enabled {
413+
func (f *fakeLimits) OTLPConfig(_ string) OTLPConfig {
414+
if len(f.indexAttributes) > 0 {
415+
return OTLPConfig{
416+
ResourceAttributes: ResourceAttributesConfig{
417+
AttributesConfig: []AttributesConfig{
418+
{
419+
Action: IndexLabel,
420+
Attributes: f.indexAttributes,
421+
},
422+
},
423+
},
424+
}
425+
}
426+
427+
defaultGlobalOTLPConfig := GlobalOTLPConfig{}
428+
flagext.DefaultValues(&defaultGlobalOTLPConfig)
429+
return DefaultOTLPConfig(defaultGlobalOTLPConfig)
430+
}
431+
432+
func (f *fakeLimits) DiscoverServiceName(_ string) []string {
433+
if !f.enabled {
327434
return nil
328435
}
329436

437+
if len(f.labels) > 0 {
438+
return f.labels
439+
}
440+
330441
return []string{
331442
"service",
332443
"app",
@@ -335,9 +446,11 @@ func (l *fakeLimits) DiscoverServiceName(_ string) []string {
335446
"app_kubernetes_io_name",
336447
"container",
337448
"container_name",
449+
"k8s_container_name",
338450
"component",
339451
"workload",
340452
"job",
453+
"k8s_job_name",
341454
}
342455
}
343456

‎pkg/validation/limits.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
263263
"app_kubernetes_io_name",
264264
"container",
265265
"container_name",
266+
"k8s_container_name",
266267
"component",
267268
"workload",
268269
"job",
270+
"k8s_job_name",
269271
}
270272
f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")
271273
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")

0 commit comments

Comments
 (0)