Dbt operator

from typing import Optional

from airflow.models import Variable
from airflow import DAG

from dataverk_airflow import python_operator


# Eksempel dbt-operator
# https://github.com/navikt/dv-a-team-dags/blob/main/operators/dbt_operator.py
def dbt_operator(
    dag: DAG,
    name: str,
    repo: str,
    script_path: str,
    dbt_secret_name: str,
    branch: str = "main",
    retries: int = 2,
    startup_timeout_seconds: int = 60 * 10,
    dbt_command: str = "build",
    dbt_image: str = "ghcr.io/navikt/dvh-images/airflow-dbt:2024-10-21-0b5d929", # oppdateres manuelt når det kommer nytt image
    dbt_models: Optional[str] = None,
    do_xcom_push: bool = False,
    allowlist: list = [],
    publish_docs: bool = False,
    dbt_docs_project_name: str = None,
):
    dvh_db_environment = Variable.get("DVH_DB_ENVIRONMENT")

    env_vars = {
        "DBT_COMMAND": dbt_command,
        "TEAM_GCP_PROJECT": Variable.get("TEAM_GCP_PROJECT"),
        "DBT_DB_TARGET": dvh_db_environment,
        "TEAM_GCP_SECRET_PATH": f"projects/{Variable.get('TEAM_GCP_PROJECT')}/secrets/{dbt_secret_name}/versions/latest",
    }
    if dbt_models:
        env_vars["DBT_MODELS"] = dbt_models

    if publish_docs:
        if not dbt_docs_project_name:
            raise ValueError(
                "Provide a not-null value for dbt_docs_project_name when publish docs=True"
            )
        env_vars["DBT_DOCS_PROJECT_NAME"] = dbt_docs_project_name
        env_vars["DBT_DOCS_URL"] = Variable.get("DBT_DOCS_URL")

    return python_operator(
        dag=dag,
        name=name,
        startup_timeout_seconds=startup_timeout_seconds,
        repo=repo,
        branch=branch,
        script_path=script_path,
        allowlist=allowlist,
        slack_channel=Variable.get("SLACK_OPS_CHANNEL"),
        retries=retries,
        extra_envs=env_vars,
        image=dbt_image,
        do_xcom_push=do_xcom_push,
    )