Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • sacs/decentralizepy
  • mvujas/decentralizepy
  • randl/decentralizepy
3 results
Show changes
import json
import logging
import os
from pathlib import Path
import torch
from decentralizepy.sharing.Sharing import Sharing
class SubSampling(Sharing):
    """
    This class implements the subsampling version of model sharing.
    A random binary mask (reproducible from a shared seed) selects the
    fraction ``alpha`` of parameters to send each round.
    It is based on PartialModel.py.
    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        pickle=True,
        layerwise=False,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        pickle : bool
            use pickle to serialize the model parameters
        layerwise : bool
            If True, subsample each layer independently instead of the
            flattened full model
        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            compress,
            compression_package,
            compression_class,
        )
        self.alpha = alpha
        self.dict_ordered = dict_ordered
        self.save_shared = save_shared
        self.metadata_cap = metadata_cap

        self.random_generator = torch.Generator()
        # Will use the random device if supported by CPU, else uses the system time.
        # In the latter case we could get duplicate seeds on some of the machines.
        self.random_generator.seed()
        self.seed = self.random_generator.initial_seed()
        self.pickle = pickle
        self.layerwise = layerwise
        logging.info("subsampling pickling=%s", pickle)

        if self.save_shared:
            # Only save for 2 procs: Save space.
            # BUG FIX: the original condition was `rank != 0 or rank != 1`,
            # which is True for every rank and silently disabled save_shared.
            if rank != 0 and rank != 1:
                self.save_shared = False
        if self.save_shared:
            self.folder_path = os.path.join(
                self.log_dir, "shared_params/{}".format(self.rank)
            )
            Path(self.folder_path).mkdir(parents=True, exist_ok=True)

        with torch.no_grad():
            # Snapshot of the flattened initial model; its length fixes the
            # size of the shared-parameters counter.
            tensors_to_cat = []
            for _, v in self.model.state_dict().items():
                t = v.flatten()
                tensors_to_cat.append(t)
            self.init_model = torch.cat(tensors_to_cat, dim=0)
            self.model.shared_parameters_counter = torch.zeros(
                self.init_model.shape[0], dtype=torch.int32
            )

    def apply_subsampling(self):
        """
        Creates a random binary mask that is used to subsample the parameters
        that will be shared.

        Returns
        -------
        tuple
            (a, b, c). a: the selected parameters as a flat vector,
            b: the random seed used to create the binary mask, c: the alpha
        """
        logging.info("Returning subsampling gradients")
        # The receiver reconstructs the same mask from (seed, alpha), so only
        # the selected values need to be transmitted.
        curr_seed = self.seed + self.communication_round  # is increased in step
        self.random_generator.manual_seed(curr_seed)
        if not self.layerwise:
            tensors_to_cat = [
                v.data.flatten() for _, v in self.model.state_dict().items()
            ]
            concated = torch.cat(tensors_to_cat, dim=0)
            # Or we could use torch.bernoulli
            binary_mask = (
                torch.rand(
                    size=(concated.size(dim=0),), generator=self.random_generator
                )
                <= self.alpha
            )
            subsample = concated[binary_mask]
            self.model.shared_parameters_counter[binary_mask] += 1
            return (subsample, curr_seed, self.alpha)
        else:
            values_list = []
            for _, v in self.model.state_dict().items():
                flat = v.flatten()
                binary_mask = (
                    torch.rand(
                        size=(flat.size(dim=0),), generator=self.random_generator
                    )
                    <= self.alpha
                )
                # TODO: support shared_parameters_counter
                values_list.append(flat[binary_mask])
            subsample = torch.cat(values_list, dim=0)
            return (subsample, curr_seed, self.alpha)

    def serialized_model(self):
        """
        Convert model to json dict. self.alpha specifies the fraction of model
        to send.

        Returns
        -------
        dict
            Model converted to json dict
        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().serialized_model()

        with torch.no_grad():
            subsample, seed, alpha = self.apply_subsampling()

            if self.save_shared:
                shared_params = dict()
                shared_params["order"] = list(self.model.state_dict().keys())
                shapes = dict()
                for k, v in self.model.state_dict().items():
                    shapes[k] = list(v.shape)
                shared_params["shapes"] = shapes
                # TODO: should store the shared indices and not the value
                shared_params["seed"] = seed
                shared_params["alpha"] = alpha
                with open(
                    os.path.join(
                        self.folder_path,
                        "{}_shared_params.json".format(self.communication_round + 1),
                    ),
                    "w",
                ) as of:
                    json.dump(shared_params, of)

            m = dict()
            if not self.dict_ordered:
                raise NotImplementedError
            m["seed"] = seed
            m["alpha"] = alpha
            m["params"] = subsample.numpy()
            return self.compress_data(m)

    def deserialized_model(self, m):
        """
        Convert received json dict to state_dict.

        Parameters
        ----------
        m : dict
            json dict received

        Returns
        -------
        state_dict
            state_dict of received
        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().deserialized_model(m)

        m = self.decompress_data(m)
        with torch.no_grad():
            state_dict = self.model.state_dict()
            if not self.dict_ordered:
                raise NotImplementedError

            seed = m["seed"]
            alpha = m["alpha"]
            params = m["params"]

            # Fresh generator so that we do not disturb the state of
            # self.random_generator.
            random_generator = torch.Generator()
            random_generator.manual_seed(seed)

            shapes = []
            lens = []
            tensors_to_cat = []
            binary_submasks = []
            for _, v in state_dict.items():
                shapes.append(v.shape)
                t = v.flatten()
                lens.append(t.shape[0])
                tensors_to_cat.append(t)
                if self.layerwise:
                    # Masks must be drawn per layer, in the same order as the
                    # sender drew them in apply_subsampling.
                    binary_mask = (
                        torch.rand(size=(t.size(dim=0),), generator=random_generator)
                        <= alpha
                    )
                    binary_submasks.append(binary_mask)

            T = torch.cat(tensors_to_cat, dim=0)
            params_tensor = torch.from_numpy(params)
            if not self.layerwise:
                binary_mask = (
                    torch.rand(size=(T.size(dim=0),), generator=random_generator)
                    <= alpha
                )
            else:
                binary_mask = torch.cat(binary_submasks, dim=0)
            logging.debug("Original tensor: {}".format(T[binary_mask]))
            # Scatter the received values back into a copy of the local model.
            T[binary_mask] = params_tensor
            logging.debug("Final tensor: {}".format(T[binary_mask]))

            start_index = 0
            for i, key in enumerate(state_dict):
                end_index = start_index + lens[i]
                state_dict[key] = T[start_index:end_index].reshape(shapes[i])
                start_index = end_index
            return state_dict
# Deprecated
import logging
from collections import deque
import torch
class Synchronous:
    """
    Synchronous training.

    NOTE(review): despite the "D-PSGD" wording in step()'s docstring, the
    visible code implements a star/parameter-server pattern: every node with
    uid != 0 sends its gradient to uid 0, which aggregates and broadcasts the
    averaged model back. Marked as deprecated in the source file.
    """

    def __init__(
        self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        """
        self.rank = rank
        self.machine_id = machine_id
        self.uid = mapping.get_uid(rank, machine_id)
        self.communication = communication
        self.mapping = mapping
        self.graph = graph
        self.model = model
        self.dataset = dataset
        self.communication_round = 0
        self.log_dir = log_dir

        # One FIFO queue of received messages per neighbor.
        self.peer_deques = dict()
        self.my_neighbors = self.graph.neighbors(self.uid)
        for n in self.my_neighbors:
            self.peer_deques[n] = deque()

        with torch.no_grad():
            # Snapshot of the model at the start of the round; used to compute
            # the local change ("gradient") in serialized_gradient().
            self.init_model = {}
            for k, v in self.model.state_dict().items():
                self.init_model[k] = v.clone().detach()

    def received_from_all(self):
        """
        Check if all neighbors have sent the current iteration

        Returns
        -------
        bool
            True if required data has been received, False otherwise
        """
        for _, i in self.peer_deques.items():
            if len(i) == 0:
                return False
        return True

    def get_neighbors(self, neighbors):
        """
        Choose which neighbors to share with

        Parameters
        ----------
        neighbors : list(int)
            List of all neighbors

        Returns
        -------
        list(int)
            Neighbors to share with
        """
        # modify neighbors here
        return neighbors

    def serialized_gradient(self):
        """
        Convert model to a dictionary. Here we can choose how much to share

        Returns
        -------
        dict
            Model converted to dict
        """
        m = dict()
        for key, val in self.model.state_dict().items():
            # Difference to the round-start snapshot; with plain SGD this
            # equals -lr * accumulated gradient over the round.
            m[key] = val - self.init_model[key]  # this is -lr*gradient
        return m

    def serialized_model(self):
        """
        Convert model to a dictionary. Here we can choose how much to share

        Returns
        -------
        dict
            Model converted to dict
        """
        m = dict()
        for key, val in self.model.state_dict().items():
            m[key] = val.clone().detach()
        return m

    def deserialized_model(self, m):
        """
        Convert received dict to state_dict.

        Parameters
        ----------
        m : dict
            received dict

        Returns
        -------
        state_dict
            state_dict of received
        """
        return m

    def _pre_step(self):
        """
        Called at the beginning of step.
        """
        pass

    def _post_step(self):
        """
        Called at the end of step. Refreshes the round-start snapshot so the
        next serialized_gradient() measures only the next round's change.
        """
        with torch.no_grad():
            self.init_model = {}
            for k, v in self.model.state_dict().items():
                self.init_model[k] = v.clone().detach()

    def _apply_gradients(self):
        """
        Averages the received model with the local model.

        Sums all received gradients plus the local one, then applies the mean
        (divided by number of neighbors + 1) on top of the round-start model.
        """
        with torch.no_grad():
            total = dict()
            for i, n in enumerate(self.peer_deques):
                gradient = self.peer_deques[n].popleft()
                logging.debug(
                    "Applying gradient from neighbor {}".format(
                        n,
                    )
                )
                grad = self.deserialized_model(gradient)
                for key, value in grad.items():
                    if key in total:
                        total[key] += value
                    else:
                        total[key] = value
            my_grad = self.serialized_gradient()
            for key, value in my_grad.items():
                if key in total:
                    total[key] += value
                else:
                    total[key] = value
            new_model = {}
            for key, value in self.init_model.items():
                new_model[key] = value + total[key] * (1 / (len(self.my_neighbors) + 1))
            self.model.load_state_dict(new_model)

    def step(self):
        """
        Perform a sharing step. Implements D-PSGD.

        NOTE(review): the actual flow is star-shaped — leaves (uid != 0) send
        their gradient to uid 0 and block on the averaged model; uid 0 waits
        for all leaves, averages, and sends the new model to its neighbors.
        """
        self._pre_step()
        logging.info("--- COMMUNICATION ROUND {} ---".format(self.communication_round))
        if self.uid != 0:
            gradient = self.serialized_gradient()
            # Should be only one neighbour
            self.communication.send(0, gradient)
            logging.info("Waiting for messages from central node")
            sender, data = self.communication.receive()
            logging.debug("Received model from {}".format(sender))
            logging.info(
                "Deserialized received model from {} of iteration {}".format(
                    sender, self.communication_round
                )
            )
            self.model.load_state_dict(data)
        else:
            logging.info("Waiting for messages from leaf nodes")
            while not self.received_from_all():
                sender, data = self.communication.receive()
                logging.debug("Received gradient from {}".format(sender))
                self.peer_deques[sender].append(data)
                logging.info(
                    "Deserialized gradient model from {} of iteration {}".format(
                        sender, self.communication_round
                    )
                )
            self._apply_gradients()

            data = self.serialized_model()
            all_neighbors = self.graph.neighbors(self.uid)
            iter_neighbors = self.get_neighbors(all_neighbors)
            for neighbor in iter_neighbors:
                self.communication.send(neighbor, data)

        self.communication_round += 1
        self._post_step()
