An Airflow operator to call the main function from the dbt-core Python package

Overview

airflow-dbt-python


An Airflow operator to call the main function from the dbt-core Python package

Motivation

Airflow running in a managed environment

Although dbt is meant to be installed and used as a CLI, we may not have control of the environment where Airflow is running, which can rule out using dbt as a CLI.

This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow (MWAA): although a list of Python requirements can be passed, the dbt CLI cannot be found in the worker's PATH.

There is a workaround which involves using Airflow's BashOperator and running Python from the command line:

from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)

But this gets unwieldy once you start appending all the arguments a dbt run command (or any other subcommand) can take.

As you may expect, airflow-dbt-python abstracts the complexity of handling CLI arguments by defining an operator for each dbt subcommand, with each operator exposing an attribute for each possible CLI argument.
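To make the pain point concrete, here is a minimal sketch of the string assembly the BashOperator workaround ends up needing. The helper build_dbt_bash_command is hypothetical (it is not part of airflow-dbt-python or dbt), and it assumes that every keyword maps to a --kebab-case CLI option, which may not hold for every dbt flag.

```python
import shlex

# Same entrypoint as the BashOperator example above.
DBT_ENTRYPOINT = "python -c 'from dbt.main import main; main()'"


def build_dbt_bash_command(subcommand, **flags):
    """Assemble a dbt command line as a single string (hypothetical helper)."""
    parts = [DBT_ENTRYPOINT, subcommand]
    for name, value in flags.items():
        option = "--" + name.replace("_", "-")
        if value is True:
            # Boolean switches are passed without a value.
            parts.append(option)
        elif value is False or value is None:
            # Unset switches are left out entirely.
            continue
        elif isinstance(value, (list, tuple)):
            # Multi-valued options take space-separated, shell-quoted values.
            parts.append(option)
            parts.extend(shlex.quote(str(item)) for item in value)
        else:
            parts.extend([option, shlex.quote(str(value))])
    return " ".join(parts)


command = build_dbt_bash_command(
    "run", full_refresh=True, fail_fast=True, models=["my_model"]
)
print(command)
```

Every additional argument type needs another branch in a helper like this, which is the bookkeeping the per-subcommand operators are meant to take over.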

An alternative to airflow-dbt that works without the dbt CLI

The existing airflow-dbt package, by default, would not work if the dbt CLI is not in PATH, which means it would not be usable in MWAA. There is a workaround via the dbt_bin argument, which can be set to "python -c 'from dbt.main import main; main()' run", in similar fashion as the BashOperator example. Yet this approach is not without its limitations:

  • airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
  • airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
  • airflow-dbt does not return anything after the execution, which means no information is available for downstream tasks to pull via XCom. And even if it tried to, since it works by wrapping the CLI, it could only attempt to parse the lines printed by dbt to STDOUT. On the other hand, airflow-dbt-python will try to return the information of a dbt result class, as defined in dbt.contracts.results, which opens up possibilities for downstream tasks to condition their execution on the result of a dbt command.
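As a rough illustration of that last point, the sketch below shows how a downstream task might inspect a result once it has been pulled from XCom. The exact shape of the returned value depends on the dbt and airflow-dbt-python versions in use; this sketch assumes a dictionary modeled on dbt's run_results.json artifact, with a "results" list whose entries carry "unique_id" and "status". The function name and the sample payload are hypothetical.

```python
from typing import Any, Dict, List

# Statuses treated as failures; an assumption modeled on dbt's run_results.json.
FAILED_STATUSES = {"error", "fail"}


def failed_nodes(run_results: Dict[str, Any]) -> List[str]:
    """Return the unique_id of every node whose status marks a failure."""
    return [
        result["unique_id"]
        for result in run_results.get("results", [])
        if result.get("status") in FAILED_STATUSES
    ]


# Hypothetical payload, as a downstream task might pull it from XCom.
example = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "model.my_project.customers", "status": "error"},
    ]
}

failures = failed_nodes(example)
print(failures)
```

A downstream task could use a check like this to decide whether to continue, skip, or alert.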

Avoid installing unnecessary dbt plugins

Finally, airflow-dbt-python does not depend on dbt but on dbt-core. The connectors: dbt-redshift, dbt-postgres, dbt-snowflake, and dbt-bigquery are available as installation extras instead of being bundled up by default, which happens when you attempt to install dbt via python -m pip install dbt.

This allows you to easily control what is installed in your environment. One particular example of when this is extremely useful is in the case of the dbt-snowflake connector, which depends on cryptography. This dependency requires the Rust toolchain to run, and this is not supported in a few distributions (like the one MWAA runs on). Even if that's not the case, airflow-dbt-python results in a lighter installation due to only depending on dbt-core.

Usage

Currently, the following dbt commands are supported:

  • clean
  • compile
  • debug
  • deps
  • ls
  • parse
  • run
  • run-operation
  • seed
  • snapshot
  • source (Not well tested)
  • test

Examples

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector="pre-run-tests",
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        models=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run

Requirements

airflow-dbt-python is tested in Python 3.7, 3.8, and 3.9, although it could also support older versions.

On the Airflow side, we unit test with versions 1.10.12 and upwards, including the latest version 2 release. Regardless, more testing is planned to ensure compatibility with version 2 of Airflow.

Finally, airflow-dbt-python requires at least dbt version 0.19. Unit tests have been verified to pass with version 0.20 after minor changes that should not have major effects anywhere else. Regardless, support for version 0.20 of dbt should be considered experimental.

Installing

From PyPI:

pip install airflow-dbt-python

Any dbt connectors you require may be installed by specifying extras:

pip install airflow-dbt-python[snowflake,postgres]

From this repo:

Clone the repo:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

With poetry:

poetry install

Install any extras you need, and only those you need:

poetry install -E postgres -E redshift

Testing

Tests are written using pytest, can be found in test/, and can be run locally with poetry:

poetry run pytest -vv

License

This project is licensed under the MIT license. See LICENSE.

Comments
  • dbt not writing log to file

    It seems that the dbt.log file is not being written inside the temp directory generated by the run.

    Is there any extra config that needs to be done?

    Thanks!

    opened by gvillafanetapia 11
  • Loading dbt from S3 zip file doesn't work

    Hello, testing version airflow-dbt-python==0.11.0 on MWAA 2.0.2, it works flawlessly with:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/s3-prefix/dbt/",
        profiles_dir="s3://s3-bucket/s3-prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    but if I try using a dbt.zip file:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/prefix/dbt/dbt.zip",
        profiles_dir="s3://s3-bucket/prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    where dbt.zip file structure is:

    d----- 24/01/2022 19:05      analyses
    d----- 24/01/2022 19:05      data
    d----- 27/01/2022 14:59      logs
    d----- 24/01/2022 19:05      macros
    d----- 01/02/2022 18:42      models
    d----- 27/01/2022 14:48      packages
    d----- 24/01/2022 19:05      snapshots
    d----- 27/01/2022 15:01      target
    d----- 01/02/2022 18:54      tests
    -a---- 24/01/2022 19:05   29 .gitignore
    -a---- 27/01/2022 15:19 1339 dbt_project.yml
    -a---- 27/01/2022 14:50   88 packages.yml
    -a---- 24/01/2022 19:05  571 README.md

    I can see these logs:

    [2022-02-07 09:57:04,793] {{dbt.py:263}} INFO - Fetching profiles.yml from S3: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,805] {{s3.py:35}} INFO - Downloading dbt profiles file from: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,806] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:04,849] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,849] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,063] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt_profile/profiles.yml') file to: /tmp/airflowtmpmtcgxrxd/profiles.yml
    [2022-02-07 09:57:05,093] {{dbt.py:271}} INFO - Fetching dbt project from S3: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{s3.py:85}} INFO - Downloading dbt project files from: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:05,137] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,137] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,222] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt/dbt.zip') file to: /tmp/airflowtmpmtcgxrxd/dbt_project.zip
    [2022-02-07 09:57:05,411] {{dbt.py:152}} INFO - Running dbt configuration: TestTaskConfig(cls=<class 'dbt.task.test.TestTask'>, project_dir='/tmp/airflowtmpmtcgxrxd/', profiles_dir='/tmp/airflowtmpmtcgxrxd/', profile='dev_profile', target='dev', compiled_target=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, defer=None, partial_parse=None, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, generic=None, indirect_selection=None, singular=None, store_failures=None, which='test')
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05.632049 [info ] [MainThread]: Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{log.py:235}} WARNING - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:06,226] {{functions.py:248}} INFO - 09:57:06.226743 [info ] [MainThread]: Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{log.py:235}} WARNING - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{functions.py:248}} INFO - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06.228139 [info ] [MainThread]:
    [2022-02-07 09:57:06,228] {{log.py:235}} WARNING - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:250}} WARNING - 09:57:06.228662 [warn ] [MainThread]: [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - 09:57:06 [
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - WARNING
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - ]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{functions.py:250}} WARNING - 09:57:06 [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,282] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=dbt_python_tests, task_id=dbt_run_daily, execution_date=20220207T095658, start_date=20220207T095704, end_date=20220207T095706
    [2022-02-07 09:57:06,310] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
    [2022-02-07 09:57:09,764] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:09,764] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.

    But tests (there are 6) and macros (more than 500) are not being retrieved. I tried to perform several tests but got no luck. Did I do something wrong?

    question 
    opened by yarimarchetti 10
  • dbt seed operator only runs successfully on first run after reboot of mwaa

    I'm using this very helpful package to run dbt code on mwaa and have encountered a weird bug where the dbt seed operator only works on the first run after the mwaa environment has been created or updated. Subsequent runs fail with a .csv not found error. Notably, dbt seed appears to be looking in the wrong temp directory on the second run.

    An example of the failure: [Errno 2] No such file or directory: '/tmp/airflowtmpeijb2yn7/data/METRICS_SEED.csv' The temp directory, in this case airflowtmpeijb2yn7, does not match the directory that the "fetch from s3" step has downloaded all the dbt files into. Looking at that log, I can see that the .csv file I wanted was downloaded, along with all the other dbt files into a different subdirectory of tmp.

    • All dbt related files live in s3 buckets
    • dbt deps is run in ci so that it isn't called every time in airflow

    I'm not sure if this is even an issue with this package or a bug with mwaa or dbt, but I thought I'd raise it here first.

    bug 
    opened by samLozier 10
  • dbt_deps question

    @tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during mwaa run. So far I've been able to get the deps operator to run correctly, but the downstream task, dbt_run will fail due to not being able to find the packages that were just installed? Do I need to hand off the dbt_packages folder via xcom somehow?

    I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.

    Thanks again for making this library!

    enhancement 
    opened by samLozier 8
  • Not all default arguments are being passed to the DbtBuildOperator

    Hey!

    I noticed that the on_failure_callback from the default arguments is not being passed to the DBTBuildOperator, and neither is the sla argument.

    I can see from the "Instance Details" these arguments are being passed to my other tasks but not to the airflow_dbt_python operators I bring in. Will try and come up with a PR for the fix, but if you have any guidance on where to start that would be great!

    Thanks

    question 
    opened by marcusintrohive 5
  • fix(s3): Split bucket name and key before uploading

    S3 files were uploaded to incorrect keys when running Airflow 2. This was caused by differences between Airflow 1.10 and Airflow 2: the latter assumes that when both bucket_name and key are passed, the key is to be taken as it is, whereas the former seemed to work even when the key contained the full URL.

    Now, we do the parsing and splitting ourselves. We would just set bucket_name to None, however with Airflow 2.0 the upload function checks for the presence of the bucket_name argument, to decide whether to parse the url, not if it's set to None (I think this may be a bug).
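A minimal sketch of that kind of split, using only the standard library. This is not the code in airflow_dbt_python/hooks/s3.py; the function name split_s3_url and the example URL are hypothetical, chosen only to show the bucket name and key being separated before an upload call.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split an s3:// URL into (bucket_name, key)."""
    parsed = urlsplit(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URL: {url!r}")
    # The bucket is the network location; the key is the path without
    # its leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical URL, for illustration only.
bucket_name, key = split_s3_url("s3://my-bucket/dbt/project/dbt.zip")
print(bucket_name, key)
```

With the two parts separated up front, the upload no longer relies on the underlying hook deciding whether to parse the URL.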

    opened by tomasfarias 4
  • Push to S3: malformed URL

    Hi there,

    I'm testing out airflow-dbt-python==0.12.0 on AWS MWAA. It's working brilliantly with push_dbt_project=False, but fails when trying to push things back to S3 using push_dbt_project=True. I definitely have an IAM policy associated with MWAA that gives the correct PutObject permission for the bucket.

    Looking at my log output, I think there's something wrong with the arguments being passed into load_file_handle_replace_error.

    The log message written by the operator shows the correct target URL:

    [2022-02-21, 03:24:01 UTC] {{dbt.py:258}} INFO - Pushing dbt project back to S3: s3://mwaa-test-bucket/dbt/project/dbt.zip
    

    But when the boto3 S3UploadFailedError occurs later, the error output shows a malformed URL:

    boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/airflowtmptr4_9_un/dbt_project.zip to mwaa-test-bucket/s3://mwaa-test-bucket/dbt/project/dbt.zip: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    

    I had a quick look at the S3 hook code, and I think it may be to do with this bit:

    https://github.com/tomasfarias/airflow-dbt-python/blob/54729271fa215442149bc21e8b301f6317a33157/airflow_dbt_python/hooks/s3.py#L174-L181

    As I understand it, what's happening here is that the full S3 URL is being passed in as the object key, where I think it should just be the object key itself (assuming that the underlying Airflow S3 hook code should be able to construct the final S3 URL using the supplied bucket name and key).

    This probably explains why the logs have a malformed URL - it looks like it's concatenating the bucket name (mwaa-test-bucket/) and key (s3://mwaa-test-bucket/dbt/project/dbt.zip).

    bug 
    opened by sparkysean 4
  • Make apache-airflow not required

    Since we wish to support all MWAA supported versions of Airflow, this requires supporting both 1.10.12 and 2.X releases. However, there are several conflicting dependencies between dbt-core and apache-airflow. None of the conflicts should break us, but they do make dependency resolution impossible, which means airflow-dbt-python cannot be installed.

    For this reason we have removed the dependency to apache-airflow. Assuming that as an Airflow operator we are always installing airflow-dbt-python in an environment that already has Airflow installed, this shouldn't cause any issues in production deployments.

    Moreover, this makes testing multiple Airflow versions easier as we can pre-install apache-airflow.

    opened by tomasfarias 4
  • Error downloading dbt project files from s3

    When passing an S3 URL to project_dir parameter, DbtS3Hook tries to download the root folder (prefix) as the temp dir throwing an exception.

    With project_dir="s3://MYBUCKET.com/dbt"

    ...
    [2021-12-14 20:15:22,510] {{dbt.py:259}} INFO - Fetching dbt project from S3: s3://MYBUCKET.com/dbt/
    [2021-12-14 20:15:22,511] {{s3.py:82}} INFO - Downloading dbt project files from: s3://MYBUCKET.com/dbt/
    ...
    [2021-12-14 20:15:24,950] {{s3.py:58}} INFO - Saving s3.Object(bucket_name='MYBUCKET.com', key='dbt/') file to: /tmp/airflowtmpimiwzxx7
    [2021-12-14 20:15:24,952] {{taskinstance.py:1482}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 147, in execute
        with self.dbt_directory() as dbt_dir:  # type: str
      File "/usr/lib64/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 234, in dbt_directory
        self.prepare_directory(tmp_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 262, in prepare_directory
        tmp_dir,
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 112, in get_dbt_project
        bucket_name, s3_object_keys, local_project_dir, key_prefix
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 131, in download_many_s3_keys
        self.download_one_s3_object(local_project_file, s3_object)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 60, in download_one_s3_object
        with open(target, "wb+") as f:
    IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpimiwzxx7'
    

    I wasn't able to figure out why airflow_dbt_python/hooks/s3.py, line 98:

    s3_object_keys = self.list_keys(bucket_name=bucket_name, prefix=f"{key_prefix}")
    

    Returns the prefix folder itself (dbt/) as one of the keys.

    Then airflow_dbt_python/hooks/s3.py, line 55, fails to open the file as it is actually a folder:

    with open(target, "wb+") as f:
        s3_object.download_fileobj(f)
    

    By adding the following check after line 112, I was able to work around the error.

    if s3_object_key == prefix:
        continue
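One plausible explanation, stated here as an assumption rather than a confirmed cause, is that the bucket contains a zero-byte placeholder object whose key is the prefix itself (the S3 console creates these for "folders"), so listing by prefix returns it alongside the real files. The sketch below generalizes the check above by skipping any key that ends with a slash; file_keys is a hypothetical helper, not a function of airflow-dbt-python, and the listed keys are made up.

```python
from typing import Iterable, List


def file_keys(keys: Iterable[str], prefix: str) -> List[str]:
    """Drop the prefix itself and any other key that ends with a slash."""
    return [key for key in keys if key != prefix and not key.endswith("/")]


# Hypothetical listing, including folder placeholder keys.
listed = ["dbt/", "dbt/dbt_project.yml", "dbt/models/", "dbt/models/my_model.sql"]
to_download = file_keys(listed, prefix="dbt/")
print(to_download)
```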
    
    bug 
    opened by tpcarneiro 4
  • MWAA and Environment Variables in profiles.yml

    Hello! I found your package tonight and am currently trying to get this set up. After finding workarounds for the below by essentially using the /tmp directory on MWAA, I believe I am on my last issue before successful deploy.

    modules-path
    target_path
    log-path
    

    I am trying to get secrets read in through my profiles.yml file. I have tried entries like the below but to no avail.

    host: "{{ env_var('AIRFLOW__DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW_DB_ENDPOINT') }}"
    host: "{{ env_var('DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW__DB_ENDPOINT', 'not_set') }}"
    

    The last option let me bypass the error temporarily before it failed when building the model. I know this may not be entirely specific to your package, but noticed that you have helped people out before with issues trying to get MWAA running.

    https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html - I was using this somewhat but have not yet actually added anything under custom options. As you can see above, I tried adding AIRFLOW_ and AIRFLOW__ to the beginning but no luck.

    Thought I would reach out and see if you have had luck with something similar. We just switched over to using AWS's secret manager in MWAA and it's been working like a charm in our Python tools, and it still uses Variable.get() (from Airflow's library).

    The typical error I get is:

    Env var required but not provided: 'DB_ENDPOINT'

    but have also gotten this:

    Env var required but not provided: 'DB_ENDPOINT'. This can happen when calling a macro that does not exist. Check for typos and/or install package dependencies with "dbt deps".

    Any help would be appreciated if you have the time!

    opened by lukedappleton 4
  • chore: Update flake8 pre-commit URL to point to GitHub repository

    flake8 has been migrated to GitHub, and the GitLab repository has been deleted.

    For existing contributors, pre-commit still works because a virtualenv with the current configured flake8 version is cached. However, cleaning up the cache and trying to run pre-commit run is enough to reproduce the issue.

    bug 
    opened by adamantike 3
  • Debug logs are default - cannot be turned off

    Running on MWAA 2.2.2 I upgraded to 0.15.2 and airflow-redshift==1.2.1. Prior to the upgrade, logs appeared like this:

    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | Concurrency: 4 threads (target='prod')
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 |
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 1 of 15 START table model ***.................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 2 of 15 START table model ****.......................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 3 of 15 START table model ***................. [RUN]
    

    After the upgrade,

    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.678976 [debug] [MainThread]: Partial parsing not enabled
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.691536 [debug] [MainThread]: Parsing macros/generate_schema_name.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.692644 [debug] [MainThread]: Parsing macros/adapters.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.710620 [debug] [MainThread]: Parsing macros/catalog.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718226 [debug] [MainThread]: Parsing macros/relations.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718708 [debug] [MainThread]: Parsing macros/adapters/apply_grants.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719046 [debug] [MainThread]: Parsing macros/utils/datediff.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719490 [debug] [MainThread]: Parsing macros/utils/listagg.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.721693 [debug] [MainThread]: Parsing macros/utils/split_part.sql
    

    Not sure if this is due to dbt or airflow-dbt-python. Since the filename is functions.py (previously logging_mixin.py), I assume that this is due to the dbt upgrade. But when I run dbt from the command line, I do not see these debug logs, so I wanted to check.

    I've tried

    config:
        debug: False
    

    in profiles.yml but this doesn't seem to help

    opened by cvlendistry 1
  • connection as a target does not work in MWAA

    Running the example code produces the following on AWS MWAA:

    session = settings.Session()  # type: ignore
    existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()
    
    if existing is None:
        # For illustration purposes, and to keep the example self-contained, we create
        # a Connection using Airflow's ORM. However, any method of loading connections would
        # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.
    
        my_conn = Connection(
            conn_id="my_db_connection",
            conn_type="redshift",
            description="redshift connection",
            host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
            login="dbt_process_user",
            port=5439,
            schema="stage",
            password= get_secret("dev/dbt_process_user")['password'],  # pragma: allowlist secret
            # Other dbt parameters can be added as extras
            extra=json.dumps(dict(threads=4, sslmode="require")),
        )
        session.add(my_conn)
        session.commit()
    with DAG(
        dag_id="dbt_tomasfarias", catchup=False, default_args=default_args, tags=["dbt", "loan tape"], schedule_interval="0 11 * * *"
    ) as dag:
        dbt_run = DbtRunOperator(
            task_id="dbt_run",
            target="my_db_connection",
            #dbt_bin="/usr/local/airflow/.local/bin/dbt",
            profiles_dir=None,
            project_dir="/usr/local/airflow/dags/dbt/etl/",
      
        )
    
    [2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
        self._execute_task_with_callbacks(context)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
        result = self._execute_task(context, self.task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
        result = execute_callable(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
        config = self.get_dbt_config()
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
        return factory.create_config(**config_kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
        initialize_config_values(config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
        cfg = read_user_config(parsed.profiles_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
        profile = read_profile(directory)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
        path = os.path.join(profiles_dir, 'profiles.yml')
      File "/usr/lib64/python3.7/posixpath.py", line 80, in join
        a = os.fspath(a)
    TypeError: expected str, bytes or os.PathLike object, not NoneType
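The last frame of the traceback can be reproduced in isolation: os.path.join raises this TypeError whenever its first argument is None, which is what profiles_dir=None ends up passing. The sketch below shows the failure and a guarded alternative. The helper profiles_path and its fallback to ~/.dbt (dbt's conventional default location) are assumptions for illustration; this is not how airflow-dbt-python resolves the directory.

```python
import os
from typing import Optional

# dbt's conventional default location for profiles.yml.
DEFAULT_PROFILES_DIR = os.path.join(os.path.expanduser("~"), ".dbt")


def profiles_path(profiles_dir: Optional[str]) -> str:
    """Build the profiles.yml path, falling back to the default directory."""
    return os.path.join(profiles_dir or DEFAULT_PROFILES_DIR, "profiles.yml")


try:
    os.path.join(None, "profiles.yml")  # same call as the final traceback frame
except TypeError as error:
    print(f"TypeError: {error}")

print(profiles_path("/usr/local/airflow/dags/dbt/etl"))
print(profiles_path(None))
```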
    
    awaiting response 
    opened by liongitusr 1
  • feature: Support pulling dbt artifacts from XCOM

    Currently, airflow-dbt-python supports pushing artifacts to XCOM. However, we do not offer pulling artifacts from XCOM. This would be particularly useful for stateful runs, like when using the source_status:fresher+ selector method that came with dbt 1.1.

    enhancement 
    opened by tomasfarias 0
  • Implement Apache Airflow provider's hook interface

    Not quite an interface as it's not defined in BaseHook, but most Apache Airflow providers define proper connection types for each of their hooks. This may be a chance to refactor the hooks module into individual hook classes for each dbt supported target type. Furthermore, this can be the chance to clean up the docstring documentation as it's missing a lot of information.

    Perhaps we can apply to be integrated into Apache Airflow as an official provider? Currently, a dbt Cloud provider package exists, but nothing for "vanilla" dbt.

    documentation enhancement 
    opened by tomasfarias 0
  • Duplicate log lines

    The dbt loggers are producing multiple lines of output that are essentially the same, but one of them is the debug output.

    This is caused by both the file logger and the stdout logger being fired off. We tried to clear the handlers from one of them in operators.dbt.BaseDbtOperator.override_dbt_logging but this is not working.

    We should dig into this issue further to avoid verbose logging outputs.
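The general mechanism can be shown with Python's standard logging module: a logger with two handlers emits every record once per handler. dbt's own logging setup differs from this, so the sketch only illustrates the effect described above and why removing one handler stops the duplication; the logger name is hypothetical.

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("duplicate_log_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo isolated from the root logger

# Two handlers writing to the same stream stand in for the file and
# stdout loggers both firing.
first = logging.StreamHandler(stream)
second = logging.StreamHandler(stream)
logger.addHandler(first)
logger.addHandler(second)

logger.info("Found 0 models")
lines_with_two_handlers = stream.getvalue().splitlines()

# After removing one handler, each later record is written only once.
logger.removeHandler(second)
logger.info("Nothing to do")
all_lines = stream.getvalue().splitlines()

print(all_lines)
```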

    bug 
    opened by tomasfarias 0
Releases(v0.15.2)
Owner
Tomás Farías Santana
I like writing code and sometimes I'm paid for it. Other times I try to blog about code.

ClassyConf ClassyConf is the configuration architecture solution for perfectionists with deadlines. It provides a declarative way to define settings f

83 Dec 07, 2022
Retrying is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just about anything.

Retrying Retrying is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just

Ray Holder 1.9k Dec 29, 2022
Automated Content Feed Curator

Gathers posts from content feeds, filters, formats, delivers to you.

Alper S. Soylu 2 Jan 22, 2022
Simple control of Thorlabs Elliptec devices from Python.

Elliptec Simple control of Thorlabs Elliptec devices. No docs yet » Get started · Report a bug · Request a feature About The Project ThorLabs Elliptec

David Roesel 8 Sep 22, 2022
Forward RSS feeds to your email address, community maintained

Getting Started With rss2email We highly recommend that you watch the rss2email project on GitHub so you can keep up to date with the latest version,

248 Dec 28, 2022
AHP Calculator - A method for organizing and evaluating complicated decisions, using Maths and Psychology

AHP Calculator - A method for organizing and evaluating complicated decisions, using Maths and Psychology

16 Aug 08, 2022
This is a simple quizz which can ask user for login/register session, then consult to the Quiz interface.

SIMPLE-QUIZ- This is a simple quizz which can ask user for login/register session, then consult to the Quiz interface. By CHAKFI Ahmed MASTER SYSTEMES

CHAKFI Ahmed 1 Jan 10, 2022
Simple python script for AD enumeration

AutoAD - Simple python script for AD enumeration This tool was created on my spare time to help fellow penetration testers in automating the basic enu

Mohammad Arman 28 Jun 21, 2022