출처 : https://if.kakao.com/2019/program?sessionId=de3ff829-ac4c-4090-9ea1-046df55429a0
if(kakao)2021
함께 나아가는 더 나은 세상
if.kakao.com
먼저 Airflow와 같은 workflow management tool이 왜 필요한지에 대해 살펴보고, 유사한 기능을 하는 Oozie, Azkaban, AWS Data Pipeline 등과 장단점을 비교하여 카카오페이지가 Airflow를 선택한 이유에 대해 설명한다. 그리고 Airflow에 익숙하지 않은 데이터 엔지니어를 위해 주요 개념 및 구조에 대해서 설명하고, 기본적인 활용법을 소개한다. 또한 해당 플랫폼의 가용성을 높게 유지하려면 어떠한 구성을 취하면 되는지 살펴본다. 마지막으로, Airflow를 활용하는 카카오페이지의 고유한 방식에 대하여 소개를 하고 끝을 맺는다.
리뷰 포인트
- 카카오페이지가 Airflow를 선택한 이유
- Airflow 소개
- 카카오페이지에서 Airflow를 활용하는 방법

카카오페이지가 Airflow를 선택한 이유
- 카카오페이지의 데이터 분석 문제점과 해결방안
- 웹툰/웹소설을 주력 콘텐츠로 제공하는 카카오페이지가 성장하면서 열람건수가 증가하고 있다. 이러한 성장은 데이터 분석을 어렵게 만든다. -> 데이터의 양 증가
- 카카오에는 카카오페이지 뿐만아니라 다음웹툰(현 카카오웹툰) 서비스를 제공한다. 두 서비스의 내용은 매우 유사하지만 각각을 구현하는 개발 조직(회원, 결재, 콘텐츠 관리, 운영 도구, 정산, 광고, 데이터플랫폼)은 분화되어 MSA(Micro Service Architecture)로 운영되고 있고, 각 조직마다 DB들(회원DB, 결재DB, 카카오페이지DB, 다음웹툰DB 등)을 따로 관리 하기 때문에 데이터 분석은 어려워 진다. -> 서로 다른 DB간 Join(cross database join)이 필요함
- 해결 방법 1 - Data Lake
- 여러 DB에 저장된 엄청난 양의 데이터를 모두 싸고, 확장성이 좋고, 안정성이 뛰어난 분산 파일 시스템인 하둡 HDFS 혹은 S3에 모아서 적재, 변환(가공)해 저장한다.
- 데이터 분석가는 Hive, Spark, Impala, Presto 같은 분산형 쿼리/분석 엔진을 사용해 데이터를 분석한다.
- 이렇게 하면 매우 많은 양의 데이터를 cross database join 없이 분석할 수 있다.
- Data Lake를 위해서는 여러 DB에서 데이터를 적재하고, 적재가 완료되면 변환(가공)한다. 이러한 Workflow를 어떻게 제어할 것인가? -> workflow management가 필요함
- 해결 방법 2- workflow management system 도입
- 어떤 workflow management system을 사용할 것인가?
- workflow management system 비교
- Azkaban
- Azkaban에서 정의한 문법대로 Flow(워크플로우) 파일을 만들고 이 파일을 압축하여 Azkaban에 올려서 워크플로우를 실행 -> 거추장스럽다. 귀찮다
- LUigi
- 작업을 Triggering하는 기능을 지원하지 않고 외부 시스템(ex. crontab 등)에 의존해야 한다.
- oozie
- Contributions (2019년 기준)
- First Commit - 2011.08.09
- Commit 수 - 2311개
- Contributor 수 - 17명
- Release 수 - 46번
- Features
- DAG 표현 방식 - XML
- GUI - Hue or Eclipse Plugin
- REST API - Web Service API
- Availability - Highly Available
- Contributions (2019년 기준)
- Airflow
- Contributions (2019년 기준)
- First Commit - 2014.10.06
- Commit 수 - 6631개
- Contributor 수 - 900명
- Release 수 - 116번
- Features
- DAG 표현 방식 - Python Code - 표현이 간결하고, 표현의 자유도가 높다.
- GUI - Rich UI
- REST API - Experimental
- Availability - SPoF(Single Point of Failure, 단일 장애점): Scheduler
- Contributions (2019년 기준)
- Azkaban
- 해결 방법 3 - Airflow를 선택 (그림 1)
- Contributions이 많다 -> 앞으로 차이가 더 벌어질 것이다.
- DAG 표현 방식이 더 간결하고 자유도가 높다
- 직관적인 User Interface
- 뛰어난 확장성
- 사전 정의된 Task가 풍부
- 웹툰/웹소설을 주력 콘텐츠로 제공하는 카카오페이지가 성장하면서 열람건수가 증가하고 있다. 이러한 성장은 데이터 분석을 어렵게 만든다. -> 데이터의 양 증가

