[Airflow 시리즈 1] Airflow 찐초보의 airflow 시작하기

 

이 글을 읽으면 좋은 대상

  1. Airflow 개념을 처음 접하는 분
  2. Airflow 사용자
    • 사용자: 데이터마트 적재나 머신러닝 모델 돌릴 때 Airflow를 사용하려는 분 (분석가)
    • 비사용자: Airflow를 회사 환경에 도입/운영하려는 데이터 엔지니어 (관리자)

Airflow를 배우려는 이유

  • Airflow는 ETL, 머신러닝 등의 업무자동화에 있어 가장 범용적으로 사용되는 도구(?)
  • 현재 업무에는 Airflow가 도입되지 않아 사용할 일이 없지만 업무에 머신러닝 적용할 일 없다고 안 해보면 앞으로도 할 가능성이 없는 것처럼 Airflow도 미리 배워서 간단한 작업을 해보고 싶었음

Airflow 간단한 설명

  • Python 기반으로 워크플로우 설정 가능
    • 워크플로우를 만드는 코드가 Python인 것이지 워크플로우를 통해 실행하는 모든 코드가 py 파일일 필요는 없음
  • DAG(Directed Acyclic Graph) 기반으로 워크플로우 실행
    • 아래 그림 같이 방향성 있으면서 순환하지 않는 구조가 DAG 구조 (출처: 위키피디아)

  • 워크플로우를 설정해놓으면 Web UI를 통해 간편히 모니터링 및 실행 가능

    메인 모니터링 페이지
    Tree뷰
    Graph뷰
  • 다양한 Operator의 존재로 다양한 업무 수행 가능
    • BashOperator : git bash처럼 사용 가능 (bash가 안 익숙한 분들은 cmd나 powershell, terminal을 생각하시면 될 듯합니다)
      또한 echo ~ 통해 print처럼 로그에 원하는 문구를 출력할 수 있음
    • PythonOperator : 주로 파이썬 함수를 짜고 이를 활용할 때 사용
    • EmailOperator : email 보낼 때 사용
    • SlackOperator : slack 메시지 보낼 때 사용
    • 더 많은 Operator는 공홈 참고
  • crontab 등 다양한 스케줄 설정 통해 스케줄링
  • 워크플로우 기반으로 fail된 task가 있으면 해당 task뿐 아니라 task의 downstream을 재실행 가능

Airflow 핵심 용어들

위 간단한 설명에서 워크플로우? task? 같이 생각하신 분들을 위해 핵심 용어들만 설명하겠습니다

  • DAG : DAG 구조를 의미하기도 하며 하나의 DAG구조로 된 스케줄을 DAG라고 부름. 워크플로우 단어와 동일하게 사용
  • Task : DAG에 속한 하나하나의 노드를 Task로 부름
  • Upstream (Downstream) : 워크플로우 기준 A -> B 구조일 때 A는 B의 Upstream, 반대로 B는 A의 Downstream
  • start_date : DAG의 시작일자. DAG를 스케줄에 등록한 일자와 별개로 2021-08-15에 스케줄 등록을 해도, start_date가 2021-08-01이면 8월1일이 시작일자. 이 경우 스케줄 등록 시점 이전 14일이 진행되지 않았기에 이때부터 스케줄이 돌아감 (이런 액션을 backfill이라 부름. 원치 않으면 DAG 설정할 때 catchup=False로 설정)
    반대로 start_date가 2021-09-01이면 스케줄 등록해도 돌지 않음
  • end_date : DAG의 종료일자
  • schedule_interval : 스케줄이 돌아가는 텀. @daily, timedelta(days=1), crontab(‘0 8 * * *‘)과 같이 설정 가능 (crontab 형태가 제일 편한 듯? crontab 예제 사이트)
  • Trigger DAG : 스케줄 시간 아니라도 지금 DAG 실행시키라는 명령
  • Clear : DAG task 중 특정 task에 clear를 누르면 해당 task와 downstream들이 재실행 됨 (fail된 작업 재실행 할 때 유용)
  • Retry : DAG 작업이 제대로 안 됐을 때 설정한 시간 후에 설정한 횟수만큼 재실행 돌아감. Retry까지 다 실패해야 fail

