diff --git a/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala b/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala index 71258df56e68c5e418b86cc6f591542b48fbb45a..b8e921f836e0b514ea45e981e437b23dab35bd97 100644 --- a/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala +++ b/jvm/src/main/scala/cs214/webapp/server/web/GarbageCollector.scala @@ -5,47 +5,24 @@ import scala.collection.concurrent import scala.concurrent.duration.* object GarbageCollector: - private val GC_INTERVAL = 1.minutes - /// Forces GC after this number of unused instances - private val GC_AFTER_UNUSED_INSTANCES = 10 - /// GC an instance after this unused time. - private val GC_AFTER_DURATION = 24.hours - - type LastUsedAt = Long - - /** Contains the list of instances that are not used */ - private val scheduledForDeletion = concurrent.TrieMap[InstanceId, LastUsedAt]() - - /** Shedules an instance for a garbage collection */ - def scheduleForGC(i: InstanceId) = - scheduledForDeletion += ((i, System.currentTimeMillis())) - - /** Unschedules an instance already scheduled for GC */ - def unscheduleForGC(i: InstanceId) = - scheduledForDeletion -= i - - def shutdownApp(i: InstanceId) = - unscheduleForGC(i) - WebServer.shutdownApp(i) - - /** Garbage collect instances */ - def garbageCollect(): Unit = - val snap = scheduledForDeletion.snapshot() - // Conditions to trigger garbage collection - if snap.size >= GC_AFTER_UNUSED_INSTANCES then - val (i, _) = snap.toList.minBy(_._2) - println(f"Stopping instance $i because there is more than $GC_AFTER_UNUSED_INSTANCES inactive instances.") - shutdownApp(i) - else - for - (i, t) <- snap - if (Duration(t, TimeUnit.MILLISECONDS) + GC_AFTER_DURATION).toMillis <= System.currentTimeMillis() - do - println(f"Stopping instance $i after $GC_AFTER_DURATION inactivity.") - shutdownApp(i) - - private var clock = - Clock("gc", GC_INTERVAL.toMillis)(using Clock.newScheduler())(garbageCollect()) - - def start() = clock.start() - def shutdown() = clock.shutdown() + private val GC_TICK = 1.minutes + private val GC_CREATION_TIME_GRACE_PERIOD = 1.hour + private val GC_LAST_ACTIVITY_GRACE_PERIOD = 3.days + + def garbageCollect(): Unit = // Not safe, needs lock or care; too slow? + val now = System.currentTimeMillis() + val lastActivityThreshold = now - GC_LAST_ACTIVITY_GRACE_PERIOD.toMillis + val creationTimeThreshold = now - GC_CREATION_TIME_GRACE_PERIOD.toMillis + WebServer.instances.snapshot() + .values.filter: inst => + inst.lastActivity < collectInactiveBefore && + inst.creationTime < collectNeverConnectedBefore + .toList.foreach: inst => + println(f"[gc] Stopping ${inst.appInfo.name}/${inst.instanceId}.") + WebServer.shutdownApp(inst.instanceId) + + private val clock = + val scheduler = Clock.newScheduler() + Clock("gc", GC_TICK.toMillis)(using scheduler)(garbageCollect()) + + clock.start() diff --git a/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala b/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala index c1e9dff7e4b39ad421f3d70dbededc76e85dc37a..a0ed2900cf5d47d5da23051618077d67218d0658 100644 --- a/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala +++ b/jvm/src/main/scala/cs214/webapp/server/web/ServerApp.scala @@ -3,6 +3,7 @@ package server package web import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable import scala.util.{Success, Failure, Try} @@ -37,9 +38,6 @@ private[web] abstract class ServerApp: val instanceId: InstanceId val registeredUserIds: Seq[UserId] - // The instance should be considered inactive until a user connects. - inactive() - protected object instanceLock def appInfo: AppInfo @@ -60,13 +58,19 @@ private[web] abstract class ServerApp: */ def transition(clients: Seq[UserId])(userId: UserId, eventJs: ujson.Value): Try[PendingActions] + val creationTime = System.currentTimeMillis() + + private var _lastActivity = AtomicLong(0) + def recordActivity() = lastActivity.set(System.currentTimeMillis()) + def lastActivity = lastActivity.get() + private val channels = mutable.Map[UserId, mutable.Set[cask.WsChannelActor]]().withDefault(_ => mutable.Set()) def connect(userId: UserId) (implicit cc: castor.Context, log: cask.Logger): cask.WebsocketResult = instanceLock.synchronized: + recordActivity() cask.WsHandler: channel => - active() channels.getOrElseUpdate(userId, mutable.Set()).add(channel) println(f"[${appInfo.id}/$instanceId] client \"$userId\" connected") send(userId): @@ -82,16 +86,10 @@ private[web] abstract class ServerApp: } def disconnect(userId: UserId, channel: WsChannelActor): Unit = instanceLock.synchronized: + recordActivity() channels(userId).remove(channel) if channels(userId).isEmpty then channels.remove(userId) println(f"[${appInfo.id}/$instanceId] client \"$userId\" disconnected") - if channels.isEmpty then inactive() - - /** Called when the app becomes active */ - def active() = GarbageCollector.unscheduleForGC(instanceId) - - /** Called when the app becomes inactive */ - def inactive() = GarbageCollector.scheduleForGC(instanceId) /** Enumerates clients connected to this instance. */ def connectedClients: Seq[UserId] = instanceLock.synchronized: @@ -110,6 +108,7 @@ private[web] abstract class ServerApp: channel.send(cask.Ws.Text(wrapped)) def handleMessage(userId: UserId, msg: ujson.Value): Unit = instanceLock.synchronized: + recordActivity() announceTransition(userId): transition(connectedClients)(userId, msg) diff --git a/jvm/src/main/scala/cs214/webapp/server/web/WebServer.scala b/jvm/src/main/scala/cs214/webapp/server/web/WebServer.scala index 06b2d1b0fc032898588c0e237e23bee77c4867a8..c33e612bd1b51c0ba9de48f153adb203358c825c 100644 --- a/jvm/src/main/scala/cs214/webapp/server/web/WebServer.scala +++ b/jvm/src/main/scala/cs214/webapp/server/web/WebServer.scala @@ -35,7 +35,7 @@ object WebServer: private[web] val appDirectory: mutable.Map[AppId, ServerAppFactory] = mutable.Map() /** Mapping from app instance ids to their running instances */ - private[web] val instances: concurrent.Map[InstanceId, ServerApp] = + private[web] val instances: concurrent.TrieMap[InstanceId, ServerApp] = concurrent.TrieMap() /** Registers the given app */