README.md (2 changes: 1 addition & 1 deletion)
@@ -60,7 +60,7 @@ Now that you have FedScale installed, you can start exploring FedScale following

***We are adding more datasets! Please contribute!***

FedScale consists of 20+ large-scale, heterogeneous FL datasets covering computer vision (CV), natural language processing (NLP), and miscellaneous tasks.
FedScale consists of 20+ large-scale, heterogeneous FL datasets and 70+ various [models](./fedscale/utils/models/cv_models/README.md), covering computer vision (CV), natural language processing (NLP), and miscellaneous tasks.
Each one is associated with its training, validation, and testing datasets.
Please go to the `./benchmark/dataset` directory and follow the dataset [README](./benchmark/dataset/README.md) for more details.

benchmark/configs/async_fl/async_fl.yml (25 changes: 11 additions & 14 deletions)
@@ -2,13 +2,14 @@

# ========== Cluster configuration ==========
# ip address of the parameter server (need 1 GPU process)
ps_ip: localhost
ps_ip: 10.0.0.1

# ip address of each worker:# of available gpus process on each gpu in this node
# Note that if we collocate ps and worker on same GPU, then we need to decrease this number of available processes on that GPU by 1
# E.g., master node has 4 available processes, then 1 for the ps, and worker should be set to: worker:3
worker_ips:
- localhost:[2]
- 10.0.0.1:[4]
- 10.0.0.2:[4]

exp_path: $FEDSCALE_HOME/fedscale/core

@@ -31,27 +32,23 @@ setup_commands:
job_conf:
- job_name: femnist # Generate logs under this folder: log_path/job_name/time_stamp
- log_path: $FEDSCALE_HOME/benchmark # Path of log files
- num_participants: 800 # Number of participants per round, we use K=100 in our paper, large K will be much slower
- num_participants: 20 # Number of participants per round, we use K=100 in our paper, large K will be much slower
Contributor:
The comment for this param might need to be updated for async. How do we decide what to use for num_participants since there's no notion of rounds in async?

- data_set: femnist # Dataset: openImg, google_speech, stackoverflow
- data_dir: $FEDSCALE_HOME/benchmark/dataset/data/femnist # Path of the dataset
- data_map_file: $FEDSCALE_HOME/benchmark/dataset/data/femnist/client_data_mapping/train.csv # Allocation of data to each client, turn to iid setting if not provided
- device_conf_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_device_capacity # Path of the client trace
- device_avail_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_behave_trace
#- device_conf_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_device_capacity # Path of the client trace
#- device_avail_file: $FEDSCALE_HOME/benchmark/dataset/data/device_info/client_behave_trace
- model: shufflenet_v2_x2_0 # Models: e.g., shufflenet_v2_x2_0, mobilenet_v2, resnet34, albert-base-v2
- eval_interval: 20 # How many rounds to run a testing on the testing set
- rounds: 500 # Number of rounds to run this training. We use 1000 in our paper, while it may converge w/ ~400 rounds
- eval_interval: 5 # How many rounds to run a testing on the testing set
- rounds: 3000 # Number of rounds to run this training. We use 1000 in our paper, while it may converge w/ ~400 rounds
- filter_less: 21 # Remove clients w/ less than 21 samples
- num_loaders: 2
- local_steps: 20
- learning_rate: 0.05
- batch_size: 20
- test_bsz: 20
- use_cuda: False
- use_cuda: True
- decay_round: 50
- overcommitment: 1.0
- async_buffer: 10
- arrival_interval: 3




- async_buffer: 20
- arrival_interval: 3
examples/async_fl/async_aggregator.py (124 changes: 67 additions & 57 deletions)
@@ -28,7 +28,9 @@ def __init__(self, args):
self.round_stamp = [0]
self.client_model_version = {}
self.virtual_client_clock = {}
self.round_lock = threading.Lock()
self.weight_tensor_type = {}
# We need to keep the test model for specific round to avoid async mismatch
self.test_model = None

def tictak_client_tasks(self, sampled_clients, num_clients_to_collect):
Contributor:
In this async aggregator implementation, is there a notion of concurrency? In the PAPAYA paper, concurrency is a hyper-parameter in addition to the buffer size.
[attached image: figure from the PAPAYA paper]

@AmberLJC (Member), Aug 15, 2022:
That's a good point, and sorry for not incorporating PAPAYA's design. We should do two more things:

  1. add the concurrency hyper-parameter
  2. Make sure when assigning the training task in tictak_client_tasks, the number of overlapping tasks doesn't exceed max_concurrency
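
To make point 2 concrete, here is a minimal, hypothetical sketch of such a cap; `assign_with_concurrency_cap`, `in_flight`, and `max_concurrency` are illustrative names and not part of the FedScale codebase:

```python
# Hypothetical sketch (not the repo's API): cap how many clients train at once.
from collections import deque

def assign_with_concurrency_cap(sampled_clients, in_flight, max_concurrency):
    """Return (runnable, deferred): clients that may start now vs. stay queued.

    sampled_clients: client ids chosen by the sampler this round
    in_flight:       set of client ids currently training (not yet reported)
    max_concurrency: PAPAYA-style cap on simultaneous client trainers
    """
    runnable, deferred = [], deque()
    for client in sampled_clients:
        if len(in_flight) + len(runnable) < max_concurrency:
            runnable.append(client)
        else:
            deferred.append(client)  # re-queued once a running client reports back
    return runnable, deferred

# With 3 clients already in flight and a cap of 5, only 2 new clients start now.
runnable, deferred = assign_with_concurrency_cap([11, 12, 13, 14], {1, 2, 3}, 5)
assert runnable == [11, 12] and list(deferred) == [13, 14]
```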


@@ -81,11 +83,19 @@ def tictak_client_tasks(self, sampled_clients, num_clients_to_collect):
return (sampled_clients, sampled_clients, completed_client_clock,
1, completionTimes)

def save_last_param(self):
""" Save the last model parameters
"""
self.last_gradient_weights = [
p.data.clone() for p in self.model.parameters()]
self.model_weights = copy.deepcopy(self.model.state_dict())
self.weight_tensor_type = {p: self.model_weights[p].data.dtype \
for p in self.model_weights}

def aggregate_client_weights(self, results):
"""May aggregate client updates on the fly"""
"""
[FedAvg] "Communication-Efficient Learning of Deep Networks from Decentralized Data".
H. Brendan McMahan, Eider Moore, Daniel Ramage, Seth Hampson, Blaise Aguera y Arcas. AISTATS, 2017
"PAPAYA: PRACTICAL, PRIVATE, AND SCALABLE FEDERATED LEARNING". MLSys, 2022
"""
# Start to take the average of updates, and we do not keep updates to save memory
# Importance of each update is 1/#_of_participants * staleness
@@ -95,54 +105,29 @@ def aggregate_client_weights(self, results):
importance = 1. / math.sqrt(1 + client_staleness)

for p in results['update_weight']:
# Different to core/executor, update_weight here is (train_model_weight - untrained)
param_weight = results['update_weight'][p]

if isinstance(param_weight, list):
param_weight = np.asarray(param_weight, dtype=np.float32)
param_weight = torch.from_numpy(
param_weight).to(device=self.device)

if self.model_in_update == 1:
self.model_weights[p].data = param_weight * importance
else:
if self.model_weights[p].data.dtype in (
torch.float, torch.double, torch.half,
torch.bfloat16, torch.chalf, torch.cfloat, torch.cdouble
):
# Only assign importance to floats (trainable variables)
self.model_weights[p].data += param_weight * importance
else:
# Non-floats (e.g., batches), no need to aggregate but need to track
self.model_weights[p].data += param_weight

if self.model_in_update == self.async_buffer_size:
logging.info("Calibrating tensor type")
for p in self.model_weights:
d_type = self.model_weights[p].data.dtype

self.model_weights[p].data = (
self.model_weights[p] / float(self.async_buffer_size)).to(dtype=d_type)

def aggregate_client_group_weights(self, results):
"""Streaming weight aggregation. Similar to aggregate_client_weights,
but each key corresponds to a group of weights (e.g., for Tensorflow)"""

client_staleness = self.round - \
self.client_model_version[results['clientId']]
importance = 1. / math.sqrt(1 + client_staleness)

for p_g in results['update_weight']:
param_weights = results['update_weight'][p_g]
for idx, param_weight in enumerate(param_weights):
if isinstance(param_weight, list):
param_weight = np.asarray(param_weight, dtype=np.float32)
param_weight = torch.from_numpy(
param_weight).to(device=self.device)

if self.model_in_update == 1:
self.model_weights[p_g][idx].data = param_weight * importance
else:
self.model_weights[p_g][idx].data += param_weight * importance

if self.model_in_update == self.async_buffer_size:
for p in self.model_weights:
for idx in range(len(self.model_weights[p])):
d_type = self.model_weights[p][idx].data.dtype

self.model_weights[p][idx].data = (
self.model_weights[p][idx].data /
float(self.async_buffer_size)
).to(dtype=d_type)
d_type = self.weight_tensor_type[p]
self.model_weights[p].data = (self.model_weights[p].data/float(self.async_buffer_size)).to(dtype=d_type)
Contributor:
Dividing the model weights by async_buffer_size might not be correct since we are weighting each update by the staleness factor. Should we be dividing by the sum of the importance variable over the round?

Contributor:
Or is self.model_weights[p].data the delta?

Collaborator (Author):
Yes! This is really a good catch! I was wondering about this too, so I turned to the FedBuff paper. It turns out the paper divides the update by async_buffer_size again (Alg 1, L11). But we are happy to update it if you think another approach makes more sense.

Collaborator (Author):
self.model_weights[p].data += delta * importance, so at this point it already holds the latest model version.

Collaborator (Author):
I have a quick question and would greatly appreciate your input: existing FL implementations run local training for a fixed number of steps, where a step can be either an iteration or an epoch. We find it difficult to keep the implementation consistent, even though the change only takes 2-3 lines of code (e.g., by setting local_step = len(data) * local_step).

Wondering which one you prefer. We think local_step = epoch can lead to longer round durations due to data heterogeneity. Thanks!

Contributor:
I see, I think dividing by async_buffer_size is fine. However, if I understand correctly, self.model_weights[p].data = (self.model_weights[p].data/float(self.async_buffer_size)).to(dtype=d_type) divides the entire model weights by async_buffer_size, which might not be correct? We would want to divide just the accumulated gradients of this round by async_buffer_size, then multiply by a learning rate and add the result to the global model weights. Again, it could be that I'm not understanding the code correctly. 😄
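
For reference, a minimal sketch of the delta-based update described above (sum the buffered, staleness-weighted deltas, average over the buffer size, scale by a server learning rate, then add to the global weights); the function name and `eta` are assumptions, not code from this PR:

```python
import torch

def apply_buffered_update(global_weights, deltas, buffer_size, eta=1.0):
    """Hypothetical FedBuff-style server update, not the code in this PR.

    global_weights: dict of name -> tensor (current global model state_dict)
    deltas: list of dicts of name -> tensor, each a client's (trained - old)
            delta already scaled by its staleness importance
    eta: server learning rate applied to the averaged delta
    """
    for name, weight in global_weights.items():
        if not torch.is_floating_point(weight):
            continue  # leave integer buffers (e.g., num_batches_tracked) untouched
        accumulated = sum(d[name] for d in deltas)        # deltas of this round only
        weight += eta * accumulated / float(buffer_size)  # averaged, then applied
    return global_weights
```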

Contributor:
As for fixed number of steps vs len(data), I think it might make sense to have a max number of steps if possible. Then this value can be adjusted for different datasets based on use case.
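
A tiny, self-contained sketch of that option (cap local training at min(one epoch, max_steps)); `run_local_training` and its arguments are illustrative names, not FedScale APIs:

```python
def run_local_training(batches, max_steps, train_step):
    """Run at most one epoch, but never more than max_steps gradient steps.

    batches:    a list of mini-batches for this client
    max_steps:  upper bound on local steps per round
    train_step: callable invoked on each batch
    """
    steps = min(max_steps, len(batches))  # one epoch at most, capped at max_steps
    completed = 0
    for batch in batches:
        train_step(batch)
        completed += 1
        if completed >= steps:
            break
    return completed

# Example: a small client with 8 batches and a cap of 20 stops after one epoch.
assert run_local_training(list(range(8)), 20, lambda b: None) == 8
```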

Collaborator (Author):
Oh! Yes! You are correct! This might even be the cause of the weird accuracy numbers. Many thanks for helping me! Let me fix it and try!


def round_completion_handler(self):
self.global_virtual_clock = self.round_stamp[-1]
@@ -173,7 +158,7 @@ def round_completion_handler(self):
self.sampled_participants, len(self.sampled_participants))

logging.info(f"{len(clientsToRun)} clients with constant arrival following the order: {clientsToRun}")

logging.info(f"====Register {len(clientsToRun)} to queue")
# Issue requests to the resource manager; Tasks ordered by the completion time
self.resource_manager.register_tasks(clientsToRun)
self.virtual_client_clock.update(virtual_client_clock)
@@ -192,10 +177,12 @@ def round_completion_handler(self):
self.test_result_accumulator = []
self.stats_util_accumulator = []
self.client_training_results = []
self.loss_accumulator = []

if self.round >= self.args.rounds:
self.broadcast_aggregator_events(commons.SHUT_DOWN)
elif self.round % self.args.eval_interval == 0:
self.test_model = copy.deepcopy(self.model)
self.broadcast_aggregator_events(commons.UPDATE_MODEL)
self.broadcast_aggregator_events(commons.MODEL_TEST)
else:
@@ -206,15 +193,38 @@ def find_latest_model(self, start_time):
for i, time_stamp in enumerate(reversed(self.round_stamp)):
if start_time >= time_stamp:
return len(self.round_stamp) - i
return None
return 1

def get_test_config(self, client_id):
"""FL model testing on clients, developers can further define personalized client config here.

Args:
client_id (int): The client id.

Returns:
dictionary: The testing config for new task.

"""
# Get the straggler round-id
client_tasks = self.resource_manager.client_run_queue
current_pending_length = min(
self.resource_manager.client_run_queue_idx, len(client_tasks)-1)

current_pending_clients = client_tasks[current_pending_length:]
straggler_round = 1e10
for client in current_pending_clients:
straggler_round = min(
self.find_latest_model(self.client_start_time[client]), straggler_round)

return {'client_id': client_id, 'straggler_round': straggler_round, 'test_model': self.test_model}

def get_client_conf(self, clientId):
"""Training configurations that will be applied on clients"""
start_time = self.client_start_time[clientId]
model_id = self.find_latest_model(start_time)
self.client_model_version[clientId] = model_id
end_time = self.client_round_duration[clientId] + start_time
logging.info(f"Client {clientId} train on model {model_id} during {start_time}-{end_time}")
logging.info(f"Client {clientId} train on model {model_id} during {int(start_time)}-{int(end_time)}")

conf = {
'learning_rate': self.args.learning_rate,
@@ -227,17 +237,17 @@ def create_client_task(self, executorId):

next_clientId = self.resource_manager.get_next_task(executorId)
train_config = None
# NOTE: model = None then the executor will load the global model broadcasted in UPDATE_MODEL
model = None
model_version = None

if next_clientId != None:
config = self.get_client_conf(next_clientId)
model_version = self.find_latest_model(self.client_start_time[next_clientId])
train_config = {'client_id': next_clientId, 'task_config': config}
return train_config, model
return train_config, model_version

def CLIENT_EXECUTE_COMPLETION(self, request, context):
"""FL clients complete the execution task.

Args:
request (CompleteRequest): Complete request info from executor.

@@ -249,26 +259,26 @@ def CLIENT_EXECUTE_COMPLETION(self, request, context):
executor_id, client_id, event = request.executor_id, request.client_id, request.event
execution_status, execution_msg = request.status, request.msg
meta_result, data_result = request.meta_result, request.data_result

if event == commons.CLIENT_TRAIN:
# Training results may be uploaded in CLIENT_EXECUTE_RESULT request later,
# so we need to specify whether to ask client to do so (in case of straggler/timeout in real FL).
if execution_status is False:
logging.error(f"Executor {executor_id} fails to run client {client_id}, due to {execution_msg}")

if self.resource_manager.has_next_task(executor_id):
# NOTE: we do not pop the train immediately in simulation mode,
# since the executor may run multiple clients
if commons.CLIENT_TRAIN not in self.individual_client_events[executor_id]:
self.individual_client_events[executor_id].append(
commons.CLIENT_TRAIN)

elif event in (commons.MODEL_TEST, commons.UPLOAD_MODEL):
self.add_event_handler(
executor_id, event, meta_result, data_result)
else:
logging.error(f"Received undefined event {event} from client {client_id}")

if self.resource_manager.has_next_task(executor_id):
# NOTE: we do not pop the train immediately in simulation mode,
# since the executor may run multiple clients
if commons.CLIENT_TRAIN not in self.individual_client_events[executor_id]:
self.individual_client_events[executor_id].append(
commons.CLIENT_TRAIN)

return self.CLIENT_PING(request, context)

def log_train_result(self, avg_loss):
@@ -304,7 +314,7 @@ def event_monitor(self):
if current_event == commons.UPLOAD_MODEL:
self.client_completion_handler(
self.deserialize_response(data))
if len(self.stats_util_accumulator) == self.async_buffer_size:
if self.model_in_update == self.async_buffer_size:
clientID = self.deserialize_response(data)['clientId']
self.round_stamp.append(
self.client_round_duration[clientID] + self.client_start_time[clientID])
examples/async_fl/async_client.py (64 changes: 64 additions & 0 deletions)
@@ -0,0 +1,64 @@
import copy
import logging
import math

import torch
from torch.autograd import Variable

from fedscale.core.execution.client import Client
from fedscale.core.execution.optimizers import ClientOptimizer
from fedscale.dataloaders.nlp import mask_tokens


class Client(Client):
"""Basic client component in Federated Learning"""

def train(self, client_data, model, conf):

clientId = conf.clientId
logging.info(f"Start to train (CLIENT: {clientId}) ...")
tokenizer, device = conf.tokenizer, conf.device

model = model.to(device=device)
model.train()

trained_unique_samples = min(
len(client_data.dataset), conf.local_steps * conf.batch_size)

self.global_model = None
if conf.gradient_policy == 'fed-prox':
# could be move to optimizer
self.global_model = [param.data.clone() for param in model.parameters()]

prev_model_dict = copy.deepcopy(model.state_dict())
optimizer = self.get_optimizer(model, conf)
criterion = self.get_criterion(conf)
error_type = None

# TODO: One may hope to run fixed number of epochs, instead of iterations
while self.completed_steps < conf.local_steps:
try:
self.train_step(client_data, conf, model, optimizer, criterion)
except Exception as ex:
error_type = ex
break

state_dicts = model.state_dict()
# In async, we need the delta_weight only
model_param = {p: (state_dicts[p] - prev_model_dict[p]).data.cpu().numpy()
for p in state_dicts}
results = {'clientId': clientId, 'moving_loss': self.epoch_train_loss,
'trained_size': self.completed_steps*conf.batch_size,
'success': self.completed_steps == conf.batch_size}
results['utility'] = math.sqrt(
self.loss_squre)*float(trained_unique_samples)

if error_type is None:
logging.info(f"Training of (CLIENT: {clientId}) completes, {results}")
else:
logging.info(f"Training of (CLIENT: {clientId}) failed as {error_type}")

results['update_weight'] = model_param
results['wall_duration'] = 0

return results