Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
478d1f3
Initial commit to add timeout as a parm to export, make retries encom…
DylanRussell Apr 30, 2025
ccdd224
Fix lint issues
DylanRussell Apr 30, 2025
5bc8894
Fix a bunch of failing style/lint/spellcheck checks
DylanRussell May 1, 2025
ba92c5a
Remove timeout param from the export calls.
DylanRussell May 2, 2025
29144a1
Fix flaky windows test ?
DylanRussell May 2, 2025
838d7d9
Merge branch 'main' into retry2
DylanRussell May 6, 2025
66a4ebe
Merge branch 'main' into retry2
DylanRussell May 8, 2025
95ccfea
Respond to review comments..
DylanRussell May 9, 2025
d5ca894
Merge branch 'main' of github.com:DylanRussell/opentelemetry-python i…
DylanRussell May 12, 2025
8770e15
Delete exponential backoff code that is now unused
DylanRussell May 13, 2025
4c74411
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 13, 2025
f373caa
Add changelog and remove some unused imports..
DylanRussell May 13, 2025
d1e04e1
fix typo and unit test flaking on windows
DylanRussell May 13, 2025
f42ecd3
Refactor tests, HTTP exporters a bit
DylanRussell May 22, 2025
096b9f8
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
8673b45
Merge remote-tracking branch 'origin/main' into retry2
DylanRussell May 22, 2025
46e15f1
Remove unneeded test reqs
DylanRussell May 22, 2025
dcba91a
Remove gRPC retry config
DylanRussell Jun 5, 2025
d506d54
Merge remote-tracking branch 'origin' into retry2
DylanRussell Jun 5, 2025
71b77e1
Tweak backoff calculation
DylanRussell Jun 5, 2025
2ae79bb
Lint and precommit
DylanRussell Jun 5, 2025
553ea3e
Empty commit
DylanRussell Jun 5, 2025
28b9399
Another empty commit
DylanRussell Jun 5, 2025
b4df54a
Calculate backoff in 1 place instead of 2
DylanRussell Jun 5, 2025
9e1ba28
Update changelog
DylanRussell Jun 5, 2025
0b54090
Update changelog
DylanRussell Jun 5, 2025
bc3110a
Make new _common directory in the http exporter for shared code
DylanRussell Jun 5, 2025
4bbecf8
precommit
DylanRussell Jun 5, 2025
3076c0f
Make many changes
DylanRussell Jun 9, 2025
f2583e0
Reorder shutdown stuff
DylanRussell Jun 10, 2025
f503053
Merge remote-tracking branch 'origin/main' into shutdown_refactor
DylanRussell Jun 13, 2025
5fa8c23
Fix merging
DylanRussell Jun 13, 2025
62d3699
Don't join the thread in case we are stuck in an individual export call
DylanRussell Jun 16, 2025
eb59db3
Add tests, changelog entry
DylanRussell Jun 16, 2025
61c28da
Update time assertions to satisfy windows.. Fix lint issues
DylanRussell Jun 16, 2025
23cd24a
Skip test on windows
DylanRussell Jun 16, 2025
b81c619
Merge branch 'main' into shutdown_refactor
DylanRussell Jun 17, 2025
33a860f
Use threading Event instead of sleep loop.
DylanRussell Jun 27, 2025
deec916
Merge remote-tracking branch 'origin' into shutdown_refactor
DylanRussell Jul 14, 2025
fb7f320
Respond to review comments..
DylanRussell Jul 14, 2025
79946d8
Pass remaining timeout to shutdown
DylanRussell Jul 16, 2025
adbe4b0
Run precommit
DylanRussell Jul 16, 2025
f47c24d
Change variable names
DylanRussell Jul 17, 2025
48309b6
Switch timeout back to timeout_millis
DylanRussell Jul 17, 2025
5218399
Merge branch 'main' of github.com:DylanRussell/opentelemetry-python i…
DylanRussell Jul 17, 2025
023f259
Update CHANGELOG.md
DylanRussell Jul 18, 2025
768435b
Update CHANGELOG.md
DylanRussell Jul 18, 2025
950f57a
Rename variable
DylanRussell Jul 18, 2025
eb17ec2
Merge branch 'shutdown_refactor' of github.com:DylanRussell/opentelem…
DylanRussell Jul 18, 2025
235d22d
Fix variable name
DylanRussell Jul 18, 2025
dbfd27d
Merge branch 'main' into shutdown_refactor
DylanRussell Jul 22, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove gRPC retry config
  • Loading branch information
