Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lamp/cs206
  • bwermeil/cs206-2020
  • zabifade/cs206-2020
  • cauderan/cs206-2020
  • malonga/cs206-2020
  • dumoncel/cs206
  • bounekhe/cs206
  • bergerault/cs206
  • flealsan/cs206
  • hsu/cs206
  • mouchel/cs206
  • vebraun/cs206
  • vcanard/cs206
  • ybelghmi/cs206
  • belghmi/cs206
  • bousbina/cs206
  • waked/cs206
  • gtagemou/cs206
  • arahmoun/cs206
  • elhachem/cs206
  • benrahha/cs206
  • benslima/cs206
22 results
Show changes
Showing
with 0 additions and 1448 deletions
package m15
abstract class AbstractBlockingQueue[T] extends Monitor {
private var underlying: List[T] = Nil
def getUnderlying(): List[T] =
underlying
def setUnderlying(newValue: List[T]): Unit =
underlying = newValue
def put(elem: T): Unit
def take(): T
}
package m15
abstract class AbstractThreadPoolExecutor {
def execute(task: Unit => Unit): Unit
def start(): Unit
def shutdown(): Unit
}
package m15
object M15 {
/** A thread pool that executes submitted task using one of several threads */
class ThreadPoolExecutor(taskQueue: BlockingQueue[Unit => Unit], poolSize: Int)
extends AbstractThreadPoolExecutor {
private class Worker extends Thread {
override def run(): Unit = {
try {
while (true) {
val task = taskQueue.take()
task(())
}
} catch {
case e: InterruptedException =>
// Nothing to do here, we are shutting down gracefully.
}
}
}
private val workers: List[Worker] = List.fill(poolSize)(new Worker())
/** Executes the given task, passed by name. */
def execute(task: Unit => Unit): Unit =
taskQueue.put(task)
/** Starts the thread pool. */
def start(): Unit =
workers.foreach(_.start())
/** Instantly shuts down all actively executing tasks using an interrupt. */
def shutdown(): Unit =
workers.foreach(_.interrupt())
}
/**
* A queue whose take operations blocks until the queue become non-empty.
* Elements must be retrived from this queue in a first in, first out order.
* All methods of this class are thread safe, that is, they can safely
* be used from multiple thread without any particular synchronization.
*/
class BlockingQueue[T] extends AbstractBlockingQueue[T] {
// The state of this queue is stored in an underlying List[T] defined in
// the AbstractBlockingQueue class. Your implementation should access and
// update this list using the following setter and getter methods:
// - def getUnderlying(): List[T]
// - def setUnderlying(newValue: List[T]): Unit
// Using these methods is required for testing purposes.
/** Inserts the specified element into this queue (non-blocking) */
def put(elem: T): Unit =
this.synchronized {
val underlying = getUnderlying()
setUnderlying(elem :: underlying)
this.notifyAll()
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available (blocking).
* This queue operates in a first in, first out order.
*/
def take(): T =
this.synchronized {
while (getUnderlying().isEmpty)
this.wait()
val underlying = getUnderlying()
val last = underlying.last
setUnderlying(underlying.init)
last
}
}
}
package m15
class Dummy
trait Monitor {
implicit val dummy: Dummy = new Dummy
def wait()(implicit i: Dummy) = waitDefault()
def synchronized[T](e: => T)(implicit i: Dummy) = synchronizedDefault(e)
def notify()(implicit i: Dummy) = notifyDefault()
def notifyAll()(implicit i: Dummy) = notifyAllDefault()
private val lock = new AnyRef
// Can be overriden.
def waitDefault(): Unit = lock.wait()
def synchronizedDefault[T](toExecute: => T): T = lock.synchronized(toExecute)
def notifyDefault(): Unit = lock.notify()
def notifyAllDefault(): Unit = lock.notifyAll()
}
package m15
import instrumentation.SchedulableBlockingQueue
import instrumentation.TestHelper._
import instrumentation.TestUtils._
class M15Suite extends munit.FunSuite {
import M15._
test("ThreadPool should put jobs in the queue, Workers should execute jobs from the queue (10pts)") {
case class PutE(e: Unit => Unit) extends Exception
val nThreads = 3
var taken = false
class TestBlockingQueue extends BlockingQueue[Unit => Unit] {
override def put(e: Unit => Unit): Unit =
throw new PutE(e)
override def take(): Unit => Unit =
x => {
taken = true
Thread.sleep(10 * 1000)
}
}
val tpe = new ThreadPoolExecutor(new TestBlockingQueue, nThreads)
val unit2unit: Unit => Unit = x => ()
try {
tpe.execute(unit2unit)
assert(false, "ThreadPoolExecutor does not put jobs in the queue")
} catch {
case PutE(e) =>
assert(e == unit2unit)
}
tpe.start()
Thread.sleep(1000)
assert(taken, s"ThreadPoolExecutor workers do no execute jobs from the queue")
tpe.shutdown()
}
test("BlockingQueue should work in a sequential setting (1pts)") {
testSequential[(Int, Int, Int, Int)]{ sched =>
val queue = new SchedulableBlockingQueue[Int](sched)
queue.put(1)
queue.put(2)
queue.put(3)
queue.put(4)
(queue.take(),
queue.take(),
queue.take(),
queue.take())
}{ tuple =>
(tuple == (1, 2, 3, 4), s"Expected (1, 2, 3, 4) got $tuple")
}
}
test("BlockingQueue should work when Thread 1: 'put(1)', Thread 2: 'take' (3pts)") {
testManySchedules(2, sched => {
val queue = new SchedulableBlockingQueue[Int](sched)
(List(() => queue.put(1), () => queue.take()),
args => (args(1) == 1, s"Expected 1, got ${args(1)}"))
})
}
test("BlockingQueue should not be able to take from an empty queue (3pts)") {
testSequential[Boolean]{ sched =>
val queue = new SchedulableBlockingQueue[Int](sched);
queue.put(1)
queue.put(2)
queue.take()
queue.take()
failsOrTimesOut(queue.take())
}{ res =>
(res, "Was able to retrieve an element from an empty queue")
}
}
test("Should work when Thread 1: 'put(1)', Thread 2: 'put(2)', Thread 3: 'take', and a buffer of size 1") {
testManySchedules(3, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => prodCons.put(1), () => prodCons.put(2), () => prodCons.take())
, args => {
val takeRes = args(2).asInstanceOf[Int]
val nocreation = (takeRes == 1 || takeRes == 2)
if (!nocreation)
(false, s"'take' should return either 1 or 2")
else (true, "")
})
})
}
// testing no duplication
test("Should work when Thread 1: 'put(1)', Thread 2: 'put(2)', Thread 3: 'take', Thread 4: 'take', and a buffer of size 3") {
testManySchedules(4, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => prodCons.put(1), () => prodCons.put(2), () => prodCons.take(), () => prodCons.take())
, args => {
def m(): (Boolean, String) = {
val takeRes1 = args(2).asInstanceOf[Int]
val takeRes2 = args(3).asInstanceOf[Int]
val nocreation = (x: Int) => List(1, 2).contains(x)
if (!nocreation(takeRes1))
return (false, s"'Thread 3: take' returned $takeRes1 but should return a value in {1, 2, 3}")
if (!nocreation(takeRes2))
return (false, s"'Thread 4: take' returned $takeRes2 but should return a value in {1, 2, 3}")
val noduplication = takeRes1 != takeRes2
if (!noduplication)
(false, s"'Thread 3 and 4' returned the same value: $takeRes1")
else (true, "")
}
m()
})
})
}
// testing no duplication with 5 threads
test("Should work when Thread 1: 'put(1)', Thread 2: 'put(2)', Thread 3: 'put(3)', Thread 4: 'take', Thread 5: 'take', and a buffer of size 1") {
testManySchedules(5, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => prodCons.put(1), () => prodCons.put(2), () => prodCons.put(3),
() => prodCons.take(), () => prodCons.take())
, args => {
def m(): (Boolean, String) = {
val takeRes1 = args(3).asInstanceOf[Int]
val takeRes2 = args(4).asInstanceOf[Int]
val nocreation = (x: Int) => List(1, 2, 3).contains(x)
if (!nocreation(takeRes1))
return (false, s"'Thread 4: take' returned $takeRes1 but should return a value in {1, 2, 3}")
if (!nocreation(takeRes2))
return (false, s"'Thread 5: take' returned $takeRes2 but should return a value in {1, 2, 3}")
val noduplication = takeRes1 != takeRes2
if (!noduplication)
return (false, s"'Thread 4 and 5' returned the same value: $takeRes1")
else (true, "")
}
m()
})
})
}
// testing fifo buffer size 1
test("Should work when Thread 1: 'put(1); put(2)', Thread 2: 'take', Thread 3: 'put(3)', Thread 4: 'put(4)', and a buffer of size 3") {
testManySchedules(4, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => { prodCons.put(1); prodCons.put(2) }, () => prodCons.take(),
() => prodCons.put(3), () => prodCons.put(4))
, args => {
def m(): (Boolean, String) = {
val takeRes = args(1).asInstanceOf[Int]
// no creation
val nocreation = (x: Int) => List(1, 2, 3, 4).contains(x)
if (!nocreation(takeRes))
return (false, s"'Thread 2: take' returned $takeRes, but should return a value in {1, 2, 3, 4}")
// fifo (cannot have 2 without 1)
if (takeRes == 2)
(false, s"'Thread 2' returned 2 before returning 1")
else
(true, "")
}
m()
})
})
}
// testing fifo buffer size 5
test("Should work when Thread 1: 'put(1); put(2)', Thread 2: 'take', Thread 3: 'put(11)', Thread 4: 'put(10)', and a buffer of size 5") {
testManySchedules(4, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => { prodCons.put(1); prodCons.put(2) }, () => prodCons.take(),
() => prodCons.put(11), () => prodCons.put(10))
, args => {
def m(): (Boolean, String) = {
val takeRes = args(1).asInstanceOf[Int]
// no creation
val nocreation = (x: Int) => List(1, 2, 10, 11).contains(x)
if (!nocreation(takeRes))
return (false, s"'Thread 2: take' returned $takeRes, but should return a value in {1, 2, 10, 11}")
// fifo (cannot have 2 without 1)
if (takeRes == 2)
(false, s"'Thread 2' returned 2 before returning 1")
else
(true, "")
}
m()
})
})
}
// testing fifo on more complicated case
test("Should work when Thread 1: 'put(1); put(3)', Thread 2: 'put(2)', Thread 3: 'put(4)', Thread 4: 'take', Thread 5: 'take', and a buffer of size 10") {
testManySchedules(5, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => { prodCons.put(1); prodCons.put(3) }, () => prodCons.put(2),
() => prodCons.put(4), () => prodCons.take(), () => prodCons.take())
, args => {
def m(): (Boolean, String) = {
val takeRes1 = args(3).asInstanceOf[Int]
val takeRes2 = args(4).asInstanceOf[Int]
// no creation
val nocreation = (x: Int) => List(1, 2, 3, 4).contains(x)
if (!nocreation(takeRes1))
return (false, s"'Thread 4: take' returned $takeRes1 but should return a value in {1, 2, 3, 4}")
if (!nocreation(takeRes2))
return (false, s"'Thread 5: take' returned $takeRes2 but should return a value in {1, 2, 3, 4}")
// no duplication
if (takeRes1 == takeRes2)
return (false, s"'Thread 4 and 5' returned the same value: $takeRes1")
// fifo (cannot have 3 without 1)
val takes = List(takeRes1, takeRes2)
if (takes.contains(3) && !takes.contains(1))
(false, s"'Thread 4 or 5' returned 3 before returning 1")
else
(true, "")
}
m()
})
})
}
// combining put and take in one thread
test("Should work when Thread 1: 'put(21); put(22)', Thread 2: 'take', Thread 3: 'put(23); take', Thread 4: 'put(24); take', and a buffer of size 2") {
testManySchedules(4, sched => {
val prodCons = new SchedulableBlockingQueue[Int](sched)
(List(() => { prodCons.put(21); prodCons.put(22) }, () => prodCons.take(),
() => { prodCons.put(23); prodCons.take() }, () => { prodCons.put(24); prodCons.take() })
, args => {
def m(): (Boolean, String) = {
val takes = List(args(1).asInstanceOf[Int], args(2).asInstanceOf[Int], args(3).asInstanceOf[Int])
// no creation
val vals = List(21, 22, 23, 24)
var i = 0
while (i < takes.length) {
val x = takes(i)
if (!vals.contains(x))
return (false, s"'Thread $i: take' returned $x but should return a value in $vals")
i += 1
}
// no duplication
if (takes.distinct.size != takes.size)
return (false, s"Takes did not return unique values: $takes")
// fifo (cannot have 22 without 21)
if (takes.contains(22) && !takes.contains(21))
(false, s"`Takes returned 22 before returning 21")
else
(true, "")
}
m()
})
})
}
// completely hidden hard to crack test
test("[Black box test] Values should be taken in the order they are put") {
testManySchedules(4, sched => {
val prodCons = new SchedulableBlockingQueue[(Char, Int)](sched)
val n = 2
(List(
() => for (i <- 1 to n) { prodCons.put(('a', i)) },
() => for (i <- 1 to n) { prodCons.put(('b', i)) },
() => for (i <- 1 to n) { prodCons.put(('c', i)) },
() => {
import scala.collection.mutable
var counts = mutable.HashMap.empty[Char, Int]
counts('a') = 0
counts('b') = 0
counts('c') = 0
for (i <- 1 to (3 * n)) {
val (c, n) = prodCons.take()
counts(c) += 1
assert(counts(c) == n)
}
})
, _ =>
(true, "")
)
})
}
}
package m15
package instrumentation
trait MockedMonitor extends Monitor {
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() = {
scheduler.log("wait")
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
}
override def synchronizedDefault[T](toExecute: =>T): T = {
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(this, prevLocks) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try {
toExecute
} finally {
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
}
}
override def notifyDefault() = {
scheduler mapOtherStates {
state => state match {
case Wait(lockToAquire, locks) if lockToAquire == this => SyncUnique(this, state.locks)
case e => e
}
}
scheduler.log("notify")
}
override def notifyAllDefault() = {
scheduler mapOtherStates {
state => state match {
case Wait(lockToAquire, locks) if lockToAquire == this => Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this => Sync(this, state.locks)
case e => e
}
}
scheduler.log("notifyAll")
}
}
trait LockFreeMonitor extends Monitor {
override def waitDefault() = {
throw new Exception("Please use lock-free structures and do not use wait()")
}
override def synchronizedDefault[T](toExecute: =>T): T = {
throw new Exception("Please use lock-free structures and do not use synchronized()")
}
override def notifyDefault() = {
throw new Exception("Please use lock-free structures and do not use notify()")
}
override def notifyAllDefault() = {
throw new Exception("Please use lock-free structures and do not use notifyAll()")
}
}
abstract class ThreadState {
def locks: Seq[AnyRef]
}
trait CanContinueIfAcquiresLock extends ThreadState {
def lockToAquire: AnyRef
}
case object Start extends ThreadState { def locks: Seq[AnyRef] = Seq.empty }
case object End extends ThreadState { def locks: Seq[AnyRef] = Seq.empty }
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package m15
package instrumentation
class SchedulableBlockingQueue[T](val scheduler: Scheduler)
extends m15.M15.BlockingQueue[T] with MockedMonitor {
private var underlying: List[T] = Nil
override def getUnderlying(): List[T] =
scheduler.exec {
underlying
}(s"Get $underlying")
override def setUnderlying(newValue: List[T]): Unit =
scheduler.exec {
underlying = newValue
}(s"Set $newValue")
}
package m15
package instrumentation
import java.util.concurrent._;
import scala.concurrent.duration._
import scala.collection.mutable._
import Stats._
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement]) extends Result
case class Timeout(msg: String) extends Result
/**
* A class that maintains schedule and a set of thread ids.
* The schedules are advanced after an operation of a SchedulableBuffer is performed.
* Note: the real schedule that is executed may deviate from the input schedule
* due to the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]) {
val maxOps = 500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog = ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/**
* Runs a set of operations in parallel as per the schedule.
* Each operation may consist of many primitive operations like reads or writes
* to shared data structure each of which should be executed using the function `exec`.
* @timeout in milliseconds
* @return true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result = {
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[Except] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map {
case (op, i) =>
new Thread(new Runnable() {
def run(): Unit = {
val fakeId = i + 1
setThreadId(fakeId)
try {
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the master thread if all threads have completed
if (completed.incrementAndGet() == ops.length) {
syncObject.synchronized { syncObject.notifyAll() }
}
} catch {
case e: Throwable if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(Except(s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString("\n"),
e.getStackTrace))
syncObject.synchronized { syncObject.notifyAll() }
//println(s"$fakeId: ${e.toString}")
//Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
}
}
})
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if(completed.get() != ops.length) syncObject.wait(timeout) } { time => remTime -= time }
}
if (exception.isDefined) {
exception.get
} else if (remTime <= 1) { // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
} else {
// every thing executed normally
RetVal(threadRes.toList)
}
}
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit = {
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match {
case Sync(lockToAquire, locks) =>
if (locks.indexOf(lockToAquire) < 0) waitForTurn else {
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
}
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
}
}
def waitStart(): Unit = {
//while (threadStates.size < numThreads) {
//Thread.sleep(1)
//}
synchronized {
if (threadStates.size < numThreads) {
wait()
} else {
notifyAll()
}
}
}
def threadLocks = {
synchronized {
threadStates(threadId).locks
}
}
def threadState = {
synchronized {
threadStates(threadId)
}
}
def mapOtherStates(f: ThreadState => ThreadState) = {
val exception = threadId
synchronized {
for (k <- threadStates.keys if k != exception) {
threadStates(k) = f(threadStates(k))
}
}
}
def log(str: String) = {
if((realToFakeThreadId contains Thread.currentThread().getId())) {
val space = (" " * ((threadId - 1) * 2))
val s = space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
}
}
/**
* Executes a read or write operation to a global data structure as per the given schedule
* @param msg a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(msg: => String, postMsg: => Option[T => String] = None): T = {
if(! (realToFakeThreadId contains Thread.currentThread().getId())) {
primop
} else {
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if(m != "") log(m)
if (opLog.size > maxOps)
throw new Exception(s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!")
val res = primop
postMsg match {
case Some(m) => log(m(res))
case None =>
}
res
}
}
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try {
realToFakeThreadId(Thread.currentThread().getId())
} catch {
case e: NoSuchElementException =>
throw new Exception("You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!")
}
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean = {
val tid = threadId
canContinue match {
case Some((i, state)) if i == tid =>
//println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match {
case Sync(lockToAquire, locks) => updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match {
case SyncUnique(lockToAquire2, locks2) if lockToAquire2 == lockToAquire => Wait(lockToAquire2, locks2)
case e => e
}
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
}
true
case Some((i, state)) =>
//println(s"$tid: not my turn but $i !")
false
case None =>
false
}
}
var threadPreference = 0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] = {
if (!threadStates.isEmpty) { // The last thread who enters the decision loop takes the decision.
//println(s"$threadId: I'm taking a decision")
if (threadStates.values.forall { case e: Wait => true case _ => false }) {
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if (threadStates.size > 1) "s" else ""
val are = if (threadStates.size > 1) "are" else "is"
throw new Exception(s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them.")
} else {
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) => state.locks }.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) => !notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if (threadsNotBlocked.isEmpty) {
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if (threadStates.size > 1) "s" else ""
val are = if (threadStates.size > 1) "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => state.locks.map(lock => (lock, id)) }.toMap
val reason = threadStates.collect {
case (id, state: CanContinueIfAcquiresLock) if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}.mkString("\n")
throw new Exception(s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason")
} else if (threadsNotBlocked.size == 1) { // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
} else {
val next = schedule.indexWhere(t => threadsNotBlocked.exists { case (id, state) => id == t })
if (next != -1) {
//println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne = schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
} else {
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne = threadsNotBlocked(threadPreference) // Maybe another strategy
Some(chosenOne)
//threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
}
}
}
} else canContinue
}
/**
* This will be called before a schedulable operation begins.
* This should not use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
//var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] = None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn = {
synchronized {
if (numThreadsWaiting.incrementAndGet() == threadStates.size) {
canContinue = decide()
notifyAll()
}
//waitingForDecision(threadId) = Some(numThreadsWaiting)
//println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while (!canProceed()) wait()
}
numThreadsWaiting.decrementAndGet()
}
/**
* To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
//println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if (numThreadsWaiting.get() == threadStates.size) {
canContinue = decide()
notifyAll()
}
}
def getOperationLog() = opLog
}
package m15
package instrumentation
import java.lang.management._
/**
* A collection of methods that can be used to collect run-time statistics about Leon programs.
* This is mostly used to test the resources properties of Leon programs
*/
object Stats {
def timed[T](code: => T)(cont: Long => Unit): T = {
var t1 = System.currentTimeMillis()
val r = code
cont((System.currentTimeMillis() - t1))
r
}
def withTime[T](code: => T): (T, Long) = {
var t1 = System.currentTimeMillis()
val r = code
(r, (System.currentTimeMillis() - t1))
}
}
package m15
package instrumentation
import scala.util.Random
import scala.collection.mutable.{Map => MutableMap}
import Stats._
object TestHelper {
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread = 20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 240 // the total time out for a test in seconds
val schedTimeout = 15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (Boolean, String)) =
testManySchedules(1,
(sched: Scheduler) => {
(List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T]))
})
/**
* @numThreads number of threads
* @ops operations to be executed, one per thread
* @assertion as condition that will executed after all threads have completed (without exceptions)
* the arguments are the results of the threads
*/
def testManySchedules(numThreads: Int,
ops: Scheduler =>
(List[() => Any], // Threads
List[Any] => (Boolean, String)) // Assertion
) = {
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
//(1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ => schedsExplored <= noOfSchedules && timeout > 0).foreach {
//case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
//println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if (threadOps.size != numThreads)
throw new IllegalStateException(s"Number of threads: $numThreads, do not match operations of threads: $threadOps")
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t => timeout -= t } match {
case Timeout(msg) =>
throw new java.lang.AssertionError("assertion failed\n"+"The schedule took too long to complete. A possible deadlock! \n"+msg)
case Except(msg, stkTrace) =>
val traceStr = "Thread Stack trace: \n"+stkTrace.map(" at "+_.toString).mkString("\n")
throw new java.lang.AssertionError("assertion failed\n"+msg+"\n"+traceStr)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if (!success) {
val msg = "The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr.getOperationLog().mkString("\n")
throw new java.lang.AssertionError("Assertion failed: "+msg)
}
}
}
if (timeout <= 0) {
throw new java.lang.AssertionError("Test took too long to complete! Cannot check all schedules as your code is too slow!")
}
}
/**
* A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int) {
val scheduleLength = readWritesPerThread * numThreads
val rands = (1 to scheduleLength).map(i => new Random(0xcafe * i)) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] = {
var contextSwitches = 0
var contexts = List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i => (i, readWritesPerThread)) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/**
* Updates remainingOps and liveThreads once a thread is chosen for a position in the schedule
*/
def updateState(tid: Int): Unit = {
val remOps = remainingOps(tid)
if (remOps == 0) {
liveThreads -= tid
} else {
remainingOps += (tid -> (remOps - 1))
}
}
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match {
case prev :: tail if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
}
updateState(tid)
acc :+ tid
case (acc, _) => // here context-bound has been reached so complete the schedule without any more context switches
if (!contexts.isEmpty) {
contexts = contexts.dropWhile(remainingOps(_) == 0)
}
val tid = contexts match {
case top :: tail => top
case _ => liveThreads(0) // here, there has to be threads that have not even started
}
updateState(tid)
acc :+ tid
}
schedule #:: schedules()
}
}
}
package m15
package instrumentation
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object TestUtils {
def failsOrTimesOut[T](action: => T): Boolean = {
val asyncAction = Future {
action
}
try {
Await.result(asyncAction, 2000.millisecond)
} catch {
case _: Throwable => return true
}
return false
}
}
Use the following commands to make a fresh clone of your repository:
```
git clone -b m2 git@gitlab.epfl.ch:lamp/student-repositories-s21/cs206-GASPAR.git m2
```
## Useful links
* [A guide to the Scala parallel collections](https://docs.scala-lang.org/overviews/parallel-collections/overview.html)
* [The API documentation of the Scala parallel collections](https://www.javadoc.io/doc/org.scala-lang.modules/scala-parallel-collections_2.13/latest/scala/collection/index.html)
* [The API documentation of the Scala standard library](https://www.scala-lang.org/files/archive/api/2.13.4)
* [The API documentation of the Java standard library](https://docs.oracle.com/en/java/javase/15/docs/api/index.html)
**If you have issues with the IDE, try [reimporting the
build](https://gitlab.epfl.ch/lamp/cs206/-/blob/master/labs/example-lab.md#ide-features-like-type-on-hover-or-go-to-definition-do-not-work),
if you still have problems, use `compile` in sbt instead.**
## Exercise
Given the following sequential implementation of a function that computes the sequence of rolling geometric means, your task will be to complete and optimize a parallel version of this code.
```scala
/** Compute the rolling geometric mean of an array.
*
* For an array `arr = Arr(x1, x2, x3, ..., xn)` the result is
* `Arr(math.pow(x1, 1), math.pow((x1 + x2), 1.0/2), math.pow((x1 + x2 + x3), 1.0/3), ..., math.pow((x1 + x2 + x3 + ... + xn), 1.0/n))`
*/
def rollingGeoMeanParallel(arr: Arr[Int]): Arr[Double] = {
// Transform all numbers to roots with degree 1
val arr1 = arr.map(x => Root(x, 1))
// Compute the rolling geometric mean keeping the root structured
val arr2 = arr1.scan(Root(1, 0))((acc, x) => Root(acc.radicand * x.radicand, acc.degree + x.degree))
// Transform the roots to Doubles
arr2.map(root => root.toDouble)
// Drop the extra initial element that was added by the scan
arr3.tail
```
This implementation has some issues:
- It does not use parallelism
- Creates two intermediate arrays by calling `map`
- Creates an extra intermediate arrays by calling `tail`
- Scan returns an extra element we do not need
We want to parallelize and avoid the creation of the extra arrays.
As we are calling a `scan` the natural operations we need are `upsweep` and `downsweep`.
It is possible specialize those operations for our problem by letting those operations do the mapping.
It is also possible to change those operations to not generate the first element.
We give you a version of `rollingGeoMeanSequential` that partially implements the parallelization using `upsweep` and `downsweep`.
Your tasks in the exercise will be to:
- TASK 1: Implement the parallelization of `upsweep` and `downsweep`
- TASK 2: Remove the calls to the `map`
- TASK 3: Remove the call to `tail`
You can get partial points for solving part of the tasks.
The order of the tasks is a suggestion, you may do them in any order if that is simpler for you.
Look at the `Lib` trait to find the definitions of functions and classes you can use (or already used).
In this question we use a `Arr` array class instead of the normal `Array`. You may assume that this class has the same performance characteristics as the normal array. `Arr` provides only a limited set of operations.
# General
*.DS_Store
*.swp
*~
# Dotty
*.class
*.tasty
*.hasTasty
# sbt
target/
# IDE
.bsp
.bloop
.metals
.vscode
# datasets
stackoverflow-grading.csv
wikipedia-grading.dat
// Student tasks (i.e. submit, packageSubmission)
enablePlugins(StudentTasks)
course := "midterm"
assignment := "m2"
scalaVersion := "3.0.0-RC1"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies += "org.scalameta" %% "munit" % "0.7.22"
val MUnitFramework = new TestFramework("munit.Framework")
testFrameworks += MUnitFramework
// Decode Scala names
testOptions += Tests.Argument(MUnitFramework, "-s")
testSuite := "m2.M2Suite"
File deleted
package sbt // To access the private[sbt] compilerReporter key
package filteringReporterPlugin
import Keys._
import ch.epfl.lamp._
object FilteringReporterPlugin extends AutoPlugin {
override lazy val projectSettings = Seq(
// Turn off warning coming from scalameter that we cannot fix without changing scalameter
compilerReporter in (Compile, compile) ~= { reporter => new FilteringReporter(reporter) }
)
}
class FilteringReporter(reporter: xsbti.Reporter) extends xsbti.Reporter {
def reset(): Unit = reporter.reset()
def hasErrors: Boolean = reporter.hasErrors
def hasWarnings: Boolean = reporter.hasWarnings
def printSummary(): Unit = reporter.printSummary()
def problems: Array[xsbti.Problem] = reporter.problems
def log(problem: xsbti.Problem): Unit = {
if (!problem.message.contains("An existential type that came from a Scala-2 classfile cannot be"))
reporter.log(problem)
}
def comment(pos: xsbti.Position, msg: String): Unit =
reporter.comment(pos, msg)
override def toString = s"CollectingReporter($reporter)"
}
package ch.epfl.lamp
import sbt._
import sbt.Keys._
/**
* Coursera uses two versions of each assignment. They both have the same assignment key and part id but have
* different item ids.
*
* @param key Assignment key
* @param partId Assignment partId
* @param itemId Item id of the non premium version
* @param premiumItemId Item id of the premium version (`None` if the assignment is optional)
*/
case class CourseraId(key: String, partId: String, itemId: String, premiumItemId: Option[String])
/**
* Settings shared by all assignments, reused in various tasks.
*/
object MOOCSettings extends AutoPlugin {
override def requires = super.requires && filteringReporterPlugin.FilteringReporterPlugin
object autoImport {
val course = SettingKey[String]("course")
val assignment = SettingKey[String]("assignment")
val options = SettingKey[Map[String, Map[String, String]]]("options")
val courseraId = settingKey[CourseraId]("Coursera-specific information identifying the assignment")
val testSuite = settingKey[String]("Fully qualified name of the test suite of this assignment")
.withRank(KeyRanks.Invisible)
// Convenient alias
type CourseraId = ch.epfl.lamp.CourseraId
val CourseraId = ch.epfl.lamp.CourseraId
}
import autoImport._
override val globalSettings: Seq[Def.Setting[_]] = Seq(
// supershell is verbose, buggy and useless.
useSuperShell := false
)
override val projectSettings: Seq[Def.Setting[_]] = Seq(
parallelExecution in Test := false,
// Report test result after each test instead of waiting for every test to finish
logBuffered in Test := false,
name := s"${course.value}-${assignment.value}"
)
}
package ch.epfl.lamp
import sbt._
import Keys._
// import scalaj.http._
import java.io.{File, FileInputStream, IOException}
import org.apache.commons.codec.binary.Base64
// import play.api.libs.json.{Json, JsObject, JsPath}
import scala.util.{Failure, Success, Try}
/**
* Provides tasks for submitting the assignment
*/
object StudentTasks extends AutoPlugin {
override def requires = super.requires && MOOCSettings
object autoImport {
val packageSourcesOnly = TaskKey[File]("packageSourcesOnly", "Package the sources of the project")
val packageBinWithoutResources = TaskKey[File]("packageBinWithoutResources", "Like packageBin, but without the resources")
val packageSubmissionZip = TaskKey[File]("packageSubmissionZip")
val packageSubmission = inputKey[Unit]("package solution as an archive file")
lazy val Grading = config("grading") extend(Runtime)
}
import autoImport._
import MOOCSettings.autoImport._
override lazy val projectSettings = Seq(
packageSubmissionSetting,
fork := true,
connectInput in run := true,
outputStrategy := Some(StdoutOutput),
) ++
packageSubmissionZipSettings ++
inConfig(Grading)(Defaults.testSettings ++ Seq(
unmanagedJars += file("grading-tests.jar"),
definedTests := (definedTests in Test).value,
internalDependencyClasspath := (internalDependencyClasspath in Test).value
))
/** **********************************************************
* SUBMITTING A SOLUTION TO COURSERA
*/
val packageSubmissionZipSettings = Seq(
packageSubmissionZip := {
val submission = crossTarget.value / "submission.zip"
val sources = (packageSourcesOnly in Compile).value
val binaries = (packageBinWithoutResources in Compile).value
IO.zip(Seq(sources -> "sources.zip", binaries -> "binaries.jar"), submission, None)
submission
},
artifactClassifier in packageSourcesOnly := Some("sources"),
artifact in (Compile, packageBinWithoutResources) ~= (art => art.withName(art.name + "-without-resources"))
) ++
inConfig(Compile)(
Defaults.packageTaskSettings(packageSourcesOnly, Defaults.sourceMappings) ++
Defaults.packageTaskSettings(packageBinWithoutResources, Def.task {
val relativePaths =
(unmanagedResources in Compile).value.flatMap(Path.relativeTo((unmanagedResourceDirectories in Compile).value)(_))
(mappings in (Compile, packageBin)).value.filterNot { case (_, path) => relativePaths.contains(path) }
})
)
val maxSubmitFileSize = {
val mb = 1024 * 1024
10 * mb
}
/** Check that the jar exists, isn't empty, isn't crazy big, and can be read
* If so, encode jar as base64 so we can send it to Coursera
*/
def prepareJar(jar: File, s: TaskStreams): String = {
val errPrefix = "Error submitting assignment jar: "
val fileLength = jar.length()
if (!jar.exists()) {
s.log.error(errPrefix + "jar archive does not exist\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength == 0L) {
s.log.error(errPrefix + "jar archive is empty\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength > maxSubmitFileSize) {
s.log.error(errPrefix + "jar archive is too big. Allowed size: " +
maxSubmitFileSize + " bytes, found " + fileLength + " bytes.\n" +
jar.getAbsolutePath)
failSubmit()
} else {
val bytes = new Array[Byte](fileLength.toInt)
val sizeRead = try {
val is = new FileInputStream(jar)
val read = is.read(bytes)
is.close()
read
} catch {
case ex: IOException =>
s.log.error(errPrefix + "failed to read sources jar archive\n" + ex.toString)
failSubmit()
}
if (sizeRead != bytes.length) {
s.log.error(errPrefix + "failed to read the sources jar archive, size read: " + sizeRead)
failSubmit()
} else encodeBase64(bytes)
}
}
/** Task to package solution to a given file path */
lazy val packageSubmissionSetting = packageSubmission := {
val args: Seq[String] = Def.spaceDelimited("[path]").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val base64Jar = prepareJar(jar, s)
val path = args.headOption.getOrElse((baseDirectory.value / "submission.jar").absolutePath)
scala.tools.nsc.io.File(path).writeAll(base64Jar)
}
/*
/** Task to submit a solution to coursera */
val submit = inputKey[Unit]("submit solution to Coursera")
lazy val submitSetting = submit := {
// Fail if scalafix linting does not pass.
scalafixLinting.value
val args: Seq[String] = Def.spaceDelimited("<arg>").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val assignmentDetails =
courseraId.?.value.getOrElse(throw new MessageOnlyException("This assignment can not be submitted to Coursera because the `courseraId` setting is undefined"))
val assignmentKey = assignmentDetails.key
val courseName =
course.value match {
case "capstone" => "scala-capstone"
case "bigdata" => "scala-spark-big-data"
case other => other
}
val partId = assignmentDetails.partId
val itemId = assignmentDetails.itemId
val premiumItemId = assignmentDetails.premiumItemId
val (email, secret) = args match {
case email :: secret :: Nil =>
(email, secret)
case _ =>
val inputErr =
s"""|Invalid input to `submit`. The required syntax for `submit` is:
|submit <email-address> <submit-token>
|
|The submit token is NOT YOUR LOGIN PASSWORD.
|It can be obtained from the assignment page:
|https://www.coursera.org/learn/$courseName/programming/$itemId
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
""".stripMargin
s.log.error(inputErr)
failSubmit()
}
val base64Jar = prepareJar(jar, s)
val json =
s"""|{
| "assignmentKey":"$assignmentKey",
| "submitterEmail":"$email",
| "secret":"$secret",
| "parts":{
| "$partId":{
| "output":"$base64Jar"
| }
| }
|}""".stripMargin
def postSubmission[T](data: String): Try[HttpResponse[String]] = {
val http = Http("https://www.coursera.org/api/onDemandProgrammingScriptSubmissions.v1")
val hs = List(
("Cache-Control", "no-cache"),
("Content-Type", "application/json")
)
s.log.info("Connecting to Coursera...")
val response = Try(http.postData(data)
.headers(hs)
.option(HttpOptions.connTimeout(10000)) // scalaj default timeout is only 100ms, changing that to 10s
.asString) // kick off HTTP POST
response
}
val connectMsg =
s"""|Attempting to submit "${assignment.value}" assignment in "$courseName" course
|Using:
|- email: $email
|- submit token: $secret""".stripMargin
s.log.info(connectMsg)
def reportCourseraResponse(response: HttpResponse[String]): Unit = {
val code = response.code
val respBody = response.body
/* Sample JSON response from Coursera
{
"message": "Invalid email or token.",
"details": {
"learnerMessage": "Invalid email or token."
}
}
*/
// Success, Coursera responds with 2xx HTTP status code
if (response.is2xx) {
val successfulSubmitMsg =
s"""|Successfully connected to Coursera. (Status $code)
|
|Assignment submitted successfully!
|
|You can see how you scored by going to:
|https://www.coursera.org/learn/$courseName/programming/$itemId/
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
|and clicking on "My Submission".""".stripMargin
s.log.info(successfulSubmitMsg)
}
// Failure, Coursera responds with 4xx HTTP status code (client-side failure)
else if (response.is4xx) {
val result = Try(Json.parse(respBody)).toOption
val learnerMsg = result match {
case Some(resp: JsObject) =>
(JsPath \ "details" \ "learnerMessage").read[String].reads(resp).get
case Some(x) => // shouldn't happen
"Could not parse Coursera's response:\n" + x
case None =>
"Could not parse Coursera's response:\n" + respBody
}
val failedSubmitMsg =
s"""|Submission failed.
|There was something wrong while attempting to submit.
|Coursera says:
|$learnerMsg (Status $code)""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera responds with 5xx HTTP status code (server-side failure)
else if (response.is5xx) {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera seems to be unavailable at the moment (Status $code)
|Check https://status.coursera.org/ and try again in a few minutes.
""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera repsonds with an unexpected status code
else {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera replied with an unexpected code (Status $code)
""".stripMargin
s.log.error(failedSubmitMsg)
}
}
// kick it all off, actually make request
postSubmission(json) match {
case Success(resp) => reportCourseraResponse(resp)
case Failure(e) =>
val failedConnectMsg =
s"""|Connection to Coursera failed.
|There was something wrong while attempting to connect to Coursera.
|Check your internet connection.
|${e.toString}""".stripMargin
s.log.error(failedConnectMsg)
}
}
*/
def failSubmit(): Nothing = {
sys.error("Submission failed")
}
/**
* *****************
* DEALING WITH JARS
*/
def encodeBase64(bytes: Array[Byte]): String =
new String(Base64.encodeBase64(bytes))
}
sbt.version=1.4.7