-
Airflow로 마케팅 데이터 파이프라인 관리하기About Data 2021. 7. 9. 14:20
아래 이미지는 현 회사에서 관리하고 있는 ETL 파이프라인 중 마케팅 Summary data에 관한 Airflow DAG 그래프입니다.
마케팅 데이터 파이프라인의 구성
1. 소스 데이터 입수
- 위의 DAG에는 포함되어있지 않지만, 광고 매체의 보고서 API를 호출하거나 서비스 DB에서 소스를 추출하는 스케줄도 에어플로우로 관리합니다.
2. 가공 및 적재
- 위의 DAG에서 가장 처음으로 실행되는 두 개의 Task인 userid_mapping >> userid_mapped_logs에 해당합니다.
이 Task는 log 데이터에서 로그인 전 생성되는 anonymous_id와 로그인 후 기록되는 user_id를 매핑해서
특정 유저가 로그인하기 전의 로그까지 해당 유저의 기록으로 연결해 주는 역할을 합니다. - 다음으로 분기되는 daily_active_users_list, users_first_visit Task는 매핑된 로그 테이블을 기반으로 해당일 방문한 전체 유저 리스트와 처음 방문한 유저 리스트를 기록합니다.
- 이후 다른 서버에서 진행되는 ETL 작업이 완료되었는지 체크하는 DW_data_check Task를 거쳐, 해당 데이터를 활용하는 Task들로 이어지게 설계했습니다.
3. 복잡한 테이블 합성
- daily_utm_funnel Task의 경우 DW_data_check와 userid_mapped_logs 두 가지 Task에 의존성을 가지고 있습니다.
여기서는 로그 데이터와 DB 정보를 조합해 유저 별로 해당 일자의 행동 흐름을 주요 이벤트 단위로 집계하는 작업을 합니다. - user_signed_up_utm, utm_orders_status_final Task는 위의 daily_utm_funnel, DW_data_check에 의존성이 있으며,
유저 별 가입 경로와 주문 경로, 이후 액션들을 UTM Tag 단위로 집계하는 작업입니다. - 마케팅 데이터 가공의 마지막 단계는 utm_spend_full_joined Task입니다.
이 Task는 위의 daily_utm_funnel, user_signed_up_utm, utm_orders_status_final, mkt_spend까지 총 네 개의 Task와 연결되어 있으며, 각 작업을 통해 생성된 테이블을 full join하여 전체 마케팅 비용과 유입 경로부터 구매 기여까지를 집계하게 됩니다.
4. Tableau Refresh
- 앞서 이야기한 작업들이 데이터 ETL 과정이었다고 하면, 이 단계에서는 완전히 다른 작업을 수행합니다.
- Tableau Server를 통해 시각화된 대시보드를 데이터 소비자들에게 제공하고 있기 때문에, 기존에는 ETL 작업이 완료되는 시점에 맞추어 Tableau Server에서 직접 연결된 데이터 원본을 새로고침하는 스케줄을 운영했었습니다.
- 현재는 Tableau API와 Airflow의 PythonOperator를 활용하여 하나의 프로세스로 관리하고 있습니다.
- 관련 코드는 아래와 같습니다.
# Tableau API 호출 코드 import tableauserverclient as TSC import pandas as pd server = TSC.Server('https://{tabeau_server_host}', use_server_version=True) tableau_auth = TSC.PersonalAccessTokenAuth(token_name, personal_access_token) request_options = TSC.RequestOptions(pagesize=100) sources = [] def tableau_refresh(source_name=str): with server.auth.sign_in(tableau_auth): all_datasources, pagination_item = server.datasources.get(request_options) for datas in all_datasources: sources.append([datas.name, datas.id]) df = pd.DataFrame(sources, columns=['name', 'id']) target_id = df.loc[df['name'].str.contains(source_name), 'id'].values print(df.loc[df['name'].str.contains(source_name), 'name'].values) for target in target_id : data = server.datasources.get_by_id(target) server.datasources.refresh(data) # Airflow Task 코드 from airflow.operators.python import PythonOperator import tableau_refresh as ta def tableau_utm_spend_full_joined(): t = ta.tableau_refresh('utm_spend_full_joined') return t tableau_utm_spend_full_joined_refresh = PythonOperator( task_id='tableau_utm_spend_full_joined_refresh', python_callable=tableau_utm_spend_full_joined, dag=dag )
Task 간 의존성 설정 방법
- 병렬 관계 : ',' 로 연결
- 의존 관계 : '>>'로 연결
- Task 그룹 : []로 묶음
[userid_mapped >> userid_mapped_logs] >> daily_utm_funnel [userid_mapped >> userid_mapped_logs] >> daily_active_users_list >> DW_data_check [userid_mapped >> userid_mapped_logs] >> users_first_visit >> DW_data_check [userid_mapped >> userid_mapped_logs >> userid_mapped_utm] >> user_signed_up_utm [userid_mapped_logs, userid_mapped_utm] >> userid_mapped_session_logs userid_mapped_utm >> utm_orders_status_final userid_mapped_utm >> utm_orders_status_final_1d userid_mapped_session_logs >> daily_utm_funnel DW_data_check >> [utm_orders_status_final, user_signed_up_utm] >> utm_spend_full_joined DW_data_check >> utm_orders_status_final_1d DW_data_check >> daily_dormant_user_order DW_data_check >> users_history >> user_segment_category >> tableau_user_segment DW_data_check >> daily_utm_funnel >> utm_spend_full_joined MKT_data_check >> mkt_spend >> utm_spend_full_joined utm_spend_full_joined >> tableau_utm_spend_full_joined_refresh [utm_orders_status_final, utm_orders_status_final_1d] >> tableau_utm_orders_status_final_refresh user_signed_up_utm >> tableau_user_signed_up_utm_refresh utm_orders_status_final >> tableau_user_signed_up_utm_refresh
Airflow를 통한 마케팅 데이터 파이프라인 관리의 효용성
- 하나의 DAG로 마케팅 데이터 파이프라인을 구성함으로써 각 ETL Task의 관계를 시각적으로 편리하게 확인할 수 있습니다.
- 특정 Task가 실패했을 때, 의존성을 지닌 Upstream / Downstream Task들까지 쉽게 재시작할 수 있습니다.
- execution_data에 대한 개념만 잘 잡혀 있다면 스케줄 변경도 자유롭게 할 수 있습니다.
- 일 단위 스케줄일 때, execution_date는 실행하는 시점의 전날을 가리킵니다.
- 시간 단위 스케줄일 때, execution_date는 실행하는 시점의 이전 시각을 가리킵니다.
- DAG의 start_date는 실행 시각이 아닌 execution_date를 기준으로 합니다.
'About Data' 카테고리의 다른 글
BigQueryML로 유저 세그먼트 클러스터링하기 (0) 2023.10.27 Google Spreadsheet 데이터 DB화 하기 (0) 2021.07.03 Segment -> AWS Glue, S3, Kinesis, Lambda를 이용한 클라이언트 로그 스트림 구축 (0) 2021.07.03 Terraform을 활용한 EMR Presto 도입기 (1) 2021.06.05 사이즈가 큰 csv데이터 S3 -> redshift DB로 Copy하기 (0) 2020.12.18