Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • sacs/decentralizepy
  • mvujas/decentralizepy
  • randl/decentralizepy
3 results
Show changes
Showing
with 2766 additions and 413 deletions
# Deprecated
import logging
from decentralizepy.sharing.PartialModel import PartialModel
......@@ -25,6 +26,9 @@ class GrowingAlpha(PartialModel):
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -74,12 +78,15 @@ class GrowingAlpha(PartialModel):
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
self.init_alpha = init_alpha
self.max_alpha = max_alpha
self.k = k
def step(self):
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha increasing as a linear function.
......@@ -93,4 +100,4 @@ class GrowingAlpha(PartialModel):
self.communication_round += 1
return
super().step()
return super().get_data_to_send()
import logging
import numpy as np
import torch
from decentralizepy.sharing.PartialModel import PartialModel
class LowerBoundTopK(PartialModel):
    """
    This class implements a bounded version of topK.

    On top of the regular top-k selection, a small extra budget is spent on
    randomly chosen parameters that were shared in too few rounds so far.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        lower_bound=0.1,
        metro_hastings=True,
        compress=False,
        compression_package=None,
        compression_class=None,
        **kwargs,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        lower_bound : float
            Increases the communication budget per communication round by lower_bound, i.e. the new effective
            alpha will be alpha + alpha*lower_bound. The extra budget is used to randomly selected parameters that
            were shared in less than alpha*lower_bound*100 percentage of the rounds.
        metro_hastings: bool
            If True then metro hastings averaging is used otherwise it does per parameter averaging.
        compress : bool
            Forwarded to the parent constructor
        compression_package : str
            Forwarded to the parent constructor
        compression_class : str
            Forwarded to the parent constructor
        **kwargs
            Remaining options of the parent class, forwarded unchanged:

            alpha : float
                Percentage of model to share
            dict_ordered : bool
                Specifies if the python dict maintains the order of insertion
            save_shared : bool
                Specifies if the indices of shared parameters should be logged
            metadata_cap : float
                Share full model when self.alpha > metadata_cap
            accumulation : bool
                True if the the indices to share should be selected based on accumulated frequency change
            save_accumulated : bool
                True if accumulated weight change should be written to file. In case of accumulation the
                accumulated change is stored. If a change_transformer is used then the transformed change is stored.
            change_transformer : (x: Tensor) -> Tensor
                A function that transforms the model change into other domains. Default: identity function
            accumulate_averaging_changes: bool
                True if the accumulation should account the model change due to averaging

        """
        # The compression options are passed by keyword: the parent expects
        # alpha, dict_ordered, ... right after log_dir, so passing them
        # positionally would bind them to the wrong parameters.
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            compress=compress,
            compression_package=compression_package,
            compression_class=compression_class,
            **kwargs,
        )
        self.lower_bound = lower_bound
        self.metro_hastings = metro_hastings
        if self.lower_bound > 0:
            # Lower bounding only starts once at least 1/lower_bound rounds passed
            self.start_lower_bounding_at = 1 / self.lower_bound

    def extract_top_gradients(self):
        """
        Extract the indices and values of the topK gradients.
        The gradients must have been accumulated.

        Returns
        -------
        tuple
            (a,b). a: The magnitudes of the topK gradients, b: Their indices.
            When lower bounding kicked in, b additionally holds the randomly
            picked rarely-shared indices while a is left untouched.

        """
        # start_lower_bounding_at only exists for a positive bound, so every
        # non-positive bound falls back to the plain top-k of the parent.
        if self.lower_bound <= 0:
            return super().extract_top_gradients()

        logging.info("Returning topk gradients bounded")
        G_topk = torch.abs(self.model.model_change)
        std, mean = torch.std_mean(G_topk, unbiased=False)
        self.std = std.item()
        self.mean = mean.item()
        val, ind = torch.topk(
            G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
        )
        if self.communication_round > self.start_lower_bounding_at:
            # because the superclass increases it where it is inconvenient for this subclass
            currently_shared = self.model.shared_parameters_counter.clone().detach()
            currently_shared[ind] += 1
            ind_small = (
                currently_shared < self.communication_round * self.lower_bound
            ).nonzero(as_tuple=True)[0]
            # Candidates: rarely shared parameters that are not already in the top-k
            ind_small_unique = np.setdiff1d(
                ind_small.numpy(), ind.numpy(), assume_unique=True
            )
            take_max = round(self.lower_bound * self.alpha * G_topk.shape[0])
            logging.info(
                "lower: %i %i %i", len(ind_small), len(ind_small_unique), take_max
            )
            if take_max > ind_small_unique.shape[0]:
                take_max = ind_small_unique.shape[0]
            # Random scores + topk == uniform sample of take_max candidates
            to_take = torch.rand(ind_small_unique.shape[0])
            _, ind_of_to_take = torch.topk(to_take, take_max, dim=0, sorted=False)
            ind_bound = torch.from_numpy(ind_small_unique)[ind_of_to_take]
            logging.info("lower bounding: %i %i", len(ind), len(ind_bound))
            # val is not extended: these are abs values and not further used
            ind = torch.cat([ind, ind_bound])

        return val, ind

    def deserialized_model_avg(self, m):
        """
        Convert a received dict to a flat parameter vector plus the indices it covers.

        Parameters
        ----------
        m : dict
            dict received

        Returns
        -------
        tuple
            (T, index_tensor). T: flat tensor laid out like the flattened local model,
            zero everywhere except at the received positions. index_tensor: the
            positions that were received.

        """
        if "send_partial" not in m:
            # The neighbor shared its full model. The parent returns a state_dict,
            # but _averaging unpacks (flat tensor, indices), so flatten it and
            # report every position as received.
            state_dict = super().deserialized_model(m)
            with torch.no_grad():
                flat = torch.cat([v.flatten() for v in state_dict.values()], dim=0)
                flat = flat.to(self.init_model.dtype)
            return flat, torch.arange(flat.shape[0], dtype=torch.long)

        m = self.decompress_data(m)

        with torch.no_grad():
            if not self.dict_ordered:
                raise NotImplementedError

            # could be made more efficient
            T = torch.zeros_like(self.init_model)
            index_tensor = torch.tensor(m["indices"], dtype=torch.long)
            logging.debug("Original tensor: {}".format(T[index_tensor]))
            T[index_tensor] = torch.tensor(m["params"])
            logging.debug("Final tensor: {}".format(T[index_tensor]))
            return T, index_tensor

    def _averaging(self, peer_deques):
        """
        Averages the received model with the local model

        Parameters
        ----------
        peer_deques : dict
            Maps each neighbor to the deque of messages received from it.
            One message per neighbor is consumed.

        """
        if self.metro_hastings:
            # The parent implementation also runs _post_step and advances the round
            super()._averaging(peer_deques)
        else:
            with torch.no_grad():
                tensors_to_cat = [
                    v.flatten() for _, v in self.model.state_dict().items()
                ]
                T = torch.cat(tensors_to_cat, dim=0)
                # Per parameter: number of contributors (starts at 1 for the local model)
                weight_vector = torch.ones_like(self.init_model)
                datas = []
                for n in peer_deques:
                    data = peer_deques[n].popleft()
                    iteration = data["iteration"]
                    del data["degree"]
                    del data["iteration"]
                    del data["CHANNEL"]
                    logging.debug(
                        "Averaging model from neighbor {} of iteration {}".format(
                            n, iteration
                        )
                    )
                    data, ind = self.deserialized_model_avg(data)
                    weight_vector[ind] += 1
                    datas.append(data)

                weight_vector = 1.0 / weight_vector
                # speed up by exploiting sparsity
                T = T * weight_vector
                for d in datas:
                    T += d * weight_vector

                start_index = 0
                total = dict()
                for i, key in enumerate(self.model.state_dict()):
                    end_index = start_index + self.lens[i]
                    total[key] = T[start_index:end_index].reshape(self.shapes[i])
                    start_index = end_index

                logging.info("new averaging")
                self.model.load_state_dict(total)
                self._post_step()
                self.communication_round += 1
