diff --git a/src/decentralizepy/datasets/CIFAR10.py b/src/decentralizepy/datasets/CIFAR10.py
new file mode 100644
index 0000000000000000000000000000000000000000..5de71cbb1775699e1bd233932d549b2cd8aa23a2
--- /dev/null
+++ b/src/decentralizepy/datasets/CIFAR10.py
@@ -0,0 +1,277 @@
+import logging
+import os
+
+import numpy as np
+import torch
+import torchvision
+import torchvision.transforms as transforms
+import torch.nn.functional as F
+from torch import nn
+from torch.utils.data import DataLoader
+
+from decentralizepy.datasets.Data import Data
+from decentralizepy.datasets.Dataset import Dataset
+from decentralizepy.datasets.Partitioner import DataPartitioner, SimpleDataPartitioner
+from decentralizepy.mappings.Mapping import Mapping
+from decentralizepy.models.Model import Model
+
+NUM_CLASSES = 10
+
+class CIFAR10(Dataset):
+    """
+    Class for the CIFAR10 dataset
+
+    """
+
+    def load_trainset(self):
+        """
+        Loads the training set. Partitions it if needed.
+
+        """
+        logging.info("Loading training set.")
+        trainset = torchvision.datasets.CIFAR10(root=self.train_dir, train=True,
+                                                download=True, transform=self.transform)
+        c_len = len(trainset)
+
+        if self.sizes is None:  # Equal distribution of data among processes
+            e = c_len // self.n_procs
+            frac = e / c_len
+            self.sizes = [frac] * self.n_procs
+            self.sizes[-1] += 1.0 - frac * self.n_procs
+            logging.debug("Size fractions: {}".format(self.sizes))
+
+        self.uid = self.mapping.get_uid(self.rank, self.machine_id)
+
+        if not self.partition_niid:
+            self.trainset = DataPartitioner(trainset, self.sizes).use(self.uid)
+        else:
+            # Sort the samples by label, then hand out contiguous slices so
+            # that each process sees only a few classes (label-skewed non-IID).
+            train_data = {key: [] for key in range(NUM_CLASSES)}
+            for x, y in trainset:
+                train_data[y].append(x)
+            all_trainset = []
+            for y, x in train_data.items():
+                all_trainset.extend([(a, y) for a in x])
+            self.trainset = SimpleDataPartitioner(all_trainset, self.sizes).use(self.uid)
+
+    def load_testset(self):
+        """
+        Loads the testing set.
+
+        """
+        logging.info("Loading testing set.")
+        self.testset = torchvision.datasets.CIFAR10(root=self.test_dir, train=False,
+                                                    download=True, transform=self.transform)
+
+    def __init__(
+        self,
+        rank: int,
+        machine_id: int,
+        mapping: Mapping,
+        n_procs="",
+        train_dir="",
+        test_dir="",
+        sizes="",
+        test_batch_size=1024,
+        partition_niid=False,
+    ):
+        """
+        Constructor which reads the data files, instantiates and partitions the dataset
+
+        Parameters
+        ----------
+        rank : int
+            Rank of the current process (to get the partition).
+        machine_id : int
+            Machine ID
+        mapping : decentralizepy.mappings.Mapping
+            Mapping to convert rank, machine_id -> uid for data partitioning
+            It also provides the total number of global processes
+        n_procs : int, optional
+            Number of processes among which the training data is partitioned
+        train_dir : str, optional
+            Path to the training data files. Required to instantiate the training set
+            The training set is partitioned according to the number of global processes and sizes
+        test_dir : str, optional
+            Path to the testing data files. Required to instantiate the testing set
+        sizes : list(float), optional
+            A list of fractions specifying how much data to allot each process. The fractions should sum to 1.0
+            By default, each process gets an equal amount.
+        test_batch_size : int, optional
+            Batch size during testing. Default value is 1024
+        partition_niid : bool, optional
+            When True, partitions the dataset in a non-IID (label-sorted) way
+
+        """
+        super().__init__(
+            rank,
+            machine_id,
+            mapping,
+            train_dir,
+            test_dir,
+            sizes,
+            test_batch_size,
+        )
+
+        self.partition_niid = partition_niid
+        self.transform = transforms.Compose(
+            [transforms.ToTensor(),
+             transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
+
+        if self.__training__:
+            self.load_trainset()
+
+        if self.__testing__:
+            self.load_testset()
+
+        # TODO: Add Validation
+
+    def get_trainset(self, batch_size=1, shuffle=False):
+        """
+        Function to get the training set
+
+        Parameters
+        ----------
+        batch_size : int, optional
+            Batch size for learning
+        shuffle : bool, optional
+            Whether to shuffle the samples every epoch
+
+        Returns
+        -------
+        torch.utils.data.DataLoader
+
+        Raises
+        ------
+        RuntimeError
+            If the training set was not initialized
+
+        """
+        if self.__training__:
+            return DataLoader(
+                self.trainset, batch_size=batch_size, shuffle=shuffle
+            )
+        raise RuntimeError("Training set not initialized!")
+
+    def get_testset(self):
+        """
+        Function to get the test set
+
+        Returns
+        -------
+        torch.utils.data.DataLoader
+
+        Raises
+        ------
+        RuntimeError
+            If the test set was not initialized
+
+        """
+        if self.__testing__:
+            return DataLoader(
+                self.testset, batch_size=self.test_batch_size
+            )
+        raise RuntimeError("Test set not initialized!")
+
+    def test(self, model, loss):
+        """
+        Function to evaluate the model on the test dataset.
+
+        Parameters
+        ----------
+        model : decentralizepy.models.Model
+            Model to evaluate
+        loss : torch.nn.Module
+            Loss function to evaluate with
+
+        Returns
+        -------
+        tuple
+            (accuracy, loss_value)
+
+        """
+        testloader = self.get_testset()
+
+        logging.debug("Test Loader instantiated.")
+
+        correct_pred = [0 for _ in range(NUM_CLASSES)]
+        total_pred = [0 for _ in range(NUM_CLASSES)]
+
+        total_correct = 0
+        total_predicted = 0
+
+        with torch.no_grad():
+            loss_val = 0.0
+            count = 0
+            for elems, labels in testloader:
+                outputs = model(elems)
+                loss_val += loss(outputs, labels).item()
+                count += 1
+                _, predictions = torch.max(outputs, 1)
+                for label, prediction in zip(labels, predictions):
+                    logging.debug("{} predicted as {}".format(label, prediction))
+                    if label == prediction:
+                        correct_pred[label] += 1
+                        total_correct += 1
+                    total_pred[label] += 1
+                    total_predicted += 1
+
+        logging.debug("Predicted on the test set")
+
+        for key, value in enumerate(correct_pred):
+            if total_pred[key] != 0:
+                accuracy = 100 * float(value) / total_pred[key]
+            else:
+                accuracy = 100.0
+            logging.debug("Accuracy for class {} is: {:.1f} %".format(key, accuracy))
+
+        accuracy = 100 * float(total_correct) / total_predicted
+        loss_val = loss_val / count
+        logging.info("Overall accuracy is: {:.1f} %".format(accuracy))
+        return accuracy, loss_val
+
+class CNN(Model):
+    """
+    Class for a CNN Model for CIFAR10
+
+    """
+
+    def __init__(self):
+        """
+        Constructor. Instantiates a CNN model with 10 output classes
+
+        """
+        super().__init__()
+        # LeNet-style network with roughly 62k parameters
+        self.conv1 = nn.Conv2d(3, 6, 5)
+        self.pool = nn.MaxPool2d(2, 2)
+        self.conv2 = nn.Conv2d(6, 16, 5)
+        self.fc1 = nn.Linear(16 * 5 * 5, 120)
+        self.fc2 = nn.Linear(120, 84)
+        self.fc3 = nn.Linear(84, NUM_CLASSES)
+
+    def forward(self, x):
+        """
+        Forward pass of the model
+
+        Parameters
+        ----------
+        x : torch.tensor
+            The input torch tensor
+
+        Returns
+        -------
+        torch.tensor
+            The output torch tensor
+
+        """
+        x = self.pool(F.relu(self.conv1(x)))
+        x = self.pool(F.relu(self.conv2(x)))
+        x = torch.flatten(x, 1)
+        x = F.relu(self.fc1(x))
+        x = F.relu(self.fc2(x))
+        x = self.fc3(x)
+        return x
diff --git a/src/decentralizepy/datasets/Partitioner.py b/src/decentralizepy/datasets/Partitioner.py
index a7b83b37402b815fb578832c0f3c809b6e774d95..80c3a802b1b4eff12ab8d3c7bcda83b8d45a5e70 100644
--- a/src/decentralizepy/datasets/Partitioner.py
+++ b/src/decentralizepy/datasets/Partitioner.py
@@ -102,3 +102,31 @@ class DataPartitioner(object):
 
         """
         return Partition(self.data, self.partitions[rank])
+
+class SimpleDataPartitioner(DataPartitioner):
+    """
+    Class to partition the dataset in order, without shuffling
+
+    """
+
+    def __init__(self, data, sizes=[1.0]):
+        """
+        Constructor. Partitions the data according to the parameters
+
+        Parameters
+        ----------
+        data : indexable
+            An indexable list of data items
+        sizes : list(float)
+            A list of fractions for each process
+
+        """
+        self.data = data
+        self.partitions = []
+        data_len = len(data)
+        indexes = [x for x in range(0, data_len)]
+
+        for frac in sizes:
+            part_len = int(frac * data_len)
+            self.partitions.append(indexes[0:part_len])
+            indexes = indexes[part_len:]
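
Note on the new partitioner (not part of the patch): unlike DataPartitioner, SimpleDataPartitioner does not shuffle indices before slicing, so the label-sorted order built in load_trainset survives into the shards; also, int(frac * data_len) truncates rather than rounds, so a few trailing items can be dropped when the fractions do not divide data_len evenly. A minimal self-contained sketch of the splitting logic, using made-up toy data:

    # Toy illustration of SimpleDataPartitioner's contiguous split.
    # 20 (sample, label) pairs, already sorted by label (labels 0..3).
    data = [("sample%d" % i, i // 5) for i in range(20)]
    sizes = [0.25, 0.25, 0.25, 0.25]  # fraction per process, summing to 1.0

    partitions, indexes = [], list(range(len(data)))
    for frac in sizes:
        part_len = int(frac * len(data))       # truncation, not rounding
        partitions.append(indexes[:part_len])  # contiguous block, no shuffle
        indexes = indexes[part_len:]

    print(partitions)
    # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
    # With label-sorted input, each of the 4 processes sees exactly one class.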
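
End-to-end usage sketch, under stated assumptions: this is not part of the patch; it assumes a decentralizepy.mappings.Linear mapping with a (machines, procs_per_machine) constructor, and it assumes the Dataset base class resolves self.n_procs from the mapping when n_procs is left at its default. Treat it as orientation, not as the tested entry point:

    # Hypothetical driver; Linear(...) signature and n_procs handling are assumptions.
    from torch import nn

    from decentralizepy.datasets.CIFAR10 import CIFAR10, CNN
    from decentralizepy.mappings.Linear import Linear

    mapping = Linear(1, 4)  # assumed: 1 machine running 4 processes
    dataset = CIFAR10(
        rank=0,
        machine_id=0,
        mapping=mapping,
        train_dir="./data",   # triggers load_trainset()
        test_dir="./data",    # triggers load_testset()
        partition_niid=True,  # label-sorted, contiguous shards
    )

    trainloader = dataset.get_trainset(batch_size=64, shuffle=True)
    accuracy, loss_val = dataset.test(CNN(), nn.CrossEntropyLoss())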