Commit adcd9b05 authored by Rishi Sharma

One node training, README

parent b67a8512
decentralizepy
==============
Node
----
* The Manager. Optimizations at process level.
Dataset
-------
* Static
Training
--------
* Heterogeneity. How much do I want to work?
Graph
-----
* Static. Who are my neighbours? Topologies.
Mapping
-------
* Naming. The globally unique ids of the processes <-> machine_id, local_rank (see the sketch after this list).
Sharing
-------
* Leverage Redundancy. Privacy. Optimizations in model and data sharing.
Communication
-------------
* IPC/Network level. Compression. Privacy. Reliability.
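
A hedged, pure-Python sketch of the Mapping idea referenced above; the function names and the contiguous-id scheme are illustrative, not the library's API:

# Illustration only: global uid <-> (machine_id, local_rank)
# for machines that each run the same number of processes.
PROCS_PER_MACHINE = 2

def get_uid(machine_id, local_rank):
    # globally unique id of a process
    return machine_id * PROCS_PER_MACHINE + local_rank

def get_machine_and_rank(uid):
    # inverse translation: (machine_id, local_rank)
    return divmod(uid, PROCS_PER_MACHINE)

assert get_uid(1, 0) == 2
assert get_machine_and_rank(3) == (1, 1)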
@@ -7,7 +7,7 @@ dataset_package = decentralizepy.datasets.Femnist
dataset_class = Femnist
model_class = LogisticRegression
n_procs = 1.0
train_dir = leaf/data/femnist/data/train
test_dir =
; python list of fractions below
sizes = [0.4, 0.2, 0.3, 0.1]
@@ -22,3 +22,14 @@ training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
shuffle = False
loss_package = torch.nn.functional
loss = nll_loss
[COMMUNICATION]
comm_package = decentralizepy.communication.Communication
comm_class = Communication
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
sharing_class = Sharing
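
A short sketch of how this file is consumed: each section pairs a package with a class that Node imports dynamically. The filename config.ini is an assumption; the commit does not name the file. Note that configparser values arrive as strings (e.g. sizes must be parsed before use).

import importlib
from configparser import ConfigParser

config = ConfigParser()
config.read("config.ini")  # assumed filename

# Every section follows the same package/class convention.
dataset_configs = config["DATASET"]
dataset_module = importlib.import_module(dataset_configs["dataset_package"])
dataset_class = getattr(dataset_module, dataset_configs["dataset_class"])
print(dataset_class)  # <class 'decentralizepy.datasets.Femnist.Femnist'>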
class Communication:
    """
    Communication API
    """

    def __init__(self):
        raise NotImplementedError
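
A hedged sketch of how a concrete subclass would fill in this abstract API; EchoCommunication and its send method are hypothetical, not part of this commit:

class EchoCommunication(Communication):
    def __init__(self):
        # intentionally skip super().__init__(), which raises NotImplementedError
        self.outbox = []

    def send(self, dest, message):
        # a real implementation would serialize and ship over IPC/the network
        self.outbox.append((dest, message))

comm = EchoCommunication()
comm.send(1, {"model": None})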
@@ -16,6 +16,9 @@ class Data:
        self.x = x
        self.y = y

    def __len__(self):
        return self.y.shape[0]

    def __getitem__(self, i):
        """
        Function to get the item with index i.
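
Because Data implements __len__ and __getitem__, it satisfies torch's map-style dataset protocol and can be handed to a DataLoader, as Femnist.get_trainset does below. A small sketch; the shapes are assumptions matching the Femnist preprocessing in this commit:

import numpy as np
from torch.utils.data import DataLoader

from decentralizepy.datasets.Data import Data

x = np.zeros((10, 1, 28, 28), dtype=np.float32)  # NCHW, as Femnist reshapes below
y = np.zeros(10, dtype=np.int64)                 # integer class labels
data = Data(x, y)

assert len(data) == 10  # __len__ -> y.shape[0]
loader = DataLoader(data, batch_size=4, shuffle=False)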
@@ -49,7 +49,7 @@ class Dataset:
        Function to get the training set

        Returns
        -------
        torch.utils.data.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
@@ -62,7 +62,7 @@ class Dataset:
        Function to get the test set

        Returns
        -------
        torch.utils.data.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
@@ -7,6 +7,7 @@ import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
@@ -15,6 +16,7 @@ from decentralizepy.datasets.Partitioner import DataPartitioner
NUM_CLASSES = 62
IMAGE_SIZE = (28, 28)
FLAT_SIZE = 28 * 28
PIXEL_RANGE = 256.0
class Femnist(Dataset):
@@ -81,7 +83,7 @@ class Femnist(Dataset):
        frac = e / c_len
        self.sizes = [frac] * self.n_procs
        self.sizes[-1] += 1.0 - frac * self.n_procs
        logging.debug("Size fractions: {}".format(self.sizes))

        my_clients = DataPartitioner(clients, self.sizes).use(self.rank)
        my_train_data = {"x": [], "y": []}
@@ -96,13 +98,13 @@ class Femnist(Dataset):
my_train_data["y"].extend(train_data[cur_client]["y"])
self.num_samples.append(len(train_data[cur_client]["y"]))
self.train_x = (
np.array(my_train_data["x"], dtype=np.dtype("float64"))
np.array(my_train_data["x"], dtype=np.dtype("float32"))
.reshape(-1, 28, 28, 1)
.transpose(0, 3, 1, 2)
)
self.train_y = np.array(
my_train_data["y"], dtype=np.dtype("float64")
).reshape(-1, 1)
my_train_data["y"], dtype=np.dtype("int64")
).reshape(-1)
logging.debug("train_x.shape: %s", str(self.train_x.shape))
logging.debug("train_y.shape: %s", str(self.train_y.shape))
@@ -111,12 +113,12 @@ class Femnist(Dataset):
        _, _, test_data = self.__read_dir__(self.test_dir)
        # Merge the per-client test data into one global test set;
        # a dict_values view cannot be subscripted with "x"/"y" directly.
        merged = {"x": [], "y": []}
        for client_data in test_data.values():
            merged["x"].extend(client_data["x"])
            merged["y"].extend(client_data["y"])
        test_data = merged
        self.test_x = (
            np.array(test_data["x"], dtype=np.dtype("float32"))
            .reshape(-1, 28, 28, 1)
            .transpose(0, 3, 1, 2)
        )
        self.test_y = np.array(test_data["y"], dtype=np.dtype("int64")).reshape(-1)
        logging.debug("test_x.shape: %s", str(self.test_x.shape))
        logging.debug("test_y.shape: %s", str(self.test_y.shape))
@@ -156,19 +158,23 @@ class Femnist(Dataset):
        raise IndexError("i is out of bounds!")

    def get_trainset(self, batch_size, shuffle=False):
        """
        Function to get the training set

        Parameters
        ----------
        batch_size : int
            Batch size for learning
        shuffle : bool, optional
            Whether to reshuffle the training set each epoch

        Returns
        -------
        torch.utils.data.DataLoader
            A DataLoader over decentralizepy.datasets.Data

        Raises
        ------
        RuntimeError
            If the training set was not initialized
        """
        if self.__training__:
            return DataLoader(
                Data(self.train_x, self.train_y),
                batch_size=batch_size,
                shuffle=shuffle,
            )
        raise RuntimeError("Training set not initialized!")
    def get_testset(self):
@@ -176,7 +182,7 @@
        Function to get the test set

        Returns
        -------
        torch.utils.data.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
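
A hedged usage sketch for the new get_trainset signature. The keyword names mirror the [DATASET] config keys above and are assumed to match Femnist's constructor; the train path is the one from the config:

from decentralizepy.datasets.Femnist import Femnist

# Assumed constructor: rank first, then the [DATASET] config keys.
dataset = Femnist(
    0,  # rank
    n_procs=1,
    train_dir="leaf/data/femnist/data/train",
    test_dir=None,  # left unset, as in the config above
    sizes=[1.0],
)

trainset = dataset.get_trainset(batch_size=64, shuffle=False)
for data, target in trainset:
    # float32 images in NCHW layout, int64 labels
    print(data.shape, target.shape)  # e.g. (64, 1, 28, 28) and (64,)
    break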
@@ -17,6 +17,7 @@ class Node:
    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        graph: Graph,
        config,
@@ -31,6 +32,8 @@
        ----------
        rank : int
            Rank of process local to the machine
        machine_id : int
            Machine ID on which the process is running
        n_procs_local : int
            Number of processes on current machine
        mapping : decentralizepy.mappings
@@ -59,7 +62,6 @@
            Other arguments

        """
        log_file = os.path.join(log_dir, str(rank) + ".log")
        logging.basicConfig(
            filename=log_file,
            format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
@@ -77,29 +79,33 @@
logging.debug("type(graph): %s", str(type(self.rank)))
logging.debug("type(mapping): %s", str(type(self.mapping)))
dataset_configs = dict(config.items("DATASET"))
dataset_configs = config["DATASET"]
dataset_module = importlib.import_module(dataset_configs["dataset_package"])
dataset_class = getattr(dataset_module, dataset_configs["dataset_class"])
dataset_params = utils.remove_keys(dataset_configs, ["dataset_package", "dataset_class", "model_class"])
self.dataset = dataset_class(rank, **dataset_params)
self.trainset = self.dataset.get_trainset()
logging.info("Dataset instantiation complete.")
model_class = getattr(dataset_module, dataset_configs["model_class"])
self.model = model_class()
optimizer_configs = dict(config.items("OPTIMIZER_PARAMS"))
optimizer_configs = config["OPTIMIZER_PARAMS"]
optimizer_module = importlib.import_module(optimizer_configs["optimizer_package"])
optimizer_class = getattr(optimizer_module, optimizer_configs["optimizer_class"])
optimizer_params = utils.remove_keys(optimizer_configs, ["optimizer_package", "optimizer_class"])
self.optimizer = optimizer_class(self.model.parameters(), **optimizer_params)
train_configs = dict(config.items("TRAIN_PARAMS"))
train_configs = config["TRAIN_PARAMS"]
train_module = importlib.import_module(train_configs["training_package"])
train_class = getattr(train_module, train_configs["training_class"])
train_params = utils.remove_keys(train_configs, ["training_package", "training_class"])
self.trainer = train_class(self.model, self.optimizer, **train_params)
loss_package = importlib.import_module(train_configs["loss_package"])
loss = getattr(loss_package, train_configs["loss"])
train_params = utils.remove_keys(train_configs, ["training_package", "training_class", "loss", "loss_package"])
self.trainer = train_class(self.model, self.optimizer, loss, **train_params)
for iteration in range(iterations):
self.trainer.train(self.trainset)
\ No newline at end of file
logging.info("Starting training iteration: %d", iteration)
self.trainer.train(self.dataset)
\ No newline at end of file
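
The importlib/getattr pattern above, in isolation and pointed at a real torch optimizer for concreteness. The dict stands in for a parsed config section; the values are illustrative:

import importlib

import torch

optimizer_configs = {
    "optimizer_package": "torch.optim",
    "optimizer_class": "SGD",
    "lr": "0.01",  # configparser values arrive as strings
}

optimizer_module = importlib.import_module(optimizer_configs["optimizer_package"])
optimizer_class = getattr(optimizer_module, optimizer_configs["optimizer_class"])

params = [torch.nn.Parameter(torch.zeros(3))]
optimizer = optimizer_class(params, lr=float(optimizer_configs["lr"]))
print(type(optimizer))  # <class 'torch.optim.sgd.SGD'>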
class Sharing:
    """
    API defining what to share, with whom, and what to do on receiving a model
    """

    def __init__(self):
        raise NotImplementedError
import logging

from decentralizepy import utils
class Training:
    """
    This class implements the training module for a single node.
    """

    def __init__(self, model, optimizer, loss, epochs_per_round="", batch_size="", shuffle=""):
"""
Constructor
Parameters
----------
epochs_per_round : int
model : torch.nn.Module
Neural Network for training
optimizer : torch.optim
Optimizer to learn parameters
loss : function
Loss function
epochs_per_round : int, optional
Number of epochs per training call
batch_size : int
batch_size : int, optional
Number of items to learn over, in one batch
"""
self.model = model
self.optimizer = optimizer
self.loss = loss
self.epochs_per_round = utils.conditional_value(epochs_per_round, "", 1)
self.batch_size = utils.conditional_value(batch_size, "", 1)
self.shuffle = utils.conditional_value(shuffle, "", False)
    def train(self, dataset):
        """
        One training iteration

        Parameters
        ----------
        dataset : decentralizepy.datasets.Dataset
            The training dataset. Should implement get_trainset(batch_size, shuffle)
        """
        trainset = dataset.get_trainset(self.batch_size, self.shuffle)

        for epoch in range(self.epochs_per_round):
            epoch_loss = 0.0
            for data, target in trainset:
                self.model.zero_grad()
                output = self.model(data)
                loss_val = self.loss(output, target)
                epoch_loss += loss_val.item()
                loss_val.backward()
                self.optimizer.step()
            logging.info("Epoch_loss: %f", epoch_loss)