# Deprecated
import logging
from decentralizepy.sharing.PartialModel import PartialModel
class ManualAdapt(PartialModel):
    """
    This class implements the basic growing partial model sharing provided when and what alpha to set.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        change_alpha,
        change_rounds,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        change_alpha : list or str
            List of alphas to set. change_alpha[0] must be initial alpha.
            A string is parsed as a Python literal (e.g. "[0.1, 0.5]").
        change_rounds : list or str
            List of iterations to change alpha. len(change_alpha) = len(change_rounds) + 1.
            A string is parsed as a Python literal.
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        compress : bool
            Forwarded to the parent constructor
        compression_package : str
            Forwarded to the parent constructor
        compression_class : str
            Forwarded to the parent constructor

        """
        import ast

        assert change_alpha is not None and change_alpha != "", "change_alpha is required"
        assert change_rounds is not None and change_rounds != "", "change_rounds is required"

        # Values coming from a config file arrive as strings. They are parsed
        # as literals only; arbitrary expressions are deliberately not evaluated.
        if isinstance(change_alpha, str):
            change_alpha = ast.literal_eval(change_alpha)
        if isinstance(change_rounds, str):
            change_rounds = ast.literal_eval(change_rounds)

        # Everything after alpha is passed by keyword: the parent has further
        # parameters between metadata_cap and compress, so positional passing
        # would bind the compression options to the wrong parameters.
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            change_alpha[0],
            dict_ordered=dict_ordered,
            save_shared=save_shared,
            metadata_cap=metadata_cap,
            compress=compress,
            compression_package=compression_package,
            compression_class=compression_class,
        )
        # change_alpha[0] is already in use; keep only the pending alphas
        self.change_alpha = change_alpha[1:]
        self.change_rounds = change_rounds

    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD with alpha manually given.

        Returns
        -------
        dict
            Empty dict when alpha is 0.0 (nothing is shared this round),
            otherwise whatever the parent's get_data_to_send returns.

        """
        if (
            len(self.change_rounds)
            and (self.communication_round + 1) == self.change_rounds[0]
        ):
            # Switch to the next scheduled alpha (capped at 1.0) and drop it from the schedule
            self.alpha = min(self.change_alpha[0], 1.00)
            self.change_alpha = self.change_alpha[1:]
            self.change_rounds = self.change_rounds[1:]

        if self.alpha == 0.0:
            logging.info("Not sending/receiving data (alpha=0.0)")
            # The parent is not called, so the round has to be advanced here
            self.communication_round += 1
            return dict()

        return super().get_data_to_send()
......@@ -3,9 +3,11 @@ import logging
import os
from pathlib import Path
import numpy as np
import torch
from decentralizepy.sharing.Sharing import Sharing
from decentralizepy.utils import conditional_value, identity
class PartialModel(Sharing):
......@@ -28,6 +30,13 @@ class PartialModel(Sharing):
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
accumulation=False,
save_accumulated="",
change_transformer=identity,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -58,20 +67,70 @@ class PartialModel(Sharing):
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
accumulation : bool
True if the the indices to share should be selected based on accumulated frequency change
save_accumulated : bool
True if accumulated weight change should be written to file. In case of accumulation the accumulated change
is stored. If a change_transformer is used then the transformed change is stored.
change_transformer : (x: Tensor) -> Tensor
A function that transforms the model change into other domains. Default: identity function
accumulate_averaging_changes: bool
True if the accumulation should account the model change due to averaging
"""
super().__init__(
rank, machine_id, communication, mapping, graph, model, dataset, log_dir
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
)
self.alpha = alpha
self.dict_ordered = dict_ordered
self.save_shared = save_shared
self.metadata_cap = metadata_cap
self.total_meta = 0
self.accumulation = accumulation
self.save_accumulated = conditional_value(save_accumulated, "", False)
self.change_transformer = change_transformer
self.accumulate_averaging_changes = accumulate_averaging_changes
# getting the initial model
self.shapes = []
self.lens = []
with torch.no_grad():
tensors_to_cat = []
for _, v in self.model.state_dict().items():
self.shapes.append(v.shape)
t = v.flatten()
self.lens.append(t.shape[0])
tensors_to_cat.append(t)
self.init_model = torch.cat(tensors_to_cat, dim=0)
if self.accumulation:
self.model.accumulated_changes = torch.zeros_like(
self.change_transformer(self.init_model)
)
self.prev = self.init_model
self.number_of_params = self.init_model.shape[0]
if self.save_accumulated:
self.model_change_path = os.path.join(
self.log_dir, "model_change/{}".format(self.rank)
)
Path(self.model_change_path).mkdir(parents=True, exist_ok=True)
self.model_val_path = os.path.join(
self.log_dir, "model_val/{}".format(self.rank)
)
Path(self.model_val_path).mkdir(parents=True, exist_ok=True)
# Only save for 2 procs: Save space
if rank == 0 or rank == 1:
self.save_shared = True
if self.save_shared and not (rank == 0 or rank == 1):
self.save_shared = False
if self.save_shared:
self.folder_path = os.path.join(
......@@ -79,6 +138,27 @@ class PartialModel(Sharing):
)
Path(self.folder_path).mkdir(parents=True, exist_ok=True)
self.model.shared_parameters_counter = torch.zeros(
self.change_transformer(self.init_model).shape[0], dtype=torch.int32
)
def compress_data(self, data):
    """
    Return a shallow copy of data with "indices" and "params" compressed.

    Parameters
    ----------
    data : dict
        Message to send. It is not modified.

    Returns
    -------
    dict
        Copy of data. When self.compress is set, "indices" (if present) went
        through compressor.compress and "params" (if present) through
        compressor.compress_float.

    """
    packed = dict(data)
    if not self.compress:
        return packed
    compressor = self.compressor
    if "indices" in packed:
        packed["indices"] = compressor.compress(packed["indices"])
    if "params" in packed:
        packed["params"] = compressor.compress_float(packed["params"])
    return packed
def decompress_data(self, data):
    """
    Decompress "indices" and "params" of a received message in place.

    Parameters
    ----------
    data : dict
        Received message. It is modified when self.compress is set.

    Returns
    -------
    dict
        The same dict object that was passed in.

    """
    if not self.compress:
        return data
    compressor = self.compressor
    if "indices" in data:
        data["indices"] = compressor.decompress(data["indices"])
    if "params" in data:
        data["params"] = compressor.decompress_float(data["params"])
    return data
def extract_top_gradients(self):
"""
Extract the indices and values of the topK gradients.
......@@ -90,39 +170,41 @@ class PartialModel(Sharing):
(a,b). a: The magnitudes of the topK gradients, b: Their indices.
"""
logging.info("Summing up gradients")
assert len(self.model.accumulated_gradients) > 0
gradient_sum = self.model.accumulated_gradients[0]
for i in range(1, len(self.model.accumulated_gradients)):
for key in self.model.accumulated_gradients[i]:
gradient_sum[key] += self.model.accumulated_gradients[i][key]
logging.info("Returning topk gradients")
tensors_to_cat = [v.data.flatten() for _, v in gradient_sum.items()]
G_topk = torch.abs(torch.cat(tensors_to_cat, dim=0))
G_topk = torch.abs(self.model.model_change)
std, mean = torch.std_mean(G_topk, unbiased=False)
self.std = std.item()
self.mean = mean.item()
return torch.topk(
G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
_, index = torch.topk(
G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=True
)
index, _ = torch.sort(index)
return _, index
def serialized_model(self):
"""
Convert model to json dict. self.alpha specifies the fraction of model to send.
Convert model to a dict. self.alpha specifies the fraction of model to send.
Returns
-------
dict
Model converted to json dict
Model converted to a dict
"""
if self.alpha > self.metadata_cap: # Share fully
if self.alpha >= self.metadata_cap: # Share fully
if self.model.accumulated_changes is not None:
self.model.accumulated_changes = torch.zeros_like(
self.model.accumulated_changes
)
return super().serialized_model()
with torch.no_grad():
_, G_topk = self.extract_top_gradients()
self.model.shared_parameters_counter[G_topk] += 1
if self.accumulation:
self.model.rewind_accumulation(G_topk)
if self.save_shared:
shared_params = dict()
shared_params["order"] = list(self.model.state_dict().keys())
......@@ -144,9 +226,7 @@ class PartialModel(Sharing):
logging.info("Extracting topk params")
tensors_to_cat = [v.data.flatten() for v in self.model.parameters()]
T = torch.cat(tensors_to_cat, dim=0)
T_topk = T[G_topk]
T_topk = self.pre_share_model[G_topk]
logging.info("Generating dictionary to send")
......@@ -155,32 +235,31 @@ class PartialModel(Sharing):
if not self.dict_ordered:
raise NotImplementedError
m["indices"] = G_topk.numpy().tolist()
m["alpha"] = self.alpha
m["indices"] = G_topk.numpy().astype(np.int32)
m["params"] = T_topk.numpy().tolist()
m["params"] = T_topk.numpy()
m["send_partial"] = True
assert len(m["indices"]) == len(m["params"])
logging.info("Elements sending: {}".format(len(m["indices"])))
logging.info("Generated dictionary to send")
for key in m:
m[key] = json.dumps(m[key])
logging.info("Converted dictionary to json")
self.total_data += len(self.communication.encrypt(m["params"]))
self.total_meta += len(self.communication.encrypt(m["indices"]))
logging.info("Converted dictionary to pickle")
return m
return self.compress_data(m)
def deserialized_model(self, m):
"""
Convert received json dict to state_dict.
Convert received dict to state_dict.
Parameters
----------
m : dict
json dict received
dict received
Returns
-------
......@@ -188,10 +267,12 @@ class PartialModel(Sharing):
state_dict of received
"""
if self.alpha > self.metadata_cap: # Share fully
if "send_partial" not in m:
return super().deserialized_model(m)
with torch.no_grad():
m = self.decompress_data(m)
state_dict = self.model.state_dict()
if not self.dict_ordered:
......@@ -207,9 +288,9 @@ class PartialModel(Sharing):
tensors_to_cat.append(t)
T = torch.cat(tensors_to_cat, dim=0)
index_tensor = torch.tensor(json.loads(m["indices"]))
index_tensor = torch.tensor(m["indices"], dtype=torch.long)
logging.debug("Original tensor: {}".format(T[index_tensor]))
T[index_tensor] = torch.tensor(json.loads(m["params"]))
T[index_tensor] = torch.tensor(m["params"])
logging.debug("Final tensor: {}".format(T[index_tensor]))
start_index = 0
for i, key in enumerate(state_dict):
......@@ -218,3 +299,90 @@ class PartialModel(Sharing):
start_index = end_index
return state_dict
def _pre_step(self):
"""
Called at the beginning of step.
"""
logging.info("PartialModel _pre_step")
with torch.no_grad():
tensors_to_cat = [
v.data.flatten() for _, v in self.model.state_dict().items()
]
self.pre_share_model = torch.cat(tensors_to_cat, dim=0)
# Would only need one of the transforms
self.pre_share_model_transformed = self.change_transformer(
self.pre_share_model
)
change = self.change_transformer(self.pre_share_model - self.init_model)
if self.accumulation:
if not self.accumulate_averaging_changes:
# Need to accumulate in _pre_step as the accumulation gets rewind during the step
self.model.accumulated_changes += change
change = self.model.accumulated_changes.clone().detach()
else:
# For the legacy implementation, we will only rewind currently accumulated values
# and add the model change due to averaging in the end
change += self.model.accumulated_changes
# stores change of the model due to training, change due to averaging is not accounted
self.model.model_change = change
def _post_step(self):
"""
Called at the end of step.
"""
logging.info("PartialModel _post_step")
with torch.no_grad():
tensors_to_cat = [
v.data.flatten() for _, v in self.model.state_dict().items()
]
post_share_model = torch.cat(tensors_to_cat, dim=0)
self.init_model = post_share_model
if self.accumulation:
if self.accumulate_averaging_changes:
self.model.accumulated_changes += self.change_transformer(
self.init_model - self.prev
)
self.prev = self.init_model
self.model.model_change = None
if self.save_accumulated:
self.save_change()
def save_vector(self, v, s):
    """
    Write the given vector to a json file named after the upcoming round.

    The file "<communication_round + 1>.json" inside s holds the state_dict
    key order, the shape of every state_dict entry and the vector as a list.

    Parameters
    ----------
    v : torch.tensor
        The torch tensor to write to file
    s : str
        Path to folder to write to

    """
    payload = {
        "order": list(self.model.state_dict().keys()),
        "shapes": {
            name: list(tensor.shape)
            for name, tensor in self.model.state_dict().items()
        },
        "tensor": v.tolist(),
    }
    target = os.path.join(s, "{}.json".format(self.communication_round + 1))
    with open(target, "w") as handle:
        json.dump(payload, handle)
def save_change(self):
    """
    Write the current model change (self.model.model_change) to the model_change folder.

    """
    change, folder = self.model.model_change, self.model_change_path
    self.save_vector(change, folder)
import random
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class RandomAlpha(PartialModel):
    """
    This class implements the partial model sharing with a random alpha each iteration.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha_list=(0.1, 0.2, 0.3, 0.4, 1.0),
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        accumulation=False,
        save_accumulated="",
        change_transformer=identity,
        accumulate_averaging_changes=False,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha_list : sequence of float or str
            Alphas to draw from in every round. A string is parsed as a
            Python literal (e.g. "[0.1, 0.5, 1.0]").
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        accumulation : bool
            Forwarded to the parent constructor
        save_accumulated : str
            Forwarded to the parent constructor
        change_transformer : (x: Tensor) -> Tensor
            Forwarded to the parent constructor
        accumulate_averaging_changes : bool
            Forwarded to the parent constructor
        compress : bool
            Forwarded to the parent constructor
        compression_package : str
            Forwarded to the parent constructor
        compression_class : str
            Forwarded to the parent constructor

        Raises
        ------
        ValueError
            If alpha_list is empty

        """
        import ast

        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            1.0,
            dict_ordered,
            save_shared,
            metadata_cap,
            accumulation,
            save_accumulated,
            change_transformer,
            accumulate_averaging_changes,
            compress,
            compression_package,
            compression_class,
        )
        # Config files deliver a string, python callers (and the default) a
        # sequence. Strings are parsed as literals only; eval() is avoided and
        # would also fail on a non-string value.
        if isinstance(alpha_list, str):
            alpha_list = ast.literal_eval(alpha_list)
        self.alpha_list = list(alpha_list)
        if not self.alpha_list:
            raise ValueError("alpha_list must contain at least one alpha")
        # Seed with the uid so every node draws its own reproducible sequence
        random.seed(self.mapping.get_uid(self.rank, self.machine_id))

    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD with alpha randomly chosen.

        """
        self.alpha = random.choice(self.alpha_list)
        return super().get_data_to_send()
# Deprecated
import random
from decentralizepy.sharing.PartialModel import PartialModel
class RandomAlphaIncremental(PartialModel):
    """
    This class implements the partial model sharing with a random alpha from an increasing range each iteration.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        range_start=0.1,
        range_end=0.2,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        range_start : float
            Lower end of the range alpha is drawn from
        range_end : float
            Initial upper end of the range alpha is drawn from; grows every round up to 1.0
        compress : bool
            Forwarded to the parent constructor
        compression_package : str
            Forwarded to the parent constructor
        compression_class : str
            Forwarded to the parent constructor

        """
        # Everything after alpha is passed by keyword: the parent has further
        # parameters between metadata_cap and compress, so positional passing
        # would bind the compression options to the wrong parameters.
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            1.0,
            dict_ordered=dict_ordered,
            save_shared=save_shared,
            metadata_cap=metadata_cap,
            compress=compress,
            compression_package=compression_package,
            compression_class=compression_class,
        )
        # Seed with the uid so every node draws its own reproducible sequence
        random.seed(self.mapping.get_uid(self.rank, self.machine_id))
        self.range_start = range_start
        self.range_end = range_end

    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD with alpha randomly chosen from an increasing range.

        """
        self.alpha = round(random.uniform(self.range_start, self.range_end), 2)
        # Widen the range by a random step of at most 0.1, never beyond 1.0
        self.range_end = min(1.0, self.range_end + round(random.uniform(0.0, 0.1), 2))
        return super().get_data_to_send()
import random
from decentralizepy.sharing.Wavelet import Wavelet
class RandomAlpha(Wavelet):
    """
    This class implements the partial model sharing with a random alpha each iteration.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]",
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        wavelet="haar",
        level=4,
        change_based_selection=True,
        save_accumulated="",
        accumulation=False,
        accumulate_averaging_changes=False,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha_list : str or sequence of float
            Alphas to draw from in every round. A string is parsed as a
            Python literal (e.g. "[0.1, 0.5, 1.0]").
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        wavelet, level, change_based_selection, save_accumulated, accumulation,
        accumulate_averaging_changes, compress, compression_package, compression_class
            Forwarded unchanged to the Wavelet constructor

        Raises
        ------
        ValueError
            If alpha_list is empty

        """
        import ast

        # NOTE(review): the arguments are forwarded positionally; this assumes
        # the Wavelet constructor declares them in exactly this order - confirm.
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            1.0,
            dict_ordered,
            save_shared,
            metadata_cap,
            wavelet,
            level,
            change_based_selection,
            save_accumulated,
            accumulation,
            accumulate_averaging_changes,
            compress,
            compression_package,
            compression_class,
        )
        # Config files deliver a string, python callers may pass a sequence.
        # Strings are parsed as literals only; arbitrary expressions are not evaluated.
        if isinstance(alpha_list, str):
            alpha_list = ast.literal_eval(alpha_list)
        self.alpha_list = list(alpha_list)
        if not self.alpha_list:
            raise ValueError("alpha_list must contain at least one alpha")
        # Seed with the uid so every node draws its own reproducible sequence
        random.seed(self.mapping.get_uid(self.rank, self.machine_id))

    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD with alpha randomly chosen.

        """
        self.alpha = random.choice(self.alpha_list)
        return super().get_data_to_send()
import json
import logging
import math
import random
import torch
from decentralizepy.sharing.Sharing import Sharing
class RoundRobinPartial(Sharing):
    """
    This class implements the Round robin partial model sharing.

    The flattened model is cut into consecutive blocks and one block is sent
    per round, cycling through all blocks.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet.
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        compress : bool
            Forwarded to the parent constructor
        compression_package : str
            Forwarded to the parent constructor
        compression_class : str
            Forwarded to the parent constructor

        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            compress,
            compression_package,
            compression_class,
        )
        self.alpha = alpha
        # Seed with the uid so every node starts at its own reproducible block
        random.seed(self.mapping.get_uid(rank, machine_id))
        n_params = self.model.count_params()
        logging.info("Total number of parameters: {}".format(n_params))
        self.block_size = math.ceil(self.alpha * n_params)
        logging.info("Block_size: {}".format(self.block_size))
        self.num_blocks = math.ceil(n_params / self.block_size)
        logging.info("Total number of blocks: {}".format(self.num_blocks))
        self.current_block = random.randint(0, self.num_blocks - 1)

    def serialized_model(self):
        """
        Convert model to json dict. self.alpha specifies the fraction of model to send.

        Returns
        -------
        dict
            Model converted to json dict: "block_start", "block_end" and "params",
            each json encoded, passed through compress_data.

        """
        with torch.no_grad():
            logging.info("Extracting params to send")

            tensors_to_cat = [v.data.flatten() for v in self.model.parameters()]
            T = torch.cat(tensors_to_cat, dim=0)
            block_start = self.current_block * self.block_size
            # The last block may be shorter than block_size
            block_end = min(T.shape[0], (self.current_block + 1) * self.block_size)
            self.current_block = (self.current_block + 1) % self.num_blocks
            T_send = T[block_start:block_end]
            self.model.shared_parameters_counter[block_start:block_end] += 1
            logging.info("Range sending: {}-{}".format(block_start, block_end))
            logging.info("Generating dictionary to send")

            m = dict()

            m["block_start"] = block_start
            m["block_end"] = block_end

            m["params"] = T_send.numpy().tolist()

            logging.info("Elements sending: {}".format(len(m["params"])))

            logging.info("Generated dictionary to send")

            for key in m:
                m[key] = json.dumps(m[key])

            logging.info("Converted dictionary to json")
            self.total_data += len(self.communication.encrypt(m["params"]))
            # NOTE(review): "params" is a json string at this point; with
            # compression enabled compress_data hands that string to
            # compressor.compress_float - confirm the compressor accepts it.
            return self.compress_data(m)

    def deserialized_model(self, m):
        """
        Convert received json dict to state_dict.

        Parameters
        ----------
        m : dict
            json dict received

        Returns
        -------
        state_dict
            Local state_dict in which the received block overwrites the
            corresponding range of the flattened parameters.

        """
        m = self.decompress_data(m)
        with torch.no_grad():
            state_dict = self.model.state_dict()

            shapes = []
            lens = []
            tensors_to_cat = []
            for _, v in state_dict.items():
                shapes.append(v.shape)
                t = v.flatten()
                lens.append(t.shape[0])
                tensors_to_cat.append(t)

            T = torch.cat(tensors_to_cat, dim=0)
            block_start = json.loads(m["block_start"])
            block_end = json.loads(m["block_end"])
            T[block_start:block_end] = torch.tensor(json.loads(m["params"]))
            # Cut the flat vector back into tensors of the original shapes
            start_index = 0
            for i, key in enumerate(state_dict):
                end_index = start_index + lens[i]
                state_dict[key] = T[start_index:end_index].reshape(shapes[i])
                start_index = end_index

            return state_dict
