From 46a35ec19c228680778489b2ec1b5026014358f4 Mon Sep 17 00:00:00 2001
From: Rishi Sharma <rishi.sharma@epfl.ch>
Date: Mon, 21 Mar 2022 13:38:20 +0100
Subject: [PATCH] Add CIFAR10 dataset

---
 src/decentralizepy/datasets/CIFAR10.py     | 273 +++++++++++++++++++++
 src/decentralizepy/datasets/Partitioner.py |  28 +++
 2 files changed, 301 insertions(+)
 create mode 100644 src/decentralizepy/datasets/CIFAR10.py

diff --git a/src/decentralizepy/datasets/CIFAR10.py b/src/decentralizepy/datasets/CIFAR10.py
new file mode 100644
index 0000000..5de71cb
--- /dev/null
+++ b/src/decentralizepy/datasets/CIFAR10.py
@@ -0,0 +1,273 @@
+import logging
+import os
+
+import numpy as np
+import torch
+import torchvision
+import torchvision.transforms as transforms
+import torch.nn.functional as F
+from torch import nn
+from torch.utils.data import DataLoader
+
+from decentralizepy.datasets.Data import Data
+from decentralizepy.datasets.Dataset import Dataset
+from decentralizepy.datasets.Partitioner import DataPartitioner, SimpleDataPartitioner
+from decentralizepy.mappings.Mapping import Mapping
+from decentralizepy.models.Model import Model
+
+NUM_CLASSES = 10
+
+class CIFAR10(Dataset):
+    """
+    Class for the CIFAR10 dataset
+
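+    Examples
+    --------
+    A minimal usage sketch; it assumes decentralizepy's Linear mapping
+    (one machine, four processes) and a local data directory, neither of
+    which is part of this patch:
+
+    >>> from decentralizepy.mappings.Linear import Linear
+    >>> mapping = Linear(1, 4)
+    >>> dataset = CIFAR10(0, 0, mapping, train_dir="./data", test_dir="./data")
+    >>> trainloader = dataset.get_trainset(batch_size=32, shuffle=True)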
+    """
+
+    def load_trainset(self):
+        """
+        Loads the training set. Partitions it if needed.
+
+        """
+        logging.info("Loading training set.")
+        trainset = torchvision.datasets.CIFAR10(
+            root=self.train_dir, train=True, download=True, transform=self.transform
+        )
+        c_len = len(trainset)
+
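+        # Derive equal per-process fractions; the floor-division remainder is
+        # folded into the last process so the fractions sum to 1.0.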
+        if self.sizes is None:  # Equal distribution of data among processes
+            e = c_len // self.n_procs
+            frac = e / c_len
+            self.sizes = [frac] * self.n_procs
+            self.sizes[-1] += 1.0 - frac * self.n_procs
+            logging.debug("Size fractions: {}".format(self.sizes))
+
+        self.uid = self.mapping.get_uid(self.rank, self.machine_id)
+
+        if not self.partition_niid:
+            self.trainset = DataPartitioner(trainset, self.sizes).use(self.uid)
+        else:
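+            # Non-IID split: bucket samples by label, concatenate the buckets
+            # in class order, then hand each process a contiguous slice. Each
+            # process therefore sees only a small subset of the classes.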
+            train_data = {key: [] for key in range(NUM_CLASSES)}
+            for x, y in trainset:
+                train_data[y].append(x)
+            all_trainset = []
+            for y, x in train_data.items():
+                all_trainset.extend([(a, y) for a in x])
+            self.trainset = SimpleDataPartitioner(all_trainset, self.sizes).use(self.uid)
+
+    def load_testset(self):
+        """
+        Loads the testing set.
+
+        """
+        logging.info("Loading testing set.")
+
+        self.testset = torchvision.datasets.CIFAR10(
+            root=self.test_dir, train=False, download=True, transform=self.transform
+        )
+
+    def __init__(
+        self,
+        rank: int,
+        machine_id: int,
+        mapping: Mapping,
+        n_procs="",
+        train_dir="",
+        test_dir="",
+        sizes="",
+        test_batch_size=1024,
+        partition_niid=False
+    ):
+        """
+        Constructor which reads the data files, instantiates and partitions the dataset
+
+        Parameters
+        ----------
+        rank : int
+            Rank of the current process (to get the partition).
+        machine_id : int
+            Machine ID
+        mapping : decentralizepy.mappings.Mapping
+            Mapping to convert rank, machine_id -> uid for data partitioning
+            It also provides the total number of global processes
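+        n_procs : int, optional
+            Number of processes; unused here, as the total number of
+            processes is taken from the mapping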
+        train_dir : str, optional
+            Path to the training data files. Required to instantiate the training set
+            The training set is partitioned according to the number of global processes and sizes
+        test_dir : str, optional
+            Path to the testing data files. Required to instantiate the testing set
+        sizes : list(float), optional
+            A list of fractions specifying how much data to allot each process. Sum of fractions should be 1.0
+            By default, each process gets an equal amount.
+        test_batch_size : int, optional
+            Batch size during testing. Default value is 1024
+        partition_niid: bool, optional
+            When True, partitions dataset in a non-iid way
+
+        """
+        super().__init__(
+            rank,
+            machine_id,
+            mapping,
+            train_dir,
+            test_dir,
+            sizes,
+            test_batch_size,
+        )
+
+        self.partition_niid = partition_niid
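+        # ToTensor scales images to [0, 1]; normalizing each RGB channel with
+        # mean 0.5 and std 0.5 then maps them to [-1, 1].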
+        self.transform = transforms.Compose(
+            [transforms.ToTensor(),
+             transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
+
+        if self.__training__:
+            self.load_trainset()
+
+        if self.__testing__:
+            self.load_testset()
+
+        # TODO: Add Validation
+
+    def get_trainset(self, batch_size=1, shuffle=False):
+        """
+        Function to get the training set
+
+        Parameters
+        ----------
+        batch_size : int, optional
+            Batch size for learning
+        shuffle : bool, optional
+            When True, shuffles the training set every epoch
+
+        Returns
+        -------
+        torch.utils.data.DataLoader
+
+        Raises
+        ------
+        RuntimeError
+            If the training set was not initialized
+
+        """
+        if self.__training__:
+            return DataLoader(
+                self.trainset, batch_size=batch_size, shuffle=shuffle
+            )
+        raise RuntimeError("Training set not initialized!")
+
+    def get_testset(self):
+        """
+        Function to get the test set
+
+        Returns
+        -------
+        torch.utils.data.DataLoader
+
+        Raises
+        ------
+        RuntimeError
+            If the test set was not initialized
+
+        """
+        if self.__testing__:
+            return DataLoader(
+                self.testset, batch_size=self.test_batch_size
+            )
+        raise RuntimeError("Test set not initialized!")
+
+    def test(self, model, loss):
+        """
+        Function to evaluate model on the test dataset.
+
+        Parameters
+        ----------
+        model : decentralizepy.models.Model
+            Model to evaluate
+        loss : torch.nn.Module
+            Loss function to evaluate
+
+        Returns
+        -------
+        tuple
+            (accuracy, loss_value)
+
+        """
+        testloader = self.get_testset()
+
+        logging.debug("Test Loader instantiated.")
+
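+        # Per-class counters, used to log a class-wise accuracy breakdown.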
+        correct_pred = [0 for _ in range(NUM_CLASSES)]
+        total_pred = [0 for _ in range(NUM_CLASSES)]
+
+        total_correct = 0
+        total_predicted = 0
+
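+        # Put the model in eval mode; a no-op for this CNN, but safe should
+        # dropout or batch-norm layers be added later.
+        model.eval()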
+        with torch.no_grad():
+            loss_val = 0.0
+            count = 0
+            for elems, labels in testloader:
+                outputs = model(elems)
+                loss_val += loss(outputs, labels).item()
+                count += 1
+                _, predictions = torch.max(outputs, 1)
+                for label, prediction in zip(labels, predictions):
+                    logging.debug("{} predicted as {}".format(label, prediction))
+                    if label == prediction:
+                        correct_pred[label] += 1
+                        total_correct += 1
+                    total_pred[label] += 1
+                    total_predicted += 1
+
+        logging.debug("Predicted on the test set")
+
+        for key, value in enumerate(correct_pred):
+            if total_pred[key] != 0:
+                accuracy = 100 * float(value) / total_pred[key]
+            else:
+                accuracy = 100.0
+            logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
+
+        accuracy = 100 * float(total_correct) / total_predicted
+        loss_val = loss_val / count
+        logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
+        return accuracy, loss_val
+
+class CNN(Model):
+    """
+    Class for a CNN Model for CIFAR10
+
+    """
+
+    def __init__(self):
+        """
+        Constructor. Instantiates the CNN Model
+            with 10 output classes
+
+        """
+        super().__init__()
+        # ~62,000 parameters
+        self.conv1 = nn.Conv2d(3, 6, 5)
+        self.pool = nn.MaxPool2d(2, 2)
+        self.conv2 = nn.Conv2d(6, 16, 5)
+        self.fc1 = nn.Linear(16 * 5 * 5, 120)
+        self.fc2 = nn.Linear(120, 84)
+        self.fc3 = nn.Linear(84, NUM_CLASSES)
+
+    def forward(self, x):
+        """
+        Forward pass of the model
+
+        Parameters
+        ----------
+        x : torch.tensor
+            The input torch tensor
+
+        Returns
+        -------
+        torch.tensor
+            The output torch tensor
+
+        """
+        x = self.pool(F.relu(self.conv1(x)))
+        x = self.pool(F.relu(self.conv2(x)))
+        x = torch.flatten(x, 1)
+        x = F.relu(self.fc1(x))
+        x = F.relu(self.fc2(x))
+        x = self.fc3(x)
+        return x
diff --git a/src/decentralizepy/datasets/Partitioner.py b/src/decentralizepy/datasets/Partitioner.py
index a7b83b3..80c3a80 100644
--- a/src/decentralizepy/datasets/Partitioner.py
+++ b/src/decentralizepy/datasets/Partitioner.py
@@ -102,3 +102,31 @@ class DataPartitioner(object):
 
         """
         return Partition(self.data, self.partitions[rank])
+
+class SimpleDataPartitioner(DataPartitioner):
+    """
+    Class to partition the dataset sequentially, without shuffling
+
+    """
+
+    def __init__(self, data, sizes=[1.0]):
+        """
+        Constructor. Partitions the data according to the parameters
+
+        Parameters
+        ----------
+        data : indexable
+            An indexable list of data items
+        sizes : list(float)
+            A list of fractions for each process
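+
+        Notes
+        -----
+        Indices are assigned in order, without shuffling: each process gets
+        a contiguous slice. For example, sizes=[0.5, 0.3, 0.2] over 10 items
+        yields index slices [0..4], [5..7], and [8..9] (int() truncates).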
+
+        """
+        self.data = data
+        self.partitions = []
+        data_len = len(data)
+        indexes = list(range(data_len))
+
+        for frac in sizes:
+            part_len = int(frac * data_len)
+            self.partitions.append(indexes[0:part_len])
+            indexes = indexes[part_len:]
-- 
GitLab