diff --git a/config.ini b/config.ini index 056ff2675a9fc09861ee55ab5e5e3e57319d99d7..3420bba9dbb478ea7aa219952ace6769502713c0 100644 --- a/config.ini +++ b/config.ini @@ -1,15 +1,22 @@ [GRAPH] -PACKAGE = decentralizepy.graphs.SmallWorld -CLASS = decentralizepy +package = decentralizepy.graphs.SmallWorld +graph_class = SmallWorld [DATASET] -PACKAGE = decentralizepy.datasets.Femnist -CLASS = Femnist -MODEL = LogisticRegression +dataset_package = decentralizepy.datasets.Femnist +dataset_class = Femnist +model_class = LogisticRegression +n_procs = 1.0 +train_dir = +test_dir = +; python list of fractions below +sizes = [0.4, 0.2, 0.3, 0.1] -[MODEL_PARAMS] -USERNAME = xiaoxu -PASSWORD = xiaoxu -HOST = 127.0.0.1 -PORT = 5432 -DB = xiaoxu_database \ No newline at end of file +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.1 + +[TRAIN_PARAMS] +epochs_per_round = 25 +batch_size = 64 diff --git a/main.ipynb b/main.ipynb index b74c241cacfd0b70f0f1e67a8b41151095ef29e1..f679c226f1addc50902bb0d69066586e769bb2a5 100644 --- a/main.ipynb +++ b/main.ipynb @@ -254,37 +254,110 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ + "Section: GRAPH\n", + "('package', 'decentralizepy.graphs.SmallWorld')\n", + "('graph_class', 'SmallWorld')\n", + "Section: DATASET\n", "('package', 'decentralizepy.datasets.Femnist')\n", - "('class', 'Femnist')\n", - "('model', 'LogisticRegression')\n", - "('username', 'xiaoxu')\n", - "('password', 'xiaoxu')\n", - "('host', '127.0.0.1')\n", - "('port', '5432')\n", - "('db', 'xiaoxu_database')\n" + "('dataset_class', 'Femnist')\n", + "('model_class', 'LogisticRegression')\n", + "('n_procs', 1.0)\n", + "('train_dir', '')\n", + "('test_dir', '')\n", + "('sizes', '[0.4, 0.2, 0.3, 0.1]')\n", + "Section: MODEL_PARAMS\n", + "('optimizer_package', 'torch.optim')\n", + "('optimizer_class', 'SGD')\n", + "('lr', 0.1)\n" ] } ], "source": [ - "import configparser\n", + "from localconfig import LocalConfig\n", "\n", "def read_ini(file_path):\n", - " config = configparser.ConfigParser()\n", - " config.read(file_path)\n", - " for section in config.sections():\n", - " for key in config[section]:\n", - " print((key, config[section][key]))\n", + " config = LocalConfig(file_path)\n", + " for section in config:\n", + " print(\"Section: \", section)\n", + " for key, value in config.items(section):\n", + " print((key, value))\n", " \n", "read_ini(\"config.ini\")" ] }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "15\n" + ] + } + ], + "source": [ + "def func(a = 1, b = 2, c = 3):\n", + " print(a + b + c)\n", + "\n", + "l = [3, 5, 7]\n", + "\n", + "func(*l)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "ename": "TypeError", + "evalue": "spawn() got an unexpected keyword argument 'kwargs'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/tmp/ipykernel_52405/4231740097.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32mfrom\u001b[0m \u001b[0mtorch\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mmultiprocessing\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mmp\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 3\u001b[0;31m \u001b[0mmp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mspawn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfn\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mnprocs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m{\u001b[0m\u001b[0;34m'a'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;36m4\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'b'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;36m5\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'c'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;36m6\u001b[0m\u001b[0;34m}\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", + "\u001b[0;31mTypeError\u001b[0m: spawn() got an unexpected keyword argument 'kwargs'" + ] + } + ], + "source": [ + "from torch import multiprocessing as mp\n", + "\n", + "mp.spawn(fn = func, nprocs = 2, args = [], kwargs = {'a': 4, 'b': 5, 'c': 6})" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "list" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "l = '[0.4, 0.2, 0.3, 0.1]'\n", + "type(eval(l))" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/setup.cfg b/setup.cfg index 513a02d19f65b91c304096beaa39eb0f6d015aa7..3faa1f36fc490a44ab218e6ce2c38aa78c9b9016 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,6 +41,7 @@ install_requires = jsonlines pillow smallworld + localconfig include_package_data = True python_requires = >=3.6 [options.packages.find] diff --git a/setup.py b/setup.py index b1f7e7220d3ee14badbbae866cdafa29a8f6ca4c..36dd13a24c351dc71aefc97e8a00cd37736850a6 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -#!$CONDA_PREFIX/bin/env python +#!$CONDA_PREFIX/python from setuptools import setup # https://packaging.python.org/guides/single-sourcing-package-version/ diff --git a/src/decentralizepy/datasets/Data.py b/src/decentralizepy/datasets/Data.py new file mode 100644 index 0000000000000000000000000000000000000000..2e8b2c2f916683f5f15bc0b39dccc69abe06d385 --- /dev/null +++ b/src/decentralizepy/datasets/Data.py @@ -0,0 +1,31 @@ +class Data: + """ + This class defines the API for Data. + """ + + def __init__(self, x, y): + """ + Constructor + Parameters + ---------- + x : numpy array + A numpy array of data samples + y : numpy array + A numpy array of outputs corresponding to the sample + """ + self.x = x + self.y = y + + def __get_item__(self, i): + """ + Function to get the item with index i. + Parameters + ---------- + i : int + Index + Returns + ------- + 2-tuple + A tuple of the ith data sample and it's corresponding label + """ + return self.x[i], self.y[i] diff --git a/src/decentralizepy/datasets/Dataset.py b/src/decentralizepy/datasets/Dataset.py index 326239e146441a243ac338ba7e2ac190b81615af..b509ce0b3a075929505c7695c1700fe324795c7d 100644 --- a/src/decentralizepy/datasets/Dataset.py +++ b/src/decentralizepy/datasets/Dataset.py @@ -1,31 +1,76 @@ +def __conditional_value__(var, nul, default): + if var != nul: + return var + else: + return default + + class Dataset: """ - This class defines the API for Dataset. + This class defines the Dataset API. + All datasets must follow this API. """ - def __init__(self, x, y): + def __init__(self, rank='', n_procs='', train_dir='', test_dir='', sizes=''): """ - Constructor + Constructor which reads the data files, instantiates and partitions the dataset Parameters ---------- - x : numpy array - A numpy array of data samples - y : numpy array - A numpy array of outputs corresponding to the sample + rank : int, optional + Rank of the current process (to get the partition). Default value is assigned 0 + n_procs : int, optional + The number of processes among which to divide the data. Default value is assigned 1 + train_dir : str, optional + Path to the training data files. Required to instantiate the training set + The training set is partitioned according to n_procs and sizes + test_dir : str. optional + Path to the testing data files Required to instantiate the testing set + sizes : list(int), optional + A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0 + By default, each process gets an equal amount. """ - self.x = x - self.y = y + self.rank = __conditional_value__(rank, '', 0) + self.n_procs = __conditional_value__(n_procs, '', 1) + self.train_dir = __conditional_value__(train_dir, '', None) + self.test_dir = __conditional_value__(test_dir, '', None) + self.sizes = __conditional_value__(sizes, '', None) + if self.sizes: + if type(self.sizes) == str: + self.sizes = eval(self.sizes) + - def __get_item__(self, i): + if train_dir: + self.__training__ = True + else: + self.__training__ = False + + if test_dir: + self.__testing__ = True + else: + self.__testing__ = False + + def get_trainset(self): """ - Function to get the item with index i. - Parameters - ---------- - i : int - Index + Function to get the training set + Returns + ------- + Dataset + Raises + ------ + RuntimeError + If the training set was not initialized + """ + raise NotImplementedError + + def get_testset(self): + """ + Function to get the test set Returns ------- - 2-tuple - A tuple of the ith data sample and it's corresponding label + Dataset + Raises + ------ + RuntimeError + If the test set was not initialized """ - return self.x[i], self.y[i] + raise NotImplementedError diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py index 992941460b9a88bccb066403c2ccea228583d268..a01680fd001d1f99d93072b45d33b4dda7c1624e 100644 --- a/src/decentralizepy/datasets/Femnist.py +++ b/src/decentralizepy/datasets/Femnist.py @@ -7,6 +7,7 @@ import torch import torch.nn.functional as F from torch import nn +from decentralizepy.datasets.Data import Data from decentralizepy.datasets.Dataset import Dataset from decentralizepy.datasets.Partitioner import DataPartitioner @@ -45,41 +46,41 @@ def __read_dir__(data_dir): return clients, num_samples, data -class Femnist: +class Femnist(Dataset): """ Class for the FEMNIST dataset """ - def __init__(self, train_dir=None, test_dir=None, rank=0, n_procs=1, sizes=None): + def __init__(self, rank='', n_procs='', train_dir='', test_dir='', sizes=''): """ Constructor which reads the data files, instantiates and partitions the dataset Parameters ---------- + rank : int, optional + Rank of the current process (to get the partition). Default value is assigned 0 + n_procs : int, optional + The number of processes among which to divide the data. Default value is assigned 1 train_dir : str, optional Path to the training data files. Required to instantiate the training set The training set is partitioned according to n_procs and sizes test_dir : str. optional Path to the testing data files Required to instantiate the testing set - rank : int, optional - Rank of the current process (to get the partition). Default value is 0 - n_procs : int, optional - The number of processes among which to divide the data sizes : list(int), optional - A list of fractions specifying how much data to alot each process. + A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0 By default, each process gets an equal amount. """ - if train_dir: - self.__training__ = True + super().__init__(rank, n_procs, train_dir, test_dir, sizes) + if self.__training__: clients, num_samples, train_data = __read_dir__(train_dir) c_len = len(self.clients) if sizes == None: # Equal distribution of data among processes - e = c_len // n_procs + e = c_len // self.n_procs frac = e / c_len - sizes = [frac] * n_procs - sizes[-1] += 1.0 - frac * n_procs + sizes = [frac] * self.n_procs + sizes[-1] += 1.0 - frac * self.n_procs - my_clients = DataPartitioner(clients, sizes).use(rank) + my_clients = DataPartitioner(clients, sizes).use(self.rank) my_train_data = [] self.clients = [] self.num_samples = [] @@ -96,8 +97,7 @@ class Femnist: my_train_data["y"], dtype=np.dtype("float64") ).reshape(-1, 1) - if test_dir: - self.__testing__ = True + if self.__testing__: _, _, test_data = __read_dir__(test_dir) test_data = test_data.values() self.test_x = np.array(test_data["x"], dtype=np.dtype("float64")).reshape( @@ -154,7 +154,7 @@ class Femnist: If the training set was not initialized """ if self.__training__: - return Dataset(self.train_x, self.train_y) + return Data(self.train_x, self.train_y) raise RuntimeError("Training set not initialized!") def get_testset(self): @@ -169,7 +169,7 @@ class Femnist: If the test set was not initialized """ if self.__testing__: - return Dataset(self.test_x, self.test_y) + return Data(self.test_x, self.test_y) raise RuntimeError("Test set not initialized!") diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index 24430502dac5aa69e04fcd150e4e5ce76e3a74cf..ed02982a1fcb61d752ae44e57cda95549169cfbd 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -1,37 +1,86 @@ +from torch import utils, optim +import importlib + + +def __remove_keys__(d, keys_to_remove): + return {key: d[key] for key in d if key not in keys_to_remove} + class Node: """ This class defines the node (entity that performs learning, sharing and communication). """ - def __init__(self, rank, mapping, graph, options): + def __init__(self, rank, mapping, graph, config, *args): """ Constructor Parameters ---------- rank : int Rank of process local to the machine + n_procs_local : int + Number of processes on current machine mapping : decentralizepy.mappings The object containing the mapping rank <--> uid graph : decentralizepy.graphs The object containing the global graph - dataset : decentralizepy.datasets class - The class whose object will be instantiated to init the dataset + config : dict + A dictionary of configurations. Must contain the following: + [DATASET] + dataset_package + dataset_class + model_class + [OPTIMIZER_PARAMS] + optimizer_package + optimizer_class + args : optional + Other arguments """ self.rank = rank self.graph = graph self.mapping = mapping - self.options = options + + dataset_configs = dict(config.items("DATASET")) + dataset_module = importlib.import_module(dataset_configs["dataset_package"]) + dataset_class = getattr(dataset_module, dataset_configs["dataset_class"]) + + dataset_params = __remove_keys__(dataset_configs, ["dataset_package", "dataset_class", "model_class"]) + self.dataset = dataset_class(rank, **dataset_params) + self.trainset = self.dataset.get_trainset() + + model_class = getattr(dataset_module, dataset_configs["model_class"]) + self.model = model_class() + + optimizer_configs = dict(config.items("OPTIMIZER_PARAMS")) + optimizer_module = importlib.import_module(optimizer_configs["optimizer_package"]) + optimizer_class = getattr(optimizer_module, optimizer_configs["optimizer_class"]) + + optimizer_params = __remove_keys__(optimizer_configs, ["optimizer_package", "optimizer_class"]) + self.optimizer = optimizer_class(self.model.parameters(), **optimizer_params) - def __get_item__(self, i): + self.run() + + def train_step(self): """ - Function to get the item with index i. - Parameters - ---------- - i : int - Index - Returns - ------- - 2-tuple - A tuple of the ith data sample and it's corresponding label + The training step + """ + for epoch in self.epochs_per_round: # Instantiate this variable + for data, target in self.trainset: + # Perform training step + raise NotImplementedError + + + + + def run(self): + """ + The learning loop. """ - return self.x[i], self.y[i] + while True: + # Receive data + + # Learn + + + # Send data + raise NotImplementedError +