새소식

이것저것 개발노트

[AirFlow 공부하기] [2.오퍼레이터 기본] Bash operator DAG 만들기, DAG 디렉토리 세팅

  • -

1. DAG 생성 (Bash operator)
2. Task의 수행 주체

 

 

 

예시 코드를 통해 실습해보자.

 

 

 

> sudo docker compose up 

웹 페이지 열고 

http://localhost:8080/

 

 

example_bash_operator를 열자 

 

 

from __future__ import annotations

import datetime
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dags_bash_operator",                             # Dag이름, DAG 파일명과 Dag id는 일치시키자 
    schedule="0 0 * * *",                                    # 5개 항목, 분, 시, 일, 월, 요일 
    start_date=pendulum.datetime(2024, 9, 23, tz="Asia/Seoul"),
    catchup=False,                                          # False: start_date 설정에서 현재 시간 누락된 이미 지난 시간의 스케쥴은 run하지 않음 
    dagrun_timeout=datetime.timedelta(minutes=60),          # 60분이상 수행못하면 실패 
    tags=["example", "example2"],                           # DAG의 태그를 걸어줌 
    params={"example_key": "example_value"},                # task들에게 전달해줄 공통된 파라미터
) as dag:
    # [START howto_operator_bash]
    # task 객체 명 
    bash_t1 = BashOperator(
        task_id="bash_t1",                                       # task id로 task id도 task 객체명과 동일하게 두자 
        bash_command="echo whoami",                              # task를 통해 어떤 쉘 스크립트를 수행할 것이냐
    )
    
    bash_t2 = BashOperator(
        task_id="bash_t2",                                       
        bash_command="echo $HOSTNAME",                              
    )
    
    bash_t1 >> bash_t2                                              # bash_t1을 수행하고 bash_t2를 수행하자

 

다음으로 git 업로드 

 

WSL에서 git pull하기 

 

nano로 docker-copiose.yaml을 열어보자.

 

> nano docker-compose.yaml

 

volumes 항목은 WSL의 볼륨과 연결해줄 컨테이너 디렉토리 부분을 매핑해주는 요소 

 

:을 기준으로

왼쪽이 WSL의 볼륨, 

오른쪽이 컨테이너의 볼륨

 

${AIRFLOW_PROJ_DIR:-.}게 무슨 뜻인지 확인해보자. 

 

AIRFLOW_PROJ_DIR 값이 있으면 출력하고 그 값이 없으면 점을 출력

 

그럼 ./dags:/opt/airflow/dags로 연결시켜라 

./ 는 

를 의미 

즉 

도커 컨테이너 /opt/airflow/dags가 ./dags와 연결되어 있음을 의미 

 

우리는 git 폴더인 airflow의 dags와 연동되기를 원함 

아래와 같이 수정하고 저장 

 

지금 Airflow 서비스가 실행되고 있기 때문에, Airflow 서비스를 먼저 내리고 다시 올린다. 

> sudo docker compose down 

 

> sudo docker compose up 

 

꽤 걸림 

 

웹페이지에서 http://localhost:8080/를 다시 확인해 보면

 

 

Pause된 상태로 올라오니 Unpause 시켜준다. 

 

 

이렇게 초록색이 떠야 성공적으로 실행되는것이다. 

Logs를 보면 whoami가 뜬것을 볼 수 있다. 

 

bash_t2도 확인 

 

여기서 HOSTNAME으로 output된 값이 뭘까?

 

 

 

에어플로우에서는 실제 TASK를 처리하는 요소는 worker이다. 

그리고 우리가 에어플로우 설치를 한 방식이 컨테이너 방식으로 설치하였기 때문에

worker 컨테이너가 실제 task를 처리한다. 

 

위와 같이 실제 도커의 worker-1 컨테이너에 들어가서 HOSTNAME을 확인하면 airflow를 통해 task가 수행하여 보인 bash_t2의 output 값이 보인다. 

 

 

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.