Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing with 5084 additions and 171 deletions
import pickle
from pathlib import Path
import torch
from torch import nn
class Model(nn.Module):
"""
This class wraps the torch model
More fields can be added here
"""
def __init__(self):
"""
Constructor
"""
super().__init__()
self.accumulated_gradients = []
self.model_change = None
self._param_count_ot = None
self._param_count_total = None
self.accumulated_changes = None
self.shared_parameters_counter = None
def count_params(self, only_trainable=False):
"""
Counts the total number of params
Parameters
----------
only_trainable : bool
Counts only parameters with gradients when True
Returns
-------
int
Total number of parameters
"""
if only_trainable:
if not self._param_count_ot:
self._param_count_ot = sum(
p.numel() for p in self.parameters() if p.requires_grad
)
return self._param_count_ot
else:
if not self._param_count_total:
self._param_count_total = sum(p.numel() for p in self.parameters())
return self._param_count_total
def rewind_accumulation(self, indices):
"""
resets accumulated_changes at the given indices
Parameters
----------
indices : torch.Tensor
Tensor that contains indices corresponding to the flattened model
"""
if self.accumulated_changes is not None:
self.accumulated_changes[indices] = 0.0
def dump_weights(self, directory, uid, round):
"""
Dumps the current model as a pickle file into the specified directory
Parameters
----------
directory : str
directory in which the weights are dumped
uid : int
uid of the node, will be used to give the weight a unique name
round : int
current round, will be used to give the weight a unique name
"""
with torch.no_grad():
tensors_to_cat = []
for _, v in self.state_dict().items():
tensors_to_cat.append(v.flatten())
flat = torch.cat(tensors_to_cat)
with open(Path(directory) / f"{round}_weight_{uid}.pk", "wb") as f:
pickle.dump(flat, f)
def get_weights(self):
"""
flattens the current weights
"""
with torch.no_grad():
tensors_to_cat = []
for _, v in self.state_dict().items():
tensors_to_cat.append(v.flatten())
flat = torch.cat(tensors_to_cat)
return flat
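# Usage sketch (not part of the diff above): a minimal, hypothetical subclass
# showing how this Model wrapper is typically used. TinyModel and its layer
# sizes are illustrative assumptions, not classes from decentralizepy; it relies
# on the `torch`/`nn` imports at the top of this file.
class TinyModel(Model):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(4, 2)

    def forward(self, x):
        return self.fc(x)


m = TinyModel()
print(m.count_params())                     # 10 parameters: 4 * 2 weights + 2 biases
print(m.count_params(only_trainable=True))  # same here, every parameter requires grad
flat = m.get_weights()                      # the state_dict flattened into one 1-D tensor
m.dump_weights(".", uid=0, round=0)         # writes ./0_weight_0.pk via pickle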
"""
Copyright (c) 2016- Facebook, Inc (Adam Paszke)
Copyright (c) 2014- Facebook, Inc (Soumith Chintala)
Copyright (c) 2011-2014 Idiap Research Institute (Ronan Collobert)
Copyright (c) 2012-2014 Deepmind Technologies (Koray Kavukcuoglu)
Copyright (c) 2011-2012 NEC Laboratories America (Koray Kavukcuoglu)
Copyright (c) 2011-2013 NYU (Clement Farabet)
Copyright (c) 2006-2010 NEC Laboratories America (Ronan Collobert, Leon Bottou, Iain Melvin, Jason Weston)
Copyright (c) 2006 Idiap Research Institute (Samy Bengio)
Copyright (c) 2001-2004 Idiap Research Institute (Ronan Collobert, Samy Bengio, Johnny Mariethoz)
From Caffe2:
Copyright (c) 2016-present, Facebook Inc. All rights reserved.
All contributions by Facebook:
Copyright (c) 2016 Facebook Inc.
All contributions by Google:
Copyright (c) 2015 Google Inc.
All rights reserved.
All contributions by Yangqing Jia:
Copyright (c) 2015 Yangqing Jia
All rights reserved.
All contributions by Kakao Brain:
Copyright 2019-2020 Kakao Brain
All contributions from Caffe:
Copyright(c) 2013, 2014, 2015, the respective contributors
All rights reserved.
All other contributions:
Copyright(c) 2015, 2016 the respective contributors
All rights reserved.
Caffe2 uses a copyright model similar to Caffe: each contributor holds
copyright over their contributions to Caffe2. The project versioning records
all such contribution and copyright details. If a contributor wants to further
mark their specific copyright on a particular contribution, they should
indicate their copyright solely in the commit message of the change when it is
committed.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the names of Facebook, Deepmind Technologies, NYU, NEC Laboratories America
and IDIAP Research Institute nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
from torch import nn
# Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
"""3x3 convolution with padding"""
return nn.Conv2d(
in_planes,
out_planes,
kernel_size=3,
stride=stride,
padding=dilation,
groups=groups,
bias=False,
dilation=dilation,
)
def conv1x1(in_planes, out_planes, stride=1):
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class BasicBlock(nn.Module):
expansion = 1
def __init__(
self,
inplanes,
planes,
stride=1,
downsample=None,
groups=1,
base_width=64,
dilation=1,
norm_layer=None,
):
super(BasicBlock, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if dilation > 1:
raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
# Both self.conv1 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = norm_layer(planes)
self.relu = nn.ReLU(inplace=True)
self.conv2 = conv3x3(planes, planes)
self.bn2 = norm_layer(planes)
self.downsample = downsample
self.stride = stride
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class Bottleneck(nn.Module):
# Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
# while original implementation places the stride at the first 1x1 convolution(self.conv1)
# according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
# This variant is also known as ResNet V1.5 and improves accuracy according to
# https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.
expansion = 4
def __init__(
self,
inplanes,
planes,
stride=1,
downsample=None,
groups=1,
base_width=64,
dilation=1,
norm_layer=None,
):
super(Bottleneck, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
width = int(planes * (base_width / 64.0)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv1x1(inplanes, width)
self.bn1 = norm_layer(width)
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.bn2 = norm_layer(width)
self.conv3 = conv1x1(width, planes * self.expansion)
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
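# Illustrative sketch (an assumption, not code from this diff): how BasicBlock is
# composed into one ResNet stage. The full ResNet constructor is not shown in this
# hunk, so the downsample wiring below follows the torchvision reference model.
import torch

inplanes, planes, stride = 64, 128, 2
downsample = nn.Sequential(
    conv1x1(inplanes, planes * BasicBlock.expansion, stride),
    nn.BatchNorm2d(planes * BasicBlock.expansion),
)
stage = nn.Sequential(
    BasicBlock(inplanes, planes, stride, downsample),  # halves spatial size, 64 -> 128 channels
    BasicBlock(planes, planes),                        # keeps shape
)
out = stage(torch.randn(1, 64, 32, 32))  # -> torch.Size([1, 128, 16, 16])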
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class DPSGDNode(Node):
"""
This class defines the node for DPSGD
"""
def save_plot(self, l, label, title, xlabel, filename):
"""
Save Matplotlib plot. Clears previous plots.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def get_neighbors(self, node=None):
return self.my_neighbors
# def instantiate_peer_deques(self):
# for neighbor in self.my_neighbors:
# if neighbor not in self.peer_deques:
# self.peer_deques[neighbor] = deque()
def receive_DPSGD(self):
return self.receive_channel("DPSGD")
def run(self):
"""
Start the decentralized learning
"""
self.testset = self.dataset.get_testset()
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
for iteration in range(self.iterations):
logging.info("Starting training iteration: %d", iteration)
rounds_to_train_evaluate -= 1
rounds_to_test -= 1
self.iteration = iteration
self.trainer.train(self.dataset)
new_neighbors = self.get_neighbors()
# The following code does not work because TCP sockets are supposed to be long lived.
# for neighbor in self.my_neighbors:
# if neighbor not in new_neighbors:
# logging.info("Removing neighbor {}".format(neighbor))
# if neighbor in self.peer_deques:
# assert len(self.peer_deques[neighbor]) == 0
# del self.peer_deques[neighbor]
# self.communication.destroy_connection(neighbor, linger = 10000)
# self.barrier.remove(neighbor)
self.my_neighbors = new_neighbors
self.connect_neighbors()
logging.info("Connected to all neighbors")
# self.instantiate_peer_deques()
to_send = self.sharing.get_data_to_send()
to_send["CHANNEL"] = "DPSGD"
for neighbor in self.my_neighbors:
self.communication.send(neighbor, to_send)
while not self.received_from_all():
sender, data = self.receive_DPSGD()
logging.info(
"Received Model from {} of iteration {}".format(
sender, data["iteration"]
)
)
if sender not in self.peer_deques:
self.peer_deques[sender] = deque()
self.peer_deques[sender].append(data)
averaging_deque = dict()
for neighbor in self.my_neighbors:
averaging_deque[neighbor] = self.peer_deques[neighbor]
self.sharing._averaging(averaging_deque)
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
if rounds_to_train_evaluate == 0:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
if self.model.shared_parameters_counter is not None:
logging.info("Saving the shared parameter counts")
with open(
os.path.join(
self.log_dir, "{}_shared_parameters.json".format(self.rank)
),
"w",
) as of:
json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.disconnect_neighbors()
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.my_neighbors = self.graph.neighbors(self.uid)
self.init_sharing(config["SHARING"])
self.peer_deques = dict()
self.connect_neighbors()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for k in self.my_neighbors:
if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
return False
return True
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
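# Hedged sketch of the config structure consumed by instantiate() above. Section
# and key names follow the init_* methods and the constructor docstring in this
# file; the concrete package/class values are illustrative assumptions, not a
# tested configuration.
example_config = {
    "DATASET": {
        "dataset_package": "decentralizepy.datasets.CIFAR10",  # assumed module path
        "dataset_class": "CIFAR10",
        "model_class": "LeNet",
    },
    "OPTIMIZER_PARAMS": {
        "optimizer_package": "torch.optim",
        "optimizer_class": "SGD",
        "lr": 0.01,  # extra keys are forwarded to the optimizer constructor
    },
    "TRAIN_PARAMS": {
        "training_package": "decentralizepy.training.Training",
        "training_class": "Training",
        "epochs_per_round": 25,
        "batch_size": 64,
        # additional training keys (e.g. the loss specification) omitted here
    },
    "COMMUNICATION": {
        "comm_package": "decentralizepy.communication.TCP",  # assumed module path
        "comm_class": "TCP",
        "addresses_filepath": "ip_addr.json",
    },
    "SHARING": {  # keys assumed by analogy with the other sections
        "sharing_package": "decentralizepy.sharing.Sharing",
        "sharing_class": "Sharing",
    },
}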
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class DPSGDNodeFederated(Node):
"""
This class defines the node for federated DPSGD
"""
def run(self):
"""
Start the federated learning at this node
"""
while len(self.barrier):
sender, data = self.receive_channel("WORKER_REQUEST")
if "BYE" in data:
logging.info("Received {} from {}".format("BYE", sender))
self.barrier.remove(sender)
break
iteration = data["iteration"]
del data["iteration"]
del data["CHANNEL"]
self.model.load_state_dict(data["params"])
self.sharing._post_step()
self.sharing.communication_round += 1
logging.info(
"Received worker request at node {}, global iteration {}, local round {}".format(
self.uid, iteration, self.participated
)
)
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
# Perform iteration
logging.info("Starting training iteration")
self.trainer.train(self.dataset)
# Send update to server
to_send = self.sharing.get_data_to_send()
to_send["CHANNEL"] = "DPSGD"
self.communication.send(self.parameter_server_uid, to_send)
if self.participated > 0:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
self.participated += 1
# only if has participated in learning
if self.participated > 0:
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("Server disconnected. Process complete!")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.participated = 0
self.init_sharing(config["SHARING"])
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
parameter_server_uid=-1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
parameter_server_uid: int
The parameter server's uid
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.message_queue["PEERS"] = deque()
self.parameter_server_uid = parameter_server_uid
self.connect_neighbor(self.parameter_server_uid)
self.wait_for_hello(self.parameter_server_uid)
self.run()
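# Summary (not additional protocol, just read off the run() loop above): the two
# WORKER_REQUEST message shapes this node reacts to. Concrete values are
# placeholders.
work_request = {
    "CHANNEL": "WORKER_REQUEST",
    "iteration": 0,   # global iteration counter set by the parameter server
    "params": {},     # a model state_dict, loaded via self.model.load_state_dict
}
bye_request = {
    "CHANNEL": "WORKER_REQUEST",
    "BYE": 0,         # uid of the parameter server saying goodbye
}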
import logging
import math
import os
from collections import deque
import torch
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.DPSGDNode import DPSGDNode
class DPSGDWithPeerSampler(DPSGDNode):
"""
This class defines the node for DPSGD with a peer sampler
"""
def receive_neighbors(self):
return self.receive_channel("PEERS")[1]["NEIGHBORS"]
def get_neighbors(self, node=None):
logging.info("Requesting neighbors from the peer sampler.")
self.communication.send(
self.peer_sampler_uid,
{
"REQUEST_NEIGHBORS": self.uid,
"iteration": self.iteration,
"CHANNEL": "SERVER_REQUEST",
},
)
my_neighbors = self.receive_neighbors()
logging.info("Neighbors this round: {}".format(my_neighbors))
return my_neighbors
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
peer_sampler_uid=-1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.message_queue["PEERS"] = deque()
self.peer_sampler_uid = peer_sampler_uid
self.connect_neighbor(self.peer_sampler_uid)
self.wait_for_hello(self.peer_sampler_uid)
self.run()
def disconnect_neighbors(self):
"""
Disconnects all neighbors.
Raises
------
RuntimeError
If received another message while waiting for BYEs
"""
if not self.sent_disconnections:
logging.info("Disconnecting neighbors")
for uid in self.barrier:
self.communication.send(uid, {"BYE": self.uid, "CHANNEL": "DISCONNECT"})
self.communication.send(
self.peer_sampler_uid, {"BYE": self.uid, "CHANNEL": "SERVER_REQUEST"}
)
self.sent_disconnections = True
self.barrier.remove(self.peer_sampler_uid)
while len(self.barrier):
sender, _ = self.receive_disconnect()
self.barrier.remove(sender)
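# Summary (not additional protocol): the request/response pair that
# get_neighbors() and receive_neighbors() above exchange with the peer sampler.
# Values are placeholders.
neighbor_request = {
    "CHANNEL": "SERVER_REQUEST",
    "REQUEST_NEIGHBORS": 3,   # uid of the requesting node
    "iteration": 0,
}
neighbor_response = {
    "CHANNEL": "PEERS",
    "NEIGHBORS": [1, 5, 7],   # uids to use as neighbors this round
}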
import importlib
import json
import logging
import math
import os
import random
from collections import deque
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class FederatedParameterServer(Node):
"""
This class defines the parameter serving service
"""
def save_plot(self, l, label, title, xlabel, filename):
"""
Save Matplotlib plot. Clears previous plots.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def init_log(self, log_dir, log_level, force=True):
"""
Instantiate Logging.
Parameters
----------
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
force : bool
Argument to logging.basicConfig()
"""
log_file = os.path.join(log_dir, "ParameterServer.log")
logging.basicConfig(
filename=log_file,
format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
level=log_level,
force=force,
)
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.iterations = iterations
self.sent_disconnections = False
self.weights_store_dir = weights_store_dir
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
)
self.message_queue = dict()
self.barrier = set()
self.peer_deques = dict()
self.init_dataset_model(config["DATASET"])
self.init_comm(config["COMMUNICATION"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.my_neighbors = self.graph.get_all_nodes()
self.connect_neighbors()
self.init_sharing(config["SHARING"])
def received_from_all(self):
"""
Check if all current workers have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for k in self.current_workers:
if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
return False
return True
def disconnect_neighbors(self):
"""
Disconnects all neighbors.
Raises
------
RuntimeError
If received another message while waiting for BYEs
"""
if not self.sent_disconnections:
logging.info("Disconnecting neighbors")
for neighbor in self.my_neighbors:
self.communication.send(
neighbor, {"BYE": self.uid, "CHANNEL": "WORKER_REQUEST"}
)
self.barrier.remove(neighbor)
self.sent_disconnections = True
def get_working_nodes(self):
"""
Randomly select a set of clients for the current iteration
"""
k = int(math.ceil(len(self.my_neighbors) * self.working_fraction))
return random.sample(self.my_neighbors, k)
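# Worked example for get_working_nodes(): with 16 registered nodes and
# working_fraction = 0.25, k = ceil(16 * 0.25) = 4, so 4 workers are drawn
# uniformly at random, without replacement, for the iteration.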
def run(self):
"""
Start the federated parameter-serving service.
"""
self.testset = self.dataset.get_testset()
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
to_send = dict()
for iteration in range(self.iterations):
self.iteration = iteration
# reset deques after each iteration
self.peer_deques = dict()
# Get workers for this iteration
self.current_workers = self.get_working_nodes()
# Params to send to workers
to_send["params"] = self.model.state_dict()
to_send["CHANNEL"] = "WORKER_REQUEST"
to_send["iteration"] = iteration
# Notify workers
for worker in self.current_workers:
self.communication.send(worker, to_send)
# Receive updates from current workers
while not self.received_from_all():
sender, data = self.receive_channel("DPSGD")
if sender not in self.peer_deques:
self.peer_deques[sender] = deque()
self.peer_deques[sender].append(data)
logging.info("Received from all current workers")
# Average received updates
averaging_deque = dict()
for worker in self.current_workers:
averaging_deque[worker] = self.peer_deques[worker]
self.sharing._pre_step()
self.sharing._averaging_server(averaging_deque)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
rounds_to_train_evaluate -= 1
if rounds_to_train_evaluate == 0:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
rounds_to_test -= 1
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
if self.model.shared_parameters_counter is not None:
logging.info("Saving the shared parameter counts")
with open(
os.path.join(
self.log_dir, "{}_shared_parameters.json".format(self.rank)
),
"w",
) as of:
json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.disconnect_neighbors()
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
working_fraction=1.0,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
working_fraction : float
Fraction (between 0 and 1) of nodes participating in one global iteration
args : optional
Other arguments
"""
super().__init__(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
*args
)
self.working_fraction = working_fraction
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
self.run()
logging.info("Parameter Server exiting")
import logging
import math
import os
import queue
from random import Random
from threading import Lock, Thread
import numpy as np
import torch
from numpy.linalg import norm
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.OverlayNode import OverlayNode
class KNN(OverlayNode):
"""
This class defines the KNN learning node
"""
def similarityMetric(self, candidate):
logging.debug("A: {}".format(self.othersInfo[self.uid]))
logging.debug("B: {}".format(self.othersInfo[candidate]))
A = np.array(self.othersInfo[self.uid])
B = np.array(self.othersInfo[candidate])
return np.dot(A, B) / (norm(A) * norm(B))
def get_most_similar(self, candidates, to_keep=4):
if len(candidates) <= to_keep:
return candidates
cur_candidates = dict()
for i in candidates:
simil = round(self.similarityMetric(i), 3)
if simil not in cur_candidates:
cur_candidates[simil] = []
cur_candidates[simil].append(i)
similarity_scores = list(cur_candidates.keys())
similarity_scores.sort()
left_to_keep = to_keep
return_result = set()
for i in similarity_scores:
if left_to_keep >= len(cur_candidates[i]):
return_result.update(cur_candidates[i])
left_to_keep -= len(cur_candidates[i])
elif left_to_keep > 0:
return_result.update(
list(self.rng.sample(cur_candidates[i], left_to_keep))
)
left_to_keep = 0
break
else:
break
return return_result
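# Worked example for the two methods above (values illustrative): with
# othersInfo[self.uid] = [2, 0, 1] and a candidate's label distribution [1, 0, 1],
# similarityMetric returns dot([2, 0, 1], [1, 0, 1]) / (sqrt(5) * sqrt(2))
# = 3 / 3.162 ~= 0.949. get_most_similar then buckets candidates by their rounded
# score, walks the scores in ascending order, and keeps up to `to_keep` of them,
# breaking ties inside a bucket uniformly at random.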
def create_message_to_send(
self,
channel="KNNConstr",
boolean_flags=[],
add_my_info=False,
add_neighbor_info=False,
):
message = {"CHANNEL": channel, "KNNRound": self.knn_round}
for x in boolean_flags:
message[x] = True
if add_my_info:
message[self.uid] = self.othersInfo[self.uid]
if add_neighbor_info:
for neighbors in self.out_edges:
if neighbors in self.othersInfo:
message[neighbors] = self.othersInfo[neighbors]
return message
def receive_KNN_message(self):
return self.receive_channel("KNNConstr", block=False)
def process_init_receive(self, message):
self.mutex.acquire()
if "RESPONSE" in message[1]:
self.num_initializations += 1
else:
self.communication.send(
message[0],
self.create_message_to_send(
boolean_flags=["INIT", "RESPONSE"], add_my_info=True
),
)
x = (
message[0],
utils.remove_keys(message[1], ["RESPONSE", "INIT", "CHANNEL", "KNNRound"]),
)
self.othersInfo.update(x[1])
self.mutex.release()
def remove_meta_from_message(self, message):
return (
message[0],
utils.remove_keys(message[1], ["RESPONSE", "INIT", "CHANNEL", "KNNRound"]),
)
def process_candidates_without_lock(self, current_candidates, message):
if not self.exit_receiver:
message = (
message[0],
utils.remove_keys(
message[1], ["CHANNEL", "RESPONSE", "INIT", "KNNRound"]
),
)
self.othersInfo.update(message[1])
new_candidates = set(message[1].keys())
current_candidates = current_candidates.union(new_candidates)
if self.uid in current_candidates:
current_candidates.remove(self.uid)
self.out_edges = self.get_most_similar(current_candidates)
def send_response(self, message, add_neighbor_info=False, process_candidates=False):
self.mutex.acquire()
logging.debug("Responding to {}".format(message[0]))
self.communication.send(
message[0],
self.create_message_to_send(
boolean_flags=["RESPONSE"],
add_my_info=True,
add_neighbor_info=add_neighbor_info,
),
)
if process_candidates:
self.process_candidates_without_lock(set(self.out_edges), message)
self.mutex.release()
def receiver_thread(self):
knnBYEs = set()
self.num_initializations = 0
waiting_queue = queue.Queue()
while True:
if len(knnBYEs) == self.mapping.get_n_procs() - 1:
self.mutex.acquire()
if self.exit_receiver:
self.mutex.release()
logging.debug("Exiting thread")
return
self.mutex.release()
if self.num_initializations < self.initial_neighbors:
x = self.receive_KNN_message()
if x is None:
continue
elif "INIT" in x[1]:
self.process_init_receive(x)
else:
waiting_queue.put(x)
else:
logging.debug("Waiting for messages")
if waiting_queue.empty():
x = self.receive_KNN_message()
if x is None:
continue
else:
x = waiting_queue.get()
if "INIT" in x[1]:
logging.debug("A past INIT Message received from {}".format(x[0]))
self.process_init_receive(x)
elif "RESPONSE" in x[1]:
logging.debug(
"A response message received from {} from KNNRound {}".format(
x[0], x[1]["KNNRound"]
)
)
x = self.remove_meta_from_message(x)
self.responseQueue.put(x)
elif "RANDOM_DISCOVERY" in x[1]:
logging.debug(
"A Random Discovery message received from {} from KNNRound {}".format(
x[0], x[1]["KNNRound"]
)
)
self.send_response(
x, add_neighbor_info=False, process_candidates=False
)
elif "KNNBYE" in x[1]:
self.mutex.acquire()
knnBYEs.add(x[0])
logging.debug("{} KNN Byes received".format(knnBYEs))
if self.uid in x[1]["CLOSE"]:
self.in_edges.add(x[0])
self.mutex.release()
else:
logging.debug(
"A KNN sharing message received from {} from KNNRound {}".format(
x[0], x[1]["KNNRound"]
)
)
self.send_response(
x, add_neighbor_info=True, process_candidates=True
)
def build_topology(self, rounds=30, random_nodes=4):
self.knn_round = 0
self.exit_receiver = False
t = Thread(target=self.receiver_thread)
t.start()
# Initializations : Send my dataset info to others
self.mutex.acquire()
initial_KNN_message = self.create_message_to_send(
boolean_flags=["INIT"], add_my_info=True
)
for x in self.out_edges:
self.communication.send(x, initial_KNN_message)
self.mutex.release()
for round in range(rounds):
self.knn_round = round
logging.info("Starting KNN Round {}".format(round))
self.mutex.acquire()
rand_neighbor = self.rng.choice(list(self.out_edges))
logging.debug("Random neighbor: {}".format(rand_neighbor))
self.communication.send(
rand_neighbor,
self.create_message_to_send(add_my_info=True, add_neighbor_info=True),
)
self.mutex.release()
logging.debug("Waiting for knn response from {}".format(rand_neighbor))
response = self.responseQueue.get(block=True)
logging.debug("Got response from random neighbor")
self.mutex.acquire()
random_candidates = set(
self.rng.sample(list(range(self.mapping.get_n_procs())), random_nodes)
)
req_responses = 0
for rc in random_candidates:
logging.debug("Current random discovery: {}".format(rc))
if rc not in self.othersInfo and rc != self.uid:
logging.debug("Sending discovery request to {}".format(rc))
self.communication.send(
rc,
self.create_message_to_send(boolean_flags=["RANDOM_DISCOVERY"]),
)
req_responses += 1
self.mutex.release()
while req_responses > 0:
logging.debug(
"Waiting for {} random discovery responses.".format(req_responses)
)
req_responses -= 1
random_discovery_response = self.responseQueue.get(block=True)
logging.debug(
"Received discovery response from {}".format(
random_discovery_response[0]
)
)
self.mutex.acquire()
self.othersInfo.update(random_discovery_response[1])
self.mutex.release()
self.mutex.acquire()
self.process_candidates_without_lock(
random_candidates.union(self.out_edges), response
)
self.mutex.release()
logging.info("Completed KNN Round {}".format(round))
logging.debug("OutNodes: {}".format(self.out_edges))
# Send out_edges and BYE to all
to_send = self.create_message_to_send(boolean_flags=["KNNBYE"])
logging.info("Sending KNNByes")
self.mutex.acquire()
self.exit_receiver = True
to_send["CLOSE"] = list(self.out_edges) # Optimize to only send Yes/No
for receiver in range(self.mapping.get_n_procs()):
if receiver != self.uid:
self.communication.send(receiver, to_send)
self.mutex.release()
logging.info("KNNByes Sent")
t.join()
logging.info("Receiver Thread Returned")
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
initial_neighbors=4,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
self.rng = Random()
self.rng.seed(self.uid + 100)
self.initial_neighbors = initial_neighbors
self.in_edges = set()
self.out_edges = set(
self.rng.sample(
list(self.graph.neighbors(self.uid)), self.initial_neighbors
)
)
self.responseQueue = queue.Queue()
self.mutex = Lock()
self.othersInfo = {self.uid: list(self.dataset.get_label_distribution())}
# ld = self.dataset.get_label_distribution()
# ld_keys = sorted(list(ld.keys()))
# self.othersInfo = {self.uid: []}
# for key in range(max(ld_keys) + 1):
# if key in ld:
# self.othersInfo[self.uid].append(ld[key])
# else:
# self.othersInfo[self.uid].append(0)
logging.info("Label Distributions: {}".format(self.othersInfo))
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
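# Message types used by build_topology() above, as read off receiver_thread()
# (a summary of this file, not an extra protocol): all travel on the "KNNConstr"
# channel and carry a "KNNRound" counter.
#   INIT              - announces a node's label distribution at start-up
#   RESPONSE          - reply carrying the sender's (and optionally its neighbors')
#                       label distributions
#   RANDOM_DISCOVERY  - asks a randomly chosen node for its own distribution only
#   KNNBYE            - final message; "CLOSE" lists the sender's out_edges so that
#                       receivers can fill in their in_edges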
import importlib
import json
import logging
import math
import os
from collections import deque
from matplotlib import pyplot as plt
import torch
from decentralizepy import utils
from decentralizepy.communication.Communication import Communication
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
@@ -14,114 +14,242 @@ from decentralizepy.mappings.Mapping import Mapping
class Node:
"""
This class defines the node (entity that performs learning, sharing and communication).
"""
def save_plot(self, l, label, title, xlabel, filename):
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def connect_neighbor(self, neighbor):
"""
Connects given neighbor. Sends HELLO.
"""
logging.info("Sending connection request to {}".format(neighbor))
self.communication.init_connection(neighbor)
self.communication.send(neighbor, {"HELLO": self.uid, "CHANNEL": "CONNECT"})
def receive_channel(self, channel, block=True):
if channel not in self.message_queue:
self.message_queue[channel] = deque()
if len(self.message_queue[channel]) > 0:
return self.message_queue[channel].popleft()
else:
x = self.communication.receive(block=block)
if x is None:
assert not block
return None
sender, recv = x
logging.info(
"Received some message from {} with CHANNEL: {}".format(
sender, recv["CHANNEL"]
)
)
assert "CHANNEL" in recv
while recv["CHANNEL"] != channel:
if recv["CHANNEL"] not in self.message_queue:
self.message_queue[recv["CHANNEL"]] = deque()
self.message_queue[recv["CHANNEL"]].append((sender, recv))
x = self.communication.receive(block=block)
if x is None:
assert not block
return None
sender, recv = x
logging.info(
"Received some message from {} with CHANNEL: {}".format(
sender, recv["CHANNEL"]
)
)
return (sender, recv)
def receive_hello(self):
return self.receive_channel("CONNECT")
def wait_for_hello(self, neighbor):
"""
Waits for HELLO.
Caches any data received while waiting for HELLOs.
Raises
------
RuntimeError
If received BYE while waiting for HELLO
"""
while neighbor not in self.barrier:
logging.info("Waiting HELLO from {}".format(neighbor))
sender, _ = self.receive_hello()
logging.info("Received HELLO from {}".format(sender))
self.barrier.add(sender)
def connect_neighbors(self):
"""
Connects all neighbors. Sends HELLO. Waits for HELLO.
Caches any data received while waiting for HELLOs.
Raises
------
RuntimeError
If received BYE while waiting for HELLO
"""
logging.info("Sending connection request to all neighbors")
wait_acknowledgements = []
for neighbor in self.my_neighbors:
if not self.communication.already_connected(neighbor):
self.connect_neighbor(neighbor)
wait_acknowledgements.append(neighbor)
for neighbor in wait_acknowledgements:
self.wait_for_hello(neighbor)
def receive_disconnect(self):
return self.receive_channel("DISCONNECT")
def disconnect_neighbors(self):
"""
Disconnects all neighbors.
Raises
------
RuntimeError
If received another message while waiting for BYEs
"""
if not self.sent_disconnections:
logging.info("Disconnecting neighbors")
for uid in self.barrier:
self.communication.send(uid, {"BYE": self.uid, "CHANNEL": "DISCONNECT"})
self.sent_disconnections = True
while len(self.barrier):
sender, _ = self.receive_disconnect()
self.barrier.remove(sender)
def init_log(self, log_dir, rank, log_level, force=True):
"""
Instantiate Logging.
Parameters
----------
log_dir : str
Logging directory
rank : int
Rank of process local to the machine
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
force : bool
Argument to logging.basicConfig()
"""
log_file = os.path.join(log_dir, str(rank) + ".log")
logging.basicConfig(
filename=log_file,
format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
level=log_level,
force=force,
)
logging.info("Started process.")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.iterations = iterations
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_dataset_model(self, dataset_configs):
"""
Instantiate dataset and model from config.
Parameters
----------
dataset_configs : dict
Python dict containing dataset config params
"""
dataset_module = importlib.import_module(dataset_configs["dataset_package"])
self.dataset_class = getattr(dataset_module, dataset_configs["dataset_class"])
random_seed = (
dataset_configs["random_seed"] if "random_seed" in dataset_configs else 97
)
torch.manual_seed(random_seed)
self.dataset_params = utils.remove_keys(
dataset_configs,
["dataset_package", "dataset_class", "model_class", "random_seed"],
)
self.dataset = self.dataset_class(
self.rank, self.machine_id, self.mapping, **self.dataset_params
)
logging.info("Dataset instantiation complete.")
self.model_class = getattr(dataset_module, dataset_configs["model_class"])
self.model = self.model_class()
def init_optimizer(self, optimizer_configs):
"""
Instantiate optimizer from config.
Parameters
----------
optimizer_configs : dict
Python dict containing optimizer config params
"""
optimizer_module = importlib.import_module(
optimizer_configs["optimizer_package"]
)
self.optimizer_class = getattr(
optimizer_module, optimizer_configs["optimizer_class"]
)
self.optimizer_params = utils.remove_keys(
optimizer_configs, ["optimizer_package", "optimizer_class"]
)
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
)
def init_trainer(self, train_configs):
"""
Instantiate training module and loss from config.
Parameters
----------
train_configs : dict
Python dict containing training config params
"""
train_module = importlib.import_module(train_configs["training_package"])
train_class = getattr(train_module, train_configs["training_class"])
],
)
self.trainer = train_class(
self.rank,
self.machine_id,
self.mapping,
self.model,
self.optimizer,
self.loss,
self.log_dir,
**train_params
)
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def init_sharing(self, sharing_configs):
"""
Instantiate sharing module from config.
Parameters
----------
sharing_configs : dict
Python dict containing sharing config params
"""
sharing_package = importlib.import_module(sharing_configs["sharing_package"])
sharing_class = getattr(sharing_package, sharing_configs["sharing_class"])
sharing_params = utils.remove_keys(
**sharing_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.my_neighbors = self.graph.neighbors(self.uid)
self.init_sharing(config["SHARING"])
def run(self):
"""
Start the decentralized learning
"""
raise NotImplementedError
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
# self.instantiate(
# rank,
# machine_id,
# mapping,
# graph,
# config,
# iterations,
# log_dir,
# log_level,
# *args
# )
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
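# --- Illustrative sketch (not part of the revision diff) ----------------------
# A minimal, hypothetical Node subclass showing how the handshake helpers above
# are meant to be combined: connect to every neighbor, exchange one message on
# a custom channel, then disconnect. The subclass name "PingNode" and the
# channel name "PING" are made up; connect_neighbors, receive_channel,
# disconnect_neighbors, my_neighbors and communication.send are the real
# methods/fields defined above.
class PingNode(Node):
    def run(self):
        self.connect_neighbors()  # HELLO handshake with every neighbor
        for neighbor in self.my_neighbors:
            self.communication.send(
                neighbor, {"PING": self.uid, "CHANNEL": "PING"}
            )
        for _ in self.my_neighbors:
            # receive_channel caches messages arriving on other channels
            sender, _msg = self.receive_channel("PING")
            logging.info("Received PING from %s", sender)
        self.disconnect_neighbors()  # BYE handshake, empties the barrier set
# -------------------------------------------------------------------------------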
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class OverlayNode(Node):
"""
This class defines the node on an overlay graph
"""
def save_plot(self, l, label, title, xlabel, filename):
"""
Save Matplotlib plot. Clears previous plots.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def get_neighbors(self, node=None):
return self.my_neighbors
def receive_DPSGD(self):
return self.receive_channel("DPSGD")
def build_topology(self):
pass
def run(self):
"""
Start the decentralized learning
"""
self.testset = self.dataset.get_testset()
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
self.connect_neighbors()
logging.info("Connected to all neighbors")
self.build_topology()
logging.info("OutNodes: {}".format(self.out_edges))
logging.info("InNodes: {}".format(self.in_edges))
logging.info("Unifying edges")
self.out_edges = self.out_edges.union(self.in_edges)
self.my_neighbors = self.in_edges = set(self.out_edges)
logging.info("Total number of neighbor: {}".format(len(self.my_neighbors)))
for iteration in range(self.iterations):
logging.info("Starting training iteration: %d", iteration)
rounds_to_train_evaluate -= 1
rounds_to_test -= 1
self.iteration = iteration
self.trainer.train(self.dataset)
to_send = self.sharing.get_data_to_send()
to_send["CHANNEL"] = "DPSGD"
to_send["degree"] = len(self.in_edges)
assert len(self.out_edges) != 0
assert len(self.in_edges) != 0
for neighbor in self.out_edges:
self.communication.send(neighbor, to_send)
while not self.received_from_all():
sender, data = self.receive_DPSGD()
logging.info(
"Received Model from {} of iteration {}".format(
sender, data["iteration"]
)
)
if sender not in self.peer_deques:
self.peer_deques[sender] = deque()
self.peer_deques[sender].append(data)
averaging_deque = dict()
for neighbor in self.in_edges:
averaging_deque[neighbor] = self.peer_deques[neighbor]
self.sharing._averaging(averaging_deque)
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
if rounds_to_train_evaluate == 0:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
# if self.model.shared_parameters_counter is not None:
# logging.info("Saving the shared parameter counts")
# with open(
# os.path.join(
# self.log_dir, "{}_shared_parameters.json".format(self.rank)
# ),
# "w",
# ) as of:
# json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.disconnect_neighbors()
# logging.info("Storing final weight")
# self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
):
"""
Instantiate object field with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.my_neighbors = self.graph.neighbors(self.uid)
self.init_sharing(config["SHARING"])
self.peer_deques = dict()
self.connect_neighbors()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for k in self.in_edges:
if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
return False
return True
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
self.in_edges = set()
self.out_edges = set()
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
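# --- Illustrative sketch (not part of the revision diff) ----------------------
# The sharing step driven by run() above averages neighbor models with the
# Metro-Hastings rule used in the sharing classes: each neighbor j gets weight
# 1 / (max(deg_i, deg_j) + 1) and the local model keeps the leftover weight, so
# all weights sum to 1. A tiny standalone check of that arithmetic (the degrees
# below are made up):
def metro_hastings_weights(my_degree, neighbor_degrees):
    weights = [1.0 / (max(my_degree, d) + 1) for d in neighbor_degrees]
    return weights, 1.0 - sum(weights)


if __name__ == "__main__":
    w, w_self = metro_hastings_weights(3, [3, 2, 4])
    # w == [0.25, 0.25, 0.2] and w_self == 0.3
    assert abs(sum(w) + w_self - 1.0) < 1e-12
# -------------------------------------------------------------------------------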
import importlib
import logging
import os
from collections import deque
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class PeerSampler(Node):
"""
This class defines the peer sampling service
"""
def init_log(self, log_dir, log_level, force=True):
"""
Instantiate Logging.
Parameters
----------
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
force : bool
Argument to logging.basicConfig()
"""
log_file = os.path.join(log_dir, "PeerSampler.log")
logging.basicConfig(
filename=log_file,
format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
level=log_level,
force=force,
)
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
):
"""
Instantiate object field with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.iterations = iterations
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
)
self.message_queue = dict()
self.barrier = set()
self.init_comm(config["COMMUNICATION"])
self.my_neighbors = self.graph.get_all_nodes()
self.connect_neighbors()
def get_neighbors(self, node, iteration=None):
return self.graph.neighbors(node)
def receive_server_request(self):
return self.receive_channel("SERVER_REQUEST")
def run(self):
"""
Start the peer-sampling service.
"""
while len(self.barrier) > 0:
sender, data = self.receive_server_request()
if "BYE" in data:
logging.debug("Received {} from {}".format("BYE", sender))
self.barrier.remove(sender)
elif "REQUEST_NEIGHBORS" in data:
logging.debug("Received {} from {}".format("Request", sender))
if "iteration" in data:
resp = {
"NEIGHBORS": self.get_neighbors(sender, data["iteration"]),
"CHANNEL": "PEERS",
}
else:
resp = {"NEIGHBORS": self.get_neighbors(sender), "CHANNEL": "PEERS"}
self.communication.send(sender, resp)
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
super().__init__(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.run()
logging.info("Peer Sampler exiting")
import logging
from collections import deque
from decentralizepy.graphs.Graph import Graph
from decentralizepy.graphs.Regular import Regular
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.PeerSampler import PeerSampler
class PeerSamplerDynamic(PeerSampler):
"""
This class defines the peer sampling service
"""
def get_neighbors(self, node, iteration=None):
if iteration is not None:
if iteration > self.iteration:
logging.info(
"iteration, self.iteration: {}, {}".format(
iteration, self.iteration
)
)
assert iteration == self.iteration + 1
self.iteration = iteration
self.graphs.append(Regular(self.graph.n_procs, self.graph_degree))
return self.graphs[iteration].neighbors(node)
else:
return self.graph.neighbors(node)
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
graph_degree=4,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process in running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
self.iteration = -1
self.graphs = []
self.graph_degree = graph_degree
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.run()
logging.info("Peer Sampler exiting")
import logging
from collections import OrderedDict
import torch
from decentralizepy.sharing.Sharing import Sharing
def zeros_like_state_dict(state_dict):
"""
Creates a new state dictionary that has the same
layers (names and sizes) as the input state dictionary, but with all
values set to zero
Parameters
----------
state_dict: dict[str, torch.Tensor]
"""
result_dict = OrderedDict()
for tensor_name, tensor_values in state_dict.items():
result_dict[tensor_name] = torch.zeros_like(tensor_values)
return result_dict
def get_dict_keys_and_check_matching(dict_1, dict_2):
"""
Checks if the keys of the two dictionaries match and
returns them if they do, otherwise raises ValueError
Parameters
----------
dict_1: dict
dict_2: dict
Raises
------
ValueError
If the keys of the dictionaries don't match
"""
keys = dict_1.keys()
if set(keys).difference(set(dict_2.keys())):
raise ValueError("Dictionaries must have matching keys")
return keys
def subtract_state_dicts(_1, _2):
"""
Subtracts one state dictionary from another
Parameters
----------
_1: dict[str, torch.Tensor]
Minuend state dictionary
_2: dict[str, torch.Tensor]
Subtrahend state dictionary
Raises
------
ValueError
If the keys of the state dictionaries don't match
"""
keys = get_dict_keys_and_check_matching(_1, _2)
result_dict = OrderedDict()
for key in keys:
# Size checking is done by torch during the subtraction
result_dict[key] = _1[key] - _2[key]
return result_dict
def self_add_state_dict(_1, _2, constant=1.0):
"""
Scales one state dictionary by a constant and
adds it directly to another minimizing copies
created. Equivalent to operation `_1 += constant * _2`
Parameters
----------
_1: dict[str, torch.Tensor]
State dictionary
_2: dict[str, torch.Tensor]
State dictionary
constant: float
Constant to scale _2 with
Raises
------
ValueError
If the keys of the state dictionaries don't match
"""
keys = get_dict_keys_and_check_matching(_1, _2)
for key in keys:
# Size checking is done by torch during the addition
_1[key] += constant * _2[key]
def flatten_state_dict(state_dict):
"""
Transforms state dictionary into a flat tensor
by flattening and concatenating tensors of the
state dictionary.
Note: changes made to the result won't affect state dictionary
Parameters
----------
state_dict : OrderedDict[str, torch.tensor]
A state dictionary to flatten
"""
return torch.cat([tensor.flatten() for tensor in state_dict.values()], axis=0)
def unflatten_state_dict(flat_tensor, reference_state_dict):
"""
Transforms a flat tensor into a state dictionary
by using another state dictionary as a reference
for size and names of the tensors. Assumes
that the number of elements of the flat tensor
is the same as the number of elements in the
reference state dictionary.
This operation is inverse operation to flatten_state_dict
Note: changes made to the result will affect the flat tensor
Parameters
----------
flat_tensor : torch.tensor
A 1-dim tensor
reference_state_dict : OrderedDict[str, torch.tensor]
A state dictionary used as a reference for tensor names
and shapes of the result
"""
result = OrderedDict()
start_index = 0
for tensor_name, tensor in reference_state_dict.items():
end_index = start_index + tensor.numel()
result[tensor_name] = flat_tensor[start_index:end_index].reshape(tensor.shape)
start_index = end_index
return result
def serialize_sparse_tensor(tensor):
"""
Serializes sparse tensor by flattening it and
returning values and indices of it that are not 0
Parameters
----------
tensor: torch.Tensor
"""
flat = tensor.flatten()
indices = flat.nonzero(as_tuple=True)[0]
values = flat[indices]
return values, indices
def deserialize_sparse_tensor(values, indices, shape):
"""
Deserializes tensor from its non-zero values and indices
in flattened form and original shape of the tensor.
Parameters
----------
values: torch.Tensor
Non-zero entries of flattened original tensor
indices: torch.Tensor
Respective indices of non-zero entries of flattened original tensor
shape: torch.Size or tuple[*int]
Shape of the original tensor
"""
result = torch.zeros(size=shape)
if len(indices):
flat_result = result.flatten()
flat_result[indices] = values
return result
def topk_sparsification_tensor(tensor, alpha):
"""
Performs topk sparsification of a tensor and returns
the same tensor from the input but transformed.
Note: no copies are created, but input vector is directly changed
Parameters
----------
tensor : torch.tensor
A tensor to perform the sparsification on
alpha : float
Percentage of topk values to keep
"""
tensor_abs = tensor.abs()
flat_abs_tensor = tensor_abs.flatten()
numel_to_keep = round(alpha * flat_abs_tensor.numel())
if numel_to_keep > 0:
cutoff_value, _ = torch.kthvalue(-flat_abs_tensor, numel_to_keep)
tensor[tensor_abs < -cutoff_value] = 0
return tensor
def topk_sparsification(state_dict, alpha):
"""
Performs topk sparsification of a state_dict
applying it over all elements together.
Note: the changes made to the result won't affect
the input state dictionary
Parameters
----------
state_dict : OrderedDict[str, torch.tensor]
A state dictionary to perform the sparsification on
alpha : float
Percentage of topk values to keep
"""
flat_tensor = flatten_state_dict(state_dict)
return unflatten_state_dict(
topk_sparsification_tensor(flat_tensor, alpha), state_dict
)
def serialize_sparse_state_dict(state_dict):
with torch.no_grad():
concatted_tensors = torch.cat(
[tensor.flatten() for tensor in state_dict.values()], axis=0
)
return serialize_sparse_tensor(concatted_tensors)
def deserialize_sparse_state_dict(values, indices, reference_state_dict):
with torch.no_grad():
keys = []
lens = []
shapes = []
for k, v in reference_state_dict.items():
keys.append(k)
shapes.append(v.shape)
lens.append(v.numel())
total_num_el = sum(lens)
T = deserialize_sparse_tensor(values, indices, (total_num_el,))
result_state_dict = OrderedDict()
start_index = 0
for i, k in enumerate(keys):
end_index = start_index + lens[i]
result_state_dict[k] = T[start_index:end_index].reshape(shapes[i])
start_index = end_index
return result_state_dict
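# --- Illustrative sketch (not part of the revision diff) ----------------------
# Round trip through the helpers above: sparsify a state dict to its top-k
# entries, serialize only the non-zero values and indices, and rebuild a state
# dict with the original names and shapes. The tiny linear model exists only
# for this demo.
if __name__ == "__main__":
    from torch import nn

    demo_sd = nn.Linear(4, 2).state_dict()
    sparse_sd = topk_sparsification(demo_sd, alpha=0.3)  # keep ~30% of the entries
    values, indices = serialize_sparse_state_dict(sparse_sd)
    rebuilt = deserialize_sparse_state_dict(values, indices, demo_sd)
    assert torch.equal(flatten_state_dict(sparse_sd), flatten_state_dict(rebuilt))
# -------------------------------------------------------------------------------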
class Choco(Sharing):
"""
API defining who to share with and what, and what to do on receiving
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
step_size,
alpha,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
step_size : float
Step size from the formulation of Choco
alpha : float
Percentage of elements to keep during topk sparsification
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
)
self.step_size = step_size
self.alpha = alpha
logging.info(
"type(step_size): %s, value: %s",
str(type(self.step_size)),
str(self.step_size),
)
logging.info(
"type(alpha): %s, value: %s", str(type(self.alpha)), str(self.alpha)
)
model_state_dict = model.state_dict()
self.model_hat = zeros_like_state_dict(model_state_dict)
self.s = zeros_like_state_dict(model_state_dict)
self.my_q = None
def compress_data(self, data):
result = dict(data)
if self.compress:
if "indices" in result:
result["indices"] = self.compressor.compress(result["indices"])
if "params" in result:
result["params"] = self.compressor.compress_float(result["params"])
return result
def decompress_data(self, data):
if self.compress:
if "indices" in data:
data["indices"] = self.compressor.decompress(data["indices"])
if "params" in data:
data["params"] = self.compressor.decompress_float(data["params"])
return data
def _compress(self, x):
return topk_sparsification(x, self.alpha)
def _pre_step(self):
"""
Called at the beginning of step.
"""
with torch.no_grad():
self.my_q = self._compress(
subtract_state_dicts(self.model.state_dict(), self.model_hat)
)
def serialized_model(self):
"""
Convert self.my_q to a dictionary. Here we can choose how much to share
Returns
-------
dict
Model converted to dict
"""
values, indices = serialize_sparse_state_dict(self.my_q)
data = dict()
data["params"] = values.numpy()
data["indices"] = indices.numpy()
data["send_partial"] = True
return self.compress_data(data)
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
received dict
Returns
-------
state_dict
state_dict of received
"""
if "send_partial" not in m:
return super().deserialized_model(m)
with torch.no_grad():
m = self.decompress_data(m)
indices = torch.tensor(m["indices"], dtype=torch.long)
values = torch.tensor(m["params"])
return deserialize_sparse_state_dict(
values, indices, self.model.state_dict()
)
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
"""
with torch.no_grad():
self_add_state_dict(self.model_hat, self.my_q) # x_hat = q_self + x_hat
weight_total = 0
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
# Metro-Hastings
weight = 1 / (max(len(peer_deques), degree) + 1)
weight_total += weight
for key, value in data.items():
if key in self.s:
self.s[key] += value * weight
# else:
# self.s[key] = value * weight
for key, value in self.my_q.items():
self.s[key] += (1 - weight_total) * value # Metro-Hastings
total = self.model.state_dict().copy()
self_add_state_dict(
total,
subtract_state_dicts(self.s, self.model_hat),
constant=self.step_size,
) # x = x + gamma * (s - x_hat)
self.model.load_state_dict(total)
self._post_step()
self.communication_round += 1
def _averaging_server(self, peer_deques):
"""
Averages the received models of all working nodes
"""
raise NotImplementedError()
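# --- Illustrative sketch (not part of the revision diff) ----------------------
# The local CHOCO bookkeeping from _pre_step/_averaging above, written out for a
# single flat tensor instead of a state dict: q is the sparsified difference
# between the model x and its public copy x_hat, and after x_hat += q the
# coordinates kept by the compressor hold x exactly. The values of x and alpha
# are made up for the demo.
if __name__ == "__main__":
    x = torch.tensor([0.5, -1.0, 2.0, 0.1])
    x_hat = torch.zeros_like(x)
    q = topk_sparsification_tensor((x - x_hat).clone(), alpha=0.5)  # top 50% by magnitude
    x_hat += q
    kept = q.nonzero(as_tuple=True)[0]
    assert torch.equal(x_hat[kept], x[kept])  # public copy matches x on shared coordinates
# -------------------------------------------------------------------------------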
import json
import logging
import os
import numpy as np
import torch
import torch.fft as fft
from decentralizepy.sharing.PartialModel import PartialModel
def change_transformer_fft(x):
"""
Transforms the model changes into frequency domain
Parameters
----------
x : torch.Tensor
Model change in the space domain
Returns
-------
x : torch.Tensor
Representation of the change in the frequency domain
"""
return fft.rfft(x)
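# --- Illustrative sketch (not part of the revision diff) ----------------------
# change_transformer_fft maps a flat model change to its half-spectrum in the
# frequency domain; FFT._averaging below inverts it with fft.irfft. A quick
# round-trip check on a made-up vector (n= is passed explicitly so the inverse
# is exact for odd lengths as well):
if __name__ == "__main__":
    change = torch.randn(10)
    freq = change_transformer_fft(change)  # complex tensor of length n // 2 + 1
    recovered = fft.irfft(freq, n=change.numel())
    assert torch.allclose(change, recovered, atol=1e-6)
# -------------------------------------------------------------------------------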
class FFT(PartialModel):
"""
This class implements the fft version of model sharing.
It is based on PartialModel.py
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha=1.0,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
change_based_selection=True,
save_accumulated="",
accumulation=False,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha : float
Percentage of model to share
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
change_based_selection : bool
use frequency change to select topk frequencies
save_accumulated : bool
True if accumulated weight change in the frequency domain should be written to file. In case of accumulation
the accumulated change is stored.
accumulation : bool
True if the indices to share should be selected based on accumulated frequency change
accumulate_averaging_changes: bool
True if the accumulation should account the model change due to averaging
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha,
dict_ordered,
save_shared,
metadata_cap,
accumulation,
save_accumulated,
change_transformer_fft,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.change_based_selection = change_based_selection
def apply_fft(self):
"""
Does fft transformation of the model parameters and selects topK (alpha) of them in the frequency domain
based on the undergone change during the current training step
Returns
-------
tuple
(a,b). a: selected fft frequencies (complex numbers), b: Their indices.
"""
logging.info("Returning fft compressed model weights")
with torch.no_grad():
flat_fft = self.pre_share_model_transformed
if self.change_based_selection:
diff = self.model.model_change
_, index = torch.topk(
diff.abs(), round(self.alpha * len(diff)), dim=0, sorted=False
)
else:
_, index = torch.topk(
flat_fft.abs(),
round(self.alpha * len(flat_fft)),
dim=0,
sorted=False,
)
index, _ = torch.sort(index)
return flat_fft[index], index
def serialized_model(self):
"""
Convert model to json dict. self.alpha specifies the fraction of model to send.
Returns
-------
dict
Model converted to json dict
"""
m = dict()
if self.alpha >= self.metadata_cap: # Share fully
data = self.pre_share_model_transformed
m["params"] = data.numpy()
if self.model.accumulated_changes is not None:
self.model.accumulated_changes = torch.zeros_like(
self.model.accumulated_changes
)
return self.compress_data(m)
with torch.no_grad():
topk, indices = self.apply_fft()
self.model.shared_parameters_counter[indices] += 1
self.model.rewind_accumulation(indices)
if self.save_shared:
shared_params = dict()
shared_params["order"] = list(self.model.state_dict().keys())
shapes = dict()
for k, v in self.model.state_dict().items():
shapes[k] = list(v.shape)
shared_params["shapes"] = shapes
shared_params[self.communication_round] = indices.tolist() # is slow
shared_params["alpha"] = self.alpha
with open(
os.path.join(
self.folder_path,
"{}_shared_params.json".format(self.communication_round + 1),
),
"w",
) as of:
json.dump(shared_params, of)
if not self.dict_ordered:
raise NotImplementedError
m["alpha"] = self.alpha
m["params"] = topk.numpy()
m["indices"] = indices.numpy().astype(np.int32)
m["send_partial"] = True
return self.compress_data(m)
def deserialized_model(self, m):
"""
Convert received json dict to state_dict.
Parameters
----------
m : dict
json dict received
Returns
-------
state_dict
state_dict of received
"""
m = self.decompress_data(m)
ret = dict()
if "send_partial" not in m:
params = m["params"]
params_tensor = torch.tensor(params)
ret["params"] = params_tensor
with torch.no_grad():
if not self.dict_ordered:
raise NotImplementedError
indices = m["indices"]
alpha = m["alpha"]
params = m["params"]
params_tensor = torch.tensor(params)
indices_tensor = torch.tensor(indices, dtype=torch.long)
ret["indices"] = indices_tensor
ret["params"] = params_tensor
ret["send_partial"] = True
return ret
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
"""
with torch.no_grad():
total = None
weight_total = 0
tensors_to_cat = [
v.data.flatten() for _, v in self.model.state_dict().items()
]
pre_share_model = torch.cat(tensors_to_cat, dim=0)
flat_fft = self.change_transformer(pre_share_model)
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data = self.deserialized_model(data)
params = data["params"]
if "indices" in data:
indices = data["indices"]
# use local data to complement
topkf = flat_fft.clone().detach()
topkf[indices] = params
else:
topkf = params
weight = 1 / (max(len(peer_deques), degree) + 1) # Metro-Hastings
weight_total += weight
if total is None:
total = weight * topkf
else:
total += weight * topkf
# Metro-Hastings
total += (1 - weight_total) * flat_fft
reverse_total = fft.irfft(total)
start_index = 0
std_dict = {}
for i, key in enumerate(self.model.state_dict()):
end_index = start_index + self.lens[i]
std_dict[key] = reverse_total[start_index:end_index].reshape(
self.shapes[i]
)
start_index = end_index
self.model.load_state_dict(std_dict)
self._post_step()
self.communication_round += 1
# Deprecated
import logging
from decentralizepy.sharing.PartialModel import PartialModel
class GrowingAlpha(PartialModel):
"""
This class implements the basic growing partial model sharing using a linear function.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
init_alpha=0.0,
max_alpha=1.0,
k=10,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
init_alpha : float
Percentage of model to share initially
max_alpha : float
Maximum alpha to reach in k steps
k : int
Steps to reach maximum alpha. Also steps after which alpha increases.
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
init_alpha,
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
self.init_alpha = init_alpha
self.max_alpha = max_alpha
self.k = k
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha increasing as a linear function.
"""
if (self.communication_round + 1) % self.k == 0:
self.alpha += (self.max_alpha - self.init_alpha) / self.k
self.alpha = min(self.alpha, 1.00)
if self.alpha == 0.0:
logging.info("Not sending/receiving data (alpha=0.0)")
self.communication_round += 1
return
return super().get_data_to_send()
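# --- Illustrative sketch (not part of the revision diff) ----------------------
# The schedule implemented by get_data_to_send above: every k-th communication
# round alpha grows by (max_alpha - init_alpha) / k, capped at 1.0. A closed-form
# restatement with made-up values (init_alpha=0.0, max_alpha=0.5, k=10): alpha is
# 0.0 for rounds 0-8, 0.05 from round 9, 0.10 from round 19, and so on.
def growing_alpha_at_round(communication_round, init_alpha=0.0, max_alpha=0.5, k=10):
    increments = (communication_round + 1) // k
    return min(init_alpha + increments * (max_alpha - init_alpha) / k, 1.0)


if __name__ == "__main__":
    assert growing_alpha_at_round(8) == 0.0
    assert abs(growing_alpha_at_round(9) - 0.05) < 1e-12
    assert abs(growing_alpha_at_round(19) - 0.10) < 1e-12
# -------------------------------------------------------------------------------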
import logging
import numpy as np
import torch
from decentralizepy.sharing.PartialModel import PartialModel
class LowerBoundTopK(PartialModel):
"""
This class implements a bounded version of topK.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
lower_bound=0.1,
metro_hastings=True,
compress=False,
compression_package=None,
compression_class=None,
**kwargs,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha : float
Percentage of model to share
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
accumulation : bool
True if the indices to share should be selected based on accumulated frequency change
save_accumulated : bool
True if accumulated weight change should be written to file. In case of accumulation the accumulated change
is stored. If a change_transformer is used then the transformed change is stored.
change_transformer : (x: Tensor) -> Tensor
A function that transforms the model change into other domains. Default: identity function
accumulate_averaging_changes: bool
True if the accumulation should account the model change due to averaging
lower_bound : float
Increases the communication budget per communication round by lower_bound, i.e. the new effective
alpha will be alpha + alpha*lower_bound. The extra budget is used to randomly select parameters that
were shared in less than alpha*lower_bound*100 percent of the rounds.
metro_hastings: bool
If True then metro hastings averaging is used otherwise it does per parameter averaging.
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
**kwargs,
)
self.lower_bound = lower_bound
self.metro_hastings = metro_hastings
if self.lower_bound > 0:
self.start_lower_bounding_at = 1 / self.lower_bound
def extract_top_gradients(self):
"""
Extract the indices and values of the topK gradients.
The gradients must have been accumulated.
Returns
-------
tuple
(a,b). a: The magnitudes of the topK gradients, b: Their indices.
"""
if self.lower_bound == 0.0:
return super().extract_top_gradients()
logging.info("Returning topk gradients bounded")
G_topk = torch.abs(self.model.model_change)
std, mean = torch.std_mean(G_topk, unbiased=False)
self.std = std.item()
self.mean = mean.item()
val, ind = torch.topk(
G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
)
if self.communication_round > self.start_lower_bounding_at:
# because the superclass increases it where it is inconvenient for this subclass
currently_shared = self.model.shared_parameters_counter.clone().detach()
currently_shared[ind] += 1
ind_small = (
currently_shared < self.communication_round * self.lower_bound
).nonzero(as_tuple=True)[0]
ind_small_unique = np.setdiff1d(
ind_small.numpy(), ind.numpy(), assume_unique=True
)
take_max = round(self.lower_bound * self.alpha * G_topk.shape[0])
logging.info(
"lower: %i %i %i", len(ind_small), len(ind_small_unique), take_max
)
if take_max > ind_small_unique.shape[0]:
take_max = ind_small_unique.shape[0]
to_take = torch.rand(ind_small_unique.shape[0])
_, ind_of_to_take = torch.topk(to_take, take_max, dim=0, sorted=False)
ind_bound = torch.from_numpy(ind_small_unique)[ind_of_to_take]
logging.info("lower bounding: %i %i", len(ind), len(ind_bound))
# val = torch.concat(val, G_topk[ind_bound]) # not really needed, as these are abs values and not further used
ind = torch.cat([ind, ind_bound])
return val, ind
def deserialized_model_avg(self, m):
"""
Convert received dict to state_dict.
Parameters
----------
m : dict
dict received
Returns
-------
state_dict
state_dict of received
"""
if "send_partial" not in m:
return super().deserialized_model(m)
m = self.decompress_data(m)
with torch.no_grad():
state_dict = self.model.state_dict()
if not self.dict_ordered:
raise NotImplementedError
# could be made more efficient
T = torch.zeros_like(self.init_model)
index_tensor = torch.tensor(m["indices"], dtype=torch.long)
logging.debug("Original tensor: {}".format(T[index_tensor]))
T[index_tensor] = torch.tensor(m["params"])
logging.debug("Final tensor: {}".format(T[index_tensor]))
return T, index_tensor
def _averaging(self, peer_deques):
"""
Averages the received model with the local model
"""
if self.metro_hastings:
super()._averaging(peer_deques)
else:
with torch.no_grad():
tensors_to_cat = []
for _, v in self.model.state_dict().items():
t = v.flatten()
tensors_to_cat.append(t)
T = torch.cat(tensors_to_cat, dim=0)
weight_total = 0
weight_vector = torch.ones_like(self.init_model)
datas = []
for i, n in enumerate(peer_deques):
data = peer_deques[n].popleft()
degree, iteration = data["degree"], data["iteration"]
del data["degree"]
del data["iteration"]
del data["CHANNEL"]
logging.debug(
"Averaging model from neighbor {} of iteration {}".format(
n, iteration
)
)
data, ind = self.deserialized_model_avg(data)
weight_vector[ind] += 1
# weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings
# weight_total += weight
datas.append(data)
weight_vector = 1.0 / weight_vector
# speed up by exploiting sparsity
T = T * weight_vector
for d in datas:
T += d * weight_vector
start_index = 0
total = dict()
for i, key in enumerate(self.model.state_dict()):
end_index = start_index + self.lens[i]
total[key] = T[start_index:end_index].reshape(self.shapes[i])
start_index = end_index
logging.info("new averaging")
self.model.load_state_dict(total)
self._post_step()
self.communication_round += 1
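# --- Illustrative sketch (not part of the revision diff) ----------------------
# The extra sharing budget introduced by extract_top_gradients above, with
# made-up numbers: for 1000 parameters, alpha=0.1 and lower_bound=0.2, each round
# shares the 100 top-k entries plus up to round(0.2 * 0.1 * 1000) = 20 randomly
# chosen entries among those shared in fewer than communication_round *
# lower_bound rounds so far, i.e. an effective budget of alpha * (1 + lower_bound).
if __name__ == "__main__":
    n_params, alpha, lower_bound = 1000, 0.1, 0.2
    topk_budget = round(alpha * n_params)
    extra_budget = round(lower_bound * alpha * n_params)
    assert (topk_budget, extra_budget) == (100, 20)
# -------------------------------------------------------------------------------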
# Deprecated
import logging
from decentralizepy.sharing.PartialModel import PartialModel
class ManualAdapt(PartialModel):
"""
This class implements partial model sharing where the user manually specifies when to change alpha and to what value.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
change_alpha,
change_rounds,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
change_alpha : list
List of alphas to set. change_alpha[0] must be initial alpha.
change_rounds : list
List of iterations to change alpha. len(change_alpha) = len(change_rounds) + 1.
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
assert change_alpha != ""
assert change_alpha != None
assert change_rounds != ""
assert change_rounds != None
if type(change_alpha) == str:
change_alpha = eval(change_alpha)
if type(change_rounds) == str:
change_rounds = eval(change_rounds)
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
change_alpha[0],
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
self.change_alpha = change_alpha[1:]
self.change_rounds = change_rounds
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha manually given.
"""
if (
len(self.change_rounds)
and (self.communication_round + 1) == self.change_rounds[0]
):
self.alpha = min(self.change_alpha[0], 1.00)
self.change_alpha = self.change_alpha[1:]
self.change_rounds = self.change_rounds[1:]
if self.alpha == 0.0:
logging.info("Not sending/receiving data (alpha=0.0)")
self.communication_round += 1
return dict()
return super().get_data_to_send()
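# --- Illustrative sketch (not part of the revision diff) ----------------------
# How ManualAdapt interprets its schedule, with made-up config values
# change_alpha = [0.1, 0.5, 1.0] and change_rounds = [10, 20]: alpha starts at
# 0.1, switches to 0.5 when communication_round + 1 reaches 10, and to 1.0 when
# it reaches 20 (alpha == 0.0 would disable sharing for that round). A small
# closed-form restatement:
def manual_alpha_at_round(
    communication_round, change_alpha=(0.1, 0.5, 1.0), change_rounds=(10, 20)
):
    alpha = change_alpha[0]
    for switch_round, next_alpha in zip(change_rounds, change_alpha[1:]):
        if communication_round + 1 >= switch_round:
            alpha = min(next_alpha, 1.0)
    return alpha


if __name__ == "__main__":
    assert manual_alpha_at_round(5) == 0.1   # before the first switch
    assert manual_alpha_at_round(9) == 0.5   # communication_round + 1 == 10
    assert manual_alpha_at_round(25) == 1.0  # past the second switch
# -------------------------------------------------------------------------------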
import logging
import os
from pathlib import Path
import numpy as np
import torch
from decentralizepy.sharing.Sharing import Sharing
from decentralizepy.utils import conditional_value, identity
class PartialModel(Sharing):
"""
This class implements the vanilla version of partial model sharing.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha=1.0,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
accumulation=False,
save_accumulated="",
change_transformer=identity,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha : float
Percentage of model to share
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
accumulation : bool
True if the indices to share should be selected based on accumulated frequency change
save_accumulated : bool
True if accumulated weight change should be written to file. In case of accumulation the accumulated change
is stored. If a change_transformer is used then the transformed change is stored.
change_transformer : (x: Tensor) -> Tensor
A function that transforms the model change into other domains. Default: identity function
accumulate_averaging_changes: bool
True if the accumulation should account the model change due to averaging
"""
super().__init__(
rank, machine_id, communication, mapping, graph, model, dataset, log_dir
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
compress,
compression_package,
compression_class,
)
self.alpha = alpha
self.dict_ordered = dict_ordered
self.communication_round = 0
self.save_shared = save_shared
self.metadata_cap = metadata_cap
self.accumulation = accumulation
self.save_accumulated = conditional_value(save_accumulated, "", False)
self.change_transformer = change_transformer
self.accumulate_averaging_changes = accumulate_averaging_changes
# getting the initial model
self.shapes = []
self.lens = []
with torch.no_grad():
tensors_to_cat = []
for _, v in self.model.state_dict().items():
self.shapes.append(v.shape)
t = v.flatten()
self.lens.append(t.shape[0])
tensors_to_cat.append(t)
self.init_model = torch.cat(tensors_to_cat, dim=0)
if self.accumulation:
self.model.accumulated_changes = torch.zeros_like(
self.change_transformer(self.init_model)
)
self.prev = self.init_model
self.number_of_params = self.init_model.shape[0]
if self.save_accumulated:
self.model_change_path = os.path.join(
self.log_dir, "model_change/{}".format(self.rank)
)
Path(self.model_change_path).mkdir(parents=True, exist_ok=True)
self.model_val_path = os.path.join(
self.log_dir, "model_val/{}".format(self.rank)
)
Path(self.model_val_path).mkdir(parents=True, exist_ok=True)
# Only save for 2 procs
if rank == 0 or rank == 1:
self.save_shared = True
# Only save for 2 procs: Save space
if self.save_shared and not (rank == 0 or rank == 1):
self.save_shared = False
if self.save_shared:
self.folder_path = os.path.join(
@@ -42,25 +138,73 @@
)
Path(self.folder_path).mkdir(parents=True, exist_ok=True)
self.model.shared_parameters_counter = torch.zeros(
self.change_transformer(self.init_model).shape[0], dtype=torch.int32
)
def compress_data(self, data):
result = dict(data)
if self.compress:
if "indices" in result:
result["indices"] = self.compressor.compress(result["indices"])
if "params" in result:
result["params"] = self.compressor.compress_float(result["params"])
return result
def decompress_data(self, data):
if self.compress:
if "indices" in data:
data["indices"] = self.compressor.decompress(data["indices"])
if "params" in data:
data["params"] = self.compressor.decompress_float(data["params"])
return data
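# Hedged illustration (not library code): compress_data / decompress_data above
# only transform the "indices" and "params" entries of the outgoing dict and
# leave the remaining metadata (alpha, send_partial, ...) untouched. The
# DummyCompressor below is hypothetical and merely stands in for whatever
# compression_package / compression_class provide.
import numpy as np


class DummyCompressor:
    def compress(self, arr):
        return arr.astype(np.int32).tobytes()

    def decompress(self, raw):
        return np.frombuffer(raw, dtype=np.int32)

    def compress_float(self, arr):
        return arr.astype(np.float32).tobytes()

    def decompress_float(self, raw):
        return np.frombuffer(raw, dtype=np.float32)


compressor = DummyCompressor()
message = {"indices": np.array([1, 4]), "params": np.array([0.9, -0.7]), "send_partial": True}
wire = dict(message)
wire["indices"] = compressor.compress(wire["indices"])
wire["params"] = compressor.compress_float(wire["params"])
restored = dict(wire)
restored["indices"] = compressor.decompress(restored["indices"])
restored["params"] = compressor.decompress_float(restored["params"])
# restored matches the original message up to float32 precision; send_partial is untouched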
def extract_top_gradients(self):
logging.info("Summing up gradients")
assert len(self.model.accumulated_gradients) > 0
gradient_sum = self.model.accumulated_gradients[0]
for i in range(1, len(self.model.accumulated_gradients)):
for key in self.model.accumulated_gradients[i]:
gradient_sum[key] += self.model.accumulated_gradients[i][key]
"""
Extract the indices and values of the topK gradients.
The gradients must have been accumulated.
Returns
-------
tuple
(a,b). a: The magnitudes of the topK gradients, b: Their indices.
"""
logging.info("Returning topk gradients")
tensors_to_cat = [v.data.flatten() for _, v in gradient_sum.items()]
G_topk = torch.abs(torch.cat(tensors_to_cat, dim=0))
return torch.topk(
G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False
G_topk = torch.abs(self.model.model_change)
std, mean = torch.std_mean(G_topk, unbiased=False)
self.std = std.item()
self.mean = mean.item()
_, index = torch.topk(
G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=True
)
index, _ = torch.sort(index)
return _, index
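# Hedged illustration (not library code): how extract_top_gradients selects the
# alpha fraction of parameters with the largest-magnitude change. The tensors
# and values below are made up for the example.
import torch

change = torch.tensor([0.05, -0.9, 0.3, -0.01, 0.7])   # flattened model change
alpha = 0.4                                             # share 40% of the entries
k = round(alpha * change.shape[0])                      # k = 2
magnitudes = torch.abs(change)
_, index = torch.topk(magnitudes, k, dim=0, sorted=True)
index, _ = torch.sort(index)                            # ascending indices: tensor([1, 4])
values = change[index]                                  # entries that will be shared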
def serialized_model(self):
"""
Convert model to a dict. self.alpha specifies the fraction of model to send.
Returns
-------
dict
Model converted to a dict
"""
if self.alpha >= self.metadata_cap: # Share fully
if self.model.accumulated_changes is not None:
self.model.accumulated_changes = torch.zeros_like(
self.model.accumulated_changes
)
return super().serialized_model()
with torch.no_grad():
_, G_topk = self.extract_top_gradients()
self.model.shared_parameters_counter[G_topk] += 1
if self.accumulation:
self.model.rewind_accumulation(G_topk)
if self.save_shared:
shared_params = dict()
shared_params["order"] = list(self.model.state_dict().keys())
@@ -82,9 +226,7 @@ class PartialModel(Sharing):
logging.info("Extracting topk params")
tensors_to_cat = [v.data.flatten() for v in self.model.parameters()]
T = torch.cat(tensors_to_cat, dim=0)
T_topk = T[G_topk]
T_topk = self.pre_share_model[G_topk]
logging.info("Generating dictionary to send")
@@ -93,25 +235,44 @@ class PartialModel(Sharing):
if not self.dict_ordered:
raise NotImplementedError
m["indices"] = G_topk.numpy().tolist()
m["params"] = T_topk.numpy().tolist()
m["alpha"] = self.alpha
m["indices"] = G_topk.numpy().astype(np.int32)
m["params"] = T_topk.numpy()
m["send_partial"] = True
assert len(m["indices"]) == len(m["params"])
logging.info("Elements sending: {}".format(len(m["indices"])))
logging.info("Generated dictionary to send")
for key in m:
m[key] = json.dumps(m[key])
logging.info("Converted dictionary to pickle")
logging.info("Converted dictionary to json")
return self.compress_data(m)
self.communication_round += 1
def deserialized_model(self, m):
"""
Convert received dict to state_dict.
return m
Parameters
----------
m : dict
dict received
Returns
-------
state_dict
state_dict of received
"""
if "send_partial" not in m:
return super().deserialized_model(m)
def deserialized_model(self, m):
with torch.no_grad():
m = self.decompress_data(m)
state_dict = self.model.state_dict()
if not self.dict_ordered:
@@ -127,9 +288,9 @@ class PartialModel(Sharing):
tensors_to_cat.append(t)
T = torch.cat(tensors_to_cat, dim=0)
index_tensor = torch.tensor(json.loads(m["indices"]))
index_tensor = torch.tensor(m["indices"], dtype=torch.long)
logging.debug("Original tensor: {}".format(T[index_tensor]))
T[index_tensor] = torch.tensor(json.loads(m["params"]))
T[index_tensor] = torch.tensor(m["params"])
logging.debug("Final tensor: {}".format(T[index_tensor]))
start_index = 0
for i, key in enumerate(state_dict):
@@ -138,3 +299,90 @@ class PartialModel(Sharing):
start_index = end_index
return state_dict
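# Hedged illustration (not library code): the round trip of a partial-model
# message. The sender packs the selected indices and parameter values; the
# receiver scatters them into its own flattened parameters and keeps the rest.
import numpy as np
import torch

# sender side (after top-k selection): share positions 1 and 4
m = {
    "indices": np.array([1, 4], dtype=np.int32),
    "params": np.array([0.9, -0.7], dtype=np.float32),
    "alpha": 0.4,
    "send_partial": True,
}

# receiver side: overwrite only the shared positions of the local flat model
T = torch.zeros(5)
index_tensor = torch.tensor(m["indices"], dtype=torch.long)
T[index_tensor] = torch.tensor(m["params"])
# T is now tensor([0.0000, 0.9000, 0.0000, 0.0000, -0.7000])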
def _pre_step(self):
"""
Called at the beginning of step.
"""
logging.info("PartialModel _pre_step")
with torch.no_grad():
tensors_to_cat = [
v.data.flatten() for _, v in self.model.state_dict().items()
]
self.pre_share_model = torch.cat(tensors_to_cat, dim=0)
# Would only need one of the transforms
self.pre_share_model_transformed = self.change_transformer(
self.pre_share_model
)
change = self.change_transformer(self.pre_share_model - self.init_model)
if self.accumulation:
if not self.accumulate_averaging_changes:
# Need to accumulate in _pre_step as the accumulation gets rewound during the step
self.model.accumulated_changes += change
change = self.model.accumulated_changes.clone().detach()
else:
# For the legacy implementation, we only rewind the currently accumulated values
# and add the model change due to averaging at the end
change += self.model.accumulated_changes
# stores the change of the model due to training; the change due to averaging is not accounted for
self.model.model_change = change
def _post_step(self):
"""
Called at the end of step.
"""
logging.info("PartialModel _post_step")
with torch.no_grad():
tensors_to_cat = [
v.data.flatten() for _, v in self.model.state_dict().items()
]
post_share_model = torch.cat(tensors_to_cat, dim=0)
self.init_model = post_share_model
if self.accumulation:
if self.accumulate_averaging_changes:
self.model.accumulated_changes += self.change_transformer(
self.init_model - self.prev
)
self.prev = self.init_model
self.model.model_change = None
if self.save_accumulated:
self.save_change()
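# Hedged illustration (not library code): the accumulation bookkeeping done in
# _pre_step / rewind_accumulation. Per-round changes keep adding up for every
# parameter, and the entries that were just shared are reset to zero so that
# unshared parameters build up pressure to be shared in later rounds.
import torch

accumulated = torch.zeros(5)
rounds = [
    (torch.tensor([0.1, 0.5, 0.0, 0.2, 0.0]), torch.tensor([1])),  # (change, shared indices)
    (torch.tensor([0.1, 0.1, 0.0, 0.2, 0.0]), torch.tensor([3])),
]
for change, shared in rounds:
    accumulated += change        # accumulate this round's change (_pre_step)
    accumulated[shared] = 0.0    # rewind the freshly shared entries
# accumulated == tensor([0.2000, 0.1000, 0.0000, 0.0000, 0.0000])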
def save_vector(self, v, s):
"""
Saves the given vector to the file.
Parameters
----------
v : torch.tensor
The torch tensor to write to file
s : str
Path to folder to write to
"""
output_dict = dict()
output_dict["order"] = list(self.model.state_dict().keys())
shapes = dict()
for k, v1 in self.model.state_dict().items():
shapes[k] = list(v1.shape)
output_dict["shapes"] = shapes
output_dict["tensor"] = v.tolist()
with open(
os.path.join(
s,
"{}.json".format(self.communication_round + 1),
),
"w",
) as of:
json.dump(output_dict, of)
def save_change(self):
"""
Saves the change and the gradient values for every iteration
"""
self.save_vector(self.model.model_change, self.model_change_path)
import random
from decentralizepy.sharing.PartialModel import PartialModel
from decentralizepy.utils import identity
class RandomAlpha(PartialModel):
"""
This class implements partial model sharing with a random alpha each iteration.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha_list=[0.1, 0.2, 0.3, 0.4, 1.0],
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
accumulation=False,
save_accumulated="",
change_transformer=identity,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha_list : list
List of candidate alphas; one is chosen uniformly at random at every sharing step
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
1.0,
dict_ordered,
save_shared,
metadata_cap,
accumulation,
save_accumulated,
change_transformer,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.alpha_list = eval(alpha_list) if isinstance(alpha_list, str) else alpha_list
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen.
"""
self.alpha = random.choice(self.alpha_list)
return super().get_data_to_send()
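# Hedged illustration (not library code): every node seeds Python's RNG with its
# own uid, then draws a new alpha from alpha_list before each sharing step, so
# the shared fraction varies per round but the sequence is reproducible per node.
import random

alpha_list = [0.1, 0.2, 0.3, 0.4, 1.0]
uid = 3                      # hypothetical node uid
random.seed(uid)
alphas = [random.choice(alpha_list) for _ in range(5)]
print(alphas)                # the same node always draws the same sequence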
# Deprecated
import random
from decentralizepy.sharing.PartialModel import PartialModel
class RandomAlphaIncremental(PartialModel):
"""
This class implements partial model sharing with a random alpha drawn from an increasing range each iteration.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
range_start=0.1,
range_end=0.2,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
range_start : float
Lower bound of the range from which alpha is drawn
range_end : float
Initial upper bound of the range; it grows by up to 0.1 each round, capped at 1.0
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
1.0,
dict_ordered,
save_shared,
metadata_cap,
compress,
compression_package,
compression_class,
)
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
self.range_start = range_start
self.range_end = range_end
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen from an increasing range.
"""
self.alpha = round(random.uniform(self.range_start, self.range_end), 2)
self.range_end = min(1.0, self.range_end + round(random.uniform(0.0, 0.1), 2))
return super().get_data_to_send()
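# Hedged illustration (not library code): alpha is drawn uniformly from
# [range_start, range_end], and range_end drifts upward by up to 0.1 per round
# (capped at 1.0), so later rounds tend to allow larger shared fractions.
import random

random.seed(0)
range_start, range_end = 0.1, 0.2
for step in range(5):
    alpha = round(random.uniform(range_start, range_end), 2)
    range_end = min(1.0, range_end + round(random.uniform(0.0, 0.1), 2))
    print(step, alpha, range_end)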
import random
from decentralizepy.sharing.Wavelet import Wavelet
class RandomAlpha(Wavelet):
"""
This class implements partial model sharing (in the wavelet domain) with a random alpha each iteration.
"""
def __init__(
self,
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
alpha_list="[0.1, 0.2, 0.3, 0.4, 1.0]",
dict_ordered=True,
save_shared=False,
metadata_cap=1.0,
wavelet="haar",
level=4,
change_based_selection=True,
save_accumulated="",
accumulation=False,
accumulate_averaging_changes=False,
compress=False,
compression_package=None,
compression_class=None,
):
"""
Constructor
Parameters
----------
rank : int
Local rank
machine_id : int
Global machine id
communication : decentralizepy.communication.Communication
Communication module used to send and receive messages
mapping : decentralizepy.mappings.Mapping
Mapping (rank, machine_id) -> uid
graph : decentralizepy.graphs.Graph
Graph representing neighbors
model : decentralizepy.models.Model
Model to train
dataset : decentralizepy.datasets.Dataset
Dataset for sharing data. Not implemented yet! TODO
log_dir : str
Location to write shared_params (only writing for 2 procs per machine)
alpha_list : str
String representation of the list of candidate alphas; one is chosen uniformly at random at every sharing step
dict_ordered : bool
Specifies if the python dict maintains the order of insertion
save_shared : bool
Specifies if the indices of shared parameters should be logged
metadata_cap : float
Share full model when self.alpha > metadata_cap
"""
super().__init__(
rank,
machine_id,
communication,
mapping,
graph,
model,
dataset,
log_dir,
1.0,
dict_ordered,
save_shared,
metadata_cap,
wavelet,
level,
change_based_selection,
save_accumulated,
accumulation,
accumulate_averaging_changes,
compress,
compression_package,
compression_class,
)
self.alpha_list = eval(alpha_list)
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
def get_data_to_send(self):
"""
Perform a sharing step. Implements D-PSGD with alpha randomly chosen.
"""
self.alpha = random.choice(self.alpha_list)
return super().get_data_to_send()