diff --git a/eval/plot.py b/eval/plot.py index f354937f580c7d016c451f17d253bf6712cc95b5..0b7b66c448d4ad75281c729d18f47bf7840da1c6 100644 --- a/eval/plot.py +++ b/eval/plot.py @@ -4,6 +4,7 @@ import sys import numpy as np from matplotlib import pyplot as plt +import pandas as pd def get_stats(l): @@ -61,20 +62,20 @@ def plot_results(path): plt.figure(1) means, stdevs, mins, maxs = get_stats([x["train_loss"] for x in results]) plot(means, stdevs, mins, maxs, "Training Loss", folder, "upper right") - with open(os.path.join(path, "train_loss_" + folder + ".json"), "w") as f: - json.dump({"mean": means, "std": stdevs}, f) + df = pd.DataFrame({"mean": list(means.values()), "std": list(stdevs.values()), "nr_nodes": [len(results)]*len(means)}, list(means.keys()), columns=["mean", "std", "nr_nodes"]) + df.to_csv(os.path.join(path, "train_loss_" + folder + ".csv")) # Plot Testing loss plt.figure(2) means, stdevs, mins, maxs = get_stats([x["test_loss"] for x in results]) plot(means, stdevs, mins, maxs, "Testing Loss", folder, "upper right") - with open(os.path.join(path, "test_loss_" + folder + ".json"), "w") as f: - json.dump({"mean": means, "std": stdevs}, f) + df = pd.DataFrame({"mean": list(means.values()), "std": list(stdevs.values()), "nr_nodes": [len(results)]*len(means)}, list(means.keys()), columns=["mean", "std", "nr_nodes"]) + df.to_csv(os.path.join(path, "test_loss_" + folder + ".csv")) # Plot Testing Accuracy plt.figure(3) means, stdevs, mins, maxs = get_stats([x["test_acc"] for x in results]) plot(means, stdevs, mins, maxs, "Testing Accuracy", folder, "lower right") - with open(os.path.join(path, "test_acc_" + folder + ".json"), "w") as f: - json.dump({"mean": means, "std": stdevs}, f) + df = pd.DataFrame({"mean": list(means.values()), "std": list(stdevs.values()), "nr_nodes": [len(results)]*len(means)}, list(means.keys()), columns=["mean", "std", "nr_nodes"]) + df.to_csv(os.path.join(path, "test_acc_" + folder + ".csv")) plt.figure(6) means, stdevs, mins, maxs = get_stats([x["grad_std"] for x in results]) plot( diff --git a/eval/run.sh b/eval/run.sh index 20a9d2ba08b8572f8df051f7b39e9221afbcd678..528bdc97ab23b77be71c5d8d1413a740eacd946f 100755 --- a/eval/run.sh +++ b/eval/run.sh @@ -13,11 +13,10 @@ iterations=200 test_after=10 eval_file=testing.py log_level=INFO -log_dir_base=/mnt/nfs/some_user/logs/test m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` - -log_dir=$log_dir_base$m +log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m +mkdir -p $log_dir cp $original_config $config_file # echo "alpha = 0.10" >> $config_file diff --git a/eval/step_configs/config_femnist_100.ini b/eval/step_configs/config_femnist_100.ini index 4e3e9ba57519f265240130dc8c054355e3bd4d18..e1af10b3822402030cd23e80b67318e370df80aa 100644 --- a/eval/step_configs/config_femnist_100.ini +++ b/eval/step_configs/config_femnist_100.ini @@ -1,11 +1,12 @@ [DATASET] dataset_package = decentralizepy.datasets.Femnist dataset_class = Femnist +random_seed = 97 model_class = CNN -train_dir = /home/risharma/leaf/data/femnist/per_user_data/train -test_dir = /home/risharma/leaf/data/femnist/data/test +train_dir = /mnt/nfs/shared/leaf/data/femnist/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/femnist/data/test ; python list of fractions below -sizes = +sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim @@ -15,9 +16,9 @@ lr = 0.001 [TRAIN_PARAMS] training_package = decentralizepy.training.Training training_class = 
Training -rounds = 20 +rounds = 47 full_epochs = False -batch_size = 64 +batch_size = 16 shuffle = True loss_package = torch.nn loss_class = CrossEntropyLoss diff --git a/eval/step_configs/config_femnist_fft.ini b/eval/step_configs/config_femnist_fft.ini index b0eda41e799d9222837c409b9957ef39247011c1..13a769c55247c7f8bcbf56ffe7541b75b2295a7d 100644 --- a/eval/step_configs/config_femnist_fft.ini +++ b/eval/step_configs/config_femnist_fft.ini @@ -34,5 +34,4 @@ addresses_filepath = ip_addr_6Machines.json sharing_package = decentralizepy.sharing.FFT sharing_class = FFT alpha = 0.1 -change_based_selection = True -accumulation = True \ No newline at end of file +change_based_selection = True \ No newline at end of file diff --git a/eval/step_configs/config_femnist_sharing.ini b/eval/step_configs/config_femnist_sharing.ini deleted file mode 100644 index c816302eca816cc23ed662bb21669b20ac9b0c4d..0000000000000000000000000000000000000000 --- a/eval/step_configs/config_femnist_sharing.ini +++ /dev/null @@ -1,33 +0,0 @@ -[DATASET] -dataset_package = decentralizepy.datasets.Femnist -dataset_class = Femnist -random_seed = 97 -model_class = CNN -train_dir = /mnt/nfs/shared/leaf/data/femnist/per_user_data/train -test_dir = /mnt/nfs/shared/leaf/data/femnist/data/test -; python list of fractions below -sizes = - -[OPTIMIZER_PARAMS] -optimizer_package = torch.optim -optimizer_class = Adam -lr = 0.001 - -[TRAIN_PARAMS] -training_package = decentralizepy.training.Training -training_class = Training -rounds = 47 -full_epochs = False -batch_size = 16 -shuffle = True -loss_package = torch.nn -loss_class = CrossEntropyLoss - -[COMMUNICATION] -comm_package = decentralizepy.communication.TCP -comm_class = TCP -addresses_filepath = ip_addr_6Machines.json - -[SHARING] -sharing_package = decentralizepy.sharing.Sharing -sharing_class = Sharing diff --git a/eval/step_configs/config_femnist_topk.ini b/eval/step_configs/config_femnist_topkacc.ini similarity index 84% rename from eval/step_configs/config_femnist_topk.ini rename to eval/step_configs/config_femnist_topkacc.ini index 7c905885cd8b72a71b39647dca58f7ecf235ae27..e65f225d392716bdcbd1fcd90e52a6de23716c52 100644 --- a/eval/step_configs/config_femnist_topk.ini +++ b/eval/step_configs/config_femnist_topkacc.ini @@ -15,8 +15,8 @@ lr = 0.001 # There are 734463 femnist samples [TRAIN_PARAMS] -training_package = decentralizepy.training.ModelChangeAccumulator -training_class = ModelChangeAccumulator +training_package = decentralizepy.training.ChangeAccumulator +training_class = ChangeAccumulator rounds = 47 full_epochs = False batch_size = 16 @@ -33,5 +33,4 @@ addresses_filepath = ip_addr_6Machines.json [SHARING] sharing_package = decentralizepy.sharing.TopK sharing_class = TopK -alpha = 0.1 -accumulation = True \ No newline at end of file +alpha = 0.1 \ No newline at end of file diff --git a/eval/step_configs/config_femnist_wavelet.ini b/eval/step_configs/config_femnist_wavelet.ini index 5228709c94a97c6474f5b5a2ec048c7652e27b3a..ac3bac22a5735a27dc7766fba3c19ffa3dc3f0b3 100644 --- a/eval/step_configs/config_femnist_wavelet.ini +++ b/eval/step_configs/config_femnist_wavelet.ini @@ -39,4 +39,3 @@ change_based_selection = True alpha = 0.1 wavelet=sym2 level= None -accumulation = True diff --git a/eval/testing.py b/eval/testing.py index 0ae70de77c711637180d9e3701fa081c9ad914d8..bb16c2f3cc68cba4a3f30635cbf58bcc0a96e7cb 100644 --- a/eval/testing.py +++ b/eval/testing.py @@ -25,7 +25,7 @@ if __name__ == "__main__": args = utils.get_args() # prevents accidental log overwrites - 
Path(args.log_dir).mkdir(parents=True, exist_ok=False) + Path(args.log_dir).mkdir(parents=True, exist_ok=True) log_level = { "INFO": logging.INFO, diff --git a/setup.cfg b/setup.cfg index 2ffd5727e4320e5618ab44b21b6b9d1ea0e6229e..0b85f720b6002ff5292413a3df5697926d622f0a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ install_requires = smallworld localconfig PyWavelets + pandas include_package_data = True python_requires = >=3.6 [options.packages.find] diff --git a/src/decentralizepy/models/Model.py b/src/decentralizepy/models/Model.py index e9e556b05f950e159d58971cb5343131982e898c..643eec5c17416aff0735307277acef8352df6e3d 100644 --- a/src/decentralizepy/models/Model.py +++ b/src/decentralizepy/models/Model.py @@ -17,9 +17,7 @@ class Model(nn.Module): self.accumulated_gradients = [] self._param_count_ot = None self._param_count_total = None - self.accumulated_frequency = None - self.prev_model_params = None - self.prev = None + self.accumulated_changes = None def count_params(self, only_trainable=False): """ @@ -46,3 +44,16 @@ class Model(nn.Module): if not self._param_count_total: self._param_count_total = sum(p.numel() for p in self.parameters()) return self._param_count_total + + def rewind_accumulation(self, indices): + """ + resets accumulated_changes at the given indices + + Parameters + ---------- + indices : torch.Tensor + Tensor that contains indices corresponding to the flatten model + + """ + if self.accumulated_changes is not None: + self.accumulated_changes[indices] = 0.0 \ No newline at end of file diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index fd2c75f7fe4a7017570dee702b7ffa7a4995b609..7854c38d2bd32832bf641c30c87a7d5d0855d2e4 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -93,7 +93,7 @@ class Node: graph : decentralizepy.graphs The object containing the global graph iterations : int - Number of iterations (communication steps) ) for which the model should be trained + Number of iterations (communication steps) for which the model should be trained log_dir : str Logging directory reset_optimizer : int @@ -281,7 +281,7 @@ class Node: config : dict A dictionary of configurations. 
iterations : int - Number of iterations (communication steps) ) for which the model should be trained + Number of iterations (communication steps) for which the model should be trained log_dir : str Logging directory log_level : logging.Level @@ -448,7 +448,7 @@ class Node: epochs_per_round = 25 batch_size = 64 iterations : int - Number of iterations (communication steps) ) for which the model should be trained + Number of iterations (communication steps) for which the model should be trained log_dir : str Logging directory log_level : logging.Level diff --git a/src/decentralizepy/sharing/FFT.py b/src/decentralizepy/sharing/FFT.py index 1cdf701f51fb3ae205b0e6016e26f7859104d163..a4c3b59d3a71b8983e59b555e2bf0fac5378867e 100644 --- a/src/decentralizepy/sharing/FFT.py +++ b/src/decentralizepy/sharing/FFT.py @@ -114,27 +114,19 @@ class FFT(Sharing): logging.info("Returning fft compressed model weights") tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] concated = torch.cat(tensors_to_cat, dim=0) - + flat_fft = fft.rfft(concated) if self.change_based_selection: - flat_fft = fft.rfft(concated) - if self.accumulation: - logging.info( - "fft topk extract frequencies based on accumulated model frequency change" - ) - diff = self.model.accumulated_frequency + (flat_fft - self.model.prev) - else: - diff = flat_fft - self.model.accumulated_frequency + + assert len(self.model.accumulated_gradients) == 1 + diff = self.model.accumulated_gradients[0] _, index = torch.topk( diff.abs(), round(self.alpha * len(flat_fft)), dim=0, sorted=False ) else: - flat_fft = fft.rfft(concated) _, index = torch.topk( flat_fft.abs(), round(self.alpha * len(flat_fft)), dim=0, sorted=False ) - if self.accumulation: - self.model.accumulated_frequency[index] = 0.0 return flat_fft[index], index def serialized_model(self): @@ -153,6 +145,8 @@ class FFT(Sharing): with torch.no_grad(): topk, indices = self.apply_fft() + self.model.rewind_accumulation(indices) + if self.save_shared: shared_params = dict() shared_params["order"] = list(self.model.state_dict().keys()) diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index 6a8f0cb71a3468f7aa33d0bbd91172c4be4ffefd..204ee2334cb34686e34959ee3eb4b41dba3112c7 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -124,6 +124,7 @@ class PartialModel(Sharing): with torch.no_grad(): _, G_topk = self.extract_top_gradients() + self.model.rewind_accumulation(G_topk) if self.save_shared: shared_params = dict() shared_params["order"] = list(self.model.state_dict().keys()) diff --git a/src/decentralizepy/sharing/TopK.py b/src/decentralizepy/sharing/TopK.py deleted file mode 100644 index f50ba7e28538ddd1d6c070d1b53380cec9368143..0000000000000000000000000000000000000000 --- a/src/decentralizepy/sharing/TopK.py +++ /dev/null @@ -1,228 +0,0 @@ -import json -import logging -import os -from pathlib import Path - -import numpy as np -import torch - -from decentralizepy.sharing.Sharing import Sharing - - -class TopK(Sharing): - """ - This class implements topk selection of model parameters based on the model change since the beginning of the - communication step: --> Use ModelChangeAccumulator - - """ - - def __init__( - self, - rank, - machine_id, - communication, - mapping, - graph, - model, - dataset, - log_dir, - alpha=1.0, - dict_ordered=True, - save_shared=False, - metadata_cap=1.0, - accumulation=False, - ): - """ - Constructor - - Parameters - ---------- - rank : int 
- Local rank - machine_id : int - Global machine id - communication : decentralizepy.communication.Communication - Communication module used to send and receive messages - mapping : decentralizepy.mappings.Mapping - Mapping (rank, machine_id) -> uid - graph : decentralizepy.graphs.Graph - Graph reprensenting neighbors - model : decentralizepy.models.Model - Model to train - dataset : decentralizepy.datasets.Dataset - Dataset for sharing data. Not implemented yet! TODO - log_dir : str - Location to write shared_params (only writing for 2 procs per machine) - alpha : float - Percentage of model to share - dict_ordered : bool - Specifies if the python dict maintains the order of insertion - save_shared : bool - Specifies if the indices of shared parameters should be logged - metadata_cap : float - Share full model when self.alpha > metadata_cap - - """ - super().__init__( - rank, machine_id, communication, mapping, graph, model, dataset, log_dir - ) - self.alpha = alpha - self.dict_ordered = dict_ordered - self.save_shared = save_shared - self.metadata_cap = metadata_cap - self.total_meta = 0 - self.accumulation = accumulation - - if self.save_shared: - # Only save for 2 procs: Save space - if rank != 0 or rank != 1: - self.save_shared = False - - if self.save_shared: - self.folder_path = os.path.join( - self.log_dir, "shared_params/{}".format(self.rank) - ) - Path(self.folder_path).mkdir(parents=True, exist_ok=True) - - def extract_top_gradients(self): - """ - Extract the indices and values of the topK gradients. - The gradients must have been accumulationd. - - Returns - ------- - tuple - (a,b). a: The magnitudes of the topK gradients, b: Their indices. - - """ - tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] - concated = torch.cat(tensors_to_cat, dim=0) - if self.accumulation: - logging.info( - "TopK extract gradients based on accumulated model parameter change" - ) - diff = self.model.prev_model_params + (concated - self.model.prev) - else: - diff = concated - self.model.prev_model_params - G_topk = torch.abs(diff) - - std, mean = torch.std_mean(G_topk, unbiased=False) - self.std = std.item() - self.mean = mean.item() - value, ind = torch.topk( - G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False - ) - - # only needed when ModelChangeAccumulator.accumulation = True - # does not cause problems otherwise - if self.accumulation: - self.model.prev_model_params[ind] = 0.0 # torch.zeros((len(G_topk),)) - return value, ind - - def serialized_model(self): - """ - Convert model to a dict. self.alpha specifies the fraction of model to send. 
- - Returns - ------- - dict - Model converted to a dict - - """ - if self.alpha > self.metadata_cap: # Share fully - return super().serialized_model() - - with torch.no_grad(): - _, G_topk = self.extract_top_gradients() - - if self.save_shared: - shared_params = dict() - shared_params["order"] = list(self.model.state_dict().keys()) - shapes = dict() - for k, v in self.model.state_dict().items(): - shapes[k] = list(v.shape) - shared_params["shapes"] = shapes - - shared_params[self.communication_round] = G_topk.tolist() - - with open( - os.path.join( - self.folder_path, - "{}_shared_params.json".format(self.communication_round + 1), - ), - "w", - ) as of: - json.dump(shared_params, of) - - logging.info("Extracting topk params") - - tensors_to_cat = [v.data.flatten() for v in self.model.parameters()] - T = torch.cat(tensors_to_cat, dim=0) - T_topk = T[G_topk] - - logging.info("Generating dictionary to send") - - m = dict() - - if not self.dict_ordered: - raise NotImplementedError - - m["indices"] = G_topk.numpy().astype(np.int32) - m["params"] = T_topk.numpy() - - assert len(m["indices"]) == len(m["params"]) - logging.info("Elements sending: {}".format(len(m["indices"]))) - - logging.info("Generated dictionary to send") - - logging.info("Converted dictionary to pickle") - self.total_data += len(self.communication.encrypt(m["params"])) - self.total_meta += len(self.communication.encrypt(m["indices"])) - - return m - - def deserialized_model(self, m): - """ - Convert received dict to state_dict. - - Parameters - ---------- - m : dict - dict received - - Returns - ------- - state_dict - state_dict of received - - """ - if self.alpha > self.metadata_cap: # Share fully - return super().deserialized_model(m) - - with torch.no_grad(): - state_dict = self.model.state_dict() - - if not self.dict_ordered: - raise NotImplementedError - - shapes = [] - lens = [] - tensors_to_cat = [] - for _, v in state_dict.items(): - shapes.append(v.shape) - t = v.flatten() - lens.append(t.shape[0]) - tensors_to_cat.append(t) - - T = torch.cat(tensors_to_cat, dim=0) - index_tensor = torch.tensor(m["indices"], dtype=torch.long) - logging.debug("Original tensor: {}".format(T[index_tensor])) - T[index_tensor] = torch.tensor(m["params"]) - logging.debug("Final tensor: {}".format(T[index_tensor])) - start_index = 0 - for i, key in enumerate(state_dict): - end_index = start_index + lens[i] - state_dict[key] = T[start_index:end_index].reshape(shapes[i]) - start_index = end_index - - return state_dict diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index 774dfe0b79bb580e4eeebbc4eff28605bfc578fb..2ec700a65eb8faebbfd30120e74a514f7ddedd39 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -122,32 +122,23 @@ class Wavelet(Sharing): logging.info("Returning dwt compressed model weights") tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] concated = torch.cat(tensors_to_cat, dim=0) + + coeff = pywt.wavedec(concated.numpy(), self.wavelet, level=self.level) + data, coeff_slices = pywt.coeffs_to_array( + coeff + ) # coeff_slices will be reproduced on the receiver + data = data.ravel() + if self.change_based_selection: - coeff = pywt.wavedec(concated.numpy(), self.wavelet, level=self.level) - data, coeff_slices = pywt.coeffs_to_array( - coeff - ) # coeff_slices will be reproduced on the receiver - data = data.ravel() - - if self.accumulation: - logging.info( - "wavelet topk extract frequencies based on accumulated model 
frequency change" - ) - diff = self.model.accumulated_frequency + (data - self.model.prev) - else: - diff = data - self.model.accumulated_frequency + assert len(self.model.accumulated_gradients) == 1 + diff = self.model.accumulated_gradients[0] _, index = torch.topk( - torch.from_numpy(diff).abs(), + diff.abs(), round(self.alpha * len(data)), dim=0, sorted=False, ) else: - coeff = pywt.wavedec(concated.numpy(), self.wavelet, level=self.level) - data, coeff_slices = pywt.coeffs_to_array( - coeff - ) # coeff_slices will be reproduced on the receiver - data = data.ravel() _, index = torch.topk( torch.from_numpy(data).abs(), round(self.alpha * len(data)), @@ -155,8 +146,6 @@ class Wavelet(Sharing): sorted=False, ) - if self.accumulation: - self.model.accumulated_frequency[index] = 0.0 return torch.from_numpy(data[index]), index def serialized_model(self): @@ -175,6 +164,8 @@ class Wavelet(Sharing): with torch.no_grad(): topk, indices = self.apply_wavelet() + self.model.rewind_accumulation(indices) + if self.save_shared: shared_params = dict() shared_params["order"] = list(self.model.state_dict().keys()) diff --git a/src/decentralizepy/training/ChangeAccumulator.py b/src/decentralizepy/training/ChangeAccumulator.py index 6ee5dc79f1e32aa36cc09ab06f1309bc28fd2f32..c3bc81abeedd4f173fec3808276747cc8f910665 100644 --- a/src/decentralizepy/training/ChangeAccumulator.py +++ b/src/decentralizepy/training/ChangeAccumulator.py @@ -28,6 +28,7 @@ class ChangeAccumulator(Training): batch_size="", shuffle="", save_accumulated="", + accumulation=True, ): """ Constructor @@ -58,6 +59,8 @@ class ChangeAccumulator(Training): True if the dataset should be shuffled before training. save_accumulated : bool True if accumulated weight change should be written to file + accumulation : bool + True if the model change should be accumulated across communication steps """ super().__init__( @@ -85,6 +88,9 @@ class ChangeAccumulator(Training): self.log_dir, "model_val/{}".format(self.rank) ) Path(self.model_val_path).mkdir(parents=True, exist_ok=True) + self.accumulation = accumulation + self.init_model = None + self.prev = None def save_vector(self, v, s): """ @@ -152,12 +158,31 @@ class ChangeAccumulator(Training): k: v.data.clone().detach() for k, v in zip(self.model.state_dict(), self.model.parameters()) } + if self.accumulation: + if self.model.accumulated_changes is None: + flats = [v.data.flatten() for _, v in self.init_model.items()] + flat = torch.cat(flats) + self.model.accumulated_changes = torch.zeros_like(flat) + self.prev = flat + else: + flats = [v.data.flatten() for _, v in self.init_model.items()] + flat = torch.cat(flats) + self.model.accumulated_changes += (flat - self.prev) + self.prev = flat + super().train(dataset) with torch.no_grad(): change = { k: v.data.clone().detach() - self.init_model[k] for k, v in zip(self.model.state_dict(), self.model.parameters()) } + if self.accumulation: + flats_change = [v.data.flatten() for _, v in change.items()] + flat_change = torch.cat(flats_change) + # flatten does not copy data if input is already flattened + # however cat copies + change = {"flat" : self.model.accumulated_changes + flat_change} + self.model.accumulated_gradients.append(change) if self.save_accumulated: diff --git a/src/decentralizepy/training/FrequencyAccumulator.py b/src/decentralizepy/training/FrequencyAccumulator.py index 9c264cc5d1140afd806bee20739900cc4455dd43..7d7c9ab08bfe6eb7a7df80ec05dc88c2f54fb9b8 100644 --- a/src/decentralizepy/training/FrequencyAccumulator.py +++ 
b/src/decentralizepy/training/FrequencyAccumulator.py @@ -71,6 +71,8 @@ class FrequencyAccumulator(Training): shuffle, ) self.accumulation = accumulation + self.init_model = None + self.prev = None def train(self, dataset): """ @@ -84,22 +86,27 @@ class FrequencyAccumulator(Training): The training dataset. Should implement get_trainset(batch_size, shuffle) """ - - # this looks at the change from the last round averaging of the frequencies - tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] - concated = torch.cat(tensors_to_cat, dim=0) - flat_fft = fft.rfft(concated) - if self.accumulation: - if self.model.accumulated_frequency is None: - logging.info("Initialize fft frequency accumulation") - self.model.accumulated_frequency = torch.zeros_like(flat_fft) - self.model.prev = flat_fft - else: - logging.info("fft frequency accumulation step") - self.model.accumulated_frequency += flat_fft - self.model.prev - self.model.prev = flat_fft - else: - logging.info("fft frequency accumulation reset") - self.model.accumulated_frequency = flat_fft + with torch.no_grad(): + self.model.accumulated_gradients = [] + tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] + concated = torch.cat(tensors_to_cat, dim=0) + self.init_model = fft.rfft(concated) + if self.accumulation: + if self.model.accumulated_changes is None: + self.model.accumulated_changes = torch.zeros_like(self.init_model) + self.prev = self.init_model + else: + self.model.accumulated_changes += (self.init_model - self.prev) + self.prev = self.init_model super().train(dataset) + + with torch.no_grad(): + tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] + concated = torch.cat(tensors_to_cat, dim=0) + end_model = fft.rfft(concated) + change = end_model - self.init_model + if self.accumulation: + change += self.model.accumulated_changes + + self.model.accumulated_gradients.append(change) \ No newline at end of file diff --git a/src/decentralizepy/training/FrequencyWaveletAccumulator.py b/src/decentralizepy/training/FrequencyWaveletAccumulator.py index cf65724020a308f3c9a88d0f0f4fbbd1f6985305..ee36894d3d91de9a9cfeca0cc18af56d6c6d0bbb 100644 --- a/src/decentralizepy/training/FrequencyWaveletAccumulator.py +++ b/src/decentralizepy/training/FrequencyWaveletAccumulator.py @@ -91,23 +91,31 @@ class FrequencyWaveletAccumulator(Training): """ # this looks at the change from the last round averaging of the frequencies - tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] - concated = torch.cat(tensors_to_cat, dim=0) - coeff = pywt.wavedec(concated.numpy(), self.wavelet, level=self.level) - data, coeff_slices = pywt.coeffs_to_array(coeff) - data = data.ravel() - if self.accumulation: - if self.model.accumulated_frequency is None: - logging.info("Initialize wavelet frequency accumulation") - self.model.accumulated_frequency = np.zeros_like( - data - ) # torch.zeros_like(data) - self.model.prev = data - else: - logging.info("wavelet frequency accumulation step") - self.model.accumulated_frequency += data - self.model.prev - self.model.prev = data - else: - logging.info("wavelet frequency accumulation reset") - self.model.accumulated_frequency = data + with torch.no_grad(): + self.model.accumulated_gradients = [] + tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] + concated = torch.cat(tensors_to_cat, dim=0) + coeff = pywt.wavedec(concated.numpy(), self.wavelet, level=self.level) + data, coeff_slices = 
pywt.coeffs_to_array(coeff) + self.init_model = torch.from_numpy(data.ravel()) + if self.accumulation: + if self.model.accumulated_changes is None: + self.model.accumulated_changes = torch.zeros_like(self.init_model) + self.prev = self.init_model + else: + self.model.accumulated_changes += (self.init_model - self.prev) + self.prev = self.init_model + super().train(dataset) + + with torch.no_grad(): + tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] + concated = torch.cat(tensors_to_cat, dim=0) + coeff = pywt.wavedec(concated.numpy(), self.wavelet, level=self.level) + data, coeff_slices = pywt.coeffs_to_array(coeff) + end_model = torch.from_numpy(data.ravel()) + change = end_model - self.init_model + if self.accumulation: + change += self.model.accumulated_changes + + self.model.accumulated_gradients.append(change) diff --git a/src/decentralizepy/training/ModelChangeAccumulator.py b/src/decentralizepy/training/ModelChangeAccumulator.py deleted file mode 100644 index 1c70283c22f0affbb2e58345e2e210b446d685e9..0000000000000000000000000000000000000000 --- a/src/decentralizepy/training/ModelChangeAccumulator.py +++ /dev/null @@ -1,103 +0,0 @@ -import logging - -import torch -from torch import fft - -from decentralizepy.training.Training import Training - - -class ModelChangeAccumulator(Training): - """ - This class implements the training module which also accumulates the model change at the beginning of a communication round. - - """ - - def __init__( - self, - rank, - machine_id, - mapping, - model, - optimizer, - loss, - log_dir, - rounds="", - full_epochs="", - batch_size="", - shuffle="", - accumulation=True, - ): - """ - Constructor - - Parameters - ---------- - rank : int - Rank of process local to the machine - machine_id : int - Machine ID on which the process in running - mapping : decentralizepy.mappings - The object containing the mapping rank <--> uid - model : torch.nn.Module - Neural Network for training - optimizer : torch.optim - Optimizer to learn parameters - loss : function - Loss function - log_dir : str - Directory to log the model change. - rounds : int, optional - Number of steps/epochs per training call - full_epochs: bool, optional - True if 1 round = 1 epoch. False if 1 round = 1 minibatch - batch_size : int, optional - Number of items to learn over, in one batch - shuffle : bool - True if the dataset should be shuffled before training. - accumulation : bool - True if the model change should be accumulated across communication steps - - """ - super().__init__( - rank, - machine_id, - mapping, - model, - optimizer, - loss, - log_dir, - rounds, - full_epochs, - batch_size, - shuffle, - ) - self.accumulation = accumulation - - def train(self, dataset): - """ - Does one training iteration. - If self.accumulation is True then it accumulates model parameter changes in model.prev_model_params. - Otherwise it stores the current model parameters in model.prev_model_params. - - Parameters - ---------- - dataset : decentralizepy.datasets.Dataset - The training dataset. 
Should implement get_trainset(batch_size, shuffle) - - """ - - tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] - concated = torch.cat(tensors_to_cat, dim=0) - if self.accumulation: - if self.model.prev_model_params is None: - logging.info("Initialize model parameter accumulation.") - self.model.prev_model_params = torch.zeros_like(concated) - self.model.prev = concated - else: - logging.info("model parameter accumulation step") - self.model.prev_model_params += concated - self.model.prev - self.model.prev = concated - else: - logging.info("model parameter reset") - self.model.prev_model_params = concated - super().train(dataset)
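
Note on the plot.py change above: the per-iteration mean/std series are now written as CSV through pandas rather than JSON, one row per logged iteration plus an nr_nodes column set to the number of result files. A minimal sketch of the same construction, using invented numbers and a hypothetical output filename:

import pandas as pd

# means/stdevs mirror what get_stats() returns: {iteration: value}; the numbers are made up
means = {"0": 2.31, "10": 1.87, "20": 1.52}
stdevs = {"0": 0.05, "10": 0.07, "20": 0.06}
nr_nodes = 16  # in the script this is len(results)

df = pd.DataFrame(
    {
        "mean": list(means.values()),
        "std": list(stdevs.values()),
        "nr_nodes": [nr_nodes] * len(means),
    },
    index=list(means.keys()),
)
df.to_csv("train_loss_example.csv")  # hypothetical path; the script uses train/test loss and accuracy files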
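
run.sh note: the hard-coded log_dir_base is dropped; the log directory is now a minute-resolution timestamp plus the machine id, created with mkdir -p. A rough Python equivalent of the naming scheme only (the ifconfig/addresses lookup that produces the machine id is omitted; machine_id here is a placeholder):

from datetime import datetime

machine_id = 3  # placeholder for the id extracted from the addresses file in run.sh
log_dir = "{}/machine{}".format(datetime.now().strftime("%Y-%m-%dT%H:%M"), machine_id)
# e.g. "2022-03-01T14:05/machine3"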
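
The core of the refactor replaces accumulated_frequency / prev_model_params / prev on the model with a single accumulated_changes tensor that the *Accumulator trainers fill and the sharing classes rewind at the indices they just shared. A self-contained sketch of that pattern; TinyModel, flatten and the step ordering are illustrative simplifications, not the project's classes:

import torch
import torch.nn as nn


class TinyModel(nn.Module):
    """Illustrative stand-in for decentralizepy.models.Model."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(4, 2)
        self.accumulated_changes = None  # flat tensor aligned with the flattened state_dict

    def rewind_accumulation(self, indices):
        # reset the accumulated change at indices that were just shared
        if self.accumulated_changes is not None:
            self.accumulated_changes[indices] = 0.0


def flatten(model):
    return torch.cat([v.data.flatten() for _, v in model.state_dict().items()])


model = TinyModel()
prev = flatten(model)
if model.accumulated_changes is None:
    model.accumulated_changes = torch.zeros_like(prev)

# ... one round of local training would run here ...

model.accumulated_changes += flatten(model) - prev

# share the top-alpha fraction by accumulated magnitude, then rewind those entries
alpha = 0.1
k = max(1, round(alpha * model.accumulated_changes.numel()))
_, index = torch.topk(model.accumulated_changes.abs(), k, dim=0, sorted=False)
model.rewind_accumulation(index)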
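
FrequencyAccumulator and sharing/FFT.py apply the same idea in the frequency domain: the flattened parameters are transformed with torch.fft.rfft before and after training, the frequency-domain change (plus any carried-over accumulation) is what top-k ranks, and the selected coefficients are sent together with their indices. A rough sketch with random stand-in tensors; the sizes and the fake training step are made up:

import torch
from torch import fft

alpha = 0.1
params_before = torch.randn(1000)                        # flattened model before training
params_after = params_before + 0.01 * torch.randn(1000)  # pretend result of one round

freq_before = fft.rfft(params_before)                    # complex coefficients
freq_after = fft.rfft(params_after)
accumulated = torch.zeros_like(freq_before)              # carried across rounds in the real code

change = accumulated + (freq_after - freq_before)
k = round(alpha * len(freq_after))
_, index = torch.topk(change.abs(), k, dim=0, sorted=False)

payload, indices = freq_after[index], index              # roughly what serialized_model() sends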
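
The wavelet variant (FrequencyWaveletAccumulator plus sharing/Wavelet.py) does the analogue with pywt: wavedec coefficients are flattened through coeffs_to_array so top-k can run on a single vector, and coeff_slices is what a receiver needs to rebuild the coefficient structure. A sketch of that round trip with random input; selection here runs on the raw coefficients rather than on the accumulated change:

import numpy as np
import pywt
import torch

alpha = 0.1
flat_params = np.random.randn(1000).astype(np.float32)  # stand-in for the flattened model

# forward transform: list of coefficient arrays -> one flat vector + slice metadata
coeff = pywt.wavedec(flat_params, "sym2", level=None)
data, coeff_slices = pywt.coeffs_to_array(coeff)
data = data.ravel()

# top-k selection on coefficient magnitude
k = round(alpha * len(data))
_, index = torch.topk(torch.from_numpy(data).abs(), k, dim=0, sorted=False)
index = index.numpy()

# receiver side: scatter the received values and invert the transform
rebuilt = np.zeros_like(data)
rebuilt[index] = data[index]
coeffs = pywt.array_to_coeffs(rebuilt, coeff_slices, output_format="wavedec")
reconstructed = pywt.waverec(coeffs, "sym2")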