Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • sacs/decentralizepy
  • mvujas/decentralizepy
  • randl/decentralizepy
3 results
Show changes
Showing
with 3085 additions and 39 deletions
import json
import logging
import os
from collections import defaultdict
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from torch import nn
from torch.utils.data import DataLoader
import decentralizepy.utils as utils
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
IMAGE_DIM = 84
CHANNELS = 3
NUM_CLASSES = 2
class Celeba(Dataset):
    """
    Class for the Celeba dataset

    Client metadata is read from json files (keys ``users``, ``num_samples``
    and ``user_data``); the images referenced by those files are loaded from
    ``images_dir`` and resized to IMAGE_DIM x IMAGE_DIM RGB.

    """

    def __read_file__(self, file_path):
        """
        Read data from the given json file

        Parameters
        ----------
        file_path : str
            The file path

        Returns
        -------
        tuple
            (users, num_samples, data)

        """
        with open(file_path, "r") as inf:
            client_data = json.load(inf)
        return (
            client_data["users"],
            client_data["num_samples"],
            client_data["user_data"],
        )

    def __read_dir__(self, data_dir):
        """
        Function to read all the Celeba data files in the directory

        Parameters
        ----------
        data_dir : str
            Path to the folder containing the data files

        Returns
        -------
        3-tuple
            A tuple containing list of clients, number of samples per client,
            and the data items per client

        """
        clients = []
        num_samples = []
        data = defaultdict(lambda: None)

        files = [f for f in os.listdir(data_dir) if f.endswith(".json")]
        for f in files:
            file_path = os.path.join(data_dir, f)
            u, n, d = self.__read_file__(file_path)
            clients.extend(u)
            num_samples.extend(n)
            data.update(d)
        return clients, num_samples, data

    def file_per_user(self, dir, write_dir):
        """
        Function to read all the Celeba data files and write one file per user

        Parameters
        ----------
        dir : str
            Path to the folder containing the data files
        write_dir : str
            Path to the folder to write the files

        """
        clients, num_samples, train_data = self.__read_dir__(dir)
        for index, client in enumerate(clients):
            my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]}
            my_data = {
                "users": [client],
                # Written as a one-element list so the file has the same layout
                # as the input files: __read_dir__ calls extend() on this value
                # and would fail on a bare integer.
                "num_samples": [num_samples[index]],
                "user_data": {client: my_samples},
            }
            with open(os.path.join(write_dir, client + ".json"), "w") as of:
                json.dump(my_data, of)
                print("Created File: ", client + ".json")

    def load_trainset(self):
        """
        Loads the training set. Partitions it if needed.

        The json files in ``self.train_dir`` are sorted by name and split among
        the processes according to ``self.sizes``; only the files assigned to
        this process' uid are read.

        """
        logging.info("Loading training set.")
        files = sorted(f for f in os.listdir(self.train_dir) if f.endswith(".json"))
        c_len = len(files)

        if self.sizes is None:  # Equal distribution of data among processes
            e = c_len // self.n_procs
            frac = e / c_len
            self.sizes = [frac] * self.n_procs
            # The last process absorbs the remainder so the fractions sum to 1.0
            self.sizes[-1] += 1.0 - frac * self.n_procs
            logging.debug("Size fractions: {}".format(self.sizes))

        self.uid = self.mapping.get_uid(self.rank, self.machine_id)
        my_clients = DataPartitioner(files, self.sizes).use(self.uid)
        my_train_data = {"x": [], "y": []}
        self.clients = []
        self.num_samples = []
        logging.debug("Clients Length: %d", c_len)
        logging.debug("My_clients_len: %d", len(my_clients))
        for i in range(len(my_clients)):
            cur_file = my_clients[i]

            clients, _, train_data = self.__read_file__(
                os.path.join(self.train_dir, cur_file)
            )
            for cur_client in clients:
                logging.debug("Got data of client: {}".format(cur_client))
                self.clients.append(cur_client)
                my_train_data["x"].extend(self.process_x(train_data[cur_client]["x"]))
                my_train_data["y"].extend(train_data[cur_client]["y"])
                self.num_samples.append(len(train_data[cur_client]["y"]))

        # Build the float array once; it is reused for the debug log and the
        # final tensor layout instead of being materialized twice.
        train_x = np.array(my_train_data["x"], dtype=np.dtype("float32"))
        logging.debug("Initial shape of x: {}".format(train_x.shape))
        self.train_x = train_x.reshape(-1, IMAGE_DIM, IMAGE_DIM, CHANNELS).transpose(
            0, 3, 1, 2
        )  # Channel first: torch
        self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
        logging.info("train_x.shape: %s", str(self.train_x.shape))
        logging.info("train_y.shape: %s", str(self.train_y.shape))
        assert self.train_x.shape[0] == self.train_y.shape[0]
        assert self.train_x.shape[0] > 0

    def load_testset(self):
        """
        Loads the testing set.

        Every json file in ``self.test_dir`` is read; the test set is not
        partitioned among processes.

        """
        logging.info("Loading testing set.")
        _, _, d = self.__read_dir__(self.test_dir)
        test_x = []
        test_y = []
        for test_data in d.values():
            test_x.extend(self.process_x(test_data["x"]))
            test_y.extend(test_data["y"])
        self.test_x = (
            np.array(test_x, dtype=np.dtype("float32"))
            .reshape(-1, IMAGE_DIM, IMAGE_DIM, CHANNELS)
            .transpose(0, 3, 1, 2)  # Channel first: torch
        )
        self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1)
        logging.info("test_x.shape: %s", str(self.test_x.shape))
        logging.info("test_y.shape: %s", str(self.test_y.shape))
        assert self.test_x.shape[0] == self.test_y.shape[0]
        assert self.test_x.shape[0] > 0

    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        train_dir="",
        test_dir="",
        images_dir="",
        sizes="",
        test_batch_size=128,
    ):
        """
        Constructor which reads the data files, instantiates and partitions the dataset

        Parameters
        ----------
        rank : int
            Rank of the current process (to get the partition).
        machine_id : int
            Machine ID
        mapping : decentralizepy.mappings.Mapping
            Mapping to convert rank, machine_id -> uid for data partitioning
            It also provides the total number of global processes
        train_dir : str, optional
            Path to the training data files. Required to instantiate the training set
            The training set is partitioned according to the number of global processes and sizes
        test_dir : str, optional
            Path to the testing data files. Required to instantiate the testing set
        images_dir : str, optional
            Path to the folder containing the image files. Must be provided:
            the constructor asserts that the resolved value is not None.
        sizes : list(int), optional
            A list of fractions specifying how much data to allot each process. Sum of fractions should be 1.0
            By default, each process gets an equal amount.
        test_batch_size : int, optional
            Batch size during testing. Default value is 128

        """
        super().__init__(
            rank,
            machine_id,
            mapping,
            train_dir,
            test_dir,
            sizes,
            test_batch_size,
        )
        # NOTE(review): conditional_value presumably maps the "" placeholder to
        # None -- confirm against decentralizepy.utils.
        self.IMAGES_DIR = utils.conditional_value(images_dir, "", None)
        assert self.IMAGES_DIR is not None
        self.num_classes = NUM_CLASSES
        if self.__training__:
            self.load_trainset()
        if self.__testing__:
            self.load_testset()
        # TODO: Add Validation

    def process_x(self, raw_x_batch):
        """
        Preprocesses the whole batch of images

        Parameters
        ----------
        raw_x_batch : list(str)
            Image file names as listed in the json data files

        Returns
        -------
        np.array
            The images as a numpy array

        """
        return np.array([self._load_image(i) for i in raw_x_batch])

    def _load_image(self, img_name):
        """
        Open and load image.

        The last four characters of ``img_name`` are replaced by ".png" before
        the file is looked up in ``self.IMAGES_DIR``.

        Parameters
        ----------
        img_name : str
            File name of the image

        Returns
        -------
        np.array
            The image as a numpy array

        """
        image_path = os.path.join(self.IMAGES_DIR, img_name[:-4] + ".png")
        # The context manager closes the underlying file once the resized copy
        # has been produced.
        with Image.open(image_path) as img:
            resized = img.resize((IMAGE_DIM, IMAGE_DIM)).convert("RGB")
        return np.array(resized)

    def get_client_ids(self):
        """
        Function to retrieve all the clients of the current process

        Returns
        -------
        list(str)
            A list of strings of the client ids.

        """
        return self.clients

    def get_client_id(self, i):
        """
        Function to get the client id of the ith sample

        Parameters
        ----------
        i : int
            Index of the sample

        Returns
        -------
        str
            Client ID

        Raises
        ------
        IndexError
            If the sample index is out of bounds

        """
        lb = 0
        for client, n_samples in zip(self.clients, self.num_samples):
            if i < lb + n_samples:
                return client
            # Advance the lower bound past this client's samples; without this
            # only the first client's range was ever matched.
            lb += n_samples

        raise IndexError("i is out of bounds!")

    def get_trainset(self, batch_size=1, shuffle=False):
        """
        Function to get the training set

        Parameters
        ----------
        batch_size : int, optional
            Batch size for learning
        shuffle : bool, optional
            Passed through to the DataLoader

        Returns
        -------
        torch.utils.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
            If the training set was not initialized

        """
        if self.__training__:
            return DataLoader(
                Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle
            )
        raise RuntimeError("Training set not initialized!")

    def get_testset(self):
        """
        Function to get the test set

        Returns
        -------
        torch.utils.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
            If the test set was not initialized

        """
        if self.__testing__:
            return DataLoader(
                Data(self.test_x, self.test_y), batch_size=self.test_batch_size
            )
        raise RuntimeError("Test set not initialized!")

    def test(self, model, loss):
        """
        Function to evaluate model on the test dataset.

        Parameters
        ----------
        model : decentralizepy.models.Model
            Model to evaluate
        loss : torch.nn.loss
            Loss function to evaluate

        Returns
        -------
        tuple
            (accuracy, loss_value)
            Accuracy is a percentage over all test samples; the loss is the
            mean of the per-batch loss values.

        """
        testloader = self.get_testset()

        logging.debug("Test Loader instantiated.")

        correct_pred = [0 for _ in range(NUM_CLASSES)]
        total_pred = [0 for _ in range(NUM_CLASSES)]

        total_correct = 0
        total_predicted = 0

        with torch.no_grad():
            loss_val = 0.0
            count = 0
            for elems, labels in testloader:
                outputs = model(elems)
                loss_val += loss(outputs, labels).item()
                count += 1
                _, predictions = torch.max(outputs, 1)
                # Plain Python ints are used for list indexing, and the lazy
                # %-style logging avoids formatting every sample when debug
                # logging is disabled.
                for label, prediction in zip(labels.tolist(), predictions.tolist()):
                    logging.debug("%s predicted as %s", label, prediction)
                    if label == prediction:
                        correct_pred[label] += 1
                        total_correct += 1
                    total_pred[label] += 1
                    total_predicted += 1

        logging.debug("Predicted on the test set")

        for key, value in enumerate(correct_pred):
            if total_pred[key] != 0:
                accuracy = 100 * float(value) / total_pred[key]
            else:
                # A class without test samples is reported as 100 %
                accuracy = 100.0
            logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))

        accuracy = 100 * float(total_correct) / total_predicted
        loss_val = loss_val / count
        logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
        return accuracy, loss_val
class CNN(Model):
    """
    Class for a CNN Model for Celeba

    Four 3x3 convolution stages, each followed by 2x2 max-pooling and ReLU,
    and a single linear classifier on the flattened features.

    """

    def __init__(self):
        """
        Constructor. Instantiates the CNN Model
            with 84*84*3 Input and 2 output classes

        """
        super().__init__()
        # Roughly 30k parameters: 896 (conv1) + 3 * 9248 (conv2-4) + 1602 (fc1)
        hidden = 32
        self.conv1 = nn.Conv2d(CHANNELS, hidden, 3, padding="same")
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(hidden, hidden, 3, padding="same")
        self.conv3 = nn.Conv2d(hidden, hidden, 3, padding="same")
        self.conv4 = nn.Conv2d(hidden, hidden, 3, padding="same")
        # Four poolings shrink 84 -> 42 -> 21 -> 10 -> 5 per spatial dimension
        self.fc1 = nn.Linear(5 * 5 * hidden, NUM_CLASSES)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor

        """
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = F.relu(self.pool(conv(x)))
        flat = torch.flatten(x, 1)
        return self.fc1(flat)
class Data:
    """
    This class defines the API for Data.

    It pairs an array of samples with an array of outputs so that both can be
    indexed together.

    """

    def __init__(self, x, y):
        """
        Constructor

        Parameters
        ----------
        x : numpy array
            A numpy array of data samples
        y : numpy array
            A numpy array of outputs corresponding to the sample

        """
        self.x, self.y = x, y

    def __len__(self):
        """
        Return the number of samples in the dataset

        Returns
        -------
        int
            Number of samples

        """
        label_shape = self.y.shape
        return label_shape[0]

    def __getitem__(self, i):
        """
        Function to get the item with index i.

        Parameters
        ----------
        i : int
            Index

        Returns
        -------
        2-tuple
            A tuple of the ith data sample and its corresponding label

        """
        sample = self.x[i]
        label = self.y[i]
        return sample, label
from decentralizepy import utils
from decentralizepy.mappings.Mapping import Mapping
class Dataset:
"""
This class defines the Dataset API.
All datasets must follow this API.
"""
def __init__(
self,
rank="",
n_procs="",
rank: int,
machine_id: int,
mapping: Mapping,
train_dir="",
test_dir="",
sizes="",
......@@ -18,27 +21,38 @@ class Dataset:
):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
rank : int, optional
Rank of the current process (to get the partition). Default value is assigned 0
n_procs : int, optional
The number of processes among which to divide the data. Default value is assigned 1
rank : int
Rank of the current process (to get the partition).
machine_id : int
Machine ID
mapping : decentralizepy.mappings.Mapping
Mapping to convert rank, machine_id -> uid for data partitioning
It also provides the total number of global processes
train_dir : str, optional
Path to the training data files. Required to instantiate the training set
The training set is partitioned according to n_procs and sizes
The training set is partitioned according to the number of global processes and sizes
test_dir : str. optional
Path to the testing data files Required to instantiate the testing set
sizes : list(int), optional
A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0
By default, each process gets an equal amount.
test_batch_size : int, optional
Batch size during testing. Default value is 64
"""
self.rank = utils.conditional_value(rank, "", 0)
self.n_procs = utils.conditional_value(n_procs, "", 1)
self.rank = rank
self.machine_id = machine_id
self.mapping = mapping
# the number of global processes, needed to split-up the dataset
self.n_procs = mapping.get_n_procs()
self.train_dir = utils.conditional_value(train_dir, "", None)
self.test_dir = utils.conditional_value(test_dir, "", None)
self.sizes = utils.conditional_value(sizes, "", None)
self.test_batch_size = utils.conditional_value(test_batch_size, "", 64)
self.num_classes = None
if self.sizes:
if type(self.sizes) == str:
self.sizes = eval(self.sizes)
......@@ -53,28 +67,48 @@ class Dataset:
else:
self.__testing__ = False
self.label_distribution = None
def get_label_distribution(self):
    """
    Count how many training samples carry each label.

    Only supported for classification. The counts are computed once from the
    training set and cached in ``self.label_distribution``.

    Returns
    -------
    list(int)
        Number of training samples per class, indexed by label

    """
    # Identity check: the cached value is a list once it has been computed
    if self.label_distribution is None:
        counts = [0 for _ in range(self.num_classes)]
        for _, ys in self.get_trainset():
            # tolist() converts the whole batch at once instead of calling
            # .item() for every label
            for y_val in ys.tolist():
                counts[y_val] += 1
        self.label_distribution = counts
    return self.label_distribution
def get_trainset(self):
    """
    Function to get the training set

    The base class only declares the API and always raises
    NotImplementedError; concrete datasets override this method.

    Returns
    -------
    torch.utils.Dataset(decentralizepy.datasets.Data)

    Raises
    ------
    RuntimeError
        If the training set was not initialized (raised by implementations)
    NotImplementedError
        Always, when called on the base class

    """
    raise NotImplementedError()
def get_testset(self):
    """
    Function to get the test set

    The base class only declares the API and always raises
    NotImplementedError; concrete datasets override this method.

    Returns
    -------
    torch.utils.Dataset(decentralizepy.datasets.Data)

    Raises
    ------
    RuntimeError
        If the test set was not initialized (raised by implementations)
    NotImplementedError
        Always, when called on the base class

    """
    raise NotImplementedError()
......@@ -3,18 +3,18 @@ import logging
import os
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
import torchvision
from torch import nn
from torch._C import ParameterDict
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
from decentralizepy.models.Resnet import BasicBlock, Bottleneck, conv1x1
NUM_CLASSES = 62
IMAGE_SIZE = (28, 28)
......@@ -25,9 +25,24 @@ PIXEL_RANGE = 256.0
class Femnist(Dataset):
"""
Class for the FEMNIST dataset
"""
def __read_file__(self, file_path):
"""
Read data from the given json file
Parameters
----------
file_path : str
The file path
Returns
-------
tuple
(users, num_samples, data)
"""
with open(file_path, "r") as inf:
client_data = json.load(inf)
return (
......@@ -39,15 +54,18 @@ class Femnist(Dataset):
def __read_dir__(self, data_dir):
"""
Function to read all the FEMNIST data files in the directory
Parameters
----------
data_dir : str
Path to the folder containing the data files
Returns
-------
3-tuple
A tuple containing list of clients, number of samples per client,
and the data items per client
"""
clients = []
num_samples = []
......@@ -64,6 +82,17 @@ class Femnist(Dataset):
return clients, num_samples, data
def file_per_user(self, dir, write_dir):
"""
Function to read all the FEMNIST data files and write one file per user
Parameters
----------
dir : str
Path to the folder containing the data files
write_dir : str
Path to the folder to write the files
"""
clients, num_samples, train_data = self.__read_dir__(dir)
for index, client in enumerate(clients):
my_data = dict()
......@@ -76,6 +105,10 @@ class Femnist(Dataset):
print("Created File: ", client + ".json")
def load_trainset(self):
"""
Loads the training set. Partitions it if needed.
"""
logging.info("Loading training set.")
files = os.listdir(self.train_dir)
files = [f for f in files if f.endswith(".json")]
......@@ -91,7 +124,8 @@ class Femnist(Dataset):
self.sizes[-1] += 1.0 - frac * self.n_procs
logging.debug("Size fractions: {}".format(self.sizes))
my_clients = DataPartitioner(files, self.sizes).use(self.rank)
self.uid = self.mapping.get_uid(self.rank, self.machine_id)
my_clients = DataPartitioner(files, self.sizes).use(self.uid)
my_train_data = {"x": [], "y": []}
self.clients = []
self.num_samples = []
......@@ -114,17 +148,21 @@ class Femnist(Dataset):
.transpose(0, 3, 1, 2)
)
self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
logging.debug("train_x.shape: %s", str(self.train_x.shape))
logging.debug("train_y.shape: %s", str(self.train_y.shape))
logging.info("train_x.shape: %s", str(self.train_x.shape))
logging.info("train_y.shape: %s", str(self.train_y.shape))
assert self.train_x.shape[0] == self.train_y.shape[0]
assert self.train_x.shape[0] > 0
def load_testset(self):
"""
Loads the testing set.
"""
logging.info("Loading testing set.")
_, _, test_data = self.__read_dir__(self.test_dir)
_, _, d = self.__read_dir__(self.test_dir)
test_x = []
test_y = []
for test_data in test_data.values():
for test_data in d.values():
for x in test_data["x"]:
test_x.append(x)
for y in test_data["y"]:
......@@ -135,14 +173,16 @@ class Femnist(Dataset):
.transpose(0, 3, 1, 2)
)
self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1)
logging.debug("test_x.shape: %s", str(self.test_x.shape))
logging.debug("test_y.shape: %s", str(self.test_y.shape))
logging.info("test_x.shape: %s", str(self.test_x.shape))
logging.info("test_y.shape: %s", str(self.test_y.shape))
assert self.test_x.shape[0] == self.test_y.shape[0]
assert self.test_x.shape[0] > 0
def __init__(
self,
rank=0,
rank: int,
machine_id: int,
mapping: Mapping,
n_procs="",
train_dir="",
test_dir="",
......@@ -151,22 +191,39 @@ class Femnist(Dataset):
):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
rank : int, optional
Rank of the current process (to get the partition). Default value is assigned 0
n_procs : int, optional
The number of processes among which to divide the data. Default value is assigned 1
rank : int
Rank of the current process (to get the partition).
machine_id : int
Machine ID
mapping : decentralizepy.mappings.Mapping
Mapping to convert rank, machine_id -> uid for data partitioning
It also provides the total number of global processes
train_dir : str, optional
Path to the training data files. Required to instantiate the training set
The training set is partitioned according to n_procs and sizes
The training set is partitioned according to the number of global processes and sizes
test_dir : str. optional
Path to the testing data files Required to instantiate the testing set
sizes : list(int), optional
A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0
By default, each process gets an equal amount.
test_batch_size : int, optional
Batch size during testing. Default value is 64
"""
super().__init__(rank, n_procs, train_dir, test_dir, sizes, test_batch_size)
super().__init__(
rank,
machine_id,
mapping,
train_dir,
test_dir,
sizes,
test_batch_size,
)
self.num_classes = NUM_CLASSES
if self.__training__:
self.load_trainset()
......@@ -179,28 +236,34 @@ class Femnist(Dataset):
def get_client_ids(self):
    """
    Function to retrieve all the clients of the current process

    Returns
    -------
    list(str)
        A list of strings of the client ids (the ``self.clients`` list itself,
        not a copy).

    """
    client_ids = self.clients
    return client_ids
def get_client_id(self, i):
"""
Function to get the client id of the ith sample
Parameters
----------
i : int
Index of the sample
Returns
-------
str
Client ID
Raises
------
IndexError
If the sample index is out of bounds
"""
lb = 0
for j in range(len(self.clients)):
......@@ -212,34 +275,44 @@ class Femnist(Dataset):
def get_trainset(self, batch_size=1, shuffle=False):
    """
    Function to get the training set

    Parameters
    ----------
    batch_size : int, optional
        Batch size for learning
    shuffle : bool, optional
        Passed through to the DataLoader

    Returns
    -------
    torch.utils.Dataset(decentralizepy.datasets.Data)

    Raises
    ------
    RuntimeError
        If the training set was not initialized

    """
    if self.__training__:
        # Only the post-change call is kept; the stale pre-change argument
        # line duplicated the first arguments and made the call invalid.
        return DataLoader(
            Data(self.train_x, self.train_y),
            batch_size=batch_size,
            shuffle=shuffle,
            drop_last=True,  # needed for resnet
        )
    raise RuntimeError("Training set not initialized!")
def get_testset(self):
"""
Function to get the test set
Returns
-------
torch.utils.Dataset(decentralizepy.datasets.Data)
Raises
------
RuntimeError
If the test set was not initialized
"""
if self.__testing__:
return DataLoader(
......@@ -247,13 +320,23 @@ class Femnist(Dataset):
)
raise RuntimeError("Test set not initialized!")
def imshow(self, img):
    """
    Display an image with matplotlib.

    Parameters
    ----------
    img : torch.tensor
        Image to show; its axes are reordered from (0, 1, 2) to (1, 2, 0)
        before plotting -- presumably channel-first to channel-last.

    """
    channels_last = np.transpose(img.numpy(), (1, 2, 0))
    plt.imshow(channels_last)
    plt.show()
def test(self, model, loss):
"""
Function to evaluate model on the test dataset.
def test(self, model):
logging.debug("Evaluating on test set.")
Parameters
----------
model : decentralizepy.models.Model
Model to evaluate
loss : torch.nn.loss
Loss function to evaluate
Returns
-------
tuple
(accuracy, loss_value)
"""
testloader = self.get_testset()
logging.debug("Test Loader instantiated.")
......@@ -265,8 +348,12 @@ class Femnist(Dataset):
total_predicted = 0
with torch.no_grad():
loss_val = 0.0
count = 0
for elems, labels in testloader:
outputs = model(elems)
loss_val += loss(outputs, labels).item()
count += 1
_, predictions = torch.max(outputs, 1)
for label, prediction in zip(labels, predictions):
logging.debug("{} predicted as {}".format(label, prediction))
......@@ -276,7 +363,7 @@ class Femnist(Dataset):
total_pred[label] += 1
total_predicted += 1
logging.info("Predicted on the test set")
logging.debug("Predicted on the test set")
for key, value in enumerate(correct_pred):
if total_pred[key] != 0:
......@@ -286,19 +373,22 @@ class Femnist(Dataset):
logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
accuracy = 100 * float(total_correct) / total_predicted
loss_val = loss_val / count
logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
logging.info("Evaluating complete.")
return accuracy, loss_val
class LogisticRegression(nn.Module):
class LogisticRegression(Model):
"""
Class for a Logistic Regression Neural Network for FEMNIST
"""
def __init__(self):
"""
Constructor. Instantiates the Logistic Regression Model
with 28*28 Input and 62 output classes
"""
super().__init__()
self.fc1 = nn.Linear(FLAT_SIZE, NUM_CLASSES)
......@@ -306,23 +396,37 @@ class LogisticRegression(nn.Module):
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
x = torch.flatten(x, start_dim=1)
x = self.fc1(x)
return x
class CNN(nn.Module):
class CNN(Model):
"""
Class for a CNN Model for FEMNIST
"""
def __init__(self):
"""
Constructor. Instantiates the CNN Model
with 28*28*1 Input and 62 output classes
"""
super().__init__()
# 1.6 million params
self.conv1 = nn.Conv2d(1, 32, 5, padding=2)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(32, 64, 5, padding=2)
......@@ -330,9 +434,159 @@ class CNN(nn.Module):
self.fc2 = nn.Linear(512, NUM_CLASSES)
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x
class RNET(Model):
    """
    From PyTorch:
    Class for a Resnet Model for FEMNIST
    Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py
    For the license see models/Resnet.py

    """

    def __init__(
        self,
        num_classes=NUM_CLASSES,
        zero_init_residual=False,
        groups=1,
        width_per_group=32,
        replace_stride_with_dilation=None,
        norm_layer=None,
    ):
        """
        Constructor. Builds a residual network of BasicBlocks with a
        single-channel input convolution and four stages of two blocks each.

        Parameters
        ----------
        num_classes : int, optional
            Size of the final linear layer's output
        zero_init_residual : bool, optional
            If True, the last norm layer weight of every residual block is
            initialized to zero
        groups : int, optional
            Passed to every block
        width_per_group : int, optional
            Passed to every block as base width
        replace_stride_with_dilation : list(bool), optional
            Three flags, one for each of the last three stages, selecting
            dilation instead of the stride of 2
        norm_layer : callable, optional
            Normalization layer constructor. nn.BatchNorm2d when None

        Raises
        ------
        ValueError
            If replace_stride_with_dilation does not have three elements

        """
        super().__init__()
        block = BasicBlock
        depths = [2, 2, 2, 2]
        self._norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        norm_layer = self._norm_layer

        self.inplanes = 32
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                "or a 3-element tuple, got {}".format(replace_stride_with_dilation)
            )
        dilate2 = replace_stride_with_dilation[0]
        dilate3 = replace_stride_with_dilation[1]
        dilate4 = replace_stride_with_dilation[2]
        self.groups = groups
        self.base_width = width_per_group

        # Stem. Module creation order is kept as is, since it fixes both the
        # parameter order and the order of random initialization.
        self.conv1 = nn.Conv2d(
            1, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages
        self.layer1 = self._make_layer(block, 32, depths[0])
        self.layer2 = self._make_layer(block, 64, depths[1], stride=2, dilate=dilate2)
        self.layer3 = self._make_layer(block, 128, depths[2], stride=2, dilate=dilate3)
        self.layer4 = self._make_layer(block, 256, depths[3], stride=2, dilate=dilate4)

        # Classifier head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256 * block.expansion, num_classes)

        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(
                    module.weight, mode="fan_out", nonlinearity="relu"
                )
            elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for module in self.modules():
                if isinstance(module, Bottleneck):
                    nn.init.constant_(module.bn3.weight, 0)
                elif isinstance(module, BasicBlock):
                    nn.init.constant_(module.bn2.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        """
        Build one stage made of ``blocks`` residual blocks.

        The first block may change resolution/width (and then gets a 1x1
        convolution + norm shortcut); the remaining blocks keep the shape.
        Updates ``self.inplanes`` and, when ``dilate`` is set, ``self.dilation``.

        Returns
        -------
        nn.Sequential
            The blocks of the stage

        """
        norm_layer = self._norm_layer
        out_planes = planes * block.expansion
        previous_dilation = self.dilation
        if dilate:
            # Trade the stride for dilation
            self.dilation *= stride
            stride = 1

        if stride != 1 or self.inplanes != out_planes:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, out_planes, stride),
                norm_layer(out_planes),
            )
        else:
            downsample = None

        first_block = block(
            self.inplanes,
            planes,
            stride,
            downsample,
            self.groups,
            self.base_width,
            previous_dilation,
            norm_layer,
        )
        self.inplanes = out_planes
        remaining_blocks = [
            block(
                self.inplanes,
                planes,
                groups=self.groups,
                base_width=self.base_width,
                dilation=self.dilation,
                norm_layer=norm_layer,
            )
            for _ in range(1, blocks)
        ]
        return nn.Sequential(first_block, *remaining_blocks)

    def _forward_impl(self, x):
        """
        Run the stem, the four residual stages, pooling and the classifier.

        """
        # See note [TorchScript super()]
        feature_stages = (
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            self.layer1,
            self.layer2,
            self.layer3,
            self.layer4,
            self.avgpool,
        )
        for stage in feature_stages:
            x = stage(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor

        """
        return self._forward_impl(x)
