The prefect-ray integration makes it easy to accelerate your flow runs with Ray.
Install prefect-ray
The following command installs a version of prefect-ray compatible with your installed version of prefect. If you don’t already have prefect installed, it installs the newest version of prefect as well:
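For example, with pip:

```bash
pip install prefect-ray
```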
Run tasks on Ray
The RayTaskRunner is a Prefect task runner that submits tasks to Ray for parallel execution.
By default, a temporary Ray instance is created for the duration of the flow run.
For example, this flow counts to three in parallel:
If you already have a Ray instance running, you can provide the connection URL via the address argument.
To configure your flow to use the RayTaskRunner:

- Make sure the prefect-ray collection is installed as described earlier: `pip install prefect-ray`.
- In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
- Assign it as the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument.
You can run the RayTaskRunner with a local, temporary Ray instance created by Prefect at flow run time, or configure it to access an existing Ray instance at `ray://<head_node_host>:10001`.
RayTaskRunner accepts the following optional parameters:
| Parameter | Description |
|---|---|
| `address` | Address of a currently running Ray instance, starting with the `ray://` URI. |
| `init_kwargs` | Additional kwargs to use when calling `ray.init`. |
If you don’t provide the address of a Ray instance, Prefect creates a temporary instance automatically.
Where the flow’s driver runs
When you connect RayTaskRunner to a shared Ray cluster, the address you pass determines where the driver — the process actually running the flow engine and calling `.submit()` — sits relative to that cluster. This choice matters more than it might seem.
Driver outside the cluster (ray://)
When you pass a `ray://` address, the driver runs outside the cluster and talks to it through Ray Client. Two sharp edges follow:
- The Ray Client connection is not durable. Per Ray’s Ray Client docs, a network interruption of 30+ seconds will terminate the workload. Ray recommends the Jobs API over Ray Client for long-running and ML workloads; the same page labels Ray Client “(For Experts only)” for interactive use.
- The Ray head node needs `prefect-ray` and any package your tasks expose in their signatures or close over. When you `.submit()` a task over `ray://`, the Ray Client server on the head has to prepare the pickled task before scheduling it onto a worker. Anything in that pickled object graph that the head’s Python environment cannot import surfaces as a `ModuleNotFoundError` returned from the head, even when your workers would handle the task fine. In practice this means the head image must have:
  - `prefect-ray` itself installed: `RayTaskRunner` submissions reference `prefect_ray` classes, so a head without it fails before a task ever runs.
  - Any package whose classes appear in a task’s parameter or return type annotations.
  - Any package whose objects are passed as task arguments or captured as module-level closures.
`import pandas` at the top of your flow file is not enough to trip this on its own; the head only needs the packages that actually end up in the pickled task graph. But as soon as a task signature mentions `pd.DataFrame`, or you `.submit(df)`, or a task reads a module-level DataFrame, the head needs `pandas` too. The usual mitigations (matching the head image to your worker image, runtime installs on the head, running one head per flow stack) are all costly. The cleanest fix is usually to move the driver inside the cluster so the Ray Client path is out of the picture entirely.
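The mechanics can be seen with nothing but the standard library; here `decimal` stands in for `pandas`:

```python
import pickle
from decimal import Decimal

# A pickle stream stores only a *reference* to each object's class
# (module path + class name), never the class's code. Whoever unpickles
# the payload must therefore be able to import every referenced module.
payload = pickle.dumps({"price": Decimal("9.99")})

# The module path is embedded in the stream itself:
assert b"decimal" in payload and b"Decimal" in payload

# Round-tripping works here only because `decimal` is importable in this
# process; on a head node missing the package, the equivalent unpickle
# raises ModuleNotFoundError.
assert pickle.loads(payload) == {"price": Decimal("9.99")}
```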
Driver inside the cluster (address="auto")
With `address="auto"`, `ray.init` attaches to a raylet running on the same machine as the driver instead of dialing the head’s Ray Client server. Pickling happens in the driver process and puts objects into Ray’s distributed object store; workers on other nodes pull those objects and unpickle them at task-run time. The head stays pure control plane and does not need to import your flow’s dependencies — not even `prefect-ray` itself.
There are a few ways to give your Prefect driver process a local raylet to attach to:
- **Prefect pod joins the Ray cluster as a zero-capacity node.** Run `ray start --address=<head-gcs-host>:6379 --num-cpus=0 --num-gpus=0 --block` on your Prefect worker pod at startup, wait for the raylet socket to appear, then exec the normal Prefect entrypoint. The pod joins the Ray cluster as a zero-capacity node: it never runs Ray tasks itself, but it hosts a local raylet that `address="auto"` can attach to. This is usually the cleanest production pattern: your Prefect worker image is still a Prefect image, the Ray head image stays stock, and the two are only coupled by the GCS address.
- **Sidecar container in an existing Ray pod.** Run the Prefect worker as a second container in the Ray head pod (or a Ray worker-group pod) with a shared `/tmp/ray` emptyDir volume. Simpler to set up if you already manage Ray via KubeRay, but couples the Prefect worker lifecycle to that of the Ray pod. If the always-on sidecar is a cost concern, pair it with something like KEDA for scale-to-zero.
- **Submit the flow script as a Ray Job.** `ray job submit --runtime-env-json='...' -- python run_flow.py` places the entrypoint on a cluster node, sets `RAY_ADDRESS` in its environment, and — unlike the `ray://` case above — applies `runtime_env` to the entrypoint script itself. Inside `run_flow.py`, `RayTaskRunner(address="auto")` attaches to the same cluster. Good for one-off submissions; harder to reconcile with Prefect deployments that themselves want to kick off runs.
All of these approaches avoid the `ray://` sharp edges above. The ephemeral “one Ray cluster per flow run” pattern (e.g. via KubeRay) is a further option when you want full per-run isolation, at the cost of cluster start-up time on every run.
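For the zero-capacity-node pattern, an image entrypoint might look like the following sketch; the environment variable names and the readiness check are assumptions to adapt to your setup:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Join the Ray cluster as a zero-capacity node: it hosts a raylet but
# never runs Ray tasks itself. (RAY_GCS_ADDRESS is a hypothetical env var.)
ray start --address="${RAY_GCS_ADDRESS:?}" --num-cpus=0 --num-gpus=0

# Wait for the local raylet to come up so address="auto" can attach;
# Ray writes this marker file once the node has joined.
until [ -e /tmp/ray/ray_current_cluster ]; do sleep 1; done

# Hand off to the normal Prefect worker entrypoint.
# (PREFECT_WORK_POOL is a hypothetical env var naming your work pool.)
exec prefect worker start --pool "${PREFECT_WORK_POOL:?}"
```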
Troubleshooting a remote Ray cluster
When using the RayTaskRunner with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance.
To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:
- By default, Prefect will not persist any data to the filesystem of the remote Ray worker. However, if you want to take advantage of Prefect’s caching ability, you will need to configure remote result storage to persist results across task runs.
- If you get an error stating that the module ‘prefect’ cannot be found, ensure `prefect` is installed on the remote cluster.
- If you get an error with a message similar to “File system created with scheme ‘s3’ could not be created”, ensure the required Python modules are installed on both local and remote machines. For example, S3 storage requires the `s3fs` package.
- If you are seeing timeout or other connection errors, double-check the address provided to the RayTaskRunner. The address should look similar to `address='ray://<head_node_ip_address>:10001'`.
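The corresponding install commands, as a sketch (how they reach the remote nodes depends on how you build your Ray images):

```bash
# On the remote Ray cluster's nodes:
pip install prefect prefect-ray

# On both the local and remote machines, if using S3 for result storage:
pip install s3fs
```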
Specify remote options
The `remote_options` context can be used to control the task’s remote options. For example, we can set the number of CPUs and GPUs to use for the `process` task:
Resources
Refer to the prefect-ray SDK documentation to explore all the capabilities of the prefect-ray library.
For further assistance using Ray, consult the Ray documentation.