Description
Apache Airflow version
main (development)
If "Other Airflow 3 version" selected, which one?
No response
What happened?
The audit log page has been very slow in our environment. We have many generated dags in which the same task_id appears in different dags. When loading the events for a dag, we noticed that the join to the task instance is made only on TaskInstance.task_id. This produces an incorrect join: events related to other task instances with the same task_id are also returned. The generated query is shown below; the LEFT OUTER JOIN to task_instance on task_id alone appears to be the problem, and the default behavior should be fixed. The query also loads every column even though dag_id and task_display_name are the only required fields, which can be improved as well.
airflow/airflow-core/src/airflow/models/log.py
Lines 60 to 66 in 98d5976
task_instance = relationship(
    "TaskInstance",
    viewonly=True,
    foreign_keys=[task_id],
    primaryjoin="Log.task_id == TaskInstance.task_id",
    lazy="noload",
)
SELECT log.id, log.dttm, log.dag_id, log.task_id, log.map_index, log.event, log.logical_date, log.run_id,
log.owner, log.owner_display_name, log.extra, log.try_number, dag_1.dag_display_name, dag_1.deadline,
dag_1.dag_id AS dag_id_1, dag_1.is_paused, dag_1.is_stale, dag_1.last_parsed_time,
dag_1.last_parse_duration, dag_1.last_expired, dag_1.fileloc, dag_1.relative_fileloc, dag_1.bundle_name, dag_1.bundle_version, dag_1.owners, dag_1.description, dag_1.timetable_summary, dag_1.timetable_description, dag_1.asset_expression, dag_1.max_active_tasks, dag_1.max_active_runs, dag_1.max_consecutive_failed_dag_runs, dag_1.has_task_concurrency_limits, dag_1.has_import_errors, dag_1.fail_fast, dag_1.next_dagrun, dag_1.next_dagrun_data_interval_start, dag_1.next_dagrun_data_interval_end, dag_1.next_dagrun_create_after, task_instance_1.rendered_map_index, task_instance_1.task_display_name, dag_run_1.state, dag_run_1.id AS id_1, dag_run_1.dag_id AS dag_id_2, dag_run_1.queued_at, dag_run_1.logical_date AS logical_date_1, dag_run_1.start_date, dag_run_1.end_date, dag_run_1.run_id AS run_id_1, dag_run_1.creating_job_id, dag_run_1.run_type, dag_run_1.triggered_by, dag_run_1.triggering_user_name, dag_run_1.conf, dag_run_1.data_interval_start, dag_run_1.data_interval_end, dag_run_1.run_after, dag_run_1.last_scheduling_decision, dag_run_1.log_template_id, dag_run_1.updated_at, dag_run_1.clear_number, dag_run_1.backfill_id, dag_run_1.bundle_version AS bundle_version_1, dag_run_1.scheduled_by_job_id, dag_run_1.context_carrier, dag_run_1.span_status, dag_run_1.created_dag_version_id, dag_run_1.partition_key, task_instance_1.id AS id_2, task_instance_1.task_id AS task_id_1, task_instance_1.dag_id AS dag_id_3, task_instance_1.run_id AS run_id_2, task_instance_1.map_index AS map_index_1, task_instance_1.start_date AS start_date_1, task_instance_1.end_date AS end_date_1, task_instance_1.duration, task_instance_1.state AS state_1, task_instance_1.try_number AS try_number_1, task_instance_1.max_tries, task_instance_1.hostname, task_instance_1.unixname, task_instance_1.pool, task_instance_1.pool_slots, task_instance_1.queue, task_instance_1.priority_weight, task_instance_1.operator, task_instance_1.custom_operator_name, task_instance_1.queued_dttm, task_instance_1.scheduled_dttm, 
task_instance_1.queued_by_job_id, task_instance_1.last_heartbeat_at, task_instance_1.pid, task_instance_1.executor, task_instance_1.executor_config, task_instance_1.updated_at AS updated_at_1, task_instance_1.context_carrier AS context_carrier_1, task_instance_1.span_status AS span_status_1, task_instance_1.external_executor_id, task_instance_1.trigger_id, task_instance_1.trigger_timeout, task_instance_1.next_method, task_instance_1.next_kwargs, task_instance_1.dag_version_id
FROM log
LEFT OUTER JOIN dag AS dag_1 ON log.dag_id = dag_1.dag_id
LEFT OUTER JOIN (task_instance AS task_instance_1 JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance_1.dag_id AND dag_run_1.run_id = task_instance_1.run_id) ON log.task_id = task_instance_1.task_id
WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ?
ORDER BY log.dttm DESC, log.id DESC
LIMIT ? OFFSET ?
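The effect of joining on task_id alone can be reproduced outside Airflow. The following is a minimal sketch using the standard-library sqlite3 module with heavily simplified, hypothetical `log` and `task_instance` tables (not the real Airflow schema). It assumes a task instance is identified by (dag_id, task_id, run_id, map_index) and that both tables store -1 as map_index for unmapped tasks; the helper `count_rows` is illustrative only.

```python
import sqlite3

# Simplified stand-ins for the Airflow tables (assumption: not the real schema).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE log (
        id INTEGER PRIMARY KEY, dag_id TEXT, task_id TEXT,
        run_id TEXT, map_index INTEGER, event TEXT
    );
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, run_id TEXT,
        map_index INTEGER, task_display_name TEXT
    );
    """
)

# Two dags, three runs each, all sharing the task_id "start".
for dag_id in ("log_query_1", "log_query_2"):
    for run_id in ("run_1", "run_2", "run_3"):
        conn.execute(
            "INSERT INTO task_instance VALUES (?, 'start', ?, -1, 'start')",
            (dag_id, run_id),
        )

# A single audit event for one task instance of one dag run.
conn.execute(
    "INSERT INTO log (dag_id, task_id, run_id, map_index, event) "
    "VALUES ('log_query_1', 'start', 'run_1', -1, 'fail')"
)


def count_rows(join_condition: str) -> int:
    """Count rows returned for the single event under a given join condition."""
    sql = (
        "SELECT COUNT(*) FROM log "
        f"LEFT OUTER JOIN task_instance AS ti ON {join_condition} "
        "WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ?"
    )
    return conn.execute(sql, ("log_query_1", "start", "run_1")).fetchone()[0]


# Join used by the current relationship: task_id only.
task_id_only = count_rows("log.task_id = ti.task_id")

# Join on the full (assumed) task instance key.
full_key = count_rows(
    "log.dag_id = ti.dag_id AND log.task_id = ti.task_id "
    "AND log.run_id = ti.run_id AND log.map_index = ti.map_index"
)

print(task_id_only, full_key)
```

In this sketch the single event matches every task instance named "start" across both dags and all runs when joined on task_id alone, while the join on the full key matches exactly one row. The row multiplication grows with the number of dags and runs sharing a task_id, which would be consistent with the slowness described above.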
What you think should happen instead?
No response
How to reproduce
- Parse the dags below and let each of them complete 3 runs.
- Go to the audit log tab and check the events.
- Mark the task instance as failed and reload the page.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="log_query_1",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 3),
    catchup=True,
    schedule="@daily",
) as dag:

    @task
    def start():
        import time

        time.sleep(1)

    start()

with DAG(
    dag_id="log_query_2",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 3),
    catchup=True,
    schedule="@daily",
) as dag:

    @task
    def start():
        import time

        time.sleep(1)

    start()

Operating System
Ubuntu 20.04
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct