Compare revisions

Showing with 3607 additions and 320 deletions
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class DPSGDNodeFederated(Node):
    """
    This class defines the node for federated DPSGD
    """

    def run(self):
        """
        Start the decentralized learning
        """
        while len(self.barrier):
            sender, data = self.receive_channel("WORKER_REQUEST")

            if "BYE" in data:
                logging.info("Received {} from {}".format("BYE", sender))
                self.barrier.remove(sender)
                break

            iteration = data["iteration"]
            del data["iteration"]
            del data["CHANNEL"]

            self.model.load_state_dict(data["params"])
            self.sharing._post_step()
            self.sharing.communication_round += 1

            logging.info(
                "Received worker request at node {}, global iteration {}, local round {}".format(
                    self.uid, iteration, self.participated
                )
            )

            if self.reset_optimizer:
                self.optimizer = self.optimizer_class(
                    self.model.parameters(), **self.optimizer_params
                )  # Reset optimizer state
                self.trainer.reset_optimizer(self.optimizer)

            # Perform iteration
            logging.info("Starting training iteration")
            self.trainer.train(self.dataset)

            # Send update to server
            to_send = self.sharing.get_data_to_send()
            to_send["CHANNEL"] = "DPSGD"
            self.communication.send(self.parameter_server_uid, to_send)

            if self.participated > 0:
                with open(
                    os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
                    "r",
                ) as inf:
                    results_dict = json.load(inf)
            else:
                results_dict = {
                    "train_loss": {},
                    "test_loss": {},
                    "test_acc": {},
                    "total_bytes": {},
                    "total_meta": {},
                    "total_data_per_n": {},
                }

            results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes

            if hasattr(self.communication, "total_meta"):
                results_dict["total_meta"][
                    iteration + 1
                ] = self.communication.total_meta
            if hasattr(self.communication, "total_data"):
                results_dict["total_data_per_n"][
                    iteration + 1
                ] = self.communication.total_data

            with open(
                os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
            ) as of:
                json.dump(results_dict, of)

            self.participated += 1

        # Store the final weights only if the node has participated in learning
        if self.participated > 0:
            logging.info("Storing final weight")
            self.model.dump_weights(self.weights_store_dir, self.uid, iteration)

        logging.info("Server disconnected. Process complete!")

    def cache_fields(
        self,
        rank,
        machine_id,
        mapping,
        graph,
        iterations,
        log_dir,
        weights_store_dir,
        test_after,
        train_evaluate_after,
        reset_optimizer,
    ):
        """
        Instantiate object fields with the given arguments.

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process is running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        graph : decentralizepy.graphs
            The object containing the global graph
        iterations : int
            Number of iterations (communication steps) for which the model should be trained
        log_dir : str
            Logging directory
        weights_store_dir : str
            Directory in which to store model weights
        test_after : int
            Number of iterations after which the test loss and accuracy are calculated
        train_evaluate_after : int
            Number of iterations after which the train loss is calculated
        reset_optimizer : int
            1 if the optimizer should be reset every communication round, else 0

        """
        self.rank = rank
        self.machine_id = machine_id
        self.graph = graph
        self.mapping = mapping
        self.uid = self.mapping.get_uid(rank, machine_id)
        self.log_dir = log_dir
        self.weights_store_dir = weights_store_dir
        self.iterations = iterations
        self.test_after = test_after
        self.train_evaluate_after = train_evaluate_after
        self.reset_optimizer = reset_optimizer
        self.sent_disconnections = False

        logging.info("Rank: %d", self.rank)
        logging.info("type(graph): %s", str(type(self.graph)))
        logging.info("type(mapping): %s", str(type(self.mapping)))

    def init_comm(self, comm_configs):
        """
        Instantiate communication module from config.

        Parameters
        ----------
        comm_configs : dict
            Python dict containing communication config params

        """
        comm_module = importlib.import_module(comm_configs["comm_package"])
        comm_class = getattr(comm_module, comm_configs["comm_class"])
        comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
        self.addresses_filepath = comm_params.get("addresses_filepath", None)
        self.communication = comm_class(
            self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params
        )

    def instantiate(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        graph: Graph,
        config,
        iterations=1,
        log_dir=".",
        weights_store_dir=".",
        log_level=logging.INFO,
        test_after=5,
        train_evaluate_after=1,
        reset_optimizer=1,
        *args
    ):
        """
        Construct objects.

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process is running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        graph : decentralizepy.graphs
            The object containing the global graph
        config : dict
            A dictionary of configurations.
        iterations : int
            Number of iterations (communication steps) for which the model should be trained
        log_dir : str
            Logging directory
        weights_store_dir : str
            Directory in which to store model weights
        log_level : logging.Level
            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
        test_after : int
            Number of iterations after which the test loss and accuracy are calculated
        train_evaluate_after : int
            Number of iterations after which the train loss is calculated
        reset_optimizer : int
            1 if the optimizer should be reset every communication round, else 0
        args : optional
            Other arguments

        """
        logging.info("Started process.")

        self.init_log(log_dir, rank, log_level)
        self.cache_fields(
            rank,
            machine_id,
            mapping,
            graph,
            iterations,
            log_dir,
            weights_store_dir,
            test_after,
            train_evaluate_after,
            reset_optimizer,
        )

        self.init_dataset_model(config["DATASET"])
        self.init_optimizer(config["OPTIMIZER_PARAMS"])
        self.init_trainer(config["TRAIN_PARAMS"])
        self.init_comm(config["COMMUNICATION"])

        self.message_queue = dict()
        self.barrier = set()
        self.participated = 0

        self.init_sharing(config["SHARING"])

    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        graph: Graph,
        config,
        iterations=1,
        log_dir=".",
        weights_store_dir=".",
        log_level=logging.INFO,
        test_after=5,
        train_evaluate_after=1,
        reset_optimizer=1,
        parameter_server_uid=-1,
        *args
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process is running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        graph : decentralizepy.graphs
            The object containing the global graph
        config : dict
            A dictionary of configurations. Must contain the following:
            [DATASET]
                dataset_package
                dataset_class
                model_class
            [OPTIMIZER_PARAMS]
                optimizer_package
                optimizer_class
            [TRAIN_PARAMS]
                training_package = decentralizepy.training.Training
                training_class = Training
                epochs_per_round = 25
                batch_size = 64
        iterations : int
            Number of iterations (communication steps) for which the model should be trained
        log_dir : str
            Logging directory
        weights_store_dir : str
            Directory in which to store model weights
        log_level : logging.Level
            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
        test_after : int
            Number of iterations after which the test loss and accuracy are calculated
        train_evaluate_after : int
            Number of iterations after which the train loss is calculated
        reset_optimizer : int
            1 if the optimizer should be reset every communication round, else 0
        parameter_server_uid : int
            The parameter server's uid
        args : optional
            Other arguments

        """
        total_threads = os.cpu_count()
        self.threads_per_proc = max(
            math.floor(total_threads / mapping.procs_per_machine), 1
        )
        torch.set_num_threads(self.threads_per_proc)
        torch.set_num_interop_threads(1)
        self.instantiate(
            rank,
            machine_id,
            mapping,
            graph,
            config,
            iterations,
            log_dir,
            weights_store_dir,
            log_level,
            test_after,
            train_evaluate_after,
            reset_optimizer,
            *args
        )
        logging.info(
            "Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
        )
        self.message_queue["PEERS"] = deque()

        self.parameter_server_uid = parameter_server_uid
        self.connect_neighbor(self.parameter_server_uid)
        self.wait_for_hello(self.parameter_server_uid)

        self.run()
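The instantiate method above reads config as a nested dict keyed by the section names DATASET, OPTIMIZER_PARAMS, TRAIN_PARAMS, COMMUNICATION, and SHARING. Below is a minimal sketch of such a dict in Python: the section names and the TRAIN_PARAMS values are taken from the docstring above, while the concrete dataset, optimizer, communication, and sharing entries are illustrative assumptions, not values taken from this diff.

# A hedged sketch of the config dict expected by instantiate(); the section
# names come from the code above, the concrete package/class values marked as
# assumptions are illustrative and must match the modules available in your setup.
config = {
    "DATASET": {
        "dataset_package": "decentralizepy.datasets.CIFAR10",  # assumption
        "dataset_class": "CIFAR10",                             # assumption
        "model_class": "LeNet",                                 # assumption
    },
    "OPTIMIZER_PARAMS": {
        "optimizer_package": "torch.optim",  # assumption
        "optimizer_class": "SGD",            # assumption
        "lr": 0.01,                          # assumption
    },
    "TRAIN_PARAMS": {
        "training_package": "decentralizepy.training.Training",  # from the docstring above
        "training_class": "Training",
        "epochs_per_round": 25,
        "batch_size": 64,
    },
    "COMMUNICATION": {
        "comm_package": "decentralizepy.communication.TCP",  # assumption
        "comm_class": "TCP",                                  # assumption
        "addresses_filepath": "ip_addr.json",                 # assumption
    },
    "SHARING": {
        "sharing_package": "decentralizepy.sharing.Sharing",  # assumption
        "sharing_class": "Sharing",                           # assumption
    },
}

init_comm() then imports comm_package with importlib, looks up comm_class on it, and forwards the remaining keys (such as addresses_filepath) to that class's constructor.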
import logging
import math
import os
from collections import deque
import torch
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.DPSGDNode import DPSGDNode
class DPSGDWithPeerSampler(DPSGDNode):
    """
    This class defines the node for DPSGD with a peer sampler
    """

    def receive_neighbors(self):
        return self.receive_channel("PEERS")[1]["NEIGHBORS"]

    def get_neighbors(self, node=None):
        logging.info("Requesting neighbors from the peer sampler.")
        self.communication.send(
            self.peer_sampler_uid,
            {
                "REQUEST_NEIGHBORS": self.uid,
                "iteration": self.iteration,
                "CHANNEL": "SERVER_REQUEST",
            },
        )
        my_neighbors = self.receive_neighbors()
        logging.info("Neighbors this round: {}".format(my_neighbors))
        return my_neighbors

    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        graph: Graph,
        config,
        iterations=1,
        log_dir=".",
        weights_store_dir=".",
        log_level=logging.INFO,
        test_after=5,
        train_evaluate_after=1,
        reset_optimizer=1,
        peer_sampler_uid=-1,
        *args
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process is running
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        graph : decentralizepy.graphs
            The object containing the global graph
        config : dict
            A dictionary of configurations. Must contain the following:
            [DATASET]
                dataset_package
                dataset_class
                model_class
            [OPTIMIZER_PARAMS]
                optimizer_package
                optimizer_class
            [TRAIN_PARAMS]
                training_package = decentralizepy.training.Training
                training_class = Training
                epochs_per_round = 25
                batch_size = 64
        iterations : int
            Number of iterations (communication steps) for which the model should be trained
        log_dir : str
            Logging directory
        weights_store_dir : str
            Directory in which to store model weights
        log_level : logging.Level
            One of DEBUG, INFO, WARNING, ERROR, CRITICAL
        test_after : int
            Number of iterations after which the test loss and accuracy are calculated
        train_evaluate_after : int
            Number of iterations after which the train loss is calculated
        reset_optimizer : int
            1 if the optimizer should be reset every communication round, else 0
        peer_sampler_uid : int
            The peer sampler's uid
        args : optional
            Other arguments

        """
        total_threads = os.cpu_count()
        self.threads_per_proc = max(
            math.floor(total_threads / mapping.procs_per_machine), 1
        )
        torch.set_num_threads(self.threads_per_proc)
        torch.set_num_interop_threads(1)
        self.instantiate(
            rank,
            machine_id,
            mapping,
            graph,
            config,
            iterations,
            log_dir,
            weights_store_dir,
            log_level,
            test_after,
            train_evaluate_after,
            reset_optimizer,
            *args
        )
        logging.info(
            "Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
        )
        self.message_queue["PEERS"] = deque()

        self.peer_sampler_uid = peer_sampler_uid
        self.connect_neighbor(self.peer_sampler_uid)
        self.wait_for_hello(self.peer_sampler_uid)

        self.run()

    def disconnect_neighbors(self):
        """
        Disconnects all neighbors.

        Raises
        ------
        RuntimeError
            If received another message while waiting for BYEs

        """
        if not self.sent_disconnections:
            logging.info("Disconnecting neighbors")

            for uid in self.barrier:
                self.communication.send(uid, {"BYE": self.uid, "CHANNEL": "DISCONNECT"})
            self.communication.send(
                self.peer_sampler_uid, {"BYE": self.uid, "CHANNEL": "SERVER_REQUEST"}
            )
            self.sent_disconnections = True

            self.barrier.remove(self.peer_sampler_uid)

            while len(self.barrier):
                sender, _ = self.receive_disconnect()
                self.barrier.remove(sender)
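For context, get_neighbors above talks to the peer sampler with plain dict messages: the request goes out on the SERVER_REQUEST channel and the answer is read from the PEERS channel by receive_neighbors. Below is a minimal sketch of the two payloads; the uid, iteration, and neighbor values are illustrative, and the CHANNEL field of the reply is an assumption about how receive_channel routes messages.

# Request sent by DPSGDWithPeerSampler.get_neighbors() (shape taken from the code above);
# the concrete uid and iteration values are illustrative assumptions.
request = {
    "REQUEST_NEIGHBORS": 3,       # uid of the requesting node
    "iteration": 10,              # current global iteration
    "CHANNEL": "SERVER_REQUEST",  # channel the peer sampler listens on
}

# Reply the node expects: receive_neighbors() reads the "NEIGHBORS" key from the
# message delivered on the "PEERS" channel. Assumes the peer sampler tags the
# reply with "CHANNEL": "PEERS" so receive_channel("PEERS") can route it.
reply = {
    "CHANNEL": "PEERS",
    "NEIGHBORS": [0, 5, 7],       # uids of this round's neighbors (illustrative)
}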
+# Deprecated
 import logging
 from decentralizepy.sharing.PartialModel import PartialModel
@@ -25,6 +26,9 @@ class GrowingAlpha(PartialModel):
         dict_ordered=True,
         save_shared=False,
         metadata_cap=1.0,
+        compress=False,
+        compression_package=None,
+        compression_class=None,
     ):
         """
         Constructor
@@ -74,12 +78,15 @@ class GrowingAlpha(PartialModel):
             dict_ordered,
             save_shared,
             metadata_cap,
+            compress,
+            compression_package,
+            compression_class,
         )
         self.init_alpha = init_alpha
         self.max_alpha = max_alpha
         self.k = k

-    def step(self):
+    def get_data_to_send(self):
         """
         Perform a sharing step. Implements D-PSGD with alpha increasing as a linear function.
@@ -93,4 +100,4 @@ class GrowingAlpha(PartialModel):
             self.communication_round += 1
             return

-        super().step()
+        return super().get_data_to_send()