Commit aa1c3dcf authored by Rishi Sharma

Add celeba, total_bytes

parent 166ded18
[DATASET]
dataset_package = decentralizepy.datasets.Celeba
dataset_class = Celeba
model_class = CNN
n_procs = 96
images_dir = /home/risharma/leaf/data/celeba/data/raw/img_align_celeba
train_dir = /home/risharma/leaf/data/celeba/per_user_data/train
test_dir = /home/risharma/leaf/data/celeba/data/test
; python list of fractions summing to 1.0; leave empty for an equal split across processes
sizes =
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = Adam
lr = 0.001
[TRAIN_PARAMS]
training_package = decentralizepy.training.GradientAccumulator
training_class = GradientAccumulator
epochs_per_round = 5
batch_size = 512
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
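For orientation, here is a minimal sketch of how a [DATASET] section like the one above could be resolved into a dataset object. It uses Python's standard configparser and importlib purely for illustration; the project itself lists localconfig as its config reader, and the file name below is hypothetical.

# Illustration only: resolve the [DATASET] section into a dataset class.
# Note that configparser returns every value as a string, so numeric fields
# such as n_procs would still need explicit casting.
import importlib
from configparser import ConfigParser

config = ConfigParser()
config.read("config_celeba.ini")  # hypothetical file name

dataset_cfg = dict(config["DATASET"])
module = importlib.import_module(dataset_cfg.pop("dataset_package"))
dataset_cls = getattr(module, dataset_cfg.pop("dataset_class"))
dataset_cfg.pop("model_class")  # the model class is handled separately by Node

# rank, machine_id and mapping come from the runtime, as in the Node change below:
# dataset = dataset_cls(rank, machine_id, mapping, **dataset_cfg)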
@@ -40,6 +40,7 @@ install_requires =
     zmq
     jsonlines
     pillow
+    pickle
     smallworld
     localconfig
 include_package_data = True
...
@@ -9,6 +9,7 @@ class Communication:
         self.machine_id = machine_id
         self.mapping = mapping
         self.uid = mapping.get_uid(rank, machine_id)
+        self.total_bytes = 0

     def encrypt(self, data):
         raise NotImplementedError
...
@@ -108,9 +108,12 @@ class TCP(Communication):
     def send(self, uid, data):
         to_send = self.encrypt(data)
+        data_size = len(to_send)
+        self.total_bytes += data_size
         id = str(uid).encode()
         self.peer_sockets[id].send(to_send)
         logging.debug("{} sent the message to {}.".format(self.uid, uid))
+        logging.info("Sent this round: {}".format(data_size))

     def disconnect_neighbors(self):
         if not self.sent_disconnections:
...
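The two hunks above add a cumulative traffic counter: Communication.__init__ starts total_bytes at 0, and every payload handed to the peer socket in TCP.send adds its encrypted size to it. A stripped-down sketch of the same bookkeeping pattern, with illustrative names that are not part of the repo's API:

import pickle


class ByteCountingSender:
    # Toy stand-in for TCP: counts cumulative bytes handed to the socket.
    def __init__(self):
        self.total_bytes = 0

    def send(self, sock, data):
        payload = pickle.dumps(data)   # stands in for self.encrypt(data)
        data_size = len(payload)
        self.total_bytes += data_size  # same bookkeeping as the hunk above
        sock.send(payload)
        return data_size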
import json
import logging
import os
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from torch import nn
from torch.utils.data import DataLoader
import decentralizepy.utils as utils
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.models.Model import Model
IMAGE_DIM = 84
CHANNELS = 3
NUM_CLASSES = 2
class Celeba(Dataset):
"""
Class for the Celeba dataset
"""
def __read_file__(self, file_path):
with open(file_path, "r") as inf:
client_data = json.load(inf)
return (
client_data["users"],
client_data["num_samples"],
client_data["user_data"],
)
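# Each per-client JSON file read above is expected to follow the LEAF layout,
# roughly like this (values are illustrative only):
# {
#   "users": ["client_001"],
#   "num_samples": [2],
#   "user_data": {
#     "client_001": {"x": ["000001.jpg", "000002.jpg"], "y": [0, 1]}
#   }
# }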
def __read_dir__(self, data_dir):
"""
Function to read all the Celeba data files in the directory
Parameters
----------
data_dir : str
Path to the folder containing the data files
Returns
-------
3-tuple
A tuple containing list of clients, number of samples per client,
and the data items per client
"""
clients = []
num_samples = []
data = defaultdict(lambda: None)
files = os.listdir(data_dir)
files = [f for f in files if f.endswith(".json")]
for f in files:
file_path = os.path.join(data_dir, f)
u, n, d = self.__read_file__(file_path)
clients.extend(u)
num_samples.extend(n)
data.update(d)
return clients, num_samples, data
def file_per_user(self, dir, write_dir):
clients, num_samples, train_data = self.__read_dir__(dir)
for index, client in enumerate(clients):
my_data = dict()
my_data["users"] = [client]
my_data["num_samples"] = num_samples[index]
my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]}
my_data["user_data"] = {client: my_samples}
with open(os.path.join(write_dir, client + ".json"), "w") as of:
json.dump(my_data, of)
print("Created File: ", client + ".json")
def load_trainset(self):
logging.info("Loading training set.")
files = os.listdir(self.train_dir)
files = [f for f in files if f.endswith(".json")]
files.sort()
c_len = len(files)
# clients, num_samples, train_data = self.__read_dir__(self.train_dir)
if self.sizes is None: # Equal distribution of data among processes
e = c_len // self.n_procs
frac = e / c_len
self.sizes = [frac] * self.n_procs
self.sizes[-1] += 1.0 - frac * self.n_procs
logging.debug("Size fractions: {}".format(self.sizes))
self.uid = self.mapping.get_uid(self.rank, self.machine_id)
my_clients = DataPartitioner(files, self.sizes).use(self.uid)
my_train_data = {"x": [], "y": []}
self.clients = []
self.num_samples = []
logging.debug("Clients Length: %d", c_len)
logging.debug("My_clients_len: %d", my_clients.__len__())
for i in range(my_clients.__len__()):
cur_file = my_clients.__getitem__(i)
clients, _, train_data = self.__read_file__(
os.path.join(self.train_dir, cur_file)
)
for cur_client in clients:
self.clients.append(cur_client)
my_train_data["x"].extend(self.process_x(train_data[cur_client]["x"]))
my_train_data["y"].extend(train_data[cur_client]["y"])
self.num_samples.append(len(train_data[cur_client]["y"]))
self.train_x = (
np.array(my_train_data["x"], dtype=np.dtype("float32"))
.reshape(-1, IMAGE_DIM, IMAGE_DIM, CHANNELS)
.transpose(0, 3, 1, 2) # Channel first: torch
)
self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
logging.debug("train_x.shape: %s", str(self.train_x.shape))
logging.debug("train_y.shape: %s", str(self.train_y.shape))
assert self.train_x.shape[0] == self.train_y.shape[0]
assert self.train_x.shape[0] > 0
def load_testset(self):
logging.info("Loading testing set.")
_, _, d = self.__read_dir__(self.test_dir)
test_x = []
test_y = []
for test_data in d.values():
test_x.extend(self.process_x(test_data["x"]))
test_y.extend(test_data["y"])
self.test_x = (
np.array(test_x, dtype=np.dtype("float32"))
.reshape(-1, IMAGE_DIM, IMAGE_DIM, CHANNELS)
.transpose(0, 3, 1, 2)
)
self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1)
logging.debug("test_x.shape: %s", str(self.test_x.shape))
logging.debug("test_y.shape: %s", str(self.test_y.shape))
assert self.test_x.shape[0] == self.test_y.shape[0]
assert self.test_x.shape[0] > 0
def __init__(
self,
rank,
machine_id,
mapping,
n_procs="",
train_dir="",
test_dir="",
images_dir="",
sizes="",
test_batch_size=128,
):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
rank : int
Rank of the current process (to get the partition). Default value is assigned 0
n_procs : int, optional
The number of processes among which to divide the data. Default value is assigned 1
train_dir : str, optional
Path to the training data files. Required to instantiate the training set
The training set is partitioned according to n_procs and sizes
test_dir : str. optional
Path to the testing data files Required to instantiate the testing set
sizes : list(int), optional
A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0
By default, each process gets an equal amount.
"""
super().__init__(
rank,
machine_id,
mapping,
n_procs,
train_dir,
test_dir,
sizes,
test_batch_size,
)
self.IMAGES_DIR = utils.conditional_value(images_dir, "", None)
assert self.IMAGES_DIR is not None
if self.__training__:
self.load_trainset()
if self.__testing__:
self.load_testset()
# TODO: Add Validation
def process_x(self, raw_x_batch):
x_batch = [self._load_image(i) for i in raw_x_batch]
x_batch = np.array(x_batch)
return x_batch
def _load_image(self, img_name):
img = Image.open(os.path.join(self.IMAGES_DIR, img_name[:-4] + ".png"))
img = img.resize((IMAGE_DIM, IMAGE_DIM)).convert("RGB")
return np.array(img)
def get_client_ids(self):
"""
Function to retrieve all the clients of the current process
Returns
-------
list(str)
A list of strings of the client ids.
"""
return self.clients
def get_client_id(self, i):
"""
Function to get the client id of the ith sample
Parameters
----------
i : int
Index of the sample
Returns
-------
str
Client ID
Raises
------
IndexError
If the sample index is out of bounds
"""
lb = 0
for j in range(len(self.clients)):
if i < lb + self.num_samples[j]:
return self.clients[j]
lb += self.num_samples[j]  # advance the lower bound past client j's samples
raise IndexError("i is out of bounds!")
def get_trainset(self, batch_size=1, shuffle=False):
"""
Function to get the training set
Parameters
----------
batch_size : int, optional
Batch size for learning
Returns
-------
torch.utils.data.DataLoader wrapping decentralizepy.datasets.Data
Raises
------
RuntimeError
If the training set was not initialized
"""
if self.__training__:
return DataLoader(
Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle
)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
"""
Function to get the test set
Returns
-------
torch.utils.data.DataLoader wrapping decentralizepy.datasets.Data
Raises
------
RuntimeError
If the test set was not initialized
"""
if self.__testing__:
return DataLoader(
Data(self.test_x, self.test_y), batch_size=self.test_batch_size
)
raise RuntimeError("Test set not initialized!")
def imshow(self, img):
npimg = img.numpy()
plt.imshow(np.transpose(npimg, (1, 2, 0)))
plt.show()
def test(self, model, loss):
testloader = self.get_testset()
logging.debug("Test Loader instantiated.")
correct_pred = [0 for _ in range(NUM_CLASSES)]
total_pred = [0 for _ in range(NUM_CLASSES)]
total_correct = 0
total_predicted = 0
with torch.no_grad():
loss_val = 0.0
count = 0
for elems, labels in testloader:
outputs = model(elems)
loss_val += loss(outputs, labels).item()
count += 1
_, predictions = torch.max(outputs, 1)
for label, prediction in zip(labels, predictions):
logging.debug("{} predicted as {}".format(label, prediction))
if label == prediction:
correct_pred[label] += 1
total_correct += 1
total_pred[label] += 1
total_predicted += 1
logging.debug("Predicted on the test set")
for key, value in enumerate(correct_pred):
if total_pred[key] != 0:
accuracy = 100 * float(value) / total_pred[key]
else:
accuracy = 100.0
logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
accuracy = 100 * float(total_correct) / total_predicted
loss_val = loss_val / count
logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
return accuracy, loss_val
class CNN(Model):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(CHANNELS, 32, 3, padding="same")
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(32, 32, 3, padding="same")
self.conv3 = nn.Conv2d(32, 32, 3, padding="same")
self.conv4 = nn.Conv2d(32, 32, 3, padding="same")
self.fc1 = nn.Linear(5 * 5 * 32, NUM_CLASSES)
def forward(self, x):
x = F.relu(self.pool(self.conv1(x)))
x = F.relu(self.pool(self.conv2(x)))
x = F.relu(self.pool(self.conv3(x)))
x = F.relu(self.pool(self.conv4(x)))
x = torch.flatten(x, 1)
x = self.fc1(x)
return x
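For reference, the 5 * 5 * 32 input size of fc1 follows from four 2x2 max pools applied to an 84x84 input (84 -> 42 -> 21 -> 10 -> 5 per spatial dimension). A quick shape check one could run with the names defined in this module (illustrative, not part of the commit):

if __name__ == "__main__":
    # A dummy 84x84 RGB batch should map to NUM_CLASSES logits without
    # any shape mismatch in fc1.
    model = CNN()
    dummy = torch.zeros(1, CHANNELS, IMAGE_DIM, IMAGE_DIM)
    assert model(dummy).shape == (1, NUM_CLASSES)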
@@ -9,7 +9,9 @@ class Dataset:
     def __init__(
         self,
-        rank="",
+        rank,
+        machine_id,
+        mapping,
         n_procs="",
         train_dir="",
         test_dir="",
@@ -33,7 +35,9 @@ class Dataset:
             A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0
             By default, each process gets an equal amount.
         """
-        self.rank = utils.conditional_value(rank, "", 0)
+        self.rank = rank
+        self.machine_id = machine_id
+        self.mapping = mapping
         self.n_procs = utils.conditional_value(n_procs, "", 1)
         self.train_dir = utils.conditional_value(train_dir, "", None)
         self.test_dir = utils.conditional_value(test_dir, "", None)
...
@@ -120,10 +120,10 @@ class Femnist(Dataset):
     def load_testset(self):
         logging.info("Loading testing set.")
-        _, _, test_data = self.__read_dir__(self.test_dir)
+        _, _, d = self.__read_dir__(self.test_dir)
         test_x = []
         test_y = []
-        for test_data in test_data.values():
+        for test_data in d.values():
             for x in test_data["x"]:
                 test_x.append(x)
             for y in test_data["y"]:
@@ -141,7 +141,9 @@ class Femnist(Dataset):
     def __init__(
         self,
-        rank=0,
+        rank,
+        machine_id,
+        mapping,
         n_procs="",
         train_dir="",
         test_dir="",
@@ -152,7 +154,7 @@ class Femnist(Dataset):
         Constructor which reads the data files, instantiates and partitions the dataset
         Parameters
         ----------
-        rank : int, optional
+        rank : int
             Rank of the current process (to get the partition). Default value is assigned 0
         n_procs : int, optional
             The number of processes among which to divide the data. Default value is assigned 1
@@ -165,7 +167,16 @@ class Femnist(Dataset):
             A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0
             By default, each process gets an equal amount.
         """
-        super().__init__(rank, n_procs, train_dir, test_dir, sizes, test_batch_size)
+        super().__init__(
+            rank,
+            machine_id,
+            mapping,
+            n_procs,
+            train_dir,
+            test_dir,
+            sizes,
+            test_batch_size,
+        )
         if self.__training__:
             self.load_trainset()
...
@@ -100,7 +100,9 @@ class Node:
         dataset_params = utils.remove_keys(
             dataset_configs, ["dataset_package", "dataset_class", "model_class"]
         )
-        self.dataset = dataset_class(rank, **dataset_params)
+        self.dataset = dataset_class(
+            self.rank, self.machine_id, self.mapping, **dataset_params
+        )
         logging.info("Dataset instantiation complete.")
@@ -193,9 +195,15 @@ class Node:
         ) as inf:
             results_dict = json.load(inf)
         else:
-            results_dict = {"train_loss": {}, "test_loss": {}, "test_acc": {}}
+            results_dict = {
+                "train_loss": {},
+                "test_loss": {},
+                "test_acc": {},
+                "total_bytes": {},
+            }
         results_dict["train_loss"][iteration + 1] = loss_after_sharing
+        results_dict["total_bytes"][iteration + 1] = self.communication.total_bytes
         self.save_plot(
             results_dict["train_loss"],
...
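Since TCP.send never resets the counter, the value written to results_dict["total_bytes"] each iteration is a running total rather than a per-iteration figure. Under that reading, per-iteration traffic can be recovered by differencing consecutive entries; the helper below is only a sketch of that post-processing step (its name, and the assumption that iterations may come back as JSON string keys, are not part of the commit):

def bytes_per_iteration(total_bytes_log):
    # total_bytes_log: mapping of iteration -> cumulative bytes sent,
    # e.g. results_dict["total_bytes"] loaded back from the results JSON.
    per_iteration = {}
    previous = 0
    for iteration, cumulative in sorted((int(k), v) for k, v in total_bytes_log.items()):
        per_iteration[iteration] = cumulative - previous
        previous = cumulative
    return per_iteration


# Example: {1: 100, 2: 250, 3: 450} -> {1: 100, 2: 150, 3: 200}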