본문 바로가기

Review

[DEVIEW 2020] Kubernetes를 이용한 효율적인 데이터 엔지니어링 (Airflow on Kubernetes VS Airflow Kubernetes Executor) Review

출처 : https://deview.kr/2020/sessions/341

 

Kubernetes를 이용한 효율적인 데이터 엔지니어링 (Airflow on Kubernetes VS Airflow Kubernetes Executor)

발표자 : 이웅규

deview.kr

이 자리를 통해 Airflow Executor의 여러 가지 종류와 동작 방식에 대해 공유하고 일반적인 Kubernetes Airflow 환경과 새로운 방식의 특징 및 장/단점을 비교하고자 합니다. 마지막으로 새로운 방식인 "Kubernetes Executor" 및 "KubernetesPodOperator"를 적용하여 어떻게 데이터 플랫폼을 운영하고 개발하고 있는지 공유하려 합니다.

이를 통해 다양한 원천 데이터 소스, 다양한 타켓 데이터 소스 환경에서 기존보다 확장성 있는 데이터 엔지니어링 경험에 대해 설명해 드리겠습니다.

 

리뷰 포인트

  1. 데이터 엔지니어링을 위한 Apache Airflow
  2. Apache Airflow와 Kubernetes를 함께 사용하는 방법

 

 

데이터 엔지니어링을 위한 Apache Airflow

  • 데이터 엔지니어링이란?
    • 다양한 서비스에서 발생한 데이터를 수집/저장/통합/관리/가공/분석/서빙/송수신 등의 작업을 통해 데이터가 잘 흐를 수 있게 하는 것
  • Apache Airflow란? 
    • 프로그래밍을 통해 workflow들을 스케줄링 및 모니터링 할 수 있게 해주는 플랫폼으로 데이터 엔지니어링을 조금 더 쉽고 효율적으로 할 수 있게 만들어준다.
    • Airflow의 구성 요소
      • Dag : 개발자가 Python 코드로 작성한 Workflow, 즉 데이터의 흐름 (간선)으로 Python Dag Script에 모두 작성한다.
      • Task : 하나의 작업 단위 ex) 서비스 B의 MSSQL 데이터를 하둡에 수집 한다. (B_Service_MSSQL_2_HDFS)
    • Airflow의 동작 원리  
      • 대표적인 프로세스
        • Kerberos : 티켓(ticket) 기반의 컴퓨터 네트워크 인증 프로토콜
        • Web Server : 모니터링을 위한 Web UI 접속
        • Scheduler
          • 개발자가 정의한 Python Dag Script를 주기적으로 읽어서 현재 내가 어느 시점의 Dag를 실행해야 하는 지 파악
          • Task들이 Dag 안에서 어떤 형태로 workflow가 구성되어있는지를 Meta DB에 기입함
          • Executor 
            • 현재 시점에 실행해야 하는 Task들을 주기적으로 탐색
            • Meta DB에 있는 실행해야 하는 Task의 상태 변경
            • 실행해야 하는 Task를 내부 Queue에 입력
            • Queue에 있는 Task를 읽어서 Worker로 전달
        • Worker
          • 전달 받은 Task를 실행 시킬 수 있는 CLI Command를 실행
          • Meta DB에 있는 Task 상태 변경
    • Airflow Executor의 종류와 특징
      • Base Executor (부모 클래스, 모든 Executor는 Base Executor를 상속받음)
        • Local Executor (고가용성에 문제가 있어, Beta환경이나 Dev환경에서 주로 사용) [source] (참조)
          • Task를 병렬처리 할 수 있게 함 
          • _UnlimitedParallelism (airflow.cfg 설정 파일의 parallelism factor의 값이 0임)
            • 이론상 무한대로 Task를 병렬 처리할 수 있음
          • _LimitedParallelism (airflow.cfg 설정 파일의 parallelism factor의 값이 1이상 임)
            • parallelism factor의 값에 따라 Task가 병렬로 유한하게 실행됨
        • Celery Executor (고가용성 보장, 서비스에서 사용됨) [source] (참조)
          • Task를 외부 메시지 브로커(RabbitMQ or redis or ...)로 비동기 전송
          • 브로커로 전송된 Task를 N대의 Worker Machine에서 동작하는 Celery Worker가 읽어서 실행 
        • Kubernetes Executor
        • ...