Airflow 소개
- 용어
- 작업(Task) : 수집, 정제, 지표 생성, 리포팅 등 여러가지 처리해야 하는 일로 어떤 작업은 다른 작업이 끝나야 수행될 수 있다. 즉 어떤 작업은 다른 작업에 의존성을 가지고 있다.
- 워크플로우(workflow) == DAG(Directed Acyclic Graph) : 워크플로우는 수행해야 하는 모든 작업(node)의 의존성(edge)을 모두 표현한 작업흐름(graph)이다. 모든 작업은 그래프에 따라 순차 실행되고, 모든 작업은 ASAP(최대한 빨리)으로 종료된다.
- 실패 재시도 : 워크플로우 실행 도중 어떤 작업이 실패하면 실패한 작업은 다시 실행된다. 이때 연관성 없는 작업은 큰 영향없이 계속 실행되고, 연관성 있는 후속작업은 실패한 작업이 재실행되어 성공한 후 실행된다.
- 실행 주기 : 어떤 워크플로우를 얼마의 시간마다 실행할 것인가?
- 작업(Task) : 수집, 정제, 지표 생성, 리포팅 등 여러가지 처리해야 하는 일로 어떤 작업은 다른 작업이 끝나야 수행될 수 있다. 즉 어떤 작업은 다른 작업에 의존성을 가지고 있다.
- Airflow 시연 (19:15 ~ 24:20)
- Aiflow Architecture (그림 2)
- Webserver
- Airflow 모니터링, 조작 등을 위한 Web 페이지 제공
- Scheduler
- 워크플로우(DAG)를 실행 주기마다 실행
- 실행 주기가 되면 DAG안에 있는 작업(Task)에 대해 Meta DB에서 실행 가능 상태로 변경. 이 때 의존하는 작업이 모두 실행된 작업만 상태 변경
- Executor
- Meta DB를 주기적으로 확인하여 실행 가능 상태가된 작업(Task)를 Broker에 push
- Broker에서 작업을 받아서 해당 작업을 수행하는 Worker를 실행함
- Broker
- 실행가능한(=의존성이 만족된) 작업들이 들어가는 공간
- Worker
- 실제 작업을 실행하는 주체
- Task를 감싸는 Operator의 종류에 따라 아무것도 안할 수도 있고(DummyOperator), Bash 명령어를 수행해 Task를 실행할 수도 있고(BashOperator), 파이썬을 사용해 Task를 실행할 수도 있다.(PythonOperator)
- Meta DB
- DAG, Task 등이 정의 되어 있음
- DAG Run, Task Instance 등 워크플로우와 작업의 상태(존재,실행,종료 등)와 기록을 관리함
- Webserver
- Airflow의 취약점과 해결방법 (2019년 기준)
- Scheduler, Broker, MetaDB는 각각 하나의 프로세스이기 때문에 셋 중 하나라도 장애가 발생하면 Airflow 전체에 장애가 발생한다. SPoF 문제
- 해결 방법
- Broker
- Task를 외부 메시지 브로커(RabbitMQ or redis or ...)로 비동기 전송하는 Celery Executor를 사용하여 가용성이 높은 외부 메시지 브로커를 사용
- MetaDB
- DB를 중복(redundancy)구성하여 장애 발생 시 대처할 수 있게함
- Scheduler
- Airflow 2.0기준 여러 스케줄러를 실행할 수 있기 때문에 SPoF 문제가 해결됨
- Airflow 2.0 이전에는 Failover Controller를 둬서 메인 스케줄러에 장에가 나면 스탠바이 스케줄러를 동작 시킴. 공식 지원은 아니기 때문에 직접 작성해야 함
- Broker
카카오페이지에서 Airflow를 활용하는 방법
- Data Lake 데이터 수집 문제
- Data Lake를 구성하기 위해 여러 DB에서 처리되고 있는 데이터를 실행 주기마다 수집하여 하둡 HDFS로 저장한다.
- 문제는 DB안에 테이블 개수가 너무 많다. 어떤 한 DB에 존재하는 테이블의 수가 350개라면 1분당 한 테이블씩 수집해도 6시간이 걸린다. 각 테이블을 병렬로 동시에 수집하면 DB에 부하가 너무 커져서 서비스에 문제가 생길 수 있다. -> DB에 존재하는 데이터를 그냥 수집하면 시간이 오래걸리거나 부하가 크다.
- 해결 방법 - Airflow의 Pool 과 Priority Weight 활용
- Pool : 동시성의 한계치를 제어
- 만약 테이블 5개를 동시에 수집하고 싶다면 각 테이블을 수집하는 작업를 정의하고, Airflow Web Console에서 수집 작업들을 포함하는 Pool을 생성하고 Slots 값을 5를 주면 Pool에 포함된 작업들 중에서 5개씩 동시에 실행할 수 있다.
- 카카오페이지에서는 먼저 Table 용량에 따라 Pool을 분류한다. 용량이 작은 테이블을 수집하는 작업은 small_tables Pool에 모아두고 Slots 값을 3~5정도로 높게 설정하고, 용량이 큰 테이블을 수집하는 작업은 large_tables Pool에 모아두고 Slots 값을 1정도로 낮게 설정한다.
- Priority Weight : Pool 안에서 우선순위(Weight)에 따라 실행 순서를 제어
- 기본적으로는 의존하는 작업이 많은 작업의 우선순위(Weight)가 높지만, DAG에서 Task를 정의할 때 우선순위를 변경할 수 있음
- 카카오페이지에서 small_tables Pool에 있는 작업들 중 중요한 테이블을 수집하는 작업은 우선순위를 높게 잡는다.
- Pool : 동시성의 한계치를 제어
- Worker 역할 분리 문제
- (그림 1)을 보면 데이터 분석가는 Impala, Spark, R을 활용해 Data Lake에 존재하는 데이터를 분석한다.
- 워크플로우는 크게 "Data Lake 구성 Task들 -> 데이터 분석 애플리케이션 Task"가 될 것이다.
- 위에서 가용성을 위해 Celery Executor를 사용했다. Celery Executor의 특징은 브로커로 전송된 Task를 N대의 Worker Machine(이하 Worker 클러스터)에서 동작하는 Celery Worker가 읽어서 실행한다.
- 이 과정에서 Impala 애플리케이션, Spark 애플리케이션, R 애플리케이션이 하나의 Worker가 되어 Worker 클러스터에서 돌아가게 되면 R 애플리케이션을 실행시키는 Worker가 동작할 때 Worker 클러스터의 CPU와 IO를 많이 사용하기 때문에 부하가 발생하고 이 때문에 다른 Worker들이 동작할 때 영향을 줄 수 있다. 하지만 Impala 애플리케이션과 Spark 애플리케이션의 경우 분석 연산이 하둡에서 일어나기 때문에 애플리케이션 자체의 부하는 적다. -> Worker 클러스터 자원을 효율적으로 사용할 수 없다.
- 또 다른 문제는 보안이다. 하나의 Worker 클러스터에서 어떤 Worker는 보안상 민감한 데이터를 다루고 어떤 Worker는 외부 시스템과 통신을 한다면, 공격자가 외부 시스템과 통신하는 Worker를 타고 민감한 데이터에 접근할 수 있다. -> 하나의 Worker 클러스터는 환경을 공유하기 때문에 민감한 정보가 노출될 위험 있다.
- 해결 방법 - Worker 역할 분리
- Task를 정의할 때 Task를 처리하는 브로커(queue)를 다른 값으로 설정하면 다른 브로커에서 처리할 수 있고 Worker 클러스터도 달라진다.
- (celery 활용 시) 구현 방법 (?)
- step 1. airflow.cfg 파일을 수정하여 default_queue 옵션에 queue 이름을 지정한다.
- step 2. Worker 실행시에 '-q' 옵션과 같이 실행한다.
- step 3. Task 정의 시 queue 파라메터를 지정한다.
- 너무 거대한 DAG
- 구현을 하다보면 DAG가 매우 거대해질 수 있다. 이 경우 DAG 속 각 Task간 의존관계를 살펴보기 어렵고, DAG 실행에 실패한 경우 재실행하는 것이 비효율적이다.
- 해결 방법 - Let it be + ExternalTaskSensor (그림 3 참조)
- 일단 이미 만들어진 매우 거대한 DAG는 그냥 두고, 해당 DAG에 새로운 의존성을 가지는 Task를 추가해야 하는 경우 다른 DAG를 구성하고 ExternalTaskSensor를 사용한다.
- ExternalTaskSensor를 사용하면 DAG는 분리하되 의존성은 유지할 수 있다.
- Sensor : 특정 조건이 채워지기를 체크 주기(Poke_invterval)마다 확인하면서 대기 시간(Timeout)동안 기다리는 Operator
- ExternalTaskSensor : 내 DAG 안에서 외부에 있는 다른 DAG 속 한 Task의 상태(실행 전/중/성공/실패)를 체크할 수 있다.
- ExternalTaskSensor를 사용하면 DAG는 분리하되 의존성은 유지할 수 있다.
- 일단 이미 만들어진 매우 거대한 DAG는 그냥 두고, 해당 DAG에 새로운 의존성을 가지는 Task를 추가해야 하는 경우 다른 DAG를 구성하고 ExternalTaskSensor를 사용한다.

마무리
Data Lake를 구성하는 파이프라인을 위해 워크플로우 매니지 먼트를 위한 시스템을 선택하는 과정과 Airflow의 문제점과 해결방법을 공부할 수 있었다.