diff --git a/36_nodes.edges b/36_nodes.edges new file mode 100644 index 0000000000000000000000000000000000000000..fa7d2086818eb711d16906c5f4021dbc108393d4 --- /dev/null +++ b/36_nodes.edges @@ -0,0 +1,219 @@ +36 +0 1 +0 2 +0 35 +0 6 +1 0 +1 2 +1 17 +1 28 +1 30 +2 0 +2 1 +2 3 +2 7 +2 8 +2 19 +2 31 +3 2 +3 4 +3 5 +3 23 +3 25 +3 26 +4 34 +4 3 +4 5 +4 16 +4 18 +5 3 +5 4 +5 6 +5 10 +5 23 +6 0 +6 33 +6 5 +6 7 +6 9 +6 20 +6 26 +7 8 +7 2 +7 6 +8 32 +8 2 +8 34 +8 7 +8 9 +9 35 +9 6 +9 8 +9 10 +9 11 +9 18 +9 23 +9 31 +10 34 +10 5 +10 9 +10 11 +10 17 +10 18 +10 22 +10 23 +11 34 +11 9 +11 10 +11 12 +11 19 +11 25 +11 27 +11 29 +11 30 +12 32 +12 11 +12 13 +12 15 +12 16 +12 23 +13 12 +13 14 +13 15 +13 18 +13 25 +14 35 +14 13 +14 15 +14 16 +14 25 +15 33 +15 12 +15 13 +15 14 +15 16 +15 18 +15 27 +15 30 +16 35 +16 4 +16 12 +16 14 +16 15 +16 17 +17 1 +17 10 +17 16 +17 18 +17 19 +18 32 +18 4 +18 9 +18 10 +18 13 +18 15 +18 17 +18 19 +18 20 +19 2 +19 11 +19 17 +19 18 +19 20 +19 30 +20 35 +20 6 +20 18 +20 19 +20 21 +20 22 +20 27 +21 20 +21 22 +21 23 +21 29 +21 30 +22 10 +22 20 +22 21 +22 23 +22 25 +23 3 +23 5 +23 9 +23 10 +23 12 +23 21 +23 22 +23 24 +23 29 +24 25 +24 23 +25 33 +25 3 +25 35 +25 11 +25 13 +25 14 +25 22 +25 24 +25 26 +25 29 +25 31 +26 27 +26 25 +26 3 +26 6 +27 35 +27 11 +27 15 +27 20 +27 26 +27 28 +28 1 +28 27 +28 29 +29 11 +29 21 +29 23 +29 25 +29 28 +29 30 +30 32 +30 1 +30 11 +30 15 +30 19 +30 21 +30 29 +30 31 +31 32 +31 2 +31 9 +31 25 +31 30 +32 33 +32 8 +32 12 +32 18 +32 30 +32 31 +33 32 +33 34 +33 6 +33 15 +33 25 +34 33 +34 35 +34 4 +34 8 +34 10 +34 11 +35 0 +35 34 +35 9 +35 14 +35 16 +35 20 +35 25 +35 27 diff --git a/config.ini b/config.ini index e80c1db8ddd1d31b34bd2152dfe2779ae0024902..b0e596f0141cb39cd480bf420a49a40619d29795 100644 --- a/config.ini +++ b/config.ini @@ -1,13 +1,9 @@ -[GRAPH] -package = decentralizepy.graphs.SmallWorld -graph_class = SmallWorld - [DATASET] dataset_package = decentralizepy.datasets.Femnist dataset_class = Femnist model_class = CNN -n_procs = 6 -train_dir = leaf/data/femnist/data/train +n_procs = 12 +train_dir = leaf/data/femnist/per_user_data/train test_dir = leaf/data/femnist/data/test ; python list of fractions below sizes = @@ -20,8 +16,8 @@ lr = 0.01 [TRAIN_PARAMS] training_package = decentralizepy.training.Training training_class = Training -epochs_per_round = 5 -batch_size = 512 +epochs_per_round = 10 +batch_size = 1024 shuffle = True loss_package = torch.nn loss_class = CrossEntropyLoss diff --git a/ip_addr.json b/ip_addr.json index a700c5fb7f5f71480c816d5c26f1f56d3f88a586..bf21b91fe8c22b05a3d1a52a8438fca18d1036cd 100644 --- a/ip_addr.json +++ b/ip_addr.json @@ -1,4 +1,5 @@ { "0": "10.90.41.131", - "1": "10.90.41.132" + "1": "10.90.41.132", + "2": "10.90.41.133" } \ No newline at end of file diff --git a/main.ipynb b/main.ipynb index 8d3f9e107a9728ef5f7d0745828718c2c56b4f3b..b78550119d26beabaa3d890bc950086ef4c509b0 100644 --- a/main.ipynb +++ b/main.ipynb @@ -360,8 +360,8 @@ "('dataset_package', 'decentralizepy.datasets.Femnist')\n", "('dataset_class', 'Femnist')\n", "('model_class', 'CNN')\n", - "('n_procs', 2)\n", - "('train_dir', 'leaf/data/femnist/data/train')\n", + "('n_procs', 36)\n", + "('train_dir', 'leaf/data/femnist/per_user_data/train')\n", "('test_dir', 'leaf/data/femnist/data/test')\n", "('sizes', '')\n", "Section: OPTIMIZER_PARAMS\n", @@ -372,7 +372,7 @@ "('training_package', 'decentralizepy.training.Training')\n", "('training_class', 'Training')\n", "('epochs_per_round', 1)\n", - "('batch_size', 512)\n", + "('batch_size', 1024)\n", 
"('shuffle', True)\n", "('loss_package', 'torch.nn')\n", "('loss_class', 'CrossEntropyLoss')\n", @@ -383,35 +383,20 @@ "Section: SHARING\n", "('sharing_package', 'decentralizepy.sharing.Sharing')\n", "('sharing_class', 'Sharing')\n", - "{'dataset_package': 'decentralizepy.datasets.Femnist', 'dataset_class': 'Femnist', 'model_class': 'CNN', 'n_procs': 2, 'train_dir': 'leaf/data/femnist/data/train', 'test_dir': 'leaf/data/femnist/data/test', 'sizes': ''}\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 0\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 1\n", - "n: <class 'int'> 0\n" + "{'dataset_package': 'decentralizepy.datasets.Femnist', 'dataset_class': 'Femnist', 'model_class': 'CNN', 'n_procs': 36, 'train_dir': 'leaf/data/femnist/per_user_data/train', 'test_dir': 'leaf/data/femnist/data/test', 'sizes': ''}\n" + ] + }, + { + "ename": "IndexError", + "evalue": "list index out of range", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mIndexError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/tmp/ipykernel_2255475/3991202644.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 27\u001b[0m \u001b[0;31m#f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 28\u001b[0m \u001b[0mg\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mGraph\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 29\u001b[0;31m \u001b[0mg\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread_graph_from_file\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"36_nodes.edges\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"edges\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 30\u001b[0m \u001b[0ml\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mLinear\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m36\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 31\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/Gitlab/decentralizepy/src/decentralizepy/graphs/Graph.py\u001b[0m in \u001b[0;36mread_graph_from_file\u001b[0;34m(self, file, type, force_connect)\u001b[0m\n\u001b[1;32m 70\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mline\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mlines\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0my\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mmap\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mint\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mline\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstrip\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 72\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__insert_edge__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0my\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 73\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0mtype\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m\"adjacency\"\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 74\u001b[0m \u001b[0mnode_id\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/Gitlab/decentralizepy/src/decentralizepy/graphs/Graph.py\u001b[0m in \u001b[0;36m__insert_edge__\u001b[0;34m(self, x, y)\u001b[0m\n\u001b[1;32m 39\u001b[0m \u001b[0mThe\u001b[0m \u001b[0mdestination\u001b[0m \u001b[0mvertex\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 40\u001b[0m \"\"\"\n\u001b[0;32m---> 41\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madj_list\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0my\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 42\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madj_list\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0my\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 43\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mIndexError\u001b[0m: list index out of range" ] } ], @@ -444,12 +429,12 @@ "\n", "#f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])\n", "g = Graph()\n", - "g.read_graph_from_file(\"graph.adj\", \"adjacency\")\n", - "l = Linear(1, 6)\n", + "g.read_graph_from_file(\"36_nodes.edges\", \"edges\")\n", + "l = Linear(1, 36)\n", "\n", "#Node(0, 0, l, g, my_config, 20, \"results\", logging.DEBUG)\n", "\n", - "mp.spawn(fn = Node, nprocs = 6, args=[0,l,g,my_config,20,\"results\",logging.DEBUG])\n", + "mp.spawn(fn = Node, nprocs = g.n_procs, args=[0,l,g,my_config,20,\"results\",logging.INFO])\n", "\n", "# mp.spawn(fn = Node, args = [l, g, config, 10, \"results\", logging.DEBUG], nprocs=2)\n" ] @@ -463,27 +448,76 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from decentralizepy.mappings.Linear import Linear\n", + "from testing import f\n", + "from torch import multiprocessing as mp\n", + "\n", + "l = Linear(1, 2)\n", + "mp.spawn(fn = f, nprocs = 2, args = [0, 2, \"ip_addr.json\", l])\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from decentralizepy.datasets.Femnist import Femnist\n", + "\n", + "f = Femnist()\n", + "\n", + "f.file_per_user('leaf/data/femnist/data/train','leaf/data/femnist/per_user_data/train')\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "a = set()\n", + "a.update([2, 3, 4, 5])" + ] + }, + { + "cell_type": "code", + 
"execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{2, 3, 4, 5}" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "a" + ] + }, + { + "cell_type": "code", + "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Message sent\n", - "Message sent\n", - "1 (0, {'message': 'Hi I am rank 0'})\n", - "0 (1, {'message': 'Hi I am rank 1'})\n" + "2 3 4 5\n" ] } ], "source": [ - "from decentralizepy.mappings.Linear import Linear\n", - "from testing import f\n", - "from torch import multiprocessing as mp\n", - "\n", - "l = Linear(1, 2)\n", - "mp.spawn(fn = f, nprocs = 2, args = [0, 2, \"ip_addr.json\", l])\n" + "print(*a)" ] }, { @@ -491,7 +525,11 @@ "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "from decentralizepy.graphs.SmallWorld import SmallWorld\n", + "\n", + "s = SmallWorld(36, 2, .5)" + ] } ], "metadata": { diff --git a/split_into_files.py b/split_into_files.py new file mode 100644 index 0000000000000000000000000000000000000000..ee10bc133cf7d5e85df4f96137d9c9d127df3b77 --- /dev/null +++ b/split_into_files.py @@ -0,0 +1,5 @@ +from decentralizepy.datasets.Femnist import Femnist + +f = Femnist() + +f.file_per_user('leaf/data/femnist/data/train','leaf/data/femnist/per_user_data/train') diff --git a/src/decentralizepy/communication/Communication.py b/src/decentralizepy/communication/Communication.py index bd3004e50ecd982a276e86c7cddfd7e5f2a9e1c8..42c627894940fa2e537d0865dd2290b99447fe46 100644 --- a/src/decentralizepy/communication/Communication.py +++ b/src/decentralizepy/communication/Communication.py @@ -23,3 +23,6 @@ class Communication: def send(self, uid, data): raise NotImplementedError + + def disconnect_neighbors(self): + raise NotImplementedError \ No newline at end of file diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 898038bb5dc49dcf74d610cfc609d0d6fed2df20..5d91c57ac2aeb4e5bd9a51a76544dd08a6493864 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -55,6 +55,7 @@ class TCP(Communication): def connect_neighbors(self, neighbors): for uid in neighbors: + logging.info("Connecting to my neighbour: {}".format(uid)) id = str(uid).encode() req = self.context.socket(zmq.DEALER) req.setsockopt(zmq.IDENTITY, self.identity) @@ -75,7 +76,7 @@ class TCP(Communication): "A neighbour wants to disconnect before training started!" 
) else: - logging.info( + logging.debug( "Recieved message from {} @ connect_neighbors".format(sender) ) @@ -97,12 +98,9 @@ class TCP(Communication): elif recv == BYE: logging.info("Recieved {} from {}".format(BYE, sender)) self.barrier.remove(sender) - if not self.sent_disconnections: - for sock in self.peer_sockets.values(): - sock.send(BYE) - self.sent_disconnections = True + self.disconnect_neighbors() else: - logging.info("Recieved message from {}".format(sender)) + logging.debug("Received message from {}".format(sender)) return self.decrypt(sender, recv) def send(self, uid, data): @@ -110,3 +108,9 @@ def send(self, uid, data): id = str(uid).encode() self.peer_sockets[id].send(to_send) logging.info("{} sent the message to {}.".format(self.uid, uid)) + + def disconnect_neighbors(self): + if not self.sent_disconnections: + for sock in self.peer_sockets.values(): + sock.send(BYE) + self.sent_disconnections = True \ No newline at end of file diff --git a/src/decentralizepy/datasets/Dataset.py b/src/decentralizepy/datasets/Dataset.py index 065b86cdb065054c83f0d0645482eade756377bf..5adae3ba876933186bef1bd0cd7a4e0d917f3ef1 100644 --- a/src/decentralizepy/datasets/Dataset.py +++ b/src/decentralizepy/datasets/Dataset.py @@ -7,7 +7,7 @@ class Dataset: All datasets must follow this API. """ - def __init__(self, rank="", n_procs="", train_dir="", test_dir="", sizes=""): + def __init__(self, rank="", n_procs="", train_dir="", test_dir="", sizes="", test_batch_size=""): """ Constructor which reads the data files, instantiates and partitions the dataset Parameters @@ -30,6 +30,7 @@ class Dataset: self.train_dir = utils.conditional_value(train_dir, "", None) self.test_dir = utils.conditional_value(test_dir, "", None) self.sizes = utils.conditional_value(sizes, "", None) + self.test_batch_size = utils.conditional_value(test_batch_size, "", 64) if self.sizes: if type(self.sizes) == str: self.sizes = eval(self.sizes) diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py index bbc067e53ff570c818b463ee9b00673175f9b911..16936abff02e7b9c42a8aca7496eae40720c09a9 100644 --- a/src/decentralizepy/datasets/Femnist.py +++ b/src/decentralizepy/datasets/Femnist.py @@ -27,6 +27,11 @@ class Femnist(Dataset): Class for the FEMNIST dataset """ + def __read_file__(self, file_path): + with open(file_path, "r") as inf: + client_data = json.load(inf) + return client_data["users"], client_data["num_samples"], client_data["user_data"] + def __read_dir__(self, data_dir): """ Function to read all the FEMNIST data files in the directory @@ -48,15 +53,90 @@ class Femnist(Dataset): files = [f for f in files if f.endswith(".json")] for f in files: file_path = os.path.join(data_dir, f) - with open(file_path, "r") as inf: - client_data = json.load(inf) - clients.extend(client_data["users"]) - num_samples.extend(client_data["num_samples"]) - data.update(client_data["user_data"]) - + u, n, d = self.__read_file__(file_path) + clients.extend(u) + num_samples.extend(n) + data.update(d) return clients, num_samples, data - def __init__(self, rank, n_procs="", train_dir="", test_dir="", sizes=""): + def file_per_user(self, data_dir, write_dir): + clients, num_samples, train_data = self.__read_dir__(data_dir) + for index, client in enumerate(clients): + my_data = dict() + my_data["users"] = [client] + my_data["num_samples"] = num_samples[index] + my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]} + my_data["user_data"] = {client: my_samples} + with open(os.path.join(write_dir,
client+".json"), "w") as of: + json.dump(my_data, of) + print("Created File: ", client+".json") + + + def load_trainset(self): + logging.info("Loading training set.") + files = os.listdir(self.train_dir) + files = [f for f in files if f.endswith(".json")] + files.sort() + c_len = len(files) + + #clients, num_samples, train_data = self.__read_dir__(self.train_dir) + + if self.sizes == None: # Equal distribution of data among processes + e = c_len // self.n_procs + frac = e / c_len + self.sizes = [frac] * self.n_procs + self.sizes[-1] += 1.0 - frac * self.n_procs + logging.debug("Size fractions: {}".format(self.sizes)) + + my_clients = DataPartitioner(files, self.sizes).use(self.rank) + my_train_data = {"x": [], "y": []} + self.clients = [] + self.num_samples = [] + logging.debug("Clients Length: %d", c_len) + logging.debug("My_clients_len: %d", my_clients.__len__()) + for i in range(my_clients.__len__()): + cur_file = my_clients.__getitem__(i) + + clients, _, train_data = self.__read_file__(os.path.join(self.train_dir, cur_file)) + for cur_client in clients: + self.clients.append(cur_client) + my_train_data["x"].extend(train_data[cur_client]["x"]) + my_train_data["y"].extend(train_data[cur_client]["y"]) + self.num_samples.append(len(train_data[cur_client]["y"])) + self.train_x = ( + np.array(my_train_data["x"], dtype=np.dtype("float32")) + .reshape(-1, 28, 28, 1) + .transpose(0, 3, 1, 2) + ) + self.train_y = np.array( + my_train_data["y"], dtype=np.dtype("int64") + ).reshape(-1) + logging.debug("train_x.shape: %s", str(self.train_x.shape)) + logging.debug("train_y.shape: %s", str(self.train_y.shape)) + + + def load_testset(self): + logging.info("Loading testing set.") + _, _, test_data = self.__read_dir__(self.test_dir) + test_x = [] + test_y = [] + for test_data in test_data.values(): + for x in test_data["x"]: + test_x.append(x) + for y in test_data["y"]: + test_y.append(y) + self.test_x = ( + np.array(test_x, dtype=np.dtype("float32")) + .reshape(-1, 28, 28, 1) + .transpose(0, 3, 1, 2) + ) + self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1) + logging.debug("test_x.shape: %s", str(self.test_x.shape)) + logging.debug("test_y.shape: %s", str(self.test_y.shape)) + + + + def __init__(self, rank=0, n_procs="", train_dir="", test_dir="", sizes="", test_batch_size=1024): """ Constructor which reads the data files, instantiates and partitions the dataset Parameters @@ -74,61 +154,13 @@ class Femnist(Dataset): A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0 By default, each process gets an equal amount. 
""" - super().__init__(rank, n_procs, train_dir, test_dir, sizes) + super().__init__(rank, n_procs, train_dir, test_dir, sizes, test_batch_size) if self.__training__: - logging.info("Loading training set.") - clients, num_samples, train_data = self.__read_dir__(self.train_dir) - c_len = len(clients) - - if self.sizes == None: # Equal distribution of data among processes - e = c_len // self.n_procs - frac = e / c_len - self.sizes = [frac] * self.n_procs - self.sizes[-1] += 1.0 - frac * self.n_procs - logging.debug("Size fractions: {}".format(sizes)) - - my_clients = DataPartitioner(clients, self.sizes).use(self.rank) - my_train_data = {"x": [], "y": []} - self.clients = [] - self.num_samples = [] - logging.debug("Clients Length: %d", c_len) - logging.debug("My_clients_len: %d", my_clients.__len__()) - for i in range(my_clients.__len__()): - cur_client = my_clients.__getitem__(i) - self.clients.append(cur_client) - my_train_data["x"].extend(train_data[cur_client]["x"]) - my_train_data["y"].extend(train_data[cur_client]["y"]) - self.num_samples.append(len(train_data[cur_client]["y"])) - self.train_x = ( - np.array(my_train_data["x"], dtype=np.dtype("float32")) - .reshape(-1, 28, 28, 1) - .transpose(0, 3, 1, 2) - ) - self.train_y = np.array( - my_train_data["y"], dtype=np.dtype("int64") - ).reshape(-1) - logging.debug("train_x.shape: %s", str(self.train_x.shape)) - logging.debug("train_y.shape: %s", str(self.train_y.shape)) + self.load_trainset() if self.__testing__: - logging.info("Loading testing set.") - _, _, test_data = self.__read_dir__(self.test_dir) - test_x = [] - test_y = [] - for test_data in test_data.values(): - for x in test_data["x"]: - test_x.append(x) - for y in test_data["y"]: - test_y.append(y) - self.test_x = ( - np.array(test_x, dtype=np.dtype("float32")) - .reshape(-1, 28, 28, 1) - .transpose(0, 3, 1, 2) - ) - self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1) - logging.debug("test_x.shape: %s", str(self.test_x.shape)) - logging.debug("test_y.shape: %s", str(self.test_y.shape)) + self.load_testset() # TODO: Add Validation @@ -198,7 +230,7 @@ class Femnist(Dataset): If the test set was not initialized """ if self.__testing__: - return DataLoader(Data(self.test_x, self.test_y)) + return DataLoader(Data(self.test_x, self.test_y), batch_size=self.test_batch_size) raise RuntimeError("Test set not initialized!") def imshow(self, img): @@ -207,33 +239,38 @@ class Femnist(Dataset): plt.show() def test(self, model): + logging.debug("Evaluating on test set.") testloader = self.get_testset() - # dataiter = iter(testloader) - # images, labels = dataiter.next() - # self.imshow(torchvision.utils.make_grid(images)) - # plt.savefig(' '.join('%5s' % j for j in labels) + ".png") - # print(' '.join('%5s' % j for j in labels)) + + logging.debug("Test Loader instantiated.") correct_pred = [0 for _ in range(NUM_CLASSES)] total_pred = [0 for _ in range(NUM_CLASSES)] + + total_correct = 0 + total_predicted = 0 + with torch.no_grad(): for elems, labels in testloader: outputs = model(elems) _, predictions = torch.max(outputs, 1) for label, prediction in zip(labels, predictions): + logging.debug("{} predicted as {}".format(label, prediction)) if label == prediction: correct_pred[label] += 1 + total_correct += 1 total_pred[label] += 1 + total_predicted += 1 - total_correct = 0 + logging.debug("Predicted on the test set") for key, value in enumerate(correct_pred): accuracy = 100 * float(value) / total_pred[key] - total_correct += value logging.debug("Accuracy for class {} is: {:.1f} 
%".format(key, accuracy)) - accuracy = 100 * float(total_correct) / testloader.__len__() + accuracy = 100 * float(total_correct) / total_predicted logging.info("Overall accuracy is: {:.1f} %".format(accuracy)) + logging.debug("Evaluating complete.") class LogisticRegression(nn.Module): diff --git a/src/decentralizepy/graphs/Graph.py b/src/decentralizepy/graphs/Graph.py index d21f97b1079e1a15f29b945505a61d7ce897c636..a2bd0e302e6685d370fe105e5be3ef9802ad4c43 100644 --- a/src/decentralizepy/graphs/Graph.py +++ b/src/decentralizepy/graphs/Graph.py @@ -77,13 +77,27 @@ class Graph: self.__insert_adj__(node_id, neighbours) node_id += 1 else: - raise ValueError("Type must be from {edges, adjacency}!") + raise ValueError("type must be from {edges, adjacency}!") if force_connect: self.connect_graph() return self.n_procs + def write_graph_to_file(self, file, type="edges"): + with open(file, "w") as of: + of.write(str(self.n_procs) + '\n') + if type == "edges": + for node, adj in enumerate(self.adj_list): + for neighbor in adj: + of.write("{} {}".format(node, neighbor)+ '\n') + elif type == "adjacency": + for adj in self.adj_list: + of.write(str(*adj) + '\n') + else: + raise ValueError("type must be from {edges, adjacency}!") + + def connect_graph(self): """ Connects the graph using a Ring diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index 9a4e9cc43a0dace06d7b5cfc38690efb8f14f0a8..c0732a60c4195a990ab3c4dadd4cbc56ec8d53db 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -23,6 +23,7 @@ class Node: iterations=1, log_dir=".", log_level=logging.INFO, + test_after = 5, *args ): """ @@ -144,12 +145,18 @@ class Node: self.testset = self.dataset.get_testset() + rounds_to_test = test_after for iteration in range(iterations): logging.info("Starting training iteration: %d", iteration) self.trainer.train(self.dataset) self.sharing.step() + + rounds_to_test -= 1 - if self.dataset.__testing__: + if self.dataset.__testing__ and rounds_to_test == 0: + rounds_to_test = test_after self.dataset.test(self.model) + + self.communication.disconnect_neighbors() diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index 8f09689b2c25c992cdd8b9b4f16bb0e03ba75be6..b29f8be197dd4792805d5032f05154abc6012ef3 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -64,11 +64,12 @@ class Sharing: del data["degree"] self.peer_deques[sender].append((degree, self.deserialized_model(data))) + logging.info("Starting model averaging after receiving from all neighbors") total = dict() weight_total = 0 - for n in self.peer_deques: + for i, n in enumerate(self.peer_deques): + logging.debug("Averaging model from neighbor {}".format(i)) degree, data = self.peer_deques[n].popleft() - #logging.info("top element: {}".format(d)) weight = 1/(max(len(self.peer_deques), degree) + 1) # Metro-Hastings weight_total += weight for key, value in data.items(): @@ -81,3 +82,5 @@ class Sharing: total[key] += (1 - weight_total) * value # Metro-Hastings self.model.load_state_dict(total) + + logging.info("Model averaging complete") \ No newline at end of file diff --git a/testing.py b/testing.py index c2cb432a3a2c4187f56287245215303c73f13f67..7dfdb958fa63fb336b8a2be1d6281de4c4ad8fb2 100644 --- a/testing.py +++ b/testing.py @@ -23,7 +23,9 @@ if __name__ == "__main__": my_config[section] = dict(config.items(section)) g = Graph() - g.read_graph_from_file("graph.adj", "adjacency") - l = Linear(1, 6) + 
g.read_graph_from_file("36_nodes.edges", "edges") + n_machines = 3 + procs_per_machine = 12 + l = Linear(n_machines, procs_per_machine) - mp.spawn(fn = Node, nprocs = 6, args=[0,l,g,my_config,20,"results",logging.DEBUG]) + mp.spawn(fn = Node, nprocs = procs_per_machine, args=[0,l,g,my_config,20,"results",logging.DEBUG])