From 6706d612c68dfe8c746e9245f09c104ab85992aa Mon Sep 17 00:00:00 2001
From: Jeffrey Wigger <jeffrey.wigger@epfl.ch>
Date: Mon, 9 May 2022 12:42:20 +0000
Subject: [PATCH] Compression

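Replace the lzma-over-pickle compression in TCP with a pluggable
compression API (decentralizepy.compression.Compression) and four
implementations: Elias gamma coding for sorted index arrays (Elias),
Elias gamma combined with lossless or lossy fpzip float coding
(EliasFpzip, EliasFpzipLossy), and an lz4 wrapper for index metadata
and/or model data (Lz4Wrapper). TCP now tracks total_data and
total_meta, Sharing serializes each message once per round instead of
once per neighbor, and top-k indices are sorted before serialization
so that delta-based index coding stays effective.

A wiring sketch (the keyword arguments are the ones introduced in this
patch; the surrounding variables are placeholders):

    comm = TCP(
        rank, machine_id, mapping, total_procs, addresses_filepath,
        compress=True,
        compression_package="decentralizepy.compression.Lz4Wrapper",
        compression_class="Lz4Wrapper",
    )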
---
 setup.cfg                                     |   1 +
 src/decentralizepy/communication/TCP.py       |  54 +++++++-
 src/decentralizepy/compression/Compression.py |  89 +++++++++++
 src/decentralizepy/compression/Elias.py       | 138 ++++++++++++++++++
 src/decentralizepy/compression/EliasFpzip.py  |  53 +++++++
 .../compression/EliasFpzipLossy.py            |  53 +++++++
 src/decentralizepy/compression/Lz4Wrapper.py  | 102 +++++++++++++
 src/decentralizepy/node/Node.py               |  10 +-
 src/decentralizepy/sharing/FFT.py             |   3 +-
 src/decentralizepy/sharing/PartialModel.py    |  10 +-
 src/decentralizepy/sharing/Sharing.py         |   4 +-
 src/decentralizepy/sharing/Wavelet.py         |  15 +-
 src/decentralizepy/train_test_evaluation.py   |   7 +-
 13 files changed, 509 insertions(+), 30 deletions(-)
 create mode 100644 src/decentralizepy/compression/Compression.py
 create mode 100644 src/decentralizepy/compression/Elias.py
 create mode 100644 src/decentralizepy/compression/EliasFpzip.py
 create mode 100644 src/decentralizepy/compression/EliasFpzipLossy.py
 create mode 100644 src/decentralizepy/compression/Lz4Wrapper.py

diff --git a/setup.cfg b/setup.cfg
index d7f7a7f..08f34d4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -46,6 +46,7 @@ install_requires =
         pandas
         crudini
         sklearn
+        lz4
 include_package_data = True
 python_requires = >=3.6
 [options.packages.find]
diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py
index fed9301..c609699 100644
--- a/src/decentralizepy/communication/TCP.py
+++ b/src/decentralizepy/communication/TCP.py
@@ -1,6 +1,6 @@
+import importlib
 import json
 import logging
-import lzma
 import pickle
 from collections import deque
 
@@ -48,6 +48,8 @@ class TCP(Communication):
         addresses_filepath,
         compress=False,
         offset=20000,
+        compression_package=None,
+        compression_class=None,
     ):
         """
         Constructor
@@ -64,6 +66,10 @@ class TCP(Communication):
             Total number of processes
         addresses_filepath : str
             JSON file with machine_id -> ip mapping
+        compression_package : str
+            Import path of the module that provides the compression class
+        compression_class : str
+            Name of the Compression subclass inside that module
 
         """
         super().__init__(rank, machine_id, mapping, total_procs)
@@ -85,6 +91,17 @@ class TCP(Communication):
         self.sent_disconnections = False
         self.compress = compress
 
+        if compression_package and compression_class:
+            compressor_module = importlib.import_module(compression_package)
+            compressor_class = getattr(compressor_module, compression_class)
+            self.compressor = compressor_class()
+            logging.info(f"Using {compressor_class} to compress the data")
+        else:
+            assert not self.compress, "compression requested but no compressor given"
+
+        self.total_data = 0
+        self.total_meta = 0
+
         self.peer_deque = deque()
         self.peer_sockets = dict()
         self.barrier = set()
@@ -112,11 +129,27 @@ class TCP(Communication):
 
         """
         if self.compress:
-            compressor = lzma.LZMACompressor()
-            output = compressor.compress(pickle.dumps(data)) + compressor.flush()
+            meta_len = 0
+            if "indices" in data:
+                data["indices"] = self.compressor.compress(data["indices"])
+                # meta_len is only needed for the traffic statistics
+                meta_len = len(pickle.dumps(data["indices"]))
+            if "params" in data:
+                data["params"] = self.compressor.compress_float(data["params"])
+            output = pickle.dumps(data)
+            # pickling the compressed metadata adds only a few bytes
+            self.total_meta += meta_len
+            self.total_data += len(output) - meta_len
         else:
             output = pickle.dumps(data)
-
+            # centralized testing uses its own instance and sends plain strings
+            if isinstance(data, dict):
+                if "indices" in data:
+                    meta_len = len(pickle.dumps(data["indices"]))
+                else:
+                    meta_len = 0
+                self.total_meta += meta_len
+                self.total_data += len(output) - meta_len
         return output
 
     def decrypt(self, sender, data):
@@ -138,7 +171,11 @@ class TCP(Communication):
         """
         sender = int(sender.decode())
         if self.compress:
-            data = pickle.loads(lzma.decompress(data))
+            data = pickle.loads(data)
+            if "indices" in data:
+                data["indices"] = self.compressor.decompress(data["indices"])
+            if "params" in data:
+                data["params"] = self.compressor.decompress_float(data["params"])
         else:
             data = pickle.loads(data)
         return sender, data
@@ -226,7 +263,7 @@ class TCP(Communication):
             logging.debug("Received message from {}".format(sender))
             return self.decrypt(sender, recv)
 
-    def send(self, uid, data):
+    def send(self, uid, data, encrypt=True):
         """
         Send a message to a process.
 
@@ -239,7 +276,10 @@ class TCP(Communication):
 
         """
         assert self.initialized == True
-        to_send = self.encrypt(data)
+        if encrypt:
+            to_send = self.encrypt(data)
+        else:
+            to_send = data
         data_size = len(to_send)
         self.total_bytes += data_size
         id = str(uid).encode()
diff --git a/src/decentralizepy/compression/Compression.py b/src/decentralizepy/compression/Compression.py
new file mode 100644
index 0000000..0924caf
--- /dev/null
+++ b/src/decentralizepy/compression/Compression.py
@@ -0,0 +1,89 @@
+import numpy as np
+
+
+class Compression:
+    """
+    Compression API
+
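+    Subclasses encode index arrays via compress/decompress and float
+    arrays via compress_float/decompress_float. Round-trip sketch
+    (SomeCompressor stands for any concrete subclass):
+
+        comp = SomeCompressor()
+        indices = np.array([1, 4, 9], dtype=np.int64)
+        assert (comp.decompress(comp.compress(indices)) == indices).all()
+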
+    """
+
+    def __init__(self):
+        """
+        Constructor
+        """
+
+    def compress(self, arr):
+        """
+        compression function
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
+        raise NotImplementedError
+
+    def decompress(self, bytes):
+        """
+        decompression function
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
+        raise NotImplementedError
+
+    def compress_float(self, arr):
+        """
+        compression function for float arrays
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
+        raise NotImplementedError
+
+    def decompress_float(self, bytes):
+        """
+        decompression function for compressed float arrays
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
+        raise NotImplementedError
diff --git a/src/decentralizepy/compression/Elias.py b/src/decentralizepy/compression/Elias.py
new file mode 100644
index 0000000..235cf00
--- /dev/null
+++ b/src/decentralizepy/compression/Elias.py
@@ -0,0 +1,138 @@
+# Elias gamma implementation adapted from this Stack Overflow post:
+# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
+
+import numpy as np
+
+from decentralizepy.compression.Compression import Compression
+
+
+class Elias(Compression):
+    """
+    Elias gamma coding for sorted integer index arrays; floats pass through
+
+    """
+
+    def __init__(self):
+        """
+        Constructor
+        """
+
+    def compress(self, arr):
+        """
+        compression function
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
+        # delta-encode the sorted indices; gamma coding needs values >= 1,
+        # so the first index is stored separately in the trailer
+        arr.sort()
+        first = arr[0]
+        arr = np.diff(arr).astype(np.int32)
+        arr = arr.view(f"u{arr.itemsize}")
+        # a value x occupies 2*floor(log2(x)) + 1 bits; L holds the cumulative
+        # bit offsets, with 128 bits reserved for the 16-byte trailer
+        l = np.log2(arr).astype("u1")
+        L = ((l << 1) + 1).cumsum()
+        out = np.zeros(int(L[-1] + 128), "u1")
+        for i in range(l.max() + 1):
+            out[L - i - 1] += (arr >> i) & 1
+
+        # trailer: the last 8 bytes hold the number of data bits, the 8
+        # bytes before them hold the first index
+        s = np.array([int(L[-1])], dtype=np.int64)
+        size = np.ndarray(8, dtype="u1", buffer=s.data)
+        packed = np.packbits(out)
+        packed[-8:] = size
+        s = np.array([first], dtype=np.int64)
+        size = np.ndarray(8, dtype="u1", buffer=s.data)
+        packed[-16:-8] = size
+        return packed
+
+    def decompress(self, bytes):
+        """
+        decompression function
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
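+        # read the 16-byte trailer: number of data bits and the first index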
+        n_arr = bytes[-8:]
+        n = np.ndarray(1, dtype=np.int64, buffer=n_arr.data)[0]
+        first = bytes[-16:-8]
+        first = np.ndarray(1, dtype=np.int64, buffer=first.data)[0]
+        b = bytes[:-16]
+        b = np.unpackbits(b, count=n).view(bool)
+        s = b.nonzero()[0]
+        s = (s << 1).repeat(np.diff(s, prepend=-1))
+        s -= np.arange(-1, len(s) - 1)
+        s = s.tolist()  # list has faster __getitem__
+        ns = len(s)
+
+        def gen():
+            idx = 0
+            yield idx
+            while idx < ns:
+                idx = s[idx]
+                yield idx
+
+        offs = np.fromiter(gen(), int)
+        sz = np.diff(offs) >> 1
+        mx = sz.max() + 1
+        out_fin = np.zeros(offs.size, int)
+        out_fin[0] = first
+        out = out_fin[1:]
+        for i in range(mx):
+            out[b[offs[1:] - i - 1] & (sz >= i)] += 1 << i
+        out = np.cumsum(out_fin)
+        return out
+
+    def compress_float(self, arr):
+        """
+        identity compression for float arrays; Elias only encodes indices
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        np.ndarray
+            the unchanged input array
+
+        """
+        return arr
+
+    def decompress_float(self, bytes):
+        """
+        identity decompression for float arrays
+
+        Parameters
+        ----------
+        bytes : np.ndarray
+            uncompressed float array
+
+        Returns
+        -------
+        arr : np.ndarray
+            the unchanged input array
+
+        """
+        return bytes
diff --git a/src/decentralizepy/compression/EliasFpzip.py b/src/decentralizepy/compression/EliasFpzip.py
new file mode 100644
index 0000000..dc1413a
--- /dev/null
+++ b/src/decentralizepy/compression/EliasFpzip.py
@@ -0,0 +1,53 @@
+# Elias gamma index coding (inherited from Elias) combined with lossless
+# float compression via fpzip
+
+import fpzip
+
+from decentralizepy.compression.Elias import Elias
+
+
+class EliasFpzip(Elias):
+    """
+    Elias gamma coding for indices, lossless fpzip coding for floats
+
+    """
+
+    def __init__(self):
+        """
+        Constructor
+        """
+
+    def compress_float(self, arr):
+        """
+        compression function for float arrays
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
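+        # precision=0 selects fpzip's lossless mode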
+        return fpzip.compress(arr, precision=0, order="C")
+
+    def decompress_float(self, bytes):
+        """
+        decompression function for compressed float arrays
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
+        return fpzip.decompress(bytes, order="C")
diff --git a/src/decentralizepy/compression/EliasFpzipLossy.py b/src/decentralizepy/compression/EliasFpzipLossy.py
new file mode 100644
index 0000000..30e0111
--- /dev/null
+++ b/src/decentralizepy/compression/EliasFpzipLossy.py
@@ -0,0 +1,53 @@
+# Elias gamma index coding (inherited from Elias) combined with lossy
+# float compression via fpzip
+
+import fpzip
+
+from decentralizepy.compression.Elias import Elias
+
+
+class EliasFpzipLossy(Elias):
+    """
+    Elias gamma coding for indices, lossy fpzip coding for floats
+
+    """
+
+    def __init__(self):
+        """
+        Constructor
+        """
+
+    def compress_float(self, arr):
+        """
+        compression function for float arrays
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
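+        # fpzip keeps only 18 bits of precision per value here, so this is lossy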
+        return fpzip.compress(arr, precision=18, order="C")
+
+    def decompress_float(self, bytes):
+        """
+        decompression function for compressed float arrays
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
+        return fpzip.decompress(bytes, order="C")
diff --git a/src/decentralizepy/compression/Lz4Wrapper.py b/src/decentralizepy/compression/Lz4Wrapper.py
new file mode 100644
index 0000000..cb65773
--- /dev/null
+++ b/src/decentralizepy/compression/Lz4Wrapper.py
@@ -0,0 +1,102 @@
+import lz4.frame
+import numpy as np
+
+from decentralizepy.compression.Compression import Compression
+
+
+class Lz4Wrapper(Compression):
+    """
+    lz4 frame compression for index metadata and/or float model data
+
+    """
+
+    def __init__(self, compress_metadata=True, compress_data=False):
+        """
+        Constructor; the flags select which payload parts are lz4-compressed
+        """
+        self.compress_metadata = compress_metadata
+        self.compress_data = compress_data
+
+    def compress(self, arr):
+        """
+        compression function
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
+        if self.compress_metadata:
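+            # delta-encode the sorted indices; small deltas compress much better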
+            arr.sort()
+            diff = np.diff(arr, prepend=0).astype(np.int32)
+            to_compress = diff.tobytes("C")
+            return lz4.frame.compress(to_compress)
+        return arr
+
+    def decompress(self, bytes):
+        """
+        decompression function
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
+        if self.compress_metadata:
+            decomp = lz4.frame.decompress(bytes)
+            return np.cumsum(np.frombuffer(decomp, dtype=np.int32))
+        return bytes
+
+    def compress_float(self, arr):
+        """
+        compression function for float arrays
+
+        Parameters
+        ----------
+        arr : np.ndarray
+            Data to compress
+
+        Returns
+        -------
+        bytearray
+            encoded data as bytes
+
+        """
+        if self.compress_data:
+            to_compress = arr.tobytes("C")
+            return lz4.frame.compress(to_compress)
+        return arr
+
+    def decompress_float(self, bytes):
+        """
+        decompression function for compressed float arrays
+
+        Parameters
+        ----------
+        bytes : bytearray
+            compressed data
+
+        Returns
+        -------
+        arr : np.ndarray
+            decompressed data as array
+
+        """
+        if self.compress_data:
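+            # assumes the original array was float32 (the model parameter dtype)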
+            decomp = lz4.frame.decompress(bytes)
+            return np.frombuffer(decomp, dtype=np.float32)
+        return bytes
diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py
index e387c58..dd1b9d9 100644
--- a/src/decentralizepy/node/Node.py
+++ b/src/decentralizepy/node/Node.py
@@ -433,12 +433,14 @@ class Node:
 
             results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
 
-            if hasattr(self.sharing, "total_meta"):
-                results_dict["total_meta"][iteration + 1] = self.sharing.total_meta
-            if hasattr(self.sharing, "total_data"):
+            if hasattr(self.communication, "total_meta"):
+                results_dict["total_meta"][
+                    iteration + 1
+                ] = self.communication.total_meta
+            if hasattr(self.communication, "total_data"):
                 results_dict["total_data_per_n"][
                     iteration + 1
-                ] = self.sharing.total_data
+                ] = self.communication.total_data
             if hasattr(self.sharing, "mean"):
                 results_dict["grad_mean"][iteration + 1] = self.sharing.mean
             if hasattr(self.sharing, "std"):
diff --git a/src/decentralizepy/sharing/FFT.py b/src/decentralizepy/sharing/FFT.py
index cef3873..ba7b841 100644
--- a/src/decentralizepy/sharing/FFT.py
+++ b/src/decentralizepy/sharing/FFT.py
@@ -142,7 +142,8 @@ class FFT(PartialModel):
                     dim=0,
                     sorted=False,
                 )
-
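+        # sorted indices keep the delta-based index compression effective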
+        index, _ = torch.sort(index)
         return flat_fft[index], index
 
     def serialized_model(self):
diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py
index 018f895..1f1feca 100644
--- a/src/decentralizepy/sharing/PartialModel.py
+++ b/src/decentralizepy/sharing/PartialModel.py
@@ -147,10 +147,14 @@ class PartialModel(Sharing):
         std, mean = torch.std_mean(G_topk, unbiased=False)
         self.std = std.item()
         self.mean = mean.item()
-        return torch.topk(
-            G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
+        values, index = torch.topk(
+            G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=True
         )
 
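+        # sort indices for the delta-based compressors; values keep top-k order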
+        index, _ = torch.sort(index)
+        return values, index
+
     def serialized_model(self):
         """
         Convert model to a dict. self.alpha specifies the fraction of model to send.
@@ -217,8 +221,6 @@
             logging.info("Generated dictionary to send")
 
             logging.info("Converted dictionary to pickle")
-            self.total_data += len(self.communication.encrypt(m["params"]))
-            self.total_meta += len(self.communication.encrypt(m["indices"]))
 
             return m
 
diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py
index 9f7645b..22b4de9 100644
--- a/src/decentralizepy/sharing/Sharing.py
+++ b/src/decentralizepy/sharing/Sharing.py
@@ -177,8 +177,10 @@
         iter_neighbors = self.get_neighbors(all_neighbors)
         data["degree"] = len(all_neighbors)
         data["iteration"] = self.communication_round
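+        # serialize and compress once, then send the same bytes to every neighbor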
+        encrypted = self.communication.encrypt(data)
         for neighbor in iter_neighbors:
-            self.communication.send(neighbor, data)
+            self.communication.send(neighbor, encrypted, encrypt=False)
 
         logging.info("Waiting for messages from neighbors")
         while not self.received_from_all():
diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py
index 407714d..b864f1f 100644
--- a/src/decentralizepy/sharing/Wavelet.py
+++ b/src/decentralizepy/sharing/Wavelet.py
@@ -164,7 +164,8 @@
                 dim=0,
                 sorted=False,
             )
-
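+        # sorted indices keep the delta-based index compression effective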
+        index, _ = torch.sort(index)
         return data[index], index
 
     def serialized_model(self):
@@ -224,11 +225,6 @@
 
             m["send_partial"] = True
 
-            self.total_data += len(self.communication.encrypt(m["params"]))
-            self.total_meta += len(self.communication.encrypt(m["indices"])) + len(
-                self.communication.encrypt(m["alpha"])
-            )
-
             return m
 
     def deserialized_model(self, m):
@@ -256,13 +252,10 @@
         with torch.no_grad():
             if not self.dict_ordered:
                 raise NotImplementedError
-
-            indices = m["indices"]
             alpha = m["alpha"]
-            params = m["params"]
 
-            params_tensor = torch.tensor(params)
-            indices_tensor = torch.tensor(indices, dtype=torch.long)
+            params_tensor = torch.tensor(m["params"])
+            indices_tensor = torch.tensor(m["indices"], dtype=torch.long)
             ret = dict()
             ret["indices"] = indices_tensor
             ret["params"] = params_tensor
diff --git a/src/decentralizepy/train_test_evaluation.py b/src/decentralizepy/train_test_evaluation.py
index 316319d..319d308 100644
--- a/src/decentralizepy/train_test_evaluation.py
+++ b/src/decentralizepy/train_test_evaluation.py
@@ -36,6 +36,8 @@ class TrainTestHelper:
 
     def train_test_evaluation(self, iteration):
         with torch.no_grad():
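+            # evaluate in eval mode; training mode is restored at the end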
+            self.model.eval()
             total_threads = os.cpu_count()
             torch.set_num_threads(total_threads)
 
@@ -49,7 +51,7 @@
                 clone_val = val.clone().detach()
                 state_dict_copy[key] = clone_val
                 flat = clone_val.flatten()
-                to_cat.append(clone_val.flatten())
+                to_cat.append(flat)
                 lens.append(flat.shape[0])
 
             my_weight = torch.cat(to_cat)
@@ -90,4 +92,5 @@
             torch.set_num_threads(self.threads_per_proc)
             for neighbor in neighbors:
                 self.comm.send(neighbor, "finished")
-            return ta, tl, trl
+            self.model.train()
+        return ta, tl, trl
-- 
GitLab