This repository will be used as the website for Parallelism and Concurrency CS-206. It will be updated weekly throughout the semester. This README contains general information about the class. The course page has since moved to [Moodle](https://moodle.epfl.ch/course/view.php?id=14388).
- [previous-exams](previous-exams) contains PDFs for the previous exams.
- [exercises](exercises) contains markdown documents for exercises and solutions.
- [slides](slides) contains the slides presented in class.
- [labs](labs) contains markdown documents for the labs.
We will use [Piazza](https://piazza.com/epfl.ch/spring2022/cs206) as a discussion forum. Feel free to open a thread if you have any comments or questions.
# First-week tasks
1. Join [the Discord](https://discord.gg/tgbPcCFSm2)
1. Log into gitlab: https://gitlab.epfl.ch/users/sign_in
1. Please fill in [this form](https://forms.gle/N6F3Q3jZm71AASby9) with your GASPAR and SCIPER number
1. Please fill in [this doodle](https://doodle.com/poll/yi2c33zkb8nre3ug) by picking the room in which you will do the exercises.
1. Follow the [Tools Setup](labs/tools-setup.md) page.
1. Do the [example lab](labs/example-lab.md).
1. Watch all videos under *Parallelism 1: Introduction to Parallel Programming* below
1. Do the [first graded lab](labs/lab1-parallel-box-blur-filter/).
# Grading
The grading of the course is divided between labs (30%), midterm exam (30%) and final exam (40%).
# Staff
| Role | People |
| :--- | :--- |
| Professors | [Martin Odersky](https://people.epfl.ch/martin.odersky), [Kashyap Sanidhya](https://people.epfl.ch/sanidhya.kashyap) |
| TAs | [Dragana Milovancevic](https://people.epfl.ch/dragana.milovancevic), [Matthieu Bovel](https://people.epfl.ch/matthieu.bovel), [Simon Guilloud](https://people.epfl.ch/simon.guilloud), [Tao Lyu](https://people.epfl.ch/tao.lyu), [Gupta Vishal](https://people.epfl.ch/vishal.gupta)|
| Student TAs | Mohamed Boukhari, Martin Lenweiter, Nicolas Matekalo, Tuomas Pääkkönen, Abel Wilkinson |
# Course Schedule
Lectures are partially live and partially prerecorded (on YouTube).
Live sessions will be held on Wednesdays from 14:15 to 16:00 if it is a week with live lecture.
Exercise sessions will be held on Wednesdays from 14:15 to 16:00 if it is a week with exercises.
Lab sessions will be held on Wednesdays from 16:15 to 18:00.
You should watch the prerecorded lectures before doing the exercises.
<!-- seq 0 7 100 | xargs -i date -d "02/24/2021 {} days" +"%d.%m.%Y" -->
| Week | Date | Topic | Lectures (14:15-16:00) | Exercises (14:15-16:00) | Labs (16:15-18:00) |
| :-- | :-- | :-- | :-- | :-- | :-- |
| 1 | 2022.02.23 | Parallelism 1 | Prerecorded | Welcome session | Lab 1 |
| 2 | 2022.03.02 | Parallelism 2 | Prerecorded | Exercises 1 | Lab 1 & 2 |
| 3 | 2022.03.09 | Parallelism 3 | Prerecorded | Exercises 2 | Lab 2 & 3 |
| 4 | 2022.03.16 | Parallelism 4 | Prerecorded | Exercises 3 | Lab 3 & 4 |
| 5 | 2022.03.23 | Concurrency 1 | Live | None | Lab 4 & 5 |
| 6 | 2022.03.30 | Concurrency 2 | Live | None | Lab 5 & 6 |
| 7 | 2022.04.06 | Concurrency 3 | Live | None | Lab 6 |
| 8 | 2022.04.13 | Concurrency 4 | Live | None | Lab 6 & 7 |
| - | 2022.04.20 | Easter Break | None | None | None |
| 9 | 2022.04.27 | Midterm Exam | Midterm Exam | None | Lab 7 |
| 10 | 2022.05.04 | Futures | Live in CO1 | None | Midterm review |
| 11 | 2022.05.11 | Actors 1 | Prerecorded | Exercises 4 | Lab 8 |
| 12 | 2022.05.18 | Actors 2 | Prerecorded | Exercises 5 | Lab 9 |
| 13 | 2022.05.25 | Actors 3 | Prerecorded | Q&A | Final dry run |
Solutions to the exercises are released after each deadline. We do not provide solutions for the labs.
Before each Exercise session, students should watch videos corresponding to that week's topic:
### Intro
- [Welcome session (CO1)](slides/CS206%20Intro.pdf)
### Parallelism 1: Introduction to Parallel Programming
- [Introduction to Parallel Computing](https://www.youtube.com/watch?v=94O72nyNFY0)
- [Parallelism on the JVM I](https://www.youtube.com/watch?v=I8w-q1TPtjA)
- [Parallelism on the JVM II](https://www.youtube.com/watch?v=BbVWGWTNAXw)
- [Running Computations in Parallel](https://www.youtube.com/watch?v=KkMZGJ3M2-o)
- [Monte Carlo Method to Estimate Pi](https://www.youtube.com/watch?v=VBCf-aTgpPU)
- [First-Class Tasks](https://www.youtube.com/watch?v=mrVVaXCuhBc)
- [How Fast are Parallel Programs?](https://www.youtube.com/watch?v=Lpnexp_Qxgo)
- [Benchmarking Parallel Programs](https://www.youtube.com/watch?v=LvS_kjCssfg)
### Parallelism 2: Basic Task Parallel Algorithms
- [Parallel Sorting](https://www.youtube.com/watch?v=AcuvVgQbphg)
- [Data Operations and Parallel Mapping](https://www.youtube.com/watch?v=ghYtMLrphZw)
- [Parallel Fold (Reduce) Operation](https://www.youtube.com/watch?v=hEBgyhIoWww)
- [Associativity I](https://www.youtube.com/watch?v=q-Cl3whISCY)
- [Associativity II](https://www.youtube.com/watch?v=XBjqYavDUB8)
- [Parallel Scan (Prefix Sum) Operation](https://www.youtube.com/watch?v=CYr3YaQiMwo)
### Parallelism 3: Data-Parallelism
- [Data-Parallel Programming](https://www.youtube.com/watch?v=WW7TabCiOV8)
- [Data-Parallel Operations I](https://www.youtube.com/watch?v=Vd35YQ8DEO4)
- [Data-Parallel Operations II](https://www.youtube.com/watch?v=dcMgKtuAh3s)
- [Scala Parallel Operations](https://www.youtube.com/watch?v=NjkxjAT7ohE)
- [Splitters and Combiners](https://www.youtube.com/watch?v=Redz85Nlle4)
### Parallelism 4: Data-Structures for Parallel Computing
- [Implementing Combiners](https://www.youtube.com/watch?v=dTP0ntniB2I)
- [Parallel Two-phase Construction](https://www.youtube.com/watch?v=XcMtq3OdjQ0)
- [Conc-Tree Data Structure](https://www.youtube.com/watch?v=cUXHXKL8Xvs)
- [Amortized, Constant-Time Append Operation](https://www.youtube.com/watch?v=Ic5DUZLITVI)
- [Conc-Tree Combiners](https://www.youtube.com/watch?v=aLfFlCC1vjc)
### Futures
- [Lecture from 2020](https://www.youtube.com/watch?v=XznYvMjA-7s&t=2499s)
### Actors 1: The Actor Model
- [Introduction: why actors?](https://www.youtube.com/watch?v=ZQAe9AItH8o)
- [The Actor Model](https://www.youtube.com/watch?v=c49tDZuFtPA)
- [Message Processing Semantics](https://www.youtube.com/watch?v=Uxn1eg6R0Fc)
- [Designing Actor Systems](https://www.youtube.com/watch?v=uxeMJLo3h9k)
- [Testing Actor Systems](https://www.youtube.com/watch?v=T_2nwLr-H2s)
### Actors 2: Handling Failure and State
- [Failure Handling With Actors](https://www.youtube.com/watch?v=tgQuuKUQKxE)
- [Lifecycle Monitoring and The Error Kernel](https://www.youtube.com/watch?v=GYy6mmGXR5E)
- [Persistent Actor State](https://www.youtube.com/watch?v=c-BdLeNxkYk)
### Actors 3: Distributed Computing
- [Actors Are Distributed (part 1)](https://www.youtube.com/watch?v=xQx01VDn-jk)
- [Actors Are Distributed (part 2)](https://www.youtube.com/watch?v=LunpgPsYjzQ)
- [Eventual Consistency](https://www.youtube.com/watch?v=Immhr4FKssM)
- [Actor Composition](https://www.youtube.com/watch?v=Ean2-1O9aBs)
- [Scalability](https://www.youtube.com/watch?v=JkcTGRYoZ_A)
- [Responsiveness](https://www.youtube.com/watch?v=KvP-8yrgOr4)
# Labs
Labs are individual assignments where you get to write Scala programs using the concepts learned during lectures.
Labs are submitted by pushing your code on GitLab, see details in the [grading and submission](labs/grading-and-submission.md) page.
| Labs | Name | Start date | Due date (23:59 [AoE](https://en.wikipedia.org/wiki/Anywhere_on_Earth)) |
| :-- | :-- | :-- | :-- |
| Lab 1 | Parallel Box Blur Filter | 2022.02.23 | 2022.03.06 |
| Lab 2 | Reductions and Prefix Sums | 2022.03.02 | 2022.03.13 |
| Lab 3 | K-Means | 2022.03.09 | 2022.03.20 |
| Lab 4 | Barnes-Hut Simulation | 2022.03.16 | 2022.03.27 |
| Lab 5 | Concurrent Bounded Buffer | 2022.03.23 | 2022.04.03 |
| Lab 6 | Lock-free sorted list | 2022.03.30 | 2022.04.17 |
| Lab 7 | Hazard Pointer | 2022.04.13 | 2022.05.01 |
| Lab 8 | Actors Binary Tree | 2022.05.11 | 2022.05.23 |
| Lab 9 | Key-Value Store | 2022.05.18 | 2022.06.03 |
# Exercises
Exercises are pen and paper style questions that will help you consolidate the knowledge learned during lectures.
Exercises should be done in groups during the lecture.
# Exams
The midterm exam will take place on 2022.04.27. The midterm exam will cover all the material seen in the class up to that point.
The final exam will take place on 2022.06.01. The final exam will cover all material seen during the semester.
Information about exams organization will be communicated by email.
# Exercise Session 1
# Problem 1: Introduction to Concurrency
Freshly graduated from EPFL, you have all been hired as contractors for a successful and rapidly growing bank. The bank has recently been experiencing problems with their money management system, coded in Scala, and so they hired the best and brightest young engineers they could find: you! The system has been working perfectly fine so far, they tell you. In the past days, due to an increased number of customers, they had to switch from a single-threaded sequential execution environment to a multithreaded concurrent environment, in which multiple threads may perform transactions concurrently. That's when the problems started, your manager says…
Below is the code responsible for withdrawing money from the account `from` and transferring it to the account `to`, within the same bank.
```scala
def transfer(from: Account, to: Account, amount: BigInt): Unit = {
  require(amount >= 0)
  val balanceFrom = from.balance
  if (balanceFrom >= amount) {
    from.balance = balanceFrom - amount
    val balanceTo = to.balance
    to.balance = balanceTo + amount
  }
}
```
For the bank, it is very important that the following two properties hold after any sequence of completed transfer transactions:
1. The balance of an account never goes below 0.
2. The total sum of money held by the bank is constant.
## Question 1
Does the above transfer method respect the two properties in a *sequential* execution environment, that is, when there is only one thread in the program?
## Question 2
What can go wrong in a setting where multiple threads can execute the `transfer` method concurrently? For each of the two desired properties of the system, check whether it holds in this concurrent environment. If not, come up with an example execution which exhibits a violation of the property.
## Question 3
For each of the proposed implementations of `transfer` below, check which of the properties hold. Additionally, check if the system is vulnerable to *deadlocks*.
Variant 1
```scala
def transfer(from: Account, to: Account, amount: Long): Unit = {
  require(amount >= 0)
  val balanceFrom = from.balance
  if (balanceFrom >= amount) {
    from.synchronized {
      from.balance = balanceFrom - amount
    }
    to.synchronized {
      val balanceTo = to.balance
      to.balance = balanceTo + amount
    }
  }
}
```
Variant 2
```scala
def transfer(from: Account, to: Account, amount: Long): Unit = {
  require(amount >= 0)
  from.synchronized {
    val balanceFrom = from.balance
    if (balanceFrom >= amount) {
      from.balance = balanceFrom - amount
      to.synchronized {
        val balanceTo = to.balance
        to.balance = balanceTo + amount
      }
    }
  }
}
```
Variant 3
```scala
object lock // Global object

def transfer(from: Account, to: Account, amount: Long): Unit = {
  require(amount >= 0)
  lock.synchronized {
    val balanceFrom = from.balance
    if (balanceFrom >= amount) {
      from.balance = balanceFrom - amount
      val balanceTo = to.balance
      to.balance = balanceTo + amount
    }
  }
}
```
# Problem 2: Parallel Reductions
## Question 1
As a group, write a function called `minMax`, which should take a non-empty array as input and return a pair containing the smallest and the largest element of the array.
```scala
def minMax(a: Array[Int]): (Int, Int) = ???
```
Now write a parallel version of the function. You may use the constructs `task` and/or `parallel`, as seen in the lectures.
## Question 2
Imagine that the data structure you are given, instead of an `Array[A]`, is one called `ParSeq[A]`. This class offers the two following methods, which work in parallel:
```scala
def map[B](f: A => B): ParSeq[B]
def reduce(f: (A, A) => A): A
```
Can you write the following `minMax` function in terms of `map` and/or `reduce` operations?
```scala
def minMax(data: ParSeq[Int]): (Int, Int) = ???
```
## Question 3
What property does the function `f` passed to `reduce` need to satisfy in order to have the same result regardless of how `reduce` groups the applications of the operation `f` to the elements of the data structure? Prove that your function `f` indeed satisfies that property.
# Exercise Session 2
# Problem 1: Aggregate
In this week's lecture, you have been introduced to the aggregate method of `ParSeq[A]` (and other parallel data structures...). It has the following signature:
```scala
def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B
```
Discuss, as a group, what aggregate does and what its arguments represent.
## Question 1
Consider the parallel sequence `xs` containing the three elements `x1`, `x2` and `x3`. Also consider the following call to aggregate:
```scala
xs.aggregate(z)(f, g)
```
The above call might potentially result in the following computation:
```scala
f(f(f(z, x1), x2), x3)
```
But it might also result in other computations. Come up with at least 2 other computations that may result from the above call to `aggregate`.
## Question 2
Below are other examples of calls to aggregate. In each case, check if the call can lead to different results depending on the strategy used by `aggregate` to aggregate all values contained in `data` down to a single value. You should assume that `data` is a parallel sequence of values of type `BigInt`.
Variant 1
```scala
data.aggregate(1)(_ + _, _ + _)
```
Variant 2
```scala
data.aggregate(0)((acc, x) => x - acc, _ + _)
```
Variant 3
```scala
data.aggregate(0)((acc, x) => acc - x, _ + _)
```
Variant 4
```scala
data.aggregate(1)((acc, x) => x * x * acc, _ * _)
```
## Question 3
Under which condition(s) on `z`, `f`, and `g` does aggregate always lead to the same result?
Come up with a formula on `z`, `f`, and `g` that implies the correctness of aggregate.
*Hint*: You may find it useful to use calls to `foldLeft(z)(f)` in your formula(s).
## Question 4
Implement `aggregate` using the methods `map` and/or `reduce` of the collection you are defining aggregate for.
## Question 5
Implement `aggregate` using the `task` and/or `parallel` constructs seen in the first week and the `Splitter[A]` interface seen in this week's lecture. The `Splitter` interface is defined as:
```scala
trait Splitter[A] extends Iterator[A] {
  def split: Seq[Splitter[A]]
  def remaining: Int
}
```
You can assume that the data structure you are defining aggregate for already implements a `splitter` method which returns an object of type `Splitter[A]`.
Your implementation of `aggregate` should work in parallel when the number of remaining elements is above the constant THRESHOLD and sequentially below it.
*Hint*: `Iterator`, and thus `Splitter`, implements the `foldLeft` method.
## Question 6
Discuss the implementations from questions 4 and 5. Which one do you think would be more efficient?
# Problem 2: Depth
Review the notion of depth seen in the lecture. What does it represent?
Below is a formula for the depth of a *divide and conquer* algorithm working on an array segment of *size L*, as a function of *L*. The values *c*, *d* and *T* are constants. We assume that *L>0* and *T>0*.
![](images/2-1.png)
Below the threshold *T*, the algorithm proceeds sequentially and takes time *c* to process each single element. Above the threshold, the algorithm is applied recursively over the two halves of the array. The results are then merged using an operation that takes *d* units of time.
## Question 1
Is it the case that for all *1 ≤ L1 ≤ L2* we have *D(L1) ≤ D(L2)*?
If it is the case, prove the property by induction on *L*. If it is not the case, give a counterexample showing values of *L1*, *L2*, *T*, *c*, and *d* for which the property does not hold.
## Question 2
Prove a logarithmic upper bound on *D(L)*. That is, prove that *D(L)* is in *O(log(L))* by finding specific constants *a*, *b* such that *D(L) ≤ a ⋅ log2(L) + b*.
*Hint:* The proof is more complex than it might seem. One way to make it more manageable is to define and use a function *D'(L)* that has the property described in question 1, and is greater than or equal to *D(L)*. We suggest you use:
![](images/2-2.png)
Note that computing *D'(L)* is easy when *L* is a power of 2. Remember also that there always exists a power of 2 between any positive integer and its double.
# Exercise Session 3
## Problem 1: Parallel Encoding
In this exercise, your group will devise a parallel algorithm to encode sequences using the run-length encoding scheme. This encoding transforms sequences of letters such that all subsequences of the same letter are replaced by the letter and the sequence length. For instance:
```
"AAAAATTTGGGGTCCCAAC" ⇒ "A5T3G4T1C3A2C1"
```
Your goal in this exercise is to come up with a parallel implementation of this algorithm. The function should have the following shape:
```scala
def rle(data: ParSeq[Char]): Buffer[(Char, Int)] =
  data.aggregate(???)(???, ???)
```
The Buffer class is already given to you. A buffer of type `Buffer[A]` represents sequences of elements of type `A`. It supports the following methods, all of which are efficient:
```scala
def isEmpty: Boolean // Checks if the buffer is empty.
def head: A // Returns the first element of the buffer.
def tail: Buffer[A] // Returns the buffer minus its first element.
def last: A // Returns the last element of the buffer.
def init: Buffer[A] // Returns the buffer minus its last element.
def ++(that: Buffer[A]): Buffer[A] // Concatenates two buffers.
def append(elem: A): Buffer[A] // Appends a single element to the right.
Buffer.empty[A]: Buffer[A] // Returns an empty buffer.
Buffer.singleton[A](element: A): Buffer[A] // Single element buffer.
```
## Problem 2: Parallel Two Phase Construction
In this exercise, you will implement an array `Combiner` that internally uses a doubly linked list (DLL). Below is a minimal implementation of the `DLLCombiner` class and the related `Node` class. Your goal for this exercise is to complete the implementation of the (simplified) Combiner interface of the `DLLCombiner` class.
```scala
class DLLCombiner[A] extends Combiner[A, Array[A]]:
  var head: Node[A] = null // null for empty lists.
  var last: Node[A] = null // null for empty lists.
  var size: Int = 0

  // Implement these three methods...
  override def +=(elem: A): Unit = ???
  override def combine(that: DLLCombiner[A]): DLLCombiner[A] = ???
  override def result(): Array[A] = ???

class Node[A](val value: A):
  var next: Node[A] = null // null for last node.
  var previous: Node[A] = null // null for first node.
```
**Question 1:** What computational complexity do your methods have? Are the actual complexities of your methods acceptable according to the `Combiner` requirements?
**Question 2:** One of the three methods you have implemented, `result`, should work in parallel according to the `Combiner` contract. Can you think of a way to implement this method efficiently using 2 parallel tasks?
**Question 3:** Can you, given the current internal representation of your combiner, implement `result` so that it executes efficiently using 4 parallel tasks? If not, can you think of a way to make it possible?
*Hint:* This is an open-ended question; there might be multiple solutions. In your solution, you may want to add extra information to the class `Node` and/or the class `DLLCombiner`.
## Problem 3: Pipelines
In this exercise, we look at pipelines of functions. A pipeline is simply a function which applies its argument successively to each function of a sequence. To illustrate this, consider the following pipeline of 4 functions:
```scala
val p: Int => Int = toPipeline(ParSeq(_ + 1, _ * 2, _ + 3, _ / 4))
```
The pipeline `p` is itself a function. Given a value `x`, the pipeline `p` performs the following computations to process it:
```scala
p(x) = (((x + 1) // Application of first function
        * 2)     // Application of second function
       + 3)      // Application of third function
      / 4        // Application of fourth function
```
In this exercise, we will investigate the possibility to process such pipelines in parallel.
**Question 1:** Implement the following `toPipeline` function, which turns a parallel sequence of functions into a pipeline. You may use any of the parallel combinators available on `ParSeq`, such as the parallel `fold` or the parallel `reduce` methods.
```scala
def toPipeline[A](fs: ParSeq[A => A]): A => A = ???
```
*Hint:* Functions have a method called `andThen`, which implements function composition: it takes another function as argument and returns a new function. The returned function first applies the first function, and then applies the function passed as argument to that result. You may find it useful in your implementation of `toPipeline`.
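For example, with standard Scala functions (this is standard library behavior, not part of the exercise):
```scala
val inc: Int => Int = _ + 1
val double: Int => Int = _ * 2
val both: Int => Int = inc.andThen(double)

println(both(3)) // first 3 + 1 == 4, then 4 * 2 == 8
```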
**Question 2:** Given that your `toPipeline` function works in parallel, would the pipelines it returns also work in parallel? Would you expect pipelines returned by a sequential implementation of `toPipeline` to execute any slower? If so, why?
Discuss those questions with your group and try to get a good understanding of what is happening.
**Question 3:** Instead of arbitrary functions, we will now consider functions that are constant everywhere except on a finite domain. We represent such functions in the following way:
```scala
class FiniteFun[A](mappings: immutable.Map[A, A], default: A):
  def apply(x: A): A =
    mappings.get(x) match
      case Some(y) => y
      case None => default

  def andThen(that: FiniteFun[A]): FiniteFun[A] = ???
```
Implement the andThen method. Can pipelines of such finite functions be efficiently constructed in parallel using the appropriately modified `toPipeline` method? Can the resulting pipelines be efficiently executed?
**Question 4:** Compare the *work* and *depth* of the following two functions, assuming infinite parallelism. For which kind of input would the parallel version be asymptotically faster?
```scala
def applyAllSeq[A](x: A, fs: Seq[FiniteFun[A]]): A =
  // Applying each function sequentially.
  var y = x
  for f <- fs do
    y = f(y)
  y

def applyAllPar[A](x: A, fs: ParSeq[FiniteFun[A]]): A =
  if fs.isEmpty then x
  else
    // Computing the composition in parallel.
    val p = fs.reduce(_ andThen _)
    // Applying the pipeline.
    p(x)
```
# Exercise Session 4
## Problem 1: Implementing `map` and `filter` on Futures
In this exercise, you will come up with an implementation of the `map` and `filter` methods of `MyFuture`. The `MyFuture` trait is a simplified version of the `Future` trait from the Scala standard library, with a single abstract method:
```scala
trait MyFuture[+T]:
  def onComplete(callback: Try[T] => Unit): Unit
```
First of all, spend some time as a group to make sure that you understand what those methods are supposed to do. Then, complete the following code to implement the two methods:
```scala
extension [T](self: MyFuture[T])
  def map[S](f: T => S): MyFuture[S] = ???
  def filter(p: T => Boolean): MyFuture[T] = ???
```
In the case of `filter`, if the original `MyFuture` successfully returns a value which does not satisfy the predicate, the new `MyFuture` should return a `Failure` containing a `NoSuchElementException`.
See:
- [`Try` API documentation](https://dotty.epfl.ch/api/scala/util/Try.html)
- [`Future` API documentation](https://dotty.epfl.ch/api/scala/concurrent/Future.html)
- [Futures and Promises guide](https://docs.scala-lang.org/overviews/core/futures.html)
## Problem 2: Coordinator / Worker
In this exercise, you will implement a Coordinator / Worker actor system, in which one actor, the coordinator, dispatches work to other actors, the workers. Between the coordinator and the workers, only two kinds of messages are sent: `Request` and `Ready` messages.
```scala
enum Message:
  case Request(computation: () => Unit)
  case Ready
```
<details>
<summary><em>Note:</em> <code>enum</code> syntax</summary>
Enumerations are the Scala 3 idiomatic syntax for defining algebraic data
types (ADTs). The enum above is desugared to something equivalent to:
```scala
trait Message
case class Request(computation: () => Unit) extends Message
object Ready extends Message
```
which is the syntax used in the lecture videos.
See:
- [Translation of Enums and ADTs](https://docs.scala-lang.org/scala3/reference/enums/desugarEnums.html)
- [Enums slides from CS210](https://gitlab.epfl.ch/lamp/cs210/-/blob/master/slides/progfun1-4-4.pdf)
</details>
---
The coordinator actor sends `Request` messages to workers to request them to perform some computation (passed as an argument of `Request`). Upon reception of a `Request`, a worker should perform the computation. Workers should send a `Ready` message to their coordinator whenever they finish executing the requested computation, and also right after they are created.
The coordinator actor itself receives requests through `Request` messages from clients. The coordinator actor should then dispatch the work to worker actors. The coordinator should however never send a request to a worker which has not declared itself ready via a `Ready` message beforehand.
Implement the `Coordinator` and `Worker` classes.
```scala
class Coordinator extends Actor:
  ???
  override def receive = ???

class Worker(coordinator: ActorRef) extends Actor:
  ???
  override def receive = ???
```
An example system using the `Coordinator` and `Worker` actors is shown below.
```scala
@main def problem2 = new TestKit(ActorSystem("coordinator-workers")) with ImplicitSender:
  try
    val coordinator = system.actorOf(Props(Coordinator()), "coordinator")
    val workers = Seq.tabulate(4)(i =>
      system.actorOf(Props(Worker(coordinator)), f"worker$i")
    )
    // Now, clients should be able to send requests to the coordinator…
    coordinator ! Request(() => println(3 + 5))
    coordinator ! Request(() => println(67 * 3))
    // And so on...
  finally shutdown(system)
```
*Hint*: In order to fulfill its job, the coordinator should remember which workers are ready and what requests are still to be allocated to a worker.
# Exercise Session 5
## Problem 1: Message Processing Semantics
Consider the following actor system:
```scala
enum Protocol:
  case Write(value: Int)
  case Read(requester: ActorRef)
import Protocol.*

enum Responses:
  case Answer(value: Int)
import Responses.*

class Memory extends Actor:
  var value = 0
  override def receive: Receive = {
    case Write(newValue) => value = newValue
    case Read(requester) => requester ! Answer(value)
  }

class Client(memory: ActorRef) extends Actor:
  override def receive: Receive = { case Answer(value) =>
    println(value)
  }

class MyProxy(memory: ActorRef) extends Actor:
  override def receive: Receive = { case message =>
    memory ! message
  }
```
### Problem 1.1
And the following test:
```scala
@main def problem1_1 =
  for _ <- 1 to 1000 do
    val system = ActorSystem("example")
    try
      val memory = system.actorOf(Props(Memory()))
      val client = system.actorOf(Props(Client(memory)))
      memory ! Read(client)
      memory ! Write(1)
    finally system.terminate()
```
What are the possible values printed by the `println` command in the `Client` actor? Why?
### Problem 1.2
Now, consider the following test:
```scala
@main def problem1_2 =
  for _ <- 1 to 1000 do
    val system = ActorSystem("example")
    try
      val memory = system.actorOf(Props(Memory()))
      val proxy = system.actorOf(Props(MyProxy(memory)))
      val client = system.actorOf(Props(Client(memory)))
      proxy ! Read(client)
      memory ! Write(1)
    finally system.terminate()
```
1. What are the possible values printed by the `println` command in the `Client` actor? Why?
2. Would the output be different if the `Read` and `Write` messages were issued in the other order?
3. What if both messages are sent through the `MyProxy` actor?
## Problem 2: The Josephus Problem
In this exercise, we will revisit the famous [*Josephus problem*](https://en.wikipedia.org/wiki/Josephus_problem). In this problem, a group of soldiers trapped by the enemy decide to commit suicide instead of surrendering. In order not to have to take their own lives, the soldiers devise a plan. All soldiers are arranged in a single circle. Each soldier, when it is their turn to act, has to kill the next soldier alive next to them in the clockwise direction. Then, the next soldier that is still alive in the same direction acts. This continues until there remains only one soldier in the circle. This last soldier is the lucky one, and can surrender if he decides to. The *Josephus problem* consists in finding the position in the circle of this lucky soldier, depending on the number of soldiers.
In this exercise, you will implement a *simulation* of the mass killing of the soldiers. Each soldier will be modeled by an actor. Soldiers are arranged in a circle and when their turn comes to act, they kill the next alive soldier in the circle. The next soldier that is still alive in the circle should act next. The last soldier remaining alive does not kill himself but prints out its number to the standard output.
The code below covers the creation of all actors and the initialization of the system. Your goal is to implement the `receive` method of `Soldier`.
*Hint:* Think about what state the soldier must have.
```scala
import akka.actor.*

object Soldier:
  // The different messages that can be sent between the actors:
  enum Protocol:
    // The recipient should die.
    case Death
    // The recipient should update its next reference.
    case Next(next: ActorRef)
    // The recipient should act.
    case Act

class Soldier(number: Int) extends Actor:
  import Soldier.*
  import Protocol.*

  def receive: Receive = behavior(None, None, false)

  def behavior(
      next: Option[ActorRef],
      killer: Option[ActorRef],
      mustAct: Boolean
  ): Receive = ???

@main def problem2(n: Int) =
  import Soldier.*
  import Soldier.Protocol.*

  // Initialization
  val system = ActorSystem("mySystem")
  require(n >= 1)

  // Creation of the actors.
  val actors = Seq.tabulate(n)(
    (i: Int) => system.actorOf(Props(classOf[Soldier], i), "Soldier" + i)
  )

  // Inform all actors of the next actor in the circle.
  for i <- 0 to (n - 2) do actors(i) ! Next(actors(i + 1))
  actors(n - 1) ! Next(actors(0))

  // Inform the first actor to start acting.
  actors(0) ! Act
```
# Exercise 1: Introduction to Concurrency
## Question 1
Yes. With a single thread, the check `balanceFrom >= amount` ensures that a balance is never driven below 0, and every completed transfer subtracts and adds the same amount, so the total sum of money held by the bank stays constant.
## Question 2
#### Property 1 holds
Assuming that the execution of two concurrent threads only interleaves instructions and that reads and writes are executed atomically, it can be shown that property 1 always holds. Unfortunately, such strong guarantees are not offered by the Java Memory Model. If you are interested, have a look at the note below on the Java Memory Model.
#### Violation of property 2
Consider 2 threads that execute `transfer(from, to, amount)` concurrently with the exact same parameters. Assume that the account `from` has sufficient funds for at least one transfer.
Thread 1 executes until it has computed the value `balanceFrom - amount` and then stops. Thread 2 then executes the call to `transfer(from, to, amount)` in its entirety. Then thread 1 resumes its execution and completes the call to `transfer`.
At the end of this execution, the total amount of money held by the bank has changed: it has in fact increased by `amount`.
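Concretely, assuming `from` starts with a balance of 100, `to` with 0, and `amount = 100`, the interleaving looks as follows:
```
Thread 1: reads balanceFrom = 100, checks 100 >= 100, pauses
Thread 2: runs transfer completely: from.balance = 0, to.balance = 100
Thread 1: resumes: from.balance = 100 - 100 = 0
Thread 1: reads balanceTo = 100, sets to.balance = 100 + 100 = 200
Final state: from.balance = 0, to.balance = 200 (total went from 100 to 200)
```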
#### Note on the Java Memory Model
Assuming the Java Memory Model, both of the two properties can potentially be violated. Indeed, the model only ensures that the execution of each thread appears sequential to the thread itself, and not to any other concurrently running threads. Seemingly atomic instructions can be arbitrarily decomposed by the underlying virtual machine. Sequences of instructions can also be reordered at will by the VM, as long as the execution of a single thread appears as if it were executed sequentially. In these settings, both properties can be violated.
## Question 3
Variant 1
In this variant, property 1 still holds, but property 2 can be violated: the balance check and the withdrawal are not performed atomically, so two concurrent transfers from the same account can both pass the check based on the same stale balance, and money is created exactly as in Question 2. Since the two locks are never held at the same time, the variant is not vulnerable to deadlocks.
Variant 2
In this variant, none of the two properties can be violated. However, it is susceptible to deadlocks: if one thread executes `transfer(a, b, _)` while another executes `transfer(b, a, _)`, each thread can acquire its first lock and then wait forever for the lock held by the other.
Variant 3
In this last variant, none of the two properties can be violated and no deadlock can occur. It is however still not entirely satisfactory, since no two threads can ever execute transfers in parallel, even when the accounts involved are completely disjoint. Can you think of a better solution?
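One classic improvement is to lock the two accounts in a fixed global order, so that disjoint transfers can proceed in parallel while deadlocks remain impossible. Below is a minimal sketch of this idea; it assumes a hypothetical unique `id` field on `Account`, which is not part of the original code.
```scala
// Sketch: lock both accounts, always in the same global order.
// Assumes Account has a unique, immutable `id: Long` field (hypothetical).
def transfer(from: Account, to: Account, amount: Long): Unit = {
  require(amount >= 0)
  val (first, second) = if (from.id < to.id) (from, to) else (to, from)
  first.synchronized {
    second.synchronized {
      // Both balances are now protected: the check and the two updates
      // happen atomically with respect to other transfers.
      val balanceFrom = from.balance
      if (balanceFrom >= amount) {
        from.balance = balanceFrom - amount
        to.balance = to.balance + amount
      }
    }
  }
}
```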
# Exercise 2: Parallel Reductions
## Question 1
```scala
// Assumes the `parallel` construct from the lectures is in scope.
import scala.math.{min, max}

def minMax(a: Array[Int]): (Int, Int) = {
  val threshold = 10

  def minMaxPar(from: Int, until: Int): (Int, Int) = {
    if (until - from <= threshold) {
      var i = from
      var min = a(from)
      var max = a(from)
      while (i < until) {
        val x = a(i)
        if (x < min) min = x
        if (x > max) max = x
        i = i + 1
      }
      (min, max)
    } else {
      val mid = from + ((until - from) / 2)
      val ((xMin, xMax), (yMin, yMax)) =
        parallel(minMaxPar(from, mid), minMaxPar(mid, until))
      (min(xMin, yMin), max(xMax, yMax))
    }
  }

  minMaxPar(0, a.size)
}
```
## Question 2
```scala
def minMax(data: ParSeq[Int]): (Int, Int) = data.map({
  (x: Int) => (x, x)
}).reduce({
  case ((mn1, mx1), (mn2, mx2)) => (min(mn1, mn2), max(mx1, mx2))
})
```
Or:
```scala
def minMax(data: ParSeq[Int]): (Int, Int) =
  (data.reduce(min), data.reduce(max))
```
## Question 3
The function `f` must be associative. That is, for any `x`, `y`, `z`, it should be the case that:
```
f(x, f(y, z)) == f(f(x, y), z).
```
Both the `min` and `max` functions are associative. In addition, applying two associative functions component-wise on pairs again yields an associative function. It follows that `f` is indeed associative.
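Spelling out the component-wise argument for our `f` on pairs:
```
f((a1, b1), f((a2, b2), (a3, b3)))
  == (min(a1, min(a2, a3)), max(b1, max(b2, b3)))
  == (min(min(a1, a2), a3), max(max(b1, b2), b3))   (min and max are associative)
  == f(f((a1, b1), (a2, b2)), (a3, b3))
```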
# Exercise 1: Aggregate
## Question 1
- g(f(z, x1), f(f(z, x2), x3))
- g(f(f(z, x1), x2), f(z, x3))
- g(g(f(z, x1), f(z, x2)), f(z, x3))
- g(f(z, x1), g(f(z, x2), f(z, x3)))
## Question 2
Variant 1

This might lead to different results: `1` is not a neutral element for `+`, so it is counted once per segment. On a two-element sequence, for instance, `f(f(1, x1), x2)` gives `1 + x1 + x2`, while `g(f(1, x1), f(1, x2))` gives `2 + x1 + x2`.

Variant 2

This might lead to different results. On a two-element sequence, processing both elements in a single segment gives `f(f(0, x1), x2) = x2 - x1`, while combining two singleton segments gives `g(f(0, x1), f(0, x2)) = x1 + x2`.

Variant 3

This always leads to the same result: a segment folds to the negated sum of its elements, and `g` adds these negated sums, so any grouping yields the negated sum of the whole sequence.

Variant 4

This always leads to the same result: `1` is neutral for `*`, a segment folds to the product of the squares of its elements, and `g` multiplies these partial products, so any grouping yields the product of all the squares.
## Question 3
A property that implies the correctness is:
```
forall xs, ys. g(xs.F, ys.F) == (xs ++ ys).F (split-invariance)
```
where we define
```
xs.F == xs.foldLeft(z)(f)
```
The intuition is the following. Take any computation tree for
`xs.aggregate`. Such a tree has internal nodes labelled by `g` and segments processed using `foldLeft(z)(f)`. The split-invariance law above says that any internal `g`-node can be removed by concatenating the segments. By repeating this transformation, we obtain that the entire result equals `xs.foldLeft(z)(f)`.
The split-invariance condition uses `foldLeft`. The following two conditions together are a bit simpler and imply split-invariance:
```
forall u. g(u,z) == u (g-right-unit)
forall u, v. g(u, f(v,x)) == f(g(u,v), x) (g-f-assoc)
```
Assume g-right-unit and g-f-assoc. We wish to prove split-invariance. We do so by induction on the length of `ys`. If `ys` has length zero, then `ys.F` gives `z`, so by g-right-unit both sides reduce to `xs.F`. Now let `ys` have length `n > 0` and assume, by the induction hypothesis, that split-invariance holds for all `ys` of length strictly less than `n`. Let `ys == ys1 :+ y` (that is, `y` is the last element of `ys`). Then
```
g(xs.F, (ys1 :+ y).F) == (foldLeft definition)
g(xs.F, f(ys1.F, y)) == (by g-f-assoc)
f(g(xs.F, ys1.F), y) == (by I.H.)
f((xs++ys1).F, y) == (foldLeft definition)
((xs++ys1) :+ y).F == (properties of lists)
(xs++(ys1 :+ y)).F
```
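To build confidence in such laws, one can also test split-invariance empirically. Below is a small self-contained sanity check (a sketch, not part of the original solution) for Variant 3 of Question 2, where `f(acc, x) = acc - x`, `g = _ + _`, and `z = 0`:
```scala
import scala.util.Random

// xs.F == xs.foldLeft(z)(f), here the negated sum of xs.
def F(xs: List[BigInt]): BigInt = xs.foldLeft(BigInt(0))((acc, x) => acc - x)
def g(u: BigInt, v: BigInt): BigInt = u + v

@main def checkSplitInvariance() =
  val rnd = new Random(42)
  for _ <- 1 to 1000 do
    val xs = List.fill(rnd.nextInt(10))(BigInt(rnd.nextInt(100)))
    val ys = List.fill(rnd.nextInt(10))(BigInt(rnd.nextInt(100)))
    // Split-invariance: g(xs.F, ys.F) == (xs ++ ys).F
    assert(g(F(xs), F(ys)) == F(xs ++ ys))
```
A passing test is of course no proof, but a single failing pair would immediately disprove the law.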
## Question 4
```scala
def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B =
  if (this.isEmpty) z
  else this.map((x: A) => f(z, x)).reduce(g)
```
## Question 5
```scala
def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B = {
  def go(s: Splitter[A]): B = {
    if (s.remaining <= THRESHOLD)
      s.foldLeft(z)(f)
    else {
      val splitted = s.split
      val subs = splitted.map((t: Splitter[A]) => task { go(t) })
      subs.map(_.join()).reduce(g)
    }
  }
  go(splitter)
}
```
## Question 6
The version from question 4 may require 2 traversals (one for `map`, one for `reduce`) and does not benefit from the (potentially faster) sequential operator `f`. The version from question 5 traverses the data only once and applies `f` to whole segments below the threshold, so we would expect it to be more efficient.
# Exercise 2: Depth
## Question 1
Somewhat counterintuitively, the property doesn't hold. To show this, let's take the following values for *L1*, *L2*, *T*, *c*, and *d*.
```
L1 = 10, L2 = 12, T = 11, c = 1, and d = 1.
```
Using those values, we get that:
```
D(L1) = c ⋅ L1 = 10                    (sequential case, since L1 ≤ T)
D(L2) = max(D(6), D(6)) + d = 6 + 1 = 7 (recursive case, since L2 > T)
```
## Question 2
*Proof sketch*
Define the following function D'(L).
![](images/2-2.png)
Show that *D(L) ≤ D'(L)* for all *1 ≤ L*.
Then, show that, for any *1 ≤ L1 ≤ L2* we have *D'(L1) ≤ D'(L2)*. This property can be shown by induction on *L2*.
Finally, let *n* be such that *L ≤ 2^n < 2L*. We have that:
```
D(L) ≤ D'(L)                                   Proven earlier.
     ≤ D'(2^n)                                 Also proven earlier.
     ≤ log2(2^n) (d + cT) + cT
     < log2(2L) (d + cT) + cT
     = log2(L) (d + cT) + log2(2) (d + cT) + cT
     = log2(L) (d + cT) + d + 2cT
```
Done.
# Exercise Session 3, Solutions
## Problem 1: Parallel Encoding
```scala
def rle(data: ParSeq[Char]): Buffer[(Char, Int)] =
  def g(as: Buffer[(Char, Int)], bs: Buffer[(Char, Int)]) =
    if as.isEmpty || bs.isEmpty || as.last._1 != bs.head._1 then
      as ++ bs
    else
      as.init.append((as.last._1, as.last._2 + bs.head._2)) ++ bs.tail

  def f(acc: Buffer[(Char, Int)], x: Char) =
    if acc.isEmpty || acc.last._1 != x then
      acc.append((x, 1))
    else
      acc.init.append((x, acc.last._2 + 1))

  val z: Buffer[(Char, Int)] = Buffer.empty
  data.aggregate(z)(f, g)
```
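The same `f`/`g` logic can be sanity-checked sequentially. The sketch below is self-contained and uses a standard `Vector[(Char, Int)]` in place of the exercise's `Buffer`, simulating one split of the input into two segments:
```scala
def f(acc: Vector[(Char, Int)], x: Char): Vector[(Char, Int)] =
  if acc.isEmpty || acc.last._1 != x then acc :+ ((x, 1))
  else acc.init :+ ((acc.last._1, acc.last._2 + 1))

def g(as: Vector[(Char, Int)], bs: Vector[(Char, Int)]): Vector[(Char, Int)] =
  if as.isEmpty || bs.isEmpty || as.last._1 != bs.head._1 then as ++ bs
  else (as.init :+ ((as.last._1, as.last._2 + bs.head._2))) ++ bs.tail

@main def rleCheck() =
  // Split the example input at an arbitrary position, fold each half, merge.
  val (left, right) = "AAAAATTTGGGGTCCCAAC".toVector.splitAt(7)
  val z = Vector.empty[(Char, Int)]
  val runs = g(left.foldLeft(z)(f), right.foldLeft(z)(f))
  println(runs.map { case (c, n) => s"$c$n" }.mkString) // A5T3G4T1C3A2C1
```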
## Problem 2: Parallel Two Phase Construction
```scala
class DLLCombiner[A] extends Combiner[A, Array[A]]:
  var head: Node[A] = null
  var last: Node[A] = null
  var size: Int = 0

  override def +=(elem: A): Unit =
    val node = new Node(elem)
    if size == 0 then
      head = node
      last = node
      size = 1
    else
      last.next = node
      node.previous = last
      last = node
      size += 1

  override def combine(that: DLLCombiner[A]): DLLCombiner[A] =
    if this.size == 0 then
      that
    else if that.size == 0 then
      this
    else
      this.last.next = that.head
      that.head.previous = this.last
      this.size = this.size + that.size
      this.last = that.last
      this

  // This is not implemented in parallel yet.
  override def result(): Array[A] =
    val data = new Array[A](size)
    var current = head
    var i = 0
    while i < size do
      data(i) = current.value
      i += 1
      current = current.next
    data
```
### Question 1
The complexity of `+=` is constant, as well as the complexity of `combine`. This is obviously well within the desired complexity range. The result function takes time linear in the size of the data, which is acceptable according to the Combiner requirements. However, the result function should work in parallel according to the contract. This isn't the case here.
### Question 2
The idea is to copy the doubly linked list to the array from both ends at the same time. For this, we create a task that handles the second half of the array, while the current thread copies the first half.
```scala
override def result(): Array[A] =
  val data = new Array[A](size)
  val mid = size / 2

  // This is executed on a different thread.
  val taskEnd = task {
    var i = size - 1
    var current = last
    while i >= mid do
      data(i) = current.value
      current = current.previous
      i -= 1
  }

  // This is executed on the current thread.
  var i = 0
  var current = head
  while i < mid do
    data(i) = current.value
    current = current.next
    i += 1

  taskEnd.join()
  data
```
### Question 3
The actual answer to this question is: *it depends*. To see why, we first make the following observation:
All implementations of the result function must consist of primarily two operations:
1. Moving to the next node in the list, and,
2. Copying the value of the node to the array.
Depending on the actual cost of the two operations, one may devise schemes that can make efficient use of more than two threads. For instance, assume for a moment that copying a value to the array is significantly costlier than moving to the next node in the list. In this case, we could execute the function efficiently in parallel by spawning multiple threads starting from the head of the list, each handling a disjoint set of indexes (for instance, one thread takes indexes of the form 4n, another 4n + 1 and so on).
On the other hand, if we assume that moving to the next node in the list has a cost comparable to the one of copying a value to the array, then finding such a strategy is more challenging, or even impossible.
However, there are ways to circumvent this problem by modifying the data structure used. One way could be to keep track of the middle of the doubly linked list. The result function could then execute in parallel on 4 different threads by copying the array from both ends and from the middle (in both directions) simultaneously. The problem would then be to efficiently maintain the pointer to the middle of the list, which might not be a trivial task when combining arbitrary lists together. If you are interested in learning more about such data structures, we encourage you to look up the skip list data structure, which generalises this idea.
Another solution would be to modify the nodes so that they also point to their successor's successor and their predecessor's predecessor. This way, two threads could start from the start of the list and two from the end. In each case, one thread would be responsible for odd indexes and the other for even ones. This solution does not change at all the complexity of the various Combiner operations, but requires a bit more bookkeeping.
## Problem 3: Pipelines
### Question 1
```scala
def toPipeline[A](fs: ParSeq[A => A]): A => A =
  if (fs.isEmpty)
    (x: A) => x
  else
    fs.reduce(_ andThen _)
```
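The solution can be tried out with the standard scala-parallel-collections module, whose `ParSeq` also provides `reduce` (a sketch; the exercise itself uses the `ParSeq` from the course scaffold):
```scala
import scala.collection.parallel.ParSeq
import scala.collection.parallel.CollectionConverters.*

def toPipeline[A](fs: ParSeq[A => A]): A => A =
  if fs.isEmpty then (x: A) => x
  else fs.reduce(_ andThen _)

@main def pipelineDemo() =
  val p = toPipeline(Seq[Int => Int](_ + 1, _ * 2, _ + 3, _ / 4).par)
  println(p(5)) // (((5 + 1) * 2) + 3) / 4 == 3
```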
### Question 2
Even though the pipeline is constructed in parallel, *it will not itself execute in parallel*. The resulting pipeline still has to apply its argument to all the functions it contains sequentially. This is due to the fact that the `andThen` method simply returns a function that will apply the first function and then the second, sequentially.
### Question 3
```scala
def andThen(that: FiniteFun[A]): FiniteFun[A] =
  val newDefault = that(default)
  val newMappings =
    for
      (x, y) <- mappings
      z = that(y)
      // Mappings that agree with the new default are redundant.
      if z != newDefault
    yield (x, z)
  FiniteFun(newMappings, newDefault)
```
Pipelines of such functions can be efficiently constructed in parallel, as was the case for "normal" functions. Interestingly, the resulting pipeline can also be executed efficiently: its execution time does not depend on the number of functions in the pipeline, only on the lookup time in the finite map `mappings` (which can be nearly constant if the underlying map is a hash table). The size of this map is upper bounded by the sizes of the mappings of the functions in the pipeline.
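For instance (a small sketch, assuming the `FiniteFun` class from the exercise statement with the `andThen` above):
```scala
val f = FiniteFun(Map(1 -> 2, 2 -> 3), 0)    // 1 → 2, 2 → 3, everything else → 0
val g = FiniteFun(Map(2 -> 10, 3 -> 30), -1) // 2 → 10, 3 → 30, everything else → -1
val h = f.andThen(g)

println(h(1)) // g(f(1)) == g(2) == 10
println(h(5)) // g(f(5)) == g(0) == -1, which is also h's new default
```
Applying `h` costs a single map lookup, no matter how many functions were composed to build it.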
### Question 4
To simplify the analysis, we will assume that lookup in the mappings takes constant time, and thus that applying a FiniteFun also takes constant time. Let's also assume that fs is of size `n` for both functions.
Since the function is purely sequential, the work and depth of applyAllSeq are equal. They amount to `n` applications of a finite function, which is linear in `n`.
For applyAllPar, things are a bit more complex. Let's denote by `d` the size of the largest domain of all functions passed as argument.
The depth of the function is simply the depth of computing the pipeline (`fs.reduce(_ andThen _)`) plus a constant for applying the pipeline. Assuming infinite parallelism, this results in a depth that is in `O(log2(n) ⋅ d)`.
The work of applyAllPar is significantly more than its depth, and can be upper bounded by `O(n ⋅ d)`. Indeed, there are `n` applications of the `andThen` method, each of which takes `O(d)` time.
When `d` is a constant, the parallel version will be asymptotically faster than its sequential counterpart (`O(log2(n))` versus `O(n)`). Conversely, when `d` grows much faster than `n / log2(n)`, the sequential version is expected to perform better.
# Exercise Session 4, Solutions
- Problem 1: [Problem1.scala](src/main/scala/Problem1.scala) and [Problem1Test.scala](src/test/scala/Problem1Test.scala)
- Problem 2: [Problem2.scala](src/main/scala/Problem2.scala)
```scala
// build.sbt
val scala3Version = "3.1.2"
val akkaVersion = "2.6.19"

lazy val root = project
  .in(file("."))
  .settings(
    name := "code",
    version := "0.1.0-SNAPSHOT",
    scalaVersion := scala3Version,
    fork := true,
    javaOptions ++= Seq(
      "-Dakka.loglevel=Debug",
      "-Dakka.actor.debug.receive=on"
    ),
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % akkaVersion,
      "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
      "junit" % "junit" % "4.13" % Test,
      "com.github.sbt" % "junit-interface" % "0.13.3" % Test
    ),
    Test / testOptions += Tests.Argument(TestFrameworks.JUnit)
  )
```

```scala
// project/plugins.sbt
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
```
```scala
// Problem1.scala
// Test using `sbt "testOnly Problem1Test"`; see tests in Problem1Test.scala.
import scala.util.{Try, Success, Failure}

trait MyFuture[+T]:
  def onComplete(callback: Try[T] => Unit): Unit

extension [T](self: MyFuture[T])
  def map[S](f: T => S): MyFuture[S] =
    new MyFuture:
      def onComplete(callback: Try[S] => Unit): Unit =
        self.onComplete {
          case Success(v) => callback(Success(f(v)))
          case Failure(e) => callback(Failure(e))
        }

  def filter(p: T => Boolean): MyFuture[T] =
    new MyFuture:
      def onComplete(callback: Try[T] => Unit): Unit =
        self.onComplete {
          case Success(v) =>
            if p(v) then callback(Success(v))
            else callback(Failure(new NoSuchElementException()))
          case Failure(e) => callback(Failure(e))
        }
```
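To try the extensions above interactively, one can add a small hypothetical helper (a sketch, not part of the solution files) that builds a `MyFuture` which is already completed:
```scala
// Hypothetical helper: a MyFuture that invokes its callback immediately.
def completed[T](result: Try[T]): MyFuture[T] = new MyFuture[T]:
  def onComplete(callback: Try[T] => Unit): Unit = callback(result)

@main def tryProblem1() =
  completed(Success(3)).map(_ * 2).onComplete(println)    // Success(6)
  completed(Success(3)).filter(_ > 5).onComplete(println) // Failure(java.util.NoSuchElementException)
```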
```scala
// Problem2.scala
// Run using `sbt "runMain problem2"`.
import akka.actor.{Actor, Props, ActorSystem, ActorRef, ActorLogging}
import akka.testkit.{TestKit, ImplicitSender}
import akka.event.LoggingReceive

/** Type of messages exchanged between our Actors.
  *
  * Note: enumerations are the Scala 3 idiomatic syntax to define algebraic
  * data types (ADTs). The code below is desugared to something equivalent to:
  *
  * ```
  * trait Message
  * case class Request(computation: () => Unit) extends Message
  * object Ready extends Message
  * ```
  *
  * which is the syntax used in the lecture videos.
  *
  * Read also:
  *   - Translation of Enums and ADTs:
  *     https://docs.scala-lang.org/scala3/reference/enums/desugarEnums.html
  *   - Enums slides from CS210:
  *     https://gitlab.epfl.ch/lamp/cs210/-/blob/master/slides/progfun1-4-4.pdf
  */
enum Message:
  case Request(computation: () => Unit)
  case Ready
import Message.*

class Coordinator extends Actor:
  // Workers that declared themselves ready and have no work assigned yet.
  var availableWorkers: List[ActorRef] = Nil
  // Requests received while no worker was available.
  var pendingRequests: List[Request] = Nil

  override def receive = LoggingReceive {
    case Ready =>
      if pendingRequests.isEmpty then
        availableWorkers = availableWorkers :+ sender()
      else
        val request = pendingRequests.head
        pendingRequests = pendingRequests.tail
        sender() ! request
    case request: Request =>
      availableWorkers match
        case worker :: rest =>
          worker ! request
          availableWorkers = rest
        case Nil =>
          pendingRequests = pendingRequests :+ request
  }

class Worker(coordinator: ActorRef) extends Actor:
  // Declare ourselves ready right after creation.
  coordinator ! Ready

  override def receive: Receive = LoggingReceive { case Request(f) =>
    f()
    coordinator ! Ready
  }

@main def problem2 = new TestKit(ActorSystem("coordinator-workers")) with ImplicitSender:
  try
    val coordinator = system.actorOf(Props(Coordinator()), "coordinator")
    val workers = Seq.tabulate(4)(i =>
      system.actorOf(Props(Worker(coordinator)), f"worker$i")
    )
    // Now, clients should be able to send requests to the coordinator…
    coordinator ! Request(() => println(3 + 5))
    coordinator ! Request(() => println(67 * 3))
    // And so on...
  finally shutdown(system)
```