11,899 questions
Score of -1
0 answers
38 views
How to insert data from a dataframe into a database?
I have a DAG with three tasks in my project. The three tasks call three functions, which are in different files. I'm using the ELT/ETL process. Everything seems fine until the data is inserted. I ...
Tooling
0 votes
0 replies
33 views
Does the airflow-db-cleanup.py DAG need any import modifications when using SparkKubernetesOperator?
I need to set up a metadata cleanup DAG for our Airflow environment, which uses the SparkKubernetesOperator (from apache-airflow-providers-cncf-kubernetes) to run Spark jobs on Kubernetes.
I'm ...
Score of 1
0 answers
47 views
Forked errors in a task
I just started using Airflow to schedule data pulls from a few sources and save them to a PostgreSQL database on my laptop for further processing.
Everything was running relatively smoothly until I ...
Tooling
1 vote
1 reply
133 views
Airflow use cases for different repos
I am a newbie in Airflow. I currently use Task Scheduler on a remote desktop to automate our tasks, but it is tied to my credentials and I cannot share it with other team members, so I clone all projects ...
Score of 4
1 answer
105 views
Conditionally running tasks in Airflow
I'm trying to write a DAG that conditionally executes another task. The simplified version of what I'm working with is this:
to_be_triggered = EmptyOperator(task_id="to_be_triggered")
@task....
Score of 0
0 answers
59 views
How do you add GCP billing report labels onto Airflow's BatchPredictionJobHook.create_batch_prediction_job()?
According to the official Airflow documentation, we can use the labels argument to mark certain batch predictions and then filter by those labels in GCP Billing > Report. I tried it like this:
...
...
Score of 0
0 answers
58 views
OSError when running containerized task in `airflow` image
I'm trying to run a simple task in the apache/airflow image with the following Python script:
from airflow.sdk import dag, task
@task.docker(
image="docker.1ms.run/apache/airflow:3.2.0-...
Score of 0
1 answer
70 views
Why does a DAG created in /dags take time to appear in the UI?
In Apache Airflow, when a new DAG file is created in the /dags directory, it doesn't show up immediately in the Airflow UI. There is some delay before the DAG becomes visible and accessible.
Why does ...
Score of 0
1 answer
67 views
How to automatically install latest Python package versions in AWS MWAA from S3 requirements.txt without manual updates?
Body:
I am using AWS Managed Workflows for Apache Airflow (MWAA) where the requirements.txt file is stored in an S3 bucket and MWAA syncs it during environment updates.
Current Setup:
requirements....
Score of 0
0 answers
77 views
KubernetesPodOperator with deferrable=True returns Forbidden on Cloud Composer 3
Environment:
Cloud Composer 3 (composer-3-airflow-2.10.2-build.13)
Region: europe-west1
Environment name: composer3-npd
Project: XXXXXXXXXX
Private environment: enabled
Problem: When using ...
Score of 0
0 answers
85 views
Airflow task_group conditional logic always enters else block even when env is "PROD" or "STAGE"
Problem
I have an Airflow DAG that uses a @task_group with conditional logic to set task dependencies based on an environment variable. The intent is:
In PROD or STAGE: run only table_task (no ...
Score of 0
0 answers
44 views
Airflow: Dynamic sequential task groups where number of groups is unknown at parse time
I have an Airflow DAG where I need to:
Fetch a list of items from an Airflow Variable
Task A batches them into sublists
For each batch, create a task group with dynamic task mapping (one task ...
Score of 0
1 answer
86 views
Making SQL queries from a task, not SqlExecuteQueryOperator
I am having trouble writing an Apache Airflow (v. 3.1.7) DAG for the following pipeline:
Fetch rows from an MS SQL database, based on data_interval
Output from (1) is sorted and processed in a Python ...
Score of 4
1 answer
213 views
Airflow Task dies exactly 24h after starting
The Problem:
When a task's duration reaches 24h, it is immediately killed. The log message we get indicates token expiration.
System info
Airflow 3.1.6 (running in docker)
Celery executor (...
Score of 1
1 answer
86 views
Python Multiprocessing isn't working anymore after changing from Airflow 2 to Airflow 3
I am currently migrating my processes and DAGs from Airflow 2 to Airflow 3. In doing so, I am encountering the problem that the DAG that executes an XML parser freezes after completing its work. As a ...