// Student tasks (i.e. submit, packageSubmission)
enablePlugins(StudentTasks)
course := "final"
assignment := "f4"
scalaVersion := "3.0.0-RC1"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "3.2.0-SNAPSHOT").withDottyCompat(scalaVersion.value),
)
// Contains Spark 3 snapshot built against 2.13: https://github.com/smarter/spark/tree/scala-2.13
resolvers += "Spark Snapshots Copy" at "https://scala-webapps.epfl.ch/artifactory/spark-snapshot/"
libraryDependencies += "org.scalameta" %% "munit" % "0.7.22"
val MUnitFramework = new TestFramework("munit.Framework")
testFrameworks += MUnitFramework
// Decode Scala names
testOptions += Tests.Argument(MUnitFramework, "-s")
testSuite := "f4.F4Suite"
// Without forking, ctrl-c doesn't actually fully stop Spark
fork in run := true
fork in Test := true
package sbt // To access the private[sbt] compilerReporter key
package filteringReporterPlugin
import Keys._
import ch.epfl.lamp._
object FilteringReporterPlugin extends AutoPlugin {
override lazy val projectSettings = Seq(
// Turn off a warning coming from ScalaMeter that we cannot fix without changing ScalaMeter
compilerReporter in (Compile, compile) ~= { reporter => new FilteringReporter(reporter) }
)
}
class FilteringReporter(reporter: xsbti.Reporter) extends xsbti.Reporter {
def reset(): Unit = reporter.reset()
def hasErrors: Boolean = reporter.hasErrors
def hasWarnings: Boolean = reporter.hasWarnings
def printSummary(): Unit = reporter.printSummary()
def problems: Array[xsbti.Problem] = reporter.problems
def log(problem: xsbti.Problem): Unit = {
if (!problem.message.contains("An existential type that came from a Scala-2 classfile cannot be"))
reporter.log(problem)
}
def comment(pos: xsbti.Position, msg: String): Unit =
reporter.comment(pos, msg)
override def toString = s"FilteringReporter($reporter)"
}
package ch.epfl.lamp
import sbt._
import sbt.Keys._
/**
* Coursera uses two versions of each assignment. They both have the same assignment key and part id but have
* different item ids.
*
* @param key Assignment key
* @param partId Assignment partId
* @param itemId Item id of the non premium version
* @param premiumItemId Item id of the premium version (`None` if the assignment is optional)
*/
case class CourseraId(key: String, partId: String, itemId: String, premiumItemId: Option[String])
/**
* Settings shared by all assignments, reused in various tasks.
*/
object MOOCSettings extends AutoPlugin {
override def requires = super.requires && filteringReporterPlugin.FilteringReporterPlugin
object autoImport {
val course = SettingKey[String]("course")
val assignment = SettingKey[String]("assignment")
val options = SettingKey[Map[String, Map[String, String]]]("options")
val courseraId = settingKey[CourseraId]("Coursera-specific information identifying the assignment")
val testSuite = settingKey[String]("Fully qualified name of the test suite of this assignment")
.withRank(KeyRanks.Invisible)
// Convenient alias
type CourseraId = ch.epfl.lamp.CourseraId
val CourseraId = ch.epfl.lamp.CourseraId
}
import autoImport._
override val globalSettings: Seq[Def.Setting[_]] = Seq(
// supershell is verbose, buggy and useless.
useSuperShell := false
)
override val projectSettings: Seq[Def.Setting[_]] = Seq(
parallelExecution in Test := false,
// Report test result after each test instead of waiting for every test to finish
logBuffered in Test := false,
name := s"${course.value}-${assignment.value}"
)
}
package ch.epfl.lamp
import sbt._
import Keys._
// import scalaj.http._
import java.io.{File, FileInputStream, IOException}
import org.apache.commons.codec.binary.Base64
// import play.api.libs.json.{Json, JsObject, JsPath}
import scala.util.{Failure, Success, Try}
/**
* Provides tasks for submitting the assignment
*/
object StudentTasks extends AutoPlugin {
override def requires = super.requires && MOOCSettings
object autoImport {
val packageSourcesOnly = TaskKey[File]("packageSourcesOnly", "Package the sources of the project")
val packageBinWithoutResources = TaskKey[File]("packageBinWithoutResources", "Like packageBin, but without the resources")
val packageSubmissionZip = TaskKey[File]("packageSubmissionZip")
val packageSubmission = inputKey[Unit]("package solution as an archive file")
lazy val Grading = config("grading") extend(Runtime)
}
import autoImport._
import MOOCSettings.autoImport._
override lazy val projectSettings = Seq(
packageSubmissionSetting,
fork := true,
connectInput in run := true,
outputStrategy := Some(StdoutOutput),
) ++
packageSubmissionZipSettings ++
inConfig(Grading)(Defaults.testSettings ++ Seq(
unmanagedJars += file("grading-tests.jar"),
definedTests := (definedTests in Test).value,
internalDependencyClasspath := (internalDependencyClasspath in Test).value
))
/** **********************************************************
* SUBMITTING A SOLUTION TO COURSERA
*/
val packageSubmissionZipSettings = Seq(
packageSubmissionZip := {
val submission = crossTarget.value / "submission.zip"
val sources = (packageSourcesOnly in Compile).value
val binaries = (packageBinWithoutResources in Compile).value
IO.zip(Seq(sources -> "sources.zip", binaries -> "binaries.jar"), submission, None)
submission
},
artifactClassifier in packageSourcesOnly := Some("sources"),
artifact in (Compile, packageBinWithoutResources) ~= (art => art.withName(art.name + "-without-resources"))
) ++
inConfig(Compile)(
Defaults.packageTaskSettings(packageSourcesOnly, Defaults.sourceMappings) ++
Defaults.packageTaskSettings(packageBinWithoutResources, Def.task {
val relativePaths =
(unmanagedResources in Compile).value.flatMap(Path.relativeTo((unmanagedResourceDirectories in Compile).value)(_))
(mappings in (Compile, packageBin)).value.filterNot { case (_, path) => relativePaths.contains(path) }
})
)
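// The resulting submission.zip thus contains exactly two entries: "sources.zip"
// (a jar of the project sources) and "binaries.jar" (the compiled classes with
// resources filtered out).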
val maxSubmitFileSize = {
val mb = 1024 * 1024
10 * mb
}
/** Check that the jar exists, isn't empty, isn't crazy big, and can be read.
* If so, encode the jar as base64 so we can send it to Coursera.
*/
def prepareJar(jar: File, s: TaskStreams): String = {
val errPrefix = "Error submitting assignment jar: "
val fileLength = jar.length()
if (!jar.exists()) {
s.log.error(errPrefix + "jar archive does not exist\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength == 0L) {
s.log.error(errPrefix + "jar archive is empty\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength > maxSubmitFileSize) {
s.log.error(errPrefix + "jar archive is too big. Allowed size: " +
maxSubmitFileSize + " bytes, found " + fileLength + " bytes.\n" +
jar.getAbsolutePath)
failSubmit()
} else {
val bytes = new Array[Byte](fileLength.toInt)
val sizeRead = try {
val is = new FileInputStream(jar)
val read = is.read(bytes)
is.close()
read
} catch {
case ex: IOException =>
s.log.error(errPrefix + "failed to read sources jar archive\n" + ex.toString)
failSubmit()
}
if (sizeRead != bytes.length) {
s.log.error(errPrefix + "failed to read the sources jar archive, size read: " + sizeRead)
failSubmit()
} else encodeBase64(bytes)
}
}
/** Task to package solution to a given file path */
lazy val packageSubmissionSetting = packageSubmission := {
val args: Seq[String] = Def.spaceDelimited("[path]").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val base64Jar = prepareJar(jar, s)
val path = args.headOption.getOrElse((baseDirectory.value / "submission.jar").absolutePath)
scala.tools.nsc.io.File(path).writeAll(base64Jar)
}
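// Usage (from the sbt shell): `packageSubmission` writes the base64-encoded archive to
// <baseDirectory>/submission.jar; `packageSubmission some/path.jar` writes it to that path.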
/*
/** Task to submit a solution to coursera */
val submit = inputKey[Unit]("submit solution to Coursera")
lazy val submitSetting = submit := {
// Fail if scalafix linting does not pass.
scalafixLinting.value
val args: Seq[String] = Def.spaceDelimited("<arg>").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val assignmentDetails =
courseraId.?.value.getOrElse(throw new MessageOnlyException("This assignment can not be submitted to Coursera because the `courseraId` setting is undefined"))
val assignmentKey = assignmentDetails.key
val courseName =
course.value match {
case "capstone" => "scala-capstone"
case "bigdata" => "scala-spark-big-data"
case other => other
}
val partId = assignmentDetails.partId
val itemId = assignmentDetails.itemId
val premiumItemId = assignmentDetails.premiumItemId
val (email, secret) = args match {
case email :: secret :: Nil =>
(email, secret)
case _ =>
val inputErr =
s"""|Invalid input to `submit`. The required syntax for `submit` is:
|submit <email-address> <submit-token>
|
|The submit token is NOT YOUR LOGIN PASSWORD.
|It can be obtained from the assignment page:
|https://www.coursera.org/learn/$courseName/programming/$itemId
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
""".stripMargin
s.log.error(inputErr)
failSubmit()
}
val base64Jar = prepareJar(jar, s)
val json =
s"""|{
| "assignmentKey":"$assignmentKey",
| "submitterEmail":"$email",
| "secret":"$secret",
| "parts":{
| "$partId":{
| "output":"$base64Jar"
| }
| }
|}""".stripMargin
def postSubmission[T](data: String): Try[HttpResponse[String]] = {
val http = Http("https://www.coursera.org/api/onDemandProgrammingScriptSubmissions.v1")
val hs = List(
("Cache-Control", "no-cache"),
("Content-Type", "application/json")
)
s.log.info("Connecting to Coursera...")
val response = Try(http.postData(data)
.headers(hs)
.option(HttpOptions.connTimeout(10000)) // scalaj default timeout is only 100ms, changing that to 10s
.asString) // kick off HTTP POST
response
}
val connectMsg =
s"""|Attempting to submit "${assignment.value}" assignment in "$courseName" course
|Using:
|- email: $email
|- submit token: $secret""".stripMargin
s.log.info(connectMsg)
def reportCourseraResponse(response: HttpResponse[String]): Unit = {
val code = response.code
val respBody = response.body
/* Sample JSON response from Coursera
{
"message": "Invalid email or token.",
"details": {
"learnerMessage": "Invalid email or token."
}
}
*/
// Success, Coursera responds with 2xx HTTP status code
if (response.is2xx) {
val successfulSubmitMsg =
s"""|Successfully connected to Coursera. (Status $code)
|
|Assignment submitted successfully!
|
|You can see how you scored by going to:
|https://www.coursera.org/learn/$courseName/programming/$itemId/
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
|and clicking on "My Submission".""".stripMargin
s.log.info(successfulSubmitMsg)
}
// Failure, Coursera responds with 4xx HTTP status code (client-side failure)
else if (response.is4xx) {
val result = Try(Json.parse(respBody)).toOption
val learnerMsg = result match {
case Some(resp: JsObject) =>
(JsPath \ "details" \ "learnerMessage").read[String].reads(resp).get
case Some(x) => // shouldn't happen
"Could not parse Coursera's response:\n" + x
case None =>
"Could not parse Coursera's response:\n" + respBody
}
val failedSubmitMsg =
s"""|Submission failed.
|There was something wrong while attempting to submit.
|Coursera says:
|$learnerMsg (Status $code)""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera responds with 5xx HTTP status code (server-side failure)
else if (response.is5xx) {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera seems to be unavailable at the moment (Status $code)
|Check https://status.coursera.org/ and try again in a few minutes.
""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera responds with an unexpected status code
else {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera replied with an unexpected code (Status $code)
""".stripMargin
s.log.error(failedSubmitMsg)
}
}
// kick it all off, actually make request
postSubmission(json) match {
case Success(resp) => reportCourseraResponse(resp)
case Failure(e) =>
val failedConnectMsg =
s"""|Connection to Coursera failed.
|There was something wrong while attempting to connect to Coursera.
|Check your internet connection.
|${e.toString}""".stripMargin
s.log.error(failedConnectMsg)
}
}
*/
def failSubmit(): Nothing = {
sys.error("Submission failed")
}
/**
* *****************
* DEALING WITH JARS
*/
def encodeBase64(bytes: Array[Byte]): String =
new String(Base64.encodeBase64(bytes))
}
sbt.version=1.4.7
// Used for Coursera submission (StudentPlugin)
// libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
// libraryDependencies += "com.typesafe.play" %% "play-json" % "2.7.4"
// Used for Base64 (StudentPlugin)
libraryDependencies += "commons-codec" % "commons-codec" % "1.10"
// addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.28")
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.8")
addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.5.3")
package f4
import scala.io.Source
import scala.io.Codec
object F4Data {
val punctuationRx = "[,:;?!.'\"]+".r
def linesIterator: Iterator[String] = {
Option(getClass.getResourceAsStream("/f4/shakespeare.txt")) match {
case Some(resource) =>
DoubleLineIterator(Source.fromInputStream(resource)(Codec.UTF8))
case None =>
throw new RuntimeException("The resource with the corpus is unexpectedly missing, please inform the staff.")
}
}
def lines = linesIterator.toSeq
}
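/** Iterates over a character stream, yielding one string per "paragraph":
* chunks separated by a blank line (two consecutive '\n' characters). Single
* line breaks inside a chunk are replaced by spaces, so each paragraph comes
* out as a single line.
*/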
class DoubleLineIterator(underlying: Iterator[Char]) extends scala.collection.AbstractIterator[String] with Iterator[String] {
private[this] val sb = new StringBuilder
lazy val iter: BufferedIterator[Char] = underlying.buffered
def getc(): Boolean = iter.hasNext && {
val ch = iter.next()
if (ch == '\n') {
val has = iter.hasNext
val ch2 = if has then iter.next() else ' '
if (has && ch2 == '\n')
false
else {
sb append ' '
sb append ch2
true
}
} else {
sb append ch
true
}
}
def hasNext: Boolean = iter.hasNext
def next(): String = {
sb.clear()
while (getc()) { }
sb.toString
}
}
package f4
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.{Logger, Level}
import org.apache.spark.rdd.RDD
import scala.util.Properties.isWin
/** This class calculates word n-gram frequencies in the text it receives, but
* the implementation is faulty and the result is calculated locally.
*
* Your tasks will revolve around fixing the implementation, making the
* calculation distributed, and finally calculating per-speaker word n-gram
* frequencies.
*/
abstract class NGrams {
type Triple[T] = (T, T, T)
type NGramMap = Map[(String, String, String), Int]
val sc: SparkContext
val lines: Seq[String]
val punctuationRx = "[-,:;?!.'\"]+".r
def removePunctuation(str: String) = punctuationRx.replaceAllIn(str, "")
def splitIntoWords(str: String) = str.trim.split("\\s+")
/** Splits a string into a sequence of n-grams.
*
* There are two issues with the implementation:
* [TASK 1]
* Capitalization of words should not matter.
* Convert all words in n-grams to lower case.
*
* [TASK 2]
* If a sentence has fewer than 3 words, "fake" words should be added to its end.
* For such sentences, add empty strings at the end.
*/
def splitIntoNGrams(line: String): List[Triple[String]] = {
val words = splitIntoWords(removePunctuation(line).toLowerCase)
words.sliding(3).map { seq =>
def at(i: Int) = if i < seq.length then seq(i) else ""
(at(0), at(1), at(2))
}.toList
}
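// For example (illustrative):
//   splitIntoNGrams("Hear me!") == List(("hear", "me", ""))
//   splitIntoNGrams("Before we proceed, hear me speak.") ==
//     List(("before", "we", "proceed"), ("we", "proceed", "hear"),
//          ("proceed", "hear", "me"), ("hear", "me", "speak"))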
def createNGramsRDD(lines: Seq[String]): RDD[List[Triple[String]]] =
sc.parallelize(lines).map(splitIntoNGrams)
/** This function _locally_ calculates the frequency of n-grams it receives. */
def localNGrams(ngrams: RDD[List[Triple[String]]]): NGramMap = {
ngrams.toLocalIterator
.flatMap(ngramSeq => ngramSeq.map { ng =>
Map(ng -> 1)
})
.reduce { (left, right) =>
right.foldLeft(left) {
case (left, (ng, weightA)) =>
left.updatedWith(ng)(weightBOpt => Some(weightA + weightBOpt.getOrElse(0)))
}
}
}
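// Note: the reduction above runs on the driver (via `toLocalIterator`), merging
// e.g. Map(ng -> 2) and Map(ng -> 1) into Map(ng -> 3); nothing is distributed,
// which is exactly what TASK 3 below fixes.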
/** [TASK 3] Based on the above code, calculate n-gram frequency in a
* distributed manner. Define [[aggregateNGrams_zero]],
* [[aggregateNGrams_seqOp]], [[aggregateNGrams_combOp]] in such a way that
* this function will return the same result as the one above.
*/
final def aggregateNGrams(ngrams: RDD[List[Triple[String]]]): NGramMap =
ngrams.aggregate(aggregateNGrams_zero)(aggregateNGrams_seqOp, aggregateNGrams_combOp)
def aggregateNGrams_zero: NGramMap =
Map.empty[Triple[String], Int]
def aggregateNGrams_seqOp(acc: NGramMap, ngramSeq: List[Triple[String]]) =
ngramSeq.foldLeft(acc) {
case (acc, ng) =>
acc.updatedWith(ng)(w => Some(w.getOrElse(0) + 1))
}
def aggregateNGrams_combOp(acc: NGramMap, ngramMap: NGramMap) =
ngramMap.foldLeft(acc) {
case (acc, (ng, w)) =>
acc.updatedWith(ng)(ww => Some(w + ww.getOrElse(0)))
}
/** Your two final tasks are about using Spark to calculate the n-gram frequency
* for each speaker in the play.
*
* [TASK 4]
* To extract each line's speaker from the text, implement [[createSpeakerLinesRDD]].
*
* [TASK 5]
* To calculate the n-gram frequency, define [[createSpeakerLineNGramsRDD]]
* and [[createSpeakerNGramsRDD]].
*/
final def calculateSpeakerLineNGrams(lines: Seq[String]): Map[String, NGramMap] =
createSpeakerNGramsRDD(createSpeakerLineNGramsRDD(createSpeakerLinesRDD(lines)))
.toLocalIterator.toMap
/** Use [[lines]] to create an RDD of (Speaker, Line) pairs.
*
* Each line in [[lines]] looks like "SPEAKER: LINE". To separate the two,
* consider using the `.split` method on [[String]]. You can assume that the
* speaker's name doesn't contain a colon.
*/
def createSpeakerLinesRDD(lines: Seq[String]): RDD[(String, String)] =
sc.parallelize(lines)
.map { s =>
val Array(speaker, line) = s.split(":", 2)
assert(line != "", speaker)
speaker -> line
}
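// The limit 2 in `split(":", 2)` splits only at the first colon, so colons inside
// the spoken line survive, e.g.
//   "First Citizen: Before we proceed: hear me speak.".split(":", 2)
//     == Array("First Citizen", " Before we proceed: hear me speak.")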
/** Takes the result of [[createSpeakerLinesRDD]] and calculates n-grams for each
* sentence, WITHOUT aggregating by speaker. Example:
* ```
* val input = Seq(
* "First Citizen" -> "Before we proceed! Before we proceed, hear me speak."
* "First Citizen" -> "Hear me speak."
* "SEBASTIAN" -> "Art thou waking?"
* )
*
* val output = Seq(
* "First Citizen" -> Map(
* ("before", "we", "proceed") -> 2,
* ("hear", "me", "speak") -> 1,
* // other n-grams following from the first sentence...
* ),
* "First Citizen" -> Map(
* ("hear", "me", "speak") -> 1,
* ),
* "SEBASTIAN" -> Map(
* ("art", "thou", "waking") -> 1,
* ),
* )
* ```
*/
def createSpeakerLineNGramsRDD(
speakerLines: RDD[(String, String)]
): RDD[(String, NGramMap)] =
speakerLines
.mapValues { s =>
splitIntoNGrams(s)
.map(List(_))
.foldLeft(aggregateNGrams_zero)(aggregateNGrams_seqOp)
}
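// Wrapping each n-gram in a singleton list lets us reuse aggregateNGrams_seqOp,
// so per-line counting and the distributed aggregation share one code path.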
/** Takes the result of [[createSpeakerLineNGramsRDD]] and aggregates the n-grams
* for each speaker. Example:
* ```
* val input = Seq(
* "First Citizen" -> Map(
* ("before", "we", "proceed") -> 1,
* ("hear", "me", "speak") -> 1,
* ),
* "First Citizen" -> Map(("hear", "me", "speak") -> 1),
* "SEBASTIAN" -> Map(("art", "thou", "waking") -> 1),
* "SEBASTIAN" -> Map(("art", "thou", "waking") -> 1),
* )
*
* val output = Seq(
* "First Citizen" -> Map(
* ("before", "we", "proceed") -> 1,
* ("hear", "me", "speak") -> 2,
* ),
* "SEBASTIAN" -> Map(("art", "thou", "waking") -> 2),
* )
* ```
*/
def createSpeakerNGramsRDD(
speakerLineNGrams: RDD[(String, NGramMap)]
): RDD[(String, NGramMap)] =
speakerLineNGrams
.groupByKey
.mapValues(_.reduce(aggregateNGrams_combOp))
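// A possible alternative here: `speakerLineNGrams.reduceByKey(aggregateNGrams_combOp)`
// merges per-line maps on the map side before shuffling, whereas `groupByKey`
// first moves every per-line map across the network.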
}
object NGrams extends NGrams {
// Reduce Spark logging verbosity
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NGram")
val sc: SparkContext = new SparkContext(conf)
val lines = F4Data.lines
def main(args: Array[String]): Unit = {
timed("main", {
val rdd =
createSpeakerNGramsRDD(createSpeakerLineNGramsRDD(createSpeakerLinesRDD(lines)))
rdd.toLocalIterator.foreach { p =>
println(p)
}
})
}
val timing = new StringBuilder
def timed[T](label: String, code: => T): T = {
val start = System.currentTimeMillis()
val result = code
val stop = System.currentTimeMillis()
timing.append(s"Processing $label took ${stop - start} ms.\n")
result
}
}
package f4
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
class F4Suite extends munit.FunSuite, HelperMethods {
/** If this method fails, it means that the code provided as a starting point no longer works correctly. */
private def sanityCheck(): Unit = {
val line = "before we proceed - before we proceed, hear me speak."
assertSameElements(
actual = TestNGrams.splitIntoNGrams(line),
expected = List(
("before", "we", "proceed"),
("we", "proceed", "before"),
("proceed", "before", "we"),
("before", "we", "proceed"),
("we", "proceed", "hear"),
("proceed", "hear", "me"),
("hear", "me", "speak"),
)
)
}
test("'splitIntoNGrams' should lowercase all words in the sentence (1pts)") {
sanityCheck()
val line = "Hear CLAUDIUS speak!"
assertSameElements(
actual = TestNGrams.splitIntoNGrams(line),
expected = List(("hear", "claudius", "speak")),
)
}
test("'splitIntoNGrams' should pad ngrams with empty strings (1pts)") {
sanityCheck()
assertSameElements(
actual = TestNGrams.splitIntoNGrams("no."),
expected = List(("no", "", "")),
)
assertSameElements(
actual = TestNGrams.splitIntoNGrams("not yet"),
expected = List(("not", "yet", ""))
)
}
// NOTE: you will be graded based on the result of running your code on the entire corpus
test("'aggregateNGrams' should correctly aggregate n-grams (unit test) (1pts)") {
val testLines = Seq(
"Before we proceed, hear me speak.",
"Before we proceed.",
"Hear me speak.",
"Art thou waking?",
)
def aggregateNGrams(lines: Seq[String]) = {
import TestNGrams.{lines => _, *}
createNGramsRDD(lines)
.aggregate(aggregateNGrams_zero)(aggregateNGrams_seqOp, aggregateNGrams_combOp)
}
assertSameElements(
actual = aggregateNGrams(testLines).toList,
expected = List(
("before", "we", "proceed") -> 2,
("art", "thou", "waking") -> 1,
("hear", "me", "speak") -> 2,
("proceed", "hear", "me") -> 1,
("we", "proceed", "hear") -> 1,
)
)
}
test("'createSpeakerLinesRDD' should correctly split speakers and lines (1pts)") {
val testLines = List(
"First Citizen: Before we proceed, hear me speak.",
"First Citizen: Before we proceed: hear me speak.",
"First Citizen: Before we proceed.",
"Second Citizen: Hear me speak.",
"SEBASTIAN: Art thou waking?",
"ANTONIO: Do you not hear me speak?",
)
val expected = List(
"First Citizen" -> "Before we proceed, hear me speak.",
"First Citizen" -> "Before we proceed: hear me speak.",
"First Citizen" -> "Before we proceed.",
"Second Citizen" -> "Hear me speak.",
"SEBASTIAN" -> "Art thou waking?",
"ANTONIO" -> "Do you not hear me speak?",
)
assertSameElements(
actual = TestNGrams.createSpeakerLinesRDD(testLines).toLocalIterator.map {
(k, v) => k.trim -> v.trim
}.toList,
expected = expected,
)
}
// NOTE: you will be graded based on the result of running your code on the entire corpus
test("Speaker-specific n-grams should be correctly calculated (unit test) (1pts)") {
val testLines = List(
"First Citizen: Before we proceed, hear me speak.",
"First Citizen: Before we proceed.",
"Second Citizen: Hear me speak.",
"SEBASTIAN: Art thou waking?",
"ANTONIO: Do you not hear me speak?",
)
def speakerNGramsIter(lines: Seq[String]) = {
import TestNGrams.{lines => _, *}
createSpeakerNGramsRDD(createSpeakerLineNGramsRDD(createSpeakerLinesRDD(lines)))
.toLocalIterator
}
assertSameElements(
actual = speakerNGramsIter(testLines).toList,
expected = List(
("SEBASTIAN", Map(("art","thou","waking") -> 1)),
("ANTONIO", Map(("do", "you", "not") -> 1, ("you", "not", "hear") -> 1, ("not", "hear", "me") -> 1, ("hear", "me", "speak") -> 1)),
("First Citizen", Map(("before", "we", "proceed") -> 2, ("we", "proceed", "hear") -> 1, ("proceed", "hear", "me") -> 1, ("hear", "me", "speak") -> 1)),
("Second Citizen", Map(("hear", "me", "speak") -> 1))
)
)
}
}
trait HelperMethods {
/**
* Creates a truncated string representation of a list, adding ", ...)" if there
* are too many elements to show
* @param l The list to preview
* @param n The number of elements to cut it at
* @return A preview of the list, containing at most n elements.
*/
def previewList[A](l: List[A], n: Int = 10): String =
if (l.length <= n) l.toString
else l.take(n).toString.dropRight(1) + ", ...)"
/**
* Asserts that all the elements in a given list and an expected list are the same,
* regardless of order. For a prettier output, the actual and expected lists should be sorted
* with the same ordering.
* @param actual The actual list
* @param expected The expected list
* @tparam A Type of the list elements
*/
def assertSameElements[A](actual: List[A], expected: List[A]): Unit = {
val givenSet = actual.toSet
val expectedSet = expected.toSet
val unexpected = givenSet -- expectedSet
val missing = expectedSet -- givenSet
val noUnexpectedElements = unexpected.isEmpty
val noMissingElements = missing.isEmpty
val noMatchString =
s"""
|Expected: ${previewList(expected)}
|Actual: ${previewList(actual)}""".stripMargin
assert(noUnexpectedElements,
s"""|$noMatchString
|The given collection contains some unexpected elements: ${previewList(unexpected.toList, 5)}""".stripMargin)
assert(noMissingElements,
s"""|$noMatchString
|The given collection is missing some expected elements: ${previewList(missing.toList, 5)}""".stripMargin)
}
// Conditions:
// (1) the language stats contain the same elements
// (2) they are ordered (and the order doesn't matter if there are several languages with the same count)
def assertEquivalentAndOrdered(actual: List[(String, Int)], expected: List[(String, Int)]): Unit = {
// (1)
assertSameElements(actual, expected)
// (2)
assert(
!(actual zip actual.tail).exists({ case ((_, occ1), (_, occ2)) => occ1 < occ2 }),
"The given elements are not in descending order"
)
}
}
object TestNGrams extends NGrams {
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.{Logger, Level}
import org.apache.spark.rdd.RDD
import scala.util.Properties.isWin
// Reduce Spark logging verbosity
Logger.getLogger("org").setLevel(Level.ERROR)
if (isWin) System.setProperty("hadoop.home.dir", System.getProperty("user.dir") + "\\winutils\\hadoop-2.7.4")
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NGram")
val sc: SparkContext = new SparkContext(conf)
val lines = F4TestData.lines
}
object F4TestData {
import scala.io.Source
import scala.io.Codec
val punctuationRx = "[,:;?!.'\"]+".r
def linesIterator: Iterator[String] = {
Option(getClass.getResourceAsStream("/f4/shakespeare.txt")) match {
case Some(resource) =>
DoubleLineIterator(Source.fromInputStream(resource)(Codec.UTF8))
case None =>
throw new RuntimeException("The resource with the corpus is unexpectedly missing, please inform the staff.")
}
}
def lines = linesIterator.toSeq
}
class DoubleLineIterator(underlying: Iterator[Char]) extends scala.collection.AbstractIterator[String] with Iterator[String] {
private[this] val sb = new StringBuilder
lazy val iter: BufferedIterator[Char] = underlying.buffered
def getc(): Boolean = iter.hasNext && {
val ch = iter.next()
if (ch == '\n') {
val has = iter.hasNext
val ch2 = if has then iter.next() else ' '
if (has && ch2 == '\n')
false
else {
sb append ' '
sb append ch2
true
}
} else {
sb append ch
true
}
}
def hasNext: Boolean = iter.hasNext
def next(): String = {
sb.clear()
while (getc()) { }
sb.toString
}
}
Use the following command to make a fresh clone of your repository:
```
git clone -b m1 git@gitlab.epfl.ch:lamp/student-repositories-s21/cs206-GASPAR.git m1
```
## Useful links
* [A guide to the Scala parallel collections](https://docs.scala-lang.org/overviews/parallel-collections/overview.html)
* [The API documentation of the Scala parallel collections](https://www.javadoc.io/doc/org.scala-lang.modules/scala-parallel-collections_2.13/latest/scala/collection/index.html)
* [The API documentation of the Scala standard library](https://www.scala-lang.org/files/archive/api/2.13.4)
* [The API documentation of the Java standard library](https://docs.oracle.com/en/java/javase/15/docs/api/index.html)
**If you have issues with the IDE, try [reimporting the
build](https://gitlab.epfl.ch/lamp/cs206/-/blob/master/labs/example-lab.md#ide-features-like-type-on-hover-or-go-to-definition-do-not-work);
if you still have problems, use `compile` in sbt instead.**
## Exercise
Given the following sequential implementation of a function that computes the sequence of rolling averages, your task will be to complete and optimize a parallel version of this code.
```scala
/** Compute the rolling average of array.
*
* For an array `arr = Arr(x1, x2, x3, ..., xn)` the result is
* `Arr(x1 / 1, (x1 + x2) / 2, (x1 + x2 + x3) / 3, ..., (x1 + x2 + x3 + ... + xn) / n)`
*/
def rollingAveragesSequential(arr: Array[Int]): Array[Double] =
// Transform all numbers to fractions with denominator 1
val arr1 = arr.map(x => Frac(x, 1))
// Compute the rolling average, keeping the sum of all elements in the numerator
// and the count of elements in the denominator.
val arr2 = arr1.scan(Frac(0, 0))((acc, x) => Frac(acc.numerator + x.numerator, acc.denominator + x.denominator))
// Transform fractions to Doubles
val arr3 = arr2.map(frac => frac.toDouble)
// Drop the extra initial element that was added by the scan
arr3.tail
```
This implementation has some issues:
- It does not use parallelism
- It creates two intermediate arrays by calling `map`
- It creates an extra intermediate array by calling `tail`
- `scan` returns an extra initial element we do not need
We want to parallelize the computation and avoid creating the extra arrays.
Since we are calling a `scan`, the natural operations we need are `upsweep` and `downsweep`.
It is possible to specialize those operations for our problem by letting them do the mapping.
It is also possible to change those operations so they do not generate the first element.
We give you a version of the function that partially implements the parallelization using `upsweep` and `downsweep`.
Your tasks in the exercise will be to:
- TASK 1: Implement the parallelization of `upsweep` and `downsweep`
- TASK 2: Remove the calls to the `map`
- TASK 3: Remove the call to `tail`
You can get partial points for solving part of the tasks.
The order of the tasks is a suggestion; you may do them in any order if that is simpler for you.
Look at the `Lib` trait to find the definitions of functions and classes you can use (or already used).
In this question we use an `Arr` array class instead of the normal `Array`. You may assume that this class has the same performance characteristics as the normal array. `Arr` provides only a limited set of operations.
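For intuition, here is a minimal sketch of the upsweep/downsweep scheme on a plain `Array`, fusing the `map` into the traversals (TASK 2) and writing results so that no extra initial element or `tail` is needed (TASK 3). It is an illustration only: the `parallel` helper, the `threshold` constant, and these particular `upsweep`/`downsweep` signatures are assumptions standing in for the definitions the `Lib` trait actually provides.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object RollingAveragesSketch {
  // Hypothetical stand-in for Lib's `parallel`: evaluates both arguments,
  // the second one on another thread.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val fb = Future(b)
    (a, Await.result(fb, Duration.Inf))
  }

  // Assumed to match the exam's Frac: a sum in the numerator, a count in the denominator.
  case class Frac(numerator: Int, denominator: Int) {
    def toDouble: Double = numerator.toDouble / denominator
  }

  val threshold = 1024

  // Upsweep: reduce arr(from until to) to a Frac, mapping Int -> Frac on the
  // fly instead of materializing an intermediate array (TASK 2).
  def upsweep(arr: Array[Int], from: Int, to: Int): Frac =
    if (to - from <= threshold) {
      var i = from
      var sum = 0
      while (i < to) { sum += arr(i); i += 1 }
      Frac(sum, to - from)
    } else {
      val mid = from + (to - from) / 2
      val (l, r) = parallel(upsweep(arr, from, mid), upsweep(arr, mid, to))
      Frac(l.numerator + r.numerator, l.denominator + r.denominator)
    }

  // Downsweep: given `acc`, the sum/count of everything before `from`, write
  // the rolling averages for arr(from until to) directly into out(from until to).
  // Writing to out(i) instead of out(i + 1) is what drops the scan's extra
  // initial element, so no `tail` is needed afterwards (TASK 3).
  def downsweep(arr: Array[Int], out: Array[Double], acc: Frac, from: Int, to: Int): Unit =
    if (to - from <= threshold) {
      var i = from
      var num = acc.numerator
      var den = acc.denominator
      while (i < to) {
        num += arr(i); den += 1
        out(i) = num.toDouble / den
        i += 1
      }
    } else {
      val mid = from + (to - from) / 2
      // Recomputing the left sum keeps the sketch short; the exam version can
      // instead cache upsweep results in a tree of partial sums.
      val left = upsweep(arr, from, mid)
      parallel(
        downsweep(arr, out, acc, from, mid),
        downsweep(arr, out, Frac(acc.numerator + left.numerator, acc.denominator + left.denominator), mid, to)
      )
      ()
    }

  def rollingAveragesParallel(arr: Array[Int]): Array[Double] = {
    val out = new Array[Double](arr.length)
    downsweep(arr, out, Frac(0, 0), 0, arr.length)
    out
  }
}
```

For instance, `RollingAveragesSketch.rollingAveragesParallel(Array(1, 2, 3))` yields `Array(1.0, 1.5, 2.0)`, matching the sequential version.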
# General
*.DS_Store
*.swp
*~
# Dotty
*.class
*.tasty
*.hasTasty
# sbt
target/
# IDE
.bsp
.bloop
.metals
.vscode
# datasets
stackoverflow-grading.csv
wikipedia-grading.dat
// Student tasks (i.e. submit, packageSubmission)
enablePlugins(StudentTasks)
course := "midterm"
assignment := "m1"
scalaVersion := "3.0.0-RC1"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies += "org.scalameta" %% "munit" % "0.7.22"
val MUnitFramework = new TestFramework("munit.Framework")
testFrameworks += MUnitFramework
// Decode Scala names
testOptions += Tests.Argument(MUnitFramework, "-s")
testSuite := "m1.M1Suite"
package sbt // To access the private[sbt] compilerReporter key
package filteringReporterPlugin
import Keys._
import ch.epfl.lamp._
object FilteringReporterPlugin extends AutoPlugin {
override lazy val projectSettings = Seq(
// Turn off a warning coming from ScalaMeter that we cannot fix without changing ScalaMeter
compilerReporter in (Compile, compile) ~= { reporter => new FilteringReporter(reporter) }
)
}
class FilteringReporter(reporter: xsbti.Reporter) extends xsbti.Reporter {
def reset(): Unit = reporter.reset()
def hasErrors: Boolean = reporter.hasErrors
def hasWarnings: Boolean = reporter.hasWarnings
def printSummary(): Unit = reporter.printSummary()
def problems: Array[xsbti.Problem] = reporter.problems
def log(problem: xsbti.Problem): Unit = {
if (!problem.message.contains("An existential type that came from a Scala-2 classfile cannot be"))
reporter.log(problem)
}
def comment(pos: xsbti.Position, msg: String): Unit =
reporter.comment(pos, msg)
override def toString = s"FilteringReporter($reporter)"
}
package ch.epfl.lamp
import sbt._
import sbt.Keys._
/**
* Coursera uses two versions of each assignment. They both have the same assignment key and part id but have
* different item ids.
*
* @param key Assignment key
* @param partId Assignment partId
* @param itemId Item id of the non premium version
* @param premiumItemId Item id of the premium version (`None` if the assignment is optional)
*/
case class CourseraId(key: String, partId: String, itemId: String, premiumItemId: Option[String])
/**
* Settings shared by all assignments, reused in various tasks.
*/
object MOOCSettings extends AutoPlugin {
override def requires = super.requires && filteringReporterPlugin.FilteringReporterPlugin
object autoImport {
val course = SettingKey[String]("course")
val assignment = SettingKey[String]("assignment")
val options = SettingKey[Map[String, Map[String, String]]]("options")
val courseraId = settingKey[CourseraId]("Coursera-specific information identifying the assignment")
val testSuite = settingKey[String]("Fully qualified name of the test suite of this assignment")
.withRank(KeyRanks.Invisible)
// Convenient alias
type CourseraId = ch.epfl.lamp.CourseraId
val CourseraId = ch.epfl.lamp.CourseraId
}
import autoImport._
override val globalSettings: Seq[Def.Setting[_]] = Seq(
// supershell is verbose, buggy and useless.
useSuperShell := false
)
override val projectSettings: Seq[Def.Setting[_]] = Seq(
parallelExecution in Test := false,
// Report test result after each test instead of waiting for every test to finish
logBuffered in Test := false,
name := s"${course.value}-${assignment.value}"
)
}