import logging
import math
import os
import zipfile
import pandas as pd
import requests
import torch
from sklearn import metrics
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.mappings import Mapping
from decentralizepy.models.Model import Model
class MovieLens(Dataset):
    """
    Class for the MovieLens (ml-latest-small) dataset.

    Ratings are split per user into 70% train / 30% test, and the users are
    then divided among the processes.

    """

    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        train_dir="",
        test_dir="",
        sizes="",
        test_batch_size=1,
    ):
        """
        Constructor which reads the ratings file and keeps this process' share

        Parameters
        ----------
        rank : int
            Rank of the current process (to get the partition).
        machine_id : int
            Machine ID
        mapping : decentralizepy.mappings.Mapping
            Mapping to convert rank, machine_id -> uid for data partitioning
            It also provides the total number of global processes
        train_dir : str, optional
            Folder containing ``ml-latest-small/ratings.csv``
        test_dir : str, optional
            Forwarded to the base class; the ratings are always read from
            ``train_dir``
        sizes : list(int), optional
            Forwarded to the base class; not used for the user split
        test_batch_size : int, optional
            Batch size during testing. Default value is 1

        """
        super().__init__(
            rank, machine_id, mapping, train_dir, test_dir, sizes, test_batch_size
        )
        self.n_users, self.n_items, df_train, df_test = self._load_data()
        self.train_data, self.test_data = self._split_data(
            df_train, df_test, self.n_procs
        )

        # [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]
        # [0,   1,   2,   3,   4,   5,   6,   7,   8,   9  ]
        self.NUM_CLASSES = 10
        self.RATING_DICT = {
            0.5: 0,
            1.0: 1,
            1.5: 2,
            2.0: 3,
            2.5: 4,
            3.0: 5,
            3.5: 6,
            4.0: 7,
            4.5: 8,
            5.0: 9,
        }

    def _load_data(self):
        """
        Read the ratings, renumber the items and split train/test per user.

        Item ids are remapped to 1..n_items following the order of the sorted
        original ids. Users are addressed as 1..n_users, i.e. the user ids in
        the file are expected to be contiguous and to start at 1. The 70/30
        split is sampled without a fixed seed.

        Returns
        -------
        tuple
            (users_count, items_count, df_train, df_test)

        """
        f_ratings = os.path.join(self.train_dir, "ml-latest-small", "ratings.csv")
        names = ["user_id", "item_id", "rating", "timestamp"]
        df_ratings = pd.read_csv(f_ratings, sep=",", names=names, skiprows=1).drop(
            columns=["timestamp"]
        )

        # map item_id properly: one dictionary lookup per row instead of one
        # full-column scan per item
        items_ids = sorted(df_ratings["item_id"].unique())
        items_count = len(items_ids)
        new_item_id = {old: new for new, old in enumerate(items_ids, start=1)}
        df_ratings["item_id"] = df_ratings["item_id"].map(new_item_id)

        # split train, test - 70% : 30%
        users_count = df_ratings["user_id"].nunique()
        train_parts = []
        test_parts = []
        for user_id in range(1, users_count + 1):
            df_user = df_ratings[df_ratings["user_id"] == user_id]
            df_user_train = df_user.sample(frac=0.7)
            # The test part is every row of the user that was not sampled
            df_user_test = df_user.drop(df_user_train.index)
            assert len(df_user_train) + len(df_user_test) == len(df_user)

            train_parts.append(df_user_train)
            test_parts.append(df_user_test)

        # Concatenate once at the end; pd.concat rejects an empty list
        df_train = pd.concat(train_parts) if train_parts else pd.DataFrame()
        df_test = pd.concat(test_parts) if test_parts else pd.DataFrame()

        # 610, 9724
        return users_count, items_count, df_train, df_test

    def _split_data(self, train_data, test_data, world_size):
        """
        Select the train and test rows of the users assigned to this process.

        Users are divided into ``world_size`` contiguous ranges; the first
        ``n_users % world_size`` ranges hold one extra user.

        Parameters
        ----------
        train_data : pandas.DataFrame
            Training ratings of all users
        test_data : pandas.DataFrame
            Test ratings of all users
        world_size : int
            Number of processes to divide the users among

        Returns
        -------
        tuple
            (my_train_data, my_test_data)

        """
        # SPLITTING BY USERS: group by users and split the data accordingly
        # world_size is the global number of processes, so the partition has
        # to be chosen by the global uid (as the other datasets do) and not by
        # the rank, which is paired with a machine id.
        uid = self.mapping.get_uid(self.rank, self.machine_id)
        users_count, mod = divmod(self.n_users, world_size)
        if uid < mod:
            users_count += 1
            offset = users_count * uid
        else:
            offset = users_count * uid + mod

        # User ids are 1-based: this process owns offset+1 .. offset+users_count
        first_user = offset + 1
        last_user = offset + users_count
        my_train_data = train_data[train_data["user_id"].between(first_user, last_user)]
        my_test_data = test_data[test_data["user_id"].between(first_user, last_user)]

        logging.info("Data split for test and train.")
        return my_train_data, my_test_data

    def get_trainset(self, batch_size=1, shuffle=False):
        """
        Function to get the training set

        Parameters
        ----------
        batch_size : int, optional
            Batch size for learning
        shuffle : bool, optional
            Passed through to the DataLoader

        Returns
        -------
        torch.utils.Dataset(decentralizepy.datasets.Data)
            Samples are (user_id, item_id) pairs, targets are float32 ratings

        Raises
        ------
        RuntimeError
            If the training set was not initialized

        """
        if self.__training__:
            train_x = self.train_data[["user_id", "item_id"]].to_numpy()
            train_y = self.train_data.rating.values.astype("float32")
            return DataLoader(
                Data(train_x, train_y), batch_size=batch_size, shuffle=shuffle
            )
        raise RuntimeError("Training set not initialized!")

    def get_testset(self):
        """
        Function to get the test set

        Returns
        -------
        torch.utils.Dataset(decentralizepy.datasets.Data)
            Samples are (user_id, item_id) pairs, targets are the ratings

        Raises
        ------
        RuntimeError
            If the test set was not initialized

        """
        if self.__testing__:
            test_x = self.test_data[["user_id", "item_id"]].to_numpy()
            test_y = self.test_data.rating.values
            return DataLoader(Data(test_x, test_y), batch_size=self.test_batch_size)
        raise RuntimeError("Test set not initialized!")

    def test(self, model, loss):
        """
        Function to evaluate model on the test dataset.

        Predictions are clamped to [0.5, 5.0] and rounded to the closest half
        before they are compared with the true ratings.

        Parameters
        ----------
        model : decentralizepy.models.Model
            Model to evaluate
        loss : torch.nn.loss
            Loss function to evaluate

        Returns
        -------
        tuple
            (accuracy, loss_value)
            Accuracy is the percentage of exactly matched ratings; the loss is
            the square root of the mean per-batch loss.

        """
        test_set = self.get_testset()
        logging.debug("Test Loader instantiated.")

        correct_pred = [0 for _ in range(self.NUM_CLASSES)]
        total_pred = [0 for _ in range(self.NUM_CLASSES)]
        total_correct = 0
        total_predicted = 0

        with torch.no_grad():
            loss_val = 0.0
            loss_predicted = 0.0
            count = 0

            for test_x, test_y in test_set:
                output = model(test_x)
                loss_val += loss(output, test_y).item()
                count += 1

                # threshold values to range [0.5, 5.0]
                output = torch.clamp(output, min=0.5, max=5.0)
                # round a number to the closest half integer
                output = torch.round(output * 2) / 2

                loss_predicted += metrics.mean_absolute_error(output, test_y)

                for rating, prediction in zip(test_y.tolist(), output.tolist()):
                    logging.debug("%s predicted as %s", rating, prediction)
                    rating_class = self.RATING_DICT[rating]
                    if rating == prediction:
                        correct_pred[rating_class] += 1
                        total_correct += 1
                    total_pred[rating_class] += 1
                    total_predicted += 1

        logging.debug("Predicted on the test set")
        for key, value in enumerate(correct_pred):
            if total_pred[key] != 0:
                accuracy = 100 * float(value) / total_pred[key]
            else:
                # A class without test samples is reported as 100 %
                accuracy = 100.0
            logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))

        accuracy = 100 * float(total_correct) / total_predicted
        # NOTE(review): the value logged as "MSE loss" is the square root of
        # the mean batch loss -- confirm whether the label should say RMSE.
        loss_val = math.sqrt(loss_val / count)
        loss_predicted = loss_predicted / count
        logging.info(
            "MSE loss: {:.8f} | Rounded MAE loss: {:.8f}".format(
                loss_val, loss_predicted
            )
        )
        logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
        return accuracy, loss_val
# todo: this class should be in 'models' package; add support for reading it from there and move it
class MatrixFactorization(Model):
    """
    Matrix Factorization model for MovieLens: a rating is predicted as the
    dot product of a user embedding and an item embedding.

    """

    def __init__(self, n_users=610, n_items=9724, n_factors=20):
        """
        Creates the user and item embedding tables.

        Parameters
        ----------
        n_users
            The number of unique users (rows of the user table).
        n_items
            The number of unique items (rows of the item table).
        n_factors
            The number of columns of both embedding tables.

        """
        super().__init__()
        self.user_factors = torch.nn.Embedding(n_users, n_factors)
        self.item_factors = torch.nn.Embedding(n_items, n_factors)
        # Small symmetric uniform initialisation, user table first.
        for table in (self.user_factors, self.item_factors):
            table.weight.data.uniform_(-0.05, 0.05)

    def forward(self, data):
        """
        Forward pass of the model.

        Parameters
        ----------
        data
            Two-column batch; column 0 holds user ids and column 1 item ids.
            One is subtracted from both before the lookup, so ids are
            presumably 1-based -- TODO confirm against the data loader.

        Returns
        -------
        torch.Tensor
            One predicted rating per row of `data`

        """
        user_idx = torch.LongTensor(data[:, 0]) - 1
        item_idx = torch.LongTensor(data[:, 1]) - 1
        user_vecs = self.user_factors(user_idx)
        item_vecs = self.item_factors(item_idx)
        dot = (user_vecs * item_vecs).sum(dim=1, keepdim=True)
        return dot.squeeze(1)
