From 75a720aae900b7eb36331ac5f9bc710f472e836c Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger <jeffrey.wigger@epfl.ch> Date: Thu, 28 Apr 2022 15:51:19 +0200 Subject: [PATCH] reformat --- src/decentralizepy/datasets/Femnist.py | 78 ++++++++++++++------ src/decentralizepy/models/Resnet.py | 43 +++++++++-- src/decentralizepy/sharing/LowerBoundTopK.py | 49 ++++++------ src/decentralizepy/sharing/Synchronous.py | 3 +- src/decentralizepy/sharing/TopKNormalized.py | 29 +++++--- 5 files changed, 134 insertions(+), 68 deletions(-) diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py index 9bbbd87..a7b2677 100644 --- a/src/decentralizepy/datasets/Femnist.py +++ b/src/decentralizepy/datasets/Femnist.py @@ -14,7 +14,7 @@ from decentralizepy.datasets.Dataset import Dataset from decentralizepy.datasets.Partitioner import DataPartitioner from decentralizepy.mappings.Mapping import Mapping from decentralizepy.models.Model import Model -from decentralizepy.models.Resnet import BasicBlock, Bottleneck, conv1x1, conv3x3 +from decentralizepy.models.Resnet import BasicBlock, Bottleneck, conv1x1 NUM_CLASSES = 62 IMAGE_SIZE = (28, 28) @@ -291,7 +291,10 @@ class Femnist(Dataset): """ if self.__training__: return DataLoader( - Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle, drop_last=True # needed for resnet + Data(self.train_x, self.train_y), + batch_size=batch_size, + shuffle=shuffle, + drop_last=True, # needed for resnet ) raise RuntimeError("Training set not initialized!") @@ -450,6 +453,7 @@ class CNN(Model): x = self.fc2(x) return x + class RNET(Model): """ From PyTorch: @@ -457,12 +461,19 @@ class RNET(Model): Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py For the license see models/Resnet.py """ - def __init__(self, num_classes=NUM_CLASSES, zero_init_residual=False, - groups=1, width_per_group=32, replace_stride_with_dilation=None, - norm_layer=None): + + def __init__( + self, + num_classes=NUM_CLASSES, + zero_init_residual=False, + groups=1, + width_per_group=32, + replace_stride_with_dilation=None, + norm_layer=None, + ): super(RNET, self).__init__() block = BasicBlock - layers = [2,2,2,2] + layers = [2, 2, 2, 2] if norm_layer is None: norm_layer = nn.BatchNorm2d self._norm_layer = norm_layer @@ -474,28 +485,34 @@ class RNET(Model): # the 2x2 stride with a dilated convolution instead replace_stride_with_dilation = [False, False, False] if len(replace_stride_with_dilation) != 3: - raise ValueError("replace_stride_with_dilation should be None " - "or a 3-element tuple, got {}".format(replace_stride_with_dilation)) + raise ValueError( + "replace_stride_with_dilation should be None " + "or a 3-element tuple, got {}".format(replace_stride_with_dilation) + ) self.groups = groups self.base_width = width_per_group - self.conv1 = nn.Conv2d(1, self.inplanes, kernel_size=7, stride=2, padding=3, - bias=False) + self.conv1 = nn.Conv2d( + 1, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False + ) self.bn1 = norm_layer(self.inplanes) self.relu = nn.ReLU(inplace=True) self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) self.layer1 = self._make_layer(block, 32, layers[0]) - self.layer2 = self._make_layer(block, 64, layers[1], stride=2, - dilate=replace_stride_with_dilation[0]) - self.layer3 = self._make_layer(block, 128, layers[2], stride=2, - dilate=replace_stride_with_dilation[1]) - self.layer4 = 
self._make_layer(block, 256, layers[3], stride=2, - dilate=replace_stride_with_dilation[2]) + self.layer2 = self._make_layer( + block, 64, layers[1], stride=2, dilate=replace_stride_with_dilation[0] + ) + self.layer3 = self._make_layer( + block, 128, layers[2], stride=2, dilate=replace_stride_with_dilation[1] + ) + self.layer4 = self._make_layer( + block, 256, layers[3], stride=2, dilate=replace_stride_with_dilation[2] + ) self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) self.fc = nn.Linear(256 * block.expansion, num_classes) for m in self.modules(): if isinstance(m, nn.Conv2d): - nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') + nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu") elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) @@ -524,13 +541,30 @@ class RNET(Model): ) layers = [] - layers.append(block(self.inplanes, planes, stride, downsample, self.groups, - self.base_width, previous_dilation, norm_layer)) + layers.append( + block( + self.inplanes, + planes, + stride, + downsample, + self.groups, + self.base_width, + previous_dilation, + norm_layer, + ) + ) self.inplanes = planes * block.expansion for _ in range(1, blocks): - layers.append(block(self.inplanes, planes, groups=self.groups, - base_width=self.base_width, dilation=self.dilation, - norm_layer=norm_layer)) + layers.append( + block( + self.inplanes, + planes, + groups=self.groups, + base_width=self.base_width, + dilation=self.dilation, + norm_layer=norm_layer, + ) + ) return nn.Sequential(*layers) diff --git a/src/decentralizepy/models/Resnet.py b/src/decentralizepy/models/Resnet.py index eb3c6bb..158a2ed 100644 --- a/src/decentralizepy/models/Resnet.py +++ b/src/decentralizepy/models/Resnet.py @@ -72,13 +72,22 @@ POSSIBILITY OF SUCH DAMAGE. 
""" from torch import nn -import torch + # Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py + def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1): """3x3 convolution with padding""" - return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, - padding=dilation, groups=groups, bias=False, dilation=dilation) + return nn.Conv2d( + in_planes, + out_planes, + kernel_size=3, + stride=stride, + padding=dilation, + groups=groups, + bias=False, + dilation=dilation, + ) def conv1x1(in_planes, out_planes, stride=1): @@ -89,8 +98,17 @@ def conv1x1(in_planes, out_planes, stride=1): class BasicBlock(nn.Module): expansion = 1 - def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, - base_width=64, dilation=1, norm_layer=None): + def __init__( + self, + inplanes, + planes, + stride=1, + downsample=None, + groups=1, + base_width=64, + dilation=1, + norm_layer=None, + ): super(BasicBlock, self).__init__() if norm_layer is None: norm_layer = nn.BatchNorm2d @@ -133,12 +151,21 @@ class Bottleneck(nn.Module): expansion = 4 - def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1, - base_width=64, dilation=1, norm_layer=None): + def __init__( + self, + inplanes, + planes, + stride=1, + downsample=None, + groups=1, + base_width=64, + dilation=1, + norm_layer=None, + ): super(Bottleneck, self).__init__() if norm_layer is None: norm_layer = nn.BatchNorm2d - width = int(planes * (base_width / 64.)) * groups + width = int(planes * (base_width / 64.0)) * groups # Both self.conv2 and self.downsample layers downsample the input when stride != 1 self.conv1 = conv1x1(inplanes, width) self.bn1 = norm_layer(width) diff --git a/src/decentralizepy/sharing/LowerBoundTopK.py b/src/decentralizepy/sharing/LowerBoundTopK.py index d470c64..6ac5329 100644 --- a/src/decentralizepy/sharing/LowerBoundTopK.py +++ b/src/decentralizepy/sharing/LowerBoundTopK.py @@ -1,19 +1,14 @@ -import json import logging -import os -from pathlib import Path import numpy as np import torch -from decentralizepy.sharing.PartialModel import PartialModel -from decentralizepy.sharing.Sharing import Sharing -from decentralizepy.utils import conditional_value, identity +from decentralizepy.sharing.PartialModel import PartialModel class LowerBoundTopK(PartialModel): """ - This class implements a bounded version of topK. + This class implements a bounded version of topK. """ @@ -27,8 +22,8 @@ class LowerBoundTopK(PartialModel): model, dataset, log_dir, - lower_bound = 0.1, - metro_hastings = True, + lower_bound=0.1, + metro_hastings=True, **kwargs, ): """ @@ -85,12 +80,13 @@ class LowerBoundTopK(PartialModel): graph, model, dataset, - log_dir, **kwargs + log_dir, + **kwargs, ) self.lower_bound = lower_bound self.metro_hastings = metro_hastings if self.lower_bound > 0: - self.start_lower_bounding_at = 1/self.lower_bound + self.start_lower_bounding_at = 1 / self.lower_bound def extract_top_gradients(self): """ @@ -103,7 +99,7 @@ class LowerBoundTopK(PartialModel): (a,b). a: The magnitudes of the topK gradients, b: Their indices. 
""" - if (self.lower_bound == 0.0): + if self.lower_bound == 0.0: return super().extract_top_gradients() logging.info("Returning topk gradients bounded") @@ -119,19 +115,23 @@ class LowerBoundTopK(PartialModel): # because the superclass increases it where it is inconvenient for this subclass currently_shared = self.model.shared_parameters_counter.clone().detach() currently_shared[ind] += 1 - ind_small = (currently_shared < self.communication_round*self.lower_bound).nonzero(as_tuple=True)[0] - ind_small_unique = np.setdiff1d(ind_small.numpy(), ind.numpy(), assume_unique=True) - take_max = round(self.lower_bound*self.alpha * G_topk.shape[0]) - logging.info("lower: %i %i %i", len(ind_small), len(ind_small_unique), take_max) + ind_small = ( + currently_shared < self.communication_round * self.lower_bound + ).nonzero(as_tuple=True)[0] + ind_small_unique = np.setdiff1d( + ind_small.numpy(), ind.numpy(), assume_unique=True + ) + take_max = round(self.lower_bound * self.alpha * G_topk.shape[0]) + logging.info( + "lower: %i %i %i", len(ind_small), len(ind_small_unique), take_max + ) if take_max > ind_small_unique.shape[0]: take_max = ind_small_unique.shape[0] to_take = torch.rand(ind_small_unique.shape[0]) - _, ind_of_to_take = torch.topk( - to_take, take_max, dim=0, sorted=False - ) + _, ind_of_to_take = torch.topk(to_take, take_max, dim=0, sorted=False) ind_bound = torch.from_numpy(ind_small_unique)[ind_of_to_take] logging.info("lower bounding: %i %i", len(ind), len(ind_bound)) - #val = torch.concat(val, G_topk[ind_bound]) # not really needed, as thes are abs values and not further used + # val = torch.concat(val, G_topk[ind_bound]) # not really needed, as thes are abs values and not further used ind = torch.cat([ind, ind_bound]) return val, ind @@ -196,15 +196,15 @@ class LowerBoundTopK(PartialModel): ) data, ind = self.deserialized_model_avg(data) weight_vector[ind] += 1 - #weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings - #weight_total += weight + # weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings + # weight_total += weight datas.append(data) weight_vector = 1.0 / weight_vector # speed up by exploiting sparsity T = T * weight_vector for d in datas: - T += (d * weight_vector) + T += d * weight_vector start_index = 0 total = dict() @@ -213,6 +213,5 @@ class LowerBoundTopK(PartialModel): total[key] = T[start_index:end_index].reshape(self.shapes[i]) start_index = end_index - logging.info("new averaging") - self.model.load_state_dict(total) \ No newline at end of file + self.model.load_state_dict(total) diff --git a/src/decentralizepy/sharing/Synchronous.py b/src/decentralizepy/sharing/Synchronous.py index ad03705..29d7f62 100644 --- a/src/decentralizepy/sharing/Synchronous.py +++ b/src/decentralizepy/sharing/Synchronous.py @@ -58,7 +58,6 @@ class Synchronous: for k, v in self.model.state_dict().items(): self.init_model[k] = v.clone().detach() - def received_from_all(self): """ Check if all neighbors have sent the current iteration @@ -104,7 +103,7 @@ class Synchronous: """ m = dict() for key, val in self.model.state_dict().items(): - m[key] = val - self.init_model[key] # this is -lr*gradient + m[key] = val - self.init_model[key] # this is -lr*gradient self.total_data += len(self.communication.encrypt(m)) return m diff --git a/src/decentralizepy/sharing/TopKNormalized.py b/src/decentralizepy/sharing/TopKNormalized.py index a9d6655..15a3caf 100644 --- a/src/decentralizepy/sharing/TopKNormalized.py +++ b/src/decentralizepy/sharing/TopKNormalized.py @@ 
-1,14 +1,9 @@ -import json import logging -import os -from pathlib import Path -import numpy as np import torch -from decentralizepy.sharing.PartialModel import PartialModel -from decentralizepy.sharing.Sharing import Sharing -from decentralizepy.utils import conditional_value, identity +from decentralizepy.sharing.PartialModel import PartialModel +from decentralizepy.utils import identity class TopKNormalized(PartialModel): @@ -35,7 +30,7 @@ class TopKNormalized(PartialModel): save_accumulated="", change_transformer=identity, accumulate_averaging_changes=False, - epsilon = 0.01 + epsilon=0.01, ): """ Constructor @@ -80,8 +75,21 @@ class TopKNormalized(PartialModel): """ super().__init__( - rank, machine_id, communication, mapping, graph, model, dataset, log_dir, - alpha, dict_ordered, save_shared, metadata_cap, accumulation, save_accumulated, change_transformer, + rank, + machine_id, + communication, + mapping, + graph, + model, + dataset, + log_dir, + alpha, + dict_ordered, + save_shared, + metadata_cap, + accumulation, + save_accumulated, + change_transformer, accumulate_averaging_changes, ) self.epsilon = epsilon @@ -107,4 +115,3 @@ class TopKNormalized(PartialModel): return torch.topk( G_topk_normalized, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False ) - -- GitLab
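
The LowerBoundTopK hunks above only reformat the selection logic, but since it is the least obvious piece of this patch, a minimal standalone sketch of what `extract_top_gradients` does with the lower bound enabled follows. It is an illustrative reconstruction from the diff, not the decentralizepy API: the function name `select_lower_bounded_topk` and its flat-tensor arguments are assumptions, and the real method works on `PartialModel` state such as `self.model.shared_parameters_counter` and `self.communication_round`.

    import numpy as np
    import torch

    def select_lower_bounded_topk(changes, shared_counter, comm_round,
                                  alpha=0.1, lower_bound=0.1):
        """Top-k selection that also forces rarely shared parameters to be sent.

        changes:        1-D tensor of parameter changes.
        shared_counter: 1-D int tensor counting how often each parameter was shared.
        """
        n = changes.shape[0]
        # Usual top-k selection by magnitude.
        val, ind = torch.topk(changes.abs(), round(alpha * n), dim=0, sorted=False)

        # Parameters shared fewer than comm_round * lower_bound times so far,
        # excluding those already picked by top-k.
        counts = shared_counter.clone()
        counts[ind] += 1
        under = (counts < comm_round * lower_bound).nonzero(as_tuple=True)[0]
        under = np.setdiff1d(under.numpy(), ind.numpy(), assume_unique=True)

        # Randomly sample up to lower_bound * alpha * n of the under-shared
        # indices and append them to the selection.
        take = min(round(lower_bound * alpha * n), under.shape[0])
        _, pick = torch.topk(torch.rand(under.shape[0]), take, dim=0, sorted=False)
        ind = torch.cat([ind, torch.from_numpy(under)[pick]])
        return val, ind

As the commented-out line in the patched class notes, the extra indices are appended without their magnitudes, since only the indices of the lower-bounded parameters are used downstream.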