Software Engineering

Airflow에서 TaskFlow API(@task)와 PythonOperator가 아닌 Operator 같이 사용하기

머니기어 2024. 2. 2. 12:25
반응형

 

Introduction

 기존의 DAGs에서 작성되는 고전적인 방식에 대비되는 TaskFlow API 패러다임은  Airflow 2.0에서 소개되었다. 데이터 파이프라인에서 파이썬 함수에 @task 데코레이터를 붙이는 것으로 간단하게 task를 생성할 수 있다. 함수의 이름이 task의 유일한 식별자 역할을 한다. 그런데 PythonOperator가 아닌 다른 Operator를 사용하면서 @task 데코레이터를 쓰려면 어떻게 해야할까?

 

QuickStart

 

간단하게 context를 파이썬 함수의 파라미터로 받고 이걸 내부에 있는 다른 operator에 넘겨주면 된다! 아래는 예시이다.

 

@dag()
def dag():

    @task()
    def python_task(**context):
    	bash_task = BashOperator(
            task_id='bash_task',
            bash_command='echo "this is BashOperator"',
        )
        bash_task.execute(context)

    task()

 

어떤 operator이든 내부적으로는 execute()를 통해 실행된다. 따라서 @task 데코레이터로 등록된 파이썬 함수 내부에서 직접 실행할 수 있다. 다만, task의 정보를 담고 있는 context를 페이로드 전달해줘야 한다. 위 경우 BashOperator에는 "task_id" 파라미터가 필수이지만 execute()에 context인자를 전달해 실행하는 순간 "python_task"로 덮어씌워진다.

반응형