def download_movie_lens(dest_path):
    """
    Downloads the movielens latest small dataset and extracts it.
    This data set consists of:
        * 100836 ratings from 610 users on 9742 movies.
        * Each user has rated at least 20 movies.

    https://files.grouplens.org/datasets/movielens/ml-latest-small-README.html

    Parameters
    ----------
    dest_path : str
        Existing directory that receives ``ml-latest-small.zip`` and the
        extracted files

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status

    """
    url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
    zip_path = os.path.join(dest_path, "ml-latest-small.zip")

    print("Downloading MovieLens Latest Small data...")
    with requests.get(url, stream=True) as req:
        # Fail early instead of saving an HTML error page as the archive.
        req.raise_for_status()
        with open(zip_path, "wb") as fd:
            for chunk in req.iter_content(chunk_size=None):
                fd.write(chunk)

    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(dest_path)
    print("Downloaded MovieLens Latest Small dataset at", dest_path)
if __name__ == "__main__":
    # Fetch the archive only when it is not already on disk.
    path = "/mnt/nfs/shared/leaf/data/movielens"
    archive_present = os.path.isfile(os.path.join(path, "ml-latest-small.zip"))
    if not archive_present:
        download_movie_lens(path)
......@@ -6,16 +6,19 @@ from random import Random
class Partition(object):
"""
Class for holding the data partition
"""
def __init__(self, data, index):
    """
    Constructor. Keeps references to the data and to the indices that
    make up this partition (nothing is copied).

    Parameters
    ----------
    data : indexable
        The full dataset
    index : list
        Positions in `data` that belong to this partition

    """
    self.index = index
    self.data = data
......@@ -23,23 +26,28 @@ class Partition(object):
def __len__(self):
    """
    Function to retrieve the length

    Returns
    -------
    int
        Number of indices held by this partition

    """
    size = len(self.index)
    return size
def __getitem__(self, index):
    """
    Retrieves the item of this partition at position `index`

    Parameters
    ----------
    index : int
        Position within the partition, translated through the stored
        index list into a position in the underlying data

    Returns
    -------
    Data
        The data sample with the given `index` in the dataset

    """
    return self.data[self.index[index]]
......@@ -48,11 +56,13 @@ class Partition(object):
class DataPartitioner(object):
"""
Class to partition the dataset
"""
def __init__(self, data, sizes=[1.0], seed=1234):
"""
Constructor. Partitions the data according the parameters
Parameters
----------
data : indexable
......@@ -61,6 +71,7 @@ class DataPartitioner(object):
A list of fractions for each process
seed : int, optional
Seed for generating a random subset
"""
self.data = data
self.partitions = []
......@@ -78,13 +89,92 @@ class DataPartitioner(object):
def use(self, rank):
    """
    Get the partition for the process with the given `rank`

    Parameters
    ----------
    rank : int
        Rank of the current process; used as position in `self.partitions`

    Returns
    -------
    Partition
        View of the shared data restricted to the indices of that process

    """
    selected_indices = self.partitions[rank]
    return Partition(self.data, selected_indices)
class SimpleDataPartitioner(DataPartitioner):
    """
    Partitions the dataset into consecutive, unshuffled index ranges

    """

    def __init__(self, data, sizes=[1.0]):
        """
        Constructor. Hands out the indices 0..len(data)-1 in order, one
        contiguous slice per entry of `sizes`.

        Parameters
        ----------
        data : indexable
            An indexable list of data items
        sizes : list(float)
            A list of fractions for each process

        """
        self.data = data
        self.partitions = []
        total = len(data)
        remaining = list(range(total))
        for frac in sizes:
            cut = int(frac * total)
            # Take the next `cut` indices; the rest stays for later processes.
            head, remaining = remaining[:cut], remaining[cut:]
            self.partitions.append(head)
class KShardDataPartitioner(DataPartitioner):
    """
    Partitions the dataset by giving every process several contiguous
    shards cut at random positions

    """

    def __init__(self, data, sizes=[1.0], shards=1, seed=1234):
        """
        Constructor. Partitions the data according the parameters

        Parameters
        ----------
        data : indexable
            An indexable list of data items
        sizes : list(float)
            A list of fractions for each process
        shards : int
            Number of shards to allot to process
        seed : int, optional
            Seed for generating a random subset

        """
        self.data = data
        self.partitions = []
        data_len = len(data)
        pool = list(range(data_len))  # indices not yet handed out
        rng = Random()
        rng.seed(seed)

        for frac in sizes:
            current = []
            self.partitions.append(current)
            for _ in range(shards):
                start = rng.randint(0, len(pool) - 1)
                shard_len = int(frac * data_len) // shards
                end = start + shard_len
                overflow = end - len(pool)
                if overflow > 0:
                    # Shard runs past the end of the pool: wrap around to
                    # the front and keep only the untouched middle part.
                    current.extend(pool[start:])
                    current.extend(pool[:overflow])
                    pool = pool[overflow:start]
                else:
                    current.extend(pool[start:end])
                    pool = pool[:start] + pool[end:]
import collections
import json
import logging
import os
import pickle
from collections import defaultdict
from pathlib import Path
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
# Number of rows of the RNN embedding table and width of its output layer;
# also the number of per-class counters allocated in Reddit.test.
VOCAB_LEN = 9999 # 10000 was used as it needed to be +1 due to using mask_zero in the tf embedding
# Not referenced in the visible part of this file; presumably the fixed
# phrase length -- TODO confirm.
SEQ_LEN = 10
# Width of each word embedding vector fed to the LSTM of the RNN model.
EMBEDDING_DIM = 200
class Reddit(Dataset):
"""
Class for the Reddit dataset
-- Based on https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
and Femnist.py
"""
def __read_file__(self, file_path):
    """
    Read data from the given json file

    Parameters
    ----------
    file_path : str
        The file path

    Returns
    -------
    tuple
        (users, num_samples, data), i.e. the values stored under the
        "users", "num_samples" and "user_data" keys of the file

    """
    with open(file_path, "r") as inf:
        payload = json.load(inf)
    users = payload["users"]
    num_samples = payload["num_samples"]
    user_data = payload["user_data"]
    return (users, num_samples, user_data)
def __read_dir__(self, data_dir):
    """
    Function to read all the Reddit data files in the directory

    Parameters
    ----------
    data_dir : str
        Path to the folder containing the data files

    Returns
    -------
    3-tuple
        A tuple containing list of users, number of samples per client,
        and the data items per client (a defaultdict that yields None for
        unknown users)

    """
    users, num_samples = [], []
    data = defaultdict(lambda: None)

    for name in os.listdir(data_dir):
        if not name.endswith(".json"):
            continue  # only json files hold client data
        file_users, file_counts, file_data = self.__read_file__(
            os.path.join(data_dir, name)
        )
        users.extend(file_users)
        num_samples.extend(file_counts)
        data.update(file_data)
    return users, num_samples, data
def file_per_user(self, dir, write_dir):
    """
    Function to read all the Reddit data files and write one file per user

    Every written file has the same layout as the input files: "users"
    and "num_samples" are parallel one-element lists and "user_data" maps
    the user to its "x" and "y" samples, so the output can be read back
    with `__read_dir__`.

    Parameters
    ----------
    dir : str
        Path to the folder containing the data files
    write_dir : str
        Path to the folder to write the files

    """
    clients, num_samples, train_data = self.__read_dir__(dir)
    for index, client in enumerate(clients):
        samples = train_data[client]
        my_data = {
            "users": [client],
            # A list, not a scalar: __read_dir__ extends its running list
            # with this value.
            "num_samples": [num_samples[index]],
            "user_data": {client: {"x": samples["x"], "y": samples["y"]}},
        }
        file_name = client + ".json"
        with open(os.path.join(write_dir, file_name), "w") as of:
            json.dump(my_data, of)
        print("Created File: ", file_name)
def load_trainset(self):
    """
    Loads the training set. Partitions it if needed.

    The json files of `self.train_dir` are split among the processes with
    a DataPartitioner; the files of this process are read and converted
    with `prepare_data`. Sets `self.uid`, `self.clients`,
    `self.num_samples`, `self.train_x` and `self.train_y`.

    """
    logging.info("Loading training set.")
    files = [f for f in os.listdir(self.train_dir) if f.endswith(".json")]
    # os.listdir order is arbitrary; sorting makes the file list reproducible.
    files.sort()
    c_len = len(files)

    if self.sizes is None:  # Equal distribution of data among processes
        e = c_len // self.n_procs
        frac = e / c_len
        self.sizes = [frac] * self.n_procs
        # The last process absorbs the remainder so the fractions sum to 1.0
        self.sizes[-1] += 1.0 - frac * self.n_procs
        logging.debug("Size fractions: %s", self.sizes)

    self.uid = self.mapping.get_uid(self.rank, self.machine_id)
    my_clients = DataPartitioner(files, self.sizes).use(self.uid)
    my_train_data = {"x": [], "y": []}
    self.clients = []
    self.num_samples = []
    n_my_clients = len(my_clients)
    logging.debug("Clients Length: %d", c_len)
    logging.debug("My_clients_len: %d", n_my_clients)
    for i in range(n_my_clients):
        cur_file = my_clients[i]
        clients, _, train_data = self.__read_file__(
            os.path.join(self.train_dir, cur_file)
        )
        for cur_client in clients:
            self.clients.append(cur_client)
            processed_x, processed_y = self.prepare_data(train_data[cur_client])
            # processed_x is an list of fixed size word id arrays that represent a phrase
            # processed_y is a list of word ids that each represent the next word of a phrase
            my_train_data["x"].extend(processed_x)
            my_train_data["y"].extend(processed_y)
            self.num_samples.append(len(processed_y))

    # turns the list of lists into a single list
    self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
    self.train_x = np.array(my_train_data["x"], dtype=np.dtype("int64"))
    logging.info("train_x.shape: %s", str(self.train_x.shape))
    logging.info("train_y.shape: %s", str(self.train_y.shape))
    assert self.train_x.shape[0] == self.train_y.shape[0]
    assert self.train_x.shape[0] > 0
def load_testset(self):
    """
    Loads the testing set.

    Reads every json file of `self.test_dir`, converts each user's data
    with `prepare_data` and stores the result as int64 arrays in
    `self.test_x` and `self.test_y`.

    """
    logging.info("Loading testing set.")
    _, _, per_user = self.__read_dir__(self.test_dir)

    all_x, all_y = [], []
    for user_data in per_user.values():
        # phrases (fixed size word id arrays) and the id of the word that follows each
        phrases, next_words = self.prepare_data(user_data)
        all_x.extend(phrases)
        all_y.extend(next_words)

    int64 = np.dtype("int64")
    self.test_y = np.array(all_y, dtype=int64).reshape(-1)
    self.test_x = np.array(all_x, dtype=int64)
    logging.info("test_x.shape: %s", str(self.test_x.shape))
    logging.info("test_y.shape: %s", str(self.test_y.shape))
    assert self.test_x.shape[0] == self.test_y.shape[0]
    assert self.test_x.shape[0] > 0
def __init__(
    self,
    rank: int,
    machine_id: int,
    mapping: Mapping,
    n_procs="",
    train_dir="",
    test_dir="",
    sizes="",
    test_batch_size=1024,
):
    """
    Constructor which reads the data files, instantiates and partitions the dataset

    Parameters
    ----------
    rank : int
        Rank of the current process (to get the partition).
    machine_id : int
        Machine ID
    mapping : decentralizepy.mappings.Mapping
        Mapping to convert rank, machine_id -> uid for data partitioning
        It also provides the total number of global processes
    n_procs : optional
        Accepted but not referenced in the body of this constructor
    train_dir : str, optional
        Path to the training data files. Required to instantiate the training set
        The training set is partitioned according to the number of global processes and sizes
        The vocabulary is read from "../../vocab/reddit_vocab.pck" relative to this folder
    test_dir : str, optional
        Path to the testing data files. Required to instantiate the testing set
    sizes : list(float), optional
        A list of fractions specifying how much data to allot each process. Sum of fractions should be 1.0
    test_batch_size : int, optional
        Batch size during testing. Default value is 1024

    """
    super().__init__(
        rank,
        machine_id,
        mapping,
        train_dir,
        test_dir,
        sizes,
        test_batch_size,
    )
    # The vocabulary lives next to the training data, so it can only be
    # loaded when a training folder was given and exists.
    if self.train_dir and Path(self.train_dir).exists():
        loaded = self._load_vocab(
            os.path.join(self.train_dir, "../../vocab/reddit_vocab.pck")
        )
        self.vocab, self.vocab_size, self.unk_symbol, self.pad_symbol = loaded
        logging.info("The reddit vocab has %i tokens.", len(self.vocab))

    if self.__training__:
        self.load_trainset()

    if self.__testing__:
        self.load_testset()

    # TODO: Add Validation
