Post

AirFlow Summary

AirFlow

  • 작업 흐름을 실행, 관리하는 오픈소스 플랫폼. 작업을 스케쥴링을 하는 역할을 한다.
  • DAG(Directed Acyclic Graph, 비순환 방향 그래프)로 각 배치 스케쥴이 관리되며, DAG 하위에는 고유한 여러 Task들이 존재한다.

Untitled

1. Webserver 

  • Web UI 제공  
  • Workflow의 전반적인 조작 및 기록 관리

2. Scheduler 

  • DAG 가 저장된 디렉토리(DAG Bag)를 주기적으로 스캔하여 DAG를 최신화함  - 스케쥴에 따라서 DAG 에 정의된 Task 들을 순차적으로 실행   
  • Task 를 실행할 Worker를 지정하여 작업을 명령

3. MetaDB  

  • DAG 정보, 스케쥴링 정보, 작업 실행 결과 등이 저장 
  • SQLite, PostgreSQL, MySQL 등의 DB 를 지원

4. Worker  

  • Airflow의 Task 가 실행되는 환경 
  • Executor 설정에 의하여 Local 에서 실행되거나, 별도의 노드 또는 컨테이너에서 실행됨

5. DAGs  

  • Task 간의 관계와 순서를 정의, Task 는 Operator 로 구현됨

출처: https://isthecj.tistory.com/78

특징 : 1) Airflow DAG 코드에서 catchup 파라미터가 True이면 지정된 start_date부터 현재까지 밀린 job들에 대해 모두 수행하는 것을 말한다.

2) start_date를 지정해도 실제 작동이 되는 것은 그 다음날이다. ex) start_date = 2022/04/11 → 실제 실행은 2022/04/12. execution_date는 2022/04/11로 뜬다. (start_date = execution_date이며, 실제 실행은 start_date + 1 이라고 할 수 있다. daily라면 하루 뒤, hourly라면 1시간 뒤)

Untitled 1

예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = 'my_first_dag',
    start_date = datetime(2021,8,26),
    catchup=False,
    tags=['example'],
    schedule_interval = '0 2 * * *')

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
This post is licensed under CC BY 4.0 by the author.