Wymyśliłem sposób tworzenia przepływów pracy na podstawie wyników poprzednich zadań.
Zasadniczo to, co chcesz zrobić, to mieć dwa subdagi z następującymi elementami:
- Xcom wypycha listę (lub cokolwiek, czego potrzebujesz później do utworzenia dynamicznego przepływu pracy) w subdagu, który jest wykonywany jako pierwszy (zobacz test1.py
def return_list()
)
- Przekaż główny obiekt dag jako parametr do drugiego subdagu
- Teraz, jeśli masz główny obiekt dag, możesz go użyć, aby uzyskać listę jego instancji zadań. Z tej listy instancji zadań można odfiltrować zadanie bieżącego uruchomienia przy użyciu
parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), prawdopodobnie można by tu dodać więcej filtrów.
- W tej instancji zadania możesz użyć xcom pull, aby uzyskać potrzebną wartość, określając dag_id do jednego z pierwszych subdagów:
dag_id='%s.%s' % (parent_dag_name, 'test1')
- Użyj listy / wartości, aby dynamicznie tworzyć zadania
Teraz przetestowałem to w mojej lokalnej instalacji przepływu powietrza i działa dobrze. Nie wiem, czy część ściągająca xcom będzie miała jakiekolwiek problemy, jeśli będzie więcej niż jedna instancja dag działająca w tym samym czasie, ale wtedy prawdopodobnie użyłbyś unikalnego klucza lub czegoś podobnego, aby jednoznacznie zidentyfikować xcom wartość, którą chcesz. Prawdopodobnie można by zoptymalizować 3. krok, aby mieć 100% pewności, że otrzymasz określone zadanie z bieżącego głównego dag, ale do mojego użytku działa to wystarczająco dobrze, myślę, że do użycia xcom_pull potrzebny jest tylko jeden obiekt task_instance.
Czyszczę również xcomy dla pierwszego subdagu przed każdym wykonaniem, aby upewnić się, że przypadkowo nie otrzymam złej wartości.
Jestem dość kiepski w wyjaśnianiu, więc mam nadzieję, że poniższy kod wyjaśni wszystko:
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
log = logging.getLogger(__name__)
def test1(parent_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.test1' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date,
)
def return_list():
return ['test1', 'test2']
list_extract_folder = PythonOperator(
task_id='list',
dag=dag,
python_callable=return_list
)
clean_xcoms = PostgresOperator(
task_id='clean_xcoms',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
clean_xcoms >> list_extract_folder
return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator
log = logging.getLogger(__name__)
def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
dag = DAG(
'%s.test2' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date
)
if len(parent_dag.get_active_runs()) > 0:
test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
dag_id='%s.%s' % (parent_dag_name, 'test1'),
task_ids='list')
if test_list:
for i in test_list:
test = DummyOperator(
task_id=i,
dag=dag
)
return dag
i główny przepływ pracy:
test.py
from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2
DAG_NAME = 'test-dag'
dag = DAG(DAG_NAME,
description='Test workflow',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 24))
test1 = SubDagOperator(
subdag=test1(DAG_NAME,
dag.start_date,
dag.schedule_interval),
task_id='test1',
dag=dag
)
test2 = SubDagOperator(
subdag=test2(DAG_NAME,
dag.start_date,
dag.schedule_interval,
parent_dag=dag),
task_id='test2',
dag=dag
)
test1 >> test2