An Airflow operator to call the main function from the dbt-core Python package

Overview

airflow-dbt-python

Motivation

Airflow running in a managed environment

Although dbt is meant to be installed and used as a CLI, we may not have control of the environment where Airflow is running, which can rule out using dbt as a CLI.

This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow or MWAA: although a list of Python requirements can be passed, the CLI cannot be found in the worker's PATH.

There is a workaround which involves using Airflow's BashOperator and running Python from the command line:

from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)

But this gets unwieldy once you append all the arguments a dbt run command (or any other subcommand) can take.
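
To see how quickly the command string grows, here is a minimal sketch of a helper that renders keyword arguments as CLI flags for the BashOperator workaround. The helper name `build_dbt_bash_command` and its flag-naming rules are assumptions made for illustration; they are not part of Airflow, dbt, or airflow-dbt-python.

```python
import shlex

# Prefix taken from the BashOperator workaround above.
DBT_PREFIX = "python -c 'from dbt.main import main; main()'"


def build_dbt_bash_command(subcommand, **kwargs):
    """Hypothetical helper: render keyword arguments as dbt CLI flags."""
    parts = [DBT_PREFIX, subcommand]
    for name, value in kwargs.items():
        flag = "--" + name.replace("_", "-")
        if value is True:
            # Boolean switches take no value.
            parts.append(flag)
        elif value is False or value is None:
            # Unset options are left out of the command.
            continue
        elif isinstance(value, (list, tuple)):
            # Multi-valued options follow the flag, separated by spaces.
            parts.append(flag)
            parts.extend(shlex.quote(str(item)) for item in value)
        else:
            parts.extend([flag, shlex.quote(str(value))])
    return " ".join(parts)


command = build_dbt_bash_command(
    "run", models=["staging", "marts"], full_refresh=True, fail_fast=True
)
print(command)
```

Every additional option needs its own quoting and formatting rule in a helper like this, and mistakes only surface when the shell command runs.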

As you may expect, airflow-dbt-python abstracts the complexity of handling CLI arguments by defining an operator for each dbt subcommand, with each operator having an attribute for each possible CLI argument.

An alternative to airflow-dbt that works without the dbt CLI

The existing airflow-dbt package, by default, does not work if the dbt CLI is not in PATH, which means it is not usable in MWAA. There is a workaround via the dbt_bin argument, which can be set to "python -c 'from dbt.main import main; main()' run", in similar fashion to the BashOperator example. Yet this approach is not without its limitations:

  • airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
  • airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
  • airflow-dbt does not return anything after the execution, which means no information is available for downstream tasks to pull via XCom. And even if it tried to, since it works by wrapping the CLI, it could only attempt to parse the lines printed by dbt to STDOUT. On the other hand, airflow-dbt-python will try to return the information of a dbt result class, as defined in dbt.contracts.results, which opens up possibilities for downstream tasks to condition their execution on the result of a dbt command.
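
As a sketch of that last point, a downstream task could inspect the returned result before deciding whether to continue. The dictionary below is hand-written sample data, and its `results`, `unique_id`, and `status` keys are assumptions modeled on dbt's run results artifact; the exact shape returned by an operator may differ between versions. The function name `should_continue` is hypothetical.

```python
def should_continue(run_results, allowed=("success", "pass")):
    """Return True only if every node finished with an allowed status."""
    statuses = [node.get("status") for node in run_results.get("results", [])]
    # An empty result list is treated as "nothing ran", so do not continue.
    return bool(statuses) and all(status in allowed for status in statuses)


# Hand-written sample standing in for a value pulled via XCom (assumed shape).
sample = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "model.my_project.customers", "status": "error"},
    ]
}

print(should_continue(sample))
```

A check like this could serve as the callable of a short-circuit or branching task placed after the dbt operator.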

Avoid installing unnecessary dbt plugins

Finally, airflow-dbt-python does not depend on dbt but on dbt-core. The connectors: dbt-redshift, dbt-postgres, dbt-snowflake, and dbt-bigquery are available as installation extras instead of being bundled up by default, which happens when you attempt to install dbt via python -m pip install dbt.

This allows you to easily control what is installed in your environment. One particular example of when this is extremely useful is in the case of the dbt-snowflake connector, which depends on cryptography. This dependency requires the Rust toolchain to run, and this is not supported in a few distributions (like the one MWAA runs on). Even if that's not the case, airflow-dbt-python results in a lighter installation due to only depending on dbt-core.

Usage

Currently, the following dbt commands are supported:

  • clean
  • compile
  • debug
  • deps
  • ls
  • parse
  • run
  • run-operation
  • seed
  • snapshot
  • source (Not well tested)
  • test

Examples

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector="pre-run-tests",
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        models=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run

Requirements

airflow-dbt-python is tested in Python 3.7, 3.8, and 3.9, although it could also support older versions.

On the Airflow side, we unit test with versions 1.10.12 and upwards, including the latest version 2 release. Regardless, more testing is planned to ensure compatibility with version 2 of Airflow.

Finally, airflow-dbt-python requires at least dbt version 0.19. Unit tests have been verified to pass with version 0.20 after minor changes that should not have major effects anywhere else. Regardless, support for version 0.20 of dbt should be considered experimental.

Installing

From PyPI:

pip install airflow-dbt-python

Any dbt connectors you require may be installed by specifying extras:

pip install airflow-dbt-python[snowflake,postgres]

From this repo:

Clone the repo:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

With poetry:

poetry install

Install any extras you need, and only those you need:

poetry install -E postgres -E redshift

Testing

Tests are written using pytest, are located in test/, and can be run locally with poetry:

poetry run pytest -vv

License

This project is licensed under the MIT license. See LICENSE.

Comments
  • dbt not writing log to file

    It seems that the dbt.log file is not being written inside the temp directory generated by the run.

    Is there any extra config that needs to be done?

    Thanks!

    opened by gvillafanetapia 11
  • Loading dbt from S3 zip file doesn't work

    Hello, testing version airflow-dbt-python==0.11.0 on MWAA 2.0.2, it works flawlessly with:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/s3-prefix/dbt/",
        profiles_dir="s3://s3-bucket/s3-prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    but if I try using a dbt.zip file:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/prefix/dbt/dbt.zip",
        profiles_dir="s3://s3-bucket/prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    where dbt.zip file structure is:

    d-----  24/01/2022 19:05        analyses
    d-----  24/01/2022 19:05        data
    d-----  27/01/2022 14:59        logs
    d-----  24/01/2022 19:05        macros
    d-----  01/02/2022 18:42        models
    d-----  27/01/2022 14:48        packages
    d-----  24/01/2022 19:05        snapshots
    d-----  27/01/2022 15:01        target
    d-----  01/02/2022 18:54        tests
    -a----  24/01/2022 19:05    29  .gitignore
    -a----  27/01/2022 15:19  1339  dbt_project.yml
    -a----  27/01/2022 14:50    88  packages.yml
    -a----  24/01/2022 19:05   571  README.md

    I can see these logs:

    [2022-02-07 09:57:04,793] {{dbt.py:263}} INFO - Fetching profiles.yml from S3: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,805] {{s3.py:35}} INFO - Downloading dbt profiles file from: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,806] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:04,849] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,849] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,063] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt_profile/profiles.yml') file to: /tmp/airflowtmpmtcgxrxd/profiles.yml
    [2022-02-07 09:57:05,093] {{dbt.py:271}} INFO - Fetching dbt project from S3: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{s3.py:85}} INFO - Downloading dbt project files from: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:05,137] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,137] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,222] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt/dbt.zip') file to: /tmp/airflowtmpmtcgxrxd/dbt_project.zip
    [2022-02-07 09:57:05,411] {{dbt.py:152}} INFO - Running dbt configuration:
    TestTaskConfig(cls=<class 'dbt.task.test.TestTask'>, project_dir='/tmp/airflowtmpmtcgxrxd/', profiles_dir='/tmp/airflowtmpmtcgxrxd/', profile='dev_profile', target='dev', compiled_target=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, defer=None, partial_parse=None, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, generic=None, indirect_selection=None, singular=None, store_failures=None, which='test')
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05.632049 [info ] [MainThread]: Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{log.py:235}} WARNING - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:06,226] {{functions.py:248}} INFO - 09:57:06.226743 [info ] [MainThread]: Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{log.py:235}} WARNING - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{functions.py:248}} INFO - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06.228139 [info ] [MainThread]:
    [2022-02-07 09:57:06,228] {{log.py:235}} WARNING - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:250}} WARNING - 09:57:06.228662 [warn ] [MainThread]: [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - 09:57:06 [
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - WARNING
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - ]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{functions.py:250}} WARNING - 09:57:06 [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,282] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=dbt_python_tests, task_id=dbt_run_daily, execution_date=20220207T095658, start_date=20220207T095704, end_date=20220207T095706
    [2022-02-07 09:57:06,310] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
    [2022-02-07 09:57:09,764] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:09,764] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.

    But tests (there are 6) and macros (more than 500) are not being retrieved. I tried to perform several tests but got no luck. Did I do something wrong?

    question 
    opened by yarimarchetti 10
  • dbt seed operator only runs successfully on first run after reboot of mwaa

    I'm using this very helpful package to run dbt code on mwaa and have encountered a weird bug where the dbt seed operator only works on the first run after the mwaa environment has been created or updated. Subsequent runs fail with a .csv not found error. Notably, dbt seed appears to be looking in the wrong temp directory on the second run.

    An example of the failure:

    [Errno 2] No such file or directory: '/tmp/airflowtmpeijb2yn7/data/METRICS_SEED.csv'

    The temp directory, in this case airflowtmpeijb2yn7, does not match the directory that the "fetch from s3" step has downloaded all the dbt files into. Looking at that log, I can see that the .csv file I wanted was downloaded, along with all the other dbt files, into a different subdirectory of tmp.

    • All dbt related files live in s3 buckets
    • dbt deps is run in ci so that it isn't called every time in airflow

    I'm not sure if this is even an issue with this package or a bug with mwaa or dbt, but I thought I'd raise it here first.

    bug 
    opened by samLozier 10
  • dbt_deps question

    @tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during mwaa run. So far I've been able to get the deps operator to run correctly, but the downstream task, dbt_run will fail due to not being able to find the packages that were just installed? Do I need to hand off the dbt_packages folder via xcom somehow?

    I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.

    Thanks again for making this library!

    enhancement 
    opened by samLozier 8
  • Not all default arguments are being passed to the DbtBuildOperator

    Hey!

    I noticed that the on_failure_callback from the default arguments is not being passed to the DbtBuildOperator, and neither is the sla argument.

    I can see from the "Instance Details" these arguments are being passed to my other tasks but not to the airflow_dbt_python operators I bring in. Will try and come up with a PR for the fix, but if you have any guidance on where to start that would be great!

    Thanks

    question 
    opened by marcusintrohive 5
  • fix(s3): Split bucket name and key before uploading

    S3 files were uploaded to incorrect keys when running Airflow 2. This was caused by differences between Airflow 1.10 and Airflow 2: the latter assumes that when both bucket_name and key are passed, the key is to be taken as it is, whereas the former seemed to work even when the key contained the full URL.

    Now, we do the parsing and splitting ourselves. We would just set bucket_name to None, however with Airflow 2.0 the upload function checks for the presence of the bucket_name argument, to decide whether to parse the url, not if it's set to None (I think this may be a bug).
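
The splitting step can be sketched with the standard library alone. This is an illustration of the idea, not the code in the pull request; the function name `split_s3_url` and the example URL are made up for this sketch.

```python
from urllib.parse import urlparse


def split_s3_url(url):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URL: {url!r}")
    # The path keeps its leading slash, which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_url("s3://my-bucket/dbt/project/dbt.zip")
print(bucket_name, key)
```

Passing the two parts separately avoids handing a full URL to a function that expects a bare key.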

    opened by tomasfarias 4
  • Push to S3: malformed URL

    Hi there,

    I'm testing out airflow-dbt-python==0.12.0 on AWS MWAA. It's working brilliantly with push_dbt_project=False, but fails when trying to push things back to S3 using push_dbt_project=True. I definitely have an IAM policy associated with MWAA that gives the correct PutObject permission for the bucket.

    Looking at my log output, I think there's something wrong with the arguments being passed into load_file_handle_replace_error.

    The log message written by the operator shows the correct target URL:

    [2022-02-21, 03:24:01 UTC] {{dbt.py:258}} INFO - Pushing dbt project back to S3: s3://mwaa-test-bucket/dbt/project/dbt.zip
    

    But when the boto3 S3UploadFailedError occurs later, the error output shows a malformed URL:

    boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/airflowtmptr4_9_un/dbt_project.zip to mwaa-test-bucket/s3://mwaa-test-bucket/dbt/project/dbt.zip: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    

    I had a quick look at the S3 hook code, and I think it may be to do with this bit:

    https://github.com/tomasfarias/airflow-dbt-python/blob/54729271fa215442149bc21e8b301f6317a33157/airflow_dbt_python/hooks/s3.py#L174-L181

    As I understand it, what's happening here is that the full S3 URL is being passed in as the object key, where I think it should just be the object key itself (assuming that the underlying Airflow S3 hook code should be able to construct the final S3 URL using the supplied bucket name and key).

    This probably explains why the logs have a malformed URL - it looks like it's concatenating the bucket name (mwaa-test-bucket/) and key (s3://mwaa-test-bucket/dbt/project/dbt.zip).

    bug 
    opened by sparkysean 4
  • Make apache-airflow not required

    Since we wish to support all MWAA supported versions of Airflow, this requires supporting both 1.10.12 and 2.X releases. However, there are several conflicting dependencies between dbt-core and apache-airflow. None of the conflicts should break us, but they do make dependency resolution impossible, which means airflow-dbt-python cannot be installed.

    For this reason we have removed the dependency on apache-airflow. Assuming that as an Airflow operator we are always installing airflow-dbt-python in an environment that already has Airflow installed, this shouldn't cause any issues in production deployments.

    Moreover, this makes testing multiple Airflow versions easier as we can pre-install apache-airflow.

    opened by tomasfarias 4
  • Error downloading dbt project files from s3

    When passing an S3 URL to project_dir parameter, DbtS3Hook tries to download the root folder (prefix) as the temp dir throwing an exception.

    With project_dir="s3://MYBUCKET.com/dbt"

    ...
    [2021-12-14 20:15:22,510] {{dbt.py:259}} INFO - Fetching dbt project from S3: s3://MYBUCKET.com/dbt/
    [2021-12-14 20:15:22,511] {{s3.py:82}} INFO - Downloading dbt project files from: s3://MYBUCKET.com/dbt/
    ...
    [2021-12-14 20:15:24,950] {{s3.py:58}} INFO - Saving s3.Object(bucket_name='MYBUCKET.com', key='dbt/') file to: /tmp/airflowtmpimiwzxx7
    [2021-12-14 20:15:24,952] {{taskinstance.py:1482}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 147, in execute
        with self.dbt_directory() as dbt_dir:  # type: str
      File "/usr/lib64/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 234, in dbt_directory
        self.prepare_directory(tmp_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 262, in prepare_directory
        tmp_dir,
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 112, in get_dbt_project
        bucket_name, s3_object_keys, local_project_dir, key_prefix
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 131, in download_many_s3_keys
        self.download_one_s3_object(local_project_file, s3_object)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 60, in download_one_s3_object
        with open(target, "wb+") as f:
    IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpimiwzxx7'
    

    I wasn't able to figure out why airflow_dbt_python/hooks/s3.py, line 98:

    s3_object_keys = self.list_keys(bucket_name=bucket_name, prefix=f"{key_prefix}")
    

    Returns the prefix folder itself (dbt/) as one of the keys.

    Then airflow_dbt_python/hooks/s3.py, line 55, fails to open the file as it is actually a folder:

    with open(target, "wb+") as f:
        s3_object.download_fileobj(f)
    

    By adding the following check after line 112, I was able to work around the error.

    if s3_object_key == prefix:
        continue
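
A self-contained sketch of the same guard, written as a filter over a key listing, is shown below. The function name `iter_downloadable_keys` and the sample keys are hypothetical; the trailing-slash check goes slightly beyond the workaround quoted above and is an assumption about how folder placeholders appear in a listing.

```python
def iter_downloadable_keys(s3_object_keys, key_prefix):
    """Yield only the keys that point at files below the prefix."""
    for s3_object_key in s3_object_keys:
        # Skip the placeholder object that represents the prefix "folder"
        # itself, and any other key that ends with a slash.
        if s3_object_key == key_prefix or s3_object_key.endswith("/"):
            continue
        yield s3_object_key


listed = ["dbt/", "dbt/dbt_project.yml", "dbt/models/", "dbt/models/orders.sql"]
downloadable = list(iter_downloadable_keys(listed, "dbt/"))
print(downloadable)
```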
    
    bug 
    opened by tpcarneiro 4
  • MWAA and Environment Variables in profiles.yml

    Hello! I found your package tonight and am currently trying to get this set up. After finding workarounds for the below by essentially using the /tmp directory on MWAA, I believe I am on my last issue before successful deploy.

    modules-path
    target_path
    log-path
    

    I am trying to get secrets read in through my profiles.yml file. I have tried entries like the below but to no avail.

    host: "{{ env_var('AIRFLOW__DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW_DB_ENDPOINT') }}"
    host: "{{ env_var('DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW__DB_ENDPOINT', 'not_set') }}"
    

    The last option let me bypass the error temporarily before it failed when building the model. I know this may not be entirely specific to your package, but noticed that you have helped people out before with issues trying to get MWAA running.

    https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html - I was using this somewhat but have not yet actually added anything under custom options. As you can see above, I tried adding AIRFLOW_ and AIRFLOW__ to the beginning but no luck.

    Thought I would reach out and see if you have had luck with something similar. We just switched over to using AWS's secret manager in MWAA and it's been working like a charm in our Python tools, and it still uses Variable.get() (from Airflow's library).

    The typical error I get is:

    Env var required but not provided: 'DB_ENDPOINT'

    but have also gotten this:

    Env var required but not provided: 'DB_ENDPOINT'. This can happen when calling a macro that does not exist. Check for typos and/or install package dependencies with "dbt deps".

    Any help would be appreciated if you have the time!
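
The difference between the first three entries and the fourth can be illustrated with a simplified stand-in for a required-or-default environment lookup. This is not dbt's implementation; the Python function `env_var` below only mimics the behavior described in this report, and the endpoint value is made up.

```python
import os


def env_var(name, default=None):
    """Simplified stand-in: return the variable, the default, or fail."""
    if name in os.environ:
        return os.environ[name]
    if default is not None:
        return default
    raise RuntimeError(f"Env var required but not provided: '{name}'")


# Make sure the variable is absent from this process for the demonstration.
os.environ.pop("DB_ENDPOINT", None)

# With a default, the lookup succeeds but yields a placeholder host.
placeholder_host = env_var("DB_ENDPOINT", "not_set")

# Without a default, the lookup fails as in the error message above.
try:
    env_var("DB_ENDPOINT")
    error_message = None
except RuntimeError as error:
    error_message = str(error)

# Once the process environment contains the variable, it is returned.
os.environ["DB_ENDPOINT"] = "db.example.internal"
real_host = env_var("DB_ENDPOINT", "not_set")

print(placeholder_host, error_message, real_host)
```

Under these assumptions, a default only postpones the failure: the placeholder host is accepted at parse time and rejected later when a connection is attempted.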

    opened by lukedappleton 4
  • chore: Update flake8 pre-commit URL to point to GitHub repository

    flake8 has been migrated to GitHub, and the GitLab repository has been deleted.

    For existing contributors, pre-commit still works because a virtualenv with the current configured flake8 version is cached. However, cleaning up the cache and trying to run pre-commit run is enough to reproduce the issue.

    bug 
    opened by adamantike 3
  • Debug logs are default - cannot be turned off

    Running on MWAA 2.2.2 I upgraded to 0.15.2 and airflow-redshift==1.2.1. Prior to the upgrade, logs appeared like this:

    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | Concurrency: 4 threads (target='prod')
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 |
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 1 of 15 START table model ***.................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 2 of 15 START table model ****.......................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 3 of 15 START table model ***................. [RUN]
    

    After the upgrade,

    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.678976 [debug] [MainThread]: Partial parsing not enabled
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.691536 [debug] [MainThread]: Parsing macros/generate_schema_name.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.692644 [debug] [MainThread]: Parsing macros/adapters.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.710620 [debug] [MainThread]: Parsing macros/catalog.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718226 [debug] [MainThread]: Parsing macros/relations.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718708 [debug] [MainThread]: Parsing macros/adapters/apply_grants.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719046 [debug] [MainThread]: Parsing macros/utils/datediff.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719490 [debug] [MainThread]: Parsing macros/utils/listagg.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.721693 [debug] [MainThread]: Parsing macros/utils/split_part.sql
    

    Not sure if this is due to dbt or airflow-dbt-python. Since the filename is functions.py (previously logging_mixin.py), I assume that this is due to the dbt upgrade. But when I run dbt from the command line, I do not see these debug logs, so I wanted to check.

    I've tried

    config:
        debug: False
    

    in profiles.yml but this doesn't seem to help

    opened by cvlendistry 1
  • connection as a target does not work in MWAA

    Running the example code produces the following on AWS MWAA:

    session = settings.Session()  # type: ignore
    existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()
    
    if existing is None:
        # For illustration purposes, and to keep the example self-contained, we create
        # a Connection using Airflow's ORM. However, any method of loading connections would
        # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.
    
        my_conn = Connection(
            conn_id="my_db_connection",
            conn_type="redshift",
            description="redshift connection",
            host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
            login="dbt_process_user",
            port=5439,
            schema="stage",
            password= get_secret("dev/dbt_process_user")['password'],  # pragma: allowlist secret
            # Other dbt parameters can be added as extras
            extra=json.dumps(dict(threads=4, sslmode="require")),
        )
        session.add(my_conn)
        session.commit()
    with DAG(
        dag_id="dbt_tomasfarias", catchup=False, default_args=default_args, tags=["dbt", "loan tape"], schedule_interval="0 11 * * *"
    ) as dag:
        dbt_run = DbtRunOperator(
            task_id="dbt_run",
            target="my_db_connection",
            #dbt_bin="/usr/local/airflow/.local/bin/dbt",
            profiles_dir=None,
            project_dir="/usr/local/airflow/dags/dbt/etl/",
      
        )
    
    [2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
        self._execute_task_with_callbacks(context)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
        result = self._execute_task(context, self.task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
        result = execute_callable(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
        config = self.get_dbt_config()
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
        return factory.create_config(**config_kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
        initialize_config_values(config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
        cfg = read_user_config(parsed.profiles_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
        profile = read_profile(directory)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
        path = os.path.join(profiles_dir, 'profiles.yml')
      File "/usr/lib64/python3.7/posixpath.py", line 80, in join
        a = os.fspath(a)
    TypeError: expected str, bytes or os.PathLike object, not NoneType
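
The last frames of the traceback can be reproduced in isolation: joining a path onto None fails before any profile is read. This sketch only mirrors the `os.path.join` call shown in the traceback; it does not involve dbt or Airflow.

```python
import os

profiles_dir = None  # mirrors profiles_dir=None in the DAG above

try:
    os.path.join(profiles_dir, "profiles.yml")
except TypeError as error:
    # Joining requires a str, bytes, or path-like first argument.
    message = str(error)
    print(message)
```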
    
    awaiting response 
    opened by liongitusr 1
  • feature: Support pulling dbt artifacts from XCOM

    Currently, airflow-dbt-python supports pushing artifacts to XCOM. However, we do not offer pulling artifacts from XCOM. This would be particularly useful for stateful runs, like when using the source_status:fresher+ selector method that came with dbt 1.1.

    enhancement 
    opened by tomasfarias 0
  • Implement Apache Airflow provider's hook interface

    Not quite an interface as it's not defined in BaseHook, but most Apache Airflow providers define proper connection types for each of their hooks. This may be a chance to refactor the hooks module into individual hook classes for each dbt supported target type. Furthermore, this can be the chance to clean up the docstring documentation as it's missing a lot of information.

    Perhaps we can apply to be integrated into Apache Airflow as an official provider? Currently, a dbt Cloud provider package exists, but nothing for "vanilla" dbt.

    documentation enhancement 
    opened by tomasfarias 0
  • Duplicate log lines

    The dbt loggers are producing multiple lines of output that are essentially the same, but one of them is the debug output.

    This is caused by both the file logger and the stdout logger being fired off. We tried to clear the handlers from one of them in operators.dbt.BaseDbtOperator.override_dbt_logging but this is not working.
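
The general mechanism can be shown with Python's standard logging module. This is a generic demonstration, not dbt's logger setup; the logger name `duplicate_demo` and the formats are made up for the sketch.

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("duplicate_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo isolated from the root logger

# Two handlers on one logger: every record is emitted once per handler.
plain = logging.StreamHandler(stream)
plain.setFormatter(logging.Formatter("%(message)s"))
debug = logging.StreamHandler(stream)
debug.setFormatter(logging.Formatter("[debug] %(message)s"))
logger.addHandler(plain)
logger.addHandler(debug)

logger.info("Found 0 models")
before = stream.getvalue().splitlines()

# Removing one handler leaves a single line per record.
logger.removeHandler(debug)
logger.info("Nothing to do")
after = stream.getvalue().splitlines()[len(before):]

print(before)
print(after)
```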

    We should dig into this issue further to avoid verbose logging outputs.

    bug 
    opened by tomasfarias 0
Releases(v0.15.2)
Owner
Tomás Farías Santana
I like writing code and sometimes I'm paid for it. Other times I try to blog about code.
Enjoyable scripting experience with Python

Enjoyable scripting experience with Python

8 Jun 08, 2022
AndroidEnv is a Python library that exposes an Android device as a Reinforcement Learning (RL) environment.

AndroidEnv is a Python library that exposes an Android device as a Reinforcement Learning (RL) environment.

DeepMind 814 Dec 26, 2022
Find out where all films you want to watch are streaming

Just Watch Letterboxd Find out where all films you want to watch are streaming Ever wonder what films you want to watch are already on the streaming p

Jordan Oslislo 2 Feb 04, 2022
WordPress-style shortcodes for Python

Python Shortcodes WordPress-style shortcodes for Python Create and use WordPress-style shortcodes in your Python based app. Example # static output de

Bob 1 Dec 22, 2021
A script that convert WiiU BotW mods to Switch

UltimateBoTWConverter A script that convert WiiU BotW mods to Switch. It uses every resource I could find under the sun that allows for conversion, wi

11 Nov 08, 2022
Hello, Welcome to this repo. don't forget to read guidelines in readme.md

Hacktoberfest_2021 If you looking for your first contribution, we are here to help. Just create a simple program using any language you like in our fo

Wafa Rifqi Anafin 117 Dec 14, 2022
Structured, dependable legos for Starknet development.

cairomate • Structured, dependable legos for starknet development. Directory Structure contracts ├─ defi │ ├─ ChainlinkPriceOracle — "Simple price or

andreas 127 Nov 23, 2022
Scripts to convert the Ted-MDB corpora into the formats for DISRPT shared task and the converted corpora

Scripts to convert the Ted-MDB corpora into the formats for DISRPT shared task and the converted corpora.

1 Feb 08, 2022
pgvector support for Python

pgvector-python pgvector support for Python Great for online recommendations 🎉 Supports Django, SQLAlchemy, Psycopg 2, Psycopg 3, and asyncpg Install

Andrew Kane 37 Dec 20, 2022