Ray: Application-level scheduling with custom resources

Alexey Tumanov blog, Distributed Systems, Ray




Ray aims to be a universal framework for a wide range of machine learning applications, including distributed training, machine learning inference, data processing, latency-sensitive applications, and throughput-oriented applications. Each of these has different and, at times, conflicting resource management requirements. Ray intends to cater to all of them as the newly emerging microkernel for distributed machine learning. To achieve that kind of generality, Ray gives developers explicit control over task and actor placement through custom resources. In this blog post we discuss use cases and provide examples. This article is intended for readers already familiar with Ray. If you are new to Ray and are looking to easily and elegantly parallelize your Python code, please take a look at this tutorial.

Use Cases

  1. Load Balancing. In many cases, the preferred behavior is to distribute tasks across all available nodes in the cluster in a round-robin fashion. Examples include workloads that are network I/O bound. Spreading such tasks as thinly as possible relieves network bottlenecks and minimizes stragglers.
  2. Affinity. Some applications require affinity between tasks and generally prefer the workload to be packed as tightly as possible. This is helpful, for instance, when dynamically auto-scaling the workload in the cloud, since fewer nodes need to stay up.
  3. Anti-affinity. Other applications gain significant performance from preventing task co-location, in direct contrast to affinity. In such cases there is a need for a repelling force between units of work, with the goal of (a) better performance by avoiding resource contention and (b) higher availability by eliminating a single point of failure.
  4. Packing. Yet other applications require both. They may have strong affinity requirements within a group of tasks or actors, with an anti-affinity constraint across those groups. Examples include distributed data processing on partitioned datasets (Figure 1), which requires affinity between data processing tasks and the tasks that, for instance, read a partition of a CSV file. At the same time, we want to distribute vertical dataframe partitions (which maintain column-wise locality within partitions) broadly, to maximize computational parallelism.
data file partitioning with node-affinity and column-wise anti-affinity

Figure 1: maximizing data processing parallelism in Modin
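
The four placement preferences above can all be expressed as a choice of which custom resource label each task requests. The following is a minimal sketch in plain Python; it does not call Ray. The helper names `spread`, `pack`, and `group_pack` are hypothetical, and the dictionaries they return are meant to be passed as the `resources` argument used in the examples later in this post.

```python
from itertools import cycle

def spread(labels, num_tasks):
    """Load balancing: cycle through node labels round-robin."""
    label_cycle = cycle(labels)
    return [{next(label_cycle): 1} for _ in range(num_tasks)]

def pack(label, num_tasks):
    """Affinity: every task requests the same node label."""
    return [{label: 1} for _ in range(num_tasks)]

def group_pack(labels, groups):
    """Packing: affinity within a group, anti-affinity across groups.

    `groups` is a list of group sizes; group g is pinned to labels[g],
    so this sketch assumes at least as many labels as groups.
    """
    if len(groups) > len(labels):
        raise ValueError("need at least one node label per group")
    return [[{labels[g]: 1} for _ in range(size)]
            for g, size in enumerate(groups)]

labels = ["node0", "node1"]
print(spread(labels, 4))
print(pack("node0", 2))
print(group_pack(labels, [2, 1]))
```

Anti-affinity is the special case of `spread` with no more tasks than labels, so that each task requests a distinct node.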

Figure 2 illustrates a more involved example: AlphaGo implemented on Ray with a collection of CPU and GPU actors. Briefly, AlphaGo performs a Monte Carlo Tree Search (MCTS) over a tree of game states, with vertices representing board states and edges representing valid moves that take a source state to a destination state. MCTS is CPU intensive, so a collection of Ray CPU actors is responsible for aggregating game states and performing the search. They all share a GPU actor that evaluates game states and assigns scores used for search pruning. Thus, this application requires affinity within each group of CPU and GPU actors, while distributing the computation broadly across these groups for horizontal scalability (and faster processing).

Figure 2a: AlphaGo game state graph


Figure 2b: relationship between GPU and CPU Ray actors in the AlphaGo implementation

Figure 2: AlphaGo on Ray implementation with Ray actors

To support such requirements, Ray enables application-level scheduling with custom resources for precise placement control. We are going to look at one simple use case, in which the application distributes tasks to a set of available nodes in a round-robin fashion, without prior knowledge of the cluster topology or the number of nodes in the cluster.

Figure 3: Ray nodes with resource labels and capacities. label_i is the custom label unique to each Ray node used for application-level placement control.


In the first example, we’re going to simply distribute two tasks over two existing Ray nodes explicitly. Note that all examples below can be run on your laptop as well as your cluster without any modifications to your code.

# Start two Ray nodes as follows:
ray start --head --resources '{"node0": 2}' --redis-port 6379  # start head
ray start --redis-address localhost:6379 --resources '{"node1": 2}'

import ray

# Connect to existing Ray cluster
ray.init(redis_address="localhost:6379")

@ray.remote
def f(i):
    return i

# Launch two tasks, each with a custom resource requirement for node_i
oids = [f._remote(args=[i], kwargs={}, resources={'node' + str(i): 1})
        for i in range(2)]

# Get the results
ray.get(oids)  # [0, 1]

Second, we’re going to leverage Ray’s Global State API to dynamically get total as well as available resources in the cluster.

# This gives you the aggregate cluster resource totals
In [8]: ray.global_state.cluster_resources()
Out[8]: {'CPU': 24.0, 'GPU': 0.0, 'node0': 2.0, 'node1': 2.0}

# This gives you the aggregate cluster resources available
In [9]: ray.global_state.available_resources()
Out[9]: {'CPU': 24.0, 'GPU': 0.0, 'node0': 2.0, 'node1': 2.0}

# Querying information for all Ray nodes in the cluster
In [10]: ray.global_state.client_table()
Out[10]:
[{'ClientID': '5c88eacdf0465af552d2e0de0cd3b692f8f4754e',
  'Resources': {'CPU': 8.0, 'GPU': 0.0, 'node2': 2.0}}]
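
Subtracting the available resources from the totals shows how much of each resource is currently claimed. Here is a minimal sketch in plain Python that operates on dictionaries shaped like the two outputs above. The helper `resources_in_use` and the sample values in `available` are hypothetical and chosen only for illustration.

```python
def resources_in_use(total, available):
    """Return total minus available for every resource in `total`.

    In this sketch, a resource missing from `available` is treated
    as fully in use.
    """
    return {name: capacity - available.get(name, 0.0)
            for name, capacity in total.items()}

total = {'CPU': 24.0, 'GPU': 0.0, 'node0': 2.0, 'node1': 2.0}
# Illustrative values: one task holding 1 CPU and 1 unit of node0.
available = {'CPU': 23.0, 'GPU': 0.0, 'node0': 1.0, 'node1': 2.0}
print(resources_in_use(total, available))
```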

Third, we round robin an arbitrary number of tasks over an arbitrary number of Ray nodes, which we query using `ray.global_state.client_table()` above.

task_count = 0
num_nodes = 2
object_ids = []
while task_count < 1000:
    # Pick the next node in round-robin order
    next_node_id = task_count % num_nodes
    object_id = f._remote(args=[task_count], kwargs={},
                          resources={'node' + str(next_node_id): 1})
    # Keep the object ID so the result can be fetched later
    object_ids.append(object_id)
    task_count += 1

ray.get(object_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, ..., 999]
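
The loop above hard-codes `num_nodes` and the `node<i>` naming scheme. To avoid that, the labels can be derived from the node table instead. The sketch below is plain Python that works on a list shaped like the `client_table()` output shown earlier. `custom_labels` is a hypothetical helper, the sample table is made up for illustration, and treating every resource other than `CPU` and `GPU` as a node label is an assumption.

```python
from itertools import cycle, islice

# Assumption: every resource name other than these is a custom node label.
BUILTIN_RESOURCES = {'CPU', 'GPU'}

def custom_labels(client_table):
    """Collect custom resource labels from client_table-style entries."""
    labels = []
    for entry in client_table:
        for name in entry.get('Resources', {}):
            if name not in BUILTIN_RESOURCES and name not in labels:
                labels.append(name)
    return labels

# Made-up sample with the same shape as the output shown earlier.
sample_table = [
    {'ClientID': 'a', 'Resources': {'CPU': 8.0, 'GPU': 0.0, 'node0': 2.0}},
    {'ClientID': 'b', 'Resources': {'CPU': 8.0, 'GPU': 0.0, 'node1': 2.0}},
]

labels = custom_labels(sample_table)
# Round-robin the first five tasks over whatever labels were found.
assignments = [{label: 1} for label in islice(cycle(labels), 5)]
print(assignments)
```

Each dictionary in `assignments` could then be passed as the `resources` argument of `f._remote(...)`.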

Another use case we frequently encounter is task co-location, where launching two tasks on the same node is necessary or can offer better performance. This can also be achieved with custom resources. For instance, here’s an example of a writer task writing to a local file, and a reader task reading from the same file. Since the file is local to the node, these tasks must be co-located.

import os
import time

# Declare the remote functions; resource requirements are passed at call time
# The writer writes to a local file on the node
@ray.remote
def writer():
    filepath = '/tmp/myfile.tmp'
    with open(filepath, 'w') as f:
        f.write('foo')

# The reader reads from the local file
@ray.remote
def reader():
    filepath = '/tmp/myfile.tmp'
    # Wait for the file to be created if it does not exist
    while not os.path.isfile(filepath):
        time.sleep(1)
    # Read the file once created
    with open(filepath, 'r') as f:
        data = f.read()
    return data

# Assume we have a node that was started like this:
# ray start --redis-address localhost:6379 --resources '{"node1": 2}'

# Create the reader and writer. They will be launched on the same node.
writer._remote(args=[], kwargs={}, resources={'node1': 1})
object_id = reader._remote(args=[], kwargs={}, resources={'node1': 1})

# Get the results
ray.get(object_id)  # 'foo'
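
Note that the node advertises a capacity of 2 for `node1` and each task requests 1, so both tasks can hold the resource at the same time. This matters here because the reader polls until the writer has created the file; with a capacity of 1 the two tasks would be serialized, and a reader that happened to run first could wait indefinitely. The toy model below is plain Python, not Ray's scheduler. The class `LabelCapacity` is hypothetical and only illustrates the accounting.

```python
class LabelCapacity:
    """Toy accounting for one custom resource label on one node."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.in_use = 0

    def try_acquire(self, amount=1):
        """Claim `amount` units if they fit; return whether it succeeded."""
        if self.in_use + amount > self.capacity:
            return False
        self.in_use += amount
        return True

    def release(self, amount=1):
        """Return `amount` units, e.g. when a task finishes."""
        self.in_use = max(0, self.in_use - amount)

node1 = LabelCapacity(capacity=2)
writer_runs = node1.try_acquire()  # writer claims 1 of 2 units
reader_runs = node1.try_acquire()  # reader claims the remaining unit
third_runs = node1.try_acquire()   # a third task would have to wait
print(writer_runs, reader_runs, third_runs)
```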

Similarly, here’s an example where one task calls another, and both need to be co-located for performance.

@ray.remote
class Actor:
    def method(self, i):
        return i

# This function calls an actor method. It may be faster if
# co-located with the actor.
@ray.remote
def foo(i, actor_handle):
    return ray.get(actor_handle.method.remote(i))

# Assume we have a node that was started like this:
# ray start --redis-address localhost:6379 --resources '{"node1": 2}'

# Now co-locate the actor and the remote function.
actor_handle = Actor._remote(args=[], kwargs={}, resources={"node1": 1})
object_id = foo._remote(args=[1, actor_handle], kwargs={},
                        resources={"node1": 1})

# Get the results.
ray.get(object_id)  # 1

Finally, to stop your Ray cluster, just run

ray stop

on the command line.


In conclusion, Ray provides the ability to precisely control task placement at the application level using custom resources. Ray Tune, for instance, takes full advantage of this by managing its own allocation of scarce resources (e.g., GPUs) to hyperparameter search experiments. This explicit control of resource allocation at the application level enables Tune to implement its own experiment admission control. Thus we follow the Exokernel [1] philosophy: the microkernel is responsible for resource accounting, lifecycle management, and protection mechanisms, while the application is given an interface to control its own resource allocation policy. Ray still provides a high-performance, low-latency default policy for applications that do not need this control.


[1] D. R. Engler, M. F. Kaashoek, and J. O’Toole, Jr. Exokernel: an operating system architecture for application-level resource management. In Proc. of SIGOPS SOSP ’95, Copper Mountain, Colorado, USA, Dec 03-06, 1995.