DylanRussell committed Jun 5, 2025
commit dcba91a2cd91d4469d037d3a12ee4b99bfc04cf9
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

"""OTLP Exporter"""

import json
import random
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep, time
from typing import ( # noqa: F401
Any,
Callable,
Expand All @@ -34,6 +35,7 @@
from typing import Sequence as TypingSequence
from urllib.parse import urlparse

from google.rpc.error_details_pb2 import RetryInfo
from typing_extensions import deprecated

from grpc import (
Expand Down Expand Up @@ -72,35 +74,18 @@
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers

# 5 is the maximum allowable attempts allowed by grpc retry policy.
# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt,
# plus or minus a 20% jitter. Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting
# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the
# timeout, and all fail. A header `grpc-retry-pushback-ms` when set by the server will override
# and take precedence over this backoff. See https://grpc.io/docs/guides/retry/ for more details.
_GRPC_RETRY_POLICY = json.dumps(
{
"methodConfig": [
{
"name": [dict()],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "9s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"CANCELLED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"DATA_LOSS",
],
},
}
]
}
_RETRYABLE_ERROR_CODES = frozenset(
[
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]
)
_MAX_RETRYS = 6
logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
ResourceDataT = TypeVar("ResourceDataT")
Expand Down Expand Up @@ -273,9 +258,6 @@ def __init__(
self._channel = insecure_channel(
self._endpoint,
compression=compression,
options=[
("grpc.service_config", _GRPC_RETRY_POLICY),
],
)
else:
credentials = _get_credentials(
Expand All @@ -288,9 +270,6 @@ def __init__(
self._endpoint,
credentials,
compression=compression,
options=[
("grpc.service_config", _GRPC_RETRY_POLICY),
],
)
self._client = self._stub(self._channel)

Expand All @@ -314,23 +293,50 @@ def _export(
# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
# TracesData and use the code below instead.
retry_info = RetryInfo()
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
return self._result.SUCCESS
except RpcError as error:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE
deadline_sec = time() + self._timeout
backoff_seconds = 1 * random.uniform(0.8, 1.2)
for retry_num in range(1, _MAX_RETRYS + 1):
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
return self._result.SUCCESS
except RpcError as error:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info.ParseFromString(retry_info_bin)
backoff_seconds = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)
if (
error.code() not in _RETRYABLE_ERROR_CODES
or retry_num == _MAX_RETRYS
or backoff_seconds > (deadline_sec - time())
):
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)
return self._result.FAILURE
logger.warning(
"Transient error %s encountered while exporting logs batch, retrying in %.2fs.",
error.code(),
backoff_seconds,
)
sleep(backoff_seconds)
backoff_seconds *= 2 * random.uniform(0.8, 1.2)
# Not possible to reach here but the linter is complaining.
return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
from logging import WARNING, getLogger
from typing import Any, Optional, Sequence
from unittest import TestCase
from unittest.mock import ANY, Mock, patch
from unittest.mock import Mock, patch

from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module
RetryInfo,
)
from grpc import Compression, StatusCode, server

from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
Expand Down Expand Up @@ -84,35 +90,39 @@ class TraceServiceServicerWithExportParams(TraceServiceServicer):
def __init__(
self,
export_result: StatusCode,
optional_export_sleep: Optional[float] = None,
optional_first_time_retry_millis: Optional[int] = None,
optional_export_sleep: Optional[float] = None,
):
self.export_result = export_result
self.optional_export_sleep = optional_export_sleep
self.optional_first_time_retry_millis = (
optional_first_time_retry_millis
)
self.first_attempt = True
self.num_requests = 0
self.now = time.time()

# pylint: disable=invalid-name,unused-argument
def Export(self, request, context):
self.num_requests += 1
if self.optional_export_sleep:
time.sleep(self.optional_export_sleep)
if self.export_result != StatusCode.OK:
if self.optional_first_time_retry_millis and self.first_attempt:
self.first_attempt = False
if (
self.optional_first_time_retry_millis
and self.num_requests == 1
):
context.set_trailing_metadata(
(
(
"grpc-retry-pushback-ms",
str(self.optional_first_time_retry_millis),
"google.rpc.retryinfo-bin",
RetryInfo(
retry_delay=Duration(
nanos=self.optional_first_time_retry_millis
)
).SerializeToString(),
),
)
)
context.abort(self.export_result, "")
context.set_code(self.export_result)

return ExportTraceServiceResponse()

Expand Down Expand Up @@ -262,7 +272,6 @@ def test_otlp_exporter_otlp_compression_unspecified(
mock_insecure_channel.assert_called_once_with(
"localhost:4317",
compression=Compression.NoCompression,
options=ANY,
)

# pylint: disable=no-self-use, disable=unused-argument
Expand All @@ -286,7 +295,7 @@ def test_otlp_exporter_otlp_compression_envvar(
"""Just OTEL_EXPORTER_OTLP_COMPRESSION should work"""
OTLPSpanExporterForTesting(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.Gzip, options=ANY
"localhost:4317", compression=Compression.Gzip
)

def test_shutdown(self):
Expand Down Expand Up @@ -366,7 +375,7 @@ def test_export_over_closed_grpc_channel(self):
str(err.exception), "Cannot invoke RPC on closed channel!"
)

def test_retry_with_server_pushback(self):
def test_retry_info_is_respected(self):
mock_trace_service = TraceServiceServicerWithExportParams(
StatusCode.UNAVAILABLE, optional_first_time_retry_millis=200
)
Expand All @@ -381,34 +390,34 @@ def test_retry_with_server_pushback(self):
SpanExportResult.FAILURE,
)
after = time.time()
# We set the `grpc-retry-pushback-ms` header to 200 millis on the first server response.
# We set the `grpc-retry-pushback-ms` header to 200 millis on the first server response only,
# and then we do exponential backoff using that first value.
# So we expect the first request at time 0, second at time 0.2,
# third at 1.2 (start of backoff policy), fourth at time 3.2, last at time 7.2.
self.assertEqual(mock_trace_service.num_requests, 5)
# The backoffs have a jitter +- 20%, so we have to put a higher bound than 7.2.
self.assertTrue(after - before < 8.8)
# third at .6, fourth at 1.4, fifth at 3, and final one at 6.2.
self.assertEqual(mock_trace_service.num_requests, 6)
# The backoffs have a jitter +- 20%, so we have to put a higher bound than 6.2.
self.assertTrue(after - before < 7.5)

def test_retry_timeout(self):
def test_retry_not_made_if_would_exceed_timeout(self):
mock_trace_service = TraceServiceServicerWithExportParams(
StatusCode.UNAVAILABLE
)
add_TraceServiceServicer_to_server(
mock_trace_service,
self.server,
)
# Set timeout to 1.5 seconds.
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=1.5)
exporter = OTLPSpanExporterForTesting(insecure=True, timeout=4)
before = time.time()
self.assertEqual(
exporter.export([self.span]),
SpanExportResult.FAILURE,
)
after = time.time()
# Our retry starts with a 1 second backoff then doubles.
# So we expect just two calls: one at time 0, one at time 1.
self.assertEqual(mock_trace_service.num_requests, 2)
# gRPC retry config waits for the timeout (1.5) before cancelling the request.
self.assertTrue(after - before < 1.6)
# First call at time 0, second at time 1, third at time 3, fourth would exceed timeout.
self.assertEqual(mock_trace_service.num_requests, 3)
# There's a +/-20% jitter on each backoff.
self.assertTrue(after - before < 3.7)

def test_timeout_set_correctly(self):
mock_trace_service = TraceServiceServicerWithExportParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from os.path import dirname
from typing import List
from unittest import TestCase
from unittest.mock import ANY, patch
from unittest.mock import patch

from grpc import ChannelCredentials, Compression

Expand Down Expand Up @@ -299,7 +299,6 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel):
mock_insecure_channel.assert_called_once_with(
"localhost:4317",
compression=Compression.NoCompression,
options=ANY,
)

def test_split_metrics_data_many_data_points(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import os
from unittest import TestCase
from unittest.mock import ANY, Mock, PropertyMock, patch
from unittest.mock import Mock, PropertyMock, patch

from grpc import ChannelCredentials, Compression

Expand Down Expand Up @@ -335,7 +335,6 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel):
mock_insecure_channel.assert_called_once_with(
"localhost:4317",
compression=Compression.NoCompression,
options=ANY,
)

# pylint: disable=no-self-use
Expand All @@ -354,7 +353,6 @@ def test_otlp_exporter_otlp_compression_precendence(
mock_insecure_channel.assert_called_once_with(
"localhost:4317",
compression=Compression.Gzip,
options=ANY,
)

def test_translate_spans(self):
Expand Down