Как динамически добавлять значение bucket_key в поток воздуха S3KeySensor

Я пытаюсь установить S3KeySensor bucket_key вверх на основе входной переменной dagrun. У меня есть один dag "dag_trigger", который использует TriggerDagRunOperator для запуска dagrun для dag "dag_triggered". Я пытаюсь расширить пример https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py.

Поэтому я хочу отправить переменную в вызванный dag, и в соответствии с значением переменной я хочу установить значение backet_key в задаче S3KeySensor. Я знаю, как использовать переданную переменную в вызываемой функции PythonOperator, но я не знаю, как ее использовать на объекте датчика.

dag_trigger dag:

import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator


default_args = {
 'owner': 'airflow',
 'start_date': datetime.datetime.now()}

dag = DAG('dag_trigger', default_args=default_args, schedule_interval="@hourly")

def task_1_run(context, dag_run_object):
 sent_variable = '2018_02_19' # not important
 dag_run_object.payload = {'message': sent_variable}
 print "DAG dag_trigger triggered with payload: %s" % dag_run_object.payload)
 return dag_run_object

task_1 = TriggerDagRunOperator(task_id="task_1",
 trigger_dag_id="dag_triggered",
 provide_context=True,
 python_callable=task_1_run,
 dag=dag)

И dag_triggered dag:

import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import S3KeySensor


default_args = {
 'owner': 'airflow',
 'start_date': datetime.datetime.now()
}

dag = DAG('dag_triggered', default_args=default_args, schedule_interval=None)

wait_files_to_arrive_task = S3KeySensor(
 task_id='wait_file_to_arrive',
 bucket_key='file_%s' % '', # Here I want to place conf['sent_variable']
 wildcard_match=True,
 bucket_name='test-bucket',
 s3_conn_id='test_s3_conn',
 timeout=18*60*60,
 poke_interval=120,
 dag=dag)

Я попытался получить значение из объекта dag с помощью dag.get_dagrun(). Conf ['sent_variable'], но у меня есть сомнение в том, как установить переменную dagrun create_date (dag_trigger будет запускать dag_triggered каждый час, а dag_triggered может подождать дольше для файла).

Я также попытался создать PythonOperator, который был бы upstream для wait_files_to_arrive_task. Вызываемая функция python может получать информацию о send_variable. После этого я попытался установить значение для bucket_key как bucket_key = callable_function() - но у меня проблема с аргументами.

И я также считаю, что глобальные переменные не являются хорошим решением.

Может, у кого-то есть идея, которая работает.

1 ответ

Невозможно получить значение в вашей учетной записи DAG прямо в вашем файле DAG. Это то, что не может быть определено без контекста, из которого DAG выполняет свою роль. Один из способов подумать о том, когда вы запускаете python my_dag.py чтобы проверить, компилируется ли ваш файл DAG, он должен инициализировать все эти операторы, не указывая дату выполнения. Таким образом, все, что может отличаться от запуска DAG, нельзя напрямую ссылаться.

Поэтому вместо этого вы можете передать его как значение шаблона, которое позже будет визуализироваться с помощью контекста при выполнении задачи.

wait_files_to_arrive_task = S3KeySensor(
 task_id='wait_file_to_arrive',
 bucket_key='file_{{ dag_run.conf["message"] }}',
 ...)

Обратите внимание, что будут отображаться только параметры, перечисленные в template_fields оператора. К счастью, кто-то ожидал этого, поэтому bucket_key действительно является полем шаблона.

licensed under cc by-sa 3.0 with attribution.