Commit 980552bd authored by Rishi Sharma's avatar Rishi Sharma

Modify Data and Dataset, add barebone Node, structure config.ini

parent 612cf42e
[GRAPH]
package = decentralizepy.graphs.SmallWorld
graph_class = SmallWorld

[DATASET]
dataset_package = decentralizepy.datasets.Femnist
dataset_class = Femnist
model_class = LogisticRegression
n_procs = 1.0
train_dir =
test_dir =
; python list of fractions below
sizes = [0.4, 0.2, 0.3, 0.1]

[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.1

[TRAIN_PARAMS]
epochs_per_round = 25
batch_size = 64
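These keys are consumed by the barebone Node below, which resolves each `*_package` / `*_class` pair with importlib. A minimal sketch of that lookup (assuming the decentralizepy package is importable):

```
import importlib

from localconfig import LocalConfig

config = LocalConfig("config.ini")
dataset_configs = dict(config.items("DATASET"))

# "decentralizepy.datasets.Femnist" -> module, then "Femnist" -> class
module = importlib.import_module(dataset_configs["dataset_package"])
dataset_class = getattr(module, dataset_configs["dataset_class"])
print(dataset_class)  # <class 'decentralizepy.datasets.Femnist.Femnist'>
```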
%% Cell type:code id: tags:
```
from datasets.Femnist import Femnist
from graphs.SmallWorld import SmallWorld
from collections import defaultdict
import os
import json
import numpy as np
```
%% Cell type:code id: tags:
```
a = Femnist
a
```
%% Output
datasets.Femnist.Femnist
%% Cell type:code id: tags:
```
b = SmallWorld(6, 2, 2, 1)
```
%% Cell type:code id: tags:
```
b.adj_list
```
%% Output
[{1, 2, 5, 9, 10},
{0, 2, 3, 4, 5},
{0, 1, 3, 8, 10},
{1, 2, 4, 6, 7, 8, 10},
{1, 3, 5, 8, 10},
{0, 1, 4, 6, 9},
{1, 3, 5, 7, 8, 10},
{0, 2, 3, 6, 8, 9, 11},
{1, 2, 3, 4, 6, 7, 11},
{0, 2, 4, 5, 7, 10, 11},
{0, 2, 3, 4, 5, 6, 9},
{0, 4, 7, 8, 9}]
%% Cell type:code id: tags:
```
for i in range(12):
    print(b.neighbors(i))
```
%% Output
{1, 2, 5, 9, 10}
{0, 2, 3, 4, 5}
{0, 1, 3, 8, 10}
{1, 2, 4, 6, 7, 8, 10}
{1, 3, 5, 8, 10}
{0, 1, 4, 6, 9}
{1, 3, 5, 7, 8, 10}
{0, 2, 3, 6, 8, 9, 11}
{1, 2, 3, 4, 6, 7, 11}
{0, 2, 4, 5, 7, 10, 11}
{0, 2, 3, 4, 5, 6, 9}
{0, 4, 7, 8, 9}
%% Cell type:code id: tags:
```
clients = []
```
%% Cell type:code id: tags:
```
num_samples = []
data = defaultdict(lambda : None)
```
%% Cell type:code id: tags:
```
datadir = "./leaf/data/femnist/data/train"
files = os.listdir(datadir)
total_users = 0
users = set()
```
%% Cell type:code id: tags:
```
files = os.listdir(datadir)[0:1]
```
%% Cell type:code id: tags:
```
for f in files:
    file_path = os.path.join(datadir, f)
    print(file_path)
    with open(file_path, 'r') as inf:
        client_data = json.load(inf)
    current_users = len(client_data['users'])
    print("Current_Users: ", current_users)
    total_users += current_users
    users.update(client_data['users'])

print("total_users: ", total_users)
print("total_users: ", len(users))
print(client_data['user_data'].keys())
print(np.array(client_data['user_data']['f3408_47']['x']).shape)
print(np.array(client_data['user_data']['f3408_47']['y']).shape)
print(np.array(client_data['user_data']['f3327_11']['x']).shape)
print(np.array(client_data['user_data']['f3327_11']['y']).shape)
print(np.unique(np.array(client_data['user_data']['f3327_11']['y'])))
```
%% Output
./leaf/data/femnist/data/train/all_data_6_niid_0_keep_0_train_9.json
Current_Users: 100
total_users: 200
total_users: 100
dict_keys(['f3408_47', 'f3327_11', 'f3417_01', 'f3339_15', 'f3580_22', 'f3414_29', 'f3328_45', 'f3592_19', 'f3516_45', 'f3130_44', 'f3321_36', 'f3284_38', 'f3232_11', 'f3547_04', 'f3265_08', 'f3500_08', 'f3243_44', 'f3349_22', 'f3118_09', 'f3179_39', 'f3381_42', 'f3198_32', 'f3299_12', 'f3237_27', 'f3593_26', 'f3133_33', 'f3591_14', 'f3231_19', 'f3478_49', 'f3447_20', 'f3442_00', 'f3464_12', 'f3293_30', 'f3111_05', 'f3227_14', 'f3146_14', 'f3165_11', 'f3440_33', 'f3379_03', 'f3529_11', 'f3441_24', 'f3253_11', 'f3238_40', 'f3583_09', 'f3256_38', 'f3325_08', 'f3512_31', 'f3214_03', 'f3572_03', 'f3457_40', 'f3419_33', 'f3496_38', 'f3582_25', 'f3205_40', 'f3353_33', 'f3115_25', 'f3517_27', 'f3567_49', 'f3230_21', 'f3336_15', 'f3415_33', 'f3280_34', 'f3294_06', 'f3171_30', 'f3363_42', 'f3105_03', 'f3545_06', 'f3426_23', 'f3102_36', 'f3164_09', 'f3202_01', 'f3365_46', 'f3450_19', 'f3573_02', 'f3290_01', 'f3443_42', 'f3471_02', 'f3136_07', 'f3553_12', 'f3434_00', 'f3537_23', 'f3479_08', 'f3578_27', 'f3286_40', 'f3155_15', 'f3494_34', 'f3460_47', 'f3595_18', 'f3518_46', 'f3433_10', 'f3538_29', 'f3266_12', 'f3375_30', 'f3390_07', 'f3261_00', 'f3221_05', 'f3139_09', 'f3234_23', 'f3341_29', 'f3485_45'])
(155, 784)
(155,)
(164, 784)
(164,)
[ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 16 18 19 20 21 22 23 24 25
26 27 29 30 31 32 33 34 35 36 37 38 39 40 43 44 45 46 47 48 49 50 51 52
53 54 55 56 57 58 60 61]
%% Cell type:code id: tags:
```
file = 'run.py'
with open(file, 'r') as inf:
    print(inf.readline().strip())
    print(inf.readlines())
```
%% Output
import torch
['import torch.multiprocessing as mp\n', '\n', '\n', 'x = [1, 2]\n', '\n', 'def f(id, a):\n', ' print(id, x)\n', ' print(id, a)\n', '\n', "if __name__ == '__main__':\n", ' x.append(3)\n', ' mp.spawn(f, nprocs=2, args=(x, ))']
%% Cell type:code id: tags:
```
def f(l):
    l[2] = 'c'

a = ['a', 'a', 'a']
print(a)
f(a)
print(a)
```
%% Output
['a', 'a', 'a']
['a', 'a', 'c']
%% Cell type:code id: tags:
```
l = ['a', 'b', 'c']
print(l[:-1])
```
%% Output
['a', 'b']
%% Cell type:code id: tags:
```
import configparser
from localconfig import LocalConfig

def read_ini(file_path):
    config = configparser.ConfigParser()
    config.read(file_path)
    for section in config.sections():
        for key in config[section]:
            print((key, config[section][key]))

    config = LocalConfig(file_path)
    for section in config:
        print("Section: ", section)
        for key, value in config.items(section):
            print((key, value))

read_ini("config.ini")
```
%% Output
Section: GRAPH
('package', 'decentralizepy.graphs.SmallWorld')
('graph_class', 'SmallWorld')
Section: DATASET
('package', 'decentralizepy.datasets.Femnist')
('class', 'Femnist')
('model', 'LogisticRegression')
('username', 'xiaoxu')
('password', 'xiaoxu')
('host', '127.0.0.1')
('port', '5432')
('db', 'xiaoxu_database')
('dataset_class', 'Femnist')
('model_class', 'LogisticRegression')
('n_procs', 1.0)
('train_dir', '')
('test_dir', '')
('sizes', '[0.4, 0.2, 0.3, 0.1]')
Section: MODEL_PARAMS
('optimizer_package', 'torch.optim')
('optimizer_class', 'SGD')
('lr', 0.1)
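%% Cell type:markdown id: tags:

Note the difference between the two readers above: `configparser` returns every value as a string, while `localconfig` coerces numerics, which is why `('lr', 0.1)` prints without quotes. A quick check (a sketch, using only the `items` API shown above):

```
config = LocalConfig("config.ini")
for section in config:
    for key, value in config.items(section):
        print(section, key, type(value))  # lr arrives as a float, not a string
```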
%% Cell type:code id: tags:
```
def func(a=1, b=2, c=3):
    print(a + b + c)

l = [3, 5, 7]
func(*l)
```
%% Output
15
%% Cell type:code id: tags:
```
from torch import multiprocessing as mp
mp.spawn(fn = func, nprocs = 2, args = [], kwargs = {'a': 4, 'b': 5, 'c': 6})
```
%% Output
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/tmp/ipykernel_52405/4231740097.py in <module>
1 from torch import multiprocessing as mp
2
----> 3 mp.spawn(fn = func, nprocs = 2, args = [], kwargs = {'a': 4, 'b': 5, 'c': 6})
TypeError: spawn() got an unexpected keyword argument 'kwargs'
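%% Cell type:markdown id: tags:

`torch.multiprocessing.spawn` has no `kwargs` parameter: its signature is `spawn(fn, args=(), nprocs=1, join=True, daemon=False)`, and it invokes `fn(i, *args)` with the process index prepended. A working variant of the call, as it would appear in a script like run.py (a sketch; the values are illustrative):

```
import torch.multiprocessing as mp

def func2(i, a=1, b=2, c=3):
    # spawn passes the process index as the first positional argument
    print(i, a + b + c)

if __name__ == '__main__':
    # keyword arguments are not supported; pass values positionally via args
    mp.spawn(fn=func2, nprocs=2, args=(4, 5, 6))
```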
%% Cell type:code id: tags:
```
l = '[0.4, 0.2, 0.3, 0.1]'
type(eval(l))
```
%% Output
list
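%% Cell type:markdown id: tags:

`eval` works for parsing the `sizes` string, but it executes arbitrary code. For config values that are plain Python literals, `ast.literal_eval` is the safer drop-in (a sketch):

```
import ast

sizes = ast.literal_eval('[0.4, 0.2, 0.3, 0.1]')
print(type(sizes), sizes)  # <class 'list'> [0.4, 0.2, 0.3, 0.1]
```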
%% Cell type:code id: tags:
```
```
......
......@@ -41,6 +41,7 @@ install_requires =
jsonlines
pillow
smallworld
localconfig
include_package_data = True
python_requires = >=3.6
[options.packages.find]
......
#!$CONDA_PREFIX/bin/env python
#!$CONDA_PREFIX/python
from setuptools import setup
# https://packaging.python.org/guides/single-sourcing-package-version/
......
class Data:
    """
    This class defines the API for Data.
    """

    def __init__(self, x, y):
        """
        Constructor

        Parameters
        ----------
        x : numpy array
            A numpy array of data samples
        y : numpy array
            A numpy array of outputs corresponding to the samples
        """
        self.x = x
        self.y = y

    def __getitem__(self, i):
        """
        Function to get the item with index i.

        Parameters
        ----------
        i : int
            Index

        Returns
        -------
        2-tuple
            A tuple of the ith data sample and its corresponding label
        """
        return self.x[i], self.y[i]
def __conditional_value__(var, nul, default):
    """
    Return var unless it equals the sentinel nul, in which case return default.
    """
    if var != nul:
        return var
    else:
        return default
class Dataset:
    """
    This class defines the Dataset API.
    All datasets must follow this API.
    """

    def __init__(self, rank='', n_procs='', train_dir='', test_dir='', sizes=''):
        """
        Constructor which reads the data files, instantiates and partitions the dataset

        Parameters
        ----------
        rank : int, optional
            Rank of the current process (to get the partition). Default value is 0
        n_procs : int, optional
            The number of processes among which to divide the data. Default value is 1
        train_dir : str, optional
            Path to the training data files. Required to instantiate the training set.
            The training set is partitioned according to n_procs and sizes
        test_dir : str, optional
            Path to the testing data files. Required to instantiate the testing set
        sizes : list(int), optional
            A list of fractions specifying how much data to allot each process.
            The sum of the fractions should be 1.0. By default, each process gets an equal amount.
        """
        self.rank = __conditional_value__(rank, '', 0)
        self.n_procs = __conditional_value__(n_procs, '', 1)
        self.train_dir = __conditional_value__(train_dir, '', None)
        self.test_dir = __conditional_value__(test_dir, '', None)
        self.sizes = __conditional_value__(sizes, '', None)
        if self.sizes and type(self.sizes) == str:
            self.sizes = eval(self.sizes)

        if self.train_dir:
            self.__training__ = True
        else:
            self.__training__ = False

        if self.test_dir:
            self.__testing__ = True
        else:
            self.__testing__ = False

    def get_trainset(self):
        """
        Function to get the training set

        Returns
        -------
        Dataset

        Raises
        ------
        RuntimeError
            If the training set was not initialized
        """
        raise NotImplementedError

    def get_testset(self):
        """
        Function to get the test set

        Returns
        -------
        Dataset

        Raises
        ------
        RuntimeError
            If the test set was not initialized
        """
        raise NotImplementedError
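Since unset keys in config.ini arrive as empty strings, `__conditional_value__` maps those sentinels onto real defaults. A quick illustration of the resulting attributes (hypothetical values, assuming the classes above are in scope):

```
d = Dataset(rank='', n_procs='', train_dir='', test_dir='', sizes='[0.4, 0.6]')
print(d.rank, d.n_procs)              # 0 1
print(d.train_dir, d.test_dir)        # None None
print(d.sizes)                        # [0.4, 0.6] -- parsed from the string via eval
print(d.__training__, d.__testing__)  # False False
```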
......@@ -7,6 +7,7 @@ import torch
import torch.nn.functional as F
from torch import nn
from decentralizepy.datasets.Data import Data
from decentralizepy.datasets.Dataset import Dataset
from decentralizepy.datasets.Partitioner import DataPartitioner
......@@ -45,41 +46,41 @@ def __read_dir__(data_dir):
    return clients, num_samples, data


class Femnist(Dataset):
    """
    Class for the FEMNIST dataset
    """

    def __init__(self, rank='', n_procs='', train_dir='', test_dir='', sizes=''):
        """
        Constructor which reads the data files, instantiates and partitions the dataset

        Parameters
        ----------
        rank : int, optional
            Rank of the current process (to get the partition). Default value is 0
        n_procs : int, optional
            The number of processes among which to divide the data. Default value is 1
        train_dir : str, optional
            Path to the training data files. Required to instantiate the training set.
            The training set is partitioned according to n_procs and sizes
        test_dir : str, optional
            Path to the testing data files. Required to instantiate the testing set
        sizes : list(int), optional
            A list of fractions specifying how much data to allot each process.
            The sum of the fractions should be 1.0. By default, each process gets an equal amount.
        """
        super().__init__(rank, n_procs, train_dir, test_dir, sizes)

        if self.__training__:
            clients, num_samples, train_data = __read_dir__(self.train_dir)
            c_len = len(clients)

            if self.sizes is None:  # Equal distribution of data among processes
                e = c_len // self.n_procs
                frac = e / c_len
                self.sizes = [frac] * self.n_procs
                self.sizes[-1] += 1.0 - frac * self.n_procs

            my_clients = DataPartitioner(clients, self.sizes).use(self.rank)
            my_train_data = []
            self.clients = []
            self.num_samples = []
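The equal-split branch above derives per-process fractions from integer division and gives the rounding remainder to the last process; a worked example with hypothetical numbers:

```
c_len, n_procs = 100, 8            # hypothetical: 100 clients, 8 processes
e = c_len // n_procs               # 12 clients per process (integer division)
frac = e / c_len                   # 0.12
sizes = [frac] * n_procs           # eight equal fractions summing to 0.96
sizes[-1] += 1.0 - frac * n_procs  # last process absorbs the remainder: 0.16
print(sizes, sum(sizes))           # fractions now sum to 1.0
```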
......@@ -96,8 +97,7 @@ class Femnist:
my_train_data["y"], dtype=np.dtype("float64")
).reshape(-1, 1)
if test_dir:
self.__testing__ = True
if self.__testing__:
_, _, test_data = __read_dir__(test_dir)
test_data = test_data.values()
self.test_x = np.array(test_data["x"], dtype=np.dtype("float64")).reshape(
......@@ -154,7 +154,7 @@ class Femnist:
            If the training set was not initialized
        """
        if self.__training__:
            return Data(self.train_x, self.train_y)
        raise RuntimeError("Training set not initialized!")

    def get_testset(self):
......@@ -169,7 +169,7 @@ class Femnist:
            If the test set was not initialized
        """
        if self.__testing__:
            return Data(self.test_x, self.test_y)
        raise RuntimeError("Test set not initialized!")
......
from torch import utils, optim
import importlib


def __remove_keys__(d, keys_to_remove):
    """
    Return a copy of the dict d without the keys in keys_to_remove.
    """
    return {key: d[key] for key in d if key not in keys_to_remove}


class Node:
    """
    This class defines the node (entity that performs learning, sharing and communication).
    """

    def __init__(self, rank, mapping, graph, config, *args):
        """
        Constructor

        Parameters
        ----------
        rank : int
            Rank of process local to the machine
        mapping : decentralizepy.mappings
            The object containing the mapping rank <--> uid
        graph : decentralizepy.graphs
            The object containing the global graph
        config : dict
            A dictionary of configurations. Must contain the following:
            [DATASET]
                dataset_package
                dataset_class
                model_class
            [OPTIMIZER_PARAMS]
                optimizer_package
                optimizer_class
        args : optional
            Other arguments
        """
        self.rank = rank
        self.graph = graph
        self.mapping = mapping

        # Instantiate the dataset (and its model class) named in [DATASET]
        dataset_configs = dict(config.items("DATASET"))
        dataset_module = importlib.import_module(dataset_configs["dataset_package"])
        dataset_class = getattr(dataset_module, dataset_configs["dataset_class"])
        dataset_params = __remove_keys__(
            dataset_configs, ["dataset_package", "dataset_class", "model_class"]
        )
        self.dataset = dataset_class(rank, **dataset_params)
        self.trainset = self.dataset.get_trainset()

        model_class = getattr(dataset_module, dataset_configs["model_class"])
        self.model = model_class()

        # Instantiate the optimizer named in [OPTIMIZER_PARAMS]
        optimizer_configs = dict(config.items("OPTIMIZER_PARAMS"))
        optimizer_module = importlib.import_module(optimizer_configs["optimizer_package"])
        optimizer_class = getattr(optimizer_module, optimizer_configs["optimizer_class"])
        optimizer_params = __remove_keys__(
            optimizer_configs, ["optimizer_package", "optimizer_class"]
        )
        self.optimizer = optimizer_class(self.model.parameters(), **optimizer_params)

        self.run()

    def train_step(self):
        """
        The training step
        """
        for epoch in range(self.epochs_per_round):  # Instantiate this variable from [TRAIN_PARAMS]
            for data, target in self.trainset:
                # Perform a training step
                raise NotImplementedError

    def run(self):
        """
        The learning loop.
        """
        while True:
            # Receive data
            # Learn
            # Send data
            raise NotImplementedError
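A hypothetical wiring of the barebone Node against the config.ini from this commit (the mapping is a placeholder, and Node.run() is still a stub, so construction ends in NotImplementedError):

```
from localconfig import LocalConfig
from decentralizepy.graphs.SmallWorld import SmallWorld

config = LocalConfig("config.ini")
graph = SmallWorld(6, 2, 2, 1)  # same arguments as in the notebook above
mapping = None                  # placeholder for a decentralizepy.mappings object

# Node reads [DATASET] and [OPTIMIZER_PARAMS] itself and then calls self.run()
node = Node(rank=0, mapping=mapping, graph=graph, config=config)
```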