Skip to content
Snippets Groups Projects
Commit db2d7161 authored by Rishi Sharma's avatar Rishi Sharma
Browse files

Communication

parent adcd9b05
No related branches found
No related tags found
No related merge requests found
......@@ -5,30 +5,32 @@ graph_class = SmallWorld
[DATASET]
dataset_package = decentralizepy.datasets.Femnist
dataset_class = Femnist
model_class = LogisticRegression
n_procs = 1.0
model_class = CNN
n_procs = 1
train_dir = leaf/data/femnist/data/train
test_dir =
test_dir = leaf/data/femnist/data/test
; python list of fractions below
sizes = [0.4, 0.2, 0.3, 0.1]
sizes =
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.1
optimizer_class = Adam
lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
batch_size = 64
shuffle = False
loss_package = torch.nn.functional
loss = nll_loss
batch_size = 512
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.Communication
comm_class = Communcation
comm_class = Communication
addresses_filepath = ip_addr.json
total_procs = 4
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
......
{
"0": "labostrex131",
"1": "labostrex132"
}
\ No newline at end of file
%% Cell type:code id: tags:
```
from datasets.Femnist import Femnist
from graphs import SmallWorld
from collections import defaultdict
import os
import json
import numpy as np
```
%% Cell type:code id: tags:
```
a = FEMNIST
a
```
%% Cell type:code id: tags:
```
b = SmallWorld(6, 2, 2, 1)
```
%% Cell type:code id: tags:
```
b.adj_list
```
%% Cell type:code id: tags:
```
for i in range(12):
print(b.neighbors(i))
```
%% Cell type:code id: tags:
```
clients = []
```
%% Cell type:code id: tags:
```
num_samples = []
data = defaultdict(lambda : None)
```
%% Cell type:code id: tags:
```
datadir = "./leaf/data/femnist/data/train"
files = os.listdir(datadir)
total_users=0
users = set()
```
%% Cell type:code id: tags:
```
files = os.listdir(datadir)[0:1]
```
%% Cell type:code id: tags:
```
for f in files:
file_path = os.path.join(datadir, f)
print(file_path)
with open(file_path, 'r') as inf:
client_data = json.load(inf)
current_users = len(client_data['users'])
print("Current_Users: ", current_users)
total_users += current_users
users.update(client_data['users'])
print("total_users: ", total_users)
print("total_users: ", len(users))
print(client_data['user_data'].keys())
print(np.array(client_data['user_data']['f3408_47']['x']).shape)
print(np.array(client_data['user_data']['f3408_47']['y']).shape)
print(np.array(client_data['user_data']['f3327_11']['x']).shape)
print(np.array(client_data['user_data']['f3327_11']['y']).shape)
print(np.unique(np.array(client_data['user_data']['f3327_11']['y'])))
```
%% Cell type:code id: tags:
```
file = 'run.py'
with open(file, 'r') as inf:
print(inf.readline().strip())
print(inf.readlines())
```
%% Cell type:code id: tags:
```
def f(l):
l[2] = 'c'
a = ['a', 'a', 'a']
print(a)
f(a)
print(a)
```
%% Cell type:code id: tags:
```
l = ['a', 'b', 'c']
print(l[:-1])
```
%% Cell type:code id: tags:
```
from localconfig import LocalConfig
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items('DATASET')))
return config
read_ini("config.ini")
config = read_ini("config.ini")
for section in config:
print(section)
#d = dict(config.sections())
```
%% Cell type:code id: tags:
```
def func(a = 1, b = 2, c = 3):
print(a + b + c)
l = [3, 5, 7]
func(*l)
```
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
mp.spawn(fn = func, nprocs = 2, args = [], kwargs = {'a': 4, 'b': 5, 'c': 6})
```
%% Cell type:code id: tags:
```
l = '[0.4, 0.2, 0.3, 0.1]'
type(eval(l))
```
%% Cell type:code id: tags:
```
f1 = Femnist(1, 'leaf/data/femnist/data/train')
f1.instantiate_dataset()
f1.train_x.shape
from decentralizepy.datasets.Femnist import Femnist
f1 = Femnist(0, 1, 'leaf/data/femnist/data/train')
ts = f1.get_trainset(1)
for data, target in ts:
print(data)
break
```
%% Cell type:code id: tags:
```
from decentralizepy.datasets.Femnist import Femnist
from decentralizepy.graphs.SmallWorld import SmallWorld
from decentralizepy.mappings.Linear import Linear
f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])
g = SmallWorld(4, 1, 0.5)
l = Linear(2, 2)
```
%% Cell type:code id: tags:
```
from decentralizepy.node.Node import Node
from torch import multiprocessing as mp
import logging
n1 = Node(0, l, g, f, "./results", logging.DEBUG)
n2 = Node(1, l, g, f, "./results", logging.DEBUG)
# mp.spawn(fn = Node, nprocs = 2, args=[l,g,f])
```
%% Cell type:code id: tags:
```
from testing import f
```
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
import torch
m1 = torch.nn.Linear(1,1)
o1 = torch.optim.SGD(m1.parameters(), 0.6)
print(m1)
mp.spawn(fn = f, nprocs = 2, args=[m1, o1])
```
%% Output
Linear(in_features=1, out_features=1, bias=True)
1 OrderedDict([('weight', tensor([[0.9654]])), ('bias', tensor([-0.2141]))])
1 [{'params': [Parameter containing:
tensor([[0.9654]], requires_grad=True), Parameter containing:
tensor([-0.2141], requires_grad=True)], 'lr': 0.6, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False}]
1 OrderedDict([('weight', tensor([[0.]])), ('bias', tensor([-0.2141]))])
1 [{'params': [Parameter containing:
tensor([[0.]], requires_grad=True), Parameter containing:
tensor([-0.2141], requires_grad=True)], 'lr': 0.6, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False}]
0 OrderedDict([('weight', tensor([[0.]])), ('bias', tensor([-0.2141]))])
0 [{'params': [Parameter containing:
tensor([[0.]], requires_grad=True), Parameter containing:
tensor([-0.2141], requires_grad=True)], 'lr': 0.6, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False}]
0 OrderedDict([('weight', tensor([[0.]])), ('bias', tensor([-0.2141]))])
0 [{'params': [Parameter containing:
tensor([[0.]], requires_grad=True), Parameter containing:
tensor([-0.2141], requires_grad=True)], 'lr': 0.6, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False}]
%% Cell type:markdown id: tags:
%% Cell type:code id: tags:
```
o1.param_groups
```
%% Cell type:code id: tags:
```
with torch.no_grad():
o1.param_groups[0]["params"][0].copy_(torch.zeros(1,))
```
%% Cell type:code id: tags:
```
o1.param_groups
```
%% Cell type:code id: tags:
```
m1.state_dict()
```
%% Cell type:code id: tags:
```
import torch
loss = getattr(torch.nn.functional, 'nll_loss')
```
%% Cell type:code id: tags:
```
loss
```
%% Cell type:code id: tags:
```
%matplotlib inline
from decentralizepy.node.Node import Node
from decentralizepy.graphs.SmallWorld import SmallWorld
from decentralizepy.mappings.Linear import Linear
from torch import multiprocessing as mp
import torch
import logging
from localconfig import LocalConfig
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items('DATASET')))
return config
config = read_ini("config.ini")
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
#f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])
g = SmallWorld(4, 1, 0.5)
print(g)
l = Linear(2, 2)
#Node(0, 0, l, g, my_config, 20, "results", logging.DEBUG)
#mp.spawn(fn = Node, nprocs = 1, args=[0,l,g,my_config,20,"results",logging.DEBUG])
# mp.spawn(fn = Node, args = [l, g, config, 10, "results", logging.DEBUG], nprocs=2)
```
%% Cell type:code id: tags:
```
from decentralizepy.mappings.Linear import Linear
from testing import f
from torch import multiprocessing as mp
l = Linear(1, 2)
mp.spawn(fn = f, nprocs = 2, args = [0, 2, "ip_addr.json", l])
```
%% Output
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
/tmp/ipykernel_1457289/353106489.py in <module>
3
4 l = Linear(1, 2)
----> 5 mp.spawn(fn = f, nprocs = 2, args = [0, 2, "ip_addr.json", l])
NameError: name 'mp' is not defined
%% Cell type:code id: tags:
```
```
......
import json
import logging
from collections import deque
import zmq
HELLO = b"HELLO"
BYE = b"BYE"
class Communication:
    """
    Communication API

    Point-to-point messaging between nodes over ZeroMQ. Every node binds one
    ROUTER socket on which it receives, and opens one DEALER socket per
    neighbour on which it sends. Payloads are JSON documents.

    """

    def addr(self, rank, machine_id):
        """
        Builds the TCP endpoint of a process

        Parameters
        ----------
        rank : int
            Local rank of the process; the port is 20000 + rank
        machine_id : int
            Machine id, looked up (as a string key) in the addresses file

        Returns
        -------
        str
            Endpoint of the form tcp://<machine address>:<port>

        """
        machine_addr = self.ip_addrs[str(machine_id)]
        port = rank + 20000
        return "tcp://{}:{}".format(machine_addr, port)

    def __init__(self, rank, machine_id, total_procs, addresses_filepath, mapping):
        """
        Constructor. Loads the address book and binds the receiving socket

        Parameters
        ----------
        rank : int
            Local rank of this process
        machine_id : int
            Id of the machine this process runs on
        total_procs : int
            Total number of processes (stored, not used by this class)
        addresses_filepath : str
            Path of a JSON file mapping machine ids to host names / addresses
        mapping : decentralizepy.mappings.Mapping
            Object providing get_uid(rank, machine_id) and
            get_machine_and_rank(uid)

        """
        with open(addresses_filepath) as addrs:
            self.ip_addrs = json.load(addrs)

        self.total_procs = total_procs
        self.rank = rank
        self.machine_id = machine_id
        self.mapping = mapping
        self.uid = mapping.get_uid(rank, machine_id)
        # The uid doubles as the ZeroMQ identity, so receivers can tell who sent a frame
        self.identity = str(self.uid).encode()

        self.context = zmq.Context()
        self.router = self.context.socket(zmq.ROUTER)
        self.router.setsockopt(zmq.IDENTITY, self.identity)
        self.router.bind(self.addr(rank, machine_id))

        self.sent_disconnections = False

        # Data messages that arrived while still waiting for HELLOs
        self.peer_deque = deque()
        # Encoded uid -> DEALER socket used to send to that peer
        self.peer_sockets = dict()
        # Identities of the peers that said HELLO and have not said BYE yet
        self.barrier = set()

    def encrypt(self, data):
        """
        Serializes data to UTF-8 encoded JSON (no actual encryption is done)

        Parameters
        ----------
        data : object
            Any JSON-serializable object

        Returns
        -------
        bytes
            Encoded payload

        """
        return json.dumps(data).encode("utf8")

    def decrypt(self, sender, data):
        """
        Decodes a received frame (inverse of encrypt)

        Parameters
        ----------
        sender : bytes
            Identity frame of the sender, i.e. its uid as an encoded string
        data : bytes
            UTF-8 encoded JSON payload

        Returns
        -------
        2-tuple
            (sender uid as int, decoded payload)

        """
        sender = int(sender.decode())
        data = json.loads(data.decode("utf8"))
        return sender, data

    def connect_neighbours(self, neighbours):
        """
        Connects to all neighbours and blocks until each of them said HELLO

        Parameters
        ----------
        neighbours : list(int)
            uids of the neighbours; must support len()

        Raises
        ------
        RuntimeError
            If a neighbour sends BYE before the handshake completed

        """
        for uid in neighbours:
            peer_id = str(uid).encode()
            req = self.context.socket(zmq.DEALER)
            req.setsockopt(zmq.IDENTITY, self.identity)
            req.connect(self.addr(*self.mapping.get_machine_and_rank(uid)))
            self.peer_sockets[peer_id] = req
            req.send(HELLO)

        num_neighbours = len(neighbours)
        while len(self.barrier) < num_neighbours:
            sender, recv = self.router.recv_multipart()

            if recv == HELLO:
                logging.info("Recieved {} from {}".format(HELLO, sender))
                self.barrier.add(sender)
            elif recv == BYE:
                logging.info("Recieved {} from {}".format(BYE, sender))
                raise RuntimeError(
                    "A neighbour wants to disconnect before training started!"
                )
            else:
                # A faster peer may already send data; keep it for receive()
                logging.info(
                    "Recieved message from {} @ connect_neighbours".format(sender)
                )
                self.peer_deque.append(self.decrypt(sender, recv))

    def receive(self):
        """
        Returns the next data message, blocking until one is available

        Messages buffered during connect_neighbours are served first. A BYE
        from a peer is handled internally (the peer is dropped and, once, BYE
        is forwarded to all peers) and the wait continues with the next frame,
        so callers do not get None while other peers are still connected.

        Returns
        -------
        2-tuple or None
            (sender uid, decoded payload), or None once the last connected
            peer has said BYE and no further data can arrive

        Raises
        ------
        RuntimeError
            If a HELLO arrives after the handshake phase

        """
        if self.peer_deque:
            return self.peer_deque.popleft()

        while True:
            sender, recv = self.router.recv_multipart()

            if recv == HELLO:
                logging.info("Recieved {} from {}".format(HELLO, sender))
                raise RuntimeError(
                    "A neighbour wants to connect when everyone is connected!"
                )

            if recv == BYE:
                logging.info("Recieved {} from {}".format(BYE, sender))
                # discard: a duplicate / unexpected BYE must not crash the receiver
                self.barrier.discard(sender)
                if not self.sent_disconnections:
                    for sock in self.peer_sockets.values():
                        sock.send(BYE)
                    self.sent_disconnections = True
                if not self.barrier:
                    # Nobody is left to send data; do not block forever
                    return None
                continue

            logging.info("Recieved message from {}".format(sender))
            return self.decrypt(sender, recv)

    def send(self, uid, data):
        """
        Sends data to a connected neighbour

        Parameters
        ----------
        uid : int
            uid of the receiver; must have been passed to connect_neighbours
        data : object
            Any JSON-serializable object

        Raises
        ------
        KeyError
            If no socket was opened for uid

        """
        to_send = self.encrypt(data)
        peer_id = str(uid).encode()
        self.peer_sockets[peer_id].send(to_send)
        logging.debug("Message sent")
