diff --git a/src/decentralizepy/graphs/Graph.py b/src/decentralizepy/graphs/Graph.py
index 3e20b639fd09a6d8f432417efd8eb737b1566604..7e4c0635bd0ef1762217f594bded826f547b70be 100644
--- a/src/decentralizepy/graphs/Graph.py
+++ b/src/decentralizepy/graphs/Graph.py
@@ -1,3 +1,6 @@
+import networkx as nx
+import numpy as np
+
 class Graph:
     """
     This class defines the graph topology.
@@ -144,3 +147,28 @@ class Graph:
 
         """
         return self.adj_list[uid]
+
+    def centr(self):
+        """
+        Compute averaging weights inversely proportional to betweenness centrality.
+
+        Returns
+        -------
+        numpy.ndarray
+            Row-stochastic (n_procs x n_procs) matrix of averaging weights
+
+        """
+        my_adj = {x: list(adj) for x, adj in enumerate(self.adj_list)}
+        nxGraph = nx.Graph(my_adj)
+        self.averaging_weights = np.ones((self.n_procs, self.n_procs), dtype=float)
+        centrality = nx.betweenness_centrality(nxGraph)
+        for i in range(len(centrality)):
+            centrality[i] += 0.01  # offset so zero-centrality nodes don't divide by zero
+        for i in range(self.averaging_weights.shape[0]):
+            s = 0
+            for j in range(self.averaging_weights.shape[1]):
+                self.averaging_weights[i, j] = 1.0 / centrality[j]
+                s += self.averaging_weights[i, j]
+            for j in range(self.averaging_weights.shape[1]):
+                self.averaging_weights[i, j] /= s
+        return self.averaging_weights
diff --git a/src/decentralizepy/sharing/SharingCentrality.py b/src/decentralizepy/sharing/SharingCentrality.py
new file mode 100644
index 0000000000000000000000000000000000000000..52d79fae1c7d9005e8ec7b5d8697d4d502b63572
--- /dev/null
+++ b/src/decentralizepy/sharing/SharingCentrality.py
@@ -0,0 +1,203 @@
+import logging
+from collections import deque
+
+import torch
+
+
+class Sharing:
+    """
+    API defining who to share with and what, and what to do on receiving
+
+    """
+
+    def __init__(
+        self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
+    ):
+        """
+        Constructor
+
+        Parameters
+        ----------
+        rank : int
+            Local rank
+        machine_id : int
+            Global machine id
+        communication : decentralizepy.communication.Communication
+            Communication module used to send and receive messages
+        mapping : decentralizepy.mappings.Mapping
+            Mapping (rank, machine_id) -> uid
+        graph : decentralizepy.graphs.Graph
+            Graph representing neighbors
+        model : decentralizepy.models.Model
+            Model to train
+        dataset : decentralizepy.datasets.Dataset
+            Dataset for sharing data. Not implemented yet! TODO
+        log_dir : str
+            Location to write shared_params (only writing for 2 procs per machine)
+
+        """
+        self.rank = rank
+        self.machine_id = machine_id
+        self.uid = mapping.get_uid(rank, machine_id)
+        self.communication = communication
+        self.mapping = mapping
+        self.graph = graph
+        self.model = model
+        self.dataset = dataset
+        self.communication_round = 0
+        self.log_dir = log_dir
+        self.total_data = 0
+
+        self.peer_deques = dict()
+        my_neighbors = self.graph.neighbors(self.uid)
+        for n in my_neighbors:
+            self.peer_deques[n] = deque()
+
+        self.averaging_weights = self.graph.centr()
+
+    def received_from_all(self):
+        """
+        Check if all neighbors have sent the current iteration
+
+        Returns
+        -------
+        bool
+            True if required data has been received, False otherwise
+
+        """
+        for q in self.peer_deques.values():
+            if len(q) == 0:
+                return False
+        return True
+
+    def get_neighbors(self, neighbors):
+        """
+        Choose which neighbors to share with
+
+        Parameters
+        ----------
+        neighbors : list(int)
+            List of all neighbors
+
+        Returns
+        -------
+        list(int)
+            Neighbors to share with
+
+        """
+        # modify neighbors here
+        return neighbors
+
+    def serialized_model(self):
+        """
+        Convert model to a dictionary. Here we can choose how much to share
+
+        Returns
+        -------
+        dict
+            Model converted to dict
+
+        """
+        m = dict()
+        for key, val in self.model.state_dict().items():
+            m[key] = val.numpy()
+            self.total_data += len(self.communication.encrypt(m[key]))
+        return m
+
+    def deserialized_model(self, m):
+        """
+        Convert received dict to state_dict.
+
+        Parameters
+        ----------
+        m : dict
+            received dict
+
+        Returns
+        -------
+        state_dict
+            state_dict of the received model
+
+        """
+        state_dict = dict()
+        for key, value in m.items():
+            state_dict[key] = torch.from_numpy(value)
+        return state_dict
+
+    def _pre_step(self):
+        """
+        Called at the beginning of step.
+
+        """
+        pass
+
+    def _post_step(self):
+        """
+        Called at the end of step.
+
+        """
+        pass
+
+    def _averaging(self):
+        """
+        Averages the received models with the local model using centrality weights
+
+        """
+        with torch.no_grad():
+            total = dict()
+            for n in self.peer_deques:
+                _, iteration, data = self.peer_deques[n].popleft()
+                logging.debug(
+                    "Averaging model from neighbor {} of iteration {}".format(
+                        n, iteration
+                    )
+                )
+                data = self.deserialized_model(data)
+                weight = self.averaging_weights[self.uid, n]
+                for key, value in data.items():
+                    if key in total:
+                        total[key] += value * weight
+                    else:
+                        total[key] = value * weight
+
+            for key, value in self.model.state_dict().items():
+                total[key] += self.averaging_weights[self.uid, self.uid] * value  # local model's weight
+
+            self.model.load_state_dict(total)
+
+    def step(self):
+        """
+        Perform a sharing step. Implements D-PSGD with centrality-based averaging weights.
+
+        """
+        self._pre_step()
+        data = self.serialized_model()
+        my_uid = self.mapping.get_uid(self.rank, self.machine_id)
+        all_neighbors = self.graph.neighbors(my_uid)
+        iter_neighbors = self.get_neighbors(all_neighbors)
+        data["degree"] = len(all_neighbors)
+        data["iteration"] = self.communication_round
+        for neighbor in iter_neighbors:
+            self.communication.send(neighbor, data)
+
+        logging.info("Waiting for messages from neighbors")
+        while not self.received_from_all():
+            sender, data = self.communication.receive()
+            logging.debug("Received model from {}".format(sender))
+            degree = data["degree"]
+            iteration = data["iteration"]
+            del data["degree"]
+            del data["iteration"]
+            self.peer_deques[sender].append((degree, iteration, data))
+            logging.info(
+                "Deserialized received model from {} of iteration {}".format(
+                    sender, iteration
+                )
+            )
+
+        logging.info("Starting model averaging after receiving from all neighbors")
+        self._averaging()
+        logging.info("Model averaging complete")
+
+        self.communication_round += 1
+        self._post_step()
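
A minimal standalone sketch of the weighting scheme in `Graph.centr()`, run on a toy 5-node star topology (the adjacency dict and all names below are illustrative and do not come from decentralizepy). Since `centr()` assigns column j the weight 1/centrality[j] and then normalizes each row, every row of the resulting matrix is the same row-stochastic vector:

```python
# Illustrative reimplementation of the weighting in Graph.centr() on an assumed toy graph.
import networkx as nx
import numpy as np

adj = {0: [1, 2, 3, 4], 1: [0], 2: [0], 3: [0], 4: [0]}  # star: node 0 is the hub
g = nx.Graph(adj)
n = g.number_of_nodes()

centrality = nx.betweenness_centrality(g)  # hub -> 1.0, leaves -> 0.0
inv = np.array([1.0 / (centrality[j] + 0.01) for j in range(n)])  # same +0.01 offset as centr()
weights = np.tile(inv / inv.sum(), (n, 1))  # each row: normalized inverse centrality

assert np.allclose(weights.sum(axis=1), 1.0)  # rows sum to 1
print(weights[0])  # hub ~0.0025, each leaf ~0.2494: central nodes are down-weighted
```

Note that `_averaging` at node i only applies `weights[i, n]` for its neighbors n plus `weights[i, i]`, while each row is normalized over all nodes; on topologies that are not complete, the weights actually applied at a node therefore sum to less than 1.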