Post

Ray: A Distributed Framework

Disclaimer: These notes were taken while attending the COL 733:Cloud Computing course at IIT Delhi and the 6.5840:Distributed Systems course offered at MIT by Robert Morris.

Ray

Ray improves on Spark

  • Till now we saw static task graph, but now we want to move to dynamic task graph.
  • Small lineage because of course grained transformations and there used to be a single master, but now we want to move out of single master and have Fine grained transformations.
  • We will also move from Stateless workers to stateful actors.

Recovery of stateful actors using lineage based recovery.

So far our systems had task dependencies defined upfront and then executed. Also we were doing lineage based recovery for FT thats why the tasks were kept coarse grained.

Now, we will be looking at possibilities where new tasks can be generated on the fly as the workers are executing the tasks. One of the simplest implementation of this is the Celery where they had a single pool of tasks and stateless workers. Here any workere can work on any task.

Tasks and Futures

Rays uses futures to handle tasks that haven’t completed yet. When you call a function with .remote() it starts the task and immediately returns a “future”.

1
2
3
4
5
6
7
8
9
10
11
12
13
import ray
ray.init()

@ray.remote
def f(x):
	return x*x

@ray.remote
def g(a, b):
	return a + b

x = g.remote(f.remote(3), f.remote(1))
print(ray.get(x)) # 10

We can use these futures as inputs for other tasks, creating dynamic dependencies. ray.get is used when you need the actual result, and waits for the task to finish.

flowchart LR
    Start --> f1["f.remote(3)"]
    Start --> f2["f.remote(1)"]
    f1 --> g["g.remote"]
    f2 --> g
    g --> Result["ray.get"]
    Result --> Output["Final Result: 10"]
  1. The program starts and initiates two parallel tasks: f.remote(3) and f.remote(1).

  2. These tasks return futures (future_1 and future_2).
  3. Both futures are passed to g.remote().
  4. g.remote() returns another future (future_3).
  5. ray.get(), it is a blocking call, and is called on the final future that returns the value of the future.
  6. The final result (10) is obtained.

A simple system for dynamic tasks and futures(CIEL)

graph LR
    z((z)) --> a[a]
    a --> A((A))
    A --> b[b]
    A --> d[d]
    b --> B((B))
    d --> D((D))
    B --> c[c]
    D --> c
    c --> C((C))

    classDef future fill:#f9f,stroke:#333,stroke-width:2px;
    classDef task fill:#bfb,stroke:#333,stroke-width:2px;
    classDef input fill:#bbf,stroke:#333,stroke-width:2px;

    class z input;
    class A,B,C,D future;
    class a,b,c,d task;

In this we have 3 workers and 4 tasks a,b,c,d and and 4 futures A,B,C,D and the input is z. There workers here contain an object store each. Object stores will store the immutable values for the futures and a master program will keep track of all the workers, tasks and objects.

classDiagram
    class TaskTable {
        Task: String
        WaitingOnObject: String
        Status: String
        Worker: String
    }
    
    class ObjectTable {
        Object: String
        Location: String
        WaitingTasks: String
    }
    
    TaskTable "1" --> "1" ObjectTable: References
    
    class TaskTable1 {
        a(z): ∅, Complete, w0
        b(A): A, Running, w1
        d(A): A, Running, w2
        c(B,D): B D, Pending, -
    }
    
    class ObjectTable1 {
        z: w0, -
        A: w0, -
        B: -, c(B,D)
        C: -, -
        D: -, c(B,D)
    }
    
    TaskTable <|-- TaskTable1: Instance
    ObjectTable <|-- ObjectTable1: Instance

In this we can see the process of task assignment to the workers.

  • Task Table:
  1. a(z) is complete, executed by worker w0. It didn’t wait for any object (∅).
  2. b(A) is currently running on worker w1, waiting on object A.
  3. d(A) is also running, but on worker w2, also waiting on object A.
  4. c(B,D) is pending, waiting for both objects B and D.
  • Object Table:
  1. z is located on w0 and no tasks are waiting for it.
  2. A is also on w0, with no waiting tasks (b(A) and d(A) are already running).
  3. B doesn’t have a location yet, and c(B,D) is waiting for it.
  4. C doesn’t have a location or waiting tasks yet.
  5. D, like B, doesn’t have a location, and c(B,D) is waiting for it.
  • The process of assigning tasks to workers goes as follows:

