Skip to content
Snippets Groups Projects
Commit b132f117 authored by Rishi Sharma's avatar Rishi Sharma
Browse files

Sharing

parent d9ef1c38
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,7 @@ graph_class = SmallWorld
dataset_package = decentralizepy.datasets.Femnist
dataset_class = Femnist
model_class = CNN
n_procs = 1
n_procs = 6
train_dir = leaf/data/femnist/data/train
test_dir = leaf/data/femnist/data/test
; python list of fractions below
......@@ -20,17 +20,16 @@ lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
epochs_per_round = 25
epochs_per_round = 5
batch_size = 512
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.Communication
comm_class = Communication
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr.json
total_procs = 4
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
......
6
1
0 3 4
3 5
1 2 5
1
2 3
\ No newline at end of file
%% Cell type:code id: tags:
```
from datasets.Femnist import Femnist
from graphs import SmallWorld
from collections import defaultdict
import os
import json
import numpy as np
```
%% Cell type:code id: tags:
```
a = FEMNIST
a
```
%% Cell type:code id: tags:
```
b = SmallWorld(6, 2, 2, 1)
```
%% Cell type:code id: tags:
```
b.adj_list
```
%% Cell type:code id: tags:
```
for i in range(12):
print(b.neighbors(i))
```
%% Cell type:code id: tags:
```
clients = []
```
%% Cell type:code id: tags:
```
num_samples = []
data = defaultdict(lambda : None)
```
%% Cell type:code id: tags:
```
datadir = "./leaf/data/femnist/data/train"
files = os.listdir(datadir)
total_users=0
users = set()
```
%% Cell type:code id: tags:
```
files = os.listdir(datadir)[0:1]
```
%% Cell type:code id: tags:
```
for f in files:
file_path = os.path.join(datadir, f)
print(file_path)
with open(file_path, 'r') as inf:
client_data = json.load(inf)
current_users = len(client_data['users'])
print("Current_Users: ", current_users)
total_users += current_users
users.update(client_data['users'])
print("total_users: ", total_users)
print("total_users: ", len(users))
print(client_data['user_data'].keys())
print(np.array(client_data['user_data']['f3408_47']['x']).shape)
print(np.array(client_data['user_data']['f3408_47']['y']).shape)
print(np.array(client_data['user_data']['f3327_11']['x']).shape)
print(np.array(client_data['user_data']['f3327_11']['y']).shape)
print(np.unique(np.array(client_data['user_data']['f3327_11']['y'])))
```
%% Cell type:code id: tags:
```
file = 'run.py'
with open(file, 'r') as inf:
print(inf.readline().strip())
print(inf.readlines())
```
%% Cell type:code id: tags:
```
def f(l):
l[2] = 'c'
a = ['a', 'a', 'a']
print(a)
f(a)
print(a)
```
%% Cell type:code id: tags:
```
l = ['a', 'b', 'c']
print(l[:-1])
```
%% Cell type:code id: tags:
```
from localconfig import LocalConfig
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items('DATASET')))
return config
config = read_ini("config.ini")
for section in config:
print(section)
#d = dict(config.sections())
```
%% Cell type:code id: tags:
```
def func(a = 1, b = 2, c = 3):
print(a + b + c)
l = [3, 5, 7]
func(*l)
```
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
mp.spawn(fn = func, nprocs = 2, args = [], kwargs = {'a': 4, 'b': 5, 'c': 6})
```
%% Cell type:code id: tags:
```
l = '[0.4, 0.2, 0.3, 0.1]'
type(eval(l))
```
%% Cell type:code id: tags:
```
from decentralizepy.datasets.Femnist import Femnist
f1 = Femnist(0, 1, 'leaf/data/femnist/data/train')
ts = f1.get_trainset(1)
for data, target in ts:
print(data)
break
```
%% Cell type:code id: tags:
```
from decentralizepy.datasets.Femnist import Femnist
from decentralizepy.graphs.SmallWorld import SmallWorld
from decentralizepy.mappings.Linear import Linear
f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])
g = SmallWorld(4, 1, 0.5)
l = Linear(2, 2)
```
%% Cell type:code id: tags:
```
from decentralizepy.node.Node import Node
from torch import multiprocessing as mp
import logging
n1 = Node(0, l, g, f, "./results", logging.DEBUG)
n2 = Node(1, l, g, f, "./results", logging.DEBUG)
# mp.spawn(fn = Node, nprocs = 2, args=[l,g,f])
```
%% Cell type:code id: tags:
```
from testing import f
```
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
import torch
m1 = torch.nn.Linear(1,1)
o1 = torch.optim.SGD(m1.parameters(), 0.6)
print(m1)
mp.spawn(fn = f, nprocs = 2, args=[m1, o1])
```
%% Cell type:markdown id: tags:
%% Cell type:code id: tags:
```
o1.param_groups
```
%% Cell type:code id: tags:
```
with torch.no_grad():
o1.param_groups[0]["params"][0].copy_(torch.zeros(1,))
```
%% Cell type:code id: tags:
```
o1.param_groups
```
%% Cell type:code id: tags:
```
m1.state_dict()
```
%% Cell type:code id: tags:
```
import torch
loss = getattr(torch.nn.functional, 'nll_loss')
```
%% Cell type:code id: tags:
```
loss
```
%% Cell type:code id: tags:
```
%matplotlib inline
from decentralizepy.node.Node import Node
from decentralizepy.graphs.SmallWorld import SmallWorld
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from torch import multiprocessing as mp
import torch
import logging
from localconfig import LocalConfig
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items('DATASET')))
return config
config = read_ini("config.ini")
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
#f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])
g = SmallWorld(4, 1, 0.5)
print(g)
l = Linear(2, 2)
g = Graph()
g.read_graph_from_file("graph.adj", "adjacency")
l = Linear(1, 6)
#Node(0, 0, l, g, my_config, 20, "results", logging.DEBUG)
#mp.spawn(fn = Node, nprocs = 1, args=[0,l,g,my_config,20,"results",logging.DEBUG])
mp.spawn(fn = Node, nprocs = 6, args=[0,l,g,my_config,20,"results",logging.DEBUG])
# mp.spawn(fn = Node, args = [l, g, config, 10, "results", logging.DEBUG], nprocs=2)
```
%% Output
Section: GRAPH
('package', 'decentralizepy.graphs.SmallWorld')
('graph_class', 'SmallWorld')
Section: DATASET
('dataset_package', 'decentralizepy.datasets.Femnist')
('dataset_class', 'Femnist')
('model_class', 'CNN')
('n_procs', 2)
('train_dir', 'leaf/data/femnist/data/train')
('test_dir', 'leaf/data/femnist/data/test')
('sizes', '')
Section: OPTIMIZER_PARAMS
('optimizer_package', 'torch.optim')
('optimizer_class', 'Adam')
('lr', 0.01)
Section: TRAIN_PARAMS
('training_package', 'decentralizepy.training.Training')
('training_class', 'Training')
('epochs_per_round', 1)
('batch_size', 512)
('shuffle', True)
('loss_package', 'torch.nn')
('loss_class', 'CrossEntropyLoss')
Section: COMMUNICATION
('comm_package', 'decentralizepy.communication.TCP')
('comm_class', 'TCP')
('addresses_filepath', 'ip_addr.json')
Section: SHARING
('sharing_package', 'decentralizepy.sharing.Sharing')
('sharing_class', 'Sharing')
{'dataset_package': 'decentralizepy.datasets.Femnist', 'dataset_class': 'Femnist', 'model_class': 'CNN', 'n_procs': 2, 'train_dir': 'leaf/data/femnist/data/train', 'test_dir': 'leaf/data/femnist/data/test', 'sizes': ''}
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 0
n: <class 'int'> 0
n: <class 'int'> 1
n: <class 'int'> 1
n: <class 'int'> 0
%% Cell type:code id: tags:
```
```
%% Cell type:code id: tags:
```
from decentralizepy.mappings.Linear import Linear
from testing import f
from torch import multiprocessing as mp
l = Linear(1, 2)
mp.spawn(fn = f, nprocs = 2, args = [0, 2, "ip_addr.json", l])
```
%% Output
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
/tmp/ipykernel_1457289/353106489.py in <module>
3
4 l = Linear(1, 2)
----> 5 mp.spawn(fn = f, nprocs = 2, args = [0, 2, "ip_addr.json", l])
NameError: name 'mp' is not defined
Message sent
Message sent
1 (0, {'message': 'Hi I am rank 0'})
0 (1, {'message': 'Hi I am rank 1'})
%% Cell type:code id: tags:
```
```
......
import json
import logging
from collections import deque
import zmq
HELLO = b"HELLO"
BYE = b"BYE"
class Communication:
"""
Communcation API
"""
def addr(self, rank, machine_id):
machine_addr = self.ip_addrs[str(machine_id)]
port = rank + 20000
return "tcp://{}:{}".format(machine_addr, port)
def __init__(self, rank, machine_id, total_procs, addresses_filepath, mapping):
with open(addresses_filepath) as addrs:
self.ip_addrs = json.load(addrs)
def __init__(self, rank, machine_id, mapping, total_procs):
self.total_procs = total_procs
self.rank = rank
self.machine_id = machine_id
self.mapping = mapping
self.uid = mapping.get_uid(rank, machine_id)
self.identity = str(self.uid).encode()
self.context = zmq.Context()
self.router = self.context.socket(zmq.ROUTER)
self.router.setsockopt(zmq.IDENTITY, self.identity)
self.router.bind(self.addr(rank, machine_id))
self.sent_disconnections = False
self.peer_deque = deque()
self.peer_sockets = dict()
self.barrier = set()
def encrypt(self, data):
return json.dumps(data).encode("utf8")
raise NotImplementedError
def decrypt(self, sender, data):
sender = int(sender.decode())
data = json.loads(data.decode("utf8"))
return sender, data
def connect_neighbours(self, neighbours):
for uid in neighbours:
id = str(uid).encode()
req = self.context.socket(zmq.DEALER)
req.setsockopt(zmq.IDENTITY, self.identity)
req.connect(self.addr(*self.mapping.get_machine_and_rank(uid)))
self.peer_sockets[id] = req
req.send(HELLO)
num_neighbours = len(neighbours)
while len(self.barrier) < num_neighbours:
sender, recv = self.router.recv_multipart()
if recv == HELLO:
logging.info("Recieved {} from {}".format(HELLO, sender))
self.barrier.add(sender)
elif recv == BYE:
logging.info("Recieved {} from {}".format(BYE, sender))
raise RuntimeError(
"A neighbour wants to disconnect before training started!"
)
else:
logging.info(
"Recieved message from {} @ connect_neighbours".format(sender)
)
self.peer_deque.append(self.decrypt(sender, recv))
raise NotImplementedError
def connect_neighbors(self, neighbors):
raise NotImplementedError
def receive(self):
if len(self.peer_deque) != 0:
resp = self.peer_deque[0]
self.peer_deque.popleft()
return resp
sender, recv = self.router.recv_multipart()
if recv == HELLO:
logging.info("Recieved {} from {}".format(HELLO, sender))
raise RuntimeError(
"A neighbour wants to connect when everyone is connected!"
)
elif recv == BYE:
logging.info("Recieved {} from {}".format(BYE, sender))
self.barrier.remove(sender)
if not self.sent_disconnections:
for sock in self.peer_sockets.values():
sock.send(BYE)
self.sent_disconnections = True
else:
logging.info("Recieved message from {}".format(sender))
return self.decrypt(sender, recv)
raise NotImplementedError
def send(self, uid, data):
to_send = self.encrypt(data)
id = str(uid).encode()
self.peer_sockets[id].send(to_send)
print("{} sent the message to {}.".format(self.uid, uid))
raise NotImplementedError
import json
import logging
from collections import deque
import zmq
from decentralizepy.communication.Communication import Communication
HELLO = b"HELLO"
BYE = b"BYE"
class TCP(Communication):
"""
TCP Communication API
"""
def addr(self, rank, machine_id):
machine_addr = self.ip_addrs[str(machine_id)]
port = rank + 20000
return "tcp://{}:{}".format(machine_addr, port)
def __init__(self, rank, machine_id, mapping, total_procs, addresses_filepath):
super().__init__(rank, machine_id, mapping, total_procs)
with open(addresses_filepath) as addrs:
self.ip_addrs = json.load(addrs)
self.total_procs = total_procs
self.rank = rank
self.machine_id = machine_id
self.mapping = mapping
self.uid = mapping.get_uid(rank, machine_id)
self.identity = str(self.uid).encode()
self.context = zmq.Context()
self.router = self.context.socket(zmq.ROUTER)
self.router.setsockopt(zmq.IDENTITY, self.identity)
self.router.bind(self.addr(rank, machine_id))
self.sent_disconnections = False
self.peer_deque = deque()
self.peer_sockets = dict()
self.barrier = set()
def __del__(self):
self.context.destroy(linger=0)
def encrypt(self, data):
return json.dumps(data).encode("utf8")
def decrypt(self, sender, data):
sender = int(sender.decode())
data = json.loads(data.decode("utf8"))
return sender, data
def connect_neighbors(self, neighbors):
for uid in neighbors:
id = str(uid).encode()
req = self.context.socket(zmq.DEALER)
req.setsockopt(zmq.IDENTITY, self.identity)
req.connect(self.addr(*self.mapping.get_machine_and_rank(uid)))
self.peer_sockets[id] = req
req.send(HELLO)
num_neighbors = len(neighbors)
while len(self.barrier) < num_neighbors:
sender, recv = self.router.recv_multipart()
if recv == HELLO:
logging.info("Recieved {} from {}".format(HELLO, sender))
self.barrier.add(sender)
elif recv == BYE:
logging.info("Recieved {} from {}".format(BYE, sender))
raise RuntimeError(
"A neighbour wants to disconnect before training started!"
)
else:
logging.info(
"Recieved message from {} @ connect_neighbors".format(sender)
)
self.peer_deque.append(self.decrypt(sender, recv))
def receive(self):
if len(self.peer_deque) != 0:
resp = self.peer_deque[0]
self.peer_deque.popleft()
return resp
sender, recv = self.router.recv_multipart()
if recv == HELLO:
logging.info("Recieved {} from {}".format(HELLO, sender))
raise RuntimeError(
"A neighbour wants to connect when everyone is connected!"
)
elif recv == BYE:
logging.info("Recieved {} from {}".format(BYE, sender))
self.barrier.remove(sender)
if not self.sent_disconnections:
for sock in self.peer_sockets.values():
sock.send(BYE)
self.sent_disconnections = True
else:
logging.info("Recieved message from {}".format(sender))
return self.decrypt(sender, recv)
def send(self, uid, data):
to_send = self.encrypt(data)
id = str(uid).encode()
self.peer_sockets[id].send(to_send)
logging.info("{} sent the message to {}.".format(self.uid, uid))
......@@ -73,7 +73,7 @@ class Graph:
elif type == "adjacency":
node_id = 0
for line in lines:
neighbours = line.strip().split()
neighbours = map(int, line.strip().split())
self.__insert_adj__(node_id, neighbours)
node_id += 1
else:
......
......@@ -3,6 +3,7 @@ import logging
import os
from decentralizepy import utils
from decentralizepy.communication.Communication import Communication
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Mapping import Mapping
......@@ -70,8 +71,10 @@ class Node:
logging.info("Started process.")
self.rank = rank
self.machine_id = machine_id
self.graph = graph
self.mapping = mapping
self.uid = self.mapping.get_uid(rank, machine_id)
logging.debug("Rank: %d", self.rank)
logging.debug("type(graph): %s", str(type(self.rank)))
......@@ -125,10 +128,28 @@ class Node:
)
self.trainer = train_class(self.model, self.optimizer, loss, **train_params)
self.testset = self.dataset.get_trainset()
comm_configs = config["COMMUNICATION"]
comm_module = importlib.import_module(comm_configs["comm_package"])
comm_class = getattr(comm_module, comm_configs["comm_class"])
comm_params = utils.remove_keys(comm_configs, ["comm_package", "comm_class"])
self.communication = comm_class(self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params)
self.communication.connect_neighbors(self.graph.neighbors(self.uid))
sharing_configs = config["SHARING"]
sharing_package = importlib.import_module(sharing_configs["sharing_package"])
sharing_class = getattr(sharing_package, sharing_configs["sharing_class"])
self.sharing = sharing_class(self.rank, self.machine_id, self.communication, self.mapping, self.graph, self.model, self.dataset)
self.testset = self.dataset.get_testset()
for iteration in range(iterations):
logging.info("Starting training iteration: %d", iteration)
self.trainer.train(self.dataset)
self.sharing.step()
if self.dataset.__testing__:
self.dataset.test(self.model)
from collections import deque
import json
import logging
import torch
import numpy
class Sharing:
"""
API defining who to share with and what, and what to do on receiving
"""
def __init__():
raise NotImplementedError
def __init__(self, rank, machine_id, communication, mapping, graph, model, dataset):
self.rank = rank
self.machine_id = machine_id
self.uid = mapping.get_uid(rank, machine_id)
self.communication = communication
self.mapping = mapping
self.graph = graph
self.model = model
self.dataset = dataset
self.peer_deques = dict()
my_neighbors = self.graph.neighbors(self.uid)
for n in my_neighbors:
self.peer_deques[n] = deque()
def received_from_all(self):
for _, i in self.peer_deques.items():
if len(i) == 0:
return False
return True
def get_neighbors(self, neighbors):
# modify neighbors here
return neighbors
def serialized_model(self):
m = dict()
for key, val in self.model.state_dict().items():
m[key] = json.dumps(val.numpy().tolist())
return m
def deserialized_model(self, m):
state_dict = dict()
for key, value in m.items():
state_dict[key] = torch.from_numpy(numpy.array(json.loads(value)))
return state_dict
def step(self):
data = self.serialized_model()
my_uid = self.mapping.get_uid(self.rank, self.machine_id)
all_neighbors = self.graph.neighbors(my_uid)
iter_neighbors = self.get_neighbors(all_neighbors)
data['degree'] = len(all_neighbors)
for neighbor in iter_neighbors:
self.communication.send(neighbor, data)
while not self.received_from_all():
sender, data = self.communication.receive()
logging.info("Received model from {}".format(sender))
degree = data["degree"]
del data["degree"]
self.peer_deques[sender].append((degree, self.deserialized_model(data)))
total = dict()
weight_total = 0
for n in self.peer_deques:
degree, data = self.peer_deques[n].popleft()
#logging.info("top element: {}".format(d))
weight = 1/(max(len(self.peer_deques), degree) + 1) # Metro-Hastings
weight_total += weight
for key, value in data.items():
if key in total:
total[key] += value * weight
else:
total[key] = value * weight
for key, value in self.model.state_dict().items():
total[key] += (1 - weight_total) * value # Metro-Hastings
self.model.load_state_dict(total)
from decentralizepy.node.Node import Node
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from torch import multiprocessing as mp
import logging
from decentralizepy.communication.Communication import Communication
from decentralizepy.mappings.Linear import Linear
from localconfig import LocalConfig
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items('DATASET')))
return config
def f(rank, m_id, total_procs, filePath, mapping):
c = Communication(rank, m_id, total_procs, filePath, mapping)
c.connect_neighbours([i for i in range(total_procs) if i != c.uid])
send = {}
send["message"] = "Hi I am rank {}".format(rank)
c.send((c.uid + 1) % total_procs, send)
print(c.uid, c.receive())
if __name__ == "__main__":
config = read_ini("config.ini")
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
g = Graph()
g.read_graph_from_file("graph.adj", "adjacency")
l = Linear(1, 6)
if __name__ == "__main__":
l = Linear(2, 2)
m_id = int(input())
mp.spawn(fn=f, nprocs=2, args=[m_id, 4, "ip_addr.json", l])
mp.spawn(fn = Node, nprocs = 6, args=[0,l,g,my_config,20,"results",logging.DEBUG])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment