diff --git a/README.rst b/README.rst index 4e186ab2b2cec548e0ea0b54a9a4aa8bf9bfa512..2113ef8bc7ae93cf7798b7b74fea3fdb115fd781 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,21 @@ +============== decentralizepy ============== +------------------------- +Setting up decentralizepy +------------------------- + +* Fork the repository. +* Clone and enter your local repository. +* Check if you have python >= 3.8. +* (Optional) Create and activate a virtual environment. +* Update pip:: + pip3 install --upgrade pip + pip install --upgrade pip +* Install decentralizepy for development:: + pip3 install --editable .\[dev\] + Node ---- * The Manager. Optimizations at process level. @@ -28,3 +43,7 @@ Sharing Communication ------------- * IPC/Network level. Compression. Privacy. Reliability + +Model +----- +* Learning Model diff --git a/eval/run.sh b/eval/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..6a56ad2c2e429372cdf15cbbae1cdd9ff4e40348 --- /dev/null +++ b/eval/run.sh @@ -0,0 +1,19 @@ +#!/bin/zsh +cd ~/Gitlab/decentralizepy/eval + + +m=`/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}' | awk -v FS=. '{print $4}'` +m=`expr $m - 128` + +env_python=~/miniconda3/envs/decpy/bin/python3 +original_config=config_femnist_grow.ini +graph=96_nodes_random2.edges +config_file=/tmp/config.ini +procs_per_machine=16 +machines=6 +iterations=70 +test_after=2 +eval_file=testing.py +log_level=INFO + +$env_python $eval_file -mid $m -ps $procs_per_machine -ms $machines -is $iterations -gf $graph -ta $test_after -cf $original_config -ll $log_level diff --git a/src/decentralizepy/communication/Communication.py b/src/decentralizepy/communication/Communication.py index a8548983cb6d0bcf7e9f1d0840d4264da47e4164..518b83b58d5b86b1215803f4401bb95f25ad5d8d 100644 --- a/src/decentralizepy/communication/Communication.py +++ b/src/decentralizepy/communication/Communication.py @@ -1,9 +1,25 @@ class Communication: """ Communcation API + """ def __init__(self, rank, machine_id, mapping, total_procs): + """ + Constructor + + Parameters + ---------- + rank : int + Local rank of the process + machine_id : int + Machine id of the process + mapping : decentralizepy.mappings.Mapping + uid, rank, machine_id invertible mapping + total_procs : int + Total number of processes + + """ self.total_procs = total_procs self.rank = rank self.machine_id = machine_id @@ -12,19 +28,82 @@ class Communication: self.total_bytes = 0 def encrypt(self, data): + """ + Encode/Encrypt data. + + Parameters + ---------- + data : dict + Data dict to send + + Returns + ------- + byte + Encoded data + + """ raise NotImplementedError def decrypt(self, sender, data): + """ + Decodes received data. + + Parameters + ---------- + sender : byte + sender of the data + data : byte + Data received + + Returns + ------- + tuple + (sender: int, data: dict) + + """ raise NotImplementedError def connect_neighbors(self, neighbors): + """ + Connects all neighbors. + + Parameters + ---------- + neighbors : list(int) + List of neighbors + + """ raise NotImplementedError def receive(self): + """ + Returns ONE message received. + + Returns + ---------- + dict + Received and decrypted data + + """ raise NotImplementedError def send(self, uid, data): + """ + Send a message to a process. + + Parameters + ---------- + uid : int + Neighbor's unique ID + data : dict + Message as a Python dictionary + + """ raise NotImplementedError def disconnect_neighbors(self): + """ + Disconnects all neighbors. 
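Before the concrete TCP implementation below, a minimal sketch of how a subclass is meant to fill in this API. The in-process queue transport is invented for illustration; only the constructor signature and the method names come from the class above::

    import queue

    from decentralizepy.communication.Communication import Communication

    class Loopback(Communication):
        """Toy transport: every message lands in a local queue."""

        def __init__(self, rank, machine_id, mapping, total_procs):
            super().__init__(rank, machine_id, mapping, total_procs)
            self.inbox = queue.Queue()

        def connect_neighbors(self, neighbors):
            pass  # nothing to connect in-process

        def send(self, uid, data):
            self.inbox.put((uid, data))  # loopback instead of a socket

        def receive(self):
            return self.inbox.get()

        def disconnect_neighbors(self):
            pass

    # in-process smoke test (mapping=None is fine for this toy)
    c = Loopback(rank=0, machine_id=0, mapping=None, total_procs=1)
    c.send(0, {"hello": 1})
    print(c.receive())  # (0, {'hello': 1})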
+ + """ raise NotImplementedError diff --git a/src/decentralizepy/communication/TCP.py b/src/decentralizepy/communication/TCP.py index 8c754fd0c92530713331d47671e692623c7f3045..54a1af8ae52c17a710d2815fad620515f20c089c 100644 --- a/src/decentralizepy/communication/TCP.py +++ b/src/decentralizepy/communication/TCP.py @@ -13,14 +13,47 @@ BYE = b"BYE" class TCP(Communication): """ TCP Communication API - """ + """ def addr(self, rank, machine_id): + """ + Returns TCP address of the process. + + Parameters + ---------- + rank : int + Local rank of the process + machine_id : int + Machine id of the process + + Returns + ------- + str + Full address of the process using TCP + + """ machine_addr = self.ip_addrs[str(machine_id)] port = rank + 20000 return "tcp://{}:{}".format(machine_addr, port) def __init__(self, rank, machine_id, mapping, total_procs, addresses_filepath): + """ + Constructor + + Parameters + ---------- + rank : int + Local rank of the process + machine_id : int + Machine id of the process + mapping : decentralizepy.mappings.Mapping + uid, rank, machine_id invertible mapping + total_procs : int + Total number of processes + addresses_filepath : str + JSON file with machine_id -> ip mapping + + """ super().__init__(rank, machine_id, mapping, total_procs) with open(addresses_filepath) as addrs: @@ -43,17 +76,66 @@ class TCP(Communication): self.barrier = set() def __del__(self): + """ + Destroys zmq context + + """ self.context.destroy(linger=0) def encrypt(self, data): + """ + Encode data using utf8. + + Parameters + ---------- + data : dict + Data dict to send + + Returns + ------- + byte + Encoded data + + """ return json.dumps(data).encode("utf8") def decrypt(self, sender, data): + """ + Decode received data from utf8. + + Parameters + ---------- + sender : byte + sender of the data + data : byte + Data received + + Returns + ------- + tuple + (sender: int, data: dict) + + """ sender = int(sender.decode()) data = json.loads(data.decode("utf8")) return sender, data def connect_neighbors(self, neighbors): + """ + Connects all neighbors. Sends HELLO. Waits for HELLO. + Caches any data received while waiting for HELLOs. + + Parameters + ---------- + neighbors : list(int) + List of neighbors + + Raises + ------ + RuntimeError + If received BYE while waiting for HELLO + + """ logging.info("Sending connection request to neighbors") for uid in neighbors: logging.debug("Connecting to my neighbour: {}".format(uid)) @@ -84,8 +166,24 @@ class TCP(Communication): self.peer_deque.append(self.decrypt(sender, recv)) logging.info("Connected to all neighbors") + self.initialized = True def receive(self): + """ + Returns ONE message received. + + Returns + ---------- + dict + Received and decrypted data + + Raises + ------ + RuntimeError + If received HELLO + + """ + assert self.initialized == True if len(self.peer_deque) != 0: resp = self.peer_deque.popleft() return resp @@ -106,6 +204,18 @@ class TCP(Communication): return self.decrypt(sender, recv) def send(self, uid, data): + """ + Send a message to a process. + + Parameters + ---------- + uid : int + Neighbor's unique ID + data : dict + Message as a Python dictionary + + """ + assert self.initialized == True to_send = self.encrypt(data) data_size = len(to_send) self.total_bytes += data_size @@ -115,6 +225,11 @@ class TCP(Communication): logging.info("Sent this round: {}".format(data_size)) def disconnect_neighbors(self): + """ + Disconnects all neighbors. 
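The wire format used by this class is utf8-encoded JSON, with the sender id travelling as a separate frame; the following round trip mirrors ``encrypt``/``decrypt`` above::

    import json

    def encrypt(data):
        return json.dumps(data).encode("utf8")

    def decrypt(sender, data):
        return int(sender.decode()), json.loads(data.decode("utf8"))

    wire = encrypt({"iteration": 3, "degree": 2})
    print(decrypt(b"5", wire))  # (5, {'iteration': 3, 'degree': 2})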
+ + """ + assert self.initialized == True if not self.sent_disconnections: logging.info("Disconnecting neighbors") for sock in self.peer_sockets.values(): diff --git a/src/decentralizepy/datasets/Celeba.py b/src/decentralizepy/datasets/Celeba.py index f648a7fa6c40419b9cbfd3631fdad471bfe5baae..0b43268394479e30f66b6814755c14e5dd0f8766 100644 --- a/src/decentralizepy/datasets/Celeba.py +++ b/src/decentralizepy/datasets/Celeba.py @@ -25,9 +25,24 @@ NUM_CLASSES = 2 class Celeba(Dataset): """ Class for the Celeba dataset + """ def __read_file__(self, file_path): + """ + Read data from the given json file + + Parameters + ---------- + file_path : str + The file path + + Returns + ------- + tuple + (users, num_samples, data) + + """ with open(file_path, "r") as inf: client_data = json.load(inf) return ( @@ -39,15 +54,18 @@ class Celeba(Dataset): def __read_dir__(self, data_dir): """ Function to read all the FEMNIST data files in the directory + Parameters ---------- data_dir : str Path to the folder containing the data files + Returns ------- 3-tuple A tuple containing list of clients, number of samples per client, and the data items per client + """ clients = [] num_samples = [] @@ -64,6 +82,17 @@ class Celeba(Dataset): return clients, num_samples, data def file_per_user(self, dir, write_dir): + """ + Function to read all the FEMNIST data files and write one file per user + + Parameters + ---------- + dir : str + Path to the folder containing the data files + write_dir : str + Path to the folder to write the files + + """ clients, num_samples, train_data = self.__read_dir__(dir) for index, client in enumerate(clients): my_data = dict() @@ -76,6 +105,10 @@ class Celeba(Dataset): print("Created File: ", client + ".json") def load_trainset(self): + """ + Loads the training set. Partitions it if needed. + + """ logging.info("Loading training set.") files = os.listdir(self.train_dir) files = [f for f in files if f.endswith(".json")] @@ -121,6 +154,10 @@ class Celeba(Dataset): assert self.train_x.shape[0] > 0 def load_testset(self): + """ + Loads the testing set. + + """ logging.info("Loading testing set.") _, _, d = self.__read_dir__(self.test_dir) test_x = [] @@ -153,10 +190,15 @@ class Celeba(Dataset): ): """ Constructor which reads the data files, instantiates and partitions the dataset + Parameters ---------- rank : int - Rank of the current process (to get the partition). Default value is assigned 0 + Rank of the current process (to get the partition). + machine_id : int + Machine ID + mapping : decentralizepy.mappings.Mapping + Mapping to conver rank, machine_id -> uid for data partitioning n_procs : int, optional The number of processes among which to divide the data. Default value is assigned 1 train_dir : str, optional @@ -167,6 +209,9 @@ class Celeba(Dataset): sizes : list(int), optional A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0 By default, each process gets an equal amount. + test_batch_size : int, optional + Batch size during testing. Default value is 64 + """ super().__init__( rank, @@ -190,11 +235,29 @@ class Celeba(Dataset): # TODO: Add Validation def process_x(self, raw_x_batch): + """ + Preprocesses the whole batch of images + + Returns + ------- + np.array + The images as a numpy array + + """ x_batch = [self._load_image(i) for i in raw_x_batch] x_batch = np.array(x_batch) return x_batch def _load_image(self, img_name): + """ + Open and load image. 
+ + Returns + ------- + np.array + The image as a numpy array + + """ img = Image.open(os.path.join(self.IMAGES_DIR, img_name[:-4] + ".png")) img = img.resize((IMAGE_DIM, IMAGE_DIM)).convert("RGB") return np.array(img) @@ -202,28 +265,34 @@ class Celeba(Dataset): def get_client_ids(self): """ Function to retrieve all the clients of the current process + Returns ------- list(str) A list of strings of the client ids. + """ return self.clients def get_client_id(self, i): """ Function to get the client id of the ith sample + Parameters ---------- i : int Index of the sample + Returns ------- str Client ID + Raises ------ IndexError If the sample index is out of bounds + """ lb = 0 for j in range(len(self.clients)): @@ -235,17 +304,21 @@ class Celeba(Dataset): def get_trainset(self, batch_size=1, shuffle=False): """ Function to get the training set + Parameters ---------- batch_size : int, optional Batch size for learning + Returns ------- torch.utils.Dataset(decentralizepy.datasets.Data) + Raises ------ RuntimeError If the training set was not initialized + """ if self.__training__: return DataLoader( @@ -256,13 +329,16 @@ class Celeba(Dataset): def get_testset(self): """ Function to get the test set + Returns ------- torch.utils.Dataset(decentralizepy.datasets.Data) + Raises ------ RuntimeError If the test set was not initialized + """ if self.__testing__: return DataLoader( @@ -270,12 +346,23 @@ class Celeba(Dataset): ) raise RuntimeError("Test set not initialized!") - def imshow(self, img): - npimg = img.numpy() - plt.imshow(np.transpose(npimg, (1, 2, 0))) - plt.show() - def test(self, model, loss): + """ + Function to evaluate model on the test dataset. + + Parameters + ---------- + model : decentralizepy.models.Model + Model to evaluate + loss : torch.nn.loss + Loss function to evaluate + + Returns + ------- + tuple + (accuracy, loss_value) + + """ testloader = self.get_testset() logging.debug("Test Loader instantiated.") @@ -318,7 +405,16 @@ class Celeba(Dataset): class CNN(Model): + """ + Class for a CNN Model for Celeba + + """ def __init__(self): + """ + Constructor. Instantiates the CNN Model + with 84*84*3 Input and 2 output classes + + """ super().__init__() # 2.8k parameters self.conv1 = nn.Conv2d(CHANNELS, 32, 3, padding="same") @@ -329,6 +425,20 @@ class CNN(Model): self.fc1 = nn.Linear(5 * 5 * 32, NUM_CLASSES) def forward(self, x): + """ + Forward pass of the model + + Parameters + ---------- + x : torch.tensor + The input torch tensor + + Returns + ------- + torch.tensor + The output torch tensor + + """ x = F.relu(self.pool(self.conv1(x))) x = F.relu(self.pool(self.conv2(x))) x = F.relu(self.pool(self.conv3(x))) diff --git a/src/decentralizepy/datasets/Data.py b/src/decentralizepy/datasets/Data.py index 7a9e515ecd45c0562edf403c5ea30c3cf915fc09..d063c4191b5289da106af331db81ee2212c1c186 100644 --- a/src/decentralizepy/datasets/Data.py +++ b/src/decentralizepy/datasets/Data.py @@ -1,34 +1,49 @@ class Data: """ This class defines the API for Data. + """ def __init__(self, x, y): """ Constructor + Parameters ---------- x : numpy array A numpy array of data samples y : numpy array A numpy array of outputs corresponding to the sample + """ self.x = x self.y = y def __len__(self): + """ + Return the number of samples in the dataset + + Returns + ------- + int + Number of samples + + """ return self.y.shape[0] def __getitem__(self, i): """ Function to get the item with index i. 
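A short usage sketch of this wrapper; the shapes are illustrative::

    import numpy as np

    from decentralizepy.datasets.Data import Data

    x = np.zeros((10, 28, 28))  # 10 samples
    y = np.arange(10)           # 10 labels
    d = Data(x, y)
    print(len(d))               # 10
    sample, label = d[3]        # ith sample and its corresponding label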
+ Parameters ---------- i : int Index + Returns ------- 2-tuple A tuple of the ith data sample and it's corresponding label + """ return self.x[i], self.y[i] diff --git a/src/decentralizepy/datasets/Dataset.py b/src/decentralizepy/datasets/Dataset.py index 033165c33e412ca36d6dec2b5577fc07eaa355cf..28c36e5af3f87ee9ebc89bdffabf5f0ddf819d1f 100644 --- a/src/decentralizepy/datasets/Dataset.py +++ b/src/decentralizepy/datasets/Dataset.py @@ -5,6 +5,7 @@ class Dataset: """ This class defines the Dataset API. All datasets must follow this API. + """ def __init__( @@ -20,10 +21,15 @@ class Dataset: ): """ Constructor which reads the data files, instantiates and partitions the dataset + Parameters ---------- - rank : int, optional - Rank of the current process (to get the partition). Default value is assigned 0 + rank : int + Rank of the current process (to get the partition). + machine_id : int + Machine ID + mapping : decentralizepy.mappings.Mapping + Mapping to conver rank, machine_id -> uid for data partitioning n_procs : int, optional The number of processes among which to divide the data. Default value is assigned 1 train_dir : str, optional @@ -34,6 +40,9 @@ class Dataset: sizes : list(int), optional A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0 By default, each process gets an equal amount. + test_batch_size : int, optional + Batch size during testing. Default value is 64 + """ self.rank = rank self.machine_id = machine_id @@ -60,25 +69,31 @@ class Dataset: def get_trainset(self): """ Function to get the training set + Returns ------- torch.utils.Dataset(decentralizepy.datasets.Data) + Raises ------ RuntimeError If the training set was not initialized + """ raise NotImplementedError def get_testset(self): """ Function to get the test set + Returns ------- torch.utils.Dataset(decentralizepy.datasets.Data) + Raises ------ RuntimeError If the test set was not initialized + """ raise NotImplementedError diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py index ab33880b5d7e2a1ff7e0588e2c27bd0481bf9340..5d0dc27b356f52cdd705b65ccc6d7c6261401faa 100644 --- a/src/decentralizepy/datasets/Femnist.py +++ b/src/decentralizepy/datasets/Femnist.py @@ -24,9 +24,24 @@ PIXEL_RANGE = 256.0 class Femnist(Dataset): """ Class for the FEMNIST dataset + """ def __read_file__(self, file_path): + """ + Read data from the given json file + + Parameters + ---------- + file_path : str + The file path + + Returns + ------- + tuple + (users, num_samples, data) + + """ with open(file_path, "r") as inf: client_data = json.load(inf) return ( @@ -38,15 +53,18 @@ class Femnist(Dataset): def __read_dir__(self, data_dir): """ Function to read all the FEMNIST data files in the directory + Parameters ---------- data_dir : str Path to the folder containing the data files + Returns ------- 3-tuple A tuple containing list of clients, number of samples per client, and the data items per client + """ clients = [] num_samples = [] @@ -63,6 +81,17 @@ class Femnist(Dataset): return clients, num_samples, data def file_per_user(self, dir, write_dir): + """ + Function to read all the FEMNIST data files and write one file per user + + Parameters + ---------- + dir : str + Path to the folder containing the data files + write_dir : str + Path to the folder to write the files + + """ clients, num_samples, train_data = self.__read_dir__(dir) for index, client in enumerate(clients): my_data = dict() @@ -75,6 +104,10 @@ class Femnist(Dataset): 
print("Created File: ", client + ".json") def load_trainset(self): + """ + Loads the training set. Partitions it if needed. + + """ logging.info("Loading training set.") files = os.listdir(self.train_dir) files = [f for f in files if f.endswith(".json")] @@ -119,6 +152,10 @@ class Femnist(Dataset): assert self.train_x.shape[0] > 0 def load_testset(self): + """ + Loads the testing set. + + """ logging.info("Loading testing set.") _, _, d = self.__read_dir__(self.test_dir) test_x = [] @@ -152,10 +189,15 @@ class Femnist(Dataset): ): """ Constructor which reads the data files, instantiates and partitions the dataset + Parameters ---------- rank : int - Rank of the current process (to get the partition). Default value is assigned 0 + Rank of the current process (to get the partition). + machine_id : int + Machine ID + mapping : decentralizepy.mappings.Mapping + Mapping to conver rank, machine_id -> uid for data partitioning n_procs : int, optional The number of processes among which to divide the data. Default value is assigned 1 train_dir : str, optional @@ -166,6 +208,9 @@ class Femnist(Dataset): sizes : list(int), optional A list of fractions specifying how much data to alot each process. Sum of fractions should be 1.0 By default, each process gets an equal amount. + test_batch_size : int, optional + Batch size during testing. Default value is 64 + """ super().__init__( rank, @@ -189,28 +234,34 @@ class Femnist(Dataset): def get_client_ids(self): """ Function to retrieve all the clients of the current process + Returns ------- list(str) A list of strings of the client ids. + """ return self.clients def get_client_id(self, i): """ Function to get the client id of the ith sample + Parameters ---------- i : int Index of the sample + Returns ------- str Client ID + Raises ------ IndexError If the sample index is out of bounds + """ lb = 0 for j in range(len(self.clients)): @@ -222,17 +273,21 @@ class Femnist(Dataset): def get_trainset(self, batch_size=1, shuffle=False): """ Function to get the training set + Parameters ---------- batch_size : int, optional Batch size for learning + Returns ------- torch.utils.Dataset(decentralizepy.datasets.Data) + Raises ------ RuntimeError If the training set was not initialized + """ if self.__training__: return DataLoader( @@ -243,13 +298,16 @@ class Femnist(Dataset): def get_testset(self): """ Function to get the test set + Returns ------- torch.utils.Dataset(decentralizepy.datasets.Data) + Raises ------ RuntimeError If the test set was not initialized + """ if self.__testing__: return DataLoader( @@ -257,12 +315,23 @@ class Femnist(Dataset): ) raise RuntimeError("Test set not initialized!") - def imshow(self, img): - npimg = img.numpy() - plt.imshow(np.transpose(npimg, (1, 2, 0))) - plt.show() - def test(self, model, loss): + """ + Function to evaluate model on the test dataset. + + Parameters + ---------- + model : decentralizepy.models.Model + Model to evaluate + loss : torch.nn.loss + Loss function to evaluate + + Returns + ------- + tuple + (accuracy, loss_value) + + """ testloader = self.get_testset() logging.debug("Test Loader instantiated.") @@ -307,12 +376,14 @@ class Femnist(Dataset): class LogisticRegression(Model): """ Class for a Logistic Regression Neural Network for FEMNIST + """ def __init__(self): """ Constructor. 
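As a quick sanity check of the shapes described here (28*28 = 784 flattened inputs, 62 FEMNIST classes), mirroring the flatten-then-linear forward pass of the model below::

    import torch
    from torch import nn

    fc1 = nn.Linear(28 * 28, 62)
    batch = torch.randn(8, 1, 28, 28)            # a batch of 8 FEMNIST images
    out = fc1(torch.flatten(batch, start_dim=1))
    print(out.shape)                             # torch.Size([8, 62])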
Instantiates the Logistic Regression Model with 28*28 Input and 62 output classes + """ super().__init__() self.fc1 = nn.Linear(FLAT_SIZE, NUM_CLASSES) @@ -320,14 +391,17 @@ class LogisticRegression(Model): def forward(self, x): """ Forward pass of the model + Parameters ---------- x : torch.tensor The input torch tensor + Returns ------- torch.tensor The output torch tensor + """ x = torch.flatten(x, start_dim=1) x = self.fc1(x) @@ -335,7 +409,16 @@ class LogisticRegression(Model): class CNN(Model): + """ + Class for a CNN Model for FEMNIST + + """ def __init__(self): + """ + Constructor. Instantiates the CNN Model + with 28*28*1 Input and 62 output classes + + """ super().__init__() # 1.6 million params self.conv1 = nn.Conv2d(1, 32, 5, padding=2) @@ -345,6 +428,20 @@ class CNN(Model): self.fc2 = nn.Linear(512, NUM_CLASSES) def forward(self, x): + """ + Forward pass of the model + + Parameters + ---------- + x : torch.tensor + The input torch tensor + + Returns + ------- + torch.tensor + The output torch tensor + + """ x = self.pool(F.relu(self.conv1(x))) x = self.pool(F.relu(self.conv2(x))) x = torch.flatten(x, 1) diff --git a/src/decentralizepy/datasets/Partitioner.py b/src/decentralizepy/datasets/Partitioner.py index 6446986b6b513e6f8146097a5d618996f0daaab8..a7b83b37402b815fb578832c0f3c809b6e774d95 100644 --- a/src/decentralizepy/datasets/Partitioner.py +++ b/src/decentralizepy/datasets/Partitioner.py @@ -6,16 +6,19 @@ from random import Random class Partition(object): """ Class for holding the data partition + """ def __init__(self, data, index): """ Constructor. Caches the data and the indices + Parameters ---------- data : indexable index : list A list of indices + """ self.data = data self.index = index @@ -23,23 +26,28 @@ class Partition(object): def __len__(self): """ Function to retrieve the length + Returns ------- int Number of items in the data + """ return len(self.index) def __getitem__(self, index): """ Retrieves the item in data with the given index + Parameters ---------- index : int + Returns ------- Data The data sample with the given `index` in the dataset + """ data_idx = self.index[index] return self.data[data_idx] @@ -48,11 +56,13 @@ class Partition(object): class DataPartitioner(object): """ Class to partition the dataset + """ def __init__(self, data, sizes=[1.0], seed=1234): """ Constructor. Partitions the data according the parameters + Parameters ---------- data : indexable @@ -61,6 +71,7 @@ class DataPartitioner(object): A list of fractions for each process seed : int, optional Seed for generating a random subset + """ self.data = data self.partitions = [] @@ -78,13 +89,16 @@ class DataPartitioner(object): def use(self, rank): """ Get the partition for the process with the given `rank` + Parameters ---------- rank : int Rank of the current process + Returns ------- Partition The dataset partition of the current process + """ return Partition(self.data, self.partitions[rank]) diff --git a/src/decentralizepy/graphs/FullyConnected.py b/src/decentralizepy/graphs/FullyConnected.py index d235f7e5917e60b1a4290736af771793c27e070a..dcf33d850e21208c59e56198c28407502898a2d0 100644 --- a/src/decentralizepy/graphs/FullyConnected.py +++ b/src/decentralizepy/graphs/FullyConnected.py @@ -4,15 +4,18 @@ from decentralizepy.graphs.Graph import Graph class FullyConnected(Graph): """ The class for generating a Fully Connected Graph Topology + """ def __init__(self, n_procs): """ Constructor. 
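A usage sketch for ``DataPartitioner`` above: three ranks whose fractions sum to 1.0, each taking its own shard::

    from decentralizepy.datasets.Partitioner import DataPartitioner

    data = list(range(100))
    partitioner = DataPartitioner(data, sizes=[0.5, 0.25, 0.25], seed=1234)
    shard = partitioner.use(0)   # Partition for rank 0
    print(len(shard), shard[0])  # about 50 items; first item of rank 0's shard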
Generates a Fully Connected graph + Parameters ---------- n_procs : int total number of nodes in the graph + """ super().__init__(n_procs) for node in range(n_procs): diff --git a/src/decentralizepy/graphs/Graph.py b/src/decentralizepy/graphs/Graph.py index 5f3ccf1c6d50faf1650571d040f99a905e7f5a98..3e20b639fd09a6d8f432417efd8eb737b1566604 100644 --- a/src/decentralizepy/graphs/Graph.py +++ b/src/decentralizepy/graphs/Graph.py @@ -7,10 +7,12 @@ class Graph: def __init__(self, n_procs=None): """ Constructor + Parameters ---------- n_procs : int, optional Number of processes in the graph, if already known + """ if n_procs != None: self.n_procs = n_procs @@ -19,24 +21,28 @@ class Graph: def __insert_adj__(self, node, neighbours): """ Inserts `neighbours` into the adjacency list of `node` + Parameters ---------- node : int The vertex in question neighbours : list(int) A list of neighbours of the `node` + """ self.adj_list[node].update(neighbours) def __insert_edge__(self, x, y): """ Inserts edge `x -> y` into the graph + Parameters ---------- x : int The source vertex y : int The destination vertex + """ self.adj_list[x].add(y) self.adj_list[y].add(x) @@ -44,21 +50,26 @@ class Graph: def read_graph_from_file(self, file, type="edges", force_connect=False): """ Reads the graph from a given file + Parameters ---------- file : str path to the file - type : `edges` or `adjacency` + type : str + `edges` or `adjacency` force_connect : bool, optional Should the graph be force-connected using a ring + Returns ------- int Number of processes, read from the first line of the file + Raises ------ ValueError If the type is not either `edges` or `adjacency` + """ with open(file, "r") as inf: @@ -85,6 +96,17 @@ class Graph: return self.n_procs def write_graph_to_file(self, file, type="edges"): + """ + Writes graph to file + + Parameters + ---------- + file : str + File path + type : str + One of {"edges", "adjacency"}. Writes the corresponding format. + + """ with open(file, "w") as of: of.write(str(self.n_procs) + "\n") if type == "edges": @@ -100,6 +122,7 @@ class Graph: def connect_graph(self): """ Connects the graph using a Ring + """ for node in range(self.n_procs): self.adj_list[node].add((node + 1) % self.n_procs) @@ -108,13 +131,16 @@ class Graph: def neighbors(self, uid): """ Gives the neighbors of a node + Parameters ---------- uid : int globally unique identifier of the node + Returns ------- set(int) a set of neighbours + """ return self.adj_list[uid] diff --git a/src/decentralizepy/graphs/Ring.py b/src/decentralizepy/graphs/Ring.py index c502cf330a4df4dea84715763d32647b5fe51d2b..47a8be2dda5203aff1f45dc3e91834dae7dc1f7c 100644 --- a/src/decentralizepy/graphs/Ring.py +++ b/src/decentralizepy/graphs/Ring.py @@ -4,15 +4,18 @@ from decentralizepy.graphs.Graph import Graph class Ring(Graph): """ The class for generating a Ring topology + """ def __init__(self, n_procs): """ Constructor. 
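For reference, the ``edges`` format consumed by ``read_graph_from_file`` above puts the number of processes on the first line; one whitespace-separated edge per line is the natural reading of the writer, though the exact layout is inferred. A 4-node ring::

    4
    0 1
    1 2
    2 3
    3 0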
Generates a Ring graph + Parameters ---------- n_procs : int total number of nodes in the graph + """ super().__init__(n_procs) self.connect_graph() diff --git a/src/decentralizepy/graphs/SmallWorld.py b/src/decentralizepy/graphs/SmallWorld.py index 81a70225599018924415eeda567daec4684dc686..cc254f3313e2f040a69bb93d0826b7faf8da121b 100644 --- a/src/decentralizepy/graphs/SmallWorld.py +++ b/src/decentralizepy/graphs/SmallWorld.py @@ -8,11 +8,13 @@ class SmallWorld(Graph): The class for generating a SmallWorld topology Graph Adapted from https://gitlab.epfl.ch/sacs/ml-rawdatasharing/dnn-recommender/-/blob/master/topologies.py + """ def __init__(self, n_procs, k_over_2, beta): """ Constructor. Generates a random connected SmallWorld graph + Parameters ---------- n_procs : int @@ -21,6 +23,7 @@ class SmallWorld(Graph): k_over_2 config for smallworld beta : int beta config for smallworld. β = 1 is truly equal to the Erdős-Rényi network model + """ super().__init__(n_procs) G = smallworld.get_smallworld_graph(self.n_procs, k_over_2, beta) diff --git a/src/decentralizepy/mappings/Linear.py b/src/decentralizepy/mappings/Linear.py index b43bf860f7c0835e74336d4f13d3b687bfb5a1d0..57ef628c99817d32a8e1cddfefe5b5ff6a22a5cd 100644 --- a/src/decentralizepy/mappings/Linear.py +++ b/src/decentralizepy/mappings/Linear.py @@ -5,17 +5,20 @@ class Linear(Mapping): """ This class defines the mapping: uid = machine_id * procs_per_machine + rank + """ def __init__(self, n_machines, procs_per_machine): """ Constructor + Parameters ---------- n_machines : int Number of machines involved in learning procs_per_machine : int Number of processes spawned per machine + """ super().__init__(n_machines * procs_per_machine) self.n_machines = n_machines @@ -24,29 +27,35 @@ class Linear(Mapping): def get_uid(self, rank: int, machine_id: int): """ Gives the global unique identifier of the node + Parameters ---------- rank : int Node's rank on its machine machine_id : int node's machine in the cluster + Returns ------- int the unique identifier + """ return machine_id * self.procs_per_machine + rank def get_machine_and_rank(self, uid: int): """ Gives the rank and machine_id of the node + Parameters ---------- uid : int globally unique identifier of the node + Returns ------- 2-tuple a tuple of rank and machine_id + """ return (uid % self.procs_per_machine), (uid // self.procs_per_machine) diff --git a/src/decentralizepy/mappings/Mapping.py b/src/decentralizepy/mappings/Mapping.py index cf454e607dbfea265acbf0d6fe551bf99f9a1026..b979eb96a753b18abf4b3ecfe9ee62561ad08b2d 100644 --- a/src/decentralizepy/mappings/Mapping.py +++ b/src/decentralizepy/mappings/Mapping.py @@ -3,27 +3,37 @@ class Mapping: This class defines the bidirectional mapping between: 1. The unique identifier 2. 
machine_id and rank + """ def __init__(self, n_procs): """ Constructor + + Parameters + ---------- + n_procs : int + Total number of processes + """ self.n_procs = n_procs def get_uid(self, rank: int, machine_id: int): """ Gives the global unique identifier of the node + Parameters ---------- rank : int Node's rank on its machine machine_id : int node's machine in the cluster + Returns ------- int the unique identifier + """ raise NotImplementedError @@ -31,14 +41,17 @@ class Mapping: def get_machine_and_rank(self, uid: int): """ Gives the rank and machine_id of the node + Parameters ---------- uid : int globally unique identifier of the node + Returns ------- 2-tuple a tuple of rank and machine_id + """ raise NotImplementedError diff --git a/src/decentralizepy/models/Model.py b/src/decentralizepy/models/Model.py index befc93289d8a4ac105e878ff5a567d90aaba03f7..da834029c1b11a31db7dd171011446d0d3bba965 100644 --- a/src/decentralizepy/models/Model.py +++ b/src/decentralizepy/models/Model.py @@ -2,6 +2,15 @@ from torch import nn class Model(nn.Module): + """ + This class wraps the torch model + More fields can be added here + + """ def __init__(self): + """ + Constructor + + """ super().__init__() self.accumulated_gradients = [] diff --git a/src/decentralizepy/node/Node.py b/src/decentralizepy/node/Node.py index 41d061b6e60fece2f2927a09d1eb1f4744300907..183fbfeb87ba6f879b2df8bed3af83b5718d97ad 100644 --- a/src/decentralizepy/node/Node.py +++ b/src/decentralizepy/node/Node.py @@ -13,9 +13,27 @@ from decentralizepy.mappings.Mapping import Mapping class Node: """ This class defines the node (entity that performs learning, sharing and communication). + """ def save_plot(self, l, label, title, xlabel, filename): + """ + Save Matplotlib plot. Clears previous plots. + + Parameters + ---------- + l : dict + dict of x -> y. `x` must be castable to int. + label : str + label of the plot. Used for legend. + title : str + Header + xlabel : str + x-axis label + filename : str + Name of file to save the plot as. + + """ plt.clf() y_axis = [l[key] for key in l.keys()] x_axis = list(map(int, l.keys())) @@ -38,7 +56,8 @@ class Node: *args ): """ - Construct objects + Construct objects. + Parameters ---------- rank : int @@ -52,25 +71,14 @@ class Node: graph : decentralizepy.graphs The object containing the global graph config : dict - A dictionary of configurations. Must contain the following: - [DATASET] - dataset_package - dataset_class - model_class - [OPTIMIZER_PARAMS] - optimizer_package - optimizer_class - [TRAIN_PARAMS] - training_package = decentralizepy.training.Training - training_class = Training - epochs_per_round = 25 - batch_size = 64 + A dictionary of configurations. 
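The listing removed above remains the best reference for what this dictionary must contain; it corresponds to an ini file along these lines (section and key names are taken from the old docstring and from the code in ``instantiate``; the dataset and optimizer values are illustrative)::

    [DATASET]
    dataset_package = decentralizepy.datasets.Femnist
    dataset_class = Femnist
    model_class = CNN

    [OPTIMIZER_PARAMS]
    optimizer_package = torch.optim
    optimizer_class = SGD

    [TRAIN_PARAMS]
    training_package = decentralizepy.training.Training
    training_class = Training
    epochs_per_round = 25
    batch_size = 64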
log_dir : str Logging directory log_level : logging.Level One of DEBUG, INFO, WARNING, ERROR, CRITICAL args : optional Other arguments + """ log_file = os.path.join(log_dir, str(rank) + ".log") logging.basicConfig( @@ -154,7 +162,6 @@ class Node: self.communication = comm_class( self.rank, self.machine_id, self.mapping, self.graph.n_procs, **comm_params ) - self.communication.connect_neighbors(self.graph.neighbors(self.uid)) sharing_configs = config["SHARING"] sharing_package = importlib.import_module(sharing_configs["sharing_package"]) @@ -181,8 +188,10 @@ class Node: def run(self): """ Start the decentralized learning + """ self.testset = self.dataset.get_testset() + self.communication.connect_neighbors(self.graph.neighbors(self.uid)) rounds_to_test = self.test_after for iteration in range(self.iterations): @@ -269,6 +278,7 @@ class Node: ): """ Constructor + Parameters ---------- rank : int @@ -301,6 +311,7 @@ class Node: One of DEBUG, INFO, WARNING, ERROR, CRITICAL args : optional Other arguments + """ self.instantiate( rank, diff --git a/src/decentralizepy/sharing/GrowingAlpha.py b/src/decentralizepy/sharing/GrowingAlpha.py index 1fb841094c5f6960734f28319da910dfd220b47e..49798282084a4facc1c979e74ab314d4bace0a0a 100644 --- a/src/decentralizepy/sharing/GrowingAlpha.py +++ b/src/decentralizepy/sharing/GrowingAlpha.py @@ -4,6 +4,10 @@ from decentralizepy.sharing.PartialModel import PartialModel class GrowingAlpha(PartialModel): + """ + This class implements the basic growing partial model sharing using a linear function. + + """ def __init__( self, rank, @@ -21,6 +25,41 @@ class GrowingAlpha(PartialModel): save_shared=False, metadata_cap=1.0, ): + """ + Constructor + + Parameters + ---------- + rank : int + Local rank + machine_id : int + Global machine id + communication : decentralizepy.communication.Communication + Communication module used to send and receive messages + mapping : decentralizepy.mappings.Mapping + Mapping (rank, machine_id) -> uid + graph : decentralizepy.graphs.Graph + Graph reprensenting neighbors + model : decentralizepy.models.Model + Model to train + dataset : decentralizepy.datasets.Dataset + Dataset for sharing data. Not implemented yet! TODO + log_dir : str + Location to write shared_params (only writing for 2 procs per machine) + init_alpha : float + Percentage of model to share initially + max_alpha : float + Maximum alpha to reach in k steps + k : int + Steps to reach maximum alpha. Also steps after which alpha increases. + dict_ordered : bool + Specifies if the python dict maintains the order of insertion + save_shared : bool + Specifies if the indices of shared parameters should be logged + metadata_cap : float + Share full model when self.alpha > metadata_cap + + """ super().__init__( rank, machine_id, @@ -40,6 +79,10 @@ class GrowingAlpha(PartialModel): self.k = k def step(self): + """ + Perform a sharing step. Implements D-PSGD with alpha increasing as a linear function. 
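Read concretely, the update in the body below bumps alpha once every ``k`` rounds; a closed-form sketch of that schedule::

    def alpha_at(r, init_alpha, max_alpha, k):
        # one increment of (max_alpha - init_alpha) / k every k rounds, capped at 1.0
        return min(init_alpha + ((r + 1) // k) * (max_alpha - init_alpha) / k, 1.0)

    print([round(alpha_at(r, 0.1, 0.9, 10), 2) for r in (9, 19, 29)])  # [0.18, 0.26, 0.34]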
+ + """ if (self.communication_round + 1) % self.k == 0: self.alpha += (self.max_alpha - self.init_alpha) / self.k self.alpha = min(self.alpha, 1.00) diff --git a/src/decentralizepy/sharing/PartialModel.py b/src/decentralizepy/sharing/PartialModel.py index 52e002e6d8ce304453e1598f3cba16caade44121..7addc0e1f8bff60bb6d7fffd2d64146fabc6dc67 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -9,6 +9,10 @@ from decentralizepy.sharing.Sharing import Sharing class PartialModel(Sharing): + """ + This class implements the vanilla version of partial model sharing. + + """ def __init__( self, rank, @@ -26,6 +30,7 @@ class PartialModel(Sharing): ): """ Constructor + Parameters ---------- rank : int @@ -44,6 +49,15 @@ class PartialModel(Sharing): Dataset for sharing data. Not implemented yet! TODO log_dir : str Location to write shared_params (only writing for 2 procs per machine) + alpha : float + Percentage of model to share + dict_ordered : bool + Specifies if the python dict maintains the order of insertion + save_shared : bool + Specifies if the indices of shared parameters should be logged + metadata_cap : float + Share full model when self.alpha > metadata_cap + """ super().__init__( rank, machine_id, communication, mapping, graph, model, dataset, log_dir @@ -53,7 +67,7 @@ class PartialModel(Sharing): self.save_shared = save_shared self.metadata_cap = metadata_cap - # Only save for 2 procs + # Only save for 2 procs: Save space if rank == 0 or rank == 1: self.save_shared = True @@ -64,6 +78,16 @@ class PartialModel(Sharing): Path(self.folder_path).mkdir(parents=True, exist_ok=True) def extract_top_gradients(self): + """ + Extract the indices and values of the topK gradients. + The gradients must have been accumulated. + + Returns + ------- + tuple + (a,b). a: The magnitudes of the topK gradients, b: Their indices. + + """ logging.info("Summing up gradients") assert len(self.model.accumulated_gradients) > 0 gradient_sum = self.model.accumulated_gradients[0] @@ -79,6 +103,15 @@ class PartialModel(Sharing): ) def serialized_model(self): + """ + Convert model to json dict. self.alpha specifies the fraction of model to send. + + Returns + ------- + dict + Model converted to json dict + + """ if self.alpha > self.metadata_cap: # Share fully return super().serialized_model() @@ -133,6 +166,20 @@ class PartialModel(Sharing): return m def deserialized_model(self, m): + """ + Convert received json dict to state_dict. + + Parameters + ---------- + m : dict + json dict received + + Returns + ------- + state_dict + state_dict of received + + """ if self.alpha > self.metadata_cap: # Share fully return super().deserialized_model(m) diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index c8ef9f1d5c5e95dc526d0ac74fee0768c45e78e7..92f6475b207bc79d4ec0bd08c2494393e0564335 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -9,6 +9,7 @@ import torch class Sharing: """ API defining who to share with and what, and what to do on receiving + """ def __init__( @@ -16,6 +17,7 @@ class Sharing: ): """ Constructor + Parameters ---------- rank : int @@ -34,6 +36,7 @@ class Sharing: Dataset for sharing data. Not implemented yer! 
TODO log_dir : str Location to write shared_params (only writing for 2 procs per machine) + """ self.rank = rank self.machine_id = machine_id @@ -54,10 +57,12 @@ class Sharing: def received_from_all(self): """ Check if all neighbors have sent the current iteration + Returns ------- bool True if required data has been received, False otherwise + """ for _, i in self.peer_deques.items(): if len(i) == 0: @@ -67,14 +72,17 @@ class Sharing: def get_neighbors(self, neighbors): """ Choose which neighbors to share with + Parameters ---------- neighbors : list(int) List of all neighbors + Returns ------- list(int) Neighbors to share with + """ # modify neighbors here return neighbors @@ -82,10 +90,12 @@ class Sharing: def serialized_model(self): """ Convert model to json dict. Here we can choose how much to share + Returns ------- dict Model converted to json dict + """ m = dict() for key, val in self.model.state_dict().items(): @@ -95,14 +105,17 @@ class Sharing: def deserialized_model(self, m): """ Convert received json dict to state_dict. + Parameters ---------- m : dict json dict received + Returns ------- state_dict state_dict of received + """ state_dict = dict() for key, value in m.items(): @@ -110,6 +123,10 @@ class Sharing: return state_dict def step(self): + """ + Perform a sharing step. Implements D-PSGD. + + """ data = self.serialized_model() my_uid = self.mapping.get_uid(self.rank, self.machine_id) all_neighbors = self.graph.neighbors(my_uid) diff --git a/src/decentralizepy/training/GradientAccumulator.py b/src/decentralizepy/training/GradientAccumulator.py index 31d39e2713e21927a2a53e0a23263843dcc3298c..e4feff28fed6759636cdeca2cbda34b18b620982 100644 --- a/src/decentralizepy/training/GradientAccumulator.py +++ b/src/decentralizepy/training/GradientAccumulator.py @@ -4,6 +4,10 @@ from decentralizepy.training.Training import Training class GradientAccumulator(Training): + """ + This class implements the training module which also accumulates gradients of steps in a list. + + """ def __init__( self, model, @@ -16,6 +20,7 @@ class GradientAccumulator(Training): ): """ Constructor + Parameters ---------- model : torch.nn.Module @@ -24,12 +29,15 @@ class GradientAccumulator(Training): Optimizer to learn parameters loss : function Loss function - epochs_per_round : int, optional - Number of epochs per training call + rounds : int, optional + Number of steps/epochs per training call + full_epochs: bool, optional + True if 1 round = 1 epoch. False if 1 round = 1 minibatch batch_size : int, optional Number of items to learn over, in one batch shuffle : bool True if the dataset should be shuffled before training. + """ super().__init__( model, optimizer, loss, rounds, full_epochs, batch_size, shuffle @@ -38,16 +46,19 @@ class GradientAccumulator(Training): def trainstep(self, data, target): """ One training step on a minibatch. + Parameters ---------- data : any Data item target : any Label + Returns ------- int Loss Value for the step + """ self.model.zero_grad() output = self.model(data) @@ -66,10 +77,12 @@ class GradientAccumulator(Training): def train_full(self, trainset): """ One training iteration, goes through the entire dataset + Parameters ---------- trainset : torch.utils.data.Dataloader The training dataset. + """ for epoch in range(self.rounds): epoch_loss = 0.0 @@ -83,10 +96,12 @@ class GradientAccumulator(Training): """ One training iteration with accumulation of gradients in model.accumulated_gradients. Goes through the entire dataset. 
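Returning to ``Sharing.serialized_model`` above, whose body is elided in this hunk: assuming each tensor is serialized as a JSON list (consistent with the JSON transport in TCP), a round trip would preserve the weights::

    import json

    import numpy as np
    import torch
    from torch import nn

    model = nn.Linear(4, 2)
    # hypothetical per-tensor encoding; the real body is not shown in this diff
    m = {k: json.dumps(v.numpy().tolist()) for k, v in model.state_dict().items()}
    state = {k: torch.from_numpy(np.array(json.loads(v))) for k, v in m.items()}
    model.load_state_dict(state)  # weights survive the round trip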
+ Parameters ---------- dataset : decentralizepy.datasets.Dataset The training dataset. Should implement get_trainset(batch_size, shuffle) + """ self.model.accumulated_gradients = [] super().train(dataset) diff --git a/src/decentralizepy/training/Training.py b/src/decentralizepy/training/Training.py index 7d594351fd28dc1be3207ca8b74700f13e45b9c6..52a8e9d070ed1d1bc7a3641cff25a4456821a756 100644 --- a/src/decentralizepy/training/Training.py +++ b/src/decentralizepy/training/Training.py @@ -8,6 +8,7 @@ from decentralizepy import utils class Training: """ This class implements the training module for a single node. + """ def __init__( @@ -22,6 +23,7 @@ class Training: ): """ Constructor + Parameters ---------- model : torch.nn.Module @@ -38,6 +40,7 @@ class Training: Number of items to learn over, in one batch shuffle : bool True if the dataset should be shuffled before training. + """ self.model = model self.optimizer = optimizer @@ -50,20 +53,24 @@ class Training: def reset_optimizer(self, optimizer): """ Replace the current optimizer with a new one + Parameters ---------- optimizer : torch.optim A new optimizer + """ self.optimizer = optimizer def eval_loss(self, dataset): """ Evaluate the loss on the training set + Parameters ---------- dataset : decentralizepy.datasets.Dataset The training dataset. Should implement get_trainset(batch_size, shuffle) + """ trainset = dataset.get_trainset(self.batch_size, self.shuffle) epoch_loss = 0.0 @@ -81,16 +88,19 @@ class Training: def trainstep(self, data, target): """ One training step on a minibatch. + Parameters ---------- data : any Data item target : any Label + Returns ------- int Loss Value for the step + """ self.model.zero_grad() output = self.model(data) @@ -102,10 +112,12 @@ class Training: def train_full(self, trainset): """ One training iteration, goes through the entire dataset + Parameters ---------- trainset : torch.utils.data.Dataloader The training dataset. + """ for epoch in range(self.rounds): epoch_loss = 0.0 @@ -118,10 +130,12 @@ class Training: def train(self, dataset): """ One training iteration + Parameters ---------- dataset : decentralizepy.datasets.Dataset The training dataset. Should implement get_trainset(batch_size, shuffle) + """ trainset = dataset.get_trainset(self.batch_size, self.shuffle) diff --git a/src/decentralizepy/utils.py b/src/decentralizepy/utils.py index 76eec069b0399000a3744931baca8590b3e94fe1..eac1e17deb5c7514522e47818f65c76501e7f48a 100644 --- a/src/decentralizepy/utils.py +++ b/src/decentralizepy/utils.py @@ -7,6 +7,7 @@ import os def conditional_value(var, nul, default): """ Set the value to default if nul. + Parameters ---------- var : any @@ -15,10 +16,12 @@ def conditional_value(var, nul, default): The null value. Assigns default if var == nul default : any The default value + Returns ------- type(var) The final value + """ if var != nul: return var @@ -29,16 +32,19 @@ def conditional_value(var, nul, default): def remove_keys(d, keys_to_remove): """ Removes given keys from the dict. Returns a new list. + Parameters ---------- d : dict The initial dictionary keys_to_remove : list List of keys to remove from dict + Returns ------- dict A new dictionary with the given keys removed. + """ return {key: d[key] for key in d if key not in keys_to_remove} @@ -46,10 +52,12 @@ def remove_keys(d, keys_to_remove): def get_args(): """ Utility to parse arguments. 
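Quick checks of the two helpers above::

    from decentralizepy.utils import conditional_value, remove_keys

    print(conditional_value(None, None, 64))     # 64: default applied
    print(conditional_value(128, None, 64))      # 128: value kept
    print(remove_keys({"a": 1, "b": 2}, ["b"]))  # {'a': 1}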
+ Returns ------- args Command line arguments + """ parser = argparse.ArgumentParser() parser.add_argument("-mid", "--machine_id", type=int, default=0) @@ -75,12 +83,14 @@ def get_args(): def write_args(args, path): """ Write arguments to a json file + Parameters ---------- args : args Command line args path : str Location of the file to write to + """ data = { "machine_id": args.machine_id,