From ccb3c24ffef5ff60c3ef47756390b9d026bf7c4e Mon Sep 17 00:00:00 2001 From: Rishi Sharma <rishi.sharma@epfl.ch> Date: Fri, 1 Apr 2022 20:30:56 +0200 Subject: [PATCH] Add compression --- src/decentralizepy/communication/TCP.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 3c41c53..a81e5bf 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -1,5 +1,6 @@ import json import logging +import lzma import pickle from collections import deque @@ -38,7 +39,9 @@ class TCP(Communication): port = rank + 20000 return "tcp://{}:{}".format(machine_addr, port) - def __init__(self, rank, machine_id, mapping, total_procs, addresses_filepath): + def __init__( + self, rank, machine_id, mapping, total_procs, addresses_filepath, compress=False + ): """ Constructor @@ -72,6 +75,7 @@ class TCP(Communication): self.router.setsockopt(zmq.IDENTITY, self.identity) self.router.bind(self.addr(rank, machine_id)) self.sent_disconnections = False + self.compress = compress self.peer_deque = deque() self.peer_sockets = dict() @@ -99,7 +103,13 @@ class TCP(Communication): Encoded data """ - return pickle.dumps(data) + if self.compress: + compressor = lzma.LZMACompressor() + output = compressor.compress(pickle.dumps(data)) + compressor.flush() + else: + output = pickle.dumps(data) + + return output def decrypt(self, sender, data): """ @@ -119,7 +129,10 @@ class TCP(Communication): """ sender = int(sender.decode()) - data = pickle.loads(data) + if self.compress: + data = pickle.loads(lzma.decompress(data)) + else: + data = pickle.loads(data) return sender, data def connect_neighbors(self, neighbors): -- GitLab