Ray: A Distributed Framework
Disclaimer: These notes were taken while attending the COL 733: Cloud Computing course at IIT Delhi and the 6.5840: Distributed Systems course offered at MIT by Robert Morris.
Ray
Ray improves on Spark
- So far we saw a static task graph; now we want to move to a dynamic task graph.
- Lineage stayed small because of coarse-grained transformations, and there used to be a single master; now we want to move away from the single master and have fine-grained transformations.
- We will also move from stateless workers to stateful actors.
Stateful actors are recovered using lineage-based recovery.
So far our systems had task dependencies defined upfront and then executed. We were also doing lineage-based recovery for fault tolerance, which is why the tasks were kept coarse-grained.
Now we will look at possibilities where new tasks can be generated on the fly while the workers are executing tasks. One of the simplest implementations of this is Celery, which has a single pool of tasks and stateless workers: any worker can work on any task, as in the sketch below.
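As a toy illustration, here is a minimal sketch of such a shared task pool with stateless workers, using only Python's standard library (the worker count and the tasks themselves are made up):
import queue
import threading

tasks = queue.Queue()

def worker():
    # Stateless worker: it just pulls whatever task is next in the pool.
    while True:
        fn, args = tasks.get()
        fn(*args)
        tasks.task_done()

# A small pool of identical workers; any of them can pick up any task.
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for i in range(10):
    tasks.put((print, (f"task {i}",)))
tasks.join()  # wait until the pool has drained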
Tasks and Futures
Ray uses futures to handle tasks that haven’t completed yet. When you call a function with .remote(), Ray starts the task and immediately returns a “future”.
import ray

ray.init()

@ray.remote
def f(x):
    return x * x

@ray.remote
def g(a, b):
    return a + b

x = g.remote(f.remote(3), f.remote(1))
print(ray.get(x))  # 10
We can use these futures as inputs for other tasks, creating dynamic dependencies. ray.get is used when you need the actual result, and waits for the task to finish.
flowchart LR
Start --> f1["f.remote(3)"]
Start --> f2["f.remote(1)"]
f1 --> g["g.remote"]
f2 --> g
g --> Result["ray.get"]
Result --> Output["Final Result: 10"]
- The program starts and initiates two parallel tasks: f.remote(3) and f.remote(1).
- These tasks return futures (future_1 and future_2).
- Both futures are passed to g.remote().
- g.remote() returns another future (future_3).
- ray.get() is a blocking call; called on the final future, it returns the future’s value.
- The final result (10) is obtained.
A simple system for dynamic tasks and futures (CIEL)
graph LR
z((z)) --> a[a]
a --> A((A))
A --> b[b]
A --> d[d]
b --> B((B))
d --> D((D))
B --> c[c]
D --> c
c --> C((C))
classDef future fill:#f9f,stroke:#333,stroke-width:2px;
classDef task fill:#bfb,stroke:#333,stroke-width:2px;
classDef input fill:#bbf,stroke:#333,stroke-width:2px;
class z input;
class A,B,C,D future;
class a,b,c,d task;
Here we have 3 workers, 4 tasks (a, b, c, d), 4 futures (A, B, C, D), and the input z. Each worker contains an object store. The object stores hold the immutable values for the futures, and a master program keeps track of all the workers, tasks, and objects.
classDiagram
class TaskTable {
Task: String
WaitingOnObject: String
Status: String
Worker: String
}
class ObjectTable {
Object: String
Location: String
WaitingTasks: String
}
TaskTable "1" --> "1" ObjectTable: References
class TaskTable1 {
a(z): ∅, Complete, w0
b(A): A, Running, w1
d(A): A, Running, w2
c(B,D): B D, Pending, -
}
class ObjectTable1 {
z: w0, -
A: w0, -
B: -, c(B,D)
C: -, -
D: -, c(B,D)
}
TaskTable <|-- TaskTable1: Instance
ObjectTable <|-- ObjectTable1: Instance
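As a hedged sketch, the two tables above could be represented in Python roughly like this at the moment shown (field names mirror the diagram; this is illustrative, not CIEL's actual data structures):
# Task table: what each task waits on, its status, and where it runs.
task_table = {
    "a(z)":   {"waiting_on": set(),      "status": "Complete", "worker": "w0"},
    "b(A)":   {"waiting_on": {"A"},      "status": "Running",  "worker": "w1"},
    "d(A)":   {"waiting_on": {"A"},      "status": "Running",  "worker": "w2"},
    "c(B,D)": {"waiting_on": {"B", "D"}, "status": "Pending",  "worker": None},
}

# Object table: where each (immutable) object lives and which tasks wait for it.
object_table = {
    "z": {"locations": {"w0"}, "waiting_tasks": set()},
    "A": {"locations": {"w0"}, "waiting_tasks": set()},
    "B": {"locations": set(),  "waiting_tasks": {"c(B,D)"}},
    "C": {"locations": set(),  "waiting_tasks": set()},
    "D": {"locations": set(),  "waiting_tasks": {"c(B,D)"}},
}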
In these tables we can see the process of assigning tasks to the workers.
- Task Table:
- a(z) is complete, executed by worker w0. It didn’t wait for any object (∅).
- b(A) is currently running on worker w1, waiting on object A.
- d(A) is also running, but on worker w2, also waiting on object A.
- c(B,D) is pending, waiting for both objects B and D.
- Object Table:
- z is located on w0 and no tasks are waiting for it.
- A is also on w0, with no waiting tasks (b(A) and d(A) are already running).
- B doesn’t have a location yet, and c(B,D) is waiting for it.
- C doesn’t have a location or waiting tasks yet.
- D, like B, doesn’t have a location, and c(B,D) is waiting for it.
- The process of assigning tasks to workers goes as follows:
The master scheduler first assigned a(z) to w0, as z was likely already on w0 (locality-aware scheduling). After a(z) completed, producing object A on w0, the master was notified. With A available, the master then scheduled b(A) on w1 and d(A) on w2. w2 will need to fetch A from w0 to execute d(A), but this data transfer happens directly between workers. c(B,D) remains pending because B and D are not yet available. It will be scheduled once both these objects are produced by their respective tasks.
This approach allows for efficient parallel execution, with the master coordinating based on data dependencies, while actual data transfer and computation happen on the workers. The tables are continuously updated as tasks complete and new objects become available, enabling the master to make informed scheduling decisions.
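A hedged sketch of how the master might react when a worker reports a completed task (the schedule helper is hypothetical, not CIEL's real API):
def on_task_complete(task, produced_object, worker, task_table, object_table):
    # Record the completion and the new object's location.
    task_table[task]["status"] = "Complete"
    object_table[produced_object]["locations"].add(worker)

    # Any task that was waiting only on this object becomes runnable.
    for waiting in list(object_table[produced_object]["waiting_tasks"]):
        task_table[waiting]["waiting_on"].discard(produced_object)
        if not task_table[waiting]["waiting_on"]:
            schedule(waiting, prefer=worker)  # hypothetical locality-aware placement
    object_table[produced_object]["waiting_tasks"].clear()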
Finally,
classDiagram
class TaskTable1 {
a(z): ∅, Complete, w0
b(A): A, Complete, w1
d(A): A, Complete, w2
c(B,D): B D, Complete, w1
}
class ObjectTable1 {
z: w0, -
A: w0, -
B: w1, -
C: w1, -
D: w2, -
}
What if w0 crashes? If it crashes while b is still running (so object A is still needed), everything on w0 is given to some other worker: using the lineage, the master re-executes the tasks that produced w0’s objects.
Do tasks need to be deterministic? Yes, and also idempotent, since tasks may be re-executed during recovery and must reproduce the same objects.
What we observe here is the latency of scheduling tasks: every decision requires a round trip through the master node. Ray solves this.
Ray
The diagram below shows how Ray allows for dynamic task creation and dependency management using futures, enabling efficient parallel execution of tasks.
graph TB
subgraph "Ray Cluster"
GCS[Global Control Store]
TT[Task Table]
OS[Object Store]
subgraph "Node 1"
D1[Driver]
W1[Worker]
A1[Actor]
LOS1[Local Object Store]
end
subgraph "Node 2"
W2[Worker]
A2[Actor]
LOS2[Local Object Store]
end
subgraph "Node 3"
W3[Worker]
A3[Actor]
LOS3[Local Object Store]
end
end
D1 -->|Submit tasks| GCS
GCS <-->|Manage tasks| TT
GCS <-->|Manage objects| OS
GCS -->|Distribute tasks| W1
GCS -->|Distribute tasks| W2
GCS -->|Distribute tasks| W3
GCS -->|Manage state| A1
GCS -->|Manage state| A2
GCS -->|Manage state| A3
W1 -->|Execute tasks| A1
W2 -->|Execute tasks| A2
W3 -->|Execute tasks| A3
W1 <-->|Read/Write| LOS1
W2 <-->|Read/Write| LOS2
W3 <-->|Read/Write| LOS3
LOS1 <-->|Sync| OS
LOS2 <-->|Sync| OS
LOS3 <-->|Sync| OS
In the figure above we see the Ray architecture, which builds on CIEL.
- Global Control Store (GCS): Ray uses a centralized control plane called the Global Control Store, which is inspired by CIEL’s master node. The GCS manages the distributed object store and task scheduling.
- Task Table (TT): This is part of the Global Control Store and manages task information.
- Object Store (OS): A distributed storage system for sharing data between tasks and actors.
- Local Object Stores (LOS): Each node has its own local object store for faster access to frequently used data.
- Connections between workers and their local object stores.
- Synchronization between local object stores and the global object store.
- Dynamic task graph: Unlike Spark’s static task graph, Ray supports dynamic task graphs, allowing for more flexible and adaptive computation.
- Fine-grained tasks: Ray supports fine-grained tasks and actors, improving on CIEL’s coarse-grained approach.
- Stateful actors: Ray introduces stateful actors, which are long-running computations with mutable state, extending CIEL’s stateless worker model.
- Distributed scheduler: Ray uses a distributed scheduler that can make local scheduling decisions, improving scalability compared to CIEL’s centralized scheduler.
- Object store: Ray implements a distributed object store for efficient data sharing between tasks and actors.
- Lineage-based recovery: Similar to CIEL, Ray uses lineage information for fault tolerance and recovery of stateful actors.
- Task and actor placement: Ray’s scheduler considers data locality and resource constraints when placing tasks and actors across the cluster.
- Python API: Ray provides a user-friendly Python API for defining and executing distributed computations.
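As a small illustration of the Python API and resource-aware placement bullets above, here is a sketch using the standard @ray.remote options (the functions and numbers are made up):
import ray

ray.init()

# Ask the scheduler to place this task only where 2 CPUs are free.
@ray.remote(num_cpus=2)
def preprocess(batch):
    return [x * 2 for x in batch]

# Actors can declare resource requirements at creation time too.
@ray.remote
class ParameterStore:
    def __init__(self):
        self.params = {}

    def update(self, key, value):
        self.params[key] = value
        return len(self.params)

store = ParameterStore.remote()
batch = ray.get(preprocess.remote([1, 2, 3]))
print(batch, ray.get(store.update.remote("weights", batch)))  # [2, 4, 6] 1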
We don’t want to overload the master, which holds the task table and the object table. In Ray, workers keep their own tasks locally, and only if the local tasks become too many do they pass them to the global task table; Ray calls this the Global Control Store.
The workers also have their own object stores. The master will still eventually need to know all the tasks in order to maintain the lineage graph.
If we try to solve the above problem of tasks and futures in Ray:
sequenceDiagram
participant Driver
participant Worker1
participant Worker2
participant Worker3
participant GCS as Global Control Store
Driver->>GCS: Submit DAG
GCS->>Worker1: Schedule task_a
GCS->>Worker2: Schedule task_b and task_d
GCS->>Worker3: Schedule task_c
Worker1->>Worker1: Execute task_a
Worker1-->>GCS: Update task completion
GCS->>Worker2: Notify A is ready
Worker2->>Worker2: Execute task_b and task_d
Worker2-->>GCS: Update task completion
GCS->>Worker3: Notify B and D are ready
Worker3->>Worker3: Execute task_c
Worker3-->>GCS: Update task completion
GCS->>Driver: Return result
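A hedged sketch of how this DAG could be written against Ray’s Python API; the task bodies below are placeholders, since the notes only specify the dependency structure of a, b, c, d over the input z:
import ray

ray.init()

@ray.remote
def a(z):      # produces A
    return z + 1

@ray.remote
def b(A):      # produces B
    return A * 2

@ray.remote
def d(A):      # produces D
    return A * 3

@ray.remote
def c(B, D):   # produces C
    return B + D

z = 1
A = a.remote(z)
B = b.remote(A)       # futures are passed directly; Ray resolves them to values
D = d.remote(A)
C = c.remote(B, D)
print(ray.get(C))     # blocks until the whole chain has finished (10 with these bodies)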
Local Task Table
classDiagram
class Worker1TaskTable {
task_a(z): Running
task_a(z): Completed
}
class Worker2TaskTable {
task_b(A): Waiting
task_d(A): Waiting
task_b(A): Running
task_d(A): Running
task_b(A): Completed
task_d(A): Completed
}
class Worker3TaskTable {
task_c(B,D): Waiting
task_c(B,D): Running
task_c(B,D): Completed
}
Syncing with the master
sequenceDiagram
participant Worker
participant GCS as Global Control Store
Worker->>Worker: Update local task table
Worker->>GCS: Send task status updates
GCS->>GCS: Update global task table
GCS->>Worker: Send any new task assignments
Worker->>Worker: Update local task table with new assignments
Object Store and Data Transfer
graph TB
subgraph "Worker 1"
W1OS[Object Store]
W1OS --> |z, A| W1[task_a]
end
subgraph "Worker 2"
W2OS[Object Store]
W2OS --> |A| W2B[task_b]
W2OS --> |A| W2D[task_d]
end
subgraph "Worker 3"
W3OS[Object Store]
W3OS --> |B, D| W3[task_c]
end
W1OS -.-> |A| W2OS
W2OS -.-> |B, D| W3OS
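A minimal sketch of moving data through the object store with the standard ray.put / ray.get calls (the array and function here are just illustrations):
import ray
import numpy as np

ray.init()

@ray.remote
def column_sum(matrix):
    return matrix.sum(axis=0)

# Put the array into the local object store once; a worker on another node
# fetches it from whichever store holds a copy, as in the diagram above.
big = ray.put(np.ones((1000, 1000)))
ref = column_sum.remote(big)   # only the small object reference is shipped
print(ray.get(ref)[:3])        # [1000. 1000. 1000.]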
Actors
- Actors are stateful workers in Ray. They’re like objects that live on a specific machine and remember their state between method calls.
- You create an actor by decorating a class with @ray.remote.
- When you instantiate this class, Ray creates a process on a worker node to host this actor.
- All method calls on this actor instance are executed on that specific process, allowing the actor to maintain state.
- Actors are useful for maintaining mutable state in distributed applications, like managing shared parameters or aggregating results.
import ray

ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):  # added here so the actor has a method to call
        self.value += 1
        return self.value
Creating an actor
graph LR
A[Define Actor Class] -->|@ray.remote| B[Ray Actor Class]
B -->|Instantiate| C[Actor Process on Worker Node]
C -->|Maintains State| D[Actor Instance]
Actor Method Calls
sequenceDiagram
participant Client
participant Ray
participant ActorProcess
Client->>Ray: Create Actor
Ray->>ActorProcess: Start Actor Process
ActorProcess-->>Ray: Actor Ready
Ray-->>Client: Actor Reference
loop Method Calls
Client->>Ray: Call Actor Method
Ray->>ActorProcess: Execute Method
ActorProcess-->>Ray: Method Result
Ray-->>Client: Return Result
end
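Continuing the Counter actor from above (the increment method was added there as an illustration), a short usage sketch:
counter = Counter.remote()                # starts the actor process on some worker
refs = [counter.increment.remote() for _ in range(3)]
print(ray.get(refs))                      # [1, 2, 3]: state survives between calls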
Comparison of Tasks and Actors
graph TB
subgraph "Ray Cluster"
subgraph "Tasks"
T1[Task 1: Stateless]
T2[Task 2: Stateless]
T3[Task 3: Stateless]
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
end
subgraph "Actors"
A1[Actor 1: Stateful]
A2[Actor 2: Stateful]
AP1[Actor Process 1]
AP2[Actor Process 2]
end
GCS[Global Control Store]
end
Client1 -->|Submit| T1
Client1 -->|Submit| T2
Client2 -->|Submit| T3
T1 -.->|Can execute on| W1
T1 -.->|Can execute on| W2
T1 -.->|Can execute on| W3
T2 -.->|Can execute on| W1
T2 -.->|Can execute on| W2
T2 -.->|Can execute on| W3
T3 -.->|Can execute on| W1
T3 -.->|Can execute on| W2
T3 -.->|Can execute on| W3
Client3 -->|Method Call| A1
Client3 -->|Method Call| A2
Client4 -->|Method Call| A1
A1 -->|Always executes on| AP1
A2 -->|Always executes on| AP2
GCS -->|Manages| T1
GCS -->|Manages| T2
GCS -->|Manages| T3
GCS -->|Manages| A1
GCS -->|Manages| A2
classDef task fill:#f9f,stroke:#333,stroke-width:2px;
classDef actor fill:#bbf,stroke:#333,stroke-width:2px;
classDef worker fill:#bfb,stroke:#333,stroke-width:2px;
classDef process fill:#fbb,stroke:#333,stroke-width:2px;
classDef gcs fill:#ff9,stroke:#333,stroke-width:2px;
class T1,T2,T3 task;
class A1,A2 actor;
class W1,W2,W3 worker;
class AP1,AP2 process;
class GCS gcs;
- Tasks are stateless and can run on any worker.
- Actors are stateful and run on a specific worker process.
- Multiple clients can call methods on the same actor.
- Actors maintain their state between the method calls.
Let’s have a look at this LCS code and try to make a DAG for it. Hint: notice that the program computes 9 2D array objects, each of size 10,000 x 10,000. You can refer to these 9 array objects as L0,0, L0,1, …, L2,2 respectively.
graph LR
L00[L0,0]
L01[L0,1]
L02[L0,2]
L10[L1,0]
L11[L1,1]
L12[L1,2]
L20[L2,0]
L21[L2,1]
L22[L2,2]
L00 --> L01
L00 --> L10
L00 --> L11
L01 --> L02
L01 --> L11
L01 --> L12
L10 --> L11
L10 --> L20
L10 --> L21
L11 --> L12
L11 --> L21
L11 --> L22
L02 --> L12
L20 --> L21
L12 --> L22
L21 --> L22
classDef default fill:#f9f,stroke:#333,stroke-width:2px;
graph TD
subgraph "Dependency Matrix"
A["L00: []"] --- B["L01: [L00]"] --- C["L02: [L01]"]
D["L10: [L00]"] --- E["L11: [L00,L01,L10]"] --- F["L12: [L01,L02,L11]"]
G["L20: [L10]"] --- H["L21: [L10,L11,L20]"] --- I["L22: [L11,L12,L21]"]
end
classDef default fill:#f9f,stroke:#333,stroke-width:2px;
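A hedged sketch of how this blocked computation could be expressed with Ray tasks; compute_block is a stand-in for the actual per-block LCS kernel, and only the dependency structure shown above matters here (the block size is shrunk so the sketch is cheap to run):
import ray
import numpy as np

ray.init()

@ray.remote
def compute_block(up, left, diag):
    # Placeholder kernel: the real program would compute a 10,000 x 10,000 block
    # from its up, left, and diagonal neighbours (None at the borders).
    return np.zeros((100, 100))

L = {}
for i in range(3):
    for j in range(3):
        up   = L[(i - 1, j)]     if i > 0 else None
        left = L[(i, j - 1)]     if j > 0 else None
        diag = L[(i - 1, j - 1)] if i > 0 and j > 0 else None
        # Passing futures as arguments creates exactly the DAG drawn above.
        L[(i, j)] = compute_block.remote(up, left, diag)

ray.get(L[(2, 2)])   # blocks until every block, and hence the whole DAG, is done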
Ray also implements a bottom-up scheduler: we first try to schedule tasks on the local worker, and only if there are too many tasks locally do we ask the master.
Execution
Ray can start millions of millisecond-scale tasks every second. Having such fine-grained tasks brings two scalability issues for the master program:
- the lineage graph that tracks dependencies between objects and tasks becomes much larger, and
- the scheduling decisions need to be made much more quickly.
To solve these challenges, Ray shards the master.
- All the state is kept in a sharded Global Control Store (essentially replicated Redis instances). This makes everything else (like the scheduler) stateless.
- To make fast scheduling decisions, Ray designs a bottom-up scheduler. Each node always tries to schedule the (stateless) tasks on itself. Only if the task queue grows beyond a threshold is the task sent to the global scheduler, which does locality-aware scheduling.
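A hedged sketch of that bottom-up decision (the threshold and the names are illustrative, not Ray’s actual internals):
LOCAL_QUEUE_THRESHOLD = 100   # illustrative number

def submit(task, local_queue, global_scheduler):
    # Bottom-up scheduling: prefer the local node; spill to the global,
    # locality-aware scheduler only when the local queue is overloaded.
    if len(local_queue) < LOCAL_QUEUE_THRESHOLD:
        local_queue.append(task)
    else:
        global_scheduler.submit(task)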