ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Terraform을 활용한 EMR Presto 도입기
    About Data 2021. 6. 5. 06:16

    현 회사에서 EMR Presto를 도입한 과정에 대해 간략히 정리해 봅니다.

     

    1.  기존의 환경

    •  메인 서비스 DB : Mongodb
      • Analytics Node를 활용한 MQL 데이터 분석
    • 기타 서비스 DB : MariaDB
      • RDS Replica를 활용한 SQL 데이터 분석
    • Data Warehouse : Redshift1
      • 위의 서비스DB들을 원본으로 PySpark & Glue Metastore를 통한 1일 주기의 ETL 진행
      • Data Lake : S3
    • Service Log : Redshift2
      • Segment 플랫폼을 통한 1시간 주기의 원본 업데이트
    • 기타 Summary Data : Postrgesql DB
      • 데이터 소비자들이 위의 원천 데이터들을 가공하여 별도로 저장하는 공간

     

    2. 문제점

    • DW 업데이트 주기가 느리고 적재되지 않은 데이터가 많아서 실무자들이 원천 DB를 연결해서 사용해요
      • Mongodb를 조회하기 위한 MQL의 진입장벽이 높아 DA에게 애드혹이 몰렸어요
    • Data Partitioning이 제대로 되어 있지 않아 쿼리 조회 속도가 나지 않아요
      • 기존 ETL 데이터 전체가 거대한 레거시가 되었어요
    • 각종 데이터들이 여기저기 흩어져있어서 Join 한 번 걸기도 쉽지 않아요
      • Re:dash의 Query Result 기능에 의존하고 있었어요

     

    3. 작업 배경

    • 당시 Data Engineer 역시 문제를 인식했지만 변화는 없었어요
      • 그리고 쿨퇴사..
      • 1년동안 8커밋 실화?
    • 남은 Data Analyst들과 Data Scientist들의 고뇌
      • 새 DE들이 올 때까지 유지하는 데만 힘을 쏟자
      • 하지만 주인 잃은 환경이 문제를 드러내기 시작하는데...
        • Main ETL 에어플로우 DAG 스케줄 문제가 생긴 걸 그냥 수동으로 돌리고 있었다고 합니다
          • execution_date의 기본 개념에 관한 매우 간단한 문제였음
        • EMR 보안에 심각한 취약점이 있었습니다
          • 자세한 설명은 생략합니다
        • 마케팅 팀에서 필요로 하는 광고 데이터 ETL은 아예 손을 놓고 있었습니다
          • 날짜 단위로 이빨 빠진 데이터가 산더미....

    => 안되겠다 최소한 데이터를 제대로 쓸 수는 있게 하자

     

    4. 작업 히스토리

    Phase 1

    • 인수인계 내용을 바탕으로 기존 환경에 대한 스터디 진행
    • 에어플로우 스케줄링 문제 해결
    • 만료된 키 교체
    • 광고 ETL을 DA환경에서 진행

    Phase 2

    • Staging 환경에서 EMR Presto 테스트 진행
      • Terraform을 이용해 배포되는 EMR configuration에 아래 내용을 추가했습니다.
        {
            "Classification": "prestosql-connector-hive",
            "Properties": {
              "hive.metastore": "glue",
              "hive.metastore.glue.datacatalog.enabled": "true",
              "hive.s3-file-system-type": "PRESTO"
            }
          },
           {
            "Classification": "prestosql-connector-postgresql",
            "Properties": {
              "connection-url": "",
              "connection-user": "",
              "connection-password": ""
            }
          },
          {
            "Classification": "prestosql-connector-redshift",
            "Properties": {
              "connection-url": "",
              "connection-user": "",
              "connection-password": ""
            }
          },
          {
            "Classification": "prestosql-connector-mysql",
            "Properties": {
              "connection-url": "",
              "connection-user": "",
              "connection-password": ""
            }
          },
          {
            "Classification": "prestosql-connector-mongodb",
            "Properties": {
              "mongodb.seeds": "",
              "mongodb.credentials": "",
              "mongodb.ssl.enabled": "",
              "mongodb.read-preference": "SECONDARY",
              "mongodb.required-replica-set": "",
              "mongodb.case-insensitive-name-matching": "true"
            }
          },
        정확히는 Presto가 아닌 PrestoSQL(a.k.a Trino)을 도입했는데.. 이유는 다음과 같습니다
        Mongodb의 Collection 이름 등에 대소문자가 혼용되었는데 이를 구분하는 설정이 PrestoSQL에만 있기 때문입니다.
        (맨 아랫줄 mongodb.case-insensitive-name-matching 부분에 해당합니다.)
        참고 : PrestoDB vs PrestoSQL
      • 결과
        • 대부분의 DB를 하나의 쿼리로 접근할 수 있게 되었습니다.

    Phase 3

    • Data 통합
      • 원천 DB(레플리카)를 제외하고, 별도 관리되고 있던 Postgresql DB의 Summary 데이터들을 
        Hive 카탈로그 아래로 마이그레이션했습니다. 
        (Partition도 추가해서 빠름빠름)
    • 자원 관리
      • DE가 안 도와줘서.. 직접 얼기설기 만들었던 Summary 데이터용 Airflow 환경의 경우
        PySpark를 활용하기 위해(그러나 EMR은 사용할 줄 몰랐었기 때문에) 매우 큰 인스턴스를 독점하고 있었습니다.
        Presto를 도입하면서 보다 가벼워진 Summary Task만 처리할 수 있도록 효율화할 예정입니다.
    • Airflow Template 제공
      • DA들이 가공한 데이터를 Hive 카탈로그에 보다 쉽게 적재할 수 있도록
        최소한의 요소(SQL 파일, task_id, partition key)만 입력하면 Airlfow DAG를 생성하는 코드를 작성했습니다.
        # sql_task.py
        
        from airflow.operators.sql import SQLCheckOperator
        
        ...
        
        def sql_task(ds, sql):
            default_path=('~/')
        
            for line in open(default_path+sql, 'r').read().split(';') :
                if ("delete" in line or "DELETE" in line) :
                    engine.execute(line.format(ds=ds))
        
                else :
                    operator = SQLCheckOperator(
                        task_id='Operator',
                        sql=line.format(ds=ds),
                        conn_id='presto_default'
                    )
        
                    operator.execute(dict())
        (수행되는 코드)

        from airflow import DAG
        from sql_task import sql_task
        from airflow.operators.python import PythonOperator
        
        default_args={
            'owner': 'rami',
            'depends_on_past': False,
            'start_date': datetime(2021, 6, 1, 0, 0),
            'retries': 7,
            'retry_delay': timedelta(minutes=5),
        }
        
        ds = 'tomorrow_ds_nodash'
        
        with DAG(
                'presto_dau_dag_0.0.1',
                default_args=default_args,
                catchup=False,
                description='이벤트 로그의 DAU, WAU, MAU',
                schedule_interval= '0 20 * * *'
            ) as dag:
        
            def dau(**kwargs):
                sql_task(sql='dau.sql', ds=kwargs[ds])
            dau = PythonOperator(python_callable=dau, task_id='dau')
        
            def wau(**kwargs):
                sql_task(sql='wau.sql', ds=kwargs[ds])
            wau = PythonOperator(python_callable=wau, task_id='wau')
        
            def mau(**kwargs):
                sql_task(sql='mau.sql', ds=kwargs[ds])
            mau = PythonOperator(python_callable=mau, task_id='mau')
        
        
        dau, wau, mau​
        (DAG 예시)

    5. 해결하고 싶은 문제

    • EMR Presto Configration으로 같은 종류의 Connector를 여러 개 연결하려면 어떻게 해야 할까요?ㅠ
      • 애초에 Redshift를 두개 쓰고 있는 환경이 이상한 게 아닐까... 까지 도달했습니다
      • 아니 그런데 Mysql이나 Postgresql을 여러 개 쓸 수도 있는거 아닌가요?!
    • Terraform으로 EMR 배포할 때 Subnet 설정에서 막힙니다
      • 정확히는 Subnet을 새로 만들어서 설정하면 EMR application 생성에 실패합니다 
      • 네트워크 전문가의 도움을 기다립니다ㅠㅠ

     

    6. 후기

    • 전문 엔지니어가 아니다보니 여전히 비효율적인 코드와 비효율적인 방법들을 시도했을 수 있습니다. 
      하지만 결과는 잘 돌아가고 있고, 미흡한 부분은 전문가의 도움을 받아 개선해 나가면 된다고 생각합니다.
    • 밑바닥부터 삽질해 준 동료 데이터 분석가 D에게 특별히 감사합니다.
    • 저희는 이제 데이터 분석가가 아니라 그냥 '데이터 인간' 해야 할 것 같습니다..
    • 직접 만든 환경에서 앞으로 분석도 열심히 하겠습니다...
Designed by Tistory.