def _load_vocab(self, VOCABULARY_PATH):
    """
    loads the training vocabulary
    copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

    Parameters
    ----------
    VOCABULARY_PATH : str
        Path to the pickled training vocabulary

    Returns
    -------
    Tuple
        vocabulary, size, unk symbol, pad symbol. The vocabulary is a
        defaultdict that maps unknown tokens to the unk symbol.

    """
    # NOTE(review): pickle executes code from the file while loading; only
    # point this at vocabulary files from a trusted source.
    with open(VOCABULARY_PATH, "rb") as vocab_fd:
        vocab_file = pickle.load(vocab_fd)

    unk_symbol = vocab_file["unk_symbol"]
    vocab = collections.defaultdict(lambda: unk_symbol)
    vocab.update(vocab_file["vocab"])

    return (
        vocab,
        vocab_file["size"],
        unk_symbol,
        vocab_file["pad_symbol"],
    )
def prepare_data(self, data):
    """
    Flattens one user's comments into sequences, converts them to ids and
    drops every sample whose target id sums to 0.
    copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

    Parameters
    ----------
    data
        Mapping with an "x" entry (one list of sequences per comment) and
        a "y" entry (one mapping per comment whose "target_tokens" holds
        the matching target sequences)

    Returns
    -------
    tuple
        (filtered_x, filtered_y), two parallel lists

    """
    # flatten lists: comments -> sequences
    seq_x, seq_y = [], []
    for comment_x, comment_y in zip(data["x"], data["y"]):
        seq_x.extend(comment_x)
        seq_y.extend(comment_y["target_tokens"])

    ids_x = self.process_x(seq_x)
    ids_y = self.process_y(seq_y)

    kept_x, kept_y = [], []
    for i, x_ids in enumerate(ids_x):
        y_id = ids_y[i]
        if np.sum(y_id) == 0:
            continue
        kept_x.append(x_ids)
        kept_y.append(y_id)
    return (kept_x, kept_y)
def _tokens_to_ids(self, raw_batch):
    """
    Turns a list of equally sized token lists (padded with <PAD> if needed)
    into an array of word ids by looking every token up in `self.vocab`

    [['<BOS>', 'do', 'you', 'have', 'proof', 'of', 'purchase', 'for', 'clay', 'play'], [ ...], ...]
    turns into:
    [[   5   45   13   24 1153   11 1378   17 6817  165], ...]

    copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

    Parameters
    ----------
    raw_batch : list
        list of fixed size token lists

    Returns
    -------
    numpy.ndarray
        2D array with the rows representing fixed size token id phrases

    """
    word2id = self.vocab
    id_rows = []
    for seq in raw_batch:
        id_rows.append([word2id[token] for token in seq])
    return np.array(id_rows)
def process_x(self, raw_x_batch):
    """
    Converts the input phrases of a batch into word ids
    copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

    Parameters
    ----------
    raw_x_batch
        Iterable of token sequences

    Returns
    -------
    The result of `_tokens_to_ids` for a list copy of the batch

    """
    return self._tokens_to_ids(list(raw_x_batch))
def process_y(self, raw_y_batch):
    """
    Converts the target phrases of a batch into the id of the word to predict
    copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

    Parameters
    ----------
    raw_y_batch
        Iterable of target token sequences

    Returns
    -------
    list
        For every sequence the last id that differs from `self.pad_symbol`,
        or `self.pad_symbol` itself when the sequence holds nothing else

    """

    def last_real_token(row):
        # Walk backwards: the word to predict is the last non-pad token.
        for pos in range(len(row) - 1, -1, -1):
            if row[pos] != self.pad_symbol:
                return row[pos]
        return self.pad_symbol

    id_rows = self._tokens_to_ids(list(raw_y_batch))
    return [last_real_token(row) for row in id_rows]
def get_client_ids(self):
    """
    Function to retrieve all the clients of the current process

    Returns
    -------
    list(str)
        The list stored in `self.clients` (the same object, not a copy)

    """
    client_ids = self.clients
    return client_ids
def get_client_id(self, i):
    """
    Function to get the client id of the ith sample

    Samples are laid out client after client, `self.num_samples[j]` of
    them for `self.clients[j]`.

    Parameters
    ----------
    i : int
        Index of the sample

    Returns
    -------
    str
        Client ID

    Raises
    ------
    IndexError
        If the sample index is out of bounds

    """
    lb = 0  # index of the first sample of the current client
    for client, n_samples in zip(self.clients, self.num_samples):
        ub = lb + n_samples
        if i < ub:
            return client
        # Advance the lower bound, otherwise every client is compared
        # against its own size instead of the cumulative offset.
        lb = ub
    raise IndexError("i is out of bounds!")
def get_trainset(self, batch_size=1, shuffle=False):
    """
    Function to get the training set

    Parameters
    ----------
    batch_size : int, optional
        Batch size for learning
    shuffle : bool, optional
        Forwarded to the DataLoader

    Returns
    -------
    torch.utils.data.DataLoader
        Loader over ``Data(self.train_x, self.train_y)``

    Raises
    ------
    RuntimeError
        If the training set was not initialized

    """
    if not self.__training__:
        raise RuntimeError("Training set not initialized!")
    train_data = Data(self.train_x, self.train_y)
    return DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)
def get_testset(self):
    """
    Function to get the test set

    Returns
    -------
    torch.utils.data.DataLoader
        Loader over ``Data(self.test_x, self.test_y)`` with batches of
        `self.test_batch_size`

    Raises
    ------
    RuntimeError
        If the test set was not initialized

    """
    if not self.__testing__:
        raise RuntimeError("Test set not initialized!")
    test_data = Data(self.test_x, self.test_y)
    return DataLoader(test_data, batch_size=self.test_batch_size)
def test(self, model, loss):
    """
    Function to evaluate model on the test dataset.

    Parameters
    ----------
    model : decentralizepy.models.Model
        Model to evaluate
    loss : torch.nn.loss
        Loss function to evaluate

    Returns
    -------
    tuple
        (accuracy, loss_value): overall accuracy in percent and the loss
        averaged over the test batches

    Raises
    ------
    ZeroDivisionError
        If the test loader yields no samples

    """
    testloader = self.get_testset()

    logging.debug("Test Loader instantiated.")

    correct_pred = [0 for _ in range(VOCAB_LEN)]
    total_pred = [0 for _ in range(VOCAB_LEN)]

    total_correct = 0
    total_predicted = 0

    with torch.no_grad():
        loss_val = 0.0
        count = 0
        for elems, labels in testloader:
            outputs = model(elems)
            loss_val += loss(outputs, labels).item()
            count += 1
            _, predictions = torch.max(outputs, 1)
            # Convert once to Python ints: plain list indexing and lazy,
            # identically formatted debug messages.
            for label, prediction in zip(labels.tolist(), predictions.tolist()):
                logging.debug("%s predicted as %s", label, prediction)
                if label == prediction:
                    correct_pred[label] += 1
                    total_correct += 1
                total_pred[label] += 1
                total_predicted += 1

    logging.debug("Predicted on the test set")

    for key, value in enumerate(correct_pred):
        if total_pred[key] != 0:
            accuracy = 100 * float(value) / total_pred[key]
        else:
            # Classes that never occur are reported as fully accurate.
            accuracy = 100.0
        logging.debug("Accuracy for class %s is: %.1f %%", key, accuracy)

    accuracy = 100 * float(total_correct) / total_predicted
    loss_val = loss_val / count
    logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
    return accuracy, loss_val
