AirFlow Summary
AirFlow
- 작업 흐름을 실행, 관리하는 오픈소스 플랫폼. 작업을 스케쥴링을 하는 역할을 한다.
- DAG(Directed Acyclic Graph, 비순환 방향 그래프)로 각 배치 스케쥴이 관리되며, DAG 하위에는 고유한 여러 Task들이 존재한다.
1. Webserver
- Web UI 제공
- Workflow의 전반적인 조작 및 기록 관리
2. Scheduler
- DAG 가 저장된 디렉토리(DAG Bag)를 주기적으로 스캔하여 DAG를 최신화함 - 스케쥴에 따라서 DAG 에 정의된 Task 들을 순차적으로 실행
- Task 를 실행할 Worker를 지정하여 작업을 명령
3. MetaDB
- DAG 정보, 스케쥴링 정보, 작업 실행 결과 등이 저장
- SQLite, PostgreSQL, MySQL 등의 DB 를 지원
4. Worker
- Airflow의 Task 가 실행되는 환경
- Executor 설정에 의하여 Local 에서 실행되거나, 별도의 노드 또는 컨테이너에서 실행됨
5. DAGs
- Task 간의 관계와 순서를 정의, Task 는 Operator 로 구현됨
출처: https://isthecj.tistory.com/78
특징 : 1) Airflow DAG 코드에서 catchup 파라미터가 True이면 지정된 start_date부터 현재까지 밀린 job들에 대해 모두 수행하는 것을 말한다.
2) start_date를 지정해도 실제 작동이 되는 것은 그 다음날이다. ex) start_date = 2022/04/11 → 실제 실행은 2022/04/12. execution_date는 2022/04/11로 뜬다. (start_date = execution_date이며, 실제 실행은 start_date + 1 이라고 할 수 있다. daily라면 하루 뒤, hourly라면 1시간 뒤)
예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'my_first_dag',
start_date = datetime(2021,8,26),
catchup=False,
tags=['example'],
schedule_interval = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
This post is licensed under CC BY 4.0 by the author.