Airflow와 Kubernetes를 함께 사용하는 방법​

  • Airflow on Kubernetes
    • Kubernetes 클러스터 위에 N개의 pod(동일한 컴퓨팅 리소스를 공유하는 1개 이상의 컨테이너 모음)에 Airflow (celery executor인 경우 메시지 브로커와 worker machine 까지)를 띄움
    • 장점
      • 구성이 간단하여 Template화 하기 쉬움
        • Kubernetes에 Airflow를 위한 Central Platform을 만들고, A라는 팀이 새로운 Airflow환경을 구성하기 위해 Central Platform에 Airflow환경을 요청하면 해당 팀을 위한 Pod를 새로 띄워 줄 수 있음
    • 단점
      • 유지보수 비용이 많이 든다.
        • Airflow 전체 workflow의 시작부터 종료시점까지 Kubernetes환경에서 관련 Pod들이 상주해야 하기 때문에 모니터링 등 관리 포인트가 늘어난다.
      • 라이브러리 의존성에 따라 무거워 짐 (?)
        • 만약 어떤 Pod에 있는 Container에서 여러 버전의 하둡에 접근해야 한다면, 각 버전에 맞는 하둡 클라이언트, Airflow, echo system, 라이브러리 등이 설치되어야 하고, 접근해야 하는 하둡의 버전이 다양할 수록 container가 점점 무거워지고, 신규 Hadoop Client 설치, 환경변수 확인, 사용 Component기능 확인, 라이브러리 의존성 여부, Python Version 확인, 기 코드 정상 작동 여부 확인 등 해야 할 작업도 늘어남
      • Kubernetes의 장점을 살리지 못함
  • Kubernetes Executor 와 KubernetesPodOperator
    • Airflow 전체 workflow의 시작부터 종료 시점 중 특정 시점에 Task를 수행할 때 필요한 Container만 골라 Kubernetes상에 Pod 띄워 필요할 때만 필요한 만큼의 Kubernetes 자원을 사용한다.
      • 예시
        • 상황
          • Task A : 하둡 2.x에서 데이터를 읽어서 하둡 3.x에 쓰기
          • Task B : 하둡 3.x에서 데이터를 읽어서 kafka 쓰기 
          • Task C : ​kafka에서 데이터를 읽어서 Mysql에 쓰기
          • workflow : A->B->C
        • 처리과정
          1. Kubernetes에 Task A를 처리할 수 있는 Pod를 띄우고 실행이 완료되면 내림
          2. Kubernetes에 Task B를 처리할 수 있는 Pod를 띄우고 실행이 완료되면 내림
          3. Kubernetes에 Task C를 처리할 수 있는 Pod를 띄우고 실행이 완료되면 내림
    • Kubernetes Executor를 사용하는 두 가지 방법
      • 일반 Operator(Airflow 설치 시 같이 설치되는 기본 Operator)들과 함께 사용
        1. Kubernetes Executor가 현시점에 실행해야 하는 Task를 감지한다.
        2. Airflow 설정 파일에 있는 kubernetes 설정과 image 정보를 토대로 Airflow Worker 정보를 구성한다.
        3. Worker 정보들이 Kubernetes API 서버에 요청되면 Kubernetes 서버는 Kubernetes 상에 동적으로 Airflow Worker Pod를 띄운다.
        4. Worker Pod 안에서 일반 Operator들 중 하나가 작동한다. 이 때 모든 Task는 동일한 환경에서 실행된다.
        5. Worker Pod의 작업이 끝나면 Kubernetes에 자원을 반납하고 종료된다.
      • KubernetesPodOperator와 함께 사용
        1. Kubernetes Executor가 현시점에 실행해야 하는 Task를 감지한다.
        2. Airflow 설정 파일에 있는 kubernetes 설정과 image 정보를 토대로 Airflow Worker 정보를 구성한다.
        3. Worker 정보들이 Kubernetes API 서버에 요청되면 Kubernetes 서버는 Kubernetes 상에 동적으로 Airflow Worker Pod를 띄운다.
        4. Worker Pod에서는 KubernetesPodOperator에 의해 개발자가 Dag Script에 직접 정의한 custom Pod 정의서를 Kubernetes API 서버에 요청한다.
        5. 요청 결과로 custom Pod를 실행시킨다. 이 때 각 Task는 custom Pod 환경에 따라 수행된다.
        6. Worker Pod의 작업이 끝나면 Kubernetes에 자원을 반납하고 종료된다.
        7. costom Pod의 작업이 끝나면 Kubernetes에 자원을 반납하고 종료된다.
    • 장점
      • 가볍다
      • 유지보수 비용 절감
        • Docker를 통한 Task간 독립성 보장
        • 운영중인 여러 Airflow 통합 가능
      • 효율적 자원관리
      • 개발 효율성

 

마무리

Airflow의 동작방식과 Kubernetes와 Airflow를 함께 사용하는 방법에 대해 알 수 있었다. scheduler, MetaDB, Worker의 실행 순서, Scheduler안에서 동작하는 Executor의 차이, Worker안에서 동작하는 Operator의 차이를 알 수 있었다. Kubernetes 클러스터 위에 Airflow를 띄워서 마치 로컬에서 Airflow를 사용하듯 동작시킬 수도 있고, Kubernetes Executor와 KubernetesPodOperator를 통해 Kubernestes를 더 유연하게 활용할 수도 있었다.

  1.