Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lamp/cs206
  • bwermeil/cs206-2020
  • zabifade/cs206-2020
  • cauderan/cs206-2020
  • malonga/cs206-2020
  • dumoncel/cs206
  • bounekhe/cs206
  • bergerault/cs206
  • flealsan/cs206
  • hsu/cs206
  • mouchel/cs206
  • vebraun/cs206
  • vcanard/cs206
  • ybelghmi/cs206
  • belghmi/cs206
  • bousbina/cs206
  • waked/cs206
  • gtagemou/cs206
  • arahmoun/cs206
  • elhachem/cs206
  • benrahha/cs206
  • benslima/cs206
22 results
Show changes
Showing
with 0 additions and 1346 deletions
package ch.epfl.lamp
import sbt._
import sbt.Keys._
/**
* Coursera uses two versions of each assignment. They both have the same assignment key and part id but have
* different item ids.
*
* @param key Assignment key
* @param partId Assignment partId
* @param itemId Item id of the non premium version
* @param premiumItemId Item id of the premium version (`None` if the assignment is optional)
*/
case class CourseraId(key: String, partId: String, itemId: String, premiumItemId: Option[String])
/**
* Settings shared by all assignments, reused in various tasks.
*/
object MOOCSettings extends AutoPlugin {
override def requires = super.requires && filteringReporterPlugin.FilteringReporterPlugin
object autoImport {
val course = SettingKey[String]("course")
val assignment = SettingKey[String]("assignment")
val options = SettingKey[Map[String, Map[String, String]]]("options")
val courseraId = settingKey[CourseraId]("Coursera-specific information identifying the assignment")
val testSuite = settingKey[String]("Fully qualified name of the test suite of this assignment")
.withRank(KeyRanks.Invisible)
// Convenient alias
type CourseraId = ch.epfl.lamp.CourseraId
val CourseraId = ch.epfl.lamp.CourseraId
}
import autoImport._
override val globalSettings: Seq[Def.Setting[_]] = Seq(
// supershell is verbose, buggy and useless.
useSuperShell := false
)
override val projectSettings: Seq[Def.Setting[_]] = Seq(
parallelExecution in Test := false,
// Report test result after each test instead of waiting for every test to finish
logBuffered in Test := false,
name := s"${course.value}-${assignment.value}"
)
}
package ch.epfl.lamp
import sbt._
import Keys._
// import scalaj.http._
import java.io.{File, FileInputStream, IOException}
import org.apache.commons.codec.binary.Base64
// import play.api.libs.json.{Json, JsObject, JsPath}
import scala.util.{Failure, Success, Try}
/**
* Provides tasks for submitting the assignment
*/
object StudentTasks extends AutoPlugin {
override def requires = super.requires && MOOCSettings
object autoImport {
val packageSourcesOnly = TaskKey[File]("packageSourcesOnly", "Package the sources of the project")
val packageBinWithoutResources = TaskKey[File]("packageBinWithoutResources", "Like packageBin, but without the resources")
val packageSubmissionZip = TaskKey[File]("packageSubmissionZip")
val packageSubmission = inputKey[Unit]("package solution as an archive file")
lazy val Grading = config("grading") extend(Runtime)
}
import autoImport._
import MOOCSettings.autoImport._
override lazy val projectSettings = Seq(
packageSubmissionSetting,
fork := true,
connectInput in run := true,
outputStrategy := Some(StdoutOutput),
) ++
packageSubmissionZipSettings ++
inConfig(Grading)(Defaults.testSettings ++ Seq(
unmanagedJars += file("grading-tests.jar"),
definedTests := (definedTests in Test).value,
internalDependencyClasspath := (internalDependencyClasspath in Test).value
))
/** **********************************************************
* SUBMITTING A SOLUTION TO COURSERA
*/
val packageSubmissionZipSettings = Seq(
packageSubmissionZip := {
val submission = crossTarget.value / "submission.zip"
val sources = (packageSourcesOnly in Compile).value
val binaries = (packageBinWithoutResources in Compile).value
IO.zip(Seq(sources -> "sources.zip", binaries -> "binaries.jar"), submission, None)
submission
},
artifactClassifier in packageSourcesOnly := Some("sources"),
artifact in (Compile, packageBinWithoutResources) ~= (art => art.withName(art.name + "-without-resources"))
) ++
inConfig(Compile)(
Defaults.packageTaskSettings(packageSourcesOnly, Defaults.sourceMappings) ++
Defaults.packageTaskSettings(packageBinWithoutResources, Def.task {
val relativePaths =
(unmanagedResources in Compile).value.flatMap(Path.relativeTo((unmanagedResourceDirectories in Compile).value)(_))
(mappings in (Compile, packageBin)).value.filterNot { case (_, path) => relativePaths.contains(path) }
})
)
val maxSubmitFileSize = {
val mb = 1024 * 1024
10 * mb
}
/** Check that the jar exists, isn't empty, isn't crazy big, and can be read
* If so, encode jar as base64 so we can send it to Coursera
*/
def prepareJar(jar: File, s: TaskStreams): String = {
val errPrefix = "Error submitting assignment jar: "
val fileLength = jar.length()
if (!jar.exists()) {
s.log.error(errPrefix + "jar archive does not exist\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength == 0L) {
s.log.error(errPrefix + "jar archive is empty\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength > maxSubmitFileSize) {
s.log.error(errPrefix + "jar archive is too big. Allowed size: " +
maxSubmitFileSize + " bytes, found " + fileLength + " bytes.\n" +
jar.getAbsolutePath)
failSubmit()
} else {
val bytes = new Array[Byte](fileLength.toInt)
val sizeRead = try {
val is = new FileInputStream(jar)
val read = is.read(bytes)
is.close()
read
} catch {
case ex: IOException =>
s.log.error(errPrefix + "failed to read sources jar archive\n" + ex.toString)
failSubmit()
}
if (sizeRead != bytes.length) {
s.log.error(errPrefix + "failed to read the sources jar archive, size read: " + sizeRead)
failSubmit()
} else encodeBase64(bytes)
}
}
/** Task to package solution to a given file path */
lazy val packageSubmissionSetting = packageSubmission := {
val args: Seq[String] = Def.spaceDelimited("[path]").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val base64Jar = prepareJar(jar, s)
val path = args.headOption.getOrElse((baseDirectory.value / "submission.jar").absolutePath)
scala.tools.nsc.io.File(path).writeAll(base64Jar)
}
/*
/** Task to submit a solution to coursera */
val submit = inputKey[Unit]("submit solution to Coursera")
lazy val submitSetting = submit := {
// Fail if scalafix linting does not pass.
scalafixLinting.value
val args: Seq[String] = Def.spaceDelimited("<arg>").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val assignmentDetails =
courseraId.?.value.getOrElse(throw new MessageOnlyException("This assignment can not be submitted to Coursera because the `courseraId` setting is undefined"))
val assignmentKey = assignmentDetails.key
val courseName =
course.value match {
case "capstone" => "scala-capstone"
case "bigdata" => "scala-spark-big-data"
case other => other
}
val partId = assignmentDetails.partId
val itemId = assignmentDetails.itemId
val premiumItemId = assignmentDetails.premiumItemId
val (email, secret) = args match {
case email :: secret :: Nil =>
(email, secret)
case _ =>
val inputErr =
s"""|Invalid input to `submit`. The required syntax for `submit` is:
|submit <email-address> <submit-token>
|
|The submit token is NOT YOUR LOGIN PASSWORD.
|It can be obtained from the assignment page:
|https://www.coursera.org/learn/$courseName/programming/$itemId
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
""".stripMargin
s.log.error(inputErr)
failSubmit()
}
val base64Jar = prepareJar(jar, s)
val json =
s"""|{
| "assignmentKey":"$assignmentKey",
| "submitterEmail":"$email",
| "secret":"$secret",
| "parts":{
| "$partId":{
| "output":"$base64Jar"
| }
| }
|}""".stripMargin
def postSubmission[T](data: String): Try[HttpResponse[String]] = {
val http = Http("https://www.coursera.org/api/onDemandProgrammingScriptSubmissions.v1")
val hs = List(
("Cache-Control", "no-cache"),
("Content-Type", "application/json")
)
s.log.info("Connecting to Coursera...")
val response = Try(http.postData(data)
.headers(hs)
.option(HttpOptions.connTimeout(10000)) // scalaj default timeout is only 100ms, changing that to 10s
.asString) // kick off HTTP POST
response
}
val connectMsg =
s"""|Attempting to submit "${assignment.value}" assignment in "$courseName" course
|Using:
|- email: $email
|- submit token: $secret""".stripMargin
s.log.info(connectMsg)
def reportCourseraResponse(response: HttpResponse[String]): Unit = {
val code = response.code
val respBody = response.body
/* Sample JSON response from Coursera
{
"message": "Invalid email or token.",
"details": {
"learnerMessage": "Invalid email or token."
}
}
*/
// Success, Coursera responds with 2xx HTTP status code
if (response.is2xx) {
val successfulSubmitMsg =
s"""|Successfully connected to Coursera. (Status $code)
|
|Assignment submitted successfully!
|
|You can see how you scored by going to:
|https://www.coursera.org/learn/$courseName/programming/$itemId/
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
|and clicking on "My Submission".""".stripMargin
s.log.info(successfulSubmitMsg)
}
// Failure, Coursera responds with 4xx HTTP status code (client-side failure)
else if (response.is4xx) {
val result = Try(Json.parse(respBody)).toOption
val learnerMsg = result match {
case Some(resp: JsObject) =>
(JsPath \ "details" \ "learnerMessage").read[String].reads(resp).get
case Some(x) => // shouldn't happen
"Could not parse Coursera's response:\n" + x
case None =>
"Could not parse Coursera's response:\n" + respBody
}
val failedSubmitMsg =
s"""|Submission failed.
|There was something wrong while attempting to submit.
|Coursera says:
|$learnerMsg (Status $code)""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera responds with 5xx HTTP status code (server-side failure)
else if (response.is5xx) {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera seems to be unavailable at the moment (Status $code)
|Check https://status.coursera.org/ and try again in a few minutes.
""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera repsonds with an unexpected status code
else {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera replied with an unexpected code (Status $code)
""".stripMargin
s.log.error(failedSubmitMsg)
}
}
// kick it all off, actually make request
postSubmission(json) match {
case Success(resp) => reportCourseraResponse(resp)
case Failure(e) =>
val failedConnectMsg =
s"""|Connection to Coursera failed.
|There was something wrong while attempting to connect to Coursera.
|Check your internet connection.
|${e.toString}""".stripMargin
s.log.error(failedConnectMsg)
}
}
*/
def failSubmit(): Nothing = {
sys.error("Submission failed")
}
/**
* *****************
* DEALING WITH JARS
*/
def encodeBase64(bytes: Array[Byte]): String =
new String(Base64.encodeBase64(bytes))
}
sbt.version=1.4.7
// Used for Coursera submission (StudentPlugin)
// libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
// libraryDependencies += "com.typesafe.play" %% "play-json" % "2.7.4"
// Used for Base64 (StudentPlugin)
libraryDependencies += "commons-codec" % "commons-codec" % "1.10"
// addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.28")
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.8")
addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.5.3")
package f3
import instrumentation._
import scala.collection.mutable
import scala.collection.concurrent.TrieMap
type FileName = String
/** An API for manipulating files. */
trait FileSystem:
/** Create a new file named `file` with the passed `content`. */
def createFile(file: FileName, content: String): Unit
/** If `file` exists, return its content, otherwise crashes. */
def readFile(file: FileName): String
/** If `file` exists, delete it, otherwise crash. */
def deleteFile(file: FileName): Unit
end FileSystem
/** An in-memory file system for testing purposes implemented using a Map.
*
* Every method in this class is thread-safe.
*/
class InMemoryFileSystem extends FileSystem:
val fsMap: mutable.Map[FileName, String] = TrieMap()
def createFile(file: FileName, content: String): Unit =
assert(!fsMap.contains(file), s"$file already exists")
fsMap(file) = content
def readFile(file: FileName): String =
fsMap.get(file) match
case Some(content) => content
case None => assert(false, s"Attempt to read non-existing $file")
def deleteFile(file: FileName): Unit =
fsMap.remove(file)
end InMemoryFileSystem
package f3
import instrumentation._
/** A synchronization mechanism allowing multiple reads to proceed concurrently
* with an update to the state.
*/
class RCU extends Monitor:
protected val latestVersion: AtomicLong = AtomicLong(0)
protected val readersVersion: ThreadMap[Long] = ThreadMap()
/** This method must be called before accessing shared data for reading. */
def startRead(): Unit =
assert(!readersVersion.currentThreadHasValue,
"startRead() cannot be called multiple times without an intervening stopRead()")
readersVersion.setCurrentThreadValue(latestVersion.get)
/** Once a thread which has previously called `startRead` has finished reading
* shared data, it must call this method.
*/
def stopRead(): Unit =
assert(readersVersion.currentThreadHasValue,
"stopRead() cannot be called without a preceding startRead()")
readersVersion.deleteCurrentThreadValue()
/** Wait until all reads started before this method was called have finished,
* then return.
*/
def waitForOldReads(): Unit =
val newVersion = latestVersion.incrementAndGet()
readersVersion.waitForall(_ >= newVersion)
package f3
import instrumentation._
import scala.collection.mutable
/** A map which associates every thread to at most one value of type A.
*
* Every method in this class is thread-safe.
*/
class ThreadMap[A] extends Monitor:
protected val theMap: mutable.Map[Thread, A] = mutable.Map()
/** Return the value in the map entry for the current thread if it exists,
* otherwise None. */
def currentThreadValue: Option[A] = synchronized {
theMap.get(Thread.currentThread)
}
/** Is there a map entry for the current thread? */
def currentThreadHasValue: Boolean =
synchronized {
theMap.contains(Thread.currentThread)
}
/** Set the map entry of the current thread to `value` and notify any thread
* waiting on `waitForall`. */
def setCurrentThreadValue(value: A): Unit =
synchronized {
theMap(Thread.currentThread) = value
notifyAll()
}
/** Delete the map entry associated with this thread (if it exists) and notify
* all threads waiting in `waitForall`. */
def deleteCurrentThreadValue(): Unit =
synchronized {
theMap.remove(Thread.currentThread)
notifyAll()
}
/** Wait until `predicate` returns true for all map entries, then return. */
def waitForall(predicate: A => Boolean): Unit =
synchronized {
while !theMap.forall((_, value) => predicate(value)) do
wait()
}
end ThreadMap
package f3
import instrumentation._
class UpdateServer(fs: FileSystem) extends Monitor:
val rcu = new RCU
/** The name of the file containing the latest update.
*
* This is `@volatile` to guarantee that `fetchUpdate` always sees the latest
* filename.
*/
@volatile private var updateFile: Option[FileName] = None
/** Return the content of the latest update if one is available, otherwise None.
*
* This method is thread-safe.
*/
def fetchUpdate(): Option[String] =
// TODO: use `rcu`
rcu.startRead()
val value = updateFile.map(fs.readFile)
rcu.stopRead()
value
/** Define a new update, more precisely this will:
* - Create a new update file called `newName` with content `newContent`
* - Ensure that any future call to `fetchUpdate` returns the new update
* content.
* - Delete the old update file.
*
* This method is _NOT_ thread-safe, it cannot be safely called from multiple
* threads at once.
*/
def newUpdate(newName: FileName, newContent: String): Unit =
// TODO: use `rcu`
val oldFile = updateFile
fs.createFile(newName, newContent)
updateFile = Some(newName)
rcu.waitForOldReads()
oldFile.foreach(fs.deleteFile)
end UpdateServer
package f3.instrumentation
/** A long value that may be updated atomically. */
class AtomicLong(initial: Long):
private val atomic = new java.util.concurrent.atomic.AtomicLong(initial)
/** Get the current value. */
def get: Long = atomic.get()
/** Set to the given `value`. */
def set(value: Long): Unit = atomic.set(value)
/** Atomically increment by one the current value and return the _original_ value. */
def getAndIncrement(): Long =
atomic.getAndIncrement()
/** Atomically increment by one the current value and return the _updated_ value. */
def incrementAndGet(): Long =
atomic.incrementAndGet()
/** Atomically set the value to `newValue` if the current value == `expected`.
*
* Return true if successful, otherwise return false to indicate that the
* actual value was not equal to the expected value.
*/
def compareAndSet(expected: Long, newValue: Long): Boolean =
atomic.compareAndSet(expected, newValue)
end AtomicLong
package f3.instrumentation
class Dummy
trait Monitor {
implicit val dummy: Dummy = new Dummy
def wait()(implicit i: Dummy) = waitDefault()
def synchronized[T](e: => T)(implicit i: Dummy) = synchronizedDefault(e)
def notify()(implicit i: Dummy) = notifyDefault()
def notifyAll()(implicit i: Dummy) = notifyAllDefault()
private val lock = new AnyRef
// Can be overridden.
def waitDefault(): Unit = lock.wait()
def synchronizedDefault[T](toExecute: =>T): T = lock.synchronized(toExecute)
def notifyDefault(): Unit = lock.notify()
def notifyAllDefault(): Unit = lock.notifyAll()
}
package f3
import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.collection.mutable.HashMap
import scala.util.Random
import instrumentation._
import instrumentation.TestHelper._
import instrumentation.TestUtils._
class F3Suite extends munit.FunSuite:
test("Part 1: ThreadMap (3pts)") {
testManySchedules(4, sched =>
val tmap = new SchedulableThreadMap[Int](sched)
def writeThread(): Unit =
tmap.setCurrentThreadValue(0)
tmap.setCurrentThreadValue(-1)
val readBack = tmap.currentThreadValue
assertEquals(readBack, Some(-1))
def writeAndDeleteThread(): Unit =
tmap.setCurrentThreadValue(42)
tmap.deleteCurrentThreadValue()
@tailrec
def waitThread(): Unit =
tmap.waitForall(_ < 0)
val all = tmap.allValues
if all != List(-1) then
waitThread()
val threads = List(
() => writeThread(),
() => writeAndDeleteThread(),
() => waitThread(),
() => waitThread(),
)
(threads, _ => (true, ""))
)
}
test("Part 2: RCU (5pts)") {
testManySchedules(3, sched =>
val rcu = new SchedulableRCU(sched)
case class State(value: Int, isDeleted: AtomicLong = SchedulableAtomicLong(0, sched, "isDeleted"))
val sharedState = SchedulableAtomicReference(State(0), sched, "sharedState")
def readThread(): Unit =
rcu.startRead()
val state = sharedState.get
val stateWasDeleted = state.isDeleted.get != 0
assert(!stateWasDeleted, "RCU shared state deleted in the middle of a read.")
rcu.stopRead()
def writeThread(): Unit =
val oldState = sharedState.get
sharedState.set(State(oldState.value + 1))
rcu.waitForOldReads()
oldState.isDeleted.set(1)
val threads = List(
() => readThread(),
() => readThread(),
() => writeThread(),
)
(threads, _ => (true, ""))
)
}
test("Part 3: UpdateServer (2pts)") {
testManySchedules(3, sched =>
val fs = SchedulableInMemoryFileSystem(sched)
val server = new SchedulableUpdateServer(sched, fs)
def writeThread(): Unit =
server.newUpdate("update1.bin", "Update 1")
server.newUpdate("update2.bin", "Update 2")
assertEquals(fs.fsMap.toSet, Set("update2.bin" -> "Update 2"))
def fetchThread(): Unit =
val res = server.fetchUpdate()
assert(List(None, Some("Update 1"), Some("Update 2")).contains(res),
s"fetchUpdate returned unexpected value $res")
val threads = List(
() => writeThread(),
() => fetchThread(),
() => fetchThread(),
)
(threads, _ => (true, ""))
)
}
import scala.concurrent.duration._
override val munitTimeout = 200.seconds
end F3Suite
package f3.instrumentation
class AtomicReference[T](initial: T) {
private val atomic = new java.util.concurrent.atomic.AtomicReference[T](initial)
def get: T = atomic.get()
def set(value: T): Unit = atomic.set(value)
}
package f3.instrumentation
trait MockedMonitor extends Monitor {
def scheduler: Scheduler
// Can be overriden.
override def waitDefault() = {
scheduler.log("wait")
scheduler updateThreadState Wait(this, scheduler.threadLocks.tail)
}
override def synchronizedDefault[T](toExecute: =>T): T = {
scheduler.log("synchronized check")
val prevLocks = scheduler.threadLocks
scheduler updateThreadState Sync(this, prevLocks) // If this belongs to prevLocks, should just continue.
scheduler.log("synchronized -> enter")
try {
toExecute
} finally {
scheduler updateThreadState Running(prevLocks)
scheduler.log("synchronized -> out")
}
}
override def notifyDefault() = {
scheduler mapOtherStates {
state => state match {
case Wait(lockToAquire, locks) if lockToAquire == this => SyncUnique(this, state.locks)
case e => e
}
}
scheduler.log("notify")
}
override def notifyAllDefault() = {
scheduler mapOtherStates {
state => state match {
case Wait(lockToAquire, locks) if lockToAquire == this => Sync(this, state.locks)
case SyncUnique(lockToAquire, locks) if lockToAquire == this => Sync(this, state.locks)
case e => e
}
}
scheduler.log("notifyAll")
}
}
trait LockFreeMonitor extends Monitor {
override def waitDefault() = {
throw new Exception("Please use lock-free structures and do not use wait()")
}
override def synchronizedDefault[T](toExecute: =>T): T = {
throw new Exception("Please use lock-free structures and do not use synchronized()")
}
override def notifyDefault() = {
throw new Exception("Please use lock-free structures and do not use notify()")
}
override def notifyAllDefault() = {
throw new Exception("Please use lock-free structures and do not use notifyAll()")
}
}
abstract class ThreadState {
def locks: Seq[AnyRef]
}
trait CanContinueIfAcquiresLock extends ThreadState {
def lockToAquire: AnyRef
}
case object Start extends ThreadState { def locks: Seq[AnyRef] = Seq.empty }
case object End extends ThreadState { def locks: Seq[AnyRef] = Seq.empty }
case class Wait(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState
case class SyncUnique(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState with CanContinueIfAcquiresLock
case class Sync(lockToAquire: AnyRef, locks: Seq[AnyRef]) extends ThreadState with CanContinueIfAcquiresLock
case class Running(locks: Seq[AnyRef]) extends ThreadState
case class VariableReadWrite(locks: Seq[AnyRef]) extends ThreadState
package f3.instrumentation
import java.util.concurrent._;
import scala.concurrent.duration._
import scala.collection.mutable._
import Stats._
import java.util.concurrent.atomic.AtomicInteger
sealed abstract class Result
case class RetVal(rets: List[Any]) extends Result
case class Except(msg: String, stackTrace: Array[StackTraceElement]) extends Result
case class Timeout(msg: String) extends Result
/**
* A class that maintains schedule and a set of thread ids.
* The schedules are advanced after an operation of a SchedulableBuffer is performed.
* Note: the real schedule that is executed may deviate from the input schedule
* due to the adjustments that had to be made for locks
*/
class Scheduler(sched: List[Int]) {
val maxOps = 500 // a limit on the maximum number of operations the code is allowed to perform
private var schedule = sched
private var numThreads = 0
private val realToFakeThreadId = Map[Long, Int]()
private val opLog = ListBuffer[String]() // a mutable list (used for efficient concat)
private val threadStates = Map[Int, ThreadState]()
/**
* Runs a set of operations in parallel as per the schedule.
* Each operation may consist of many primitive operations like reads or writes
* to shared data structure each of which should be executed using the function `exec`.
* @timeout in milliseconds
* @return true - all threads completed on time, false -some tests timed out.
*/
def runInParallel(timeout: Long, ops: List[() => Any]): Result = {
numThreads = ops.length
val threadRes = Array.fill(numThreads) { None: Any }
var exception: Option[Except] = None
val syncObject = new Object()
var completed = new AtomicInteger(0)
// create threads
val threads = ops.zipWithIndex.map {
case (op, i) =>
new Thread(new Runnable() {
def run(): Unit = {
val fakeId = i + 1
setThreadId(fakeId)
try {
updateThreadState(Start)
val res = op()
updateThreadState(End)
threadRes(i) = res
// notify the master thread if all threads have completed
if (completed.incrementAndGet() == ops.length) {
syncObject.synchronized { syncObject.notifyAll() }
}
} catch {
case e: Throwable if exception != None => // do nothing here and silently fail
case e: Throwable =>
log(s"throw ${e.toString}")
exception = Some(Except(s"Thread $fakeId crashed on the following schedule: \n" + opLog.mkString("\n"),
e.getStackTrace))
syncObject.synchronized { syncObject.notifyAll() }
//println(s"$fakeId: ${e.toString}")
//Runtime.getRuntime().halt(0) //exit the JVM and all running threads (no other way to kill other threads)
}
}
})
}
// start all threads
threads.foreach(_.start())
// wait for all threads to complete, or for an exception to be thrown, or for the time out to expire
var remTime = timeout
syncObject.synchronized {
timed { if(completed.get() != ops.length) syncObject.wait(timeout) } { time => remTime -= time }
}
if (exception.isDefined) {
exception.get
} else if (remTime <= 1) { // timeout ? using 1 instead of zero to allow for some errors
Timeout(opLog.mkString("\n"))
} else {
// every thing executed normally
RetVal(threadRes.toList)
}
}
// Updates the state of the current thread
def updateThreadState(state: ThreadState): Unit = {
val tid = threadId
synchronized {
threadStates(tid) = state
}
state match {
case Sync(lockToAquire, locks) =>
if (locks.indexOf(lockToAquire) < 0) waitForTurn else {
// Re-aqcuiring the same lock
updateThreadState(Running(lockToAquire +: locks))
}
case Start => waitStart()
case End => removeFromSchedule(tid)
case Running(_) =>
case _ => waitForTurn // Wait, SyncUnique, VariableReadWrite
}
}
def waitStart(): Unit = {
//while (threadStates.size < numThreads) {
//Thread.sleep(1)
//}
synchronized {
if (threadStates.size < numThreads) {
wait()
} else {
notifyAll()
}
}
}
def threadLocks = {
synchronized {
threadStates(threadId).locks
}
}
def threadState = {
synchronized {
threadStates(threadId)
}
}
def mapOtherStates(f: ThreadState => ThreadState) = {
val exception = threadId
synchronized {
for (k <- threadStates.keys if k != exception) {
threadStates(k) = f(threadStates(k))
}
}
}
def log(str: String) = {
if((realToFakeThreadId contains Thread.currentThread().getId())) {
val space = (" " * ((threadId - 1) * 2))
val s = space + threadId + ":" + "\n".r.replaceAllIn(str, "\n" + space + " ")
opLog += s
}
}
/**
* Executes a read or write operation to a global data structure as per the given schedule
* @param msg a message corresponding to the operation that will be logged
*/
def exec[T](primop: => T)(msg: => String, postMsg: => Option[T => String] = None): T = {
if(! (realToFakeThreadId contains Thread.currentThread().getId())) {
primop
} else {
updateThreadState(VariableReadWrite(threadLocks))
val m = msg
if(m != "") log(m)
if (opLog.size > maxOps)
throw new Exception(s"Total number of reads/writes performed by threads exceed $maxOps. A possible deadlock!")
val res = primop
postMsg match {
case Some(m) => log(m(res))
case None =>
}
res
}
}
private def setThreadId(fakeId: Int) = synchronized {
realToFakeThreadId(Thread.currentThread.getId) = fakeId
}
def threadId =
try {
realToFakeThreadId(Thread.currentThread().getId())
} catch {
case e: NoSuchElementException =>
throw new Exception("You are accessing shared variables in the constructor. This is not allowed. The variables are already initialized!")
}
private def isTurn(tid: Int) = synchronized {
(!schedule.isEmpty && schedule.head != tid)
}
def canProceed(): Boolean = {
val tid = threadId
canContinue match {
case Some((i, state)) if i == tid =>
//println(s"$tid: Runs ! Was in state $state")
canContinue = None
state match {
case Sync(lockToAquire, locks) => updateThreadState(Running(lockToAquire +: locks))
case SyncUnique(lockToAquire, locks) =>
mapOtherStates {
_ match {
case SyncUnique(lockToAquire2, locks2) if lockToAquire2 == lockToAquire => Wait(lockToAquire2, locks2)
case e => e
}
}
updateThreadState(Running(lockToAquire +: locks))
case VariableReadWrite(locks) => updateThreadState(Running(locks))
}
true
case Some((i, state)) =>
//println(s"$tid: not my turn but $i !")
false
case None =>
false
}
}
var threadPreference = 0 // In the case the schedule is over, which thread should have the preference to execute.
/** returns true if the thread can continue to execute, and false otherwise */
def decide(): Option[(Int, ThreadState)] = {
if (!threadStates.isEmpty) { // The last thread who enters the decision loop takes the decision.
//println(s"$threadId: I'm taking a decision")
if (threadStates.values.forall { case e: Wait => true case _ => false }) {
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if (threadStates.size > 1) "s" else ""
val are = if (threadStates.size > 1) "are" else "is"
throw new Exception(s"Deadlock: Thread$s $waiting $are waiting but all others have ended and cannot notify them.")
} else {
// Threads can be in Wait, Sync, SyncUnique, and VariableReadWrite mode.
// Let's determine which ones can continue.
val notFree = threadStates.collect { case (id, state) => state.locks }.flatten.toSet
val threadsNotBlocked = threadStates.toSeq.filter {
case (id, v: VariableReadWrite) => true
case (id, v: CanContinueIfAcquiresLock) => !notFree(v.lockToAquire) || (v.locks contains v.lockToAquire)
case _ => false
}
if (threadsNotBlocked.isEmpty) {
val waiting = threadStates.keys.map(_.toString).mkString(", ")
val s = if (threadStates.size > 1) "s" else ""
val are = if (threadStates.size > 1) "are" else "is"
val whoHasLock = threadStates.toSeq.flatMap { case (id, state) => state.locks.map(lock => (lock, id)) }.toMap
val reason = threadStates.collect {
case (id, state: CanContinueIfAcquiresLock) if !notFree(state.lockToAquire) =>
s"Thread $id is waiting on lock ${state.lockToAquire} held by thread ${whoHasLock(state.lockToAquire)}"
}.mkString("\n")
throw new Exception(s"Deadlock: Thread$s $waiting are interlocked. Indeed:\n$reason")
} else if (threadsNotBlocked.size == 1) { // Do not consume the schedule if only one thread can execute.
Some(threadsNotBlocked(0))
} else {
val next = schedule.indexWhere(t => threadsNotBlocked.exists { case (id, state) => id == t })
if (next != -1) {
//println(s"$threadId: schedule is $schedule, next chosen is ${schedule(next)}")
val chosenOne = schedule(next) // TODO: Make schedule a mutable list.
schedule = schedule.take(next) ++ schedule.drop(next + 1)
Some((chosenOne, threadStates(chosenOne)))
} else {
threadPreference = (threadPreference + 1) % threadsNotBlocked.size
val chosenOne = threadsNotBlocked(threadPreference) // Maybe another strategy
Some(chosenOne)
//threadsNotBlocked.indexOf(threadId) >= 0
/*
val tnb = threadsNotBlocked.map(_._1).mkString(",")
val s = if (schedule.isEmpty) "empty" else schedule.mkString(",")
val only = if (schedule.isEmpty) "" else " only"
throw new Exception(s"The schedule is $s but$only threads ${tnb} can continue")*/
}
}
}
} else canContinue
}
/**
* This will be called before a schedulable operation begins.
* This should not use synchronized
*/
var numThreadsWaiting = new AtomicInteger(0)
//var waitingForDecision = Map[Int, Option[Int]]() // Mapping from thread ids to a number indicating who is going to make the choice.
var canContinue: Option[(Int, ThreadState)] = None // The result of the decision thread Id of the thread authorized to continue.
private def waitForTurn = {
synchronized {
if (numThreadsWaiting.incrementAndGet() == threadStates.size) {
canContinue = decide()
notifyAll()
}
//waitingForDecision(threadId) = Some(numThreadsWaiting)
//println(s"$threadId Entering waiting with ticket number $numThreadsWaiting/${waitingForDecision.size}")
while (!canProceed()) wait()
}
numThreadsWaiting.decrementAndGet()
}
/**
* To be invoked when a thread is about to complete
*/
private def removeFromSchedule(fakeid: Int) = synchronized {
//println(s"$fakeid: I'm taking a decision because I finished")
schedule = schedule.filterNot(_ == fakeid)
threadStates -= fakeid
if (numThreadsWaiting.get() == threadStates.size) {
canContinue = decide()
notifyAll()
}
}
def getOperationLog() = opLog
}
/* Copyright 2009-2015 EPFL, Lausanne */
package f3.instrumentation
import java.lang.management._
/**
* A collection of methods that can be used to collect run-time statistics about Leon programs.
* This is mostly used to test the resources properties of Leon programs
*/
object Stats {
def timed[T](code: => T)(cont: Long => Unit): T = {
var t1 = System.currentTimeMillis()
val r = code
cont((System.currentTimeMillis() - t1))
r
}
def withTime[T](code: => T): (T, Long) = {
var t1 = System.currentTimeMillis()
val r = code
(r, (System.currentTimeMillis() - t1))
}
}
package f3.instrumentation
import scala.util.Random
import scala.collection.mutable.{Map => MutableMap}
import Stats._
object TestHelper {
val noOfSchedules = 10000 // set this to 100k during deployment
val readWritesPerThread = 20 // maximum number of read/writes possible in one thread
val contextSwitchBound = 10
val testTimeout = 150 // the total time out for a test in seconds
val schedTimeout = 15 // the total time out for execution of a schedule in secs
// Helpers
/*def testManySchedules(op1: => Any): Unit = testManySchedules(List(() => op1))
def testManySchedules(op1: => Any, op2: => Any): Unit = testManySchedules(List(() => op1, () => op2))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3))
def testManySchedules(op1: => Any, op2: => Any, op3: => Any, op4: => Any): Unit = testManySchedules(List(() => op1, () => op2, () => op3, () => op4))*/
def testSequential[T](ops: Scheduler => Any)(assertions: T => (Boolean, String)) =
testManySchedules(1,
(sched: Scheduler) => {
(List(() => ops(sched)),
(res: List[Any]) => assertions(res.head.asInstanceOf[T]))
})
/**
* @numThreads number of threads
* @ops operations to be executed, one per thread
* @assertion as condition that will executed after all threads have completed (without exceptions)
* the arguments are the results of the threads
*/
def testManySchedules(numThreads: Int,
ops: Scheduler =>
(List[() => Any], // Threads
List[Any] => (Boolean, String)) // Assertion
) = {
var timeout = testTimeout * 1000L
val threadIds = (1 to numThreads)
//(1 to scheduleLength).flatMap(_ => threadIds).toList.permutations.take(noOfSchedules).foreach {
val schedules = (new ScheduleGenerator(numThreads)).schedules()
var schedsExplored = 0
schedules.takeWhile(_ => schedsExplored <= noOfSchedules && timeout > 0).foreach {
//case _ if timeout <= 0 => // break
case schedule =>
schedsExplored += 1
val schedr = new Scheduler(schedule)
//println("Exploring Sched: "+schedule)
val (threadOps, assertion) = ops(schedr)
if (threadOps.size != numThreads)
throw new IllegalStateException(s"Number of threads: $numThreads, do not match operations of threads: $threadOps")
timed { schedr.runInParallel(schedTimeout * 1000, threadOps) } { t => timeout -= t } match {
case Timeout(msg) =>
throw new java.lang.AssertionError("assertion failed\n"+"The schedule took too long to complete. A possible deadlock! \n"+msg)
case Except(msg, stkTrace) =>
val traceStr = "Thread Stack trace: \n"+stkTrace.map(" at "+_.toString).mkString("\n")
throw new java.lang.AssertionError("assertion failed\n"+msg+"\n"+traceStr)
case RetVal(threadRes) =>
// check the assertion
val (success, custom_msg) = assertion(threadRes)
if (!success) {
val msg = "The following schedule resulted in wrong results: \n" + custom_msg + "\n" + schedr.getOperationLog().mkString("\n")
throw new java.lang.AssertionError("Assertion failed: "+msg)
}
}
}
if (timeout <= 0) {
throw new java.lang.AssertionError("Test took too long to complete! Cannot check all schedules as your code is too slow!")
}
}
/**
* A schedule generator that is based on the context bound
*/
class ScheduleGenerator(numThreads: Int) {
val scheduleLength = readWritesPerThread * numThreads
val rands = (1 to scheduleLength).map(i => new Random(0xcafe * i)) // random numbers for choosing a thread at each position
def schedules(): LazyList[List[Int]] = {
var contextSwitches = 0
var contexts = List[Int]() // a stack of thread ids in the order of context-switches
val remainingOps = MutableMap[Int, Int]()
remainingOps ++= (1 to numThreads).map(i => (i, readWritesPerThread)) // num ops remaining in each thread
val liveThreads = (1 to numThreads).toSeq.toBuffer
/**
* Updates remainingOps and liveThreads once a thread is chosen for a position in the schedule
*/
def updateState(tid: Int): Unit = {
val remOps = remainingOps(tid)
if (remOps == 0) {
liveThreads -= tid
} else {
remainingOps += (tid -> (remOps - 1))
}
}
val schedule = rands.foldLeft(List[Int]()) {
case (acc, r) if contextSwitches < contextSwitchBound =>
val tid = liveThreads(r.nextInt(liveThreads.size))
contexts match {
case prev :: tail if prev != tid => // we have a new context switch here
contexts +:= tid
contextSwitches += 1
case prev :: tail =>
case _ => // init case
contexts +:= tid
}
updateState(tid)
acc :+ tid
case (acc, _) => // here context-bound has been reached so complete the schedule without any more context switches
if (!contexts.isEmpty) {
contexts = contexts.dropWhile(remainingOps(_) == 0)
}
val tid = contexts match {
case top :: tail => top
case _ => liveThreads(0) // here, there has to be threads that have not even started
}
updateState(tid)
acc :+ tid
}
schedule #:: schedules()
}
}
}
package f3.instrumentation
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object TestUtils {
def failsOrTimesOut[T](action: => T): Boolean = {
val asyncAction = Future {
action
}
try {
Await.result(asyncAction, 2000.millisecond)
} catch {
case _: Throwable => return true
}
return false
}
}
package f3
import instrumentation._
class SchedulableThreadMap[A](val scheduler: Scheduler) extends ThreadMap[A] with MockedMonitor:
override def currentThreadHasValue: Boolean = scheduler.exec {
super.currentThreadHasValue
} ("", Some(res => s"currentThreadHasValue is $res"))
override def currentThreadValue: Option[A] = scheduler.exec {
super.currentThreadValue
} ("", Some(res => s"currentThreadValue is $res"))
override def setCurrentThreadValue(value: A): Unit = scheduler.exec {
super.setCurrentThreadValue(value)
} (s"setCurrentThreadValue($value)")
override def deleteCurrentThreadValue(): Unit = scheduler.exec {
super.deleteCurrentThreadValue()
} ("deleteCurrentThreadValue()")
override def waitForall(predicate: A => Boolean): Unit = scheduler.exec {
super.waitForall(predicate)
} ("waitForall")
def allValues: List[A] = synchronized {
theMap.values.toList
}
end SchedulableThreadMap
class SchedulableRCU(scheduler: Scheduler) extends RCU with LockFreeMonitor:
override protected val latestVersion = SchedulableAtomicLong(0, scheduler, "latestVersion")
override protected val readersVersion: ThreadMap[Long] = SchedulableThreadMap(scheduler)
class SchedulableInMemoryFileSystem(scheduler: Scheduler) extends InMemoryFileSystem:
override def createFile(file: FileName, content: String): Unit = scheduler.exec {
super.createFile(file, content)
} (s"createFile($file)")
override def readFile(file: FileName): String = scheduler.exec {
super.readFile(file)
} (s"readFile($file)")
override def deleteFile(file: FileName): Unit = scheduler.exec {
super.deleteFile(file)
} (s"deleteFile($file)")
class SchedulableUpdateServer(scheduler: Scheduler, fs: InMemoryFileSystem) extends UpdateServer(fs) with LockFreeMonitor:
override val rcu = SchedulableRCU(scheduler)
class SchedulableAtomicLong(initial: Long, scheduler: Scheduler, name: String) extends AtomicLong(initial):
override def get: Long = scheduler.exec {
super.get
} (s"", Some(res => s"$name: get $res"))
override def set(value: Long): Unit = scheduler.exec {
super.set(value)
} (s"$name: set $value", None)
override def incrementAndGet(): Long = scheduler.exec {
super.incrementAndGet()
} (s"", Some(res => s"$name: incrementAndGet $res"))
override def getAndIncrement(): Long = scheduler.exec {
super.getAndIncrement()
} (s"", Some(res => s"$name: getandIncrement $res"))
override def compareAndSet(expected: Long, newValue: Long): Boolean =
scheduler.exec {
super.compareAndSet(expected, newValue)
} (s"$name: compareAndSet(expected = $expected, newValue = $newValue)",
Some(res => s"$name: Did it set? $res") )
end SchedulableAtomicLong
class SchedulableAtomicReference[T](initial: T, scheduler: Scheduler, name: String) extends AtomicReference(initial):
override def get: T = scheduler.exec {
super.get
} (s"", Some(res => s"$name: get $res"))
override def set(value: T): Unit = scheduler.exec {
super.set(value)
} (s"$name: set $value", None)
# General
*.DS_Store
*.swp
*~
# Dotty
*.class
*.tasty
*.hasTasty
# sbt
target/
# IDE
.bsp
.bloop
.metals
.vscode
# datasets
stackoverflow-grading.csv
wikipedia-grading.dat