import json
import importlib
import logging
from collections import deque
import numpy
import torch
......@@ -13,7 +11,18 @@ class Sharing:
"""
def __init__(
self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -33,7 +42,7 @@ class Sharing:
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yer! TODO
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
......@@ -48,70 +57,66 @@ class Sharing:
self.dataset = dataset
self.communication_round = 0
self.log_dir = log_dir
self.total_data = 0
self.peer_deques = dict()
my_neighbors = self.graph.neighbors(self.uid)
for n in my_neighbors:
self.peer_deques[n] = deque()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
"""
Choose which neighbors to share with
Parameters
----------
neighbors : list(int)
List of all neighbors
Returns
-------
list(int)
Neighbors to share with
"""
# modify neighbors here
return neighbors
self.shapes = []
self.lens = []
with torch.no_grad():
for _, v in self.model.state_dict().items():
self.shapes.append(v.shape)
t = v.flatten().numpy()
self.lens.append(t.shape[0])
self.compress = compress
if compression_package and compression_class:
compressor_module = importlib.import_module(compression_package)
compressor_class = getattr(compressor_module, compression_class)
self.compressor = compressor_class()
logging.info(f"Using the {compressor_class} to compress the data")
else:
assert not self.compress
def compress_data(self, data):
    """
    Return a shallow copy of data with "params" compressed.

    Parameters
    ----------
    data : dict
        Message to send. It is not modified.

    Returns
    -------
    dict
        Copy of data. When self.compress is set and "params" is present,
        it went through compressor.compress_float.

    """
    outgoing = dict(data)
    if self.compress and "params" in outgoing:
        outgoing["params"] = self.compressor.compress_float(outgoing["params"])
    return outgoing
def decompress_data(self, data):
    """
    Decompress "params" of a received message in place.

    Parameters
    ----------
    data : dict
        Received message. It is modified when self.compress is set.

    Returns
    -------
    dict
        The same dict object that was passed in.

    """
    if self.compress and "params" in data:
        data["params"] = self.compressor.decompress_float(data["params"])
    return data
def serialized_model(self):
"""
Convert model to json dict. Here we can choose how much to share
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to json dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = json.dumps(val.numpy().tolist())
self.total_data += len(self.communication.encrypt(m[key]))
return m
to_cat = []
with torch.no_grad():
for _, v in self.model.state_dict().items():
t = v.flatten()
to_cat.append(t)
flat = torch.cat(to_cat)
data = dict()
data["params"] = flat.numpy()
return self.compress_data(data)
def deserialized_model(self, m):
    """
    Convert received dict to state_dict.

    The flat "params" vector is cut into consecutive segments of lengths
    ``self.lens`` and each segment is reshaped to the matching entry of
    ``self.shapes``, following the key order of ``self.model.state_dict()``.

    Parameters
    ----------
    m : dict
        received dict

    Returns
    -------
    state_dict
        state_dict of received

    """
    m = self.decompress_data(m)
    flat = m["params"]
    state_dict = dict()
    start_index = 0
    for i, key in enumerate(self.model.state_dict()):
        end_index = start_index + self.lens[i]
        state_dict[key] = torch.from_numpy(
            flat[start_index:end_index].reshape(self.shapes[i])
        )
        start_index = end_index
    return state_dict
def step(self):
def _pre_step(self):
"""
Called at the beginning of step.
"""
Perform a sharing step. Implements D-PSGD.
pass
def _post_step(self):
"""
Called at the end of step.
"""
pass
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
"""
with torch.no_grad():
total = dict()
weight_total = 0
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
# Metro-Hastings
weight = 1 / (max(len(peer_deques), degree) + 1)
weight_total += weight
for key, value in data.items():
if key in total:
total[key] += value * weight
else:
total[key] = value * weight
for key, value in self.model.state_dict().items():
total[key] += (1 - weight_total) * value # Metro-Hastings
self.model.load_state_dict(total)
self._post_step()
self.communication_round += 1
def get_data_to_send(self):
    """
    Build the message this node shares in the current communication round.

    Runs ``_pre_step``, serializes the model and annotates the result with the
    number of neighbors ("degree") and the current round ("iteration").

    Returns
    -------
    dict
        Serialized model plus the "degree" and "iteration" entries

    """
    self._pre_step()
    data = self.serialized_model()
    my_uid = self.mapping.get_uid(self.rank, self.machine_id)
    all_neighbors = self.graph.neighbors(my_uid)
    # The selection result is not used here; the hook is still invoked so that
    # overriding subclasses keep being called once per round.
    iter_neighbors = self.get_neighbors(all_neighbors)
    data["degree"] = len(all_neighbors)
    data["iteration"] = self.communication_round
    return data
def _averaging_server(self, peer_deques):
"""
Averages the received models of all working nodes
logging.info("Model averaging complete")
"""
with torch.no_grad():
total = dict()
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
weight = 1 / len(peer_deques)
for key, value in data.items():
if key in total:
total[key] += weight * value
else:
total[key] = weight * value
self.model.load_state_dict(total)
self._post_step()
self.communication_round += 1
return total
# Deprecated
import logging
from collections import deque
import torch
class Sharing:
    """
    API defining who to share with and what, and what to do on receiving

    """

    def __init__(
        self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)

        """
        self.rank = rank
        self.machine_id = machine_id
        self.uid = mapping.get_uid(rank, machine_id)
        self.communication = communication
        self.mapping = mapping
        self.graph = graph
        self.model = model
        self.dataset = dataset
        self.communication_round = 0
        self.log_dir = log_dir

        # One FIFO queue of received messages per neighbor
        self.peer_deques = {n: deque() for n in self.graph.neighbors(self.uid)}

        # Indexed as [receiver uid, sender uid] when averaging
        self.averaging_weights = self.graph.centr()

    def received_from_all(self):
        """
        Report whether every neighbor has at least one message waiting.

        Returns
        -------
        bool
            True if none of the per-neighbor queues is empty, False otherwise

        """
        return all(len(queue) != 0 for queue in self.peer_deques.values())

    def get_neighbors(self, neighbors):
        """
        Select the neighbors that will receive this node's data.

        Parameters
        ----------
        neighbors : list(int)
            All neighbors of this node

        Returns
        -------
        list(int)
            The neighbors to share with (unfiltered in this class)

        """
        # Extension point: restrict or reorder the neighbor list here
        selected = neighbors
        return selected

    def serialized_model(self):
        """
        Turn the model into a dictionary of numpy arrays, one per state_dict entry.

        Returns
        -------
        dict
            state_dict key -> numpy array

        """
        return {key: val.numpy() for key, val in self.model.state_dict().items()}

    def deserialized_model(self, m):
        """
        Turn a received dictionary of numpy arrays back into tensors.

        Parameters
        ----------
        m : dict
            received dict

        Returns
        -------
        state_dict
            key -> tensor created with ``torch.from_numpy``

        """
        return {key: torch.from_numpy(value) for key, value in m.items()}

    def _pre_step(self):
        """
        Hook invoked at the beginning of step; a no-op in this class.

        """
        return None

    def _post_step(self):
        """
        Hook invoked at the end of step; a no-op in this class.

        """
        return None

    def _averaging(self):
        """
        Average one received model per neighbor with the local model.

        Neighbor ``n`` is weighted with ``self.averaging_weights[self.uid, n]``
        and the local model with ``self.averaging_weights[self.uid, self.uid]``.
        The result is loaded into ``self.model``.

        """
        with torch.no_grad():
            averaged = dict()
            for neighbor in self.peer_deques:
                _, iteration, payload = self.peer_deques[neighbor].popleft()
                logging.debug(
                    "Averaging model from neighbor {} of iteration {}".format(
                        neighbor, iteration
                    )
                )
                received = self.deserialized_model(payload)
                weight = self.averaging_weights[self.uid, neighbor]
                for name, tensor in received.items():
                    if name not in averaged:
                        averaged[name] = tensor * weight
                    else:
                        averaged[name] += tensor * weight

            for name, tensor in self.model.state_dict().items():
                # Metro-Hastings
                own_share = self.averaging_weights[self.uid, self.uid] * tensor
                averaged[name] += own_share

        self.model.load_state_dict(averaged)

    def step(self):
        """
        Perform a sharing step. Implements D-PSGD.

        Sends the serialized model (annotated with "degree" and "iteration") to
        the selected neighbors, blocks until every neighbor queue holds a
        message, averages, and finally advances ``self.communication_round``.

        """
        self._pre_step()

        data = self.serialized_model()
        my_uid = self.mapping.get_uid(self.rank, self.machine_id)
        all_neighbors = self.graph.neighbors(my_uid)
        iter_neighbors = self.get_neighbors(all_neighbors)
        data["degree"] = len(all_neighbors)
        data["iteration"] = self.communication_round
        for neighbor in iter_neighbors:
            self.communication.send(neighbor, data)

        logging.info("Waiting for messages from neighbors")
        while not self.received_from_all():
            sender, message = self.communication.receive()
            logging.debug("Received model from {}".format(sender))
            # Metadata is queued next to the payload instead of inside it
            degree = message.pop("degree")
            iteration = message.pop("iteration")
            message.pop("CHANNEL")
            self.peer_deques[sender].append((degree, iteration, message))
            logging.info(
                "Deserialized received model from {} of iteration {}".format(
                    sender, iteration
                )
            )

        logging.info("Starting model averaging after receiving from all neighbors")
        self._averaging()
        logging.info("Model averaging complete")

        self.communication_round += 1
        self._post_step()