Airflow 흐름도

Airflow는 위 흐름도에 따라 움직입니다 (출처: 유튜브 coder2j 채널)


Airflow를 직접 해봤습니다

Airflow를 사용하려면 로컬에 직접 설치, 도커 사용, 클라우드 사용 3가지 방법이 있는데 개인적 편의성을 생각해 클라우드 사용을 선택했습니다. Airflow는 AWS, GCP에서 모두 사용 가능하며 저는 GCP Airflow인 Composer를 사용했습니다. Composer 설정은 참고자료를 보시면 좋을 것 같습니다.

first_airflow.py 파일을 만들고 GCP Cloud Storage 버킷 > dags/ 폴더에 업로드
하면?


Airflow 웹에서 에러가 뜹니다

에러 메시지를 확인해줍니다

first_airflow.py에서 해결하고 재업로드하면서 에러가 안 뜰때까지 수정해주면 저희가 등록하고 싶었던 DAG가 나타납니다
my_first_dag에서 빨간색(fail)이 뜨는 이유는 제가 업로드한 날짜보다 start_date가 6일 전이었기 때문입니다. 위에서 설명한 backfill 개념을 다시 확인해주세요
여기서 DAG의 실행버튼을 눌러주면 Trigger DAG와 Trigger DAG w/ config가 뜨는데 전자와 후자의 차이는 커스텀 값을 입력받는지 여부입니다. 아무튼 눌러주시면 스케줄과 상관없이 실행됩니다
이렇게 실행하면 위에서 보여드린 Tree뷰, Graph뷰로 DAG 및 Task 상황을 확인할 수 있습니다

개별 Task에 대해 작업을 할 때는 해당 노드를 클릭해서 나오는 아래와 같은 창을 띄우면 됩니다

여기서 Log를 누르면 해당 작업에 대한 로그 메시지를 확인 할 수 있고, Run을 누르면 실행, Clear를 누르면 설정에 따라 관련 작업들을 모두 재실행하게 되는 방식입니다 (그 외에 여러 기능이 있지만 패스…)

echo hello world! 수행하게 한 BashOperator의 로그 메시지 확인

fail된 Task가 있는 DAG가 있으면 아래와 같은 화면을 볼 수 있습니다

Clear 누르면 나타나는 화면인데, fail된 Task와 Downstream의 Task를 재실행하겠다고 하네요


UTC 대신 KTC로 스케줄하기

이렇게 한 사이클의 DAG를 만들어 실행시켜봤습니다. 그런데 Airflow를 이용하면 불편한 점이 UTC 기준으로 시간 설정이 된다는 점입니다. 즉 한국시간과 9시간 차이가 나죠.
crontab 설정을 ‘0 9 * * *‘로 했을 때 오전 9시에 돌아야 하는데 오후 6시에 도는 상황이 된다면 불편하겠죠.
아래와 같이 start_date를 수정해주시면 우리가 원하는 오전 9시에 돌게 할 수 있습니다.

import pendulum

# KST 시간
kst = pendulum.timezone("Asia/Seoul")

with DAG(
    dag_id='my_first_dag', # dag_id는 공백 없이
    default_args=default_args,
    description='my first dag with airflow!',
    start_date=datetime(2021, 8, 8, tzinfo=kst),
    end_date=datetime(2021, 8, 25),
    schedule_interval='* 9 * * *'
) as dag:

설명을 하자면 start_date를 kst 기준으로 변경해서 입력되게 했습니다.
start_date만 바뀌었는데도 schedule_interval 시간도 바뀐게 의아하지만 이는 실행시간(excution_date)이 start_date에 schedule_interval을 꾸준히 더해나가는 방식이기 때문이라고 이해하시면 됩니다.


Airflow 쌩초보가 Airflow를 공부하고 GCP환경에서 실행하는 과정 + 미세팁을 정리해봤습니다.
추가적으로 활용해보면서 Airflow 시리즈 2탄도 쓸 수 있도록 노력해보겠습니다.
읽어주셔서 감사합니다.