-
Git 작업을 위한 airflow Dag 관리etc. 2021. 10. 9. 16:59
배경
- Airflow를 Staging과 Production 환경으로 나누어 운영 중
- Github Actions를 이용하여 PR 생성 시 Staging에 배포 -> master merge 시 Production에 배포 프로세스
문제점
- 기본 airflow dag 템플릿에 맞추어 작업할 때, 하나의 dag.py 파일 아래 복수의 태스크를 작성함
- 여러 사람이 동시에 서로 다른 Task 추가를 위해 같은 DAG를 편집하면 Staging 환경에서 Conflict 발생
대안
- DAG 파일 하나를 동시에 수정하는 일이 없도록 개별 태스크를 각각의 파일로 처리한다.
- DAG 파일에서 각 Task 생성은 함수로 처리한다.
코드 참조
from datetime import timedelta from utils.sql_operator import sql_task from airflow.operators.python import PythonOperator from airflow.sensors.external_task import ExternalTaskSensor import os import json def make_tasks(path, ds): streaming = [] tasks = [] #sql operator를 실행하기 위한 태스크 생성 for task in os.listdir('/opt/airflow/'+path): with open('/opt/airflow/'+path+task) as t: taskinfo = json.loads(t.read()) task_id = taskinfo['task_id'] sqlpath = taskinfo['sql_path'] sql = taskinfo['sql_path'] + '/' + task_id + '.sql' upstream = taskinfo['upstream'] downstream = taskinfo['downstream'] # external sensor task 생성을 위한 구문 try: for sensor in taskinfo['sensor_task']: sensor_id = sensor['sensor_id'] external_dag_id = sensor['external_dag_id'] external_task_id = sensor['external_task_id'] sensor_date_fn = sensor['sensor_date_fn'] try: globals() [sensor_id] = ExternalTaskSensor( task_id=sensor_id, external_dag_id=external_dag_id, external_task_id=external_task_id, timeout=3600, allowed_states=['success'], failed_states=['failed', 'skipped'], execution_date_fn=lambda x: x - timedelta(minutes=sensor_date_fn), mode="reschedule", ) except: pass except Exception as e: pass def make_task(**kwargs): tid = kwargs['ti'].task_id query = sqlpath+'/'+tid+'.sql' sql_task(sql=query, ds=kwargs[ds]) globals() [task_id] = PythonOperator(python_callable=make_task, task_id=task_id) # Task 간의 의존성 설정을 위한 리스트업 for up in upstream : streaming.append({"dependency":"up", "stream":up, "task": task_id}) for down in downstream : streaming.append({"dependency":"down", "stream":down, "task": task_id}) # airflow 태스크 실행 영역 for stream in streaming: if stream['dependency'] == 'up' and stream['stream'] != '' : globals() [stream['stream']] >> globals() [stream['task']] elif stream['dependency'] == 'down' and stream['stream'] != '': globals() [stream['task']] >> globals() [stream['stream']] else : globals() [stream['task']]
- 개별 task 정보 json 파일 예시
{ "sql_path": "tier2", "task_id": "products", "upstream": ["tier1_mongo_products", "tier1_mongo_creators", "tier1_rds_users"], "downstream": [""], "sensor_task": [ { "sensor_id": "tier1_mongo_products", "external_dag_id": "tier1_mongo_dag_0.0.2", "external_task_id": "products", "sensor_date_fn": 30 }, { "sensor_id": "tier1_mongo_creators", "external_dag_id": "tier1_mongo_dag_0.0.2", "external_task_id": "creators", "sensor_date_fn": 30 }, { "sensor_id": "tier1_rds_users", "external_dag_id": "tier1_rds_dag_0.0.2", "external_task_id": "user_users", "sensor_date_fn": 30 } ] }
- 최종 DAG 파일 예시
from datetime import datetime, timedelta from airflow import DAG from utils import make_dags_module as md import os default_args={ 'owner': 'data-platform', 'depends_on_past': False, 'start_date': datetime(2021, 8, 31, 15, 0), 'retries': 5, 'retry_delay': timedelta(minutes=2), 'task_concurrency': 1 } ds = 'tomorrow_ds_nodash' if os.environ['STATUS'] == 'stg' : schedule = '@once' else : schedule = '30 18 * * *' with DAG( 'tier2_dag_0.0.2', default_args=default_args, catchup=False, description='hive tier1 -> hive tier2', schedule_interval=schedule, concurrency=20 ) as dag: """ json file 추가 시 참고 (*:Required, -:Optional) * sql_path : code 하위의 쿼리가 포함된 디렉토리 * task_id : dag에 표시될 task_id * upstream : 해당 task 실행을 위해 먼저 성공해야 하는 task (없을 시 공란) * downstream : 해당 task 성공 후 실행할 task (없을 시 공란) - sensor_task : 외부 dag와의 의존성 설정을 위한 센서 태스크 """ path = 'dags/tier2/tier2_dag_tasks/' md.make_task(path, ds)
'etc.' 카테고리의 다른 글
웹알못의 데이터 어드민 사이트 만들기(feat. Streamlit) 0.Prologue (0) 2022.09.10 AWS EMR Prestosql(Trino) + Redash 업그레이드 (0) 2021.10.09 Github Auth 정책 업데이트 : Personal Access Token 사용하기 (1) 2021.08.16 Airflow, Jupyterhub에 Google OAuth 적용하기 (0) 2021.06.09 Jupyterhub - nbviewer 연동하기 (0) 2021.01.19