Ray can run your tasks in parallel by distributing them over multiple machines. The prefect-ray integration makes it easy to accelerate your flow runs with Ray.

Install prefect-ray

The following command will install a version of prefect-ray compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.
pip install "prefect[ray]"
Upgrade to the latest versions of prefect and prefect-ray:
pip install -U "prefect[ray]"
Ray limitations
There are a few limitations with Ray:
  • Ray has experimental support for Python 3.13, but Prefect does not currently support Python 3.13.
  • Ray does not support non-x86/64 architectures, such as ARM/M1 processors, with installation from pip alone, and Ray will be skipped during installation of Prefect. It is possible to manually install the blocking component with conda. See the Ray documentation for instructions.
  • Ray support for Windows is currently in beta.
See the Ray installation documentation for further compatibility information.

Run tasks on Ray

The RayTaskRunner is a Prefect task runner that submits tasks to Ray for parallel execution. By default, a temporary Ray instance is created for the duration of the flow run. For example, this flow counts to three in parallel:
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
If you already have a Ray instance running, you can provide the connection URL via an address argument. To configure your flow to use the RayTaskRunner:
  1. Make sure the prefect-ray collection is installed as described earlier: pip install "prefect[ray]".
  2. In your flow code, import RayTaskRunner from prefect_ray.task_runners.
  3. Assign it as the task runner when the flow is defined using the task_runner=RayTaskRunner argument.
For example, this flow uses the RayTaskRunner with a local, temporary Ray instance created by Prefect at flow run time.
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ...
This flow uses the RayTaskRunner configured to access an existing Ray instance at ray://<head_node_host>:10001.
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://<head_node_host>:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
    ...
RayTaskRunner accepts the following optional parameters:
  • address: Address of a currently running Ray instance, starting with the ray:// URI.
  • init_kwargs: Additional kwargs to use when calling ray.init.
The Ray client uses the ray:// URI to indicate the address of a Ray instance. If you don’t provide the address of a Ray instance, Prefect creates a temporary instance automatically.
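As a sketch of how these parameters combine: because init_kwargs is forwarded verbatim to ray.init, any standard ray.init argument can shape the temporary instance. The num_cpus value below is an arbitrary illustration, not a recommendation, and running this requires Ray and prefect-ray installed.

```python
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

# No address given: Prefect starts a temporary Ray instance for this run.
# init_kwargs are passed straight to ray.init, so standard ray.init
# arguments (num_cpus here is just an example) apply to that instance.
@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4}))
def sized_flow():
    ...
```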

Where the flow’s driver runs

When you connect RayTaskRunner to a shared Ray cluster, the address you pass determines where the driver — the process actually running the flow engine and calling .submit() — sits relative to that cluster. This choice matters more than it looks.

Driver outside the cluster (ray://)

from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(
    task_runner=RayTaskRunner(
        address="ray://<head_node_host>:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect-ray"]}},
    )
)
def my_flow():
    ...
Your Prefect worker runs off-cluster and reaches the Ray head over the Ray Client protocol. Easy to set up from a laptop or a worker that lives outside Kubernetes.
runtime_env only covers workers here
Ray’s dependency docs spell out that a runtime_env passed to ray.init(...) — which is what init_kwargs feeds into — “is only applied to all children Tasks and Actors, not the entrypoint script (Driver) itself.” Whatever your flow module imports at the top (including prefect and prefect-ray) must already be installed on the machine running the driver; you cannot ship driver dependencies via init_kwargs.
A couple of other sharp edges to know about when running real workloads over ray://:
  • The Ray Client connection is not durable. Per Ray’s Ray Client docs, a network interruption of 30+ seconds will terminate the workload. Ray recommends the Jobs API over Ray Client for long-running and ML workloads; the same page labels Ray Client “(For Experts only)” for interactive use.
  • The Ray head node needs prefect-ray and any package your tasks expose in their signatures or close over. When you .submit() a task over ray://, the Ray Client server on the head has to prepare the pickled task before scheduling it onto a worker. Anything in that pickled object graph that the head’s Python environment cannot import surfaces as a ModuleNotFoundError returned from the head, even when your workers would handle the task fine. In practice this means the head image must have:
    • prefect-ray itself installed — RayTaskRunner submissions reference prefect_ray classes, so a head without it fails before a task ever runs.
    • Any package whose classes appear in a task’s parameter or return type annotations.
    • Any package whose objects are passed as task arguments or captured as module-level closures.
    A plain import pandas at the top of your flow file is not enough to trip this on its own — the head only needs the packages that actually end up in the pickled task graph. But as soon as a task signature mentions pd.DataFrame, or you .submit(df), or a task reads a module-level DataFrame, the head needs pandas too. The usual mitigations (matching the head image to your worker image, runtime installs on the head, running one head per flow stack) are all costly. The cleanest fix is usually to move the driver inside the cluster so the Ray Client path is out of the picture entirely.
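The underlying mechanism is ordinary pickle-by-reference semantics rather than anything Ray-specific: pickling an object records its class by module path, not by value, so the unpickling side must be able to import that module. A stdlib-only sketch (using fractions.Fraction as a stand-in for a third-party class like pd.DataFrame):

```python
import pickle
from fractions import Fraction

# Pickling an instance stores its class by module path ("fractions" /
# "Fraction"), not the class's code. Deserializing therefore requires
# `import fractions` to succeed on the receiving side -- exactly why the
# Ray head needs pandas installed the moment a DataFrame lands in a
# task's pickled argument graph.
payload = pickle.dumps(Fraction(1, 3))
assert b"fractions" in payload  # module name embedded in the stream

# Unpickling re-imports the module and reconstructs the object.
assert pickle.loads(payload) == Fraction(1, 3)
```

If `fractions` were missing on the receiving side, pickle.loads would raise ModuleNotFoundError, mirroring the head-node failure described above.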

Driver inside the cluster (address="auto")

from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="auto"))
def my_flow():
    ...
With address="auto", ray.init attaches to a raylet running on the same machine as the driver instead of dialing the head’s Ray Client server. Pickling happens in the driver process and puts objects into Ray’s distributed object store; workers on other nodes pull those objects and unpickle them at task-run time. The head stays pure control plane and does not need to import your flow’s dependencies — not even prefect-ray itself. There are a few ways to give your Prefect driver process a local raylet to attach to:
  • Prefect pod joins the Ray cluster as a zero-capacity node. Run ray start --address=<head-gcs-host>:6379 --num-cpus=0 --num-gpus=0 --block on your Prefect worker pod at startup, wait for the raylet socket to appear, then exec the normal Prefect entrypoint. The pod joins the Ray cluster as a zero-capacity node: it never runs Ray tasks itself, but it hosts a local raylet that address="auto" can attach to. A minimal image entrypoint script:
    #!/usr/bin/env bash
    set -euo pipefail
    : "${RAY_HEAD_GCS:?set RAY_HEAD_GCS to host:port of the Ray head GCS}"
    ray start --address="${RAY_HEAD_GCS}" --num-cpus=0 --num-gpus=0 --block &
    until [[ -S /tmp/ray/session_latest/sockets/raylet ]]; do sleep 1; done
    exec "$@"
    
    This is usually the cleanest production pattern: your Prefect worker image is still a Prefect image, the Ray head image stays stock, and the two are only coupled by the GCS address.
  • Sidecar container in an existing Ray pod. Run the Prefect worker as a second container in the Ray head pod (or a Ray worker-group pod) with a shared /tmp/ray emptyDir volume. Simpler to set up if you already manage Ray via KubeRay, but couples the Prefect worker lifecycle to that of the Ray pod. If the always-on sidecar is a cost concern, pair it with something like KEDA for scale-to-zero.
  • Submit the flow script as a Ray Job. ray job submit --runtime-env-json='...' -- python run_flow.py places the entrypoint on a cluster node, sets RAY_ADDRESS in its environment, and — unlike the ray:// case above — applies runtime_env to the entrypoint script itself. Inside run_flow.py, RayTaskRunner(address="auto") attaches to the same cluster. Good for one-off submissions; harder to reconcile with Prefect deployments that themselves want to kick off runs.
All three avoid the ray:// sharp edges above. The ephemeral “one Ray cluster per flow run” pattern (e.g. via KubeRay) is a further option when you want full per-run isolation, at the cost of cluster start-up time on every run.
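For the zero-capacity-node pattern, the entrypoint’s until [[ -S ... ]] wait can also live in Python when your entrypoint is already Python. A hypothetical stdlib-only helper — wait_for_raylet is not a real Ray or Prefect API, and the default path is Ray’s usual session socket layout:

```python
import os
import stat
import time


def wait_for_raylet(
    socket_path: str = "/tmp/ray/session_latest/sockets/raylet",
    timeout: float = 60.0,
    poll_interval: float = 1.0,
) -> None:
    """Block until a raylet Unix socket appears, so that
    RayTaskRunner(address="auto") has a local raylet to attach to.
    Raises TimeoutError if the socket never shows up."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Check it is actually a socket, not just any file at that path.
            if stat.S_ISSOCK(os.stat(socket_path).st_mode):
                return
        except FileNotFoundError:
            pass
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"no raylet socket at {socket_path} after {timeout}s"
            )
        time.sleep(poll_interval)
```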

Troubleshooting a remote Ray cluster

When using the RayTaskRunner with a remote Ray cluster, you may run into issues that do not occur with a local Ray instance. To resolve them, we recommend the following steps:
  1. By default, Prefect does not persist any data to the filesystem of the remote Ray worker. However, if you want to take advantage of Prefect’s caching ability, you need to configure remote result storage to persist results across task runs.
We recommend using the Prefect UI to configure a storage block to use for remote results storage. Here’s an example of a flow that uses caching and remote result storage:
from typing import List

from prefect import flow, task
from prefect.logging import get_run_logger
from prefect.tasks import task_input_hash
from prefect_aws import S3Bucket
from prefect_ray.task_runners import RayTaskRunner


# The result of this task will be cached in the configured result storage
@task(cache_key_fn=task_input_hash)
def say_hello(name: str) -> str:
    logger = get_run_logger()
    # This log statement will print only on the first run. Subsequent runs will be cached.
    logger.info(f"hello {name}!")
    return name


@flow(
    task_runner=RayTaskRunner(
        address="ray://<instance_public_ip_address>:10001",
    ),
    # Using an S3 block that has already been created via the Prefect UI
    result_storage="s3/my-result-storage",
)
def greetings(names: List[str]) -> None:
    say_hello.map(names).wait()


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
  2. If you get an error stating that the module ‘prefect’ cannot be found, ensure prefect is installed on the remote cluster:
pip install prefect
  3. If you get an error with a message similar to “File system created with scheme ‘s3’ could not be created”, ensure the required Python modules are installed on both local and remote machines. For example, if using S3 for storage:
pip install s3fs
  4. If you are seeing timeout or other connection errors, double-check the address provided to the RayTaskRunner. It should look similar to address='ray://<head_node_ip_address>:10001':
RayTaskRunner(address="ray://1.23.199.255:10001")
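Before blaming the task runner, it can help to confirm the Ray Client port is reachable at all. A hypothetical stdlib-only probe — ray_head_reachable is not part of prefect-ray; 10001 is Ray’s default Ray Client port:

```python
import socket


def ray_head_reachable(host: str, port: int = 10001, timeout: float = 3.0) -> bool:
    """TCP-level check that something is listening on the Ray Client port.
    A True result does not prove it is a Ray head, only that the port is
    open and reachable from this machine."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures, and timeouts.
        return False
```

If this returns False, look at firewalls, security groups, and whether the head was started with the Ray Client server enabled, before debugging at the Prefect layer.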

Specify remote options

The remote_options context can be used to control the task’s remote options. For example, we can set the number of CPUs and GPUs to use for the process task:
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options


@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42).wait()

Resources

Refer to the prefect-ray SDK documentation to explore all the capabilities of the prefect-ray library. For further assistance using Ray, consult the Ray documentation.