Airflow cli로 데이터 백필하기(위해서 삽질한 과정들)

2023. 11. 22. 23:26카테고리 없음

정확히는 Airflow cli로 특정 task만 백필하기가 제목이다.

 

우리 팀은 Airflow로 대부분의 batch job을 스케줄링 하고있다. 매일 새벽에 수십 수백건의 테이블들이 전날 들어온 데이터들을 적재한다. 소스로부터 오는 데이터도 있지만, 데이터 사이언티스트분들이 구현하고 관리하는 mart들도 새벽 배치에서 적재되는데, 이 mart 테이블의 스키마가 변경되거나 쿼리 로직이 변경되는 일이 꽤 자주 있다. 그럴 때 해줘야하는 일은 1. catalog schema 변경 2. data backfill이다. 그 중 2번은 나같은 데이터 엔지니어의 역할인 경우가 많다.

 

우선, Airflow자체에서 backfill용 cli를 제공한다.

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#backfill

해당 독스를 참고하면 된다. 웬만하면 `dry-run`을 먼저 하라고 추천하고싶다. 데이터 엔지니어는 항상.. 데이터 날림에 주의해야한다. 그런 불상사가 일어나면 절대로 안 된다. 늘 백업과 dry run을 진행해서, 보수적으로 데이터를 다루는 것을 추천한다.

 

내 환경은 docker환경이었기 때문에 airflow scheduler로 먼저 접근을 해줘야 했다. 모르는 사람들을 위해..

#airflow scheduler 컨테이너 아이디 확인
docker ps

#scheduler 컨테이너로 접속
docker exec -it {container_id} /bin/bash/

 

이렇게 하면 airflow scheduler컨테이너에 접근할 수 있다. airflow 공식문서를 보면 backfill command를 아래와 같이 안내한다.

 

airflow dags backfill -s {start_date} -e {end_date} dag_id

 

여러가지 옵션이 있는데 본인에게 필요한 옵션을 선택해서 command line을 만들면 된다.

 

위 옵션중에 필요한 걸 사용하면 된다. 잘 안 보이면 공식 docs나 airflow dags backfill -h 명령어로 확인 해보면 된다. 나 같은 경우는 mart라는 이름의 dag에 tableau_bank_loan_bk라는 specific한 태스크만 백필하고 싶었다. 그래서 처음 트라이했던 command는 다음과 같다. 

  • dag id: mart
  • task id: tableau_bank_loan_bk    <-- 얘만 백필하고 싶었음
airflow dags backfill -n -t tableau_bank_loan_bk -s 2023-11-01 -e 2023-11-02 mart

 

근데 실패했다. 어떻게 실패했냐면 mart dag에 있는 수십개의 task가 모두 백필되는 현상이 있었다. 아니, -t 옵션으로 task_id를 명시해줬는데 왜? 옵션들의 위치를 바꿔보기도 하고 쿼테이션을 붙여보기도 하고 여러가지 시도를 하다가 airflow 슬랙에 당장 가입했다 ㅋㅋㅋ 그리고 질문함. 

 

내가 웰시코기임

 

이 때 약간 열받았고 흥분해서(성질;ㅎㅎ) 영어를 막 갈겼더니 외국인이 질문의 의도를 다시 물어봤다. 머쓱;; 저 Macari Saura Lopez라는 착한 분이 자기가 한 번 해보고 말해주겠다고 선뜻 나서주셨다. 결국 해결을 했는데 두 가지 옵션을 더 붙여야했다.

 

airflow dags backfill -n \
		      --disable-retry \
                      --ignore-dependencies \
                      --task-regex {task_id} \
                      -s {YYYY-MM-DD} \
                      -e {YYYY-MM-DD} \
                      {DAG ID}

 

-disable-retry와 --ignore-dependencies를 붙여주니 잘 작동했다. 요즘 슬랙이 잘 발달해서, 어떤 오픈소스를 쓰더라도 커뮤니티에 바로바로 질문할 수 있는 점이 좋은 것 같다. 그러고보니 chat gpt를 써 볼 생각을 못했네. 다음에 써봐야지..

어쨌든 특정 task의 백필은 이렇게 하면 된다.