-
Terraform을 활용한 EMR Presto 도입기About Data 2021. 6. 5. 06:16
현 회사에서 EMR Presto를 도입한 과정에 대해 간략히 정리해 봅니다.
1. 기존의 환경
- 메인 서비스 DB : Mongodb
- Analytics Node를 활용한 MQL 데이터 분석
- 기타 서비스 DB : MariaDB
- RDS Replica를 활용한 SQL 데이터 분석
- Data Warehouse : Redshift1
- 위의 서비스DB들을 원본으로 PySpark & Glue Metastore를 통한 1일 주기의 ETL 진행
- Data Lake : S3
- Service Log : Redshift2
- Segment 플랫폼을 통한 1시간 주기의 원본 업데이트
- 기타 Summary Data : Postrgesql DB
- 데이터 소비자들이 위의 원천 데이터들을 가공하여 별도로 저장하는 공간
2. 문제점
- DW 업데이트 주기가 느리고 적재되지 않은 데이터가 많아서 실무자들이 원천 DB를 연결해서 사용해요
- Mongodb를 조회하기 위한 MQL의 진입장벽이 높아 DA에게 애드혹이 몰렸어요
- Data Partitioning이 제대로 되어 있지 않아 쿼리 조회 속도가 나지 않아요
- 기존 ETL 데이터 전체가 거대한 레거시가 되었어요
- 각종 데이터들이 여기저기 흩어져있어서 Join 한 번 걸기도 쉽지 않아요
- Re:dash의 Query Result 기능에 의존하고 있었어요
3. 작업 배경
- 당시 Data Engineer 역시 문제를 인식했지만 변화는 없었어요
- 그리고 쿨퇴사..
- 1년동안 8커밋 실화?
- 남은 Data Analyst들과 Data Scientist들의 고뇌
- 새 DE들이 올 때까지 유지하는 데만 힘을 쏟자
- 하지만 주인 잃은 환경이 문제를 드러내기 시작하는데...
- Main ETL 에어플로우 DAG 스케줄 문제가 생긴 걸 그냥 수동으로 돌리고 있었다고 합니다
- execution_date의 기본 개념에 관한 매우 간단한 문제였음
- EMR 보안에 심각한 취약점이 있었습니다
- 자세한 설명은 생략합니다
- 마케팅 팀에서 필요로 하는 광고 데이터 ETL은 아예 손을 놓고 있었습니다
- 날짜 단위로 이빨 빠진 데이터가 산더미....
- Main ETL 에어플로우 DAG 스케줄 문제가 생긴 걸 그냥 수동으로 돌리고 있었다고 합니다
=> 안되겠다 최소한 데이터를 제대로 쓸 수는 있게 하자
4. 작업 히스토리
Phase 1
- 인수인계 내용을 바탕으로 기존 환경에 대한 스터디 진행
- 에어플로우 스케줄링 문제 해결
- 만료된 키 교체
- 광고 ETL을 DA환경에서 진행
Phase 2
- Staging 환경에서 EMR Presto 테스트 진행
- Terraform을 이용해 배포되는 EMR configuration에 아래 내용을 추가했습니다.
정확히는 Presto가 아닌 PrestoSQL(a.k.a Trino)을 도입했는데.. 이유는 다음과 같습니다{ "Classification": "prestosql-connector-hive", "Properties": { "hive.metastore": "glue", "hive.metastore.glue.datacatalog.enabled": "true", "hive.s3-file-system-type": "PRESTO" } }, { "Classification": "prestosql-connector-postgresql", "Properties": { "connection-url": "", "connection-user": "", "connection-password": "" } }, { "Classification": "prestosql-connector-redshift", "Properties": { "connection-url": "", "connection-user": "", "connection-password": "" } }, { "Classification": "prestosql-connector-mysql", "Properties": { "connection-url": "", "connection-user": "", "connection-password": "" } }, { "Classification": "prestosql-connector-mongodb", "Properties": { "mongodb.seeds": "", "mongodb.credentials": "", "mongodb.ssl.enabled": "", "mongodb.read-preference": "SECONDARY", "mongodb.required-replica-set": "", "mongodb.case-insensitive-name-matching": "true" } },
Mongodb의 Collection 이름 등에 대소문자가 혼용되었는데 이를 구분하는 설정이 PrestoSQL에만 있기 때문입니다.
(맨 아랫줄 mongodb.case-insensitive-name-matching 부분에 해당합니다.)
참고 : PrestoDB vs PrestoSQL - 결과
- 대부분의 DB를 하나의 쿼리로 접근할 수 있게 되었습니다.
- Terraform을 이용해 배포되는 EMR configuration에 아래 내용을 추가했습니다.
Phase 3
- Data 통합
- 원천 DB(레플리카)를 제외하고, 별도 관리되고 있던 Postgresql DB의 Summary 데이터들을
Hive 카탈로그 아래로 마이그레이션했습니다.
(Partition도 추가해서 빠름빠름)
- 원천 DB(레플리카)를 제외하고, 별도 관리되고 있던 Postgresql DB의 Summary 데이터들을
- 자원 관리
- DE가 안 도와줘서.. 직접 얼기설기 만들었던 Summary 데이터용 Airflow 환경의 경우
PySpark를 활용하기 위해(그러나 EMR은 사용할 줄 몰랐었기 때문에) 매우 큰 인스턴스를 독점하고 있었습니다.
Presto를 도입하면서 보다 가벼워진 Summary Task만 처리할 수 있도록 효율화할 예정입니다.
- DE가 안 도와줘서.. 직접 얼기설기 만들었던 Summary 데이터용 Airflow 환경의 경우
- Airflow Template 제공
- DA들이 가공한 데이터를 Hive 카탈로그에 보다 쉽게 적재할 수 있도록
최소한의 요소(SQL 파일, task_id, partition key)만 입력하면 Airlfow DAG를 생성하는 코드를 작성했습니다.
(수행되는 코드)# sql_task.py from airflow.operators.sql import SQLCheckOperator ... def sql_task(ds, sql): default_path=('~/') for line in open(default_path+sql, 'r').read().split(';') : if ("delete" in line or "DELETE" in line) : engine.execute(line.format(ds=ds)) else : operator = SQLCheckOperator( task_id='Operator', sql=line.format(ds=ds), conn_id='presto_default' ) operator.execute(dict())
(DAG 예시)from airflow import DAG from sql_task import sql_task from airflow.operators.python import PythonOperator default_args={ 'owner': 'rami', 'depends_on_past': False, 'start_date': datetime(2021, 6, 1, 0, 0), 'retries': 7, 'retry_delay': timedelta(minutes=5), } ds = 'tomorrow_ds_nodash' with DAG( 'presto_dau_dag_0.0.1', default_args=default_args, catchup=False, description='이벤트 로그의 DAU, WAU, MAU', schedule_interval= '0 20 * * *' ) as dag: def dau(**kwargs): sql_task(sql='dau.sql', ds=kwargs[ds]) dau = PythonOperator(python_callable=dau, task_id='dau') def wau(**kwargs): sql_task(sql='wau.sql', ds=kwargs[ds]) wau = PythonOperator(python_callable=wau, task_id='wau') def mau(**kwargs): sql_task(sql='mau.sql', ds=kwargs[ds]) mau = PythonOperator(python_callable=mau, task_id='mau') dau, wau, mau
- DA들이 가공한 데이터를 Hive 카탈로그에 보다 쉽게 적재할 수 있도록
5. 해결하고 싶은 문제
- EMR Presto Configration으로 같은 종류의 Connector를 여러 개 연결하려면 어떻게 해야 할까요?ㅠ
- 애초에 Redshift를 두개 쓰고 있는 환경이 이상한 게 아닐까... 까지 도달했습니다
- 아니 그런데 Mysql이나 Postgresql을 여러 개 쓸 수도 있는거 아닌가요?!
- Terraform으로 EMR 배포할 때 Subnet 설정에서 막힙니다
- 정확히는 Subnet을 새로 만들어서 설정하면 EMR application 생성에 실패합니다
- 네트워크 전문가의 도움을 기다립니다ㅠㅠ
6. 후기
- 전문 엔지니어가 아니다보니 여전히 비효율적인 코드와 비효율적인 방법들을 시도했을 수 있습니다.
하지만 결과는 잘 돌아가고 있고, 미흡한 부분은 전문가의 도움을 받아 개선해 나가면 된다고 생각합니다. - 밑바닥부터 삽질해 준 동료 데이터 분석가 D에게 특별히 감사합니다.
- 저희는 이제 데이터 분석가가 아니라 그냥 '데이터 인간' 해야 할 것 같습니다..
- 직접 만든 환경에서 앞으로 분석도 열심히 하겠습니다...
'About Data' 카테고리의 다른 글
Airflow로 마케팅 데이터 파이프라인 관리하기 (0) 2021.07.09 Google Spreadsheet 데이터 DB화 하기 (0) 2021.07.03 Segment -> AWS Glue, S3, Kinesis, Lambda를 이용한 클라이언트 로그 스트림 구축 (0) 2021.07.03 사이즈가 큰 csv데이터 S3 -> redshift DB로 Copy하기 (0) 2020.12.18 수학으로만 답할 수 없는 데이터들 (0) 2020.10.19 - 메인 서비스 DB : Mongodb