Dbt run
import requests
import os
import time
import json
import logging
from pathlib import Path
import shlex
from dbt.cli.main import dbtRunner, dbtRunnerResult
DBT_BASE_COMMAND = ["--no-use-colors", "--log-format-file", "json"]
def get_dbt_log(log_path) -> str:
with open(log_path) as log:
return log.read()
def set_secrets_as_envs(secret_name: str):
from google.cloud import secretmanager
secrets = secretmanager.SecretManagerServiceClient()
secret = secrets.access_secret_version(name=secret_name)
secret_str = secret.payload.data.decode("UTF-8")
secrets = json.loads(secret_str)
os.environ.update(secrets)
def publish_docs(dbt_project: str):
# Connection informasjon fo å pushe dbt docs
dbt_docs_url = f'{os.environ["DBT_DOCS_URL"]}{dbt_project}'
files = [
"target/manifest.json",
"target/catalog.json",
"target/index.html",
]
multipart_form_data = {}
for file_path in files:
file_name = os.path.basename(file_path)
with open(file_path, "rb") as file:
file_contents = file.read()
multipart_form_data[file_name] = (file_name, file_contents)
res = requests.put(dbt_docs_url, files=multipart_form_data)
res.raise_for_status()
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
if __name__ == "__main__":
if secret_name := os.getenv("TEAM_GCP_SECRET_PATH"):
set_secrets_as_envs(secret_name=secret_name)
else:
raise ValueError("missing environment variable TEAM_GCP_SECRET_PATH")
log_path = Path(__file__).parent / "logs/dbt.log"
os.environ["TZ"] = "Europe/Oslo"
time.tzset()
schema = os.environ["DB_SCHEMA"]
os.environ["DBT_ENV_SECRET_USER"] = f"{os.environ['DB_USER']}[{schema}]"
os.environ["DBT_DB_SCHEMA"] = schema
os.environ["DBT_DB_DSN"] = os.environ["DB_DSN"]
os.environ["DBT_ENV_SECRET_PASS"] = os.environ["DB_PASSWORD"]
logging.info("DBT miljøvariabler er lastet inn")
# default dbt kommando er build
command = shlex.split(os.getenv("DBT_COMMAND", "build"))
if dbt_models := os.getenv("DBT_MODELS", None):
command = command + ["--select", dbt_models]
dbt = dbtRunner()
dbt_deps = dbt.invoke(DBT_BASE_COMMAND + ["deps"])
output: dbtRunnerResult = dbt.invoke(DBT_BASE_COMMAND + command)
# Exit code 2, feil utenfor DBT
if output.exception:
raise output.exception
# Exit code 1, feil i dbt (test eller under kjøring)
if not output.success:
raise Exception(output.result)
if "docs" in command:
dbt_project = os.environ["DBT_DOCS_PROJECT_NAME"]
logging.info("publiserer dbt docs")
publish_docs(dbt_project=dbt_project)
# Legg til logikk for å skrive logg til xcom