Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing 2657 additions and 100 deletions
import logging
from pathlib import Path
from shutil import copy
from localconfig import LocalConfig
from torch import multiprocessing as mp
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDNodeFederated import DPSGDNodeFederated
from decentralizepy.node.FederatedParameterServer import FederatedParameterServer
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items("DATASET")))
return config
if __name__ == "__main__":
args = utils.get_args()
Path(args.log_dir).mkdir(parents=True, exist_ok=True)
log_level = {
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
config = read_ini(args.config_file)
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
copy(args.config_file, args.log_dir)
copy(args.graph_file, args.log_dir)
utils.write_args(args, args.log_dir)
g = Graph()
g.read_graph_from_file(args.graph_file, args.graph_type)
n_machines = args.machines
procs_per_machine = args.procs_per_machine
l = Linear(n_machines, procs_per_machine)
m_id = args.machine_id
sm = args.server_machine
sr = args.server_rank
processes = []
if sm == m_id:
processes.append(
mp.Process(
target=FederatedParameterServer,
args=[
sr,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
args.weights_store_dir,
log_level[args.log_level],
args.test_after,
args.train_evaluate_after,
args.working_rate,
],
)
)
for r in range(0, procs_per_machine):
processes.append(
mp.Process(
target=DPSGDNodeFederated,
args=[
r,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
args.weights_store_dir,
log_level[args.log_level],
args.test_after,
args.train_evaluate_after,
args.reset_optimizer,
],
)
)
for p in processes:
p.start()
for p in processes:
p.join()
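For reference, a minimal sketch of the configuration handling these launcher scripts share. The section and key names below (DATASET, dataset_package, dataset_class, OPTIMIZER_PARAMS, lr) are hypothetical placeholders, not the project's actual schema; the sketch only shows how a localconfig LocalConfig object becomes the nested my_config dictionary handed to every node process.

import tempfile
from localconfig import LocalConfig

# Hypothetical INI content, for illustration only.
sample_ini = """
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10

[OPTIMIZER_PARAMS]
lr = 0.01
"""

with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as f:
    f.write(sample_ini)
    config_path = f.name

config = LocalConfig(config_path)  # the same parser that read_ini() wraps above
my_config = {section: dict(config.items(section)) for section in config}
print(my_config["DATASET"]["dataset_class"])  # the per-section dict each node receives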
import logging
from pathlib import Path
from shutil import copy
from localconfig import LocalConfig
from torch import multiprocessing as mp
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.KNN import KNN
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items("DATASET")))
return config
if __name__ == "__main__":
args = utils.get_args()
Path(args.log_dir).mkdir(parents=True, exist_ok=True)
log_level = {
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
config = read_ini(args.config_file)
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
copy(args.config_file, args.log_dir)
copy(args.graph_file, args.log_dir)
utils.write_args(args, args.log_dir)
g = Graph()
g.read_graph_from_file(args.graph_file, args.graph_type)
n_machines = args.machines
procs_per_machine = args.procs_per_machine
l = Linear(n_machines, procs_per_machine)
m_id = args.machine_id
processes = []
for r in range(procs_per_machine):
processes.append(
mp.Process(
target=KNN,
args=[
r,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
args.weights_store_dir,
log_level[args.log_level],
args.test_after,
args.train_evaluate_after,
args.reset_optimizer,
],
)
)
for p in processes:
p.start()
for p in processes:
p.join()
import logging
from pathlib import Path
from shutil import copy
from localconfig import LocalConfig
from torch import multiprocessing as mp
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDWithPeerSampler import DPSGDWithPeerSampler
from decentralizepy.node.PeerSampler import PeerSampler
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items("DATASET")))
return config
if __name__ == "__main__":
args = utils.get_args()
Path(args.log_dir).mkdir(parents=True, exist_ok=True)
log_level = {
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
config = read_ini(args.config_file)
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
copy(args.config_file, args.log_dir)
copy(args.graph_file, args.log_dir)
utils.write_args(args, args.log_dir)
g = Graph()
g.read_graph_from_file(args.graph_file, args.graph_type)
n_machines = args.machines
procs_per_machine = args.procs_per_machine
l = Linear(n_machines, procs_per_machine)
m_id = args.machine_id
sm = args.server_machine
sr = args.server_rank
processes = []
if sm == m_id:
processes.append(
mp.Process(
# target=PeerSamplerDynamic,
target=PeerSampler,
args=[
sr,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
log_level[args.log_level],
],
)
)
for r in range(0, procs_per_machine):
processes.append(
mp.Process(
target=DPSGDWithPeerSampler,
args=[
r,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
args.weights_store_dir,
log_level[args.log_level],
args.test_after,
args.train_evaluate_after,
args.reset_optimizer,
],
)
)
for p in processes:
p.start()
for p in processes:
p.join()
import logging
from pathlib import Path
from shutil import copy
from localconfig import LocalConfig
from torch import multiprocessing as mp
from decentralizepy import utils
from decentralizepy.graphs.Graph import Graph
from decentralizepy.mappings.Linear import Linear
from decentralizepy.node.DPSGDWithPeerSampler import DPSGDWithPeerSampler
from decentralizepy.node.PeerSamplerDynamic import PeerSamplerDynamic
def read_ini(file_path):
config = LocalConfig(file_path)
for section in config:
print("Section: ", section)
for key, value in config.items(section):
print((key, value))
print(dict(config.items("DATASET")))
return config
if __name__ == "__main__":
args = utils.get_args()
Path(args.log_dir).mkdir(parents=True, exist_ok=True)
log_level = {
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
config = read_ini(args.config_file)
my_config = dict()
for section in config:
my_config[section] = dict(config.items(section))
copy(args.config_file, args.log_dir)
copy(args.graph_file, args.log_dir)
utils.write_args(args, args.log_dir)
g = Graph()
g.read_graph_from_file(args.graph_file, args.graph_type)
n_machines = args.machines
procs_per_machine = args.procs_per_machine
l = Linear(n_machines, procs_per_machine)
m_id = args.machine_id
sm = args.server_machine
sr = args.server_rank
processes = []
if sm == m_id:
processes.append(
mp.Process(
target=PeerSamplerDynamic,
args=[
sr,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
log_level[args.log_level],
],
)
)
for r in range(0, procs_per_machine):
processes.append(
mp.Process(
target=DPSGDWithPeerSampler,
args=[
r,
m_id,
l,
g,
my_config,
args.iterations,
args.log_dir,
args.weights_store_dir,
log_level[args.log_level],
args.test_after,
args.train_evaluate_after,
args.reset_optimizer,
],
)
)
for p in processes:
p.start()
for p in processes:
p.join()
@@ -45,13 +45,16 @@ install_requires =
     PyWavelets
     pandas
     crudini
+    sklearn
+    lz4
+    fpzip
 include_package_data = True
 python_requires = >=3.6
 [options.packages.find]
 where = src
 [options.extras_require]
 dev =
-    black
+    black>22.3.0
     coverage
     isort
     pytest
...
 import sys

-from decentralizepy.datasets.Femnist import Femnist
+from decentralizepy.datasets.Reddit import Reddit
+from decentralizepy.mappings import Linear

 if __name__ == "__main__":
-    f = Femnist(None, None, None)
+    mapping = Linear(6, 16)
+    f = Reddit(0, 0, mapping)
     assert len(sys.argv) == 3
     frm = sys.argv[1]
     to = sys.argv[2]
...
@@ -35,10 +35,20 @@ class TCP(Communication):
         """
         machine_addr = self.ip_addrs[str(machine_id)]
-        port = rank + 20000
+        port = (2 * rank + 1) + self.offset
+        assert port > 0
         return "tcp://{}:{}".format(machine_addr, port)

-    def __init__(self, rank, machine_id, mapping, total_procs, addresses_filepath):
+    def __init__(
+        self,
+        rank,
+        machine_id,
+        mapping,
+        total_procs,
+        addresses_filepath,
+        offset=9000,
+        recv_timeout=50,
+    ):
         """
         Constructor
@@ -54,6 +64,10 @@ class TCP(Communication):
             Total number of processes
         addresses_filepath : str
             JSON file with machine_id -> ip mapping
+        compression_package : str
+            Import path of a module that implements the compression.Compression.Compression class
+        compression_class : str
+            Name of the compression class inside the compression package

         """
         super().__init__(rank, machine_id, mapping, total_procs)
@@ -65,17 +79,22 @@ class TCP(Communication):
         self.rank = rank
         self.machine_id = machine_id
         self.mapping = mapping
+        self.offset = offset
+        self.recv_timeout = recv_timeout
         self.uid = mapping.get_uid(rank, machine_id)
         self.identity = str(self.uid).encode()
         self.context = zmq.Context()
         self.router = self.context.socket(zmq.ROUTER)
         self.router.setsockopt(zmq.IDENTITY, self.identity)
+        self.router.setsockopt(zmq.RCVTIMEO, self.recv_timeout)
+        self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
         self.router.bind(self.addr(rank, machine_id))
-        self.sent_disconnections = False
+
+        self.total_data = 0
+        self.total_meta = 0

         self.peer_deque = deque()
         self.peer_sockets = dict()
-        self.barrier = set()

     def __del__(self):
         """
@@ -99,7 +118,13 @@ class TCP(Communication):
             Encoded data

         """
-        return pickle.dumps(data)
+        data_len = 0
+        if "params" in data:
+            data_len = len(pickle.dumps(data["params"]))
+        output = pickle.dumps(data)
+        self.total_meta += len(output) - data_len
+        self.total_data += data_len
+        return output

     def decrypt(self, sender, data):
         """
@@ -122,55 +147,34 @@ class TCP(Communication):
         data = pickle.loads(data)
         return sender, data

-    def connect_neighbors(self, neighbors):
+    def init_connection(self, neighbor):
         """
-        Connects all neighbors. Sends HELLO. Waits for HELLO.
-        Caches any data received while waiting for HELLOs.
+        Initiates a socket to a given node.

         Parameters
         ----------
-        neighbors : list(int)
-            List of neighbors
-
-        Raises
-        ------
-        RuntimeError
-            If received BYE while waiting for HELLO
+        neighbor : int
+            neighbor to connect to

         """
-        logging.info("Sending connection request to neighbors")
-        for uid in neighbors:
-            logging.debug("Connecting to my neighbour: {}".format(uid))
-            id = str(uid).encode()
-            req = self.context.socket(zmq.DEALER)
-            req.setsockopt(zmq.IDENTITY, self.identity)
-            req.connect(self.addr(*self.mapping.get_machine_and_rank(uid)))
-            self.peer_sockets[id] = req
-            req.send(HELLO)
-
-        num_neighbors = len(neighbors)
-        while len(self.barrier) < num_neighbors:
-            sender, recv = self.router.recv_multipart()
-
-            if recv == HELLO:
-                logging.debug("Received {} from {}".format(HELLO, sender))
-                self.barrier.add(sender)
-            elif recv == BYE:
-                logging.debug("Received {} from {}".format(BYE, sender))
-                raise RuntimeError(
-                    "A neighbour wants to disconnect before training started!"
-                )
-            else:
-                logging.debug(
-                    "Received message from {} @ connect_neighbors".format(sender)
-                )
-                self.peer_deque.append(self.decrypt(sender, recv))
-
-        logging.info("Connected to all neighbors")
-        self.initialized = True
-
-    def receive(self):
+        logging.debug("Connecting to my neighbour: {}".format(neighbor))
+        id = str(neighbor).encode()
+        req = self.context.socket(zmq.DEALER)
+        req.setsockopt(zmq.IDENTITY, self.identity)
+        req.connect(self.addr(*self.mapping.get_machine_and_rank(neighbor)))
+        self.peer_sockets[id] = req
+
+    def destroy_connection(self, neighbor, linger=None):
+        id = str(neighbor).encode()
+        if self.already_connected(neighbor):
+            self.peer_sockets[id].close(linger=linger)
+            del self.peer_sockets[id]
+
+    def already_connected(self, neighbor):
+        id = str(neighbor).encode()
+        return id in self.peer_sockets
+
+    def receive(self, block=True):
         """
         Returns ONE message received.
@@ -185,27 +189,21 @@ class TCP(Communication):
             If received HELLO

         """
-        assert self.initialized == True
-        if len(self.peer_deque) != 0:
-            resp = self.peer_deque.popleft()
-            return resp
-
-        sender, recv = self.router.recv_multipart()
-
-        if recv == HELLO:
-            logging.debug("Received {} from {}".format(HELLO, sender))
-            raise RuntimeError(
-                "A neighbour wants to connect when everyone is connected!"
-            )
-        elif recv == BYE:
-            logging.debug("Received {} from {}".format(BYE, sender))
-            self.barrier.remove(sender)
-            return self.receive()
-        else:
-            logging.debug("Received message from {}".format(sender))
-            return self.decrypt(sender, recv)
+        while True:
+            try:
+                sender, recv = self.router.recv_multipart()
+                s, r = self.decrypt(sender, recv)
+                return s, r
+            except zmq.ZMQError as exc:
+                if exc.errno == zmq.EAGAIN:
+                    if not block:
+                        return None
+                    else:
+                        continue
+                else:
+                    raise

-    def send(self, uid, data):
+    def send(self, uid, data, encrypt=True):
         """
         Send a message to a process.
@@ -217,35 +215,13 @@ class TCP(Communication):
             Message as a Python dictionary

         """
-        assert self.initialized == True
-        to_send = self.encrypt(data)
+        if encrypt:
+            to_send = self.encrypt(data)
+        else:
+            to_send = data
         data_size = len(to_send)
         self.total_bytes += data_size
         id = str(uid).encode()
         self.peer_sockets[id].send(to_send)
         logging.debug("{} sent the message to {}.".format(self.uid, uid))
-        logging.info("Sent this round: {}".format(data_size))
-
-    def disconnect_neighbors(self):
-        """
-        Disconnects all neighbors.
-
-        """
-        assert self.initialized == True
-        if not self.sent_disconnections:
-            logging.info("Disconnecting neighbors")
-            for sock in self.peer_sockets.values():
-                sock.send(BYE)
-            self.sent_disconnections = True
-
-            while len(self.barrier):
-                sender, recv = self.router.recv_multipart()
-                if recv == BYE:
-                    logging.debug("Received {} from {}".format(BYE, sender))
-                    self.barrier.remove(sender)
-                else:
-                    logging.critical(
-                        "Received unexpected {} from {}".format(recv, sender)
-                    )
-                    raise RuntimeError(
-                        "Received a message when expecting BYE from {}".format(sender)
-                    )
+        logging.info("Sent message size: {}".format(data_size))
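A minimal local sketch of the reworked connection API (explicit init_connection / destroy_connection and a timeout-driven, optionally non-blocking receive). This is an assumption-laden illustration, not project usage: it assumes the class lives at decentralizepy.communication.TCP, that the addresses file is a JSON dict of machine_id to IP as the docstring states, and it runs both endpoints in one process purely for demonstration.

import json
import tempfile

from decentralizepy.communication.TCP import TCP  # assumed module path
from decentralizepy.mappings.Linear import Linear

mapping = Linear(1, 2)  # one machine, two processes
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"0": "127.0.0.1"}, f)  # machine_id -> ip, per the docstring
    addresses = f.name

a = TCP(0, 0, mapping, 2, addresses, offset=9000, recv_timeout=50)
b = TCP(1, 0, mapping, 2, addresses, offset=9000, recv_timeout=50)

# Sockets are now opened per neighbour on demand instead of via connect_neighbors().
a.init_connection(1)
b.init_connection(0)

a.send(1, {"params": [1.0, 2.0, 3.0]})  # the pickled "params" bytes count towards total_data
sender, msg = b.receive(block=True)     # loops over RCVTIMEO until a message arrives
print(sender, msg)

a.destroy_connection(1, linger=0)
b.destroy_connection(0, linger=0)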
class Compression:
"""
Compression API
"""
def __init__(self):
"""
Constructor
"""
def compress(self, arr):
"""
compression function
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
raise NotImplementedError
def decompress(self, bytes):
"""
decompression function
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
raise NotImplementedError
def compress_float(self, arr):
"""
compression function for float arrays
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
raise NotImplementedError
def decompress_float(self, bytes):
"""
decompression function for compressed float arrays
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
raise NotImplementedError
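A minimal sketch of what a subclass of this API can look like: a hypothetical codec (not part of the project) that just serializes arrays to raw bytes, included only to illustrate the compress/decompress contract.

import numpy as np

from decentralizepy.compression.Compression import Compression


class RawBytes(Compression):
    """Hypothetical identity codec illustrating the Compression API."""

    def compress(self, arr):
        # integer index arrays -> little-endian int64 bytes
        return np.asarray(arr, dtype=np.int64).tobytes()

    def decompress(self, bytes):
        return np.frombuffer(bytes, dtype=np.int64)

    def compress_float(self, arr):
        # float parameter arrays -> float32 bytes
        return np.asarray(arr, dtype=np.float32).tobytes()

    def decompress_float(self, bytes):
        return np.frombuffer(bytes, dtype=np.float32)


codec = RawBytes()
idx = np.array([3, 17, 42], dtype=np.int64)
assert (codec.decompress(codec.compress(idx)) == idx).all()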
# elias implementation: taken from this stack overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import numpy as np
from decentralizepy.compression.Compression import Compression
class Elias(Compression):
"""
Compression API
"""
def __init__(self):
"""
Constructor
"""
def compress(self, arr):
"""
compression function
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
arr.sort()
first = arr[0]
arr = np.diff(arr).astype(np.int32)
arr = arr.view(f"u{arr.itemsize}")
l = np.log2(arr).astype("u1")
L = ((l << 1) + 1).cumsum()
out = np.zeros(int(L[-1] + 128), "u1")
for i in range(l.max() + 1):
out[L - i - 1] += (arr >> i) & 1
s = np.array([out.size], dtype=np.int64)
size = np.ndarray(8, dtype="u1", buffer=s.data)
packed = np.packbits(out)
packed[-8:] = size
s = np.array([first], dtype=np.int64)
size = np.ndarray(8, dtype="u1", buffer=s.data)
packed[-16:-8] = size
return packed
def decompress(self, bytes):
"""
decompression function
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
n_arr = bytes[-8:]
n = np.ndarray(1, dtype=np.int64, buffer=n_arr.data)[0]
first = bytes[-16:-8]
first = np.ndarray(1, dtype=np.int64, buffer=first.data)[0]
b = bytes[:-16]
b = np.unpackbits(b, count=n).view(bool)
s = b.nonzero()[0]
s = (s << 1).repeat(np.diff(s, prepend=-1))
s -= np.arange(-1, len(s) - 1)
s = s.tolist() # list has faster __getitem__
ns = len(s)
def gen():
idx = 0
yield idx
while idx < ns:
idx = s[idx]
yield idx
offs = np.fromiter(gen(), int)
sz = np.diff(offs) >> 1
mx = sz.max() + 1
out_fin = np.zeros(offs.size, int)
out_fin[0] = first
out = out_fin[1:]
for i in range(mx):
out[b[offs[1:] - i - 1] & (sz >= i)] += 1 << i
out = np.cumsum(out_fin)
return out
def compress_float(self, arr):
"""
compression function for float arrays
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
return arr
def decompress_float(self, bytes):
"""
decompression function for compressed float arrays
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
return bytes
# elias implementation: taken from this stack overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip
from decentralizepy.compression.Elias import Elias
class EliasFpzip(Elias):
"""
Compression API
"""
def __init__(self):
"""
Constructor
"""
def compress_float(self, arr):
"""
compression function for float arrays
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
return fpzip.compress(arr, precision=0, order="C")
def decompress_float(self, bytes):
"""
decompression function for compressed float arrays
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
return fpzip.decompress(bytes, order="C").squeeze()
# elias implementation: taken from this stack overflow post:
# https://stackoverflow.com/questions/62843156/python-fast-compression-of-large-amount-of-numbers-with-elias-gamma
import fpzip
from decentralizepy.compression.Elias import Elias
class EliasFpzipLossy(Elias):
"""
Compression API
"""
def __init__(self):
"""
Constructor
"""
def compress_float(self, arr):
"""
compression function for float arrays
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
return fpzip.compress(arr, precision=18, order="C")
def decompress_float(self, bytes):
"""
decompression function for compressed float arrays
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
return fpzip.decompress(bytes, order="C").squeeze()
import lz4.frame
import numpy as np
from decentralizepy.compression.Compression import Compression
class Lz4Wrapper(Compression):
"""
Compression API
"""
def __init__(self, compress_metadata=True, compress_data=False):
"""
Constructor
"""
self.compress_metadata = compress_metadata
self.compress_data = compress_data
def compress(self, arr):
"""
compression function
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
if self.compress_metadata:
arr.sort()
diff = np.diff(arr, prepend=0).astype(np.int32)
to_compress = diff.tobytes("C")
return lz4.frame.compress(to_compress)
return arr
def decompress(self, bytes):
"""
decompression function
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
if self.compress_metadata:
decomp = lz4.frame.decompress(bytes)
return np.cumsum(np.frombuffer(decomp, dtype=np.int32))
return bytes
def compress_float(self, arr):
"""
compression function for float arrays
Parameters
----------
arr : np.ndarray
Data to compress
Returns
-------
bytearray
encoded data as bytes
"""
if self.compress_data:
to_compress = arr.tobytes("C")
return lz4.frame.compress(to_compress)
return arr
def decompress_float(self, bytes):
"""
decompression function for compressed float arrays
Parameters
----------
bytes :bytearray
compressed data
Returns
-------
arr : np.ndarray
decompressed data as array
"""
if self.compress_data:
decomp = lz4.frame.decompress(bytes)
return np.frombuffer(decomp, dtype=np.float32)
return bytes
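A small round-trip sketch of Lz4Wrapper as defined above (the module path decentralizepy.compression.Lz4Wrapper is assumed). Note that compress() sorts and delta-encodes its input, so it is intended for sorted index arrays, and decompress_float() assumes float32 data.

import numpy as np

from decentralizepy.compression.Lz4Wrapper import Lz4Wrapper

codec = Lz4Wrapper(compress_metadata=True, compress_data=True)

# Metadata path: int32 indices are sorted, delta-encoded, then LZ4-compressed.
indices = np.array([3, 17, 42, 118, 119], dtype=np.int32)
packed = codec.compress(indices.copy())  # copy: compress() sorts in place
assert (codec.decompress(packed) == indices).all()

# Data path: float32 parameters are LZ4-compressed as raw bytes (lossless).
params = np.random.randn(1024).astype(np.float32)
packed_f = codec.compress_float(params)
assert (codec.decompress_float(packed_f) == params).all()
print(len(packed_f), "compressed bytes for", params.nbytes, "raw bytes")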
import logging
import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch import nn
from torch.utils.data import DataLoader
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner, KShardDataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
NUM_CLASSES = 10
class CIFAR10(Dataset):
"""
Class for the CIFAR10 dataset
"""
def load_trainset(self):
"""
Loads the training set. Partitions it if needed.
"""
logging.info("Loading training set.")
trainset = torchvision.datasets.CIFAR10(
root=self.train_dir, train=True, download=True, transform=self.transform
)
c_len = len(trainset)
if self.sizes == None: # Equal distribution of data among processes
e = c_len // self.n_procs
frac = e / c_len
self.sizes = [frac] * self.n_procs
self.sizes[-1] += 1.0 - frac * self.n_procs
logging.debug("Size fractions: {}".format(self.sizes))
self.uid = self.mapping.get_uid(self.rank, self.machine_id)
if not self.partition_niid:
self.trainset = DataPartitioner(trainset, self.sizes).use(self.uid)
else:
train_data = {key: [] for key in range(10)}
for x, y in trainset:
train_data[y].append(x)
all_trainset = []
for y, x in train_data.items():
all_trainset.extend([(a, y) for a in x])
self.trainset = KShardDataPartitioner(
all_trainset, self.sizes, shards=self.shards
).use(self.uid)
def load_testset(self):
"""
Loads the testing set.
"""
logging.info("Loading testing set.")
self.testset = torchvision.datasets.CIFAR10(
root=self.test_dir, train=False, download=True, transform=self.transform
)
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
n_procs="",
train_dir="",
test_dir="",
sizes="",
test_batch_size=1024,
partition_niid=False,
shards=1,
):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
rank : int
Rank of the current process (to get the partition).
machine_id : int
Machine ID
mapping : decentralizepy.mappings.Mapping
Mapping to convert rank, machine_id -> uid for data partitioning
It also provides the total number of global processes
train_dir : str, optional
Path to the training data files. Required to instantiate the training set
The training set is partitioned according to the number of global processes and sizes
test_dir : str, optional
Path to the testing data files. Required to instantiate the testing set.
sizes : list(float), optional
A list of fractions specifying how much data to allot each process. Sum of fractions should be 1.0.
By default, each process gets an equal amount.
test_batch_size : int, optional
Batch size during testing. Default value is 1024.
partition_niid: bool, optional
When True, partitions dataset in a non-iid way
"""
super().__init__(
rank,
machine_id,
mapping,
train_dir,
test_dir,
sizes,
test_batch_size,
)
self.num_classes = NUM_CLASSES
self.partition_niid = partition_niid
self.shards = shards
self.transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]
)
if self.__training__:
self.load_trainset()
if self.__testing__:
self.load_testset()
# TODO: Add Validation
def get_trainset(self, batch_size=1, shuffle=False):
"""
Function to get the training set
Parameters
----------
batch_size : int, optional
Batch size for learning
Returns
-------
torch.utils.Dataset(decentralizepy.datasets.Data)
Raises
------
RuntimeError
If the training set was not initialized
"""
if self.__training__:
return DataLoader(self.trainset, batch_size=batch_size, shuffle=shuffle)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
"""
Function to get the test set
Returns
-------
torch.utils.Dataset(decentralizepy.datasets.Data)
Raises
------
RuntimeError
If the test set was not initialized
"""
if self.__testing__:
return DataLoader(self.testset, batch_size=self.test_batch_size)
raise RuntimeError("Test set not initialized!")
def test(self, model, loss):
"""
Function to evaluate model on the test dataset.
Parameters
----------
model : decentralizepy.models.Model
Model to evaluate
loss : torch.nn.loss
Loss function to evaluate
Returns
-------
tuple
(accuracy, loss_value)
"""
testloader = self.get_testset()
logging.debug("Test Loader instantiated.")
correct_pred = [0 for _ in range(NUM_CLASSES)]
total_pred = [0 for _ in range(NUM_CLASSES)]
total_correct = 0
total_predicted = 0
with torch.no_grad():
loss_val = 0.0
count = 0
for elems, labels in testloader:
outputs = model(elems)
loss_val += loss(outputs, labels).item()
count += 1
_, predictions = torch.max(outputs, 1)
for label, prediction in zip(labels, predictions):
logging.debug("{} predicted as {}".format(label, prediction))
if label == prediction:
correct_pred[label] += 1
total_correct += 1
total_pred[label] += 1
total_predicted += 1
logging.debug("Predicted on the test set")
for key, value in enumerate(correct_pred):
if total_pred[key] != 0:
accuracy = 100 * float(value) / total_pred[key]
else:
accuracy = 100.0
logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
accuracy = 100 * float(total_correct) / total_predicted
loss_val = loss_val / count
logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
return accuracy, loss_val
class CNN(Model):
"""
Class for a CNN Model for CIFAR10
"""
def __init__(self):
"""
Constructor. Instantiates the CNN Model
with 10 output classes
"""
super().__init__()
# 1.6 million params
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, NUM_CLASSES)
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
class LeNet(Model):
"""
Class for a LeNet Model for CIFAR10
Inspired by original LeNet network for MNIST: https://ieeexplore.ieee.org/abstract/document/726791
"""
def __init__(self):
"""
Constructor. Instantiates the CNN Model
with 10 output classes
"""
super().__init__()
self.conv1 = nn.Conv2d(3, 32, 5, padding="same")
self.pool = nn.MaxPool2d(2, 2)
self.gn1 = nn.GroupNorm(2, 32)
self.conv2 = nn.Conv2d(32, 32, 5, padding="same")
self.gn2 = nn.GroupNorm(2, 32)
self.conv3 = nn.Conv2d(32, 64, 5, padding="same")
self.gn3 = nn.GroupNorm(2, 64)
self.fc1 = nn.Linear(64 * 4 * 4, NUM_CLASSES)
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
x = self.pool(F.relu(self.gn1(self.conv1(x))))
x = self.pool(F.relu(self.gn2(self.conv2(x))))
x = self.pool(F.relu(self.gn3(self.conv3(x))))
x = torch.flatten(x, 1)
x = self.fc1(x)
return x
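A quick shape check for the two CIFAR-10 models above, assuming this file is importable as decentralizepy.datasets.CIFAR10.

import torch

from decentralizepy.datasets.CIFAR10 import CNN, LeNet

batch = torch.randn(4, 3, 32, 32)  # CIFAR-10 images: 3 channels, 32x32 pixels
for model in (CNN(), LeNet()):
    with torch.no_grad():
        out = model(batch)
    assert out.shape == (4, 10)  # one logit per class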
@@ -230,6 +230,8 @@ class Celeba(Dataset):
         self.IMAGES_DIR = utils.conditional_value(images_dir, "", None)
         assert self.IMAGES_DIR != None

+        self.num_classes = NUM_CLASSES
+
         if self.__training__:
             self.load_trainset()
...
@@ -52,6 +52,7 @@ class Dataset:
         self.test_dir = utils.conditional_value(test_dir, "", None)
         self.sizes = utils.conditional_value(sizes, "", None)
         self.test_batch_size = utils.conditional_value(test_batch_size, "", 64)
+        self.num_classes = None
         if self.sizes:
             if type(self.sizes) == str:
                 self.sizes = eval(self.sizes)
@@ -66,6 +67,20 @@ class Dataset:
         else:
             self.__testing__ = False

+        self.label_distribution = None
+
+    def get_label_distribution(self):
+        # Only supported for classification
+        if self.label_distribution == None:
+            self.label_distribution = [0 for _ in range(self.num_classes)]
+            tr_set = self.get_trainset()
+            for _, ys in tr_set:
+                for y in ys:
+                    y_val = y.item()
+                    self.label_distribution[y_val] += 1
+
+        return self.label_distribution
+
     def get_trainset(self):
         """
         Function to get the training set
...
@@ -14,6 +14,7 @@ from decentralizepy.datasets.Dataset import Dataset
 from decentralizepy.datasets.Partitioner import DataPartitioner
 from decentralizepy.mappings.Mapping import Mapping
 from decentralizepy.models.Model import Model
+from decentralizepy.models.Resnet import BasicBlock, Bottleneck, conv1x1

 NUM_CLASSES = 62
 IMAGE_SIZE = (28, 28)
@@ -222,6 +223,8 @@ class Femnist(Dataset):
             test_batch_size,
         )

+        self.num_classes = NUM_CLASSES
+
         if self.__training__:
             self.load_trainset()
@@ -290,7 +293,10 @@ class Femnist(Dataset):
         """
         if self.__training__:
             return DataLoader(
-                Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle
+                Data(self.train_x, self.train_y),
+                batch_size=batch_size,
+                shuffle=shuffle,
+                drop_last=True,  # needed for resnet
             )
         raise RuntimeError("Training set not initialized!")
@@ -448,3 +454,139 @@ class CNN(Model):
         x = F.relu(self.fc1(x))
         x = self.fc2(x)
         return x
class RNET(Model):
"""
From PyTorch:
Class for a Resnet Model for FEMNIST
Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py
For the license see models/Resnet.py
"""
def __init__(
self,
num_classes=NUM_CLASSES,
zero_init_residual=False,
groups=1,
width_per_group=32,
replace_stride_with_dilation=None,
norm_layer=None,
):
super(RNET, self).__init__()
block = BasicBlock
layers = [2, 2, 2, 2]
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
self.inplanes = 32
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
replace_stride_with_dilation = [False, False, False]
if len(replace_stride_with_dilation) != 3:
raise ValueError(
"replace_stride_with_dilation should be None "
"or a 3-element tuple, got {}".format(replace_stride_with_dilation)
)
self.groups = groups
self.base_width = width_per_group
self.conv1 = nn.Conv2d(
1, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False
)
self.bn1 = norm_layer(self.inplanes)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 32, layers[0])
self.layer2 = self._make_layer(
block, 64, layers[1], stride=2, dilate=replace_stride_with_dilation[0]
)
self.layer3 = self._make_layer(
block, 128, layers[2], stride=2, dilate=replace_stride_with_dilation[1]
)
self.layer4 = self._make_layer(
block, 256, layers[3], stride=2, dilate=replace_stride_with_dilation[2]
)
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(256 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck):
nn.init.constant_(m.bn3.weight, 0)
elif isinstance(m, BasicBlock):
nn.init.constant_(m.bn2.weight, 0)
def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if dilate:
self.dilation *= stride
stride = 1
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * block.expansion, stride),
norm_layer(planes * block.expansion),
)
layers = []
layers.append(
block(
self.inplanes,
planes,
stride,
downsample,
self.groups,
self.base_width,
previous_dilation,
norm_layer,
)
)
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(
block(
self.inplanes,
planes,
groups=self.groups,
base_width=self.base_width,
dilation=self.dilation,
norm_layer=norm_layer,
)
)
return nn.Sequential(*layers)
def _forward_impl(self, x):
# See note [TorchScript super()]
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
def forward(self, x):
return self._forward_impl(x)
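A similar sanity check for the added RNET model, assuming it is exposed from decentralizepy.datasets.Femnist; FEMNIST inputs are single-channel 28x28 images.

import torch

from decentralizepy.datasets.Femnist import RNET

model = RNET()
model.eval()  # use BatchNorm running stats so a tiny dummy batch is fine
with torch.no_grad():
    out = model(torch.randn(2, 1, 28, 28))
assert out.shape == (2, 62)  # one logit per FEMNIST class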
import logging
import math
import os
import zipfile
import pandas as pd
import requests
import torch
from sklearn import metrics
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.mappings import Mapping
from decentralizepy.models.Model import Model
class MovieLens(Dataset):
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
train_dir="",
test_dir="",
sizes="",
test_batch_size=1,
):
super().__init__(
rank, machine_id, mapping, train_dir, test_dir, sizes, test_batch_size
)
self.n_users, self.n_items, df_train, df_test = self._load_data()
self.train_data, self.test_data = self._split_data(
df_train, df_test, self.n_procs
)
# [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
self.NUM_CLASSES = 10
self.RATING_DICT = {
0.5: 0,
1.0: 1,
1.5: 2,
2.0: 3,
2.5: 4,
3.0: 5,
3.5: 6,
4.0: 7,
4.5: 8,
5.0: 9,
}
def _load_data(self):
f_ratings = os.path.join(self.train_dir, "ml-latest-small", "ratings.csv")
names = ["user_id", "item_id", "rating", "timestamp"]
df_ratings = pd.read_csv(f_ratings, sep=",", names=names, skiprows=1).drop(
columns=["timestamp"]
)
# map item_id properly
items_count = df_ratings["item_id"].nunique()
items_ids = sorted(list(df_ratings["item_id"].unique()))
assert items_count == len(items_ids)
for i in range(0, items_count):
df_ratings.loc[df_ratings["item_id"] == items_ids[i], "item_id"] = i + 1
# split train, test - 70% : 30%
grouped_users = df_ratings.groupby(["user_id"])
users_count = len(grouped_users)
df_train = pd.DataFrame()
df_test = pd.DataFrame()
for i in range(0, users_count):
df_user = df_ratings[df_ratings["user_id"] == i + 1]
df_user_train = df_user.sample(frac=0.7)
df_user_test = pd.concat([df_user, df_user_train]).drop_duplicates(
keep=False
)
assert len(df_user_train) + len(df_user_test) == len(df_user)
df_train = pd.concat([df_train, df_user_train])
df_test = pd.concat([df_test, df_user_test])
# 610, 9724
return users_count, items_count, df_train, df_test
def _split_data(self, train_data, test_data, world_size):
# SPLITTING BY USERS: group by users and split the data accordingly
mod = self.n_users % world_size
users_count = self.n_users // world_size
if self.rank < mod:
users_count += 1
offset = users_count * self.rank
else:
offset = users_count * self.rank + mod
my_train_data = pd.DataFrame()
my_test_data = pd.DataFrame()
for i in range(offset, offset + users_count):
my_train_data = pd.concat(
[my_train_data, train_data[train_data["user_id"] == i + 1]]
)
my_test_data = pd.concat(
[my_test_data, test_data[test_data["user_id"] == i + 1]]
)
logging.info("Data split for test and train.")
return my_train_data, my_test_data
def get_trainset(self, batch_size=1, shuffle=False):
if self.__training__:
train_x = self.train_data[["user_id", "item_id"]].to_numpy()
train_y = self.train_data.rating.values.astype("float32")
return DataLoader(
Data(train_x, train_y), batch_size=batch_size, shuffle=shuffle
)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
if self.__testing__:
test_x = self.test_data[["user_id", "item_id"]].to_numpy()
test_y = self.test_data.rating.values
return DataLoader(Data(test_x, test_y), batch_size=self.test_batch_size)
raise RuntimeError("Test set not initialized!")
def test(self, model, loss):
test_set = self.get_testset()
logging.debug("Test Loader instantiated.")
correct_pred = [0 for _ in range(self.NUM_CLASSES)]
total_pred = [0 for _ in range(self.NUM_CLASSES)]
total_correct = 0
total_predicted = 0
with torch.no_grad():
loss_val = 0.0
loss_predicted = 0.0
count = 0
for test_x, test_y in test_set:
output = model(test_x)
loss_val += loss(output, test_y).item()
count += 1
# threshold values to range [0.5, 5.0]
o1 = (output > 5.0).nonzero(as_tuple=False)
output[o1] = 5.0
o1 = (output < 0.5).nonzero(as_tuple=False)
output[o1] = 0.5
# round a number to the closest half integer
output = torch.round(output * 2) / 2
loss_predicted += metrics.mean_absolute_error(output, test_y)
for rating, prediction in zip(test_y.tolist(), output):
# print(rating, prediction)
logging.debug("{} predicted as {}".format(rating, prediction))
if rating == prediction:
correct_pred[self.RATING_DICT[rating]] += 1
total_correct += 1
total_pred[self.RATING_DICT[rating]] += 1
total_predicted += 1
logging.debug("Predicted on the test set")
for key, value in enumerate(correct_pred):
if total_pred[key] != 0:
accuracy = 100 * float(value) / total_pred[key]
else:
accuracy = 100.0
logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
accuracy = 100 * float(total_correct) / total_predicted
loss_val = math.sqrt(loss_val / count)
loss_predicted = loss_predicted / count
logging.info(
"MSE loss: {:.8f} | Rounded MAE loss: {:.8f}".format(
loss_val, loss_predicted
)
)
logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
return accuracy, loss_val
# todo: this class should be in 'models' package; add support for reading it from there and move it
class MatrixFactorization(Model):
"""
Class for a Matrix Factorization model for MovieLens.
"""
def __init__(self, n_users=610, n_items=9724, n_factors=20):
"""
Instantiates the Matrix Factorization model with user and item embeddings.
Parameters
----------
n_users
The number of unique users.
n_items
The number of unique items.
n_factors
The number of columns in embeddings matrix.
"""
super().__init__()
self.user_factors = torch.nn.Embedding(n_users, n_factors)
self.item_factors = torch.nn.Embedding(n_items, n_factors)
self.user_factors.weight.data.uniform_(-0.05, 0.05)
self.item_factors.weight.data.uniform_(-0.05, 0.05)
def forward(self, data):
"""
Forward pass of the model: computes the dot product of the user and item embeddings and returns predicted ratings for the given (user, item) pairs.
"""
users = torch.LongTensor(data[:, 0]) - 1
items = torch.LongTensor(data[:, 1]) - 1
u, it = self.user_factors(users), self.item_factors(items)
x = (u * it).sum(dim=1, keepdim=True)
return x.squeeze(1)
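A small sketch of how MatrixFactorization consumes (user_id, item_id) pairs; ids are 1-based, matching the -1 shift in forward(). The import path decentralizepy.datasets.MovieLens is an assumption.

import numpy as np
import torch

from decentralizepy.datasets.MovieLens import MatrixFactorization

model = MatrixFactorization(n_users=610, n_items=9724, n_factors=20)

# Each row is a 1-indexed (user_id, item_id) pair, as in the ratings file.
pairs = np.array([[1, 5], [2, 42], [610, 9724]], dtype="int64")
with torch.no_grad():
    preds = model(pairs)
print(preds.shape)  # torch.Size([3]): one predicted rating per pair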
def download_movie_lens(dest_path):
"""
Downloads the movielens latest small dataset.
This data set consists of:
* 100836 ratings from 610 users on 9742 movies.
* Each user has rated at least 20 movies.
https://files.grouplens.org/datasets/movielens/ml-latest-small-README.html
"""
url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
req = requests.get(url, stream=True)
print("Downloading MovieLens Latest Small data...")
with open(os.path.join(dest_path, "ml-latest-small.zip"), "wb") as fd:
for chunk in req.iter_content(chunk_size=None):
fd.write(chunk)
with zipfile.ZipFile(os.path.join(dest_path, "ml-latest-small.zip"), "r") as z:
z.extractall(dest_path)
print("Downloaded MovieLens Latest Small dataset at", dest_path)
if __name__ == "__main__":
path = "/mnt/nfs/shared/leaf/data/movielens"
zip_file = os.path.join(path, "ml-latest-small.zip")
if not os.path.isfile(zip_file):
download_movie_lens(path)
...@@ -102,3 +102,79 @@ class DataPartitioner(object): ...@@ -102,3 +102,79 @@ class DataPartitioner(object):
""" """
return Partition(self.data, self.partitions[rank]) return Partition(self.data, self.partitions[rank])
class SimpleDataPartitioner(DataPartitioner):
"""
Class to partition the dataset
"""
def __init__(self, data, sizes=[1.0]):
"""
Constructor. Partitions the data according the parameters
Parameters
----------
data : indexable
An indexable list of data items
sizes : list(float)
A list of fractions for each process
"""
self.data = data
self.partitions = []
data_len = len(data)
indexes = [x for x in range(0, data_len)]
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
class KShardDataPartitioner(DataPartitioner):
"""
Class to partition the dataset
"""
def __init__(self, data, sizes=[1.0], shards=1, seed=1234):
"""
Constructor. Partitions the data according the parameters
Parameters
----------
data : indexable
An indexable list of data items
sizes : list(float)
A list of fractions for each process
shards : int
Number of shards to allot to process
seed : int, optional
Seed for generating a random subset
"""
self.data = data
self.partitions = []
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng = Random()
rng.seed(seed)
for frac in sizes:
self.partitions.append([])
for _ in range(shards):
start = rng.randint(0, len(indexes) - 1)
part_len = int(frac * data_len) // shards
if start + part_len > len(indexes):
self.partitions[-1].extend(indexes[start:])
self.partitions[-1].extend(
indexes[: (start + part_len - len(indexes))]
)
indexes = indexes[(start + part_len - len(indexes)) : start]
else:
self.partitions[-1].extend(indexes[start : start + part_len])
index_start = indexes[:start]
index_start.extend(indexes[start + part_len :])
indexes = index_start
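A toy illustration of the shard partitioner added above (imported from decentralizepy.datasets.Partitioner, the path used elsewhere in this diff): a label-sorted dataset split among four processes with two contiguous shards each, which produces the non-IID label skew used for CIFAR-10.

from decentralizepy.datasets.Partitioner import KShardDataPartitioner

# 100 toy samples sorted by label (10 classes x 10 samples),
# mirroring how CIFAR10.load_trainset orders the data before sharding.
data = [("sample_{}".format(i), i // 10) for i in range(100)]

sizes = [0.25] * 4  # four processes, equal fractions
parts = KShardDataPartitioner(data, sizes, shards=2, seed=1234)

local = parts.use(0)  # partition assigned to uid 0
labels = sorted({local[i][1] for i in range(len(local))})
print(len(local), "samples covering labels", labels)  # only a few labels: non-IID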
import collections
import json
import logging
import os
import pickle
from collections import defaultdict
from pathlib import Path
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
VOCAB_LEN = 9999 # 10000 was used as it needed to be +1 due to using mask_zero in the tf embedding
SEQ_LEN = 10
EMBEDDING_DIM = 200
class Reddit(Dataset):
"""
Class for the Reddit dataset
-- Based on https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
and Femnist.py
"""
def __read_file__(self, file_path):
"""
Read data from the given json file
Parameters
----------
file_path : str
The file path
Returns
-------
tuple
(users, num_samples, data)
"""
with open(file_path, "r") as inf:
client_data = json.load(inf)
return (
client_data["users"],
client_data["num_samples"],
client_data["user_data"],
)
def __read_dir__(self, data_dir):
"""
Function to read all the Reddit data files in the directory
Parameters
----------
data_dir : str
Path to the folder containing the data files
Returns
-------
3-tuple
A tuple containing list of users, number of samples per client,
and the data items per client
"""
users = []
num_samples = []
data = defaultdict(lambda: None)
files = os.listdir(data_dir)
files = [f for f in files if f.endswith(".json")]
for f in files:
file_path = os.path.join(data_dir, f)
u, n, d = self.__read_file__(file_path)
users.extend(u)
num_samples.extend(n)
data.update(d)
return users, num_samples, data
def file_per_user(self, dir, write_dir):
"""
Function to read all the Reddit data files and write one file per user
Parameters
----------
dir : str
Path to the folder containing the data files
write_dir : str
Path to the folder to write the files
"""
clients, num_samples, train_data = self.__read_dir__(dir)
for index, client in enumerate(clients):
my_data = dict()
my_data["users"] = [client]
my_data["num_samples"] = num_samples[index]
my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]}
my_data["user_data"] = {client: my_samples}
with open(os.path.join(write_dir, client + ".json"), "w") as of:
json.dump(my_data, of)
print("Created File: ", client + ".json")
def load_trainset(self):
"""
Loads the training set. Partitions it if needed.
"""
logging.info("Loading training set.")
files = os.listdir(self.train_dir)
files = [f for f in files if f.endswith(".json")]
files.sort()
c_len = len(files)
# clients, num_samples, train_data = self.__read_dir__(self.train_dir)
if self.sizes == None: # Equal distribution of data among processes
e = c_len // self.n_procs
frac = e / c_len
self.sizes = [frac] * self.n_procs
self.sizes[-1] += 1.0 - frac * self.n_procs
logging.debug("Size fractions: {}".format(self.sizes))
self.uid = self.mapping.get_uid(self.rank, self.machine_id)
my_clients = DataPartitioner(files, self.sizes).use(self.uid)
my_train_data = {"x": [], "y": []}
self.clients = []
self.num_samples = []
logging.debug("Clients Length: %d", c_len)
logging.debug("My_clients_len: %d", my_clients.__len__())
for i in range(my_clients.__len__()):
cur_file = my_clients.__getitem__(i)
clients, _, train_data = self.__read_file__(
os.path.join(self.train_dir, cur_file)
)
for cur_client in clients:
self.clients.append(cur_client)
processed_x, processed_y = self.prepare_data(train_data[cur_client])
# processed_x is a list of fixed-size word-id arrays that each represent a phrase
# processed_y is a list of word ids that each represent the next word of a phrase
my_train_data["x"].extend(processed_x)
my_train_data["y"].extend(processed_y)
self.num_samples.append(len(processed_y))
# turns the list of lists into a single list
self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
self.train_x = np.array(
my_train_data["x"], dtype=np.dtype("int64")
) # .reshape(-1)
logging.info("train_x.shape: %s", str(self.train_x.shape))
logging.info("train_y.shape: %s", str(self.train_y.shape))
assert self.train_x.shape[0] == self.train_y.shape[0]
assert self.train_x.shape[0] > 0
def load_testset(self):
"""
Loads the testing set.
"""
logging.info("Loading testing set.")
_, _, d = self.__read_dir__(self.test_dir)
test_x = []
test_y = []
for test_data in d.values():
processed_x, processed_y = self.prepare_data(test_data)
# processed_x is a list of fixed-size word-id arrays that each represent a phrase
# processed_y is a list of word ids that each represent the next word of a phrase
test_x.extend(processed_x)
test_y.extend(processed_y)
self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1)
self.test_x = np.array(test_x, dtype=np.dtype("int64"))
logging.info("test_x.shape: %s", str(self.test_x.shape))
logging.info("test_y.shape: %s", str(self.test_y.shape))
assert self.test_x.shape[0] == self.test_y.shape[0]
assert self.test_x.shape[0] > 0
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
n_procs="",
train_dir="",
test_dir="",
sizes="",
test_batch_size=1024,
):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
rank : int
Rank of the current process (to get the partition).
machine_id : int
Machine ID
mapping : decentralizepy.mappings.Mapping
Mapping to convert rank, machine_id -> uid for data partitioning
It also provides the total number of global processes
train_dir : str, optional
Path to the training data files. Required to instantiate the training set
The training set is partitioned according to the number of global processes and sizes
test_dir : str, optional
Path to the testing data files. Required to instantiate the testing set.
sizes : list(float), optional
A list of fractions specifying how much data to allot each process. Sum of fractions should be 1.0.
By default, each process gets an equal amount.
test_batch_size : int, optional
Batch size during testing. Default value is 1024.
"""
super().__init__(
rank,
machine_id,
mapping,
train_dir,
test_dir,
sizes,
test_batch_size,
)
if self.train_dir and Path(self.train_dir).exists():
vocab_path = os.path.join(self.train_dir, "../../vocab/reddit_vocab.pck")
(
self.vocab,
self.vocab_size,
self.unk_symbol,
self.pad_symbol,
) = self._load_vocab(vocab_path)
logging.info("The reddit vocab has %i tokens.", len(self.vocab))
if self.__training__:
self.load_trainset()
if self.__testing__:
self.load_testset()
# TODO: Add Validation
def _load_vocab(self, VOCABULARY_PATH):
"""
loads the training vocabulary
copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
Parameters
----------
VOCABULARY_PATH : str
Path to the pickled training vocabulary
Returns
-------
Tuple
vocabulary, size, unk symbol, pad symbol
"""
vocab_file = pickle.load(open(VOCABULARY_PATH, "rb"))
vocab = collections.defaultdict(lambda: vocab_file["unk_symbol"])
vocab.update(vocab_file["vocab"])
return (
vocab,
vocab_file["size"],
vocab_file["unk_symbol"],
vocab_file["pad_symbol"],
)
def prepare_data(self, data):
"""
copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
Parameters
----------
data
Returns
-------
"""
data_x = data["x"]
data_y = data["y"]
# flatten lists
def flatten_lists(data_x_by_comment, data_y_by_comment):
data_x_by_seq, data_y_by_seq = [], []
for c, l in zip(data_x_by_comment, data_y_by_comment):
data_x_by_seq.extend(c)
data_y_by_seq.extend(l["target_tokens"])
return data_x_by_seq, data_y_by_seq
data_x, data_y = flatten_lists(data_x, data_y)
data_x_processed = self.process_x(data_x)
data_y_processed = self.process_y(data_y)
filtered_x, filtered_y = [], []
for i in range(len(data_x_processed)):
if np.sum(data_y_processed[i]) != 0:
filtered_x.append(data_x_processed[i])
filtered_y.append(data_y_processed[i])
return (filtered_x, filtered_y)
def _tokens_to_ids(self, raw_batch):
"""
Turns a list of equally sized token lists (padded with <PAD> if needed)
into a list of lists of word ids
[['<BOS>', 'do', 'you', 'have', 'proof', 'of', 'purchase', 'for', 'clay', 'play'], [ ...], ...]
turns into:
[[ 5 45 13 24 1153 11 1378 17 6817 165], ...]
copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
Parameters
----------
raw_batch : list
list of fixed size token lists
Returns
-------
2D array with the rows representing fixed-size token-id phrases
"""
def tokens_to_word_ids(tokens, word2id):
return [word2id[word] for word in tokens]
to_ret = [tokens_to_word_ids(seq, self.vocab) for seq in raw_batch]
return np.array(to_ret)
def process_x(self, raw_x_batch):
"""
copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
Parameters
----------
raw_x_batch
Returns
-------
"""
tokens = self._tokens_to_ids([s for s in raw_x_batch])
return tokens
def process_y(self, raw_y_batch):
"""
copied from https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
Parameters
----------
raw_y_batch
Returns
-------
"""
tokens = self._tokens_to_ids([s for s in raw_y_batch])
def getNextWord(token_ids):
n = len(token_ids)
for i in range(n):
# gets the word at the end of the phrase that should be predicted
# that is the last token that is not a pad.
if token_ids[n - i - 1] != self.pad_symbol:
return token_ids[n - i - 1]
return self.pad_symbol
return [getNextWord(t) for t in tokens]
def get_client_ids(self):
"""
Function to retrieve all the clients of the current process
Returns
-------
list(str)
A list of strings of the client ids.
"""
return self.clients
def get_client_id(self, i):
"""
Function to get the client id of the ith sample
Parameters
----------
i : int
Index of the sample
Returns
-------
str
Client ID
Raises
------
IndexError
If the sample index is out of bounds
"""
lb = 0
for j in range(len(self.clients)):
if i < lb + self.num_samples[j]:
return self.clients[j]
raise IndexError("i is out of bounds!")
def get_trainset(self, batch_size=1, shuffle=False):
"""
Function to get the training set
Parameters
----------
batch_size : int, optional
Batch size for learning
Returns
-------
torch.utils.Dataset(decentralizepy.datasets.Data)
Raises
------
RuntimeError
If the training set was not initialized
"""
if self.__training__:
return DataLoader(
Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle
)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
"""
Function to get the test set
Returns
-------
torch.utils.Dataset(decentralizepy.datasets.Data)
Raises
------
RuntimeError
If the test set was not initialized
"""
if self.__testing__:
return DataLoader(
Data(self.test_x, self.test_y), batch_size=self.test_batch_size
)
raise RuntimeError("Test set not initialized!")
def test(self, model, loss):
"""
Function to evaluate model on the test dataset.
Parameters
----------
model : decentralizepy.models.Model
Model to evaluate
loss : torch.nn.loss
Loss function to evaluate
Returns
-------
tuple
(accuracy, loss_value)
"""
testloader = self.get_testset()
logging.debug("Test Loader instantiated.")
correct_pred = [0 for _ in range(VOCAB_LEN)]
total_pred = [0 for _ in range(VOCAB_LEN)]
total_correct = 0
total_predicted = 0
with torch.no_grad():
loss_val = 0.0
count = 0
for elems, labels in testloader:
outputs = model(elems)
loss_val += loss(outputs, labels).item()
count += 1
_, predictions = torch.max(outputs, 1)
for label, prediction in zip(labels, predictions):
logging.debug("{} predicted as {}".format(label, prediction))
if label == prediction:
correct_pred[label] += 1
total_correct += 1
total_pred[label] += 1
total_predicted += 1
logging.debug("Predicted on the test set")
for key, value in enumerate(correct_pred):
if total_pred[key] != 0:
accuracy = 100 * float(value) / total_pred[key]
else:
accuracy = 100.0
logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
accuracy = 100 * float(total_correct) / total_predicted
loss_val = loss_val / count
logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
return accuracy, loss_val
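# Usage sketch (comments only; names are hypothetical): once the dataset object
# above is constructed with a valid test_dir, evaluation reduces to
#   model = RNN()
#   accuracy, loss_val = dataset.test(model, torch.nn.CrossEntropyLoss())
# CrossEntropyLoss matches the raw logits returned by RNN.forward below.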
class RNN(Model):
"""
Class for an RNN model for Reddit
"""
def __init__(self):
"""
Constructor. Instantiates the RNN model that predicts the next word of a sequence of words.
Based on the TensorFlow model found here: https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
"""
super().__init__()
# nn.Embedding has no input_length argument (unlike the Keras Embedding layer)
self.embedding = nn.Embedding(VOCAB_LEN, EMBEDDING_DIM, padding_idx=0)
self.rnn_cells = nn.LSTM(EMBEDDING_DIM, 256, batch_first=True, num_layers=2)
# The activation function is applied in the forward pass.
# Note: the TensorFlow implementation did not use an activation function at this step;
# it is an open question whether one should be added here.
self.l1 = nn.Linear(256, 128)
# the TensorFlow model used a softmax activation here
self.l2 = nn.Linear(128, VOCAB_LEN)
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
x = self.embedding(x)
x = self.rnn_cells(x)
# x is (output, (h_n, c_n)); keep the final hidden state of the last LSTM layer
last_layer_output = x[1][0][1, ...]
x = F.relu(self.l1(last_layer_output))
x = self.l2(x)
# softmax is applied by the CrossEntropyLoss used during training
return x
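# Illustrative sketch only, not part of the original module: a quick shape check
# of the RNN above on random token ids (assumes a sequence length of 10; the real
# pipeline feeds fixed-size phrases produced by process_x).
if __name__ == "__main__":
    demo_model = RNN()
    dummy_batch = torch.randint(0, VOCAB_LEN, (4, 10))  # (batch, seq_len) token ids
    logits = demo_model(dummy_batch)
    print(logits.shape)  # expected: torch.Size([4, VOCAB_LEN])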
import json
import logging
import os
from collections import defaultdict
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
from decentralizepy.mappings.Mapping import Mapping
from decentralizepy.models.Model import Model
VOCAB = list(
"dhlptx@DHLPTX $(,048cgkoswCGKOSW[_#'/37;?bfjnrvzBFJNRVZ\"&*.26:\naeimquyAEIMQUY]!%)-159\r{{}}<>"
)
VOCAB_LEN = len(VOCAB)
# Creating a mapping from unique characters to indices
char2idx = {u: i for i, u in enumerate(VOCAB)}
idx2char = np.array(VOCAB)
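# For example (illustrative): char2idx["d"] == 0 and idx2char[0] == "d", since "d"
# is the first entry of VOCAB; Shakespeare.process below encodes every phrase
# character-by-character through this mapping.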
EMBEDDING_DIM = 8
HIDDEN_DIM = 256
NUM_CLASSES = VOCAB_LEN
NUM_LAYERS = 2
SEQ_LENGTH = 80
class Shakespeare(Dataset):
"""
Class for the Shakespeare dataset
-- Based on https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
"""
def __read_file__(self, file_path):
"""
Read data from the given json file
Parameters
----------
file_path : str
The file path
Returns
-------
tuple
(users, num_samples, data)
"""
with open(file_path, "r") as inf:
client_data = json.load(inf)
return (
client_data["users"],
client_data["num_samples"],
client_data["user_data"],
)
def __read_dir__(self, data_dir):
"""
Function to read all the Shakespeare data files in the directory
Parameters
----------
data_dir : str
Path to the folder containing the data files
Returns
-------
3-tuple
A tuple containing list of users, number of samples per client,
and the data items per client
"""
users = []
num_samples = []
data = defaultdict(lambda: None)
files = os.listdir(data_dir)
files = [f for f in files if f.endswith(".json")]
for f in files:
file_path = os.path.join(data_dir, f)
u, n, d = self.__read_file__(file_path)
users.extend(u)
num_samples.extend(n)
data.update(d)
return users, num_samples, data
def file_per_user(self, dir, write_dir):
"""
Function to read all the Shakespeare data files and write one file per user
Parameters
----------
dir : str
Path to the folder containing the data files
write_dir : str
Path to the folder to write the files
"""
clients, num_samples, train_data = self.__read_dir__(dir)
for index, client in enumerate(clients):
my_data = dict()
my_data["users"] = [client]
my_data["num_samples"] = num_samples[index]
my_samples = {"x": train_data[client]["x"], "y": train_data[client]["y"]}
my_data["user_data"] = {client: my_samples}
with open(os.path.join(write_dir, client + ".json"), "w") as of:
json.dump(my_data, of)
print("Created File: ", client + ".json")
def load_trainset(self):
"""
Loads the training set. Partitions it if needed.
"""
logging.info("Loading training set.")
files = os.listdir(self.train_dir)
files = [f for f in files if f.endswith(".json")]
files.sort()
c_len = len(files)
# clients, num_samples, train_data = self.__read_dir__(self.train_dir)
if self.sizes is None:  # Equal distribution of data among processes
e = c_len // self.n_procs
frac = e / c_len
self.sizes = [frac] * self.n_procs
self.sizes[-1] += 1.0 - frac * self.n_procs
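# Worked example (illustrative): with 10 client files and 4 processes,
# e = 10 // 4 = 2 and frac = 0.2, so sizes = [0.2, 0.2, 0.2, 0.2] and the
# last entry absorbs the remainder: 0.2 + (1.0 - 0.8) = 0.4.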
logging.debug("Size fractions: {}".format(self.sizes))
self.uid = self.mapping.get_uid(self.rank, self.machine_id)
my_clients = DataPartitioner(files, self.sizes).use(self.uid)
my_train_data = {"x": [], "y": []}
self.clients = []
self.num_samples = []
logging.debug("Clients Length: %d", c_len)
logging.debug("My_clients_len: %d", my_clients.__len__())
for i in range(my_clients.__len__()):
cur_file = my_clients.__getitem__(i)
clients, _, train_data = self.__read_file__(
os.path.join(self.train_dir, cur_file)
)
for cur_client in clients:
self.clients.append(cur_client)
my_train_data["x"].extend(self.process(train_data[cur_client]["x"]))
my_train_data["y"].extend(self.process(train_data[cur_client]["y"]))
self.num_samples.append(len(train_data[cur_client]["y"]))
# turns the list of lists into a single list
self.train_y = np.array(my_train_data["y"], dtype=np.dtype("int64")).reshape(-1)
self.train_x = np.array(
my_train_data["x"], dtype=np.dtype("int64")
) # .reshape(-1)
logging.info("train_x.shape: %s", str(self.train_x.shape))
logging.info("train_y.shape: %s", str(self.train_y.shape))
assert self.train_x.shape[0] == self.train_y.shape[0]
assert self.train_x.shape[0] > 0
def load_testset(self):
"""
Loads the testing set.
"""
logging.info("Loading testing set.")
_, _, d = self.__read_dir__(self.test_dir)
test_x = []
test_y = []
for test_data in d.values():
test_x.extend(self.process(test_data["x"]))
test_y.extend(self.process(test_data["y"]))
self.test_y = np.array(test_y, dtype=np.dtype("int64")).reshape(-1)
self.test_x = np.array(test_x, dtype=np.dtype("int64"))
logging.info("test_x.shape: %s", str(self.test_x.shape))
logging.info("test_y.shape: %s", str(self.test_y.shape))
assert self.test_x.shape[0] == self.test_y.shape[0]
assert self.test_x.shape[0] > 0
def __init__(
self,
rank: int,
machine_id: int,
mapping: Mapping,
n_procs="",
train_dir="",
test_dir="",
sizes="",
test_batch_size=1024,
):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
rank : int
Rank of the current process (to get the partition).
machine_id : int
Machine ID
mapping : decentralizepy.mappings.Mapping
Mapping to convert rank, machine_id -> uid for data partitioning
It also provides the total number of global processes
n_procs : int, optional
Number of global processes (not used directly; the total is taken from the mapping)
train_dir : str, optional
Path to the training data files. Required to instantiate the training set
The training set is partitioned according to the number of global processes and sizes
test_dir : str, optional
Path to the testing data files. Required to instantiate the testing set
sizes : list(float), optional
A list of fractions specifying how much data to allot each process. The fractions should sum to 1.0
By default, each process gets an equal amount.
test_batch_size : int, optional
Batch size during testing. Default value is 1024
"""
super().__init__(
rank,
machine_id,
mapping,
train_dir,
test_dir,
sizes,
test_batch_size,
)
if self.__training__:
self.load_trainset()
if self.__testing__:
self.load_testset()
def process(self, x):
# Map each raw string in x to a list of character indices via char2idx.
output = list(
map(lambda sentences: list(map(lambda c: char2idx[c], list(sentences))), x)
)
return output
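# Illustrative example (not executed): process(["hd"]) returns
# [[char2idx["h"], char2idx["d"]]] == [[1, 0]], one list of character
# indices per input string.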
def get_client_ids(self):
"""
Function to retrieve all the clients of the current process
Returns
-------
list(str)
A list of strings of the client ids.
"""
return self.clients
def get_client_id(self, i):
"""
Function to get the client id of the ith sample
Parameters
----------
i : int
Index of the sample
Returns
-------
str
Client ID
Raises
------
IndexError
If the sample index is out of bounds
"""
lb = 0
for j in range(len(self.clients)):
if i < lb + self.num_samples[j]:
return self.clients[j]
lb += self.num_samples[j]  # advance the lower bound past client j's samples
raise IndexError("i is out of bounds!")
def get_trainset(self, batch_size=1, shuffle=False):
"""
Function to get the training set
Parameters
----------
batch_size : int, optional
Batch size for learning
shuffle : bool, optional
Whether to shuffle the training samples
Returns
-------
torch.utils.data.DataLoader
DataLoader over decentralizepy.datasets.Data(train_x, train_y)
Raises
------
RuntimeError
If the training set was not initialized
"""
if self.__training__:
return DataLoader(
Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle
)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
"""
Function to get the test set
Returns
-------
torch.utils.data.DataLoader
DataLoader over every 30th test sample, wrapped in decentralizepy.datasets.Data
Raises
------
RuntimeError
If the test set was not initialized
"""
if self.__testing__:
# subsample every 30th test example
every_30th = torch.arange(0, self.test_x.shape[0], 30)
return DataLoader(
Data(self.test_x[every_30th], self.test_y[every_30th]),
batch_size=self.test_batch_size,
)
raise RuntimeError("Test set not initialized!")
def test(self, model, loss):
"""
Function to evaluate model on the test dataset.
Parameters
----------
model : decentralizepy.models.Model
Model to evaluate
loss : torch.nn.Module
Loss function to evaluate
Returns
-------
tuple
(accuracy, loss_value)
"""
testloader = self.get_testset()
logging.debug("Test Loader instantiated.")
correct_pred = [0 for _ in range(NUM_CLASSES)]
total_pred = [0 for _ in range(NUM_CLASSES)]
total_correct = 0
total_predicted = 0
with torch.no_grad():
loss_val = 0.0
count = 0
for elems, labels in testloader:
outputs = model(elems)
loss_val += loss(outputs, labels).item()
count += 1
_, predictions = torch.max(outputs, 1)
for label, prediction in zip(labels, predictions):
logging.debug("{} predicted as {}".format(label, prediction))
if label == prediction:
correct_pred[label] += 1
total_correct += 1
total_pred[label] += 1
total_predicted += 1
logging.debug("Predicted on the test set")
for key, value in enumerate(correct_pred):
if total_pred[key] != 0:
accuracy = 100 * float(value) / total_pred[key]
else:
accuracy = 100.0
logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
accuracy = 100 * float(total_correct) / total_predicted
loss_val = loss_val / count
logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
return accuracy, loss_val
class LSTM(Model):
"""
Class for an LSTM model for Shakespeare
"""
def __init__(self):
"""
Constructor. Instantiates the LSTM model that predicts the next character of a sequence of characters.
Based on the TensorFlow model found here: https://gitlab.epfl.ch/sacs/efficient-federated-learning/-/blob/master/grad_guessing/data_utils.py
"""
super().__init__()
# nn.Embedding has no input_length argument (unlike the Keras Embedding layer)
self.embedding = nn.Embedding(VOCAB_LEN, EMBEDDING_DIM)
self.lstm = nn.LSTM(
EMBEDDING_DIM, HIDDEN_DIM, batch_first=True, num_layers=NUM_LAYERS
)
# The activation function is applied in the forward pass.
# Note: the TensorFlow implementation did not use an activation function at this step;
# it is an open question whether one should be added here.
self.l1 = nn.Linear(HIDDEN_DIM * SEQ_LENGTH, VOCAB_LEN)
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
# logging.info("Initial Shape: {}".format(x.shape))
x = self.embedding(x)
# logging.info("Embedding Shape: {}".format(x.shape))
x, _ = self.lstm(x)
# logging.info("LSTM Shape: {}".format(x.shape))
x = F.relu(x.reshape((-1, HIDDEN_DIM * SEQ_LENGTH)))
# logging.info("View Shape: {}".format(x.shape))
x = self.l1(x)
# logging.info("Output Shape: {}".format(x.shape))
return x
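# Illustrative sketch only, not part of the original module: a quick shape check
# of the LSTM above on random character ids of length SEQ_LENGTH.
if __name__ == "__main__":
    demo_model = LSTM()
    dummy_batch = torch.randint(0, VOCAB_LEN, (4, SEQ_LENGTH))  # (batch, seq_len) char ids
    logits = demo_model(dummy_batch)
    print(logits.shape)  # expected: torch.Size([4, VOCAB_LEN])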