Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing with 1173 additions and 179 deletions
import random

from decentralizepy.sharing.PartialModel import PartialModel
+from decentralizepy.utils import identity


class RandomAlpha(PartialModel):
@@ -19,9 +20,17 @@ class RandomAlpha(PartialModel):
        model,
        dataset,
        log_dir,
+        alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]",
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
+        accumulation=False,
+        save_accumulated="",
+        change_transformer=identity,
+        accumulate_averaging_changes=False,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -65,15 +74,21 @@ class RandomAlpha(PartialModel):
            dict_ordered,
            save_shared,
            metadata_cap,
+            accumulation,
+            save_accumulated,
+            change_transformer,
+            accumulate_averaging_changes,
+            compress,
+            compression_package,
+            compression_class,
        )
+        self.alpha_list = eval(alpha_list)
+        random.seed(self.mapping.get_uid(self.rank, self.machine_id))

-    def step(self):
+    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD with alpha randomly chosen.
        """
-        random.seed(
-            self.mapping.get_uid(self.rank, self.machine_id) + self.communication_round
-        )
-        self.alpha = random.randint(1, 7) / 10.0
-        super().step()
+        self.alpha = random.choice(self.alpha_list)
+        return super().get_data_to_send()
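Since alpha_list is passed in as a string and parsed with eval(alpha_list) above, a safer standalone equivalent is ast.literal_eval, which only accepts Python literals. A minimal sketch (parse_alpha_list is a hypothetical helper, not part of the codebase):

import ast

def parse_alpha_list(alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]"):
    # literal_eval rejects arbitrary expressions, unlike eval()
    return ast.literal_eval(alpha_list)

assert parse_alpha_list() == [0.1, 0.2, 0.3, 0.4, 1.0]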
+# Deprecated
import random

from decentralizepy.sharing.PartialModel import PartialModel
@@ -24,6 +25,9 @@ class RandomAlphaIncremental(PartialModel):
        metadata_cap=1.0,
        range_start=0.1,
        range_end=0.2,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -67,16 +71,19 @@ class RandomAlphaIncremental(PartialModel):
            dict_ordered,
            save_shared,
            metadata_cap,
+            compress,
+            compression_package,
+            compression_class,
        )
        random.seed(self.mapping.get_uid(self.rank, self.machine_id))
        self.range_start = range_start
        self.range_end = range_end

-    def step(self):
+    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD with alpha randomly chosen from an increasing range.
        """
        self.alpha = round(random.uniform(self.range_start, self.range_end), 2)
        self.range_end = min(1.0, self.range_end + round(random.uniform(0.0, 0.1), 2))
-        super().step()
+        return super().get_data_to_send()
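For intuition, a standalone sketch of how the sampling range widens across rounds (toy seed and round count, independent of the class above):

import random

random.seed(42)
range_start, range_end = 0.1, 0.2
for round_ in range(5):
    alpha = round(random.uniform(range_start, range_end), 2)
    range_end = min(1.0, range_end + round(random.uniform(0.0, 0.1), 2))
    print(round_, alpha, range_end)  # range_end creeps toward 1.0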
import random
from decentralizepy.sharing.Wavelet import Wavelet
class RandomAlpha(Wavelet):
"""
This class implements partial model sharing with a random alpha chosen each iteration.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]",
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
wavelet="haar",
level=4,
change_based_selection=True,
save_accumulated="",
accumulation=False,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
1.0,
dict_ordered,
save_shared,
metadata_cap,
wavelet,
level,
change_based_selection,
save_accumulated,
accumulation,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.alpha_list = eval(alpha_list)
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen.
"""
self.alpha = random.choice(self.alpha_list)
return super().get_data_to_send()
@@ -25,6 +25,9 @@ class RoundRobinPartial(Sharing):
        dataset,
        log_dir,
        alpha=1.0,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -52,7 +55,17 @@ class RoundRobinPartial(Sharing):
        """
        super().__init__(
-            rank, machine_id, communication, mapping, graph, model, dataset, log_dir
+            rank,
+            machine_id,
+            communication,
+            mapping,
+            graph,
+            model,
+            dataset,
+            log_dir,
+            compress,
+            compression_package,
+            compression_class,
        )
        self.alpha = alpha
        random.seed(self.mapping.get_uid(rank, machine_id))
@@ -84,6 +97,7 @@ class RoundRobinPartial(Sharing):
            block_end = min(T.shape[0], (self.current_block + 1) * self.block_size)
            self.current_block = (self.current_block + 1) % self.num_blocks
            T_send = T[block_start:block_end]
+            self.model.shared_parameters_counter[block_start:block_end] += 1
            logging.info("Range sending: {}-{}".format(block_start, block_end))
            logging.info("Generating dictionary to send")
@@ -103,7 +117,7 @@ class RoundRobinPartial(Sharing):
        logging.info("Converted dictionary to json")
        self.total_data += len(self.communication.encrypt(m["params"]))

-        return m
+        return self.compress_data(m)

    def deserialized_model(self, m):
        """
@@ -120,9 +134,9 @@ class RoundRobinPartial(Sharing):
            state_dict of received
        """
+        m = self.decompress_data(m)
        with torch.no_grad():
            state_dict = self.model.state_dict()

            shapes = []
            lens = []
            tensors_to_cat = []
...
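For intuition, a standalone sketch of the block-rotation arithmetic (toy sizes, and assuming block_start = current_block * block_size, as implied by the block_end formula above):

num_params, block_size, num_blocks = 10, 3, 4
current_block = 0
for _ in range(5):
    block_start = current_block * block_size
    block_end = min(num_params, (current_block + 1) * block_size)
    current_block = (current_block + 1) % num_blocks
    print(block_start, block_end)  # (0, 3), (3, 6), (6, 9), (9, 10), (0, 3)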
+import importlib
import logging
-from collections import deque

import torch
@@ -11,7 +11,18 @@ class Sharing:
    """

    def __init__(
-        self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
+        self,
+        rank,
+        machine_id,
+        communication,
+        mapping,
+        graph,
+        model,
+        dataset,
+        log_dir,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -46,45 +57,37 @@ class Sharing:
        self.dataset = dataset
        self.communication_round = 0
        self.log_dir = log_dir
-        self.total_data = 0
-        self.peer_deques = dict()
-        my_neighbors = self.graph.neighbors(self.uid)
-        for n in my_neighbors:
-            self.peer_deques[n] = deque()
-
-    def received_from_all(self):
-        """
-        Check if all neighbors have sent the current iteration
-
-        Returns
-        -------
-        bool
-            True if required data has been received, False otherwise
-
-        """
-        for _, i in self.peer_deques.items():
-            if len(i) == 0:
-                return False
-        return True
-
-    def get_neighbors(self, neighbors):
-        """
-        Choose which neighbors to share with
-
-        Parameters
-        ----------
-        neighbors : list(int)
-            List of all neighbors
-
-        Returns
-        -------
-        list(int)
-            Neighbors to share with
-
-        """
-        # modify neighbors here
-        return neighbors
+        self.shapes = []
+        self.lens = []
+        with torch.no_grad():
+            for _, v in self.model.state_dict().items():
+                self.shapes.append(v.shape)
+                t = v.flatten().numpy()
+                self.lens.append(t.shape[0])
+
+        self.compress = compress
+
+        if compression_package and compression_class:
+            compressor_module = importlib.import_module(compression_package)
+            compressor_class = getattr(compressor_module, compression_class)
+            self.compressor = compressor_class()
+            logging.info(f"Using the {compressor_class} to compress the data")
+        else:
+            assert not self.compress
+
+    def compress_data(self, data):
+        result = dict(data)
+        if self.compress:
+            if "params" in result:
+                result["params"] = self.compressor.compress_float(result["params"])
+        return result
+
+    def decompress_data(self, data):
+        if self.compress:
+            if "params" in data:
+                data["params"] = self.compressor.decompress_float(data["params"])
+        return data
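The compressor is loaded dynamically with importlib, and the only interface the class relies on is compress_float/decompress_float (the calls above). A minimal no-op sketch of that interface (IdentityCompressor is hypothetical, not part of this diff):

import numpy as np

class IdentityCompressor:
    # Placeholder compressor exposing the interface Sharing expects.
    def compress_float(self, arr):
        # a real implementation would quantize or sparsify here
        return np.asarray(arr)

    def decompress_float(self, arr):
        return np.asarray(arr)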
    def serialized_model(self):
        """
@@ -96,11 +99,15 @@ class Sharing:
            Model converted to dict
        """
-        m = dict()
-        for key, val in self.model.state_dict().items():
-            m[key] = val.numpy()
-            self.total_data += len(self.communication.encrypt(m[key]))
-        return m
+        to_cat = []
+        with torch.no_grad():
+            for _, v in self.model.state_dict().items():
+                t = v.flatten()
+                to_cat.append(t)
+        flat = torch.cat(to_cat)
+        data = dict()
+        data["params"] = flat.numpy()
+        return self.compress_data(data)
    def deserialized_model(self, m):
        """
@@ -118,8 +125,16 @@ class Sharing:
        """
        state_dict = dict()
-        for key, value in m.items():
-            state_dict[key] = torch.from_numpy(value)
+        m = self.decompress_data(m)
+        T = m["params"]
+        start_index = 0
+        for i, key in enumerate(self.model.state_dict()):
+            end_index = start_index + self.lens[i]
+            state_dict[key] = torch.from_numpy(
+                T[start_index:end_index].reshape(self.shapes[i])
+            )
+            start_index = end_index
        return state_dict
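A standalone sketch of the flatten-and-rebuild round trip that the new serialized_model/deserialized_model implement, using a toy state_dict (hypothetical names):

import torch

state = {"w": torch.randn(2, 3), "b": torch.randn(3)}
shapes = [v.shape for v in state.values()]
lens = [v.numel() for v in state.values()]

flat = torch.cat([v.flatten() for v in state.values()]).numpy()

rebuilt, start = {}, 0
for key, shape, n in zip(state, shapes, lens):
    rebuilt[key] = torch.from_numpy(flat[start : start + n].reshape(shape))
    start += n

assert all(torch.equal(state[k], rebuilt[k]) for k in state)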
    def _pre_step(self):
@@ -136,7 +151,7 @@ class Sharing:
        """
        pass

-    def _averaging(self):
+    def _averaging(self, peer_deques):
        """
        Averages the received model with the local model
@@ -144,13 +159,20 @@ class Sharing:
        with torch.no_grad():
            total = dict()
            weight_total = 0
-            for i, n in enumerate(self.peer_deques):
-                degree, iteration, data = self.peer_deques[n].popleft()
+            for i, n in enumerate(peer_deques):
+                data = peer_deques[n].popleft()
+                degree, iteration = data["degree"], data["iteration"]
+                del data["degree"]
+                del data["iteration"]
+                del data["CHANNEL"]
                logging.debug(
-                    "Averaging model from neighbor {} of iteration {}".format(n, iteration)
+                    "Averaging model from neighbor {} of iteration {}".format(
+                        n, iteration
+                    )
                )
                data = self.deserialized_model(data)
-                weight = 1 / (max(len(self.peer_deques), degree) + 1)  # Metro-Hastings
+                # Metro-Hastings
+                weight = 1 / (max(len(peer_deques), degree) + 1)
                weight_total += weight
                for key, value in data.items():
                    if key in total:
@@ -162,40 +184,45 @@ class Sharing:
                total[key] += (1 - weight_total) * value  # Metro-Hastings

        self.model.load_state_dict(total)
+        self._post_step()
+        self.communication_round += 1
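For intuition on the Metro-Hastings weights: with two received models of degrees 3 and 2 (toy values), each neighbor gets 1/(max(2, degree) + 1) and the local model keeps the remainder:

degrees, n_peers = [3, 2], 2
weights = [1 / (max(n_peers, d) + 1) for d in degrees]  # [0.25, 0.333...]
local_weight = 1 - sum(weights)                         # ~0.4167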
-    def step(self):
+    def get_data_to_send(self):
        """
        Perform a sharing step. Implements D-PSGD.
        """
        self._pre_step()
        data = self.serialized_model()
        my_uid = self.mapping.get_uid(self.rank, self.machine_id)
        all_neighbors = self.graph.neighbors(my_uid)
-        iter_neighbors = self.get_neighbors(all_neighbors)
        data["degree"] = len(all_neighbors)
        data["iteration"] = self.communication_round
-        for neighbor in iter_neighbors:
-            self.communication.send(neighbor, data)
-        logging.info("Waiting for messages from neighbors")
-        while not self.received_from_all():
-            sender, data = self.communication.receive()
-            logging.debug("Received model from {}".format(sender))
-            degree = data["degree"]
-            iteration = data["iteration"]
-            del data["degree"]
-            del data["iteration"]
-            self.peer_deques[sender].append((degree, iteration, data))
-            logging.info(
-                "Deserialized received model from {} of iteration {}".format(
-                    sender, iteration
-                )
-            )
-        logging.info("Starting model averaging after receiving from all neighbors")
-        self._averaging()
-        logging.info("Model averaging complete")
-        self.communication_round += 1
-        self._post_step()
+        return data
+
+    def _averaging_server(self, peer_deques):
+        """
+        Averages the received models of all working nodes
+        """
+        with torch.no_grad():
+            total = dict()
+            for i, n in enumerate(peer_deques):
+                data = peer_deques[n].popleft()
+                degree, iteration = data["degree"], data["iteration"]
+                del data["degree"]
+                del data["iteration"]
+                del data["CHANNEL"]
+                logging.debug(
+                    "Averaging model from neighbor {} of iteration {}".format(
+                        n, iteration
+                    )
+                )
+                data = self.deserialized_model(data)
+                weight = 1 / len(peer_deques)
+                for key, value in data.items():
+                    if key in total:
+                        total[key] += weight * value
+                    else:
+                        total[key] = weight * value
+
+        self.model.load_state_dict(total)
+        self._post_step()
+        self.communication_round += 1
+        return total
# Deprecated
import logging
from collections import deque
import torch
class Sharing:
"""
API defining who to share with and what, and what to do on receiving
"""
def __init__(
self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
"""
self.rank = rank
self.machine_id = machine_id
self.uid = mapping.get_uid(rank, machine_id)
self.communication = communication
self.mapping = mapping
self.graph = graph
self.model = model
self.dataset = dataset
self.communication_round = 0
self.log_dir = log_dir
self.peer_deques = dict()
my_neighbors = self.graph.neighbors(self.uid)
for n in my_neighbors:
self.peer_deques[n] = deque()
self.averaging_weights = self.graph.centr()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
"""
Choose which neighbors to share with
Parameters
----------
neighbors : list(int)
List of all neighbors
Returns
-------
list(int)
Neighbors to share with
"""
# modify neighbors here
return neighbors
def serialized_model(self):
"""
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val.numpy()
return m
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
received dict
Returns
-------
state_dict
state_dict of received
"""
state_dict = dict()
for key, value in m.items():
state_dict[key] = torch.from_numpy(value)
return state_dict
def _pre_step(self):
"""
Called at the beginning of step.
"""
pass
def _post_step(self):
"""
Called at the end of step.
"""
pass
def _averaging(self):
"""
Averages the received model with the local model
"""
with torch.no_grad():
total = dict()
for _, n in enumerate(self.peer_deques):
_, iteration, data = self.peer_deques[n].popleft()
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
weight = self.averaging_weights[self.uid, n]
for key, value in data.items():
if key in total:
total[key] += value * weight
else:
total[key] = value * weight
for key, value in self.model.state_dict().items():
total[key] += (
self.averaging_weights[self.uid, self.uid] * value
) # Metro-Hastings
self.model.load_state_dict(total)
def step(self):
"""
Perform a sharing step. Implements D-PSGD.
"""
self._pre_step()
data = self.serialized_model()
my_uid = self.mapping.get_uid(self.rank, self.machine_id)
all_neighbors = self.graph.neighbors(my_uid)
iter_neighbors = self.get_neighbors(all_neighbors)
data["degree"] = len(all_neighbors)
data["iteration"] = self.communication_round
for neighbor in iter_neighbors:
self.communication.send(neighbor, data)
logging.info("Waiting for messages from neighbors")
while not self.received_from_all():
sender, data = self.communication.receive()
logging.debug("Received model from {}".format(sender))
degree = data["degree"]
iteration = data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
self.peer_deques[sender].append((degree, iteration, data))
logging.info(
"Deserialized received model from {} of iteration {}".format(
sender, iteration
)
)
logging.info("Starting model averaging after receiving from all neighbors")
self._averaging()
logging.info("Model averaging complete")
self.communication_round += 1
self._post_step()
@@ -31,6 +31,9 @@ class SubSampling(Sharing):
        metadata_cap=1.0,
        pickle=True,
        layerwise=False,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -66,13 +69,22 @@ class SubSampling(Sharing):
        """
        super().__init__(
-            rank, machine_id, communication, mapping, graph, model, dataset, log_dir
+            rank,
+            machine_id,
+            communication,
+            mapping,
+            graph,
+            model,
+            dataset,
+            log_dir,
+            compress,
+            compression_package,
+            compression_class,
        )
        self.alpha = alpha
        self.dict_ordered = dict_ordered
        self.save_shared = save_shared
        self.metadata_cap = metadata_cap
-        self.total_meta = 0

        # self.random_seed_generator = torch.Generator()
        # # Will use the random device if supported by CPU, else uses the system time
@@ -101,6 +113,17 @@ class SubSampling(Sharing):
            )
            Path(self.folder_path).mkdir(parents=True, exist_ok=True)

+        with torch.no_grad():
+            tensors_to_cat = []
+            for _, v in self.model.state_dict().items():
+                t = v.flatten()
+                tensors_to_cat.append(t)
+            self.init_model = torch.cat(tensors_to_cat, dim=0)
+
+        self.model.shared_parameters_counter = torch.zeros(
+            self.init_model.shape[0], dtype=torch.int32
+        )
+
    def apply_subsampling(self):
        """
        Creates a random binary mask that is used to subsample the parameters that will be shared
@@ -131,6 +154,7 @@ class SubSampling(Sharing):
                <= self.alpha
            )
            subsample = concated[binary_mask]
+            self.model.shared_parameters_counter[binary_mask] += 1
            # logging.debug("Subsampling vector is of size: " + str(subsample.size(dim = 0)))
            return (subsample, curr_seed, self.alpha)
        else:
@@ -147,6 +171,7 @@ class SubSampling(Sharing):
                    )
                    <= self.alpha
                )
+                # TODO: support shared_parameters_counter
                selected = flat[binary_mask]
                values_list.append(selected)
                off += selected.size(dim=0)
@@ -203,13 +228,7 @@ class SubSampling(Sharing):
        m["alpha"] = alpha
        m["params"] = subsample.numpy()

-        # logging.info("Converted dictionary to json")
-        self.total_data += len(self.communication.encrypt(m["params"]))
-        self.total_meta += len(self.communication.encrypt(m["seed"])) + len(
-            self.communication.encrypt(m["alpha"])
-        )
-
-        return m
+        return self.compress_data(m)

    def deserialized_model(self, m):
        """
@@ -229,6 +248,8 @@ class SubSampling(Sharing):
        if self.alpha > self.metadata_cap:  # Share fully
            return super().deserialized_model(m)

+        m = self.decompress_data(m)
+
        with torch.no_grad():
            state_dict = self.model.state_dict()
...
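The point of the seeded mask is that it is reproducible from (seed, alpha), so only the selected values plus two scalars travel over the wire; the receiver regenerates the same mask locally. A standalone sketch of the idea (mask_from_seed is hypothetical):

import torch

def mask_from_seed(seed, n, alpha):
    g = torch.Generator()
    g.manual_seed(seed)
    return torch.rand(n, generator=g) <= alpha

params = torch.randn(10)
seed, alpha = 1234, 0.3
subsample = params[mask_from_seed(seed, len(params), alpha)]
# the receiver rebuilds the identical mask from the metadata alone
assert torch.equal(
    mask_from_seed(seed, len(params), alpha),
    mask_from_seed(seed, len(params), alpha),
)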
# Deprecated
import logging
from collections import deque
import torch
class Synchronous:
"""
Synchronous training
"""
def __init__(
self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
"""
self.rank = rank
self.machine_id = machine_id
self.uid = mapping.get_uid(rank, machine_id)
self.communication = communication
self.mapping = mapping
self.graph = graph
self.model = model
self.dataset = dataset
self.communication_round = 0
self.log_dir = log_dir
self.peer_deques = dict()
self.my_neighbors = self.graph.neighbors(self.uid)
for n in self.my_neighbors:
self.peer_deques[n] = deque()
with torch.no_grad():
self.init_model = {}
for k, v in self.model.state_dict().items():
self.init_model[k] = v.clone().detach()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
"""
Choose which neighbors to share with
Parameters
----------
neighbors : list(int)
List of all neighbors
Returns
-------
list(int)
Neighbors to share with
"""
# modify neighbors here
return neighbors
def serialized_gradient(self):
"""
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val - self.init_model[key] # this is -lr*gradient
return m
def serialized_model(self):
"""
Convert model to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
m = dict()
for key, val in self.model.state_dict().items():
m[key] = val.clone().detach()
return m
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
received dict
Returns
-------
state_dict
state_dict of received
"""
return m
def _pre_step(self):
"""
Called at the beginning of step.
"""
pass
def _post_step(self):
"""
Called at the end of step.
"""
with torch.no_grad():
self.init_model = {}
for k, v in self.model.state_dict().items():
self.init_model[k] = v.clone().detach()
def _apply_gradients(self):
"""
Applies the received gradients (and the local gradient) to the initial model
"""
with torch.no_grad():
total = dict()
for i, n in enumerate(self.peer_deques):
gradient = self.peer_deques[n].popleft()
logging.debug(
"Applying gradient from neighbor {}".format(
n,
)
)
grad = self.deserialized_model(gradient)
for key, value in grad.items():
if key in total:
total[key] += value
else:
total[key] = value
my_grad = self.serialized_gradient()
for key, value in my_grad.items():
if key in total:
total[key] += value
else:
total[key] = value
new_model = {}
for key, value in self.init_model.items():
new_model[key] = value + total[key] * (1 / (len(self.my_neighbors) + 1))
self.model.load_state_dict(new_model)
def step(self):
"""
Perform a sharing step. Implements synchronous training with a central node: rank 0 aggregates gradients and broadcasts the model.
"""
self._pre_step()
logging.info("--- COMMUNICATION ROUND {} ---".format(self.communication_round))
if self.uid != 0:
gradient = self.serialized_gradient()
# Should be only one neighbour
self.communication.send(0, gradient)
logging.info("Waiting for messages from central node")
sender, data = self.communication.receive()
logging.debug("Received model from {}".format(sender))
logging.info(
"Deserialized received model from {} of iteration {}".format(
sender, self.communication_round
)
)
self.model.load_state_dict(data)
else:
logging.info("Waiting for messages from leaf nodes")
while not self.received_from_all():
sender, data = self.communication.receive()
logging.debug("Received gradient from {}".format(sender))
self.peer_deques[sender].append(data)
logging.info(
"Deserialized gradient model from {} of iteration {}".format(
sender, self.communication_round
)
)
self._apply_gradients()
data = self.serialized_model()
all_neighbors = self.graph.neighbors(self.uid)
iter_neighbors = self.get_neighbors(all_neighbors)
for neighbor in iter_neighbors:
self.communication.send(neighbor, data)
self.communication_round += 1
self._post_step()
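For intuition, a toy sketch of the update in _apply_gradients: every contribution is delta = model - init_model (the serialized "gradient"), and the central node applies the mean of its own delta plus one per leaf node (hypothetical values):

import torch

init_model = torch.zeros(4)
# own delta plus one delta per leaf node
deltas = [torch.full((4,), 0.1), torch.full((4,), 0.3), torch.full((4,), 0.2)]
new_model = init_model + sum(deltas) * (1 / len(deltas))  # mean delta applied once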
import logging
import torch
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class TopKNormalized(PartialModel):
"""
This class implements the vanilla version of partial model sharing.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha=1.0,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
accumulation=False,
save_accumulated="",
change_transformer=identity,
accumulate_averaging_changes=False,
epsilon=0.01,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha : float
Percentage of model to share
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
accumulation : bool
True if the indices to share should be selected based on accumulated frequency change
save_accumulated : bool
True if accumulated weight change should be written to file. In case of accumulation the accumulated change
is stored. If a change_transformer is used then the transformed change is stored.
change_transformer : (x: Tensor) -> Tensor
A function that transforms the model change into other domains. Default: identity function
accumulate_averaging_changes: bool
True if the accumulation should account the model change due to averaging
epsilon : float
numerical stability parameter used during normalization
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha,
dict_ordered,
save_shared,
metadata_cap,
accumulation,
save_accumulated,
change_transformer,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.epsilon = epsilon
def extract_top_gradients(self):
"""
Extract the indices and values of the topK gradients.
The gradients must have been accumulated.
Returns
-------
tuple
(a,b). a: The magnitudes of the topK gradients, b: Their indices.
"""
logging.info("Returning topk gradients")
G_topk = torch.abs(self.model.model_change)
G_topk_normalized = G_topk / (torch.abs(self.pre_share_model) + self.epsilon)
std, mean = torch.std_mean(G_topk, unbiased=False)
self.std = std.item()
self.mean = mean.item()
return torch.topk(
G_topk_normalized, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
)
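For intuition on the normalization: dividing the accumulated change by the current parameter magnitude lets a small absolute change on a tiny weight outrank a larger change on a big weight. A toy check (hypothetical values):

import torch

eps = 0.01
change = torch.tensor([0.5, 0.05, 0.2])
weights = torch.tensor([10.0, 0.1, 1.0])
relative = change.abs() / (weights.abs() + eps)  # [0.050, 0.455, 0.198]
values, idx = torch.topk(relative, k=2, dim=0, sorted=False)  # picks indices 1 and 2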
@@ -29,6 +29,9 @@ class TopKParams(Sharing):
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -62,13 +65,22 @@ class TopKParams(Sharing):
        """
        super().__init__(
-            rank, machine_id, communication, mapping, graph, model, dataset, log_dir
+            rank,
+            machine_id,
+            communication,
+            mapping,
+            graph,
+            model,
+            dataset,
+            log_dir,
+            compress,
+            compression_package,
+            compression_class,
        )
        self.alpha = alpha
        self.dict_ordered = dict_ordered
        self.save_shared = save_shared
        self.metadata_cap = metadata_cap
-        self.total_meta = 0

        if self.save_shared:
            # Only save for 2 procs: Save space
@@ -128,7 +140,7 @@ class TopKParams(Sharing):
        with torch.no_grad():
            values, index, offsets = self.extract_top_params()
+            self.model.shared_parameters_counter[index] += 1
            if self.save_shared:
                shared_params = dict()
                shared_params["order"] = list(self.model.state_dict().keys())
@@ -171,12 +183,8 @@ class TopKParams(Sharing):
        # m[key] = json.dumps(m[key])
        logging.info("Converted dictionary to json")
-        self.total_data += len(self.communication.encrypt(m["params"]))
-        self.total_meta += len(self.communication.encrypt(m["indices"])) + len(
-            self.communication.encrypt(m["offsets"])
-        )

-        return m
+        return self.compress_data(m)

    def deserialized_model(self, m):
        """
@@ -196,6 +204,8 @@ class TopKParams(Sharing):
        if self.alpha > self.metadata_cap:  # Share fully
            return super().deserialized_model(m)

+        m = self.decompress_data(m)
+
        with torch.no_grad():
            state_dict = self.model.state_dict()
...
@@ -26,6 +26,9 @@ class TopKPlusRandom(PartialModel):
        dict_ordered=True,
        save_shared=False,
        metadata_cap=1.0,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -71,6 +74,9 @@ class TopKPlusRandom(PartialModel):
            dict_ordered,
            save_shared,
            metadata_cap,
+            compress,
+            compression_package,
+            compression_class,
        )

    def extract_top_gradients(self):
...
import json
import logging
import os
-from pathlib import Path
-from time import time

import numpy as np
import pywt
@@ -10,27 +8,29 @@ import torch

from decentralizepy.sharing.PartialModel import PartialModel


def change_transformer_wavelet(x, wavelet, level):
    """
    Transforms the model changes into the wavelet frequency domain

    Parameters
    ----------
    x : torch.Tensor
        Model change in the space domain
    wavelet : str
        name of the wavelet to be used in gradient compression
    level : int
        decomposition level of the wavelet transform

    Returns
    -------
    x : torch.Tensor
        Representation of the change in the wavelet domain
    """
    coeff = pywt.wavedec(x, wavelet, level=level)
    data, coeff_slices = pywt.coeffs_to_array(coeff)
    return torch.from_numpy(data.ravel())
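A quick standalone sanity check of the transform pair used here, pywt.wavedec/coeffs_to_array on the way out and pywt.array_to_coeffs/waverec on the way back:

import pywt
import torch

x = torch.randn(64)
coeff = pywt.wavedec(x.numpy(), "haar", level=4)
data, coeff_slices = pywt.coeffs_to_array(coeff)

# invert: rebuild the coefficient list, then reconstruct the signal
coeffs = pywt.array_to_coeffs(data, coeff_slices, output_format="wavedec")
x_rec = torch.from_numpy(pywt.waverec(coeffs, wavelet="haar")).float()
assert torch.allclose(x, x_rec, atol=1e-5)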
class Wavelet(PartialModel):
    """
@@ -58,7 +58,10 @@ class Wavelet(PartialModel):
        change_based_selection=True,
        save_accumulated="",
        accumulation=False,
-        accumulate_averaging_changes = False
+        accumulate_averaging_changes=False,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
    ):
        """
        Constructor
@@ -107,9 +110,25 @@ class Wavelet(PartialModel):
        self.level = level

        super().__init__(
-            rank, machine_id, communication, mapping, graph, model, dataset, log_dir, alpha, dict_ordered, save_shared,
-            metadata_cap, accumulation, save_accumulated, lambda x : change_transformer_wavelet(x, wavelet, level),
-            accumulate_averaging_changes
+            rank,
+            machine_id,
+            communication,
+            mapping,
+            graph,
+            model,
+            dataset,
+            log_dir,
+            alpha,
+            dict_ordered,
+            save_shared,
+            metadata_cap,
+            accumulation,
+            save_accumulated,
+            lambda x: change_transformer_wavelet(x, wavelet, level),
+            accumulate_averaging_changes,
+            compress,
+            compression_package,
+            compression_class,
        )

        self.change_based_selection = change_based_selection
@@ -132,13 +151,9 @@ class Wavelet(PartialModel):
        """
-        logging.info("Returning dwt compressed model weights")
-        tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()]
-        concated = torch.cat(tensors_to_cat, dim=0)
-        data = self.change_transformer(concated)
-        logging.info("produced wavelet representation of current model")
+        logging.info("Returning wavelet compressed model weights")
+        data = self.pre_share_model_transformed
        if self.change_based_selection:
-            logging.info("changed based selection")
            diff = self.model.model_change
            _, index = torch.topk(
                diff.abs(),
@@ -146,7 +161,6 @@ class Wavelet(PartialModel):
                dim=0,
                sorted=False,
            )
-            logging.info("finished change based selection")
        else:
            _, index = torch.topk(
                data.abs(),
@@ -154,7 +168,7 @@ class Wavelet(PartialModel):
                dim=0,
                sorted=False,
            )
+        index, _ = torch.sort(index)
        return data[index], index
    def serialized_model(self):
        """
@@ -167,15 +181,20 @@ class Wavelet(PartialModel):
            Model converted to json dict
        """
-        logging.info("serializing wavelet model")
-        if self.alpha > self.metadata_cap:  # Share fully
-            return super().serialized_model()
+        m = dict()
+        if self.alpha >= self.metadata_cap:  # Share fully
+            data = self.pre_share_model_transformed
+            m["params"] = data.numpy()
+            if self.model.accumulated_changes is not None:
+                self.model.accumulated_changes = torch.zeros_like(
+                    self.model.accumulated_changes
+                )
+            return self.compress_data(m)

        with torch.no_grad():
            topk, indices = self.apply_wavelet()
+            self.model.shared_parameters_counter[indices] += 1
            self.model.rewind_accumulation(indices)
-            logging.info("finished rewind")
            if self.save_shared:
                shared_params = dict()
                shared_params["order"] = list(self.model.state_dict().keys())
@@ -184,7 +203,8 @@ class Wavelet(PartialModel):
                    shapes[k] = list(v.shape)
                shared_params["shapes"] = shapes

-                shared_params[self.communication_round] = indices.tolist()  # is slow
+                # is slow
+                shared_params[self.communication_round] = indices.tolist()

                shared_params["alpha"] = self.alpha
@@ -197,8 +217,6 @@ class Wavelet(PartialModel):
                ) as of:
                    json.dump(shared_params, of)

-            m = dict()
-
            if not self.dict_ordered:
                raise NotImplementedError
@@ -208,12 +226,9 @@ class Wavelet(PartialModel):
            m["indices"] = indices.numpy().astype(np.int32)

-            self.total_data += len(self.communication.encrypt(m["params"]))
-            self.total_meta += len(self.communication.encrypt(m["indices"])) + len(
-                self.communication.encrypt(m["alpha"])
-            )
+            m["send_partial"] = True

-            return m
+            return self.compress_data(m)

    def deserialized_model(self, m):
        """
@@ -230,26 +245,28 @@ class Wavelet(PartialModel):
            state_dict of received
        """
-        logging.info("deserializing wavelet model")
-        if self.alpha > self.metadata_cap:  # Share fully
-            return super().deserialized_model(m)
+        m = self.decompress_data(m)
+        ret = dict()
+        if "send_partial" not in m:
+            params = m["params"]
+            params_tensor = torch.tensor(params)
+            ret["params"] = params_tensor
+            return ret

        with torch.no_grad():
            if not self.dict_ordered:
                raise NotImplementedError

-            indices = m["indices"]
            alpha = m["alpha"]
-            params = m["params"]

-            params_tensor = torch.tensor(params)
-            indices_tensor = torch.tensor(indices, dtype=torch.long)
+            params_tensor = torch.tensor(m["params"])
+            indices_tensor = torch.tensor(m["indices"], dtype=torch.long)
            ret = dict()
            ret["indices"] = indices_tensor
            ret["params"] = params_tensor
+            ret["send_partial"] = True
        return ret
-    def _averaging(self):
+    def _averaging(self, peer_deques):
        """
        Averages the received model with the local model
@@ -257,25 +274,31 @@ class Wavelet(PartialModel):
        with torch.no_grad():
            total = None
            weight_total = 0
-            tensors_to_cat = [
-                v.data.flatten() for _, v in self.model.state_dict().items()
-            ]
-            pre_share_model = torch.cat(tensors_to_cat, dim=0)
-            wt_params = self.change_transformer(pre_share_model)
-            for i, n in enumerate(self.peer_deques):
-                degree, iteration, data = self.peer_deques[n].popleft()
+            wt_params = self.pre_share_model_transformed
+            for i, n in enumerate(peer_deques):
+                data = peer_deques[n].popleft()
+                degree, iteration = data["degree"], data["iteration"]
+                del data["degree"]
+                del data["iteration"]
+                del data["CHANNEL"]
                logging.debug(
-                    "Averaging model from neighbor {} of iteration {}".format(n, iteration)
+                    "Averaging model from neighbor {} of iteration {}".format(
+                        n, iteration
+                    )
                )
                data = self.deserialized_model(data)
                params = data["params"]
-                indices = data["indices"]
-                # use local data to complement
-                topkwf = wt_params.clone().detach()
-                topkwf[indices] = params
-                topkwf = topkwf.reshape(self.wt_shape)
+                if "indices" in data:
+                    indices = data["indices"]
+                    # use local data to complement
+                    topkwf = wt_params.clone().detach()
+                    topkwf[indices] = params
+                    topkwf = topkwf.reshape(self.wt_shape)
+                else:
+                    topkwf = params.reshape(self.wt_shape)

-                weight = 1 / (max(len(self.peer_deques), degree) + 1)  # Metro-Hastings
+                # Metro-Hastings
+                weight = 1 / (max(len(peer_deques), degree) + 1)
                weight_total += weight
                if total is None:
                    total = weight * topkwf
@@ -296,8 +319,67 @@ class Wavelet(PartialModel):
            std_dict = {}
            for i, key in enumerate(self.model.state_dict()):
                end_index = start_index + self.lens[i]
-                std_dict[key] = reverse_total[start_index:end_index].reshape(self.shapes[i])
+                std_dict[key] = reverse_total[start_index:end_index].reshape(
+                    self.shapes[i]
+                )
                start_index = end_index

        self.model.load_state_dict(std_dict)
+        self._post_step()
+        self.communication_round += 1
+
+    def _averaging_server(self, peer_deques):
+        """
+        Averages the received models of all working nodes
+        """
+        with torch.no_grad():
+            total = None
+            wt_params = self.pre_share_model_transformed
+            for i, n in enumerate(peer_deques):
+                data = peer_deques[n].popleft()
+                degree, iteration = data["degree"], data["iteration"]
+                del data["degree"]
+                del data["iteration"]
+                del data["CHANNEL"]
+                logging.debug(
+                    "Averaging model from neighbor {} of iteration {}".format(
+                        n, iteration
+                    )
+                )
+                data = self.deserialized_model(data)
+                params = data["params"]
+                if "indices" in data:
+                    indices = data["indices"]
+                    # use local data to complement
+                    topkwf = wt_params.clone().detach()
+                    topkwf[indices] = params
+                    topkwf = topkwf.reshape(self.wt_shape)
+                else:
+                    topkwf = params.reshape(self.wt_shape)
+
+                weight = 1 / len(peer_deques)
+                if total is None:
+                    total = weight * topkwf
+                else:
+                    total += weight * topkwf
+
+            avg_wf_params = pywt.array_to_coeffs(
+                total.numpy(), self.coeff_slices, output_format="wavedec"
+            )
+            reverse_total = torch.from_numpy(
+                pywt.waverec(avg_wf_params, wavelet=self.wavelet)
+            )
+            start_index = 0
+            std_dict = {}
+            for i, key in enumerate(self.model.state_dict()):
+                end_index = start_index + self.lens[i]
+                std_dict[key] = reverse_total[start_index:end_index].reshape(
+                    self.shapes[i]
+                )
+                start_index = end_index

+        self.model.load_state_dict(std_dict)
+        self._post_step()
+        self.communication_round += 1
@@ -69,13 +69,23 @@ def get_args():
        type=str,
        default="./{}".format(datetime.datetime.now().isoformat(timespec="minutes")),
    )
+    parser.add_argument(
+        "-wsd",
+        "--weights_store_dir",
+        type=str,
+        default="./{}_ws".format(datetime.datetime.now().isoformat(timespec="minutes")),
+    )
    parser.add_argument("-is", "--iterations", type=int, default=1)
    parser.add_argument("-cf", "--config_file", type=str, default="config.ini")
    parser.add_argument("-ll", "--log_level", type=str, default="INFO")
    parser.add_argument("-gf", "--graph_file", type=str, default="36_nodes.edges")
    parser.add_argument("-gt", "--graph_type", type=str, default="edges")
    parser.add_argument("-ta", "--test_after", type=int, default=5)
+    parser.add_argument("-tea", "--train_evaluate_after", type=int, default=1)
    parser.add_argument("-ro", "--reset_optimizer", type=int, default=1)
+    parser.add_argument("-sm", "--server_machine", type=int, default=0)
+    parser.add_argument("-sr", "--server_rank", type=int, default=-1)
+    parser.add_argument("-wr", "--working_rate", type=float, default=1.0)

    args = parser.parse_args()
    return args
@@ -98,17 +108,21 @@ def write_args(args, path):
        "procs_per_machine": args.procs_per_machine,
        "machines": args.machines,
        "log_dir": args.log_dir,
+        "weights_store_dir": args.weights_store_dir,
        "iterations": args.iterations,
        "config_file": args.config_file,
        "log_level": args.log_level,
        "graph_file": args.graph_file,
        "graph_type": args.graph_type,
        "test_after": args.test_after,
+        "train_evaluate_after": args.train_evaluate_after,
        "reset_optimizer": args.reset_optimizer,
+        "working_rate": args.working_rate,
    }
    with open(os.path.join(path, "args.json"), "w") as of:
        json.dump(data, of)


def identity(obj):
    """
    Identity function
@@ -121,4 +135,4 @@ def identity(obj):
        obj
            The same object
    """
-    return obj
\ No newline at end of file
+    return obj
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 90
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 3
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = /mnt/nfs/risharma/Gitlab/tutorial/ip.json
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
sharing_class = Sharing
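A minimal sketch of how a section such as [SHARING] can be resolved to a class with configparser and importlib (a standalone reader, not necessarily the project's own loader; assumes decentralizepy is importable):

import configparser
import importlib

config = configparser.ConfigParser()
config.read("config.ini")

sharing_cfg = config["SHARING"]
module = importlib.import_module(sharing_cfg["sharing_package"])
sharing_class = getattr(module, sharing_cfg["sharing_class"])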
{
"0": "127.0.0.1"
}
\ No newline at end of file
16
0 12
0 14
0 15
1 8
1 3
1 6
2 9
2 10
2 5
3 1
3 11
3 9
4 9
4 12
4 13
5 2
5 6
5 7
6 1
6 5
6 7
7 5
7 6
7 14
8 1
8 13
8 14
9 2
9 3
9 4
10 2
10 11
10 13
11 10
11 3
11 15
12 0
12 4
12 15
13 8
13 10
13 4
14 0
14 8
14 7
15 0
15 11
15 12
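The .edges format above is: node count on the first line, then one "u v" edge per line, with each undirected edge listed in both directions. A minimal reader sketch (read_edges is hypothetical, not the project's Graph loader):

def read_edges(path):
    with open(path) as f:
        n_nodes = int(f.readline())
        neighbors = {i: set() for i in range(n_nodes)}
        for line in f:
            u, v = map(int, line.split())
            neighbors[u].add(v)
    return neighbors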
#!/bin/bash
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=16
machines=1
iterations=80
test_after=20
eval_file=testingPeerSampler.py
log_level=INFO
m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
echo M is $m
log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m
mkdir -p $log_dir
cp $original_config $config_file
# echo "alpha = 0.10" >> $config_file
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -wsd $log_dir
\ No newline at end of file
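In both scripts, the m= line resolves this machine's id: it reads addresses_filepath out of the original config, fetches the IP assigned to interface ens785, greps for that IP in the JSON address map, and cuts out the matching key, so each machine picks up its own id without manual configuration.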
#!/bin/bash
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/96_regular.edges
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=16
machines=1
iterations=80
test_after=20
eval_file=testingFederated.py
log_level=INFO
server_rank=-1
server_machine=0
working_rate=0.5
m=`cat $(grep addresses_filepath $original_config | awk '{print $3}') | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
echo M is $m
log_dir=$(date '+%Y-%m-%dT%H:%M')/machine$m
mkdir -p $log_dir
cp $original_config $config_file
# echo "alpha = 0.10" >> $config_file
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level -ctr 0 -cte 0 -wsd $log_dir -sm $server_machine -sr $server_rank -wr $working_rate
\ No newline at end of file