-
Google Spreadsheet 데이터 DB화 하기About Data 2021. 7. 3. 20:39
1. 도입 배경
- 마케팅, 비즈니스, 물류, 콘텐츠 등.. 수많은 유저들이 구글 스프레드시트에 데이터를 기록합니다.
- DB 데이터와 연결하여 분석하기 위해서 기존에는 Re:dash의 스프레드시트 커넥터를 사용했습니다.
- Tableau 등 시각화에 대한 니즈가 커지면서 Re:dash에서 데이터를 보는 것만으로는 부족하게 되었습니다.
2. 작업 내역
- GCP에서 프로젝트를 만들고, 서비스 계정을 생성합니다.
- api 접속을 위한 키를 발급 받고, auth 코드를 작성합니다.
def gs_auth() : # 구글 API 인증 scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive'] credentials = ServiceAccountCredentials.from_json_keyfile_name( 'google_api_key.json', scope) gc = gspread.authorize(credentials) return gc
- DB화 할 시트들을 또 하나의 스프레드시트 안에 목록으로 정리하고 읽어 옵니다.
목록은 실행 상태, 스케줄, 생성자 정보, 시트명, 생성할 테이블 이름으로 구성됩니다.
def gs_list() : # 대상 시트 리스트 gc1 = gs_auth().open_by_url("https://docs.google.com/spreadsheets/d/{id}") return gc1
- 읽어 온 목록 안에서 각각의 시트를 추출합니다.
# 생성할 테이블 리스트 추출 gc2 = gs_list().worksheet('sheets').get_all_values() gslist = [] for n in range(len(gc2)) : if n > 1 : gslist.append(gc2[n])
- 시트의 내용을 추출하여 S3에 Json 파일로 적재하고,
스케줄 형식이 맞지 않거나 컬럼 이름이 잘못되었을 때의 오류 케이스를 생성합니다.
# 대상 시트 추출 for i in range(len(gslist)) : tablename = '{}'.format(gslist[i][6]) # 스케줄 확인 now = datetime.now()+timedelta(hours=9) cron = '{}'.format(gslist[i][1]) try : if croniter.is_valid(cron) : if croniter.match(cron, now) : if gslist[i][0] == 'delete' : if db_list()['tablename'].str.contains(tablename).any() : engine.execute("drop table if exists hive.tier1_external_gs.{}".format(tablename)) resultdf.append([gslist[i][2], gslist[i][4], tablename, 'deleted', ' ']) print(tablename, 'deleted') else : continue elif gslist[i][0] == 'stop' : continue elif gslist[i][0] == 'start' : gs = gs_auth().open_by_url(gslist[i][4]).worksheet(gslist[i][5]).get_all_values() df = pd.DataFrame(gs, columns = gs[0]) df = df.reindex(df.index.drop(0)) if re.search(r'\W', ''.join(df.columns)) : raise ValueError('컬럼에 잘못된 문자가 있습니다.') else : pq = df.to_json(orient='records', lines=True, force_ascii=False) s3.put_object(Body=pq, Bucket='bucket', Key=f'tier1_external_gs/{tablename}/{tablename}.json') resultdf.append([gslist[i][2], gslist[i][4], tablename, 'success', ' ']) print(tablename, 'success') else : continue else : raise ValueError("{} 표기 형식이 잘못되었습니다.".format(cron)) except Exception as e : resultdf.append([gslist[i][2], gslist[i][4], tablename, 'fail', 'error']) print(tablename, 'fail', '{}'.format(e))
- Glue Crawler를 사용해 해당 S3 경로에 적재된 디렉토리들을 Hive 테이블로 가져옵니다.
3. 비하인드
- 이 서비스는 전 회사 Y에서 너무 잘 사용했던 것이었는데, 현 회사에 없어서 직접 만들어 봤습니다.
(사실 입사하기 전부터 필요하다고 했는데 이전 DE가 3개월 넘게 스루하길래 빡쳐서 직접 만들게 된...) - 처음에는 RDS에 직접 to_sql로 insert하는 방식으로 구현했는데 매우매우 사이즈가 큰 시트가 있어서 겸사겸사 S3로 선회했습니다.
'About Data' 카테고리의 다른 글
BigQueryML로 유저 세그먼트 클러스터링하기 (0) 2023.10.27 Airflow로 마케팅 데이터 파이프라인 관리하기 (0) 2021.07.09 Segment -> AWS Glue, S3, Kinesis, Lambda를 이용한 클라이언트 로그 스트림 구축 (0) 2021.07.03 Terraform을 활용한 EMR Presto 도입기 (1) 2021.06.05 사이즈가 큰 csv데이터 S3 -> redshift DB로 Copy하기 (0) 2020.12.18