Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing 2821 additions and 279 deletions
# elias implementation: taken from this stack overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip
import numpy as np
from decentralizepy.compression.Compression import Compression
......
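The comment above credits the Elias gamma routine to a Stack Overflow answer; for readers unfamiliar with the scheme, here is a minimal, self-contained sketch of gamma coding for a single positive integer. It is illustrative only, not the repository's implementation, and the helper names are hypothetical.

def elias_gamma_encode(n: int) -> str:
    # Gamma code: (bit-length of n minus one) zeros, followed by the binary form of n.
    assert n >= 1
    binary = bin(n)[2:]
    return "0" * (len(binary) - 1) + binary

def elias_gamma_decode(bits: str) -> int:
    # Count the leading zeros, then read that many + 1 bits as the value.
    zeros = 0
    while bits[zeros] == "0":
        zeros += 1
    return int(bits[zeros : 2 * zeros + 1], 2)

assert elias_gamma_encode(13) == "0001101"
assert elias_gamma_decode("0001101") == 13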
# elias implementation: taken from this stack overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip
import numpy as np
from decentralizepy.compression.Elias import Elias
......
# elias implementation: taken from this stack overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip
import numpy as np
from decentralizepy.compression.Elias import Elias
......
......@@ -114,6 +114,8 @@ class CIFAR10(Dataset):
test_batch_size,
)
self.num_classes = NUM_CLASSES
self.partition_niid = partition_niid
self.shards = shards
self.transform = transforms.Compose(
......
......@@ -230,6 +230,8 @@ class Celeba(Dataset):
self.IMAGES_DIR = utils.conditional_value(images_dir, "", None)
assert self.IMAGES_DIR is not None
self.num_classes = NUM_CLASSES
if self.__training__:
self.load_trainset()
......
......@@ -52,6 +52,7 @@ class Dataset:
self.test_dir = utils.conditional_value(test_dir, "", None)
self.sizes = utils.conditional_value(sizes, "", None)
self.test_batch_size = utils.conditional_value(test_batch_size, "", 64)
self.num_classes = None
if self.sizes:
if type(self.sizes) == str:
self.sizes = eval(self.sizes)
......@@ -66,6 +67,20 @@ class Dataset:
else:
self.__testing__ = False
self.label_distribution = None
def get_label_distribution(self):
# Only supported for classification
if self.label_distribution is None:
self.label_distribution = [0 for _ in range(self.num_classes)]
tr_set = self.get_trainset()
for _, ys in tr_set:
for y in ys:
y_val = y.item()
self.label_distribution[y_val] += 1
return self.label_distribution
def get_trainset(self):
"""
Function to get the training set
......
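A hedged usage sketch of the new get_label_distribution helper: it assumes any Dataset subclass with num_classes set, a working get_trainset() that yields (x, y) batches, and a non-empty train split; the helper name below is illustrative.

def label_fractions(dataset):
    # `dataset` is any decentralizepy Dataset subclass; the histogram is cached after the first call.
    counts = dataset.get_label_distribution()   # list of length num_classes, e.g. [512, 498, ...]
    total = sum(counts)                          # assumes at least one training sample
    return [c / total for c in counts]           # per-class share of this node's train split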
......@@ -223,6 +223,8 @@ class Femnist(Dataset):
test_batch_size,
)
self.num_classes = NUM_CLASSES
if self.__training__:
self.load_trainset()
......
......@@ -3,7 +3,6 @@ import math
import os
import zipfile
import numpy as np
import pandas as pd
import requests
import torch
......
import json
import logging
import os
import re
from collections import defaultdict
import numpy as np
......
......@@ -22,6 +22,9 @@ class Graph:
self.n_procs = n_procs
self.adj_list = [set() for i in range(self.n_procs)]
def get_all_nodes(self):
return list(range(self.n_procs))
def __insert_adj__(self, node, neighbours):
"""
Inserts `neighbours` into the adjacency list of `node`
......
......@@ -8,7 +8,7 @@ class Linear(Mapping):
"""
def __init__(self, n_machines, procs_per_machine):
def __init__(self, n_machines, procs_per_machine, global_service_machine=0):
"""
Constructor
......@@ -23,6 +23,7 @@ class Linear(Mapping):
super().__init__(n_machines * procs_per_machine)
self.n_machines = n_machines
self.procs_per_machine = procs_per_machine
self.global_service_machine = global_service_machine
def get_uid(self, rank: int, machine_id: int):
"""
......@@ -41,6 +42,8 @@ class Linear(Mapping):
the unique identifier
"""
if rank < 0:
return rank
return machine_id * self.procs_per_machine + rank
def get_machine_and_rank(self, uid: int):
......@@ -58,6 +61,8 @@ class Linear(Mapping):
a tuple of rank and machine_id
"""
if uid < 0:
return uid, self.global_service_machine
return (uid % self.procs_per_machine), (uid // self.procs_per_machine)
def get_local_procs_count(self):
......
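A small worked example of the Linear mapping arithmetic, assuming 2 machines with 4 processes each; per the change above, negative uids (used for global services) map back to global_service_machine. The import path is assumed from the repository's module layout.

from decentralizepy.mappings.Linear import Linear   # assumed module path

mapping = Linear(2, 4, global_service_machine=0)
assert mapping.get_uid(rank=3, machine_id=1) == 1 * 4 + 3    # uid 7
assert mapping.get_machine_and_rank(7) == (3, 1)             # (rank, machine_id)
assert mapping.get_machine_and_rank(-1) == (-1, 0)           # service uid -> service machine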
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class DPSGDNode(Node):
"""
This class defines the node for DPSGD
"""
def save_plot(self, l, label, title, xlabel, filename):
"""
Save Matplotlib plot. Clears previous plots.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def get_neighbors(self, node=None):
return self.my_neighbors
# def instantiate_peer_deques(self):
# for neighbor in self.my_neighbors:
# if neighbor not in self.peer_deques:
# self.peer_deques[neighbor] = deque()
def receive_DPSGD(self):
return self.receive_channel("DPSGD")
def run(self):
"""
Start the decentralized learning
"""
self.testset = self.dataset.get_testset()
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
for iteration in range(self.iterations):
logging.info("Starting training iteration: %d", iteration)
rounds_to_train_evaluate -= 1
rounds_to_test -= 1
self.iteration = iteration
self.trainer.train(self.dataset)
new_neighbors = self.get_neighbors()
# The following code does not work because TCP sockets are supposed to be long lived.
# for neighbor in self.my_neighbors:
# if neighbor not in new_neighbors:
# logging.info("Removing neighbor {}".format(neighbor))
# if neighbor in self.peer_deques:
# assert len(self.peer_deques[neighbor]) == 0
# del self.peer_deques[neighbor]
# self.communication.destroy_connection(neighbor, linger = 10000)
# self.barrier.remove(neighbor)
self.my_neighbors = new_neighbors
self.connect_neighbors()
logging.info("Connected to all neighbors")
# self.instantiate_peer_deques()
to_send = self.sharing.get_data_to_send()
to_send["CHANNEL"] = "DPSGD"
for neighbor in self.my_neighbors:
self.communication.send(neighbor, to_send)
while not self.received_from_all():
sender, data = self.receive_DPSGD()
logging.info(
"Received Model from {} of iteration {}".format(
sender, data["iteration"]
)
)
if sender not in self.peer_deques:
self.peer_deques[sender] = deque()
self.peer_deques[sender].append(data)
averaging_deque = dict()
for neighbor in self.my_neighbors:
averaging_deque[neighbor] = self.peer_deques[neighbor]
self.sharing._averaging(averaging_deque)
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
if rounds_to_train_evaluate == 0:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
if self.model.shared_parameters_counter is not None:
logging.info("Saving the shared parameter counts")
with open(
os.path.join(
self.log_dir, "{}_shared_parameters.json".format(self.rank)
),
"w",
) as of:
json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.disconnect_neighbors()
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.my_neighbors = self.graph.neighbors(self.uid)
self.init_sharing(config["SHARING"])
self.peer_deques = dict()
self.connect_neighbors()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for k in self.my_neighbors:
if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
return False
return True
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
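The constructor docstring above names the config sections the node expects ([DATASET], [OPTIMIZER_PARAMS], [TRAIN_PARAMS], plus the COMMUNICATION and SHARING sections read in instantiate). Below is a minimal sketch of such a config, written as the Python dict that instantiate indexes; the concrete class names, paths, and values are illustrative assumptions, and the docstring's key = value notation suggests these sections normally come from an .ini file.

config = {
    "DATASET": {
        "dataset_package": "decentralizepy.datasets.CIFAR10",   # assumed package path
        "dataset_class": "CIFAR10",
        "model_class": "LeNet",                                  # assumed model name
    },
    "OPTIMIZER_PARAMS": {
        "optimizer_package": "torch.optim",
        "optimizer_class": "SGD",
        "lr": 0.01,
    },
    "TRAIN_PARAMS": {
        "training_package": "decentralizepy.training.Training",
        "training_class": "Training",
        "epochs_per_round": 25,
        "batch_size": 64,
    },
    "COMMUNICATION": {
        "comm_package": "decentralizepy.communication.TCP",     # assumed module path
        "comm_class": "TCP",
        "addresses_filepath": "ip_addr.json",                    # assumed file
    },
    "SHARING": {
        "sharing_package": "decentralizepy.sharing.Sharing",     # assumed package path
        "sharing_class": "Sharing",
    },
}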
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class DPSGDNodeFederated(Node):
"""
This class defines the node for federated DPSGD
"""
def run(self):
"""
Start the federated learning loop at this worker
"""
while len(self.barrier):
sender, data = self.receive_channel("WORKER_REQUEST")
if "BYE" in data:
logging.info("Received {} from {}".format("BYE", sender))
self.barrier.remove(sender)
break
iteration = data["iteration"]
del data["iteration"]
del data["CHANNEL"]
self.model.load_state_dict(data["params"])
self.sharing._post_step()
self.sharing.communication_round += 1
logging.info(
"Received worker request at node {}, global iteration {}, local round {}".format(
self.uid, iteration, self.participated
)
)
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
# Perform iteration
logging.info("Starting training iteration")
self.trainer.train(self.dataset)
# Send update to server
to_send = self.sharing.get_data_to_send()
to_send["CHANNEL"] = "DPSGD"
self.communication.send(self.parameter_server_uid, to_send)
if self.participated > 0:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
self.participated += 1
# only if has participated in learning
if self.participated > 0:
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("Server disconnected. Process complete!")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.participated = 0
self.init_sharing(config["SHARING"])
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
parameter_server_uid=-1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
parameter_server_uid: int
The parameter server's uid
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.message_queue["PEERS"] = deque()
self.parameter_server_uid = parameter_server_uid
self.connect_neighbor(self.parameter_server_uid)
self.wait_for_hello(self.parameter_server_uid)
self.run()
import logging
import math
import os
from collections import deque
import torch
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.DPSGDNode import DPSGDNode
class DPSGDWithPeerSampler(DPSGDNode):
"""
This class defines the node for DPSGD with a peer sampler
"""
def receive_neighbors(self):
return self.receive_channel("PEERS")[1]["NEIGHBORS"]
def get_neighbors(self, node=None):
logging.info("Requesting neighbors from the peer sampler.")
self.communication.send(
self.peer_sampler_uid,
{
"REQUEST_NEIGHBORS": self.uid,
"iteration": self.iteration,
"CHANNEL": "SERVER_REQUEST",
},
)
my_neighbors = self.receive_neighbors()
logging.info("Neighbors this round: {}".format(my_neighbors))
return my_neighbors
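# The exchange above implies a simple request/response shape for the peer sampler:
# the node sends its uid and iteration on the SERVER_REQUEST channel and reads the
# NEIGHBORS field from the PEERS reply. A hedged sketch of both messages, inferred
# only from the keys used in this diff (the peer-sampler side is not shown here):
#
# request = {
#     "REQUEST_NEIGHBORS": 5,          # uid of the requesting node (illustrative value)
#     "iteration": 12,
#     "CHANNEL": "SERVER_REQUEST",
# }
# response = {
#     "CHANNEL": "PEERS",
#     "NEIGHBORS": [3, 7, 11],         # uids to exchange models with this round
# }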
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
peer_sampler_uid=-1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.message_queue["PEERS"] = deque()
self.peer_sampler_uid = peer_sampler_uid
self.connect_neighbor(self.peer_sampler_uid)
self.wait_for_hello(self.peer_sampler_uid)
self.run()
def disconnect_neighbors(self):
"""
Disconnects all neighbors.
Raises
------
RuntimeError
If received another message while waiting for BYEs
"""
if not self.sent_disconnections:
logging.info("Disconnecting neighbors")
for uid in self.barrier:
self.communication.send(uid, {"BYE": self.uid, "CHANNEL": "DISCONNECT"})
self.communication.send(
self.peer_sampler_uid, {"BYE": self.uid, "CHANNEL": "SERVER_REQUEST"}
)
self.sent_disconnections = True
self.barrier.remove(self.peer_sampler_uid)
while len(self.barrier):
sender, _ = self.receive_disconnect()
self.barrier.remove(sender)
import importlib
import json
import logging
import math
import os
import random
from collections import deque
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class FederatedParameterServer(Node):
"""
This class defines the parameter serving service
"""
def save_plot(self, l, label, title, xlabel, filename):
"""
Save Matplotlib plot. Clears previous plots.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def init_log(self, log_dir, log_level, force=True):
"""
Instantiate Logging.
Parameters
----------
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
force : bool
Argument to logging.basicConfig()
"""
log_file = os.path.join(log_dir, "ParameterServer.log")
logging.basicConfig(
filename=log_file,
format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
level=log_level,
force=force,
)
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
):
"""
Instantiate object fields with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.iterations = iterations
self.sent_disconnections = False
self.weights_store_dir = weights_store_dir
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
)
self.message_queue = dict()
self.barrier = set()
self.peer_deques = dict()
self.init_dataset_model(config["DATASET"])
self.init_comm(config["COMMUNICATION"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.my_neighbors = self.graph.get_all_nodes()
self.connect_neighbors()
self.init_sharing(config["SHARING"])
def received_from_all(self):
"""
Check if all current workers have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for k in self.current_workers:
if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
return False
return True
def disconnect_neighbors(self):
"""
Disconnects all neighbors.
Raises
------
RuntimeError
If received another message while waiting for BYEs
"""
if not self.sent_disconnections:
logging.info("Disconnecting neighbors")
for neighbor in self.my_neighbors:
self.communication.send(
neighbor, {"BYE": self.uid, "CHANNEL": "WORKER_REQUEST"}
)
self.barrier.remove(neighbor)
self.sent_disconnections = True
def get_working_nodes(self):
"""
Randomly select a set of clients for the current iteration
"""
k = int(math.ceil(len(self.my_neighbors) * self.working_fraction))
return random.sample(self.my_neighbors, k)
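# get_working_nodes draws ceil(working_fraction * n) distinct workers per round;
# a quick illustrative check of that arithmetic outside the class:
#
# import math
# import random
# nodes = list(range(10))                    # 10 registered worker uids
# k = int(math.ceil(len(nodes) * 0.25))      # working_fraction = 0.25 -> k == 3
# assert k == 3
# selected = random.sample(nodes, k)         # e.g. [7, 1, 4]; no repeats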
def run(self):
"""
Start the federated parameter-serving service.
"""
self.testset = self.dataset.get_testset()
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
to_send = dict()
for iteration in range(self.iterations):
self.iteration = iteration
# reset deques after each iteration
self.peer_deques = dict()
# Get workers for this iteration
self.current_workers = self.get_working_nodes()
# Params to send to workers
to_send["params"] = self.model.state_dict()
to_send["CHANNEL"] = "WORKER_REQUEST"
to_send["iteration"] = iteration
# Notify workers
for worker in self.current_workers:
self.communication.send(worker, to_send)
# Receive updates from current workers
while not self.received_from_all():
sender, data = self.receive_channel("DPSGD")
if sender not in self.peer_deques:
self.peer_deques[sender] = deque()
self.peer_deques[sender].append(data)
logging.info("Received from all current workers")
# Average received updates
averaging_deque = dict()
for worker in self.current_workers:
averaging_deque[worker] = self.peer_deques[worker]
self.sharing._pre_step()
self.sharing._averaging_server(averaging_deque)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
rounds_to_train_evaluate -= 1
if rounds_to_train_evaluate == 0:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
rounds_to_test -= 1
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
if self.model.shared_parameters_counter is not None:
logging.info("Saving the shared parameter counts")
with open(
os.path.join(
self.log_dir, "{}_shared_parameters.json".format(self.rank)
),
"w",
) as of:
json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.disconnect_neighbors()
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
working_fraction=1.0,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
working_fraction : float
Fraction (between 0.0 and 1.0) of nodes participating in one global iteration
args : optional
Other arguments
"""
super().__init__(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
*args
)
self.working_fraction = working_fraction
random.seed(self.mapping.get_uid(self.rank, self.machine_id))
self.run()
logging.info("Parameter Server exiting")
import logging
import math
import os
import queue
from random import Random
from threading import Lock, Thread
import numpy as np
import torch
from numpy.linalg import norm
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.OverlayNode import OverlayNode
class KNN(OverlayNode):
"""
This class defines the KNN learning node
"""
def similarityMetric(self, candidate):
logging.debug("A: {}".format(self.othersInfo[self.uid]))
logging.debug("B: {}".format(self.othersInfo[candidate]))
A = np.array(self.othersInfo[self.uid])
B = np.array(self.othersInfo[candidate])
return np.dot(A, B) / (norm(A) * norm(B))
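# similarityMetric is plain cosine similarity between two nodes' label-distribution
# vectors; a tiny worked example with illustrative counts:
#
# import numpy as np
# from numpy.linalg import norm
# A = np.array([8, 2, 0])                    # this node's label counts
# B = np.array([4, 1, 0])                    # a candidate's label counts
# cos = np.dot(A, B) / (norm(A) * norm(B))   # 34 / (sqrt(68) * sqrt(17)) == 1.0
# assert abs(cos - 1.0) < 1e-9               # same distribution up to scale -> similarity 1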
def get_most_similar(self, candidates, to_keep=4):
if len(candidates) <= to_keep:
return candidates
cur_candidates = dict()
for i in candidates:
simil = round(self.similarityMetric(i), 3)
if simil not in cur_candidates:
cur_candidates[simil] = []
cur_candidates[simil].append(i)
similarity_scores = list(cur_candidates.keys())
similarity_scores.sort()
left_to_keep = to_keep
return_result = set()
for i in similarity_scores:
if left_to_keep >= len(cur_candidates[i]):
return_result.update(cur_candidates[i])
left_to_keep -= len(cur_candidates[i])
elif left_to_keep > 0:
return_result.update(
list(self.rng.sample(cur_candidates[i], left_to_keep))
)
left_to_keep = 0
break
else:
break
return return_result
def create_message_to_send(
self,
channel="KNNConstr",
boolean_flags=[],
add_my_info=False,
add_neighbor_info=False,
):
message = {"CHANNEL": channel, "KNNRound": self.knn_round}
for x in boolean_flags:
message[x] = True
if add_my_info:
message[self.uid] = self.othersInfo[self.uid]
if add_neighbor_info:
for neighbors in self.out_edges:
if neighbors in self.othersInfo:
message[neighbors] = self.othersInfo[neighbors]
return message
def receive_KNN_message(self):
return self.receive_channel("KNNConstr", block=False)
def process_init_receive(self, message):
self.mutex.acquire()
if "RESPONSE" in message[1]:
self.num_initializations += 1
else:
self.communication.send(
message[0],
self.create_message_to_send(
boolean_flags=["INIT", "RESPONSE"], add_my_info=True
),
)
x = (
message[0],
utils.remove_keys(message[1], ["RESPONSE", "INIT", "CHANNEL", "KNNRound"]),
)
self.othersInfo.update(x[1])
self.mutex.release()
def remove_meta_from_message(self, message):
return (
message[0],
utils.remove_keys(message[1], ["RESPONSE", "INIT", "CHANNEL", "KNNRound"]),
)
def process_candidates_without_lock(self, current_candidates, message):
if not self.exit_receiver:
message = (
message[0],
utils.remove_keys(
message[1], ["CHANNEL", "RESPONSE", "INIT", "KNNRound"]
),
)
self.othersInfo.update(message[1])
new_candidates = set(message[1].keys())
current_candidates = current_candidates.union(new_candidates)
if self.uid in current_candidates:
current_candidates.remove(self.uid)
self.out_edges = self.get_most_similar(current_candidates)
def send_response(self, message, add_neighbor_info=False, process_candidates=False):
self.mutex.acquire()
logging.debug("Responding to {}".format(message[0]))
self.communication.send(
message[0],
self.create_message_to_send(
boolean_flags=["RESPONSE"],
add_my_info=True,
add_neighbor_info=add_neighbor_info,
),
)
if process_candidates:
self.process_candidates_without_lock(set(self.out_edges), message)
self.mutex.release()
def receiver_thread(self):
knnBYEs = set()
self.num_initializations = 0
waiting_queue = queue.Queue()
while True:
if len(knnBYEs) == self.mapping.get_n_procs() - 1:
self.mutex.acquire()
if self.exit_receiver:
self.mutex.release()
logging.debug("Exiting thread")
return
self.mutex.release()
if self.num_initializations < self.initial_neighbors:
x = self.receive_KNN_message()
if x is None:
continue
elif "INIT" in x[1]:
self.process_init_receive(x)
else:
waiting_queue.put(x)
else:
logging.debug("Waiting for messages")
if waiting_queue.empty():
x = self.receive_KNN_message()
if x is None:
continue
else:
x = waiting_queue.get()
if "INIT" in x[1]:
logging.debug("A past INIT Message received from {}".format(x[0]))
self.process_init_receive(x)
elif "RESPONSE" in x[1]:
logging.debug(
"A response message received from {} from KNNRound {}".format(
x[0], x[1]["KNNRound"]
)
)
x = self.remove_meta_from_message(x)
self.responseQueue.put(x)
elif "RANDOM_DISCOVERY" in x[1]:
logging.debug(
"A Random Discovery message received from {} from KNNRound {}".format(
x[0], x[1]["KNNRound"]
)
)
self.send_response(
x, add_neighbor_info=False, process_candidates=False
)
elif "KNNBYE" in x[1]:
self.mutex.acquire()
knnBYEs.add(x[0])
logging.debug("{} KNN Byes received".format(knnBYEs))
if self.uid in x[1]["CLOSE"]:
self.in_edges.add(x[0])
self.mutex.release()
else:
logging.debug(
"A KNN sharing message received from {} from KNNRound {}".format(
x[0], x[1]["KNNRound"]
)
)
self.send_response(
x, add_neighbor_info=True, process_candidates=True
)
def build_topology(self, rounds=30, random_nodes=4):
self.knn_round = 0
self.exit_receiver = False
t = Thread(target=self.receiver_thread)
t.start()
# Initializations : Send my dataset info to others
self.mutex.acquire()
initial_KNN_message = self.create_message_to_send(
boolean_flags=["INIT"], add_my_info=True
)
for x in self.out_edges:
self.communication.send(x, initial_KNN_message)
self.mutex.release()
for round in range(rounds):
self.knn_round = round
logging.info("Starting KNN Round {}".format(round))
self.mutex.acquire()
rand_neighbor = self.rng.choice(list(self.out_edges))
logging.debug("Random neighbor: {}".format(rand_neighbor))
self.communication.send(
rand_neighbor,
self.create_message_to_send(add_my_info=True, add_neighbor_info=True),
)
self.mutex.release()
logging.debug("Waiting for knn response from {}".format(rand_neighbor))
response = self.responseQueue.get(block=True)
logging.debug("Got response from random neighbor")
self.mutex.acquire()
random_candidates = set(
self.rng.sample(list(range(self.mapping.get_n_procs())), random_nodes)
)
req_responses = 0
for rc in random_candidates:
logging.debug("Current random discovery: {}".format(rc))
if rc not in self.othersInfo and rc != self.uid:
logging.debug("Sending discovery request to {}".format(rc))
self.communication.send(
rc,
self.create_message_to_send(boolean_flags=["RANDOM_DISCOVERY"]),
)
req_responses += 1
self.mutex.release()
while req_responses > 0:
logging.debug(
"Waiting for {} random discovery responses.".format(req_responses)
)
req_responses -= 1
random_discovery_response = self.responseQueue.get(block=True)
logging.debug(
"Received discovery response from {}".format(
random_discovery_response[0]
)
)
self.mutex.acquire()
self.othersInfo.update(random_discovery_response[1])
self.mutex.release()
self.mutex.acquire()
self.process_candidates_without_lock(
random_candidates.union(self.out_edges), response
)
self.mutex.release()
logging.info("Completed KNN Round {}".format(round))
logging.debug("OutNodes: {}".format(self.out_edges))
# Send out_edges and BYE to all
to_send = self.create_message_to_send(boolean_flags=["KNNBYE"])
logging.info("Sending KNNByes")
self.mutex.acquire()
self.exit_receiver = True
to_send["CLOSE"] = list(self.out_edges) # Optimize to only send Yes/No
for receiver in range(self.mapping.get_n_procs()):
if receiver != self.uid:
self.communication.send(receiver, to_send)
self.mutex.release()
logging.info("KNNByes Sent")
t.join()
logging.info("Receiver Thread Returned")
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
initial_neighbors=4,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
self.rng = Random()
self.rng.seed(self.uid + 100)
self.initial_neighbors = initial_neighbors
self.in_edges = set()
self.out_edges = set(
self.rng.sample(
list(self.graph.neighbors(self.uid)), self.initial_neighbors
)
)
self.responseQueue = queue.Queue()
self.mutex = Lock()
self.othersInfo = {self.uid: list(self.dataset.get_label_distribution())}
# ld = self.dataset.get_label_distribution()
# ld_keys = sorted(list(ld.keys()))
# self.othersInfo = {self.uid: []}
# for key in range(max(ld_keys) + 1):
# if key in ld:
# self.othersInfo[self.uid].append(ld[key])
# else:
# self.othersInfo[self.uid].append(0)
logging.info("Label Distributions: {}".format(self.othersInfo))
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.communication.TCP import TCP
from decentralizepy.graphs.Graph import Graph
from decentralizepy.graphs.Star import Star
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.train_test_evaluation import TrainTestHelper
class Node:
......@@ -21,31 +17,112 @@ class Node:
"""
def save_plot(self, l, label, title, xlabel, filename):
def connect_neighbor(self, neighbor):
"""
Save Matplotlib plot. Clears previous plots.
Connects given neighbor. Sends HELLO.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
logging.info("Sending connection request to {}".format(neighbor))
self.communication.init_connection(neighbor)
self.communication.send(neighbor, {"HELLO": self.uid, "CHANNEL": "CONNECT"})
def receive_channel(self, channel, block=True):
if channel not in self.message_queue:
self.message_queue[channel] = deque()
if len(self.message_queue[channel]) > 0:
return self.message_queue[channel].popleft()
else:
x = self.communication.receive(block=block)
if x is None:
assert not block
return None
sender, recv = x
logging.info(
"Received some message from {} with CHANNEL: {}".format(
sender, recv["CHANNEL"]
)
)
assert "CHANNEL" in recv
while recv["CHANNEL"] != channel:
if recv["CHANNEL"] not in self.message_queue:
self.message_queue[recv["CHANNEL"]] = deque()
self.message_queue[recv["CHANNEL"]].append((sender, recv))
x = self.communication.receive(block=block)
if x is None:
assert not block
return None
sender, recv = x
logging.info(
"Received some message from {} with CHANNEL: {}".format(
sender, recv["CHANNEL"]
)
)
return (sender, recv)
def receive_hello(self):
return self.receive_channel("CONNECT")
def wait_for_hello(self, neighbor):
"""
Waits for HELLO.
Caches any data received while waiting for HELLOs.
Raises
------
RuntimeError
If received BYE while waiting for HELLO
"""
while neighbor not in self.barrier:
logging.info("Waiting HELLO from {}".format(neighbor))
sender, _ = self.receive_hello()
logging.info("Received HELLO from {}".format(sender))
self.barrier.add(sender)
def connect_neighbors(self):
"""
Connects all neighbors. Sends HELLO. Waits for HELLO.
Caches any data received while waiting for HELLOs.
Raises
------
RuntimeError
If received BYE while waiting for HELLO
"""
logging.info("Sending connection request to all neighbors")
wait_acknowledgements = []
for neighbor in self.my_neighbors:
if not self.communication.already_connected(neighbor):
self.connect_neighbor(neighbor)
wait_acknowledgements.append(neighbor)
for neighbor in wait_acknowledgements:
self.wait_for_hello(neighbor)
def receive_disconnect(self):
return self.receive_channel("DISCONNECT")
def disconnect_neighbors(self):
"""
Disconnects all neighbors.
Raises
------
RuntimeError
If received another message while waiting for BYEs
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
if not self.sent_disconnections:
logging.info("Disconnecting neighbors")
for uid in self.barrier:
self.communication.send(uid, {"BYE": self.uid, "CHANNEL": "DISCONNECT"})
self.sent_disconnections = True
while len(self.barrier):
sender, _ = self.receive_disconnect()
self.barrier.remove(sender)
def init_log(self, log_dir, rank, log_level, force=True):
"""
......@@ -68,7 +145,7 @@ class Node:
filename=log_file,
format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
level=log_level,
force=True,
force=force,
)
def cache_fields(
......@@ -79,12 +156,6 @@ class Node:
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
centralized_train_eval,
centralized_test_eval,
):
"""
Instantiate object fields with arguments.
......@@ -103,18 +174,6 @@ class Node:
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
centralized_train_eval : bool
If set the train set evaluation happens at the node with uid 0
centralized_test_eval : bool
If set the test set evaluation happens at the node with uid 0
"""
self.rank = rank
self.machine_id = machine_id
......@@ -122,19 +181,12 @@ class Node:
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.centralized_train_eval = centralized_train_eval
self.centralized_test_eval = centralized_test_eval
logging.debug("Rank: %d", self.rank)
logging.debug("type(graph): %s", str(type(self.rank)))
logging.debug("type(mapping): %s", str(type(self.mapping)))
self.sent_disconnections = False
self.star = Star(self.mapping.get_n_procs())
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_dataset_model(self, dataset_configs):
"""
......@@ -243,17 +295,6 @@ class Node:
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
if self.centralized_test_eval:
self.testing_comm = TCP(
self.rank,
self.machine_id,
self.mapping,
self.star.n_procs,
self.addresses_filepath,
offset=self.star.n_procs,
)
self.testing_comm.connect_neighbors(self.star.neighbors(self.uid))
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
......@@ -294,13 +335,7 @@ class Node:
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
centralized_train_eval=False,
centralized_test_eval=True,
*args
):
"""
......@@ -322,26 +357,16 @@ class Node:
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
centralized_train_eval : bool
If set the train set evaluation happens at the node with uid 0
centralized_test_eval : bool
If set the train set evaluation happens at the node with uid 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
......@@ -349,18 +374,17 @@ class Node:
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
centralized_train_eval,
centralized_test_eval,
)
self.init_log(log_dir, rank, log_level)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.my_neighbors = self.graph.neighbors(self.uid)
self.init_sharing(config["SHARING"])
def run(self):
......@@ -368,146 +392,7 @@ class Node:
Start the decentralized learning
"""
self.testset = self.dataset.get_testset()
self.communication.connect_neighbors(self.graph.neighbors(self.uid))
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
if self.uid == 0:
dataset = self.dataset
if self.centralized_train_eval:
dataset_params_copy = self.dataset_params.copy()
if "sizes" in dataset_params_copy:
del dataset_params_copy["sizes"]
self.whole_dataset = self.dataset_class(
self.rank,
self.machine_id,
self.mapping,
sizes=[1.0],
**dataset_params_copy
)
dataset = self.whole_dataset
if self.centralized_test_eval:
tthelper = TrainTestHelper(
dataset, # self.whole_dataset,
# self.model_test, # todo: this only works if eval_train is set to false
self.model,
self.loss,
self.weights_store_dir,
self.mapping.get_n_procs(),
self.trainer,
self.testing_comm,
self.star,
self.threads_per_proc,
eval_train=self.centralized_train_eval,
)
for iteration in range(self.iterations):
logging.info("Starting training iteration: %d", iteration)
self.trainer.train(self.dataset)
self.sharing.step()
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
"grad_mean": {},
"grad_std": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
if hasattr(self.sharing, "mean"):
results_dict["grad_mean"][iteration + 1] = self.sharing.mean
if hasattr(self.sharing, "std"):
results_dict["grad_std"][iteration + 1] = self.sharing.std
rounds_to_train_evaluate -= 1
if rounds_to_train_evaluate == 0 and not self.centralized_train_eval:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
rounds_to_test -= 1
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
# ta, tl = self.dataset.test(self.model, self.loss)
# self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
if self.centralized_test_eval:
if self.uid == 0:
ta, tl, trl = tthelper.train_test_evaluation(iteration)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if trl is not None:
results_dict["train_loss"][iteration + 1] = trl
else:
self.testing_comm.send(0, self.model.get_weights())
sender, data = self.testing_comm.receive()
assert sender == 0 and data == "finished"
else:
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
if self.model.shared_parameters_counter is not None:
logging.info("Saving the shared parameter counts")
with open(
os.path.join(
self.log_dir, "{}_shared_parameters.json".format(self.rank)
),
"w",
) as of:
json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.communication.disconnect_neighbors()
logging.info("Storing final weight")
self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
raise NotImplementedError
def __init__(
self,
......@@ -518,13 +403,7 @@ class Node:
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
centralized_train_eval=0,
centralized_test_eval=1,
*args
):
"""
......@@ -559,28 +438,12 @@ class Node:
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
centralized_train_eval : int
If set, the train set evaluation happens at the node with uid 0.
Note: if this is set, centralized_test_eval must be set as well!
centralized_test_eval : int
If set, the test set evaluation happens at the node with uid 0
args : optional
Other arguments
"""
centralized_train_eval = centralized_train_eval == 1
centralized_test_eval = centralized_test_eval == 1
# If centralized_train_eval is True then centralized_test_eval needs to be true as well!
assert not centralized_train_eval or centralized_test_eval
total_threads = os.cpu_count()
self.threads_per_proc = max(
......@@ -588,25 +451,17 @@ class Node:
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
centralized_train_eval,
centralized_test_eval,
*args
)
# self.instantiate(
# rank,
# machine_id,
# mapping,
# graph,
# config,
# iterations,
# log_dir,
# log_level,
# *args
# )
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
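For reference, a minimal sketch of the config dict the constructor above consumes, with the sections read by init_dataset_model, init_optimizer, init_trainer, init_comm, and init_sharing; the concrete package, class, and parameter values below are placeholders, not library defaults.
# Illustrative config sketch (placeholders, not library defaults): the Node
# constructor expects a dict with these sections, mirroring the INI layout
# listed in the constructor docstring.
example_config = {
    "DATASET": {
        "dataset_package": "decentralizepy.datasets.CIFAR10",
        "dataset_class": "CIFAR10",
        "model_class": "LeNet",  # placeholder model name
    },
    "OPTIMIZER_PARAMS": {
        "optimizer_package": "torch.optim",
        "optimizer_class": "SGD",
        "lr": 0.01,
    },
    "TRAIN_PARAMS": {
        "training_package": "decentralizepy.training.Training",
        "training_class": "Training",
        "epochs_per_round": 25,
        "batch_size": 64,
    },
    "COMMUNICATION": {
        "comm_package": "decentralizepy.communication.TCP",  # placeholder
        "comm_class": "TCP",
        "addresses_filepath": "ip_addr.json",
    },
    "SHARING": {
        "sharing_package": "decentralizepy.sharing.Sharing",  # placeholder
        "sharing_class": "Sharing",
    },
}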
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class OverlayNode(Node):
"""
This class defines the node on overlay graph
"""
def save_plot(self, l, label, title, xlabel, filename):
"""
Save Matplotlib plot. Clears previous plots.
Parameters
----------
l : dict
dict of x -> y. `x` must be castable to int.
label : str
label of the plot. Used for legend.
title : str
Header
xlabel : str
x-axis label
filename : str
Name of file to save the plot as.
"""
plt.clf()
y_axis = [l[key] for key in l.keys()]
x_axis = list(map(int, l.keys()))
plt.plot(x_axis, y_axis, label=label)
plt.xlabel(xlabel)
plt.title(title)
plt.savefig(filename)
def get_neighbors(self, node=None):
return self.my_neighbors
def receive_DPSGD(self):
return self.receive_channel("DPSGD")
def build_topology(self):
pass
def run(self):
"""
Start the decentralized learning
"""
self.testset = self.dataset.get_testset()
rounds_to_test = self.test_after
rounds_to_train_evaluate = self.train_evaluate_after
global_epoch = 1
change = 1
self.connect_neighbors()
logging.info("Connected to all neighbors")
self.build_topology()
logging.info("OutNodes: {}".format(self.out_edges))
logging.info("InNodes: {}".format(self.in_edges))
logging.info("Unifying edges")
self.out_edges = self.out_edges.union(self.in_edges)
self.my_neighbors = self.in_edges = set(self.out_edges)
logging.info("Total number of neighbor: {}".format(len(self.my_neighbors)))
for iteration in range(self.iterations):
logging.info("Starting training iteration: %d", iteration)
rounds_to_train_evaluate -= 1
rounds_to_test -= 1
self.iteration = iteration
self.trainer.train(self.dataset)
to_send = self.sharing.get_data_to_send()
to_send["CHANNEL"] = "DPSGD"
to_send["degree"] = len(self.in_edges)
assert len(self.out_edges) != 0
assert len(self.in_edges) != 0
for neighbor in self.out_edges:
self.communication.send(neighbor, to_send)
while not self.received_from_all():
sender, data = self.receive_DPSGD()
logging.info(
"Received Model from {} of iteration {}".format(
sender, data["iteration"]
)
)
if sender not in self.peer_deques:
self.peer_deques[sender] = deque()
self.peer_deques[sender].append(data)
averaging_deque = dict()
for neighbor in self.in_edges:
averaging_deque[neighbor] = self.peer_deques[neighbor]
self.sharing._averaging(averaging_deque)
if self.reset_optimizer:
self.optimizer = self.optimizer_class(
self.model.parameters(), **self.optimizer_params
) # Reset optimizer state
self.trainer.reset_optimizer(self.optimizer)
if iteration:
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
"r",
) as inf:
results_dict = json.load(inf)
else:
results_dict = {
"train_loss": {},
"test_loss": {},
"test_acc": {},
"total_bytes": {},
"total_meta": {},
"total_data_per_n": {},
}
results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
if hasattr(self.communication, "total_meta"):
results_dict["total_meta"][
iteration + 1
] = self.communication.total_meta
if hasattr(self.communication, "total_data"):
results_dict["total_data_per_n"][
iteration + 1
] = self.communication.total_data
if rounds_to_train_evaluate == 0:
logging.info("Evaluating on train set.")
rounds_to_train_evaluate = self.train_evaluate_after * change
loss_after_sharing = self.trainer.eval_loss(self.dataset)
results_dict["train_loss"][iteration + 1] = loss_after_sharing
self.save_plot(
results_dict["train_loss"],
"train_loss",
"Training Loss",
"Communication Rounds",
os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
)
if self.dataset.__testing__ and rounds_to_test == 0:
rounds_to_test = self.test_after * change
logging.info("Evaluating on test set.")
ta, tl = self.dataset.test(self.model, self.loss)
results_dict["test_acc"][iteration + 1] = ta
results_dict["test_loss"][iteration + 1] = tl
if global_epoch == 49:
change *= 2
global_epoch += change
with open(
os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
) as of:
json.dump(results_dict, of)
# if self.model.shared_parameters_counter is not None:
# logging.info("Saving the shared parameter counts")
# with open(
# os.path.join(
# self.log_dir, "{}_shared_parameters.json".format(self.rank)
# ),
# "w",
# ) as of:
# json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
self.disconnect_neighbors()
# logging.info("Storing final weight")
# self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
logging.info("All neighbors disconnected. Process complete!")
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
):
"""
Instantiate object field with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.weights_store_dir = weights_store_dir
self.iterations = iterations
self.test_after = test_after
self.train_evaluate_after = train_evaluate_after
self.reset_optimizer = reset_optimizer
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process in running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, rank, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
weights_store_dir,
test_after,
train_evaluate_after,
reset_optimizer,
)
self.init_dataset_model(config["DATASET"])
self.init_optimizer(config["OPTIMIZER_PARAMS"])
self.init_trainer(config["TRAIN_PARAMS"])
self.init_comm(config["COMMUNICATION"])
self.message_queue = dict()
self.barrier = set()
self.my_neighbors = self.graph.neighbors(self.uid)
self.init_sharing(config["SHARING"])
self.peer_deques = dict()
self.connect_neighbors()
def received_from_all(self):
"""
Check if all neighbors have sent the current iteration
Returns
-------
bool
True if required data has been received, False otherwise
"""
for k in self.in_edges:
if (k not in self.peer_deques) or len(self.peer_deques[k]) == 0:
return False
return True
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
weights_store_dir=".",
log_level=logging.INFO,
test_after=5,
train_evaluate_after=1,
reset_optimizer=1,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
weights_store_dir : str
Directory in which to store model weights
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
test_after : int
Number of iterations after which the test loss and accuracy are calculated
train_evaluate_after : int
Number of iterations after which the train loss is calculated
reset_optimizer : int
1 if optimizer should be reset every communication round, else 0
args : optional
Other arguments
"""
total_threads = os.cpu_count()
self.threads_per_proc = max(
math.floor(total_threads / mapping.procs_per_machine), 1
)
torch.set_num_threads(self.threads_per_proc)
torch.set_num_interop_threads(1)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
weights_store_dir,
log_level,
test_after,
train_evaluate_after,
reset_optimizer,
*args
)
self.in_edges = set()
self.out_edges = set()
logging.info(
"Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
)
self.run()
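build_topology() above is deliberately a no-op; run() assumes it has populated self.out_edges and self.in_edges before the first round. A minimal, hypothetical override (not part of this change) that simply reuses the static graph as the overlay:
# Hypothetical subclass for illustration only: fills the overlay edges from
# the static graph so that OverlayNode.run() has non-empty in/out edge sets.
class StaticOverlayNode(OverlayNode):
    def build_topology(self):
        neighbours = self.graph.neighbors(self.uid)
        self.out_edges = set(neighbours)
        self.in_edges = set(neighbours)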
import importlib
import logging
import os
from collections import deque
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class PeerSampler(Node):
"""
This class defines the peer sampling service
"""
def init_log(self, log_dir, log_level, force=True):
"""
Instantiate Logging.
Parameters
----------
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
force : bool
Argument to logging.basicConfig()
"""
log_file = os.path.join(log_dir, "PeerSampler.log")
logging.basicConfig(
filename=log_file,
format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
level=log_level,
force=force,
)
def cache_fields(
self,
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
):
"""
Instantiate object field with arguments.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
"""
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
self.log_dir = log_dir
self.iterations = iterations
self.sent_disconnections = False
logging.info("Rank: %d", self.rank)
logging.info("type(graph): %s", str(type(self.rank)))
logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
"""
Instantiate communication module from config.
Parameters
----------
comm_configs : dict
Python dict containing communication config params
"""
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.addresses_filepath = comm_params.get("addresses_filepath", None)
self.communication = comm_class(
self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
)
def instantiate(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
):
"""
Construct objects.
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process in running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations.
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
logging.info("Started process.")
self.init_log(log_dir, log_level)
self.cache_fields(
rank,
machine_id,
mapping,
graph,
iterations,
log_dir,
)
self.message_queue = dict()
self.barrier = set()
self.init_comm(config["COMMUNICATION"])
self.my_neighbors = self.graph.get_all_nodes()
self.connect_neighbors()
def get_neighbors(self, node, iteration=None):
return self.graph.neighbors(node)
def receive_server_request(self):
return self.receive_channel("SERVER_REQUEST")
def run(self):
"""
Start the peer-sampling service.
"""
while len(self.barrier) > 0:
sender, data = self.receive_server_request()
if "BYE" in data:
logging.debug("Received {} from {}".format("BYE", sender))
self.barrier.remove(sender)
elif "REQUEST_NEIGHBORS" in data:
logging.debug("Received {} from {}".format("Request", sender))
if "iteration" in data:
resp = {
"NEIGHBORS": self.get_neighbors(sender, data["iteration"]),
"CHANNEL": "PEERS",
}
else:
resp = {"NEIGHBORS": self.get_neighbors(sender), "CHANNEL": "PEERS"}
self.communication.send(sender, resp)
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
super().__init__(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.run()
logging.info("Peer Sampler exiting")
import logging
from collections import deque
from decentralizepy.graphs.Graph import Graph
from decentralizepy.graphs.Regular import Regular
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.PeerSampler import PeerSampler
class PeerSamplerDynamic(PeerSampler):
"""
This class defines the peer sampling service
"""
def get_neighbors(self, node, iteration=None):
if iteration is not None:
if iteration > self.iteration:
logging.info(
"iteration, self.iteration: {}, {}".format(
iteration, self.iteration
)
)
assert iteration == self.iteration + 1
self.iteration = iteration
self.graphs.append(Regular(self.graph.n_procs, self.graph_degree))
return self.graphs[iteration].neighbors(node)
else:
return self.graph.neighbors(node)
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
graph: Graph,
config,
iterations=1,
log_dir=".",
log_level=logging.INFO,
graph_degree=4,
*args
):
"""
Constructor
Parameters
----------
rank : int
Rank of process local to the machine
machine_id : int
Machine ID on which the process is running
mapping : decentralizepy.mappings
The object containing the mapping rank <--> uid
graph : decentralizepy.graphs
The object containing the global graph
config : dict
A dictionary of configurations. Must contain the following:
[DATASET]
dataset_package
dataset_class
model_class
[OPTIMIZER_PARAMS]
optimizer_package
optimizer_class
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
iterations : int
Number of iterations (communication steps) for which the model should be trained
log_dir : str
Logging directory
log_level : logging.Level
One of DEBUG, INFO, WARNING, ERROR, CRITICAL
args : optional
Other arguments
"""
self.iteration = -1
self.graphs = []
self.graph_degree = graph_degree
self.instantiate(
rank,
machine_id,
mapping,
graph,
config,
iterations,
log_dir,
log_level,
*args
)
self.run()
logging.info("Peer Sampler exiting")