1. DAG 생성 (Bash operator)
2. Task의 수행 주체
예시 코드를 통해 실습해보자.
> sudo docker compose up
웹 페이지 열고
http://localhost:8080/
example_bash_operator를 열자
from __future__ import annotations
import datetime
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="dags_bash_operator", # Dag이름, DAG 파일명과 Dag id는 일치시키자
schedule="0 0 * * *", # 5개 항목, 분, 시, 일, 월, 요일
start_date=pendulum.datetime(2024, 9, 23, tz="Asia/Seoul"),
catchup=False, # False: start_date 설정에서 현재 시간 누락된 이미 지난 시간의 스케쥴은 run하지 않음
dagrun_timeout=datetime.timedelta(minutes=60), # 60분이상 수행못하면 실패
tags=["example", "example2"], # DAG의 태그를 걸어줌
params={"example_key": "example_value"}, # task들에게 전달해줄 공통된 파라미터
) as dag:
# [START howto_operator_bash]
# task 객체 명
bash_t1 = BashOperator(
task_id="bash_t1", # task id로 task id도 task 객체명과 동일하게 두자
bash_command="echo whoami", # task를 통해 어떤 쉘 스크립트를 수행할 것이냐
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME",
)
bash_t1 >> bash_t2 # bash_t1을 수행하고 bash_t2를 수행하자
다음으로 git 업로드
WSL에서 git pull하기
nano로 docker-copiose.yaml을 열어보자.
> nano docker-compose.yaml
volumes 항목은 WSL의 볼륨과 연결해줄 컨테이너 디렉토리 부분을 매핑해주는 요소
:을 기준으로
왼쪽이 WSL의 볼륨,
오른쪽이 컨테이너의 볼륨
${AIRFLOW_PROJ_DIR:-.}게 무슨 뜻인지 확인해보자.
AIRFLOW_PROJ_DIR 값이 있으면 출력하고 그 값이 없으면 점을 출력
그럼 ./dags:/opt/airflow/dags로 연결시켜라
./ 는
를 의미
즉
도커 컨테이너 /opt/airflow/dags가 ./dags와 연결되어 있음을 의미
우리는 git 폴더인 airflow의 dags와 연동되기를 원함
아래와 같이 수정하고 저장
지금 Airflow 서비스가 실행되고 있기 때문에, Airflow 서비스를 먼저 내리고 다시 올린다.
> sudo docker compose down
> sudo docker compose up
꽤 걸림
웹페이지에서 http://localhost:8080/를 다시 확인해 보면
Pause된 상태로 올라오니 Unpause 시켜준다.
이렇게 초록색이 떠야 성공적으로 실행되는것이다.
Logs를 보면 whoami가 뜬것을 볼 수 있다.
bash_t2도 확인
여기서 HOSTNAME으로 output된 값이 뭘까?
에어플로우에서는 실제 TASK를 처리하는 요소는 worker이다.
그리고 우리가 에어플로우 설치를 한 방식이 컨테이너 방식으로 설치하였기 때문에
worker 컨테이너가 실제 task를 처리한다.
위와 같이 실제 도커의 worker-1 컨테이너에 들어가서 HOSTNAME을 확인하면 airflow를 통해 task가 수행하여 보인 bash_t2의 output 값이 보인다.