diff --git a/eval/plot.py b/eval/plot.py
index f119053db0f2ce90f3a09791a13c5f83d4fd7504..6056f6060d5950db7e8d565b018c2ab28aa3f0aa 100644
--- a/eval/plot.py
+++ b/eval/plot.py
@@ -67,7 +67,7 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
             filepath = os.path.join(mf_path, f)
             with open(filepath, "r") as inf:
                 results.append(json.load(inf))
-        if folder.startswith("FL"):
+        if folder.startswith("FL") or folder.startswith("Parameter Server"):
             data_node = -1
         else:
             data_node = 0
@@ -76,7 +76,8 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
         main_data = [main_data]
         # Plot Training loss
         plt.figure(1)
-        means, stdevs, mins, maxs = get_stats([x["train_loss"] for x in results])
+        means, stdevs, mins, maxs = get_stats(
+            [x["train_loss"] for x in results])
         plot(means, stdevs, mins, maxs, "Training Loss", folder, "upper right")
         df = pd.DataFrame(
             {
@@ -93,9 +94,11 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
         # Plot Testing loss
         plt.figure(2)
         if centralized:
-            means, stdevs, mins, maxs = get_stats([x["test_loss"] for x in main_data])
+            means, stdevs, mins, maxs = get_stats(
+                [x["test_loss"] for x in main_data])
         else:
-            means, stdevs, mins, maxs = get_stats([x["test_loss"] for x in results])
+            means, stdevs, mins, maxs = get_stats(
+                [x["test_loss"] for x in results])
         plot(means, stdevs, mins, maxs, "Testing Loss", folder, "upper right")
         df = pd.DataFrame(
             {
@@ -112,9 +115,11 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
         # Plot Testing Accuracy
         plt.figure(3)
         if centralized:
-            means, stdevs, mins, maxs = get_stats([x["test_acc"] for x in main_data])
+            means, stdevs, mins, maxs = get_stats(
+                [x["test_acc"] for x in main_data])
         else:
-            means, stdevs, mins, maxs = get_stats([x["test_acc"] for x in results])
+            means, stdevs, mins, maxs = get_stats(
+                [x["test_acc"] for x in results])
         plot(means, stdevs, mins, maxs, "Testing Accuracy", folder, "lower right")
         df = pd.DataFrame(
             {
@@ -153,6 +158,7 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
         means, stdevs, mins, maxs = get_stats(bytes_list)
         bytes_means[folder] = list(means.values())[0]
         bytes_stdevs[folder] = list(stdevs.values())[0]
+        print(bytes_list)
 
         meta_list = []
         for x in results:
diff --git a/eval/testingPeerSampler.py b/eval/testingPeerSampler.py
index decedf2b68ac4af4f8926f37a7ad21201136cd64..d8fb3cfb7a1b2f7a33f58e322a2f163cdf5c3bfe 100644
--- a/eval/testingPeerSampler.py
+++ b/eval/testingPeerSampler.py
@@ -11,8 +11,6 @@ from decentralizepy.mappings.Linear import Linear
 from decentralizepy.node.DPSGDWithPeerSampler import DPSGDWithPeerSampler
 from decentralizepy.node.PeerSamplerDynamic import PeerSamplerDynamic
 from decentralizepy.node.PeerSampler import PeerSampler
-from decentralizepy.node.ParameterServer import ParameterServer
-from decentralizepy.node.DPSGDNodeWithParameterServer import DPSGDNodeWithParameterServer
 
 
 def read_ini(file_path):
@@ -62,8 +60,7 @@ if __name__ == "__main__":
         processes.append(
             mp.Process(
                 # target=PeerSamplerDynamic,
-                target=ParameterServer,
-                # target=PeerSampler,
+                target=PeerSampler,
                 args=[
                     sr,
                     m_id,
@@ -80,8 +77,7 @@ if __name__ == "__main__":
     for r in range(0, procs_per_machine):
         processes.append(
             mp.Process(
-                target=DPSGDNodeWithParameterServer,
-                # target=DPSGDWithPeerSampler,
+                target=DPSGDWithPeerSampler,
                 args=[
                     r,
                     m_id,
diff --git a/src/decentralizepy/node/DPSGDNodeFederated.py b/src/decentralizepy/node/DPSGDNodeFederated.py
index 7d8f5b1efdd4977413610cf114f27fa06cd83bab..b27c92da89b50491321f5907f3d1cea4ed1ad64a 100644
--- a/src/decentralizepy/node/DPSGDNodeFederated.py
+++ b/src/decentralizepy/node/DPSGDNodeFederated.py
@@ -1,4 +1,5 @@
 import importlib
+import json
 import logging
 import math
 import os
@@ -35,7 +36,7 @@ class DPSGDNodeFederated(Node):
             del data["iteration"]
             del data["CHANNEL"]
 
-            self.model.load_state_dict(data)
+            self.model.load_state_dict(data["params"])
             self.sharing._post_step()
             self.sharing.communication_round += 1
 
@@ -60,6 +61,47 @@ class DPSGDNodeFederated(Node):
             to_send["CHANNEL"] = "DPSGD"
             self.communication.send(self.parameter_server_uid, to_send)
 
+            if self.participated > 0:
+                with open(
+                    os.path.join(
+                        self.log_dir, "{}_results.json".format(self.rank)),
+                    "r",
+                ) as inf:
+                    results_dict = json.load(inf)
+            else:
+                results_dict = {
+                    "train_loss": {},
+                    "test_loss": {},
+                    "test_acc": {},
+                    "total_bytes": {},
+                    "total_meta": {},
+                    "total_data_per_n": {},
+                    "grad_mean": {},
+                    "grad_std": {},
+                }
+
+            results_dict["total_bytes"][iteration
+                                        + 1] = self.communication.total_bytes
+
+            if hasattr(self.communication, "total_meta"):
+                results_dict["total_meta"][
+                    iteration + 1
+                ] = self.communication.total_meta
+            if hasattr(self.communication, "total_data"):
+                results_dict["total_data_per_n"][
+                    iteration + 1
+                ] = self.communication.total_data
+            if hasattr(self.sharing, "mean"):
+                results_dict["grad_mean"][iteration + 1] = self.sharing.mean
+            if hasattr(self.sharing, "std"):
+                results_dict["grad_std"][iteration + 1] = self.sharing.std
+
+            with open(
+                os.path.join(
+                    self.log_dir, "{}_results.json".format(self.rank)), "w"
+            ) as of:
+                json.dump(results_dict, of)
+
             self.participated += 1
 
             # only if has participated in learning
diff --git a/src/decentralizepy/node/DPSGDNodeWithParameterServer.py b/src/decentralizepy/node/DPSGDNodeWithParameterServer.py
deleted file mode 100644
index 85fa862e0c61438533c3c764b566bdbd25e1cb46..0000000000000000000000000000000000000000
--- a/src/decentralizepy/node/DPSGDNodeWithParameterServer.py
+++ /dev/null
@@ -1,526 +0,0 @@
-import importlib
-import json
-import logging
-import math
-import os
-from collections import deque
-
-import torch
-from matplotlib import pyplot as plt
-
-from decentralizepy import utils
-from decentralizepy.communication.TCP import TCP
-from decentralizepy.graphs.Graph import Graph
-from decentralizepy.graphs.Star import Star
-from decentralizepy.mappings.Mapping import Mapping
-from decentralizepy.node.Node import Node
-from decentralizepy.train_test_evaluation import TrainTestHelper
-
-
-class DPSGDNodeWithParameterServer(Node):
-    """
-    This class defines the node for DPSGD
-
-    """
-
-    def save_plot(self, l, label, title, xlabel, filename):
-        """
-        Save Matplotlib plot. Clears previous plots.
-
-        Parameters
-        ----------
-        l : dict
-            dict of x -> y. `x` must be castable to int.
-        label : str
-            label of the plot. Used for legend.
-        title : str
-            Header
-        xlabel : str
-            x-axis label
-        filename : str
-            Name of file to save the plot as.
-
-        """
-        plt.clf()
-        y_axis = [l[key] for key in l.keys()]
-        x_axis = list(map(int, l.keys()))
-        plt.plot(x_axis, y_axis, label=label)
-        plt.xlabel(xlabel)
-        plt.title(title)
-        plt.savefig(filename)
-
-    def run(self):
-        """
-        Start the decentralized learning
-
-        """
-        self.testset = self.dataset.get_testset()
-        rounds_to_test = self.test_after
-        rounds_to_train_evaluate = self.train_evaluate_after
-        global_epoch = 1
-        change = 1
-        if self.uid == 0:
-            dataset = self.dataset
-            if self.centralized_train_eval:
-                dataset_params_copy = self.dataset_params.copy()
-                if "sizes" in dataset_params_copy:
-                    del dataset_params_copy["sizes"]
-                self.whole_dataset = self.dataset_class(
-                    self.rank,
-                    self.machine_id,
-                    self.mapping,
-                    sizes=[1.0],
-                    **dataset_params_copy
-                )
-                dataset = self.whole_dataset
-            if self.centralized_test_eval:
-                tthelper = TrainTestHelper(
-                    dataset,  # self.whole_dataset,
-                    # self.model_test, # todo: this only works if eval_train is set to false
-                    self.model,
-                    self.loss,
-                    self.weights_store_dir,
-                    self.mapping.get_n_procs(),
-                    self.trainer,
-                    self.testing_comm,
-                    self.star,
-                    self.threads_per_proc,
-                    eval_train=self.centralized_train_eval,
-                )
-
-        for iteration in range(self.iterations):
-            logging.info("Starting training iteration: %d", iteration)
-            self.iteration = iteration
-            self.trainer.train(self.dataset)
-
-            to_send = self.sharing.get_data_to_send()
-            to_send["CHANNEL"] = "DPSGD"
-
-            self.communication.send(self.parameter_server_uid, to_send)
-
-            sender, data = self.receive_channel("GRADS")
-            del data["CHANNEL"]
-
-            self.model.load_state_dict(data)
-            self.sharing._post_step()
-            self.sharing.communication_round += 1
-
-            if self.reset_optimizer:
-                self.optimizer = self.optimizer_class(
-                    self.model.parameters(), **self.optimizer_params
-                )  # Reset optimizer state
-                self.trainer.reset_optimizer(self.optimizer)
-
-            if iteration:
-                with open(
-                    os.path.join(
-                        self.log_dir, "{}_results.json".format(self.rank)),
-                    "r",
-                ) as inf:
-                    results_dict = json.load(inf)
-            else:
-                results_dict = {
-                    "train_loss": {},
-                    "test_loss": {},
-                    "test_acc": {},
-                    "total_bytes": {},
-                    "total_meta": {},
-                    "total_data_per_n": {},
-                    "grad_mean": {},
-                    "grad_std": {},
-                }
-
-            results_dict["total_bytes"][iteration
-                                        + 1] = self.communication.total_bytes
-
-            if hasattr(self.communication, "total_meta"):
-                results_dict["total_meta"][
-                    iteration + 1
-                ] = self.communication.total_meta
-            if hasattr(self.communication, "total_data"):
-                results_dict["total_data_per_n"][
-                    iteration + 1
-                ] = self.communication.total_data
-            if hasattr(self.sharing, "mean"):
-                results_dict["grad_mean"][iteration + 1] = self.sharing.mean
-            if hasattr(self.sharing, "std"):
-                results_dict["grad_std"][iteration + 1] = self.sharing.std
-
-            rounds_to_train_evaluate -= 1
-
-            if rounds_to_train_evaluate == 0 and not self.centralized_train_eval:
-                logging.info("Evaluating on train set.")
-                rounds_to_train_evaluate = self.train_evaluate_after * change
-                loss_after_sharing = self.trainer.eval_loss(self.dataset)
-                results_dict["train_loss"][iteration + 1] = loss_after_sharing
-                self.save_plot(
-                    results_dict["train_loss"],
-                    "train_loss",
-                    "Training Loss",
-                    "Communication Rounds",
-                    os.path.join(
-                        self.log_dir, "{}_train_loss.png".format(self.rank)),
-                )
-
-            rounds_to_test -= 1
-
-            if self.dataset.__testing__ and rounds_to_test == 0:
-                rounds_to_test = self.test_after * change
-                if self.centralized_test_eval:
-                    if self.uid == 0:
-                        ta, tl, trl = tthelper.train_test_evaluation(iteration)
-                        results_dict["test_acc"][iteration + 1] = ta
-                        results_dict["test_loss"][iteration + 1] = tl
-                        if trl is not None:
-                            results_dict["train_loss"][iteration + 1] = trl
-                    else:
-                        self.testing_comm.send(0, self.model.get_weights())
-                        sender, data = self.testing_comm.receive()
-                        assert sender == 0 and data == "finished"
-                else:
-                    logging.info("Evaluating on test set.")
-                    ta, tl = self.dataset.test(self.model, self.loss)
-                    results_dict["test_acc"][iteration + 1] = ta
-                    results_dict["test_loss"][iteration + 1] = tl
-
-                if global_epoch == 49:
-                    change *= 2
-
-                global_epoch += change
-
-            with open(
-                os.path.join(
-                    self.log_dir, "{}_results.json".format(self.rank)), "w"
-            ) as of:
-                json.dump(results_dict, of)
-        if self.model.shared_parameters_counter is not None:
-            logging.info("Saving the shared parameter counts")
-            with open(
-                os.path.join(
-                    self.log_dir, "{}_shared_parameters.json".format(self.rank)
-                ),
-                "w",
-            ) as of:
-                json.dump(
-                    self.model.shared_parameters_counter.numpy().tolist(), of)
-        self.disconnect_parameter_server()
-        logging.info("Storing final weight")
-        self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
-        logging.info("Server disconnected. Process complete!")
-
-    def cache_fields(
-        self,
-        rank,
-        machine_id,
-        mapping,
-        graph,
-        iterations,
-        log_dir,
-        weights_store_dir,
-        test_after,
-        train_evaluate_after,
-        reset_optimizer,
-        centralized_train_eval,
-        centralized_test_eval,
-    ):
-        """
-        Instantiate object field with arguments.
-
-        Parameters
-        ----------
-        rank : int
-            Rank of process local to the machine
-        machine_id : int
-            Machine ID on which the process in running
-        mapping : decentralizepy.mappings
-            The object containing the mapping rank <--> uid
-        graph : decentralizepy.graphs
-            The object containing the global graph
-        iterations : int
-            Number of iterations (communication steps) for which the model should be trained
-        log_dir : str
-            Logging directory
-        weights_store_dir : str
-            Directory in which to store model weights
-        test_after : int
-            Number of iterations after which the test loss and accuracy arecalculated
-        train_evaluate_after : int
-            Number of iterations after which the train loss is calculated
-        reset_optimizer : int
-            1 if optimizer should be reset every communication round, else 0
-        centralized_train_eval : bool
-            If set the train set evaluation happens at the node with uid 0
-        centralized_test_eval : bool
-            If set the train set evaluation happens at the node with uid 0
-        """
-        self.rank = rank
-        self.machine_id = machine_id
-        self.graph = graph
-        self.mapping = mapping
-        self.uid = self.mapping.get_uid(rank, machine_id)
-        self.log_dir = log_dir
-        self.weights_store_dir = weights_store_dir
-        self.iterations = iterations
-        self.test_after = test_after
-        self.train_evaluate_after = train_evaluate_after
-        self.reset_optimizer = reset_optimizer
-        self.centralized_train_eval = centralized_train_eval
-        self.centralized_test_eval = centralized_test_eval
-        self.sent_disconnections = False
-
-        logging.info("Rank: %d", self.rank)
-        logging.info("type(graph): %s", str(type(self.rank)))
-        logging.info("type(mapping): %s", str(type(self.mapping)))
-
-        if centralized_test_eval or centralized_train_eval:
-            self.star = Star(self.mapping.get_n_procs())
-
-    def init_comm(self, comm_configs):
-        """
-        Instantiate communication module from config.
-
-        Parameters
-        ----------
-        comm_configs : dict
-            Python dict containing communication config params
-
-        """
-        comm_module = importlib.import_module(comm_configs["comm_package"])
-        comm_class = getattr(comm_module, comm_configs["comm_class"])
-        comm_params = utils.remove_keys(
-            comm_configs, ["comm_package", "comm_class"])
-        self.addresses_filepath = comm_params.get("addresses_filepath", None)
-        if self.centralized_test_eval:
-            self.testing_comm = TCP(
-                self.rank,
-                self.machine_id,
-                self.mapping,
-                self.star.n_procs,
-                self.addresses_filepath,
-                offset=self.star.n_procs,
-            )
-            self.testing_comm.connect_neighbors(self.star.neighbors(self.uid))
-
-        self.communication = comm_class(
-            self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
-        )
-
-    def instantiate(
-        self,
-        rank: int,
-        machine_id: int,
-        mapping: Mapping,
-        graph: Graph,
-        config,
-        iterations=1,
-        log_dir=".",
-        weights_store_dir=".",
-        log_level=logging.INFO,
-        test_after=5,
-        train_evaluate_after=1,
-        reset_optimizer=1,
-        centralized_train_eval=False,
-        centralized_test_eval=True,
-        *args
-    ):
-        """
-        Construct objects.
-
-        Parameters
-        ----------
-        rank : int
-            Rank of process local to the machine
-        machine_id : int
-            Machine ID on which the process in running
-        mapping : decentralizepy.mappings
-            The object containing the mapping rank <--> uid
-        graph : decentralizepy.graphs
-            The object containing the global graph
-        config : dict
-            A dictionary of configurations.
-        iterations : int
-            Number of iterations (communication steps) for which the model should be trained
-        log_dir : str
-            Logging directory
-        weights_store_dir : str
-            Directory in which to store model weights
-        log_level : logging.Level
-            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
-        test_after : int
-            Number of iterations after which the test loss and accuracy arecalculated
-        train_evaluate_after : int
-            Number of iterations after which the train loss is calculated
-        reset_optimizer : int
-            1 if optimizer should be reset every communication round, else 0
-        centralized_train_eval : bool
-            If set the train set evaluation happens at the node with uid 0
-        centralized_test_eval : bool
-            If set the train set evaluation happens at the node with uid 0
-        args : optional
-            Other arguments
-
-        """
-        logging.info("Started process.")
-
-        self.init_log(log_dir, rank, log_level)
-
-        self.cache_fields(
-            rank,
-            machine_id,
-            mapping,
-            graph,
-            iterations,
-            log_dir,
-            weights_store_dir,
-            test_after,
-            train_evaluate_after,
-            reset_optimizer,
-            centralized_train_eval,
-            centralized_test_eval,
-        )
-        self.init_dataset_model(config["DATASET"])
-        self.init_optimizer(config["OPTIMIZER_PARAMS"])
-        self.init_trainer(config["TRAIN_PARAMS"])
-        self.init_comm(config["COMMUNICATION"])
-
-        self.message_queue = dict()
-
-        self.barrier = set()
-        self.my_neighbors = self.graph.neighbors(self.uid)
-
-        self.init_sharing(config["SHARING"])
-
-    def __init__(
-        self,
-        rank: int,
-        machine_id: int,
-        mapping: Mapping,
-        graph: Graph,
-        config,
-        iterations=1,
-        log_dir=".",
-        weights_store_dir=".",
-        log_level=logging.INFO,
-        test_after=5,
-        train_evaluate_after=1,
-        reset_optimizer=1,
-        centralized_train_eval=0,
-        centralized_test_eval=1,
-        parameter_server_uid=-1,
-        *args
-    ):
-        """
-        Constructor
-
-        Parameters
-        ----------
-        rank : int
-            Rank of process local to the machine
-        machine_id : int
-            Machine ID on which the process in running
-        mapping : decentralizepy.mappings
-            The object containing the mapping rank <--> uid
-        graph : decentralizepy.graphs
-            The object containing the global graph
-        config : dict
-            A dictionary of configurations. Must contain the following:
-            [DATASET]
-                dataset_package
-                dataset_class
-                model_class
-            [OPTIMIZER_PARAMS]
-                optimizer_package
-                optimizer_class
-            [TRAIN_PARAMS]
-                training_package = decentralizepy.training.Training
-                training_class = Training
-                epochs_per_round = 25
-                batch_size = 64
-        iterations : int
-            Number of iterations (communication steps) for which the model should be trained
-        log_dir : str
-            Logging directory
-        weights_store_dir : str
-            Directory in which to store model weights
-        log_level : logging.Level
-            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
-        test_after : int
-            Number of iterations after which the test loss and accuracy arecalculated
-        train_evaluate_after : int
-            Number of iterations after which the train loss is calculated
-        reset_optimizer : int
-            1 if optimizer should be reset every communication round, else 0
-        centralized_train_eval : int
-            If set then the train set evaluation happens at the node with uid 0.
-            Note: If it is True then centralized_test_eval needs to be true as well!
-        centralized_test_eval : int
-            If set then the trainset evaluation happens at the node with uid 0
-        parameter_server_uid: int
-            The parameter server's uid
-        args : optional
-            Other arguments
-
-        """
-        centralized_train_eval = centralized_train_eval == 1
-        centralized_test_eval = centralized_test_eval == 1
-        # If centralized_train_eval is True then centralized_test_eval needs to be true as well!
-        assert not centralized_train_eval or centralized_test_eval
-
-        total_threads = os.cpu_count()
-        self.threads_per_proc = max(
-            math.floor(total_threads / mapping.procs_per_machine), 1
-        )
-        torch.set_num_threads(self.threads_per_proc)
-        torch.set_num_interop_threads(1)
-        self.instantiate(
-            rank,
-            machine_id,
-            mapping,
-            graph,
-            config,
-            iterations,
-            log_dir,
-            weights_store_dir,
-            log_level,
-            test_after,
-            train_evaluate_after,
-            reset_optimizer,
-            centralized_train_eval == 1,
-            centralized_test_eval == 1,
-            *args
-        )
-        logging.info(
-            "Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
-        )
-
-        self.message_queue["PEERS"] = deque()
-
-        self.parameter_server_uid = parameter_server_uid
-        self.connect_neighbor(self.parameter_server_uid)
-        self.wait_for_hello(self.parameter_server_uid)
-
-        self.run()
-
-    def disconnect_parameter_server(self):
-        """
-        Disconnects from the parameter server. Sends BYE.
-
-        Raises
-        ------
-        RuntimeError
-            If received another message while waiting for BYEs
-
-        """
-        if not self.sent_disconnections:
-            logging.info("Disconnecting parameter server.")
-            self.communication.send(
-                self.parameter_server_uid, {
-                    "BYE": self.uid, "CHANNEL": "SERVER_REQUEST"}
-            )
-            self.sent_disconnections = True
-
-            self.barrier.remove(self.parameter_server_uid)
-
-            while len(self.barrier):
-                sender, _ = self.receive_disconnect()
-                self.barrier.remove(sender)
diff --git a/src/decentralizepy/node/FederatedParameterServer.py b/src/decentralizepy/node/FederatedParameterServer.py
index ee919b73abf34934029b9cfdac874ee183def6ab..a92de2886c2a6a212cff3f6d77a094f0694fec4e 100644
--- a/src/decentralizepy/node/FederatedParameterServer.py
+++ b/src/decentralizepy/node/FederatedParameterServer.py
@@ -285,8 +285,7 @@ class FederatedParameterServer(Node):
             self.current_workers = self.get_working_nodes()
 
             # Params to send to workers
-            to_send = self.model.state_dict()
-
+            to_send["params"] = self.model.state_dict()
             to_send["CHANNEL"] = "WORKER_REQUEST"
             to_send["iteration"] = iteration
 
diff --git a/src/decentralizepy/node/ParameterServer.py b/src/decentralizepy/node/ParameterServer.py
deleted file mode 100644
index 616138ad581cb1f82251dc144dc8ea90d7639447..0000000000000000000000000000000000000000
--- a/src/decentralizepy/node/ParameterServer.py
+++ /dev/null
@@ -1,308 +0,0 @@
-import importlib
-import logging
-import os
-from collections import deque
-
-from decentralizepy import utils
-from decentralizepy.graphs.Graph import Graph
-from decentralizepy.mappings.Mapping import Mapping
-from decentralizepy.node.Node import Node
-
-
-class ParameterServer(Node):
-    """
-    This class defines the parameter serving service
-
-    """
-
-    def init_log(self, log_dir, log_level, force=True):
-        """
-        Instantiate Logging.
-
-        Parameters
-        ----------
-        log_dir : str
-            Logging directory
-        rank : rank : int
-            Rank of process local to the machine
-        log_level : logging.Level
-            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
-        force : bool
-            Argument to logging.basicConfig()
-
-        """
-        log_file = os.path.join(log_dir, "ParameterServer.log")
-        logging.basicConfig(
-            filename=log_file,
-            format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
-            level=log_level,
-            force=force,
-        )
-
-    def cache_fields(
-        self,
-        rank,
-        machine_id,
-        mapping,
-        graph,
-        iterations,
-        log_dir,
-    ):
-        """
-        Instantiate object field with arguments.
-
-        Parameters
-        ----------
-        rank : int
-            Rank of process local to the machine
-        machine_id : int
-            Machine ID on which the process in running
-        mapping : decentralizepy.mappings
-            The object containing the mapping rank <--> uid
-        graph : decentralizepy.graphs
-            The object containing the global graph
-        iterations : int
-            Number of iterations (communication steps) for which the model should be trained
-        log_dir : str
-            Logging directory
-
-        """
-        self.rank = rank
-        self.machine_id = machine_id
-        self.graph = graph
-        self.mapping = mapping
-        self.uid = self.mapping.get_uid(rank, machine_id)
-        self.log_dir = log_dir
-        self.iterations = iterations
-        self.sent_disconnections = False
-
-        logging.info("Rank: %d", self.rank)
-        logging.info("type(graph): %s", str(type(self.rank)))
-        logging.info("type(mapping): %s", str(type(self.mapping)))
-
-    def init_comm(self, comm_configs):
-        """
-        Instantiate communication module from config.
-
-        Parameters
-        ----------
-        comm_configs : dict
-            Python dict containing communication config params
-
-        """
-        comm_module = importlib.import_module(comm_configs["comm_package"])
-        comm_class = getattr(comm_module, comm_configs["comm_class"])
-        comm_params = utils.remove_keys(
-            comm_configs, ["comm_package", "comm_class"])
-        self.addresses_filepath = comm_params.get("addresses_filepath", None)
-        self.communication = comm_class(
-            self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
-        )
-
-    def instantiate(
-        self,
-        rank: int,
-        machine_id: int,
-        mapping: Mapping,
-        graph: Graph,
-        config,
-        iterations=1,
-        log_dir=".",
-        log_level=logging.INFO,
-        *args
-    ):
-        """
-        Construct objects.
-
-        Parameters
-        ----------
-        rank : int
-            Rank of process local to the machine
-        machine_id : int
-            Machine ID on which the process in running
-        mapping : decentralizepy.mappings
-            The object containing the mapping rank <--> uid
-        graph : decentralizepy.graphs
-            The object containing the global graph
-        config : dict
-            A dictionary of configurations.
-        iterations : int
-            Number of iterations (communication steps) for which the model should be trained
-        log_dir : str
-            Logging directory
-        log_level : logging.Level
-            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
-        args : optional
-            Other arguments
-
-        """
-        logging.info("Started process.")
-
-        self.init_log(log_dir, log_level)
-
-        self.cache_fields(
-            rank,
-            machine_id,
-            mapping,
-            graph,
-            iterations,
-            log_dir,
-        )
-
-        self.message_queue = dict()
-
-        self.barrier = set()
-
-        self.peer_deques = dict()
-
-        self.init_dataset_model(config["DATASET"])
-        self.init_comm(config["COMMUNICATION"])
-        self.my_neighbors = self.graph.get_all_nodes()
-        self.connect_neighbors()
-        self.init_sharing(config["SHARING"])
-
-    def receive_server_request(self):
-        return self.receive_channel("SERVER_REQUEST")
-
-    def received_from_all(self):
-        """
-        Check if all neighbors have sent the current iteration
-
-        Returns
-        -------
-        bool
-            True if required data has been received, False otherwise
-
-        """
-        for k in self.my_neighbors:
-            if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
-                return False
-        return True
-
-    def run(self):
-        """
-        Start the parameter-serving service.
-
-        """
-        for iteration in range(self.iterations):
-            self.iteration = iteration
-            # reset deques after each iteration
-            self.peer_deques = dict()
-
-            while not self.received_from_all():
-                sender, data = self.receive_channel("DPSGD")
-                if sender not in self.peer_deques:
-                    self.peer_deques[sender] = deque()
-                self.peer_deques[sender].append(data)
-
-            logging.info("Received from everybody")
-
-            averaging_deque = dict()
-            total = dict()
-            for neighbor in self.my_neighbors:
-                averaging_deque[neighbor] = self.peer_deques[neighbor]
-
-            for i, n in enumerate(averaging_deque):
-                data = averaging_deque[n].popleft()
-                degree, iteration = data["degree"], data["iteration"]
-                del data["degree"]
-                del data["iteration"]
-                del data["CHANNEL"]
-                data = self.sharing.deserialized_model(data)
-                for key, value in data.items():
-                    if key in total:
-                        total[key] += value
-                    else:
-                        total[key] = value
-
-            for key, value in total.items():
-                total[key] = total[key] / len(averaging_deque)
-
-            to_send = total
-            to_send["CHANNEL"] = "GRADS"
-
-            for neighbor in self.my_neighbors:
-                self.communication.send(neighbor, to_send)
-
-        while len(self.barrier):
-            sender, data = self.receive_server_request()
-            if "BYE" in data:
-                logging.debug("Received {} from {}".format("BYE", sender))
-                self.barrier.remove(sender)
-
-    def __init__(
-        self,
-        rank: int,
-        machine_id: int,
-        mapping: Mapping,
-        graph: Graph,
-        config,
-        iterations=1,
-        log_dir=".",
-        log_level=logging.INFO,
-        *args
-    ):
-        """
-        Constructor
-
-        Parameters
-        ----------
-        rank : int
-            Rank of process local to the machine
-        machine_id : int
-            Machine ID on which the process in running
-        mapping : decentralizepy.mappings
-            The object containing the mapping rank <--> uid
-        graph : decentralizepy.graphs
-            The object containing the global graph
-        config : dict
-            A dictionary of configurations. Must contain the following:
-            [DATASET]
-                dataset_package
-                dataset_class
-                model_class
-            [OPTIMIZER_PARAMS]
-                optimizer_package
-                optimizer_class
-            [TRAIN_PARAMS]
-                training_package = decentralizepy.training.Training
-                training_class = Training
-                epochs_per_round = 25
-                batch_size = 64
-        iterations : int
-            Number of iterations (communication steps) for which the model should be trained
-        log_dir : str
-            Logging directory
-        log_level : logging.Level
-            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
-        args : optional
-            Other arguments
-
-        """
-        super().__init__(
-            rank,
-            machine_id,
-            mapping,
-            graph,
-            config,
-            iterations,
-            log_dir,
-            log_level,
-            *args
-        )
-
-        self.instantiate(
-            rank,
-            machine_id,
-            mapping,
-            graph,
-            config,
-            iterations,
-            log_dir,
-            log_level,
-            *args
-        )
-
-        self.run()
-
-        logging.info("Parameter Server exiting")