diff --git a/split_into_files.py b/split_into_files.py index ee10bc133cf7d5e85df4f96137d9c9d127df3b77..b689e9b8d82ae20962a884a39d575e2b4e5626b1 100644 --- a/split_into_files.py +++ b/split_into_files.py @@ -2,4 +2,4 @@ from decentralizepy.datasets.Femnist import Femnist f = Femnist() -f.file_per_user('leaf/data/femnist/data/train','leaf/data/femnist/per_user_data/train') +f.file_per_user("leaf/data/femnist/data/train", "leaf/data/femnist/per_user_data/train") diff --git a/src/decentralizepy/communication/Communication.py b/src/decentralizepy/communication/Communication.py index 42c627894940fa2e537d0865dd2290b99447fe46..4991d73fdab03272be379d344e06fc9a40e3fdf5 100644 --- a/src/decentralizepy/communication/Communication.py +++ b/src/decentralizepy/communication/Communication.py @@ -2,6 +2,7 @@ class Communication: """ Communcation API """ + def __init__(self, rank, machine_id, mapping, total_procs): self.total_procs = total_procs self.rank = rank @@ -17,7 +18,7 @@ class Communication: def connect_neighbors(self, neighbors): raise NotImplementedError - + def receive(self): raise NotImplementedError @@ -25,4 +26,4 @@ class Communication: raise NotImplementedError def disconnect_neighbors(self): - raise NotImplementedError \ No newline at end of file + raise NotImplementedError diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 5d91c57ac2aeb4e5bd9a51a76544dd08a6493864..18423a86bdaf8934634ea78bc1779ff41585d983 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -22,7 +22,7 @@ class TCP(Communication): def __init__(self, rank, machine_id, mapping, total_procs, addresses_filepath): super().__init__(rank, machine_id, mapping, total_procs) - + with open(addresses_filepath) as addrs: self.ip_addrs = json.load(addrs) @@ -111,6 +111,6 @@ class TCP(Communication): def disconnect_neighbors(self): if not self.sent_disconnections: - for sock in self.peer_sockets.values(): - sock.send(BYE) - self.sent_disconnections = True \ No newline at end of file + for sock in self.peer_sockets.values(): + sock.send(BYE) + self.sent_disconnections = True diff --git a/src/decentralizepy/datasets/Dataset.py b/src/decentralizepy/datasets/Dataset.py index 5adae3ba876933186bef1bd0cd7a4e0d917f3ef1..7a8e4a397a92dd3c27c1c16a8a7a76709f102ae9 100644 --- a/src/decentralizepy/datasets/Dataset.py +++ b/src/decentralizepy/datasets/Dataset.py @@ -7,7 +7,15 @@ class Dataset: All datasets must follow this API. 
""" - def __init__(self, rank="", n_procs="", train_dir="", test_dir="", sizes="", test_batch_size=""): + def __init__( + self, + rank="", + n_procs="", + train_dir="", + test_dir="", + sizes="", + test_batch_size="", + ): """ Constructor which reads the data files, instantiates and partitions the dataset Parameters diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py index 16936abff02e7b9c42a8aca7496eae40720c09a9..ccb469c0332c10602029edbf079ba8088d14ce61 100644 --- a/src/decentralizepy/datasets/Femnist.py +++ b/src/decentralizepy/datasets/Femnist.py @@ -29,8 +29,12 @@ class Femnist(Dataset): def __read_file__(self, file_path): with open(file_path, "r") as inf: - client_data = json.load(inf) - return client_data["users"], client_data["num_samples"], client_data["user_data"] + client_data = json.load(inf) + return ( + client_data["users"], + client_data["num_samples"], + client_data["user_data"], + ) def __read_dir__(self, data_dir): """ @@ -67,10 +71,9 @@ class Femnist(Dataset): my_data["num_samples"] = num_samples[index] my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]} my_data["user_data"] = {client: my_samples} - with open(os.path.join(write_dir, client+".json"), "w") as of: + with open(os.path.join(write_dir, client + ".json"), "w") as of: json.dump(my_data, of) - print("Created File: ", client+".json") - + print("Created File: ", client + ".json") def load_trainset(self): logging.info("Loading training set.") @@ -79,7 +82,7 @@ class Femnist(Dataset): files.sort() c_len = len(files) - #clients, num_samples, train_data = self.__read_dir__(self.train_dir) + # clients, num_samples, train_data = self.__read_dir__(self.train_dir) if self.sizes == None: # Equal distribution of data among processes e = c_len // self.n_procs @@ -97,7 +100,9 @@ class Femnist(Dataset): for i in range(my_clients.__len__()): cur_file = my_clients.__getitem__(i) - clients, _, train_data = self.__read_file__(os.path.join(self.train_dir, cur_file)) + clients, _, train_data = self.__read_file__( + os.path.join(self.train_dir, cur_file) + ) for cur_client in clients: self.clients.append(cur_client) my_train_data["x"].extend(train_data[cur_client]["x"]) @@ -108,13 +113,10 @@ class Femnist(Dataset): .reshape(-1, 28, 28, 1) .transpose(0, 3, 1, 2) ) - self.train_y = np.array( - my_train_data["y"], dtype=np.dtype("int64") - ).reshape(-1) + self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1) logging.debug("train_x.shape: %s", str(self.train_x.shape)) logging.debug("train_y.shape: %s", str(self.train_y.shape)) - def load_testset(self): logging.info("Loading testing set.") _, _, test_data = self.__read_dir__(self.test_dir) @@ -134,9 +136,15 @@ class Femnist(Dataset): logging.debug("test_x.shape: %s", str(self.test_x.shape)) logging.debug("test_y.shape: %s", str(self.test_y.shape)) - - - def __init__(self, rank=0, n_procs="", train_dir="", test_dir="", sizes="", test_batch_size=1024): + def __init__( + self, + rank=0, + n_procs="", + train_dir="", + test_dir="", + sizes="", + test_batch_size=1024, + ): """ Constructor which reads the data files, instantiates and partitions the dataset Parameters @@ -230,7 +238,9 @@ class Femnist(Dataset): If the test set was not initialized """ if self.__testing__: - return DataLoader(Data(self.test_x, self.test_y), batch_size=self.test_batch_size) + return DataLoader( + Data(self.test_x, self.test_y), batch_size=self.test_batch_size + ) raise RuntimeError("Test set not initialized!") def imshow(self, 
img): diff --git a/src/decentralizepy/graphs/Graph.py b/src/decentralizepy/graphs/Graph.py index a2bd0e302e6685d370fe105e5be3ef9802ad4c43..5f3ccf1c6d50faf1650571d040f99a905e7f5a98 100644 --- a/src/decentralizepy/graphs/Graph.py +++ b/src/decentralizepy/graphs/Graph.py @@ -86,18 +86,17 @@ class Graph: def write_graph_to_file(self, file, type="edges"): with open(file, "w") as of: - of.write(str(self.n_procs) + '\n') + of.write(str(self.n_procs) + "\n") if type == "edges": for node, adj in enumerate(self.adj_list): for neighbor in adj: - of.write("{} {}".format(node, neighbor)+ '\n') + of.write("{} {}".format(node, neighbor) + "\n") elif type == "adjacency": for adj in self.adj_list: - of.write(str(*adj) + '\n') + of.write(str(*adj) + "\n") else: raise ValueError("type must be from {edges, adjacency}!") - def connect_graph(self): """ Connects the graph using a Ring diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index c0732a60c4195a990ab3c4dadd4cbc56ec8d53db..3e0ffbabe6fd00050c4d1aa3e73b05df5f608b92 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -23,7 +23,7 @@ class Node: iterations=1, log_dir=".", log_level=logging.INFO, - test_after = 5, + test_after=5, *args ): """ @@ -129,32 +129,39 @@ class Node: ) self.trainer = train_class(self.model, self.optimizer, loss, **train_params) - comm_configs = config["COMMUNICATION"] comm_module = importlib.import_module(comm_configs["comm_package"]) comm_class = getattr(comm_module, comm_configs["comm_class"]) comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"]) - self.communication = comm_class(self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params) + self.communication = comm_class( + self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params + ) self.communication.connect_neighbors(self.graph.neighbors(self.uid)) - + sharing_configs = config["SHARING"] sharing_package = importlib.import_module(sharing_configs["sharing_package"]) sharing_class = getattr(sharing_package, sharing_configs["sharing_class"]) - self.sharing = sharing_class(self.rank, self.machine_id, self.communication, self.mapping, self.graph, self.model, self.dataset) + self.sharing = sharing_class( + self.rank, + self.machine_id, + self.communication, + self.mapping, + self.graph, + self.model, + self.dataset, + ) - - self.testset = self.dataset.get_testset() rounds_to_test = test_after for iteration in range(iterations): logging.info("Starting training iteration: %d", iteration) self.trainer.train(self.dataset) - + self.sharing.step() rounds_to_test -= 1 - + if self.dataset.__testing__ and rounds_to_test == 0: rounds_to_test = test_after self.dataset.test(self.model) diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index b29f8be197dd4792805d5032f05154abc6012ef3..0e78c15175a18ae2f5ca7cf31273092a3c311078 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -1,8 +1,10 @@ -from collections import deque import json import logging -import torch +from collections import deque + import numpy +import torch + class Sharing: """ @@ -29,7 +31,6 @@ class Sharing: if len(i) == 0: return False return True - def get_neighbors(self, neighbors): # modify neighbors here @@ -46,14 +47,13 @@ class Sharing: for key, value in m.items(): state_dict[key] = torch.from_numpy(numpy.array(json.loads(value))) return state_dict - def step(self): data = self.serialized_model() my_uid = 
self.mapping.get_uid(self.rank, self.machine_id) all_neighbors = self.graph.neighbors(my_uid) iter_neighbors = self.get_neighbors(all_neighbors) - data['degree'] = len(all_neighbors) + data["degree"] = len(all_neighbors) for neighbor in iter_neighbors: self.communication.send(neighbor, data) @@ -63,14 +63,14 @@ class Sharing: degree = data["degree"] del data["degree"] self.peer_deques[sender].append((degree, self.deserialized_model(data))) - + logging.info("Starting model averaging after receiving from all neighbors") total = dict() weight_total = 0 for i, n in enumerate(self.peer_deques): logging.debug("Averaging model from neighbor {}".format(i)) degree, data = self.peer_deques[n].popleft() - weight = 1/(max(len(self.peer_deques), degree) + 1) # Metro-Hastings + weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings weight_total += weight for key, value in data.items(): if key in total: @@ -79,8 +79,8 @@ class Sharing: total[key] = value * weight for key, value in self.model.state_dict().items(): - total[key] += (1 - weight_total) * value # Metro-Hastings + total[key] += (1 - weight_total) * value # Metro-Hastings self.model.load_state_dict(total) - logging.info("Model averaging complete") \ No newline at end of file + logging.info("Model averaging complete") diff --git a/testing.py b/testing.py index 7dfdb958fa63fb336b8a2be1d6281de4c4ad8fb2..5a50a3b21450c45dcbf70347525f414604752d01 100644 --- a/testing.py +++ b/testing.py @@ -1,10 +1,13 @@ -from decentralizepy.node.Node import Node -from decentralizepy.graphs.Graph import Graph -from decentralizepy.mappings.Linear import Linear -from torch import multiprocessing as mp +import argparse import logging from localconfig import LocalConfig +from torch import multiprocessing as mp + +from decentralizepy.graphs.Graph import Graph +from decentralizepy.mappings.Linear import Linear +from decentralizepy.node.Node import Node + def read_ini(file_path): config = LocalConfig(file_path) @@ -12,20 +15,32 @@ def read_ini(file_path): print("Section: ", section) for key, value in config.items(section): print((key, value)) - print(dict(config.items('DATASET'))) + print(dict(config.items("DATASET"))) return config -if __name__ == "__main__": +if __name__ == "__main__": config = read_ini("config.ini") my_config = dict() for section in config: my_config[section] = dict(config.items(section)) + parser = argparse.ArgumentParser() + parser.add_argument("-mid", "--machine_id", type=int, default=0) + parser.add_argument("-ps", "--procs_per_machine", type=int, default=1) + parser.add_argument("-ms", "--machines", type=int, default=1) + + args = parser.parse_args() + g = Graph() g.read_graph_from_file("36_nodes.edges", "edges") - n_machines = 3 - procs_per_machine = 12 + n_machines = args.machines + procs_per_machine = args.procs_per_machine l = Linear(n_machines, procs_per_machine) + m_id = args.machine_id - mp.spawn(fn = Node, nprocs = procs_per_machine, args=[0,l,g,my_config,20,"results",logging.DEBUG]) + mp.spawn( + fn=Node, + nprocs=procs_per_machine, + args=[m_id, l, g, my_config, 20, "results", logging.DEBUG], + )
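
Note for reviewers: the Sharing.py hunks above only reflow the Metro-Hastings averaging, but the weighting is easy to misread in diff form. Below is a minimal standalone sketch of that averaging step, assuming plain state_dict-style dicts of tensors; the helper name metro_hastings_average and its inputs are illustrative, not part of the codebase.

    def metro_hastings_average(own_state, neighbor_models):
        """Sketch of the averaging performed in Sharing.step().

        own_state: dict of parameter tensors for this node.
        neighbor_models: list of (degree, state_dict) pairs, one per neighbor.
        """
        own_degree = len(neighbor_models)
        total = {}
        weight_total = 0.0
        for degree, state in neighbor_models:
            # Metro-Hastings: each edge weight is bounded by the larger of the
            # two endpoint degrees, mirroring
            #   weight = 1 / (max(len(self.peer_deques), degree) + 1)
            # in the diff above.
            weight = 1.0 / (max(own_degree, degree) + 1)
            weight_total += weight
            for key, value in state.items():
                total[key] = total.get(key, 0) + weight * value
        # The node keeps the leftover weight mass for its own parameters.
        for key, value in own_state.items():
            total[key] = total.get(key, 0) + (1 - weight_total) * value
        return total

With the new argparse flags in testing.py, the previously hard-coded run (3 machines, 12 processes each) would be launched as something like `python testing.py -mid 0 -ps 12 -ms 3`, one invocation per machine, each with its own --machine_id.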