import json
import logging
import os
from pathlib import Path
import torch
from decentralizepy.sharing.Sharing import Sharing
class SubSampling(Sharing):
    """
    This class implements the subsampling version of model sharing
    It is based on PartialModel.py

    A random binary mask, reproducible from a seed, selects the parameters that
    are sent; the receiver regenerates the mask from the transmitted seed and alpha.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        pickle=True,
        layerwise=False,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        pickle : bool
            use pickle to serialize the model parameters
        layerwise : bool
            Draw one mask per state_dict entry instead of one mask over the
            concatenated model
        compress : bool
            Forwarded to the Sharing constructor
        compression_package : str
            Forwarded to the Sharing constructor
        compression_class : str
            Forwarded to the Sharing constructor

        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            compress,
            compression_package,
            compression_class,
        )
        self.alpha = alpha
        self.dict_ordered = dict_ordered
        self.save_shared = save_shared
        self.metadata_cap = metadata_cap

        self.random_generator = torch.Generator()
        # Will use the random device if supported by CPU, else uses the system time
        # In the latter case we could get duplicate seeds on some of the machines
        self.random_generator.seed()
        self.seed = self.random_generator.initial_seed()

        self.pickle = pickle
        self.layerwise = layerwise

        logging.info("subsampling pickling=" + str(pickle))

        # Only save for 2 procs: Save space.
        # (The previous test `rank != 0 or rank != 1` was true for every rank
        # and therefore disabled saving everywhere.)
        if self.save_shared and rank not in (0, 1):
            self.save_shared = False

        if self.save_shared:
            self.folder_path = os.path.join(
                self.log_dir, "shared_params/{}".format(self.rank)
            )
            Path(self.folder_path).mkdir(parents=True, exist_ok=True)

        with torch.no_grad():
            tensors_to_cat = [v.flatten() for v in self.model.state_dict().values()]
            self.init_model = torch.cat(tensors_to_cat, dim=0)

        # One counter per scalar parameter: how often it has been shared
        self.model.shared_parameters_counter = torch.zeros(
            self.init_model.shape[0], dtype=torch.int32
        )

    def apply_subsampling(self):
        """
        Creates a random binary mask that is used to subsample the parameters that will be shared

        Returns
        -------
        tuple
            (a,b,c). a: the selected parameters as flat vector, b: the random seed used to create the binary mask
            c: the alpha

        """
        logging.info("Returning subsampling gradients")

        # A fresh, reproducible seed per round; the receiver rebuilds the mask from it
        curr_seed = self.seed + self.communication_round
        self.random_generator.manual_seed(curr_seed)

        if not self.layerwise:
            concated = torch.cat(
                [v.data.flatten() for v in self.model.state_dict().values()], dim=0
            )
            # Or we could use torch.bernoulli
            binary_mask = (
                torch.rand(
                    size=(concated.size(dim=0),), generator=self.random_generator
                )
                <= self.alpha
            )
            subsample = concated[binary_mask]
            self.model.shared_parameters_counter[binary_mask] += 1
            return (subsample, curr_seed, self.alpha)

        values_list = []
        for v in self.model.state_dict().values():
            flat = v.flatten()
            binary_mask = (
                torch.rand(size=(flat.size(dim=0),), generator=self.random_generator)
                <= self.alpha
            )
            # TODO: support shared_parameters_counter
            values_list.append(flat[binary_mask])
        subsample = torch.cat(values_list, dim=0)
        return (subsample, curr_seed, self.alpha)

    def serialized_model(self):
        """
        Convert model to a dict. self.alpha specifies the fraction of model to send.

        Returns
        -------
        dict
            Dict with "seed", "alpha" and "params" (the subsampled values), passed
            through ``compress_data``; the full model of the parent class when
            self.alpha > self.metadata_cap

        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().serialized_model()

        with torch.no_grad():
            subsample, seed, alpha = self.apply_subsampling()

            if self.save_shared:
                state = self.model.state_dict()
                shared_params = dict()
                shared_params["order"] = list(state.keys())
                shared_params["shapes"] = {k: list(v.shape) for k, v in state.items()}
                # TODO: should store the shared indices and not the value
                shared_params["seed"] = seed
                shared_params["alpha"] = alpha

                with open(
                    os.path.join(
                        self.folder_path,
                        "{}_shared_params.json".format(self.communication_round + 1),
                    ),
                    "w",
                ) as of:
                    json.dump(shared_params, of)

            if not self.dict_ordered:
                raise NotImplementedError

            m = dict()
            m["seed"] = seed
            m["alpha"] = alpha
            m["params"] = subsample.numpy()

            return self.compress_data(m)

    def deserialized_model(self, m):
        """
        Convert received dict to state_dict.

        The sender's mask is regenerated from the received seed and alpha; the
        received values overwrite the masked positions of a copy of the local
        model, every other position keeps the local value.

        Parameters
        ----------
        m : dict
            dict received

        Returns
        -------
        state_dict
            state_dict of received

        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().deserialized_model(m)

        m = self.decompress_data(m)

        with torch.no_grad():
            state_dict = self.model.state_dict()

            if not self.dict_ordered:
                raise NotImplementedError

            seed = m["seed"]
            alpha = m["alpha"]
            params = m["params"]

            # new generator, such that we do not overwrite the other one
            random_generator = torch.Generator()
            random_generator.manual_seed(seed)

            shapes = []
            lens = []
            tensors_to_cat = []
            binary_submasks = []
            for v in state_dict.values():
                shapes.append(v.shape)
                t = v.flatten()
                lens.append(t.shape[0])
                tensors_to_cat.append(t)
                if self.layerwise:
                    # Same draw order as the sender: one mask per layer
                    binary_submasks.append(
                        torch.rand(size=(t.size(dim=0),), generator=random_generator)
                        <= alpha
                    )

            T = torch.cat(tensors_to_cat, dim=0)
            params_tensor = torch.from_numpy(params)

            if self.layerwise:
                binary_mask = torch.cat(binary_submasks, dim=0)
            else:
                binary_mask = (
                    torch.rand(size=(T.size(dim=0),), generator=random_generator)
                    <= alpha
                )

            logging.debug("Original tensor: {}".format(T[binary_mask]))
            T[binary_mask] = params_tensor
            logging.debug("Final tensor: {}".format(T[binary_mask]))

            start_index = 0
            for i, key in enumerate(state_dict):
                end_index = start_index + lens[i]
                state_dict[key] = T[start_index:end_index].reshape(shapes[i])
                start_index = end_index

            return state_dict
# Deprecated
import logging
from collections import deque
import torch
class Synchronous:
    """
    Synchronous training

    Node 0 collects the model deltas of its neighbors, applies their mean to the
    reference model and sends the resulting model back.

    """

    def __init__(
        self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)

        """
        self.rank = rank
        self.machine_id = machine_id
        self.uid = mapping.get_uid(rank, machine_id)
        self.communication = communication
        self.mapping = mapping
        self.graph = graph
        self.model = model
        self.dataset = dataset
        self.communication_round = 0
        self.log_dir = log_dir

        self.my_neighbors = self.graph.neighbors(self.uid)
        # One FIFO queue of received messages per neighbor
        self.peer_deques = {n: deque() for n in self.my_neighbors}

        # Snapshot of the model; deltas are measured against it
        with torch.no_grad():
            self.init_model = {
                k: v.clone().detach() for k, v in self.model.state_dict().items()
            }

    def received_from_all(self):
        """
        Report whether every neighbor has at least one message waiting.

        Returns
        -------
        bool
            True if none of the per-neighbor queues is empty, False otherwise

        """
        return all(len(queue) != 0 for queue in self.peer_deques.values())

    def get_neighbors(self, neighbors):
        """
        Select the neighbors that will receive this node's data.

        Parameters
        ----------
        neighbors : list(int)
            All neighbors of this node

        Returns
        -------
        list(int)
            The neighbors to share with (unfiltered in this class)

        """
        # Extension point: restrict or reorder the neighbor list here
        selected = neighbors
        return selected

    def serialized_gradient(self):
        """
        Compute the change of every state_dict entry since the last snapshot.

        Returns
        -------
        dict
            key -> current value minus ``self.init_model[key]``

        """
        # this is -lr*gradient
        return {
            key: val - self.init_model[key]
            for key, val in self.model.state_dict().items()
        }

    def serialized_model(self):
        """
        Copy the current model into a dictionary.

        Returns
        -------
        dict
            key -> detached clone of the state_dict tensor

        """
        return {
            key: val.clone().detach() for key, val in self.model.state_dict().items()
        }

    def deserialized_model(self, m):
        """
        Convert received dict to state_dict; the identity in this class.

        Parameters
        ----------
        m : dict
            received dict

        Returns
        -------
        state_dict
            The received dict, unchanged

        """
        return m

    def _pre_step(self):
        """
        Hook invoked at the beginning of step; a no-op in this class.

        """
        return None

    def _post_step(self):
        """
        Called at the end of step; refreshes the snapshot in ``self.init_model``.

        """
        with torch.no_grad():
            self.init_model = {
                k: v.clone().detach() for k, v in self.model.state_dict().items()
            }

    def _apply_gradients(self):
        """
        Apply the mean of all deltas (neighbors and own) to the snapshot model.

        One message is consumed per neighbor queue. The summed deltas are
        divided by (number of neighbors + 1), added to ``self.init_model`` and
        the result is loaded into ``self.model``.

        """

        def accumulate(target, deltas):
            # In-place addition on purpose: matches the tensor aliasing of the
            # first contribution stored for a key
            for key, value in deltas.items():
                if key not in target:
                    target[key] = value
                else:
                    target[key] += value

        with torch.no_grad():
            total = dict()
            for neighbor in self.peer_deques:
                gradient = self.peer_deques[neighbor].popleft()
                logging.debug(
                    "Applying gradient from neighbor {}".format(
                        neighbor,
                    )
                )
                accumulate(total, self.deserialized_model(gradient))

            accumulate(total, self.serialized_gradient())

            scale = 1 / (len(self.my_neighbors) + 1)
            new_model = {
                key: value + total[key] * scale
                for key, value in self.init_model.items()
            }

            self.model.load_state_dict(new_model)

    def step(self):
        """
        Perform a sharing step. Implements D-PSGD.

        Every node with uid != 0 sends its delta to node 0 and loads whatever
        model it receives next. Node 0 waits for a message from each neighbor,
        applies the deltas and sends the new model to its neighbors.

        """
        self._pre_step()
        logging.info("--- COMMUNICATION ROUND {} ---".format(self.communication_round))

        if self.uid == 0:
            logging.info("Waiting for messages from leaf nodes")
            while not self.received_from_all():
                sender, data = self.communication.receive()
                logging.debug("Received gradient from {}".format(sender))
                self.peer_deques[sender].append(data)
                logging.info(
                    "Deserialized gradient model from {} of iteration {}".format(
                        sender, self.communication_round
                    )
                )
            self._apply_gradients()

            data = self.serialized_model()
            all_neighbors = self.graph.neighbors(self.uid)
            for neighbor in self.get_neighbors(all_neighbors):
                self.communication.send(neighbor, data)
        else:
            gradient = self.serialized_gradient()
            # Should be only one neighbour
            self.communication.send(0, gradient)

            logging.info("Waiting for messages from central node")
            sender, data = self.communication.receive()
            logging.debug("Received model from {}".format(sender))
            logging.info(
                "Deserialized received model from {} of iteration {}".format(
                    sender, self.communication_round
                )
            )
            self.model.load_state_dict(data)

        self.communication_round += 1
        self._post_step()