The master scheduler first assigned a(z) to w0, as z was likely already on w0 (locality-aware scheduling). After a(z) completed, producing object A on w0, the master was notified. With A available, the master then scheduled b(A) on w1 and d(A) on w2. w2 will need to fetch A from w0 to execute d(A), but this data transfer happens directly between workers. c(B,D) remains pending because B and D are not yet available. It will be scheduled once both these objects are produced by their respective tasks.

This approach allows for efficient parallel execution, with the master coordinating based on data dependencies, while actual data transfer and computation happen on the workers. The tables are continuously updated as tasks complete and new objects become available, enabling the master to make informed scheduling decisions.

Finally,

classDiagram
    class TaskTable1 {
        a(z): ∅, Complete, w0
        b(A): A, Complete, w1
        d(A): A, Complete, w2
        c(B,D): B D, Complete, w1
    }
    
    class ObjectTable1 {
        z: w0, -
        A: w0, -
        B: w1, -
        C: w1, -
        D: w2, -
    }

What if w0 crashes? Well if it crashes while it was running B, everything on w0 will be given to some other worker.

Do tasks need to be deterministic? Yes, and also idempotent

What we observe in this is the latency on scheduling tasks because going through the master node is a longtrip and Ray solves this.

Ray

This diagram illustrates how Ray allows for dynamic task creation and dependency management using futures, enabling efficient parallel execution of tasks.


graph TB
    subgraph "Ray Cluster"
        GCS[Global Control Store]
        TT[Task Table]
        OS[Object Store]
        subgraph "Node 1"
            D1[Driver]
            W1[Worker]
            A1[Actor]
            LOS1[Local Object Store]
        end
        subgraph "Node 2"
            W2[Worker]
            A2[Actor]
            LOS2[Local Object Store]
        end
        subgraph "Node 3"
            W3[Worker]
            A3[Actor]
            LOS3[Local Object Store]
        end
    end
    
    D1 -->|Submit tasks| GCS
    GCS <-->|Manage tasks| TT
    GCS <-->|Manage objects| OS
    GCS -->|Distribute tasks| W1
    GCS -->|Distribute tasks| W2
    GCS -->|Distribute tasks| W3
    GCS -->|Manage state| A1
    GCS -->|Manage state| A2
    GCS -->|Manage state| A3
    W1 -->|Execute tasks| A1
    W2 -->|Execute tasks| A2
    W3 -->|Execute tasks| A3
    W1 <-->|Read/Write| LOS1
    W2 <-->|Read/Write| LOS2
    W3 <-->|Read/Write| LOS3
    LOS1 <-->|Sync| OS
    LOS2 <-->|Sync| OS
    LOS3 <-->|Sync| OS

In the above figure we see the Ray Architecture on CIEL.

  • Global Control Store (GCS): Ray uses a centralized control plane called the Global Control Store, which is inspired by CIEL’s master node. The GCS manages the distributed object store and task scheduling.
  • Task Table (TT): This is part of the Global Control Store and manages task information.
  • Object Store (OS): A distributed storage system for sharing data between tasks and actors.
  • Local Object Stores (LOS): Each node has its own local object store for faster access to frequently used data.
  • Connections between workers and their local object stores.
  • Synchronization between local object stores and the global object store.
  • Dynamic task graph: Unlike Spark’s static task graph, Ray supports dynamic task graphs, allowing for more flexible and adaptive computation.
  • Fine-grained tasks: Ray supports fine-grained tasks and actors, improving on CIEL’s coarse-grained approach.
  • Stateful actors: Ray introduces stateful actors, which are long-running computations with mutable state, extending CIEL’s stateless worker model.
  • Distributed scheduler: Ray uses a distributed scheduler that can make local scheduling decisions, improving scalability compared to CIEL’s centralized scheduler.
  • Object store: Ray implements a distributed object store for efficient data sharing between tasks and actors.
  • Lineage-based recovery: Similar to CIEL, Ray uses lineage information for fault tolerance and recovery of stateful actors.
  • Task and actor placement: Ray’s scheduler considers data locality and resource constraints when placing tasks and actors across the cluster.
  • Python API: Ray provides a user-friendly Python API for defining and executing distributed computations.

