Skip to content
Snippets Groups Projects

Exercise 1

Question 1.1

Part 1

// Revenue per broker: price each contract through its plan, attribute that
// price to the contract's broker, sum per broker, then attach broker names.

// (planId, contractId) for every contract, keyed for the join against plans.
val planContracts: RDD[(Int, Int)] =
  contracts.map { contract => (contract.planId, contract.contractId) }
// (planId, price) for every plan.
val planPrices: RDD[(Int, Double)] =
  plans.map { plan => (plan.planId, plan.price) }
// (contractId, price): joined on planId, which is dropped afterwards.
val contractPrices: RDD[(Int, Double)] =
  planContracts.join(planPrices).map { case (_, contractAndPrice) => contractAndPrice }
// (contractId, brokerId) for every contract.
val contractBrokers: RDD[(Int, Int)] =
  contracts.map { contract => (contract.contractId, contract.brokerId) }
// (brokerId, price): joined on contractId, which is dropped afterwards.
val brokerContractPrices: RDD[(Int, Double)] =
  contractBrokers.join(contractPrices).map { case (_, brokerAndPrice) => brokerAndPrice }
// (brokerId, total price of that broker's contracts).
val brokerIdRevenues: RDD[(Int, Double)] =
  brokerContractPrices.reduceByKey((total, price) => total + price)
// (brokerId, fullName) for every broker.
val brokerNames: RDD[(Int, String)] =
  brokers.map { broker => (broker.brokerId, broker.fullName) }
// (fullName, revenue). The left outer join keeps every broker; a broker with
// no entry in brokerIdRevenues gets a revenue of 0.0.
val brokerNameRevenues: RDD[(String, Double)] =
  brokerNames.leftOuterJoin(brokerIdRevenues).map {
    case (_, (name, revenue)) => (name, revenue.getOrElse(0.0))
  }

Part 2

// The lowest-revenue 20% of brokers, as (fullName, revenue) pairs in ascending
// order of revenue. The element count is 20% of the broker count, truncated
// by toInt.
val low20 = {
  // Type arguments are spelled out so the lambda's parameter type does not
  // depend on inference from the call site.
  val byRevenue = Ordering.by[(String, Double), Double](_._2)
  val howMany = (brokers.count * 0.2).toInt
  // takeOrdered returns the smallest `howMany` elements under `byRevenue`,
  // which is what top(...) with the reversed ordering would return.
  brokerNameRevenues.takeOrdered(howMany)(byRevenue)
}

Question 1.2

Part 1

// Names of clients whose most recently introduced plan is more than 10 years
// old relative to 2019 (the reference year is hard-coded below).

// (planId, yearIntroducedIn) for every plan.
val planYear: RDD[(Int, Int)] =
  plans.map(p => (p.planId, p.yearIntroducedIn))
// (planId, clientId) for every contract.
val planClient: RDD[(Int, Int)] =
  contracts.map(c => (c.planId, c.clientId))
// (clientId, yearIntroducedIn): joined on planId, which is dropped afterwards.
val clientYear: RDD[(Int, Int)] =
  planClient.join(planYear).values
// (clientId, latest yearIntroducedIn among that client's plans).
val clientLatestYear: RDD[(Int, Int)] =
  clientYear.reduceByKey(_ max _)
// (clientId, fullName) for every client.
val clientNames: RDD[(Int, String)] =
  clients.map(c => (c.clientId, c.fullName))
// The join yields (clientId, (latestYear, name)); only the name is kept so the
// result matches the declared RDD[String].
val upsellClientNames: RDD[String] =
  clientLatestYear.filter { case (_, latestYear) => 2019 - latestYear > 10 }
                  .join(clientNames)
                  .map { case (_, (_, name)) => name }

Part 2

// For each client, suggest the cheapest plan that costs more than the client's
// latest plan. Clients for whom no more expensive plan exists do not appear in
// the result.

// (contractId, clientId) for every contract.
val contractClient: RDD[(Int, Int)] =
  contracts.map(c => (c.contractId, c.clientId))
// (contractId, Plan): joined on planId, which is dropped afterwards.
val contractPlan: RDD[(Int, Plan)] =
  contracts.map(c => (c.planId, c.contractId))
           .join(plans.map(p => (p.planId, p)))
           .values
// (clientId, Plan): one entry per contract the client holds.
val clientPlan: RDD[(Int, Plan)] =
  contractClient.join(contractPlan).values
// (clientId, Plan): the client's plan with the greatest yearIntroducedIn.
// NOTE(review): "latest" is taken to mean the highest yearIntroducedIn, as in
// Part 1 -- confirm this is the intended definition.
val clientLatestPlan: RDD[(Int, Plan)] =
  clientPlan.reduceByKey { (a, b) =>
    if (a.yearIntroducedIn >= b.yearIntroducedIn) a else b
  }
// (clientId, candidate plan) for every plan strictly more expensive than the
// client's latest plan. cartesian pairs every client with every plan.
val clientUpsells: RDD[(Int, Plan)] =
  clientLatestPlan.cartesian(plans).collect {
    case ((clientId, currentPlan), newPlan) if newPlan.price > currentPlan.price =>
      (clientId, newPlan)
  }
// (clientId, cheapest candidate). On equal prices the second operand is kept.
val clientCheapeastUpsell: RDD[(Int, Plan)] =
  clientUpsells.reduceByKey { (p1, p2) =>
    if (p1.price < p2.price) p1 else p2
  }
// (clientId, fullName) for every client.
// NOTE(review): same definition as in Part 1; drop one of them if both parts
// end up in the same scope.
val clientNames: RDD[(Int, String)] =
  clients.map(c => (c.clientId, c.fullName))
// Joining with names on the left yields (clientId, (name, plan)), so .values
// has the declared (String, Plan) shape.
val upsellSuggestion: RDD[(String, Plan)] =
  clientNames.join(clientCheapeastUpsell).values