DBT + Argo Workflow + Snowflake

2024. 8. 12. 01:45카테고리 없음

사내에서 DBT를 이용해 유저 + 이벤트를 모델화 시켜서 이용 중이다. 각 이벤트 및 유저 데이터가 들어오는 주기가 다르고 이를 Argo Workflow를 이용해서 업데이트 하는 방법을 작성하려고 한다. 

 

다른 개발자 붙들이 쓴 글 대부분이 DBT + Airflow 인데 지금 팀에서는 Argo Workflow를 사용하고 있어서 Argo Workflow을 사용한다면 이 글이 도움이 될 것 같다!

 

Argo Workflow(link)

 

Arco CD에 비해서 많이 유명하진 않지만 Argo에서 Workflow를 개발해서 이용한 플랫폼이 있다. K8S가 있다면 Helm 및 일반적인 이미지를 이용해서 쉽게 배포할 수 있다. 

 

Argo Helm Chart 홈페이지, github

 

Argo Helm Charts

ArgoProj Helm Charts

argoproj.github.io

 

사내 다른 팀에서는 Airflow를 사용하고 있는데 사용해본 경험을 빗대어서 비교해보면 

 

Argo가 더 좋았던 점

1. 개인적으로 UI/UX가 보기 좋다.

그런데 이건 다른 팀에서 사용하는 Airflow 자체가 조금 더 복잡하고 실행 블럭이 많아서 그런 것 같기도 하다.

 

2. 각각의 실행 블럭이 Container가 되어 실행된다. 

하나의 Workflow에 각각의 실행 블럭이 container가 되어 기록으로 남는다. 그래서 workflow가 종료되어도 아래와 같이 쿠버네티스 안에 기록이 남는다.

 

또 각각의 container 안에서 독립적인 환경을 만들어서 사용할 수 있어서 각각의 블럭이 환경과 관련해서 영향을 끼치지 않아 이슈가 발생하지 않는다.

 

 

Airflow가 더 좋았던 점

1. 각 워크플로우의 자유로운 실행이 가능했다.

특정 블럭까지 실행했다 치고 나머지 블럭을 실행한다던가 Skipped이 가능하다던가 등등의 자유로운 실행이 가능했다. Argo에서는 이게 안된다.

 

2. 더 폭넓은 확장성

내가 만든 이미지 기반이 아닌 다양한 Executor가 지원되고 있어서 이는 더욱 편해보였다. 내가 귀찮게 이미지 안만들고 파이썬 함수로 편하게 사용할 수 있는 것은 더욱 편해보였다.

 

 

DBT 사용하고 있는 환경

Source Data

 

A 데이터: 오전 8시 30분

B, C, D 데이터: 1시간 interval로 데이터 들어옴.

실시간으로 들어오고 1시간에 한번씩 snowflake에 적재함.

E 데이터: 오전 11시

 

각 데이터들은 아래와 같은 형태로 구성되어 있다.

 

staging

1. source 데이터와 직접적으로 연결되어 있다. 

2. source에서 원하는 데이터를 뽑아서 1치적으로 사용할 수 있는 테이블로 구성한다.

 

mart

1. staging에서 뽑은 데이터를 조합해서 의미를 갖고 있는 데이터로 구성한다.

2. e.g) staging에서 결제 데이터, 주소 데이터, 유저 데이터가 있다면 이 세개를 조합해서 만드는 케이스

 

segment

1. 조건을 추가해서 공통적인 의미를 갖고 있는 데이터를 뽑아낸 케이스이다. 최종적인 Audience가 생성된 테이블이다.

2. e.g) 치킨을 좋아하는 사람들, 배달 음식을 자주 시키는 사람, 백화점을 자주 이용하는 유저

 

이런 데이터의 source 부터 staging, mart, segment 들은 snowflake에서 생성된다. 

스키마에 DBT라는 스키마가 따로 생성되고 view, table 등으로 설정해서 생성된다.

 

 

