본문 바로가기

Review

[if(kakao) dev 2019] Airflow를 활용하여 아름다운 데이터 파이프라인 구성하기 Review

출처 : https://if.kakao.com/2019/program?sessionId=de3ff829-ac4c-4090-9ea1-046df55429a0 

 

if(kakao)2021

함께 나아가는 더 나은 세상

if.kakao.com

먼저 Airflow와 같은 workflow management tool이 왜 필요한지에 대해 살펴보고, 유사한 기능을 하는 Oozie, Azkaban, AWS Data Pipeline 등과 장단점을 비교하여 카카오페이지가 Airflow를 선택한 이유에 대해 설명한다. 그리고 Airflow에 익숙하지 않은 데이터 엔지니어를 위해 주요 개념 및 구조에 대해서 설명하고, 기본적인 활용법을 소개한다. 또한 해당 플랫폼의 가용성을 높게 유지하려면 어떠한 구성을 취하면 되는지 살펴본다. 마지막으로, Airflow를 활용하는 카카오페이지의 고유한 방식에 대하여 소개를 하고 끝을 맺는다.

 

리뷰 포인트

  • 카카오페이지가 Airflow를 선택한 이유
  • Airflow 소개
  • 카카오페이지에서 Airflow를 활용하는 방법

 

 

[그림 1] 카카오페이지의 데이터 분석 과정

카카오페이지가 Airflow를 선택한 이유

  • 카카오페이지의 데이터 분석 문제점과 해결방안
    • 웹툰/웹소설을 주력 콘텐츠로 제공하는 카카오페이지가 성장하면서 열람건수가 증가하고 있다. 이러한 성장은 데이터 분석을 어렵게 만든다. -> 데이터의 양 증가

    • 카카오에는 카카오페이지 뿐만아니라 다음웹툰(현 카카오웹툰) 서비스를 제공한다. 두 서비스의 내용은 매우 유사하지만 각각을 구현하는 개발 조직(회원, 결재, 콘텐츠 관리, 운영 도구, 정산, 광고, 데이터플랫폼)은 분화되어 MSA(Micro Service Architecture)로 운영되고 있고, 각 조직마다 DB들(회원DB, 결재DB, 카카오페이지DB, 다음웹툰DB 등)을 따로 관리 하기 때문에 데이터 분석은 어려워 진다. -> 서로 다른 DB간 Join(cross database join)이 필요함

    • 해결 방법 1 - Data Lake
      • 여러 DB에 저장된 엄청난 양의 데이터를 모두 싸고, 확장성이 좋고, 안정성이 뛰어난 분산 파일 시스템인 하둡 HDFS 혹은 S3에 모아서 적재, 변환(가공)해 저장한다.
      • 데이터 분석가는 Hive, Spark, Impala, Presto 같은 분산형 쿼리/분석 엔진을 사용해 데이터를 분석한다.
      • 이렇게 하면 매우 많은 양의 데이터를 cross database join 없이 분석할 수 있다.
    • Data Lake를 위해서는 여러 DB에서 데이터를 적재하고, 적재가 완료되면 변환(가공)한다. 이러한 Workflow를 어떻게 제어할 것인가? -> workflow management가 필요함

    • 해결 방법 2- workflow management system 도입

    • 어떤 workflow management system을 사용할 것인가?

    • workflow management system 비교
      • Azkaban
        • Azkaban에서 정의한 문법대로 Flow(워크플로우) 파일을 만들고 이 파일을 압축하여 Azkaban에 올려서 워크플로우를 실행 -> 거추장스럽다. 귀찮다
      • LUigi
        • 작업을 Triggering하는 기능을 지원하지 않고 외부 시스템(ex. crontab 등)에 의존해야 한다.
      • oozie
        • Contributions (2019년 기준)
          • First Commit - 2011.08.09
          • Commit 수 - 2311개
          • Contributor 수 - 17명
          • Release 수 - 46번  
        • Features
          • DAG 표현 방식 - XML
          • GUI - Hue or Eclipse Plugin
          • REST API - Web Service API
          • Availability - Highly Available
      • Airflow
        • Contributions (2019년 기준)
          • First Commit - 2014.10.06
          • Commit 수 - 6631개
          • Contributor 수 - 900명
          • Release 수 - 116번 
        • Features
          • DAG 표현 방식 - Python Code - 표현이 간결하고, 표현의 자유도가 높다.
          • GUI - Rich UI
          • REST API - Experimental
          • Availability - SPoF(Single Point of Failure, 단일 장애점): Scheduler
    • 해결 방법 3 -  Airflow를 선택 (그림 1)
      • Contributions이 많다 -> 앞으로 차이가 더 벌어질 것이다.
      • DAG 표현 방식이 더 간결하고 자유도가 높다
      • 직관적인 User Interface
      • 뛰어난 확장성
      • 사전 정의된 Task가 풍부

