ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 사이즈가 큰 csv데이터 S3 -> redshift DB로 Copy하기
    About Data 2020. 12. 18. 17:27

    약 7만 row, 40mb정도 되는 파일을 redshift로 올리기 위해 가장 먼저 아래와 같은 python 코드를 사용했었습니다.

    df.to_sql(tablename, if_exists = 'replace', con=engine, schema = schema, index=False)

    그런데 이렇게 하고 redshift 쿼리 로그를 보니 무려 한 줄씩 일일히 insert로 들어가고 있던.....

    한 줄마다 평균 0.5초씩만 계산해도 9시간이 넘게 소요되는 속도입니다.

     

    이건 아니다 싶어서 방법을 찾다가 S3 버킷에 파일을 올리고 redshift copy문을 찾아서 적용해 봤습니다.

     

    먼저 내용이 있는 데이터프레임을 csv형식으로 S3에 올리기 위해 사전 작업을 합니다.

    csv_buffer = StringIO()
    df.to_csv(csv_buffer, compression='gzip', index = False)

    스케줄로 돌릴 작업이기 때문에 시시각각 csv 파일을 로컬에 생성하고 싶지 않아서

    StringIO 모듈을 활용해 임시 텍스트로 저장합니다.

    그리고 이 텍스트 값을 gzip 포맷으로 압축해서 사이즈를 줄입니다.

     

    boto3 라이브러리를 활용하여 S3에 csv 파일을 업로드합니다.

    import boto3
    s3 = boto3.client("s3",\
                      region_name=region_name,\
                      aws_access_key_id=aws_access_key_id,\
                      aws_secret_access_key=aws_secret_access_key)
    s3.put_object(Bucket=bucketname, Body=csv_buffer.getvalue(), Key=filename)

    이 때, 업로드할 버킷은 미리 만들어져 있어야 하고, 파일명은 폴더/파일 형식으로 계층을 나눌 수도 있습니다.

     

    다음으로 redshift에서 카피할 s3 파일 경로를 불러옵니다.

    file_path = f's3://{bucketname}/{tablename}/'

     

    sql engine을 구성하고 copy query를 실행하기 전에, 테이블을 먼저 생성해 주어야 합니다.

    스케줄 잡의 특성 상 이미 있을 수도 있는 테이블을 먼저 제거하고 다시 생성해 줍니다.

    redshift에서는 테이블을 생성할 때 컬럼 정보를 모두 기입해야 하는데, 우선 VARCHAR로 통일하기로 하고

    다양한 파일을 업로드하는 코드 목적에 따라 for문으로 컬럼 정보를 읽어 저장합니다.

    col = StringIO()
    
    for c in range(len(df.columns)) :
      if c == len(df.columns) - 1 :
        col.write(df.columns[c]+' varchar')
      else :
        col.write(df.columns[c]+' varchar,\n')
        
        
     engine.execute(
     f"""
       drop table if exists {schema}.{tablename}
     """)
     
     engine.execute(
     f"""
       create table if not exists {schema}.{tablename}
       (
       	{col.getvalue()}
       )
     """)

     

    마지막으로 생성한 테이블에 맞게 copy문을 작성합니다.

    import psycopg2
    from sqlalchemy import create_engine
    
    engine = create_engine(f'postgresql+psycopg2://{user}:{pw}@{host}:{port}/{dbname}')
    
    engine.execute(
    	f"copy {schema}.{tablename} from '{file_path}' iam_role '{iam_arn}' \
        csv ignoreheader 1; commit;")

    여기서 골치 아픈 문제를 겪었는데요,
    copy문을 실행한 뒤에 error가 없는데도 DB에 파일이 정상적으로 insert되지 않고 undoing으로 롤백되어

    매우 오랫동안 서치를 해 본 결과

    AWS redshift 콘솔 외에서는 마지막에 commit을 하거나 autocommit 설정을 해줘야 한다는 사실을 알았습니다.

     

    이렇게 짠 코드를 airflow에서 스케줄로 실행해본 결과 40초 내외로 7만 row가 모두 insert되었습니다.

    처음 예상 시간 9시간에서 비교할 수 없을 만큼 줄어들었습니다.

     

    +

    주의해야 할 부분

    aws cli를 이용해서 코드를 실행할 서버에 적절한 권한을 부여해야 합니다.

    iam role을 이용할 경우 기본적으로 s3, redshift 권한이 둘 다 있어야 합니다.

     

     

     

     

Designed by Tistory.