An Airflow operator to call the main function from the dbt-core Python package

Overview

airflow-dbt-python

Motivation

Airflow running in a managed environment

Although dbt is meant to be installed and used as a CLI, we may not have control of the environment where Airflow is running, which can rule out using dbt as a CLI.

This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow (MWAA): although a list of Python requirements can be passed, the dbt CLI cannot be found in the worker's PATH.

There is a workaround which involves using Airflow's BashOperator and running Python from the command line:

from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)

But this gets unwieldy once you append all the arguments a dbt run command (or any other subcommand) can take.
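
To give a sense of how this grows, here is a minimal sketch of a helper that assembles the command string from keyword arguments. The build_dbt_bash_command function and its flag-mapping rules are hypothetical: they only approximate how dbt CLI flags are spelled and are not part of Airflow, dbt, or airflow-dbt-python.

```python
import shlex


def build_dbt_bash_command(subcommand, **flags):
    """Build a `python -c ...` command line for a dbt subcommand.

    Hypothetical helper: True becomes a bare flag, False/None is skipped,
    lists become a flag followed by each item, anything else is one value.
    """
    parts = ["python", "-c", "from dbt.main import main; main()", subcommand]
    for name, value in flags.items():
        flag = "--" + name.replace("_", "-")
        if value is True:
            parts.append(flag)
        elif value is False or value is None:
            continue
        elif isinstance(value, (list, tuple)):
            parts.append(flag)
            parts.extend(str(item) for item in value)
        else:
            parts.extend([flag, str(value)])
    # shlex.quote keeps each argument intact when the shell parses the string.
    return " ".join(shlex.quote(part) for part in parts)


command = build_dbt_bash_command(
    "run",
    models=["/path/to/models"],
    full_refresh=True,
    fail_fast=True,
)
```

Every new flag means another branch or special case in a helper like this, which is the bookkeeping a dedicated operator takes off your hands.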

As you may expect, airflow-dbt-python abstracts away the complexity of handling CLI arguments by defining an operator for each dbt subcommand, with each operator exposing an attribute for each possible CLI argument.

An alternative to airflow-dbt that works without the dbt CLI

The existing airflow-dbt package, by default, would not work if the dbt CLI is not in PATH, which means it would not be usable in MWAA. There is a workaround via the dbt_bin argument, which can be set to "python -c 'from dbt.main import main; main()' run", in a similar fashion to the BashOperator example. Yet this approach is not without its limitations:

  • airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
  • airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
  • airflow-dbt does not return anything after the execution, which means no information is available for downstream tasks to pull via XCom. And even if it tried to, since it works by wrapping the CLI, it could only attempt to parse the lines printed by dbt to STDOUT. On the other hand, airflow-dbt-python will try to return the information of a dbt result class, as defined in dbt.contracts.results, which opens up possibilities for downstream tasks to condition their execution on the result of a dbt command.
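
As a rough illustration of the last point, a downstream task could inspect the value it pulls from XCom before deciding whether to continue. The sketch below assumes that value is a dictionary shaped like dbt's run_results.json artifact (a "results" list whose entries carry "unique_id" and "status"). The exact shape pushed by airflow-dbt-python may differ between versions, and failed_nodes is a hypothetical helper.

```python
# Statuses treated as failures here; adjust to the dbt version in use.
FAILING_STATUSES = {"error", "fail", "runtime error"}


def failed_nodes(run_results):
    """Return the unique_id of every node whose status signals a failure."""
    return [
        result.get("unique_id", "<unknown>")
        for result in run_results.get("results", [])
        if str(result.get("status", "")).lower() in FAILING_STATUSES
    ]


# Hand-written payload, for illustration only.
example = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "test.my_project.not_null_orders_id", "status": "fail"},
    ]
}
failures = failed_nodes(example)
```

Inside an Airflow task, a non-empty list could be used to raise an exception or to skip the remaining tasks.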

Avoid installing unnecessary dbt plugins

Finally, airflow-dbt-python does not depend on dbt but on dbt-core. The connectors: dbt-redshift, dbt-postgres, dbt-snowflake, and dbt-bigquery are available as installation extras instead of being bundled up by default, which happens when you attempt to install dbt via python -m pip install dbt.

This allows you to easily control what is installed in your environment. One particular example of when this is extremely useful is in the case of the dbt-snowflake connector, which depends on cryptography. This dependency requires the Rust toolchain to run, and this is not supported in a few distributions (like the one MWAA runs on). Even if that's not the case, airflow-dbt-python results in a lighter installation due to only depending on dbt-core.
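
To check which connectors actually ended up in an environment, something like the following sketch may help. It relies only on the standard library's importlib.metadata (Python 3.8 and later); installed_adapters is a hypothetical helper, and the distribution names are the four connectors listed above.

```python
from importlib import metadata  # available from Python 3.8

ADAPTERS = ("dbt-redshift", "dbt-postgres", "dbt-snowflake", "dbt-bigquery")


def installed_adapters(names=ADAPTERS):
    """Map each distribution name to its installed version, or None."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not installed in this environment.
            versions[name] = None
    return versions


adapters = installed_adapters()
```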

Usage

Currently, the following dbt commands are supported:

  • clean
  • compile
  • debug
  • deps
  • ls
  • parse
  • run
  • run-operation
  • seed
  • snapshot
  • source (Not well tested)
  • test

Examples

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector="pre-run-tests",
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        models=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run

Requirements

airflow-dbt-python is tested in Python 3.7, 3.8, and 3.9, although it could also support older versions.

On the Airflow side, we unit test with versions 1.10.12 and upwards, including the latest version 2 release. Regardless, more testing is planned to ensure compatibility with version 2 of Airflow.

Finally, airflow-dbt-python requires at least dbt version 0.19. Unit tests have been verified to pass with version 0.20 after minor changes that should not have major effects anywhere else. Regardless, support for version 0.20 of dbt should be considered experimental.

Installing

From PyPI:

pip install airflow-dbt-python

Any dbt connectors you require may be installed by specifying extras:

pip install airflow-dbt-python[snowflake,postgres]

From this repo:

Clone the repo:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

With poetry:

poetry install

Install any extras you need, and only those you need:

poetry install -E postgres -E redshift

Testing

Tests are written using pytest, are located in test/, and can be run locally with poetry:

poetry run pytest -vv

License

This project is licensed under the MIT license. See LICENSE.

Comments
  • dbt not writing log to file

    It seems that the dbt.log file is not being written inside the temp directory generated by the run.

    Is there any extra config that needs to be done?

    Thanks!

    opened by gvillafanetapia 11
  • Loading dbt from S3 zip file doesn't work

    Hello, testing version airflow-dbt-python==0.11.0 on MWAA 2.0.2, it works flawlessly with:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/s3-prefix/dbt/",
        profiles_dir="s3://s3-bucket/s3-prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    but if I try using a dbt.zip file:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/prefix/dbt/dbt.zip",
        profiles_dir="s3://s3-bucket/prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    where dbt.zip file structure is:

    d-----  24/01/2022 19:05        analyses
    d-----  24/01/2022 19:05        data
    d-----  27/01/2022 14:59        logs
    d-----  24/01/2022 19:05        macros
    d-----  01/02/2022 18:42        models
    d-----  27/01/2022 14:48        packages
    d-----  24/01/2022 19:05        snapshots
    d-----  27/01/2022 15:01        target
    d-----  01/02/2022 18:54        tests
    -a----  24/01/2022 19:05    29  .gitignore
    -a----  27/01/2022 15:19  1339  dbt_project.yml
    -a----  27/01/2022 14:50    88  packages.yml
    -a----  24/01/2022 19:05   571  README.md

    I can see these logs:

    [2022-02-07 09:57:04,793] {{dbt.py:263}} INFO - Fetching profiles.yml from S3: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,805] {{s3.py:35}} INFO - Downloading dbt profiles file from: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,806] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:04,849] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,849] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,063] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt_profile/profiles.yml') file to: /tmp/airflowtmpmtcgxrxd/profiles.yml
    [2022-02-07 09:57:05,093] {{dbt.py:271}} INFO - Fetching dbt project from S3: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{s3.py:85}} INFO - Downloading dbt project files from: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:05,137] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,137] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,222] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt/dbt.zip') file to: /tmp/airflowtmpmtcgxrxd/dbt_project.zip
    [2022-02-07 09:57:05,411] {{dbt.py:152}} INFO - Running dbt configuration:
    TestTaskConfig(cls=<class 'dbt.task.test.TestTask'>, project_dir='/tmp/airflowtmpmtcgxrxd/', profiles_dir='/tmp/airflowtmpmtcgxrxd/', profile='dev_profile', target='dev', compiled_target=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, defer=None, partial_parse=None, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, generic=None, indirect_selection=None, singular=None, store_failures=None, which='test')
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05.632049 [info ] [MainThread]: Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{log.py:235}} WARNING - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:06,226] {{functions.py:248}} INFO - 09:57:06.226743 [info ] [MainThread]: Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{log.py:235}} WARNING - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{functions.py:248}} INFO - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06.228139 [info ] [MainThread]:
    [2022-02-07 09:57:06,228] {{log.py:235}} WARNING - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:250}} WARNING - 09:57:06.228662 [warn ] [MainThread]: [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - 09:57:06 [
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - WARNING
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - ]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{functions.py:250}} WARNING - 09:57:06 [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,282] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=dbt_python_tests, task_id=dbt_run_daily, execution_date=20220207T095658, start_date=20220207T095704, end_date=20220207T095706
    [2022-02-07 09:57:06,310] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
    [2022-02-07 09:57:09,764] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:09,764] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.

    But tests (there are 6) and macros (more than 500) are not being retrieved. I tried to perform several tests but got no luck. Did I do something wrong?
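
    One thing that may be worth ruling out in a case like this is the archive layout, for example dbt_project.yml sitting under an extra top-level folder instead of at the root of the zip. The sketch below uses only the standard library's zipfile module. Both helper names are hypothetical, and this is a diagnostic aid under that assumption, not a confirmed explanation of the behaviour above.

```python
import zipfile


def project_file_locations(zip_path, filename="dbt_project.yml"):
    """List every path inside the archive whose file name is `filename`."""
    with zipfile.ZipFile(zip_path) as archive:
        return [
            name
            for name in archive.namelist()
            if name == filename or name.endswith("/" + filename)
        ]


def is_project_at_root(zip_path, filename="dbt_project.yml"):
    """True when `filename` sits at the top level of the archive."""
    return filename in project_file_locations(zip_path, filename)
```

    Calling is_project_at_root on a local copy of dbt.zip shows whether the project file is where a top-level extraction would expect it.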

    question 
    opened by yarimarchetti 10
  • dbt seed operator only runs successfully on first run after reboot of mwaa

    I'm using this very helpful package to run dbt code on mwaa and have encountered a weird bug where the dbt seed operator only works on the first run after the mwaa environment has been created or updated. Subsequent runs fail with a .csv not found error. Notably, dbt seed appears to be looking in the wrong temp directory on the second run.

    An example of the failure: [Errno 2] No such file or directory: '/tmp/airflowtmpeijb2yn7/data/METRICS_SEED.csv' The temp directory, in this case airflowtmpeijb2yn7, does not match the directory that the "fetch from s3" step has downloaded all the dbt files into. Looking at that log, I can see that the .csv file I wanted was downloaded, along with all the other dbt files into a different subdirectory of tmp.

    • All dbt related files live in s3 buckets
    • dbt deps is run in ci so that it isn't called every time in airflow

    I'm not sure if this is even an issue with this package or a bug with mwaa or dbt, but I thought I'd raise it here first.

    bug 
    opened by samLozier 10
  • dbt_deps question

    @tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during mwaa run. So far I've been able to get the deps operator to run correctly, but the downstream task, dbt_run will fail due to not being able to find the packages that were just installed? Do I need to hand off the dbt_packages folder via xcom somehow?

    I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.

    Thanks again for making this library!

    enhancement 
    opened by samLozier 8
  • Not all default arguments are being passed to the DbtBuildOperator

    Hey!

    I noticed that the on_failure_callback from the default arguments is not being passed to the DbtBuildOperator, and neither is the sla argument.

    I can see from the "Instance Details" these arguments are being passed to my other tasks but not to the airflow_dbt_python operators I bring in. Will try and come up with a PR for the fix, but if you have any guidance on where to start that would be great!

    Thanks

    question 
    opened by marcusintrohive 5
  • fix(s3): Split bucket name and key before uploading

    S3 files were uploaded to incorrect keys when running Airflow 2. This was caused by differences between Airflow 1.10 and Airflow 2: the latter assumes that when both bucket_name and key are passed, the key is to be taken as is, whereas the former seemed to work even when the key contained the full URL.

    Now, we do the parsing and splitting ourselves. We would just set bucket_name to None, however with Airflow 2.0 the upload function checks for the presence of the bucket_name argument, to decide whether to parse the url, not if it's set to None (I think this may be a bug).
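
    For readers unfamiliar with the idea, splitting an S3 URL into its bucket name and key can be sketched with the standard library alone. This is an illustration of the technique, not the code from this change; split_s3_url and the example URL are hypothetical.

```python
from urllib.parse import urlparse


def split_s3_url(s3_url):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(s3_url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URL: {s3_url!r}")
    # The path keeps its leading slash, which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_url("s3://my-bucket/dbt/project/dbt.zip")
```

    Passing the two parts separately avoids relying on the hook to decide whether the key still needs parsing.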

    opened by tomasfarias 4
  • Push to S3: malformed URL

    Hi there,

    I'm testing out airflow-dbt-python==0.12.0 on AWS MWAA. It's working brilliantly with push_dbt_project=False, but fails when trying to push things back to S3 using push_dbt_project=True. I definitely have an IAM policy associated with MWAA that gives the correct PutObject permission for the bucket.

    Looking at my log output, I think there's something wrong with the arguments being passed into load_file_handle_replace_error.

    The log message written by the operator shows the correct target URL:

    [2022-02-21, 03:24:01 UTC] {{dbt.py:258}} INFO - Pushing dbt project back to S3: s3://mwaa-test-bucket/dbt/project/dbt.zip
    

    But when the boto3 S3UploadFailedError occurs later, the error output shows a malformed URL:

    boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/airflowtmptr4_9_un/dbt_project.zip to mwaa-test-bucket/s3://mwaa-test-bucket/dbt/project/dbt.zip: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    

    I had a quick look at the S3 hook code, and I think it may be to do with this bit:

    https://github.com/tomasfarias/airflow-dbt-python/blob/54729271fa215442149bc21e8b301f6317a33157/airflow_dbt_python/hooks/s3.py#L174-L181

    As I understand it, what's happening here is that the full S3 URL is being passed in as the object key, where I think it should just be the object key itself (assuming that the underlying Airflow S3 hook code should be able to construct the final S3 URL using the supplied bucket name and key).

    This probably explains why the logs have a malformed URL - it looks like it's concatenating the bucket name (mwaa-test-bucket/) and key (s3://mwaa-test-bucket/dbt/project/dbt.zip).

    bug 
    opened by sparkysean 4
  • Make apache-airflow not required

    Since we wish to support all MWAA supported versions of Airflow, this requires supporting both 1.10.12 and 2.X releases. However, there are several conflicting dependencies between dbt-core and apache-airflow. None of the conflicts should break us, but they do make dependency resolution impossible, which means airflow-dbt-python cannot be installed.

    For this reason we have removed the dependency on apache-airflow. Assuming that, as an Airflow operator, airflow-dbt-python is always installed in an environment that already has Airflow installed, this shouldn't cause any issues in production deployments.

    Moreover, this makes testing multiple Airflow versions easier as we can pre-install apache-airflow.

    opened by tomasfarias 4
  • Error downloading dbt project files from s3

    When passing an S3 URL to project_dir parameter, DbtS3Hook tries to download the root folder (prefix) as the temp dir throwing an exception.

    With project_dir="s3://MYBUCKET.com/dbt"

    ...
    [2021-12-14 20:15:22,510] {{dbt.py:259}} INFO - Fetching dbt project from S3: s3://MYBUCKET.com/dbt/
    [2021-12-14 20:15:22,511] {{s3.py:82}} INFO - Downloading dbt project files from: s3://MYBUCKET.com/dbt/
    ...
    [2021-12-14 20:15:24,950] {{s3.py:58}} INFO - Saving s3.Object(bucket_name='MYBUCKET.com', key='dbt/') file to: /tmp/airflowtmpimiwzxx7
    [2021-12-14 20:15:24,952] {{taskinstance.py:1482}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 147, in execute
        with self.dbt_directory() as dbt_dir:  # type: str
      File "/usr/lib64/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 234, in dbt_directory
        self.prepare_directory(tmp_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 262, in prepare_directory
        tmp_dir,
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 112, in get_dbt_project
        bucket_name, s3_object_keys, local_project_dir, key_prefix
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 131, in download_many_s3_keys
        self.download_one_s3_object(local_project_file, s3_object)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 60, in download_one_s3_object
        with open(target, "wb+") as f:
    IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpimiwzxx7'
    

    I wasn't able to figure out why airflow_dbt_python/hooks/s3.py, line 98:

    s3_object_keys = self.list_keys(bucket_name=bucket_name, prefix=f"{key_prefix}")
    

    Returns the prefix folder itself (dbt/) as one of the keys.

    Then airflow_dbt_python/hooks/s3.py, line 55, fails to open the file as it is actually a folder:

    with open(target, "wb+") as f:
        s3_object.download_fileobj(f)
    

    By adding the following check after line 112, I was able to work around the error.

    if s3_object_key == prefix:
        continue
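
    The same idea can be written as a small filter over the listed keys. One possible explanation, stated here as an assumption, is that the prefix exists as a zero-byte "folder" placeholder object whose key ends in "/", which a key listing then returns alongside the real files. keys_to_download is a hypothetical helper and not part of the hook.

```python
def keys_to_download(s3_object_keys, key_prefix):
    """Drop the prefix itself and any other 'folder' placeholder keys."""
    return [
        key
        for key in s3_object_keys
        if key != key_prefix and not key.endswith("/")
    ]


keys = keys_to_download(
    ["dbt/", "dbt/dbt_project.yml", "dbt/models/", "dbt/models/a.sql"],
    key_prefix="dbt/",
)
```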
    
    bug 
    opened by tpcarneiro 4
  • MWAA and Environment Variables in profiles.yml

    Hello! I found your package tonight and am currently trying to get this set up. After finding workarounds for the below by essentially using the /tmp directory on MWAA, I believe I am on my last issue before successful deploy.

    modules-path
    target_path
    log-path
    

    I am trying to get secrets read in through my profiles.yml file. I have tried entries like the below but to no avail.

    host: "{{ env_var('AIRFLOW__DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW_DB_ENDPOINT') }}"
    host: "{{ env_var('DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW__DB_ENDPOINT', 'not_set') }}"
    

    The last option let me bypass the error temporarily before it failed when building the model. I know this may not be entirely specific to your package, but noticed that you have helped people out before with issues trying to get MWAA running.

    https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html - I was using this somewhat but have not yet actually added anything under custom options. As you can see above, I tried adding AIRFLOW_ and AIRFLOW__ to the beginning but no luck.

    Thought I would reach out and see if you have had luck with something similar. We just switched over to using AWS's secret manager in MWAA and it's been working like a charm in our Python tools, and it still uses Variable.get() (from Airflow's library).

    The typical error I get is:

    Env var required but not provided: 'DB_ENDPOINT'

    but have also gotten this:

    Env var required but not provided: 'DB_ENDPOINT'. This can happen when calling a macro that does not exist. Check for typos and/or install package dependencies with "dbt deps".

    Any help would be appreciated if you have the time!
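
    When debugging this kind of error, it can help to list which env_var names a profiles.yml references and which of them are missing from the process environment. The sketch below is a standalone diagnostic built on the standard library. missing_env_vars and the sample profile text are hypothetical, and names that have a default in env_var are reported too if they are unset.

```python
import os
import re

# Captures NAME from env_var('NAME') or env_var("NAME", ...).
ENV_VAR_PATTERN = re.compile(r"env_var\(\s*['\"]([^'\"]+)['\"]")


def missing_env_vars(profiles_text, environ=None):
    """Return env_var names referenced in the text but absent from environ."""
    environ = os.environ if environ is None else environ
    referenced = ENV_VAR_PATTERN.findall(profiles_text)
    # dict.fromkeys removes duplicates while keeping first-seen order.
    return [name for name in dict.fromkeys(referenced) if name not in environ]


profiles_text = """
host: "{{ env_var('DB_ENDPOINT') }}"
user: "{{ env_var('DB_USER', 'not_set') }}"
"""
missing = missing_env_vars(profiles_text, environ={"DB_USER": "dbt"})
```

    Running this inside a task on the worker, with environ left as None, would show what the worker process itself can see.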

    opened by lukedappleton 4
  • chore: Update flake8 pre-commit URL to point to GitHub repository

    flake8 has been migrated to GitHub, and the GitLab repository has been deleted.

    For existing contributors, pre-commit still works because a virtualenv with the current configured flake8 version is cached. However, cleaning up the cache and trying to run pre-commit run is enough to reproduce the issue.

    bug 
    opened by adamantike 3
  • Debug logs are default - cannot be turned off

    Running on MWAA 2.2.2 I upgraded to 0.15.2 and airflow-redshift==1.2.1. Prior to the upgrade, logs appeared like this:

    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | Concurrency: 4 threads (target='prod')
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 |
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 1 of 15 START table model ***.................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 2 of 15 START table model ****.......................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 3 of 15 START table model ***................. [RUN]
    

    After the upgrade,

    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.678976 [debug] [MainThread]: Partial parsing not enabled
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.691536 [debug] [MainThread]: Parsing macros/generate_schema_name.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.692644 [debug] [MainThread]: Parsing macros/adapters.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.710620 [debug] [MainThread]: Parsing macros/catalog.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718226 [debug] [MainThread]: Parsing macros/relations.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718708 [debug] [MainThread]: Parsing macros/adapters/apply_grants.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719046 [debug] [MainThread]: Parsing macros/utils/datediff.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719490 [debug] [MainThread]: Parsing macros/utils/listagg.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.721693 [debug] [MainThread]: Parsing macros/utils/split_part.sql
    

    Not sure if this is due to dbt or airflow-dbt-python. Since the filename is functions.py (previously logging_mixin.py), I assume that this is due to the dbt upgrade. But when I run dbt from the command line, I do not see these debug logs, so I wanted to check.

    I've tried

    config:
        debug: False
    

    in profiles.yml but this doesn't seem to help

    opened by cvlendistry 1
  • connection as a target does not work in MWAA

    Running the example code produces the following on AWS MWAA:

    session = settings.Session()  # type: ignore
    existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()
    
    if existing is None:
        # For illustration purposes, and to keep the example self-contained, we create
        # a Connection using Airflow's ORM. However, any method of loading connections would
        # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.
    
        my_conn = Connection(
            conn_id="my_db_connection",
            conn_type="redshift",
            description="redshift connection",
            host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
            login="dbt_process_user",
            port=5439,
            schema="stage",
            password= get_secret("dev/dbt_process_user")['password'],  # pragma: allowlist secret
            # Other dbt parameters can be added as extras
            extra=json.dumps(dict(threads=4, sslmode="require")),
        )
        session.add(my_conn)
        session.commit()
    with DAG(
        dag_id="dbt_tomasfarias", catchup=False, default_args=default_args, tags=["dbt", "loan tape"], schedule_interval="0 11 * * *"
    ) as dag:
        dbt_run = DbtRunOperator(
            task_id="dbt_run",
            target="my_db_connection",
            #dbt_bin="/usr/local/airflow/.local/bin/dbt",
            profiles_dir=None,
            project_dir="/usr/local/airflow/dags/dbt/etl/",
      
        )
    
    [2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
        self._execute_task_with_callbacks(context)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
        result = self._execute_task(context, self.task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
        result = execute_callable(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
        config = self.get_dbt_config()
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
        return factory.create_config(**config_kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
        initialize_config_values(config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
        cfg = read_user_config(parsed.profiles_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
        profile = read_profile(directory)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
        path = os.path.join(profiles_dir, 'profiles.yml')
      File "/usr/lib64/python3.7/posixpath.py", line 80, in join
        a = os.fspath(a)
    TypeError: expected str, bytes or os.PathLike object, not NoneType
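
    The traceback ends with None reaching os.path.join. The snippet below reproduces that error in isolation and sketches a guard that falls back to ~/.dbt, dbt's conventional default profiles directory. resolve_profiles_dir is a hypothetical helper, not the way airflow-dbt-python or dbt handle this.

```python
import os


def resolve_profiles_dir(profiles_dir=None):
    """Return a usable profiles directory, defaulting to ~/.dbt when unset."""
    if profiles_dir is None:
        return os.path.join(os.path.expanduser("~"), ".dbt")
    return os.fspath(profiles_dir)


# Reproduce the failure from the traceback: joining None with a file name.
reproduced = None
try:
    os.path.join(None, "profiles.yml")
except TypeError as error:
    reproduced = str(error)

profiles_path = os.path.join(resolve_profiles_dir(None), "profiles.yml")
```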
    
    awaiting response 
    opened by liongitusr 1
  • feature: Support pulling dbt artifacts from XCOM

    Currently, airflow-dbt-python supports pushing artifacts to XCOM. However, we do not offer pulling artifacts from XCOM. This would be particularly useful for stateful runs, like when using the source_status:fresher+ selector method that came with dbt 1.1.

    enhancement 
    opened by tomasfarias 0
  • Implement Apache Airflow provider's hook interface

    Not quite an interface as it's not defined in BaseHook, but most Apache Airflow providers define proper connection types for each of their hooks. This may be a chance to refactor the hooks module into individual hook classes for each dbt supported target type. Furthermore, this can be the chance to clean up the docstring documentation as it's missing a lot of information.

    Perhaps we can apply to be integrated into Apache Airflow as an official provider? Currently, a dbt Cloud provider package exists, but nothing for "vanilla" dbt.

    documentation enhancement 
    opened by tomasfarias 0
  • Duplicate log lines

    The dbt loggers are producing multiple lines of output that are essentially the same, but one of them is the debug output.

    This is caused by both the file logger and the stdout logger being fired off. We tried to clear the handlers from one of them in operators.dbt.BaseDbtOperator.override_dbt_logging but this is not working.
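
    The general mechanism can be shown with the standard logging module alone: two handlers attached to the same logger emit every record twice. The logger name below is hypothetical and dbt's own logging setup is not reproduced here, so this only sketches the effect, not the fix.

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("example_dbt_logger")  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep records away from the root logger's handlers

# Two handlers on one logger: every record is written twice.
logger.addHandler(logging.StreamHandler(stream))
logger.addHandler(logging.StreamHandler(stream))
logger.info("1 of 15 START table model")
lines_with_two_handlers = stream.getvalue().splitlines()

# Removing one handler leaves a single line per record.
logger.removeHandler(logger.handlers[-1])
stream.seek(0)
stream.truncate()
logger.info("1 of 15 START table model")
lines_with_one_handler = stream.getvalue().splitlines()
```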

    We should dig into this issue further to avoid verbose logging outputs.

    bug 
    opened by tomasfarias 0
Releases(v0.15.2)
Owner
Tomás Farías Santana
I like writing code and sometimes I'm paid for it. Other times I try to blog about code.

simasimataiyo 6 Oct 2, 2022
Backup dc registry - A simple POC that abuses Backup Operator privileges to remote dump SAM, SYSTEM, and SECURITY

Backup Operator Registry Backup to Domain Compromise A simple POC that abuses Ba

Horizon 3 AI Inc 57 Dec 18, 2022
Project repository of Apache Airflow, deployed on Docker in Amazon EC2 via GitLab.

Airflow on Docker in EC2 + GitLab's CI/CD Personal project for simple data pipeline using Airflow. Airflow will be installed inside Docker container,

Ammar Chalifah 13 Nov 29, 2022
A tutorial presents several practical examples of how to build DAGs in Apache Airflow

Apache Airflow - Python Brasil 2021 Este tutorial apresenta vários exemplos práticos de como construir DAGs no Apache Airflow. Background Apache Airfl

Jusbrasil 14 Jun 3, 2022
Repositório para estudo do airflow

airflow-101 Repositório para estudo do airflow Docker criado baseado no tutorial Exemplo de API da pokeapi Para executar clone o repo execute as confi

Gabriel (Gabu) Bellon 1 Nov 23, 2021
A reproduction repo for a Scheduling bug in AirFlow 2.2.3

A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Ilya Strelnikov 1 Feb 9, 2022
A python package to manage the stored receiver-side Strain Green's Tensor (SGT) database of 3D background models and able to generate Green's function and synthetic waveform

A python package to manage the stored receiver-side Strain Green's Tensor (SGT) database of 3D background models and able to generate Green's function and synthetic waveform

Liang Ding 7 Dec 14, 2022
pspsps(1) is a compyuter software to call an online catgirl to the Linux terminyal.

pspsps(1): call a catgirl from the Internyet to the Linux terminyal show processes: ps show catgirls: pspsps —@[email protected] pspsps(1) is a compyute

Melissa Boiko 32 Dec 19, 2022
Access Modbus RTU via API call to Sungrow WiNet-S

SungrowModbusWebClient Access Modbus RTU via API call to Sungrow WiNet-S Class based on pymodbus.ModbusTcpClient, completely interchangeable, just rep

null 8 Oct 30, 2022