From 2057178c3a130efe75f8d2f9f8127b88ac5d45f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Pit-Claudel?= <clement.pit-claudel@epfl.ch> Date: Sat, 21 Dec 2024 22:27:13 +0100 Subject: [PATCH] server/gc: Reuse Clock implementation --- .../scala/cs214/webapp/server/web/Clock.scala | 31 ++++++++++++++++ .../webapp/server/web/GarbageCollector.scala | 37 +++++-------------- .../cs214/webapp/server/web/ServerApp.scala | 24 +----------- 3 files changed, 42 insertions(+), 50 deletions(-) create mode 100644 jvm/src/main/scala/cs214/webapp/server/web/Clock.scala diff --git a/jvm/src/main/scala/cs214/webapp/server/web/Clock.scala b/jvm/src/main/scala/cs214/webapp/server/web/Clock.scala new file mode 100644 index 0000000..8829e6a --- /dev/null +++ b/jvm/src/main/scala/cs214/webapp/server/web/Clock.scala @@ -0,0 +1,31 @@ +package cs214.webapp +package server +package web + +import java.util.concurrent.{TimeUnit, ScheduledExecutorService, ScheduledThreadPoolExecutor, ScheduledFuture} + +import scala.util.Try + +private final class Clock(val name: String, periodMs: Long)(using scheduler: ScheduledExecutorService)(action: => Unit): + private var handler: Option[ScheduledFuture[?]] = None + + val tick = new Runnable: + def run() = Try(action) // Ignore errors (`action` should catch them) + + def start(): Unit = + scheduler.synchronized: + require(handler.isEmpty) + handler = Some(scheduler.scheduleWithFixedDelay(tick, 0, periodMs, TimeUnit.MILLISECONDS)) + println(f"[$name] started") + + def shutdown() = + scheduler.synchronized: + handler.foreach(_.cancel(true)) + handler = None + println(f"[$name] stopped") + +object Clock: + def newScheduler(): ScheduledThreadPoolExecutor = + val scheduler = ScheduledThreadPoolExecutor(1) + scheduler.setRemoveOnCancelPolicy(true) + scheduler diff --git a/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala b/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala index 0fa217d..71258df 100644 --- a/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala +++ b/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala @@ -2,17 +2,15 @@ package cs214.webapp package server.web import scala.collection.concurrent -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit +import scala.concurrent.duration.* object GarbageCollector: - /// When to do GC? - private val GC_EVERY = Duration(1, TimeUnit.MINUTES) + private val GC_INTERVAL = 1.minutes /// Forces GC after this number of unused instances private val GC_AFTER_UNUSED_INSTANCES = 10 /// GC an instance after this unused time. - private val GC_AFTER_DURATION = Duration(24, TimeUnit.HOURS) - + private val GC_AFTER_DURATION = 24.hours + type LastUsedAt = Long /** Contains the list of instances that are not used */ @@ -31,7 +29,7 @@ object GarbageCollector: WebServer.shutdownApp(i) /** Garbage collect instances */ - def garbageCollect = + def garbageCollect(): Unit = val snap = scheduledForDeletion.snapshot() // Conditions to trigger garbage collection if snap.size >= GC_AFTER_UNUSED_INSTANCES then @@ -46,25 +44,8 @@ object GarbageCollector: println(f"Stopping instance $i after $GC_AFTER_DURATION inactivity.") shutdownApp(i) - /** The thread running the garbage collection */ - class Deleter extends Thread("gc"): - override def run(): Unit = - println("GC started.") - try - while true do - Thread.sleep(GC_EVERY.toMillis) - garbageCollect - catch - case e: InterruptedException => println("GC stopped.") - - private var gcThread: Option[Thread] = None - - /** Starts the garbage collector thread */ - def start() = - gcThread = Some(Deleter()) - gcThread.foreach(_.start()) + private var clock = + Clock("gc", GC_INTERVAL.toMillis)(using Clock.newScheduler())(garbageCollect()) - /** Stops the garbage collection thread */ - def shutdown() = - gcThread.map(_.interrupt()) - gcThread = None + def start() = clock.start() + def shutdown() = clock.shutdown() diff --git a/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala b/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala index 0795424..c1e9dff 100644 --- a/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala +++ b/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala @@ -2,7 +2,7 @@ package cs214.webapp package server package web -import java.util.concurrent.{Executors, TimeUnit, ScheduledExecutorService, ScheduledThreadPoolExecutor, ScheduledFuture} +import java.util.concurrent.ScheduledExecutorService import scala.collection.mutable import scala.util.{Success, Failure, Try} @@ -28,9 +28,7 @@ private[web] case class StateMachineServerAppFactory[E, S, V](sm: StateMachine[E private[web] case class ClockDrivenStateMachineServerAppFactory[E, S, V](sm: ClockDrivenStateMachine[E, S, V]) extends ServerAppFactory: // Use one scheduler thread per app, not per app instance - val scheduler = ScheduledThreadPoolExecutor(1) - scheduler.setRemoveOnCancelPolicy(true) - + val scheduler = Clock.newScheduler() override def appInfo: AppInfo = sm.appInfo override def init(instanceId: InstanceId, registeredUserIds: Seq[UserId]): ServerApp = ClockDrivenStateMachineServerApp(sm, instanceId, registeredUserIds)(using scheduler) @@ -169,24 +167,6 @@ private[web] class StateMachineServerApp[E, S, V]( pending -private final class Clock(val name: String, periodMs: Long)(using scheduler: ScheduledExecutorService)(action: => Unit): - private var handler: Option[ScheduledFuture[?]] = None - - val tick = new Runnable: - def run() = Try(action) // Ignore errors (`action` should catch them) - - def start(): Unit = - scheduler.synchronized: - require(handler.isEmpty) - handler = Some(scheduler.scheduleWithFixedDelay(tick, 0, periodMs, TimeUnit.MILLISECONDS)) - println(f"[$name] started") - - def shutdown() = - scheduler.synchronized: - handler.foreach(_.cancel(true)) - handler = None - println(f"[$name] stopped") - private[web] final class ClockDrivenStateMachineServerApp[E, S, V]( sm: ClockDrivenStateMachine[E, S, V], instanceId: InstanceId, -- GitLab