dbt 모델을 업데이트하기 위해서 했던 작업은 dbt 모델이 구성되어 있는 Repo를 Docker를 이용해 이미지로 만드는 작업이 필요하다.

 

FROM python:3.10-slim

WORKDIR /app

RUN pip install dbt-core dbt-snowflake
RUN apt-get update && apt-get install -y git

ARG GITHUB_TOKEN
ENV GOPRIVATE=github.com/

COPY . /app

 

위가 작성한 Docker 파일이고 특별한 내용은 없이 Python 이미지를 사용했고 python에서 dbt를 사용하는데 필요한 아래 라이브러리를 설치했다.

 

1. dbt-core

2. dbt-snowflake

 

이를 Docker Image로 만들어서 github action을 통해서 aws ecr에 저장했다.

 

Argo workflow template yaml

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  namespace: prod
  name: dbt-sync-template
spec:
  entrypoint: dbt-sync
  templates:
    - name: dbt-sync
      inputs:
        parameters:
          - name: tags
      container:
        name: dbt-sync
        image: image url
        imagePullPolicy: Always
        command: [ "/bin/bash", "-c" ]
        args:
          - |
            echo "Running dbt update"
            cat <<EOF > /app/profiles.yml
            airbloc_dbt:
              target: dev
              outputs:
                dev:
                  type: snowflake
                  account: ${DBT_SNOWFLAKE_ACCOUNT}
                  user: ${DBT_SNOWFLAKE_USER}
                  password: ${DBT_SNOWFLAKE_PASSWORD}
                  role: ${DBT_SNOWFLAKE_ROLE}
                  warehouse: ${DBT_SNOWFLAKE_WAREHOUSE}
                  database: ${DBT_SNOWFLAKE_DATABASE}
                  schema: ${DBT_SNOWFLAKE_SCHEMA}
                  threads: 4
            EOF
            dbt deps
            dbt run --profiles-dir=/app --select "{{inputs.parameters.tags}}"
        env:

 

1. profiles.yaml에 dbt 실행에 필요한 snowflake 유저 정보를 기록하게 cat 기능을 이용해서 작성했다.

여기서 snowflake 정보는 중요한 정보라 바로 작성하지 않고 k8s secret 파일에 필요한 값을 작성하고 환경변수로 사용할 수 있게 만들었다.

 

2. dbt run --select

workflow template 파일을 다른 곳에서 사용할 수 있는 template으로 만들어서 다른 곳에서는 사용할 때는 업데이트가 필요한  tag만 입력하여 업데이트 되도록 설정했다. 해당 workflow가 실행될 때 업데이트가 되길 원하는 태그를 추가해서 아래와 같이 작성할 수 있다.

            - name: dbt-model-update
              dependencies: [kbcard-send-slack-alert, lottecard-send-slack-alert]
              templateRef:
                name: dbt-sync-template
                template: dbt-sync
              arguments:
                parameters:
                  - name: tags
                    value: "+tag:ingest-consents"
              metadata:
                labels:
                  task: "dbt-daily-consents-update"
                  date: '{{tasks.resolve-last-day.outputs.result}}'

 

실행된 로그를 보면 아래와 같이 실행되는 것을 확인할 수 있다.

 

--select에 앞에 +을 붙이냐 뒤에 +을 붙이냐에 따라 다르게 모델이 업데이트 된다. 앞에 붙어있을 경우 이전의 데이터 뒤에 붙었을 경우 뒤의 데이터가 업데이트 된다. 

 

Argo Worfklow를 이용해서 dbt 모델을 업데이트 하는 방법을 공유했다. 대부분의 글들이 Airflow을 사용하긴 했지만 둘다 사용해봤을 때 Airflow의 장점도 많았지만 Argo에 있는것을 모두 버리고 Airflow로 바꾸기에는 비용이 꽤나 커서 이대로 계속 Argo Workflow를 사용하려고 한다.

 

Argo와 Snowflake, DBT를 함께 사용한다면 혹은 사용할 예정이라면 이 글이 도움이 될 것 이다!