# Streaming Data Flow: A Story About Performance, Programmability, and Correctness

#### Albert Cohen

with Léonard Gérard, Adrien Guatto, Nhat Minh Lê, Feng Li, Cupertino Miranda, **Antoniu Pop**, Marc Pouzet

> PARKAS Team INRIA and ENS Paris

Lyon, July 1st, 2013

Funded by: TERAFLUX & PHARAON FP7, and ManycoreLabs "investissements d'avenir" (French "Investments for the Future" programme) grants





# 1. Stream Processing?

- Stream Processing?
- Driving Force: Correct Concurrency by Construction
- The Hammer: Language Design
- The Anvil: Runtime System Design
- Wrap-Up

"A model that uses sequences of data and computation kernels to expose and exploit concurrency and locality for efficiency [of execution and programmability]."

Two Workshops on Streaming Systems: WSS03 (http://groups.csail.mit.edu/cag/wss03) and WSS08 (http://people.csail.mit.edu/rabbah/conferences/08/micro/wss)

## Stream Processing – In This Talk

### Application domains

1. High-productivity computing systems
2. Efficient runtimes, task-level pipelines
3. Embedded control, safety-critical, certified systems

### Some influential languages

- Block-diagram: STATECHARTS, SCADE, SIMULINK, LabVIEW
- Synchronous: LUSTRE, ESTEREL, SIGNAL
- Data-flow: Lucid, Linda, SISAL, pH, SAC, CnC
- Kahn network APIs: YAPI, CAL
- Cyclo-static data flow: StreamIt, SigmaC

### Bill Dally's streaming processors

- Bulk-Synchronous Parallelism? Vector processing pipelines?
- Brook, Cg? CUDA, OpenCL?

3-phase, "hardware-centric" decoupling: *load* → *compute* → *store*

- Special case of Kahn networks
- Implicitly relies on chaining/fusion to save on memory transfers

## Foundations: Kahn Process Networks



### Kahn networks, 1974 (Gilles Kahn, 1946–2006)

Denotational: least fixpoint of a *system of equations* over *continuous* functions, for the Scott topology lifted to unbounded *streams* 

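$s \sqsubseteq s' \Longrightarrow f(s) \sqsubseteq f(s')$ (monotonicity, with $\sqsubseteq$ the prefix order on streams), lifted to the limit: $f\left(\bigsqcup_i s_i\right) = \bigsqcup_i f(s_i)$ for every chain $s_0 \sqsubseteq s_1 \sqsubseteq \cdots$ (continuity)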
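As a worked illustration (our own example, not from the talk): for a continuous $f$, Kleene's fixpoint theorem gives the least fixpoint as the limit of the iterates of $f$ from the empty stream $\varepsilon$. With $\cdot$ prepending an element and $+\,1$ lifted pointwise, the equation $s = 0 \cdot (s + 1)$ defines the stream of natural numbers:

$$
f(s) = 0 \cdot (s + 1), \qquad
f^{0}(\varepsilon) = \varepsilon,\ f^{1}(\varepsilon) = 0,\ f^{2}(\varepsilon) = 0 \cdot 1,\ f^{3}(\varepsilon) = 0 \cdot 1 \cdot 2,\ \ldots
$$

$$
\operatorname{lfp}(f) = \bigsqcup_{n \ge 0} f^{n}(\varepsilon) = 0 \cdot 1 \cdot 2 \cdot 3 \cdots
$$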

Operational: communicating processes over *unbounded FIFOs* with *blocking reads* 

- → *Deterministic* by design
- → General recursion (dynamic process creation), *parallel composition*, *reactive systems*
- → Distribute and decouple computations from communications
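The following is a minimal sequential sketch in C of the two operational ingredients above: FIFO channels, and reads that wait when the channel is empty. It is our own illustration, not code from the talk. Two runs under different pseudo-random interleavings of the three processes deliver the same sequence to the sink, which is the determinism property. All names (`fifo`, `network`, `run`) are hypothetical, and a fixed capacity stands in for unbounded FIFOs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Sketch only: a fixed capacity stands in for Kahn's unbounded FIFOs. */
#define CAP 64

typedef struct {
    int buf[CAP];
    size_t head, tail; /* read and write cursors; no wrap-around in this sketch */
} fifo;

static void fifo_write(fifo *f, int v) {
    assert(f->tail < CAP);
    f->buf[f->tail++] = v;
}

/* Blocking read modeled as "try": false means the process must wait and try
   again later. A process cannot test a channel for emptiness and then do
   something else instead, which is what keeps the network deterministic. */
static bool fifo_try_read(fifo *f, int *v) {
    if (f->head == f->tail) return false;
    *v = f->buf[f->head++];
    return true;
}

/* Pipeline: producer emits 0..n-1 on a, doubler maps a to b, sink records b. */
typedef struct {
    fifo a, b;
    int next, n;   /* producer state */
    int out[CAP];  /* values consumed by the sink, in order */
    size_t n_out;
} network;

/* Each process attempts one step and reports whether it made progress. */
static bool producer_step(network *net) {
    if (net->next >= net->n) return false; /* finished */
    fifo_write(&net->a, net->next++);
    return true;
}

static bool doubler_step(network *net) {
    int x;
    if (!fifo_try_read(&net->a, &x)) return false; /* waiting on a */
    fifo_write(&net->b, 2 * x);
    return true;
}

static bool sink_step(network *net) {
    int x;
    if (!fifo_try_read(&net->b, &x)) return false; /* waiting on b */
    net->out[net->n_out++] = x;
    return true;
}

/* Runs the network under a pseudo-random interleaving chosen by seed, until
   no process can make progress (all finished or waiting). */
static void run(network *net, int n, unsigned seed) {
    bool (*const procs[3])(network *) = {producer_step, doubler_step, sink_step};
    assert(n >= 0 && n <= CAP);
    memset(net, 0, sizeof *net);
    net->n = n;
    for (;;) {
        seed = seed * 1103515245u + 12345u; /* LCG picks who tries first */
        unsigned start = (seed >> 16) % 3u;
        bool progress = false;
        for (unsigned k = 0; k < 3u && !progress; ++k)
            progress = procs[(start + k) % 3u](net);
        if (!progress) break;
    }
}
```

Calling `run(&net, 10, seed)` with any seed leaves `net.out` holding 0, 2, 4, ..., 18. Only the order in which processes take turns changes from one seed to the next.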

# 2. Driving Force: Correct Concurrency by Construction


## Challenges

**HPC**: between the hammer of *programmability* and the anvil of *performance*

**Embedded**:

- program, test, verify, simulate, and compile a single source code, serving as
  - an abstract model for static analysis
  - a concrete model for simulation
  - the actual implementation from which sequential/parallel code can be generated
- ensure strong safety/efficiency properties at compile time

**Both**: rely on efficient, proven runtime execution primitives

- lightweight scheduling
- ring buffers

### Questions

- What are the semantic requirements for source programs?
- Should programmers care about parallelism? About the memory and power walls? Which programmers?
- What role for the software stack?
  - Compilers
  - Runtime systems
  - Libraries, library generators
  - Auto-tuning, dynamic optimization
  - Operating system, virtual machine monitor

# 3. The Hammer: Language Design

- OpenStream: dealing with the von Neumann bottlenecks
- Heptagon: safety-critical embedded systems

# OpenStream

### OpenMP extension

- leverage existing toolchains and knowledge
- maximize productivity

### Parallelize irregular, dynamic codes

- no static/periodic restriction
- maximize expressiveness

### Mitigate the von Neumann bottlenecks

- decoupled, producer/consumer task-parallel pipelines
- fight Amdahl's law, scavenge parallelism from all sources
- but also preserve local memory and communication bandwidth



## OpenStream Introductory Example

```
for (i = 0; i < N; ++i) {
    #pragma omp task firstprivate (i) output (x) // T1
    x = foo (i);
    #pragma omp task input (x) // T2
    print (x);
}
```


- The control program sequentially creates N instances of T1 and N instances of T2.
- The firstprivate clause privatizes variable i, initializing it at task creation.
- The output clause gives write access to stream x.
- The input clause gives read access to stream x.
- Stream x has FIFO semantics.

## Stream FIFO Semantics

```
#pragma omp task output (x)                       // Task T1
x = ...;

for (i = 0; i < N; ++i) {
    int window_a[2], window_b[2];

    #pragma omp task output (x << window_a[2])    // Task T2
    {
        window_a[0] = ...;
        window_a[1] = ...;
    }

    if (i % 2) {
        #pragma omp task input (x >> window_b[2]) // Task T3
        use (window_b[0], window_b[1]);
    }

    #pragma omp task input (x)                    // Task T4
    use (x);
}
```
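Here is a hedged sketch in C of one way to read this example. Assume each stream keeps one write cursor and one read cursor, both advanced in control-program order as each task is created. Under that assumption, the stream positions each task instance accesses are known at creation time, before any task runs. The helper names (`stream_cursors`, `create_producer`, `create_consumer`, `replay`) are hypothetical and not part of the OpenStream API.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified model (an assumption): one write cursor and one read cursor per
   stream, advanced when tasks are created, in control-program order. */
typedef struct {
    size_t write_pos; /* next index assigned to a producer task */
    size_t read_pos;  /* next index assigned to a consumer task */
} stream_cursors;

/* Returns the first index of a window of `burst` elements for a new task. */
static size_t create_producer(stream_cursors *s, size_t burst) {
    size_t first = s->write_pos;
    s->write_pos += burst;
    return first;
}

static size_t create_consumer(stream_cursors *s, size_t burst) {
    size_t first = s->read_pos;
    s->read_pos += burst;
    return first;
}

/* Replays the control program above and records the first stream index
   accessed by each T2, T3 (odd i only, else (size_t)-1) and T4 instance. */
static void replay(size_t n, size_t *t2_first, size_t *t3_first, size_t *t4_first) {
    stream_cursors x = {0, 0};
    (void)create_producer(&x, 1);                 /* T1: output (x) */
    for (size_t i = 0; i < n; ++i) {
        t2_first[i] = create_producer(&x, 2);     /* T2: x << window_a[2] */
        t3_first[i] = (size_t)-1;                 /* no T3 instance on even i */
        if (i % 2)
            t3_first[i] = create_consumer(&x, 2); /* T3: x >> window_b[2] */
        t4_first[i] = create_consumer(&x, 1);     /* T4: input (x) */
    }
}
```

For N = 4, the T4 of iteration 0 reads the element written by T1 (index 0). The T3 of iteration 1 reads the window written by the T2 of iteration 0 (indices 1 and 2). The T4 of iteration 1 then reads index 3, the first element written by the T2 of iteration 1.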

OpenStream: http://openstream.info (ACM TACO'13, IJPP'11, HIPEAC'11)





Define the formal semantics of imperative programming languages with dynamic, dependent task creation

- control flow: dynamic construction of task graphs
- data flow: decoupling dependent computations (Kahn)

# CDDF (Control-Driven Data Flow) model

### Control program

- imperative program
- creates tasks
- model: execution graph of activation points, each generating a task activation

#### Tasks

- imperative program with a dynamic stream access signature
- becomes executable once its dependences are satisfied
- recursively becomes the control program for tasks created within
  - work in progress
  - link with synchronous languages
- model: task activation defined as a set of stream accesses

#### Streams

- Kahn-style unbounded, indexed channels
- multiple producers and/or consumers
- specify dependences and/or communication
- model: indexed set of memory locations, defined on a finite subset
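The stream and task-activation objects of the model can be sketched in C. The sketch rests on our own simplifying assumptions: fixed-length streams, each index written at most once, and contiguous access windows. The types and functions are hypothetical, not the formal definitions of the CDDF work. A task activation is a set of stream accesses, and it becomes executable once every index it reads is defined.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define STREAM_LEN 32

/* A stream: an indexed set of memory locations, defined on a finite subset. */
typedef struct {
    int  value[STREAM_LEN];
    bool defined[STREAM_LEN]; /* which indices currently hold data */
} stream;

/* One access of a task activation: a contiguous window of stream indices. */
typedef struct {
    stream *s;
    size_t first;  /* first index accessed */
    size_t count;  /* window size (1 for a plain input/output clause) */
    bool is_input; /* read access (input) or write access (output) */
} stream_access;

/* A producer defines one index (assumed: each index written at most once). */
static void stream_write(stream *s, size_t idx, int v) {
    assert(idx < STREAM_LEN && !s->defined[idx]);
    s->value[idx] = v;
    s->defined[idx] = true;
}

/* An activation is executable once every index it reads is defined. */
static bool activation_ready(const stream_access *acc, size_t n_acc) {
    for (size_t k = 0; k < n_acc; ++k) {
        if (!acc[k].is_input) continue; /* outputs never block activation */
        for (size_t j = 0; j < acc[k].count; ++j) {
            size_t idx = acc[k].first + j;
            assert(idx < STREAM_LEN);
            if (!acc[k].s->defined[idx]) return false;
        }
    }
    return true;
}
```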

## Control-Driven Data Flow – Results

- Deadlock classification
  - insufficiency deadlock: missing producer before a barrier or control program termination
  - functional deadlock: dependence cycle
  - spurious deadlock: deadlock induced by CDDF semantics on dependence enforcement (Kahn prefixes)
- Conditions on the program state that allow proving
  - deadlock freedom
  - compile-time serializability
  - functional and deadlock determinism

| Condition on state | Deadlock freedom properties | | | Serializability | | Determinism | |
|---|---|---|---|---|---|---|---|
| $\sigma = (k_{e}, \mathcal{A}_{e}, \mathcal{A}_{0})$ | $\neg D(\sigma)$ | $\neg ID(\sigma)$ | $\neg FD(\sigma) \lor ID(\sigma)$ | $\neg SD(\sigma)$ | Dyn. order | CP order | Functional &<br>deadlock |
| $TC(\sigma) \land \forall s, \neg MPMC(s)$<br>Weaker than Kahn monotonicity | no | no | yes | yes | if $\neg ID(\sigma)$ | no | yes |
| $SCC(H(\sigma)) = \varnothing$<br>Common case, static over-approx. | no | no | yes | yes | if $\neg ID(\sigma)$ | no | yes |
| $SC(\sigma) \lor \Omega(k_e) \in \Pi$<br>Less restrictive than strictness | yes | yes | yes | yes | yes | no | yes |
| $\forall \sigma, SC(\sigma)$<br>Relaxed strictness | yes | yes | yes | yes | yes | yes | yes |



## Runtime Design and Implementation

Efficiency requirements

1. Eliminate false sharing
2. Use software caching to reduce cache traffic
3. Avoid atomic operations on data that is effectively shared across many cores
4. Avoid *effective sharing* of concurrent structures
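For the first requirement, the usual remedy is to give each frequently written per-worker field its own cache line. Below is a minimal C11 sketch that assumes 64-byte cache lines. The constant and the names are ours, not the runtime's.

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>

/* Assumption: 64-byte cache lines (typical of current x86-64 cores). */
#define CACHE_LINE 64
#define N_WORKERS 16

/* Aligning the member pads the struct to a full cache line, so counters of
   neighbouring workers never share a line: an increment by one worker does
   not invalidate the line holding another worker's counter. */
typedef struct {
    alignas(CACHE_LINE) long tasks_executed; /* written only by its owner */
} worker_counter;

static worker_counter counters[N_WORKERS];

static void record_task(int worker) { counters[worker].tasks_executed++; }
```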

1. Resolve dependences at task creation
2. Producers know their consumers before executing (*feed-forward*, a.k.a. *argument fetch* data flow)
3. Local work-stealing queue for ready tasks
4. The producer decides which consumers become executable
   - *local* consensus among producers providing data to the same task
   - without traversing *effectively* shared data structures
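Point 4 can be sketched with one atomic counter of missing inputs per consumer, initialized when the task is created (point 1). Each producer decrements the counter of the consumer it feeds. The producer that observes the last decrement pushes the consumer onto its own local queue, so the decision touches only that consumer's counter. This is a hypothetical C11 sketch of the idea, not the OpenStream runtime's data structures. A plain array stands in for the work-stealing deque.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define QUEUE_CAP 64

typedef struct {
    int id;
    atomic_int missing; /* producers that have not delivered yet */
} task;

/* Hypothetical per-worker ready queue (stand-in for a work-stealing deque). */
typedef struct {
    task *items[QUEUE_CAP];
    size_t n;
} ready_queue;

/* Dependences are resolved at creation: the number of producers is known. */
static void task_init(task *t, int id, int n_producers) {
    t->id = id;
    atomic_init(&t->missing, n_producers);
}

/* Called by a producer once its data is in the consumer's input slot.
   Exactly one producer sees the counter go from 1 to 0; that producer alone
   makes the consumer executable, by pushing it onto its own local queue. */
static int producer_deliver(task *consumer, ready_queue *local) {
    int before = atomic_fetch_sub_explicit(&consumer->missing, 1,
                                           memory_order_acq_rel);
    assert(before > 0);
    if (before == 1) {
        assert(local->n < QUEUE_CAP);
        local->items[local->n++] = consumer;
        return 1; /* this producer enabled the consumer */
    }
    return 0;
}
```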



#### Comparison to Polling-Based Runtime

Block-sparse LU factorization on StarSs (block size 128 × 128)



## Alternative: Using Explicit FIFOs

Arvind and Nikhil's I-structures (book: *Implicit Parallel Programming in pH*, 2001): infinite, operational indexed streams with *full/empty* bits, single-producer multiple-consumer (SPMC).

- I-structures were introduced for in-place operations in data-flow programs
- Kahn networks provide a functional definition (and denotational semantics)
- Explicit FIFOs do not depend on a task scheduler to enforce dependences
- Caveats:
  - No load balancing
  - Need to derive a bounded implementation
  - Cumbersome interaction with user-level thread scheduling: blocking a task blocks the underlying worker (POSIX) thread, so a mechanism for lightweight task suspension is needed

Implementation: ring buffer, vector of futures
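As an illustration of the ring-buffer implementation, here is a minimal single-producer/single-consumer bounded FIFO in C11. It follows a generic textbook design, not the runtime described here, and the capacity and names are assumptions. Push and pop return false instead of blocking when the buffer is full or empty. That is exactly where a runtime needs lightweight task suspension rather than blocking its worker thread.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RB_CAP 8u /* must be a power of two */

typedef struct {
    int buf[RB_CAP];
    atomic_size_t head; /* next slot to read; written only by the consumer */
    atomic_size_t tail; /* next slot to write; written only by the producer */
} ring;

static void ring_init(ring *r) {
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

/* Producer side. Cursors grow monotonically; masking maps them to slots. */
static bool ring_push(ring *r, int v) {
    size_t t = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t h = atomic_load_explicit(&r->head, memory_order_acquire);
    if (t - h == RB_CAP) return false; /* full: caller must suspend */
    r->buf[t & (RB_CAP - 1)] = v;
    atomic_store_explicit(&r->tail, t + 1, memory_order_release); /* publish */
    return true;
}

/* Consumer side. */
static bool ring_pop(ring *r, int *v) {
    size_t h = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t t = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (h == t) return false; /* empty: caller must suspend */
    *v = r->buf[h & (RB_CAP - 1)];
    atomic_store_explicit(&r->head, h + 1, memory_order_release); /* free slot */
    return true;
}
```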

# Streaming With FIFOs Wins: Evaluation on FFT



4-socket Opteron, 16 cores

How does it work? Please wait for implementation details

#### Runtime optimization

- task placement and topology-aware stealing
- automatic task aggregation
- runtime deadlock detection in presence of speculative aggregation

#### Distributed memory and heterogeneous platform execution

- Owner Writable Memory (OWM)
  - software coherence protocol
  - code generation geared towards Distributed Shared Memory
  - explicit cache/publish operations
  - leverage one-sided communications and rDMA
- Nesting transactions and dynamic, task-level data flow
- Compiler transformations for dynamic, task-level data flow



## Heptagon: safety-critical embedded systems

## **Application Challenges**

Computational applications with a real-time control aspect



Embedded control systems running on more and more complex computer architectures, with compiler optimizations, parallel execution, and dynamic power management

Safety-critical applications with simulation in the loop

Consolidating applications in mixed-critical systems, enabling communications between critical and non-critical components



#### Heptagon Goals

Generate efficient parallel code

Preserving...

- the functional semantics of the program
- non-functional properties like static and bounded memory
- existing sequential compilation algorithms
- existing certification methodologies for embedded software

[Gérard et al., EMSOFT 2012]

#### Common Practice in Embedded System Design

MATLAB Simulink/Stateflow: mixed continuous/discrete signals, data-flow and automata



```
node sum(x:int)=(y:int)
var m :int;
let
    y = x + m;
    m = 0 fby y;
tel
```

- Functional synchronous
- Declarative data-flow

```
node sum(x:int)=(y:int)
var m :int;
let
    m = 0 fby y;
    y = x + m;
tel
```

- Functional synchronous
- Declarative data-flow

```
node sum(x:int)=(y:int)
var m :int;
let
    m = 0 fby y;
    y = x + m;
tel
```

- Functional synchronous
- Declarative data-flow
- Values are streams
- Types and operators are lifted pointwise
- The synchronous register fby

| step | 0 | 1 | 2 | ••• |
|------|---|---|---|-----|
| m    | 0 | 0 | 1 | ••• |
| x    | 0 | 1 | 0 | ••• |
| y    | 0 | 1 | 1 | ••• |


```
node sum(x:int)=(y:int)
var m :int;
let
    m = 0 fby y;
    y = x + m;
tel
```

Translated into Java syntax:

```
class Sum {
    int m;
    void reset(){ m = 0; }
    int step(int x){
        int y;
        y = x + m;
        m = y;
        return y;
    }
}
```

- Modular compilation, each node is compiled into a class.
- Synchronous registers are instance variables.
- Initialisation (and reinitialisation) method.
- Step method, with in-place update of the state.
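The same scheme can be transposed to C as a sketch. The naming is ours, and this is not presented as the output of the Heptagon compiler. The instance becomes a struct holding the register, and reset and step become functions taking it by pointer. Feeding x = 0, 1, 0 reproduces the trace y = 0, 1, 1 of the sum node.

```c
#include <assert.h>

/* Instance memory of node sum: the synchronous register m = 0 fby y. */
typedef struct {
    int m;
} sum_mem;

/* Initialisation (and reinitialisation): the register takes its fby value. */
static void sum_reset(sum_mem *self) { self->m = 0; }

/* One synchronous step: read the register, compute y, then update the
   register in place for the next step. */
static int sum_step(sum_mem *self, int x) {
    int y = x + self->m;
    self->m = y;
    return y;
}
```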

Two core data-flow operators to manipulate streams:

- when: the sampling operator
- merge: the (lazy) complementing operator

| x                   | 0    | 1     | 2     | 3    | 4     | ••• |
|---------------------|------|-------|-------|------|-------|-----|
| big = period3()     | true | false | false | true | false | ••• |
| xt = x when big     | 0    | •     | •     | 3    | •     | ••• |
| xf = x whenot big   | •    | 1     | 2     | •    | 4     | ••• |
| y = merge big xt xf | 0    | 1     | 2     | 3    | 4     | ••• |

- whenot = when not
- • = absence of value
- merge is *lazy*: each input needs to be present only at the instants where it is selected.
- The compiler computes the correct rhythm for every stream.
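Below is a C sketch of the two operators. It models a stream value at one instant as either present or absent (•). We assume period3() is true at instants 0, 3, 6, ..., as in the table, and the names are hypothetical. In this strict C version both arguments of merge are evaluated. Only the one selected by the condition has to be present, which is the property the clock calculus guarantees.

```c
#include <assert.h>
#include <stdbool.h>

/* The value of a stream at one instant: present with a value, or absent. */
typedef struct {
    bool present;
    int value;
} flow;

static flow flow_absent(void) { flow f = {false, 0}; return f; }
static flow flow_present(int v) { flow f = {true, v}; return f; }

/* x when c: keeps x at instants where c is true, absent elsewhere. */
static flow when(flow x, bool c) { return c ? x : flow_absent(); }

/* x whenot c = x when (not c). */
static flow whenot(flow x, bool c) { return when(x, !c); }

/* merge c t f: t where c is true, f where c is false; only the selected
   input is required to be present at this instant. */
static flow merge(bool c, flow t, flow f) {
    flow r = c ? t : f;
    assert(r.present);
    return r;
}

/* Hypothetical period3 node: true at instants 0, 3, 6, ... */
typedef struct { int n; } period3_mem;
static void period3_reset(period3_mem *self) { self->n = 0; }
static bool period3_step(period3_mem *self) {
    bool b = (self->n % 3 == 0);
    self->n++;
    return b;
}
```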

```
ys = 0 fby slow(1, ys);
yf = 0 fby fast(1, yf);
```

| ys | 0 | 3.14 | 6.28 | 9.42 | 12.56 | ••• |
|----|---|------|------|------|-------|-----|
| yf | 0 | 3    | 6    | 9    | 12    | ••• |

- slow: step integration with horizon of 1 second.
- fast: fast approximation with horizon of 1 second.

```
ys = 0 fby slow(1, ys);
yf = 0 fby fast(1/3, yf);
```

| ys | 0 | 3.14 | 6.28 | 9.42 | 12.56 | • • • |   |   |   |   |    |
|----|---|------|------|------|-------|-------|---|---|---|---|----|
| yf | 0 | 1    | 2    | 3    | 4     | 5     | 6 | 7 | 8 | 9 | 10 |

- slow: step integration with horizon of 1 second.
- fast: fast approximation with horizon of 1/3 second.

```
ys = 0 fby slow(1, ys);
yf = 0 fby fast(1/3, yf);
big = period3();
y = merge big ys (yf whenot big);
```

| big | true | false | false | true | false | false | true | false | ••• |
|-----|------|-------|-------|------|-------|-------|------|-------|-----|
| ys  | 0    | •     | •     | 3.14 | •     | •     | 6.28 | •     | ••• |
| yf  | 0    | 1     | 2     | 3    | 4     | 5     | 6    | 7     | ••• |
| y   | 0    | 1     | 2     | 3.14 | 4     | 5     | 6.28 | 7     | ••• |

- slow: step integration with horizon of 1 second.
- fast: fast approximation with horizon of 1/3 second.
- We use the correct value when possible, and complement it with the approximate one.

```
ys = 0 fby slow(1, ys);
yf = 0 fby fast(1/3, y);
big = period3();
y = merge big ys (yf whenot big);
```

| big | true | false | false | true | false | false | true | false | ••• |
|-----|------|-------|-------|------|-------|-------|------|-------|-----|
| ys  | 0    | •     | •     | 3.14 | •     | •     | 6.28 | •     | ••• |
| yf  | 0    | 1     | 2     | 3    | 4.14  | 5.14  | 6.14 | 7.28  | ••• |
| y   | 0    | 1     | 2     | 3.14 | 4.14  | 5.14  | 6.28 | 7.28  | ••• |

We would like to run them in parallel:




#### This is what happens, unfortunately:



# Synchronous registers are synchronous

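```
class Slow_fast {
  Fast fast;
  Slow slow;
  Period3 period3;
  float m;   // register of "ys = 0 fby slow(1, ys)"
  float m2;  // register of "yf = 0 fby fast(1/3, y)"

  void reset () {
    period3.reset();
    slow.reset();
    fast.reset();
    m = 0.f;
    m2 = 0.f;
  }

  float step () {
    float y;
    boolean big;
    big = period3.step();
    if (big) {
      y = m;                  // read the value computed at an earlier step
      m = slow.step(1.f, y);  // and update the register for the next time
    } else {
      y = m2;
    }
    m2 = fast.step(0.3f, y);
    return y;
  }
}
```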

#### Reminder:

- y gets the value of the register m.
- During the same step, m is updated for the next step.
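To make the register discipline concrete, here is a small C transcription of the same step function (a sketch, not compiler output). The bodies of `slow` and `fast` are not given in the slides: the stand-ins below are hypothetical, chosen only so that the stream `y` reproduces the tables above (0, 1, 2, 3.14, 4.14, 5.14, 6.28, 7.28).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins, chosen only to reproduce the numbers in the tables:
   slow(h, x) advances x by 3.14 * h, fast(h, x) advances x by 3 * h. */
static double slow(double h, double x) { return x + 3.14 * h; }
static double fast(double h, double x) { return x + 3.0 * h; }

typedef struct {
    int    period3_count;  /* state of period3() */
    double m;              /* register of ys = 0 fby slow(1, ys) */
    double m2;             /* register of yf = 0 fby fast(1/3, y) */
} slow_fast_t;

static void slow_fast_reset(slow_fast_t *s) {
    s->period3_count = 0;
    s->m = 0.0;
    s->m2 = 0.0;
}

/* One synchronous step, mirroring the generated Slow_fast.step(). */
static double slow_fast_step(slow_fast_t *s) {
    bool big = (s->period3_count % 3 == 0);  /* true every third step */
    s->period3_count++;
    double y;
    if (big) {
        y = s->m;               /* read the register ... */
        s->m = slow(1.0, y);    /* ... and update it for the next time */
    } else {
        y = s->m2;
    }
    s->m2 = fast(1.0 / 3.0, y); /* fast runs at every step */
    return y;
}

/* Fills out[0..n-1] with the first n values of the stream y. */
void slow_fast_run(double *out, int n) {
    assert(out != NULL && n >= 0);
    slow_fast_t s;
    slow_fast_reset(&s);
    for (int i = 0; i < n; i++)
        out[i] = slow_fast_step(&s);
}
```

Because `m` is read and then overwritten within the same call, `slow` has to complete inside the step that starts it.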


# This sequential compilation is:

- very efficient and simple
- traceable
- used and certified in Scade 6

But it prevents parallelization across step boundaries.


OCREP by A. Girault: the distributed imperative code is *optimized to bypass* the synchronous register.


We had this:



```
node slow_fast() = (y :float)
var big :bool; yf :float; ys :float
let
   ys = 0 fby slow(1, ys);
   yf = 0 fby fast(1/3, y);
   big = period3();
   y = merge big ys (yf whenot big);
tel
```

We want this:






```
node slow_fast_a() = (y :float)
var big :bool; yf :float; ys :float
let
   ys = 0 fby (async slow(1, ys));
   yf = 0 fby fast(1/3, y);
   big = period3();
   y = merge big ys (yf whenot big);
tel
```



```
node slow_fast_a() = (y :float)
var big :bool; yf :float; ys :future float
let
  ys = (async 0) fby (async slow(1, ys));
  yf = 0 fby fast(1/3, y);
  big = period3();
  y = merge big ys (yf whenot big);
tel
```



```
node slow_fast_a() = (y :float)
var big :bool; yf :float; ys :future float
let
  ys = (async 0) fby (async slow(1, ys));
  yf = 0 fby fast(1/3, y);
  big = period3();
  y = merge big !ys (yf whenot big);
tel
```



```
node slow_fast_a() = (y :float)
var big :bool; yf :float; ys :future float
let
  ys = (async 0) fby (async slow(1, !ys));
  yf = 0 fby fast(1/3, y);
  big = period3();
  y = merge big !ys (yf whenot big);
tel
```






- Futures appeared in MULTILISP [Halstead, 1985].
- They are now available in most functional languages and in Java, C++, etc.
- Depending on the language integration, they can be a mere library.

#### What is a future?

It is *a value* that will hold the result of a *closed term*. Intuitively, it is a *promise* of a result that is *bound to come*.

To guarantee the integrity of futures in Heptagon:

- future t is an *abstract type*, with t being the result type.
- A future may *only* be created from:
  - Constants: async 42
  - Asynchronous function calls: async f(x,y)
- !x "gets" the result held by the future x; it is *blocking*.
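For readers coming from C, the three operations above map onto a classic write-once future. The sketch below assumes POSIX threads; `future_t`, `async_double_it` and the other names are hypothetical, not Heptagon's runtime API, and the asynchronous call simply spawns one thread per call.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* A write-once future holding a float: one producer, any number of consumers. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  ready_cv;
    bool            ready;
    float           value;
} future_t;

void future_init(future_t *f) {
    pthread_mutex_init(&f->lock, NULL);
    pthread_cond_init(&f->ready_cv, NULL);
    f->ready = false;
}

/* Analogue of "async 42": a future created already resolved. */
void future_init_const(future_t *f, float v) {
    future_init(f);
    f->value = v;
    f->ready = true;
}

/* Producer side: must be called exactly once. */
void future_set(future_t *f, float v) {
    pthread_mutex_lock(&f->lock);
    assert(!f->ready && "a future is assigned exactly once");
    f->value = v;
    f->ready = true;
    pthread_cond_broadcast(&f->ready_cv);
    pthread_mutex_unlock(&f->lock);
}

/* Analogue of "!x": blocks until the value is available. */
float future_get(future_t *f) {
    pthread_mutex_lock(&f->lock);
    while (!f->ready)
        pthread_cond_wait(&f->ready_cv, &f->lock);
    float v = f->value;
    pthread_mutex_unlock(&f->lock);
    return v;
}

/* Analogue of "async double_it(x)": run a closed computation in a new thread. */
typedef struct { future_t *out; float arg; } job_t;

static void *double_it(void *p) {
    job_t *job = p;
    future_set(job->out, 2.0f * job->arg);
    return NULL;
}

int async_double_it(pthread_t *th, job_t *job) {
    future_init(job->out);
    return pthread_create(th, NULL, double_it, job);
}
```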

```
class Slow_fast {
  Fast fast;
  Slow slow;
  Period3 period3;
  float m; float m2;
  void reset () {
    period3.reset();
    slow.reset();
    fast.reset();
    m = 0.f;
    m2 = 0.f;
  }
  float step () {
    float y;
    boolean big = period3.step();
    if (big) {
      y = m;
      m = slow.step(1.f, y);
    } else {
      y = m2;
    }
    m2 = fast.step(0.3f, y);
    return y;
  }
}
```

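```
class Slow_fast_a {
  Fast fast;
  Async<Slow> slow;            // slow now runs in a worker thread
  Period3 period3;
  Future<Float> m; float m2;   // the register now holds a future
  void reset () {
    period3.reset();
    slow.reset();
    fast.reset();
    m = new Future<Float>(0.f);  // async 0: an already-resolved future
    m2 = 0.f;
  }
  float step () {
    float y;
    boolean big = period3.step();
    if (big) {
      y = m.get();               // !ys: blocks until slow has produced the value
      m = slow.step(1.f, y);     // returns a new future immediately
    } else {
      y = m2;
    }
    m2 = fast.step(0.3f, y);
    return y;
  }
}
```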


# Implementation: the async wrapper

#### The async wrapper

- runs a node asynchronously in a worker thread.
- behaves like a node:
  - step:
    - A future is returned for each input.
    - Inputs are fed to the wrapped node through a buffer.
  - reset is implemented so as to allow data parallelism.

#### Important observations

- the need for an input buffer to allow decoupling
- the use of reset to enable data parallelism
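A minimal sketch of such a wrapper, assuming POSIX threads: `step` enqueues its input into a bounded buffer and returns immediately, a worker thread runs the wrapped node, and `!` blocks on the returned future. All names, the capacity `QCAP` and the wrapped node itself are hypothetical; reset and data parallelism are left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define QCAP 4  /* hypothetical capacity of the input buffer */

typedef struct { bool ready; float value; } future_t;  /* guarded by the wrapper lock */
typedef struct { float arg; future_t *out; } request_t;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  changed;    /* broadcast on enqueue, resolution and stop */
    request_t       buf[QCAP];  /* input buffer decoupling caller and worker */
    int             head, count;
    bool            stop;
    float           state;      /* state of the wrapped node, touched only by the worker */
    pthread_t       worker;
} async_node_t;

/* Hypothetical wrapped node: a running sum standing in for a slow computation. */
static float node_step(float *state, float x) { *state += x; return *state; }

static void *worker_loop(void *p) {
    async_node_t *a = p;
    pthread_mutex_lock(&a->lock);
    for (;;) {
        while (a->count == 0 && !a->stop)
            pthread_cond_wait(&a->changed, &a->lock);
        if (a->count == 0)
            break;                              /* stopped and drained */
        request_t r = a->buf[a->head];
        a->head = (a->head + 1) % QCAP;
        a->count--;
        pthread_mutex_unlock(&a->lock);
        float v = node_step(&a->state, r.arg);  /* run the node outside the lock */
        pthread_mutex_lock(&a->lock);
        r.out->value = v;
        r.out->ready = true;
        pthread_cond_broadcast(&a->changed);    /* wakes getters and blocked producers */
    }
    pthread_mutex_unlock(&a->lock);
    return NULL;
}

void async_start(async_node_t *a) {
    pthread_mutex_init(&a->lock, NULL);
    pthread_cond_init(&a->changed, NULL);
    a->head = a->count = 0;
    a->stop = false;
    a->state = 0.0f;
    int rc = pthread_create(&a->worker, NULL, worker_loop, a);
    assert(rc == 0);
    (void)rc;
}

/* step: enqueue the input and return at once; the result arrives in *out. */
void async_step(async_node_t *a, float x, future_t *out) {
    pthread_mutex_lock(&a->lock);
    while (a->count == QCAP)                    /* back-pressure when the buffer is full */
        pthread_cond_wait(&a->changed, &a->lock);
    out->ready = false;
    a->buf[(a->head + a->count) % QCAP] = (request_t){ x, out };
    a->count++;
    pthread_cond_broadcast(&a->changed);
    pthread_mutex_unlock(&a->lock);
}

/* "!x": block until the worker has resolved the future. */
float async_get(async_node_t *a, future_t *f) {
    pthread_mutex_lock(&a->lock);
    while (!f->ready)
        pthread_cond_wait(&a->changed, &a->lock);
    float v = f->value;
    pthread_mutex_unlock(&a->lock);
    return v;
}

void async_stop(async_node_t *a) {
    pthread_mutex_lock(&a->lock);
    a->stop = true;
    pthread_cond_broadcast(&a->changed);
    pthread_mutex_unlock(&a->lock);
    pthread_join(a->worker, NULL);
}
```

When the buffer is full, `async_step` blocks; this back-pressure is what keeps the number of pending requests bounded.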

#### A future

- is a shared object with one producer and multiple consumers
- may be stored and used later on
- might not be used at all
- typically depends on the evaluation of upstream futures

Without restrictions, the live range of a future is *undecidable*, and a *concurrent GC* is needed, such as Java's.

#### Memory boundedness

The number of live futures is bounded by the number of synchronous registers, so *a slab allocator* with *static allocation* and reuse is possible.

#### Scope restriction for node-level memory management

Preventing futures from being returned or passed to an async call allows the GC and the slab to be *synchronous* and *node-local*.
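Under that bound, the allocator can be as simple as a fixed array and a stack of free slots. This sketch assumes the bound is known at compile time (the hypothetical `NSLOTS`) and that only the owning node allocates and frees, which is what the scope restriction permits; it is an illustration, not the C backend's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define NSLOTS 2  /* hypothetical bound: live futures <= registers holding them */

typedef struct { bool ready; float value; } future_t;

/* Node-local slab: statically allocated storage plus a stack of free slot indices. */
typedef struct {
    future_t slots[NSLOTS];
    int      free_idx[NSLOTS];
    int      nfree;
} future_slab_t;

void slab_init(future_slab_t *s) {
    for (int i = 0; i < NSLOTS; i++)
        s->free_idx[i] = i;
    s->nfree = NSLOTS;
}

/* Returns NULL if the bound is exceeded, which should not happen
   when NSLOTS matches the static bound. */
future_t *slab_alloc(future_slab_t *s) {
    if (s->nfree == 0)
        return NULL;
    future_t *f = &s->slots[s->free_idx[--s->nfree]];
    f->ready = false;
    return f;
}

/* Called synchronously by the owning node when a register drops the future. */
void slab_free(future_slab_t *s, future_t *f) {
    ptrdiff_t i = f - s->slots;
    assert(i >= 0 && i < NSLOTS && s->nfree < NSLOTS);
    s->free_idx[s->nfree++] = (int)i;
}
```

No lock is needed here because allocation and release happen only inside the owning node's step.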

### Backends

#### Existing JAVA backend

- Futures are those of JAVA
- Static queues and worker threads
- But dynamic allocation of futures

#### Existing C backend, aiming for embedded systems

- Hand-tailored futures, queues and threads
- Slab allocator local to each node
- Futures have scope restrictions

#### Work-in-progress **OPENSTREAM** backend

- Data-flow parallel runtime with a high-performance task scheduler
- Handles a large number of async calls

## Code generation for embedded systems

async may be *annotated* with any needed static arguments.

#### Location annotations for distribution without a scheduler:

- One thread per computing unit
- No surprise
- Usually not efficient

#### Priority annotations for EDF scheduling:

- Well known
- May be optimal
- Existing tools need to be adapted

# Wrap-Up on Heptagon

#### Semantics

Same semantics as the sequential program obtained by erasing async and !

#### Expressiveness

- Synchronous language: programming with time
- Futures: decouple computations and make their beginning and end explicit
- Together they allow for *programming parallelism*:
  - decoupling, partial-decoupling
  - data-parallelism
  - fork-join, temporal fork-join
  - pipeline, etc.

#### Safety

- Statically serializable: futures in a pure language
- No dynamic memory allocation or thread creation
- Proven runtime system (scheduler, FIFO ring buffer)
- Proven compilation flow (part of it)

## 4. The Anvil: Runtime System Design


### Concurrent Programming: Which Abstraction?

Simplest asynchronous model: sequential consistency

...the result of any execution is the same as if the operations of all the processors were executed in some sequential order, and the operations of each individual processor appear in this sequence in the order specified by its program...

Lamport, 1979.





## Still... Non-Determinism, Data Races, Contention





## Real World is Worse: Relaxed Memory Models







excerpt from Linux spinlock.c

```
void __lockfunc _##op##_lock(locktype##_t *lock)
{
        for (;;) {
                preempt_disable();
                if (likely(_raw_##op##_trylock(lock)))
                        break;
                preempt_enable();
                if (!(lock)->break_lock)
                        (lock)->break_lock = 1;
                while (!op##_can_lock(lock) && (lock)->break_lock)
                        _raw_##op##_relax(&lock->raw_lock);
        }
        (lock)->break_lock = 0;
}

/* C11 steal() of a lock-free work-stealing deque.
   load_explicit, thread_fence, compare_exchange_strong_explicit abbreviate the
   C11 atomic_*_explicit functions; acquire, relaxed, seq_cst abbreviate the
   memory_order_* constants. */
int steal(Deque *q) {
  size_t t = load_explicit(&q->top, acquire);
  thread_fence(seq_cst);
  size_t b = load_explicit(&q->bottom, acquire);
  int x = EMPTY;
  if (t < b) {
    /* Non-empty queue. */
    Array *a = load_explicit(&q->array, relaxed);
    x = load_explicit(&a->buffer[t % a->size], relaxed);
    if (!compare_exchange_strong_explicit(&q->top, &t, t + 1, seq_cst, relaxed))
      /* Failed race. */
      return ABORT;
  }
  return x;
}
```

[Figure: the code excerpts above, overlaid with Lemma 3 of a relaxed-memory correctness proof: "The following properties involving barriers apply: ..."]







- Sequential consistency: behavior equivalent to a serial interleaving of accesses.
  - Will necessarily read r0 = 1 or r1 = 1




Sequential consistency possible interleavings



Additional TSO "perceived" interleavings... all permutations

- Sequential consistency: behavior equivalent to a serial interleaving of accesses.
- Total Store Order (x86): a write buffer delays the visibility of a processor's stores to other processors
  - Can read r0 = 0 and r1 = 0
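The litmus figure did not survive extraction. Assuming it is the usual store-buffering test (each thread writes one variable, then reads the other), the C11 sketch below runs it with sequentially consistent and with relaxed accesses. Only the first guarantee is testable: with `seq_cst`, `r0 == 0 && r1 == 0` can never occur; with relaxed accesses it is allowed, but whether it shows up depends on the machine and on timing.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

static atomic_int x, y;
static int r0, r1;
static bool relaxed;  /* true: relaxed accesses; false: seq_cst accesses */

/* Thread 0: x = 1; r0 = y */
static void *thread0(void *arg) {
    (void)arg;
    if (relaxed) {
        atomic_store_explicit(&x, 1, memory_order_relaxed);
        r0 = atomic_load_explicit(&y, memory_order_relaxed);
    } else {
        atomic_store(&x, 1);  /* plain atomic_store/atomic_load are seq_cst */
        r0 = atomic_load(&y);
    }
    return NULL;
}

/* Thread 1: y = 1; r1 = x */
static void *thread1(void *arg) {
    (void)arg;
    if (relaxed) {
        atomic_store_explicit(&y, 1, memory_order_relaxed);
        r1 = atomic_load_explicit(&x, memory_order_relaxed);
    } else {
        atomic_store(&y, 1);
        r1 = atomic_load(&x);
    }
    return NULL;
}

/* Runs the store-buffering test iters times; returns how often r0 == r1 == 0. */
int count_both_zero(bool use_relaxed, int iters) {
    assert(iters >= 0);
    int hits = 0;
    relaxed = use_relaxed;
    for (int i = 0; i < iters; i++) {
        pthread_t a, b;
        atomic_store(&x, 0);
        atomic_store(&y, 0);
        pthread_create(&a, NULL, thread0, NULL);
        pthread_create(&b, NULL, thread1, NULL);
        pthread_join(a, NULL);  /* joining makes r0 and r1 visible here */
        pthread_join(b, NULL);
        if (r0 == 0 && r1 == 0)
            hits++;
    }
    return hits;
}
```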



# Memory Consistency and Relaxation

#### Sequential consistency – interleaving

- (+) total order of all memory operations
- (-) no longer a valid hypothesis: performance bottleneck

#### Total store order (x86)

- (+) total store order: reason about global invariants
- (-) does not scale well

#### POWER, ARM, C/C++11, RC, WC, DAG, PC, LC...

- (-) partial order of memory operations
- (-) processors may have conflicting views of memory
- (+) better scalability at a lower power price tag







## Example: First Provably Correct Work-Stealing Scheduler

#### A relaxed lock-free work-stealing algorithm.

Two implementations: C11 and ARM inline assembly.

Based on state-of-the-art sequentially consistent algorithm (Chase and Lev, 2005).

Proven for the POWER/ARM relaxed memory model (axiomatic POWER model by Mador-Haim et al., 2012). POWER and ARM have the same memory model.

[Lê et al., PPoPP 2013]

# Work stealing (1)

Each core has an associated double-ended queue (deque)

- New tasks are *pushed* to the core's deque
- When a task finishes, another is *taken* from the core's deque



# Work stealing (2)

If a task finishes and the queue is empty, the core *steals* from another deque

Cores alternate between the *worker* and *thief* roles

We focus on the study of a single deque (hereafter, *the* deque)



- **Legal reads**: only tasks that were pushed are taken or stolen
- **Uniqueness**: a task pushed into the deque cannot be taken or stolen more than once
- **Existence**: tasks are not lost because of concurrency
- **Progress**: given a finite number of tasks, the deque is eventually emptied

## Concurrency problems

- Two thieves attempt to steal the same task
- Worker and thief contend for the same task



#### Steal/steal resolution in SC

Proven by Chase and Lev for sequential consistency (SC). Steal/steal conflicts are resolved with a compare-and-swap (CAS).





## Take/steal resolution in SC

Proven by Chase and Lev for sequential consistency (SC). Potential take/steal races occur only if one task is left; they are detected by comparing the indices (t and b).

If one task is left, the worker "self-steals" from its own deque with a CAS.





#### Summary of operations in SC

The table below shows how operations affect the indices.

| Operation               | Index update | Condition         |
|-------------------------|--------------|-------------------|
| push                    | ++b          |                   |
| take (deque size $> 1$) | --b          |                   |
| take (deque size $= 1$) | ++t          | if CAS successful |
| steal                   | ++t          | if CAS successful |
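The table can be read alongside a self-contained C11 sketch of the whole deque. It reuses the structure and memory orders of the `steal` excerpt shown earlier and adds `push` and `take` in the Chase-Lev style; to stay short it assumes a fixed capacity (no resize), signed indices so that `b - 1` cannot wrap around, and non-negative `int` tasks, with `EMPTY` and `ABORT` as placeholder values. It illustrates the index updates and is not the verified implementation.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define CAP   64    /* fixed capacity: this sketch omits the resize step */
#define EMPTY (-1)  /* tasks are assumed to be non-negative ints */
#define ABORT (-2)

typedef struct {
    atomic_long top, bottom;  /* signed, so that bottom - 1 cannot wrap around */
    atomic_int  buf[CAP];
} Deque;

void deque_init(Deque *q) {
    atomic_init(&q->top, 0);
    atomic_init(&q->bottom, 0);
    for (int i = 0; i < CAP; i++)
        atomic_init(&q->buf[i], 0);
}

/* Owner only: ++b. */
bool push(Deque *q, int x) {
    assert(x >= 0);
    long b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    long t = atomic_load_explicit(&q->top, memory_order_acquire);
    if (b - t > CAP - 1)
        return false;                                  /* full */
    atomic_store_explicit(&q->buf[b % CAP], x, memory_order_relaxed);
    atomic_thread_fence(memory_order_release);         /* publish the task before b */
    atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
    return true;
}

/* Owner only: --b; with a single task left, race the thieves with a CAS on t. */
int take(Deque *q) {
    long b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
    atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);
    long t = atomic_load_explicit(&q->top, memory_order_relaxed);
    int x;
    if (t <= b) {                                      /* non-empty */
        x = atomic_load_explicit(&q->buf[b % CAP], memory_order_relaxed);
        if (t == b) {                                  /* last task: "self-steal" */
            if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                    memory_order_seq_cst, memory_order_relaxed))
                x = EMPTY;                             /* a thief won the race */
            atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
        }
    } else {                                           /* empty: restore b */
        x = EMPTY;
        atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
    }
    return x;
}

/* Any thief: ++t if the CAS succeeds. */
int steal(Deque *q) {
    long t = atomic_load_explicit(&q->top, memory_order_acquire);
    atomic_thread_fence(memory_order_seq_cst);
    long b = atomic_load_explicit(&q->bottom, memory_order_acquire);
    int x = EMPTY;
    if (t < b) {                                       /* non-empty */
        x = atomic_load_explicit(&q->buf[t % CAP], memory_order_relaxed);
        if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                memory_order_seq_cst, memory_order_relaxed))
            return ABORT;                              /* lost a race on t */
    }
    return x;
}
```

Run sequentially: after pushing 1, 2 and 3, `steal` returns 1 (++t), `take` returns 3 (--b), and the next `take` finds a single task and resolves it with the CAS.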

- 1. Let's assume the deque starts with three tasks.
- 2. Thief 1 steals two tasks; the others don't know yet.
- 3. Thief 2 sees the two steals and attempts to steal x[2].
- 4. Worker sees only the first steal and takes x[2].

$W_{x,v}$ denotes a write of the value $v$ to the variable $x$, and $R_{x,v}$ a read of the value $v$ from the variable $x$. Blue bins represent the views of Worker's deque from each core; black arrows represent communications.

#### Why does it happen?

Each core has a different view of the indices: the state of the deque is *relative* to the core that observes it (unlike SC, where the state of the deque is the same for all cores).

The worker does not realize that it is taking the last element (from its viewpoint, $t \neq b-1$), hence it issues no CAS to resolve the conflict.

#### Sequentially consistent ideas (1)

#### Why is it different in SC?

In SC, all memory events are totally ordered, and transitively so.



$R_{b,3}$ in Thief 2 occurs before $W_{b,2}$; otherwise, it would read 2.

#### Sequentially consistent ideas (2)

In SC, a test in one core gives information about *other* cores: the state is the same for all at any given time

- Can test for special cases (e.g., a single-task queue, $t = b-1$)
- Can test for invariants (e.g., a well-formed queue, $t \leq b$)
- Can use induction on these invariants

This does not hold in a relaxed memory model.

#### POWER/ARM barriers and cumulativity

Recall the situation in the previous example



A **sync** memory barrier instruction guarantees that *all the writes that have been observed* by the core issuing the barrier instruction are propagated to all the other cores before the core can continue.



#### Experimental results (on Tegra 3, 4-core ARM)

[Figure: effective steal throughput (Hz) of the seqcst, c11, native and nofences variants]

#### More on Weak Memory Models

- First application of a formal relaxed memory model to the manual proof of a moderately complex real-world algorithm
- Two efficient implementations: C11 and inline assembly; tested on ARM, POWER and x86

Recent and ongoing work:

- Proof of a fast SPSC FIFO ring buffer in C11
- Index-based MPMC FIFO: "Erbium"
- Combined FIFO and scheduling with lightweight suspension and wake-up

Perspectives:

- Global address space models for software caches/DSMs
- Application to manycore processors with on-chip distributed memory
- E.g., Kalray MPPA

## FIFO Ring Buffer: Lock-Free Implementations

- SPSC: only needs release/acquire (free on x86): Lamport's FIFO, Lee et al.'s MCRB
- SPMC/MPSC: needs an atomic operation
- MPMC: often implemented using SPMC + MPSC (useful for data parallelism)



FastFlow: http://mc-fastflow.sourceforge.net

Index-based MPMC: only needs release/acquire (free on x86)

[Figure: physical stream buffer accessed through computed access indexes, with stall/commit(idx1) and update/release(idx3)]

#### Fast Implementations of SPSC Streams

#### Ring buffer with two pointers/indices

Issue: detect full buffers with back-pressure



- Simplest and most efficient solution for power-of-two sizes: absolute indices; this needs arithmetic care to deal with wrap-around and (32-bit) overflow
- Cache-aware optimization
  - Manual/software caching of the last index
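The absolute-index arithmetic can be sketched in a few lines of C. Indices are free-running `uint32_t` counters (the size and names are hypothetical); unsigned subtraction gives the occupancy even after the counters overflow, and because a power-of-two size divides 2^32, masking maps consecutive indices to consecutive slots across the overflow.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define RB_SIZE 8u  /* hypothetical capacity; must be a power of two */
static_assert((RB_SIZE & (RB_SIZE - 1)) == 0, "RB_SIZE must be a power of two");

/* Indices are absolute (free-running) counters, never reduced modulo RB_SIZE. */
static inline uint32_t rb_count(uint32_t front, uint32_t back) {
    return (uint32_t)(back - front);  /* unsigned subtraction stays exact across overflow */
}

static inline bool rb_empty(uint32_t front, uint32_t back) {
    return front == back;
}

static inline bool rb_full(uint32_t front, uint32_t back) {
    return rb_count(front, back) == RB_SIZE;  /* all slots usable, no spare slot */
}

/* Buffer slot of an absolute index; equals index % RB_SIZE for powers of two. */
static inline uint32_t rb_slot(uint32_t index) {
    return index & (RB_SIZE - 1);
}
```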

#### Lamport's Lock-Free FIFO Queue in C11

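```
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define SIZE 16   /* placeholder capacity: holds at most SIZE - 1 elements */
typedef int T;    /* placeholder element type */

atomic_size_t front;
atomic_size_t back;
T data[SIZE];

void init(void) {
  atomic_init(&front, 0);
  atomic_init(&back, 0);
}

bool push(T elem) {
  size_t b, f;
  b = atomic_load_explicit(&back, memory_order_seq_cst);
  f = atomic_load_explicit(&front, memory_order_seq_cst);
  if ((b + 1) % SIZE == f)
    return false;               /* full */
  data[b] = elem;
  atomic_store_explicit(&back, (b + 1) % SIZE, memory_order_seq_cst);
  return true;
}

bool pop(T *elem) {
  size_t b, f;
  b = atomic_load_explicit(&back, memory_order_seq_cst);
  f = atomic_load_explicit(&front, memory_order_seq_cst);
  if (b == f)
    return false;               /* empty */
  *elem = data[f];              /* the consumer reads at front */
  atomic_store_explicit(&front, (f + 1) % SIZE, memory_order_seq_cst);
  return true;
}
```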

#### Optimization: WeakRB

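```
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SIZE 1024  /* placeholder capacity */
typedef int T;     /* placeholder element type */

atomic_size_t front;
size_t pfront;     /* producer's cached copy of front */
atomic_size_t back;
size_t cback;      /* consumer's cached copy of back */
_Static_assert(SIZE_MAX % SIZE == SIZE - 1,
  "SIZE divides SIZE_MAX + 1");
T data[SIZE];

void init(void) {
  atomic_init(&front, 0);
  atomic_init(&back, 0);
}

bool push(const T *elems, size_t n) {
  size_t b;
  b = atomic_load_explicit(&back, memory_order_relaxed);
  if (pfront + SIZE - b < n) {
    pfront = atomic_load_explicit(&front, memory_order_acquire);
    if (pfront + SIZE - b < n)
      return false;
  }
  for (size_t i = 0; i < n; i++)
    data[(b + i) % SIZE] = elems[i];
  atomic_store_explicit(&back, b + n, memory_order_release);
  return true;
}
```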

```
bool pop(T *elems, size_t n) {
   size_t f;
   f = atomic_load_explicit(&front, memory_order_relaxed);
   if (cback - f < n) {
      cback = atomic_load_explicit(&back, memory_order_acquire);
      if (cback - f < n)
         return false;
   }
   for (size_t i = 0; i < n; i++)
      elems[i] = data[(f + i) % SIZE];
   atomic_store_explicit(&front, f + n, memory_order_release);
   return true;
}
```

#### Evaluation Platforms

| Machine         | Cortex A9 | Core i7       |
|-----------------|-----------|---------------|
| Manufacturer    | Samsung   | Intel         |
| ISA             | ARMv7     | x86_64        |
| Number of cores | 4         | 4 (8 logical) |
| Clock frequency | 1.3 GHz   | 3.4 GHz       |
| Best throughput | 2.2 GB/s  | 22 GB/s       |

#### Performance Results



## Erbium: Fast Index-Based MPMC Streams



#### Stream synchronization primitives

- commit()/update(): pressure
- release()/stall(): back-pressure
- receive(): prefetch on a sliding window
- Deterministic initialization protocol and garbage collection of dead sliding windows

[Miranda et al., CASES 2010]


#### Erbium: Lightweight runtime

Ring buffer with two sets of indices, for the producer and consumer sides

- Cache the minimum of the producer (resp. consumer) indices
- Cache-aware optimization
  - Alignment of shared indices to avoid false sharing
  - Manual/software caching of the last index
  - Monotonicity tolerates races on minimum index computation
- Lock-free, consensus-free implementation
  - No HW atomic instruction, no fence on x86
- $\approx$ 10 cycles per streaming communication cycle
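The minimum-index caching and its tolerance to races can be sketched as follows: a producer checks for space against a cached minimum of the consumers' release indices and recomputes that minimum only when the cache says the buffer is full. The names and sizes are hypothetical, not Erbium's API.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define NCONS 4u   /* hypothetical number of consumers */
#define BUFSZ 64u  /* hypothetical buffer size */

/* Each consumer publishes its release index on its own cache line (no false sharing). */
typedef struct {
    _Alignas(64) atomic_size_t released;
} consumer_index_t;

typedef struct {
    consumer_index_t cons[NCONS];
    size_t cached_min;  /* producer-private cache of the minimum release index */
} producer_view_t;

/* Indices only grow, so every concurrent read returns a value no larger than the
   current one: the minimum computed here is a safe lower bound, even if it races. */
static size_t min_released(producer_view_t *p) {
    size_t m = atomic_load_explicit(&p->cons[0].released, memory_order_acquire);
    for (size_t i = 1; i < NCONS; i++) {
        size_t r = atomic_load_explicit(&p->cons[i].released, memory_order_acquire);
        if (r < m)
            m = r;
    }
    return m;
}

/* May the producer write n items starting at absolute index head? The shared
   indices are re-read only when the cached minimum is not enough. */
bool can_produce(producer_view_t *p, size_t head, size_t n) {
    assert(n <= BUFSZ);
    if (head + n - p->cached_min <= BUFSZ)
        return true;
    p->cached_min = min_released(p);
    return head + n - p->cached_min <= BUFSZ;
}
```

A stale minimum can only make the producer wait longer than necessary, never overwrite data that a consumer has not released, which is why no atomic read-modify-write is needed.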

## 5. Wrap-Up


## Streaming Data Flow

#### Kahn networks

- Deterministic parallelism
- Abstract description and/or distributed implementation
- Mathematical (ideal) model: stream equations, differential equations, automata
- The *compiler* plays a central role: restrict to executable specifications, optimization, exposing task parallelism
- Inspiration for a scalable and efficient *runtime system* for dynamic dependence resolution and task scheduling

#### Synchronous Kahn parallelism

- As a programming model for dealing with time and parallelism
- And as an internal representation in optimizing compilers
- And for *code generation* down to sequential and parallel code
- Full traceability between the source and target code

#### OpenStream Impact and Dissemination

- 1. Used in 3 collaborative research projects
- 2. Used in 3 ongoing PhD theses (ÉNS, INRIA and UPMC)
- 3. Used in a parallel programming course project at Politecnico di Torino
- 4. Ongoing work to port OpenStream to Kalray MPPA
- 5. Used for developing and evaluating communication channel synchronization algorithms by Preud'Homme et al. in "An Improvement of OpenMP Pipeline Parallelism with the BatchQueue Algorithm," ICPADS 2012
- 6. Proven work-stealing implementation discussed within OpenJDK, and helped debug SAP's distributed GC
- 7. OpenStream featured in the HPCwire magazine www.hpcwire.com/hpcwire/2013-01-24/the\_week\_in\_hpc\_research.html?page=3
- 8. Source code publicly available on Sourceforge http://sourceforge.net/p/open-stream/
- 9. Project website: http://openstream.info