class RNN(Model):
    """
    Class for a RNN Model for Reddit

    """

    def __init__(self):
        """
        Constructor. Instantiates the RNN Model to predict the next word of a sequence of word.
        Based on the TensorFlow model found here: https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

        Layers: embedding (row 0 is the padding row) -> 2-layer LSTM with
        256 hidden units -> linear 256->128 -> linear 128->VOCAB_LEN.

        """
        super().__init__()
        hidden_size = 256
        dense_size = 128

        self.embedding = nn.Embedding(VOCAB_LEN, EMBEDDING_DIM, padding_idx=0)
        self.rnn_cells = nn.LSTM(
            EMBEDDING_DIM, hidden_size, batch_first=True, num_layers=2
        )
        # ReLU for this layer is applied in forward()
        self.l1 = nn.Linear(hidden_size, dense_size)
        # produces raw scores; no softmax inside the model
        self.l2 = nn.Linear(dense_size, VOCAB_LEN)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor (scores before any softmax)

        """
        embedded = self.embedding(x)
        _, (hidden, _) = self.rnn_cells(embedded)
        # final hidden state of the second (last) LSTM layer
        top_hidden = hidden[1, ...]
        dense = F.relu(self.l1(top_hidden))
        # softmax is applied by the CrossEntropyLoss used during training
        return self.l2(dense)
import json
import logging
import os
from collections import defaultdict
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
# Character vocabulary: one list entry per character of the literal below.
# NOTE(review): the literal contains `{{}}`, so '{' and '}' each occur twice
# in VOCAB; char2idx keeps the later index for them and VOCAB_LEN counts both
# occurrences -- confirm whether this is intended.
VOCAB = list(
"dhlptx@DHLPTX $(,048cgkoswCGKOSW[_#'/37;?bfjnrvzBFJNRVZ\"&*.26:\naeimquyAEIMQUY]!%)-159\r{{}}<>"
)
VOCAB_LEN = len(VOCAB)
# Creating a mapping from unique characters to indices
char2idx = {u: i for i, u in enumerate(VOCAB)}
# Reverse lookup: position -> character
idx2char = np.array(VOCAB)
# Width of each character embedding of the LSTM model
EMBEDDING_DIM = 8
# Hidden size of the LSTM layers
HIDDEN_DIM = 256
# One output class per vocabulary entry
NUM_CLASSES = VOCAB_LEN
# Number of stacked LSTM layers
NUM_LAYERS = 2
# Sequence length assumed by the LSTM model when flattening the LSTM output
SEQ_LENGTH = 80
class Shakespeare(Dataset):
"""
Class for the Shakespeare dataset
-- Based on https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
"""
def __read_file__(self, file_path):
    """
    Read data from the given json file

    Parameters
    ----------
    file_path : str
        The file path

    Returns
    -------
    tuple
        (users, num_samples, data), taken from the "users", "num_samples"
        and "user_data" entries of the file

    """
    with open(file_path, "r") as inf:
        content = json.load(inf)
    fields = ("users", "num_samples", "user_data")
    return tuple(content[field] for field in fields)
def __read_dir__(self, data_dir):
    """
    Function to read all the Shakespeare data files in the directory

    Parameters
    ----------
    data_dir : str
        Path to the folder containing the data files

    Returns
    -------
    3-tuple
        A tuple containing list of users, number of samples per client,
        and the data items per client (a defaultdict that yields None for
        unknown users)

    """
    users, num_samples = [], []
    data = defaultdict(lambda: None)

    json_files = (name for name in os.listdir(data_dir) if name.endswith(".json"))
    for name in json_files:
        u, n, d = self.__read_file__(os.path.join(data_dir, name))
        users.extend(u)
        num_samples.extend(n)
        data.update(d)
    return users, num_samples, data
def file_per_user(self, dir, write_dir):
    """
    Function to read all the Shakespeare data files and write one file per user

    Every written file has the same layout as the input files: "users"
    and "num_samples" are parallel one-element lists and "user_data" maps
    the user to its "x" and "y" samples, so the output can be read back
    with `__read_dir__`.

    Parameters
    ----------
    dir : str
        Path to the folder containing the data files
    write_dir : str
        Path to the folder to write the files

    """
    clients, num_samples, train_data = self.__read_dir__(dir)
    for index, client in enumerate(clients):
        my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]}
        my_data = {
            "users": [client],
            # A list, not a scalar: __read_dir__ extends its running list
            # with this value.
            "num_samples": [num_samples[index]],
            "user_data": {client: my_samples},
        }
        out_name = client + ".json"
        with open(os.path.join(write_dir, out_name), "w") as of:
            json.dump(my_data, of)
        print("Created File: ", out_name)
def load_trainset(self):
    """
    Loads the training set. Partitions it if needed.

    The json files of `self.train_dir` are split among the processes with
    a DataPartitioner; the files of this process are read and their "x"
    and "y" strings are converted to character ids with `process`. Sets
    `self.uid`, `self.clients`, `self.num_samples`, `self.train_x` and
    `self.train_y`.

    """
    logging.info("Loading training set.")
    files = [f for f in os.listdir(self.train_dir) if f.endswith(".json")]
    # os.listdir order is arbitrary; sorting makes the file list reproducible.
    files.sort()
    c_len = len(files)

    if self.sizes is None:  # Equal distribution of data among processes
        e = c_len // self.n_procs
        frac = e / c_len
        self.sizes = [frac] * self.n_procs
        # The last process absorbs the remainder so the fractions sum to 1.0
        self.sizes[-1] += 1.0 - frac * self.n_procs
        logging.debug("Size fractions: %s", self.sizes)

    self.uid = self.mapping.get_uid(self.rank, self.machine_id)
    my_clients = DataPartitioner(files, self.sizes).use(self.uid)
    my_train_data = {"x": [], "y": []}
    self.clients = []
    self.num_samples = []
    n_my_clients = len(my_clients)
    logging.debug("Clients Length: %d", c_len)
    logging.debug("My_clients_len: %d", n_my_clients)
    for i in range(n_my_clients):
        cur_file = my_clients[i]
        clients, _, train_data = self.__read_file__(
            os.path.join(self.train_dir, cur_file)
        )
        for cur_client in clients:
            self.clients.append(cur_client)
            my_train_data["x"].extend(self.process(train_data[cur_client]["x"]))
            my_train_data["y"].extend(self.process(train_data[cur_client]["y"]))
            self.num_samples.append(len(train_data[cur_client]["y"]))

    # turns the list of lists into a single list
    self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
    self.train_x = np.array(my_train_data["x"], dtype=np.dtype("int64"))
    logging.info("train_x.shape: %s", str(self.train_x.shape))
    logging.info("train_y.shape: %s", str(self.train_y.shape))
    assert self.train_x.shape[0] == self.train_y.shape[0]
    assert self.train_x.shape[0] > 0
def load_testset(self):
    """
    Loads the testing set.

    Reads every json file of `self.test_dir`, converts each user's "x" and
    "y" entries with `process` and stores the result as int64 arrays in
    `self.test_x` and `self.test_y`.

    """
    logging.info("Loading testing set.")
    _, _, per_user = self.__read_dir__(self.test_dir)

    all_x, all_y = [], []
    for user_data in per_user.values():
        all_x.extend(self.process(user_data["x"]))
        all_y.extend(self.process(user_data["y"]))

    int64 = np.dtype("int64")
    self.test_y = np.array(all_y, dtype=int64).reshape(-1)
    self.test_x = np.array(all_x, dtype=int64)
    logging.info("test_x.shape: %s", str(self.test_x.shape))
    logging.info("test_y.shape: %s", str(self.test_y.shape))
    assert self.test_x.shape[0] == self.test_y.shape[0]
    assert self.test_x.shape[0] > 0
def __init__(
    self,
    rank: int,
    machine_id: int,
    mapping: Mapping,
    n_procs="",
    train_dir="",
    test_dir="",
    sizes="",
    test_batch_size=1024,
):
    """
    Constructor which reads the data files, instantiates and partitions the dataset

    Parameters
    ----------
    rank : int
        Rank of the current process (to get the partition).
    machine_id : int
        Machine ID
    mapping : decentralizepy.mappings.Mapping
        Mapping to convert rank, machine_id -> uid for data partitioning
        It also provides the total number of global processes
    n_procs : optional
        Accepted but not referenced in the body of this constructor
    train_dir : str, optional
        Path to the training data files. Required to instantiate the training set
        The training set is partitioned according to the number of global processes and sizes
    test_dir : str, optional
        Path to the testing data files. Required to instantiate the testing set
    sizes : list(float), optional
        A list of fractions specifying how much data to allot each process. Sum of fractions should be 1.0
    test_batch_size : int, optional
        Batch size during testing. Default value is 1024

    """
    base_args = (
        rank,
        machine_id,
        mapping,
        train_dir,
        test_dir,
        sizes,
        test_batch_size,
    )
    super().__init__(*base_args)

    # Load whichever splits the base class marked as available.
    if self.__training__:
        self.load_trainset()

    if self.__testing__:
        self.load_testset()
def process(self, x):
    """
    Converts every character of every sequence into its vocabulary index

    Parameters
    ----------
    x
        Iterable of character sequences

    Returns
    -------
    list(list(int))
        One list of `char2idx` indices per input sequence

    """
    encoded = []
    for sentence in x:
        encoded.append([char2idx[ch] for ch in list(sentence)])
    return encoded
def get_client_ids(self):
    """
    Function to retrieve all the clients of the current process

    Returns
    -------
    list(str)
        The list stored in `self.clients` (the same object, not a copy)

    """
    client_ids = self.clients
    return client_ids
def get_client_id(self, i):
    """
    Function to get the client id of the ith sample

    Samples are laid out client after client, `self.num_samples[j]` of
    them for `self.clients[j]`.

    Parameters
    ----------
    i : int
        Index of the sample

    Returns
    -------
    str
        Client ID

    Raises
    ------
    IndexError
        If the sample index is out of bounds

    """
    lb = 0  # index of the first sample of the current client
    for j in range(len(self.clients)):
        if i < lb + self.num_samples[j]:
            return self.clients[j]
        # Move past this client's samples; without this the comparison
        # ignores all samples owned by earlier clients.
        lb += self.num_samples[j]
    raise IndexError("i is out of bounds!")
def get_trainset(self, batch_size=1, shuffle=False):
    """
    Function to get the training set

    Parameters
    ----------
    batch_size : int, optional
        Batch size for learning
    shuffle : bool, optional
        Forwarded to the DataLoader

    Returns
    -------
    torch.utils.data.DataLoader
        Loader over ``Data(self.train_x, self.train_y)``

    Raises
    ------
    RuntimeError
        If the training set was not initialized

    """
    if not self.__training__:
        raise RuntimeError("Training set not initialized!")
    train_data = Data(self.train_x, self.train_y)
    return DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)
def get_testset(self):
    """
    Function to get the test set

    Only every 30th test sample (indices 0, 30, 60, ...) is included.

    Returns
    -------
    torch.utils.data.DataLoader
        Loader over the sub-sampled test data with batches of
        `self.test_batch_size`

    Raises
    ------
    RuntimeError
        If the test set was not initialized

    """
    if not self.__testing__:
        raise RuntimeError("Test set not initialized!")
    every_30th = torch.arange(0, self.test_x.shape[0], 30)
    subset = Data(self.test_x[every_30th], self.test_y[every_30th])
    return DataLoader(subset, batch_size=self.test_batch_size)
def test(self, model, loss):
    """
    Function to evaluate model on the test dataset.

    Parameters
    ----------
    model : decentralizepy.models.Model
        Model to evaluate
    loss : torch.nn.loss
        Loss function to evaluate

    Returns
    -------
    tuple
        (accuracy, loss_value): overall accuracy in percent and the loss
        averaged over the test batches

    Raises
    ------
    ZeroDivisionError
        If the test loader yields no samples

    """
    testloader = self.get_testset()

    logging.debug("Test Loader instantiated.")

    correct_pred = [0 for _ in range(NUM_CLASSES)]
    total_pred = [0 for _ in range(NUM_CLASSES)]

    total_correct = 0
    total_predicted = 0

    with torch.no_grad():
        loss_val = 0.0
        count = 0
        for elems, labels in testloader:
            outputs = model(elems)
            loss_val += loss(outputs, labels).item()
            count += 1
            _, predictions = torch.max(outputs, 1)
            # Convert once to Python ints: plain list indexing and lazy,
            # identically formatted debug messages.
            for label, prediction in zip(labels.tolist(), predictions.tolist()):
                logging.debug("%s predicted as %s", label, prediction)
                if label == prediction:
                    correct_pred[label] += 1
                    total_correct += 1
                total_pred[label] += 1
                total_predicted += 1

    logging.debug("Predicted on the test set")

    for key, value in enumerate(correct_pred):
        if total_pred[key] != 0:
            accuracy = 100 * float(value) / total_pred[key]
        else:
            # Classes that never occur are reported as fully accurate.
            accuracy = 100.0
        logging.debug("Accuracy for class %s is: %.1f %%", key, accuracy)

    accuracy = 100 * float(total_correct) / total_predicted
    loss_val = loss_val / count
    logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
    return accuracy, loss_val
class LSTM(Model):
    """
    Class for an LSTM Model for Shakespeare

    """

    def __init__(self):
        """
        Constructor. Instantiates the LSTM Model to predict the next character of a sequence.
        Based on the TensorFlow model found here: https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py

        Layers: character embedding -> stacked LSTM -> one linear layer
        over the flattened LSTM outputs of all SEQ_LENGTH positions.

        """
        super().__init__()
        self.embedding = nn.Embedding(VOCAB_LEN, EMBEDDING_DIM)
        self.lstm = nn.LSTM(
            EMBEDDING_DIM, HIDDEN_DIM, batch_first=True, num_layers=NUM_LAYERS
        )
        # ReLU is applied in forward() before this layer
        flat_features = HIDDEN_DIM * SEQ_LENGTH
        self.l1 = nn.Linear(flat_features, VOCAB_LEN)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor

        """
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        # Flatten the outputs of all time steps into one row per sample.
        flat = lstm_out.reshape((-1, HIDDEN_DIM * SEQ_LENGTH))
        return self.l1(F.relu(flat))
from decentralizepy.graphs.Graph import Graph
class FullyConnected(Graph):
    """
    The class for generating a Fully Connected Graph Topology

    """

    def __init__(self, n_procs):
        """
        Constructor. Generates a Fully Connected graph: every node is a
        neighbor of every other node, but not of itself.

        Parameters
        ----------
        n_procs : int
            total number of nodes in the graph

        """
        super().__init__(n_procs)
        everyone = range(n_procs)
        for node in everyone:
            self.adj_list[node] = {other for other in everyone if other != node}
import networkx as nx
import numpy as np
class Graph:
"""
This class defines the graph topology.
......@@ -7,36 +11,45 @@ class Graph:
def __init__(self, n_procs=None):
    """
    Constructor

    When `n_procs` is given, `self.n_procs` is stored and `self.adj_list`
    is created with one empty neighbor set per node. Otherwise neither
    attribute is set here.

    Parameters
    ----------
    n_procs : int, optional
        Number of processes in the graph, if already known

    """
    if n_procs is not None:
        self.n_procs = n_procs
        self.adj_list = [set() for _ in range(self.n_procs)]
def get_all_nodes(self):
    """
    Lists the ids of all nodes

    Returns
    -------
    list(int)
        The integers 0 .. n_procs - 1

    """
    return list(range(self.n_procs))
def __insert_adj__(self, node, neighbours):
    """
    Inserts `neighbours` into the adjacency list of `node`

    Only the set of `node` is changed; the sets of the neighbours are left
    untouched.

    Parameters
    ----------
    node : int
        The vertex in question
    neighbours : list(int)
        A list of neighbours of the `node`

    """
    adjacency = self.adj_list[node]
    adjacency.update(neighbours)
def __insert_edge__(self, x, y):
    """
    Inserts the edge between `x` and `y` into the graph

    The edge is stored in both directions: `y` becomes a neighbour of `x`
    and `x` a neighbour of `y`.

    Parameters
    ----------
    x : int
        The source vertex
    y : int
        The destination vertex

    """
    for src, dst in ((x, y), (y, x)):
        self.adj_list[src].add(dst)
......@@ -44,21 +57,26 @@ class Graph:
def read_graph_from_file(self, file, type="edges", force_connect=False):
"""
Reads the graph from a given file
Parameters
----------
file : str
path to the file
type : `edges` or `adjacency`
type : str
`edges` or `adjacency`
force_connect : bool, optional
Should the graph be force-connected using a ring
Returns
-------
int
Number of processes, read from the first line of the file
Raises
------
ValueError
If the type is not either `edges` or `adjacency`
"""
with open(file, "r") as inf:
......@@ -85,6 +103,17 @@ class Graph:
return self.n_procs
def write_graph_to_file(self, file, type="edges"):
"""
Writes graph to file
Parameters
----------
file : str
File path
type : str
One of {"edges", "adjacency"}. Writes the corresponding format.
"""
with open(file, "w") as of:
of.write(str(self.n_procs) + "\n")
if type == "edges":
......@@ -100,6 +129,7 @@ class Graph:
def connect_graph(self):
"""
Connects the graph using a Ring
"""
for node in range(self.n_procs):
self.adj_list[node].add((node + 1) % self.n_procs)
......@@ -108,13 +138,33 @@ class Graph:
def neighbors(self, uid):
    """
    Gives the neighbors of a node

    Parameters
    ----------
    uid : int
        globally unique identifier of the node

    Returns
    -------
    set(int)
        The neighbour set stored for `uid` (the same object, not a copy)

    """
    adjacent = self.adj_list[uid]
    return adjacent
def centr(self):
    """
    Computes averaging weights inversely proportional to betweenness centrality

    The betweenness centrality of every node (computed by networkx on the
    graph built from `adj_list`) is offset by 0.01 so that the reciprocal
    is defined for nodes with centrality 0. Column j holds the reciprocal
    of node j's centrality, scaled so that each row sums to 1; every row
    is therefore identical.

    Returns
    -------
    numpy.ndarray
        Float matrix of shape (n_procs, n_procs), also stored in
        `self.averaging_weights`

    """
    my_adj = {x: list(adj) for x, adj in enumerate(self.adj_list)}
    nxGraph = nx.Graph(my_adj)
    centrality = nx.betweenness_centrality(nxGraph)

    inverse = np.array(
        [1.0 / (centrality[j] + 0.01) for j in range(self.n_procs)], dtype=float
    )
    # The normalized row does not depend on the row index: build it once
    # and repeat it for every node.
    row = inverse / inverse.sum()
    self.averaging_weights = np.tile(row, (self.n_procs, 1))
    return self.averaging_weights
import networkx as nx
from decentralizepy.graphs.Graph import Graph
class Regular(Graph):
    """
    Graph with a random regular topology

    """

    def __init__(self, n_procs, degree, seed=None):
        """
        Constructor. Generates a random regular graph

        Parameters
        ----------
        n_procs : int
            total number of nodes in the graph
        degree : int
            Neighbors of each node
        seed : optional
            Passed to `networkx.random_regular_graph` as its seed argument

        """
        super().__init__(n_procs)
        generated = nx.random_regular_graph(degree, n_procs, seed)
        for node, peers in generated.adjacency():
            # Start from a fresh set so nothing stored earlier for this node survives
            self.adj_list[node] = set(peers)
        if not nx.is_connected(generated):
            # Fall back to the ring connection of the base class
            self.connect_graph()
from decentralizepy.graphs.Graph import Graph
class Ring(Graph):
    """
    Graph with a Ring topology

    """

    def __init__(self, n_procs):
        """
        Constructor. Builds the ring through `connect_graph` of the base class

        Parameters
        ----------
        n_procs : int
            total number of nodes in the graph

        """
        super().__init__(n_procs)
        # All edges of this topology come from the base-class ring connection
        self.connect_graph()
......@@ -8,11 +8,13 @@ class SmallWorld(Graph):
The class for generating a SmallWorld topology Graph
Adapted from https://gitlab.epfl.ch/sacs/ml-rawdatasharing/dnn-recommender/-/blob/master/topologies.py
"""
def __init__(self, n_procs, k_over_2, beta):
"""
Constructor. Generates a random connected SmallWorld graph
Parameters
----------
n_procs : int
......@@ -21,6 +23,7 @@ class SmallWorld(Graph):
k_over_2 config for smallworld
beta : int
beta config for smallworld. β = 1 is truly equal to the Erdős-Rényi network model
"""
super().__init__(n_procs)
G = smallworld.get_smallworld_graph(self.n_procs, k_over_2, beta)
......
import networkx as nx
from decentralizepy.graphs.Graph import Graph
class Star(Graph):
    """
    Graph with a Star topology
    Adapted from ./Regular.py

    """

    def __init__(self, n_procs):
        """
        Constructor. Generates a Star graph via `networkx.star_graph(n_procs - 1)`

        Parameters
        ----------
        n_procs : int
            total number of nodes in the graph

        """
        super().__init__(n_procs)
        generated = nx.star_graph(n_procs - 1)
        for node, peers in generated.adjacency():
            # Start from a fresh set so nothing stored earlier for this node survives
            self.adj_list[node] = set(peers)
        if not nx.is_connected(generated):
            # Fall back to the ring connection of the base class
            self.connect_graph()
