회사에서 Veritca to mysql(cloud sql) 로 배치를 만들어야 되는 일을 하게 되었다.
airflow 에서 지원하는 operator 에는 VerticaToMySqlOperator 가 있긴 하다.
처음에는 저걸로 ETL을 구현하려고 했으나 .... 테스트중 문제가 발생
데이터 건수가 작을때는 문제 없었다.
그러나 120만건 정도 넘어가니
아래와 같은 오류 메시지가 나오면서 task가 죽어 버리는 현상이 발생한다.
구글에 문의해서 얻은 답은 해당 task가 메모리를 과도하게 써서 kill 시킨거라고 한다.
VerticaToMySqlOperator 이 소스를 보니 fetchall 이였다. 양이 늘어 나면 늘어날수록 메모리에 적재하다 임계치에 오면 kill이 되는거였군..
음.. 평균 600-700만건은 적재를 해야 되는데 어쩌지...
다른 GCP의 dataflow를 써보려고 찾아봐도 jdbc to jdbc 는 templete이 존재하지 않았다.
몇일 이거 저거 찾다가 . 그냥 코딩하기로 맘먹구 구현해봤다
일단 코딩하기 쉬운 python으로 구현했다. 소스데이터를 한번에 읽으면 어차피 똑같이 메모리 이슈가 날거 같아서
pandas를 이용해서 10000건씩 잘라서 가져오게 구현했다.
from airflow import DAG
from airflow import models
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
import pymysql
from datetime import datetime
from datetime import date
import pendulum
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
import sqlalchemy as sa
#
from pendulum.tz.timezone import Timezone
from datetime import timedelta
import csv
import os
today = date.today()
# 어제 날짜: 오늘 - 1일
yesterday = today - timedelta(days=1)
basis_dy = yesterday.strftime("%Y%m%d")
basis_mm = yesterday.strftime("%Y%m")
kst=Timezone('Asia/Seoul')
# today1 = datetime.date.today()
# Local timezone setting
local_tz = pendulum.timezone("Asia/Seoul")
# Set default arguments for the DAG
default_dag_args = {
"start_date": datetime(2024,3,11, tzinfo=kst),
'depends_on_past': False,
"retries":0
}
# today = datetime.date.today()
# 어제 날짜: 오늘 - 1일
# yesterday = today1 - timedelta(days=1)
CUR_DIR = os.path.abspath(os.path.dirname(__file__))
dag = DAG(
dag_id='wmp_analytics_v5',
default_args=default_dag_args,
# schedule_interval=None,
schedule_interval="10 6 * * *",
catchup=False
)
start = DummyOperator(
task_id='start',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
mysql_username = Variable.get("mysql_username")
mysql_password = Variable.get("mysql_password")
mysql_ip_port = Variable.get("mysql_ip_port")
vertica_username = Variable.get("vertica_username")
vertica_password = Variable.get("vertica_password")
vertica_ip_port = Variable.get("vertica_ip_port")
#s_table : 소스테이블
#t_table : 타겟테이블
#s_sql : 소스 sql
#t_sql : 타겟 sql
#t_befor_sql : 타겟 선행 sql
#t_after_sql1 : 타겟 후행 1 sql
#t_after_sql2 : 타겟 후행 2 sql
s_sql = """
select col1, col2, col3 from test_table_mm where basis_mm = TO_CHAR(CURRENT_DATE - INTERVAL '2 DAY', 'YYYYMM')::INTEGER
"""
t_table = "test_table_mm"
def etl_job():
mysql_engine = create_engine(f'mysql+pymysql://{mysql_username}:{mysql_password}@{mysql_ip_port}/{my_db}',use_batch_mode=True )
c = mysql_engine.connect()
vertica_engine = create_engine(
f"vertica+vertica_python://{vertica_username}:{vertica_password}@{vertica_ip_port}/{vertica_db}", query_cache_size=0
)
for dataframes in pd.read_sql(s_sql, vertica_engine, chunksize=10000): ## 10000건식 짤라 넣는다
print(
f"data frame rows = w/{len(dataframes)} rows"
)
dataframes.to_sql(t_table, con=mysql_engine, if_exists='append', index=False)
tasks = []
i = 0
aa= PythonOperator(
task_id ="test_my",
python_callable=etl_job,
op_kwargs={},
dag=dag,
)
실행 해보면 잘 들어간다. 1000건씩 잘라넣어서 메모리로 죽거나 하지 않는다.
아 일주일 날렸네 ㅡ.ㅡ;
'GCP' 카테고리의 다른 글
CGP composer(Airflow) 알림 teams로 받기 (0) | 2023.08.30 |
---|