[그림 2] Airflow Architecture (추가 출처 : https://deview.kr/2020/sessions/341)

Airflow 소개

  • 용어
    • 작업(Task) : 수집, 정제, 지표 생성, 리포팅 등 여러가지 처리해야 하는 일로 어떤 작업은 다른 작업이 끝나야 수행될 수 있다. 즉 어떤 작업은 다른 작업에 의존성을 가지고 있다.

    • 워크플로우(workflow) == DAG(Directed Acyclic Graph) : 워크플로우는 수행해야 하는 모든 작업(node)의 의존성(edge)을 모두 표현한 작업흐름(graph)이다. 모든 작업은 그래프에 따라 순차 실행되고, 모든 작업은 ASAP(최대한 빨리)으로 종료된다.

    • 실패 재시도 : 워크플로우 실행 도중 어떤 작업이 실패하면 실패한 작업은 다시 실행된다. 이때 연관성 없는 작업은 큰 영향없이 계속 실행되고, 연관성 있는 후속작업은 실패한 작업이 재실행되어 성공한 후 실행된다.

    • 실행 주기 : 어떤 워크플로우를 얼마의 시간마다 실행할 것인가?
  • Airflow 시연 (19:15 ~ 24:20)

  • Aiflow Architecture (그림 2)
    • Webserver
      • Airflow 모니터링, 조작 등을 위한 Web 페이지 제공
    •  Scheduler
      • 워크플로우(DAG)를 실행 주기마다 실행
      • 실행 주기가 되면 DAG안에 있는 작업(Task)에 대해 Meta DB에서 실행 가능 상태로 변경. 이 때 의존하는 작업이 모두 실행된 작업만 상태 변경
      • Executor
        • Meta DB를 주기적으로 확인하여 실행 가능 상태가된 작업(Task)를 Broker에 push
        • Broker에서 작업을 받아서 해당 작업을 수행하는 Worker를 실행함
    • Broker
      • 실행가능한(=의존성이 만족된) 작업들이 들어가는 공간
    • Worker
      • 실제 작업을 실행하는 주체
      • Task를 감싸는 Operator의 종류에 따라 아무것도 안할 수도 있고(DummyOperator), Bash 명령어를 수행해 Task를 실행할 수도 있고(BashOperator), 파이썬을 사용해 Task를 실행할 수도 있다.(PythonOperator) 
    • Meta DB
      • DAG, Task 등이 정의 되어 있음
      • DAG Run, Task Instance 등 워크플로우와 작업의 상태(존재,실행,종료 등)와 기록을 관리함
  • Airflow의 취약점과 해결방법 (2019년 기준) 
    • Scheduler, Broker, MetaDB는 각각 하나의 프로세스이기 때문에 셋 중 하나라도 장애가 발생하면 Airflow 전체에 장애가 발생한다. SPoF 문제
    • 해결 방법
      • Broker
        • Task를 외부 메시지 브로커(RabbitMQ or redis or ...)로 비동기 전송하는 Celery Executor를 사용하여 가용성이 높은 외부 메시지 브로커를 사용
      • MetaDB
        • DB를 중복(redundancy)구성하여 장애 발생 시 대처할 수 있게함
      • Scheduler
        • Airflow 2.0기준 여러 스케줄러를 실행할 수 있기 때문에 SPoF 문제가 해결됨
        • Airflow 2.0 이전에는 Failover Controller를 둬서 메인 스케줄러에 장에가 나면 스탠바이 스케줄러를 동작 시킴. 공식 지원은 아니기 때문에 직접 작성해야 함

카카오페이지에서 Airflow를 활용하는 방법

  • Data Lake 데이터 수집 문제
    • Data Lake를 구성하기 위해 여러 DB에서 처리되고 있는 데이터를 실행 주기마다 수집하여 하둡 HDFS로 저장한다.
    • 문제는 DB안에 테이블 개수가 너무 많다. 어떤 한 DB에 존재하는 테이블의 수가 350개라면 1분당 한 테이블씩 수집해도 6시간이 걸린다. 각 테이블을 병렬로 동시에 수집하면 DB에 부하가 너무 커져서 서비스에 문제가 생길 수 있다. -> DB에 존재하는 데이터를 그냥 수집하면 시간이 오래걸리거나 부하가 크다.
    • 해결 방법 - Airflow의 Pool 과 Priority Weight 활용
      • Pool : 동시성의 한계치를 제어
        • 만약 테이블 5개를 동시에 수집하고 싶다면 각 테이블을 수집하는 작업를 정의하고, Airflow Web Console에서 수집 작업들을 포함하는 Pool을 생성하고 Slots 값을 5를 주면 Pool에 포함된 작업들 중에서 5개씩 동시에 실행할 수 있다.
        • 카카오페이지에서는 먼저 Table 용량에 따라 Pool을 분류한다. 용량이 작은 테이블을 수집하는 작업은 small_tables Pool에 모아두고 Slots 값을 3~5정도로 높게 설정하고, 용량이 큰 테이블을 수집하는 작업은 large_tables Pool에 모아두고 Slots 값을 1정도로 낮게 설정한다.
      • Priority Weight : Pool 안에서 우선순위(Weight)에 따라 실행 순서를 제어
        • 기본적으로는 의존하는 작업이 많은 작업의 우선순위(Weight)가 높지만, DAG에서 Task를 정의할 때 우선순위를 변경할 수 있음
        • 카카오페이지에서 small_tables Pool에 있는 작업들 중 중요한 테이블을 수집하는 작업은 우선순위를 높게 잡는다.
    • Worker 역할 분리 문제
      • (그림 1)을 보면 데이터 분석가는 Impala, Spark, R을 활용해 Data Lake에 존재하는 데이터를 분석한다.
      • 워크플로우는 크게 "Data Lake 구성 Task들 -> 데이터 분석 애플리케이션 Task"가 될 것이다.
      • 위에서 가용성을 위해 Celery Executor를 사용했다. Celery Executor의 특징은 브로커로 전송된 Task를 N대의 Worker Machine(이하 Worker 클러스터)에서 동작하는 Celery Worker가 읽어서 실행한다.
      • 이 과정에서 Impala 애플리케이션, Spark 애플리케이션, R 애플리케이션이 하나의 Worker가 되어 Worker 클러스터에서 돌아가게 되면 R 애플리케이션을 실행시키는 Worker가 동작할 때 Worker 클러스터의 CPU와 IO를 많이 사용하기 때문에 부하가 발생하고 이 때문에 다른 Worker들이 동작할 때 영향을 줄 수 있다. 하지만 Impala 애플리케이션과 Spark 애플리케이션의 경우 분석 연산이 하둡에서 일어나기 때문에 애플리케이션 자체의 부하는 적다. -> Worker 클러스터 자원을 효율적으로 사용할 수 없다.
      • 또 다른 문제는 보안이다. 하나의 Worker 클러스터에서 어떤 Worker는 보안상 민감한 데이터를 다루고 어떤 Worker는 외부 시스템과 통신을 한다면, 공격자가 외부 시스템과 통신하는 Worker를 타고 민감한 데이터에 접근할 수 있다. -> 하나의 Worker 클러스터는 환경을 공유하기 때문에 민감한 정보가 노출될 위험 있다.
      • 해결 방법 - Worker 역할 분리
        • Task를 정의할 때 Task를 처리하는 브로커(queue)를 다른 값으로 설정하면 다른 브로커에서 처리할 수 있고 Worker 클러스터도 달라진다.
        • (celery 활용 시) 구현 방법 (?)
          • step 1. airflow.cfg 파일을 수정하여 default_queue 옵션에 queue 이름을 지정한다.
          • step 2. Worker 실행시에 '-q' 옵션과 같이 실행한다.
          • step 3. Task 정의 시 queue 파라메터를 지정한다.
    • 너무 거대한 DAG
      • 구현을 하다보면 DAG가 매우 거대해질 수 있다. 이 경우 DAG 속 각 Task간 의존관계를 살펴보기 어렵고, DAG 실행에 실패한 경우 재실행하는 것이 비효율적이다.
      • 해결 방법 - Let it be + ExternalTaskSensor (그림 3 참조)
        • 일단 이미 만들어진 매우 거대한 DAG는 그냥 두고, 해당 DAG에 새로운 의존성을 가지는 Task를 추가해야 하는 경우 다른 DAG를 구성하고 ExternalTaskSensor를 사용한다. 
          • ExternalTaskSensor를 사용하면 DAG는 분리하되 의존성은 유지할 수 있다. 
            • Sensor : 특정 조건이 채워지기를 체크 주기(Poke_invterval)마다 확인하면서 대기 시간(Timeout)동안 기다리는 Operator
            • ExternalTaskSensor : 내 DAG 안에서 외부에 있는 다른 DAG 속 한 Task의 상태(실행 전/중/성공/실패)를 체크할 수 있다.

[그림 3] Task 4~7을 추가할 때 DAG가 거대해지는 것을 막는 방법

마무리

Data Lake를 구성하는 파이프라인을 위해 워크플로우 매니지 먼트를 위한 시스템을 선택하는 과정과 Airflow의 문제점과 해결방법을 공부할 수 있었다.