일시적으로 회사 업무의 변동이 있었습니다. 그래서 프론트 개발은 잠시 미뤄두고 airflow를 사용한 데이터 처리를 시도하고 있습니다.
평소에 자주 들었던 airflow를 처음 사용해봤습니다.
생소한 용어가 많이 처음에는 살짝 혼란스러웠지만 익숙해지니 매우 직관적이고 간단하게 script만 작성하면 쉽게 사용할 수 있었습니다.
오늘은 가볍게 dag를 생성하고 몇 가지 operator를 사용해봤습니다.
from airflow.models import DAG
from airflow.operators.mysql_operator import MySqlOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from sqlalchemy import create_engine
import json
import os
import pandas as pd
default_args = {
'start_date': datetime(2021, 1, 1),
}
def _processing_user(ti):
users = ti.xcom_pull(task_ids=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError('User is empty')
user = users[0]['results'][0]
processed_user = pd.json_normalize({
'firstname': user['name']['first'],
'lastname': user['name']['last'],
'country': user['location']['country'],
'username': user['login']['username'],
'password': user['login']['password'],
'email': user['email']
})
processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)
def _csvToSql():
try:
engine = create_engine("mysql+pymysql://{user}:{password}@{host}/{database}"
.format(user=os.environ["external_raw_data_mysql_user"],
password=os.environ["external_raw_data_mysql_password"],
host=os.environ["external_raw_data_mysql_host"],
database="airflow_test"),
encoding='utf-8')
conn = engine.connect()
except:
print('Cannot connect.')
df = pd.read_csv('/tmp/processed_user.csv',
names=['firstname', 'lastname',
'country', 'username', 'password', 'email'])
print(df.head())
df.to_sql(name='users', con=engine, if_exists='append', index=False)
conn.close()
return 'Read .csv and written to the MySQL database'
with DAG('user_processing', schedule_interval='@daily',
default_args=default_args,
catchup=False) as dag:
# define tasks/operators
create_table = MySqlOperator(
task_id='creating_table',
mysql_conn_id='local_mysql_default',
sql='''
CREATE TABLE IF NOT EXISTS users (
email VARCHAR(255) NOT NULL,
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
PRIMARY KEY (email)
);
''',
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/'
)
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/',
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
storing_user = PythonOperator(
task_id='storing_user',
python_callable=_csvToSql
)
create_table >> is_api_available >> extracting_user >> processing_user >> storing_user
task를 병렬로 실행할 수도 있습니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1)
}
with DAG('parallel_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='sleep 3'
)
task_2 = BashOperator(
task_id='task_2',
bash_command='sleep 3'
)
task_3 = BashOperator(
task_id='task_3',
bash_command='sleep 3'
)
task_4 = BashOperator(
task_id='task_4',
bash_command='sleep 3'
)
task_1 >> [task_2, task_3] >> task_4
sqlite로는 task를 병렬로 실행할 수 없습니다. 그래서 MySQL이나 Postgres 같은 DB가 필요합니다.
이번 테스트에는 MySQL을 사용했습니다.
pip로 관련 package를 설치합니다.
pip install 'apache-airflow[mysql]'
airflow.cfg에서 sqlclchemyconn, executor를 수정합니다.
sql_alchemy_conn = mysql://[DB_USER]:[DB_PASSWORD]@127.0.0.1:3306/[DB_DATABASE]
executor = LocalExecutor
connector를 수정했다면 airflow db init
을 다시 실행해야 합니다.
executor를 수정했다면 airflow scheduler
를 다시 실행해야 합니다.
내일은 오늘 한 실습을 토대로 DB to DB ETL 작업을 시도할 예정입니다.