diff --git a/eval/plot.py b/eval/plot.py
index 6056f6060d5950db7e8d565b018c2ab28aa3f0aa..62ac2302ff53dd2f2f7093e3fdc5e22fc9ec2cab 100644
--- a/eval/plot.py
+++ b/eval/plot.py
@@ -76,8 +76,7 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
         main_data = [main_data]
     # Plot Training loss
     plt.figure(1)
-    means, stdevs, mins, maxs = get_stats(
-        [x["train_loss"] for x in results])
+    means, stdevs, mins, maxs = get_stats([x["train_loss"] for x in results])
     plot(means, stdevs, mins, maxs, "Training Loss", folder, "upper right")
     df = pd.DataFrame(
         {
@@ -94,11 +93,9 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
     # Plot Testing loss
     plt.figure(2)
     if centralized:
-        means, stdevs, mins, maxs = get_stats(
-            [x["test_loss"] for x in main_data])
+        means, stdevs, mins, maxs = get_stats([x["test_loss"] for x in main_data])
     else:
-        means, stdevs, mins, maxs = get_stats(
-            [x["test_loss"] for x in results])
+        means, stdevs, mins, maxs = get_stats([x["test_loss"] for x in results])
     plot(means, stdevs, mins, maxs, "Testing Loss", folder, "upper right")
     df = pd.DataFrame(
         {
@@ -115,11 +112,9 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
     # Plot Testing Accuracy
     plt.figure(3)
     if centralized:
-        means, stdevs, mins, maxs = get_stats(
-            [x["test_acc"] for x in main_data])
+        means, stdevs, mins, maxs = get_stats([x["test_acc"] for x in main_data])
     else:
-        means, stdevs, mins, maxs = get_stats(
-            [x["test_acc"] for x in results])
+        means, stdevs, mins, maxs = get_stats([x["test_acc"] for x in results])
     plot(means, stdevs, mins, maxs, "Testing Accuracy", folder, "lower right")
     df = pd.DataFrame(
         {
@@ -133,23 +128,7 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
     df.to_csv(
         os.path.join(path, "test_acc_" + folder + ".csv"), index_label="rounds"
     )
-    plt.figure(6)
-    means, stdevs, mins, maxs = get_stats([x["grad_std"] for x in results])
-    plot(
-        means,
-        stdevs,
-        mins,
-        maxs,
-        "Gradient Variation over Nodes",
-        folder,
-        "upper right",
-    )
-    # Plot Testing loss
-    plt.figure(7)
-    means, stdevs, mins, maxs = get_stats([x["grad_mean"] for x in results])
-    plot(
-        means, stdevs, mins, maxs, "Gradient Magnitude Mean", folder, "upper right"
-    )
+
     # Collect total_bytes shared
     bytes_list = []
     for x in results:
@@ -185,10 +164,6 @@ def plot_results(path, centralized, data_machine="machine0", data_node=0):
     plt.savefig(os.path.join(path, "test_loss.png"), dpi=300)
     plt.figure(3)
     plt.savefig(os.path.join(path, "test_acc.png"), dpi=300)
-    plt.figure(6)
-    plt.savefig(os.path.join(path, "grad_std.png"), dpi=300)
-    plt.figure(7)
-    plt.savefig(os.path.join(path, "grad_mean.png"), dpi=300)
     # Plot total_bytes
     plt.figure(4)
     plt.title("Data Shared")
diff --git a/eval/testing.py b/eval/testing.py
index 9d67b28da6626887591f6b70dd41e700b7eeb744..556d1796580e268b29691fdef81b33017473d2be 100644
--- a/eval/testing.py
+++ b/eval/testing.py
@@ -65,7 +65,33 @@ if __name__ == "__main__":
             args.test_after,
             args.train_evaluate_after,
             args.reset_optimizer,
-            args.centralized_train_eval,
-            args.centralized_test_eval,
         ],
     )
+
+    processes = []
+    for r in range(procs_per_machine):
+        processes.append(
+            mp.Process(
+                target=DPSGDNode,
+                args=[
+                    r,
+                    m_id,
+                    l,
+                    g,
+                    my_config,
+                    args.iterations,
+                    args.log_dir,
+                    args.weights_store_dir,
+                    log_level[args.log_level],
+                    args.test_after,
+                    args.train_evaluate_after,
+                    args.reset_optimizer,
+                ],
+            )
+        )
+
+    for p in processes:
+        p.start()
+
+    for p in processes:
+        p.join()
diff --git a/eval/testingFederated.py b/eval/testingFederated.py
index c1b0c4dd20142a2d768275ff07c16f229e11331f..e0666d791cff5fd778b8ffe4d499ed344ed1e5a5 100644
--- a/eval/testingFederated.py
+++ b/eval/testingFederated.py
@@ -8,8 +8,8 @@ from torch import multiprocessing as mp
 from decentralizepy import utils
 from decentralizepy.graphs.Graph import Graph
 from decentralizepy.mappings.Linear import Linear
-from decentralizepy.node.FederatedParameterServer import FederatedParameterServer
 from decentralizepy.node.DPSGDNodeFederated import DPSGDNodeFederated
+from decentralizepy.node.FederatedParameterServer import FederatedParameterServer


 def read_ini(file_path):
diff --git a/eval/testingPeerSampler.py b/eval/testingPeerSampler.py
index d8fb3cfb7a1b2f7a33f58e322a2f163cdf5c3bfe..1e0b39a838254bd078f0ae709f68b967f97d3caa 100644
--- a/eval/testingPeerSampler.py
+++ b/eval/testingPeerSampler.py
@@ -9,8 +9,8 @@ from decentralizepy import utils
 from decentralizepy.graphs.Graph import Graph
 from decentralizepy.mappings.Linear import Linear
 from decentralizepy.node.DPSGDWithPeerSampler import DPSGDWithPeerSampler
-from decentralizepy.node.PeerSamplerDynamic import PeerSamplerDynamic
 from decentralizepy.node.PeerSampler import PeerSampler
+# from decentralizepy.node.PeerSamplerDynamic import PeerSamplerDynamic


 def read_ini(file_path):
@@ -91,8 +91,6 @@ if __name__ == "__main__":
                     args.test_after,
                     args.train_evaluate_after,
                     args.reset_optimizer,
-                    args.centralized_train_eval,
-                    args.centralized_test_eval,
                 ],
             )
         )
diff --git a/logs/config_celeba_sharing.ini b/logs/config_celeba_sharing.ini
deleted file mode 100644
index c5302ae575b7f96a604d1e04b8e78854718795dd..0000000000000000000000000000000000000000
--- a/logs/config_celeba_sharing.ini
+++ /dev/null
@@ -1,39 +0,0 @@
-[DATASET]
-dataset_package = decentralizepy.datasets.Celeba
-dataset_class = Celeba
-model_class = CNN
-images_dir = /mnt/nfs/shared/leaf/data/celeba/data/raw/img_align_celeba
-train_dir = /mnt/nfs/shared/leaf/data/celeba/per_user_data/train
-test_dir = /mnt/nfs/shared/leaf/data/celeba/data/test
-; python list of fractions below
-sizes =
-
-[OPTIMIZER_PARAMS]
-optimizer_package = torch.optim
-optimizer_class = SGD
-lr = 0.001
-
-[TRAIN_PARAMS]
-training_package = decentralizepy.training.Training
-training_class = Training
-rounds = 4
-full_epochs = False
-batch_size = 16
-shuffle = True
-loss_package = torch.nn
-loss_class = CrossEntropyLoss
-
-[COMMUNICATION]
-comm_package = decentralizepy.communication.TCP
-comm_class = TCP
-addresses_filepath = /mnt/nfs/kirsten/Gitlab/tutorial/ip.json
-
-[SHARING]
-sharing_package = decentralizepy.sharing.Sharing
-sharing_class = Sharing
-
-;sharing_package = decentralizepy.sharing.PartialModel
-;sharing_class = PartialModel
-;alpha = 0.1
-;accumulation = True
-;accumulate_averaging_changes = True
diff --git a/src/decentralizepy/node/DPSGDNode.py b/src/decentralizepy/node/DPSGDNode.py
index 0c8f0434d74249ff704d833a869907263b4c2c09..3951ad297f99137c19435386ecba05c7d8db5730 100644
--- a/src/decentralizepy/node/DPSGDNode.py
+++ b/src/decentralizepy/node/DPSGDNode.py
@@ -9,12 +9,9 @@ import torch
 from matplotlib import pyplot as plt

 from decentralizepy import utils
-from decentralizepy.communication.TCP import TCP
 from decentralizepy.graphs.Graph import Graph
-from decentralizepy.graphs.Star import Star
 from decentralizepy.mappings.Mapping import Mapping
 from decentralizepy.node.Node import Node
-from decentralizepy.train_test_evaluation import TrainTestHelper


 class DPSGDNode(Node):
@@ -70,42 +67,18 @@ class
DPSGDNode(Node): rounds_to_train_evaluate = self.train_evaluate_after global_epoch = 1 change = 1 - if self.uid == 0: - dataset = self.dataset - if self.centralized_train_eval: - dataset_params_copy = self.dataset_params.copy() - if "sizes" in dataset_params_copy: - del dataset_params_copy["sizes"] - self.whole_dataset = self.dataset_class( - self.rank, - self.machine_id, - self.mapping, - sizes=[1.0], - **dataset_params_copy - ) - dataset = self.whole_dataset - if self.centralized_test_eval: - tthelper = TrainTestHelper( - dataset, # self.whole_dataset, - # self.model_test, # todo: this only works if eval_train is set to false - self.model, - self.loss, - self.weights_store_dir, - self.mapping.get_n_procs(), - self.trainer, - self.testing_comm, - self.star, - self.threads_per_proc, - eval_train=self.centralized_train_eval, - ) for iteration in range(self.iterations): logging.info("Starting training iteration: %d", iteration) + rounds_to_train_evaluate -= 1 + rounds_to_test -= 1 + self.iteration = iteration self.trainer.train(self.dataset) new_neighbors = self.get_neighbors() + # The following code does not work because TCP sockets are supposed to be long lived. # for neighbor in self.my_neighbors: # if neighbor not in new_neighbors: # logging.info("Removing neighbor {}".format(neighbor)) @@ -163,8 +136,6 @@ class DPSGDNode(Node): "total_bytes": {}, "total_meta": {}, "total_data_per_n": {}, - "grad_mean": {}, - "grad_std": {}, } results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes @@ -177,14 +148,8 @@ class DPSGDNode(Node): results_dict["total_data_per_n"][ iteration + 1 ] = self.communication.total_data - if hasattr(self.sharing, "mean"): - results_dict["grad_mean"][iteration + 1] = self.sharing.mean - if hasattr(self.sharing, "std"): - results_dict["grad_std"][iteration + 1] = self.sharing.std - rounds_to_train_evaluate -= 1 - - if rounds_to_train_evaluate == 0 and not self.centralized_train_eval: + if rounds_to_train_evaluate == 0: logging.info("Evaluating on train set.") rounds_to_train_evaluate = self.train_evaluate_after * change loss_after_sharing = self.trainer.eval_loss(self.dataset) @@ -197,26 +162,12 @@ class DPSGDNode(Node): os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)), ) - rounds_to_test -= 1 - if self.dataset.__testing__ and rounds_to_test == 0: rounds_to_test = self.test_after * change - if self.centralized_test_eval: - if self.uid == 0: - ta, tl, trl = tthelper.train_test_evaluation(iteration) - results_dict["test_acc"][iteration + 1] = ta - results_dict["test_loss"][iteration + 1] = tl - if trl is not None: - results_dict["train_loss"][iteration + 1] = trl - else: - self.testing_comm.send(0, self.model.get_weights()) - sender, data = self.testing_comm.receive() - assert sender == 0 and data == "finished" - else: - logging.info("Evaluating on test set.") - ta, tl = self.dataset.test(self.model, self.loss) - results_dict["test_acc"][iteration + 1] = ta - results_dict["test_loss"][iteration + 1] = tl + logging.info("Evaluating on test set.") + ta, tl = self.dataset.test(self.model, self.loss) + results_dict["test_acc"][iteration + 1] = ta + results_dict["test_loss"][iteration + 1] = tl if global_epoch == 49: change *= 2 @@ -253,8 +204,6 @@ class DPSGDNode(Node): test_after, train_evaluate_after, reset_optimizer, - centralized_train_eval, - centralized_test_eval, ): """ Instantiate object field with arguments. 
@@ -281,10 +230,6 @@ class DPSGDNode(Node): Number of iterations after which the train loss is calculated reset_optimizer : int 1 if optimizer should be reset every communication round, else 0 - centralized_train_eval : bool - If set the train set evaluation happens at the node with uid 0 - centralized_test_eval : bool - If set the train set evaluation happens at the node with uid 0 """ self.rank = rank self.machine_id = machine_id @@ -297,17 +242,12 @@ class DPSGDNode(Node): self.test_after = test_after self.train_evaluate_after = train_evaluate_after self.reset_optimizer = reset_optimizer - self.centralized_train_eval = centralized_train_eval - self.centralized_test_eval = centralized_test_eval self.sent_disconnections = False logging.info("Rank: %d", self.rank) logging.info("type(graph): %s", str(type(self.rank))) logging.info("type(mapping): %s", str(type(self.mapping))) - if centralized_test_eval or centralized_train_eval: - self.star = Star(self.mapping.get_n_procs()) - def init_comm(self, comm_configs): """ Instantiate communication module from config. @@ -322,17 +262,6 @@ class DPSGDNode(Node): comm_class = getattr(comm_module, comm_configs["comm_class"]) comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"]) self.addresses_filepath = comm_params.get("addresses_filepath", None) - if self.centralized_test_eval: - self.testing_comm = TCP( - self.rank, - self.machine_id, - self.mapping, - self.star.n_procs, - self.addresses_filepath, - offset=self.star.n_procs, - ) - self.testing_comm.connect_neighbors(self.star.neighbors(self.uid)) - self.communication = comm_class( self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params ) @@ -351,8 +280,6 @@ class DPSGDNode(Node): test_after=5, train_evaluate_after=1, reset_optimizer=1, - centralized_train_eval=False, - centralized_test_eval=True, *args ): """ @@ -384,10 +311,6 @@ class DPSGDNode(Node): Number of iterations after which the train loss is calculated reset_optimizer : int 1 if optimizer should be reset every communication round, else 0 - centralized_train_eval : bool - If set the train set evaluation happens at the node with uid 0 - centralized_test_eval : bool - If set the train set evaluation happens at the node with uid 0 args : optional Other arguments @@ -407,8 +330,6 @@ class DPSGDNode(Node): test_after, train_evaluate_after, reset_optimizer, - centralized_train_eval, - centralized_test_eval, ) self.init_dataset_model(config["DATASET"]) self.init_optimizer(config["OPTIMIZER_PARAMS"]) @@ -423,7 +344,6 @@ class DPSGDNode(Node): self.init_sharing(config["SHARING"]) self.peer_deques = dict() self.connect_neighbors() - # self.instantiate_peer_deques() def received_from_all(self): """ @@ -454,8 +374,6 @@ class DPSGDNode(Node): test_after=5, train_evaluate_after=1, reset_optimizer=1, - centralized_train_eval=0, - centralized_test_eval=1, *args ): """ @@ -499,19 +417,10 @@ class DPSGDNode(Node): Number of iterations after which the train loss is calculated reset_optimizer : int 1 if optimizer should be reset every communication round, else 0 - centralized_train_eval : int - If set then the train set evaluation happens at the node with uid 0. - Note: If it is True then centralized_test_eval needs to be true as well! 
- centralized_test_eval : int - If set then the trainset evaluation happens at the node with uid 0 args : optional Other arguments """ - centralized_train_eval = centralized_train_eval == 1 - centralized_test_eval = centralized_test_eval == 1 - # If centralized_train_eval is True then centralized_test_eval needs to be true as well! - assert not centralized_train_eval or centralized_test_eval total_threads = os.cpu_count() self.threads_per_proc = max( @@ -532,8 +441,6 @@ class DPSGDNode(Node): test_after, train_evaluate_after, reset_optimizer, - centralized_train_eval == 1, - centralized_test_eval == 1, *args ) logging.info( diff --git a/src/decentralizepy/node/DPSGDNodeFederated.py b/src/decentralizepy/node/DPSGDNodeFederated.py index b27c92da89b50491321f5907f3d1cea4ed1ad64a..a1e36df1b936b33a4f415b469f1d803e936f1eec 100644 --- a/src/decentralizepy/node/DPSGDNodeFederated.py +++ b/src/decentralizepy/node/DPSGDNodeFederated.py @@ -40,11 +40,11 @@ class DPSGDNodeFederated(Node): self.sharing._post_step() self.sharing.communication_round += 1 - logging.info("Received worker request at node {}, global iteration {}, local round {}".format( - self.uid, - iteration, - self.participated - )) + logging.info( + "Received worker request at node {}, global iteration {}, local round {}".format( + self.uid, iteration, self.participated + ) + ) if self.reset_optimizer: self.optimizer = self.optimizer_class( @@ -63,8 +63,7 @@ class DPSGDNodeFederated(Node): if self.participated > 0: with open( - os.path.join( - self.log_dir, "{}_results.json".format(self.rank)), + os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "r", ) as inf: results_dict = json.load(inf) @@ -76,12 +75,9 @@ class DPSGDNodeFederated(Node): "total_bytes": {}, "total_meta": {}, "total_data_per_n": {}, - "grad_mean": {}, - "grad_std": {}, } - results_dict["total_bytes"][iteration - + 1] = self.communication.total_bytes + results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes if hasattr(self.communication, "total_meta"): results_dict["total_meta"][ @@ -91,14 +87,9 @@ class DPSGDNodeFederated(Node): results_dict["total_data_per_n"][ iteration + 1 ] = self.communication.total_data - if hasattr(self.sharing, "mean"): - results_dict["grad_mean"][iteration + 1] = self.sharing.mean - if hasattr(self.sharing, "std"): - results_dict["grad_std"][iteration + 1] = self.sharing.std - + with open( - os.path.join( - self.log_dir, "{}_results.json".format(self.rank)), "w" + os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w" ) as of: json.dump(results_dict, of) @@ -122,7 +113,7 @@ class DPSGDNodeFederated(Node): weights_store_dir, test_after, train_evaluate_after, - reset_optimizer + reset_optimizer, ): """ Instantiate object field with arguments. 
@@ -179,8 +170,7 @@ class DPSGDNodeFederated(Node): """ comm_module = importlib.import_module(comm_configs["comm_package"]) comm_class = getattr(comm_module, comm_configs["comm_class"]) - comm_params = utils.remove_keys( - comm_configs, ["comm_package", "comm_class"]) + comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"]) self.addresses_filepath = comm_params.get("addresses_filepath", None) self.communication = comm_class( self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params diff --git a/src/decentralizepy/node/DPSGDWithPeerSampler.py b/src/decentralizepy/node/DPSGDWithPeerSampler.py index d4e926c4c0515c617e86a10d20666bf8cb5cbd1a..f4f90e744e90d51916b3bf8f39667b74a49f6259 100644 --- a/src/decentralizepy/node/DPSGDWithPeerSampler.py +++ b/src/decentralizepy/node/DPSGDWithPeerSampler.py @@ -47,8 +47,6 @@ class DPSGDWithPeerSampler(DPSGDNode): test_after=5, train_evaluate_after=1, reset_optimizer=1, - centralized_train_eval=0, - centralized_test_eval=1, peer_sampler_uid=-1, *args ): @@ -93,19 +91,10 @@ class DPSGDWithPeerSampler(DPSGDNode): Number of iterations after which the train loss is calculated reset_optimizer : int 1 if optimizer should be reset every communication round, else 0 - centralized_train_eval : int - If set then the train set evaluation happens at the node with uid 0. - Note: If it is True then centralized_test_eval needs to be true as well! - centralized_test_eval : int - If set then the trainset evaluation happens at the node with uid 0 args : optional Other arguments """ - centralized_train_eval = centralized_train_eval == 1 - centralized_test_eval = centralized_test_eval == 1 - # If centralized_train_eval is True then centralized_test_eval needs to be true as well! - assert not centralized_train_eval or centralized_test_eval total_threads = os.cpu_count() self.threads_per_proc = max( @@ -126,8 +115,6 @@ class DPSGDWithPeerSampler(DPSGDNode): test_after, train_evaluate_after, reset_optimizer, - centralized_train_eval == 1, - centralized_test_eval == 1, *args ) logging.info( diff --git a/src/decentralizepy/node/FederatedParameterServer.py b/src/decentralizepy/node/FederatedParameterServer.py index a92de2886c2a6a212cff3f6d77a094f0694fec4e..75fb11134ac2289a8857f07d4a8139ed99a1316b 100644 --- a/src/decentralizepy/node/FederatedParameterServer.py +++ b/src/decentralizepy/node/FederatedParameterServer.py @@ -5,6 +5,7 @@ import math import os import random from collections import deque + from matplotlib import pyplot as plt from decentralizepy import utils @@ -134,8 +135,7 @@ class FederatedParameterServer(Node): """ comm_module = importlib.import_module(comm_configs["comm_package"]) comm_class = getattr(comm_module, comm_configs["comm_class"]) - comm_params = utils.remove_keys( - comm_configs, ["comm_package", "comm_class"]) + comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"]) self.addresses_filepath = comm_params.get("addresses_filepath", None) self.communication = comm_class( self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params @@ -291,9 +291,7 @@ class FederatedParameterServer(Node): # Notify workers for worker in self.current_workers: - self.communication.send( - worker, to_send - ) + self.communication.send(worker, to_send) # Receive updates from current workers while not self.received_from_all(): @@ -314,8 +312,7 @@ class FederatedParameterServer(Node): if iteration: with open( - os.path.join( - self.log_dir, "{}_results.json".format(self.rank)), + os.path.join(self.log_dir, 
"{}_results.json".format(self.rank)), "r", ) as inf: results_dict = json.load(inf) @@ -327,12 +324,9 @@ class FederatedParameterServer(Node): "total_bytes": {}, "total_meta": {}, "total_data_per_n": {}, - "grad_mean": {}, - "grad_std": {}, } - results_dict["total_bytes"][iteration - + 1] = self.communication.total_bytes + results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes if hasattr(self.communication, "total_meta"): results_dict["total_meta"][ @@ -342,10 +336,6 @@ class FederatedParameterServer(Node): results_dict["total_data_per_n"][ iteration + 1 ] = self.communication.total_data - if hasattr(self.sharing, "mean"): - results_dict["grad_mean"][iteration + 1] = self.sharing.mean - if hasattr(self.sharing, "std"): - results_dict["grad_std"][iteration + 1] = self.sharing.std rounds_to_train_evaluate -= 1 @@ -359,8 +349,7 @@ class FederatedParameterServer(Node): "train_loss", "Training Loss", "Communication Rounds", - os.path.join( - self.log_dir, "{}_train_loss.png".format(self.rank)), + os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)), ) rounds_to_test -= 1 @@ -378,8 +367,7 @@ class FederatedParameterServer(Node): global_epoch += change with open( - os.path.join( - self.log_dir, "{}_results.json".format(self.rank)), "w" + os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w" ) as of: json.dump(results_dict, of) @@ -391,8 +379,7 @@ class FederatedParameterServer(Node): ), "w", ) as of: - json.dump( - self.model.shared_parameters_counter.numpy().tolist(), of) + json.dump(self.model.shared_parameters_counter.numpy().tolist(), of) self.disconnect_neighbors() logging.info("Storing final weight") diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index 49419214a36260574d8c617d187273dc0501fd25..1becb58cf339f11d03833f846c354ba780de677a 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -80,15 +80,13 @@ class Sharing: result = dict(data) if self.compress: if "params" in result: - result["params"] = self.compressor.compress_float( - result["params"]) + result["params"] = self.compressor.compress_float(result["params"]) return result def decompress_data(self, data): if self.compress: if "params" in data: - data["params"] = self.compressor.decompress_float( - data["params"]) + data["params"] = self.compressor.decompress_float(data["params"]) return data def serialized_model(self): diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index b0a4796fa909cd7cfc3d333bfb4bab45236851a9..7fff1641650025dd8c0480957056e130bc7cab99 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -134,8 +134,7 @@ class Wavelet(PartialModel): self.change_based_selection = change_based_selection # Do a dummy transform to get the shape and coefficents slices - coeff = pywt.wavedec(self.init_model.numpy(), - self.wavelet, level=self.level) + coeff = pywt.wavedec(self.init_model.numpy(), self.wavelet, level=self.level) data, coeff_slices = pywt.coeffs_to_array(coeff) self.wt_shape = data.shape self.coeff_slices = coeff_slices @@ -212,8 +211,7 @@ class Wavelet(PartialModel): with open( os.path.join( self.folder_path, - "{}_shared_params.json".format( - self.communication_round + 1), + "{}_shared_params.json".format(self.communication_round + 1), ), "w", ) as of: diff --git a/src/decentralizepy/train_test_evaluation.py b/src/decentralizepy/train_test_evaluation.py deleted file mode 100644 index 
95f407c2050a4b848d2c6d04fd5f4a5de06f5a2c..0000000000000000000000000000000000000000 --- a/src/decentralizepy/train_test_evaluation.py +++ /dev/null @@ -1,94 +0,0 @@ -import logging -import os -from pathlib import Path - -import numpy as np -import torch - -from decentralizepy.graphs import Graph - - -class TrainTestHelper: - def __init__( - self, - dataset, - model, - loss, - dir, - n_procs, - trainer, - comm, - graph: Graph, - threads_per_proc, - eval_train=False, - ): - self.dataset = dataset - self.model = model - self.loss = loss - self.dir = Path(dir) - self.n_procs = n_procs - self.trainer = trainer - self.comm = comm - self.star = graph - self.threads_per_proc = threads_per_proc - self.eval_train = eval_train - - def train_test_evaluation(self, iteration): - with torch.no_grad(): - self.model.eval() - total_threads = os.cpu_count() - torch.set_num_threads(total_threads) - - neighbors = self.star.neighbors(0) - state_dict_copy = {} - shapes = [] - lens = [] - to_cat = [] - for key, val in self.model.state_dict().items(): - shapes.append(val.shape) - clone_val = val.clone().detach() - state_dict_copy[key] = clone_val - flat = clone_val.flatten() - to_cat.append(flat) - lens.append(flat.shape[0]) - - my_weight = torch.cat(to_cat) - weights = [my_weight] - # TODO: add weight of node 0 - for i in neighbors: - sender, data = self.comm.receive() - logging.info(f"Received weight from {sender}") - weights.append(data) - - # averaging - average_weight = np.mean([w.numpy() for w in weights], axis=0) - - start_index = 0 - average_weight_dict = {} - for i, key in enumerate(state_dict_copy): - end_index = start_index + lens[i] - average_weight_dict[key] = torch.from_numpy( - average_weight[start_index:end_index].reshape(shapes[i]) - ) - start_index = end_index - self.model.load_state_dict(average_weight_dict) - if self.eval_train: - logging.info("Evaluating on train set.") - trl = self.trainer.eval_loss(self.dataset) - else: - trl = None - logging.info("Evaluating on test set.") - ta, tl = self.dataset.test(self.model, self.loss) - # reload old weight - self.model.load_state_dict(state_dict_copy) - - if trl is not None: - print(iteration, ta, tl, trl) - else: - print(iteration, ta, tl) - - torch.set_num_threads(self.threads_per_proc) - for neighbor in neighbors: - self.comm.send(neighbor, "finished") - self.model.train() - return ta, tl, trl diff --git a/src/decentralizepy/utils.py b/src/decentralizepy/utils.py index 03b36f4093b6a78bbe1d3561cc0a26dca5740b18..0d5bda39874ad348d63bc592c702b17a5e5f2290 100644 --- a/src/decentralizepy/utils.py +++ b/src/decentralizepy/utils.py @@ -83,8 +83,6 @@ def get_args(): parser.add_argument("-ta", "--test_after", type=int, default=5) parser.add_argument("-tea", "--train_evaluate_after", type=int, default=1) parser.add_argument("-ro", "--reset_optimizer", type=int, default=1) - parser.add_argument("-ctr", "--centralized_train_eval", type=int, default=0) - parser.add_argument("-cte", "--centralized_test_eval", type=int, default=0) parser.add_argument("-sm", "--server_machine", type=int, default=0) parser.add_argument("-sr", "--server_rank", type=int, default=-1) parser.add_argument("-wr", "--working_rate", type=float, default=1.0) @@ -119,8 +117,6 @@ def write_args(args, path): "test_after": args.test_after, "train_evaluate_after": args.train_evaluate_after, "reset_optimizer": args.reset_optimizer, - "centralized_train_eval": args.centralized_train_eval, - "centralized_test_eval": args.centralized_test_eval, "working_rate": args.working_rate, } with 
open(os.path.join(path, "args.json"), "w") as of: diff --git a/tutorial/96_nodes_random1.edges b/tutorial/96_nodes_random1.edges deleted file mode 100644 index 80a38679eee7560e52eb305a8ff0783f602c6071..0000000000000000000000000000000000000000 --- a/tutorial/96_nodes_random1.edges +++ /dev/null @@ -1,547 +0,0 @@ -96 -0 1 -0 67 -0 68 -0 9 -0 82 -0 19 -0 52 -0 91 -0 95 -1 0 -1 66 -1 2 -1 29 -1 51 -1 91 -1 93 -2 1 -2 3 -2 8 -2 41 -2 14 -2 89 -2 27 -2 93 -3 2 -3 4 -3 8 -3 11 -3 44 -3 77 -3 93 -4 3 -4 5 -4 76 -4 81 -4 18 -4 19 -4 59 -4 29 -5 37 -5 4 -5 53 -5 6 -6 5 -6 7 -6 72 -6 40 -6 47 -6 90 -7 8 -7 90 -7 19 -7 6 -8 2 -8 3 -8 7 -8 41 -8 9 -8 77 -8 78 -8 21 -9 0 -9 8 -9 10 -9 21 -9 89 -9 91 -9 28 -10 80 -10 9 -10 11 -10 93 -11 3 -11 10 -11 12 -11 45 -11 80 -11 87 -12 33 -12 11 -12 13 -12 18 -12 53 -12 55 -13 65 -13 50 -13 12 -13 14 -14 2 -14 13 -14 15 -14 81 -14 52 -14 21 -14 58 -14 91 -15 16 -15 60 -15 29 -15 14 -16 33 -16 15 -16 17 -16 21 -16 24 -16 60 -17 16 -17 81 -17 18 -17 63 -18 4 -18 12 -18 17 -18 51 -18 19 -18 89 -18 29 -19 0 -19 4 -19 37 -19 7 -19 40 -19 47 -19 18 -19 20 -19 22 -19 58 -20 69 -20 76 -20 79 -20 19 -20 21 -20 86 -20 95 -21 34 -21 8 -21 9 -21 74 -21 14 -21 16 -21 20 -21 22 -22 19 -22 21 -22 23 -23 66 -23 82 -23 22 -23 24 -23 89 -23 94 -24 37 -24 38 -24 16 -24 84 -24 23 -24 25 -24 58 -24 27 -25 24 -25 26 -25 61 -26 25 -26 27 -26 69 -27 64 -27 33 -27 2 -27 87 -27 24 -27 26 -27 28 -27 31 -28 69 -28 9 -28 74 -28 27 -28 29 -29 1 -29 4 -29 43 -29 15 -29 18 -29 91 -29 28 -29 30 -30 31 -30 29 -30 55 -31 32 -31 75 -31 27 -31 30 -32 65 -32 33 -32 39 -32 84 -32 31 -33 32 -33 34 -33 67 -33 68 -33 12 -33 77 -33 16 -33 27 -34 33 -34 35 -34 70 -34 21 -34 89 -34 92 -34 61 -35 66 -35 34 -35 36 -35 83 -35 58 -36 67 -36 35 -36 37 -36 85 -36 88 -37 36 -37 5 -37 38 -37 51 -37 19 -37 24 -37 61 -38 37 -38 39 -38 77 -38 82 -38 24 -39 32 -39 38 -39 40 -39 74 -39 43 -39 81 -40 68 -40 6 -40 39 -40 41 -40 19 -40 61 -41 2 -41 8 -41 40 -41 42 -41 43 -41 81 -42 41 -42 43 -42 70 -43 39 -43 41 -43 42 -43 44 -43 29 -44 43 -44 3 -44 45 -45 70 -45 75 -45 11 -45 44 -45 78 -45 79 -45 46 -46 45 -46 47 -47 6 -47 75 -47 46 -47 48 -47 19 -48 73 -48 47 -48 49 -49 48 -49 50 -49 52 -50 13 -50 49 -50 83 -50 51 -50 89 -50 92 -51 1 -51 37 -51 70 -51 80 -51 81 -51 18 -51 50 -51 52 -51 87 -52 0 -52 66 -52 14 -52 49 -52 51 -52 53 -52 62 -52 95 -53 5 -53 71 -53 12 -53 83 -53 52 -53 54 -54 55 -54 53 -54 87 -55 12 -55 54 -55 56 -55 92 -55 30 -55 95 -56 80 -56 81 -56 55 -56 57 -56 91 -57 56 -57 89 -57 90 -57 58 -58 35 -58 71 -58 14 -58 19 -58 24 -58 57 -58 59 -59 58 -59 4 -59 60 -60 15 -60 16 -60 84 -60 59 -60 61 -61 34 -61 37 -61 40 -61 25 -61 60 -61 62 -62 52 -62 61 -62 63 -63 64 -63 17 -63 68 -63 62 -64 65 -64 27 -64 63 -65 32 -65 64 -65 66 -65 13 -65 87 -66 1 -66 65 -66 67 -66 35 -66 52 -66 23 -67 0 -67 33 -67 66 -67 36 -67 68 -67 94 -68 0 -68 33 -68 67 -68 69 -68 40 -68 88 -68 94 -68 63 -69 68 -69 70 -69 20 -69 26 -69 28 -70 34 -70 69 -70 71 -70 42 -70 45 -70 51 -71 72 -71 58 -71 53 -71 70 -72 73 -72 71 -72 6 -72 95 -73 48 -73 74 -73 72 -73 84 -74 39 -74 73 -74 75 -74 21 -74 28 -75 74 -75 76 -75 45 -75 47 -75 93 -75 31 -76 4 -76 75 -76 77 -76 20 -76 87 -76 88 -76 95 -77 33 -77 3 -77 38 -77 8 -77 76 -77 78 -78 8 -78 77 -78 45 -78 79 -79 45 -79 78 -79 80 -79 20 -79 94 -80 88 -80 10 -80 11 -80 79 -80 81 -80 51 -80 56 -80 91 -81 4 -81 39 -81 41 -81 14 -81 80 -81 17 -81 82 -81 51 -81 56 -82 0 -82 38 -82 81 -82 83 -82 23 -82 89 -83 35 -83 82 -83 50 -83 84 -83 53 -84 32 -84 73 -84 83 -84 85 -84 24 -84 60 -85 36 -85 86 -85 84 
-86 20 -86 85 -86 87 -87 65 -87 11 -87 76 -87 51 -87 54 -87 86 -87 88 -87 27 -88 36 -88 68 -88 76 -88 80 -88 87 -88 89 -89 2 -89 34 -89 9 -89 18 -89 50 -89 82 -89 23 -89 88 -89 57 -89 90 -90 6 -90 7 -90 89 -90 91 -90 57 -91 0 -91 1 -91 9 -91 14 -91 80 -91 56 -91 90 -91 92 -91 29 -92 34 -92 50 -92 55 -92 91 -92 93 -93 1 -93 2 -93 3 -93 10 -93 75 -93 92 -93 94 -94 67 -94 68 -94 79 -94 23 -94 93 -94 95 -95 0 -95 72 -95 76 -95 20 -95 52 -95 55 -95 94 diff --git a/tutorial/96_regular.edges b/tutorial/96_regular.edges deleted file mode 100644 index 0db09a2763b09045c8218ce25e640f052b6089e8..0000000000000000000000000000000000000000 --- a/tutorial/96_regular.edges +++ /dev/null @@ -1,381 +0,0 @@ -96 -0 24 -0 1 -0 26 -0 95 -1 2 -1 0 -1 82 -1 83 -2 33 -2 90 -2 3 -2 1 -3 2 -3 4 -3 14 -3 79 -4 3 -4 12 -4 5 -4 86 -5 64 -5 42 -5 4 -5 6 -6 9 -6 5 -6 62 -6 7 -7 24 -7 8 -7 45 -7 6 -8 81 -8 17 -8 9 -8 7 -9 8 -9 10 -9 53 -9 6 -10 9 -10 11 -10 29 -10 31 -11 80 -11 10 -11 36 -11 12 -12 11 -12 4 -12 13 -12 70 -13 12 -13 53 -13 30 -13 14 -14 3 -14 15 -14 13 -14 47 -15 16 -15 26 -15 14 -16 41 -16 17 -16 15 -17 8 -17 16 -17 18 -17 83 -18 17 -18 19 -18 95 -18 63 -19 82 -19 18 -19 20 -19 22 -20 19 -20 59 -20 21 -20 22 -21 72 -21 58 -21 20 -21 22 -22 19 -22 20 -22 21 -22 23 -23 24 -23 65 -23 85 -23 22 -24 0 -24 25 -24 23 -24 7 -25 32 -25 24 -25 26 -25 38 -26 0 -26 25 -26 27 -26 15 -27 32 -27 26 -27 28 -27 63 -28 27 -28 92 -28 29 -28 39 -29 10 -29 52 -29 28 -29 30 -30 66 -30 29 -30 13 -30 31 -31 32 -31 10 -31 36 -31 30 -32 25 -32 27 -32 31 -32 33 -33 32 -33 2 -33 84 -33 34 -34 33 -34 50 -34 35 -34 93 -35 57 -35 34 -35 43 -35 36 -36 35 -36 11 -36 37 -36 31 -37 88 -37 36 -37 38 -37 79 -38 25 -38 37 -38 39 -38 49 -39 40 -39 28 -39 77 -39 38 -40 41 -40 91 -40 39 -40 87 -41 16 -41 40 -41 42 -41 51 -42 41 -42 43 -42 5 -43 42 -43 35 -43 44 -44 72 -44 43 -44 75 -44 45 -45 67 -45 44 -45 46 -45 7 -46 76 -46 45 -46 54 -46 47 -47 48 -47 65 -47 14 -47 46 -48 56 -48 49 -48 61 -48 47 -49 48 -49 50 -49 38 -49 71 -50 49 -50 34 -50 51 -50 93 -51 41 -51 50 -51 52 -51 95 -52 51 -52 74 -52 53 -52 29 -53 9 -53 52 -53 13 -53 54 -54 75 -54 53 -54 46 -54 55 -55 56 -55 69 -55 85 -55 54 -56 48 -56 57 -56 69 -56 55 -57 56 -57 89 -57 58 -57 35 -58 57 -58 59 -58 21 -58 86 -59 73 -59 58 -59 20 -59 60 -60 62 -60 59 -60 61 -60 78 -61 48 -61 62 -61 60 -61 94 -62 60 -62 61 -62 6 -62 63 -63 64 -63 18 -63 27 -63 62 -64 65 -64 84 -64 5 -64 63 -65 64 -65 66 -65 23 -65 47 -66 65 -66 89 -66 67 -66 30 -67 80 -67 66 -67 68 -67 45 -68 67 -68 92 -68 69 -68 94 -69 56 -69 68 -69 70 -69 55 -70 90 -70 12 -70 69 -70 71 -71 72 -71 49 -71 70 -71 87 -72 73 -72 44 -72 21 -72 71 -73 72 -73 91 -73 59 -73 74 -74 73 -74 75 -74 52 -74 76 -75 74 -75 44 -75 54 -75 76 -76 74 -76 75 -76 77 -76 46 -77 81 -77 76 -77 78 -77 39 -78 88 -78 60 -78 77 -78 79 -79 80 -79 3 -79 37 -79 78 -80 81 -80 67 -80 11 -80 79 -81 8 -81 82 -81 80 -81 77 -82 81 -82 1 -82 83 -82 19 -83 1 -83 82 -83 84 -83 17 -84 64 -84 33 -84 83 -84 85 -85 84 -85 55 -85 86 -85 23 -86 58 -86 4 -86 85 -86 87 -87 40 -87 88 -87 86 -87 71 -88 89 -88 37 -88 78 -88 87 -89 88 -89 57 -89 66 -89 90 -90 89 -90 2 -90 91 -90 70 -91 40 -91 73 -91 90 -91 92 -92 93 -92 91 -92 68 -92 28 -93 50 -93 34 -93 94 -93 92 -94 93 -94 68 -94 61 -94 95 -95 0 -95 18 -95 51 -95 94 diff --git a/tutorial/ip.json b/tutorial/ip.json index 12eb2052c1a72995fe228925f39ca2ab547c5054..15d6591df53574707ac03627fa19c9ecd749b1e3 100644 --- a/tutorial/ip.json +++ b/tutorial/ip.json @@ -1,8 +1,3 @@ { - "0": "10.90.41.128", - "1": "10.90.41.129", - "2": 
"10.90.41.130", - "3": "10.90.41.131", - "4": "10.90.41.132", - "5": "10.90.41.133" + "0": "127.0.0.1" } \ No newline at end of file diff --git a/tutorial/run_decentralized.sh b/tutorial/run_decentralized.sh index 2fdf386a098dd9ae551ae41ffeb25401bceb6980..692bb430b2032bbd7e1109a27a97aa3225165f1e 100755 --- a/tutorial/run_decentralized.sh +++ b/tutorial/run_decentralized.sh @@ -8,7 +8,7 @@ graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini config_file=~/tmp/config.ini procs_per_machine=16 -machines=6 +machines=1 iterations=80 test_after=20 eval_file=testingPeerSampler.py @@ -21,4 +21,4 @@ mkdir -p $log_dir cp $original_config $config_file # echo "alpha = 0.10" >> $config_file -$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -ctr 0 -cte 0 -wsd $log_dir \ No newline at end of file +$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -wsd $log_dir \ No newline at end of file diff --git a/tutorial/run_federated.sh b/tutorial/run_federated.sh index dfe360bb2b629c3035c7154d26a9354fc518d34b..5113ed79073cf94fb99a2b66597bd600007fd45c 100755 --- a/tutorial/run_federated.sh +++ b/tutorial/run_federated.sh @@ -8,7 +8,7 @@ graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini config_file=~/tmp/config.ini procs_per_machine=16 -machines=6 +machines=1 iterations=80 test_after=20 eval_file=testingFederated.py