diff --git a/eval/plot.py b/eval/plot.py
index 282418575fc96518bd7d80a3a138c2dbb4e830b4..fa8be5299eca5749a850239124f3d5e3f27cffca 100644
--- a/eval/plot.py
+++ b/eval/plot.py
@@ -39,6 +39,8 @@ def plot_results(path):
     print("Reading folders from: ", path)
     print("Folders: ", folders)
     bytes_means, bytes_stdevs = {}, {}
+    meta_means, meta_stdevs = {}, {}
+    data_means, data_stdevs = {}, {}
     for folder in folders:
         folder_path = os.path.join(path, folder)
         if not os.path.isdir(folder_path):
@@ -76,6 +78,22 @@ def plot_results(path):
         bytes_means[folder] = list(means.values())[0]
         bytes_stdevs[folder] = list(stdevs.values())[0]
 
+        meta_list = []
+        for x in results:
+            max_key = str(max(list(map(int, x["total_meta"].keys()))))
+            meta_list.append({max_key: x["total_meta"][max_key]})
+        means, stdevs, mins, maxs = get_stats(meta_list)
+        meta_means[folder] = list(means.values())[0]
+        meta_stdevs[folder] = list(stdevs.values())[0]
+
+        data_list = []
+        for x in results:
+            max_key = str(max(list(map(int, x["total_data_per_n"].keys()))))
+            data_list.append({max_key: x["total_data_per_n"][max_key]})
+        means, stdevs, mins, maxs = get_stats(data_list)
+        data_means[folder] = list(means.values())[0]
+        data_stdevs[folder] = list(stdevs.values())[0]
+
     plt.figure(1)
     plt.savefig(os.path.join(path, "train_loss.png"))
     plt.figure(2)
@@ -97,6 +115,30 @@ def plot_results(path):
     plt.xticks(x_pos, list(bytes_means.keys()))
     plt.savefig(os.path.join(path, "data_shared.png"))
 
+    # Plot stacked_bytes
+    plt.figure(4)
+    plt.title("Data Shared per Neighbor")
+    x_pos = np.arange(len(meta_means.keys()))
+    plt.bar(
+        x_pos,
+        np.array(list(data_means.values())) // (1024 * 1024),
+        yerr=np.array(list(data_stdevs.values())) // (1024 * 1024),
+        align="center",
+        label="Parameters",
+    )
+    plt.bar(
+        x_pos,
+        np.array(list(meta_means.values())) // (1024 * 1024),
+        bottom=np.array(list(data_means.values())) // (1024 * 1024),
+        yerr=np.array(list(meta_stdevs.values())) // (1024 * 1024),
+        align="center",
+        label="Metadata",
+    )
+    plt.ylabel("Data shared in MBs")
+    plt.xlabel("Fraction of Model Shared")
+    plt.xticks(x_pos, list(meta_means.keys()))
+    plt.savefig(os.path.join(path, "parameters_metadata.png"))
+
 
 def plot_parameters(path):
     plt.figure(4)
diff --git a/src/decentralizepy/datasets/Celeba.py b/src/decentralizepy/datasets/Celeba.py
index baf3434fff737f93a88848e9964e3f827e6883e3..10b99cbbd2726fe488168aa1524964f307c5817c 100644
--- a/src/decentralizepy/datasets/Celeba.py
+++ b/src/decentralizepy/datasets/Celeba.py
@@ -144,7 +144,11 @@ class Celeba(Dataset):
             my_train_data["y"].extend(train_data[cur_client]["y"])
             self.num_samples.append(len(train_data[cur_client]["y"]))
 
-        logging.debug("Initial shape of x: {}".format(np.array(my_train_data["x"], dtype=np.dtype("float32")).shape))
+        logging.debug(
+            "Initial shape of x: {}".format(
+                np.array(my_train_data["x"], dtype=np.dtype("float32")).shape
+            )
+        )
         self.train_x = (
             np.array(my_train_data["x"], dtype=np.dtype("float32"))
             .reshape(-1, IMAGE_DIM, IMAGE_DIM, CHANNELS)
diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py
index eb7200a894c6769c84a5711de9d17cb7d6ea2ff1..11156100c12bc3e0347a6ed3dd1a41ebf305582c 100644
--- a/src/decentralizepy/node/Node.py
+++ b/src/decentralizepy/node/Node.py
@@ -310,6 +310,17 @@ class Node:
             results_dict["train_loss"][iteration + 1] = loss_after_sharing
             results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
 
+            if self.sharing.total_meta:
+                results_dict["total_meta"][iteration + 1] = self.sharing.total_meta
+            if self.sharing.total_data:
+                results_dict["total_data_per_n"][
+                    iteration + 1
+                ] = self.sharing.total_data
+            if self.sharing.mean:
+                results_dict["grad_mean"][iteration + 1] = self.sharing.mean
+            if self.sharing.std:
+                results_dict["grad_std"][iteration + 1] = self.sharing.std
+
             self.save_plot(
                 results_dict["train_loss"],
                 "train_loss",
diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py
index 7eef218a0d9c6d309020687028796c09b351f0d0..6102242d13d6b6151285e60cb24d3e36336cc64c 100644
--- a/src/decentralizepy/sharing/PartialModel.py
+++ b/src/decentralizepy/sharing/PartialModel.py
@@ -67,6 +67,7 @@ class PartialModel(Sharing):
         self.dict_ordered = dict_ordered
         self.save_shared = save_shared
         self.metadata_cap = metadata_cap
+        self.total_meta = 0
 
         # Only save for 2 procs: Save space
         if rank == 0 or rank == 1:
@@ -99,6 +100,9 @@ class PartialModel(Sharing):
         logging.info("Returning topk gradients")
         tensors_to_cat = [v.data.flatten() for _, v in gradient_sum.items()]
         G_topk = torch.abs(torch.cat(tensors_to_cat, dim=0))
+        std, mean = torch.std_mean(G_topk, unbiased=False)
+        self.std = std.item()
+        self.mean = mean.item()
         return torch.topk(
             G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
         )
@@ -152,6 +156,7 @@ class PartialModel(Sharing):
             raise NotImplementedError
 
         m["indices"] = G_topk.numpy().tolist()
+        m["params"] = T_topk.numpy().tolist()
 
         assert len(m["indices"]) == len(m["params"])
 
@@ -163,6 +168,8 @@ class PartialModel(Sharing):
             m[key] = json.dumps(m[key])
 
         logging.info("Converted dictionary to json")
+        self.total_data += len(self.communication.encrypt(m["params"]))
+        self.total_meta += len(self.communication.encrypt(m["indices"]))
 
         return m
 
diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py
index 92f6475b207bc79d4ec0bd08c2494393e0564335..e5ef5cb92dd145f1eca3a5659381b0aec684515f 100644
--- a/src/decentralizepy/sharing/Sharing.py
+++ b/src/decentralizepy/sharing/Sharing.py
@@ -48,6 +48,7 @@ class Sharing:
         self.dataset = dataset
         self.communication_round = 0
         self.log_dir = log_dir
+        self.total_data = 0
 
         self.peer_deques = dict()
         my_neighbors = self.graph.neighbors(self.uid)
@@ -100,6 +101,7 @@ class Sharing:
         m = dict()
         for key, val in self.model.state_dict().items():
             m[key] = json.dumps(val.numpy().tolist())
+            self.total_data += len(self.communication.encrypt(m[key]))
         return m
 
     def deserialized_model(self, m):
diff --git a/src/decentralizepy/training/GradientAccumulator.py b/src/decentralizepy/training/GradientAccumulator.py
index 718e793aeadeba850c04db5dfa88a63895e40161..3c160594d74e6f50ae107cc0f486077e5428c6bc 100644
--- a/src/decentralizepy/training/GradientAccumulator.py
+++ b/src/decentralizepy/training/GradientAccumulator.py
@@ -75,24 +75,6 @@ class GradientAccumulator(Training):
         self.optimizer.step()
         return loss_val.item()
 
-    def train_full(self, trainset):
-        """
-        One training iteration, goes through the entire dataset
-
-        Parameters
-        ----------
-        trainset : torch.utils.data.Dataloader
-            The training dataset.
-
-        """
-        for epoch in range(self.rounds):
-            epoch_loss = 0.0
-            count = 0
-            for data, target in trainset:
-                epoch_loss += self.trainstep(data, target)
-                count += 1
-            logging.info("Epoch: {} loss: {}".format(epoch, epoch_loss / count))
-
     def train(self, dataset):
         """
         One training iteration with accumulation of gradients in model.accumulated_gradients.
diff --git a/src/decentralizepy/training/Training.py b/src/decentralizepy/training/Training.py
index bd8077343a5081180f1077581c577877267c6ac8..dbb636ce9ed87276ac6aadbdbab9b74395be4422 100644
--- a/src/decentralizepy/training/Training.py
+++ b/src/decentralizepy/training/Training.py
@@ -124,7 +124,11 @@ class Training:
             epoch_loss = 0.0
             count = 0
             for data, target in trainset:
-                logging.info("Starting minibatch {} with num_samples: {}".format(count, len(data)))
+                logging.info(
+                    "Starting minibatch {} with num_samples: {}".format(
+                        count, len(data)
+                    )
+                )
                 logging.info("Classes: {}".format(target))
                 epoch_loss += self.trainstep(data, target)
                 count += 1
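
The PartialModel change above records the population mean and standard deviation of the absolute accumulated gradients and, on every serialization, counts how many serialized bytes go to the top-k parameter values ("params") versus their indices ("indices", the metadata). The standalone sketch below reproduces the same torch calls on made-up inputs; the gradient dictionary, tensor shapes, and the value of alpha are invented for illustration and are not taken from this patch.

    import torch

    # Hypothetical inputs: an accumulated-gradient dict and a sharing fraction.
    alpha = 0.1
    gradient_sum = {"fc.weight": torch.randn(64, 32), "fc.bias": torch.randn(64)}

    # Flatten all gradients into a single vector of absolute values.
    tensors_to_cat = [v.flatten() for v in gradient_sum.values()]
    G = torch.abs(torch.cat(tensors_to_cat, dim=0))

    # Population statistics (unbiased=False), logged as grad_mean / grad_std.
    std, mean = torch.std_mean(G, unbiased=False)

    # Keep only the alpha-fraction of entries with the largest magnitude;
    # the values become the "params" payload, the indices the shared metadata.
    values, indices = torch.topk(G, round(alpha * G.shape[0]), dim=0, sorted=False)
    print(mean.item(), std.item(), values.shape, indices.shape)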
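eval/plot.py now also saves a stacked bar chart (parameters_metadata.png) with the parameter bytes at the base of each bar and the index metadata stacked on top, both converted to MiB. The following minimal matplotlib sketch shows the same stacking; the folder labels and byte counts are invented, not measurements produced by this patch.

    import matplotlib.pyplot as plt
    import numpy as np

    # Hypothetical per-configuration totals (bytes); real values come from
    # total_data_per_n and total_meta in the per-run result JSON files.
    folders = ["alpha=0.1", "alpha=0.5", "alpha=1.0"]
    data_means = np.array([120e6, 600e6, 1200e6])  # parameter payload
    meta_means = np.array([30e6, 150e6, 0.0])      # top-k index metadata

    MiB = 1024 * 1024
    x_pos = np.arange(len(folders))

    plt.figure()
    plt.title("Data Shared per Neighbor")
    # Parameter bytes form the base of each bar ...
    plt.bar(x_pos, data_means // MiB, align="center", label="Parameters")
    # ... and metadata bytes are stacked on top via the `bottom` argument.
    plt.bar(
        x_pos,
        meta_means // MiB,
        bottom=data_means // MiB,
        align="center",
        label="Metadata",
    )
    plt.ylabel("Data shared in MBs")
    plt.xlabel("Fraction of Model Shared")
    plt.xticks(x_pos, folders)
    plt.legend()
    plt.savefig("parameters_metadata.png")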