from decentralizepy import utils
class Dataset:
"""
This class defines the Dataset API.
......@@ -33,7 +34,6 @@ class Dataset:
if type(self.sizes) == str:
self.sizes = eval(self.sizes)
if train_dir:
self.__training__ = True
else:
......
......@@ -3,10 +3,13 @@ import logging
import os
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
import torchvision
from torch import nn
from torch._C import ParameterDict
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
......@@ -109,17 +112,21 @@ class Femnist(Dataset):
logging.debug("train_y.shape: %s", str(self.train_y.shape))
if self.__testing__:
logging.info("Loading training set.")
logging.info("Loading testing set.")
_, _, test_data = self.__read_dir__(self.test_dir)
test_data = test_data.values()
test_x = []
test_y = []
for test_data in test_data.values():
for x in test_data["x"]:
test_x.append(x)
for y in test_data["y"]:
test_y.append(y)
self.test_x = (
np.array(test_data["x"], dtype=np.dtype("float32"))
np.array(test_x, dtype=np.dtype("float32"))
.reshape(-1, 28, 28, 1)
.transpose(0, 3, 1, 2)
)
self.test_y = np.array(test_data["y"], dtype=np.dtype("int64")).reshape(
-1
)
self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1)
logging.debug("test_x.shape: %s", str(self.test_x.shape))
logging.debug("test_y.shape: %s", str(self.test_y.shape))
......@@ -158,12 +165,12 @@ class Femnist(Dataset):
raise IndexError("i is out of bounds!")
def get_trainset(self, batch_size, shuffle = False):
def get_trainset(self, batch_size=1, shuffle=False):
"""
Function to get the training set
Parameters
----------
batch_size : int
batch_size : int, optional
Batch size for learning
Returns
-------
......@@ -174,7 +181,9 @@ class Femnist(Dataset):
If the training set was not initialized
"""
if self.__training__:
return DataLoader(Data(self.train_x, self.train_y), batch_size = batch_size, shuffle = shuffle)
return DataLoader(
Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle
)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
......@@ -189,9 +198,43 @@ class Femnist(Dataset):
If the test set was not initialized
"""
if self.__testing__:
return Data(self.test_x, self.test_y)
return DataLoader(Data(self.test_x, self.test_y))
raise RuntimeError("Test set not initialized!")
def imshow(self, img):
    """
    Displays an image with matplotlib

    Parameters
    ----------
    img : torch.Tensor
        Image to show. Its axes are permuted with (1, 2, 0) before plotting,
        so it is presumably laid out channel-first -- TODO confirm with callers

    """
    channels_last = img.numpy().transpose(1, 2, 0)
    plt.imshow(channels_last)
    plt.show()
