Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (1)
#!/bin/bash
decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
decpy_path=/mnt/nfs/kirsten/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
graph=/mnt/nfs/risharma/Gitlab/tutorial/regular_16.txt
original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
graph=/mnt/nfs/kirsten/Gitlab/tutorial/regular_16.txt
original_config=/mnt/nfs/kirsten/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=8
machines=2
......
#!/bin/bash
# Documentation
# Note: documentation was not written for this run file, so actual behaviour may differ
# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory.
# The second one (python_bin) is the path to the python bin folder.
# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory.
@@ -18,8 +19,10 @@
# Each node needs a folder called 'tmp' in the user's home directory
#
# Note:
# - The script does not change the optimizer. All configs are written to use SGD.
# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch.
# - The script does not change the optimizer. All configs are written to use Adam.
#   For SGD these need to be changed manually.
# - The script will set '--test_after' and '--train_evaluate_after' to comm_rounds_per_global_epoch, i.e., the evaluation
#   on the train set and on the test set is carried out every global epoch.
# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only
#   relevant for Adam and other optimizers with internal state).
#
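# Example invocation (illustrative only; the script file name and the logs path below are
# assumptions, not taken from this repository):
#   bash <this_run_file>.sh /mnt/nfs/<user> ~/miniconda3/envs/decpy/bin /logs/cifar_experiment
# i.e. argument 1 = nfs_home, argument 2 = python_bin, argument 3 = logs_subfolder (relative to nfs_home).
#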
@@ -37,41 +40,40 @@ decpy_path=$nfs_home/decentralizepy/eval
cd $decpy_path
env_python=$python_bin/python3
graph=96_regular.edges
config_file=~/tmp/config.ini
procs_per_machine=16
machines=6
global_epochs=150
eval_file=testing.py
log_level=INFO
ip_machines=$nfs_home/configs/ip_addr_6Machines.json
ip_machines=$nfs_home/$logs_subfolder/ip_addr_6Machines.json
m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
export PYTHONFAULTHANDLER=1
# Base configs for which the grid search is done
tests=("step_configs/config_femnist_partialmodel.ini" "step_configs/config_femnist_topkacc.ini" "step_configs/config_femnist_wavelet.ini")
tests="$nfs_home/$logs_subfolder/config.ini"
#tests=("$nfs_home/$logs_subfolder/config_cifar_sharing.ini" "$nfs_home/$logs_subfolder/config_cifar_partialmodel.ini" "$nfs_home/$logs_subfolder/config_cifar_topkacc.ini" "$nfs_home/$logs_subfolder/config_cifar_topkaccRandomAlpha.ini" "$nfs_home/$logs_subfolder/config_cifar_subsampling.ini" "$nfs_home/$logs_subfolder/config_cifar_wavelet.ini" "$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
#tests=("$nfs_home/$logs_subfolder/config_cifar_partialmodel.ini" "$nfs_home/$logs_subfolder/config_cifar_topkacc.ini" "$nfs_home/$logs_subfolder/config_cifar_topkaccRandomAlpha.ini" "$nfs_home/$logs_subfolder/config_cifar_subsampling.ini" "$nfs_home/$logs_subfolder/config_cifar_wavelet.ini" "$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
#tests=("$nfs_home/$logs_subfolder/config_cifar_subsampling.ini" "$nfs_home/$logs_subfolder/config_cifar_sharing.ini" "$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
#tests=("$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
# Learning rates
lr="0.001"
lr="0.01"
# Batch size
batchsize="16"
batchsize="8"
# The number of communication rounds per global epoch
comm_rounds_per_global_epoch="1"
comm_rounds_per_global_epoch="20"
procs=`expr $procs_per_machine \* $machines`
echo procs: $procs
# Celeba has 63741 samples
# Reddit has 70642
# Femnist 734463
# Shakespeare 3678451, subsampled 678696
# cifar 50000
dataset_size=734463
# Shakespeare 3678451
dataset_size=50000
# Calculating the number of samples that each user/proc will have on average
samples_per_user=`expr $dataset_size / $procs`
echo samples per user: $samples_per_user
# random_seeds for which to rerun the experiments
random_seeds=("97")
# random_seeds=("90" "91" "92" "93" "94")
random_seeds=("94")
# random_seed = 97
echo batchsize: $batchsize
echo communication rounds per global epoch: $comm_rounds_per_global_epoch
@@ -85,10 +87,10 @@ echo iterations: $iterations
batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)")
# since the batches per communication round were rounded down we need to change the number of iterations to reflect that
new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)")
echo batches per communication round: $batches_per_comm_round
echo corrected iterations: $new_iterations
test_after=$(($new_iterations / $global_epochs))
echo test after: $test_after
echo batches per communication round: $batches_per_comm_round
echo corrected iterations: $new_iterations
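# Worked example (illustrative; it assumes the elided lines above compute
# batches_per_epoch = samples_per_user / batchsize and iterations = global_epochs * comm_rounds_per_global_epoch):
#   procs = 16 * 6 = 96, dataset_size = 50000   -> samples_per_user = 520
#   batchsize = 8                               -> batches_per_epoch = 65
#   comm_rounds_per_global_epoch = 20           -> batches_per_comm_round = floor(65 / 20) = 3
#   global_epochs = 150                         -> iterations = 3000
#   corrected iterations: new_iterations = floor(((65 / 20) / 3) * 3000) = 3250
#   with the test_after formula above: test_after = 3250 / 150 = 21 (integer division)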
for i in "${tests[@]}"
do
for seed in "${random_seeds[@]}"
@@ -96,9 +98,14 @@ do
echo $i
IFS='_' read -ra NAMES <<< $i
IFS='.' read -ra NAME <<< ${NAMES[-1]}
log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m
echo results are stored in: $log_dir
#log_dir_base=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')
log_dir_base=$nfs_home$logs_subfolder:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')
echo results are stored in: $log_dir_base
log_dir=$log_dir_base/machine$m
mkdir -p $log_dir
weight_store_dir=$log_dir_base/weights
mkdir -p $weight_store_dir
graph=$nfs_home/decentralizepy/eval/96_regular.edges
cp $i $config_file
# changing the config files to reflect the values of the current grid search state
$python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines
@@ -106,7 +113,9 @@ do
$python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round
$python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize
$python_bin/crudini --set $config_file DATASET random_seed $seed
$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level
$python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines
$python_bin/crudini --set $config_file COMMUNICATION offset -10720
$env_python $eval_file -cte 0 -ro 0 -tea $test_after -ld $log_dir -wsd $weight_store_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level
echo $i is done
sleep 200
echo end of sleep
......
@@ -33,4 +33,4 @@ addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
alpha=0.5
alpha=0.3
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 90
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 3
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = /mnt/nfs/kirsten/Gitlab/jac_decentralizepy/decentralizepy/eval/ip_addr_6Machines.json
compression_package = decentralizepy.compression.EliasFpzip
compression_class = EliasFpzip
compress = True
offset = -10720
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
alpha = 0.37
accumulation = True
accumulate_averaging_changes = True
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 99
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.001
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 65
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
alpha=0.3
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 99
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.001
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 65
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
sharing_class = Sharing
{
"0": "10.90.41.128",
"1": "10.90.41.129",
"2": "10.90.41.130",
"3": "10.90.41.131",
"4": "10.90.41.132",
"5": "10.90.41.133"
}
\ No newline at end of file
@@ -142,7 +142,16 @@ class PartialModel(Sharing):
        """
        logging.info("Returning topk gradients")
        G_topk = torch.abs(self.model.model_change)
        # Combined version
        G_topk = self.model.jacobian * torch.abs(self.model.model_change)
        # Model change:
        # G_topk = torch.abs(self.model.model_change)
        # Sensitivity:
        # G_topk = self.model.jacobian
        std, mean = torch.std_mean(G_topk, unbiased=False)
        self.std = std.item()
        self.mean = mean.item()
......
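The remainder of extract_top_gradients is not shown above. As a rough, hypothetical illustration (not the repository's implementation) of how a score vector like G_topk is typically reduced to the indices of the parameters to share, with model_change, jacobian and alpha as stand-in values:

import math
import torch

model_change = torch.randn(1000)   # hypothetical per-parameter change since the last share
jacobian = torch.rand(1000)        # hypothetical per-parameter sensitivity scores
alpha = 0.37                       # fraction of parameters to share, as in the config above

G_topk = jacobian * torch.abs(model_change)             # combined score, as in the diff above
k = max(1, math.ceil(alpha * G_topk.numel()))           # number of entries to keep
values, indices = torch.topk(G_topk, k, sorted=False)   # indices of the parameters to transmit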
import copy
import logging
import torch
@@ -66,6 +68,45 @@ class Training:
        self.batch_size = utils.conditional_value(batch_size, "", int(1))
        self.shuffle = utils.conditional_value(shuffle, "", False)

    def make_functional(self, mod):
        orig_params = tuple(mod.parameters())
        # Remove all the parameters in the model
        names = []
        for name, p in list(mod.named_parameters()):
            self.del_attr(mod, name.split("."))
            names.append(name)
        return orig_params, names

    def load_weights(self, mod, names, params, as_params=False):
        for name, p in zip(names, params):
            self.set_attr(mod, name.split("."), p if not as_params else torch.nn.Parameter(p))

    def del_attr(self, obj, names):
        if len(names) == 1:
            delattr(obj, names[0])
        else:
            self.del_attr(getattr(obj, names[0]), names[1:])

    def set_attr(self, obj, names, val):
        if len(names) == 1:
            setattr(obj, names[0], val)
        else:
            self.set_attr(getattr(obj, names[0]), names[1:], val)

    def transform_parameterwise(self, jac):
        # Collapse the per-output Jacobian (first sample of the batch only) into a single
        # L2-norm sensitivity score for every scalar parameter of the model.
        Jacobian = []
        to_stack = []
        with torch.no_grad():
            for v in jac:
                to_stack = []
                for i in range(v.shape[1]):
                    to_stack.append(v[0][i].flatten())
                Jacobian.append(torch.stack(to_stack, axis=1))
            Jacobian = torch.cat(Jacobian).pow(2).sum(dim=1).sqrt()
        return Jacobian

    def reset_optimizer(self, optimizer):
        """
        Replace the current optimizer with a new one
@@ -123,7 +164,23 @@ class Training:
        loss_val = self.loss(output, target)
        loss_val.backward()
        self.optimizer.step()
        return loss_val.item()

        # Estimate per-parameter sensitivity on this batch: deep-copy the model, strip its
        # parameters so it can be called functionally, and take the Jacobian of the outputs
        # with respect to those parameters.
        jac_model = copy.deepcopy(self.model)
        params, names = self.make_functional(jac_model)
        params = tuple(p.detach().requires_grad_() for p in params)

        def f(*new_params):
            self.load_weights(jac_model, names, new_params)
            out = jac_model(data)
            return out

        J = torch.autograd.functional.jacobian(f, params)
        jacobian = self.transform_parameterwise(J)
        del jac_model
        return loss_val.item(), jacobian
        # return loss_val.item()

    def train_full(self, dataset):
        """
@@ -165,12 +222,17 @@ class Training:
            self.train_full(dataset)
        else:
            iter_loss = 0.0
            # more elegant way?
            jacobian_total = torch.zeros_like(torch.nn.utils.parameters_to_vector(self.model.parameters()))
            count = 0
            trainset = dataset.get_trainset(self.batch_size, self.shuffle)
            while count < self.rounds:
                for data, target in trainset:
                    iter_loss += self.trainstep(data, target)
                    loss, jac = self.trainstep(data, target)
                    iter_loss += loss
                    jacobian_total += jac
                    count += 1
                    logging.info("Round: {} loss: {}".format(count, iter_loss / count))
                    if count >= self.rounds:
                        break
            self.model.jacobian = jacobian_total
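For reference, a minimal, self-contained sketch of the sensitivity computation that the changes above add to Training. This is not the repository code: it uses a toy linear model in place of LeNet and assumes PyTorch >= 2.0 so that torch.func.functional_call can stand in for the make_functional/load_weights helpers.

import torch

model = torch.nn.Linear(4, 3)     # toy stand-in for the real model
data = torch.randn(2, 4)          # toy batch

names = [n for n, _ in model.named_parameters()]
params = tuple(p.detach().requires_grad_() for p in model.parameters())

def f(*new_params):
    # Run the model with the supplied parameter tensors instead of its own.
    return torch.func.functional_call(model, dict(zip(names, new_params)), (data,))

# One Jacobian tensor per parameter, shaped (batch, num_outputs, *param.shape).
J = torch.autograd.functional.jacobian(f, params)

# Mirror transform_parameterwise: use the first sample only and collapse the
# output dimension into one L2-norm score per scalar parameter.
scores = []
for v in J:
    per_output = v[0].flatten(start_dim=1)          # (num_outputs, numel of this parameter)
    scores.append(per_output.pow(2).sum(dim=0).sqrt())
sensitivity = torch.cat(scores)                     # one score per parameter, as stored in model.jacobian
print(sensitivity.shape)                            # torch.Size([15]) for this toy model: 12 weights + 3 biases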