From adcd9b058aed195b8b73d5eebfccc40e4e79959c Mon Sep 17 00:00:00 2001
From: Rishi Sharma <rishi.sharma@epfl.ch>
Date: Fri, 12 Nov 2021 19:59:20 +0100
Subject: [PATCH] One node training, README

---
 README.rst                                    | 31 ++++++++++++++-
 config.ini                                    | 13 ++++++-
 .../communication/Communication.py            |  6 +++
 src/decentralizepy/datasets/Data.py           |  3 ++
 src/decentralizepy/datasets/Dataset.py        |  4 +-
 src/decentralizepy/datasets/Femnist.py        | 28 ++++++++------
 src/decentralizepy/node/Node.py               | 22 +++++++----
 src/decentralizepy/sharing/Sharing.py         |  6 +++
 src/decentralizepy/training/Training.py       | 38 ++++++++++++++-----
 9 files changed, 119 insertions(+), 32 deletions(-)
 create mode 100644 src/decentralizepy/communication/Communication.py
 create mode 100644 src/decentralizepy/sharing/Sharing.py

diff --git a/README.rst b/README.rst
index 4dfd440..4e186ab 100644
--- a/README.rst
+++ b/README.rst
@@ -1 +1,30 @@
-# decentralizepy
\ No newline at end of file
+decentralizepy
+==============
+
+Node
+----
+* The manager. Optimizations at the process level.
+
+Dataset
+-------
+* Static. The local data partition and the model trained on it.
+
+Training
+--------
+* Heterogeneity. How much local work does each node do per round?
+
+Graph
+-----
+* Static. Who are my neighbours? Topologies.
+
+Mapping
+-------
+* Naming. Globally unique process ids <-> (machine_id, local_rank); sketched below.
+
+Sharing
+-------
+* Leverage redundancy. Privacy. Optimizations in model and data sharing.
+
+Communication
+-------------
+* IPC/network level. Compression. Privacy. Reliability.
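A quick illustration of the Mapping contract above (the helper names and the
fixed procs_per_machine are illustrative, not part of this patch):

    def uid(machine_id, local_rank, procs_per_machine=8):
        # globally unique process id from (machine_id, local_rank)
        return machine_id * procs_per_machine + local_rank

    def machine_and_rank(global_id, procs_per_machine=8):
        # inverse: recover (machine_id, local_rank) from the global id
        return global_id // procs_per_machine, global_id % procs_per_machine

    assert machine_and_rank(uid(2, 3)) == (2, 3)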
diff --git a/config.ini b/config.ini
index 2b8d4ff..0b04725 100644
--- a/config.ini
+++ b/config.ini
@@ -7,7 +7,7 @@ dataset_package = decentralizepy.datasets.Femnist
 dataset_class = Femnist
 model_class = LogisticRegression
 n_procs = 1.0
-train_dir = 
+train_dir = leaf/data/femnist/data/train
 test_dir = 
 ; python list of fractions below
 sizes = [0.4, 0.2, 0.3, 0.1]
@@ -22,3 +22,14 @@ training_package = decentralizepy.training.Training
 training_class = Training
 epochs_per_round = 25
 batch_size = 64
+shuffle = False
+loss_package = torch.nn.functional
+loss = nll_loss
+
+[COMMUNICATION]
+comm_package = decentralizepy.communication.Communication
+comm_class = Communication
+
+[SHARING]
+sharing_package = decentralizepy.sharing.Sharing
+sharing_class = Sharing
\ No newline at end of file
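Node.py below reads the parsed file section by section (config["DATASET"],
config["TRAIN_PARAMS"], ...), so a plain configparser pass is enough to
sanity-check additions like the new [COMMUNICATION] and [SHARING] sections.
A minimal sketch; note that configparser returns every value as a string, so
entries like sizes and batch_size still need parsing:

    import configparser

    config = configparser.ConfigParser()
    config.read("config.ini")

    print(config["DATASET"]["dataset_package"])        # decentralizepy.datasets.Femnist
    print(config["COMMUNICATION"]["comm_class"])       # Communication
    print(type(config["TRAIN_PARAMS"]["batch_size"]))  # <class 'str'>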
diff --git a/src/decentralizepy/communication/Communication.py b/src/decentralizepy/communication/Communication.py
new file mode 100644
index 0000000..18f7b94
--- /dev/null
+++ b/src/decentralizepy/communication/Communication.py
@@ -0,0 +1,6 @@
+class Communication:
+    """
+    Communication API
+    """
+    def __init__(self):
+        raise NotImplementedError
\ No newline at end of file
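The base class only pins down the name that the comm_package/comm_class pair
in config.ini resolves to; concrete transports would subclass it rather than
call this constructor. An illustrative toy subclass (nothing below is part of
this patch):

    from decentralizepy.communication.Communication import Communication

    class InMemoryCommunication(Communication):
        """Toy transport that records outgoing messages."""

        def __init__(self):
            # intentionally not calling super().__init__(), which raises
            self.outbox = []

        def send(self, uid, message):
            self.outbox.append((uid, message))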
diff --git a/src/decentralizepy/datasets/Data.py b/src/decentralizepy/datasets/Data.py
index 4b3ac7b..7a9e515 100644
--- a/src/decentralizepy/datasets/Data.py
+++ b/src/decentralizepy/datasets/Data.py
@@ -16,6 +16,9 @@ class Data:
         self.x = x
         self.y = y
 
+    def __len__(self):
+        return self.y.shape[0]
+
     def __getitem__(self, i):
         """
         Function to get the item with index i.
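With __len__ added next to the existing __getitem__, Data satisfies torch's
map-style dataset protocol, which is what lets Femnist below hand a Data
object straight to a DataLoader. A quick check, assuming __getitem__ returns
the (x, y) pair at index i:

    import numpy as np
    from torch.utils.data import DataLoader

    from decentralizepy.datasets.Data import Data

    data = Data(np.zeros((10, 1, 28, 28), dtype=np.float32),
                np.zeros(10, dtype=np.int64))
    assert len(data) == 10
    batches = list(DataLoader(data, batch_size=4))  # 3 batches: 4 + 4 + 2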
diff --git a/src/decentralizepy/datasets/Dataset.py b/src/decentralizepy/datasets/Dataset.py
index 4761450..e4d176c 100644
--- a/src/decentralizepy/datasets/Dataset.py
+++ b/src/decentralizepy/datasets/Dataset.py
@@ -49,7 +49,7 @@ class Dataset:
         Function to get the training set
         Returns
         -------
-        Dataset
+        torch.utils.data.Dataset(decentralizepy.datasets.Data)
         Raises
         ------
         RuntimeError
@@ -62,7 +62,7 @@ class Dataset:
         Function to get the test set
         Returns
         -------
-        Dataset
+        torch.utils.data.Dataset(decentralizepy.datasets.Data)
         Raises
         ------
         RuntimeError
diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py
index 41f3e1b..d1bbe31 100644
--- a/src/decentralizepy/datasets/Femnist.py
+++ b/src/decentralizepy/datasets/Femnist.py
@@ -7,6 +7,7 @@ import numpy as np
 import torch
 import torch.nn.functional as F
 from torch import nn
+from torch.utils.data import DataLoader
 
 from decentralizepy.datasets.Data import Data
 from decentralizepy.datasets.Dataset import Dataset
@@ -15,6 +16,7 @@ from decentralizepy.datasets.Partitioner import DataPartitioner
 NUM_CLASSES = 62
 IMAGE_SIZE = (28, 28)
 FLAT_SIZE = 28 * 28
+PIXEL_RANGE = 256.0
 
 
 class Femnist(Dataset):
@@ -81,7 +83,7 @@ class Femnist(Dataset):
                 frac = e / c_len
                 self.sizes = [frac] * self.n_procs
                 self.sizes[-1] += 1.0 - frac * self.n_procs
-                print(self.sizes)
+                logging.debug("Size fractions: %s", str(self.sizes))
 
             my_clients = DataPartitioner(clients, self.sizes).use(self.rank)
             my_train_data = {"x": [], "y": []}
@@ -96,13 +98,13 @@ class Femnist(Dataset):
                 my_train_data["y"].extend(train_data[cur_client]["y"])
                 self.num_samples.append(len(train_data[cur_client]["y"]))
             self.train_x = (
-                np.array(my_train_data["x"], dtype=np.dtype("float64"))
+                np.array(my_train_data["x"], dtype=np.dtype("float32"))
                 .reshape(-1, 28, 28, 1)
                 .transpose(0, 3, 1, 2)
             )
             self.train_y = np.array(
-                my_train_data["y"], dtype=np.dtype("float64")
-            ).reshape(-1, 1)
+                my_train_data["y"], dtype=np.dtype("int64")
+            ).reshape(-1)
             logging.debug("train_x.shape: %s", str(self.train_x.shape))
             logging.debug("train_y.shape: %s", str(self.train_y.shape))
 
@@ -111,12 +113,12 @@ class Femnist(Dataset):
             _, _, test_data = self.__read_dir__(self.test_dir)
             test_data = test_data.values()
             self.test_x = (
-                np.array(test_data["x"], dtype=np.dtype("float64"))
+                np.array(test_data["x"], dtype=np.dtype("float32"))
                 .reshape(-1, 28, 28, 1)
                 .transpose(0, 3, 1, 2)
             )
-            self.test_y = np.array(test_data["y"], dtype=np.dtype("float64")).reshape(
-                -1, 1
+            self.test_y = np.array(test_data["y"], dtype=np.dtype("int64")).reshape(
+                -1
             )
             logging.debug("test_x.shape: %s", str(self.test_x.shape))
             logging.debug("test_y.shape: %s", str(self.test_y.shape))
@@ -156,19 +158,23 @@ class Femnist(Dataset):
 
         raise IndexError("i is out of bounds!")
 
-    def get_trainset(self):
+    def get_trainset(self, batch_size, shuffle=False):
         """
         Function to get the training set
+        Parameters
+        ----------
+        batch_size : int
+            Batch size for learning
         Returns
         -------
-        Dataset
+        torch.utils.data.DataLoader(decentralizepy.datasets.Data)
         Raises
         ------
         RuntimeError
             If the training set was not initialized
         """
         if self.__training__:
-            return Data(self.train_x, self.train_y)
+            return DataLoader(Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle)
         raise RuntimeError("Training set not initialized!")
 
     def get_testset(self):
@@ -176,7 +182,7 @@ class Femnist(Dataset):
         Function to get the test set
         Returns
         -------
-        Dataset
+        torch.utils.data.Dataset(decentralizepy.datasets.Data)
         Raises
         ------
         RuntimeError
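The dtype and shape changes above line the arrays up with the nll_loss
configured in config.ini: float32 inputs for the model, and targets as a flat
int64 vector of class indices rather than a float64 column vector. The
expected tensor shapes, in isolation:

    import torch
    import torch.nn.functional as F

    output = torch.log_softmax(torch.randn(4, 62), dim=1)  # (N, NUM_CLASSES) log-probs
    target = torch.tensor([3, 0, 61, 7])                   # int64, shape (N,)
    print(F.nll_loss(output, target))                      # scalar loss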
diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py
index 7f203c0..ee8ff3e 100644
--- a/src/decentralizepy/node/Node.py
+++ b/src/decentralizepy/node/Node.py
@@ -17,6 +17,7 @@ class Node:
     def __init__(
         self,
         rank: int,
+        machine_id: int,
         mapping: Mapping,
         graph: Graph,
         config,
@@ -31,6 +32,8 @@ class Node:
         ----------
         rank : int
             Rank of process local to the machine
+        machine_id : int
+            Machine ID on which the process is running
         n_procs_local : int
             Number of processes on current machine
         mapping : decentralizepy.mappings
@@ -59,7 +62,6 @@ class Node:
             Other arguments
         """
         log_file = os.path.join(log_dir, str(rank) + ".log")
-        print(log_file)
         logging.basicConfig(
             filename=log_file,
             format="[%(asctime)s][%(module)s][%(levelname)s] %(message)s",
@@ -77,29 +79,33 @@ class Node:
         logging.debug("type(graph): %s", str(type(self.rank)))
         logging.debug("type(mapping): %s", str(type(self.mapping)))
         
-        dataset_configs = dict(config.items("DATASET"))
+        dataset_configs = config["DATASET"]
         dataset_module = importlib.import_module(dataset_configs["dataset_package"])
         dataset_class = getattr(dataset_module, dataset_configs["dataset_class"])
         dataset_params = utils.remove_keys(dataset_configs, ["dataset_package", "dataset_class", "model_class"])
         self.dataset =  dataset_class(rank, **dataset_params)
-        self.trainset = self.dataset.get_trainset()
 
         logging.info("Dataset instantiation complete.")
 
         model_class = getattr(dataset_module, dataset_configs["model_class"])
         self.model = model_class()
 
-        optimizer_configs = dict(config.items("OPTIMIZER_PARAMS"))
+        optimizer_configs = config["OPTIMIZER_PARAMS"]
         optimizer_module = importlib.import_module(optimizer_configs["optimizer_package"])
         optimizer_class = getattr(optimizer_module, optimizer_configs["optimizer_class"])
         optimizer_params = utils.remove_keys(optimizer_configs, ["optimizer_package", "optimizer_class"])
         self.optimizer = optimizer_class(self.model.parameters(), **optimizer_params)
 
-        train_configs = dict(config.items("TRAIN_PARAMS"))
+        train_configs = config["TRAIN_PARAMS"]
         train_module = importlib.import_module(train_configs["training_package"])
         train_class = getattr(train_module, train_configs["training_class"])
-        train_params = utils.remove_keys(train_configs, ["training_package", "training_class"])
-        self.trainer = train_class(self.model, self.optimizer, **train_params)
+
+        loss_package = importlib.import_module(train_configs["loss_package"])
+        loss = getattr(loss_package, train_configs["loss"])
+
+        train_params = utils.remove_keys(train_configs, ["training_package", "training_class", "loss", "loss_package"])
+        self.trainer = train_class(self.model, self.optimizer, loss, **train_params)
 
         for iteration in range(iterations):
-            self.trainer.train(self.trainset)
\ No newline at end of file
+            logging.info("Starting training iteration: %d", iteration)
+            self.trainer.train(self.dataset)
\ No newline at end of file
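Every pluggable component in Node.py is now resolved with the same
package/class two-step, which is why config.ini names both halves. The
pattern in isolation:

    import importlib

    configs = {"training_package": "decentralizepy.training.Training",
               "training_class": "Training"}
    module = importlib.import_module(configs["training_package"])
    cls = getattr(module, configs["training_class"])  # the Training class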
diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py
new file mode 100644
index 0000000..58261b6
--- /dev/null
+++ b/src/decentralizepy/sharing/Sharing.py
@@ -0,0 +1,6 @@
+class Sharing:
+    """
+    API defining what to share, with whom, and what to do on receiving
+    """
+    def __init__(self):
+        raise NotImplementedError
\ No newline at end of file
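Like Communication.py, this stub exists so the sharing_package/sharing_class
pair in config.ini has something to resolve to. One plausible strategy,
purely as an illustration (neighbourhood model averaging; not this patch's
design):

    from decentralizepy.sharing.Sharing import Sharing

    class AverageSharing(Sharing):
        """Toy strategy: average own parameters with received ones."""

        def __init__(self):
            # intentionally not calling super().__init__(), which raises
            self.received = []  # state_dicts from neighbours

        def merge(self, own_state):
            states = self.received + [own_state]
            return {k: sum(s[k] for s in states) / len(states) for k in own_state}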
diff --git a/src/decentralizepy/training/Training.py b/src/decentralizepy/training/Training.py
index 3662366..48a035e 100644
--- a/src/decentralizepy/training/Training.py
+++ b/src/decentralizepy/training/Training.py
@@ -1,29 +1,49 @@
-from torch.optim import SGD
-
+import torch
 from decentralizepy import utils
+import logging
 class Training:
     """
     This class implements the training module for a single node.
     """
-    def __init__(self, model, optimizer, epochs_per_round = "", batch_size = ""):
+    def __init__(self, model, optimizer, loss, epochs_per_round="", batch_size="", shuffle=""):
         """
         Constructor
         Parameters
         ----------
-        epochs_per_round : int
+        model : torch.nn.Module
+            Neural Network for training
+        optimizer : torch.optim.Optimizer
+            Optimizer to learn parameters
+        loss : function
+            Loss function
+        epochs_per_round : int, optional
             Number of epochs per training call
-        batch_size : int
+        batch_size : int, optional
             Number of items to learn over, in one batch
         """
+        self.model = model
+        self.optimizer = optimizer
+        self.loss = loss
         self.epochs_per_round = utils.conditional_value(epochs_per_round, "", 1)
         self.batch_size = utils.conditional_value(batch_size, "", 1)
+        self.shuffle = utils.conditional_value(shuffle, "", False)
 
-    def train(self, trainset):
+    def train(self, dataset):
         """
         One training iteration
         Parameters
         ----------
-        trainset : decentralizepy.datasets.Data
-            The training dataset. Should implement __getitem__(i)
+        dataset : decentralizepy.datasets.Dataset
+            The training dataset. Should implement get_trainset(batch_size, shuffle)
         """
-        raise NotImplementedError
+        trainset = dataset.get_trainset(self.batch_size, self.shuffle)
+        for epoch in range(self.epochs_per_round):
+            epoch_loss = 0.0
+            for data, target in trainset:
+                self.model.zero_grad()
+                output = self.model(data)
+                loss_val = self.loss(output, target)
+                epoch_loss += loss_val.item()
+                loss_val.backward()
+                self.optimizer.step()
+            logging.info("Epoch loss: %f", epoch_loss)
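The trainer can be exercised without the rest of the stack; only Training's
signature and the duck-typed get_trainset(batch_size, shuffle) contract come
from this patch, while the ToyDataset stub below is hypothetical:

    import numpy as np
    import torch
    import torch.nn.functional as F
    from torch.utils.data import DataLoader

    from decentralizepy.datasets.Data import Data
    from decentralizepy.training.Training import Training

    class ToyDataset:
        """Stands in for Femnist: anything exposing get_trainset(batch_size, shuffle)."""

        def get_trainset(self, batch_size, shuffle=False):
            x = np.random.rand(8, 784).astype(np.float32)
            y = np.random.randint(0, 62, size=8).astype(np.int64)
            return DataLoader(Data(x, y), batch_size=batch_size, shuffle=shuffle)

    model = torch.nn.Sequential(torch.nn.Linear(784, 62), torch.nn.LogSoftmax(dim=1))
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    trainer = Training(model, optimizer, F.nll_loss,
                       epochs_per_round=2, batch_size=4, shuffle=True)
    trainer.train(ToyDataset())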
-- 
GitLab