import logging
import torch
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class TopKNormalized(PartialModel):
    """
    Partial model sharing that ranks parameters by their normalized change.

    The change of every parameter is divided by the magnitude of the parameter
    (plus epsilon) before the top-k selection.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        accumulation=False,
        save_accumulated="",
        change_transformer=identity,
        accumulate_averaging_changes=False,
        epsilon=0.01,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        accumulation : bool
            True if the indices to share should be selected based on accumulated frequency change
        save_accumulated : bool
            True if accumulated weight change should be written to file. In case of accumulation the accumulated change
            is stored. If a change_transformer is used then the transformed change is stored.
        change_transformer : (x: Tensor) -> Tensor
            A function that transforms the model change into other domains. Default: identity function
        accumulate_averaging_changes: bool
            True if the accumulation should account the model change due to averaging
        epsilon : float
            numerical stability parameter used during normalization
        compress : bool
            Forwarded to the PartialModel constructor
        compression_package : str
            Forwarded to the PartialModel constructor
        compression_class : str
            Forwarded to the PartialModel constructor

        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            alpha,
            dict_ordered,
            save_shared,
            metadata_cap,
            accumulation,
            save_accumulated,
            change_transformer,
            accumulate_averaging_changes,
            compress,
            compression_package,
            compression_class,
        )
        self.epsilon = epsilon

    def extract_top_gradients(self):
        """
        Extract the indices and values of the topK normalized gradients.
        The gradients must have been accumulated.

        As a side effect, the (population) standard deviation and mean of the
        un-normalized change magnitudes are stored in ``self.std`` and ``self.mean``.

        Returns
        -------
        tuple
            (a,b). a: The normalized magnitudes of the topK gradients, b: Their indices.

        """
        logging.info("Returning topk gradients")

        change_magnitude = torch.abs(self.model.model_change)
        # epsilon keeps the division finite for parameters that are (close to) zero
        denominator = torch.abs(self.pre_share_model) + self.epsilon
        relative_change = change_magnitude / denominator

        std, mean = torch.std_mean(change_magnitude, unbiased=False)
        self.std = std.item()
        self.mean = mean.item()

        k = round(self.alpha * change_magnitude.shape[0])
        return torch.topk(relative_change, k, dim=0, sorted=False)
import json
import logging
import os
from pathlib import Path
import numpy as np
import torch
from decentralizepy.sharing.Sharing import Sharing
class TopKParams(Sharing):
    """
    Partial model sharing that sends, for every layer, the parameters with the
    largest absolute value.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        compress : bool
            Forwarded to the Sharing constructor
        compression_package : str
            Forwarded to the Sharing constructor
        compression_class : str
            Forwarded to the Sharing constructor

        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            compress,
            compression_package,
            compression_class,
        )
        self.alpha = alpha
        self.dict_ordered = dict_ordered
        self.save_shared = save_shared
        self.metadata_cap = metadata_cap

        # Only save for 2 procs: Save space.
        # (The previous test `rank != 0 or rank != 1` was true for every rank
        # and therefore disabled saving everywhere.)
        if self.save_shared and rank not in (0, 1):
            self.save_shared = False

        if self.save_shared:
            self.folder_path = os.path.join(
                self.log_dir, "shared_params/{}".format(self.rank)
            )
            Path(self.folder_path).mkdir(parents=True, exist_ok=True)

    def extract_top_params(self):
        """
        Extract the indices and values of the topK params layerwise.

        Returns
        -------
        tuple
            (a,b,c). a: The topK params, b: Their indices (relative to the start of
            their own flattened layer), c: The offsets delimiting each layer's
            segment inside a and b

        """
        logging.info("Returning TopKParams gradients")
        values_list = []
        index_list = []
        offsets = [0]
        off = 0
        for v in self.model.state_dict().values():
            flat = v.flatten()
            # Rank by magnitude, but transmit the signed values
            _, index = torch.topk(
                flat.abs(), round(self.alpha * flat.size(dim=0)), dim=0, sorted=False
            )
            values_list.append(flat[index])
            index_list.append(index)
            off += index.size(dim=0)
            offsets.append(off)
        cat_values = torch.cat(values_list, dim=0)
        cat_index = torch.cat(index_list, dim=0)

        return (cat_values, cat_index, offsets)

    def serialized_model(self):
        """
        Convert model to a dict. self.alpha specifies the fraction of model to send.

        Returns
        -------
        dict
            Dict with "indices", "params" and "offsets", passed through
            ``compress_data``; the full model of the parent class when
            self.alpha > self.metadata_cap

        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().serialized_model()

        with torch.no_grad():
            values, index, offsets = self.extract_top_params()

            # NOTE(review): `index` holds per-layer positions, while the counter
            # looks like it is meant to span the whole flattened model, and this
            # class never creates `shared_parameters_counter` itself - confirm
            # where it is initialised and whether global positions are intended.
            self.model.shared_parameters_counter[index] += 1

            if self.save_shared:
                state = self.model.state_dict()
                shared_params = dict()
                shared_params["order"] = list(state.keys())
                shared_params["shapes"] = {k: list(v.shape) for k, v in state.items()}
                shared_params[self.communication_round] = index.tolist()
                # TODO: store offsets

                with open(
                    os.path.join(
                        self.folder_path,
                        "{}_shared_params.json".format(self.communication_round + 1),
                    ),
                    "w",
                ) as of:
                    json.dump(shared_params, of)

            logging.info("Extracting topk params")

            logging.info("Generating dictionary to send")

            if not self.dict_ordered:
                raise NotImplementedError

            m = dict()
            m["indices"] = index.numpy().astype(np.int32)
            m["params"] = values.numpy()
            m["offsets"] = offsets

            assert len(m["indices"]) == len(m["params"])
            logging.info("Elements sending: {}".format(len(m["indices"])))

            logging.info("Generated dictionary to send")

            logging.info("Converted dictionary to json")
            return self.compress_data(m)

    def deserialized_model(self, m):
        """
        Convert received dict to state_dict.

        Starting from a copy of the local model, the received values overwrite
        the received positions layer by layer; every other position keeps the
        local value.

        Parameters
        ----------
        m : dict
            dict received

        Returns
        -------
        state_dict
            state_dict of received

        """
        if self.alpha > self.metadata_cap:  # Share fully
            return super().deserialized_model(m)

        m = self.decompress_data(m)

        with torch.no_grad():
            state_dict = self.model.state_dict()

            if not self.dict_ordered:
                raise NotImplementedError

            offsets = m["offsets"]
            params = torch.tensor(m["params"])
            indices = torch.tensor(m["indices"], dtype=torch.long)

            for i, (key, v) in enumerate(list(state_dict.items())):
                t = v.flatten().clone().detach()  # it is not always copied
                segment = slice(offsets[i], offsets[i + 1])
                t[indices[segment]] = params[segment]
                state_dict[key] = t.reshape(v.shape)

            return state_dict
import logging
import numpy as np
import torch
from decentralizepy.sharing.PartialModel import PartialModel
class TopKPlusRandom(PartialModel):
    """
    This class implements partial model sharing with some random additions.

    Half of the shared budget goes to the parameters with the largest change,
    the other half to randomly chosen remaining parameters.

    """

    def __init__(
        self,
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha=1.0,
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
        compress=False,
        compression_package=None,
        compression_class=None,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank
        machine_id : int
            Global machine id
        communication : decentralizepy.communication.Communication
            Communication module used to send and receive messages
        mapping : decentralizepy.mappings.Mapping
            Mapping (rank, machine_id) -> uid
        graph : decentralizepy.graphs.Graph
            Graph representing neighbors
        model : decentralizepy.models.Model
            Model to train
        dataset : decentralizepy.datasets.Dataset
            Dataset for sharing data. Not implemented yet! TODO
        log_dir : str
            Location to write shared_params (only writing for 2 procs per machine)
        alpha : float
            Percentage of model to share
        dict_ordered : bool
            Specifies if the python dict maintains the order of insertion
        save_shared : bool
            Specifies if the indices of shared parameters should be logged
        metadata_cap : float
            Share full model when self.alpha > metadata_cap
        compress : bool
            Forwarded to the PartialModel constructor
        compression_package : str
            Forwarded to the PartialModel constructor
        compression_class : str
            Forwarded to the PartialModel constructor

        """
        super().__init__(
            rank,
            machine_id,
            communication,
            mapping,
            graph,
            model,
            dataset,
            log_dir,
            alpha,
            dict_ordered,
            save_shared,
            metadata_cap,
            compress,
            compression_package,
            compression_class,
        )

    def extract_top_gradients(self):
        """
        Extract the indices and values of the topK gradients and put some extra random.
        The gradients must have been accumulated.

        As a side effect, the (population) standard deviation and mean of the
        change magnitudes are stored in ``self.std`` and ``self.mean``.

        Returns
        -------
        tuple
            (a,b). a: The magnitudes of the selected gradients (topK first, then
            the random extras), b: Their indices.

        """
        logging.info("Returning topk gradients")
        G = torch.abs(self.model.model_change)
        std, mean = torch.std_mean(G, unbiased=False)
        self.std = std.item()
        self.mean = mean.item()

        # Half of the alpha budget for top-k, the other half for random extras
        elements_to_pick = round(self.alpha / 2.0 * G.shape[0])
        G_topK = torch.topk(G, min(G.shape[0], elements_to_pick), dim=0, sorted=False)

        # np.delete returns a new array; its result must be kept, otherwise the
        # random extras could repeat indices that top-k already selected.
        remaining = np.delete(np.arange(G.shape[0], dtype=int), G_topK[1].numpy())
        # Without replacement, so no index is sent twice; the min() guard keeps
        # the request within the number of remaining candidates.
        more_indices = np.random.choice(
            remaining, min(remaining.shape[0], elements_to_pick), replace=False
        )

        G_topK0 = torch.cat([G_topK[0], G[more_indices]], dim=0)
        G_topK1 = torch.cat([G_topK[1], torch.tensor(more_indices)], dim=0)
        return G_topK0, G_topK1
import json
import logging
import os
import numpy as np
import pywt
import torch
from decentralizepy.sharing.PartialModel import PartialModel
def change_transformer_wavelet(x, wavelet, level):
    """
    Transforms the model changes into wavelet frequency domain

    Parameters
    ----------
    x : torch.Tensor
        Model change in the space domain
    wavelet : str
        name of the wavelet to be used in gradient compression
    level: int
        decomposition level handed to ``pywt.wavedec``

    Returns
    -------
    x : torch.Tensor
        Representation of the change in the wavelet domain, flattened to one dimension

    """
    coefficients = pywt.wavedec(x, wavelet, level=level)
    # Only the packed coefficient array is needed here, not the slice bookkeeping
    flat_array, _ = pywt.coeffs_to_array(coefficients)
    return torch.from_numpy(flat_array.ravel())
class Wavelet(PartialModel):
"""
This class implements the wavelet version of model sharing
It is based on PartialModel.py
"""
def __init__(
    self,
    rank,
    machine_id,
    communication,
    mapping,
    graph,
    model,
    dataset,
    log_dir,
    alpha=1.0,
    dict_ordered=True,
    save_shared=False,
    metadata_cap=1.0,
    wavelet="haar",
    level=4,
    change_based_selection=True,
    save_accumulated="",
    accumulation=False,
    accumulate_averaging_changes=False,
    compress=False,
    compression_package=None,
    compression_class=None,
):
    """
    Constructor

    Parameters
    ----------
    rank : int
        Local rank
    machine_id : int
        Global machine id
    communication : decentralizepy.communication.Communication
        Communication module used to send and receive messages
    mapping : decentralizepy.mappings.Mapping
        Mapping (rank, machine_id) -> uid
    graph : decentralizepy.graphs.Graph
        Graph representing neighbors
    model : decentralizepy.models.Model
        Model to train
    dataset : decentralizepy.datasets.Dataset
        Dataset for sharing data. Not implemented yet! TODO
    log_dir : str
        Location to write shared_params (only writing for 2 procs per machine)
    alpha : float
        Percentage of model to share
    dict_ordered : bool
        Specifies if the python dict maintains the order of insertion
    save_shared : bool
        Specifies if the indices of shared parameters should be logged
    metadata_cap : float
        Share full model when self.alpha > metadata_cap
    wavelet: str
        name of the wavelet to be used in gradient compression
    level: int
        decomposition level handed to ``pywt.wavedec``
    change_based_selection : bool
        use frequency change to select topk frequencies
    save_accumulated : bool
        True if accumulated weight change in the wavelet domain should be written to file. In case of accumulation
        the accumulated change is stored.
    accumulation : bool
        True if the indices to share should be selected based on accumulated frequency change
    accumulate_averaging_changes: bool
        True if the accumulation should account the model change due to averaging
    compress : bool
        Forwarded to the PartialModel constructor
    compression_package : str
        Forwarded to the PartialModel constructor
    compression_class : str
        Forwarded to the PartialModel constructor

    """
    # Set before the parent constructor runs
    self.wavelet = wavelet
    self.level = level

    def to_wavelet_domain(x):
        return change_transformer_wavelet(x, wavelet, level)

    super().__init__(
        rank,
        machine_id,
        communication,
        mapping,
        graph,
        model,
        dataset,
        log_dir,
        alpha,
        dict_ordered,
        save_shared,
        metadata_cap,
        accumulation,
        save_accumulated,
        to_wavelet_domain,
        accumulate_averaging_changes,
        compress,
        compression_package,
        compression_class,
    )

    self.change_based_selection = change_based_selection

    # Do a dummy transform to get the shape and coefficients slices
    coeff = pywt.wavedec(self.init_model.numpy(), self.wavelet, level=self.level)
    data, coeff_slices = pywt.coeffs_to_array(coeff)
    self.wt_shape = data.shape
    self.coeff_slices = coeff_slices
def apply_wavelet(self):
"""
Does wavelet transformation of the model parameters and selects topK (alpha) of them in the frequency domain
based on the undergone change during the current training step
Returns
-------
tuple
(a,b). a: selected wavelet coefficients, b: Their indices.
"""
logging.info("Returning wavelet compressed model weights")
data = self.pre_share_model_transformed
if self.change_based_selection:
diff = self.model.model_change
_, index = torch.topk(
diff.abs(),
round(self.alpha * len(diff)),
dim=0,
sorted=False,
)
else:
_, index = torch.topk(
data.abs(),
round(self.alpha * len(data)),
dim=0,
sorted=False,
)
index, _ = torch.sort(index)
return data[index], index
def serialized_model(self):
"""
Convert model to json dict. self.alpha specifies the fraction of model to send.
Returns
-------
dict
Model converted to json dict
"""
m = dict()
if self.alpha >= self.metadata_cap: # Share fully
data = self.pre_share_model_transformed
m["params"] = data.numpy()
if self.model.accumulated_changes is not None:
self.model.accumulated_changes = torch.zeros_like(
self.model.accumulated_changes
)
return self.compress_data(m)
with torch.no_grad():
topk, indices = self.apply_wavelet()
self.model.shared_parameters_counter[indices] += 1
self.model.rewind_accumulation(indices)
if self.save_shared:
shared_params = dict()
shared_params["order"] = list(self.model.state_dict().keys())
shapes = dict()
for k, v in self.model.state_dict().items():
shapes[k] = list(v.shape)
shared_params["shapes"] = shapes
# is slow
shared_params[self.communication_round] = indices.tolist()
shared_params["alpha"] = self.alpha
with open(
os.path.join(
self.folder_path,
"{}_shared_params.json".format(self.communication_round + 1),
),
"w",
) as of:
json.dump(shared_params, of)
if not self.dict_ordered:
raise NotImplementedError
m["alpha"] = self.alpha
m["params"] = topk.numpy()
m["indices"] = indices.numpy().astype(np.int32)
m["send_partial"] = True
return self.compress_data(m)
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
received dict
Returns
-------
state_dict
state_dict of received
"""
m = self.decompress_data(m)
ret = dict()
if "send_partial" not in m:
params = m["params"]
params_tensor = torch.tensor(params)
ret["params"] = params_tensor
return ret
with torch.no_grad():
if not self.dict_ordered:
raise NotImplementedError
alpha = m["alpha"]
params_tensor = torch.tensor(m["params"])
indices_tensor = torch.tensor(m["indices"], dtype=torch.long)
ret = dict()
ret["indices"] = indices_tensor
ret["params"] = params_tensor
ret["send_partial"] = True
return ret
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
"""
with torch.no_grad():
total = None
weight_total = 0
wt_params = self.pre_share_model_transformed
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
params = data["params"]
if "indices" in data:
indices = data["indices"]
# use local data to complement
topkwf = wt_params.clone().detach()
topkwf[indices] = params
topkwf = topkwf.reshape(self.wt_shape)
else:
topkwf = params.reshape(self.wt_shape)
# Metro-Hastings
weight = 1 / (max(len(peer_deques), degree) + 1)
weight_total += weight
if total is None:
total = weight * topkwf
else:
total += weight * topkwf
# Metro-Hastings
total += (1 - weight_total) * wt_params
avg_wf_params = pywt.array_to_coeffs(
total.numpy(), self.coeff_slices, output_format="wavedec"
)
reverse_total = torch.from_numpy(
pywt.waverec(avg_wf_params, wavelet=self.wavelet)
)
start_index = 0
std_dict = {}
for i, key in enumerate(self.model.state_dict()):
end_index = start_index + self.lens[i]
std_dict[key] = reverse_total[start_index:end_index].reshape(
self.shapes[i]
)
start_index = end_index
self.model.load_state_dict(std_dict)
self._post_step()
self.communication_round += 1
def _averaging_server(self, peer_deques):
"""
Averages the received models of all working nodes
"""
with torch.no_grad():
total = None
wt_params = self.pre_share_model_transformed
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
params = data["params"]
if "indices" in data:
indices = data["indices"]
# use local data to complement
topkwf = wt_params.clone().detach()
topkwf[indices] = params
topkwf = topkwf.reshape(self.wt_shape)
else:
topkwf = params.reshape(self.wt_shape)
weight = 1 / len(peer_deques)
if total is None:
total = weight * topkwf
else:
total += weight * topkwf
avg_wf_params = pywt.array_to_coeffs(
total.numpy(), self.coeff_slices, output_format="wavedec"
)
reverse_total = torch.from_numpy(
pywt.waverec(avg_wf_params, wavelet=self.wavelet)
)
start_index = 0
std_dict = {}
for i, key in enumerate(self.model.state_dict()):
end_index = start_index + self.lens[i]
std_dict[key] = reverse_total[start_index:end_index].reshape(
self.shapes[i]
)
start_index = end_index
self.model.load_state_dict(std_dict)
self._post_step()
self.communication_round += 1
import json
import os
from pathlib import Path
import torch
from decentralizepy.training.Training import Training
from decentralizepy.utils import conditional_value
class ChangeAccumulator(Training):
    """
    This class implements the training module which also accumulates model change in a list.

    """

    def __init__(
        self,
        rank,
        machine_id,
        mapping,
        model,
        optimizer,
        loss,
        log_dir,
        rounds="",
        full_epochs="",
        batch_size="",
        shuffle="",
        save_accumulated="",
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process in running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        model : torch.nn.Module
            Neural Network for training
        optimizer : torch.optim
            Optimizer to learn parameters
        loss : function
            Loss function
        log_dir : str
            Directory to log the model change.
        rounds : int, optional
            Number of steps/epochs per training call
        full_epochs : bool, optional
            True if 1 round = 1 epoch. False if 1 round = 1 minibatch
        batch_size : int, optional
            Number of items to learn over, in one batch
        shuffle : bool
            True if the dataset should be shuffled before training.
        save_accumulated : bool
            True if accumulated weight change should be written to file.
            Resolved through conditional_value(save_accumulated, "", True)
            (presumably: the empty-string default falls back to True -- TODO confirm).

        """
        super().__init__(
            rank,
            machine_id,
            mapping,
            model,
            optimizer,
            loss,
            log_dir,
            rounds,
            full_epochs,
            batch_size,
            shuffle,
        )
        self.save_accumulated = conditional_value(save_accumulated, "", True)
        self.communication_round = 0
        if self.save_accumulated:
            # One output folder per rank for the changes and one for the parameter values.
            self.model_change_path = os.path.join(
                self.log_dir, "model_change/{}".format(self.rank)
            )
            Path(self.model_change_path).mkdir(parents=True, exist_ok=True)

            self.model_val_path = os.path.join(
                self.log_dir, "model_val/{}".format(self.rank)
            )
            Path(self.model_val_path).mkdir(parents=True, exist_ok=True)

    def save_vector(self, v, s):
        """
        Saves the given vector to the file.

        The file is named after the next communication round
        ("<communication_round + 1>.json") and also records the key order and
        the shapes of the model's state_dict.

        Parameters
        ----------
        v : torch.tensor
            The torch tensor to write to file
        s : str
            Path to folder to write to

        """
        state = self.model.state_dict()
        output_dict = {
            "order": list(state.keys()),
            "shapes": {k: list(v1.shape) for k, v1 in state.items()},
            "tensor": v.tolist(),
        }

        with open(
            os.path.join(
                s,
                "{}.json".format(self.communication_round + 1),
            ),
            "w",
        ) as of:
            json.dump(output_dict, of)

    def save_change(self):
        """
        Saves the absolute value of the accumulated model change (first entry
        of model.accumulated_gradients), flattened into one vector.

        """
        tensors_to_cat = [
            v.data.flatten() for v in self.model.accumulated_gradients[0].values()
        ]
        change = torch.abs(torch.cat(tensors_to_cat, dim=0))
        self.save_vector(change, self.model_change_path)

    def save_model_params(self):
        """
        Saves the absolute value of the current model's state_dict entries,
        flattened into one vector.

        """
        tensors_to_cat = [v.data.flatten() for v in self.model.state_dict().values()]
        params = torch.abs(torch.cat(tensors_to_cat, dim=0))
        self.save_vector(params, self.model_val_path)

    def train(self, dataset):
        """
        One training iteration with accumulation of model change in model.accumulated_gradients.
        The training itself is delegated to Training.train.

        Parameters
        ----------
        dataset : decentralizepy.datasets.Dataset
            The training dataset. Should implement get_trainset(batch_size, shuffle)

        """
        self.model.accumulated_gradients = []
        # named_parameters() keeps every name attached to its own tensor.
        # Zipping state_dict() keys with parameters() shifts the pairing as
        # soon as the model holds buffers, because state_dict() lists them too.
        self.init_model = {
            k: v.data.clone().detach() for k, v in self.model.named_parameters()
        }

        super().train(dataset)

        with torch.no_grad():
            change = {
                k: v.data.clone().detach() - self.init_model[k]
                for k, v in self.model.named_parameters()
            }
            self.model.accumulated_gradients.append(change)

            if self.save_accumulated:
                self.save_change()
                self.save_model_params()

        self.communication_round += 1
import logging
from decentralizepy.training.Training import Training
class GradientAccumulator(Training):
    """
    This class implements the training module which also accumulates gradients of steps in a list.

    """

    def __init__(
        self,
        rank,
        machine_id,
        mapping,
        model,
        optimizer,
        loss,
        log_dir,
        rounds="",
        full_epochs="",
        batch_size="",
        shuffle="",
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process in running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        model : torch.nn.Module
            Neural Network for training
        optimizer : torch.optim
            Optimizer to learn parameters
        loss : function
            Loss function
        log_dir : str
            Directory to log the model change.
        rounds : int, optional
            Number of steps/epochs per training call
        full_epochs : bool, optional
            True if 1 round = 1 epoch. False if 1 round = 1 minibatch
        batch_size : int, optional
            Number of items to learn over, in one batch
        shuffle : bool
            True if the dataset should be shuffled before training.

        """
        super().__init__(
            rank,
            machine_id,
            mapping,
            model,
            optimizer,
            loss,
            log_dir,
            rounds,
            full_epochs,
            batch_size,
            shuffle,
        )

    def trainstep(self, data, target):
        """
        One training step on a minibatch.

        The gradients of this step are appended to
        model.accumulated_gradients before the optimizer updates the model.

        Parameters
        ----------
        data : any
            Data item
        target : any
            Label

        Returns
        -------
        float
            Loss Value for the step

        """
        self.model.zero_grad()
        output = self.model(data)
        loss_val = self.loss(output, target)
        loss_val.backward()
        logging.debug("Accumulating Gradients")
        # named_parameters() keeps every name attached to its own gradient.
        # Zipping state_dict() keys with parameters() shifts the pairing as
        # soon as the model holds buffers, because state_dict() lists them too.
        self.model.accumulated_gradients.append(
            {k: v.grad.clone().detach() for k, v in self.model.named_parameters()}
        )
        self.optimizer.step()
        return loss_val.item()

    def train(self, dataset):
        """
        One training iteration with accumulation of gradients in model.accumulated_gradients.
        The list is reset first; the training itself is delegated to Training.train.

        Parameters
        ----------
        dataset : decentralizepy.datasets.Dataset
            The training dataset. Should implement get_trainset(batch_size, shuffle)

        """
        self.model.accumulated_gradients = []
        super().train(dataset)
......@@ -46,7 +46,7 @@ class Training:
Directory to log the model change.
rounds : int, optional
Number of steps/epochs per training call
full_epochs: bool, optional
full_epochs : bool, optional
True if 1 round = 1 epoch. False if 1 round = 1 minibatch
batch_size : int, optional
Number of items to learn over, in one batch
......
......@@ -69,12 +69,23 @@ def get_args():
type=str,
default="./{}".format(datetime.datetime.now().isoformat(timespec="minutes")),
)
parser.add_argument(
"-wsd",
"--weights_store_dir",
type=str,
default="./{}_ws".format(datetime.datetime.now().isoformat(timespec="minutes")),
)
parser.add_argument("-is", "--iterations", type=int, default=1)
parser.add_argument("-cf", "--config_file", type=str, default="config.ini")
parser.add_argument("-ll", "--log_level", type=str, default="INFO")
parser.add_argument("-gf", "--graph_file", type=str, default="36_nodes.edges")
parser.add_argument("-gt", "--graph_type", type=str, default="edges")
parser.add_argument("-ta", "--test_after", type=int, default=5)
parser.add_argument("-tea", "--train_evaluate_after", type=int, default=1)
parser.add_argument("-ro", "--reset_optimizer", type=int, default=1)
parser.add_argument("-sm", "--server_machine", type=int, default=0)
parser.add_argument("-sr", "--server_rank", type=int, default=-1)
parser.add_argument("-wr", "--working_rate", type=float, default=1.0)
args = parser.parse_args()
return args
......@@ -97,12 +108,31 @@ def write_args(args, path):
"procs_per_machine": args.procs_per_machine,
"machines": args.machines,
"log_dir": args.log_dir,
"weights_store_dir": args.weights_store_dir,
"iterations": args.iterations,
"config_file": args.config_file,
"log_level": args.log_level,
"graph_file": args.graph_file,
"graph_type": args.graph_type,
"test_after": args.test_after,
"train_evaluate_after": args.train_evaluate_after,
"reset_optimizer": args.reset_optimizer,
"working_rate": args.working_rate,
}
with open(os.path.join(path, "args.json"), "w") as of:
json.dump(data, of)
def identity(obj):
    """
    Hand the argument back untouched.

    Parameters
    ----------
    obj
        Any object

    Returns
    -------
    obj
        The very object that was passed in (no copy is made)

    """
    return obj