Compare revisions

Showing with 1124 additions and 145 deletions
import random
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class RandomAlpha(PartialModel):
......@@ -19,9 +20,17 @@ class RandomAlpha(PartialModel):
model,
dataset,
log_dir,
alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]",
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
accumulation=False,
save_accumulated="",
change_transformer=identity,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -65,15 +74,21 @@ class RandomAlpha(PartialModel):
dict_ordered,
save_shared,
metadata_cap,
accumulation,
save_accumulated,
change_transformer,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.alpha_list = eval(alpha_list)
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
def step(self):
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen.
"""
random.seed(
self.mapping.get_uid(self.rank, self.machine_id) + self.communication_round
)
self.alpha = random.randint(1, 7) / 10.0
super().step()
self.alpha = random.choice(self.alpha_list)
return super().get_data_to_send()
# Deprecated
import random
from decentralizepy.sharing.PartialModel import PartialModel
......@@ -24,6 +25,9 @@ class RandomAlphaIncremental(PartialModel):
metadata_cap=1.0,
range_start=0.1,
range_end=0.2,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -67,16 +71,19 @@ class RandomAlphaIncremental(PartialModel):
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
self.range_start = range_start
self.range_end = range_end
def step(self):
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen from an increasing range.
"""
self.alpha = round(random.uniform(self.range_start, self.range_end), 2)
self.range_end = min(1.0, self.range_end + round(random.uniform(0.0, 0.1), 2))
super().step()
return super().get_data_to_send()
import random
from decentralizepy.sharing.Wavelet import Wavelet
class RandomAlpha(Wavelet):
"""
This class implements the partial model sharing with a random alpha each iteration.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]",
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
wavelet="haar",
level=4,
change_based_selection=True,
save_accumulated="",
accumulation=False,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
1.0,
dict_ordered,
save_shared,
metadata_cap,
wavelet,
level,
change_based_selection,
save_accumulated,
accumulation,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.alpha_list = eval(alpha_list)
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen.
"""
self.alpha = random.choice(self.alpha_list)
return super().get_data_to_send()
......@@ -25,6 +25,9 @@ class RoundRobinPartial(Sharing):
dataset,
log_dir,
alpha=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -52,7 +55,17 @@ class RoundRobinPartial(Sharing):
"""
super().__init__(
rank, machine_id, communication, mapping, graph, model, dataset, log_dir
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
)
self.alpha = alpha
random.seed(self.mapping.get_uid(rank, machine_id))
......@@ -84,6 +97,7 @@ class RoundRobinPartial(Sharing):
block_end = min(T.shape[0], (self.current_block + 1) * self.block_size)
self.current_block = (self.current_block + 1) % self.num_blocks
T_send = T[block_start:block_end]
self.model.shared_parameters_counter[block_start:block_end] += 1
logging.info("Range sending: {}-{}".format(block_start, block_end))
logging.info("Generating dictionary to send")
......@@ -103,7 +117,7 @@ class RoundRobinPartial(Sharing):
logging.info("Converted dictionary to json")
self.total_data += len(self.communication.encrypt(m["params"]))
return m
return self.compress_data(m)
def deserialized_model(self, m):
"""
......@@ -120,9 +134,9 @@ class RoundRobinPartial(Sharing):
state_dict of received
"""
m = self.decompress_data(m)
with torch.no_grad():
state_dict = self.model.state_dict()
shapes = []
lens = []
tensors_to_cat = []
......
import importlib
import logging
from collections import deque
import torch
......@@ -11,7 +11,18 @@ class Sharing:
"""
def __init__(
self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -46,45 +57,37 @@ class Sharing:
self.dataset = dataset
self.communication_round = 0
self.log_dir = log_dir
self.total_data = 0
self.peer_deques = dict()
my_neighbors = self.graph.neighbors(self.uid)
for n in my_neighbors:
self.peer_deques[n] = deque()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
"""
Choose which neighbors to share with
Parameters
----------
neighbors : list(int)
List of all neighbors
Returns
-------
list(int)
Neighbors to share with
"""
# modify neighbors here
return neighbors
self.shapes = []
self.lens = []
with torch.no_grad():
for _, v in self.model.state_dict().items():
self.shapes.append(v.shape)
t = v.flatten().numpy()
self.lens.append(t.shape[0])
self.compress = compress
if compression_package and compression_class:
compressor_module = importlib.import_module(compression_package)
compressor_class = getattr(compressor_module, compression_class)
self.compressor = compressor_class()
logging.info(f"Using the {compressor_class} to compress the data")
else:
assert not self.compress
def compress_data(self, data):
result = dict(data)
if self.compress:
if "params" in result:
result["params"] = self.compressor.compress_float(result["params"])
return result
def decompress_data(self, data):
if self.compress:
if "params" in data:
data["params"] = self.compressor.decompress_float(data["params"])
return data
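Note that compress_data/decompress_data only require the compressor to expose compress_float and decompress_float on the "params" array. A minimal sketch of a compatible compressor follows; the class name is hypothetical and not part of the package, and the real classes selected via compression_package/compression_class may instead encode to bytes:

import numpy as np

class HalfPrecisionCompressor:
    """Hypothetical compressor: ships float parameters as float16."""

    def compress_float(self, arr):
        # arr is the flat numpy parameter vector built by serialized_model()
        return arr.astype(np.float16)

    def decompress_float(self, arr):
        # restore a dtype that torch.from_numpy can consume on the receiving side
        return arr.astype(np.float32)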
def serialized_model(self):
"""
......@@ -96,11 +99,15 @@ class Sharing:
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val.numpy()
self.total_data += len(self.communication.encrypt(m[key]))
return m
to_cat = []
with torch.no_grad():
for _, v in self.model.state_dict().items():
t = v.flatten()
to_cat.append(t)
flat = torch.cat(to_cat)
data = dict()
data["params"] = flat.numpy()
return self.compress_data(data)
def deserialized_model(self, m):
"""
......@@ -118,8 +125,16 @@ class Sharing:
"""
state_dict = dict()
for key, value in m.items():
state_dict[key] = torch.from_numpy(value)
m = self.decompress_data(m)
T = m["params"]
start_index = 0
for i, key in enumerate(self.model.state_dict()):
end_index = start_index + self.lens[i]
state_dict[key] = torch.from_numpy(
T[start_index:end_index].reshape(self.shapes[i])
)
start_index = end_index
return state_dict
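serialized_model now ships a single flattened vector and deserialized_model rebuilds the per-layer tensors from self.shapes and self.lens captured in the constructor. A small standalone round-trip sketch of that idea, on a toy state dict and assuming stable dict iteration order:

import torch

state = {"w": torch.randn(2, 3), "b": torch.randn(3)}
shapes = [v.shape for v in state.values()]
lens = [v.numel() for v in state.values()]

flat = torch.cat([v.flatten() for v in state.values()])  # what gets sent

rebuilt, start = {}, 0
for key, shape, length in zip(state, shapes, lens):
    rebuilt[key] = flat[start : start + length].reshape(shape)
    start += length

assert all(torch.equal(state[k], rebuilt[k]) for k in state)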
def _pre_step(self):
......@@ -136,7 +151,7 @@ class Sharing:
"""
pass
def _averaging(self):
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
......@@ -144,15 +159,20 @@ class Sharing:
with torch.no_grad():
total = dict()
weight_total = 0
for i, n in enumerate(self.peer_deques):
degree, iteration, data = self.peer_deques[n].popleft()
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings
# Metro-Hastings
weight = 1 / (max(len(peer_deques), degree) + 1)
weight_total += weight
for key, value in data.items():
if key in total:
......@@ -164,40 +184,45 @@ class Sharing:
total[key] += (1 - weight_total) * value # Metro-Hastings
self.model.load_state_dict(total)
self._post_step()
self.communication_round += 1
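For reference, the Metropolis-Hastings weights above give neighbor n the weight 1/(max(deg(self), deg(n)) + 1) and leave the remaining mass to the local model, so the weights sum to 1. A toy check with made-up degrees:

# made-up degrees, just to confirm the weights sum to 1
my_degree = 3
neighbor_degrees = [3, 3, 4]
weights = [1 / (max(my_degree, d) + 1) for d in neighbor_degrees]  # [0.25, 0.25, 0.2]
self_weight = 1 - sum(weights)                                     # 0.3
assert abs(sum(weights) + self_weight - 1.0) < 1e-12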
def step(self):
"""
Perform a sharing step. Implements D-PSGD.
"""
def get_data_to_send(self):
self._pre_step()
data = self.serialized_model()
my_uid = self.mapping.get_uid(self.rank, self.machine_id)
all_neighbors = self.graph.neighbors(my_uid)
iter_neighbors = self.get_neighbors(all_neighbors)
data["degree"] = len(all_neighbors)
data["iteration"] = self.communication_round
for neighbor in iter_neighbors:
self.communication.send(neighbor, data)
logging.info("Waiting for messages from neighbors")
while not self.received_from_all():
sender, data = self.communication.receive()
logging.debug("Received model from {}".format(sender))
degree = data["degree"]
iteration = data["iteration"]
del data["degree"]
del data["iteration"]
self.peer_deques[sender].append((degree, iteration, data))
logging.info(
"Deserialized received model from {} of iteration {}".format(
sender, iteration
)
)
return data
logging.info("Starting model averaging after receiving from all neighbors")
self._averaging()
logging.info("Model averaging complete")
def _averaging_server(self, peer_deques):
"""
Averages the received models of all working nodes
self.communication_round += 1
"""
with torch.no_grad():
total = dict()
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
weight = 1 / len(peer_deques)
for key, value in data.items():
if key in total:
total[key] += weight * value
else:
total[key] = weight * value
self.model.load_state_dict(total)
self._post_step()
self.communication_round += 1
return total
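With step() removed, the exchange itself is driven from outside this class: the node obtains the payload from get_data_to_send(), sends it, collects one message per neighbor into deques, and hands them to _averaging (or _averaging_server on a central node). A rough sketch of that driving loop; node.neighbors, node.communication and the "CHANNEL" tag are assumptions about the caller, not this class's API:

from collections import deque

def sharing_round(node, sharing):
    # hypothetical driver; the real node implementation may differ
    data = sharing.get_data_to_send()          # already contains "degree" and "iteration"
    data["CHANNEL"] = "DPSGD"                  # stripped again inside _averaging
    for neighbor in node.neighbors:
        node.communication.send(neighbor, data)

    peer_deques = {n: deque() for n in node.neighbors}
    while any(len(q) == 0 for q in peer_deques.values()):
        sender, received = node.communication.receive()
        peer_deques[sender].append(received)

    sharing._averaging(peer_deques)            # Metropolis-Hastings averaging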
# Deprecated
import logging
from collections import deque
import torch
class Sharing:
"""
API defining who to share with and what, and what to do on receiving
"""
def __init__(
self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
"""
self.rank = rank
self.machine_id = machine_id
self.uid = mapping.get_uid(rank, machine_id)
self.communication = communication
self.mapping = mapping
self.graph = graph
self.model = model
self.dataset = dataset
self.communication_round = 0
self.log_dir = log_dir
self.peer_deques = dict()
my_neighbors = self.graph.neighbors(self.uid)
for n in my_neighbors:
self.peer_deques[n] = deque()
self.averaging_weights = self.graph.centr()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
"""
Choose which neighbors to share with
Parameters
----------
neighbors : list(int)
List of all neighbors
Returns
-------
list(int)
Neighbors to share with
"""
# modify neighbors here
return neighbors
def serialized_model(self):
"""
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val.numpy()
return m
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
received dict
Returns
-------
state_dict
state_dict of received
"""
state_dict = dict()
for key, value in m.items():
state_dict[key] = torch.from_numpy(value)
return state_dict
def _pre_step(self):
"""
Called at the beginning of step.
"""
pass
def _post_step(self):
"""
Called at the end of step.
"""
pass
def _averaging(self):
"""
Averages the received model with the local model
"""
with torch.no_grad():
total = dict()
for _, n in enumerate(self.peer_deques):
_, iteration, data = self.peer_deques[n].popleft()
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
weight = self.averaging_weights[self.uid, n]
for key, value in data.items():
if key in total:
total[key] += value * weight
else:
total[key] = value * weight
for key, value in self.model.state_dict().items():
total[key] += (
self.averaging_weights[self.uid, self.uid] * value
) # Metro-Hastings
self.model.load_state_dict(total)
def step(self):
"""
Perform a sharing step. Implements D-PSGD.
"""
self._pre_step()
data = self.serialized_model()
my_uid = self.mapping.get_uid(self.rank, self.machine_id)
all_neighbors = self.graph.neighbors(my_uid)
iter_neighbors = self.get_neighbors(all_neighbors)
data["degree"] = len(all_neighbors)
data["iteration"] = self.communication_round
for neighbor in iter_neighbors:
self.communication.send(neighbor, data)
logging.info("Waiting for messages from neighbors")
while not self.received_from_all():
sender, data = self.communication.receive()
logging.debug("Received model from {}".format(sender))
degree = data["degree"]
iteration = data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
self.peer_deques[sender].append((degree, iteration, data))
logging.info(
"Deserialized received model from {} of iteration {}".format(
sender, iteration
)
)
logging.info("Starting model averaging after receiving from all neighbors")
self._averaging()
logging.info("Model averaging complete")
self.communication_round += 1
self._post_step()
......@@ -31,6 +31,9 @@ class SubSampling(Sharing):
metadata_cap=1.0,
pickle=True,
layerwise=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -66,13 +69,22 @@ class SubSampling(Sharing):
"""
super().__init__(
rank, machine_id, communication, mapping, graph, model, dataset, log_dir
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
)
self.alpha = alpha
self.dict_ordered = dict_ordered
self.save_shared = save_shared
self.metadata_cap = metadata_cap
self.total_meta = 0
# self.random_seed_generator = torch.Generator()
# # Will use the random device if supported by CPU, else uses the system time
......@@ -101,6 +113,17 @@ class SubSampling(Sharing):
)
Path(self.folder_path).mkdir(parents=True, exist_ok=True)
with torch.no_grad():
tensors_to_cat = []
for _, v in self.model.state_dict().items():
t = v.flatten()
tensors_to_cat.append(t)
self.init_model = torch.cat(tensors_to_cat, dim=0)
self.model.shared_parameters_counter = torch.zeros(
self.init_model.shape[0], dtype=torch.int32
)
def apply_subsampling(self):
"""
Creates a random binary mask that is used to subsample the parameters that will be shared
......@@ -131,6 +154,7 @@ class SubSampling(Sharing):
<= self.alpha
)
subsample = concated[binary_mask]
self.model.shared_parameters_counter[binary_mask] += 1
# logging.debug("Subsampling vector is of size: " + str(subsample.size(dim = 0)))
return (subsample, curr_seed, self.alpha)
else:
......@@ -147,6 +171,7 @@ class SubSampling(Sharing):
)
<= self.alpha
)
# TODO: support shared_parameters_counter
selected = flat[binary_mask]
values_list.append(selected)
off += selected.size(dim=0)
......@@ -203,13 +228,7 @@ class SubSampling(Sharing):
m["alpha"] = alpha
m["params"] = subsample.numpy()
# logging.info("Converted dictionary to json")
self.total_data += len(self.communication.encrypt(m["params"]))
self.total_meta += len(self.communication.encrypt(m["seed"])) + len(
self.communication.encrypt(m["alpha"])
)
return m
return self.compress_data(m)
def deserialized_model(self, m):
"""
......@@ -229,6 +248,8 @@ class SubSampling(Sharing):
if self.alpha > self.metadata_cap: # Share fully
return super().deserialized_model(m)
m = self.decompress_data(m)
with torch.no_grad():
state_dict = self.model.state_dict()
......
# Deprecated
import logging
from collections import deque
import torch
class Synchronous:
"""
Synchronous training
"""
def __init__(
self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
"""
self.rank = rank
self.machine_id = machine_id
self.uid = mapping.get_uid(rank, machine_id)
self.communication = communication
self.mapping = mapping
self.graph = graph
self.model = model
self.dataset = dataset
self.communication_round = 0
self.log_dir = log_dir
self.peer_deques = dict()
self.my_neighbors = self.graph.neighbors(self.uid)
for n in self.my_neighbors:
self.peer_deques[n] = deque()
with torch.no_grad():
self.init_model = {}
for k, v in self.model.state_dict().items():
self.init_model[k] = v.clone().detach()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
"""
Choose which neighbors to share with
Parameters
----------
neighbors : list(int)
List of all neighbors
Returns
-------
list(int)
Neighbors to share with
"""
# modify neighbors here
return neighbors
def serialized_gradient(self):
"""
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val - self.init_model[key] # this is -lr*gradient
return m
def serialized_model(self):
"""
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val.clone().detach()
return m
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
received dict
Returns
-------
state_dict
state_dict of received
"""
return m
def _pre_step(self):
"""
Called at the beginning of step.
"""
pass
def _post_step(self):
"""
Called at the end of step.
"""
with torch.no_grad():
self.init_model = {}
for k, v in self.model.state_dict().items():
self.init_model[k] = v.clone().detach()
def _apply_gradients(self):
"""
Averages the received model with the local model
"""
with torch.no_grad():
total = dict()
for i, n in enumerate(self.peer_deques):
gradient = self.peer_deques[n].popleft()
logging.debug(
"Applying gradient from neighbor {}".format(
n,
)
)
grad = self.deserialized_model(gradient)
for key, value in grad.items():
if key in total:
total[key] += value
else:
total[key] = value
my_grad = self.serialized_gradient()
for key, value in my_grad.items():
if key in total:
total[key] += value
else:
total[key] = value
new_model = {}
for key, value in self.init_model.items():
new_model[key] = value + total[key] * (1 / (len(self.my_neighbors) + 1))
self.model.load_state_dict(new_model)
def step(self):
"""
Perform a sharing step. Implements D-PSGD.
"""
self._pre_step()
logging.info("--- COMMUNICATION ROUND {} ---".format(self.communication_round))
if self.uid != 0:
gradient = self.serialized_gradient()
# Should be only one neighbour
self.communication.send(0, gradient)
logging.info("Waiting for messages from central node")
sender, data = self.communication.receive()
logging.debug("Received model from {}".format(sender))
logging.info(
"Deserialized received model from {} of iteration {}".format(
sender, self.communication_round
)
)
self.model.load_state_dict(data)
else:
logging.info("Waiting for messages from leaf nodes")
while not self.received_from_all():
sender, data = self.communication.receive()
logging.debug("Received gradient from {}".format(sender))
self.peer_deques[sender].append(data)
logging.info(
"Deserialized gradient model from {} of iteration {}".format(
sender, self.communication_round
)
)
self._apply_gradients()
data = self.serialized_model()
all_neighbors = self.graph.neighbors(self.uid)
iter_neighbors = self.get_neighbors(all_neighbors)
for neighbor in iter_neighbors:
self.communication.send(neighbor, data)
self.communication_round += 1
self._post_step()
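In this deprecated scheme the deltas returned by serialized_gradient are val - init_model[key] (i.e. -lr times the accumulated gradient), and node 0 applies their plain average on top of the initial model. A one-line numeric check of that update rule with made-up values:

# made-up numbers: the server's own delta plus two leaf deltas, so divide by 3
init, deltas = 1.0, [-0.1, -0.2, -0.3]
new_value = init + sum(deltas) / len(deltas)   # 1.0 - 0.2 = 0.8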
import logging
import torch
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class TopKNormalized(PartialModel):
"""
This class implements the vanilla version of partial model sharing.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha=1.0,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
accumulation=False,
save_accumulated="",
change_transformer=identity,
accumulate_averaging_changes=False,
epsilon=0.01,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha : float
Percentage of model to share
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
accumulation : bool
True if the indices to share should be selected based on accumulated frequency change
save_accumulated : bool
True if accumulated weight change should be written to file. In case of accumulation the accumulated change
is stored. If a change_transformer is used then the transformed change is stored.
change_transformer : (x: Tensor) -> Tensor
A function that transforms the model change into other domains. Default: identity function
accumulate_averaging_changes: bool
True if the accumulation should account the model change due to averaging
epsilon : float
numerical stability parameter used during normalization
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha,
dict_ordered,
save_shared,
metadata_cap,
accumulation,
save_accumulated,
change_transformer,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.epsilon = epsilon
def extract_top_gradients(self):
"""
Extract the indices and values of the topK gradients.
The gradients must have been accumulated.
Returns
-------
tuple
(a,b). a: The magnitudes of the topK gradients, b: Their indices.
"""
logging.info("Returning topk gradients")
G_topk = torch.abs(self.model.model_change)
G_topk_normalized = G_topk / (torch.abs(self.pre_share_model) + self.epsilon)
std, mean = torch.std_mean(G_topk, unbiased=False)
self.std = std.item()
self.mean = mean.item()
return torch.topk(
G_topk_normalized, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
)
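The normalization divides each accumulated change by the magnitude of the corresponding pre-share parameter (plus epsilon for stability), so small weights with proportionally large changes can still be selected. A toy illustration with made-up tensors:

import torch

change = torch.tensor([0.10, 0.40, 0.05])
params = torch.tensor([1.00, 8.00, 0.01])
epsilon = 0.01

normalized = change.abs() / (params.abs() + epsilon)
# ~[0.099, 0.050, 2.500]: the plain top-1 change is index 1,
# but the normalized top-1 is index 2 (tiny weight, relatively large change)
values, indices = torch.topk(normalized, k=1, dim=0, sorted=False)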
......@@ -29,6 +29,9 @@ class TopKParams(Sharing):
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -62,13 +65,22 @@ class TopKParams(Sharing):
"""
super().__init__(
rank, machine_id, communication, mapping, graph, model, dataset, log_dir
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
)
self.alpha = alpha
self.dict_ordered = dict_ordered
self.save_shared = save_shared
self.metadata_cap = metadata_cap
self.total_meta = 0
if self.save_shared:
# Only save for 2 procs: Save space
......@@ -128,7 +140,7 @@ class TopKParams(Sharing):
with torch.no_grad():
values, index, offsets = self.extract_top_params()
self.model.shared_parameters_counter[index] += 1
if self.save_shared:
shared_params = dict()
shared_params["order"] = list(self.model.state_dict().keys())
......@@ -171,12 +183,8 @@ class TopKParams(Sharing):
# m[key] = json.dumps(m[key])
logging.info("Converted dictionary to json")
self.total_data += len(self.communication.encrypt(m["params"]))
self.total_meta += len(self.communication.encrypt(m["indices"])) + len(
self.communication.encrypt(m["offsets"])
)
return m
return self.compress_data(m)
def deserialized_model(self, m):
"""
......@@ -196,6 +204,8 @@ class TopKParams(Sharing):
if self.alpha > self.metadata_cap: # Share fully
return super().deserialized_model(m)
m = self.decompress_data(m)
with torch.no_grad():
state_dict = self.model.state_dict()
......
......@@ -26,6 +26,9 @@ class TopKPlusRandom(PartialModel):
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -71,6 +74,9 @@ class TopKPlusRandom(PartialModel):
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
def extract_top_gradients(self):
......
import json
import logging
import os
from pathlib import Path
from time import time
import numpy as np
import pywt
......@@ -61,6 +59,9 @@ class Wavelet(PartialModel):
save_accumulated="",
accumulation=False,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
......@@ -125,6 +126,9 @@ class Wavelet(PartialModel):
save_accumulated,
lambda x: change_transformer_wavelet(x, wavelet, level),
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.change_based_selection = change_based_selection
......@@ -148,9 +152,7 @@ class Wavelet(PartialModel):
"""
logging.info("Returning wavelet compressed model weights")
tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()]
concated = torch.cat(tensors_to_cat, dim=0)
data = self.change_transformer(concated)
data = self.pre_share_model_transformed
if self.change_based_selection:
diff = self.model.model_change
_, index = torch.topk(
......@@ -166,7 +168,7 @@ class Wavelet(PartialModel):
dim=0,
sorted=False,
)
index, _ = torch.sort(index)
return data[index], index
def serialized_model(self):
......@@ -179,12 +181,19 @@ class Wavelet(PartialModel):
Model converted to json dict
"""
if self.alpha > self.metadata_cap: # Share fully
return super().serialized_model()
m = dict()
if self.alpha >= self.metadata_cap: # Share fully
data = self.pre_share_model_transformed
m["params"] = data.numpy()
if self.model.accumulated_changes is not None:
self.model.accumulated_changes = torch.zeros_like(
self.model.accumulated_changes
)
return self.compress_data(m)
with torch.no_grad():
topk, indices = self.apply_wavelet()
self.model.shared_parameters_counter[indices] += 1
self.model.rewind_accumulation(indices)
if self.save_shared:
shared_params = dict()
......@@ -194,7 +203,8 @@ class Wavelet(PartialModel):
shapes[k] = list(v.shape)
shared_params["shapes"] = shapes
shared_params[self.communication_round] = indices.tolist() # is slow
# is slow
shared_params[self.communication_round] = indices.tolist()
shared_params["alpha"] = self.alpha
......@@ -207,8 +217,6 @@ class Wavelet(PartialModel):
) as of:
json.dump(shared_params, of)
m = dict()
if not self.dict_ordered:
raise NotImplementedError
......@@ -218,12 +226,9 @@ class Wavelet(PartialModel):
m["indices"] = indices.numpy().astype(np.int32)
self.total_data += len(self.communication.encrypt(m["params"]))
self.total_meta += len(self.communication.encrypt(m["indices"])) + len(
self.communication.encrypt(m["alpha"])
)
m["send_partial"] = True
return m
return self.compress_data(m)
def deserialized_model(self, m):
"""
......@@ -240,25 +245,28 @@ class Wavelet(PartialModel):
state_dict of received
"""
if self.alpha > self.metadata_cap: # Share fully
return super().deserialized_model(m)
m = self.decompress_data(m)
ret = dict()
if "send_partial" not in m:
params = m["params"]
params_tensor = torch.tensor(params)
ret["params"] = params_tensor
return ret
with torch.no_grad():
if not self.dict_ordered:
raise NotImplementedError
indices = m["indices"]
alpha = m["alpha"]
params = m["params"]
params_tensor = torch.tensor(params)
indices_tensor = torch.tensor(indices, dtype=torch.long)
params_tensor = torch.tensor(m["params"])
indices_tensor = torch.tensor(m["indices"], dtype=torch.long)
ret = dict()
ret["indices"] = indices_tensor
ret["params"] = params_tensor
return ret
ret["send_partial"] = True
return ret
def _averaging(self):
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
......@@ -266,13 +274,13 @@ class Wavelet(PartialModel):
with torch.no_grad():
total = None
weight_total = 0
tensors_to_cat = [
v.data.flatten() for _, v in self.model.state_dict().items()
]
pre_share_model = torch.cat(tensors_to_cat, dim=0)
wt_params = self.change_transformer(pre_share_model)
for i, n in enumerate(self.peer_deques):
degree, iteration, data = self.peer_deques[n].popleft()
wt_params = self.pre_share_model_transformed
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
......@@ -280,13 +288,17 @@ class Wavelet(PartialModel):
)
data = self.deserialized_model(data)
params = data["params"]
indices = data["indices"]
# use local data to complement
topkwf = wt_params.clone().detach()
topkwf[indices] = params
topkwf = topkwf.reshape(self.wt_shape)
if "indices" in data:
indices = data["indices"]
# use local data to complement
topkwf = wt_params.clone().detach()
topkwf[indices] = params
topkwf = topkwf.reshape(self.wt_shape)
else:
topkwf = params.reshape(self.wt_shape)
weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings
# Metro-Hastings
weight = 1 / (max(len(peer_deques), degree) + 1)
weight_total += weight
if total is None:
total = weight * topkwf
......@@ -313,3 +325,61 @@ class Wavelet(PartialModel):
start_index = end_index
self.model.load_state_dict(std_dict)
self._post_step()
self.communication_round += 1
def _averaging_server(self, peer_deques):
"""
Averages the received models of all working nodes
"""
with torch.no_grad():
total = None
wt_params = self.pre_share_model_transformed
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
params = data["params"]
if "indices" in data:
indices = data["indices"]
# use local data to complement
topkwf = wt_params.clone().detach()
topkwf[indices] = params
topkwf = topkwf.reshape(self.wt_shape)
else:
topkwf = params.reshape(self.wt_shape)
weight = 1 / len(peer_deques)
if total is None:
total = weight * topkwf
else:
total += weight * topkwf
avg_wf_params = pywt.array_to_coeffs(
total.numpy(), self.coeff_slices, output_format="wavedec"
)
reverse_total = torch.from_numpy(
pywt.waverec(avg_wf_params, wavelet=self.wavelet)
)
start_index = 0
std_dict = {}
for i, key in enumerate(self.model.state_dict()):
end_index = start_index + self.lens[i]
std_dict[key] = reverse_total[start_index:end_index].reshape(
self.shapes[i]
)
start_index = end_index
self.model.load_state_dict(std_dict)
self._post_step()
self.communication_round += 1
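The wavelet variants rely on pywt both ways: change_transformer_wavelet flattens the wavedec coefficients with coeffs_to_array, and _averaging/_averaging_server invert that with array_to_coeffs and waverec using the stored coeff_slices. A standalone round-trip sketch of that transform pair, assuming the default "haar" wavelet, level 4, and a vector length that reconstructs exactly:

import numpy as np
import pywt
import torch

params = torch.randn(64)                                  # stand-in for the flattened model
coeffs = pywt.wavedec(params.numpy(), "haar", level=4)    # forward transform
arr, coeff_slices = pywt.coeffs_to_array(coeffs)          # flat coefficient vector

restored = pywt.waverec(
    pywt.array_to_coeffs(arr, coeff_slices, output_format="wavedec"), wavelet="haar"
)
assert np.allclose(params.numpy(), restored, atol=1e-6)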
......@@ -69,13 +69,23 @@ def get_args():
type=str,
default="./{}".format(datetime.datetime.now().isoformat(timespec="minutes")),
)
parser.add_argument(
"-wsd",
"--weights_store_dir",
type=str,
default="./{}_ws".format(datetime.datetime.now().isoformat(timespec="minutes")),
)
parser.add_argument("-is", "--iterations", type=int, default=1)
parser.add_argument("-cf", "--config_file", type=str, default="config.ini")
parser.add_argument("-ll", "--log_level", type=str, default="INFO")
parser.add_argument("-gf", "--graph_file", type=str, default="36_nodes.edges")
parser.add_argument("-gt", "--graph_type", type=str, default="edges")
parser.add_argument("-ta", "--test_after", type=int, default=5)
parser.add_argument("-tea", "--train_evaluate_after", type=int, default=1)
parser.add_argument("-ro", "--reset_optimizer", type=int, default=1)
parser.add_argument("-sm", "--server_machine", type=int, default=0)
parser.add_argument("-sr", "--server_rank", type=int, default=-1)
parser.add_argument("-wr", "--working_rate", type=float, default=1.0)
args = parser.parse_args()
return args
......@@ -98,13 +108,16 @@ def write_args(args, path):
"procs_per_machine": args.procs_per_machine,
"machines": args.machines,
"log_dir": args.log_dir,
"weights_store_dir": args.weights_store_dir,
"iterations": args.iterations,
"config_file": args.config_file,
"log_level": args.log_level,
"graph_file": args.graph_file,
"graph_type": args.graph_type,
"test_after": args.test_after,
"train_evaluate_after": args.train_evaluate_after,
"reset_optimizer": args.reset_optimizer,
"working_rate": args.working_rate,
}
with open(os.path.join(path, "args.json"), "w") as of:
json.dump(data, of)
......
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 90
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 3
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = /mnt/nfs/risharma/Gitlab/tutorial/ip.json
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
sharing_class = Sharing
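The keys of each section are handed to the corresponding constructor, so the compression arguments added in this revision should be controllable from here as well. A hypothetical [SHARING] section with compression enabled; the compressor location below is an assumption, and any class exposing compress_float/decompress_float should fit:

[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
alpha = 0.1
compress = True
; hypothetical compressor location
compression_package = decentralizepy.compression.Elias
compression_class = Elias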
{
"0": "127.0.0.1"
}
\ No newline at end of file
16
0 12
0 14
0 15
1 8
1 3
1 6
2 9
2 10
2 5
3 1
3 11
3 9
4 9
4 12
4 13
5 2
5 6
5 7
6 1
6 5
6 7
7 5
7 6
7 14
8 1
8 13
8 14
9 2
9 3
9 4
10 2
10 11
10 13
11 10
11 3
11 15
12 0
12 4
12 15
13 8
13 10
13 4
14 0
14 8
14 7
15 0
15 11
15 12
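The .edges files passed via --graph_file appear to use a simple format: the first line is the number of nodes and every following line "u v" lists one direction of an edge. A small parsing sketch under that assumption:

def read_edges(path):
    # assumed format: first line = node count, then one "u v" pair per line
    with open(path) as f:
        n = int(f.readline())
        neighbors = {i: set() for i in range(n)}
        for line in f:
            if not line.strip():
                continue
            u, v = map(int, line.split())
            neighbors[u].add(v)
    return neighbors

# read_edges("36_nodes.edges")[0] -> neighbor set of node 0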
#!/bin/bash
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=16
machines=1
iterations=80
test_after=20
eval_file=testingPeerSampler.py
log_level=INFO
m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
echo M is $m
log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m
mkdir -p $log_dir
cp $original_config $config_file
# echo "alpha = 0.10" >> $config_file
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -wsd $log_dir
\ No newline at end of file
#!/bin/bash
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=16
machines=1
iterations=80
test_after=20
eval_file=testingFederated.py
log_level=INFO
server_rank=-1
server_machine=0
working_rate=0.5
m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
echo M is $m
log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m
mkdir -p $log_dir
cp $original_config $config_file
# echo "alpha = 0.10" >> $config_file
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -ctr 0 -cte 0 -wsd $log_dir -sm $server_machine -sr $server_rank -wr $working_rate
\ No newline at end of file