import logging
import torch
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class TopKNormalized(PartialModel):
    """
    Partial model sharing where the topK entries are selected by the model
    change *normalized* by the current parameter magnitude (plus an epsilon
    for numerical stability), rather than by the raw change magnitude.
    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        accumulation=False,
        save_accumulated="",
        change_transformer=identity,
        accumulate_averaging_changes=False,
        epsilon=0.01,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        accumulation : bool
            True if the indices to share should be selected based on
            accumulated frequency change
        save_accumulated : bool
            True if accumulated weight change should be written to file. In
            case of accumulation the accumulated change is stored. If a
            change_transformer is used then the transformed change is stored.
        change_transformer : (x: Tensor) -> Tensor
            A function that transforms the model change into other domains.
            Default: identity function
        accumulate_averaging_changes : bool
            True if the accumulation should account the model change due to
            averaging
        epsilon : float
            numerical stability parameter used during normalization
        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            alpha,
            dict_ordered,
            save_shared,
            metadata_cap,
            accumulation,
            save_accumulated,
            change_transformer,
            accumulate_averaging_changes,
            compress,
            compression_package,
            compression_class,
        )
        self.epsilon = epsilon

    def extract_top_gradients(self):
        """
        Extract the indices and values of the topK gradients, ranked by the
        change normalized by the pre-share parameter magnitude.
        The gradients must have been accumulated.

        Returns
        -------
        tuple
            (a, b). a: The magnitudes of the topK gradients, b: Their indices.
        """
        logging.info("Returning topk gradients")
        change_magnitude = self.model.model_change.abs()
        # Normalize by the current parameter magnitude; epsilon guards
        # against division by (near-)zero parameters.
        normalized = change_magnitude / (self.pre_share_model.abs() + self.epsilon)

        # Statistics are recorded on the raw (un-normalized) change.
        std, mean = torch.std_mean(change_magnitude, unbiased=False)
        self.std = std.item()
        self.mean = mean.item()

        k = round(self.alpha * change_magnitude.shape[0])
        return torch.topk(normalized, k, dim=0, sorted=False)
import json
import logging
import os
from pathlib import Path
import numpy as np
import torch
from decentralizepy.sharing.Sharing import Sharing
class TopKParams(Sharing):
    """
    This class implements partial model sharing where the topK parameters
    (by absolute value) are selected layerwise and shared together with
    their indices and per-layer offsets.
    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            compress,
            compression_package,
            compression_class,
        )
        self.alpha = alpha
        self.dict_ordered = dict_ordered
        self.save_shared = save_shared
        self.metadata_cap = metadata_cap

        if self.save_shared:
            # Only save for 2 procs: Save space.
            # BUG FIX: the original condition was `rank != 0 or rank != 1`,
            # which is True for every rank and silently disabled save_shared.
            if rank != 0 and rank != 1:
                self.save_shared = False
        if self.save_shared:
            self.folder_path = os.path.join(
                self.log_dir, "shared_params/{}".format(self.rank)
            )
            Path(self.folder_path).mkdir(parents=True, exist_ok=True)

    def extract_top_params(self):
        """
        Extract the indices and values of the topK params layerwise.
        The gradients must have been accumulated.

        Returns
        -------
        tuple
            (a, b, c). a: The topK params, b: Their indices (within each
            layer's flattened tensor), c: The cumulative per-layer offsets
            into a and b.
        """
        logging.info("Returning TopKParams gradients")
        values_list = []
        index_list = []
        offsets = [0]
        off = 0
        for _, v in self.model.state_dict().items():
            flat = v.flatten()
            # Rank by magnitude, but share the signed values.
            values, index = torch.topk(
                flat.abs(), round(self.alpha * flat.size(dim=0)), dim=0, sorted=False
            )
            values_list.append(flat[index])
            index_list.append(index)
            off += values.size(dim=0)
            offsets.append(off)
        cat_values = torch.cat(values_list, dim=0)
        cat_index = torch.cat(index_list, dim=0)
        return (cat_values, cat_index, offsets)

    def serialized_model(self):
        """
        Convert model to json dict. self.alpha specifies the fraction of
        model to send.

        Returns
        -------
        dict
            Model converted to json dict
        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().serialized_model()

        with torch.no_grad():
            values, index, offsets = self.extract_top_params()
            self.model.shared_parameters_counter[index] += 1

            if self.save_shared:
                shared_params = dict()
                shared_params["order"] = list(self.model.state_dict().keys())
                shapes = dict()
                for k, v in self.model.state_dict().items():
                    shapes[k] = list(v.shape)
                shared_params["shapes"] = shapes
                shared_params[self.communication_round] = index.tolist()
                # TODO: store offsets
                with open(
                    os.path.join(
                        self.folder_path,
                        "{}_shared_params.json".format(self.communication_round + 1),
                    ),
                    "w",
                ) as of:
                    json.dump(shared_params, of)

            logging.info("Extracting topk params")
            logging.info("Generating dictionary to send")
            m = dict()
            if not self.dict_ordered:
                raise NotImplementedError
            m["indices"] = index.numpy().astype(np.int32)
            m["params"] = values.numpy()
            m["offsets"] = offsets
            assert len(m["indices"]) == len(m["params"])
            logging.info("Elements sending: {}".format(len(m["indices"])))
            logging.info("Generated dictionary to send")
            logging.info("Converted dictionary to json")
            return self.compress_data(m)

    def deserialized_model(self, m):
        """
        Convert received json dict to state_dict.

        Parameters
        ----------
        m : dict
            json dict received

        Returns
        -------
        state_dict
            state_dict of received
        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().deserialized_model(m)

        m = self.decompress_data(m)
        with torch.no_grad():
            state_dict = self.model.state_dict()
            if not self.dict_ordered:
                raise NotImplementedError

            offsets = m["offsets"]
            params = torch.tensor(m["params"])
            indices = torch.tensor(m["indices"], dtype=torch.long)
            # Overwrite the received positions in a copy of each local layer;
            # offsets[i]:offsets[i+1] is layer i's slice of params/indices.
            for i, (key, v) in enumerate(state_dict.items()):
                t = v.flatten().clone().detach()  # flatten() does not always copy
                layer_slice = slice(offsets[i], offsets[i + 1])
                t[indices[layer_slice]] = params[layer_slice]
                state_dict[key] = t.reshape(v.shape)
            return state_dict
import logging
import numpy as np
import torch
from decentralizepy.sharing.PartialModel import PartialModel
class TopKPlusRandom(PartialModel):
"""
This class implements partial model sharing with some random additions.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha=1.0,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph reprensenting neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha : float
Percentage of model to share
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha,
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
def extract_top_gradients(self):
"""
Extract the indices and values of the topK gradients and put some extra random.
The gradients must have been accumulated.
Returns
-------
tuple
(a,b). a: The magnitudes of the topK gradients, b: Their indices.
"""
logging.info("Returning topk gradients")
G = torch.abs(self.model.model_change)
std, mean = torch.std_mean(G, unbiased=False)
self.std = std.item()
self.mean = mean.item()
elements_to_pick = round(self.alpha / 2.0 * G.shape[0])
G_topK = torch.topk(G, min(G.shape[0], elements_to_pick), dim=0, sorted=False)
more_indices = np.arange(G.shape[0], dtype=int)
np.delete(more_indices, G_topK[1].numpy())
more_indices = np.random.choice(
more_indices, min(more_indices.shape[0], elements_to_pick)
)
G_topK0 = torch.cat([G_topK[0], G[more_indices]], dim=0)
G_topK1 = torch.cat([G_topK[1], torch.tensor(more_indices)], dim=0)
return G_topK0, G_topK1
import json
import logging
import os
import numpy as np
import pywt
import torch
from decentralizepy.sharing.PartialModel import PartialModel
def change_transformer_wavelet(x, wavelet, level):
    """
    Transform the model change into the wavelet frequency domain.

    Parameters
    ----------
    x : torch.Tensor
        Model change in the space domain
    wavelet : str
        Name of the wavelet to use (as understood by pywt)
    level : int
        Wavelet decomposition level

    Returns
    -------
    torch.Tensor
        Flat representation of the change in the wavelet domain
    """
    coefficients = pywt.wavedec(x, wavelet, level=level)
    arr, _slices = pywt.coeffs_to_array(coefficients)
    return torch.from_numpy(arr.ravel())
class Wavelet(PartialModel):
    """
    This class implements the wavelet version of model sharing:
    the model change is transformed into the wavelet domain and the topK
    coefficients are selected and shared.
    It is based on PartialModel.py
    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        wavelet="haar",
        level=4,
        change_based_selection=True,
        save_accumulated="",
        accumulation=False,
        accumulate_averaging_changes=False,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        wavelet : str
            name of the wavelet to be used in gradient compression
        level : int
            wavelet decomposition level used in gradient compression
        change_based_selection : bool
            use frequency change to select topk frequencies
        save_accumulated : bool
            True if accumulated weight change in the wavelet domain should be
            written to file. In case of accumulation the accumulated change is
            stored.
        accumulation : bool
            True if the indices to share should be selected based on
            accumulated frequency change
        accumulate_averaging_changes : bool
            True if the accumulation should account the model change due to
            averaging
        """
        # Must be set before super().__init__, since the change transformer
        # lambda below is handed to the base class constructor.
        self.wavelet = wavelet
        self.level = level

        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            alpha,
            dict_ordered,
            save_shared,
            metadata_cap,
            accumulation,
            save_accumulated,
            lambda x: change_transformer_wavelet(x, wavelet, level),
            accumulate_averaging_changes,
            compress,
            compression_package,
            compression_class,
        )

        self.change_based_selection = change_based_selection

        # Do a dummy transform to get the shape and coefficient slices,
        # needed later to invert the wavelet transform after averaging.
        coeff = pywt.wavedec(self.init_model.numpy(), self.wavelet, level=self.level)
        data, coeff_slices = pywt.coeffs_to_array(coeff)
        self.wt_shape = data.shape
        self.coeff_slices = coeff_slices

    def apply_wavelet(self):
        """
        Does wavelet transformation of the model parameters and selects topK
        (alpha) of them in the frequency domain based on the undergone change
        during the current training step.

        Returns
        -------
        tuple
            (a, b). a: selected wavelet coefficients, b: Their indices.
        """
        logging.info("Returning wavelet compressed model weights")
        # pre_share_model_transformed is maintained by the base class
        # (PartialModel, not visible here) — presumably the current model in
        # the wavelet domain; TODO confirm against PartialModel.
        data = self.pre_share_model_transformed
        if self.change_based_selection:
            # Rank coefficients by how much they changed this round.
            diff = self.model.model_change
            _, index = torch.topk(
                diff.abs(),
                round(self.alpha * len(diff)),
                dim=0,
                sorted=False,
            )
        else:
            # Rank coefficients by their current magnitude.
            _, index = torch.topk(
                data.abs(),
                round(self.alpha * len(data)),
                dim=0,
                sorted=False,
            )

        index, _ = torch.sort(index)
        return data[index], index

    def serialized_model(self):
        """
        Convert model to json dict. self.alpha specifies the fraction of
        model to send.

        Returns
        -------
        dict
            Model converted to json dict
        """
        m = dict()
        # NOTE(review): uses >= here while the sibling sharing classes use >
        # for the metadata_cap comparison — confirm whether intentional.
        if self.alpha >= self.metadata_cap:  # Share fully
            data = self.pre_share_model_transformed
            m["params"] = data.numpy()
            if self.model.accumulated_changes is not None:
                self.model.accumulated_changes = torch.zeros_like(
                    self.model.accumulated_changes
                )
            return self.compress_data(m)

        with torch.no_grad():
            topk, indices = self.apply_wavelet()
            self.model.shared_parameters_counter[indices] += 1
            # Reset accumulation for the coefficients that are being shared.
            self.model.rewind_accumulation(indices)
            if self.save_shared:
                shared_params = dict()
                shared_params["order"] = list(self.model.state_dict().keys())
                shapes = dict()
                for k, v in self.model.state_dict().items():
                    shapes[k] = list(v.shape)
                shared_params["shapes"] = shapes

                # is slow
                shared_params[self.communication_round] = indices.tolist()

                shared_params["alpha"] = self.alpha

                with open(
                    os.path.join(
                        self.folder_path,
                        "{}_shared_params.json".format(self.communication_round + 1),
                    ),
                    "w",
                ) as of:
                    json.dump(shared_params, of)

            if not self.dict_ordered:
                raise NotImplementedError

            m["alpha"] = self.alpha
            m["params"] = topk.numpy()
            m["indices"] = indices.numpy().astype(np.int32)
            # Marker consumed by deserialized_model / _averaging to tell a
            # partial payload apart from a full one.
            m["send_partial"] = True

            return self.compress_data(m)

    def deserialized_model(self, m):
        """
        Convert received dict to state_dict.

        Parameters
        ----------
        m : dict
            received dict

        Returns
        -------
        state_dict
            state_dict of received
        """
        m = self.decompress_data(m)
        ret = dict()
        if "send_partial" not in m:
            # Full model was shared: just wrap the raw coefficient vector.
            params = m["params"]
            params_tensor = torch.tensor(params)
            ret["params"] = params_tensor
            return ret

        with torch.no_grad():
            if not self.dict_ordered:
                raise NotImplementedError

            alpha = m["alpha"]
            params_tensor = torch.tensor(m["params"])
            indices_tensor = torch.tensor(m["indices"], dtype=torch.long)
            ret = dict()
            ret["indices"] = indices_tensor
            ret["params"] = params_tensor
            ret["send_partial"] = True
        return ret

    def _averaging(self, peer_deques):
        """
        Averages the received model with the local model.

        Averaging happens in the wavelet domain with Metropolis-Hastings
        weights; the result is inverse-transformed and loaded back into the
        model.
        """
        with torch.no_grad():
            total = None
            weight_total = 0
            wt_params = self.pre_share_model_transformed
            for i, n in enumerate(peer_deques):
                data = peer_deques[n].popleft()
                degree, iteration = data["degree"], data["iteration"]
                # Strip transport metadata before deserialization.
                del data["degree"]
                del data["iteration"]
                del data["CHANNEL"]
                logging.debug(
                    "Averaging model from neighbor {} of iteration {}".format(
                        n, iteration
                    )
                )
                data = self.deserialized_model(data)
                params = data["params"]
                if "indices" in data:
                    indices = data["indices"]
                    # use local data to complement the missing coefficients
                    topkwf = wt_params.clone().detach()
                    topkwf[indices] = params
                    topkwf = topkwf.reshape(self.wt_shape)
                else:
                    topkwf = params.reshape(self.wt_shape)

                # Metro-Hastings weight: symmetric in both nodes' degrees.
                weight = 1 / (max(len(peer_deques), degree) + 1)
                weight_total += weight
                if total is None:
                    total = weight * topkwf
                else:
                    total += weight * topkwf

            # Metro-Hastings: remaining weight goes to the local model.
            total += (1 - weight_total) * wt_params

            # Invert the wavelet transform on the averaged coefficients.
            avg_wf_params = pywt.array_to_coeffs(
                total.numpy(), self.coeff_slices, output_format="wavedec"
            )
            reverse_total = torch.from_numpy(
                pywt.waverec(avg_wf_params, wavelet=self.wavelet)
            )

            # self.lens / self.shapes are maintained by the base class
            # (PartialModel) — per-layer flattened lengths and shapes.
            start_index = 0
            std_dict = {}
            for i, key in enumerate(self.model.state_dict()):
                end_index = start_index + self.lens[i]
                std_dict[key] = reverse_total[start_index:end_index].reshape(
                    self.shapes[i]
                )
                start_index = end_index

            self.model.load_state_dict(std_dict)

        self._post_step()
        self.communication_round += 1

    def _averaging_server(self, peer_deques):
        """
        Averages the received models of all working nodes.

        Like _averaging, but with uniform 1/N weights and no local-model
        contribution (server has no own training).
        """
        with torch.no_grad():
            total = None
            wt_params = self.pre_share_model_transformed
            for i, n in enumerate(peer_deques):
                data = peer_deques[n].popleft()
                degree, iteration = data["degree"], data["iteration"]
                del data["degree"]
                del data["iteration"]
                del data["CHANNEL"]
                logging.debug(
                    "Averaging model from neighbor {} of iteration {}".format(
                        n, iteration
                    )
                )
                data = self.deserialized_model(data)
                params = data["params"]
                if "indices" in data:
                    indices = data["indices"]
                    # use local data to complement the missing coefficients
                    topkwf = wt_params.clone().detach()
                    topkwf[indices] = params
                    topkwf = topkwf.reshape(self.wt_shape)
                else:
                    topkwf = params.reshape(self.wt_shape)

                weight = 1 / len(peer_deques)
                if total is None:
                    total = weight * topkwf
                else:
                    total += weight * topkwf

            avg_wf_params = pywt.array_to_coeffs(
                total.numpy(), self.coeff_slices, output_format="wavedec"
            )
            reverse_total = torch.from_numpy(
                pywt.waverec(avg_wf_params, wavelet=self.wavelet)
            )

            start_index = 0
            std_dict = {}
            for i, key in enumerate(self.model.state_dict()):
                end_index = start_index + self.lens[i]
                std_dict[key] = reverse_total[start_index:end_index].reshape(
                    self.shapes[i]
                )
                start_index = end_index

            self.model.load_state_dict(std_dict)

        self._post_step()
        self.communication_round += 1
import logging
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from decentralizepy import utils
class Training:
    """
    This class implements the training module for a single node.
    """

    def __init__(
        self,
        rank,
        machine_id,
        mapping,
        model,
        optimizer,
        loss,
        log_dir,
        rounds="",
        full_epochs="",
        batch_size="",
        shuffle="",
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process in running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        model : torch.nn.Module
            Neural Network for training
        optimizer : torch.optim
            Optimizer to learn parameters
        loss : function
            Loss function
        log_dir : str
            Directory to log the model change.
        rounds : int, optional
            Number of steps/epochs per training call
        full_epochs : bool, optional
            True if 1 round = 1 epoch. False if 1 round = 1 minibatch
        batch_size : int, optional
            Number of items to learn over, in one batch
        shuffle : bool
            True if the dataset should be shuffled before training.

        """
        self.model = model
        self.optimizer = optimizer
        self.loss = loss
        self.log_dir = log_dir
        self.rank = rank
        self.machine_id = machine_id
        self.mapping = mapping
        # "" (unset in the config file) selects the documented default
        self.rounds = utils.conditional_value(rounds, "", int(1))
        self.full_epochs = utils.conditional_value(full_epochs, "", False)
        self.batch_size = utils.conditional_value(batch_size, "", int(1))
        self.shuffle = utils.conditional_value(shuffle, "", False)

    def imshow(self, img):
        """
        Display an image tensor with matplotlib.

        Parameters
        ----------
        img : torch.Tensor
            Image tensor; transposed from (C, H, W) to (H, W, C) for display.

        """
        npimg = img.numpy()
        plt.imshow(np.transpose(npimg, (1, 2, 0)))
        plt.show()

    def reset_optimizer(self, optimizer):
        """
        Replace the current optimizer with a new one

        Parameters
        ----------
        optimizer : torch.optim
            A new optimizer

        """
        self.optimizer = optimizer

    def eval_loss(self, dataset):
        """
        Evaluate the loss on the training set

        Parameters
        ----------
        dataset : decentralizepy.datasets.Dataset
            The training dataset. Should implement get_trainset(batch_size, shuffle)

        Returns
        -------
        float
            Mean minibatch loss over the training set

        """
        trainset = dataset.get_trainset(self.batch_size, self.shuffle)
        epoch_loss = 0.0
        count = 0
        # Evaluation only: no gradients needed
        with torch.no_grad():
            for data, target in trainset:
                output = self.model(data)
                loss_val = self.loss(output, target)
                epoch_loss += loss_val.item()
                count += 1
        loss = epoch_loss / count
        logging.info("Loss after iteration: {}".format(loss))
        return loss

    def trainstep(self, data, target):
        """
        One training step on a minibatch.

        Parameters
        ----------
        data : any
            Data item
        target : any
            Label

        Returns
        -------
        float
            Loss Value for the step

        """
        self.model.zero_grad()
        output = self.model(data)
        loss_val = self.loss(output, target)
        loss_val.backward()
        self.optimizer.step()
        return loss_val.item()

    def train_full(self, dataset):
        """
        One training iteration, goes through the entire dataset

        Parameters
        ----------
        dataset : decentralizepy.datasets.Dataset
            The training dataset. Should implement get_trainset(batch_size, shuffle)

        """
        for epoch in range(self.rounds):
            # Re-fetch the trainset each epoch so shuffling (if enabled) reshuffles
            trainset = dataset.get_trainset(self.batch_size, self.shuffle)
            epoch_loss = 0.0
            count = 0
            for data, target in trainset:
                logging.debug(
                    "Starting minibatch {} with num_samples: {}".format(
                        count, len(data)
                    )
                )
                logging.debug("Classes: {}".format(target))
                epoch_loss += self.trainstep(data, target)
                count += 1
            logging.info("Epoch: {} loss: {}".format(epoch, epoch_loss / count))

    def train(self, dataset):
        """
        One training iteration

        Parameters
        ----------
        dataset : decentralizepy.datasets.Dataset
            The training dataset. Should implement get_trainset(batch_size, shuffle)

        """
        if self.full_epochs:
            self.train_full(dataset)
        else:
            iter_loss = 0.0
            count = 0
            trainset = dataset.get_trainset(self.batch_size, self.shuffle)
            # One round == one minibatch; loop over the trainset until
            # self.rounds minibatches have been processed.
            while count < self.rounds:
                for data, target in trainset:
                    iter_loss += self.trainstep(data, target)
                    count += 1
                    logging.info(
                        "Round: {} loss: {}".format(count, iter_loss / count)
                    )
                    if count >= self.rounds:
                        break
import argparse
import datetime
import json
import os
def conditional_value(var, nul, default):
    """
    Set the value to default if nul.

    Parameters
    ----------
    var : any
        The value
    nul : any
        The null value. Assigns default if var == nul
    default : any
        The default value

    Returns
    -------
    type(var)
        The final value

    """
    if var != nul:
        return var
    else:
        return default
def remove_keys(d, keys_to_remove):
    """
    Removes given keys from the dict. Returns a new dict.

    Parameters
    ----------
    d : dict
        The initial dictionary
    keys_to_remove : list
        List of keys to remove from dict

    Returns
    -------
    dict
        A new dictionary with the given keys removed.

    """
    # Materialize as a set for O(1) membership tests
    removed = set(keys_to_remove)
    return {key: value for key, value in d.items() if key not in removed}
def get_args():
    """
    Utility to parse arguments.

    Returns
    -------
    args
        Command line arguments

    """
    stamp = datetime.datetime.now().isoformat(timespec="minutes")
    # (short flag, long flag, type, default) — order matters for --help output
    options = [
        ("-mid", "--machine_id", int, 0),
        ("-ps", "--procs_per_machine", int, 1),
        ("-ms", "--machines", int, 1),
        ("-ld", "--log_dir", str, "./{}".format(stamp)),
        ("-wsd", "--weights_store_dir", str, "./{}_ws".format(stamp)),
        ("-is", "--iterations", int, 1),
        ("-cf", "--config_file", str, "config.ini"),
        ("-ll", "--log_level", str, "INFO"),
        ("-gf", "--graph_file", str, "36_nodes.edges"),
        ("-gt", "--graph_type", str, "edges"),
        ("-ta", "--test_after", int, 5),
        ("-tea", "--train_evaluate_after", int, 1),
        ("-ro", "--reset_optimizer", int, 1),
        ("-sm", "--server_machine", int, 0),
        ("-sr", "--server_rank", int, -1),
        ("-wr", "--working_rate", float, 1.0),
    ]
    parser = argparse.ArgumentParser()
    for short_flag, long_flag, arg_type, default in options:
        parser.add_argument(short_flag, long_flag, type=arg_type, default=default)
    return parser.parse_args()
