Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Dec 23, 2025

related: #55809

Resolves circular import issue affecting Airflow 2.x deployments when CeleryExecutor is configured with sentry enabled. This caused Celery worker readiness probes to fail and prevented scheduler startup with "cannot import name 'AirflowTaskTimeout' from partially initialized module" errors.

The root cause is that the import chain executor → version_compat → BaseOperator → taskinstance → sentry → executor creates a circular dependency when sentry is enabled, as sentry initialization attempts to load the executor while it's being imported.

How I repro'd it:

(apache-airflow) ➜  airflow git:(main-ci) ✗ docker run -it --rm --entrypoint bash apache/airflow:2.11.0
airflow@535e59b009c4:/opt/airflow$ pip install -q sentry-sdk apache-airflow-providers-celery==3.14.0 apache-airflow-providers-common-compat==1.10.1

[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: pip install --upgrade pip
airflow@535e59b009c4:/opt/airflow$ export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__SENTRY__SENTRY_ON=True
export AIRFLOW__SENTRY__SENTRY_DSN=https://fake@fake.ingest.sentry.io/fake
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////tmp/test.db
airflow@535e59b009c4:/opt/airflow$ celery -A airflow.executors.celery_executor.app inspect ping
/home/airflow/.local/lib/python3.12/site-packages/airflow/metrics/base_stats_logger.py:22 RemovedInAirflow3Warning: Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.
Usage: celery [OPTIONS] COMMAND [ARGS]...
Try 'celery --help' for help.

Error: 
Unable to load celery application.
While trying to load the module airflow.executors.celery_executor.app the following error occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/deprecation_tools.py", line 56, in getattr_with_deprecation
    return getattr(importlib.import_module(new_module), new_class_name)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 999, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 47, in <module>
    from airflow.providers.common.compat.sdk import AirflowTaskTimeout, timeout
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/compat/sdk.py", line 28, in <module>
    from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/compat/version_compat.py", line 41, in <module>
    from airflow.models import BaseOperator
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/__init__.py", line 79, in __getattr__
    val = import_string(f"{path}.{name}")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 39, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 83, in <module>
    from airflow.models.mappedoperator import OperatorPartial, validate_mapping_kwargs
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/mappedoperator.py", line 54, in <module>
    from airflow.triggers.base import StartTriggerArgs
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/triggers/base.py", line 27, in <module>
    from airflow.models.taskinstance import SimpleTaskInstance
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 106, in <module>
    from airflow.sentry import Sentry
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sentry.py", line 196, in <module>
    Sentry = ConfiguredSentry()
             ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sentry.py", line 88, in __init__
    executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 314, in import_default_executor_cls
    executor, source = cls.import_executor_cls(executor_name, validate=validate)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 302, in import_executor_cls
    return _import_and_validate(executor_name.module_path), executor_name.connector_source
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/executor_loader.py", line 286, in _import_and_validate
    executor = import_string(path)
               ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/module_loading.py", line 39, in import_string
    module = import_module(module_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor.py", line 57, in <module>
    from airflow.providers.common.compat.sdk import AirflowTaskTimeout
ImportError: cannot import name 'AirflowTaskTimeout' from partially initialized module 'airflow.providers.common.compat.sdk' (most likely due to a circular import) (/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/common/compat/sdk.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/celery.py", line 141, in celery
    app = find_app(app)
          ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/utils.py", line 383, in find_app
    sym = symbol_by_name(app, imp=imp)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/imports.py", line 64, in symbol_by_name
    return getattr(module, cls_name) if cls_name else module
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/deprecation_tools.py", line 63, in getattr_with_deprecation
    raise ImportError(error_message) from e
ImportError: Could not import `airflow.providers.celery.executors.celery_executor_utils.app` while trying to import `airflow.executors.celery_executor.app`. For Celery executors, the `celery` provider should be >= 3.3.0. For Kubernetes executors, the `cncf.kubernetes` provider should be >= 7.4.0 for that. For Dask executors, any version of `daskexecutor` provider is needed..

Since the 2.11 is a slim image without editors, I managed to make changes via sed and recorded it:

airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ sed -i '/^if AIRFLOW_V_3_0_PLUS:/,/^else:/d' version_compat.py
sed -i '/^    from airflow.models import BaseOperator/d' version_compat.py
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ cat version_compat.py 
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY
# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS
# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT
# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE
#
from __future__ import annotations


def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    from packaging.version import Version

    from airflow import __version__

    airflow_version = Version(__version__)
    return airflow_version.major, airflow_version.minor, airflow_version.micro


AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)


__all__ = [
    "AIRFLOW_V_3_0_PLUS",
    "AIRFLOW_V_3_1_PLUS",
    "BaseOperator",
]
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ sed -i '/"BaseOperator",/d' version_compat.py
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ 
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ 
airflow@535e59b009c4:~/.local/lib/python3.12/site-packages/airflow/providers/common/compat$ cd -
/opt/airflow
airflow@535e59b009c4:/opt/airflow$ ls
dags  logs
airflow@535e59b009c4:/opt/airflow$ celery -A airflow.executors.celery_executor.app inspect ping
/home/airflow/.local/lib/python3.12/site-packages/airflow/metrics/base_stats_logger.py:22 RemovedInAirflow3Warning: Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 951, in create_channel
    return self._avail_channels.pop()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 357, in connect
    sock = self.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/retry.py", line 62, in call_with_retry
    return do()
           ^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 358, in <lambda>
    lambda: self._connect(), lambda error: self.disconnect(error)
            ^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 698, in _connect
    for res in socket.getaddrinfo(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/socket.py", line 978, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
    yield
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
           ^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 938, in _connection_factory
    self._connection = self._establish_connection()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 860, in _establish_connection
    conn = self.transport.establish_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 975, in establish_connection
    self._avail_channels.append(self.create_channel(self))
                                ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py", line 953, in create_channel
    channel = self.Channel(connection)
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/redis.py", line 744, in __init__
    self.client.ping()
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/commands/core.py", line 1212, in ping
    return self.execute_command("PING", **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/client.py", line 559, in execute_command
    return self._execute_command(*args, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/client.py", line 565, in _execute_command
    conn = self.connection or pool.get_connection(command_name, **options)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 1422, in get_connection
    connection.connect()
  File "/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line 363, in connect
    raise ConnectionError(self._error_message(e))
redis.exceptions.ConnectionError: Error -2 connecting to redis:6379. Name or service not known.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/celery", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
             ^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/celery.py", line 231, in main
    return celery(auto_envvar_prefix="CELERY")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1442, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1363, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1830, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 1226, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/core.py", line 794, in invoke
    return callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/click/decorators.py", line 34, in new_func
    return f(get_current_context(), *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/base.py", line 135, in caller
    return f(ctx, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/bin/control.py", line 186, in inspect
    replies = inspect._request(command, **arguments)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/control.py", line 106, in _request
    return self._prepare(self.app.control.broadcast(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/control.py", line 777, in broadcast
    return self.mailbox(conn)._broadcast(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/pidbox.py", line 330, in _broadcast
    chan = channel or self.connection.default_channel
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 957, in default_channel
    self._ensure_connection(**conn_opts)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 458, in _ensure_connection
    with ctx():
         ^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: Error -2 connecting to redis:6379. Name or service not known.

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@potiuk potiuk merged commit 5945bd5 into apache:main Dec 23, 2025
88 checks passed
@jedcunningham jedcunningham deleted the version-compat-circular-import branch December 23, 2025 15:10
Subham-KRLX pushed a commit to Subham-KRLX/airflow that referenced this pull request Jan 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment