[DATASET]
dataset_package = decentralizepy.datasets.Shakespeare
dataset_class = Shakespeare
random_seed = 97
model_class = LSTM
train_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub96/per_user_data/train
test_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub96/data/test
; python list of fractions below
sizes =
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.1
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 10
full_epochs = False
batch_size = 16
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.Wavelet
sharing_class = Wavelet
change_based_selection = True
alpha = 0.1
wavelet = sym2
level = 4
accumulation = True
accumulate_averaging_changes = True
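The [SHARING] section above appears to configure wavelet-based sparse sharing: judging by the option names, each model update is transformed with a level-4 `sym2` wavelet and only an `alpha = 0.1` fraction of coefficients is shared, picked by largest change when `change_based_selection` is on. A minimal PyWavelets sketch of that idea (illustrative only; `top_alpha_wavelet_coeffs` is not a decentralizepy function):

```
import numpy as np
import pywt

def top_alpha_wavelet_coeffs(params, alpha=0.1, wavelet="sym2", level=4):
    # Transform the flattened parameter vector into wavelet coefficients.
    coeffs = pywt.wavedec(params, wavelet, level=level)
    flat, slices = pywt.coeffs_to_array(coeffs)
    # Keep only the alpha fraction with the largest magnitude.
    k = max(1, int(alpha * flat.size))
    idx = np.argpartition(np.abs(flat), -k)[-k:]
    sparse = np.zeros_like(flat)
    sparse[idx] = flat[idx]
    # A receiver would invert the transform on the sparse coefficients.
    coeffs_sparse = pywt.array_to_coeffs(sparse, slices, output_format="wavedec")
    return pywt.waverec(coeffs_sparse, wavelet)

approx = top_alpha_wavelet_coeffs(np.random.randn(1024))
```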
import logging
from pathlib import Path
from shutil import copy

from localconfig import LocalConfig
from torch import multiprocessing as mp

from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDNode import DPSGDNode


def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items("DATASET")))
    return config


if __name__ == "__main__":
    args = utils.get_args()

    Path(args.log_dir).mkdir(parents=True, exist_ok=True)

    log_level = {
        "INFO": logging.INFO,
        "DEBUG": logging.DEBUG,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    config = read_ini(args.config_file)
    my_config = dict()
    for section in config:
        my_config[section] = dict(config.items(section))

    copy(args.config_file, args.log_dir)
    copy(args.graph_file, args.log_dir)
    utils.write_args(args, args.log_dir)

    g = Graph()
    g.read_graph_from_file(args.graph_file, args.graph_type)
    n_machines = args.machines
    procs_per_machine = args.procs_per_machine
    l = Linear(n_machines, procs_per_machine)
    m_id = args.machine_id

    processes = []
    for r in range(procs_per_machine):
        processes.append(
            mp.Process(
                target=DPSGDNode,
                args=[
                    r,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    args.weights_store_dir,
                    log_level[args.log_level],
                    args.test_after,
                    args.train_evaluate_after,
                    args.reset_optimizer,
                ],
            )
        )

    for p in processes:
        p.start()

    for p in processes:
        p.join()
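These launcher scripts all build a `Linear(n_machines, procs_per_machine)` mapping and identify every process by a `(rank, machine_id)` pair. A plain-arithmetic sketch of what such a mapping plausibly does (an assumption for illustration, not decentralizepy's actual implementation):

```
def get_uid(rank, machine_id, procs_per_machine):
    # Global uid under a machine-major linear layout (assumed).
    return machine_id * procs_per_machine + rank

def get_machine_and_rank(uid, procs_per_machine):
    # Inverse mapping; returned as (rank, machine_id) to match
    # TCP.addr(*mapping.get_machine_and_rank(uid)) later in this diff.
    machine_id, rank = divmod(uid, procs_per_machine)
    return rank, machine_id

assert get_machine_and_rank(get_uid(3, 2, 16), 16) == (3, 2)
```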
import logging
from pathlib import Path
from shutil import copy

from localconfig import LocalConfig
from torch import multiprocessing as mp

from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDNodeFederated import DPSGDNodeFederated
from decentralizepy.node.FederatedParameterServer import FederatedParameterServer


def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items("DATASET")))
    return config


if __name__ == "__main__":
    args = utils.get_args()

    Path(args.log_dir).mkdir(parents=True, exist_ok=True)

    log_level = {
        "INFO": logging.INFO,
        "DEBUG": logging.DEBUG,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    config = read_ini(args.config_file)
    my_config = dict()
    for section in config:
        my_config[section] = dict(config.items(section))

    copy(args.config_file, args.log_dir)
    copy(args.graph_file, args.log_dir)
    utils.write_args(args, args.log_dir)

    g = Graph()
    g.read_graph_from_file(args.graph_file, args.graph_type)
    n_machines = args.machines
    procs_per_machine = args.procs_per_machine
    l = Linear(n_machines, procs_per_machine)
    m_id = args.machine_id
    sm = args.server_machine
    sr = args.server_rank

    processes = []
    if sm == m_id:
        processes.append(
            mp.Process(
                target=FederatedParameterServer,
                args=[
                    sr,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    args.weights_store_dir,
                    log_level[args.log_level],
                    args.test_after,
                    args.train_evaluate_after,
                    args.working_rate,
                ],
            )
        )

    for r in range(0, procs_per_machine):
        processes.append(
            mp.Process(
                target=DPSGDNodeFederated,
                args=[
                    r,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    args.weights_store_dir,
                    log_level[args.log_level],
                    args.test_after,
                    args.train_evaluate_after,
                    args.reset_optimizer,
                ],
            )
        )

    for p in processes:
        p.start()

    for p in processes:
        p.join()
import logging
from pathlib import Path
from shutil import copy

from localconfig import LocalConfig
from torch import multiprocessing as mp

from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.KNN import KNN


def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items("DATASET")))
    return config


if __name__ == "__main__":
    # Previously this script parsed -mid/-ps/-ms with argparse, read a
    # hard-coded "config.ini" and "36_nodes.edges", and launched Node via
    # mp.spawn; this revision uses utils.get_args and spawns KNN processes.
    args = utils.get_args()

    Path(args.log_dir).mkdir(parents=True, exist_ok=True)

    log_level = {
        "INFO": logging.INFO,
        "DEBUG": logging.DEBUG,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    config = read_ini(args.config_file)
    my_config = dict()
    for section in config:
        my_config[section] = dict(config.items(section))

    copy(args.config_file, args.log_dir)
    copy(args.graph_file, args.log_dir)
    utils.write_args(args, args.log_dir)

    g = Graph()
    g.read_graph_from_file(args.graph_file, args.graph_type)
    n_machines = args.machines
    procs_per_machine = args.procs_per_machine
    l = Linear(n_machines, procs_per_machine)
    m_id = args.machine_id

    processes = []
    for r in range(procs_per_machine):
        processes.append(
            mp.Process(
                target=KNN,
                args=[
                    r,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    args.weights_store_dir,
                    log_level[args.log_level],
                    args.test_after,
                    args.train_evaluate_after,
                    args.reset_optimizer,
                ],
            )
        )

    for p in processes:
        p.start()

    for p in processes:
        p.join()
import logging
from pathlib import Path
from shutil import copy

from localconfig import LocalConfig
from torch import multiprocessing as mp

from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDWithPeerSampler import DPSGDWithPeerSampler
from decentralizepy.node.PeerSampler import PeerSampler


def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items("DATASET")))
    return config


if __name__ == "__main__":
    args = utils.get_args()

    Path(args.log_dir).mkdir(parents=True, exist_ok=True)

    log_level = {
        "INFO": logging.INFO,
        "DEBUG": logging.DEBUG,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    config = read_ini(args.config_file)
    my_config = dict()
    for section in config:
        my_config[section] = dict(config.items(section))

    copy(args.config_file, args.log_dir)
    copy(args.graph_file, args.log_dir)
    utils.write_args(args, args.log_dir)

    g = Graph()
    g.read_graph_from_file(args.graph_file, args.graph_type)
    n_machines = args.machines
    procs_per_machine = args.procs_per_machine
    l = Linear(n_machines, procs_per_machine)
    m_id = args.machine_id
    sm = args.server_machine
    sr = args.server_rank

    processes = []
    if sm == m_id:
        processes.append(
            mp.Process(
                # target=PeerSamplerDynamic,
                target=PeerSampler,
                args=[
                    sr,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    log_level[args.log_level],
                ],
            )
        )

    for r in range(0, procs_per_machine):
        processes.append(
            mp.Process(
                target=DPSGDWithPeerSampler,
                args=[
                    r,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    args.weights_store_dir,
                    log_level[args.log_level],
                    args.test_after,
                    args.train_evaluate_after,
                    args.reset_optimizer,
                ],
            )
        )

    for p in processes:
        p.start()

    for p in processes:
        p.join()
import logging
from pathlib import Path
from shutil import copy

from localconfig import LocalConfig
from torch import multiprocessing as mp

from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDWithPeerSampler import DPSGDWithPeerSampler
from decentralizepy.node.PeerSamplerDynamic import PeerSamplerDynamic


def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items("DATASET")))
    return config


if __name__ == "__main__":
    args = utils.get_args()

    Path(args.log_dir).mkdir(parents=True, exist_ok=True)

    log_level = {
        "INFO": logging.INFO,
        "DEBUG": logging.DEBUG,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    config = read_ini(args.config_file)
    my_config = dict()
    for section in config:
        my_config[section] = dict(config.items(section))

    copy(args.config_file, args.log_dir)
    copy(args.graph_file, args.log_dir)
    utils.write_args(args, args.log_dir)

    g = Graph()
    g.read_graph_from_file(args.graph_file, args.graph_type)
    n_machines = args.machines
    procs_per_machine = args.procs_per_machine
    l = Linear(n_machines, procs_per_machine)
    m_id = args.machine_id
    sm = args.server_machine
    sr = args.server_rank

    processes = []
    if sm == m_id:
        processes.append(
            mp.Process(
                target=PeerSamplerDynamic,
                args=[
                    sr,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    log_level[args.log_level],
                ],
            )
        )

    for r in range(0, procs_per_machine):
        processes.append(
            mp.Process(
                target=DPSGDWithPeerSampler,
                args=[
                    r,
                    m_id,
                    l,
                    g,
                    my_config,
                    args.iterations,
                    args.log_dir,
                    args.weights_store_dir,
                    log_level[args.log_level],
                    args.test_after,
                    args.train_evaluate_after,
                    args.reset_optimizer,
                ],
            )
        )

    for p in processes:
        p.start()

    for p in processes:
        p.join()
#!/bin/bash
cd
mkdir -p Gitlab
cd Gitlab
git clone git@gitlab.epfl.ch:risharma/decentralizepy.git
cd decentralizepy
mkdir -p leaf/data/femnist/data/train
mkdir -p leaf/data/femnist/data/test
mkdir -p leaf/data/femnist/per_user_data/train

~/miniconda3/bin/conda remove --name decpy --all
~/miniconda3/bin/conda create -n decpy python=3.9
~/miniconda3/envs/decpy/bin/pip install --upgrade pip --quiet
~/miniconda3/envs/decpy/bin/pip install --editable .\[dev\]

echo "[Cloning leaf repository]"
git clone https://github.com/TalwalkarLab/leaf.git

echo "[Installing unzip]"
sudo apt-get install unzip

cd leaf/data/shakespeare
echo "[Generating non-iid data]"
./preprocess.sh -s niid --sf 1.0 -k 0 -t sample -tf 0.8 --smplseed 10 --spltseed 10
echo "[Data Generated]"
%% Cell type:code id: tags:
```
from datasets.Femnist import Femnist
from graphs import SmallWorld
from collections import defaultdict
import os
import json
import numpy as np
```
%% Cell type:code id: tags:
```
a = Femnist
a
```
%% Cell type:code id: tags:
```
b = SmallWorld(6, 2, 2, 1)
```
%% Cell type:code id: tags:
```
b.adj_list
```
%% Cell type:code id: tags:
```
for i in range(12):
    print(b.neighbors(i))
```
%% Cell type:code id: tags:
```
clients = []
```
%% Cell type:code id: tags:
```
num_samples = []
data = defaultdict(lambda : None)
```
%% Cell type:code id: tags:
```
datadir = "./leaf/data/femnist/data/train"
files = os.listdir(datadir)
total_users=0
users = set()
```
%% Cell type:code id: tags:
```
files = os.listdir(datadir)[0:1]
```
%% Cell type:code id: tags:
```
for f in files:
    file_path = os.path.join(datadir, f)
    print(file_path)
    with open(file_path, 'r') as inf:
        client_data = json.load(inf)
    current_users = len(client_data['users'])
    print("Current_Users: ", current_users)
    total_users += current_users
    users.update(client_data['users'])

print("total_users: ", total_users)
print("unique_users: ", len(users))
print(client_data['user_data'].keys())
print(np.array(client_data['user_data']['f3408_47']['x']).shape)
print(np.array(client_data['user_data']['f3408_47']['y']).shape)
print(np.array(client_data['user_data']['f3327_11']['x']).shape)
print(np.array(client_data['user_data']['f3327_11']['y']).shape)
print(np.unique(np.array(client_data['user_data']['f3327_11']['y'])))
```
%% Cell type:code id: tags:
```
file = 'run.py'
with open(file, 'r') as inf:
    print(inf.readline().strip())
    print(inf.readlines())
```
%% Cell type:code id: tags:
```
def f(l):
    l[2] = 'c'

a = ['a', 'a', 'a']
print(a)
f(a)
print(a)
```
%% Cell type:code id: tags:
```
l = ['a', 'b', 'c']
print(l[:-1])
```
%% Cell type:code id: tags:
```
from localconfig import LocalConfig
def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items('DATASET')))
    return config

config = read_ini("config.ini")
for section in config:
    print(section)
# d = dict(config.sections())
```
%% Cell type:code id: tags:
```
def func(a=1, b=2, c=3):
    print(a + b + c)

l = [3, 5, 7]
func(*l)
```
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
# torch.multiprocessing.spawn has no kwargs parameter: it calls fn(i, *args)
# for each process index i, so pass the remaining arguments positionally.
mp.spawn(fn=func, nprocs=2, args=(5, 6))
```
%% Cell type:code id: tags:
```
l = '[0.4, 0.2, 0.3, 0.1]'
type(eval(l))
```
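`eval` works here, but for parsing a config-supplied list a safer equivalent (an aside, not something this diff changes) is `ast.literal_eval`, which only accepts Python literals:

```
import ast

sizes = ast.literal_eval('[0.4, 0.2, 0.3, 0.1]')
assert isinstance(sizes, list)
assert abs(sum(sizes) - 1.0) < 1e-9
```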
%% Cell type:code id: tags:
```
from decentralizepy.datasets.Femnist import Femnist
f1 = Femnist(0, 1, 'leaf/data/femnist/data/train')
ts = f1.get_trainset(1)
for data, target in ts:
    print(data)
    break
```
%% Cell type:code id: tags:
```
from decentralizepy.datasets.Femnist import Femnist
from decentralizepy.graphs.SmallWorld import SmallWorld
from decentralizepy.mappings.Linear import Linear
f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])
g = SmallWorld(4, 1, 0.5)
l = Linear(2, 2)
```
%% Cell type:code id: tags:
```
from decentralizepy.node.Node import Node
from torch import multiprocessing as mp
import logging
n1 = Node(0, l, g, f, "./results", logging.DEBUG)
n2 = Node(1, l, g, f, "./results", logging.DEBUG)
# mp.spawn(fn = Node, nprocs = 2, args=[l,g,f])
```
%% Cell type:code id: tags:
```
from testing import f
```
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
import torch
m1 = torch.nn.Linear(1,1)
o1 = torch.optim.SGD(m1.parameters(), 0.6)
print(m1)
mp.spawn(fn = f, nprocs = 2, args=[m1, o1])
```
%% Cell type:markdown id: tags:
%% Cell type:code id: tags:
```
o1.param_groups
```
%% Cell type:code id: tags:
```
with torch.no_grad():
    o1.param_groups[0]["params"][0].copy_(torch.zeros(1,))
```
%% Cell type:code id: tags:
```
o1.param_groups
```
%% Cell type:code id: tags:
```
m1.state_dict()
```
%% Cell type:code id: tags:
```
import torch
loss = getattr(torch.nn.functional, 'nll_loss')
```
%% Cell type:code id: tags:
```
loss
```
%% Cell type:code id: tags:
```
%matplotlib inline
from decentralizepy.node.Node import Node
from decentralizepy.graphs.SmallWorld import SmallWorld
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from torch import multiprocessing as mp
import torch
import logging
from localconfig import LocalConfig
def read_ini(file_path):
    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))
    print(dict(config.items('DATASET')))
    return config

config = read_ini("config.ini")
my_config = dict()
for section in config:
    my_config[section] = dict(config.items(section))

# f = Femnist(2, 'leaf/data/femnist/data/train', sizes=[0.6, 0.4])
g = Graph()
g.read_graph_from_file("36_nodes.edges", "edges")
l = Linear(1, 36)
# Node(0, 0, l, g, my_config, 20, "results", logging.DEBUG)
mp.spawn(fn=Node, nprocs=g.n_procs, args=[0, l, g, my_config, 20, "results", logging.INFO])
# mp.spawn(fn=Node, args=[l, g, config, 10, "results", logging.DEBUG], nprocs=2)
```
%% Output
Section: GRAPH
('package', 'decentralizepy.graphs.SmallWorld')
('graph_class', 'SmallWorld')
Section: DATASET
('dataset_package', 'decentralizepy.datasets.Femnist')
('dataset_class', 'Femnist')
('model_class', 'CNN')
('n_procs', 36)
('train_dir', 'leaf/data/femnist/per_user_data/train')
('test_dir', 'leaf/data/femnist/data/test')
('sizes', '')
Section: OPTIMIZER_PARAMS
('optimizer_package', 'torch.optim')
('optimizer_class', 'Adam')
('lr', 0.01)
Section: TRAIN_PARAMS
('training_package', 'decentralizepy.training.Training')
('training_class', 'Training')
('epochs_per_round', 1)
('batch_size', 1024)
('shuffle', True)
('loss_package', 'torch.nn')
('loss_class', 'CrossEntropyLoss')
Section: COMMUNICATION
('comm_package', 'decentralizepy.communication.TCP')
('comm_class', 'TCP')
('addresses_filepath', 'ip_addr.json')
Section: SHARING
('sharing_package', 'decentralizepy.sharing.Sharing')
('sharing_class', 'Sharing')
{'dataset_package': 'decentralizepy.datasets.Femnist', 'dataset_class': 'Femnist', 'model_class': 'CNN', 'n_procs': 36, 'train_dir': 'leaf/data/femnist/per_user_data/train', 'test_dir': 'leaf/data/femnist/data/test', 'sizes': ''}
%% Cell type:code id: tags:
```
```
%% Cell type:code id: tags:
```
from decentralizepy.mappings.Linear import Linear
from testing import f
from torch import multiprocessing as mp
l = Linear(1, 2)
mp.spawn(fn = f, nprocs = 2, args = [0, 2, "ip_addr.json", l])
```
%% Cell type:code id: tags:
```
from decentralizepy.datasets.Femnist import Femnist
f = Femnist()
f.file_per_user('leaf/data/femnist/data/train','leaf/data/femnist/per_user_data/train')
```
%% Cell type:code id: tags:
```
a = set()
a.update([2, 3, 4, 5])
```
%% Cell type:code id: tags:
```
a
```
%% Output
{2, 3, 4, 5}
%% Cell type:code id: tags:
```
print(*a)
```
%% Output
2 3 4 5
%% Cell type:code id: tags:
```
from decentralizepy.graphs.SmallWorld import SmallWorld
s = SmallWorld(36, 2, .5)
s.write_graph_to_file('36_nodes.edges')
```
%% Cell type:code id: tags:
```
import sys
sys.argv
```
%% Output
['/home/risharma/miniconda3/envs/decpy/lib/python3.9/site-packages/ipykernel_launcher.py',
'--ip=127.0.0.1',
'--stdin=9008',
'--control=9006',
'--hb=9005',
'--Session.signature_scheme="hmac-sha256"',
'--Session.key=b"eac5d2f8-c460-45f1-a268-1e4b46a6efd6"',
'--shell=9007',
'--transport="tcp"',
'--iopub=9009',
'--f=/tmp/tmp-21212479paJaUBJBN84.json']
%% Cell type:code id: tags:
```
```
import torch
import torch.multiprocessing as mp

x = [1, 2]

def f(id, a):
    # With the spawn start method each child re-imports this module, so the
    # module-level x is [1, 2] in the child, while a is the [1, 2, 3] that
    # was passed through args from the parent.
    print(id, x)
    print(id, a)

if __name__ == "__main__":
    x.append(3)
    mp.spawn(f, nprocs=2, args=(x,))
@@ -42,13 +42,19 @@ install_requires =
    pillow
    smallworld
    localconfig
    PyWavelets
    pandas
    crudini
    sklearn
    lz4
    fpzip
include_package_data = True
python_requires = >=3.6

[options.packages.find]
where = src

[options.extras_require]
dev =
    black>22.3.0
    coverage
    isort
    pytest
import sys

from decentralizepy.datasets.Reddit import Reddit
from decentralizepy.mappings.Linear import Linear

# Previously this script built Femnist() and converted the hard-coded
# leaf/data/femnist train split; this revision takes the source and
# destination directories on the command line and uses the Reddit dataset.
if __name__ == "__main__":
    mapping = Linear(6, 16)
    f = Reddit(0, 0, mapping)

    assert len(sys.argv) == 3
    frm = sys.argv[1]
    to = sys.argv[2]

    f.file_per_user(frm, to)
class Communication:
    """
    Communication API
    """

    def __init__(self, rank, machine_id, mapping, total_procs):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank of the process
        machine_id : int
            Machine id of the process
        mapping : decentralizepy.mappings.Mapping
            uid, rank, machine_id invertible mapping
        total_procs : int
            Total number of processes
        """
        self.total_procs = total_procs
        self.rank = rank
        self.machine_id = machine_id
        self.mapping = mapping
        self.uid = mapping.get_uid(rank, machine_id)
        self.total_bytes = 0

    def encrypt(self, data):
        """
        Encode/Encrypt data.

        Parameters
        ----------
        data : dict
            Data dict to send

        Returns
        -------
        byte
            Encoded data
        """
        raise NotImplementedError

    def decrypt(self, sender, data):
        """
        Decodes received data.

        Parameters
        ----------
        sender : byte
            sender of the data
        data : byte
            Data received

        Returns
        -------
        tuple
            (sender: int, data: dict)
        """
        raise NotImplementedError

    def connect_neighbors(self, neighbors):
        """
        Connects to all neighbors.

        Parameters
        ----------
        neighbors : list(int)
            List of neighbors
        """
        raise NotImplementedError

    def receive(self):
        """
        Returns ONE received message.

        Returns
        -------
        dict
            Received and decrypted data
        """
        raise NotImplementedError

    def send(self, uid, data):
        """
        Send a message to a process.

        Parameters
        ----------
        uid : int
            Neighbor's unique ID
        data : dict
            Message as a Python dictionary
        """
        raise NotImplementedError

    def disconnect_neighbors(self):
        """
        Disconnects from all neighbors.
        """
        raise NotImplementedError
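The abstract API above only fixes the uid bookkeeping and the methods a transport must provide. A toy in-memory sketch of a subclass, purely to make the contract concrete (decentralizepy itself only ships the TCP transport shown next):

```
import pickle
from queue import Queue

from decentralizepy.communication.Communication import Communication

class LoopbackComm(Communication):
    # Toy transport: every process in this interpreter shares one inbox
    # per uid; only meant to show which hooks a real transport fills in.
    inboxes = {}

    def __init__(self, rank, machine_id, mapping, total_procs):
        super().__init__(rank, machine_id, mapping, total_procs)
        LoopbackComm.inboxes[self.uid] = Queue()

    def encrypt(self, data):
        return pickle.dumps(data)

    def decrypt(self, sender, data):
        return sender, pickle.loads(data)

    def connect_neighbors(self, neighbors):
        pass  # nothing to set up in-process

    def send(self, uid, data):
        payload = self.encrypt(data)
        self.total_bytes += len(payload)
        LoopbackComm.inboxes[uid].put((self.uid, payload))

    def receive(self):
        sender, payload = LoopbackComm.inboxes[self.uid].get()
        return self.decrypt(sender, payload)

    def disconnect_neighbors(self):
        pass
```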
import json
import logging
import pickle
from collections import deque

import zmq

from decentralizepy.communication.Communication import Communication

HELLO = b"HELLO"
BYE = b"BYE"


class TCP(Communication):
    """
    TCP Communication API
    """

    def addr(self, rank, machine_id):
        """
        Returns TCP address of the process.

        Parameters
        ----------
        rank : int
            Local rank of the process
        machine_id : int
            Machine id of the process

        Returns
        -------
        str
            Full address of the process using TCP
        """
        machine_addr = self.ip_addrs[str(machine_id)]
        port = (2 * rank + 1) + self.offset  # previously: rank + 20000
        assert port > 0
        return "tcp://{}:{}".format(machine_addr, port)

    def __init__(
        self,
        rank,
        machine_id,
        mapping,
        total_procs,
        addresses_filepath,
        offset=9000,
        recv_timeout=50,
    ):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Local rank of the process
        machine_id : int
            Machine id of the process
        mapping : decentralizepy.mappings.Mapping
            uid, rank, machine_id invertible mapping
        total_procs : int
            Total number of processes
        addresses_filepath : str
            JSON file with machine_id -> ip mapping
        offset : int, optional
            Base port offset; each rank binds to port (2 * rank + 1) + offset
        recv_timeout : int, optional
            Receive timeout (milliseconds) for the ROUTER socket
        """
        super().__init__(rank, machine_id, mapping, total_procs)

        with open(addresses_filepath) as addrs:
            self.ip_addrs = json.load(addrs)

        self.rank = rank
        self.machine_id = machine_id
        self.mapping = mapping
        self.offset = offset
        self.recv_timeout = recv_timeout
        self.uid = mapping.get_uid(rank, machine_id)
        self.identity = str(self.uid).encode()
        self.context = zmq.Context()
        self.router = self.context.socket(zmq.ROUTER)
        self.router.setsockopt(zmq.IDENTITY, self.identity)
        self.router.setsockopt(zmq.RCVTIMEO, self.recv_timeout)
        self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
        self.router.bind(self.addr(rank, machine_id))

        self.total_data = 0
        self.total_meta = 0

        self.peer_sockets = dict()

    def __del__(self):
        """
        Destroys zmq context
        """
        self.context.destroy(linger=0)

    def encrypt(self, data):
        """
        Encode data as a python pickle, tracking payload vs. metadata bytes.
        (Previously this was json.dumps(data).encode("utf8").)

        Parameters
        ----------
        data : dict
            Data dict to send

        Returns
        -------
        byte
            Encoded data
        """
        data_len = 0
        if "params" in data:
            data_len = len(pickle.dumps(data["params"]))
        output = pickle.dumps(data)
        self.total_meta += len(output) - data_len
        self.total_data += data_len
        return output

    def decrypt(self, sender, data):
        """
        Decode received pickle data.

        Parameters
        ----------
        sender : byte
            sender of the data
        data : byte
            Data received

        Returns
        -------
        tuple
            (sender: int, data: dict)
        """
        sender = int(sender.decode())
        data = pickle.loads(data)  # previously: json.loads(data.decode("utf8"))
        return sender, data

    # Note: this revision removes the old connect_neighbors / receive /
    # disconnect_neighbors trio, which ran a HELLO/BYE handshake against a
    # barrier set and buffered early messages in a peer_deque (along with the
    # sent_disconnections state); connections are now opened and closed
    # explicitly with the methods below.

    def init_connection(self, neighbor):
        """
        Initiates a socket to a given node.

        Parameters
        ----------
        neighbor : int
            neighbor to connect to
        """
        logging.debug("Connecting to my neighbour: {}".format(neighbor))
        id = str(neighbor).encode()
        req = self.context.socket(zmq.DEALER)
        req.setsockopt(zmq.IDENTITY, self.identity)
        req.connect(self.addr(*self.mapping.get_machine_and_rank(neighbor)))
        self.peer_sockets[id] = req

    def destroy_connection(self, neighbor, linger=None):
        id = str(neighbor).encode()
        if self.already_connected(neighbor):
            self.peer_sockets[id].close(linger=linger)
            del self.peer_sockets[id]

    def already_connected(self, neighbor):
        id = str(neighbor).encode()
        return id in self.peer_sockets

    def receive(self, block=True):
        """
        Returns ONE received message.

        Returns
        -------
        dict
            Received and decrypted data

        Raises
        ------
        RuntimeError
            If received HELLO
        """
        while True:
            try:
                sender, recv = self.router.recv_multipart()
                s, r = self.decrypt(sender, recv)
                return s, r
            except zmq.ZMQError as exc:
                if exc.errno == zmq.EAGAIN:
                    if not block:
                        return None
                    else:
                        continue
                else:
                    raise

    def send(self, uid, data, encrypt=True):
        """
        Send a message to a process.

        Parameters
        ----------
        uid : int
            Neighbor's unique ID
        data : dict
            Message as a Python dictionary
        """
        if encrypt:
            to_send = self.encrypt(data)
        else:
            to_send = data
        data_size = len(to_send)
        self.total_bytes += data_size
        id = str(uid).encode()
        self.peer_sockets[id].send(to_send)
        logging.debug("{} sent the message to {}.".format(self.uid, uid))
        logging.info("Sent message size: {}".format(data_size))
class Compression:
    """
    Compression API
    """

    def __init__(self):
        """
        Constructor
        """

    def compress(self, arr):
        """
        Compression function

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        raise NotImplementedError

    def decompress(self, bytes):
        """
        Decompression function

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        raise NotImplementedError

    def compress_float(self, arr):
        """
        Compression function for float arrays

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        raise NotImplementedError

    def decompress_float(self, bytes):
        """
        Decompression function for compressed float arrays

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        raise NotImplementedError
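The API comes in two pairs: judging by the subclasses below, `compress`/`decompress` handle integer index metadata (sorted and delta-encoded), while `compress_float`/`decompress_float` handle the float parameter payload. A do-nothing subclass makes the contract concrete (illustrative only; not part of the package):

```
import numpy as np

from decentralizepy.compression.Compression import Compression

class NoCompression(Compression):
    # Identity codec: the "compressed" bytes are just the raw array buffer.
    def compress(self, arr):
        return arr.astype(np.int32).tobytes()

    def decompress(self, bytes):
        return np.frombuffer(bytes, dtype=np.int32)

    def compress_float(self, arr):
        return arr.astype(np.float32).tobytes()

    def decompress_float(self, bytes):
        return np.frombuffer(bytes, dtype=np.float32)
```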
# Elias gamma implementation, taken from this Stack Overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import numpy as np

from decentralizepy.compression.Compression import Compression


class Elias(Compression):
    """
    Compression API
    """

    def __init__(self):
        """
        Constructor
        """

    def compress(self, arr):
        """
        Compression function

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        arr.sort()
        first = arr[0]
        arr = np.diff(arr).astype(np.int32)
        arr = arr.view(f"u{arr.itemsize}")
        l = np.log2(arr).astype("u1")
        L = ((l << 1) + 1).cumsum()
        out = np.zeros(int(L[-1] + 128), "u1")
        for i in range(l.max() + 1):
            out[L - i - 1] += (arr >> i) & 1
        s = np.array([out.size], dtype=np.int64)
        size = np.ndarray(8, dtype="u1", buffer=s.data)
        packed = np.packbits(out)
        packed[-8:] = size
        s = np.array([first], dtype=np.int64)
        size = np.ndarray(8, dtype="u1", buffer=s.data)
        packed[-16:-8] = size
        return packed

    def decompress(self, bytes):
        """
        Decompression function

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        n_arr = bytes[-8:]
        n = np.ndarray(1, dtype=np.int64, buffer=n_arr.data)[0]
        first = bytes[-16:-8]
        first = np.ndarray(1, dtype=np.int64, buffer=first.data)[0]
        b = bytes[:-16]
        b = np.unpackbits(b, count=n).view(bool)
        s = b.nonzero()[0]
        s = (s << 1).repeat(np.diff(s, prepend=-1))
        s -= np.arange(-1, len(s) - 1)
        s = s.tolist()  # list has faster __getitem__

        ns = len(s)

        def gen():
            idx = 0
            yield idx
            while idx < ns:
                idx = s[idx]
                yield idx

        offs = np.fromiter(gen(), int)
        sz = np.diff(offs) >> 1
        mx = sz.max() + 1
        out_fin = np.zeros(offs.size, int)
        out_fin[0] = first
        out = out_fin[1:]
        for i in range(mx):
            out[b[offs[1:] - i - 1] & (sz >= i)] += 1 << i
        out = np.cumsum(out_fin)
        return out

    def compress_float(self, arr):
        """
        Compression function for float arrays

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        # Floats are passed through uncompressed in the base Elias codec.
        return arr

    def decompress_float(self, bytes):
        """
        Decompression function for compressed float arrays

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        # Pass-through counterpart of compress_float.
        return bytes
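A quick round-trip check of the integer codec (a sketch; note that `compress` sorts its input in place, so only the set of values survives, and the values must be distinct positive integers since the gamma code cannot encode a zero gap):

```
import numpy as np
from decentralizepy.compression.Elias import Elias

codec = Elias()
indices = np.array([3, 17, 2, 90, 41], dtype=np.int64)
packed = codec.compress(indices.copy())
restored = codec.decompress(packed)
assert np.array_equal(restored, np.sort(indices))
```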
# Elias gamma implementation, taken from this Stack Overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip

from decentralizepy.compression.Elias import Elias


class EliasFpzip(Elias):
    """
    Elias metadata compression with lossless fpzip float compression
    """

    def __init__(self):
        """
        Constructor
        """

    def compress_float(self, arr):
        """
        Compression function for float arrays

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        return fpzip.compress(arr, precision=0, order="C")

    def decompress_float(self, bytes):
        """
        Decompression function for compressed float arrays

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        return fpzip.decompress(bytes, order="C").squeeze()
# Elias gamma implementation, taken from this Stack Overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip

from decentralizepy.compression.Elias import Elias


class EliasFpzipLossy(Elias):
    """
    Elias metadata compression with lossy fpzip float compression
    """

    def __init__(self):
        """
        Constructor
        """

    def compress_float(self, arr):
        """
        Compression function for float arrays

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        return fpzip.compress(arr, precision=18, order="C")

    def decompress_float(self, bytes):
        """
        Decompression function for compressed float arrays

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        return fpzip.decompress(bytes, order="C").squeeze()
import lz4.frame
import numpy as np

from decentralizepy.compression.Compression import Compression


class Lz4Wrapper(Compression):
    """
    Compression API
    """

    def __init__(self, compress_metadata=True, compress_data=False):
        """
        Constructor
        """
        self.compress_metadata = compress_metadata
        self.compress_data = compress_data

    def compress(self, arr):
        """
        Compression function

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        if self.compress_metadata:
            arr.sort()
            diff = np.diff(arr, prepend=0).astype(np.int32)
            to_compress = diff.tobytes("C")
            return lz4.frame.compress(to_compress)
        return arr

    def decompress(self, bytes):
        """
        Decompression function

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        if self.compress_metadata:
            decomp = lz4.frame.decompress(bytes)
            return np.cumsum(np.frombuffer(decomp, dtype=np.int32))
        return bytes

    def compress_float(self, arr):
        """
        Compression function for float arrays

        Parameters
        ----------
        arr : np.ndarray
            Data to compress

        Returns
        -------
        bytearray
            Encoded data as bytes
        """
        if self.compress_data:
            to_compress = arr.tobytes("C")
            return lz4.frame.compress(to_compress)
        return arr

    def decompress_float(self, bytes):
        """
        Decompression function for compressed float arrays

        Parameters
        ----------
        bytes : bytearray
            Compressed data

        Returns
        -------
        arr : np.ndarray
            Decompressed data as array
        """
        if self.compress_data:
            decomp = lz4.frame.decompress(bytes)
            return np.frombuffer(decomp, dtype=np.float32)
        return bytes
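The same round-trip sanity check for the LZ4 wrapper (a sketch): the delta encoding preserves exact values, again after an in-place sort, and the float path only kicks in when `compress_data=True`:

```
import numpy as np
from decentralizepy.compression.Lz4Wrapper import Lz4Wrapper

codec = Lz4Wrapper(compress_metadata=True, compress_data=True)

indices = np.array([3, 17, 2, 90, 41], dtype=np.int32)
assert np.array_equal(codec.decompress(codec.compress(indices)), np.sort(indices))

params = np.random.randn(256).astype(np.float32)
assert np.array_equal(codec.decompress_float(codec.compress_float(params)), params)
```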
import logging

import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch import nn
from torch.utils.data import DataLoader

from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner, KShardDataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model

NUM_CLASSES = 10


class CIFAR10(Dataset):
    """
    Class for the CIFAR10 dataset
    """

    def load_trainset(self):
        """
        Loads the training set. Partitions it if needed.
        """
        logging.info("Loading training set.")
        trainset = torchvision.datasets.CIFAR10(
            root=self.train_dir, train=True, download=True, transform=self.transform
        )
        c_len = len(trainset)

        if self.sizes is None:  # Equal distribution of data among processes
            e = c_len // self.n_procs
            frac = e / c_len
            self.sizes = [frac] * self.n_procs
            self.sizes[-1] += 1.0 - frac * self.n_procs
            logging.debug("Size fractions: {}".format(self.sizes))

        self.uid = self.mapping.get_uid(self.rank, self.machine_id)

        if not self.partition_niid:
            self.trainset = DataPartitioner(trainset, self.sizes).use(self.uid)
        else:
            train_data = {key: [] for key in range(10)}
            for x, y in trainset:
                train_data[y].append(x)
            all_trainset = []
            for y, x in train_data.items():
                all_trainset.extend([(a, y) for a in x])
            self.trainset = KShardDataPartitioner(
                all_trainset, self.sizes, shards=self.shards
            ).use(self.uid)

    def load_testset(self):
        """
        Loads the testing set.
        """
        logging.info("Loading testing set.")
        self.testset = torchvision.datasets.CIFAR10(
            root=self.test_dir, train=False, download=True, transform=self.transform
        )

    def __init__(
        self,
        rank: int,
        machine_id: int,
        mapping: Mapping,
        n_procs="",
        train_dir="",
        test_dir="",
        sizes="",
        test_batch_size=1024,
        partition_niid=False,
        shards=1,
    ):
        """
        Constructor which reads the data files, instantiates and partitions the dataset

        Parameters
        ----------
        rank : int
            Rank of the current process (to get the partition).
        machine_id : int
            Machine ID
        mapping : decentralizepy.mappings.Mapping
            Mapping to convert rank, machine_id -> uid for data partitioning.
            It also provides the total number of global processes.
        train_dir : str, optional
            Path to the training data files. Required to instantiate the training set.
            The training set is partitioned according to the number of global processes and sizes.
        test_dir : str, optional
            Path to the testing data files. Required to instantiate the testing set.
        sizes : list(float), optional
            A list of fractions specifying how much data to allot each process.
            The sum of fractions should be 1.0. By default, each process gets an equal amount.
        test_batch_size : int, optional
            Batch size during testing. Default value is 1024.
        partition_niid : bool, optional
            When True, partitions the dataset in a non-iid way
        shards : int, optional
            Number of shards per process for the non-iid partitioning
        """
        super().__init__(
            rank,
            machine_id,
            mapping,
            train_dir,
            test_dir,
            sizes,
            test_batch_size,
        )
        self.num_classes = NUM_CLASSES
        self.partition_niid = partition_niid
        self.shards = shards
        self.transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            ]
        )
        if self.__training__:
            self.load_trainset()

        if self.__testing__:
            self.load_testset()

        # TODO: Add Validation

    def get_trainset(self, batch_size=1, shuffle=False):
        """
        Function to get the training set

        Parameters
        ----------
        batch_size : int, optional
            Batch size for learning

        Returns
        -------
        torch.utils.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
            If the training set was not initialized
        """
        if self.__training__:
            return DataLoader(self.trainset, batch_size=batch_size, shuffle=shuffle)
        raise RuntimeError("Training set not initialized!")

    def get_testset(self):
        """
        Function to get the test set

        Returns
        -------
        torch.utils.Dataset(decentralizepy.datasets.Data)

        Raises
        ------
        RuntimeError
            If the test set was not initialized
        """
        if self.__testing__:
            return DataLoader(self.testset, batch_size=self.test_batch_size)
        raise RuntimeError("Test set not initialized!")

    def test(self, model, loss):
        """
        Function to evaluate model on the test dataset.

        Parameters
        ----------
        model : decentralizepy.models.Model
            Model to evaluate
        loss : torch.nn.loss
            Loss function to evaluate

        Returns
        -------
        tuple
            (accuracy, loss_value)
        """
        testloader = self.get_testset()

        logging.debug("Test Loader instantiated.")

        correct_pred = [0 for _ in range(NUM_CLASSES)]
        total_pred = [0 for _ in range(NUM_CLASSES)]

        total_correct = 0
        total_predicted = 0

        with torch.no_grad():
            loss_val = 0.0
            count = 0
            for elems, labels in testloader:
                outputs = model(elems)
                loss_val += loss(outputs, labels).item()
                count += 1
                _, predictions = torch.max(outputs, 1)
                for label, prediction in zip(labels, predictions):
                    logging.debug("{} predicted as {}".format(label, prediction))
                    if label == prediction:
                        correct_pred[label] += 1
                        total_correct += 1
                    total_pred[label] += 1
                    total_predicted += 1

        logging.debug("Predicted on the test set")

        for key, value in enumerate(correct_pred):
            if total_pred[key] != 0:
                accuracy = 100 * float(value) / total_pred[key]
            else:
                accuracy = 100.0
            logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))

        accuracy = 100 * float(total_correct) / total_predicted
        loss_val = loss_val / count
        logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
        return accuracy, loss_val


class CNN(Model):
    """
    Class for a CNN Model for CIFAR10
    """

    def __init__(self):
        """
        Constructor. Instantiates the CNN Model
        with 10 output classes
        """
        super().__init__()
        # ~62k parameters (the classic PyTorch CIFAR-10 tutorial network)
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, NUM_CLASSES)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class LeNet(Model):
    """
    Class for a LeNet Model for CIFAR10.
    Inspired by the original LeNet network for MNIST: https://ieeexplore.ieee.org/abstract/document/726791
    """

    def __init__(self):
        """
        Constructor. Instantiates the CNN Model
        with 10 output classes
        """
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 5, padding="same")
        self.pool = nn.MaxPool2d(2, 2)
        self.gn1 = nn.GroupNorm(2, 32)
        self.conv2 = nn.Conv2d(32, 32, 5, padding="same")
        self.gn2 = nn.GroupNorm(2, 32)
        self.conv3 = nn.Conv2d(32, 64, 5, padding="same")
        self.gn3 = nn.GroupNorm(2, 64)
        self.fc1 = nn.Linear(64 * 4 * 4, NUM_CLASSES)

    def forward(self, x):
        """
        Forward pass of the model

        Parameters
        ----------
        x : torch.tensor
            The input torch tensor

        Returns
        -------
        torch.tensor
            The output torch tensor
        """
        x = self.pool(F.relu(self.gn1(self.conv1(x))))
        x = self.pool(F.relu(self.gn2(self.conv2(x))))
        x = self.pool(F.relu(self.gn3(self.conv3(x))))
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        return x
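A quick shape sanity check for both models above (a sketch; `CNN` and `LeNet` are the classes defined in this file): a CIFAR-10 batch is 3x32x32, and each network should map it to `NUM_CLASSES` logits.

```
import torch

batch = torch.randn(8, 3, 32, 32)  # 8 CIFAR-10-sized RGB images
for net in (CNN(), LeNet()):
    out = net(batch)
    assert out.shape == (8, 10)
    n_params = sum(p.numel() for p in net.parameters())
    print(type(net).__name__, n_params)
```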