......@@ -5,48 +5,75 @@ class Linear(Mapping):
"""
This class defines the mapping:
uid = machine_id * procs_per_machine + rank
"""
def __init__(self, n_machines, procs_per_machine):
def __init__(self, n_machines, procs_per_machine, global_service_machine=0):
    """
    Constructor

    Parameters
    ----------
    n_machines : int
        Number of machines involved in learning
    procs_per_machine : int
        Number of processes spawned per machine
    global_service_machine : int, optional
        Machine id that `get_machine_and_rank` reports for negative uids

    """
    total_procs = n_machines * procs_per_machine
    super().__init__(total_procs)
    self.n_machines = n_machines
    self.procs_per_machine = procs_per_machine
    self.global_service_machine = global_service_machine
def get_uid(self, rank: int, machine_id: int):
    """
    Compute the globally unique identifier of a node

    Parameters
    ----------
    rank : int
        Node's rank on its machine
    machine_id : int
        node's machine in the cluster

    Returns
    -------
    int
        `machine_id * procs_per_machine + rank`, or `rank` itself when it is negative

    """
    # Negative ranks are handed back unchanged instead of being mapped
    return rank if rank < 0 else machine_id * self.procs_per_machine + rank
def get_machine_and_rank(self, uid: int):
    """
    Split a unique identifier into rank and machine_id

    Parameters
    ----------
    uid : int
        globally unique identifier of the node

    Returns
    -------
    2-tuple
        (rank, machine_id); for a negative uid this is (uid, global_service_machine)

    """
    if uid < 0:
        # Negative uids are not split; they are paired with the global service machine
        return uid, self.global_service_machine
    machine_id, rank = divmod(uid, self.procs_per_machine)
    return rank, machine_id
def get_local_procs_count(self):
    """
    Report how many processes run on one machine

    Returns
    -------
    int
        the stored `procs_per_machine` value

    """
    local_count = self.procs_per_machine
    return local_count
......@@ -3,27 +3,49 @@ class Mapping:
This class defines the bidirectional mapping between:
1. The unique identifier
2. machine_id and rank
"""
def __init__(self, n_procs):
"""
Constructor
Parameters
----------
n_procs : int
Total number of processes
"""
self.n_procs = n_procs
def get_n_procs(self):
    """
    Report the total number of processes across all machines

    Returns
    -------
    int
        the `n_procs` value given to the constructor

    """
    total = self.n_procs
    return total
def get_uid(self, rank: int, machine_id: int):
    """
    Compute the globally unique identifier of a node.
    Not implemented in this base class.

    Parameters
    ----------
    rank : int
        Node's rank on its machine
    machine_id : int
        node's machine in the cluster

    Returns
    -------
    int
        the unique identifier

    Raises
    ------
    NotImplementedError
        Always, in this base class

    """
    raise NotImplementedError
......@@ -31,14 +53,30 @@ class Mapping:
def get_machine_and_rank(self, uid: int):
    """
    Split a unique identifier into rank and machine_id.
    Not implemented in this base class.

    Parameters
    ----------
    uid : int
        globally unique identifier of the node

    Returns
    -------
    2-tuple
        a tuple of rank and machine_id

    Raises
    ------
    NotImplementedError
        Always, in this base class

    """
    raise NotImplementedError
def get_local_procs_count(self):
    """
    Report how many processes run on the node.
    Not implemented in this base class.

    Returns
    -------
    int
        the number of local processes

    Raises
    ------
    NotImplementedError
        Always, in this base class

    """
    raise NotImplementedError
import pickle
from pathlib import Path
import torch
from torch import nn
class Model(nn.Module):
    """
    This class wraps the torch model
    More fields can be added here

    """

    def __init__(self):
        """
        Constructor

        """
        super().__init__()
        self.model_change = None
        # Cached results of count_params; None means "not computed yet"
        self._param_count_ot = None
        self._param_count_total = None
        self.accumulated_changes = None
        self.shared_parameters_counter = None

    def count_params(self, only_trainable=False):
        """
        Counts the total number of params

        The result is computed on the first call and cached afterwards.

        Parameters
        ----------
        only_trainable : bool
            Counts only parameters with gradients when True

        Returns
        -------
        int
            Total number of parameters

        """
        if only_trainable:
            # Compare against None so that a cached count of 0 is not recomputed
            if self._param_count_ot is None:
                self._param_count_ot = sum(
                    p.numel() for p in self.parameters() if p.requires_grad
                )
            return self._param_count_ot
        if self._param_count_total is None:
            self._param_count_total = sum(p.numel() for p in self.parameters())
        return self._param_count_total

    def rewind_accumulation(self, indices):
        """
        resets accumulated_changes at the given indices

        Does nothing when `accumulated_changes` is None.

        Parameters
        ----------
        indices : torch.Tensor
            Tensor that contains indices corresponding to the flatten model

        """
        if self.accumulated_changes is not None:
            self.accumulated_changes[indices] = 0.0

    def _flatten_state(self):
        """
        Concatenates every entry of the state dict into one flat tensor

        Returns
        -------
        torch.Tensor
            1-D tensor; empty when the state dict has no entries

        """
        with torch.no_grad():
            pieces = [value.flatten() for value in self.state_dict().values()]
            if not pieces:
                # torch.cat raises on an empty list, so return an empty tensor instead
                return torch.empty(0)
            return torch.cat(pieces)

    def dump_weights(self, directory, uid, round):
        """
        dumps the current model as a pickle file into the specified directory

        The file is named `{round}_weight_{uid}.pk`.

        Parameters
        ----------
        directory : str
            directory in which the weights are dumped
        uid : int
            uid of the node, will be used to give the weight a unique name
        round : int
            current round, will be used to give the weight a unique name

        """
        flat = self._flatten_state()
        with open(Path(directory) / f"{round}_weight_{uid}.pk", "wb") as f:
            pickle.dump(flat, f)

    def get_weights(self):
        """
        flattens the current weights

        Returns
        -------
        torch.Tensor
            1-D tensor holding all state dict entries, concatenated in state dict order

        """
        return self._flatten_state()
"""
Copyright (c) 2016- Facebook, Inc (Adam Paszke)
Copyright (c) 2014- Facebook, Inc (Soumith Chintala)
Copyright (c) 2011-2014 Idiap Research Institute (Ronan Collobert)
Copyright (c) 2012-2014 Deepmind Technologies (Koray Kavukcuoglu)
Copyright (c) 2011-2012 NEC Laboratories America (Koray Kavukcuoglu)
Copyright (c) 2011-2013 NYU (Clement Farabet)
Copyright (c) 2006-2010 NEC Laboratories America (Ronan Collobert, Leon Bottou, Iain Melvin, Jason Weston)
Copyright (c) 2006 Idiap Research Institute (Samy Bengio)
Copyright (c) 2001-2004 Idiap Research Institute (Ronan Collobert, Samy Bengio, Johnny Mariethoz)
From Caffe2:
Copyright (c) 2016-present, Facebook Inc. All rights reserved.
All contributions by Facebook:
Copyright (c) 2016 Facebook Inc.
All contributions by Google:
Copyright (c) 2015 Google Inc.
All rights reserved.
All contributions by Yangqing Jia:
Copyright (c) 2015 Yangqing Jia
All rights reserved.
All contributions by Kakao Brain:
Copyright 2019-2020 Kakao Brain
All contributions from Caffe:
Copyright(c) 2013, 2014, 2015, the respective contributors
All rights reserved.
All other contributions:
Copyright(c) 2015, 2016 the respective contributors
All rights reserved.
Caffe2 uses a copyright model similar to Caffe: each contributor holds
copyright over their contributions to Caffe2. The project versioning records
all such contribution and copyright details. If a contributor wants to further
mark their specific copyright on a particular contribution, they should
indicate their copyright solely in the commit message of the change when it is
committed.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the names of Facebook, Deepmind Technologies, NYU, NEC Laboratories America
and IDIAP Research Institute nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
from torch import nn
# Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """Build a bias-free 3x3 convolution whose padding equals its dilation"""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )
    return layer
def conv1x1(in_planes, out_planes, stride=1):
    """Build a bias-free 1x1 convolution"""
    layer = nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return layer
