11. Bonus: dbt + Airflow on Kubernetes¶
챕터 1~10에서 Kubernetes의 핵심 개념을 학습했습니다. 이번 보너스 챕터에서는 데이터 엔지니어링의 핵심 도구인 Airflow와 dbt를 Kubernetes 위에 배포하는 실전 방법을 다룹니다.
이 챕터의 대상 독자
- Kubernetes 기본 개념(Pod, Deployment, Service, Volume, Secret)을 이해하고 있는 분
- Airflow 또는 dbt를 사용해본 경험이 있거나, 곧 도입하려는 데이터 엔지니어
- 프로덕션 환경에서 데이터 파이프라인을 안정적으로 운영하고 싶은 분
사전 지식
이 챕터는 챕터 5(YAML 매니페스트), 6(워크로드), 7(네트워킹), 8(스토리지와 설정)의 내용을 기반으로 합니다. 해당 챕터를 먼저 학습한 뒤 진행하세요.
Part 1: RBAC & ServiceAccount¶
Airflow가 Kubernetes 위에서 동작할 때 가장 먼저 부딪히는 문제는 권한입니다. KubernetesExecutor를 사용하면 Airflow Scheduler가 각 Task를 독립 Pod으로 동적 생성하는데, 이를 위해서는 적절한 Kubernetes API 권한이 필요합니다.
1.1 ServiceAccount란?¶
Kubernetes에서 Pod 안의 프로세스가 API Server와 통신할 때, 사람(User)이 아닌 서비스 계정(ServiceAccount)으로 인증합니다.
기본 ServiceAccount vs 커스텀 ServiceAccount¶
| 항목 | 기본(default) ServiceAccount | 커스텀 ServiceAccount |
|---|---|---|
| 생성 방식 | Namespace 생성 시 자동 생성 | 직접 생성 |
| 권한 | 거의 없음 (API discovery 정도) | 필요한 만큼 RBAC으로 부여 |
| 토큰 | 자동 마운트 (/var/run/secrets/kubernetes.io/serviceaccount/token) |
동일 |
| 사용 시나리오 | 별도 권한이 필요 없는 일반 앱 | Airflow처럼 K8s API를 호출하는 앱 |
기본 ServiceAccount에 권한을 주지 마세요
기본 ServiceAccount에 강력한 권한을 부여하면, 해당 Namespace의 모든 Pod이 그 권한을 갖게 됩니다. 반드시 전용 ServiceAccount를 만들어 필요한 Pod에만 바인딩하세요.
커스텀 ServiceAccount 생성¶
# airflow-sa.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: airflow-worker
namespace: airflow
labels:
app: airflow
component: worker
Pod에 ServiceAccount 바인딩하기¶
apiVersion: v1
kind: Pod
metadata:
name: airflow-scheduler
namespace: airflow
spec:
serviceAccountName: airflow-worker # (1)!
containers:
- name: scheduler
image: apache/airflow:2.9.0
serviceAccountName필드로 커스텀 ServiceAccount를 지정합니다. 지정하지 않으면defaultServiceAccount가 사용됩니다.
1.2 RBAC 완전 정복¶
RBAC(Role-Based Access Control)은 Kubernetes에서 "누가(Subject), 무엇을(Resource), 어떻게(Verb) 할 수 있는가"를 정의하는 권한 체계입니다.
Role vs ClusterRole¶
| 항목 | Role | ClusterRole |
|---|---|---|
| 범위 | 특정 Namespace 내 | 클러스터 전체 |
| 사용 사례 | "airflow NS에서만 Pod 관리" | "모든 NS에서 Node 조회" |
| 바인딩 | RoleBinding으로 연결 | ClusterRoleBinding 또는 RoleBinding |
# airflow-role.yaml — Namespace 범위의 Role
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: airflow-pod-manager
namespace: airflow
rules:
- apiGroups: [""] # (1)!
resources: ["pods"]
verbs: ["get", "list", "watch", "create", "delete", "patch"]
- apiGroups: [""]
resources: ["pods/log"] # (2)!
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods/exec"]
verbs: ["create", "get"]
- apiGroups: [""]
resources: ["events"]
verbs: ["list", "watch"]
""(빈 문자열)은 Core API Group을 의미합니다. Pod, Service, ConfigMap 등이 여기에 속합니다.pods/log는 Pod의 로그를 조회하는 하위 리소스(subresource)입니다.
# ClusterRole 예시 — 클러스터 범위
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: airflow-node-reader
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list"]
RoleBinding vs ClusterRoleBinding¶
# airflow-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-pod-manager-binding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker # (1)!
namespace: airflow
roleRef:
kind: Role
name: airflow-pod-manager # (2)!
apiGroup: rbac.authorization.k8s.io
- 위에서 만든 ServiceAccount를 Subject로 지정합니다.
- 위에서 만든 Role을 참조합니다.
# ClusterRoleBinding 예시
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: airflow-node-reader-binding
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: ClusterRole
name: airflow-node-reader
apiGroup: rbac.authorization.k8s.io
최소 권한 원칙 (Principle of Least Privilege)
RBAC을 설정할 때 항상 필요한 최소한의 권한만 부여하세요.
verbs: ["*"]대신 실제 필요한 동작만 나열resources: ["*"]대신 필요한 리소스만 나열- ClusterRole보다 Role을 먼저 고려 (범위를 좁게)
- 주기적으로 사용하지 않는 권한 제거
권한 검증¶
# airflow-worker ServiceAccount가 pods를 create할 수 있는지 확인
kubectl auth can-i create pods \
--as=system:serviceaccount:airflow:airflow-worker \
-n airflow
# 출력: yes
# pods를 delete할 수 있는지 확인
kubectl auth can-i delete pods \
--as=system:serviceaccount:airflow:airflow-worker \
-n airflow
# 출력: yes
# secrets를 get할 수 있는지 확인 (부여하지 않았으므로)
kubectl auth can-i get secrets \
--as=system:serviceaccount:airflow:airflow-worker \
-n airflow
# 출력: no
1.3 Namespace 전략¶
데이터 파이프라인 환경에서는 환경별 격리가 중요합니다. Namespace를 활용하면 dev/stg/prd를 같은 클러스터에서도 안전하게 분리할 수 있습니다.
환경별 Namespace 분리¶
kubectl create namespace airflow-dev
kubectl create namespace airflow-stg
kubectl create namespace airflow-prd
ResourceQuota — Namespace별 리소스 상한¶
# resourcequota-dev.yaml
apiVersion: v1
kind: ResourceQuota
metadata:
name: airflow-dev-quota
namespace: airflow-dev
spec:
hard:
requests.cpu: "4" # (1)!
requests.memory: 8Gi
limits.cpu: "8"
limits.memory: 16Gi
pods: "20" # (2)!
persistentvolumeclaims: "5"
- dev 환경에서는 총 CPU 요청량을 4코어로 제한합니다.
- 동시에 실행 가능한 Pod 수를 20개로 제한합니다. KubernetesExecutor 사용 시 Task마다 Pod이 생성되므로 이 값을 적절히 설정해야 합니다.
# resourcequota-prd.yaml
apiVersion: v1
kind: ResourceQuota
metadata:
name: airflow-prd-quota
namespace: airflow-prd
spec:
hard:
requests.cpu: "32"
requests.memory: 64Gi
limits.cpu: "64"
limits.memory: 128Gi
pods: "200"
persistentvolumeclaims: "20"
LimitRange — Pod/Container별 기본값과 상한¶
# limitrange.yaml
apiVersion: v1
kind: LimitRange
metadata:
name: airflow-limit-range
namespace: airflow-prd
spec:
limits:
- type: Container
default: # (1)!
cpu: 500m
memory: 512Mi
defaultRequest: # (2)!
cpu: 100m
memory: 128Mi
max: # (3)!
cpu: "4"
memory: 8Gi
min:
cpu: 50m
memory: 64Mi
- type: Pod
max:
cpu: "8"
memory: 16Gi
default: 컨테이너에 limits가 명시되지 않았을 때 적용되는 기본 limitsdefaultRequest: 컨테이너에 requests가 명시되지 않았을 때 적용되는 기본 requestsmax: 한 컨테이너가 사용할 수 있는 최대 리소스. 이를 초과하면 Pod 생성이 거부됩니다.
Part 2: Airflow on Kubernetes¶
2.1 Airflow 아키텍처 on K8s¶
Apache Airflow를 Kubernetes 위에 배포하면, 각 컴포넌트가 별도의 Pod으로 실행됩니다.
DAG 상태 모니터링
Deployment (replicas: 2)
KubernetesExecutor로 Pod 생성
Deployment (replicas: 2)
비동기 이벤트 대기
Deployment (replicas: 1)
Task 완료 후 자동 삭제
Task 완료 후 자동 삭제
Task 완료 후 자동 삭제
DAG/Task 상태 저장
StatefulSet or 외부 RDS
Scheduler/Webserver에 Sidecar
Task 로그 영구 저장
각 컴포넌트의 역할:
| 컴포넌트 | K8s 리소스 | 역할 |
|---|---|---|
| Webserver | Deployment + Service | UI 제공, DAG/Task 상태 조회 |
| Scheduler | Deployment | DAG 파싱, Task 스케줄링, Worker Pod 생성 |
| Triggerer | Deployment | Deferrable Operator의 비동기 이벤트 처리 |
| Worker | 동적 Pod (KubernetesExecutor) | 실제 Task 실행, 완료 후 자동 삭제 |
| Metadata DB | StatefulSet 또는 외부 DB | DAG/Task 상태, 변수, 커넥션 등 저장 |
| Redis | StatefulSet (CeleryExecutor 전용) | Celery 메시지 브로커 |
2.2 Executor 선택¶
Airflow의 Executor는 Task를 어떻게 실행할지 결정하는 핵심 설정입니다.
| 항목 | LocalExecutor | CeleryExecutor | KubernetesExecutor |
|---|---|---|---|
| Task 실행 방식 | Scheduler 프로세스 내 subprocess | Celery Worker (별도 프로세스) | 독립 Pod |
| 병렬 실행 | 제한적 (단일 머신) | 높음 (Worker 수에 비례) | 매우 높음 (노드 수에 비례) |
| 리소스 격리 | 없음 | Worker 단위 | Task 단위 (Pod) |
| 스케일링 | 불가 | Worker 수 조절 | 자동 (Task마다 Pod) |
| 추가 인프라 | 없음 | Redis + Celery Worker | 없음 |
| 유휴 리소스 | Scheduler 항상 실행 | Worker 항상 실행 | Task 없으면 Pod 없음 |
| Task별 이미지 | 불가 | 불가 | 가능 (Task마다 다른 이미지) |
| 적합한 환경 | 개발/소규모 | 대규모 + 빠른 시작 | K8s 네이티브 환경 |
KubernetesExecutor 동작 원리¶
KubernetesExecutor를 사용하면 각 Task가 독립적인 Pod으로 실행됩니다.
Scheduler가 Task를 스케줄링
│
▼
K8s API에 Pod 생성 요청 (ServiceAccount 권한 필요!)
│
▼
Worker Pod 생성 → Task 실행
│
▼
Task 완료 → Pod 상태를 Scheduler에 보고
│
▼
Worker Pod 자동 삭제 (설정에 따라 유지 가능)
왜 KubernetesExecutor인가?
- 리소스 효율: Task가 없으면 Worker Pod도 없음 → 유휴 리소스 최소화
- Task별 격리: 각 Task가 독립 Pod이므로, 메모리 누수나 라이브러리 충돌 없음
- Task별 이미지: dbt Task는 dbt 이미지, Spark Task는 Spark 이미지로 실행 가능
- 자동 스케일링: Node Autoscaler와 연동하면 Task 부하에 따라 노드 자동 확장
KubernetesExecutor의 단점
- 콜드 스타트: Pod 생성 + 이미지 Pull에 10~30초 소요 (짧은 Task에는 오버헤드)
- RBAC 설정 필수: Scheduler가 Pod를 생성/삭제할 권한 필요 (Part 1 참고)
- 디버깅 난이도: Task Pod가 사라지므로 로그 확인이 어려움 → Remote Logging 필수
2.3 Official Airflow Helm Chart¶
Apache Airflow 공식 Helm Chart는 프로덕션 수준의 Airflow를 쉽게 배포할 수 있게 해줍니다.
설치¶
# Helm repo 추가
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# 기본 설치 (테스트용)
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace
# values 파일로 커스텀 설치
helm install airflow apache-airflow/airflow \
--namespace airflow-prd \
--create-namespace \
-f values-prd.yaml
values.yaml 핵심 설정¶
# values-prd.yaml — 프로덕션 환경 Airflow Helm Chart 설정
# ──────────────────────────────────────────────────────
# 1) Executor 타입
# ─────────────────
executor: KubernetesExecutor # (1)!
# 2) Airflow 이미지 설정
# ─────────────────────
images:
airflow:
repository: my-registry.example.com/airflow # (2)!
tag: "2.9.0-custom"
pullPolicy: IfNotPresent
gitSync:
repository: registry.k8s.io/git-sync/git-sync
tag: "v4.2.1"
# 3) Webserver 설정
# ─────────────────
webserver:
replicas: 2 # (3)!
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: "1"
memory: 2Gi
service:
type: ClusterIP # (4)!
livenessProbe:
initialDelaySeconds: 15
periodSeconds: 10
readinessProbe:
initialDelaySeconds: 15
periodSeconds: 10
env:
- name: AIRFLOW__WEBSERVER__EXPOSE_CONFIG
value: "false" # (5)!
# 4) Scheduler 설정
# ─────────────────
scheduler:
replicas: 2 # (6)!
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 4Gi
# 5) Triggerer 설정
# ─────────────────
triggerer:
enabled: true
replicas: 1
resources:
requests:
cpu: 250m
memory: 512Mi
# 6) PostgreSQL 설정 (Metadata DB)
# ─────────────────────────────────
postgresql:
enabled: false # (7)!
data:
metadataConnection:
user: airflow
pass: ""
protocol: postgresql
host: my-postgres.example.com
port: 5432
db: airflow_metadata
sslmode: require
metadataSecretName: airflow-metadata-secret # (8)!
# 7) Redis 설정
# ─────────────
redis:
enabled: false # (9)!
# 8) Worker 설정 (KubernetesExecutor용)
# ──────────────────────────────────────
workers:
serviceAccount:
name: airflow-worker # (10)!
# 9) ServiceAccount 설정
# ──────────────────────
serviceAccount:
create: true
name: airflow-worker
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/airflow-s3-access # (11)!
# 10) DAG 동기화 (git-sync)
# ─────────────────────────
dags:
persistence:
enabled: false
gitSync:
enabled: true # (12)!
repo: https://github.com/myorg/airflow-dags.git
branch: main
subPath: dags
wait: 60 # (13)!
containerName: git-sync
resources:
requests:
cpu: 50m
memory: 64Mi
# 11) 환경변수 추가
# ─────────────────
env:
- name: AIRFLOW__CORE__LOAD_EXAMPLES
value: "false"
- name: AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION
value: "true"
extraEnv: |
- name: AIRFLOW__LOGGING__REMOTE_LOGGING
value: "true"
- name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
value: "s3://my-airflow-logs/prd"
# 12) 추가 Volume / VolumeMount
# ─────────────────────────────
extraVolumes:
- name: dbt-profiles
secret:
secretName: dbt-profiles-secret
extraVolumeMounts:
- name: dbt-profiles
mountPath: /opt/dbt/profiles
readOnly: true
# 13) Ingress 설정
# ────────────────
ingress:
web:
enabled: true
ingressClassName: nginx
hosts:
- name: airflow.example.com
tls:
enabled: true
secretName: airflow-tls-secret
KubernetesExecutor를 선택하면 Redis, Celery Worker가 불필요합니다.- pip 추가 패키지가 포함된 커스텀 이미지를 사용할 수 있습니다.
- Webserver 2개로 고가용성을 확보합니다.
- Ingress를 사용할 것이므로 ClusterIP로 설정합니다.
- 보안을 위해 웹에서 설정 파일 노출을 비활성화합니다.
- Airflow 2.x부터 Scheduler HA가 지원됩니다.
- 프로덕션에서는 외부 관리형 DB(RDS 등)를 사용하는 것을 강력히 권장합니다.
- DB 비밀번호를 Secret으로 관리합니다.
- KubernetesExecutor에서는 Redis가 불필요합니다.
- Part 1에서 만든 ServiceAccount를 지정합니다.
- AWS EKS의 IRSA(IAM Roles for Service Accounts)로 S3 접근 권한을 부여합니다.
- git-sync는 DAG 배포의 가장 권장되는 방식입니다.
- 60초마다 Git 저장소에서 DAG를 동기화합니다.
환경별 values 오버라이드 패턴¶
# 공통 설정
values.yaml # 기본 values (모든 환경 공통)
# 환경별 오버라이드
values-dev.yaml # executor: LocalExecutor, replicas: 1
values-stg.yaml # executor: KubernetesExecutor, replicas: 1
values-prd.yaml # executor: KubernetesExecutor, replicas: 2
# 설치 시 공통 + 환경별 오버라이드 적용
helm install airflow apache-airflow/airflow \
-f values.yaml \
-f values-prd.yaml \
--namespace airflow-prd
2.4 DAG 배포 방식¶
DAG 파일을 Airflow Scheduler와 Webserver에 전달하는 방법은 크게 3가지입니다.
방식 비교¶
| 방식 | 장점 | 단점 | 추천 상황 |
|---|---|---|---|
| git-sync (Sidecar) | DAG만 수정하면 자동 반영, 이미지 재빌드 불필요 | Git 접근 설정 필요, 동기화 딜레이 | 프로덕션 환경 (권장) |
| PVC (Persistent Volume) | 단순한 구조 | 파일 업데이트 메커니즘 별도 필요, 멀티 노드에서 ReadWriteMany 필요 | 단순한 환경, NFS 사용 시 |
| Docker 이미지에 포함 | 버전 관리 명확, 불변 배포 | DAG 수정마다 이미지 재빌드/재배포 필요 | CI/CD가 잘 갖춰진 환경 |
git-sync 상세 설정¶
# values.yaml (git-sync 부분)
dags:
gitSync:
enabled: true
repo: https://github.com/myorg/airflow-dags.git
branch: main
rev: HEAD
depth: 1 # (1)!
maxFailures: 3 # (2)!
subPath: dags # (3)!
wait: 60
credentialsSecret: git-credentials # (4)!
sshKeySecret: ""
depth: 1로 shallow clone하여 디스크 사용량을 최소화합니다.- 연속 3번 실패하면 에러로 보고합니다.
- Git 저장소 내
dags/디렉토리만 동기화합니다. - private 저장소의 경우 인증 정보를 Secret으로 전달합니다.
# git-credentials Secret
apiVersion: v1
kind: Secret
metadata:
name: git-credentials
namespace: airflow-prd
type: Opaque
data:
GIT_SYNC_USERNAME: bXl1c2Vy # base64 encoded
GIT_SYNC_PASSWORD: bXlwYXNz # base64 encoded
SSH Key 방식
HTTPS 인증 대신 SSH Key를 사용할 수도 있습니다.
2.5 KubernetesPodOperator 사용법¶
KubernetesPodOperator는 Airflow에서 완전히 독립적인 Pod을 생성하여 Task를 실행하는 Operator입니다. KubernetesExecutor와는 별개로, 어떤 Executor를 사용하든 활용할 수 있습니다.
KubernetesExecutor vs KubernetesPodOperator
- KubernetesExecutor: Airflow의 모든 Task가 자동으로 별도 Pod에서 실행됨
- KubernetesPodOperator: 특정 Task만 별도 Pod에서 실행하도록 명시적으로 지정
CeleryExecutor를 사용하면서도 특정 무거운 Task만 KubernetesPodOperator로 실행할 수 있습니다.
왜 사용하는가?¶
- 환경 격리: Task별로 다른 Docker 이미지 사용 (Python 3.9 vs 3.11, dbt vs Spark)
- 라이브러리 의존성 분리: 충돌하는 패키지를 서로 다른 컨테이너에서 실행
- 리소스 세밀 제어: Task별로 CPU/Memory 요청량을 다르게 설정
- 보안: 민감한 작업을 격리된 환경에서 실행
완전한 DAG 예시¶
# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
# Volume 정의
dbt_profiles_volume = k8s.V1Volume(
name="dbt-profiles",
secret=k8s.V1SecretVolumeSource(secret_name="dbt-profiles-secret"),
)
dbt_profiles_mount = k8s.V1VolumeMount(
name="dbt-profiles",
mount_path="/home/dbt/.dbt",
read_only=True,
)
default_args = {
"owner": "data-team",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="etl_pipeline",
default_args=default_args,
description="ETL pipeline using KubernetesPodOperator",
schedule="0 6 * * *", # 매일 06:00 UTC
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "dbt", "kubernetes"],
) as dag:
# Task 1: 원천 데이터 추출
extract = KubernetesPodOperator(
task_id="extract_source_data",
namespace="airflow-prd",
image="my-registry.example.com/etl-extract:1.2.0",
arguments=["python", "extract.py", "--date", "{{ ds }}"], # (1)!
env_vars={
"SOURCE_DB_HOST": "{{ var.value.source_db_host }}",
"BATCH_DATE": "{{ ds }}",
},
resources=k8s.V1ResourceRequirements(
requests={"cpu": "500m", "memory": "1Gi"},
limits={"cpu": "1", "memory": "2Gi"},
),
is_delete_operator_pod=True, # (2)!
get_logs=True, # (3)!
startup_timeout_seconds=300,
name="extract-source-data",
)
# Task 2: dbt 실행
dbt_run = KubernetesPodOperator(
task_id="dbt_run",
namespace="airflow-prd",
image="my-registry.example.com/dbt-project:1.0.0",
cmds=["dbt"],
arguments=["run", "--target", "prd", "--select", "tag:daily"],
volumes=[dbt_profiles_volume],
volume_mounts=[dbt_profiles_mount],
env_vars={
"DBT_PROFILES_DIR": "/home/dbt/.dbt",
},
resources=k8s.V1ResourceRequirements(
requests={"cpu": "1", "memory": "2Gi"},
limits={"cpu": "2", "memory": "4Gi"},
),
is_delete_operator_pod=True,
get_logs=True,
name="dbt-run-daily",
)
# Task 3: dbt 테스트
dbt_test = KubernetesPodOperator(
task_id="dbt_test",
namespace="airflow-prd",
image="my-registry.example.com/dbt-project:1.0.0",
cmds=["dbt"],
arguments=["test", "--target", "prd", "--select", "tag:daily"],
volumes=[dbt_profiles_volume],
volume_mounts=[dbt_profiles_mount],
env_vars={
"DBT_PROFILES_DIR": "/home/dbt/.dbt",
},
resources=k8s.V1ResourceRequirements(
requests={"cpu": "500m", "memory": "1Gi"},
limits={"cpu": "1", "memory": "2Gi"},
),
is_delete_operator_pod=True,
get_logs=True,
name="dbt-test-daily",
)
extract >> dbt_run >> dbt_test
- Airflow의 Jinja 템플릿으로 실행 날짜(
{{ ds }})를 인자로 전달할 수 있습니다. is_delete_operator_pod=True로 설정하면 Task 완료 후 Pod이 자동 삭제됩니다.get_logs=True로 설정하면 Pod의 stdout/stderr가 Airflow Task 로그에 표시됩니다.
pod_template_file 활용¶
반복되는 Pod 설정을 YAML 템플릿으로 분리할 수 있습니다.
# pod_templates/dbt_pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
labels:
app: dbt
managed-by: airflow
spec:
serviceAccountName: airflow-worker
containers:
- name: base
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 4Gi
volumeMounts:
- name: dbt-profiles
mountPath: /home/dbt/.dbt
readOnly: true
env:
- name: DBT_PROFILES_DIR
value: /home/dbt/.dbt
volumes:
- name: dbt-profiles
secret:
secretName: dbt-profiles-secret
restartPolicy: Never
# DAG에서 pod_template_file 사용
dbt_run = KubernetesPodOperator(
task_id="dbt_run",
namespace="airflow-prd",
image="my-registry.example.com/dbt-project:1.0.0",
cmds=["dbt"],
arguments=["run", "--target", "prd"],
pod_template_file="/opt/airflow/pod_templates/dbt_pod_template.yaml",
is_delete_operator_pod=True,
get_logs=True,
name="dbt-run",
)
2.6 Airflow Remote Logging (S3)¶
KubernetesExecutor를 사용하면 Worker Pod는 Task 완료 후 삭제됩니다. 로컬 로그 파일도 함께 사라지므로, Remote Logging은 필수입니다.
환경변수 방식 (values.yaml)¶
# values.yaml
extraEnv: |
- name: AIRFLOW__LOGGING__REMOTE_LOGGING
value: "true"
- name: AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID
value: "aws_default"
- name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
value: "s3://my-airflow-logs/prd"
- name: AIRFLOW__LOGGING__ENCRYPT_S3_LOGS
value: "false"
# Connection 설정 (Secret으로 관리)
extraSecrets:
airflow-connections:
data: |
AIRFLOW_CONN_AWS_DEFAULT: 'aws://?region_name=ap-northeast-2'
IRSA (IAM Roles for Service Accounts) 활용
AWS EKS 환경에서는 IRSA를 사용하면 Access Key 없이 IAM Role만으로 S3에 접근할 수 있습니다. ServiceAccount에 IAM Role을 매핑하면 됩니다.
GCS (Google Cloud Storage) 사용 시¶
extraEnv: |
- name: AIRFLOW__LOGGING__REMOTE_LOGGING
value: "true"
- name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
value: "gs://my-airflow-logs/prd"
- name: AIRFLOW__LOGGING__GOOGLE_KEY_PATH
value: "/opt/airflow/secrets/gcp-key.json"
extraVolumes:
- name: gcp-key
secret:
secretName: gcp-sa-key
extraVolumeMounts:
- name: gcp-key
mountPath: /opt/airflow/secrets
readOnly: true
2.7 Airflow 모니터링¶
프로덕션 환경에서는 Airflow 상태를 실시간으로 모니터링해야 합니다.
StatsD -> Prometheus -> Grafana 연동¶
# values.yaml — 모니터링 설정
statsd:
enabled: true # (1)!
resources:
requests:
cpu: 50m
memory: 64Mi
# StatsD Exporter가 Prometheus 포맷으로 변환
config:
metrics:
statsd_on: true
statsd_host: localhost
statsd_port: 8125
statsd_prefix: airflow
- Airflow는 StatsD 프로토콜로 메트릭을 내보냅니다. 공식 Helm Chart에는 StatsD Exporter가 포함되어 있어 Prometheus가 직접 스크랩할 수 있습니다.
Prometheus ServiceMonitor¶
# servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: airflow-statsd
namespace: airflow-prd
labels:
release: prometheus # Prometheus Operator의 selector와 매칭
spec:
selector:
matchLabels:
component: statsd
endpoints:
- port: statsd-metrics
interval: 30s
path: /metrics
핵심 모니터링 메트릭¶
| 메트릭 | 설명 | 경고 기준 |
|---|---|---|
airflow_dag_processing_total_parse_time |
DAG 파일 파싱 소요 시간 | > 30초이면 DAG 코드 최적화 필요 |
airflow_dagrun_duration_success |
DAG Run 성공 소요 시간 | 평소 대비 2배 이상이면 조사 |
airflow_task_duration |
개별 Task 실행 시간 | 예상 범위 초과 시 알림 |
airflow_pool_open_slots |
Pool 사용 가능 슬롯 수 | 0이면 Task 대기 발생 |
airflow_scheduler_heartbeat |
Scheduler 생존 신호 | 60초 이상 없으면 Scheduler 다운 |
airflow_executor_running_tasks |
현재 실행 중인 Task 수 | 급격한 증가/감소 모니터링 |
airflow_zombie_tasks_killed |
좀비 Task 수 | 0이어야 정상. 잦으면 리소스 부족 의심 |
Part 3: dbt on Kubernetes¶
3.1 dbt 실행 패턴 비교¶
dbt를 Kubernetes 환경에서 실행하는 방법은 여러 가지입니다. 상황에 따라 적합한 패턴을 선택하세요.
| 패턴 | 격리 수준 | 유연성 | 복잡도 | 추천 상황 |
|---|---|---|---|---|
| K8s Job/CronJob | 높음 (독립 Pod) | 중간 | 낮음 | Airflow 없이 dbt만 운영 |
| KubernetesPodOperator | 높음 (독립 Pod) | 높음 | 중간 | Airflow + dbt 통합 (권장) |
| BashOperator (같은 Pod) | 낮음 (Worker와 공유) | 낮음 | 낮음 | 간단한 테스트, dbt가 Worker 이미지에 포함 |
| dbt Cloud API Operator | 없음 (SaaS) | 낮음 | 낮음 | dbt Cloud 사용 시 |
3.2 dbt Docker 이미지 빌드¶
dbt를 Kubernetes에서 실행하려면, dbt-core와 프로젝트 코드가 포함된 Docker 이미지를 빌드해야 합니다.
Dockerfile (Multi-stage Build)¶
# ──────────────────────────────────
# Stage 1: Builder
# ──────────────────────────────────
FROM python:3.11-slim AS builder
WORKDIR /build
# pip 패키지 설치 (캐시 활용을 위해 requirements만 먼저 복사)
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install \
dbt-core==1.8.0 \
dbt-postgres==1.8.0 \
-r requirements.txt
# ──────────────────────────────────
# Stage 2: Runtime
# ──────────────────────────────────
FROM python:3.11-slim AS runtime
# 보안: non-root 사용자 생성
RUN groupadd -r dbt && useradd -r -g dbt -d /home/dbt -m dbt
# Builder에서 설치된 패키지 복사
COPY --from=builder /install /usr/local
# dbt 프로젝트 코드 복사
COPY --chown=dbt:dbt dbt_project/ /home/dbt/project/
WORKDIR /home/dbt/project
USER dbt
# profiles.yml은 Secret으로 런타임 마운트 (이미지에 포함하지 않음!)
# 마운트 경로: /home/dbt/.dbt/profiles.yml
ENTRYPOINT ["dbt"]
CMD ["run"]
profiles.yml을 Docker 이미지에 절대 포함하지 마세요
profiles.yml에는 DB 접속 정보(호스트, 유저, 비밀번호)가 들어갑니다.
이미지에 포함하면 이미지에 접근할 수 있는 누구나 DB 접속 정보를 볼 수 있습니다.
반드시 Kubernetes Secret으로 런타임에 마운트하세요.
빌드 & 푸시¶
# 빌드
docker build -t my-registry.example.com/dbt-project:1.0.0 .
# 푸시
docker push my-registry.example.com/dbt-project:1.0.0
requirements.txt 예시¶
# requirements.txt
# dbt adapter는 Dockerfile에서 직접 설치하므로 여기에는 추가 패키지만
dbt-utils==1.1.1
elementary-data==0.14.0
3.3 dbt를 Job으로 실행하기¶
Airflow 없이 dbt만 Kubernetes에서 실행하려면 Job 또는 CronJob을 사용합니다.
완전한 Job YAML¶
# dbt-run-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: dbt-run-daily-20240115
namespace: data-pipelines
labels:
app: dbt
run-type: daily
spec:
backoffLimit: 3 # (1)!
activeDeadlineSeconds: 3600 # (2)!
ttlSecondsAfterFinished: 86400 # (3)!
template:
metadata:
labels:
app: dbt
run-type: daily
spec:
serviceAccountName: dbt-runner
restartPolicy: Never # (4)!
containers:
- name: dbt
image: my-registry.example.com/dbt-project:1.0.0
command: ["dbt"]
args:
- "run"
- "--target"
- "prd"
- "--select"
- "tag:daily"
env:
- name: DBT_PROFILES_DIR
value: /home/dbt/.dbt
- name: DBT_TARGET
value: prd
envFrom:
- secretRef:
name: dbt-env-secrets # (5)!
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 4Gi
volumeMounts:
- name: dbt-profiles
mountPath: /home/dbt/.dbt
readOnly: true
volumes:
- name: dbt-profiles
secret:
secretName: dbt-profiles-secret
- 실패 시 최대 3번까지 재시도합니다.
- 전체 Job 실행 시간을 1시간(3600초)으로 제한합니다. 초과하면 강제 종료됩니다.
- Job 완료 후 24시간(86400초) 뒤에 자동 정리됩니다.
- Job에서는
restartPolicy: Never를 사용합니다.OnFailure를 쓰면 같은 Pod에서 재시작하지만,Never를 쓰면 새 Pod을 생성합니다. - DB 비밀번호 등을 환경변수로 주입합니다.
CronJob으로 주기적 실행¶
# dbt-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: dbt-run-daily
namespace: data-pipelines
spec:
schedule: "0 6 * * *" # (1)!
concurrencyPolicy: Forbid # (2)!
successfulJobsHistoryLimit: 3 # (3)!
failedJobsHistoryLimit: 5
startingDeadlineSeconds: 600 # (4)!
jobTemplate:
spec:
backoffLimit: 2
activeDeadlineSeconds: 3600
ttlSecondsAfterFinished: 172800
template:
metadata:
labels:
app: dbt
run-type: daily-cron
spec:
serviceAccountName: dbt-runner
restartPolicy: Never
containers:
- name: dbt
image: my-registry.example.com/dbt-project:1.0.0
command: ["dbt"]
args: ["run", "--target", "prd", "--select", "tag:daily"]
env:
- name: DBT_PROFILES_DIR
value: /home/dbt/.dbt
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 4Gi
volumeMounts:
- name: dbt-profiles
mountPath: /home/dbt/.dbt
readOnly: true
volumes:
- name: dbt-profiles
secret:
secretName: dbt-profiles-secret
- 매일 06:00 UTC에 실행됩니다.
Forbid: 이전 Job이 아직 실행 중이면 새 Job을 생성하지 않습니다. 데이터 파이프라인에서 중복 실행 방지에 유용합니다.- 성공한 Job 3개, 실패한 Job 5개의 이력을 유지합니다.
- 예정된 시간으로부터 10분(600초) 내에 시작하지 못하면 해당 실행을 건너뜁니다.
# CronJob 상태 확인
kubectl get cronjob -n data-pipelines
kubectl get jobs -n data-pipelines --sort-by=.metadata.creationTimestamp
# 수동 트리거
kubectl create job dbt-manual-run --from=cronjob/dbt-run-daily -n data-pipelines
3.4 dbt + Airflow 통합¶
프로덕션 데이터 파이프라인에서는 dbt를 Airflow와 통합하여 다른 ETL Task와 의존 관계를 관리하는 것이 일반적입니다.
KubernetesPodOperator로 dbt 실행하는 DAG¶
# dags/dbt_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s
# ─── 공통 설정 ───
DBT_IMAGE = "my-registry.example.com/dbt-project:1.0.0"
NAMESPACE = "airflow-prd"
dbt_volumes = [
k8s.V1Volume(
name="dbt-profiles",
secret=k8s.V1SecretVolumeSource(secret_name="dbt-profiles-secret"),
),
]
dbt_volume_mounts = [
k8s.V1VolumeMount(
name="dbt-profiles",
mount_path="/home/dbt/.dbt",
read_only=True,
),
]
dbt_env = {
"DBT_PROFILES_DIR": "/home/dbt/.dbt",
"DBT_TARGET": "prd",
}
dbt_resources = k8s.V1ResourceRequirements(
requests={"cpu": "1", "memory": "2Gi"},
limits={"cpu": "2", "memory": "4Gi"},
)
# ─── 실패 콜백 ───
def on_failure_alert(context):
"""Task 실패 시 알림 (Slack, Email 등)"""
task_instance = context["task_instance"]
dag_id = context["dag"].dag_id
task_id = task_instance.task_id
execution_date = context["execution_date"]
log_url = task_instance.log_url
message = (
f"[ALERT] Task Failed!\n"
f"DAG: {dag_id}\n"
f"Task: {task_id}\n"
f"Execution Date: {execution_date}\n"
f"Log: {log_url}"
)
# Slack webhook, email 등으로 발송
print(message)
# ─── DAG 정의 ───
default_args = {
"owner": "data-team",
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": on_failure_alert,
}
with DAG(
dag_id="dbt_pipeline",
default_args=default_args,
description="dbt run/test pipeline on Kubernetes",
schedule="0 7 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["dbt", "data-warehouse"],
) as dag:
# 1. dbt deps — 패키지 설치
dbt_deps = KubernetesPodOperator(
task_id="dbt_deps",
namespace=NAMESPACE,
image=DBT_IMAGE,
cmds=["dbt"],
arguments=["deps"],
volumes=dbt_volumes,
volume_mounts=dbt_volume_mounts,
env_vars=dbt_env,
resources=dbt_resources,
is_delete_operator_pod=True,
get_logs=True,
name="dbt-deps",
)
# 2. dbt run — staging 모델
dbt_run_staging = KubernetesPodOperator(
task_id="dbt_run_staging",
namespace=NAMESPACE,
image=DBT_IMAGE,
cmds=["dbt"],
arguments=["run", "--select", "staging"],
volumes=dbt_volumes,
volume_mounts=dbt_volume_mounts,
env_vars=dbt_env,
resources=dbt_resources,
is_delete_operator_pod=True,
get_logs=True,
name="dbt-run-staging",
)
# 3. dbt run — mart 모델
dbt_run_marts = KubernetesPodOperator(
task_id="dbt_run_marts",
namespace=NAMESPACE,
image=DBT_IMAGE,
cmds=["dbt"],
arguments=["run", "--select", "marts"],
volumes=dbt_volumes,
volume_mounts=dbt_volume_mounts,
env_vars=dbt_env,
resources=dbt_resources,
is_delete_operator_pod=True,
get_logs=True,
name="dbt-run-marts",
)
# 4. dbt test — 전체 테스트
dbt_test = KubernetesPodOperator(
task_id="dbt_test",
namespace=NAMESPACE,
image=DBT_IMAGE,
cmds=["dbt"],
arguments=["test", "--select", "staging marts"],
volumes=dbt_volumes,
volume_mounts=dbt_volume_mounts,
env_vars=dbt_env,
resources=dbt_resources,
is_delete_operator_pod=True,
get_logs=True,
name="dbt-test",
)
# 5. dbt docs generate (선택)
dbt_docs = KubernetesPodOperator(
task_id="dbt_docs_generate",
namespace=NAMESPACE,
image=DBT_IMAGE,
cmds=["dbt"],
arguments=["docs", "generate"],
volumes=dbt_volumes,
volume_mounts=dbt_volume_mounts,
env_vars=dbt_env,
resources=dbt_resources,
is_delete_operator_pod=True,
get_logs=True,
name="dbt-docs",
)
dbt_deps >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs
3.5 dbt profiles.yml Secret 관리¶
dbt의 profiles.yml에는 데이터베이스 접속 정보가 포함됩니다. 이 정보를 Kubernetes Secret으로 안전하게 관리하는 방법을 알아봅시다.
방법 1: profiles.yml 전체를 Secret으로 관리¶
# profiles.yml (평문 — 이 파일 자체를 Secret으로 저장)
my_project:
target: prd
outputs:
dev:
type: postgres
host: "{{ env_var('DBT_DB_HOST') }}" # (1)!
port: 5432
user: "{{ env_var('DBT_DB_USER') }}"
password: "{{ env_var('DBT_DB_PASSWORD') }}"
dbname: "{{ env_var('DBT_DB_NAME') }}"
schema: analytics_dev
threads: 4
prd:
type: postgres
host: "{{ env_var('DBT_DB_HOST') }}"
port: 5432
user: "{{ env_var('DBT_DB_USER') }}"
password: "{{ env_var('DBT_DB_PASSWORD') }}"
dbname: "{{ env_var('DBT_DB_NAME') }}"
schema: analytics
threads: 8
- dbt의
env_var()함수를 사용하면, profiles.yml 안에서 환경변수를 참조할 수 있습니다. 이렇게 하면 profiles.yml 자체에는 실제 비밀번호가 포함되지 않습니다.
# dbt-profiles-secret.yaml — profiles.yml을 Secret으로 생성
apiVersion: v1
kind: Secret
metadata:
name: dbt-profiles-secret
namespace: airflow-prd
type: Opaque
stringData:
profiles.yml: |
my_project:
target: prd
outputs:
prd:
type: postgres
host: "{{ env_var('DBT_DB_HOST') }}"
port: 5432
user: "{{ env_var('DBT_DB_USER') }}"
password: "{{ env_var('DBT_DB_PASSWORD') }}"
dbname: "{{ env_var('DBT_DB_NAME') }}"
schema: analytics
threads: 8
# dbt-env-secrets.yaml — DB 접속 정보를 환경변수 Secret으로 관리
apiVersion: v1
kind: Secret
metadata:
name: dbt-env-secrets
namespace: airflow-prd
type: Opaque
stringData:
DBT_DB_HOST: warehouse.example.com
DBT_DB_USER: dbt_user
DBT_DB_PASSWORD: "super-secret-password-here"
DBT_DB_NAME: analytics_db
# Secret 생성 (명령줄에서)
kubectl apply -f dbt-profiles-secret.yaml
kubectl apply -f dbt-env-secrets.yaml
# Secret 확인
kubectl get secrets -n airflow-prd
kubectl describe secret dbt-profiles-secret -n airflow-prd
방법 2: 환경변수만으로 profiles.yml 구성¶
환경변수만으로 profiles.yml의 모든 값을 주입하는 패턴입니다.
# Job에서 Secret 마운트 + 환경변수 주입
apiVersion: batch/v1
kind: Job
metadata:
name: dbt-run-with-secrets
namespace: data-pipelines
spec:
template:
spec:
restartPolicy: Never
containers:
- name: dbt
image: my-registry.example.com/dbt-project:1.0.0
command: ["dbt"]
args: ["run", "--target", "prd"]
env:
- name: DBT_PROFILES_DIR
value: /home/dbt/.dbt
envFrom:
- secretRef:
name: dbt-env-secrets # (1)!
volumeMounts:
- name: dbt-profiles
mountPath: /home/dbt/.dbt
readOnly: true
volumes:
- name: dbt-profiles
secret:
secretName: dbt-profiles-secret
envFrom을 사용하면 Secret의 모든 키-값 쌍이 환경변수로 주입됩니다.
Secret 관리 도구
프로덕션에서는 YAML 파일로 Secret을 직접 관리하기보다, 다음 도구를 사용하는 것이 안전합니다:
- Sealed Secrets: Secret을 암호화하여 Git에 안전하게 저장
- External Secrets Operator: AWS Secrets Manager, HashiCorp Vault 등 외부 저장소에서 자동 동기화
- SOPS (Secrets OPerationS): 파일 암호화 도구. Git에 암호화된 Secret 저장
Part 4: 실전 아키텍처¶
4.1 전체 아키텍처 도식화¶
Service의 원천 데이터
실시간 이벤트 로그
서드파티 데이터
DAG 파싱 & Task 스케줄링
UI 대시보드
DAG/Task 상태 저장
Secret에서 profiles.yml 마운트
모델별 병렬 실행 가능
데이터 품질 검증
분석용 데이터 저장
대시보드 & 시각화
Raw 데이터 아카이브
K8s 리소스 매핑 요약¶
| 파이프라인 컴포넌트 | Kubernetes 리소스 | 비고 |
|---|---|---|
| Airflow Webserver | Deployment + Service + Ingress | HA 2 replicas |
| Airflow Scheduler | Deployment | HA 2 replicas, ServiceAccount 필수 |
| Airflow Triggerer | Deployment | 1 replica |
| Airflow Metadata DB | 외부 관리형 DB (RDS) 또는 StatefulSet | 외부 DB 권장 |
| Worker Pod | 동적 Pod (KubernetesExecutor) | Task마다 생성/삭제 |
| dbt Pod | 동적 Pod (KubernetesPodOperator) | Secret 마운트 |
| DAG 파일 | git-sync Sidecar | Scheduler/Webserver에 부착 |
| DB 접속 정보 | Secret | profiles.yml + 환경변수 |
| Airflow 설정 | ConfigMap + 환경변수 | values.yaml에서 관리 |
| 로그 | S3/GCS (Remote Logging) | Pod 삭제 후에도 보존 |
4.2 프로덕션 체크리스트¶
Airflow + dbt를 프로덕션에 배포하기 전에 반드시 확인해야 할 항목들입니다.
RBAC 설정¶
# 최종 RBAC 구성 — 모든 리소스를 한 파일에
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: airflow-worker
namespace: airflow-prd
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: airflow-pod-manager
namespace: airflow-prd
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "create", "delete", "patch"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods/exec"]
verbs: ["create", "get"]
- apiGroups: [""]
resources: ["events"]
verbs: ["list", "watch"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-pod-manager-binding
namespace: airflow-prd
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow-prd
roleRef:
kind: Role
name: airflow-pod-manager
apiGroup: rbac.authorization.k8s.io
Resource 설정 가이드¶
| 컴포넌트 | CPU Request | CPU Limit | Memory Request | Memory Limit |
|---|---|---|---|---|
| Webserver | 500m | 1 | 1Gi | 2Gi |
| Scheduler | 1 | 2 | 2Gi | 4Gi |
| Triggerer | 250m | 500m | 512Mi | 1Gi |
| dbt Worker | 1 | 2 | 2Gi | 4Gi |
| ETL Worker | 500m | 2 | 1Gi | 4Gi |
| git-sync | 50m | 100m | 64Mi | 128Mi |
| StatsD | 50m | 100m | 64Mi | 128Mi |
리소스 설정은 반드시 모니터링 후 조정하세요
위 값은 출발점일 뿐입니다. 실제 사용 패턴에 따라 Prometheus/Grafana로 리소스 사용량을 모니터링하고,
requests는 평균 사용량에, limits는 피크 사용량에 맞추어 조정하세요.
PodDisruptionBudget (PDB) for Airflow Scheduler¶
노드 유지보수나 업데이트 시에도 Scheduler가 최소 1개 이상 실행되도록 보장합니다.
# pdb-scheduler.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-scheduler-pdb
namespace: airflow-prd
spec:
minAvailable: 1 # (1)!
selector:
matchLabels:
component: scheduler
release: airflow
- 최소 1개의 Scheduler Pod은 항상 실행 중이어야 합니다.
kubectl drain등으로 노드를 비울 때, 이 조건이 충족되지 않으면 eviction이 차단됩니다.
# pdb-webserver.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-webserver-pdb
namespace: airflow-prd
spec:
minAvailable: 1
selector:
matchLabels:
component: webserver
release: airflow
Secret 관리 — External Secrets Operator 예시¶
# external-secret.yaml
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: dbt-profiles-external
namespace: airflow-prd
spec:
refreshInterval: 1h # (1)!
secretStoreRef:
name: aws-secrets-manager
kind: ClusterSecretStore
target:
name: dbt-env-secrets # (2)!
creationPolicy: Owner
data:
- secretKey: DBT_DB_HOST
remoteRef:
key: prd/data-warehouse/credentials
property: host
- secretKey: DBT_DB_USER
remoteRef:
key: prd/data-warehouse/credentials
property: username
- secretKey: DBT_DB_PASSWORD
remoteRef:
key: prd/data-warehouse/credentials
property: password
- secretKey: DBT_DB_NAME
remoteRef:
key: prd/data-warehouse/credentials
property: dbname
- 1시간마다 외부 Secret Manager에서 값을 동기화합니다.
- 이 ExternalSecret이 자동으로
dbt-env-secrets라는 이름의 Kubernetes Secret을 생성/갱신합니다.
Metadata DB 백업¶
Airflow의 Metadata DB에는 DAG 실행 이력, 변수, 커넥션 등 중요한 데이터가 저장됩니다. 정기 백업은 필수입니다.
# metadata-db-backup-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: airflow-db-backup
namespace: airflow-prd
spec:
schedule: "0 2 * * *" # 매일 02:00 UTC
concurrencyPolicy: Forbid
jobTemplate:
spec:
backoffLimit: 2
template:
spec:
restartPolicy: Never
containers:
- name: pg-dump
image: postgres:16-alpine
command:
- /bin/sh
- -c
- |
FILENAME="airflow_backup_$(date +%Y%m%d_%H%M%S).sql.gz"
pg_dump -h $DB_HOST -U $DB_USER -d $DB_NAME | gzip > /backup/$FILENAME
echo "Backup completed: $FILENAME"
envFrom:
- secretRef:
name: airflow-metadata-secret
volumeMounts:
- name: backup-volume
mountPath: /backup
volumes:
- name: backup-volume
persistentVolumeClaim:
claimName: airflow-backup-pvc
최종 프로덕션 체크리스트¶
배포 전 점검표
RBAC & 보안
- [ ] 전용 ServiceAccount 생성
- [ ] 최소 권한 Role/RoleBinding 설정
- [ ]
kubectl auth can-i로 권한 검증 - [ ] Secret으로 DB 접속 정보 관리
- [ ] External Secrets Operator 또는 Sealed Secrets 설정
리소스 관리
- [ ] 모든 컴포넌트에 requests/limits 설정
- [ ] ResourceQuota로 Namespace 리소스 상한 설정
- [ ] LimitRange로 컨테이너 기본값 설정
- [ ] PodDisruptionBudget 설정 (Scheduler, Webserver)
DAG 배포
- [ ] git-sync 설정 및 동기화 확인
- [ ] Git 인증 (HTTPS token 또는 SSH key) 설정
- [ ] subPath로 DAG 디렉토리 지정
로깅 & 모니터링
- [ ] Remote Logging (S3/GCS) 설정
- [ ] IRSA 또는 GKE Workload Identity로 클라우드 접근 설정
- [ ] StatsD → Prometheus → Grafana 연동
- [ ] 주요 메트릭 알림 설정 (Scheduler heartbeat, Task 실패률)
데이터베이스
- [ ] 외부 관리형 DB (RDS/Cloud SQL) 사용
- [ ] DB 백업 CronJob 설정
- [ ] Connection pooling 설정 (PgBouncer)
네트워크
- [ ] Ingress + TLS 설정 (Webserver 접근)
- [ ] NetworkPolicy로 불필요한 트래픽 차단
마무리¶
이번 보너스 챕터에서는 데이터 파이프라인의 핵심 도구인 Airflow와 dbt를 Kubernetes 위에 프로덕션 수준으로 배포하는 방법을 다뤘습니다.
핵심 요약:
- RBAC & ServiceAccount: Airflow가 Pod를 동적으로 생성하려면 적절한 K8s API 권한이 필수
- KubernetesExecutor: Task별 Pod 격리로 리소스 효율성과 안정성 확보
- Helm Chart: 공식 Airflow Helm Chart로 복잡한 배포를 표준화
- dbt on K8s: Secret 마운트로 접속 정보 보호, KubernetesPodOperator로 Airflow 통합
- 프로덕션 체크리스트: RBAC, 리소스 관리, Remote Logging, 모니터링, 백업까지
더 깊이 알아보기