ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Git 작업을 위한 airflow Dag 관리
    etc. 2021. 10. 9. 16:59

    배경

    - Airflow를 Staging과 Production 환경으로 나누어 운영 중

    - Github Actions를 이용하여 PR 생성 시 Staging에 배포 -> master merge 시 Production에 배포 프로세스

    현재 활용중인 airflow 배포 모식도

    문제점

    - 기본 airflow dag 템플릿에 맞추어 작업할 때, 하나의 dag.py 파일 아래 복수의 태스크를 작성함

    - 여러 사람이 동시에 서로 다른 Task 추가를 위해 같은 DAG를 편집하면 Staging 환경에서 Conflict 발생

     

    대안

    - DAG 파일 하나를 동시에 수정하는 일이 없도록 개별 태스크를 각각의 파일로 처리한다.

    - DAG 파일에서 각 Task 생성은 함수로 처리한다.

    코드 참조

    from datetime import timedelta
    from utils.sql_operator import sql_task
    from airflow.operators.python import PythonOperator
    from airflow.sensors.external_task import ExternalTaskSensor
    import os
    import json
    
    def make_tasks(path, ds):
        streaming = []
        tasks = []
    
    	#sql operator를 실행하기 위한 태스크 생성
        for task in os.listdir('/opt/airflow/'+path):
            with open('/opt/airflow/'+path+task) as t:
                taskinfo = json.loads(t.read())
                task_id = taskinfo['task_id']
                sqlpath = taskinfo['sql_path']
                sql = taskinfo['sql_path'] + '/' + task_id + '.sql'
                upstream = taskinfo['upstream']
                downstream = taskinfo['downstream']
    
    		# external sensor task 생성을 위한 구문
            try:
                for sensor in taskinfo['sensor_task']:
                    sensor_id = sensor['sensor_id']
                    external_dag_id = sensor['external_dag_id']
                    external_task_id = sensor['external_task_id']
                    sensor_date_fn = sensor['sensor_date_fn']
    
                    try:
                        globals() [sensor_id] = ExternalTaskSensor(
                             task_id=sensor_id,
                             external_dag_id=external_dag_id,
                             external_task_id=external_task_id,
                             timeout=3600,
                             allowed_states=['success'],
                             failed_states=['failed', 'skipped'],
                             execution_date_fn=lambda x: x - timedelta(minutes=sensor_date_fn),
                             mode="reschedule",
                         )
                    except:
                        pass
    
            except Exception as e:
                pass
    
            def make_task(**kwargs):
                tid = kwargs['ti'].task_id
                query = sqlpath+'/'+tid+'.sql'
                sql_task(sql=query, ds=kwargs[ds])
    
            globals() [task_id] = PythonOperator(python_callable=make_task, task_id=task_id)
    
    		# Task 간의 의존성 설정을 위한 리스트업 
            for up in upstream :
                streaming.append({"dependency":"up", "stream":up, "task": task_id})
            for down in downstream :
                streaming.append({"dependency":"down", "stream":down, "task": task_id})
    
    	# airflow 태스크 실행 영역
        for stream in streaming:
            if stream['dependency'] == 'up' and stream['stream'] != '' :
                globals() [stream['stream']] >> globals() [stream['task']]
            elif stream['dependency'] == 'down' and stream['stream'] != '':
                globals() [stream['task']] >> globals() [stream['stream']]
            else :
                globals() [stream['task']]

    - 개별 task 정보 json 파일 예시

    {
      "sql_path": "tier2",
      "task_id": "products",
      "upstream": ["tier1_mongo_products", "tier1_mongo_creators", "tier1_rds_users"],
      "downstream": [""],
      "sensor_task": [
        {
          "sensor_id": "tier1_mongo_products",
          "external_dag_id": "tier1_mongo_dag_0.0.2",
          "external_task_id": "products",
          "sensor_date_fn": 30
        },
        {
          "sensor_id": "tier1_mongo_creators",
          "external_dag_id": "tier1_mongo_dag_0.0.2",
          "external_task_id": "creators",
          "sensor_date_fn": 30
        },
        {
          "sensor_id": "tier1_rds_users",
          "external_dag_id": "tier1_rds_dag_0.0.2",
          "external_task_id": "user_users",
          "sensor_date_fn": 30
        }
      ]
    }

    - 최종 DAG 파일 예시

    from datetime import datetime, timedelta
    from airflow import DAG
    from utils import make_dags_module as md
    import os
    
    default_args={
        'owner': 'data-platform',
        'depends_on_past': False,
        'start_date': datetime(2021, 8, 31, 15, 0),
        'retries': 5,
        'retry_delay': timedelta(minutes=2),
        'task_concurrency': 1
    }
    
    ds = 'tomorrow_ds_nodash'
    
    if os.environ['STATUS'] == 'stg' :
        schedule = '@once'
    
    else :
        schedule = '30 18 * * *'
    
    with DAG(
            'tier2_dag_0.0.2',
            default_args=default_args,
            catchup=False,
            description='hive tier1 -> hive tier2',
            schedule_interval=schedule,
            concurrency=20
        ) as dag:
    
        """
        json file 추가 시 참고 (*:Required, -:Optional)
        * sql_path : code 하위의 쿼리가 포함된 디렉토리
        * task_id : dag에 표시될 task_id
        * upstream : 해당 task 실행을 위해 먼저 성공해야 하는 task (없을 시 공란)
        * downstream : 해당 task 성공 후 실행할 task (없을 시 공란)
        - sensor_task : 외부 dag와의 의존성 설정을 위한 센서 태스크
        """
    
        path = 'dags/tier2/tier2_dag_tasks/'
        md.make_task(path, ds)
Designed by Tistory.