def test(self, model):
    """
    Evaluates a model on the test set and logs the accuracies

    Per-class accuracies are logged at DEBUG level, the overall accuracy at
    INFO level.

    Parameters
    ----------
    model : torch.nn.Module
        Model to evaluate; called on each batch of the test loader and
        expected to return one score per class for every sample

    """
    testloader = self.get_testset()

    correct_pred = [0] * NUM_CLASSES
    total_pred = [0] * NUM_CLASSES

    with torch.no_grad():
        for elems, labels in testloader:
            outputs = model(elems)
            _, predictions = torch.max(outputs, 1)
            # Plain ints make cheap list indices and comparisons
            for label, prediction in zip(labels.tolist(), predictions.tolist()):
                if label == prediction:
                    correct_pred[label] += 1
                total_pred[label] += 1

    for key, value in enumerate(correct_pred):
        if total_pred[key] == 0:
            # A class may be absent from the test split; its accuracy is undefined
            logging.debug("No test samples for class {}".format(key))
            continue
        accuracy = 100 * float(value) / total_pred[key]
        logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))

    total_correct = sum(correct_pred)
    # Count samples, not batches: len(testloader) is only right for batch size 1
    total_samples = sum(total_pred)
    if total_samples == 0:
        logging.warning("Test set is empty, overall accuracy is undefined")
        return

    accuracy = 100 * float(total_correct) / total_samples
    logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
