From 13dd064748ebe0698d203e063b1d1d38aacc6e05 Mon Sep 17 00:00:00 2001 From: Jeffrey Wigger <jeffrey.wigger@epfl.ch> Date: Thu, 28 Apr 2022 20:24:44 +0000 Subject: [PATCH] updated configs and run files --- eval/6_star.edges | 11 + eval/run_celeba_synchronous.sh | 117 +++++++++ eval/run_grid.sh | 13 +- eval/run_xtimes.sh | 115 +++++++++ eval/run_xtimes_celeba.sh | 110 ++++++++ eval/run_xtimes_cifar.sh | 110 ++++++++ eval/run_xtimes_femnist.sh | 110 ++++++++ eval/run_xtimes_reddit.sh | 110 ++++++++ eval/run_xtimes_shakespeare.sh | 110 ++++++++ eval/step_configs/config_celeba_fft.ini | 2 +- eval/step_configs/config_celeba_grow.ini | 2 +- .../config_celeba_manualadapt.ini | 2 +- .../config_celeba_partialmodel.ini | 3 +- .../config_celeba_randomalpha.ini | 2 +- .../config_celeba_randomalphainc.ini | 2 +- .../step_configs/config_celeba_roundrobin.ini | 2 +- eval/step_configs/config_celeba_sharing.ini | 2 +- .../config_celeba_subsampling.ini | 2 +- .../config_celeba_synchronous.ini | 33 +++ eval/step_configs/config_celeba_topkacc.ini | 3 +- .../config_celeba_topkacc_norm.ini | 36 +++ .../config_celeba_topkaccbound.ini | 38 +++ eval/step_configs/config_celeba_topkparam.ini | 2 +- .../step_configs/config_celeba_topkrandom.ini | 2 +- eval/step_configs/config_celeba_wavelet.ini | 3 +- ...config_cifar_SharingWithRWAsyncDynamic.ini | 36 +++ ...topk.ini => config_cifar_partialmodel.ini} | 12 +- ...fig_CIFAR.ini => config_cifar_sharing.ini} | 8 +- .../step_configs/config_cifar_subsampling.ini | 36 +++ eval/step_configs/config_cifar_topkacc.ini | 38 +++ ...R_wavelet.ini => config_cifar_wavelet.ini} | 9 +- eval/step_configs/config_femnist_fft.ini | 2 +- eval/step_configs/config_femnist_grow.ini | 2 +- .../config_femnist_partialmodel.ini | 2 +- eval/step_configs/config_femnist_sharing.ini | 2 +- .../config_femnist_subsampling.ini | 2 +- eval/step_configs/config_femnist_topkacc.ini | 5 +- .../config_femnist_topkaccbound.ini | 38 +++ .../step_configs/config_femnist_topkparam.ini | 2 +- eval/step_configs/config_femnist_wavelet.ini | 3 +- .../config_reddit_partialmodel.ini | 34 +++ eval/step_configs/config_reddit_sharing.ini | 2 +- .../config_reddit_subsampling.ini | 34 +++ eval/step_configs/config_reddit_topkacc.ini | 36 +++ eval/step_configs/config_reddit_wavelet.ini | 39 +++ .../config_shakespeare_partialmodel.ini | 34 +++ ...are.ini => config_shakespeare_sharing.ini} | 6 +- .../config_shakespeare_subsampling.ini | 34 +++ .../config_shakespeare_topkacc.ini | 36 +++ .../config_shakespeare_wavelet.ini | 39 +++ src/decentralizepy/datasets/Femnist.py | 142 ++++++++++- src/decentralizepy/datasets/Shakespeare.py | 5 +- src/decentralizepy/models/Resnet.py | 200 +++++++++++++++ src/decentralizepy/sharing/FFT.py | 41 +-- src/decentralizepy/sharing/LowerBoundTopK.py | 217 ++++++++++++++++ src/decentralizepy/sharing/PartialModel.py | 14 +- src/decentralizepy/sharing/Sharing.py | 4 +- src/decentralizepy/sharing/SubSampling.py | 11 + src/decentralizepy/sharing/Synchronous.py | 237 ++++++++++++++++++ src/decentralizepy/sharing/TopKNormalized.py | 117 +++++++++ src/decentralizepy/sharing/Wavelet.py | 48 ++-- src/decentralizepy/utils.py | 1 + 62 files changed, 2377 insertions(+), 93 deletions(-) create mode 100644 eval/6_star.edges create mode 100755 eval/run_celeba_synchronous.sh create mode 100755 eval/run_xtimes.sh create mode 100755 eval/run_xtimes_celeba.sh create mode 100755 eval/run_xtimes_cifar.sh create mode 100755 eval/run_xtimes_femnist.sh create mode 100755 eval/run_xtimes_reddit.sh create mode 100755 
eval/run_xtimes_shakespeare.sh create mode 100644 eval/step_configs/config_celeba_synchronous.ini create mode 100644 eval/step_configs/config_celeba_topkacc_norm.ini create mode 100644 eval/step_configs/config_celeba_topkaccbound.ini create mode 100644 eval/step_configs/config_cifar_SharingWithRWAsyncDynamic.ini rename eval/step_configs/{config_CIFAR_topk.ini => config_cifar_partialmodel.ini} (88%) rename eval/step_configs/{config_CIFAR.ini => config_cifar_sharing.ini} (92%) create mode 100644 eval/step_configs/config_cifar_subsampling.ini create mode 100644 eval/step_configs/config_cifar_topkacc.ini rename eval/step_configs/{config_CIFAR_wavelet.ini => config_cifar_wavelet.ini} (89%) create mode 100644 eval/step_configs/config_femnist_topkaccbound.ini create mode 100644 eval/step_configs/config_reddit_partialmodel.ini create mode 100644 eval/step_configs/config_reddit_subsampling.ini create mode 100644 eval/step_configs/config_reddit_topkacc.ini create mode 100644 eval/step_configs/config_reddit_wavelet.ini create mode 100644 eval/step_configs/config_shakespeare_partialmodel.ini rename eval/step_configs/{config_shakespeare.ini => config_shakespeare_sharing.ini} (79%) create mode 100644 eval/step_configs/config_shakespeare_subsampling.ini create mode 100644 eval/step_configs/config_shakespeare_topkacc.ini create mode 100644 eval/step_configs/config_shakespeare_wavelet.ini create mode 100644 src/decentralizepy/models/Resnet.py create mode 100644 src/decentralizepy/sharing/LowerBoundTopK.py create mode 100644 src/decentralizepy/sharing/Synchronous.py create mode 100644 src/decentralizepy/sharing/TopKNormalized.py diff --git a/eval/6_star.edges b/eval/6_star.edges new file mode 100644 index 0000000..fcc45de --- /dev/null +++ b/eval/6_star.edges @@ -0,0 +1,11 @@ +6 +0 1 +0 2 +0 3 +0 4 +0 5 +1 0 +2 0 +3 0 +4 0 +5 0 \ No newline at end of file diff --git a/eval/run_celeba_synchronous.sh b/eval/run_celeba_synchronous.sh new file mode 100755 index 0000000..86c3f3e --- /dev/null +++ b/eval/run_celeba_synchronous.sh @@ -0,0 +1,117 @@ +#!/bin/bash +# Documentation +# Note: documentation was not written for this run file, so actual behaviour may differ +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_grid.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are writen to use Adam. 
+# For SGD these need to be changed manually +# - The script will set '--test_after' and '--train_evaluate_after' to comm_rounds_per_global_epoch, i.e., the eavaluation +# on the train set and on the test set is carried out every global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adams and other optimizers with internal state) +# +# Addapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the data sets size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a change that two folders are created as not all +# machines may start running the script at the exact same moment. + +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=6_star.edges +config_file=~/tmp/config.ini +procs_per_machine=6 +machines=1 +global_epochs=20 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` + +# Base configs for which the gird search is done +tests=("step_configs/config_celeba_synchronous.ini") +# Learning rates +lr="0.001" +# Batch size +batchsize="8" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="2000" +# testing every x communication rounds +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +# Celeba has 63741 samples +# Reddit has 70642 +# Femnist 734463 +# Shakespeares 3678451 +dataset_size=63741 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("90" "91" "92" "93" "94") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 200 + echo end of sleep + done +done +# \ No newline at end of file diff --git a/eval/run_grid.sh b/eval/run_grid.sh index 54a6e0f..4fcddd6 100755 --- a/eval/run_grid.sh +++ b/eval/run_grid.sh @@ -18,10 +18,8 @@ # Each node needs a folder called 'tmp' in the user's home directory # # Note: -# - The script does not change the optimizer. All configs are writen to use Adam. -# For SGD these need to be changed manually -# - The script will set '--test_after' and '--train_evaluate_after' to comm_rounds_per_global_epoch, i.e., the eavaluation -# on the train set and on the test set is carried out every global epoch. +# - The script does not change the optimizer. All configs are writen to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. # - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only # relevant for Adams and other optimizers with internal state) # @@ -39,7 +37,7 @@ decpy_path=$nfs_home/decentralizepy/eval cd $decpy_path env_python=$python_bin/python3 -graph=192_regular.edges +graph=96_regular.edges config_file=~/tmp/config.ini procs_per_machine=16 machines=6 @@ -62,7 +60,6 @@ batchsize=("8" "16") comm_rounds_per_global_epoch=("1" "5" "10") procs=`expr $procs_per_machine \* $machines` echo procs: $procs -# Celeba has 63741 samples dataset_size=63741 # Calculating the number of samples that each user/proc will have on average samples_per_user=`expr $dataset_size / $procs` @@ -86,6 +83,8 @@ do new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $r); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $r)/x)*$iterations); print($iterations if y<$iterations else y)") echo batches per communication round: $batches_per_comm_round echo corrected iterations: $new_iterations + test_after=$(($new_iterations / $global_epochs)) + echo test after: $test_after for lr in "${lrs[@]}" do for i in "${tests[@]}" @@ -102,7 +101,7 @@ do $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $b - $env_python $eval_file -ro 0 -tea $r -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $r -cf $config_file -ll $log_level + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level echo $i is done sleep 1 echo end of 
sleep diff --git a/eval/run_xtimes.sh b/eval/run_xtimes.sh new file mode 100755 index 0000000..3176f53 --- /dev/null +++ b/eval/run_xtimes.sh @@ -0,0 +1,115 @@ +#!/bin/bash +# Documentation +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_xtimes.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are written to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adam and other optimizers with internal state) +# +# Adapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the dataset's size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a chance that two folders are created as not all +# machines may start running the script at the exact same moment.
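The run_xtimes*.sh scripts derive the per-run schedule (total iterations, batches per communication round, test interval) from dataset_size, batchsize, global_epochs and comm_rounds_per_global_epoch through inline python -c one-liners. The following standalone Python sketch restates that arithmetic with named variables, using the default values found in run_xtimes.sh (FEMNIST, 16 processes on 6 machines); it is an illustration of the computation, not part of the scripts.

from math import floor

# Defaults taken from run_xtimes.sh; the formulas mirror the python -c one-liners.
procs = 16 * 6                      # procs_per_machine * machines
dataset_size = 734463               # Femnist
batchsize = 16
global_epochs = 150
comm_rounds_per_global_epoch = 1

samples_per_user = dataset_size // procs
batches_per_epoch = samples_per_user // batchsize

# total number of iterations over all global epochs
if comm_rounds_per_global_epoch >= batches_per_epoch:
    iterations = batches_per_epoch * global_epochs
else:
    iterations = global_epochs * comm_rounds_per_global_epoch

# batches each proc trains on per communication round (rounded down, at least 1)
x = floor(batches_per_epoch / comm_rounds_per_global_epoch)
batches_per_comm_round = 1 if x == 0 else x

# correct the iteration count for the rounding above
y = floor((batches_per_epoch / comm_rounds_per_global_epoch) / batches_per_comm_round * iterations)
new_iterations = iterations if y < iterations else y

test_after = new_iterations // global_epochs    # evaluation happens once per global epoch
print(batches_per_epoch, iterations, batches_per_comm_round, new_iterations, test_after)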
+ +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=96_regular.edges +config_file=~/tmp/config.ini +procs_per_machine=16 +machines=6 +global_epochs=150 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` +export PYTHONFAULTHANDLER=1 + +# Base configs for which the gird search is done +tests=("step_configs/config_femnist_partialmodel.ini" "step_configs/config_femnist_topkacc.ini" "step_configs/config_femnist_wavelet.ini") +# Learning rates +lr="0.001" +# Batch size +batchsize="16" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="1" +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +# Celeba has 63741 samples +# Reddit has 70642 +# Femnist 734463 +# Shakespeares 3678451, subsampled 678696 +# cifar 50000 +dataset_size=734463 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("97") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 200 + echo end of sleep + done +done +# \ No newline at end of file diff --git a/eval/run_xtimes_celeba.sh b/eval/run_xtimes_celeba.sh new file mode 100755 index 0000000..65c195a --- /dev/null +++ b/eval/run_xtimes_celeba.sh @@ -0,0 +1,110 @@ +#!/bin/bash +# Documentation +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_grid.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are writen to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adams and other optimizers with internal state) +# +# Addapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the data sets size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a change that two folders are created as not all +# machines may start running the script at the exact same moment. 
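The experiment folder name described in the documentation above is obtained by splitting the config path on '_' and '.' (the IFS parsing inside the run loop). A hypothetical Python rendering of that naming scheme, with example values taken from run_xtimes_celeba.sh, is:

from datetime import datetime

config = "step_configs/config_celeba_topkacc.ini"   # one entry of the tests array
name = config.split("_")[-1].split(".")[0]          # -> "topkacc"
lr, comm_rounds, batchsize, machine_id = "0.001", "10", "8", "0"
log_dir = (
    name
    + ":lr=" + lr
    + ":r=" + comm_rounds
    + ":b=" + batchsize
    + ":" + datetime.now().strftime("%Y-%m-%dT%H:%M")
    + "/machine" + machine_id
)
print(log_dir)   # e.g. topkacc:lr=0.001:r=10:b=8:2022-04-28T20:24/machine0

Because the timestamp only has minute resolution, machines that enter the loop in different minutes create separate folders, which is the known issue mentioned in the script headers.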
+ +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=96_regular.edges +config_file=~/tmp/config.ini +procs_per_machine=16 +machines=6 +global_epochs=120 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` +export PYTHONFAULTHANDLER=1 + +# Base configs for which the gird search is done +tests=("step_configs/config_celeba_sharing.ini" "step_configs/config_celeba_partialmodel.ini" "step_configs/config_celeba_topkacc.ini" "step_configs/config_celeba_subsampling.ini" "step_configs/config_celeba_wavelet.ini") +# Learning rates +lr="0.001" +# Batch size +batchsize="8" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="10" +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +dataset_size=63741 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("90" "91" "92" "93" "94") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 200 + echo end of sleep + done +done +# \ No newline at end of file diff --git a/eval/run_xtimes_cifar.sh b/eval/run_xtimes_cifar.sh new file mode 100755 index 0000000..1939348 --- /dev/null +++ b/eval/run_xtimes_cifar.sh @@ -0,0 +1,110 @@ +#!/bin/bash +# Documentation +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_grid.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are writen to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adams and other optimizers with internal state) +# +# Addapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the data sets size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a change that two folders are created as not all +# machines may start running the script at the exact same moment. 
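Inside the run loop, crudini --set rewrites a handful of keys in the copied base config before every run. A rough, hypothetical configparser equivalent of those overrides is sketched below; crudini itself edits the INI in place and, unlike configparser, keeps the existing comments.

import configparser

cfg = configparser.ConfigParser()
cfg.read("config.ini")                        # the base config copied to ~/tmp/config.ini
cfg["COMMUNICATION"]["addresses_filepath"] = "ip_addr_6Machines.json"  # full $nfs_home/configs path in the scripts
cfg["OPTIMIZER_PARAMS"]["lr"] = "0.01"        # example learning rate
cfg["TRAIN_PARAMS"]["rounds"] = "65"          # the computed batches_per_comm_round
cfg["TRAIN_PARAMS"]["batch_size"] = "8"
cfg["DATASET"]["random_seed"] = "90"          # one of the rerun seeds
with open("config.ini", "w") as f:
    cfg.write(f)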
+ +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=96_regular.edges +config_file=~/tmp/config.ini +procs_per_machine=16 +machines=6 +global_epochs=300 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` +export PYTHONFAULTHANDLER=1 + +# Base configs for which the gird search is done +tests=("step_configs/config_cifar_sharing.ini" "step_configs/config_cifar_partialmodel.ini" "step_configs/config_cifar_topkacc.ini" "step_configs/config_cifar_subsampling.ini" "step_configs/config_cifar_wavelet.ini") +# Learning rates +lr="0.01" +# Batch size +batchsize="8" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="20" +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +dataset_size=50000 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("90" "91" "92" "93" "94") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 200 + echo end of sleep + done +done +# diff --git a/eval/run_xtimes_femnist.sh b/eval/run_xtimes_femnist.sh new file mode 100755 index 0000000..4b1799b --- /dev/null +++ b/eval/run_xtimes_femnist.sh @@ -0,0 +1,110 @@ +#!/bin/bash +# Documentation +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_grid.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are writen to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adams and other optimizers with internal state) +# +# Addapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the data sets size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a change that two folders are created as not all +# machines may start running the script at the exact same moment. 
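The machine id m is found by grepping the node's own IP address out of ip_addr_6Machines.json (via ifconfig/awk) and cutting the quoted field on the matching line. Assuming that file maps machine ids to IP addresses, a Python sketch of the same lookup is:

import json
import socket

with open("ip_addr_6Machines.json") as f:
    machines = json.load(f)          # assumed layout: {"0": "10.0.0.1", "1": "10.0.0.2", ...}

# the scripts read the address of the ens785 interface; gethostname is a stand-in here
local_ip = socket.gethostbyname(socket.gethostname())
machine_id = next((mid for mid, ip in machines.items() if ip == local_ip), None)
print(machine_id)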
+ +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=96_regular.edges +config_file=~/tmp/config.ini +procs_per_machine=16 +machines=6 +global_epochs=70 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` +export PYTHONFAULTHANDLER=1 + +# Base configs for which the gird search is done +tests=("step_configs/config_femnist_sharing.ini" "step_configs/config_femnist_partialmodel.ini" "step_configs/config_femnist_topkacc.ini" "step_configs/config_femnist_subsampling.ini" "step_configs/config_femnist_wavelet.ini") +# Learning rates +lr="0.01" +# Batch size +batchsize="16" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="10" +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +dataset_size=734463 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("90" "91" "92" "93" "94") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 200 + echo end of sleep + done +done +# \ No newline at end of file diff --git a/eval/run_xtimes_reddit.sh b/eval/run_xtimes_reddit.sh new file mode 100755 index 0000000..4ecf899 --- /dev/null +++ b/eval/run_xtimes_reddit.sh @@ -0,0 +1,110 @@ +#!/bin/bash +# Documentation +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_grid.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are writen to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adams and other optimizers with internal state) +# +# Addapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the data sets size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a change that two folders are created as not all +# machines may start running the script at the exact same moment. 
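The graphs passed via -gf (96_regular.edges in these scripts, and the new 6_star.edges added at the top of this patch) appear to use a plain text format: the first line gives the number of nodes and every remaining line one directed edge 'src dst', so the star topology lists each edge in both directions. A minimal reader under that assumption, not the framework's own Graph loader:

def read_edges(path):
    with open(path) as f:
        n_nodes = int(f.readline())
        neighbours = {i: set() for i in range(n_nodes)}
        for line in f:
            if line.strip():
                src, dst = map(int, line.split())
                neighbours[src].add(dst)
    return n_nodes, neighbours

# For 6_star.edges this gives node 0 the neighbours {1, 2, 3, 4, 5} and every
# other node the single neighbour {0}.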
+ +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=96_regular.edges +config_file=~/tmp/config.ini +procs_per_machine=16 +machines=6 +global_epochs=50 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` +export PYTHONFAULTHANDLER=1 + +# Base configs for which the gird search is done +tests=("step_configs/config_reddit_sharing.ini" "step_configs/config_reddit_partialmodel.ini" "step_configs/config_reddit_topkacc.ini" "step_configs/config_reddit_subsampling.ini" "step_configs/config_reddit_wavelet.ini") +# Learning rates +lr="1" +# Batch size +batchsize="16" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="10" +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +dataset_size=70642 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("90" "91" "92" "93" "94") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 200 + echo end of sleep + done +done +# \ No newline at end of file diff --git a/eval/run_xtimes_shakespeare.sh b/eval/run_xtimes_shakespeare.sh new file mode 100755 index 0000000..1fd7b20 --- /dev/null +++ b/eval/run_xtimes_shakespeare.sh @@ -0,0 +1,110 @@ +#!/bin/bash +# Documentation +# This bash file takes three inputs. The first argument (nfs_home) is the path to the nfs home directory. +# The second one (python_bin) is the path to the python bin folder. +# The last argument (logs_subfolder) is the path to the logs folder with respect to the nfs home directory. +# +# The nfs home directory should contain the code of this framework stored in $nfs_home/decentralizepy and a folder +# called configs which contains the file 'ip_addr_6Machines.json' +# The python bin folder needs to include all the dependencies of this project including crudini. +# The results will be stored in $nfs_home/$logs_subfolder +# Each of the experiments will be stored in its own folder inside the logs_subfolder. The folder of the experiment +# starts with the last part of the config name, i.e., for 'config_celeba_topkacc.ini' it will start with topkacc. +# The name further includes the learning rate, rounds and batchsize as well as the exact date at which the experiment +# was run. +# Example: ./run_grid.sh /mnt/nfs/wigger /mnt/nfs/wigger/anaconda3/envs/sacs39/bin /logs/celeba +# +# Additional requirements: +# Each node needs a folder called 'tmp' in the user's home directory +# +# Note: +# - The script does not change the optimizer. All configs are writen to use SGD. +# - The script will set '--test_after' and '--train_evaluate_after' such that it happens at the end of a global epoch. +# - The '--reset_optimizer' option is set to 0, i.e., the optimizer is not reset after a communication round (only +# relevant for Adams and other optimizers with internal state) +# +# Addapting the script to other datasets: +# Change the variable 'dataset_size' to reflect the data sets size. +# +# Known issues: +# - If the script is started at the very end of a minute then there is a change that two folders are created as not all +# machines may start running the script at the exact same moment. 
+ +nfs_home=$1 +python_bin=$2 +logs_subfolder=$3 +decpy_path=$nfs_home/decentralizepy/eval +cd $decpy_path + +env_python=$python_bin/python3 +graph=96_regular.edges +config_file=~/tmp/config.ini +procs_per_machine=16 +machines=6 +global_epochs=80 +eval_file=testing.py +log_level=INFO + +ip_machines=$nfs_home/configs/ip_addr_6Machines.json + +m=`cat $ip_machines | grep $(/sbin/ifconfig ens785 | grep 'inet ' | awk '{print $2}') | cut -d'"' -f2` +export PYTHONFAULTHANDLER=1 + +# Base configs for which the gird search is done +tests=("step_configs/config_shakespeare_sharing.ini" "step_configs/config_shakespeare_partialmodel.ini" "step_configs/config_shakespeare_topkacc.ini" "step_configs/config_shakespeare_subsampling.ini" "step_configs/config_shakespeare_wavelet.ini") +# Learning rates +lr="0.5" +# Batch size +batchsize="16" +# The number of communication rounds per global epoch +comm_rounds_per_global_epoch="25" +procs=`expr $procs_per_machine \* $machines` +echo procs: $procs +dataset_size=678696 +# Calculating the number of samples that each user/proc will have on average +samples_per_user=`expr $dataset_size / $procs` +echo samples per user: $samples_per_user + +# random_seeds for which to rerun the experiments +random_seeds=("90" "91" "92" "93" "94") +# random_seed = 97 +echo batchsize: $batchsize +echo communication rounds per global epoch: $comm_rounds_per_global_epoch +# calculating how many batches there are in a global epoch for each user/proc +batches_per_epoch=$(($samples_per_user / $batchsize)) +echo batches per global epoch: $batches_per_epoch +# the number of iterations in 25 global epochs +iterations=$($env_python -c "from math import floor; print($batches_per_epoch * $global_epochs) if $comm_rounds_per_global_epoch >= $batches_per_epoch else print($global_epochs * $comm_rounds_per_global_epoch)") +echo iterations: $iterations +# calculating the number of batches each user/proc uses per communication step (The actual number may be a float, which we round down) +batches_per_comm_round=$($env_python -c "from math import floor; x = floor($batches_per_epoch / $comm_rounds_per_global_epoch); print(1 if x==0 else x)") +# since the batches per communication round were rounded down we need to change the number of iterations to reflect that +new_iterations=$($env_python -c "from math import floor; tmp = floor($batches_per_epoch / $comm_rounds_per_global_epoch); x = 1 if tmp == 0 else tmp; y = floor((($batches_per_epoch / $comm_rounds_per_global_epoch)/x)*$iterations); print($iterations if y<$iterations else y)") +echo batches per communication round: $batches_per_comm_round +echo corrected iterations: $new_iterations +test_after=$(($new_iterations / $global_epochs)) +echo test after: $test_after +for i in "${tests[@]}" +do + for seed in "${random_seeds[@]}" + do + echo $i + IFS='_' read -ra NAMES <<< $i + IFS='.' 
read -ra NAME <<< ${NAMES[-1]} + log_dir=$nfs_home$logs_subfolder/${NAME[0]}:lr=$lr:r=$comm_rounds_per_global_epoch:b=$batchsize:$(date '+%Y-%m-%dT%H:%M')/machine$m + echo results are stored in: $log_dir + mkdir -p $log_dir + cp $i $config_file + # changing the config files to reflect the values of the current grid search state + $python_bin/crudini --set $config_file COMMUNICATION addresses_filepath $ip_machines + $python_bin/crudini --set $config_file OPTIMIZER_PARAMS lr $lr + $python_bin/crudini --set $config_file TRAIN_PARAMS rounds $batches_per_comm_round + $python_bin/crudini --set $config_file TRAIN_PARAMS batch_size $batchsize + $python_bin/crudini --set $config_file DATASET random_seed $seed + $env_python $eval_file -ro 0 -tea $test_after -ld $log_dir -mid $m -ps $procs_per_machine -ms $machines -is $new_iterations -gf $graph -ta $test_after -cf $config_file -ll $log_level + echo $i is done + sleep 500 + echo end of sleep + done +done +# \ No newline at end of file diff --git a/eval/step_configs/config_celeba_fft.ini b/eval/step_configs/config_celeba_fft.ini index e8d6a70..709ecf5 100644 --- a/eval/step_configs/config_celeba_fft.ini +++ b/eval/step_configs/config_celeba_fft.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_grow.ini b/eval/step_configs/config_celeba_grow.ini index 37e74ae..0d44862 100644 --- a/eval/step_configs/config_celeba_grow.ini +++ b/eval/step_configs/config_celeba_grow.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_manualadapt.ini b/eval/step_configs/config_celeba_manualadapt.ini index 1c117e2..2ffa9fc 100644 --- a/eval/step_configs/config_celeba_manualadapt.ini +++ b/eval/step_configs/config_celeba_manualadapt.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_partialmodel.ini b/eval/step_configs/config_celeba_partialmodel.ini index 6c9a4b5..df2e8f2 100644 --- a/eval/step_configs/config_celeba_partialmodel.ini +++ b/eval/step_configs/config_celeba_partialmodel.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] @@ -31,3 +31,4 @@ addresses_filepath = ip_addr_6Machines.json [SHARING] sharing_package = decentralizepy.sharing.PartialModel sharing_class = PartialModel +alpha = 0.1 \ No newline at end of file diff --git a/eval/step_configs/config_celeba_randomalpha.ini b/eval/step_configs/config_celeba_randomalpha.ini index 1c4b989..3aed16b 100644 --- a/eval/step_configs/config_celeba_randomalpha.ini +++ b/eval/step_configs/config_celeba_randomalpha.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_randomalphainc.ini b/eval/step_configs/config_celeba_randomalphainc.ini index 5171b64..df7654c 100644 --- a/eval/step_configs/config_celeba_randomalphainc.ini +++ b/eval/step_configs/config_celeba_randomalphainc.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git 
a/eval/step_configs/config_celeba_roundrobin.ini b/eval/step_configs/config_celeba_roundrobin.ini index 3dadf32..0c8df52 100644 --- a/eval/step_configs/config_celeba_roundrobin.ini +++ b/eval/step_configs/config_celeba_roundrobin.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_sharing.ini b/eval/step_configs/config_celeba_sharing.ini index caf05fa..8193f77 100644 --- a/eval/step_configs/config_celeba_sharing.ini +++ b/eval/step_configs/config_celeba_sharing.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_subsampling.ini b/eval/step_configs/config_celeba_subsampling.ini index b806898..f519dbb 100644 --- a/eval/step_configs/config_celeba_subsampling.ini +++ b/eval/step_configs/config_celeba_subsampling.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_synchronous.ini b/eval/step_configs/config_celeba_synchronous.ini new file mode 100644 index 0000000..bf60e17 --- /dev/null +++ b/eval/step_configs/config_celeba_synchronous.ini @@ -0,0 +1,33 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Celeba +dataset_class = Celeba +model_class = CNN +images_dir = /mnt/nfs/shared/leaf/data/celeba/data/raw/img_align_celeba +train_dir = /mnt/nfs/shared/leaf/data/celeba/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/celeba/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 4 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.Synchronous +sharing_class = Synchronous \ No newline at end of file diff --git a/eval/step_configs/config_celeba_topkacc.ini b/eval/step_configs/config_celeba_topkacc.ini index 89eef29..4207f83 100644 --- a/eval/step_configs/config_celeba_topkacc.ini +++ b/eval/step_configs/config_celeba_topkacc.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] @@ -33,3 +33,4 @@ sharing_package = decentralizepy.sharing.PartialModel sharing_class = PartialModel alpha = 0.1 accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_celeba_topkacc_norm.ini b/eval/step_configs/config_celeba_topkacc_norm.ini new file mode 100644 index 0000000..fb03971 --- /dev/null +++ b/eval/step_configs/config_celeba_topkacc_norm.ini @@ -0,0 +1,36 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Celeba +dataset_class = Celeba +model_class = CNN +images_dir = /mnt/nfs/shared/leaf/data/celeba/data/raw/img_align_celeba +train_dir = /mnt/nfs/shared/leaf/data/celeba/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/celeba/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD 
+lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 4 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.TopKNormalized +sharing_class = TopKNormalized +alpha = 0.1 +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_celeba_topkaccbound.ini b/eval/step_configs/config_celeba_topkaccbound.ini new file mode 100644 index 0000000..088c4b8 --- /dev/null +++ b/eval/step_configs/config_celeba_topkaccbound.ini @@ -0,0 +1,38 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Celeba +dataset_class = Celeba +model_class = CNN +images_dir = /mnt/nfs/shared/leaf/data/celeba/data/raw/img_align_celeba +train_dir = /mnt/nfs/shared/leaf/data/celeba/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/celeba/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 4 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.LowerBoundTopK +sharing_class = LowerBoundTopK +lower_bound = 0.1 +alpha = 0.1 +metro_hastings = False +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_celeba_topkparam.ini b/eval/step_configs/config_celeba_topkparam.ini index babc3e9..24be2d7 100644 --- a/eval/step_configs/config_celeba_topkparam.ini +++ b/eval/step_configs/config_celeba_topkparam.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_topkrandom.ini b/eval/step_configs/config_celeba_topkrandom.ini index 7674955..8c9d032 100644 --- a/eval/step_configs/config_celeba_topkrandom.ini +++ b/eval/step_configs/config_celeba_topkrandom.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_celeba_wavelet.ini b/eval/step_configs/config_celeba_wavelet.ini index 1c97eb9..cb5661a 100644 --- a/eval/step_configs/config_celeba_wavelet.ini +++ b/eval/step_configs/config_celeba_wavelet.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] @@ -36,3 +36,4 @@ alpha = 0.1 wavelet=sym2 level= 4 accumulation = True +accumulate_averaging_changes = True diff --git a/eval/step_configs/config_cifar_SharingWithRWAsyncDynamic.ini b/eval/step_configs/config_cifar_SharingWithRWAsyncDynamic.ini new file mode 100644 index 0000000..a41d038 --- /dev/null +++ b/eval/step_configs/config_cifar_SharingWithRWAsyncDynamic.ini @@ -0,0 +1,36 @@ +[DATASET] +dataset_package = decentralizepy.datasets.CIFAR10 +dataset_class = CIFAR10 +model_class = LeNet +train_dir = /mnt/nfs/shared/CIFAR +test_dir = 
/mnt/nfs/shared/CIFAR +; python list of fractions below +sizes = +random_seed = 99 +partition_niid = True +shards = 1 + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 65 +full_epochs = False +batch_size = 8 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCPRandomWalkRouting +comm_class = TCPRandomWalkRouting +addresses_filepath = ip_addr_6Machines.json +sampler = equi + +[SHARING] +sharing_package = decentralizepy.sharing.SharingWithRWAsyncDynamic +sharing_class = SharingWithRWAsyncDynamic \ No newline at end of file diff --git a/eval/step_configs/config_CIFAR_topk.ini b/eval/step_configs/config_cifar_partialmodel.ini similarity index 88% rename from eval/step_configs/config_CIFAR_topk.ini rename to eval/step_configs/config_cifar_partialmodel.ini index 248e316..9f7a3af 100644 --- a/eval/step_configs/config_CIFAR_topk.ini +++ b/eval/step_configs/config_cifar_partialmodel.ini @@ -5,19 +5,20 @@ model_class = LeNet train_dir = /mnt/nfs/shared/CIFAR test_dir = /mnt/nfs/shared/CIFAR ; python list of fractions below -sizes = -random_seed = 91 +sizes = +random_seed = 99 partition_niid = True +shards = 4 [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] training_package = decentralizepy.training.Training training_class = Training -rounds = 5 +rounds = 65 full_epochs = False batch_size = 8 shuffle = True @@ -32,5 +33,4 @@ addresses_filepath = ip_addr_6Machines.json [SHARING] sharing_package = decentralizepy.sharing.PartialModel sharing_class = PartialModel -alpha = 0.5 -accumulation = True +alpha=0.5 diff --git a/eval/step_configs/config_CIFAR.ini b/eval/step_configs/config_cifar_sharing.ini similarity index 92% rename from eval/step_configs/config_CIFAR.ini rename to eval/step_configs/config_cifar_sharing.ini index 75ffbe8..8df88c5 100644 --- a/eval/step_configs/config_CIFAR.ini +++ b/eval/step_configs/config_cifar_sharing.ini @@ -5,20 +5,20 @@ model_class = LeNet train_dir = /mnt/nfs/shared/CIFAR test_dir = /mnt/nfs/shared/CIFAR ; python list of fractions below -sizes = -random_seed = 91 +sizes = +random_seed = 99 partition_niid = True shards = 4 [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] training_package = decentralizepy.training.Training training_class = Training -rounds = 5 +rounds = 65 full_epochs = False batch_size = 8 shuffle = True diff --git a/eval/step_configs/config_cifar_subsampling.ini b/eval/step_configs/config_cifar_subsampling.ini new file mode 100644 index 0000000..60b4176 --- /dev/null +++ b/eval/step_configs/config_cifar_subsampling.ini @@ -0,0 +1,36 @@ +[DATASET] +dataset_package = decentralizepy.datasets.CIFAR10 +dataset_class = CIFAR10 +model_class = LeNet +train_dir = /mnt/nfs/shared/CIFAR +test_dir = /mnt/nfs/shared/CIFAR +; python list of fractions below +sizes = +random_seed = 99 +partition_niid = True +shards = 4 + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 65 +full_epochs = False +batch_size = 8 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP 
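In the subsampling configs, alpha is the fraction of the flattened model transmitted per round; unlike the top-k variants, SubSampling draws a fresh random binary mask every communication round instead of ranking parameters. A rough sketch of that idea, assuming each parameter is kept independently with probability alpha (apply_subsampling in SubSampling.py holds the real logic):

    import torch

    def apply_random_subsampling(flat_params: torch.Tensor, alpha: float):
        # Keep each parameter independently with probability alpha; only the
        # selected values (plus the mask, or the seed used to draw it) are sent.
        mask = torch.rand_like(flat_params) < alpha
        return flat_params[mask], mask
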
+comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.SubSampling +sharing_class = SubSampling +alpha = 0.5 diff --git a/eval/step_configs/config_cifar_topkacc.ini b/eval/step_configs/config_cifar_topkacc.ini new file mode 100644 index 0000000..82c4e41 --- /dev/null +++ b/eval/step_configs/config_cifar_topkacc.ini @@ -0,0 +1,38 @@ +[DATASET] +dataset_package = decentralizepy.datasets.CIFAR10 +dataset_class = CIFAR10 +model_class = LeNet +train_dir = /mnt/nfs/shared/CIFAR +test_dir = /mnt/nfs/shared/CIFAR +; python list of fractions below +sizes = +random_seed = 99 +partition_niid = True +shards = 4 + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 65 +full_epochs = False +batch_size = 8 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.PartialModel +sharing_class = PartialModel +alpha = 0.5 +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_CIFAR_wavelet.ini b/eval/step_configs/config_cifar_wavelet.ini similarity index 89% rename from eval/step_configs/config_CIFAR_wavelet.ini rename to eval/step_configs/config_cifar_wavelet.ini index 19fc611..a5c2c1f 100644 --- a/eval/step_configs/config_CIFAR_wavelet.ini +++ b/eval/step_configs/config_cifar_wavelet.ini @@ -5,20 +5,20 @@ model_class = LeNet train_dir = /mnt/nfs/shared/CIFAR test_dir = /mnt/nfs/shared/CIFAR ; python list of fractions below -sizes = -random_seed = 91 +sizes = +random_seed = 99 partition_niid = True shards = 4 [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] training_package = decentralizepy.training.Training training_class = Training -rounds = 5 +rounds = 65 full_epochs = False batch_size = 8 shuffle = True @@ -38,3 +38,4 @@ alpha = 0.5 wavelet=sym2 level= 4 accumulation = True +accumulate_averaging_changes = True diff --git a/eval/step_configs/config_femnist_fft.ini b/eval/step_configs/config_femnist_fft.ini index afac1f4..7d52d93 100644 --- a/eval/step_configs/config_femnist_fft.ini +++ b/eval/step_configs/config_femnist_fft.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 # There are 734463 femnist samples diff --git a/eval/step_configs/config_femnist_grow.ini b/eval/step_configs/config_femnist_grow.ini index 2a779c4..a9e3ead 100644 --- a/eval/step_configs/config_femnist_grow.ini +++ b/eval/step_configs/config_femnist_grow.ini @@ -9,7 +9,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_femnist_partialmodel.ini b/eval/step_configs/config_femnist_partialmodel.ini index de4f1ce..3d66b6b 100644 --- a/eval/step_configs/config_femnist_partialmodel.ini +++ b/eval/step_configs/config_femnist_partialmodel.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_femnist_sharing.ini b/eval/step_configs/config_femnist_sharing.ini 
index e1af10b..559dd6e 100644 --- a/eval/step_configs/config_femnist_sharing.ini +++ b/eval/step_configs/config_femnist_sharing.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_femnist_subsampling.ini b/eval/step_configs/config_femnist_subsampling.ini index 61a1e9a..09707ee 100644 --- a/eval/step_configs/config_femnist_subsampling.ini +++ b/eval/step_configs/config_femnist_subsampling.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 # There are 734463 femnist samples diff --git a/eval/step_configs/config_femnist_topkacc.ini b/eval/step_configs/config_femnist_topkacc.ini index c9155d1..d279f9b 100644 --- a/eval/step_configs/config_femnist_topkacc.ini +++ b/eval/step_configs/config_femnist_topkacc.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 # There are 734463 femnist samples @@ -33,4 +33,5 @@ addresses_filepath = ip_addr_6Machines.json sharing_package = decentralizepy.sharing.PartialModel sharing_class = PartialModel alpha = 0.1 -accumulation = True \ No newline at end of file +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_femnist_topkaccbound.ini b/eval/step_configs/config_femnist_topkaccbound.ini new file mode 100644 index 0000000..4f4d689 --- /dev/null +++ b/eval/step_configs/config_femnist_topkaccbound.ini @@ -0,0 +1,38 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Femnist +dataset_class = Femnist +random_seed = 97 +model_class = CNN +train_dir = /mnt/nfs/shared/leaf/data/femnist/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/femnist/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +# There are 734463 femnist samples +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 47 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.LowerBoundTopK +sharing_class = LowerBoundTopK +lower_bound = 0.1 +alpha = 0.1 +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_femnist_topkparam.ini b/eval/step_configs/config_femnist_topkparam.ini index ada3c3f..d03f0c2 100644 --- a/eval/step_configs/config_femnist_topkparam.ini +++ b/eval/step_configs/config_femnist_topkparam.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 # There are 734463 femnist samples diff --git a/eval/step_configs/config_femnist_wavelet.ini b/eval/step_configs/config_femnist_wavelet.ini index b6ff278..84ef4ff 100644 --- a/eval/step_configs/config_femnist_wavelet.ini +++ b/eval/step_configs/config_femnist_wavelet.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 # There are 734463 femnist samples @@ -37,3 +37,4 @@ alpha = 0.1 wavelet=sym2 level= 4 accumulation = True 
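With accumulation = True, the shared alpha fraction is chosen by ranking the magnitude of the accumulated model change (self.model.model_change) rather than only the change of the current round; the same top-k selection is used by PartialModel and its FFT/wavelet subclasses, just applied to transform coefficients there. A minimal sketch of that selection, with simplified names:

    import torch

    def select_topk_indices(model_change: torch.Tensor, alpha: float):
        # Rank parameters (or transform coefficients) by the magnitude of their
        # accumulated change and keep the top alpha fraction for sharing.
        scores = torch.abs(model_change)
        k = round(alpha * scores.shape[0])
        values, indices = torch.topk(scores, k, dim=0, sorted=False)
        return values, indices
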
+accumulate_averaging_changes = True diff --git a/eval/step_configs/config_reddit_partialmodel.ini b/eval/step_configs/config_reddit_partialmodel.ini new file mode 100644 index 0000000..fc4b3c3 --- /dev/null +++ b/eval/step_configs/config_reddit_partialmodel.ini @@ -0,0 +1,34 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Reddit +dataset_class = Reddit +random_seed = 97 +model_class = RNN +train_dir = /mnt/nfs/shared/leaf/data/reddit_new/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/reddit_new/new_small_data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 47 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.PartialModel +sharing_class = PartialModel +alpha = 0.1 diff --git a/eval/step_configs/config_reddit_sharing.ini b/eval/step_configs/config_reddit_sharing.ini index 0aa4af0..2404478 100644 --- a/eval/step_configs/config_reddit_sharing.ini +++ b/eval/step_configs/config_reddit_sharing.ini @@ -10,7 +10,7 @@ sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.001 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_reddit_subsampling.ini b/eval/step_configs/config_reddit_subsampling.ini new file mode 100644 index 0000000..61c8a8a --- /dev/null +++ b/eval/step_configs/config_reddit_subsampling.ini @@ -0,0 +1,34 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Reddit +dataset_class = Reddit +random_seed = 97 +model_class = RNN +train_dir = /mnt/nfs/shared/leaf/data/reddit_new/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/reddit_new/new_small_data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 4 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.SubSampling +sharing_class = SubSampling +alpha = 0.1 diff --git a/eval/step_configs/config_reddit_topkacc.ini b/eval/step_configs/config_reddit_topkacc.ini new file mode 100644 index 0000000..249403b --- /dev/null +++ b/eval/step_configs/config_reddit_topkacc.ini @@ -0,0 +1,36 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Reddit +dataset_class = Reddit +random_seed = 97 +model_class = RNN +train_dir = /mnt/nfs/shared/leaf/data/reddit_new/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/reddit_new/new_small_data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 47 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP 
+addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.PartialModel +sharing_class = PartialModel +alpha = 0.1 +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_reddit_wavelet.ini b/eval/step_configs/config_reddit_wavelet.ini new file mode 100644 index 0000000..13a57d6 --- /dev/null +++ b/eval/step_configs/config_reddit_wavelet.ini @@ -0,0 +1,39 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Reddit +dataset_class = Reddit +random_seed = 97 +model_class = RNN +train_dir = /mnt/nfs/shared/leaf/data/reddit_new/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/reddit_new/new_small_data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.001 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 47 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.Wavelet +sharing_class = Wavelet +change_based_selection = True +alpha = 0.1 +wavelet=sym2 +level= 4 +accumulation = True +accumulate_averaging_changes = True diff --git a/eval/step_configs/config_shakespeare_partialmodel.ini b/eval/step_configs/config_shakespeare_partialmodel.ini new file mode 100644 index 0000000..453815f --- /dev/null +++ b/eval/step_configs/config_shakespeare_partialmodel.ini @@ -0,0 +1,34 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Shakespeare +dataset_class = Shakespeare +random_seed = 97 +model_class = LSTM +train_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.1 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 10 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.PartialModel +sharing_class = PartialModel +alpha = 0.1 diff --git a/eval/step_configs/config_shakespeare.ini b/eval/step_configs/config_shakespeare_sharing.ini similarity index 79% rename from eval/step_configs/config_shakespeare.ini rename to eval/step_configs/config_shakespeare_sharing.ini index 75fb258..525e928 100644 --- a/eval/step_configs/config_shakespeare.ini +++ b/eval/step_configs/config_shakespeare_sharing.ini @@ -2,14 +2,14 @@ dataset_package = decentralizepy.datasets.Shakespeare dataset_class = Shakespeare model_class = LSTM -train_dir = /mnt/nfs/shared/leaf/data/shakespeare/per_user_data/train -test_dir = /mnt/nfs/shared/leaf/data/shakespeare/data/test +train_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/data/test ; python list of fractions below sizes = [OPTIMIZER_PARAMS] optimizer_package = torch.optim -optimizer_class = Adam +optimizer_class = SGD lr = 0.1 [TRAIN_PARAMS] diff --git a/eval/step_configs/config_shakespeare_subsampling.ini 
b/eval/step_configs/config_shakespeare_subsampling.ini new file mode 100644 index 0000000..7bdef90 --- /dev/null +++ b/eval/step_configs/config_shakespeare_subsampling.ini @@ -0,0 +1,34 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Shakespeare +dataset_class = Shakespeare +random_seed = 97 +model_class = LSTM +train_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.1 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 10 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.SubSampling +sharing_class = SubSampling +alpha = 0.1 diff --git a/eval/step_configs/config_shakespeare_topkacc.ini b/eval/step_configs/config_shakespeare_topkacc.ini new file mode 100644 index 0000000..15838fb --- /dev/null +++ b/eval/step_configs/config_shakespeare_topkacc.ini @@ -0,0 +1,36 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Shakespeare +dataset_class = Shakespeare +random_seed = 97 +model_class = LSTM +train_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.1 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 10 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.PartialModel +sharing_class = PartialModel +alpha = 0.1 +accumulation = True +accumulate_averaging_changes = True \ No newline at end of file diff --git a/eval/step_configs/config_shakespeare_wavelet.ini b/eval/step_configs/config_shakespeare_wavelet.ini new file mode 100644 index 0000000..94cc840 --- /dev/null +++ b/eval/step_configs/config_shakespeare_wavelet.ini @@ -0,0 +1,39 @@ +[DATASET] +dataset_package = decentralizepy.datasets.Shakespeare +dataset_class = Shakespeare +random_seed = 97 +model_class = LSTM +train_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/per_user_data/train +test_dir = /mnt/nfs/shared/leaf/data/shakespeare_sub/data/test +; python list of fractions below +sizes = + +[OPTIMIZER_PARAMS] +optimizer_package = torch.optim +optimizer_class = SGD +lr = 0.1 + +[TRAIN_PARAMS] +training_package = decentralizepy.training.Training +training_class = Training +rounds = 10 +full_epochs = False +batch_size = 16 +shuffle = True +loss_package = torch.nn +loss_class = CrossEntropyLoss + +[COMMUNICATION] +comm_package = decentralizepy.communication.TCP +comm_class = TCP +addresses_filepath = ip_addr_6Machines.json + +[SHARING] +sharing_package = decentralizepy.sharing.Wavelet +sharing_class = Wavelet +change_based_selection = True +alpha = 0.1 +wavelet=sym2 +level= 4 +accumulation = True +accumulate_averaging_changes = True diff --git a/src/decentralizepy/datasets/Femnist.py b/src/decentralizepy/datasets/Femnist.py 
index 221e6f2..a7b2677 100644 --- a/src/decentralizepy/datasets/Femnist.py +++ b/src/decentralizepy/datasets/Femnist.py @@ -14,6 +14,7 @@ from decentralizepy.datasets.Dataset import Dataset from decentralizepy.datasets.Partitioner import DataPartitioner from decentralizepy.mappings.Mapping import Mapping from decentralizepy.models.Model import Model +from decentralizepy.models.Resnet import BasicBlock, Bottleneck, conv1x1 NUM_CLASSES = 62 IMAGE_SIZE = (28, 28) @@ -290,7 +291,10 @@ class Femnist(Dataset): """ if self.__training__: return DataLoader( - Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle + Data(self.train_x, self.train_y), + batch_size=batch_size, + shuffle=shuffle, + drop_last=True, # needed for resnet ) raise RuntimeError("Training set not initialized!") @@ -448,3 +452,139 @@ class CNN(Model): x = F.relu(self.fc1(x)) x = self.fc2(x) return x + + +class RNET(Model): + """ + From PyTorch: + Class for a Resnet Model for FEMNIST + Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py + For the license see models/Resnet.py + """ + + def __init__( + self, + num_classes=NUM_CLASSES, + zero_init_residual=False, + groups=1, + width_per_group=32, + replace_stride_with_dilation=None, + norm_layer=None, + ): + super(RNET, self).__init__() + block = BasicBlock + layers = [2, 2, 2, 2] + if norm_layer is None: + norm_layer = nn.BatchNorm2d + self._norm_layer = norm_layer + + self.inplanes = 32 + self.dilation = 1 + if replace_stride_with_dilation is None: + # each element in the tuple indicates if we should replace + # the 2x2 stride with a dilated convolution instead + replace_stride_with_dilation = [False, False, False] + if len(replace_stride_with_dilation) != 3: + raise ValueError( + "replace_stride_with_dilation should be None " + "or a 3-element tuple, got {}".format(replace_stride_with_dilation) + ) + self.groups = groups + self.base_width = width_per_group + self.conv1 = nn.Conv2d( + 1, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False + ) + self.bn1 = norm_layer(self.inplanes) + self.relu = nn.ReLU(inplace=True) + self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) + self.layer1 = self._make_layer(block, 32, layers[0]) + self.layer2 = self._make_layer( + block, 64, layers[1], stride=2, dilate=replace_stride_with_dilation[0] + ) + self.layer3 = self._make_layer( + block, 128, layers[2], stride=2, dilate=replace_stride_with_dilation[1] + ) + self.layer4 = self._make_layer( + block, 256, layers[3], stride=2, dilate=replace_stride_with_dilation[2] + ) + self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) + self.fc = nn.Linear(256 * block.expansion, num_classes) + + for m in self.modules(): + if isinstance(m, nn.Conv2d): + nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu") + elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)): + nn.init.constant_(m.weight, 1) + nn.init.constant_(m.bias, 0) + + # Zero-initialize the last BN in each residual branch, + # so that the residual branch starts with zeros, and each residual block behaves like an identity. 
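The drop_last=True added to the FEMNIST training DataLoader earlier in this hunk is presumably what the "# needed for resnet" comment refers to: with 28x28 inputs the last ResNet stage produces 1x1 feature maps, and BatchNorm2d in training mode rejects a batch carrying a single value per channel, so a trailing batch of size one would crash training. A minimal standalone reproduction (plain PyTorch, not framework code):

    import torch
    from torch import nn

    bn = nn.BatchNorm2d(256).train()
    bn(torch.randn(1, 256, 1, 1))
    # raises ValueError: Expected more than 1 value per channel when training
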
+ # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677 + if zero_init_residual: + for m in self.modules(): + if isinstance(m, Bottleneck): + nn.init.constant_(m.bn3.weight, 0) + elif isinstance(m, BasicBlock): + nn.init.constant_(m.bn2.weight, 0) + + def _make_layer(self, block, planes, blocks, stride=1, dilate=False): + norm_layer = self._norm_layer + downsample = None + previous_dilation = self.dilation + if dilate: + self.dilation *= stride + stride = 1 + if stride != 1 or self.inplanes != planes * block.expansion: + downsample = nn.Sequential( + conv1x1(self.inplanes, planes * block.expansion, stride), + norm_layer(planes * block.expansion), + ) + + layers = [] + layers.append( + block( + self.inplanes, + planes, + stride, + downsample, + self.groups, + self.base_width, + previous_dilation, + norm_layer, + ) + ) + self.inplanes = planes * block.expansion + for _ in range(1, blocks): + layers.append( + block( + self.inplanes, + planes, + groups=self.groups, + base_width=self.base_width, + dilation=self.dilation, + norm_layer=norm_layer, + ) + ) + + return nn.Sequential(*layers) + + def _forward_impl(self, x): + # See note [TorchScript super()] + x = self.conv1(x) + x = self.bn1(x) + x = self.relu(x) + x = self.maxpool(x) + + x = self.layer1(x) + x = self.layer2(x) + x = self.layer3(x) + x = self.layer4(x) + + x = self.avgpool(x) + x = torch.flatten(x, 1) + x = self.fc(x) + + return x + + def forward(self, x): + return self._forward_impl(x) diff --git a/src/decentralizepy/datasets/Shakespeare.py b/src/decentralizepy/datasets/Shakespeare.py index cc6f357..60ba148 100644 --- a/src/decentralizepy/datasets/Shakespeare.py +++ b/src/decentralizepy/datasets/Shakespeare.py @@ -295,8 +295,11 @@ class Shakespeare(Dataset): """ if self.__training__: + # Only using a subset of the training set. The full set is too large. + thirstiest = torch.arange(0, self.test_x.shape[0], 30) return DataLoader( - Data(self.train_x, self.train_y), batch_size=batch_size, shuffle=shuffle + Data(self.test_x[thirstiest], self.test_y[thirstiest]), + batch_size=self.test_batch_size, ) raise RuntimeError("Training set not initialized!") diff --git a/src/decentralizepy/models/Resnet.py b/src/decentralizepy/models/Resnet.py new file mode 100644 index 0000000..158a2ed --- /dev/null +++ b/src/decentralizepy/models/Resnet.py @@ -0,0 +1,200 @@ +""" + Copyright (c) 2016- Facebook, Inc (Adam Paszke) + Copyright (c) 2014- Facebook, Inc (Soumith Chintala) + Copyright (c) 2011-2014 Idiap Research Institute (Ronan Collobert) + Copyright (c) 2012-2014 Deepmind Technologies (Koray Kavukcuoglu) + Copyright (c) 2011-2012 NEC Laboratories America (Koray Kavukcuoglu) + Copyright (c) 2011-2013 NYU (Clement Farabet) + Copyright (c) 2006-2010 NEC Laboratories America (Ronan Collobert, Leon Bottou, Iain Melvin, Jason Weston) + Copyright (c) 2006 Idiap Research Institute (Samy Bengio) + Copyright (c) 2001-2004 Idiap Research Institute (Ronan Collobert, Samy Bengio, Johnny Mariethoz) + + From Caffe2: + + Copyright (c) 2016-present, Facebook Inc. All rights reserved. + + All contributions by Facebook: + Copyright (c) 2016 Facebook Inc. + + All contributions by Google: + Copyright (c) 2015 Google Inc. + All rights reserved. + + All contributions by Yangqing Jia: + Copyright (c) 2015 Yangqing Jia + All rights reserved. + + All contributions by Kakao Brain: + Copyright 2019-2020 Kakao Brain + + All contributions from Caffe: + Copyright(c) 2013, 2014, 2015, the respective contributors + All rights reserved. 
+ + All other contributions: + Copyright(c) 2015, 2016 the respective contributors + All rights reserved. + + Caffe2 uses a copyright model similar to Caffe: each contributor holds + copyright over their contributions to Caffe2. The project versioning records + all such contribution and copyright details. If a contributor wants to further + mark their specific copyright on a particular contribution, they should + indicate their copyright solely in the commit message of the change when it is + committed. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the names of Facebook, Deepmind Technologies, NYU, NEC Laboratories America + and IDIAP Research Institute nor the names of its contributors may be + used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. 
+""" +from torch import nn + +# Copied and modified from https://github.com/pytorch/pytorch/blob/75024e228ca441290b6a1c2e564300ad507d7af6/benchmarks/functional_autograd_benchmark/torchvision_models.py + + +def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1): + """3x3 convolution with padding""" + return nn.Conv2d( + in_planes, + out_planes, + kernel_size=3, + stride=stride, + padding=dilation, + groups=groups, + bias=False, + dilation=dilation, + ) + + +def conv1x1(in_planes, out_planes, stride=1): + """1x1 convolution""" + return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) + + +class BasicBlock(nn.Module): + expansion = 1 + + def __init__( + self, + inplanes, + planes, + stride=1, + downsample=None, + groups=1, + base_width=64, + dilation=1, + norm_layer=None, + ): + super(BasicBlock, self).__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + if dilation > 1: + raise NotImplementedError("Dilation > 1 not supported in BasicBlock") + # Both self.conv1 and self.downsample layers downsample the input when stride != 1 + self.conv1 = conv3x3(inplanes, planes, stride) + self.bn1 = norm_layer(planes) + self.relu = nn.ReLU(inplace=True) + self.conv2 = conv3x3(planes, planes) + self.bn2 = norm_layer(planes) + self.downsample = downsample + self.stride = stride + + def forward(self, x): + identity = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + + if self.downsample is not None: + identity = self.downsample(x) + + out += identity + out = self.relu(out) + + return out + + +class Bottleneck(nn.Module): + # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2) + # while original implementation places the stride at the first 1x1 convolution(self.conv1) + # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385. + # This variant is also known as ResNet V1.5 and improves accuracy according to + # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch. 
+ + expansion = 4 + + def __init__( + self, + inplanes, + planes, + stride=1, + downsample=None, + groups=1, + base_width=64, + dilation=1, + norm_layer=None, + ): + super(Bottleneck, self).__init__() + if norm_layer is None: + norm_layer = nn.BatchNorm2d + width = int(planes * (base_width / 64.0)) * groups + # Both self.conv2 and self.downsample layers downsample the input when stride != 1 + self.conv1 = conv1x1(inplanes, width) + self.bn1 = norm_layer(width) + self.conv2 = conv3x3(width, width, stride, groups, dilation) + self.bn2 = norm_layer(width) + self.conv3 = conv1x1(width, planes * self.expansion) + self.bn3 = norm_layer(planes * self.expansion) + self.relu = nn.ReLU(inplace=True) + self.downsample = downsample + self.stride = stride + + def forward(self, x): + identity = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + out = self.relu(out) + + out = self.conv3(out) + out = self.bn3(out) + + if self.downsample is not None: + identity = self.downsample(x) + + out += identity + out = self.relu(out) + + return out diff --git a/src/decentralizepy/sharing/FFT.py b/src/decentralizepy/sharing/FFT.py index d912807..f86f73c 100644 --- a/src/decentralizepy/sharing/FFT.py +++ b/src/decentralizepy/sharing/FFT.py @@ -128,11 +128,8 @@ class FFT(PartialModel): logging.info("Returning fft compressed model weights") with torch.no_grad(): - tensors_to_cat = [ - v.data.flatten() for _, v in self.model.state_dict().items() - ] - concated = torch.cat(tensors_to_cat, dim=0) - flat_fft = self.change_transformer(concated) + + flat_fft = self.pre_share_model_transformed if self.change_based_selection: diff = self.model.model_change _, index = torch.topk( @@ -158,8 +155,15 @@ class FFT(PartialModel): Model converted to json dict """ - if self.alpha > self.metadata_cap: # Share fully - return super().serialized_model() + m = dict() + if self.alpha >= self.metadata_cap: # Share fully + data = self.pre_share_model_transformed + m["params"] = data.numpy() + self.total_data += len(self.communication.encrypt(m["params"])) + if self.model.accumulated_changes is not None: + self.model.accumulated_changes = torch.zeros_like(self.model.accumulated_changes) + return m + with torch.no_grad(): topk, indices = self.apply_fft() @@ -187,14 +191,13 @@ class FFT(PartialModel): ) as of: json.dump(shared_params, of) - m = dict() - if not self.dict_ordered: raise NotImplementedError m["alpha"] = self.alpha m["params"] = topk.numpy() m["indices"] = indices.numpy().astype(np.int32) + m["send_partial"] = True self.total_data += len(self.communication.encrypt(m["params"])) self.total_meta += len(self.communication.encrypt(m["indices"])) + len( @@ -218,8 +221,11 @@ class FFT(PartialModel): state_dict of received """ - if self.alpha > self.metadata_cap: # Share fully - return super().deserialized_model(m) + ret = dict() + if "send_partial" not in m: + params = m["params"] + params_tensor = torch.tensor(params) + ret["params"] = params_tensor with torch.no_grad(): if not self.dict_ordered: @@ -231,9 +237,9 @@ class FFT(PartialModel): params_tensor = torch.tensor(params) indices_tensor = torch.tensor(indices, dtype=torch.long) - ret = dict() ret["indices"] = indices_tensor ret["params"] = params_tensor + ret["send_partial"] = True return ret def _averaging(self): @@ -259,10 +265,13 @@ class FFT(PartialModel): ) data = self.deserialized_model(data) params = data["params"] - indices = data["indices"] - # use local data to complement - topkf = 
flat_fft.clone().detach() - topkf[indices] = params + if "indices" in data: + indices = data["indices"] + # use local data to complement + topkf = flat_fft.clone().detach() + topkf[indices] = params + else: + topkf = params weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings weight_total += weight diff --git a/src/decentralizepy/sharing/LowerBoundTopK.py b/src/decentralizepy/sharing/LowerBoundTopK.py new file mode 100644 index 0000000..6ac5329 --- /dev/null +++ b/src/decentralizepy/sharing/LowerBoundTopK.py @@ -0,0 +1,217 @@ +import logging + +import numpy as np +import torch + +from decentralizepy.sharing.PartialModel import PartialModel + + +class LowerBoundTopK(PartialModel): + """ + This class implements a bounded version of topK. + + """ + + def __init__( + self, + rank, + machine_id, + communication, + mapping, + graph, + model, + dataset, + log_dir, + lower_bound=0.1, + metro_hastings=True, + **kwargs, + ): + """ + Constructor + + Parameters + ---------- + rank : int + Local rank + machine_id : int + Global machine id + communication : decentralizepy.communication.Communication + Communication module used to send and receive messages + mapping : decentralizepy.mappings.Mapping + Mapping (rank, machine_id) -> uid + graph : decentralizepy.graphs.Graph + Graph reprensenting neighbors + model : decentralizepy.models.Model + Model to train + dataset : decentralizepy.datasets.Dataset + Dataset for sharing data. Not implemented yet! TODO + log_dir : str + Location to write shared_params (only writing for 2 procs per machine) + alpha : float + Percentage of model to share + dict_ordered : bool + Specifies if the python dict maintains the order of insertion + save_shared : bool + Specifies if the indices of shared parameters should be logged + metadata_cap : float + Share full model when self.alpha > metadata_cap + accumulation : bool + True if the the indices to share should be selected based on accumulated frequency change + save_accumulated : bool + True if accumulated weight change should be written to file. In case of accumulation the accumulated change + is stored. If a change_transformer is used then the transformed change is stored. + change_transformer : (x: Tensor) -> Tensor + A function that transforms the model change into other domains. Default: identity function + accumulate_averaging_changes: bool + True if the accumulation should account the model change due to averaging + lower_bound : float + Increases the communication budget per communication round by lower_bound, i.e. the new effective + alpha will be alpha + alpha*lower_bound. The extra budget is used to randomly selected parameters that + were shared in less than alpha*lower_bound*100 percentage of the rounds. + metro_hastings: bool + If True then metro hastings averaging is used otherwise it does per parameter averaging. + + """ + super().__init__( + rank, + machine_id, + communication, + mapping, + graph, + model, + dataset, + log_dir, + **kwargs, + ) + self.lower_bound = lower_bound + self.metro_hastings = metro_hastings + if self.lower_bound > 0: + self.start_lower_bounding_at = 1 / self.lower_bound + + def extract_top_gradients(self): + """ + Extract the indices and values of the topK gradients. + The gradients must have been accumulated. + + Returns + ------- + tuple + (a,b). a: The magnitudes of the topK gradients, b: Their indices. 
+ + """ + if self.lower_bound == 0.0: + return super().extract_top_gradients() + + logging.info("Returning topk gradients bounded") + G_topk = torch.abs(self.model.model_change) + std, mean = torch.std_mean(G_topk, unbiased=False) + self.std = std.item() + self.mean = mean.item() + + val, ind = torch.topk( + G_topk, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False + ) + if self.communication_round > self.start_lower_bounding_at: + # because the superclass increases it where it is inconvenient for this subclass + currently_shared = self.model.shared_parameters_counter.clone().detach() + currently_shared[ind] += 1 + ind_small = ( + currently_shared < self.communication_round * self.lower_bound + ).nonzero(as_tuple=True)[0] + ind_small_unique = np.setdiff1d( + ind_small.numpy(), ind.numpy(), assume_unique=True + ) + take_max = round(self.lower_bound * self.alpha * G_topk.shape[0]) + logging.info( + "lower: %i %i %i", len(ind_small), len(ind_small_unique), take_max + ) + if take_max > ind_small_unique.shape[0]: + take_max = ind_small_unique.shape[0] + to_take = torch.rand(ind_small_unique.shape[0]) + _, ind_of_to_take = torch.topk(to_take, take_max, dim=0, sorted=False) + ind_bound = torch.from_numpy(ind_small_unique)[ind_of_to_take] + logging.info("lower bounding: %i %i", len(ind), len(ind_bound)) + # val = torch.concat(val, G_topk[ind_bound]) # not really needed, as thes are abs values and not further used + ind = torch.cat([ind, ind_bound]) + + return val, ind + + def deserialized_model_avg(self, m): + """ + Convert received dict to state_dict. + + Parameters + ---------- + m : dict + dict received + + Returns + ------- + state_dict + state_dict of received + + """ + if "send_partial" not in m: + return super().deserialized_model(m) + + with torch.no_grad(): + state_dict = self.model.state_dict() + + if not self.dict_ordered: + raise NotImplementedError + + # could be made more efficent + T = torch.zeros_like(self.init_model) + index_tensor = torch.tensor(m["indices"], dtype=torch.long) + logging.debug("Original tensor: {}".format(T[index_tensor])) + T[index_tensor] = torch.tensor(m["params"]) + logging.debug("Final tensor: {}".format(T[index_tensor])) + + return T, index_tensor + + def _averaging(self): + """ + Averages the received model with the local model + + """ + if self.metro_hastings: + super()._averaging() + else: + with torch.no_grad(): + + tensors_to_cat = [] + for _, v in self.model.state_dict().items(): + t = v.flatten() + tensors_to_cat.append(t) + T = torch.cat(tensors_to_cat, dim=0) + weight_total = 0 + weight_vector = torch.ones_like(self.init_model) + datas = [] + for i, n in enumerate(self.peer_deques): + degree, iteration, data = self.peer_deques[n].popleft() + logging.debug( + "Averaging model from neighbor {} of iteration {}".format( + n, iteration + ) + ) + data, ind = self.deserialized_model_avg(data) + weight_vector[ind] += 1 + # weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings + # weight_total += weight + datas.append(data) + + weight_vector = 1.0 / weight_vector + # speed up by exploiting sparsity + T = T * weight_vector + for d in datas: + T += d * weight_vector + + start_index = 0 + total = dict() + for i, key in enumerate(self.model.state_dict()): + end_index = start_index + self.lens[i] + total[key] = T[start_index:end_index].reshape(self.shapes[i]) + start_index = end_index + + logging.info("new averaging") + self.model.load_state_dict(total) diff --git a/src/decentralizepy/sharing/PartialModel.py 
b/src/decentralizepy/sharing/PartialModel.py index 7d8e7fc..c3da769 100644 --- a/src/decentralizepy/sharing/PartialModel.py +++ b/src/decentralizepy/sharing/PartialModel.py @@ -104,7 +104,7 @@ class PartialModel(Sharing): self.change_transformer(self.init_model) ) self.prev = self.init_model - + self.number_of_params = self.init_model.shape[0] if self.save_accumulated: self.model_change_path = os.path.join( self.log_dir, "model_change/{}".format(self.rank) @@ -162,6 +162,8 @@ class PartialModel(Sharing): """ if self.alpha >= self.metadata_cap: # Share fully + if self.model.accumulated_changes is not None: + self.model.accumulated_changes = torch.zeros_like(self.model.accumulated_changes) return super().serialized_model() with torch.no_grad(): @@ -190,9 +192,7 @@ class PartialModel(Sharing): logging.info("Extracting topk params") - tensors_to_cat = [v.data.flatten() for v in self.model.parameters()] - T = torch.cat(tensors_to_cat, dim=0) - T_topk = T[G_topk] + T_topk = self.pre_share_model[G_topk] logging.info("Generating dictionary to send") @@ -274,8 +274,10 @@ class PartialModel(Sharing): tensors_to_cat = [ v.data.flatten() for _, v in self.model.state_dict().items() ] - pre_share_model = torch.cat(tensors_to_cat, dim=0) - change = self.change_transformer(pre_share_model - self.init_model) + self.pre_share_model = torch.cat(tensors_to_cat, dim=0) + # Would only need one of the transforms + self.pre_share_model_transformed = self.change_transformer(self.pre_share_model) + change = self.change_transformer(self.pre_share_model - self.init_model) if self.accumulation: if not self.accumulate_averaging_changes: # Need to accumulate in _pre_step as the accumulation gets rewind during the step diff --git a/src/decentralizepy/sharing/Sharing.py b/src/decentralizepy/sharing/Sharing.py index 3fe189c..9f7645b 100644 --- a/src/decentralizepy/sharing/Sharing.py +++ b/src/decentralizepy/sharing/Sharing.py @@ -49,8 +49,8 @@ class Sharing: self.total_data = 0 self.peer_deques = dict() - my_neighbors = self.graph.neighbors(self.uid) - for n in my_neighbors: + self.my_neighbors = self.graph.neighbors(self.uid) + for n in self.my_neighbors: self.peer_deques[n] = deque() def received_from_all(self): diff --git a/src/decentralizepy/sharing/SubSampling.py b/src/decentralizepy/sharing/SubSampling.py index 6221714..f8c8f50 100644 --- a/src/decentralizepy/sharing/SubSampling.py +++ b/src/decentralizepy/sharing/SubSampling.py @@ -101,6 +101,17 @@ class SubSampling(Sharing): ) Path(self.folder_path).mkdir(parents=True, exist_ok=True) + with torch.no_grad(): + tensors_to_cat = [] + for _, v in self.model.state_dict().items(): + t = v.flatten() + tensors_to_cat.append(t) + self.init_model = torch.cat(tensors_to_cat, dim=0) + + self.model.shared_parameters_counter = torch.zeros( + self.init_model.shape[0], dtype=torch.int32 + ) + def apply_subsampling(self): """ Creates a random binary mask that is used to subsample the parameters that will be shared diff --git a/src/decentralizepy/sharing/Synchronous.py b/src/decentralizepy/sharing/Synchronous.py new file mode 100644 index 0000000..29d7f62 --- /dev/null +++ b/src/decentralizepy/sharing/Synchronous.py @@ -0,0 +1,237 @@ +import logging +from collections import deque + +import torch + + +class Synchronous: + """ + Synchronous training + + """ + + def __init__( + self, rank, machine_id, communication, mapping, graph, model, dataset, log_dir + ): + """ + Constructor + + Parameters + ---------- + rank : int + Local rank + machine_id : int + Global machine id + 
communication : decentralizepy.communication.Communication + Communication module used to send and receive messages + mapping : decentralizepy.mappings.Mapping + Mapping (rank, machine_id) -> uid + graph : decentralizepy.graphs.Graph + Graph reprensenting neighbors + model : decentralizepy.models.Model + Model to train + dataset : decentralizepy.datasets.Dataset + Dataset for sharing data. Not implemented yet! TODO + log_dir : str + Location to write shared_params (only writing for 2 procs per machine) + + """ + self.rank = rank + self.machine_id = machine_id + self.uid = mapping.get_uid(rank, machine_id) + self.communication = communication + self.mapping = mapping + self.graph = graph + self.model = model + self.dataset = dataset + self.communication_round = 0 + self.log_dir = log_dir + self.total_data = 0 + + self.peer_deques = dict() + self.my_neighbors = self.graph.neighbors(self.uid) + for n in self.my_neighbors: + self.peer_deques[n] = deque() + + with torch.no_grad(): + self.init_model = {} + for k, v in self.model.state_dict().items(): + self.init_model[k] = v.clone().detach() + + def received_from_all(self): + """ + Check if all neighbors have sent the current iteration + + Returns + ------- + bool + True if required data has been received, False otherwise + + """ + for _, i in self.peer_deques.items(): + if len(i) == 0: + return False + return True + + def get_neighbors(self, neighbors): + """ + Choose which neighbors to share with + + Parameters + ---------- + neighbors : list(int) + List of all neighbors + + Returns + ------- + list(int) + Neighbors to share with + + """ + # modify neighbors here + return neighbors + + def serialized_gradient(self): + """ + Convert model to a dictionary. Here we can choose how much to share + + Returns + ------- + dict + Model converted to dict + + """ + m = dict() + for key, val in self.model.state_dict().items(): + m[key] = val - self.init_model[key] # this is -lr*gradient + self.total_data += len(self.communication.encrypt(m)) + return m + + def serialized_model(self): + """ + Convert model to a dictionary. Here we can choose how much to share + + Returns + ------- + dict + Model converted to dict + + """ + m = dict() + for key, val in self.model.state_dict().items(): + m[key] = val.clone().detach() + self.total_data += len(self.communication.encrypt(m)) + return m + + def deserialized_model(self, m): + """ + Convert received dict to state_dict. + + Parameters + ---------- + m : dict + received dict + + Returns + ------- + state_dict + state_dict of received + + """ + return m + + def _pre_step(self): + """ + Called at the beginning of step. + + """ + pass + + def _post_step(self): + """ + Called at the end of step. 
+ + """ + with torch.no_grad(): + self.init_model = {} + for k, v in self.model.state_dict().items(): + self.init_model[k] = v.clone().detach() + + def _apply_gradients(self): + """ + Averages the received model with the local model + + """ + with torch.no_grad(): + total = dict() + for i, n in enumerate(self.peer_deques): + gradient = self.peer_deques[n].popleft() + logging.debug( + "Applying gradient from neighbor {}".format( + n, + ) + ) + grad = self.deserialized_model(gradient) + + for key, value in grad.items(): + if key in total: + total[key] += value + else: + total[key] = value + + my_grad = self.serialized_gradient() + for key, value in my_grad.items(): + if key in total: + total[key] += value + else: + total[key] = value + new_model = {} + for key, value in self.init_model.items(): + new_model[key] = value + total[key] * (1 / (len(self.my_neighbors) + 1)) + + self.model.load_state_dict(new_model) + + def step(self): + """ + Perform a sharing step. Implements D-PSGD. + + """ + self._pre_step() + logging.info("--- COMMUNICATION ROUND {} ---".format(self.communication_round)) + if self.uid != 0: + gradient = self.serialized_gradient() + # Should be only one neighbour + + self.communication.send(0, gradient) + + logging.info("Waiting for messages from central node") + sender, data = self.communication.receive() + logging.debug("Received model from {}".format(sender)) + logging.info( + "Deserialized received model from {} of iteration {}".format( + sender, self.communication_round + ) + ) + self.model.load_state_dict(data) + else: + logging.info("Waiting for messages from leaf nodes") + while not self.received_from_all(): + sender, data = self.communication.receive() + logging.debug("Received gradient from {}".format(sender)) + self.peer_deques[sender].append(data) + logging.info( + "Deserialized gradient model from {} of iteration {}".format( + sender, self.communication_round + ) + ) + self._apply_gradients() + + data = self.serialized_model() + + all_neighbors = self.graph.neighbors(self.uid) + iter_neighbors = self.get_neighbors(all_neighbors) + for neighbor in iter_neighbors: + self.communication.send(neighbor, data) + + self.communication_round += 1 + self._post_step() diff --git a/src/decentralizepy/sharing/TopKNormalized.py b/src/decentralizepy/sharing/TopKNormalized.py new file mode 100644 index 0000000..15a3caf --- /dev/null +++ b/src/decentralizepy/sharing/TopKNormalized.py @@ -0,0 +1,117 @@ +import logging + +import torch + +from decentralizepy.sharing.PartialModel import PartialModel +from decentralizepy.utils import identity + + +class TopKNormalized(PartialModel): + """ + This class implements the vanilla version of partial model sharing. + + """ + + def __init__( + self, + rank, + machine_id, + communication, + mapping, + graph, + model, + dataset, + log_dir, + alpha=1.0, + dict_ordered=True, + save_shared=False, + metadata_cap=1.0, + accumulation=False, + save_accumulated="", + change_transformer=identity, + accumulate_averaging_changes=False, + epsilon=0.01, + ): + """ + Constructor + + Parameters + ---------- + rank : int + Local rank + machine_id : int + Global machine id + communication : decentralizepy.communication.Communication + Communication module used to send and receive messages + mapping : decentralizepy.mappings.Mapping + Mapping (rank, machine_id) -> uid + graph : decentralizepy.graphs.Graph + Graph reprensenting neighbors + model : decentralizepy.models.Model + Model to train + dataset : decentralizepy.datasets.Dataset + Dataset for sharing data. 
Not implemented yet! TODO + log_dir : str + Location to write shared_params (only writing for 2 procs per machine) + alpha : float + Percentage of model to share + dict_ordered : bool + Specifies if the python dict maintains the order of insertion + save_shared : bool + Specifies if the indices of shared parameters should be logged + metadata_cap : float + Share full model when self.alpha > metadata_cap + accumulation : bool + True if the the indices to share should be selected based on accumulated frequency change + save_accumulated : bool + True if accumulated weight change should be written to file. In case of accumulation the accumulated change + is stored. If a change_transformer is used then the transformed change is stored. + change_transformer : (x: Tensor) -> Tensor + A function that transforms the model change into other domains. Default: identity function + accumulate_averaging_changes: bool + True if the accumulation should account the model change due to averaging + epsilon : float + numerical stability parameter used during normalization + + """ + super().__init__( + rank, + machine_id, + communication, + mapping, + graph, + model, + dataset, + log_dir, + alpha, + dict_ordered, + save_shared, + metadata_cap, + accumulation, + save_accumulated, + change_transformer, + accumulate_averaging_changes, + ) + self.epsilon = epsilon + + def extract_top_gradients(self): + """ + Extract the indices and values of the topK gradients. + The gradients must have been accumulated. + + Returns + ------- + tuple + (a,b). a: The magnitudes of the topK gradients, b: Their indices. + + """ + + logging.info("Returning topk gradients") + G_topk = torch.abs(self.model.model_change) + G_topk_normalized = G_topk / (torch.abs(self.pre_share_model) + self.epsilon) + std, mean = torch.std_mean(G_topk, unbiased=False) + self.std = std.item() + self.mean = mean.item() + return torch.topk( + G_topk_normalized, round(self.alpha * G_topk.shape[0]), dim=0, sorted=False + ) diff --git a/src/decentralizepy/sharing/Wavelet.py b/src/decentralizepy/sharing/Wavelet.py index 9a02a64..81b9a85 100644 --- a/src/decentralizepy/sharing/Wavelet.py +++ b/src/decentralizepy/sharing/Wavelet.py @@ -148,9 +148,7 @@ class Wavelet(PartialModel): """ logging.info("Returning wavelet compressed model weights") - tensors_to_cat = [v.data.flatten() for _, v in self.model.state_dict().items()] - concated = torch.cat(tensors_to_cat, dim=0) - data = self.change_transformer(concated) + data = self.pre_share_model_transformed if self.change_based_selection: diff = self.model.model_change _, index = torch.topk( @@ -179,8 +177,14 @@ class Wavelet(PartialModel): Model converted to json dict """ - if self.alpha > self.metadata_cap: # Share fully - return super().serialized_model() + m = dict() + if self.alpha >= self.metadata_cap: # Share fully + data = self.pre_share_model_transformed + m["params"] = data.numpy() + self.total_data += len(self.communication.encrypt(m["params"])) + if self.model.accumulated_changes is not None: + self.model.accumulated_changes = torch.zeros_like(self.model.accumulated_changes) + return m with torch.no_grad(): topk, indices = self.apply_wavelet() @@ -207,8 +211,6 @@ class Wavelet(PartialModel): ) as of: json.dump(shared_params, of) - m = dict() - if not self.dict_ordered: raise NotImplementedError @@ -218,6 +220,8 @@ class Wavelet(PartialModel): m["indices"] = indices.numpy().astype(np.int32) + m["send_partial"] = True + self.total_data += len(self.communication.encrypt(m["params"])) self.total_meta += 
len(self.communication.encrypt(m["indices"])) + len( self.communication.encrypt(m["alpha"]) @@ -240,8 +244,12 @@ class Wavelet(PartialModel): state_dict of received """ - if self.alpha > self.metadata_cap: # Share fully - return super().deserialized_model(m) + ret = dict() + if "send_partial" not in m: + params = m["params"] + params_tensor = torch.tensor(params) + ret["params"] = params_tensor + return ret with torch.no_grad(): if not self.dict_ordered: @@ -256,7 +264,8 @@ class Wavelet(PartialModel): ret = dict() ret["indices"] = indices_tensor ret["params"] = params_tensor - return ret + ret["send_partial"] = True + return ret def _averaging(self): """ @@ -266,11 +275,7 @@ class Wavelet(PartialModel): with torch.no_grad(): total = None weight_total = 0 - tensors_to_cat = [ - v.data.flatten() for _, v in self.model.state_dict().items() - ] - pre_share_model = torch.cat(tensors_to_cat, dim=0) - wt_params = self.change_transformer(pre_share_model) + wt_params = self.pre_share_model_transformed for i, n in enumerate(self.peer_deques): degree, iteration, data = self.peer_deques[n].popleft() logging.debug( @@ -280,11 +285,14 @@ class Wavelet(PartialModel): ) data = self.deserialized_model(data) params = data["params"] - indices = data["indices"] - # use local data to complement - topkwf = wt_params.clone().detach() - topkwf[indices] = params - topkwf = topkwf.reshape(self.wt_shape) + if "indices" in data: + indices = data["indices"] + # use local data to complement + topkwf = wt_params.clone().detach() + topkwf[indices] = params + topkwf = topkwf.reshape(self.wt_shape) + else: + topkwf = params.reshape(self.wt_shape) weight = 1 / (max(len(self.peer_deques), degree) + 1) # Metro-Hastings weight_total += weight diff --git a/src/decentralizepy/utils.py b/src/decentralizepy/utils.py index 3ca85f5..4298ec3 100644 --- a/src/decentralizepy/utils.py +++ b/src/decentralizepy/utils.py @@ -105,6 +105,7 @@ def write_args(args, path): "graph_file": args.graph_file, "graph_type": args.graph_type, "test_after": args.test_after, + "train_evaluate_after": args.train_evaluate_after, "reset_optimizer": args.reset_optimizer, } with open(os.path.join(path, "args.json"), "w") as of: -- GitLab