ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Airflow로 마케팅 데이터 파이프라인 관리하기
    About Data 2021. 7. 9. 14:20

    아래 이미지는 현 회사에서 관리하고 있는 ETL 파이프라인 중 마케팅 Summary data에 관한 Airflow DAG 그래프입니다.

    마케팅 데이터 파이프라인의 구성

    1. 소스 데이터 입수

    • 위의 DAG에는 포함되어있지 않지만, 광고 매체의 보고서 API를 호출하거나 서비스 DB에서 소스를 추출하는 스케줄도 에어플로우로 관리합니다.

    2. 가공 및 적재

    • 위의 DAG에서 가장 처음으로 실행되는 두 개의 Task인 userid_mapping >> userid_mapped_logs에 해당합니다.
      이 Task는 log 데이터에서 로그인 전 생성되는 anonymous_id와 로그인 후 기록되는 user_id를 매핑해서
      특정 유저가 로그인하기 전의 로그까지 해당 유저의 기록으로 연결해 주는 역할을 합니다.
    • 다음으로 분기되는 daily_active_users_list, users_first_visit Task는 매핑된 로그 테이블을 기반으로 해당일 방문한 전체 유저 리스트와 처음 방문한 유저 리스트를 기록합니다.
    • 이후 다른 서버에서 진행되는 ETL 작업이 완료되었는지 체크하는 DW_data_check Task를 거쳐, 해당 데이터를 활용하는 Task들로 이어지게 설계했습니다.

    3. 복잡한 테이블 합성

    • daily_utm_funnel Task의 경우 DW_data_check와 userid_mapped_logs 두 가지 Task에 의존성을 가지고 있습니다.
      여기서는 로그 데이터와 DB 정보를 조합해 유저 별로 해당 일자의 행동 흐름을 주요 이벤트 단위로 집계하는 작업을 합니다.
    • user_signed_up_utm, utm_orders_status_final Task는 위의 daily_utm_funnel, DW_data_check에 의존성이 있으며,
      유저 별 가입 경로와 주문 경로, 이후 액션들을 UTM Tag 단위로 집계하는 작업입니다.
    • 마케팅 데이터 가공의 마지막 단계는 utm_spend_full_joined Task입니다. 
      이 Task는 위의 daily_utm_funnel, user_signed_up_utm, utm_orders_status_final, mkt_spend까지 총 네 개의 Task와 연결되어 있으며, 각 작업을 통해 생성된 테이블을 full join하여 전체 마케팅 비용과 유입 경로부터 구매 기여까지를 집계하게 됩니다.

    4. Tableau Refresh

    • 앞서 이야기한 작업들이 데이터 ETL 과정이었다고 하면, 이 단계에서는 완전히 다른 작업을 수행합니다.
    • Tableau Server를 통해 시각화된 대시보드를 데이터 소비자들에게 제공하고 있기 때문에, 기존에는 ETL 작업이 완료되는 시점에 맞추어 Tableau Server에서 직접 연결된 데이터 원본을 새로고침하는 스케줄을 운영했었습니다.
    • 현재는 Tableau API와 Airflow의 PythonOperator를 활용하여 하나의 프로세스로 관리하고 있습니다.
    • 관련 코드는 아래와 같습니다.
      # Tableau API 호출 코드
      import tableauserverclient as TSC
      import pandas as pd
      
      server = TSC.Server('https://{tabeau_server_host}', use_server_version=True)
      tableau_auth = TSC.PersonalAccessTokenAuth(token_name, personal_access_token)
      request_options = TSC.RequestOptions(pagesize=100)
      sources = []
      
      def tableau_refresh(source_name=str):
          with server.auth.sign_in(tableau_auth):
              all_datasources, pagination_item = server.datasources.get(request_options)
              
              for datas in all_datasources:
                  sources.append([datas.name, datas.id])
      
              df = pd.DataFrame(sources, columns=['name', 'id'])
              target_id = df.loc[df['name'].str.contains(source_name), 'id'].values
      
              print(df.loc[df['name'].str.contains(source_name), 'name'].values)
      
              for target in target_id :
                  data = server.datasources.get_by_id(target)
      
                  server.datasources.refresh(data)
                  
                  
                  
      # Airflow Task 코드
      from airflow.operators.python import PythonOperator
      import tableau_refresh as ta
      
      def tableau_utm_spend_full_joined():
        t = ta.tableau_refresh('utm_spend_full_joined')
        return t
      
      tableau_utm_spend_full_joined_refresh = PythonOperator(
        task_id='tableau_utm_spend_full_joined_refresh',
        python_callable=tableau_utm_spend_full_joined,
        dag=dag
      )

    Task 간 의존성 설정 방법

    • 병렬 관계 : ',' 로 연결
    • 의존 관계 : '>>'로 연결
    • Task 그룹 : []로 묶음
    [userid_mapped >> userid_mapped_logs] >> daily_utm_funnel
    [userid_mapped >> userid_mapped_logs] >> daily_active_users_list >> DW_data_check
    [userid_mapped >> userid_mapped_logs] >> users_first_visit >> DW_data_check
    [userid_mapped >> userid_mapped_logs >> userid_mapped_utm] >> user_signed_up_utm
    [userid_mapped_logs, userid_mapped_utm] >> userid_mapped_session_logs
    userid_mapped_utm >> utm_orders_status_final
    userid_mapped_utm >> utm_orders_status_final_1d
    userid_mapped_session_logs >> daily_utm_funnel
    DW_data_check >> [utm_orders_status_final, user_signed_up_utm] >> utm_spend_full_joined
    DW_data_check >> utm_orders_status_final_1d
    DW_data_check >> daily_dormant_user_order
    DW_data_check >> users_history >> user_segment_category >> tableau_user_segment
    DW_data_check >> daily_utm_funnel >> utm_spend_full_joined
    MKT_data_check >> mkt_spend >> utm_spend_full_joined
    utm_spend_full_joined >> tableau_utm_spend_full_joined_refresh
    [utm_orders_status_final, utm_orders_status_final_1d] >> tableau_utm_orders_status_final_refresh
    user_signed_up_utm >> tableau_user_signed_up_utm_refresh
    utm_orders_status_final >> tableau_user_signed_up_utm_refresh

     

    Airflow를 통한 마케팅 데이터 파이프라인 관리의 효용성

    • 하나의 DAG로 마케팅 데이터 파이프라인을 구성함으로써 각 ETL Task의 관계를 시각적으로 편리하게 확인할 수 있습니다.
    • 특정 Task가 실패했을 때, 의존성을 지닌 Upstream / Downstream Task들까지 쉽게 재시작할 수 있습니다.
    • execution_data에 대한 개념만 잘 잡혀 있다면 스케줄 변경도 자유롭게 할 수 있습니다.
      - 일 단위 스케줄일 때, execution_date는 실행하는 시점의 전날을 가리킵니다.
      - 시간 단위 스케줄일 때, execution_date는 실행하는 시점의 이전 시각을 가리킵니다.
      - DAG의 start_date는 실행 시각이 아닌 execution_date를 기준으로 합니다.
Designed by Tistory.