
Incorrect log and task instance join query based on task_id in eventLogs API #59965

@tirkarthi

Description

Apache Airflow version

main (development)

If "Other Airflow 3 version" selected, which one?

No response

What happened?

The audit log page has been very slow in our environment. We have many generated DAGs, and the same task_id appears in several different DAGs. When loading the events for a DAG, we noticed that the join to the task instance is made on TaskInstance.task_id alone, which produces an incorrect join: events related to task instances with the same id in other DAGs are also returned. In the generated query below, the LEFT OUTER JOIN (task_instance ... ON log.task_id = task_instance_1.task_id is the problematic part, and this default behavior should be fixed. The query also loads every column, even though dag_id and task_display_name are the only required fields, which can be improved. The relationship is currently defined as:

task_instance = relationship(
    "TaskInstance",
    viewonly=True,
    foreign_keys=[task_id],
    primaryjoin="Log.task_id == TaskInstance.task_id",
    lazy="noload",
)
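
For comparison, here is a minimal sketch of a corrected mapping that joins on the full task-instance key; the exact condition and column list are assumptions, not the final fix:

task_instance = relationship(
    "TaskInstance",
    viewonly=True,
    # Hypothetical fix: constrain the join to the full task-instance key
    # (dag_id, run_id, task_id, map_index) instead of task_id alone.
    foreign_keys=[dag_id, run_id, task_id, map_index],
    primaryjoin="and_(Log.dag_id == TaskInstance.dag_id, "
    "Log.run_id == TaskInstance.run_id, "
    "Log.task_id == TaskInstance.task_id, "
    "Log.map_index == TaskInstance.map_index)",
    lazy="noload",
)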

SELECT
    log.id, log.dttm, log.dag_id, log.task_id, log.map_index, log.event,
    log.logical_date, log.run_id, log.owner, log.owner_display_name,
    log.extra, log.try_number,
    dag_1.dag_display_name, dag_1.deadline, dag_1.dag_id AS dag_id_1,
    dag_1.is_paused, dag_1.is_stale, dag_1.last_parsed_time,
    dag_1.last_parse_duration, dag_1.last_expired, dag_1.fileloc,
    dag_1.relative_fileloc, dag_1.bundle_name, dag_1.bundle_version,
    dag_1.owners, dag_1.description, dag_1.timetable_summary,
    dag_1.timetable_description, dag_1.asset_expression,
    dag_1.max_active_tasks, dag_1.max_active_runs,
    dag_1.max_consecutive_failed_dag_runs, dag_1.has_task_concurrency_limits,
    dag_1.has_import_errors, dag_1.fail_fast, dag_1.next_dagrun,
    dag_1.next_dagrun_data_interval_start, dag_1.next_dagrun_data_interval_end,
    dag_1.next_dagrun_create_after,
    task_instance_1.rendered_map_index, task_instance_1.task_display_name,
    dag_run_1.state, dag_run_1.id AS id_1, dag_run_1.dag_id AS dag_id_2,
    dag_run_1.queued_at, dag_run_1.logical_date AS logical_date_1,
    dag_run_1.start_date, dag_run_1.end_date, dag_run_1.run_id AS run_id_1,
    dag_run_1.creating_job_id, dag_run_1.run_type, dag_run_1.triggered_by,
    dag_run_1.triggering_user_name, dag_run_1.conf,
    dag_run_1.data_interval_start, dag_run_1.data_interval_end,
    dag_run_1.run_after, dag_run_1.last_scheduling_decision,
    dag_run_1.log_template_id, dag_run_1.updated_at, dag_run_1.clear_number,
    dag_run_1.backfill_id, dag_run_1.bundle_version AS bundle_version_1,
    dag_run_1.scheduled_by_job_id, dag_run_1.context_carrier,
    dag_run_1.span_status, dag_run_1.created_dag_version_id,
    dag_run_1.partition_key,
    task_instance_1.id AS id_2, task_instance_1.task_id AS task_id_1,
    task_instance_1.dag_id AS dag_id_3, task_instance_1.run_id AS run_id_2,
    task_instance_1.map_index AS map_index_1,
    task_instance_1.start_date AS start_date_1,
    task_instance_1.end_date AS end_date_1, task_instance_1.duration,
    task_instance_1.state AS state_1,
    task_instance_1.try_number AS try_number_1, task_instance_1.max_tries,
    task_instance_1.hostname, task_instance_1.unixname, task_instance_1.pool,
    task_instance_1.pool_slots, task_instance_1.queue,
    task_instance_1.priority_weight, task_instance_1.operator,
    task_instance_1.custom_operator_name, task_instance_1.queued_dttm,
    task_instance_1.scheduled_dttm, task_instance_1.queued_by_job_id,
    task_instance_1.last_heartbeat_at, task_instance_1.pid,
    task_instance_1.executor, task_instance_1.executor_config,
    task_instance_1.updated_at AS updated_at_1,
    task_instance_1.context_carrier AS context_carrier_1,
    task_instance_1.span_status AS span_status_1,
    task_instance_1.external_executor_id, task_instance_1.trigger_id,
    task_instance_1.trigger_timeout, task_instance_1.next_method,
    task_instance_1.next_kwargs, task_instance_1.dag_version_id
FROM log
LEFT OUTER JOIN dag AS dag_1 ON log.dag_id = dag_1.dag_id
LEFT OUTER JOIN (
    task_instance AS task_instance_1
    JOIN dag_run AS dag_run_1
        ON dag_run_1.dag_id = task_instance_1.dag_id
        AND dag_run_1.run_id = task_instance_1.run_id
) ON log.task_id = task_instance_1.task_id
WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ?
ORDER BY log.dttm DESC, log.id DESC
LIMIT ? OFFSET ?
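
Since only the display names from the joined tables are actually consumed, one possible improvement is to select just those columns instead of eagerly loading full dag, dag_run, and task_instance rows. A rough sketch in SQLAlchemy 1.4+ style using the standard Airflow model modules (not the actual endpoint code):

from sqlalchemy import select

from airflow.models.dag import DagModel
from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance

# Sketch only: join on the full task-instance key and fetch just the two
# display-name columns the event log response needs, instead of whole rows.
stmt = (
    select(Log, DagModel.dag_display_name, TaskInstance.task_display_name)
    .outerjoin(DagModel, Log.dag_id == DagModel.dag_id)
    .outerjoin(
        TaskInstance,
        (Log.dag_id == TaskInstance.dag_id)
        & (Log.run_id == TaskInstance.run_id)
        & (Log.task_id == TaskInstance.task_id)
        & (Log.map_index == TaskInstance.map_index),
    )
    .order_by(Log.dttm.desc(), Log.id.desc())
)

Joining on the full key should also let the database use the task_instance primary key for the join, which would help with the slowness as well.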

What you think should happen instead?

No response

How to reproduce

  1. Parse the DAGs below and let them complete 3 runs each.
  2. Go to the audit log tab and check the events.
  3. Mark the task instance as failed and reload the page.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


with DAG(
    dag_id="log_query_1",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 3),
    catchup=True,
    schedule="@daily",
) as dag:

    @task
    def start():
        import time
        time.sleep(1)

    start()


with DAG(
    dag_id="log_query_2",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 3),
    catchup=True,
    schedule="@daily",
) as dag:

    @task
    def start():
        import time
        time.sleep(1)

    start()
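
Once the runs have completed, the over-matching can be observed by replaying the problematic join against the metadata database; a rough sketch using Airflow's session (session setup assumed):

from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance
from airflow.settings import Session

session = Session()
# Replaying the task_id-only join: log rows for log_query_1 also match the
# start() task instances that belong to log_query_2.
rows = (
    session.query(Log.id, Log.dag_id, TaskInstance.dag_id, TaskInstance.run_id)
    .outerjoin(TaskInstance, Log.task_id == TaskInstance.task_id)
    .filter(Log.dag_id == "log_query_1")
    .all()
)
for row in rows:
    print(row)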

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