class BasicBlock(nn.Module):
    """
    Residual block made of two 3x3 convolutions, each followed by a norm layer.

    `groups` and `base_width` are accepted but not used by this block.
    """

    expansion = 1

    def __init__(
        self,
        inplanes,
        planes,
        stride=1,
        downsample=None,
        groups=1,
        base_width=64,
        dilation=1,
        norm_layer=None,
    ):
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Only conv1 (and the optional downsample module) change the spatial
        # resolution when stride != 1. Sub-modules are registered in the same
        # order as before so the state dict layout is unchanged.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = make_norm(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = make_norm(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply conv-norm-relu-conv-norm, add the shortcut, then apply relu"""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # The shortcut is the input itself unless a downsample module was given
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)
class Bottleneck(nn.Module):
    """
    Residual block made of a 1x1, a 3x3 and a 1x1 convolution, each followed by a norm layer.
    """

    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion = 4

    def __init__(
        self,
        inplanes,
        planes,
        stride=1,
        downsample=None,
        groups=1,
        base_width=64,
        dilation=1,
        norm_layer=None,
    ):
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer
        width = int(planes * (base_width / 64.0)) * groups
        out_planes = planes * self.expansion
        # Only conv2 (and the optional downsample module) change the spatial
        # resolution when stride != 1. Sub-modules are registered in the same
        # order as before so the state dict layout is unchanged.
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = make_norm(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = make_norm(width)
        self.conv3 = conv1x1(width, out_planes)
        self.bn3 = make_norm(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the three conv-norm stages, add the shortcut, then apply relu"""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # The shortcut is the input itself unless a downsample module was given
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)
import importlib
import json
import logging
import math
import os
from collections import deque
import torch
from matplotlib import pyplot as plt
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.node.Node import Node
class DPSGDNode(Node):
"""
This class defines the node for DPSGD
"""
def save_plot(self, l, label, title, xlabel, filename):
    """
    Draw a single line plot and write it to a file. The current figure is cleared first.

    Parameters
    ----------
    l : dict
        dict of x -> y. `x` must be castable to int.
    label : str
        Label attached to the plotted line.
    title : str
        Header
    xlabel : str
        x-axis label
    filename : str
        Name of file to save the plot as.

    """
    plt.clf()
    x_axis = [int(key) for key in l]
    y_axis = list(l.values())
    plt.plot(x_axis, y_axis, label=label)
    plt.xlabel(xlabel)
    plt.title(title)
    plt.savefig(filename)
def get_neighbors(self, node=None):
    """
    Give the neighbors currently stored for this node

    Parameters
    ----------
    node : optional
        Not used by this implementation

    Returns
    -------
    The object held in `self.my_neighbors`

    """
    current = self.my_neighbors
    return current
# def instantiate_peer_deques(self):
# for neighbor in self.my_neighbors:
# if neighbor not in self.peer_deques:
# self.peer_deques[neighbor] = deque()
def receive_DPSGD(self):
    """
    Receive the next message from the "DPSGD" channel

    Returns
    -------
    Whatever `self.receive_channel("DPSGD")` returns

    """
    message = self.receive_channel("DPSGD")
    return message
def run(self):
    """
    Start the decentralized learning

    Each iteration trains locally, sends the data produced by the sharing
    module to every neighbor on the "DPSGD" channel, waits until every
    neighbor has a queued message, averages, and appends the communication
    and evaluation metrics to `{rank}_results.json` in `log_dir`.
    After the last iteration the shared-parameter counts (if any) and the
    final weights are written out.

    """
    self.testset = self.dataset.get_testset()
    # Countdown counters: an evaluation runs when the counter reaches 0
    rounds_to_test = self.test_after
    rounds_to_train_evaluate = self.train_evaluate_after
    global_epoch = 1
    # Multiplier applied to both evaluation intervals; doubled once below
    change = 1

    for iteration in range(self.iterations):
        logging.info("Starting training iteration: %d", iteration)
        rounds_to_train_evaluate -= 1
        rounds_to_test -= 1

        self.iteration = iteration
        self.trainer.train(self.dataset)

        new_neighbors = self.get_neighbors()

        # The following code does not work because TCP sockets are supposed to be long lived.
        # for neighbor in self.my_neighbors:
        #     if neighbor not in new_neighbors:
        #         logging.info("Removing neighbor {}".format(neighbor))
        #         if neighbor in self.peer_deques:
        #             assert len(self.peer_deques[neighbor]) == 0
        #             del self.peer_deques[neighbor]
        #         self.communication.destroy_connection(neighbor, linger = 10000)
        #         self.barrier.remove(neighbor)

        self.my_neighbors = new_neighbors
        self.connect_neighbors()
        logging.info("Connected to all neighbors")
        # self.instantiate_peer_deques()

        to_send = self.sharing.get_data_to_send()
        # Tag the payload with the channel name that receive_DPSGD listens on
        to_send["CHANNEL"] = "DPSGD"

        for neighbor in self.my_neighbors:
            self.communication.send(neighbor, to_send)

        # Keep receiving until every current neighbor has at least one queued message
        while not self.received_from_all():
            sender, data = self.receive_DPSGD()
            logging.info(
                "Received Model from {} of iteration {}".format(
                    sender, data["iteration"]
                )
            )
            if sender not in self.peer_deques:
                self.peer_deques[sender] = deque()
            self.peer_deques[sender].append(data)

        # Hand only the deques of the current neighbors to the sharing module
        averaging_deque = dict()
        for neighbor in self.my_neighbors:
            averaging_deque[neighbor] = self.peer_deques[neighbor]

        self.sharing._averaging(averaging_deque)

        if self.reset_optimizer:
            self.optimizer = self.optimizer_class(
                self.model.parameters(), **self.optimizer_params
            )  # Reset optimizer state
            self.trainer.reset_optimizer(self.optimizer)

        # From the second iteration on, continue from the results written last round
        if iteration:
            with open(
                os.path.join(self.log_dir, "{}_results.json".format(self.rank)),
                "r",
            ) as inf:
                results_dict = json.load(inf)
        else:
            results_dict = {
                "train_loss": {},
                "test_loss": {},
                "test_acc": {},
                "total_bytes": {},
                "total_meta": {},
                "total_data_per_n": {},
            }

        # Metrics are keyed by the 1-based round number
        results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes

        # total_meta / total_data are recorded only if the communication object has them
        if hasattr(self.communication, "total_meta"):
            results_dict["total_meta"][
                iteration + 1
            ] = self.communication.total_meta
        if hasattr(self.communication, "total_data"):
            results_dict["total_data_per_n"][
                iteration + 1
            ] = self.communication.total_data

        if rounds_to_train_evaluate == 0:
            logging.info("Evaluating on train set.")
            rounds_to_train_evaluate = self.train_evaluate_after * change
            loss_after_sharing = self.trainer.eval_loss(self.dataset)
            results_dict["train_loss"][iteration + 1] = loss_after_sharing
            self.save_plot(
                results_dict["train_loss"],
                "train_loss",
                "Training Loss",
                "Communication Rounds",
                os.path.join(self.log_dir, "{}_train_loss.png".format(self.rank)),
            )

        if self.dataset.__testing__ and rounds_to_test == 0:
            rounds_to_test = self.test_after * change
            logging.info("Evaluating on test set.")
            ta, tl = self.dataset.test(self.model, self.loss)
            results_dict["test_acc"][iteration + 1] = ta
            results_dict["test_loss"][iteration + 1] = tl

            # Once global_epoch hits exactly 49 the evaluation intervals are doubled
            if global_epoch == 49:
                change *= 2

            global_epoch += change

        with open(
            os.path.join(self.log_dir, "{}_results.json".format(self.rank)), "w"
        ) as of:
            json.dump(results_dict, of)
    if self.model.shared_parameters_counter is not None:
        logging.info("Saving the shared parameter counts")
        with open(
            os.path.join(
                self.log_dir, "{}_shared_parameters.json".format(self.rank)
            ),
            "w",
        ) as of:
            json.dump(self.model.shared_parameters_counter.numpy().tolist(), of)
    self.disconnect_neighbors()
    logging.info("Storing final weight")
    # `iteration` still holds the index of the last completed round here
    self.model.dump_weights(self.weights_store_dir, self.uid, iteration)
    logging.info("All neighbors disconnected. Process complete!")
def cache_fields(
    self,
    rank,
    machine_id,
    mapping,
    graph,
    iterations,
    log_dir,
    weights_store_dir,
    test_after,
    train_evaluate_after,
    reset_optimizer,
):
    """
    Instantiate object field with arguments.

    Also derives `self.uid` from `mapping.get_uid(rank, machine_id)` and
    initialises `self.sent_disconnections` to False.

    Parameters
    ----------
    rank : int
        Rank of process local to the machine
    machine_id : int
        Machine ID on which the process in running
    mapping : decentralizepy.mappings
        The object containing the mapping rank <--> uid
    graph : decentralizepy.graphs
        The object containing the global graph
    iterations : int
        Number of iterations (communication steps) for which the model should be trained
    log_dir : str
        Logging directory
    weights_store_dir : str
        Directory in which to store model weights
    test_after : int
        Number of iterations after which the test loss and accuracy are calculated
    train_evaluate_after : int
        Number of iterations after which the train loss is calculated
    reset_optimizer : int
        1 if optimizer should be reset every communication round, else 0

    """
    self.rank = rank
    self.machine_id = machine_id
    self.graph = graph
    self.mapping = mapping
    self.uid = self.mapping.get_uid(rank, machine_id)
    self.log_dir = log_dir
    self.weights_store_dir = weights_store_dir
    self.iterations = iterations
    self.test_after = test_after
    self.train_evaluate_after = train_evaluate_after
    self.reset_optimizer = reset_optimizer
    self.sent_disconnections = False

    logging.info("Rank: %d", self.rank)
    # Log the type of the graph itself (this line used to log the type of the rank)
    logging.info("type(graph): %s", str(type(self.graph)))
    logging.info("type(mapping): %s", str(type(self.mapping)))
def init_comm(self, comm_configs):
    """
    Build the communication module described by the config.

    The class named by `comm_class` is loaded from the module named by
    `comm_package`; all remaining config entries are passed to its
    constructor as keyword arguments.

    Parameters
    ----------
    comm_configs : dict
        Python dict containing communication config params

    """
    package = importlib.import_module(comm_configs["comm_package"])
    communication_cls = getattr(package, comm_configs["comm_class"])
    extra_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
    # None when the config carries no addresses file
    self.addresses_filepath = extra_params.get("addresses_filepath", None)
    self.communication = communication_cls(
        self.rank,
        self.machine_id,
        self.mapping,
        self.graph.n_procs,
        **extra_params
    )
def instantiate(
    self,
    rank: int,
    machine_id: int,
    mapping: Mapping,
    graph: Graph,
    config,
    iterations=1,
    log_dir=".",
    weights_store_dir=".",
    log_level=logging.INFO,
    test_after=5,
    train_evaluate_after=1,
    reset_optimizer=1,
    *args
):
    """
    Construct objects.

    Sets up logging, caches the arguments, builds dataset/model, optimizer,
    trainer, communication and sharing from `config`, and finally connects
    to the neighbors.

    Parameters
    ----------
    rank : int
        Rank of process local to the machine
    machine_id : int
        Machine ID on which the process in running
    mapping : decentralizepy.mappings
        The object containing the mapping rank <--> uid
    graph : decentralizepy.graphs
        The object containing the global graph
    config : dict
        A dictionary of configurations.
    iterations : int
        Number of iterations (communication steps) for which the model should be trained
    log_dir : str
        Logging directory
    weights_store_dir : str
        Directory in which to store model weights
    log_level : logging.Level
        One of DEBUG, INFO, WARNING, ERROR, CRITICAL
    test_after : int
        Number of iterations after which the test loss and accuracy are calculated
    train_evaluate_after : int
        Number of iterations after which the train loss is calculated
    reset_optimizer : int
        1 if optimizer should be reset every communication round, else 0
    args : optional
        Other arguments

    """
    logging.info("Started process.")

    self.init_log(log_dir, rank, log_level)

    field_values = (
        rank,
        machine_id,
        mapping,
        graph,
        iterations,
        log_dir,
        weights_store_dir,
        test_after,
        train_evaluate_after,
        reset_optimizer,
    )
    self.cache_fields(*field_values)

    # Component construction follows the original order
    self.init_dataset_model(config["DATASET"])
    self.init_optimizer(config["OPTIMIZER_PARAMS"])
    self.init_trainer(config["TRAIN_PARAMS"])
    self.init_comm(config["COMMUNICATION"])

    self.message_queue = {}
    self.barrier = set()
    self.my_neighbors = self.graph.neighbors(self.uid)

    self.init_sharing(config["SHARING"])
    self.peer_deques = {}
    self.connect_neighbors()
def received_from_all(self):
    """
    Check whether every neighbor has at least one message queued

    Returns
    -------
    bool
        True if each neighbor has a non-empty entry in `peer_deques`, False otherwise

    """
    return all(
        neighbor in self.peer_deques and len(self.peer_deques[neighbor]) != 0
        for neighbor in self.my_neighbors
    )
def __init__(
    self,
    rank: int,
    machine_id: int,
    mapping: Mapping,
    graph: Graph,
    config,
    iterations=1,
    log_dir=".",
    weights_store_dir=".",
    log_level=logging.INFO,
    test_after=5,
    train_evaluate_after=1,
    reset_optimizer=1,
    *args
):
    """
    Constructor

    Limits the torch thread pools, builds all components through
    `instantiate` and then starts `run`.

    Parameters
    ----------
    rank : int
        Rank of process local to the machine
    machine_id : int
        Machine ID on which the process in running
    mapping : decentralizepy.mappings
        The object containing the mapping rank <--> uid
    graph : decentralizepy.graphs
        The object containing the global graph
    config : dict
        A dictionary of configurations. Must contain the following:
        [DATASET]
            dataset_package
            dataset_class
            model_class
        [OPTIMIZER_PARAMS]
            optimizer_package
            optimizer_class
        [TRAIN_PARAMS]
            training_package = decentralizepy.training.Training
            training_class = Training
            epochs_per_round = 25
            batch_size = 64
    iterations : int
        Number of iterations (communication steps) for which the model should be trained
    log_dir : str
        Logging directory
    weights_store_dir : str
        Directory in which to store model weights
    log_level : logging.Level
        One of DEBUG, INFO, WARNING, ERROR, CRITICAL
    test_after : int
        Number of iterations after which the test loss and accuracy are calculated
    train_evaluate_after : int
        Number of iterations after which the train loss is calculated
    reset_optimizer : int
        1 if optimizer should be reset every communication round, else 0
    args : optional
        Other arguments

    """
    # os.cpu_count() returns None when the count cannot be determined;
    # fall back to a single thread instead of failing on the division below.
    total_threads = os.cpu_count() or 1
    # Split the CPUs evenly between the local processes, at least one thread each
    self.threads_per_proc = max(
        math.floor(total_threads / mapping.procs_per_machine), 1
    )
    torch.set_num_threads(self.threads_per_proc)
    torch.set_num_interop_threads(1)
    self.instantiate(
        rank,
        machine_id,
        mapping,
        graph,
        config,
        iterations,
        log_dir,
        weights_store_dir,
        log_level,
        test_after,
        train_evaluate_after,
        reset_optimizer,
        *args
    )
    logging.info(
        "Each proc uses %d threads out of %d.", self.threads_per_proc, total_threads
    )
    self.run()