Commit 612cf42e authored by Rishi Sharma

Initial Commit

Showing 937 additions and 0 deletions
**/.idea
**/__pycache__/
**/data/
**/.DS_Store
**/results/
**/experiment_results/
**/.vscode
**/leaf/
**.egg-info
[settings]
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
combine_as_imports=True
line_length=88
# decentralizepy
\ No newline at end of file
[GRAPH]
PACKAGE = decentralizepy.graphs.SmallWorld
CLASS = SmallWorld
[DATASET]
PACKAGE = decentralizepy.datasets.Femnist
CLASS = Femnist
MODEL = LogisticRegression
[MODEL_PARAMS]
USERNAME = xiaoxu
PASSWORD = xiaoxu
HOST = 127.0.0.1
PORT = 5432
DB = xiaoxu_database
\ No newline at end of file
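The notebook later in this commit reads this file back with `configparser`. As a minimal sketch of how the PACKAGE/CLASS pairs might be turned into live objects (an assumption about intended use, not code from this commit), `importlib` can resolve them:

```
# Sketch, not part of this commit: resolve a PACKAGE/CLASS pair from
# config.ini into a live class via importlib.
import configparser
import importlib

config = configparser.ConfigParser()
config.read("config.ini")

section = config["DATASET"]
module = importlib.import_module(section["PACKAGE"])  # decentralizepy.datasets.Femnist
cls = getattr(module, section["CLASS"])               # the Femnist class
print(cls)
```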
leaf.sh 0 → 100755
echo "[Cloning leaf repository]"
git clone https://github.com/TalwalkarLab/leaf.git
echo "[Installing unzip]"
sudo apt-get install -y unzip
cd leaf/data/shakespeare
echo "[Generating non-iid data]"
./preprocess.sh -s niid --sf 1.0 -k 0 -t sample -tf 0.8 --smplseed 10 --spltseed 10
echo "[Data Generated]"
\ No newline at end of file
%% Cell type:code id: tags:
```
from datasets.Femnist import Femnist
from graphs import SmallWorld
from collections import defaultdict
import os
import json
import numpy as np
```
%% Cell type:code id: tags:
```
a = Femnist
a
a
```
%% Output
datasets.Femnist.Femnist
%% Cell type:code id: tags:
```
b = SmallWorld(12, 2, 1)
```
%% Cell type:code id: tags:
```
b.adj_list
```
%% Output
[{1, 2, 5, 9, 10},
{0, 2, 3, 4, 5},
{0, 1, 3, 8, 10},
{1, 2, 4, 6, 7, 8, 10},
{1, 3, 5, 8, 10},
{0, 1, 4, 6, 9},
{1, 3, 5, 7, 8, 10},
{0, 2, 3, 6, 8, 9, 11},
{1, 2, 3, 4, 6, 7, 11},
{0, 2, 4, 5, 7, 10, 11},
{0, 2, 3, 4, 5, 6, 9},
{0, 4, 7, 8, 9}]
%% Cell type:code id: tags:
```
for i in range(12):
print(b.neighbors(i))
```
%% Output
{1, 2, 5, 9, 10}
{0, 2, 3, 4, 5}
{0, 1, 3, 8, 10}
{1, 2, 4, 6, 7, 8, 10}
{1, 3, 5, 8, 10}
{0, 1, 4, 6, 9}
{1, 3, 5, 7, 8, 10}
{0, 2, 3, 6, 8, 9, 11}
{1, 2, 3, 4, 6, 7, 11}
{0, 2, 4, 5, 7, 10, 11}
{0, 2, 3, 4, 5, 6, 9}
{0, 4, 7, 8, 9}
%% Cell type:code id: tags:
```
clients = []
```
%% Cell type:code id: tags:
```
num_samples = []
data = defaultdict(lambda: None)
```
%% Cell type:code id: tags:
```
datadir = "./leaf/data/femnist/data/train"
files = os.listdir(datadir)
total_users = 0
users = set()
```
%% Cell type:code id: tags:
```
files = os.listdir(datadir)[0:1]
```
%% Cell type:code id: tags:
```
for f in files:
file_path = os.path.join(datadir, f)
print(file_path)
with open(file_path, 'r') as inf:
client_data = json.load(inf)
current_users = len(client_data['users'])
print("Current_Users: ", current_users)
total_users += current_users
users.update(client_data['users'])
print("total_users: ", total_users)
print("total_users: ", len(users))
print(client_data['user_data'].keys())
print(np.array(client_data['user_data']['f3408_47']['x']).shape)
print(np.array(client_data['user_data']['f3408_47']['y']).shape)
print(np.array(client_data['user_data']['f3327_11']['x']).shape)
print(np.array(client_data['user_data']['f3327_11']['y']).shape)
print(np.unique(np.array(client_data['user_data']['f3327_11']['y'])))
```
%% Output
./leaf/data/femnist/data/train/all_data_6_niid_0_keep_0_train_9.json
Current_Users: 100
total_users: 200
total_users: 100
dict_keys(['f3408_47', 'f3327_11', 'f3417_01', 'f3339_15', 'f3580_22', 'f3414_29', 'f3328_45', 'f3592_19', 'f3516_45', 'f3130_44', 'f3321_36', 'f3284_38', 'f3232_11', 'f3547_04', 'f3265_08', 'f3500_08', 'f3243_44', 'f3349_22', 'f3118_09', 'f3179_39', 'f3381_42', 'f3198_32', 'f3299_12', 'f3237_27', 'f3593_26', 'f3133_33', 'f3591_14', 'f3231_19', 'f3478_49', 'f3447_20', 'f3442_00', 'f3464_12', 'f3293_30', 'f3111_05', 'f3227_14', 'f3146_14', 'f3165_11', 'f3440_33', 'f3379_03', 'f3529_11', 'f3441_24', 'f3253_11', 'f3238_40', 'f3583_09', 'f3256_38', 'f3325_08', 'f3512_31', 'f3214_03', 'f3572_03', 'f3457_40', 'f3419_33', 'f3496_38', 'f3582_25', 'f3205_40', 'f3353_33', 'f3115_25', 'f3517_27', 'f3567_49', 'f3230_21', 'f3336_15', 'f3415_33', 'f3280_34', 'f3294_06', 'f3171_30', 'f3363_42', 'f3105_03', 'f3545_06', 'f3426_23', 'f3102_36', 'f3164_09', 'f3202_01', 'f3365_46', 'f3450_19', 'f3573_02', 'f3290_01', 'f3443_42', 'f3471_02', 'f3136_07', 'f3553_12', 'f3434_00', 'f3537_23', 'f3479_08', 'f3578_27', 'f3286_40', 'f3155_15', 'f3494_34', 'f3460_47', 'f3595_18', 'f3518_46', 'f3433_10', 'f3538_29', 'f3266_12', 'f3375_30', 'f3390_07', 'f3261_00', 'f3221_05', 'f3139_09', 'f3234_23', 'f3341_29', 'f3485_45'])
(155, 784)
(155,)
(164, 784)
(164,)
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 16 18 19 20 21 22 23 24 25
26 27 29 30 31 32 33 34 35 36 37 38 39 40 43 44 45 46 47 48 49 50 51 52
53 54 55 56 57 58 60 61]
%% Cell type:code id: tags:
```
file = 'run.py'
with open(file, 'r') as inf:
print(inf.readline().strip())
print(inf.readlines())
```
%% Output
import torch
['import torch.multiprocessing as mp\n', '\n', '\n', 'x = [1, 2]\n', '\n', 'def f(id, a):\n', ' print(id, x)\n', ' print(id, a)\n', '\n', "if __name__ == '__main__':\n", ' x.append(3)\n', ' mp.spawn(f, nprocs=2, args=(x, ))']
%% Cell type:code id: tags:
```
def f(l):
l[2] = 'c'
a = ['a', 'a', 'a']
print(a)
f(a)
print(a)
```
%% Output
['a', 'a', 'a']
['a', 'a', 'c']
%% Cell type:code id: tags:
```
l = ['a', 'b', 'c']
print(l[:-1])
```
%% Output
['a', 'b']
%% Cell type:code id: tags:
```
import configparser
def read_ini(file_path):
config = configparser.ConfigParser()
config.read(file_path)
for section in config.sections():
for key in config[section]:
print((key, config[section][key]))
read_ini("config.ini")
```
%% Output
('package', 'decentralizepy.datasets.Femnist')
('class', 'Femnist')
('model', 'LogisticRegression')
('username', 'xiaoxu')
('password', 'xiaoxu')
('host', '127.0.0.1')
('port', '5432')
('db', 'xiaoxu_database')
%% Cell type:code id: tags:
```
```
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"
\ No newline at end of file
run.py 0 → 100644
import torch
import torch.multiprocessing as mp
x = [1, 2]
def f(id, a):
print(id, x)
print(id, a)
if __name__ == "__main__":
x.append(3)
mp.spawn(f, nprocs=2, args=(x,))
[metadata]
name = decentralizepy
version = 0.1.dev0
author = Rishi Sharma
author_email = rishi.sharma@epfl.ch
license = MIT
description = A framework to write decentralized machine learning applications
keywords =
python
decentralized
ml
learning
sacs
url = https://rishisharma.netlify.app
download_url = https://gitlab.epfl.ch/risharma/decentralizepy
long_description = file: README.rst
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Education
Intended Audience :: Science/Research
License :: OSI Approved :: MIT License
Operating System :: OS Independent
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Topic :: Scientific/Engineering
[options]
package_dir =
= src
packages = find:
zip_safe = False
install_requires =
numpy
torch
torchvision
matplotlib
networkx
zmq
jsonlines
pillow
smallworld
include_package_data = True
python_requires = >=3.6
[options.packages.find]
where = src
[options.extras_require]
dev =
black
coverage
isort
pytest
pytest-xdist
pytest-cov<2.6.0
pycodestyle
sphinx
alabaster
tox
[tool:pytest]
norecursedirs =
.git
dist
build
python_files =
test_*.py
doctest_plus = disabled
addopts = --strict
markers =
slow
remote_data
filterwarnings
mpl_image_compare
[flake8]
ignore = E203, E266, E501, W503
max-line-length = 80
max-complexity = 18
select = B,C,E,F,W,T4,B9
#!/usr/bin/env python
from setuptools import setup
# https://packaging.python.org/guides/single-sourcing-package-version/
# http://blog.ionelmc.ro/2014/05/25/python-packaging/
setup()  # all project metadata and options are declared in setup.cfg
class Dataset:
"""
This class defines the API for Dataset.
"""
def __init__(self, x, y):
"""
Constructor
Parameters
----------
x : numpy array
A numpy array of data samples
y : numpy array
A numpy array of outputs corresponding to the sample
"""
self.x = x
self.y = y
    def __getitem__(self, i):
"""
Function to get the item with index i.
Parameters
----------
i : int
Index
Returns
-------
2-tuple
            A tuple of the ith data sample and its corresponding label
"""
return self.x[i], self.y[i]
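A minimal usage sketch of this API (the arrays are made up for illustration):

```
import numpy as np

x = np.zeros((3, 784))   # three fake flattened 28x28 samples
y = np.array([0, 1, 2])  # their labels
ds = Dataset(x, y)
sample, label = ds[1]    # __getitem__ returns the (sample, label) pair at index 1
```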
import json
import os
from collections import defaultdict
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
NUM_CLASSES = 62
IMAGE_SIZE = (28, 28)
FLAT_SIZE = 28 * 28
def __read_dir__(data_dir):
"""
Function to read all the FEMNIST data files in the directory
Parameters
----------
data_dir : str
Path to the folder containing the data files
Returns
-------
3-tuple
A tuple containing list of clients, number of samples per client,
and the data items per client
"""
clients = []
num_samples = []
data = defaultdict(lambda: None)
files = os.listdir(data_dir)
files = [f for f in files if f.endswith(".json")]
for f in files:
file_path = os.path.join(data_dir, f)
with open(file_path, "r") as inf:
client_data = json.load(inf)
clients.extend(client_data["users"])
num_samples.extend(client_data["num_samples"])
data.update(client_data["user_data"])
return clients, num_samples, data
class Femnist:
"""
Class for the FEMNIST dataset
"""
def __init__(self, train_dir=None, test_dir=None, rank=0, n_procs=1, sizes=None):
"""
Constructor which reads the data files, instantiates and partitions the dataset
Parameters
----------
        train_dir : str, optional
            Path to the training data files. Required to instantiate the training set.
            The training set is partitioned according to n_procs and sizes.
        test_dir : str, optional
            Path to the testing data files. Required to instantiate the testing set.
rank : int, optional
Rank of the current process (to get the partition). Default value is 0
n_procs : int, optional
The number of processes among which to divide the data
        sizes : list(float), optional
            A list of fractions specifying how much data to allot each process.
            By default, each process gets an equal amount.
"""
        self.__training__ = False
        self.__testing__ = False

        if train_dir:
            self.__training__ = True
            clients, num_samples, train_data = __read_dir__(train_dir)
            c_len = len(clients)
            if sizes is None:  # Equal distribution of data among processes
                e = c_len // n_procs
                frac = e / c_len
                sizes = [frac] * n_procs
                sizes[-1] += 1.0 - frac * n_procs
            my_clients = DataPartitioner(clients, sizes).use(rank)
            my_train_data = {"x": [], "y": []}
            self.clients = []
            self.num_samples = []
            for i in range(len(my_clients)):
                cur_client = my_clients[i]
                self.clients.append(cur_client)
                my_train_data["x"].extend(train_data[cur_client]["x"])
                my_train_data["y"].extend(train_data[cur_client]["y"])
                self.num_samples.append(len(train_data[cur_client]["y"]))
            self.train_x = np.array(
                my_train_data["x"], dtype=np.dtype("float64")
            ).reshape(-1, 28, 28, 1)
            self.train_y = np.array(
                my_train_data["y"], dtype=np.dtype("float64")
            ).reshape(-1, 1)
        if test_dir:
            self.__testing__ = True
            _, _, test_data = __read_dir__(test_dir)
            # Pool the samples of every test client into one flat test set
            test_x, test_y = [], []
            for user_data in test_data.values():
                test_x.extend(user_data["x"])
                test_y.extend(user_data["y"])
            self.test_x = np.array(test_x, dtype=np.dtype("float64")).reshape(
                -1, 28, 28, 1
            )
            self.test_y = np.array(test_y, dtype=np.dtype("float64")).reshape(
                -1, 1
            )
# TODO: Add Validation
def get_client_ids(self):
"""
Function to retrieve all the clients of the current process
Returns
-------
list(str)
A list of strings of the client ids.
"""
return self.clients
def get_client_id(self, i):
"""
Function to get the client id of the ith sample
Parameters
----------
i : int
Index of the sample
Returns
-------
str
Client ID
Raises
------
IndexError
If the sample index is out of bounds
"""
        lb = 0
        for j in range(len(self.clients)):
            if i < lb + self.num_samples[j]:
                return self.clients[j]
            lb += self.num_samples[j]
        raise IndexError("i is out of bounds!")
def get_trainset(self):
"""
Function to get the training set
Returns
-------
Dataset
Raises
------
RuntimeError
If the training set was not initialized
"""
if self.__training__:
return Dataset(self.train_x, self.train_y)
raise RuntimeError("Training set not initialized!")
def get_testset(self):
"""
Function to get the test set
Returns
-------
Dataset
Raises
------
RuntimeError
If the test set was not initialized
"""
if self.__testing__:
return Dataset(self.test_x, self.test_y)
raise RuntimeError("Test set not initialized!")
class LogisticRegression(nn.Module):
"""
Class for a Logistic Regression Neural Network for FEMNIST
"""
def __init__(self):
"""
Constructor. Instantiates the Logistic Regression Model
with 28*28 Input and 62 output classes
"""
super().__init__()
self.fc1 = nn.Linear(FLAT_SIZE, NUM_CLASSES)
def forward(self, x):
"""
Forward pass of the model
Parameters
----------
x : torch.tensor
The input torch tensor
Returns
-------
torch.tensor
The output torch tensor
"""
x = torch.flatten(x, start_dim=1)
x = self.fc1(x)
return F.log_softmax(x, dim=1)
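A quick shape sanity check for this model (illustrative only; it reuses the imports and class defined above):

```
model = LogisticRegression()
batch = torch.rand(4, 28, 28)  # four fake 28x28 images
out = model(batch)             # flattened to (4, 784) inside forward
print(out.shape)               # torch.Size([4, 62]): log-probabilities over 62 classes
```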
from random import Random
""" Adapted from https://pytorch.org/tutorials/intermediate/dist_tuto.html """
class Partition(object):
"""
Class for holding the data partition
"""
def __init__(self, data, index):
"""
Constructor. Caches the data and the indices
Parameters
----------
data : indexable
index : list
A list of indices
"""
self.data = data
self.index = index
def __len__(self):
"""
Function to retrieve the length
Returns
-------
int
Number of items in the data
"""
return len(self.index)
def __getitem__(self, index):
"""
Retrieves the item in data with the given index
Parameters
----------
index : int
Returns
-------
Data
The data sample with the given `index` in the dataset
"""
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
"""
Class to partition the dataset
"""
def __init__(self, data, sizes=[1.0], seed=1234):
"""
Constructor. Partitions the data according the parameters
Parameters
----------
data : indexable
An indexable list of data items
sizes : list(float)
A list of fractions for each process
seed : int, optional
Seed for generating a random subset
"""
self.data = data
self.partitions = []
rng = Random()
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, rank):
"""
Get the partition for the process with the given `rank`
Parameters
----------
rank : int
Rank of the current process
Returns
-------
Partition
The dataset partition of the current process
"""
return Partition(self.data, self.partitions[rank])
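An illustrative split (values chosen arbitrarily): ten items divided 60/40 between two ranks.

```
items = list(range(10))
partitioner = DataPartitioner(items, sizes=[0.6, 0.4], seed=1234)
part0 = partitioner.use(0)  # Partition for rank 0: 6 shuffled items
part1 = partitioner.use(1)  # Partition for rank 1: the remaining 4
print(len(part0), part0[0], len(part1))
```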
from .Femnist import Femnist
class Graph:
"""
This class defines the graph topology.
Adapted from https://gitlab.epfl.ch/sacs/ml-rawdatasharing/dnn-recommender/-/blob/master/api.py
"""
def __init__(self, n_procs=None):
"""
Constructor
Parameters
----------
n_procs : int, optional
Number of processes in the graph, if already known
"""
        if n_procs is not None:
self.n_procs = n_procs
self.adj_list = [set() for i in range(self.n_procs)]
def __insert_adj__(self, node, neighbours):
"""
Inserts `neighbours` into the adjacency list of `node`
Parameters
----------
node : int
The vertex in question
neighbours : list(int)
A list of neighbours of the `node`
"""
self.adj_list[node].update(neighbours)
def __insert_edge__(self, x, y):
"""
Inserts edge `x -> y` into the graph
Parameters
----------
x : int
The source vertex
y : int
The destination vertex
"""
self.adj_list[x].add(y)
self.adj_list[y].add(x)
def read_graph_from_file(self, file, type="edges", force_connect=False):
"""
Reads the graph from a given file
Parameters
----------
file : str
path to the file
type : `edges` or `adjacency`
force_connect : bool, optional
Should the graph be force-connected using a ring
Returns
-------
int
Number of processes, read from the first line of the file
Raises
------
ValueError
If the type is not either `edges` or `adjacency`
"""
with open(file, "r") as inf:
self.n_procs = int(inf.readline().strip())
self.adj_list = [set() for i in range(self.n_procs)]
lines = inf.readlines()
if type == "edges":
for line in lines:
x, y = map(int, line.strip().split())
self.__insert_edge__(x, y)
            elif type == "adjacency":
                node_id = 0
                for line in lines:
                    # Convert to ints, matching the `edges` branch above
                    neighbours = map(int, line.strip().split())
                    self.__insert_adj__(node_id, neighbours)
                    node_id += 1
else:
raise ValueError("Type must be from {edges, adjacency}!")
if force_connect:
self.connect_graph()
return self.n_procs
def connect_graph(self):
"""
Connects the graph using a Ring
"""
for node in range(self.n_procs):
self.adj_list[node].add((node + 1) % self.n_procs)
self.adj_list[node].add((node - 1) % self.n_procs)
def neighbors(self, uid):
"""
Gives the neighbors of a node
Parameters
----------
uid : int
globally unique identifier of the node
Returns
-------
set(int)
a set of neighbours
"""
return self.adj_list[uid]
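An illustrative round trip under the file format this method assumes (first line is the node count, then one `x y` edge per line; the path is made up):

```
with open("/tmp/tiny.edges", "w") as f:
    f.write("3\n0 1\n1 2\n")

g = Graph()
g.read_graph_from_file("/tmp/tiny.edges", type="edges", force_connect=True)
print(g.neighbors(0))  # {1, 2}: edge 0-1 plus the ring closure 0-2
```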
import smallworld
from decentralizepy.graphs.Graph import Graph
class SmallWorld(Graph):
"""
The class for generating a SmallWorld topology Graph
Adapted from https://gitlab.epfl.ch/sacs/ml-rawdatasharing/dnn-recommender/-/blob/master/topologies.py
"""
def __init__(self, n_procs, k_over_2, beta):
"""
Constructor. Generates a random connected SmallWorld graph
Parameters
----------
n_procs : int
total number of nodes in the graph
k_over_2 : int
k_over_2 config for smallworld
        beta : float
            beta config for smallworld; β = 1 corresponds to the Erdős–Rényi random network model
"""
super().__init__(n_procs)
G = smallworld.get_smallworld_graph(self.n_procs, k_over_2, beta)
for edge in list(G.edges):
node1 = edge[0]
node2 = edge[1]
self.adj_list[node1].add(node2)
self.adj_list[node2].add(node1)
self.connect_graph()
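An illustrative construction (requires the `smallworld` package listed in install_requires), sized to match the 12-node graph explored in the notebook above:

```
topology = SmallWorld(12, k_over_2=2, beta=1)
print(topology.neighbors(0))  # a set of neighbour ids, e.g. {1, 2, 5, 9, 10}
```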
from .Graph import Graph
from .SmallWorld import SmallWorld