Compare revisions
Commits on Source (1)
#!/bin/bash
-decpy_path=/mnt/nfs/risharma/Gitlab/decentralizepy/eval
+decpy_path=/mnt/nfs/kirsten/Gitlab/decentralizepy/eval
cd $decpy_path
env_python=~/miniconda3/envs/decpy/bin/python3
-graph=/mnt/nfs/risharma/Gitlab/tutorial/regular_16.txt
+graph=/mnt/nfs/kirsten/Gitlab/tutorial/regular_16.txt
-original_config=/mnt/nfs/risharma/Gitlab/tutorial/config_celeba_sharing.ini
+original_config=/mnt/nfs/kirsten/Gitlab/tutorial/config_celeba_sharing.ini
config_file=~/tmp/config.ini
procs_per_machine=8
machines=2
...
#!/bin/bash
# Documentation
+# Note: documentation was not written for this run file, so actual behaviour may differ
# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory.
# The second one (python_bin) is the path to the python bin folder.
# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory.
@@ -18,8 +19,10 @@
# Each node needs a folder called 'tmp' in the user's home directory
#
# Note:
-# - The script does not change the optimizer. All configs are written to use SGD.
-# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch.
+# - The script does not change the optimizer. All configs are written to use Adam.
+#   For SGD these need to be changed manually.
+# - The script will set '--test_after' and '--train_evaluate_after' to comm_rounds_per_global_epoch, i.e., the evaluation
+#   on the train set and on the test set is carried out every global epoch.
# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only
#   relevant for Adam and other optimizers with internal state)
#
@@ -37,41 +40,40 @@ decpy_path=$nfs_home/decentralizepy/eval
cd $decpy_path
env_python=$python_bin/python3
-graph=96_regular.edges
config_file=~/tmp/config.ini
procs_per_machine=16
machines=6
global_epochs=150
eval_file=testing.py
log_level=INFO
-ip_machines=$nfs_home/$logs_subfolder/ip_addr_6Machines.json
+ip_machines=$nfs_home/configs/ip_addr_6Machines.json
m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2`
export PYTHONFAULTHANDLER=1
# Base configs for which the grid search is done
-tests=("step_configs/config_femnist_partialmodel.ini" "step_configs/config_femnist_topkacc.ini" "step_configs/config_femnist_wavelet.ini")
+tests="$nfs_home/$logs_subfolder/config.ini"
+#tests=("$nfs_home/$logs_subfolder/config_cifar_sharing.ini" "$nfs_home/$logs_subfolder/config_cifar_partialmodel.ini" "$nfs_home/$logs_subfolder/config_cifar_topkacc.ini" "$nfs_home/$logs_subfolder/config_cifar_topkaccRandomAlpha.ini" "$nfs_home/$logs_subfolder/config_cifar_subsampling.ini" "$nfs_home/$logs_subfolder/config_cifar_wavelet.ini" "$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
+#tests=("$nfs_home/$logs_subfolder/config_cifar_partialmodel.ini" "$nfs_home/$logs_subfolder/config_cifar_topkacc.ini" "$nfs_home/$logs_subfolder/config_cifar_topkaccRandomAlpha.ini" "$nfs_home/$logs_subfolder/config_cifar_subsampling.ini" "$nfs_home/$logs_subfolder/config_cifar_wavelet.ini" "$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
+#tests=("$nfs_home/$logs_subfolder/config_cifar_subsampling.ini" "$nfs_home/$logs_subfolder/config_cifar_sharing.ini" "$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
+#tests=("$nfs_home/$logs_subfolder/config_cifar_waveletRandomAlpha.ini")
# Learning rates
-lr="0.001"
+lr="0.01"
# Batch size
-batchsize="16"
+batchsize="8"
# The number of communication rounds per global epoch
-comm_rounds_per_global_epoch="1"
+comm_rounds_per_global_epoch="20"
procs=`expr $procs_per_machine \* $machines`
echo procs: $procs
# Celeba has 63741 samples
# Reddit has 70642
# Femnist 734463
-# Shakespeares 3678451, subsampled 678696
-# cifar 50000
-dataset_size=734463
+# Shakespeares 3678451
+dataset_size=50000
# Calculating the number of samples that each user/proc will have on average
samples_per_user=`expr $dataset_size / $procs`
echo samples per user: $samples_per_user
# random_seeds for which to rerun the experiments
-random_seeds=("97")
+# random_seeds=("90" "91" "92" "93" "94")
+random_seeds=("94")
# random_seed = 97
echo batchsize: $batchsize
echo communication rounds per global epoch: $comm_rounds_per_global_epoch
@@ -85,10 +87,10 @@ echo iterations: $iterations
batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)")
# since the batches per communication round were rounded down we need to change the number of iterations to reflect that
new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)")
-echo batches per communication round: $batches_per_comm_round
-echo corrected iterations: $new_iterations
test_after=$(($new_iterations / $global_epochs))
echo test after: $test_after
+echo batches per communication round: $batches_per_comm_round
+echo corrected iterations: $new_iterations
for i in "${tests[@]}"
do
for seed in "${random_seeds[@]}"
@@ -96,9 +98,14 @@ do
echo $i
IFS='_' read -ra NAMES <<< $i
IFS='.' read -ra NAME <<< ${NAMES[-1]}
-log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m
-echo results are stored in: $log_dir
+#log_dir_base=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')
+log_dir_base=$nfs_home$logs_subfolder:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')
+echo results are stored in: $log_dir_base
+log_dir=$log_dir_base/machine$m
mkdir -p $log_dir
+weight_store_dir=$log_dir_base/weights
+mkdir -p $weight_store_dir
+graph=$nfs_home/decentralizepy/eval/96_regular.edges
cp $i $config_file
# changing the config files to reflect the values of the current grid search state
$python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines
@@ -106,7 +113,9 @@ do
$python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round
$python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize
$python_bin/crudini --set $config_file DATASET random_seed $seed
-$env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level
+$python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines
+$python_bin/crudini --set $config_file COMMUNICATION offset -10720
+$env_python $eval_file -cte 0 -ro 0 -tea $test_after -ld $log_dir -wsd $weight_store_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level
echo $i is done
sleep 200
echo end of sleep
...
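The two $env_python one-liners above are the crux of the scheduling: a short, self-contained Python sketch of the same arithmetic with illustrative numbers (the definition of batches_per_epoch and the value of iterations are assumptions, not shown in this excerpt).

# Illustration only: mirrors the batches_per_comm_round / new_iterations / test_after
# arithmetic from the run script. batches_per_epoch and iterations are assumed values.
from math import floor

batchsize = 8
comm_rounds_per_global_epoch = 20
global_epochs = 150
samples_per_user = 50000 // 96                      # dataset_size / procs
batches_per_epoch = samples_per_user // batchsize   # assumption: samples per user / batch size

# batches per communication round, rounded down but never zero
x = floor(batches_per_epoch / comm_rounds_per_global_epoch)
batches_per_comm_round = 1 if x == 0 else x

# the rounding loss is compensated by stretching the iteration count
iterations = 3000                                   # assumed example value
y = floor(((batches_per_epoch / comm_rounds_per_global_epoch) / batches_per_comm_round) * iterations)
new_iterations = iterations if y < iterations else y

# evaluation happens once per global epoch
test_after = new_iterations // global_epochs
print(batches_per_comm_round, new_iterations, test_after)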
@@ -33,4 +33,4 @@ addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
-alpha=0.5
+alpha=0.3
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 90
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.01
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 3
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = /mnt/nfs/kirsten/Gitlab/jac_decentralizepy/decentralizepy/eval/ip_addr_6Machines.json
compression_package = decentralizepy.compression.EliasFpzip
compression_class = EliasFpzip
compress = True
offset = -10720
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
alpha = 0.37
accumulation = True
accumulate_averaging_changes = True
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 99
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.001
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 65
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.PartialModel
sharing_class = PartialModel
alpha=0.3
[DATASET]
dataset_package = decentralizepy.datasets.CIFAR10
dataset_class = CIFAR10
model_class = LeNet
train_dir = /mnt/nfs/shared/CIFAR
test_dir = /mnt/nfs/shared/CIFAR
; python list of fractions below
sizes =
random_seed = 99
partition_niid = True
shards = 4
[OPTIMIZER_PARAMS]
optimizer_package = torch.optim
optimizer_class = SGD
lr = 0.001
[TRAIN_PARAMS]
training_package = decentralizepy.training.Training
training_class = Training
rounds = 65
full_epochs = False
batch_size = 8
shuffle = True
loss_package = torch.nn
loss_class = CrossEntropyLoss
[COMMUNICATION]
comm_package = decentralizepy.communication.TCP
comm_class = TCP
addresses_filepath = ip_addr_6Machines.json
[SHARING]
sharing_package = decentralizepy.sharing.Sharing
sharing_class = Sharing
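These .ini files are plain INI; as a hypothetical illustration (decentralizepy's real config loader is not part of this diff), the package/class pairs can be resolved with the standard library like so:

# Hypothetical sketch: reading a config like the ones above and resolving the
# optimizer class from its package/class strings. The file name is an assumption.
import configparser
import importlib

config = configparser.ConfigParser()
config.read("config.ini")

opt = config["OPTIMIZER_PARAMS"]
optimizer_cls = getattr(importlib.import_module(opt["optimizer_package"]), opt["optimizer_class"])
lr = float(opt["lr"])
print(optimizer_cls, lr)   # e.g. SGD with lr 0.001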
{
"0": "10.90.41.128",
"1": "10.90.41.129",
"2": "10.90.41.130",
"3": "10.90.41.131",
"4": "10.90.41.132",
"5": "10.90.41.133"
}
\ No newline at end of file
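For reference, the run script's m=`cat $ip_machines | grep ... | cut -d'"' -f2` line resolves the local machine id by matching the node's IP against this JSON; a hypothetical Python equivalent:

# Hypothetical sketch of the machine-id lookup done by the shell pipeline above.
# The function name and the way the local IP is obtained are assumptions.
import json

def machine_id(addresses_file, local_ip):
    with open(addresses_file) as f:
        addresses = json.load(f)        # {"0": "10.90.41.128", ...}
    for mid, ip in addresses.items():
        if ip == local_ip:
            return mid
    raise ValueError(f"{local_ip} not found in {addresses_file}")

# e.g. machine_id("ip_addr_6Machines.json", "10.90.41.130") returns "2"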
@@ -142,7 +142,16 @@ class PartialModel(Sharing):
        """
        logging.info("Returning topk gradients")
-        G_topk = torch.abs(self.model.model_change)
+        # Combined version
+        G_topk = self.model.jacobian * torch.abs(self.model.model_change)
+        # Model change:
+        # G_topk = torch.abs(self.model.model_change)
+        # Sensitivity:
+        # G_topk = self.model.jacobian
        std, mean = torch.std_mean(G_topk, unbiased=False)
        self.std = std.item()
        self.mean = mean.item()
...
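The new G_topk multiplies the per-parameter sensitivity accumulated during training (self.model.jacobian) with the magnitude of the model change. How these scores are then reduced to a top-k set is outside this hunk; a hedged sketch of the usual pattern, with alpha as the shared fraction (e.g. the alpha = 0.37 from one of the configs above):

# Sketch only, not the repository's code: turn a score vector such as
# jacobian * |model_change| into the indices of the parameters to share.
import torch

alpha = 0.37                         # fraction of parameters to keep (assumption)
G_topk = torch.rand(1000)            # placeholder scores, one per parameter entry

k = max(1, int(alpha * G_topk.numel()))
_, indices = torch.topk(G_topk, k)   # indices of the highest-scoring parameters
mask = torch.zeros_like(G_topk, dtype=torch.bool)
mask[indices] = True                 # these entries would be shared with neighbours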
+import copy
import logging
import torch
@@ -66,6 +68,45 @@ class Training:
        self.batch_size = utils.conditional_value(batch_size, "", int(1))
        self.shuffle = utils.conditional_value(shuffle, "", False)
+    def make_functional(self, mod):
+        orig_params = tuple(mod.parameters())
+        # Remove all the parameters in the model
+        names = []
+        for name, p in list(mod.named_parameters()):
+            self.del_attr(mod, name.split("."))
+            names.append(name)
+        return orig_params, names
+
+    def load_weights(self, mod, names, params, as_params=False):
+        for name, p in zip(names, params):
+            self.set_attr(mod, name.split("."), p if not as_params else torch.nn.Parameter(p))
+
+    def del_attr(self, obj, names):
+        if len(names) == 1:
+            delattr(obj, names[0])
+        else:
+            self.del_attr(getattr(obj, names[0]), names[1:])
+
+    def set_attr(self, obj, names, val):
+        if len(names) == 1:
+            setattr(obj, names[0], val)
+        else:
+            self.set_attr(getattr(obj, names[0]), names[1:], val)
+
+    def transform_parameterwise(self, jac):
+        Jacobian = []
+        to_stack = []
+        with torch.no_grad():
+            for v in jac:
+                to_stack = []
+                for i in range(v.shape[1]):
+                    to_stack.append(v[0][i].flatten())
+                Jacobian.append(torch.stack(to_stack, axis=1))
+            Jacobian = torch.cat(Jacobian).pow(2).sum(dim=1).sqrt()
+        return Jacobian
    def reset_optimizer(self, optimizer):
        """
        Replace the current optimizer with a new one
@@ -123,7 +164,23 @@ class Training:
        loss_val = self.loss(output, target)
        loss_val.backward()
        self.optimizer.step()
-        return loss_val.item()
+
+        jac_model = copy.deepcopy(self.model)
+        params, names = self.make_functional(jac_model)
+        params = tuple(p.detach().requires_grad_() for p in params)
+
+        def f(*new_params):
+            self.load_weights(jac_model, names, new_params)
+            out = jac_model(data)
+            return out
+
+        J = torch.autograd.functional.jacobian(f, params)
+        jacobian = self.transform_parameterwise(J)
+        del jac_model
+        return loss_val.item(), jacobian
+        #return loss_val.item()
    def train_full(self, dataset):
        """
@@ -165,12 +222,17 @@ class Training:
            self.train_full(dataset)
        else:
            iter_loss = 0.0
+            # more elegant way?
+            jacobian_total = torch.zeros_like(torch.nn.utils.parameters_to_vector(self.model.parameters()))
            count = 0
            trainset = dataset.get_trainset(self.batch_size, self.shuffle)
            while count < self.rounds:
                for data, target in trainset:
-                    iter_loss += self.trainstep(data, target)
+                    loss, jac = self.trainstep(data, target)
+                    iter_loss += loss
+                    jacobian_total += jac
                    count += 1
                    logging.info("Round: {} loss: {}".format(count, iter_loss / count))
                    if count >= self.rounds:
                        break
+            self.model.jacobian = jacobian_total
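To make the new Jacobian bookkeeping concrete, here is a small self-contained sketch (the toy model and shapes are illustrative, not from the repository) of what torch.autograd.functional.jacobian returns and how a transform_parameterwise-style reduction turns it into one sensitivity score per parameter entry:

# Toy illustration of the Jacobian-based sensitivity computed above.
import torch

torch.manual_seed(0)
x = torch.randn(1, 3)          # one sample, 3 features
weight = torch.randn(2, 3)     # a single "parameter": a 2-output linear map

def f(w):
    # plays the role of the functionalised jac_model(data) in trainstep
    return torch.nn.functional.linear(x, w)

# Jacobian of the outputs w.r.t. the weight: shape (batch=1, outputs=2, 2, 3)
J = torch.autograd.functional.jacobian(f, weight)
print(J.shape)                 # torch.Size([1, 2, 2, 3])

# transform_parameterwise-style reduction: for the first sample, flatten the
# Jacobian of each output and take the L2 norm across outputs, yielding one
# sensitivity score per weight entry.
cols = [J[0][i].flatten() for i in range(J.shape[1])]
sensitivity = torch.stack(cols, dim=1).pow(2).sum(dim=1).sqrt()
print(sensitivity.shape)       # torch.Size([6])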