class LogisticRegression(nn.Module):
"""
......@@ -220,4 +263,22 @@ class LogisticRegression(nn.Module):
"""
x = torch.flatten(x, start_dim=1)
x = self.fc1(x)
return F.log_softmax(x, dim=1)
return x
class CNN(nn.Module):
    """
    Convolutional network with two conv + max-pool stages and two linear layers

    The first linear layer takes 7 * 7 * 64 features, which matches
    single-channel 28 x 28 inputs after two 2 x 2 poolings. The output has
    NUM_CLASSES raw scores per sample (no softmax is applied).

    """

    def __init__(self):
        """
        Constructor. Instantiates the layers

        """
        super().__init__()
        # Layers are created in this order on purpose: it fixes the order of
        # the parameters and of their random initialisation
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2)
        self.fc1 = nn.Linear(in_features=7 * 7 * 64, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=NUM_CLASSES)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input batch

        Returns
        -------
        torch.tensor
            The raw class scores, one row per sample

        """
        stage1 = self.pool(F.relu(self.conv1(x)))
        stage2 = self.pool(F.relu(self.conv2(stage1)))
        # Keep the batch dimension, flatten everything else
        flat = stage2.flatten(start_dim=1)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)
......@@ -47,6 +47,6 @@ class Linear(Mapping):
Returns
-------
2-tuple
a tuple of machine_id and rank
a tuple of rank and machine_id
"""
return (uid // self.procs_per_machine), (uid % self.procs_per_machine)
return (uid % self.procs_per_machine), (uid // self.procs_per_machine)
......@@ -38,7 +38,7 @@ class Mapping:
Returns
-------
2-tuple
a tuple of machine_id and rank
a tuple of rank and machine_id
"""
raise NotImplementedError
import importlib
import logging
import os
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy import utils
from torch import optim
import importlib
class Node:
"""
This class defines the node (entity that performs learning, sharing and communication).
"""
def __init__(
self,
rank: int,
......@@ -21,7 +19,7 @@ class Node:
mapping: Mapping,
graph: Graph,
config,
iterations = 1,
iterations=1,
log_dir=".",
log_level=logging.INFO,
*args
......@@ -78,12 +76,14 @@ class Node:
logging.debug("Rank: %d", self.rank)
logging.debug("type(graph): %s", str(type(self.rank)))
logging.debug("type(mapping): %s", str(type(self.mapping)))
dataset_configs = config["DATASET"]
dataset_module = importlib.import_module(dataset_configs["dataset_package"])
dataset_class = getattr(dataset_module, dataset_configs["dataset_class"])
dataset_params = utils.remove_keys(dataset_configs, ["dataset_package", "dataset_class", "model_class"])
self.dataset = dataset_class(rank, **dataset_params)
dataset_params = utils.remove_keys(
dataset_configs, ["dataset_package", "dataset_class", "model_class"]
)
self.dataset = dataset_class(rank, **dataset_params)
logging.info("Dataset instantiation complete.")
......@@ -91,9 +91,15 @@ class Node:
self.model = model_class()
optimizer_configs = config["OPTIMIZER_PARAMS"]
optimizer_module = importlib.import_module(optimizer_configs["optimizer_package"])
optimizer_class = getattr(optimizer_module, optimizer_configs["optimizer_class"])
optimizer_params = utils.remove_keys(optimizer_configs, ["optimizer_package", "optimizer_class"])
optimizer_module = importlib.import_module(
optimizer_configs["optimizer_package"]
)
optimizer_class = getattr(
optimizer_module, optimizer_configs["optimizer_class"]
)
optimizer_params = utils.remove_keys(
optimizer_configs, ["optimizer_package", "optimizer_class"]
)
self.optimizer = optimizer_class(self.model.parameters(), **optimizer_params)
train_configs = config["TRAIN_PARAMS"]
......@@ -101,11 +107,28 @@ class Node:
train_class = getattr(train_module, train_configs["training_class"])
loss_package = importlib.import_module(train_configs["loss_package"])
loss = getattr(loss_package, train_configs["loss"])
if "loss_class" in train_configs.keys():
loss_class = getattr(loss_package, train_configs["loss_class"])
loss = loss_class()
else:
loss = getattr(loss_package, train_configs["loss"])
train_params = utils.remove_keys(train_configs, ["training_package", "training_class", "loss", "loss_package"])
train_params = utils.remove_keys(
train_configs,
[
"training_package",
"training_class",
"loss",
"loss_package",
"loss_class",
],
)
self.trainer = train_class(self.model, self.optimizer, loss, **train_params)
self.testset = self.dataset.get_trainset()
for iteration in range(iterations):
logging.info("Starting training iteration: %d", iteration)
self.trainer.train(self.dataset)
\ No newline at end of file
self.trainer.train(self.dataset)
if self.dataset.__testing__:
self.dataset.test(self.model)
......@@ -2,5 +2,6 @@ class Sharing:
"""
API defining who to share with and what, and what to do on receiving
"""
def __init__():
raise NotImplementedError
\ No newline at end of file
raise NotImplementedError
import logging
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from decentralizepy import utils
import logging
class Training:
"""
This class implements the training module for a single node.
"""
def __init__(self, model, optimizer, loss, epochs_per_round = "", batch_size = "", shuffle = ""):
def __init__(
self, model, optimizer, loss, epochs_per_round="", batch_size="", shuffle=""
):
"""
Constructor
Parameters
......@@ -24,10 +34,15 @@ class Training:
self.model = model
self.optimizer = optimizer
self.loss = loss
self.epochs_per_round = utils.conditional_value(epochs_per_round, "", 1)
self.batch_size = utils.conditional_value(batch_size, "", 1)
self.epochs_per_round = utils.conditional_value(epochs_per_round, "", int(1))
self.batch_size = utils.conditional_value(batch_size, "", int(1))
self.shuffle = utils.conditional_value(shuffle, "", False)
def imshow(self, img):
    """
    Displays an image with matplotlib (debugging helper)

    Parameters
    ----------
    img : torch.Tensor
        Image to show. Its axes are permuted with (1, 2, 0) before plotting,
        so it is presumably laid out channel-first -- TODO confirm with callers

    """
    as_array = img.numpy()
    plt.imshow(as_array.transpose(1, 2, 0))
    plt.show()
def train(self, dataset):
"""
One training iteration
......@@ -37,8 +52,16 @@ class Training:
The training dataset. Should implement get_trainset(batch_size, shuffle)
"""
trainset = dataset.get_trainset(self.batch_size, self.shuffle)
# dataiter = iter(trainset)
# images, labels = dataiter.next()
# self.imshow(torchvision.utils.make_grid(images[:16]))
# plt.savefig(' '.join('%5s' % j for j in labels) + ".png")
# print(' '.join('%5s' % j for j in labels[:16]))
for epoch in range(self.epochs_per_round):
epoch_loss = 0.0
count = 0
for data, target in trainset:
self.model.zero_grad()
output = self.model(data)
......@@ -46,4 +69,5 @@ class Training:
epoch_loss += loss_val.item()
loss_val.backward()
self.optimizer.step()
logging.info("Epoch_loss: %d", epoch_loss)
count += 1
logging.info("Epoch: {} loss: {}".format(epoch, epoch_loss / count))
......@@ -4,5 +4,6 @@ def conditional_value(var, nul, default):
else:
return default
def remove_keys(d, keys_to_remove):
    """
    Returns a new dictionary holding the entries of d whose key is not listed

    Parameters
    ----------
    d : dict
        Source mapping; it is not modified
    keys_to_remove : container
        Keys to leave out; keys that are not in d are ignored

    Returns
    -------
    dict
        The remaining key-value pairs, in the iteration order of d

    """
    kept = {}
    for key in d:
        if key in keys_to_remove:
            continue
        kept[key] = d[key]
    return kept
from torch import multiprocessing as mp
from decentralizepy.communication.Communication import Communication
from decentralizepy.mappings.Linear import Linear
def f(rank, m_id, total_procs, filePath, mapping):
    """
    Smoke test for Communication: greets the next node and prints one reply

    Parameters
    ----------
    rank : int
        Local rank of this process (supplied by mp.spawn)
    m_id : int
        Id of the machine this process runs on
    total_procs : int
        Total number of processes over all machines
    filePath : str
        Path of the JSON file with the machine addresses
    mapping : decentralizepy.mappings.Mapping
        Mapping between (rank, machine id) and uid

    """
    c = Communication(rank, m_id, total_procs, filePath, mapping)

    # Neighbours are addressed by global uid. The local rank equals the uid
    # only on machine 0, so using it here would make other machines list
    # themselves as a neighbour and leave out a real peer.
    uid = c.uid
    c.connect_neighbours([i for i in range(total_procs) if i != uid])

    send = {"message": "Hi I am rank {}".format(rank)}
    c.send((uid + 1) % total_procs, send)
    print(rank, c.receive())
if __name__ == "__main__":
    # Run once per machine; the machine id of this host is read from stdin
    procs_mapping = Linear(2, 2)
    machine_id = int(input())
    # mp.spawn prepends the process index, which f receives as its rank
    mp.spawn(
        fn=f,
        nprocs=2,
        args=[machine_id, 4, "ip_addr.json", procs_mapping],
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment