Source code for foxes.engines.ray

from copy import deepcopy

from foxes.utils import import_module

from .pool import PoolEngine


ray = None


def load_ray():
    """On-demand loading of the ray package"""
    global ray
    if ray is None:
        ray = import_module("ray")


[docs] class RayEngine(PoolEngine): """ The ray engine for foxes calculations. :group: engines """ def _create_pool(self): """Creates the pool""" self.print(f"Initializing pool of {self.n_procs} ray workers") load_ray() ray.init(num_cpus=self.n_procs) def _submit(self, f, *args, **kwargs): """ Submits to the pool Parameters ---------- f: Callable The function f(*args, **kwargs) to be submitted args: tuple, optional Arguments for the function kwargs: dict, optional Arguments for the function Returns ------- future: object The future object """ @ray.remote def f_ray(*args, **kwargs): return f(*deepcopy(args), **deepcopy(kwargs)) return f_ray.remote(*args, **kwargs) def _result(self, future): """ Waits for result from a future Parameters ---------- future: object The future Returns ------- result: object The calculation result """ return ray.get(future) def _shutdown_pool(self): """Shuts down the pool""" self.print(f"Shutting down pool of {self.n_procs} ray workers") ray.shutdown()