ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Segment -> AWS Glue, S3, Kinesis, Lambda를 이용한 클라이언트 로그 스트림 구축
    About Data 2021. 7. 3. 20:00

    1. 배경

    • 기존 클라이언트 로그 환경 : Segment 서비스에서 Redshift로 바로 적재, 1시간 간격
    • 문제점 
      • 1시간 이내 로그 조회 불가
      • 파티셔닝이 되어 있지 않아 조회 속도 느림
      • 이벤트 소스, 유형 별로 각각 테이블이 생성되어 유저의 행동 흐름을 파악하기 어려움

    2. 작업 내역

    • Segment에서 Kinesis로 로그 전송하도록 설정
    • Kinesis에서 S3로 로그 적재하는 Firehose 데이터 전송 스트림 생성
      • S3 전송 시 디폴트로 연/월/일/시 4depth의 파티션이 구성됨
    • Lambda에서 S3에 로그가 적재될 때마다 특정 디렉토리 하나로 이동시키는 함수 생성
      • Kinesis에서 보내 주는 24시간 분량의 데이터만 파티션 없이 모아 두기
      • Glue Crawler로 테이블을 생성한 후 스키마를 일괄 편집하기 위함
    • Glue Crawler로 테이블 생성 후 스키마 편집
      • 3rd Party 툴인 Segment 특성 상, 들어오는 데이터 스키마가 균일하지 않고 Unreadable한 경우도 빈번
      • StructType 데이터를 모두 String으로 변환 -> Json 스트링으로 표시됨
    •  Presto Engine에서 Glue를 Hive Metastore로 활용하여 데이터 조회
      • Kinesis Firehose를 통해 S3로 데이터가 적재될 때마다 바로 추가된 데이터 이용 가능 (약 15초 간격)
      • Json String으로 묶여 있는 Context, Properties 등을 풀어 ETL, 데이터 소비자에게 제공

    3. 참고 사항

    • Segment에서 Kinesis 외 S3 Destination도 함께 사용함
      • Kinesis는 오직 Streaming을 위한 방안으로 1일 치 데이터만 사용
      • 로그 누적 및 장기 보관을 위해 별도 적재 중
    • Kinesis의 함정카드..!
      • json type의 데이터를 레코드로 받아올 때, 줄바꿈을 해주지 않아 전체 레코드 array가 한 줄로 인식되고,
        presto에서 조회할 때 첫 번째 레코드만 읽히는 비극이 발생합니다.
      • Kinesis 데이터 전송에서 Transform source records with AWS Lambda 기능을 활용해 아래와 같이 줄 바꿈을 추가해 주어야 전체 레코드를 읽을 수 있게 됩니다.
        import base64
        import json
        import boto3
        
        print('Loading function')
        
        def lambda_handler(event, context):
            output = []
            
            for record in event['records']:
                payload = base64.b64decode(record['data']).decode('utf-8')
                
                row_w_newline = payload + "\n"
                
                row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
                
                output_record = {
                    'recordId': record['recordId'],
                    'result': 'Ok',
                    'data': row_w_newline
                }
                output.append(output_record)
                
            print('Processed {} records.'.format(len(event['records'])))
            
            return {'records': output}

    4. 후속 작업 계획

    • 사실은 Segment를 떼고 자체 API로 로그를 쌓는 것이 옳다
    • 로그 정의서부터 차곡차곡 장기 프로젝트로 진행할 예정
Designed by Tistory.