From 1586bbba22a0fdf019e361214cdd494455f14256 Mon Sep 17 00:00:00 2001
From: Rishi Sharma <>
Date: Thu, 28 Apr 2022 22:28:46 +0200
Subject: [PATCH] Add centrality tests for Odysseas

 src/decentralizepy/graphs/            |  20 ++
 .../sharing/              | 203 ++++++++++++++++++
 2 files changed, 223 insertions(+)
 create mode 100644 src/decentralizepy/sharing/

diff --git a/src/decentralizepy/graphs/ b/src/decentralizepy/graphs/
index 3e20b63..7e4c063 100644
--- a/src/decentralizepy/graphs/
+++ b/src/decentralizepy/graphs/
@@ -1,3 +1,6 @@
+import networkx as nx
+import numpy as np
 class Graph:
     This class defines the graph topology.
@@ -144,3 +147,20 @@ class Graph:
         return self.adj_list[uid]
+    def centr(self):
+        my_adj = {x: list(adj) for x, adj in enumerate(self.adj_list)}
+        nxGraph = nx.Graph(my_adj)
+        a=nx.to_numpy_matrix(nxGraph)
+        self.averaging_weights = np.ones((self.n_procs, self.n_procs), dtype=float)
+        centrality= nx.betweenness_centrality(nxGraph)
+        for i in range(len(centrality)):
+            centrality[i]+=0.01
+        for i in range(self.averaging_weights.shape[0]):
+            s=0
+            for j in range(self.averaging_weights.shape[0]):
+                self.averaging_weights[i,j] = 1.0/centrality[j]
+                s += self.averaging_weights[i,j]
+            for j in range(self.averaging_weights.shape[0]):
+                self.averaging_weights[i,j]=self.averaging_weights[i,j]/s
+        return self.averaging_weights
diff --git a/src/decentralizepy/sharing/ b/src/decentralizepy/sharing/
new file mode 100644
index 0000000..52d79fa
--- /dev/null
+++ b/src/decentralizepy/sharing/
@@ -0,0 +1,203 @@
+import logging
+from collections import deque
+import torch
+class Sharing:
+    """
+    API defining who to share with and what, and what to do on receiving
+    """
+    def __init__(
+        self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
+    ):
+        """
+        Constructor
+        Parameters
+        ----------
+        rank : int
+            Local rank
+        machine_id : int
+            Global machine id
+        communication : decentralizepy.communication.Communication
+            Communication module used to send and receive messages
+        mapping : decentralizepy.mappings.Mapping
+            Mapping (rank, machine_id) -> uid
+        graph : decentralizepy.graphs.Graph
+            Graph reprensenting neighbors
+        model : decentralizepy.models.Model
+            Model to train
+        dataset : decentralizepy.datasets.Dataset
+            Dataset for sharing data. Not implemented yet! TODO
+        log_dir : str
+            Location to write shared_params (only writing for 2 procs per machine)
+        """
+        self.rank = rank
+        self.machine_id = machine_id
+        self.uid = mapping.get_uid(rank, machine_id)
+        self.communication = communication
+        self.mapping = mapping
+        self.graph = graph
+        self.model = model
+        self.dataset = dataset
+        self.communication_round = 0
+        self.log_dir = log_dir
+        self.total_data = 0
+        self.peer_deques = dict()
+        my_neighbors = self.graph.neighbors(self.uid)
+        for n in my_neighbors:
+            self.peer_deques[n] = deque()
+        self.averaging_weights = self.graph.centr()
+    def received_from_all(self):
+        """
+        Check if all neighbors have sent the current iteration
+        Returns
+        -------
+        bool
+            True if required data has been received, False otherwise
+        """
+        for _, i in self.peer_deques.items():
+            if len(i) == 0:
+                return False
+        return True
+    def get_neighbors(self, neighbors):
+        """
+        Choose which neighbors to share with
+        Parameters
+        ----------
+        neighbors : list(int)
+            List of all neighbors
+        Returns
+        -------
+        list(int)
+            Neighbors to share with
+        """
+        # modify neighbors here
+        return neighbors
+    def serialized_model(self):
+        """
+        Convert model to a dictionary. Here we can choose how much to share
+        Returns
+        -------
+        dict
+            Model converted to dict
+        """
+        m = dict()
+        for key, val in self.model.state_dict().items():
+            m[key] = val.numpy()
+            self.total_data += len(self.communication.encrypt(m[key]))
+        return m
+    def deserialized_model(self, m):
+        """
+        Convert received dict to state_dict.
+        Parameters
+        ----------
+        m : dict
+            received dict
+        Returns
+        -------
+        state_dict
+            state_dict of received
+        """
+        state_dict = dict()
+        for key, value in m.items():
+            state_dict[key] = torch.from_numpy(value)
+        return state_dict
+    def _pre_step(self):
+        """
+        Called at the beginning of step.
+        """
+        pass
+    def _post_step(self):
+        """
+        Called at the end of step.
+        """
+        pass
+    def _averaging(self):
+        """
+        Averages the received model with the local model
+        """
+        with torch.no_grad():
+            total = dict()
+            for _, n in enumerate(self.peer_deques):
+                _, iteration, data = self.peer_deques[n].popleft()
+                logging.debug(
+                    "Averaging model from neighbor {} of iteration {}".format(
+                        n, iteration
+                    )
+                )
+                data = self.deserialized_model(data)
+                weight = self.averaging_weights[self.uid,n]
+                for key, value in data.items():
+                    if key in total:
+                        total[key] += value * weight
+                    else:
+                        total[key] = value * weight
+            for key, value in self.model.state_dict().items():
+                total[key] += self.averaging_weights[self.uid,self.uid] * value  # Metro-Hastings
+        self.model.load_state_dict(total)
+    def step(self):
+        """
+        Perform a sharing step. Implements D-PSGD.
+        """
+        self._pre_step()
+        data = self.serialized_model()
+        my_uid = self.mapping.get_uid(self.rank, self.machine_id)
+        all_neighbors = self.graph.neighbors(my_uid)
+        iter_neighbors = self.get_neighbors(all_neighbors)
+        data["degree"] = len(all_neighbors)
+        data["iteration"] = self.communication_round
+        for neighbor in iter_neighbors:
+            self.communication.send(neighbor, data)
+"Waiting for messages from neighbors")
+        while not self.received_from_all():
+            sender, data = self.communication.receive()
+            logging.debug("Received model from {}".format(sender))
+            degree = data["degree"]
+            iteration = data["iteration"]
+            del data["degree"]
+            del data["iteration"]
+            self.peer_deques[sender].append((degree, iteration, data))
+                "Deserialized received model from {} of iteration {}".format(
+                    sender, iteration
+                )
+            )
+"Starting model averaging after receiving from all neighbors")
+        self._averaging()
+"Model averaging complete")
+        self.communication_round += 1
+        self._post_step()