The easiest way to automate your data

Overview

Hello, world! 👋

We've rebuilt data engineering for the data science era.

Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.

Read the docs; get the code; ask us anything!

Welcome to Workflows

Prefect's Pythonic API should feel familiar to newcomers. Mark functions as tasks and call them on each other's results to build up a flow.

from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

For more detail, please see the Core docs
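To make the idea of building a flow by calling tasks on each other more concrete, here is a toy sketch in plain Python of how calling a decorated function inside a `with` block can record a graph instead of running right away. This is not Prefect's implementation; `ToyFlow`, `toy_task`, and `Node` are hypothetical names used only for illustration.

```python
# Toy illustration only: NOT Prefect's implementation.
# ToyFlow, toy_task, and Node are hypothetical names.

_active_flow = None  # the flow currently open in a `with` block, if any


class Node:
    """One recorded call: a function plus its (possibly Node) arguments."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


class ToyFlow:
    def __init__(self, name):
        self.name = name
        self.nodes = []

    def __enter__(self):
        global _active_flow
        _active_flow = self
        return self

    def __exit__(self, *exc_info):
        global _active_flow
        _active_flow = None

    def run(self):
        """Evaluate nodes in the order they were recorded."""
        results = {}
        for node in self.nodes:
            # Replace upstream Nodes with their computed results.
            args = [results[id(a)] if isinstance(a, Node) else a for a in node.args]
            results[id(node)] = node.fn(*args)
        return [results[id(n)] for n in self.nodes]


def toy_task(fn):
    """Inside a flow, calling the function records a Node instead of running it."""

    def wrapper(*args):
        if _active_flow is None:
            return fn(*args)  # outside a flow, behave like a normal function
        node = Node(fn, args)
        _active_flow.nodes.append(node)
        return node

    return wrapper


@toy_task
def add_one(x):
    return x + 1


@toy_task
def double(x):
    return x * 2


with ToyFlow("toy") as flow:
    double(add_one(1))  # nothing runs yet; two nodes are recorded

print(flow.run())  # results of add_one and double, in recording order
```

Recording the calls first is what lets a workflow engine decide later where and when each step runs; a real engine adds scheduling, retries, and state tracking on top of this basic shape.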

UI and Server

In addition to the Prefect Cloud platform, Prefect includes an open-source backend for orchestrating and managing flows, consisting primarily of Prefect Server and Prefect UI. This local server stores flow metadata in a Postgres database and exposes a GraphQL API.

Before running the server for the first time, run prefect backend server to configure Prefect for local orchestration. Please note the server requires Docker and Docker Compose to be running.

To start the server, UI, and all required infrastructure, run:

prefect server start

Once all components are running, you can view the UI by visiting http://localhost:8080.

Please note that executing flows from the server requires at least one Prefect Agent to be running: prefect agent local start.

Finally, to register any flow with the server, call flow.register(). For more detail, please see the orchestration docs.

"...Prefect?"

From the Latin praefectus, meaning "one who is in charge", a prefect is an official who oversees a domain and makes sure that the rules are followed. Similarly, Prefect is responsible for making sure that workflows execute properly.

It also happens to be the name of a roving researcher for that wholly remarkable book, The Hitchhiker's Guide to the Galaxy.

Integrations

Thanks to Prefect's growing task library and deep ecosystem integrations, building data applications is easier than ever.

Something missing? Open a feature request or contribute a PR! Prefect was designed to make adding new functionality extremely easy, whether you build on top of the open-source package or maintain an internal task library for your team.

Task Library

Airtable

Asana

AWS

Azure

Azure ML

Databricks

DBT

Docker

Dremio

Dropbox

Email

GitHub

Google Cloud

Google Sheets

Great Expectations

Jira

Jupyter

Kubernetes

Monday

MySQL

PostgreSQL

Python

Pushbullet

Redis

RSS

Shell

Slack

Snowflake

SpaCy

SQLite

SQL Server

Trello

Twitter

Deployment & Execution

Azure

AWS

Dask

Docker

Google Cloud

Kubernetes

Universal Deploy

Resources

Prefect provides a variety of resources to help guide you to a successful outcome.

We are committed to ensuring a positive environment, and all interactions are governed by our Code of Conduct.

Documentation

Prefect's documentation -- including concepts, tutorials, and a full API reference -- is always available at docs.prefect.io.

Instructions for contributing to documentation can be found in the development guide.

Slack Community

Join our Slack to chat about Prefect, ask questions, and share tips.

Blog

Visit the Prefect Blog for updates and insights from the Prefect team.

Support

Prefect offers a variety of community and premium support options for users of both Prefect Core and Prefect Cloud.

Contributing

Read about Prefect's community or dive into the development guides for information about contributions, documentation, code style, and testing.

Installation

Requirements

Prefect requires Python 3.6+. If you're new to Python, we recommend installing the Anaconda distribution.
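As a quick sanity check before installing, the version requirement can be tested from Python itself. The following is a minimal sketch; `meets_minimum` and `MINIMUM_PYTHON` are hypothetical names, and the only fact taken from this page is the 3.6 minimum.

```python
import sys

# Minimum version taken from the requirement stated above.
MINIMUM_PYTHON = (3, 6)


def meets_minimum(version_info=None, minimum=MINIMUM_PYTHON):
    """Return True if the interpreter version is at least `minimum`."""
    if version_info is None:
        version_info = sys.version_info
    # Compare only (major, minor); tuple comparison is lexicographic.
    return tuple(version_info[:2]) >= tuple(minimum)


if not meets_minimum():
    print(
        "Python {}.{}+ is required, found {}.{}".format(
            *MINIMUM_PYTHON, *sys.version_info[:2]
        )
    )
```

For example, `meets_minimum((3, 5, 2))` returns `False`, so an interpreter reporting Python 3.5.2 would not satisfy the requirement.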

Latest Release

To install Prefect, run:

pip install prefect

or, if you prefer to use conda:

conda install -c conda-forge prefect

or pipenv:

pipenv install --pre prefect

Bleeding Edge

For development or just to try out the latest features, you may want to install Prefect directly from source.

Please note that the master branch of Prefect is not guaranteed to be compatible with Prefect Cloud or the local server.

git clone https://github.com/PrefectHQ/prefect.git
pip install ./prefect

License

Prefect Core is licensed under the Apache Software License Version 2.0. Please note that Prefect Core includes utilities for running Prefect Server and the Prefect UI, which are themselves licensed under the Prefect Community License.

Comments
  • Enter the Prefect 5k contest!

    Thanks for helping Prefect hit a big milestone!

    To celebrate 5,000 stars, we're having a contest.

    The grand prize is a $5,000 Cloud license, but we've got plenty of other items: Prefect gear, Marvin ducks, coffee, etc.

    It's really easy to enter: just mention @marvin-robot in any issue or PR on this repo. Even this one - go ahead and try!

    You can also enter by mentioning @Marvin in our Slack.

    Good luck and happy engineering!

    opened by jlowin 53
  • Use 'from' to explicitly chain exceptions

    Current behavior

    I wanted to spend a little time contributing this weekend, so I ran pylint over this repo. I see this project has a .pylintrc but it looks like pylint isn't run in CI, so some things do show up.

    I found one class of warning from pylint that I think is worth addressing: exceptions re-raised inside try/except blocks should be chained explicitly with from.

    The newest version of pylint added a check for this. You can see details in the "Explicit Exception Chaining" section of PEP 3134. That PEP describes a way to improve stack traces for exceptions raised while handling another exception.

    without from

    try:
        {}["a"]
    except KeyError:
        raise RuntimeError("hey this does not exist")
    
    Traceback (most recent call last):
      File "<stdin>", line 2, in <module>
    KeyError: 'a'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "<stdin>", line 4, in <module>
    RuntimeError: hey this does not exist
    

    with from

    try:
        {}["a"]
    except KeyError as err:
        raise RuntimeError("hey this does not exist") from err
    
    Traceback (most recent call last):
      File "<stdin>", line 2, in <module>
    KeyError: 'a'
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "<stdin>", line 4, in <module>
    RuntimeError: hey this does not exist
    

    I think this language of "was the direct cause of the following exception" is clearer, and helps you to understand exactly where things started going wrong.
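The difference is also visible programmatically. The sketch below reuses the two snippets above and inspects the standard exception attributes `__cause__`, `__context__`, and `__suppress_context__`; the helper names `lookup_implicit`, `lookup_explicit`, and `describe` are made up for this illustration.

```python
def lookup_implicit():
    try:
        {}["a"]
    except KeyError:
        raise RuntimeError("hey this does not exist")


def lookup_explicit():
    try:
        {}["a"]
    except KeyError as err:
        raise RuntimeError("hey this does not exist") from err


def describe(fn):
    """Call fn and report how the resulting exception is linked to the original."""
    try:
        fn()
    except RuntimeError as exc:
        return {
            "cause": type(exc.__cause__).__name__,
            "context": type(exc.__context__).__name__,
            "suppress_context": exc.__suppress_context__,
        }


print(describe(lookup_implicit))
print(describe(lookup_explicit))
```

Without `from`, only `__context__` is set and `__cause__` stays `None`. With `from err`, `__cause__` points at the original `KeyError`, which is what produces the "direct cause" wording in the traceback.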

    Proposed behavior

    Would you be open to a pull request that changes exceptions re-raised in try/except blocks in prefect to use this pattern? I wanted to ask first because right now there are 1334 places where that happens, so it would touch a lot of code.

    You can see them all like this:

    pylint src/prefect/ | grep raise-missing-from > pylint.txt
    

    I think this change would improve prefect and help save users some debugging time. Especially users who are not very experienced in Python.
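For readers curious about what such a check looks for, here is a rough approximation using the standard `ast` module. It is a simplified sketch, not pylint's actual implementation of `raise-missing-from`, and `find_raise_missing_from` is a hypothetical helper name.

```python
import ast


def find_raise_missing_from(source):
    """Return line numbers of `raise X` statements inside `except` blocks that lack `from`."""
    lines = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.ExceptHandler):
            continue
        for child in ast.walk(node):
            # A bare `raise` re-raises the active exception, so it is skipped.
            if isinstance(child, ast.Raise) and child.exc is not None and child.cause is None:
                lines.append(child.lineno)
    # Nested handlers are visited more than once, so de-duplicate.
    return sorted(set(lines))


SAMPLE = '''
try:
    {}["a"]
except KeyError:
    raise RuntimeError("hey this does not exist")

try:
    {}["a"]
except KeyError as err:
    raise RuntimeError("hey this does not exist") from err
'''

print(find_raise_missing_from(SAMPLE))
```

Only the first `raise` in the sample is reported; the second one already uses `from err`. The real pylint check applies more rules than this, so its counts may differ.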

    Thanks for your time and consideration.

    Notes for Hacktoberfest Contributors

    This issue has been left open for Hacktoberfest 2020 contributors. If that describes you, welcome!

    1. Leave a comment below claiming a sub-module
    2. Identify all cases of this issue in that submodule by running code like the following in a terminal:
      pylint src/prefect/engine/cloud | grep "raise-missing-from"
      
    3. Fix all of these cases.
      • see #3320 for an example
    4. Open a pull request, following the steps in https://github.com/PrefectHQ/prefect/pulls
      • if there is already a changes/issue3306.yaml, add your name to the list of contributors
      • if there is not already a changes/issue3306.yaml, create one in your pull request, like in #3320

    sub-modules with this problem

    • ~src/prefect/client~ - @jameslamb (#3320)
    • ~src/prefect/core~ - @brainless (#3383)
    • ~src/prefect/engine~ - @brainless (#3383)
    • src/prefect/environments/execution/dask
    • src/prefect/environments/execution/k8s
    • ~src/prefect/environments/storage~ - @shalika10
    • src/prefect/tasks/airtable
    • ~src/prefect/tasks/aws~ - @heyitskevin
    • src/prefect/tasks/azure
    • src/prefect/tasks/azureml
    • ~src/prefect/tasks/databricks~ - @rikturr (#3448)
    • src/prefect/tasks/dbt
    • src/prefect/tasks/dropbox
    • ~src/prefect/tasks/gcp~ - @ericlundy87 (#3326)
    • src/prefect/tasks/great_expectations
    • src/prefect/tasks/gsheets
    • src/prefect/tasks/kubernetes
    • ~src/prefect/tasks/mysql~ - @luthrap (#3428)
    • ~src/prefect/tasks/postgres~ - @brunocasarotti
    • src/prefect/tasks/redis
    • src/prefect/tasks/rss
    • src/prefect/tasks/snowflake
    • ~src/prefect/tasks/spacy~ - @sp1thas
    • src/prefect/tasks/template
    • src/prefect/tasks/twitter
    • ~src/prefect/utilities~ - @gabrielcalderon (#3429)

    NOTE: This issue is not urgent. It's ok to claim a sub-module now and not contribute until Hacktoberfest begins on October 1st.

    good first issue 
    opened by jameslamb 43
  • Docker agent with server tasks stuck submitted: host.docker.internal connection error

    Description

    The docker agent, deployed locally on the same machine as the docker server, fails to run tasks; they get stuck in the submitted state on the server. From the logs, it seems to relate to an inability to connect to host.docker.internal.

    [2021-09-13 15:59:17,700] INFO - agent | Deploying flow run 2d52c82a-e890-4f66-a845-7f5b0664ef0d to execution environment...
    [2021-09-13 15:59:18,038] INFO - agent | Completed deployment of flow run 2d52c82a-e890-4f66-a845-7f5b0664ef0d
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 169, in _new_conn
        conn = connection.create_connection(
      File "/usr/local/lib/python3.8/site-packages/urllib3/util/connection.py", line 96, in create_connection
        raise err
      File "/usr/local/lib/python3.8/site-packages/urllib3/util/connection.py", line 86, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 699, in urlopen
        httplib_response = self._make_request(
      File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 394, in _make_request
        conn.request(method, url, **httplib_request_kw)
      File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 234, in request
        super(HTTPConnection, self).request(method, url, body=body, headers=headers)
      File "/usr/local/lib/python3.8/http/client.py", line 1256, in request
        self._send_request(method, url, body, headers, encode_chunked)
      File "/usr/local/lib/python3.8/http/client.py", line 1302, in _send_request
        self.endheaders(body, encode_chunked=encode_chunked)
      File "/usr/local/lib/python3.8/http/client.py", line 1251, in endheaders
        self._send_output(message_body, encode_chunked=encode_chunked)
      File "/usr/local/lib/python3.8/http/client.py", line 1011, in _send_output
        self.send(msg)
      File "/usr/local/lib/python3.8/http/client.py", line 951, in send
        self.connect()
      File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 200, in connect
        conn = self._new_conn()
      File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 181, in _new_conn
        raise NewConnectionError(
    urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f25672ab070>: Failed to establish a new connection: [Errno 111] Connection refused
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
        resp = conn.urlopen(
      File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 755, in urlopen
        retries = retries.increment(
      File "/usr/local/lib/python3.8/site-packages/urllib3/util/retry.py", line 574, in increment
        raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f25672ab070>: Failed to establish a new connection: [Errno 111] Connection refused'))
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 53, in flow_run
        result = client.graphql(query)
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 548, in graphql
        result = self.post(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 451, in post
        response = self._request(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 737, in _request
        response = self._send_request(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 602, in _send_request
        response = session.post(
      File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 590, in post
        return self.request('POST', url, data=data, json=json, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 542, in request
        resp = self.send(prep, **send_kwargs)
      File "/usr/local/lib/python3.8/site-packages/requests/sessions.py", line 655, in send
        r = adapter.send(request, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/requests/adapters.py", line 516, in send
        raise ConnectionError(e, request=request)
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f25672ab070>: Failed to establish a new connection: [Errno 111] Connection refused'))
    
    

    There seem to have been similar or related issues in the past:

    • https://github.com/PrefectHQ/server/issues/25
    • https://github.com/PrefectHQ/prefect/issues/2324 / https://github.com/PrefectHQ/prefect/pull/2328
    • https://github.com/PrefectHQ/prefect/pull/4714
    • https://github.com/PrefectHQ/prefect/pull/4777
    • https://github.com/PrefectHQ/prefect/pull/4809

    However, they seemed to indicate they should be resolved in the version I'm using (details below), and are closed.
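Because the traceback ends in a refused TCP connection to host.docker.internal on port 4200, one way to narrow the problem down is a plain socket check that does not involve Prefect at all. This is a minimal sketch; `can_connect` is a hypothetical helper, and where it should be run (on the host or inside the flow-run container) depends on the setup being debugged.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

Calling `can_connect("host.docker.internal", 4200)` with the host and port from the traceback shows whether the address is reachable from wherever the snippet runs. A `False` result there, combined with `True` for `can_connect("localhost", 4200)` on the host, would point at container networking and not at the server itself.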

    Expected Behavior

    I would expect the docker agent to be able to run the flow and communicate this back to the server

    Reproduction

    Shell:

    # shell 1 (first)
    pip install prefect==0.15.5
    prefect server start
    # http://localhost:8080
    
    # shell 2 (second)
    prefect backend server
    prefect create project "Test"
    prefect agent docker start --label docker --show-flow-logs
    

    Run Python script to register flow (third):

    import prefect
    from prefect import task, Flow
    from prefect.run_configs import DockerRun
    from prefect.storage import Docker
    
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello world!")
    
    
    with Flow(
            "hello-flow",
            storage=Docker(
                image_name="my_testing_img",
            ),
            run_config=DockerRun(
                labels=["docker"],
            )
    ) as flow:
        hello = hello_task()
    
    
    if __name__ == '__main__':
        flow.register(project_name='Test')
    
    

    Trigger flow (fourth):

    # shell 3 
    prefect run --project Test --name "hello-flow"
    

    Environment

    Ubuntu 20.04

    ❯ prefect diagnostics
    {
      "config_overrides": {},
      "env_vars": [],
      "system_information": {
        "platform": "Linux-5.11.0-34-generic-x86_64-with-glibc2.29",
        "prefect_backend": "server",
        "prefect_version": "0.15.5",
        "python_version": "3.8.12"
      }
    }
    ❯ docker version
    Client: Docker Engine - Community
     Version:           20.10.8
     API version:       1.41
     Go version:        go1.16.6
     Git commit:        3967b7d
     Built:             Fri Jul 30 19:54:27 2021
     OS/Arch:           linux/amd64
     Context:           default
     Experimental:      true
    
    Server: Docker Engine - Community
     Engine:
      Version:          20.10.8
      API version:      1.41 (minimum version 1.12)
      Go version:       go1.16.6
      Git commit:       75249d8
      Built:            Fri Jul 30 19:52:33 2021
      OS/Arch:          linux/amd64
      Experimental:     false
     containerd:
      Version:          1.4.9
      GitCommit:        e25210fe30a0a703442421b0f60afac609f950a3
     runc:
      Version:          1.0.1
      GitCommit:        v1.0.1-0-g4144b63
     docker-init:
      Version:          0.19.0
      GitCommit:        de40ad0
    

    Thanks for taking a look. Hopefully this is clear and I haven't overlooked something; let me know if you need any more detail.

    status:accepted great writeup 
    opened by cbrown1234 34
  • Network failures with self-hosted servers

    This is a tracking issue for various reports of network failures when self-hosting Prefect Orion.

    Notably, these issues seem to be specific to Prefect 2.6.6.

    If adding a report to this issue, please include the following information:

    • If using Prefect official Docker images for the client or server, provide the image tags
    • On the server, we are interested in the Prefect version, the database, and server library versions
    prefect version
    pip freeze | grep -E '(uvicorn|starlette)'
    
    • On the client, we are interested in Prefect versions and the client HTTP library versions
    prefect version
    pip freeze | grep -E '(httpx|httpcore)'
    
    • Please include the full traceback for the error
    • Check for any related error logs on the server
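The library versions requested above can also be gathered from Python with the standard `importlib.metadata` module (Python 3.8+). This is a small sketch that only covers the package versions, not the rest of the `prefect version` output; `installed_versions` and the two tuples are hypothetical names.

```python
from importlib import metadata

# Package names taken from the commands above.
SERVER_PACKAGES = ("prefect", "uvicorn", "starlette")
CLIENT_PACKAGES = ("prefect", "httpx", "httpcore")


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for label, packages in (("server", SERVER_PACKAGES), ("client", CLIENT_PACKAGES)):
    print(label, installed_versions(packages))
```

Run it once in the server environment and once in the client environment, then paste both lines into the report.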

    Related to:

    • https://github.com/PrefectHQ/prefect/issues/7472
    bug priority:low status:upstream 
    opened by madkinsz 28
  • Add `.pipe` method to `prefect.Task`

    Summary

    Adds task.pipe, which allows for a pipeline-like method chain, e.g. outer(inner(arg)) can be rewritten as inner(arg).pipe(outer).

    Changes

    • Adds a single method to the base Task class: .pipe

    Importance

    Discussions of pipe operators exist in nearly all languages. Bash and other shells have the | pipe operator built-in. The R language has recently implemented a native pipe |>. A proposal for pipes in JavaScript was highly popular. In the Python world, Pandas has a DataFrame.pipe method, which has exactly the same motivation: chaining function calls when an appropriate DataFrame method doesn't exist. Workflow languages, which belong to the same class of software as Prefect, often have such pipe operators. For example, nextflow recently added the pipe operator.

    Firstly, a pipe allows for "cleaner" code when large numbers of functions are composed. If for example we wanted to call 5 different tasks in Prefect, we might do task_5(task_4(task_3(task_2(task_1(arg))))). This has two issues: firstly, this needs to be read right-to-left instead of left-to-right as in most code. Secondly, it's very ugly.

    We might instead decide to put each task call on its own line:

    res_1 = task_1(arg)
    res_2 = task_2(res_1)
    res_3 = task_3(res_2)
    res_4 = task_4(res_3)
    res_5 = task_5(res_4)
    

    This fixes the right-to-left issue, but forces us to assign intermediate variables to each result. This increases the mental effort involved in writing a flow ("what is the meaning of this result?"), but also pollutes the namespace, and it is very easy to mix up the result from one task with the result from another.

    Using pipes solves all of these issues:

    task_1(arg).pipe(task_2).pipe(task_3).pipe(task_4).pipe(task_5)
    

    There is perhaps some argument that the .pipe is too verbose, but it helps with clarity. In theory this could be improved by shortening the method name or instead overriding an operator like |.
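To illustrate the pattern outside of Prefect, here is a standalone sketch of a `pipe` method and the `|` operator variant mentioned above. `Pipeable` is a hypothetical wrapper class, not the code added by this PR, and forwarding extra arguments is an assumption borrowed from pandas' `DataFrame.pipe`.

```python
class Pipeable:
    """Hypothetical wrapper that adds a pandas-style `pipe` to any value."""

    def __init__(self, value):
        self.value = value

    def pipe(self, fn, *args, **kwargs):
        # Pass the wrapped value as the first argument and wrap the result again.
        return Pipeable(fn(self.value, *args, **kwargs))

    def __or__(self, fn):
        # Operator form: `x | fn` is shorthand for `x.pipe(fn)`.
        return self.pipe(fn)


def add_one(x):
    return x + 1


def times(x, factor):
    return x * factor


# The same computation written three ways.
nested = times(add_one(add_one(1)), 10)
chained = Pipeable(1).pipe(add_one).pipe(add_one).pipe(times, 10).value
piped = (Pipeable(1) | add_one | add_one).value

print(nested, chained, piped)
```

The chained form reads left to right and needs no intermediate names. The operator form is shorter but cannot forward extra arguments, which is one trade-off between the two spellings.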

    Checklist

    This PR:

    • [x] adds new tests (if appropriate)
    • [ ] adds a change file in the changes/ directory (if appropriate)
    • [x] updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)
    great writeup 
    opened by multimeric 27
  • Could not find a version that matches marshmallow-oneofschema<3.0,>=2.0.0b2

    $ python --version
    Python 3.5.2
    
    (prefect) ~/code/prefect$ pipenv install prefect
    Installing prefect…
    Collecting prefect
      Downloading https://files.pythonhosted.org/packages/c3/86/6599cd54083875df820e3f980a22788a3e23756d8878f3b0e31a61b74dfe/prefect-0.5.0-py3-none-any.whl (208kB)
    Collecting marshmallow-oneofschema<3.0,>=2.0.0b2 (from prefect)
      Downloading https://files.pythonhosted.org/packages/d6/e4/9e95f3aeb376dd1e6ca65c72985e26d65e0124fe4d2d9623ad7d28fc1bc8/marshmallow_oneofschema-2.0.0b2-py2.py3-none-any.whl
    Collecting python-dateutil<3.0,>2.7.3 (from prefect)
      Downloading https://files.pythonhosted.org/packages/41/17/c62faccbfbd163c7f57f3844689e3a78bae1f403648a6afb1d0866d87fbb/python_dateutil-2.8.0-py2.py3-none-any.whl (226kB)
    Collecting pendulum<3.0,>=2.0.4 (from prefect)
      Downloading https://files.pythonhosted.org/packages/95/37/91ea39046ea2523d72e0607a94e3ededd65f0d945ccb0e042f0aecd87dfc/pendulum-2.0.4-cp35-cp35m-manylinux1_x86_64.whl (139kB)
    Collecting distributed<2.0,>=1.21.8 (from prefect)
      Downloading https://files.pythonhosted.org/packages/e7/52/08ead8f0968d42510630f307100307b481298696cb7de81d58f0cdf7f320/distributed-1.26.0-py2.py3-none-any.whl (506kB)
    Collecting requests<3.0,>=2.20 (from prefect)
      Downloading https://files.pythonhosted.org/packages/7d/e3/20f3d364d6c8e5d2353c72a67778eb189176f08e873c9900e10c0287b84b/requests-2.21.0-py2.py3-none-any.whl (57kB)
    Collecting xxhash<2.0,>=1.2.0 (from prefect)
      Downloading https://files.pythonhosted.org/packages/5e/bc/dc4c2f71a85b2952097fcf9c45475c7919cfe94290928a3467070dbec495/xxhash-1.3.0-cp35-cp35m-manylinux1_x86_64.whl (46kB)
    Collecting toml<1.0,>=0.9.4 (from prefect)
      Downloading https://files.pythonhosted.org/packages/a2/12/ced7105d2de62fa7c8fb5fce92cc4ce66b57c95fb875e9318dba7f8c5db0/toml-0.10.0-py2.py3-none-any.whl
    Collecting docker<4.0,>=3.4.1 (from prefect)
      Downloading https://files.pythonhosted.org/packages/fb/d8/8242b8fb3bd3000274fbf5ac1a06cdba8a5ccbcf4e2a8c05f0ab37999fd8/docker-3.7.1-py2.py3-none-any.whl (134kB)
    Collecting cryptography<3.0,>=2.2.2 (from prefect)
      Downloading https://files.pythonhosted.org/packages/5b/12/b0409a94dad366d98a8eee2a77678c7a73aafd8c0e4b835abea634ea3896/cryptography-2.6.1-cp34-abi3-manylinux1_x86_64.whl (2.3MB)
    Collecting pytz>=2018.7 (from prefect)
      Downloading https://files.pythonhosted.org/packages/61/28/1d3920e4d1d50b19bc5d24398a7cd85cc7b9a75a490570d5a30c57622d34/pytz-2018.9-py2.py3-none-any.whl (510kB)
    Collecting mypy-extensions<0.5,>=0.4.0 (from prefect)
      Downloading https://files.pythonhosted.org/packages/4d/72/8d54e2b296631b9b14961d583e56e90d9d7fba8a240d5ce7f1113cc5e887/mypy_extensions-0.4.1-py2.py3-none-any.whl
    Collecting cloudpickle<0.7,>=0.6.0 (from prefect)
      Downloading https://files.pythonhosted.org/packages/fc/87/7b7ef3038b4783911e3fdecb5c566e3a817ce3e890e164fc174c088edb1e/cloudpickle-0.6.1-py2.py3-none-any.whl
    Collecting idna<2.8,>=2.5 (from prefect)
      Downloading https://files.pythonhosted.org/packages/4b/2a/0276479a4b3caeb8a8c1af2f8e4355746a97fab05a372e4a2c6a6b876165/idna-2.7-py2.py3-none-any.whl (58kB)
    Collecting marshmallow==3.0.0b19 (from prefect)
      Downloading https://files.pythonhosted.org/packages/60/00/4f792fdbb7f0f243ce7fdb729bee3a8afde968e4bfda8365d47a9367a787/marshmallow-3.0.0b19-py2.py3-none-any.whl (48kB)
    Collecting mypy<0.700,>=0.600 (from prefect)
      Downloading https://files.pythonhosted.org/packages/88/11/8092fdd9cf4c507e6c799bf663e713a5418beb9fda422df810f72641224c/mypy-0.670-py3-none-any.whl (1.5MB)
    Collecting dask<2.0,>=0.18 (from prefect)
      Downloading https://files.pythonhosted.org/packages/b9/bc/0d747625c18397ed548c7890bf984a40d931b9ebac236c570a07565b0cc8/dask-1.1.4-py2.py3-none-any.whl (704kB)
    Collecting croniter<0.4,>=0.3 (from prefect)
      Downloading https://files.pythonhosted.org/packages/a9/c9/11182a2507798c661b04a7914739ea8ca73a738e6869a23742029f51bc1a/croniter-0.3.29-py2.py3-none-any.whl
    Collecting typing<4.0,>=3.6.4 (from prefect)
      Downloading https://files.pythonhosted.org/packages/4a/bd/eee1157fc2d8514970b345d69cb9975dcd1e42cd7e61146ed841f6e68309/typing-3.6.6-py3-none-any.whl
    Collecting typing-extensions<4.0,>=3.6.4 (from prefect)
      Downloading https://files.pythonhosted.org/packages/0f/62/c66e553258c37c33f9939abb2dd8d2481803d860ff68e635466f12aa7efa/typing_extensions-3.7.2-py3-none-any.whl
    Collecting click<8.0,>=7.0 (from prefect)
      Downloading https://files.pythonhosted.org/packages/fa/37/45185cb5abbc30d7257104c434fe0b07e5a195a6847506c074527aa599ec/Click-7.0-py2.py3-none-any.whl (81kB)
    Collecting pyyaml<4.3,>=3.13 (from prefect)
      Downloading https://files.pythonhosted.org/packages/9e/a3/1d13970c3f36777c583f136c136f804d70f500168edc1edea6daa7200769/PyYAML-3.13.tar.gz (270kB)
    Collecting six>=1.5 (from python-dateutil<3.0,>2.7.3->prefect)
      Downloading https://files.pythonhosted.org/packages/73/fb/00a976f728d0d1fecfe898238ce23f502a721c0ac0ecfedb80e0d88c64e9/six-1.12.0-py2.py3-none-any.whl
    Collecting pytzdata>=2018.3 (from pendulum<3.0,>=2.0.4->prefect)
      Downloading https://files.pythonhosted.org/packages/89/02/a3a1cef5074c28157df63846d05aa893f007a92f6bafec0d61cae36bf69d/pytzdata-2018.9-py2.py3-none-any.whl (981kB)
    Collecting toolz>=0.7.4 (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/14/d0/a73c15bbeda3d2e7b381a36afb0d9cd770a9f4adc5d1532691013ba881db/toolz-0.9.0.tar.gz (45kB)
    Collecting zict>=0.1.3 (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/bd/45/a2e6f95a850cd407d785f2f8624b02e72baf6ab910aea4bdcac7e18b4871/zict-0.1.4-py2.py3-none-any.whl
    Collecting sortedcontainers!=2.0.0,!=2.0.1 (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/13/f3/cf85f7c3a2dbd1a515d51e1f1676d971abe41bba6f4ab5443240d9a78e5b/sortedcontainers-2.1.0-py2.py3-none-any.whl
    Collecting tornado>=5 (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/03/3f/5f89d99fca3c0100c8cede4f53f660b126d39e0d6a1e943e95cc3ed386fb/tornado-6.0.2.tar.gz (481kB)
    Collecting tblib (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/4a/82/1b9fba6e93629a8557f9784cd8f1ae063c8762c26446367a6764edd328ce/tblib-1.3.2-py2.py3-none-any.whl
    Collecting msgpack (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/3d/44/4a8be4f56ab3c8fc58800c06b2d681d5622704746d094411f00e25300072/msgpack-0.6.1-cp35-cp35m-manylinux1_x86_64.whl (243kB)
    Collecting psutil>=5.0 (from distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/2f/b8/11ec5006d2ec2998cb68349b8d1317c24c284cf918ecd6729739388e4c56/psutil-5.6.1.tar.gz (427kB)
    Collecting certifi>=2017.4.17 (from requests<3.0,>=2.20->prefect)
      Downloading https://files.pythonhosted.org/packages/60/75/f692a584e85b7eaba0e03827b3d51f45f571c2e793dd731e598828d380aa/certifi-2019.3.9-py2.py3-none-any.whl (158kB)
    Collecting chardet<3.1.0,>=3.0.2 (from requests<3.0,>=2.20->prefect)
      Downloading https://files.pythonhosted.org/packages/bc/a9/01ffebfb562e4274b6487b4bb1ddec7ca55ec7510b22e4c51f14098443b8/chardet-3.0.4-py2.py3-none-any.whl (133kB)
    Collecting urllib3<1.25,>=1.21.1 (from requests<3.0,>=2.20->prefect)
      Downloading https://files.pythonhosted.org/packages/62/00/ee1d7de624db8ba7090d1226aebefab96a2c71cd5cfa7629d6ad3f61b79e/urllib3-1.24.1-py2.py3-none-any.whl (118kB)
    Collecting docker-pycreds>=0.4.0 (from docker<4.0,>=3.4.1->prefect)
      Downloading https://files.pythonhosted.org/packages/f5/e8/f6bd1eee09314e7e6dee49cbe2c5e22314ccdb38db16c9fc72d2fa80d054/docker_pycreds-0.4.0-py2.py3-none-any.whl
    Collecting websocket-client>=0.32.0 (from docker<4.0,>=3.4.1->prefect)
      Downloading https://files.pythonhosted.org/packages/29/19/44753eab1fdb50770ac69605527e8859468f3c0fd7dc5a76dd9c4dbd7906/websocket_client-0.56.0-py2.py3-none-any.whl (200kB)
    Collecting cffi!=1.11.3,>=1.8 (from cryptography<3.0,>=2.2.2->prefect)
      Downloading https://files.pythonhosted.org/packages/5b/44/fdae2a8f66af426055f9b6fff0b155217081eddaf08a3df79ca11fe05bda/cffi-1.12.2-cp35-cp35m-manylinux1_x86_64.whl (428kB)
    Collecting asn1crypto>=0.21.0 (from cryptography<3.0,>=2.2.2->prefect)
      Downloading https://files.pythonhosted.org/packages/ea/cd/35485615f45f30a510576f1a56d1e0a7ad7bd8ab5ed7cdc600ef7cd06222/asn1crypto-0.24.0-py2.py3-none-any.whl (101kB)
    Collecting typed-ast<1.4.0,>=1.3.1 (from mypy<0.700,>=0.600->prefect)
      Downloading https://files.pythonhosted.org/packages/b1/62/7e0bfe7bb75bf8a41b58ccb7bd3ab3afaec61e06d9abfda113ad0dc227be/typed_ast-1.3.1-cp35-cp35m-manylinux1_x86_64.whl (734kB)
    Collecting heapdict (from zict>=0.1.3->distributed<2.0,>=1.21.8->prefect)
      Downloading https://files.pythonhosted.org/packages/e2/ca/f5feba2f939c97629dbce52a17acc95a0d10256ef620334795379dda8ce6/HeapDict-1.0.0.tar.gz
    Collecting pycparser (from cffi!=1.11.3,>=1.8->cryptography<3.0,>=2.2.2->prefect)
      Downloading https://files.pythonhosted.org/packages/68/9e/49196946aee219aead1290e00d1e7fdeab8567783e83e1b9ab5585e6206a/pycparser-2.19.tar.gz (158kB)
    Building wheels for collected packages: pyyaml, toolz, tornado, psutil, heapdict, pycparser
      Building wheel for pyyaml (setup.py): started
      Building wheel for pyyaml (setup.py): finished with status 'done'
      Stored in directory: /home/user/.cache/pipenv/wheels/ad/da/0c/74eb680767247273e2cf2723482cb9c924fe70af57c334513f
      Building wheel for toolz (setup.py): started
      Building wheel for toolz (setup.py): finished with status 'done'
      Stored in directory: /home/user/.cache/pipenv/wheels/f4/0c/f6/ce6b2d1aa459ee97cc3c0f82236302bd62d89c86c700219463
      Building wheel for tornado (setup.py): started
      Building wheel for tornado (setup.py): finished with status 'done'
      Stored in directory: /home/user/.cache/pipenv/wheels/61/7e/7a/5e02e60dc329aef32ecf70e0425319ee7e2198c3a7cf98b4a2
      Building wheel for psutil (setup.py): started
      Building wheel for psutil (setup.py): finished with status 'done'
      Stored in directory: /home/user/.cache/pipenv/wheels/86/36/f4/e1fc49f198c623b8dc2156e80011a52f19a959d538f6b1a5db
      Building wheel for heapdict (setup.py): started
      Building wheel for heapdict (setup.py): finished with status 'done'
      Stored in directory: /home/user/.cache/pipenv/wheels/40/b9/42/344857b482c954f48bcff6db72d388e30bf2bee4ed14706faa
      Building wheel for pycparser (setup.py): started
      Building wheel for pycparser (setup.py): finished with status 'done'
      Stored in directory: /home/user/.cache/pipenv/wheels/f2/9a/90/de94f8556265ddc9d9c8b271b0f63e57b26fb1d67a45564511
    Successfully built pyyaml toolz tornado psutil heapdict pycparser
    Installing collected packages: marshmallow, marshmallow-oneofschema, six, python-dateutil, pytzdata, pendulum, toolz, pyyaml, heapdict, zict, dask, sortedcontainers, click, tornado, tblib, cloudpickle, msgpack, psutil, distributed, idna, certifi, chardet, urllib3, requests, xxhash, toml, docker-pycreds, websocket-client, docker, pycparser, cffi, asn1crypto, cryptography, pytz, mypy-extensions, typed-ast, mypy, croniter, typing, typing-extensions, prefect
    Successfully installed asn1crypto-0.24.0 certifi-2019.3.9 cffi-1.12.2 chardet-3.0.4 click-7.0 cloudpickle-0.6.1 croniter-0.3.29 cryptography-2.6.1 dask-1.1.4 distributed-1.26.0 docker-3.7.1 docker-pycreds-0.4.0 heapdict-1.0.0 idna-2.7 marshmallow-3.0.0b19 marshmallow-oneofschema-2.0.0b2 msgpack-0.6.1 mypy-0.670 mypy-extensions-0.4.1 pendulum-2.0.4 prefect-0.5.0 psutil-5.6.1 pycparser-2.19 python-dateutil-2.8.0 pytz-2018.9 pytzdata-2018.9 pyyaml-3.13 requests-2.21.0 six-1.12.0 sortedcontainers-2.1.0 tblib-1.3.2 toml-0.10.0 toolz-0.9.0 tornado-6.0.2 typed-ast-1.3.1 typing-3.6.6 typing-extensions-3.7.2 urllib3-1.24.1 websocket-client-0.56.0 xxhash-1.3.0 zict-0.1.4
    
    Adding prefect to Pipfile's [packages]…
    Pipfile.lock not found, creating…
    Locking [dev-packages] dependencies…
    Locking [packages] dependencies…
    
    Warning: Your dependencies could not be resolved. You likely have a mismatch in your sub-dependencies.
      First try clearing your dependency cache with $ pipenv lock --clear, then try the original command again.
     Alternatively, you can use $ pipenv install --skip-lock to bypass this mechanism, then run $ pipenv graph to inspect the situation.
      Hint: try $ pipenv lock --pre if it is a pre-release dependency.
    Could not find a version that matches marshmallow-oneofschema<3.0,>=2.0.0b2
    Tried: 1.0, 1.0.1, 1.0.2, 1.0.3, 1.0.3, 1.0.4, 1.0.4, 1.0.5, 1.0.5
    Skipped pre-versions: 2.0.0b1, 2.0.0b1, 2.0.0b2, 2.0.0b2
    There are incompatible versions in the resolved dependencies.
    
    
    opened by dazzag24 25
  • Flow run with Ray and agent throws: `ImportError: cannot import name 'SubprocessFlowRunner' from partially initialized module 'prefect.flow_runners'`

    Opened from the Prefect Public Slack Community

    christian.vogel: Hi Prefect Community, I am currently receiving an error when using a local (on my PC) Prefect agent which pulls from a work queue in Prefect Cloud. The error only occurs when I try to use the RayTaskRunner:

    (begin_task_run pid=141324) ImportError: cannot import name 'SubprocessFlowRunner' from partially initialized module 'prefect.flow_runners' (most likely due to a circular import) (/home/christian/Documents/ray_and_prefect/env/lib/python3.9/site-packages/prefect/flow_runners/__init__.py)
    

    I am using the following dependencies: prefect==2.0b7 prefect-ray==0.1.0 ray==1.13.0 Apparently I am doing something wrong with my dependencies or when I am importing them. Do you have any idea?

    christian.vogel: This is my flow and deployment:

    from prefect import task, flow
    from prefect_ray import RayTaskRunner
    from prefect.deployments import DeploymentSpec
    
    
    @task
    def say_hello(name):
        print(f"hello {name}")
    
    
    @flow(name="temp-flow-example", task_runner=RayTaskRunner(address="auto"))
    def greetings():
        say_hello("Ford")
    
    
    DeploymentSpec(
        name="temp-flow-example",
        flow=greetings
    )
    

    christian.vogel: What confuses me a bit is the fact that the flow only fails when pulled from the cloud and seems to work when I run it directly locally via Python and a main method.

    christian.vogel: Could it be related to the fact that I am running my agent in a virtual environment? Probably in the background the following is executed:

    from prefect.flow_runners import SubprocessFlowRunner
    

    which then leads to some import issues because of the already imported:

    from prefect_ray import RayTaskRunner
    

    Is that possible?

    anna: do you run your agent in the same virtual environment as you run your local process? perhaps you can explicitly define your flow runner with the virtual environment you use for your agent?

    flow_runner=SubprocessFlowRunner(condaenv="yourcondavenv"),
    

    anna: I was trying to replicate and sth also didn't work for me, even though I got a different error message using the same versions as you did

    anna: my error was:

    Failed to read dashboard log: [Errno 2] No such file or directory: '/tmp/ray/session_2022-07-07_14-08-35_008202_6668/logs/dashboard.log'
    

    anna: wow it took me forever to set this up but this helped https://docs.ray.io/en/master/ray-overview/installation.html#m1-mac-apple-silicon-support

    looks like you need to be super careful about your environment with Ray and grpcio package - do you use Conda? on which machine do you run it?

    christian.vogel: Tried specifying the SubprocessFlowRunner with the virtual env path in my case. but did not help

    christian.vogel: I am running a venv on a linux ubuntu machine

    anna: yup you're right, I was able to reproduce your error, thanks for reporting that

    <@ULVA73B9P> open "Flow run with Ray and agent throws: ImportError: cannot import name 'SubprocessFlowRunner' from partially initialized module 'prefect.flow_runners'"

    Original thread can be found here.

    bug from:slack v2 
    opened by marvin-robot 24
  • Adds ability to use ECS Capacity Provider in ECS Agent and ECS Run

    Summary

    PR to add the ability to use capacity providers in ECS Agent and ECSRun when set in run_task_kwargs

    Changes

    • Updates the way ECSAgent sets the launchType and default network configuration when a capacity provider is passed in via run-task-kwargs.

    • Deletes launchType in ECSAgent run_config (opts) in merge_run_task_kwargs if capacityProvider is found in ECSRun's run_config (opts2) and vice versa
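The merge rule in the second bullet can be sketched in plain Python. This is a simplified illustration, not the PR's actual code: the function name `merge_run_task_kwargs_sketch` is hypothetical, the merge is shallow, and it assumes the capacity provider arrives under the ECS `capacityProviderStrategy` key, which the ECS RunTask API does not accept together with `launchType`.

```python
import copy


def merge_run_task_kwargs_sketch(opts: dict, opts2: dict) -> dict:
    """Merge two run_task kwargs dicts, with opts2 taking precedence.

    Hypothetical helper: `launchType` and `capacityProviderStrategy` are
    treated as mutually exclusive, so whichever one opts2 sets removes the
    other from opts before the merge.
    """
    merged = copy.deepcopy(opts)
    if "capacityProviderStrategy" in opts2:
        merged.pop("launchType", None)
    elif "launchType" in opts2:
        merged.pop("capacityProviderStrategy", None)
    merged.update(copy.deepcopy(opts2))
    return merged


# Illustrative values only: agent-level defaults and per-run overrides.
agent_defaults = {"launchType": "FARGATE", "cluster": "default"}
run_overrides = {
    "capacityProviderStrategy": [{"capacityProvider": "FARGATE_SPOT", "weight": 1}]
}
merged = merge_run_task_kwargs_sketch(agent_defaults, run_overrides)
print(merged)
```

The same function covers the "vice versa" case: if the agent defaults carry a capacity provider strategy and the run sets `launchType`, the strategy is dropped instead.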

    Importance

    Being able to use capacity providers in ECS enables the use of Fargate Spot and ASGs.

    Found in Issues: #4356 and #5210

    Checklist

    This PR:

    • [x] adds new tests (if appropriate)
    • [x] adds a change file in the changes/ directory (if appropriate)
    • [x] updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)
    opened by wfclark5 23
  • AssertionError on running tasks in parallel

    First check

    • [X] I added a descriptive title to this issue.
    • [X] I used the GitHub search to find a similar issue and didn't find it.
    • [X] I searched the Prefect documentation for this issue.
    • [X] I checked that this issue is related to Prefect and not one of its dependencies.

    Bug summary

    1. A main flow that uses the sequential task runner calls a subflow, which just calls .map on a task to create parallel task runs.
    2. A few of those tasks crash with an AssertionError.

    Reproduction

    from prefect import task, flow
    from prefect import get_run_logger
    import time
    from prefect.task_runners import SequentialTaskRunner
    
    
    @task(name='run_scheduler')
    def run_scheduler(event):
        scheduler_output = [i for i in range(1, 300)]
        time.sleep(150)  # Adding sleep time to make task bit long running
        return scheduler_output
    
    
    @task(name='run_executor', tags=['spends_executor'])
    def run_executor(scheduler_output):
        time.sleep(150)  # Adding sleep time to make task bit long running
        executor_output = f"printing just the executor input {scheduler_output}"
        return executor_output
    
    
    @flow(task_runner=SequentialTaskRunner())
    def spends_flow(event):
        logger = get_run_logger()
        logger.info(event)
    
        # Calling Scheduler task
        scheduler_output = run_scheduler(event)
    
        # if the output is empty array just log it else calling subflow which creates parallel task executions
    
        if len(scheduler_output) < 1:
            logger.info("no elements in array")
        else:
            spends_executor(scheduler_output)
        logger.info('flow completed')
    
    
    @flow
    def spends_executor(scheduler_output):
        run_executor.map(scheduler_output)
    
    
    if __name__ == "__main__":
        event = "test"
        spends_flow(event)
    

    Error

    Encountered exception during execution:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1247, in orchestrate_task_run
        result = await run_sync(task.fn, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "flows/test_calculate_spends_flows/test_spends_flow.py", line 16, in run_executor
        output = run_deployment(
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 197, in coroutine_wrapper
        return run_async_from_worker_thread(async_fn, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
        return self.__get_result()
      File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
        raise self._exception
      File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 131, in run_deployment
        flow_run = await client.read_flow_run(flow_run_id)
      File "/usr/local/lib/python3.9/site-packages/prefect/client/orion.py", line 1443, in read_flow_run
        response = await self._client.get(f"/flow_runs/{flow_run_id}")
      File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1757, in get
        return await self.request(
      File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1533, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/usr/local/lib/python3.9/site-packages/prefect/client/base.py", line 160, in send
        await super().send(*args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1620, in send
        response = await self._send_handling_auth(
      File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
        response = await self._send_handling_redirects(
      File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
        response = await self._send_single_request(request)
      File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1722, in _send_single_request
        response = await transport.handle_async_request(request)
      File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
        resp = await self._pool.handle_async_request(req)
      File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 221, in handle_async_request
        await self._attempt_to_acquire_connection(status)
      File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 160, in _attempt_to_acquire_connection
        status.set_connection(connection)
      File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 22, in set_connection
        assert self.connection is None
    AssertionError
    

    Versions

    agent
    Version:             2.6.6
    API version:         0.8.3
    Python version:      3.9.15
    Git commit:          87767cda
    Built:               Thu, Nov 3, 2022 1:15 PM
    OS/Arch:             linux/x86_64
    Profile:             default
    Server type:         hosted
    
    self hosted orion server
    Version:             2.6.0
    API version:         0.8.2
    Python version:      3.9.14
    Git commit:          96f09a51
    Built:               Thu, Oct 13, 2022 3:21 PM
    OS/Arch:             linux/x86_64
    Profile:             default
    Server type:         hosted
    

    Additional context

    No response

    bug status:upstream 
    opened by deepanshu-zluri 22
  • Bulk deletion of flows in the UI

    Description

    A dealbreaker that deterred me from using v2.0 (since it has async support) is not having bulk selection of flows to delete them. It's cumbersome to delete one by one, especially if you have a lot of flow runs. Having this feature would be great to enhance the UX.

    ui v2 priority:high 
    opened by SeifHediya 22
  • CLI registration inconsistently bumps flow version even when metadata hasn't changed

    Opened from the Prefect Public Slack Community

    dkmarsh: Hi Everyone, I'm having an issue with registering flows. Whenever I register the same flow, it bumps the version, even if the metadata is unchanged. I've tried using the CLI as well as flow.register(). I've also tried using flow.register('project_1', idempotency_key=flow.serialized_hash()) and it still bumps the version. I am using a local agent, local storage and the cloud backend. Is there another configuration that needs to be set so this doesn't happen? Or should this all be done in docker?

    kevin701: Hey <@U022J2DB0UR>, I don't think moving to Docker will do this. Will ask the team to be sure but what is your use case that you need the version pinned? We might be able to solve this some other way.

    ranugoldan: I once tried the CLI command prefect register flow and it doesn't bump the version, but prefect register does.

    kevin701: Actually that's right <@ULCV623AT>! I know some people on the other side have been asking why their flow is not bumping version :sweat_smile:

    amanda.wee: <@U01QEJ9PP53> when should the flow version be bumped? In my ECS-based setup with a bunch of flows packaged together with a local agent, I build the Docker image each time we make a change, even if it is only to tweak a single task. When the Docker container starts up, it runs a shell script that runs the Python scripts that define and register the flows (i.e., not using the CLI). Like what <@ULCV623AT> observed, with serialized_hash() the flow versions kept increasing even if it was just because the ECS task was restarted, thereby starting up the Docker container again.

    My suspicion was that some Docker container metadata was changing each time, e.g., something host-related, so the serialized hash computation changed. My solution was to write my own hash function that took into account the dependencies (including Prefect version) and the code content, but I wonder if that is overkill.
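A custom hash of the kind amanda.wee describes might look like the sketch below. It is an illustration under stated assumptions, not her implementation: the function name `stable_flow_key`, its inputs (the flow's source text plus a mapping of pinned dependency versions), and the version numbers are all hypothetical. The resulting string could be passed as the `idempotency_key` argument to `flow.register()` mentioned earlier in the thread.

```python
import hashlib
import json


def stable_flow_key(source_code: str, dependencies: dict) -> str:
    """Return a SHA-256 hex digest that depends only on code and pinned versions."""
    payload = json.dumps(
        {"source": source_code, "dependencies": dependencies},
        sort_keys=True,  # key order must not affect the digest
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Illustrative inputs: the same code and versions, listed in a different order.
source = "def say_hello(name):\n    print(name)\n"
key_a = stable_flow_key(source, {"prefect": "0.14.22", "pandas": "1.2.4"})
key_b = stable_flow_key(source, {"pandas": "1.2.4", "prefect": "0.14.22"})
print(key_a == key_b)
```

Because nothing host-related enters the payload, restarting a container would leave the key unchanged, while editing a task or bumping a dependency would change it.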

    kevin701: Hey <@U01DJDK6AQ7>, correct me if I'm wrong but I think what you're referring to is that DockerStorage was not respecting the cache and rebuilding everything. Have you seen this recent PR (https://github.com/PrefectHQ/prefect/pull/4584) that fixed it?

    amanda.wee: No, I'm using S3 storage. The Docker image is for ECS, not for Prefect.

    kevin701: Oh I see what you mean. Ok I'll ask the team and get back to you

    dkmarsh: <@U01QEJ9PP53> It was my understanding that the version should only be bumped when the metadata changes. My use case is to automate the registering of flows once they are added or changed by a developer. I wrote a function that looks in a directory and collects all flows and registers them. I would like to not have the versions bumped on flows that have already been registered and have had no changes.

    Interestingly enough, I tried testing again this morning and discovered that when registering via the CLI or the Python API, the version gets bumped every other time. For example, if I run prefect register --project Project1 -p src/register.py -n Flow1 it will register the flow, then if I run the same command a few seconds later, it will skip with the message: "Skipped (metadata unchanged)". However, running it a third time, it will register the flow as version 2. So it seems to be skipping the registration as desired only every other time.

    znicholasbrown: Hi <@U022J2DB0UR> - this sounds like an issue with CLI registration; I'm going to open an issue from this thread for the Core team to look into.

    znicholasbrown: <@ULVA73B9P> open "CLI registration inconsistently bumps flow version even when metadata hasn't changed"

    Original thread can be found here.

    from:slack 
    opened by marvin-robot 21
  • State pill in each FlowRunList item on the Flow Runs page is `Unknown`

    First check

    • [X] I added a descriptive title to this issue.
    • [X] I used the GitHub search to find a similar issue and didn't find it.
    • [X] I searched the Prefect documentation for this issue.
    • [X] I checked that this issue is related to Prefect and not one of its dependencies.

    Bug summary

    The state pill in each FlowRunList item on the Flow Runs page is Unknown, without state color. However, the scatterplot shows the color for the state of each flow run.

    Reproduction

    N/A
    

    Error

    No response

    Versions

    The user is using Prefect 2.75
    

    Additional context

    Community slack thread

    bug ui status:triage priority:high 
    opened by billpalombi 0
  • Moves `SQLALCHEMY_SILENCE_UBER_WARNING` to setup.cfg

    Moves SQLALCHEMY_SILENCE_UBER_WARNING to setup.cfg so that it disables warnings on local test runs as well as in CI.

    Checklist

    • [ ] This pull request references any related issue by including "closes <link to issue>"
      • If no issue exists and your change is not a small fix, please create an issue first.
    • [x] This pull request includes tests or only affects documentation.
    • [x] This pull request includes a label categorizing the change e.g. fix, feature, enhancement
    maintenance 
    opened by desertaxle 1
  • Migrate to SQLAlchemy 2.0

    First check

    • [X] I am a contributor to the Prefect codebase

    Description

    Tracking issue for migrating to SQLAlchemy 2.0 using the migration guide. When migrating, the changes in #8042 can be removed.

    Impact

    No response

    Additional context

    No response

    maintenance 
    opened by desertaxle 0
  • Rename workerPool components, routes and variables to workPool

    To merge this PR, changes from orion-design should be released and merged first.

    Example

    Checklist

    • [ ] This pull request references any related issue by including "closes <link to issue>"
      • If no issue exists and your change is not a small fix, please create an issue first.
    • [ ] This pull request includes tests or only affects documentation.
    • [ ] This pull request includes a label categorizing the change e.g. fix, feature, enhancement
    maintenance 
    opened by marichka-offen 1
  • Long running kubernetes jobs are marked as crashed by the agent

    First check

    • [X] I added a descriptive title to this issue.
    • [X] I used the GitHub search to find a similar issue and didn't find it.
    • [X] I searched the Prefect documentation for this issue.
    • [X] I checked that this issue is related to Prefect and not one of its dependencies.

    Bug summary

    I have a flow that runs a Spark job using the prefect-databricks connector. If the job runs for more than 4 hours, the flow in Prefect is marked as crashed after 4 hours plus 1-4 minutes.

    Reproduction

    # Kubernetes job infra defined as
    
    k8s_job = KubernetesJob(
            namespace=namespace,
            image=image_name,
            image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
            finished_job_ttl=300,
            service_account_name="prefect-server",
            job_watch_timeout_seconds=180,
            customizations={ .. volumes & secrets here },
            env={
                .. bunch of envs here.. 
            }
        )
    
    # image is custom-built on top of Prefect 2.7.4:
    
    ARG PREFECT_VERSION=2.7.4
    ARG PYTHON_VERSION=python3.10
    FROM prefecthq/prefect:${PREFECT_VERSION}-${PYTHON_VERSION}
    
    
    # databricks job is submitted like this:
    
    @flow
    def databricks_run_reload(notebook_path, params):
    
        # compile job task settings
        job_task_settings = JobTaskSettings(
            new_cluster=new_cluster,
            notebook_task=spark_python_notebook_task,
            task_key=task_key,
            timeout_seconds=86400,
            libraries=[]
        )
    
        idempotency_key = str(uuid.uuid4())
        multi_task_runs = jobs_runs_submit_and_wait_for_completion(
            databricks_credentials=databricks_credentials,
            run_name=run_name,
            git_source=git,
            tasks=[job_task_settings],
            max_wait_seconds=86400,
            idempotency_token=idempotency_key
        )
        return multi_task_runs
    
    # and used in other flow:
    
    @flow
    # @flow(persist_result=True, result_storage=S3.load(config.DEFAULT_S3_BLOCK_NAME)) ## tried this as well, but no difference
    def do_reload():
        spark = databricks_run_reload(notebook_path="some_notebook", params=some_params)
        updated = execute_dml(merge_query, params=query_params, wait_for=[spark])
    

    Error

    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
        result = await run_sync(flow_call)
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(
      File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/opt/prefect/flows/spark.py", line 45, in do_reload
        updated = execute_dml(query, params=query_params, wait_for=[spark])
      File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 436, in __call__
        return enter_task_run_engine(
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 927, in enter_task_run_engine
        return run_async_from_worker_thread(begin_run)
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
        return self.__get_result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
        raise self._exception
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1068, in get_task_call_return_value
        return await future._result()
      File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
        return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
      File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 100, in _get_state_result
        raise MissingResult(
    prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    

    Versions

    Version:             2.7.4
    API version:         0.8.4
    Python version:      3.10.8
    Git commit:          30db76e7
    Built:               Thu, Dec 22, 2022 2:55 PM
    OS/Arch:             linux/x86_64
    Profile:             dev
    Server type:         hosted
    

    Additional context

    No response

    bug status:triage 
    opened by mmartsen 0
  • merge permission types to include new WorkspaceFeatureFlag

    With the recent PR that adds feature flags to orion-design, nebula and orion-ui will both need to update their Permissions type to include the possibility of WorkspaceFeatureFlag.

    maintenance ui 
    opened by stackoverfloweth 1
Releases(2.7.5)
  • 2.7.5(Dec 29, 2022)

    This release adds two features to the CLI: retrieval of flow run logs and the ability to schedule a flow run for execution in the future. This release also highlights some updates in our GCP, Airbyte, and DBT collections!

    See the release notes for details.

    Source code(tar.gz)
    Source code(zip)
  • 2.7.4(Dec 22, 2022)

    This release greatly expands control over retry delays, including exponential backoffs and jitter! We've also added support for experimental flags behind the scenes, so expect to see some experimental features soon 👩‍🔬
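To make the terms concrete, exponential backoff with jitter can be sketched in plain Python. This is a generic illustration, not Prefect's implementation: the function name `backoff_delays` and its parameters are hypothetical, and the jitter scheme (a random extra of up to `jitter_factor` times the base delay) is just one common choice.

```python
import random


def backoff_delays(retries, backoff_factor=2.0, jitter_factor=0.0, rng=None):
    """Return one delay in seconds per retry.

    The base delay doubles on every attempt (backoff_factor * 2**attempt);
    jitter adds a random amount between 0 and jitter_factor * base so that
    many failing runs do not all retry at the same instant.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(retries):
        base = backoff_factor * (2 ** attempt)
        jitter = rng.uniform(0, jitter_factor * base)
        delays.append(base + jitter)
    return delays


# Without jitter the delays simply double; with jitter each one is stretched
# by a random amount of up to 50% of its base value.
print(backoff_delays(4, backoff_factor=10))
print(backoff_delays(4, backoff_factor=10, jitter_factor=0.5, rng=random.Random(0)))
```

Passing a seeded `random.Random` makes the jittered delays reproducible, which is convenient when testing retry logic.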

    See the release notes for details.

    Source code(tar.gz)
    Source code(zip)
  • 2.7.3(Dec 16, 2022)

  • 2.7.2(Dec 15, 2022)

    This release adds rescheduling of paused flow runs, pausing flow runs from the UI, a new page for viewing individual task run concurrency limit details, and many other enhancements and fixes :)

    We've also added new collections for Bitbucket and Kubernetes!

    See the release notes for more details.

    Source code(tar.gz)
    Source code(zip)
  • 2.7.1(Dec 8, 2022)

    This release adds a new setting for using custom blocks, improves cancellation edge cases, adds a new task concurrency page, and adds bulk deletion to more objects in the UI!

    See the release notes for details and bug fixes!

    Source code(tar.gz)
    Source code(zip)
  • 2.7.0(Dec 1, 2022)

    It's only been a little over a week since the last release, but there are a lot of new features we're excited to announce!

    This release includes:

    • Cancellation of flow runs
    • Pause and resume of flow runs
    • Logging of print statements
    • Agent-level flow run concurrency limits
    • Notification blocks for PagerDuty and Twilio
    • Improved reporting of crashed infrastructure
    • Improved handling of failures during HTTP requests
    • ... and dozens more fixes and enhancements!

    Thanks a bunch to our five external contributors ❤️

    See the release notes for examples and more details.

    Source code(tar.gz)
    Source code(zip)
  • 2.6.9(Nov 22, 2022)

    This release includes big updates to the Prefect Cloud login experience as well as improvements to logging (e.g. task run logs on remote workers are sent to the API now).

    See the release notes for details!

    Source code(tar.gz)
    Source code(zip)
  • 2.6.8(Nov 17, 2022)

  • 2.6.7(Nov 10, 2022)

    This release adds timeouts for task runs, colored logging, obfuscation of secret settings, dark mode for documentation, and more! It also includes usability updates in the UI, performance improvements for the scheduler, and lots of bug fixes.

    See the release notes for more details 🎉

    Source code(tar.gz)
    Source code(zip)
  • 2.6.6(Nov 3, 2022)

    This release adds status displays for work queue health to the UI and support for waiting for upstream tasks before starting a subflow!

    It also includes:

    • Scheduler performance improvements
    • Documentation for our versioning scheme
    • Three new collections: GitLab, Alerts, and Fivetran!

    See the full release notes for more details.

    Source code(tar.gz)
    Source code(zip)
  • 2.6.5(Oct 27, 2022)

    This release includes bug fixes and enhancements to the core library. The highlights are:

    • Support for manual flow run retries
    • Improved server performance when retrying flow runs with many tasks
    • Status checks in work queues
    • Support for Python 3.11

    See the release notes for details.

    Source code(tar.gz)
    Source code(zip)
  • 2.6.4(Oct 20, 2022)

    This release includes bug fixes and enhancements to the core library, but the highlights are in our collections. There's a new infrastructure block for running flows in containers on Azure, a new key-value store collection, and the ECS infrastructure block has been designated as ready for production.

    See the release notes for details.

    By the way, if you haven't checked out 2.6 yet, you should now! We launched first-class configuration of results in 2.6.0 and every release since includes additional improvements to result handling.

  • 2.6.3(Oct 18, 2022)

  • 2.6.2(Oct 18, 2022)

    This release includes some major enhancements, including support for compound recurrence rule schedules and private GitHub repositories. There are also fixes and enhancements for the new results feature; notably, we now support reducing memory consumption by optionally always retrieving results from storage.

    As always, the highlights above are a small subset of the changes. See the release notes for everything else!

  • 2.6.1(Oct 14, 2022)

  • 2.6.0(Oct 13, 2022)

    👋 This release includes some big features like first-class configuration of results, waiting for failed tasks, specifying an agent's work queues with a prefix string, and setting parameters when running deployments from the CLI. There are also a bunch of fixes and performance improvements.

    Check out the release notes for examples and more details.

  • 2.5.0(Oct 6, 2022)

    This release includes the ability to easily orchestrate flow runs for deployments from other flow runs! If you're currently using OrionClient.create_flow_run_from_deployment, or if that's been too complicated, you'll be excited to see our sync-compatible run_deployment function that links created runs as subflows. There are a bunch of other enhancements and fixes too! Check out the release notes for details.

  • 2.4.5(Sep 29, 2022)

    This release disables block protection. With block protection enabled, as in 2.4.3 and 2.4.4, client and server versions cannot be mismatched unless you are on a version before 2.4.0. Disabling block protection restores the ability for a client and server to run different versions.

    Block protection was added in 2.4.1 to prevent users from deleting block types that are necessary for the system to function. With this change, you are able to delete block types that will cause your flow runs to fail. New safeguards that do not affect client/server compatibility will be added in the future.

    Note this release also restores block memoization by default, reverting the change in 2.4.4 as it did not patch the issue as intended.

  • 2.4.4(Sep 29, 2022)

    When running a server with this version, the client must be the same version. This does not apply to clients connecting to Prefect Cloud.

    This release attempted to fix a bug affecting a subset of users on 2.4.3 where block schemas cannot be saved. It did not address the issue as intended; the issue is fixed in 2.4.5.

  • 2.4.3(Sep 29, 2022)

    When running a server with this version, the client must be the same version. This does not apply to clients connecting to Prefect Cloud.

    This release includes several performance enhancements, CLI quality-of-life improvements, and numerous fixes. See the full release notes for details.

  • 2.4.2(Sep 23, 2022)

  • 2.4.1(Sep 22, 2022)

    This release includes bug fixes and a lot of enhancements. We're sure you'll be excited about:

    • Microsoft Teams notifications
    • Improved orchestration for duplicate flow runs
    • A CLI command to generate a Kubernetes manifest for agents
    • Performance improvements for blocks

    Check out the release notes to see all of the changes.

  • 1.4.0(Sep 19, 2022)

    Changes

    Enhancements

    • Add get_latest to TaskRunView - #6749
    • Update agents to allow logs to be disabled with PREFECT__CLOUD__SEND_FLOW_RUN_LOGS - #6833

    Fixes

    • Fix bug where scheduler_comm.close_rpc() was not awaited in DaskExecutor - #6793
    • Fix bug where the Prefect diagnostics logger could duplicate output - #6832

    Task library

    • Add validation for node type id and instance pool id in Databricks NewCluster - #6853
  • 2.4.0(Sep 13, 2022)

  • 1.3.1(Sep 7, 2022)

    Changes

    Fixes

    • Fix bug where flows with emoji characters pass flow.validate(), but fail to extract from file - #6375
    • Fix FlowRunView task run query errors when all task runs in the flow run are cached - #6572
    • Update FlowRunView to avoid retrieval of cached static tasks during latest calls - #6572
    • Update FlowRunView to be robust to multithreaded usage - #6572

    Task Library

    • Add state handler to log to Snowflake - #5671
    • Fix default idempotency token for DatabricksSubmitMultitaskRun - #6412
    • Fix custom tag type for Databricks NewCluster - #6573
    • Fix empty file handling in S3List task - #6028

  • 2.3.2(Sep 6, 2022)

  • 2.3.1(Sep 1, 2022)

  • 2.3.0(Aug 30, 2022)

    There are some exciting features in this release:

    • Add support for deploying flows stored in Docker images - #6574
    • Add support for deploying flows stored on GitHub - #6598
    • Add file system block for reading directories from GitHub - #6517
    • Add a context manager to disable the flow and task run loggers for testing - #6575
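
To show the general pattern behind the last item, here is a minimal standard-library sketch of a context manager that silences a logger for the duration of a test. It is not Prefect's own context manager: `disabled_logger`, `compute`, and the logger name `my_app.runs` are hypothetical names chosen for illustration.

```python
import logging
from contextlib import contextmanager


@contextmanager
def disabled_logger(name):
    """Temporarily disable the named standard-library logger (hypothetical helper)."""
    logger = logging.getLogger(name)
    previous = logger.disabled
    logger.disabled = True
    try:
        yield logger
    finally:
        # Restore the previous setting even if the body raises.
        logger.disabled = previous


def compute(x):
    # Stand-in for a task function that logs while it works.
    logging.getLogger("my_app.runs").info("computing %s", x)
    return x * 2


if __name__ == "__main__":
    with disabled_logger("my_app.runs"):
        # The function runs normally, but its log call is dropped.
        assert compute(21) == 42
```

Restoring the previous value in `finally` keeps a failing test from leaving the logger switched off for later tests.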

    The list of changes is far too big to fit here! Check out the full release notes.

  • 2.2.0(Aug 23, 2022)

    Exciting New Features 🎉

    • Added automatic detection of static arguments to Task.map in #6513
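
The idea of static arguments in a mapped call can be sketched in plain Python. The release note does not spell out the detection rule, so this sketch assumes that lists and tuples are mapped element by element and that every other value is static. `map_call` and `add` are hypothetical helpers, not Prefect's `Task.map`.

```python
def map_call(fn, **kwargs):
    """Call fn once per element of the list/tuple arguments.

    Lists and tuples are iterated in lockstep; any other value is treated as
    static and passed unchanged to every call. Hypothetical helper, not
    Prefect's Task.map.
    """
    mapped = {k: v for k, v in kwargs.items() if isinstance(v, (list, tuple))}
    static = {k: v for k, v in kwargs.items() if k not in mapped}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) > 1:
        raise ValueError("mapped arguments must all have the same length")
    # With no mapped arguments at all, fall back to a single call.
    count = lengths.pop() if lengths else 1
    return [
        fn(**{k: v[i] for k, v in mapped.items()}, **static)
        for i in range(count)
    ]


def add(x, y):
    return x + y


if __name__ == "__main__":
    print(map_call(add, x=[1, 2, 3], y=10))  # [11, 12, 13]
```

Here `y=10` is never iterated: it is passed unchanged to each of the three calls, which is the behaviour the caller would otherwise have to request explicitly.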

    Fixes

    • Updated deployment flow run retry settings with runtime values in #6489
    • Updated media queries for flow-run-filter in #6484
    • Added empirical_policy to flow run update route in #6486
    • Updated flow run policy retry settings to be nullable in #6488
    • Disallowed extra attribute initialization on Deployment objects in #6505
    • Updated deployment build to raise an informative error if two infrastructure configs are provided in #6504
    • Fixed calling async subflows from sync parents in #6514
A flexible python library for building your own cron-like system, with REST APIs and a Web UI.

Nextdoor Scheduler ndscheduler is a flexible python library for building your own cron-like system to schedule jobs, which is to run a tornado process

1k Dec 15, 2022
Remote task execution tool

Gunnery Gunnery is a multipurpose task execution tool for distributed systems with web-based interface. If your application is divided into multiple s

Gunnery 747 Nov 09, 2022
Clepsydra is a mini framework for task scheduling

Intro Clepsydra is a mini framework for task scheduling All parts are designed to be replaceable. Main ideas are: No pickle! Tasks are stored in reada

Andrey Tikhonov 15 Nov 04, 2022
Python-Repeated-Timer is an open-source & highly performing timer using only standard-libraries.

Python Repeated Timer Python-Repeated-Timer is an open-source & highly performing timer using only standard-libraries.

TACKHYUN JUNG 3 Oct 09, 2022
A calendaring app for Django. It is now stable; please feel free to use it. Active development has been taken over by bartekgorny.

Django-schedule A calendaring/scheduling application, featuring: one-time and recurring events calendar exceptions (occurrences changed or cancelled)

Tony Hauber 814 Dec 26, 2022
A powerful workflow engine implemented in pure Python

Spiff Workflow Summary Spiff Workflow is a workflow engine implemented in pure Python. It is based on the excellent work of the Workflow Patterns init

Samuel 1.3k Jan 08, 2023
dragonscales is a highly customizable asynchronous job-scheduler framework

dragonscales 🐉 dragonscales is a highly customizable asynchronous job-scheduler framework. This framework is used to scale the execution of multiple

Sorcero 2 May 16, 2022
Another Scheduler is a Kubernetes controller that automatically starts, stops, or restarts pods from a deployment at a specified time using a cron annotation.

Another Scheduler Another Scheduler is a Kubernetes controller that automatically starts, stops, or restarts pods from a deployment at a specified tim

Diego Najar 66 Nov 19, 2022
Here is the live demonstration of endpoints and celery worker along with RabbitMQ

whelp-task Here is the live demonstration of endpoints and celery worker along with RabbitMQ Before running the application make sure that you have yo

Yalchin403 0 Nov 14, 2021
Ffxiv-blended-job-icons - All action icons for each class/job are blended together to create new backgrounds for each job/class icon!

ffxiv-blended-job-icons All action icons for each class/job are blended together to create new backgrounds for each job/class icon! I used python to c

Jon Strutz 2 Jul 07, 2022
CoSA: Scheduling by Constrained Optimization for Spatial Accelerators

CoSA is a scheduler for spatial DNN accelerators that generate high-performance schedules in one shot using mixed integer programming

UC Berkeley Architecture Research 44 Dec 13, 2022
A task scheduler with task scheduling, timing and task completion time tracking functions

A task scheduler with task scheduling, timing and task completion time tracking functions. Could be helpful for time management in daily life.

ArthurLCW 0 Jan 15, 2022
The easiest way to automate your data

Hello, world! 👋 We've rebuilt data engineering for the data science era. Prefect is a new workflow management system, designed for modern infrastruct

Prefect 10.9k Jan 04, 2023
Crontab jobs management in Python

Plan Plan is a Python package for writing and deploying cron jobs. Plan will convert Python code to cron syntax. You can easily manage you

Shipeng Feng 1.2k Dec 28, 2022
Generate job input scripts for HPC scheduler systems, submit them to HPC systems, and poll until they finish

DPDispatcher DPDispatcher is a python package used to generate HPC(High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver) jobs in

DeepModeling 23 Nov 30, 2022
Aiorq is a distributed task queue with asyncio and redis

Aiorq is a distributed task queue built on asyncio and redis; it is a rewrite of arq with improvements and a web interface.

PY-GZKY 5 Mar 18, 2022
A Python concurrency scheduling library, compatible with asyncio and trio.

aiometer aiometer is a Python 3.6+ concurrency scheduling library compatible with asyncio and trio and inspired by Trimeter. It makes it easier to exe

Florimond Manca 182 Dec 26, 2022
Automate SQL Jobs Monitoring with python

Automate_SQLJobsMonitoring_python Using python 3rd party modules we can automate

Aejaz Ayaz 1 Dec 27, 2021
Vertigo is an application used to schedule @code4tomorrow classes.

Vertigo Vertigo is an application used to schedule @code4tomorrow classes. It uses the Google Sheets API and is deployed using AWS. Documentation Lear

Ben Nguyen 4 Feb 10, 2022
Python job scheduling for humans.

schedule Python job scheduling for humans. Run Python functions (or any other callable) periodically using a friendly syntax. A simple to use API for

Dan Bader 10.4k Jan 02, 2023