We don’t want to overload master, it has task table and an object table. Now in Ray workers will have their own tasks locally and if the tasks are becoming too much locally then they pas it to global task table and ray calls this the Global Contol Store.

The workers also have their own object store. The mastere will eventually need to know all the tasks to maintain the lineage graph.

If we try to solve the above problem of tasks and futures in Ray

sequenceDiagram
    participant Driver
    participant Worker1
    participant Worker2
    participant Worker3
    participant GCS as Global Control Store

    Driver->>GCS: Submit DAG
    GCS->>Worker1: Schedule task_a
    GCS->>Worker2: Schedule task_b and task_d
    GCS->>Worker3: Schedule task_c
    Worker1->>Worker1: Execute task_a
    Worker1-->>GCS: Update task completion
    GCS->>Worker2: Notify A is ready
    Worker2->>Worker2: Execute task_b and task_d
    Worker2-->>GCS: Update task completion
    GCS->>Worker3: Notify B and D are ready
    Worker3->>Worker3: Execute task_c
    Worker3-->>GCS: Update task completion
    GCS->>Driver: Return result

Local Task Table

classDiagram
    class Worker1TaskTable {
        task_a(z): Running
        task_a(z): Completed
    }
    class Worker2TaskTable {
        task_b(A): Waiting
        task_d(A): Waiting
        task_b(A): Running
        task_d(A): Running
        task_b(A): Completed
        task_d(A): Completed
    }
    class Worker3TaskTable {
        task_c(B,D): Waiting
        task_c(B,D): Running
        task_c(B,D): Completed
    }

Syncing with master

sequenceDiagram
    participant Worker
    participant GCS as Global Control Store

    Worker->>Worker: Update local task table
    Worker->>GCS: Send task status updates
    GCS->>GCS: Update global task table
    GCS->>Worker: Send any new task assignments
    Worker->>Worker: Update local task table with new assignments

Object Store and Data Transfer

graph TB
    subgraph "Worker 1"
        W1OS[Object Store]
        W1OS --> |z, A| W1[task_a]
    end
    subgraph "Worker 2"
        W2OS[Object Store]
        W2OS --> |A| W2B[task_b]
        W2OS --> |A| W2D[task_d]
    end
    subgraph "Worker 3"
        W3OS[Object Store]
        W3OS --> |B, D| W3[task_c]
    end
    W1OS -.-> |A| W2OS
    W2OS -.-> |B, D| W3OS

Actors

  1. Actors are stateful workers in Ray. They’re like objects that live on a specific machine and remember their state between method calls.
  2. You create an actor by decorating a class with @ray.remote.
  3. When you instantiate this class, Ray creates a process on a worker node to host this actor.
  4. All method calls on this actor instance are executed on that specific process, allowing the actor to maintain state.
  5. Actors are useful for maintaining mutable state in distributed applications, like managing shared parameters or aggregating results.
1
2
3
4
5
6
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

Creating an actor

graph LR
    A[Define Actor Class] -->|@ray.remote| B[Ray Actor Class]
    B -->|Instantiate| C[Actor Process on Worker Node]
    C -->|Maintains State| D[Actor Instance]

Actor Method Calls


sequenceDiagram
    participant Client
    participant Ray
    participant ActorProcess

    Client->>Ray: Create Actor
    Ray->>ActorProcess: Start Actor Process
    ActorProcess-->>Ray: Actor Ready
    Ray-->>Client: Actor Reference

    loop Method Calls
        Client->>Ray: Call Actor Method
        Ray->>ActorProcess: Execute Method
        ActorProcess-->>Ray: Method Result
        Ray-->>Client: Return Result
    end

Comparison of Tasks and Actors

