From 759afc6617e248c53cf5f63adb09628b6073dad8 Mon Sep 17 00:00:00 2001 From: Olivier Blanvillain <olivier.blanvillain@epfl.ch> Date: Mon, 24 May 2021 16:00:02 +0200 Subject: [PATCH] Add exercises/solutions-7.md --- exercises/solutions-7.md | 83 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 exercises/solutions-7.md diff --git a/exercises/solutions-7.md b/exercises/solutions-7.md new file mode 100644 index 0000000..99fa983 --- /dev/null +++ b/exercises/solutions-7.md @@ -0,0 +1,83 @@ +# Exercise 1 + +## Question 1.1 + +### Part 1 + +```scala +val planContracts: RDD[(Int, Int)] = + contracts.map(c => (c.planId, c.contractId)) +val planPrices: RDD[(Int, Double)] = + plans.map(p => (p.planId, p.price)) +val contractPrices: RDD[(Int, Double)] = + planContracts.join(planPrices).values +val contractBrokers: RDD[(Int, Int)] = + contracts.map(c => (c.contractId, c.brokerId)) +val brokerContractPrices: RDD[(Int, Double)] = + contractBrokers.join(contractPrices).values +val brokerIdRevenues: RDD[(Int, Double)] = + brokerContractPrices.reduceByKey(_ + _) +val brokerNames: RDD[(Int, String)] = + brokers.map(b => (b.brokerId, b.fullName)) +val brokerNameRevenues: RDD[(String, Double)] = + brokerNames.leftOuterJoin(brokerIdRevenues) + .values + .mapValues(_.getOrElse(0.0)) +``` + +### Part 2 + +```scala +val low20 = brokerNameRevenue + .top((brokers.count * 0.2).toInt)(Ordering.by(_._2).reverse) +``` + +## Question 1.2 + +### Part 1 + +```scala +val planYear: RDD[(Int, Int)] = + plans.map(p => (p.planId, p.yearIntroducedIn)) +val planClient: RDD[(Int, Int)] = + contracts.map(c => (c.planId, p.clientId)) +val clientYear: RDD[(Int, Int)] = + planClient.join(planYear).values +val clientLatestYear: RDD[(Int, Int)] = + clientYear.reduceByKey(_ max _) +val clientNames: RDD[(Int, String)] = + clients.map(c => (c.clientId, c.fullName)) +val upsellClientNames: RDD[String] = + clientLatestYear.filter(p => 2019 - p._2 > 10) + .join(clientNames) + .values +``` + +### Part 2 + +```scala +val contractClient: RDD[(Int, Int)] = ... // similar to above +val contractPlan: RDD[(Int, Plan)] = + contracts.map(c => (c.planId, c.contractId)) + .join(plans.map(p => (p.planId, p))) + .values +val clientPlan: RDD[(Int, Plan)] = + contractClient.join(contractPlan).values +val clientUpsells: RDD[(Int, Plan)] = + clientLatestPlan.cartesian(plans).filter { + case ((clientId, clientPlan), newPlan) => + newPlan.price > clientPlan.price + }.map { + case ((clientId, clientPlan), newPlan) => + (clientId, newPlan) + } +val clientCheapeastUpsell: RDD[(Int, Plan)] = + clientUpsells.reduceByKey { + case (p1, p2) if p1.price < p2.price => p1 + case (_, p2) => p2 + } +val clientNames: RDD[(Int, String)] = + clients.map(c => (c.clientId, c.fullName)) +val upsellSuggestion: RDD[(String, Plan)] = + clientCheapeastUpsell.join(clientNames).values +``` -- GitLab