def write_args(args, path):
    """
    Write arguments to a json file

    Parameters
    ----------
    args : args
        Command line args
    path : str
        Location of the file to write to

    """
    # Only this subset of the parsed arguments is persisted
    fields = (
        "machine_id",
        "procs_per_machine",
        "machines",
        "log_dir",
        "weights_store_dir",
        "iterations",
        "config_file",
        "log_level",
        "graph_file",
        "graph_type",
        "test_after",
        "train_evaluate_after",
        "reset_optimizer",
        "working_rate",
    )
    data = {name: getattr(args, name) for name in fields}
    with open(os.path.join(path, "args.json"), "w") as of:
        json.dump(data, of)
def identity(obj):
    """
    Identity function: hand back the argument unchanged.

    Parameters
    ----------
    obj
        Any object

    Returns
    -------
    obj
        The very same object that was passed in

    """
    return obj
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 90
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 3
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = /mnt/nfs/risharma/Gitlab/tutorial/ip.json
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
sharing_class = Sharing
{
"0": "127.0.0.1"
}
\ No newline at end of file
16
0 12
0 14
0 15
1 8
1 3
1 6
2 9
2 10
2 5
3 1
3 11
3 9
4 9
4 12
4 13
5 2
5 6
5 7
6 1
6 5
6 7
7 5
7 6
7 14
8 1
8 13
8 14
9 2
9 3
9 4
10 2
10 11
10 13
11 10
11 3
11 15
12 0
12 4
12 15
13 8
13 10
13 4
14 0
14 8
14 7
15 0
15 11
15 12
#!/bin/bash
# Launch script: runs testingPeerSampler.py on this machine with a 96-node
# regular graph and the shared CELEBA sharing config.
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=16
machines=1
iterations=80
test_after=20
eval_file=testingPeerSampler.py
log_level=INFO
# Derive this machine's id: read the addresses file referenced by the config,
# find the entry matching our own IP (interface ens785), extract the key.
m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
echo M is $m
# Per-run, per-machine log directory, timestamped to the minute.
log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m
mkdir -p $log_dir
# Work on a private copy of the config so local tweaks don't touch the original.
cp $original_config $config_file
# echo "alpha = 0.10" >> $config_file
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -wsd $log_dir
\ No newline at end of file
#!/bin/bash
# Launch script: runs testingFederated.py (federated setup with a parameter
# server) on this machine, with a 0.5 client working rate per round.
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=16
machines=1
iterations=80
test_after=20
eval_file=testingFederated.py
log_level=INFO
# Server placement: rank -1 on machine 0 (interpreted by the eval script).
server_rank=-1
server_machine=0
# Fraction of clients that participate in each round.
working_rate=0.5
# Derive this machine's id: read the addresses file referenced by the config,
# find the entry matching our own IP (interface ens785), extract the key.
m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
echo M is $m
# Per-run, per-machine log directory, timestamped to the minute.
log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m
mkdir -p $log_dir
# Work on a private copy of the config so local tweaks don't touch the original.
cp $original_config $config_file
# echo "alpha = 0.10" >> $config_file
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -ctr 0 -cte 0 -wsd $log_dir -sm $server_machine -sr $server_rank -wr $working_rate
\ No newline at end of file