graph TB
    subgraph "Ray Cluster"
        subgraph "Tasks"
            T1[Task 1: Stateless]
            T2[Task 2: Stateless]
            T3[Task 3: Stateless]
            W1[Worker 1]
            W2[Worker 2]
            W3[Worker 3]
        end
        subgraph "Actors"
            A1[Actor 1: Stateful]
            A2[Actor 2: Stateful]
            AP1[Actor Process 1]
            AP2[Actor Process 2]
        end
        GCS[Global Control Store]
    end

    Client1 -->|Submit| T1
    Client1 -->|Submit| T2
    Client2 -->|Submit| T3
    
    T1 -.->|Can execute on| W1
    T1 -.->|Can execute on| W2
    T1 -.->|Can execute on| W3
    T2 -.->|Can execute on| W1
    T2 -.->|Can execute on| W2
    T2 -.->|Can execute on| W3
    T3 -.->|Can execute on| W1
    T3 -.->|Can execute on| W2
    T3 -.->|Can execute on| W3
    
    Client3 -->|Method Call| A1
    Client3 -->|Method Call| A2
    Client4 -->|Method Call| A1

    A1 -->|Always executes on| AP1
    A2 -->|Always executes on| AP2

    GCS -->|Manages| T1
    GCS -->|Manages| T2
    GCS -->|Manages| T3
    GCS -->|Manages| A1
    GCS -->|Manages| A2

    classDef task fill:#f9f,stroke:#333,stroke-width:2px;
    classDef actor fill:#bbf,stroke:#333,stroke-width:2px;
    classDef worker fill:#bfb,stroke:#333,stroke-width:2px;
    classDef process fill:#fbb,stroke:#333,stroke-width:2px;
    classDef gcs fill:#ff9,stroke:#333,stroke-width:2px;
    class T1,T2,T3 task;
    class A1,A2 actor;
    class W1,W2,W3 worker;
    class AP1,AP2 process;
    class GCS gcs;
  • Tasks are stateless and can run on any worker.
  • Actors are stateful and run on a specifc worker.
  • Multiple clients can call methods on the same actor.
  • Actors maintain their state between the method calls.

Lets have a look at this LCS code and try to make a dag for it. Hint: Notice that the program computes 9 2D array objects each of size 10,000 x 10,000. You can refer to these 9 array objects as L0,0, L0,1,….L2,2 , L0,1 , … L2,2 respectively.

graph LR
    L00[L0,0]
    L01[L0,1]
    L02[L0,2]
    L10[L1,0]
    L11[L1,1]
    L12[L1,2]
    L20[L2,0]
    L21[L2,1]
    L22[L2,2]

    L00 --> L01
    L00 --> L10
    L00 --> L11

    L01 --> L02
    L01 --> L11
    L01 --> L12

    L10 --> L11
    L10 --> L20
    L10 --> L21

    L11 --> L12
    L11 --> L21
    L11 --> L22

    L02 --> L12
    L20 --> L21
    L12 --> L22
    L21 --> L22

    classDef default fill:#f9f,stroke:#333,stroke-width:2px;
graph TD
    subgraph "Dependency Matrix"
        A["L00: []"] --- B["L01: [L00]"] --- C["L02: [L01]"]
        D["L10: [L00]"] --- E["L11: [L00,L01,L10]"] --- F["L12: [L01,L02,L11]"]
        G["L20: [L10]"] --- H["L21: [L10,L11,L20]"] --- I["L22: [L11,L12,L21]"]
    end

    classDef default fill:#f9f,stroke:#333,stroke-width:2px;

Ray also implements a Bottom up Scheduler, we try to schedule the tasks on the local worker, but if there are too many tasks we ask the master.

Execution

Ray can start millions of millisecond tasks every second. Having such fine grained tasks bring in two issues in scalability for the master program/

  • the lineage graph that tracks dependencies between the objects and tasks become much larger, and
  • the scheduling desicions need to be much more quicker.

To solve these challenges Ray shards the master.

  1. All the state is kept in a sharded Global Control Store (just a replicated redis instances). This makes everything else (like the scheduler) stateless.

  2. To make fast scheduling decisions, Ray designs a bottom-up scheduler. Each node always tries to schedule the (stateless) tasks on itself. Only if the task queue is beyond a threshold, the task is sent to the global scheduler that does locality-aware scheduling.

This post is licensed under CC BY 4.0 by the author.
Bookmarks
Notes & Highlights

Comments powered by Disqus.