ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Google Spreadsheet 데이터 DB화 하기
    About Data 2021. 7. 3. 20:39

    1. 도입 배경

    • 마케팅, 비즈니스, 물류, 콘텐츠 등.. 수많은 유저들이 구글 스프레드시트에 데이터를 기록합니다.
    • DB 데이터와 연결하여 분석하기 위해서 기존에는 Re:dash의 스프레드시트 커넥터를 사용했습니다.
    • Tableau 등 시각화에 대한 니즈가 커지면서 Re:dash에서 데이터를 보는 것만으로는 부족하게 되었습니다.

    2. 작업 내역

    • GCP에서 프로젝트를 만들고, 서비스 계정을 생성합니다.
    • api 접속을 위한 키를 발급 받고, auth 코드를 작성합니다.
      def gs_auth() :
          # 구글 API 인증
          scope = ['https://spreadsheets.google.com/feeds',
                   'https://www.googleapis.com/auth/drive']
          credentials = ServiceAccountCredentials.from_json_keyfile_name(
                  'google_api_key.json', scope)
          gc = gspread.authorize(credentials)
      
          return gc​
    • DB화 할 시트들을 또 하나의 스프레드시트 안에 목록으로 정리하고 읽어 옵니다.
      목록은 실행 상태, 스케줄, 생성자 정보, 시트명, 생성할 테이블 이름으로 구성됩니다.
      def gs_list() :
          # 대상 시트 리스트
          gc1 = gs_auth().open_by_url("https://docs.google.com/spreadsheets/d/{id}")
      
          return gc1​
    • 읽어 온 목록 안에서 각각의 시트를 추출합니다.
      # 생성할 테이블 리스트 추출
          gc2 = gs_list().worksheet('sheets').get_all_values()
      
          gslist = []
          for n in range(len(gc2)) :
              if n > 1 :
                  gslist.append(gc2[n])​
    • 시트의 내용을 추출하여 S3에 Json 파일로 적재하고,
      스케줄 형식이 맞지 않거나 컬럼 이름이 잘못되었을 때의 오류 케이스를 생성합니다.
      # 대상 시트 추출
          for i in range(len(gslist)) :
              tablename = '{}'.format(gslist[i][6])
      
              # 스케줄 확인
              now = datetime.now()+timedelta(hours=9)
              cron = '{}'.format(gslist[i][1])
      
              try :
                  if croniter.is_valid(cron) :
                      if croniter.match(cron, now) :
                          if gslist[i][0] == 'delete' :
                              if db_list()['tablename'].str.contains(tablename).any() :
                                  engine.execute("drop table if exists hive.tier1_external_gs.{}".format(tablename))
                                  resultdf.append([gslist[i][2], gslist[i][4], tablename, 'deleted', ' '])
                                  print(tablename, 'deleted')
                              else :
                                  continue
      
                          elif gslist[i][0] == 'stop' :
                              continue
      
                          elif gslist[i][0] == 'start' :
                              gs = gs_auth().open_by_url(gslist[i][4]).worksheet(gslist[i][5]).get_all_values()
                              df = pd.DataFrame(gs, columns = gs[0])
                              df = df.reindex(df.index.drop(0))
      
                              if re.search(r'\W', ''.join(df.columns)) :
                                  raise ValueError('컬럼에 잘못된 문자가 있습니다.')
      
                              else :
                                  pq = df.to_json(orient='records', lines=True, force_ascii=False)
                                  s3.put_object(Body=pq, Bucket='bucket', Key=f'tier1_external_gs/{tablename}/{tablename}.json')
      
                                  resultdf.append([gslist[i][2], gslist[i][4], tablename, 'success', ' '])
                                  print(tablename, 'success')
                      else :
                          continue
                  else :
                      raise ValueError("{} 표기 형식이 잘못되었습니다.".format(cron))
      
              except Exception as e :
                  resultdf.append([gslist[i][2], gslist[i][4], tablename, 'fail', 'error'])
                  print(tablename, 'fail', '{}'.format(e))​
    • Glue Crawler를 사용해 해당 S3 경로에 적재된 디렉토리들을 Hive 테이블로 가져옵니다.

    3. 비하인드

    • 이 서비스는 전 회사 Y에서 너무 잘 사용했던 것이었는데, 현 회사에 없어서 직접 만들어 봤습니다.
      (사실 입사하기 전부터 필요하다고 했는데 이전 DE가 3개월 넘게 스루하길래 빡쳐서 직접 만들게 된...)
    • 처음에는 RDS에 직접 to_sql로 insert하는 방식으로 구현했는데 매우매우 사이즈가 큰 시트가 있어서 겸사겸사 S3로 선회했습니다.
Designed by Tistory.