# Exercise 1
## Question 1.1
### Part 1
```scala
// Total revenue per broker, keyed by the broker's full name.
// A contract earns the price of its plan; a broker's revenue is the sum over the contracts
// they sold. Brokers with no contracts survive the left outer join and get revenue 0.0.
val planContracts: RDD[(Int, Int)] =
  contracts.map(contract => contract.planId -> contract.contractId)
val planPrices: RDD[(Int, Double)] =
  plans.map(plan => plan.planId -> plan.price)
// (planId, (contractId, price)) -> (contractId, price)
val contractPrices: RDD[(Int, Double)] =
  planContracts.join(planPrices).map { case (_, contractAndPrice) => contractAndPrice }
val contractBrokers: RDD[(Int, Int)] =
  contracts.map(contract => contract.contractId -> contract.brokerId)
// (contractId, (brokerId, price)) -> (brokerId, price)
val brokerContractPrices: RDD[(Int, Double)] =
  contractBrokers.join(contractPrices).map { case (_, brokerAndPrice) => brokerAndPrice }
val brokerIdRevenues: RDD[(Int, Double)] =
  brokerContractPrices.reduceByKey((left, right) => left + right)
val brokerNames: RDD[(Int, String)] =
  brokers.map(broker => broker.brokerId -> broker.fullName)
// (brokerId, (name, Option[revenue])) -> (name, revenue); a missing revenue becomes 0.0
val brokerNameRevenues: RDD[(String, Double)] =
  brokerNames
    .leftOuterJoin(brokerIdRevenues)
    .values
    .mapValues(maybeRevenue => maybeRevenue.getOrElse(0.0))
```
### Part 2
```scala
// The bottom 20% of brokers by revenue (count truncated toward zero), lowest revenue first.
// takeOrdered(k) returns the k smallest elements under the ordering, which is what
// top(k) with a reversed ordering computed, but stated directly.
// Ordering.by needs explicit type arguments here: in `Ordering.by(_._2).reverse` the
// call has no expected type, so the compiler cannot infer the lambda's parameter type.
val low20: Array[(String, Double)] = brokerNameRevenues
  .takeOrdered((brokers.count * 0.2).toInt)(Ordering.by[(String, Double), Double](_._2))
```
## Question 1.2
### Part 1
```scala
// Names of clients whose most recently introduced plan is more than 10 years old
// relative to 2019. Only clients that appear in at least one contract are considered,
// because the client -> year mapping is built from contracts.
val planYear: RDD[(Int, Int)] =
  plans.map(p => (p.planId, p.yearIntroducedIn))
val planClient: RDD[(Int, Int)] =
  contracts.map(c => (c.planId, c.clientId))
// (planId, (clientId, year)) -> (clientId, year)
val clientYear: RDD[(Int, Int)] =
  planClient.join(planYear).values
val clientLatestYear: RDD[(Int, Int)] =
  clientYear.reduceByKey(_ max _)
val clientNames: RDD[(Int, String)] =
  clients.map(c => (c.clientId, c.fullName))
// The join yields (clientId, (latestYear, name)); keep only the name so the
// result really is an RDD[String].
val upsellClientNames: RDD[String] =
  clientLatestYear
    .filter { case (_, latestYear) => 2019 - latestYear > 10 }
    .join(clientNames)
    .map { case (_, (_, name)) => name }
```
### Part 2
```scala
// For every client, suggest the cheapest plan that costs more than the client's most
// recently introduced plan. Clients already on the most expensive option get no
// suggestion (they have no candidate upsell and drop out of the final join).
val contractClient: RDD[(Int, Int)] =
  contracts.map(c => (c.contractId, c.clientId))
// (planId, (contractId, plan)) -> (contractId, plan)
val contractPlan: RDD[(Int, Plan)] =
  contracts.map(c => (c.planId, c.contractId))
    .join(plans.map(p => (p.planId, p)))
    .values
// (contractId, (clientId, plan)) -> (clientId, plan)
val clientPlan: RDD[(Int, Plan)] =
  contractClient.join(contractPlan).values
// Each client's plan with the greatest yearIntroducedIn; ties are broken arbitrarily.
// NOTE(review): if only the Part 1 candidates should get suggestions, filter this
// RDD by those client ids first — confirm against the question.
val clientLatestPlan: RDD[(Int, Plan)] =
  clientPlan.reduceByKey { (p1, p2) =>
    if (p1.yearIntroducedIn >= p2.yearIntroducedIn) p1 else p2
  }
// Every strictly more expensive plan is an upsell candidate. cartesian pairs each client
// with every plan — presumably acceptable because the plan catalogue is small; consider
// broadcasting `plans` otherwise.
val clientUpsells: RDD[(Int, Plan)] =
  clientLatestPlan.cartesian(plans).filter {
    case ((_, latestPlan), newPlan) => newPlan.price > latestPlan.price
  }.map {
    case ((clientId, _), newPlan) => (clientId, newPlan)
  }
val clientCheapestUpsell: RDD[(Int, Plan)] =
  clientUpsells.reduceByKey((p1, p2) => if (p1.price < p2.price) p1 else p2)
val clientNames: RDD[(Int, String)] =
  clients.map(c => (c.clientId, c.fullName))
// Join with the names on the left so the values come out as (name, plan).
val upsellSuggestion: RDD[(String, Plan)] =
  clientNames.join(clientCheapestUpsell).values
```