diff --git a/eval/4_node_fullyConnected.edges b/eval/4_node_fullyConnected.edges
new file mode 100644
index 0000000000000000000000000000000000000000..8815c900036158941995514b0dabb7761156f589
--- /dev/null
+++ b/eval/4_node_fullyConnected.edges
@@ -0,0 +1,13 @@
+4
+0 1
+0 2
+0 3
+1 0
+1 2
+1 3
+2 0
+2 1
+2 3
+3 0
+3 1
+3 2
diff --git a/eval/config.ini b/eval/config.ini
index 74b79298ed917774a5cff7aab6776858b58c3e9e..f21f3da6067b68e0bd5ab3d8f4a96329a4e8436e 100644
--- a/eval/config.ini
+++ b/eval/config.ini
@@ -2,7 +2,7 @@
 dataset_package = decentralizepy.datasets.Femnist
 dataset_class = Femnist
 model_class = CNN
-n_procs = 16
+n_procs = 4
 train_dir = /home/risharma/Gitlab/decentralizepy/leaf/data/femnist/per_user_data/train
 test_dir = /home/risharma/Gitlab/decentralizepy/leaf/data/femnist/data/test
 ; python list of fractions below
@@ -14,9 +14,9 @@ optimizer_class = Adam
 lr = 0.01
 
 [TRAIN_PARAMS]
-training_package = decentralizepy.training.Training
-training_class = Training
-epochs_per_round = 5
+training_package = decentralizepy.training.GradientAccumulator
+training_class = GradientAccumulator
+epochs_per_round = 2
 batch_size = 1024
 shuffle = True
 loss_package = torch.nn
@@ -28,5 +28,6 @@ comm_class = TCP
 addresses_filepath = ip_addr_6Machines.json
 
 [SHARING]
-sharing_package = decentralizepy.sharing.Sharing
-sharing_class = Sharing
\ No newline at end of file
+sharing_package = decentralizepy.sharing.PartialModel
+sharing_class = PartialModel
+alpha = 0.5
\ No newline at end of file
diff --git a/eval/main.ipynb b/eval/main.ipynb
index 0d4bbb2a5c0e0845bbc9a23f9a3daffca43dabbb..dced52aab484a7dde59e30c854e21bca3be489e7 100644
--- a/eval/main.ipynb
+++ b/eval/main.ipynb
@@ -507,6 +507,18 @@
     "print(*a)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from decentralizepy.graphs.FullyConnected import FullyConnected\n",
+    "\n",
+    "s = FullyConnected(4)\n",
+    "s.write_graph_to_file('4_node_fullyConnected.edges')"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 2,
diff --git a/eval/testing.py b/eval/testing.py
index 459c1de1ce2a78be8a74459a02e34f98954de9ae..563b73d80acccf68fbd99c24b7a12865bd475ec8 100644
--- a/eval/testing.py
+++ b/eval/testing.py
@@ -1,11 +1,11 @@
-import argparse
-import datetime
 import logging
 from pathlib import Path
+from shutil import copy
 
 from localconfig import LocalConfig
 from torch import multiprocessing as mp
 
+from decentralizepy import utils
 from decentralizepy.graphs.Graph import Graph
 from decentralizepy.mappings.Linear import Linear
 from decentralizepy.node.Node import Node
@@ -22,23 +22,10 @@
 
 
 if __name__ == "__main__":
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument("-mid", "--machine_id", type=int, default=0)
-    parser.add_argument("-ps", "--procs_per_machine", type=int, default=1)
-    parser.add_argument("-ms", "--machines", type=int, default=1)
-    parser.add_argument(
-        "-ld", "--log_dir", type=str, default="./{}".format(datetime.datetime.now())
-    )
-    parser.add_argument("-is", "--iterations", type=int, default=1)
-    parser.add_argument("-cf", "--config_file", type=str, default="config.ini")
-    parser.add_argument("-ll", "--log_level", type=str, default="INFO")
-    parser.add_argument("-gf", "--graph_file", type=str, default="36_nodes.edges")
-    parser.add_argument("-gt", "--graph_type", type=str, default="edges")
-
-    args = parser.parse_args()
+    args = utils.get_args()
 
     Path(args.log_dir).mkdir(parents=True, exist_ok=True)
+
     log_level = {
         "INFO": logging.INFO,
         "DEBUG": logging.DEBUG,
@@ -52,6 +39,10 @@ if __name__ == "__main__":
     for section in config:
         my_config[section] = dict(config.items(section))
 
+    copy(args.config_file, args.log_dir)
+    copy(args.graph_file, args.log_dir)
+    utils.write_args(args, args.log_dir)
+
     g = Graph()
     g.read_graph_from_file(args.graph_file, args.graph_type)
     n_machines = args.machines
diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py
index 5e15bbae896901a13794d87c10b551930d7f40dc..8140d12db5a53502b4fd24f7bb0c1403fa5d8d3b 100644
--- a/src/decentralizepy/node/Node.py
+++ b/src/decentralizepy/node/Node.py
@@ -141,6 +141,9 @@ class Node:
         sharing_configs = config["SHARING"]
         sharing_package = importlib.import_module(sharing_configs["sharing_package"])
         sharing_class = getattr(sharing_package, sharing_configs["sharing_class"])
+        sharing_params = utils.remove_keys(
+            sharing_configs, ["sharing_package", "sharing_class"]
+        )
         self.sharing = sharing_class(
             self.rank,
             self.machine_id,
@@ -149,6 +152,7 @@
             self.graph,
             self.model,
             self.dataset,
+            **sharing_params
         )
 
         self.testset = self.dataset.get_testset()
diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py
index 46ba8cf721eb7b45b40b93a9744641c444e00f24..ac97c7dac6d27898f05dac5de59751170a129e3d 100644
--- a/src/decentralizepy/sharing/PartialModel.py
+++ b/src/decentralizepy/sharing/PartialModel.py
@@ -1,5 +1,5 @@
 import json
-import math
+import logging
 
 import numpy
 import torch
@@ -17,6 +17,7 @@ class PartialModel(Sharing):
         self.alpha = alpha
 
     def extract_sorted_gradients(self):
+        logging.info("Summing up gradients")
         assert len(self.model.accumulated_gradients) > 0
         gradient_sum = self.model.accumulated_gradients[0]
         for i in range(1, len(self.model.accumulated_gradients)):
@@ -24,28 +25,42 @@
                 gradient_sum[key] += self.model.accumulated_gradients[i][key]
 
         gradient_sequence = []
+        logging.info("Turning gradients into tuples")
+
         for key, gradient in gradient_sum.items():
             for index, val in enumerate(torch.flatten(gradient)):
                 gradient_sequence.append((val, key, index))
-        gradient_sequence.sort()
+        logging.info("Sorting gradient tuples")
+
+        gradient_sequence.sort()  # bottleneck
         return gradient_sequence
 
     def serialized_model(self):
        gradient_sequence = self.extract_sorted_gradients()
+        logging.info("Extracted sorted gradients")
+
         gradient_sequence = gradient_sequence[
-            : math.round(len(gradient_sequence) * self.alpha)
+            : round(len(gradient_sequence) * self.alpha)
         ]
 
         m = dict()
         for _, key, index in gradient_sequence:
             if key not in m:
                 m[key] = []
-            m[key].append(index, torch.flatten(self.model.state_dict()[key])[index])
+            m[key].append(
+                (
+                    index,
+                    torch.flatten(self.model.state_dict()[key])[index].numpy().tolist(),
+                )
+            )
+
+        logging.info("Generated dictionary to send")
+
         for key in m:
             m[key] = json.dumps(m[key])
+
+        logging.info("Converted dictionary to json")
+
         return m
 
     def deserialized_model(self, m):
@@ -54,5 +69,4 @@
         for key, value in m.items():
             for index, param_val in json.loads(value):
                 torch.flatten(state_dict[key])[index] = param_val
-        state_dict[key] = torch.from_numpy(numpy.array(json.loads(value)))
         return state_dict
diff --git a/src/decentralizepy/training/GradientAccumulator.py b/src/decentralizepy/training/GradientAccumulator.py
index 727f6519092da3c46f2d0a9445618c6b3b35e703..9ea42e62b68c01c1c6a620d7a62c28f46a0dc2eb 100644
--- a/src/decentralizepy/training/GradientAccumulator.py
+++ b/src/decentralizepy/training/GradientAccumulator.py
@@ -7,7 +7,7 @@ class GradientAccumulator(Training):
     def __init__(
         self, model, optimizer, loss, epochs_per_round="", batch_size="", shuffle=""
     ):
-        super().__init__()
+        super().__init__(model, optimizer, loss, epochs_per_round, batch_size, shuffle)
 
     def train(self, dataset):
         """
@@ -30,9 +30,11 @@ class GradientAccumulator(Training):
                 epoch_loss += loss_val.item()
                 loss_val.backward()
                 self.model.accumulated_gradients.append(
-                    grad_dict={
+                    {
                         k: v.grad.clone().detach()
-                        for k, v in zip(self.model.state_dict(), self.parameters())
+                        for k, v in zip(
+                            self.model.state_dict(), self.model.parameters()
+                        )
                     }
                 )
                 self.optimizer.step()
diff --git a/src/decentralizepy/utils.py b/src/decentralizepy/utils.py
index ff562697648fbb7a8bd968125dabe2ac346d538f..00cfd4f045544228f8f32fed6738d901684e8069 100644
--- a/src/decentralizepy/utils.py
+++ b/src/decentralizepy/utils.py
@@ -1,3 +1,9 @@
+import argparse
+import datetime
+import json
+import os
+
+
 def conditional_value(var, nul, default):
     if var != nul:
         return var
@@ -7,3 +13,37 @@ def conditional_value(var, nul, default):
 
 def remove_keys(d, keys_to_remove):
     return {key: d[key] for key in d if key not in keys_to_remove}
+
+
+def get_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-mid", "--machine_id", type=int, default=0)
+    parser.add_argument("-ps", "--procs_per_machine", type=int, default=1)
+    parser.add_argument("-ms", "--machines", type=int, default=1)
+    parser.add_argument(
+        "-ld", "--log_dir", type=str, default="./{}".format(datetime.datetime.now())
+    )
+    parser.add_argument("-is", "--iterations", type=int, default=1)
+    parser.add_argument("-cf", "--config_file", type=str, default="config.ini")
+    parser.add_argument("-ll", "--log_level", type=str, default="INFO")
+    parser.add_argument("-gf", "--graph_file", type=str, default="36_nodes.edges")
+    parser.add_argument("-gt", "--graph_type", type=str, default="edges")
+
+    args = parser.parse_args()
+    return args
+
+
+def write_args(args, path):
+    data = {
+        "machine_id": args.machine_id,
+        "procs_per_machine": args.procs_per_machine,
+        "machines": args.machines,
+        "log_dir": args.log_dir,
+        "iterations": args.iterations,
+        "config_file": args.config_file,
+        "log_level": args.log_level,
+        "graph_file": args.graph_file,
+        "graph_type": args.graph_type,
+    }
+    with open(os.path.join(path, "args.json"), "w") as of:
+        json.dump(data, of)
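Note on the [SHARING] change: with alpha = 0.5, PartialModel.serialized_model keeps only the first round(alpha * N) entries of the gradient-sorted parameter list and ships them as per-key JSON lists of (flat index, value) pairs, which deserialized_model writes back by index. The snippet below is a minimal standalone sketch of that round trip, not code from the repository; the toy tensors, the alpha value, and all variable names are illustrative. As written, list.sort() orders ascending, so the slice keeps the entries with the smallest summed gradient values.

    import json

    import torch

    alpha = 0.5  # illustrative; eval/config.ini sets alpha = 0.5
    state_dict = {"w": torch.randn(2, 3), "b": torch.randn(3)}  # toy parameters
    grad_sum = {k: torch.randn_like(v) for k, v in state_dict.items()}  # toy summed gradients

    # Mirror extract_sorted_gradients(): flatten each summed gradient into
    # (value, key, flat_index) tuples and sort the whole list.
    sequence = []
    for key, grad in grad_sum.items():
        for index, val in enumerate(torch.flatten(grad)):
            sequence.append((val.item(), key, index))
    sequence.sort()
    sequence = sequence[: round(len(sequence) * alpha)]  # keep the alpha fraction

    # Mirror serialized_model(): per key, JSON-encode the (index, parameter value) pairs.
    message = {}
    for _, key, index in sequence:
        message.setdefault(key, []).append(
            (index, torch.flatten(state_dict[key])[index].item())
        )
    message = {key: json.dumps(pairs) for key, pairs in message.items()}

    # Mirror deserialized_model(): write received values back at their flat indices.
    received = {k: torch.zeros_like(v) for k, v in state_dict.items()}
    for key, value in message.items():
        flat = received[key].view(-1)  # a view, so writes land in received[key]
        for index, param_val in json.loads(value):
            flat[index] = param_val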
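Note on the Node.py change: every key left in [SHARING] after dropping sharing_package and sharing_class is forwarded to the sharing class constructor as a keyword argument, which is how alpha reaches PartialModel. Below is a small sketch of that plumbing, with a hand-built dict standing in for dict(config.items("SHARING")); in testing.py the values come from localconfig, so alpha may arrive as a float or a string.

    from decentralizepy import utils  # remove_keys() is defined in src/decentralizepy/utils.py

    # Stand-in for the parsed [SHARING] section of eval/config.ini.
    sharing_configs = {
        "sharing_package": "decentralizepy.sharing.PartialModel",
        "sharing_class": "PartialModel",
        "alpha": 0.5,
    }

    sharing_params = utils.remove_keys(
        sharing_configs, ["sharing_package", "sharing_class"]
    )
    print(sharing_params)  # {'alpha': 0.5}, later expanded as **sharing_params

With utils.get_args() supplying the defaults, a local 4-process run against the new graph would look something like python testing.py -ps 4 -gf 4_node_fullyConnected.edges -gt edges -cf config.ini -is 5 -ld <some_log_dir> (flag values illustrative); the config file, the graph file, and args.json are then copied into the log directory.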
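Note on GradientAccumulator: after the fix, every batch appends a detached copy of each parameter's gradient, keyed by the state-dict names, which is what PartialModel later sums. Pairing self.model.state_dict() with self.model.parameters() via zip assumes the model has no buffers, so keys and parameters line up one-to-one. The sketch below reproduces just that snapshot step with a torch.nn.Linear stand-in rather than the repository's model class.

    import torch

    model = torch.nn.Linear(4, 2)      # stand-in for the Femnist CNN
    model.accumulated_gradients = []   # the diff appends to this attribute on the model

    x, y = torch.randn(8, 4), torch.randint(0, 2, (8,))
    loss = torch.nn.functional.cross_entropy(model(x), y)
    loss.backward()

    # Same dict comprehension as the fixed train() loop: snapshot the gradients
    # under the state-dict key names.
    model.accumulated_gradients.append(
        {
            k: v.grad.clone().detach()
            for k, v in zip(model.state_dict(), model.parameters())
        }
    )
    print(list(model.accumulated_gradients[0].keys()))  # ['weight', 'bias']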