Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • lamp/cs206
  • bwermeil/cs206-2020
  • zabifade/cs206-2020
  • cauderan/cs206-2020
  • malonga/cs206-2020
  • dumoncel/cs206
  • bounekhe/cs206
  • bergerault/cs206
  • flealsan/cs206
  • hsu/cs206
  • mouchel/cs206
  • vebraun/cs206
  • vcanard/cs206
  • ybelghmi/cs206
  • belghmi/cs206
  • bousbina/cs206
  • waked/cs206
  • gtagemou/cs206
  • arahmoun/cs206
  • elhachem/cs206
  • benrahha/cs206
  • benslima/cs206
22 results
Select Git revision
Show changes
Showing
with 0 additions and 1101 deletions
// addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.28")
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.8")
addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.5.3")
package f2
import akka.actor._
import scala.collection.mutable
import akka.testkit.*
object F2 {
//////////////////////////////
// NOTIFICATION SERVICE //
//////////////////////////////
object NotificationService {
enum Protocol:
/** Notify all registered actors */
case NotifyAll
/** Register the actor that sent the `Register` request */
case Register //
/** Un-register the actor that sent the `Register` request */
case UnRegister
enum Responses:
/** Message sent to an actor when it is notified */
case Notification
/** Response sent to an actor after a `Register` or `UnRegister` */
case Registered(registered: Boolean)
}
class NotificationService extends Actor {
import NotificationService.Protocol.*
import NotificationService.Responses.*
private val registeredUsers = mutable.Set.empty[ActorRef]
def receive: Receive = {
case Register =>
registeredUsers += sender
sender ! Registered(true)
case UnRegister =>
registeredUsers -= sender
sender ! Registered(false)
case NotifyAll =>
for user <- registeredUsers do
user ! Notification
}
}
/////////////////////////
// DISCORD CHANNEL //
/////////////////////////
object DiscordChannel {
enum Protocol:
/** Post a message in the channel */
case Post(msg: String)
/** Ask for the list of most recent posts starting from the most recent one.
* The list must have at most `limit` posts.
*/
case GetLastPosts(limit: Int)
/** Activates the service channel using the provided notification service. */
case Init(notificationService: ActorRef)
enum Responses:
/** Response to `GetLastPosts` if active */
case Posts(msgs: List[String])
/** Response after `Init` if non-active */
case Active
/** Response `Post` and `GetLastPosts` if non-active */
case NotActive
/** Response after `Init` if active */
case AlreadyActive
}
class DiscordChannel extends Actor {
import DiscordChannel.Protocol.*
import DiscordChannel.Responses.*
import NotificationService.Protocol.*
private var messages: List[String] = Nil
def receive: Receive = nonActive
def nonActive: Receive = {
case Init(service) =>
context.become(active(service))
sender ! Active
case Post(_) | GetLastPosts(_) =>
sender ! NotActive
}
def active(notificationService: ActorRef): Receive = {
case Post(msg) =>
messages = msg :: messages
notificationService ! NotifyAll
case GetLastPosts(limit) =>
sender ! Posts(messages.take(limit))
case Init(_) =>
sender ! AlreadyActive
}
}
}
/////////////////////////
// DEBUG //
/////////////////////////
/** Infrastructure to help debugging. In sbt use `run` to execute this code.
* The TestKit is an actor that can send messages and check the messages it receives (or not).
*/
@main def debug() = new TestKit(ActorSystem("DebugSystem")) with ImplicitSender {
import F2.*
import DiscordChannel.Protocol.*
import DiscordChannel.Responses.*
import NotificationService.Protocol.*
import NotificationService.Responses.*
import concurrent.duration.*
try
val notificationService = system.actorOf(Props[NotificationService])
val channel = system.actorOf(Props[DiscordChannel])
notificationService ! NotifyAll
expectNoMessage(200.millis) // expects no message is received in the next 200 milliseconds
notificationService ! Register
expectMsg(200.millis, Registered(true)) // expects to receive `Registered(true)` in the next 200 milliseconds
finally shutdown(system)
}
package f2
import akka.actor._
import akka.testkit.*
import scala.collection.mutable
import concurrent.duration.*
import F2.*
class F2Suite extends munit.FunSuite {
import NotificationService.Protocol.*
import NotificationService.Responses.*
import DiscordChannel.Protocol.*
import DiscordChannel.Responses.*
test("Notification register (1pts)") {
new MyTestKit {
def tests() = {
val actor = system.actorOf(Props[NotificationService])
actor ! Register
expectMsg(2.second, Registered(true))
}
}
}
test("Notification register and un-register (1pts)") {
new MyTestKit {
def tests() = {
val actor = system.actorOf(Props[NotificationService])
actor ! Register
expectMsg(2.second, Registered(true))
actor ! UnRegister
expectMsg(2.second, Registered(false))
actor ! UnRegister
expectMsg(2.second, Registered(false))
actor ! Register
expectMsg(2.second, Registered(true))
actor ! UnRegister
expectMsg(2.second, Registered(false))
}
}
}
test("Notification notify (1pts)") {
new MyTestKit {
def tests() = {
val actor = system.actorOf(Props[NotificationService])
actor ! Register
expectMsg(2.second, Registered(true))
actor ! NotifyAll
expectMsg(2.second, Notification)
actor ! NotifyAll
expectMsg(2.second, Notification)
actor ! UnRegister
expectMsg(2.second, Registered(false))
actor ! NotifyAll
expectNoMessage(500.millis)
actor ! Register
expectMsg(2.second, Registered(true))
actor ! NotifyAll
expectMsg(2.second, Notification)
actor ! UnRegister
expectMsg(2.second, Registered(false))
actor ! NotifyAll
expectNoMessage(500.millis)
}
}
}
test("NotifyAll from other actor (1pts)") {
new MyTestKit {
def tests() = {
val actor = system.actorOf(Props[NotificationService])
val otherActor = system.actorOf(Props[DummyActor])
def notifyFormAllFromOtherActor() = {
given ActorRef = otherActor
actor ! NotifyAll
}
expectNoMessage(500.millis)
actor ! Register
expectMsg(2.second, Registered(true))
notifyFormAllFromOtherActor()
expectMsg(2.second, Notification)
}
}
}
test("Channel init (1pts)") {
new MyTestKit {
def tests() = {
val notificationService = system.actorOf(Props[NotificationService])
val channel = system.actorOf(Props[DiscordChannel])
channel ! Init(notificationService)
expectMsg(2.second, Active)
}
}
}
test("Channel post and get post (1pts)") {
new MyTestKit {
def tests() = {
val notificationService = system.actorOf(Props[NotificationService])
val channel = system.actorOf(Props[DiscordChannel])
channel ! Init(notificationService)
expectMsg(2.second, Active)
channel ! Post("hello")
channel ! GetLastPosts(1)
expectMsg(2.second, Posts(List("hello")))
channel ! GetLastPosts(10)
expectMsg(2.second, Posts(List("hello")))
channel ! GetLastPosts(0)
expectMsg(2.second, Posts(Nil))
}
}
}
test("Channel multiple posts (1pts)") {
new MyTestKit {
def tests() = {
val notificationService = system.actorOf(Props[NotificationService])
val channel = system.actorOf(Props[DiscordChannel])
channel ! Init(notificationService)
expectMsg(2.second, Active)
channel ! Post("hello")
channel ! Post("world")
channel ! GetLastPosts(2)
channel ! GetLastPosts(1)
channel ! Post("!")
channel ! GetLastPosts(3)
expectMsg(2.second, Posts(List("world", "hello")))
expectMsg(2.second, Posts(List("world")))
expectMsg(2.second, Posts(List("!", "world", "hello")))
}
}
}
test("Channel posts and notify (1pts)") {
new MyTestKit {
def tests() = {
val notificationService = system.actorOf(Props[NotificationService])
val channel = system.actorOf(Props[DiscordChannel])
channel ! Init(notificationService)
expectMsg(2.second, Active)
notificationService ! Register
expectMsg(2.second, Registered(true))
channel ! Post("hello")
channel ! Post("world")
expectMsg(2.second, Notification)
expectMsg(2.second, Notification)
}
}
}
test("Channel init twice (1pts)") {
new MyTestKit {
def tests() = {
val notificationService = system.actorOf(Props[NotificationService])
val channel = system.actorOf(Props[DiscordChannel])
channel ! Init(notificationService)
expectMsg(2.second, Active)
channel ! Init(notificationService)
expectMsg(2.second, AlreadyActive)
channel ! Init(notificationService)
expectMsg(2.second, AlreadyActive)
}
}
}
test("Channel not active (1pts)") {
new MyTestKit {
def tests() = {
val channel1 = system.actorOf(Props[DiscordChannel])
channel1 ! Post("hello")
expectMsg(2.second, NotActive)
val channel2 = system.actorOf(Props[DiscordChannel])
channel2 ! GetLastPosts(0)
expectMsg(2.second, NotActive)
}
}
}
abstract class MyTestKit extends TestKit(ActorSystem("TestSystem")) with ImplicitSender {
def tests(): Unit
try tests() finally shutdown(system)
}
}
class DummyActor extends Actor {
def receive: Receive = {
case _ => ()
}
}
# General
*.DS_Store
*.swp
*~
# Dotty
*.class
*.tasty
*.hasTasty
# sbt
target/
# IDE
.bsp
.bloop
.metals
.vscode
# datasets
stackoverflow-grading.csv
wikipedia-grading.dat
// Student tasks (i.e. submit, packageSubmission)
enablePlugins(StudentTasks)
course := "final"
assignment := "f3"
scalaVersion := "3.0.0-RC1"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies += "org.scalameta" %% "munit" % "0.7.22"
val MUnitFramework = new TestFramework("munit.Framework")
testFrameworks += MUnitFramework
// Decode Scala names
testOptions += Tests.Argument(MUnitFramework, "-s")
testSuite := "f3.F3Suite"
File deleted
package sbt // To access the private[sbt] compilerReporter key
package filteringReporterPlugin
import Keys._
import ch.epfl.lamp._
object FilteringReporterPlugin extends AutoPlugin {
override lazy val projectSettings = Seq(
// Turn off warning coming from scalameter that we cannot fix without changing scalameter
compilerReporter in (Compile, compile) ~= { reporter => new FilteringReporter(reporter) }
)
}
class FilteringReporter(reporter: xsbti.Reporter) extends xsbti.Reporter {
def reset(): Unit = reporter.reset()
def hasErrors: Boolean = reporter.hasErrors
def hasWarnings: Boolean = reporter.hasWarnings
def printSummary(): Unit = reporter.printSummary()
def problems: Array[xsbti.Problem] = reporter.problems
def log(problem: xsbti.Problem): Unit = {
if (!problem.message.contains("An existential type that came from a Scala-2 classfile cannot be"))
reporter.log(problem)
}
def comment(pos: xsbti.Position, msg: String): Unit =
reporter.comment(pos, msg)
override def toString = s"CollectingReporter($reporter)"
}
package ch.epfl.lamp
import sbt._
import sbt.Keys._
/**
* Coursera uses two versions of each assignment. They both have the same assignment key and part id but have
* different item ids.
*
* @param key Assignment key
* @param partId Assignment partId
* @param itemId Item id of the non premium version
* @param premiumItemId Item id of the premium version (`None` if the assignment is optional)
*/
case class CourseraId(key: String, partId: String, itemId: String, premiumItemId: Option[String])
/**
* Settings shared by all assignments, reused in various tasks.
*/
object MOOCSettings extends AutoPlugin {
override def requires = super.requires && filteringReporterPlugin.FilteringReporterPlugin
object autoImport {
val course = SettingKey[String]("course")
val assignment = SettingKey[String]("assignment")
val options = SettingKey[Map[String, Map[String, String]]]("options")
val courseraId = settingKey[CourseraId]("Coursera-specific information identifying the assignment")
val testSuite = settingKey[String]("Fully qualified name of the test suite of this assignment")
.withRank(KeyRanks.Invisible)
// Convenient alias
type CourseraId = ch.epfl.lamp.CourseraId
val CourseraId = ch.epfl.lamp.CourseraId
}
import autoImport._
override val globalSettings: Seq[Def.Setting[_]] = Seq(
// supershell is verbose, buggy and useless.
useSuperShell := false
)
override val projectSettings: Seq[Def.Setting[_]] = Seq(
parallelExecution in Test := false,
// Report test result after each test instead of waiting for every test to finish
logBuffered in Test := false,
name := s"${course.value}-${assignment.value}"
)
}
package ch.epfl.lamp
import sbt._
import Keys._
// import scalaj.http._
import java.io.{File, FileInputStream, IOException}
import org.apache.commons.codec.binary.Base64
// import play.api.libs.json.{Json, JsObject, JsPath}
import scala.util.{Failure, Success, Try}
/**
* Provides tasks for submitting the assignment
*/
object StudentTasks extends AutoPlugin {
override def requires = super.requires && MOOCSettings
object autoImport {
val packageSourcesOnly = TaskKey[File]("packageSourcesOnly", "Package the sources of the project")
val packageBinWithoutResources = TaskKey[File]("packageBinWithoutResources", "Like packageBin, but without the resources")
val packageSubmissionZip = TaskKey[File]("packageSubmissionZip")
val packageSubmission = inputKey[Unit]("package solution as an archive file")
lazy val Grading = config("grading") extend(Runtime)
}
import autoImport._
import MOOCSettings.autoImport._
override lazy val projectSettings = Seq(
packageSubmissionSetting,
fork := true,
connectInput in run := true,
outputStrategy := Some(StdoutOutput),
) ++
packageSubmissionZipSettings ++
inConfig(Grading)(Defaults.testSettings ++ Seq(
unmanagedJars += file("grading-tests.jar"),
definedTests := (definedTests in Test).value,
internalDependencyClasspath := (internalDependencyClasspath in Test).value
))
/** **********************************************************
* SUBMITTING A SOLUTION TO COURSERA
*/
val packageSubmissionZipSettings = Seq(
packageSubmissionZip := {
val submission = crossTarget.value / "submission.zip"
val sources = (packageSourcesOnly in Compile).value
val binaries = (packageBinWithoutResources in Compile).value
IO.zip(Seq(sources -> "sources.zip", binaries -> "binaries.jar"), submission, None)
submission
},
artifactClassifier in packageSourcesOnly := Some("sources"),
artifact in (Compile, packageBinWithoutResources) ~= (art => art.withName(art.name + "-without-resources"))
) ++
inConfig(Compile)(
Defaults.packageTaskSettings(packageSourcesOnly, Defaults.sourceMappings) ++
Defaults.packageTaskSettings(packageBinWithoutResources, Def.task {
val relativePaths =
(unmanagedResources in Compile).value.flatMap(Path.relativeTo((unmanagedResourceDirectories in Compile).value)(_))
(mappings in (Compile, packageBin)).value.filterNot { case (_, path) => relativePaths.contains(path) }
})
)
val maxSubmitFileSize = {
val mb = 1024 * 1024
10 * mb
}
/** Check that the jar exists, isn't empty, isn't crazy big, and can be read
* If so, encode jar as base64 so we can send it to Coursera
*/
def prepareJar(jar: File, s: TaskStreams): String = {
val errPrefix = "Error submitting assignment jar: "
val fileLength = jar.length()
if (!jar.exists()) {
s.log.error(errPrefix + "jar archive does not exist\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength == 0L) {
s.log.error(errPrefix + "jar archive is empty\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength > maxSubmitFileSize) {
s.log.error(errPrefix + "jar archive is too big. Allowed size: " +
maxSubmitFileSize + " bytes, found " + fileLength + " bytes.\n" +
jar.getAbsolutePath)
failSubmit()
} else {
val bytes = new Array[Byte](fileLength.toInt)
val sizeRead = try {
val is = new FileInputStream(jar)
val read = is.read(bytes)
is.close()
read
} catch {
case ex: IOException =>
s.log.error(errPrefix + "failed to read sources jar archive\n" + ex.toString)
failSubmit()
}
if (sizeRead != bytes.length) {
s.log.error(errPrefix + "failed to read the sources jar archive, size read: " + sizeRead)
failSubmit()
} else encodeBase64(bytes)
}
}
/** Task to package solution to a given file path */
lazy val packageSubmissionSetting = packageSubmission := {
val args: Seq[String] = Def.spaceDelimited("[path]").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val base64Jar = prepareJar(jar, s)
val path = args.headOption.getOrElse((baseDirectory.value / "submission.jar").absolutePath)
scala.tools.nsc.io.File(path).writeAll(base64Jar)
}
/*
/** Task to submit a solution to coursera */
val submit = inputKey[Unit]("submit solution to Coursera")
lazy val submitSetting = submit := {
// Fail if scalafix linting does not pass.
scalafixLinting.value
val args: Seq[String] = Def.spaceDelimited("<arg>").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val assignmentDetails =
courseraId.?.value.getOrElse(throw new MessageOnlyException("This assignment can not be submitted to Coursera because the `courseraId` setting is undefined"))
val assignmentKey = assignmentDetails.key
val courseName =
course.value match {
case "capstone" => "scala-capstone"
case "bigdata" => "scala-spark-big-data"
case other => other
}
val partId = assignmentDetails.partId
val itemId = assignmentDetails.itemId
val premiumItemId = assignmentDetails.premiumItemId
val (email, secret) = args match {
case email :: secret :: Nil =>
(email, secret)
case _ =>
val inputErr =
s"""|Invalid input to `submit`. The required syntax for `submit` is:
|submit <email-address> <submit-token>
|
|The submit token is NOT YOUR LOGIN PASSWORD.
|It can be obtained from the assignment page:
|https://www.coursera.org/learn/$courseName/programming/$itemId
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
""".stripMargin
s.log.error(inputErr)
failSubmit()
}
val base64Jar = prepareJar(jar, s)
val json =
s"""|{
| "assignmentKey":"$assignmentKey",
| "submitterEmail":"$email",
| "secret":"$secret",
| "parts":{
| "$partId":{
| "output":"$base64Jar"
| }
| }
|}""".stripMargin
def postSubmission[T](data: String): Try[HttpResponse[String]] = {
val http = Http("https://www.coursera.org/api/onDemandProgrammingScriptSubmissions.v1")
val hs = List(
("Cache-Control", "no-cache"),
("Content-Type", "application/json")
)
s.log.info("Connecting to Coursera...")
val response = Try(http.postData(data)
.headers(hs)
.option(HttpOptions.connTimeout(10000)) // scalaj default timeout is only 100ms, changing that to 10s
.asString) // kick off HTTP POST
response
}
val connectMsg =
s"""|Attempting to submit "${assignment.value}" assignment in "$courseName" course
|Using:
|- email: $email
|- submit token: $secret""".stripMargin
s.log.info(connectMsg)
def reportCourseraResponse(response: HttpResponse[String]): Unit = {
val code = response.code
val respBody = response.body
/* Sample JSON response from Coursera
{
"message": "Invalid email or token.",
"details": {
"learnerMessage": "Invalid email or token."
}
}
*/
// Success, Coursera responds with 2xx HTTP status code
if (response.is2xx) {
val successfulSubmitMsg =
s"""|Successfully connected to Coursera. (Status $code)
|
|Assignment submitted successfully!
|
|You can see how you scored by going to:
|https://www.coursera.org/learn/$courseName/programming/$itemId/
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
|and clicking on "My Submission".""".stripMargin
s.log.info(successfulSubmitMsg)
}
// Failure, Coursera responds with 4xx HTTP status code (client-side failure)
else if (response.is4xx) {
val result = Try(Json.parse(respBody)).toOption
val learnerMsg = result match {
case Some(resp: JsObject) =>
(JsPath \ "details" \ "learnerMessage").read[String].reads(resp).get
case Some(x) => // shouldn't happen
"Could not parse Coursera's response:\n" + x
case None =>
"Could not parse Coursera's response:\n" + respBody
}
val failedSubmitMsg =
s"""|Submission failed.
|There was something wrong while attempting to submit.
|Coursera says:
|$learnerMsg (Status $code)""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera responds with 5xx HTTP status code (server-side failure)
else if (response.is5xx) {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera seems to be unavailable at the moment (Status $code)
|Check https://status.coursera.org/ and try again in a few minutes.
""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera repsonds with an unexpected status code
else {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera replied with an unexpected code (Status $code)
""".stripMargin
s.log.error(failedSubmitMsg)
}
}
// kick it all off, actually make request
postSubmission(json) match {
case Success(resp) => reportCourseraResponse(resp)
case Failure(e) =>
val failedConnectMsg =
s"""|Connection to Coursera failed.
|There was something wrong while attempting to connect to Coursera.
|Check your internet connection.
|${e.toString}""".stripMargin
s.log.error(failedConnectMsg)
}
}
*/
def failSubmit(): Nothing = {
sys.error("Submission failed")
}
/**
* *****************
* DEALING WITH JARS
*/
def encodeBase64(bytes: Array[Byte]): String =
new String(Base64.encodeBase64(bytes))
}
sbt.version=1.4.7
// Used for Coursera submission (StudentPlugin)
// libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
// libraryDependencies += "com.typesafe.play" %% "play-json" % "2.7.4"
// Used for Base64 (StudentPlugin)
libraryDependencies += "commons-codec" % "commons-codec" % "1.10"
// addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.28")
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.8")
addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.5.3")
package f3
import instrumentation._
import scala.collection.mutable
import scala.collection.concurrent.TrieMap
type FileName = String
/** An API for manipulating files. */
trait FileSystem:
/** Create a new file named `file` with the passed `content`. */
def createFile(file: FileName, content: String): Unit
/** If `file` exists, return its content, otherwise crashes. */
def readFile(file: FileName): String
/** If `file` exists, delete it, otherwise crash. */
def deleteFile(file: FileName): Unit
end FileSystem
/** An in-memory file system for testing purposes implemented using a Map.
*
* Every method in this class is thread-safe.
*/
class InMemoryFileSystem extends FileSystem:
val fsMap: mutable.Map[FileName, String] = TrieMap()
def createFile(file: FileName, content: String): Unit =
assert(!fsMap.contains(file), s"$file already exists")
fsMap(file) = content
def readFile(file: FileName): String =
fsMap.get(file) match
case Some(content) => content
case None => assert(false, s"Attempt to read non-existing $file")
def deleteFile(file: FileName): Unit =
fsMap.remove(file)
end InMemoryFileSystem
package f3
import instrumentation._
/** A synchronization mechanism allowing multiple reads to proceed concurrently
* with an update to the state.
*/
class RCU extends Monitor:
protected val latestVersion: AtomicLong = AtomicLong(0)
protected val readersVersion: ThreadMap[Long] = ThreadMap()
/** This method must be called before accessing shared data for reading. */
def startRead(): Unit =
assert(!readersVersion.currentThreadHasValue,
"startRead() cannot be called multiple times without an intervening stopRead()")
readersVersion.setCurrentThreadValue(latestVersion.get)
/** Once a thread which has previously called `startRead` has finished reading
* shared data, it must call this method.
*/
def stopRead(): Unit =
assert(readersVersion.currentThreadHasValue,
"stopRead() cannot be called without a preceding startRead()")
readersVersion.deleteCurrentThreadValue()
/** Wait until all reads started before this method was called have finished,
* then return.
*/
def waitForOldReads(): Unit =
val newVersion = latestVersion.incrementAndGet()
readersVersion.waitForall(_ >= newVersion)
package f3
import instrumentation._
import scala.collection.mutable
/** A map which associates every thread to at most one value of type A.
*
* Every method in this class is thread-safe.
*/
class ThreadMap[A] extends Monitor:
protected val theMap: mutable.Map[Thread, A] = mutable.Map()
/** Return the value in the map entry for the current thread if it exists,
* otherwise None. */
def currentThreadValue: Option[A] = synchronized {
theMap.get(Thread.currentThread)
}
/** Is there a map entry for the current thread? */
def currentThreadHasValue: Boolean =
synchronized {
theMap.contains(Thread.currentThread)
}
/** Set the map entry of the current thread to `value` and notify any thread
* waiting on `waitForall`. */
def setCurrentThreadValue(value: A): Unit =
synchronized {
theMap(Thread.currentThread) = value
notifyAll()
}
/** Delete the map entry associated with this thread (if it exists) and notify
* all threads waiting in `waitForall`. */
def deleteCurrentThreadValue(): Unit =
synchronized {
theMap.remove(Thread.currentThread)
notifyAll()
}
/** Wait until `predicate` returns true for all map entries, then return. */
def waitForall(predicate: A => Boolean): Unit =
synchronized {
while !theMap.forall((_, value) => predicate(value)) do
wait()
}
end ThreadMap
package f3
import instrumentation._
class UpdateServer(fs: FileSystem) extends Monitor:
val rcu = new RCU
/** The name of the file containing the latest update.
*
* This is `@volatile` to guarantee that `fetchUpdate` always sees the latest
* filename.
*/
@volatile private var updateFile: Option[FileName] = None
/** Return the content of the latest update if one is available, otherwise None.
*
* This method is thread-safe.
*/
def fetchUpdate(): Option[String] =
// TODO: use `rcu`
rcu.startRead()
val value = updateFile.map(fs.readFile)
rcu.stopRead()
value
/** Define a new update, more precisely this will:
* - Create a new update file called `newName` with content `newContent`
* - Ensure that any future call to `fetchUpdate` returns the new update
* content.
* - Delete the old update file.
*
* This method is _NOT_ thread-safe, it cannot be safely called from multiple
* threads at once.
*/
def newUpdate(newName: FileName, newContent: String): Unit =
// TODO: use `rcu`
val oldFile = updateFile
fs.createFile(newName, newContent)
updateFile = Some(newName)
rcu.waitForOldReads()
oldFile.foreach(fs.deleteFile)
end UpdateServer
package f3.instrumentation
/** A long value that may be updated atomically. */
class AtomicLong(initial: Long):
private val atomic = new java.util.concurrent.atomic.AtomicLong(initial)
/** Get the current value. */
def get: Long = atomic.get()
/** Set to the given `value`. */
def set(value: Long): Unit = atomic.set(value)
/** Atomically increment by one the current value and return the _original_ value. */
def getAndIncrement(): Long =
atomic.getAndIncrement()
/** Atomically increment by one the current value and return the _updated_ value. */
def incrementAndGet(): Long =
atomic.incrementAndGet()
/** Atomically set the value to `newValue` if the current value == `expected`.
*
* Return true if successful, otherwise return false to indicate that the
* actual value was not equal to the expected value.
*/
def compareAndSet(expected: Long, newValue: Long): Boolean =
atomic.compareAndSet(expected, newValue)
end AtomicLong
package f3.instrumentation
class Dummy
trait Monitor {
implicit val dummy: Dummy = new Dummy
def wait()(implicit i: Dummy) = waitDefault()
def synchronized[T](e: => T)(implicit i: Dummy) = synchronizedDefault(e)
def notify()(implicit i: Dummy) = notifyDefault()
def notifyAll()(implicit i: Dummy) = notifyAllDefault()
private val lock = new AnyRef
// Can be overridden.
def waitDefault(): Unit = lock.wait()
def synchronizedDefault[T](toExecute: =>T): T = lock.synchronized(toExecute)
def notifyDefault(): Unit = lock.notify()
def notifyAllDefault(): Unit = lock.notifyAll()
}
package f3
import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.collection.mutable.HashMap
import scala.util.Random
import instrumentation._
import instrumentation.TestHelper._
import instrumentation.TestUtils._
class F3Suite extends munit.FunSuite:
test("Part 1: ThreadMap (3pts)") {
testManySchedules(4, sched =>
val tmap = new SchedulableThreadMap[Int](sched)
def writeThread(): Unit =
tmap.setCurrentThreadValue(0)
tmap.setCurrentThreadValue(-1)
val readBack = tmap.currentThreadValue
assertEquals(readBack, Some(-1))
def writeAndDeleteThread(): Unit =
tmap.setCurrentThreadValue(42)
tmap.deleteCurrentThreadValue()
@tailrec
def waitThread(): Unit =
tmap.waitForall(_ < 0)
val all = tmap.allValues
if all != List(-1) then
waitThread()
val threads = List(
() => writeThread(),
() => writeAndDeleteThread(),
() => waitThread(),
() => waitThread(),
)
(threads, _ => (true, ""))
)
}
test("Part 2: RCU (5pts)") {
testManySchedules(3, sched =>
val rcu = new SchedulableRCU(sched)
case class State(value: Int, isDeleted: AtomicLong = SchedulableAtomicLong(0, sched, "isDeleted"))
val sharedState = SchedulableAtomicReference(State(0), sched, "sharedState")
def readThread(): Unit =
rcu.startRead()
val state = sharedState.get
val stateWasDeleted = state.isDeleted.get != 0
assert(!stateWasDeleted, "RCU shared state deleted in the middle of a read.")
rcu.stopRead()
def writeThread(): Unit =
val oldState = sharedState.get
sharedState.set(State(oldState.value + 1))
rcu.waitForOldReads()
oldState.isDeleted.set(1)
val threads = List(
() => readThread(),
() => readThread(),
() => writeThread(),
)
(threads, _ => (true, ""))
)
}
test("Part 3: UpdateServer (2pts)") {
testManySchedules(3, sched =>
val fs = SchedulableInMemoryFileSystem(sched)
val server = new SchedulableUpdateServer(sched, fs)
def writeThread(): Unit =
server.newUpdate("update1.bin", "Update 1")
server.newUpdate("update2.bin", "Update 2")
assertEquals(fs.fsMap.toSet, Set("update2.bin" -> "Update 2"))
def fetchThread(): Unit =
val res = server.fetchUpdate()
assert(List(None, Some("Update 1"), Some("Update 2")).contains(res),
s"fetchUpdate returned unexpected value $res")
val threads = List(
() => writeThread(),
() => fetchThread(),
() => fetchThread(),
)
(threads, _ => (true, ""))
)
}
import scala.concurrent.duration._
override val munitTimeout = 200.seconds
end F3Suite