콘텐츠로 이동

4. 모델 작성

Source 정의

먼저 원천 테이블을 source로 등록합니다.

# models/staging/_sources.yml
version: 2

sources:
  - name: raw
    database: my_database
    schema: raw_data
    tables:
      - name: users
        description: "사용자 원천 테이블"
      - name: events
        description: "이벤트 로그 테이블"
        loaded_at_field: created_at
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}

Staging 모델

원천 데이터를 1:1로 정제합니다.

-- models/staging/stg_users.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'users') }}
),

renamed AS (
    SELECT
        id          AS user_id,
        name        AS user_name,
        email,
        status,
        created_at,
        updated_at
    FROM source
)

SELECT * FROM renamed

staging 원칙

  • 컬럼 rename, type cast만 수행
  • 비즈니스 로직 X
  • 원천 테이블당 1개의 staging 모델

ref()로 모델 연결

-- models/intermediate/int_user_events.sql
WITH users AS (
    SELECT * FROM {{ ref('stg_users') }}
),

events AS (
    SELECT * FROM {{ ref('stg_events') }}
)

SELECT
    u.user_id,
    u.user_name,
    e.event_type,
    e.event_at
FROM users u
JOIN events e ON u.user_id = e.user_id

Mart 모델

비즈니스 요구사항에 맞는 최종 테이블을 만듭니다.

-- models/marts/fct_daily_events.sql
WITH user_events AS (
    SELECT * FROM {{ ref('int_user_events') }}
)

SELECT
    user_id,
    user_name,
    DATE(event_at)  AS event_date,
    event_type,
    COUNT(*)        AS event_count
FROM user_events
GROUP BY 1, 2, 3, 4

Jinja 활용

조건문

SELECT
    *,
    {% if target.name == 'dev' %}
        LIMIT 1000    -- 개발 환경에서는 1000건만
    {% endif %}
FROM {{ ref('stg_events') }}

반복문

{% set event_types = ['login', 'purchase', 'logout'] %}

SELECT
    user_id,
    {% for event_type in event_types %}
    COUNT(CASE WHEN event_type = '{{ event_type }}' THEN 1 END)
        AS {{ event_type }}_count
    {% if not loop.last %},{% endif %}
    {% endfor %}
FROM {{ ref('stg_events') }}
GROUP BY 1

매크로

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    ROUND({{ column_name }} / 100.0, 2)
{% endmacro %}

-- 사용:
SELECT
    {{ cents_to_dollars('amount_cents') }} AS amount_dollars
FROM {{ ref('stg_payments') }}

Incremental 모델

대용량 데이터를 효율적으로 처리합니다.

-- models/marts/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        incremental_strategy='merge'
    )
}}

SELECT
    event_id,
    user_id,
    event_type,
    event_at
FROM {{ ref('stg_events') }}

{% if is_incremental() %}
    -- 기존 테이블이 있으면 신규 데이터만 처리
    WHERE event_at > (SELECT MAX(event_at) FROM {{ this }})
{% endif %}

주요 dbt 명령어

dbt run                        # 모든 모델 실행
dbt run --select stg_users     # 특정 모델만 실행
dbt run --select staging.*     # staging 폴더 전체
dbt run --select +fct_daily_events  # 해당 모델 + 업스트림 전체
dbt run --full-refresh         # incremental 모델 전체 재빌드

직접 체험해보기

모델을 작성한 뒤 dbt compile로 렌더링된 SQL을 확인하고, dbt run으로 실행해보세요.

Terminal
jane@mac ~/my_project $ _

다음 단계

모델을 작성할 수 있게 되었다면, Materialization 심화에서 저장 방식별 차이를 알아보세요.

댓글