
Conversation

fanlai0990 (Collaborator)

Why are these changes needed?

  1. Model testing is somehow missing.
  2. Model accuracy behaves oddly during training.

Related issue number

Checks

  • I've included any doc changes needed for https://fedscale.readthedocs.io/en/latest/
  • I've made sure the following tests are passing.
  • Testing Configurations
    • Dry Run (20 training rounds & 1 evaluation round)
    • CIFAR-10 (20 training rounds & 1 evaluation round)
    • FEMNIST (20 training rounds & 1 evaluation round)

ewenw (Contributor) commented Aug 12, 2022

Thank you for the fix!

- job_name: femnist # Generate logs under this folder: log_path/job_name/time_stamp
- log_path: $FEDSCALE_HOME/benchmark # Path of log files
- num_participants: 800 # Number of participants per round, we use K=100 in our paper, large K will be much slower
- num_participants: 20 # Number of participants per round, we use K=100 in our paper, large K will be much slower
ewenw (Contributor):

The comment for this param might need to be updated for async. How do we decide what to use for num_participants since there's no notion of rounds in async?

def get_remaining(self) -> int:
    """Number of tasks left in the queue."""
    return self.get_task_length()
ewenw (Contributor):

Maybe the get_task_length() function could be removed in favor of get_remaining?

return os.path.exists(self.temp_model_path_version(round))
return os.path.exists(self.temp_model_path_version(model_id))

def remove_stale_models(self, straggler_round):
ewenw (Contributor):

It seems a bit inefficient to check whether the path for all the models exist during each test handler call. Seems like it would be O(N^2) time?

fanlai0990 (Collaborator Author) Aug 15, 2022:

Cool! We can do it in reverse order:

while version(stale_model_id) exists:  # older versions should have already been removed, so we can stop
    remove(stale_model_id)
    stale_model_id -= 1

On the other hand, since testing is not called in every iteration, this for loop is also fine to some extent. But we will definitely optimize it. Thanks for your input!
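For reference, a minimal sketch of that reverse-order cleanup as an aggregator method, assuming model versions are files addressed by temp_model_path_version (as in the snippet above); the starting index and the os.remove call are illustrative, not the actual FedScale implementation:

import os

def remove_stale_models(self, straggler_round):
    # Walk backwards from the oldest version a straggler may still need.
    # Older versions were already cleaned up earlier, so stop at the first
    # version that no longer exists on disk.
    stale_model_id = straggler_round - 1  # assumed starting point
    while os.path.exists(self.temp_model_path_version(stale_model_id)):
        os.remove(self.temp_model_path_version(stale_model_id))
        stale_model_id -= 1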

fanlai0990 (Collaborator Author) commented Aug 15, 2022

Thank you for the fix!

Thanks a lot for your feedback! This PR is still WIP, since we noticed the weird accuracy issue has not been fully addressed (although I have almost rewritten the entire async path). I will fix it and address your comments ASAP. Thanks for your patience!

float(self.async_buffer_size)
).to(dtype=d_type)
d_type = self.weight_tensor_type[p]
self.model_weights[p].data = (self.model_weights[p].data/float(self.async_buffer_size)).to(dtype=d_type)
ewenw (Contributor):

Dividing the model weights by async_buffer_size might not be correct since we are weighing each update by the staleness factor. Should we be dividing by the sum of the importance variable over the round?

ewenw (Contributor):

Or is self.model_weights[p].data the delta?

fanlai0990 (Collaborator Author):

Yes! This is really a good catch! I was wondering about this too, so I turned to the FedBuff paper. It turns out the paper divides the update by async_buffer_size again (Alg 1, L11). But we are happy to update it if you think it makes more sense.

fanlai0990 (Collaborator Author):

We do self.model_weights[p].data += delta * importance, so at this point it already holds the latest model version.

fanlai0990 (Collaborator Author):

I have a quick question and would greatly appreciate your input: existing FL runs local training for a fixed number of steps, where a step can be either an iteration or an epoch. We find it difficult to keep the implementation consistent, though this change only takes 2-3 lines of code (e.g., by changing local_step = len(data) * local_step).

Wondering which one you prefer. We think local_step=epoch can lead to longer round durations due to data heterogeneity. Thanks!

ewenw (Contributor):

I see, I think dividing by async_buffer_size is fine. However, if I understand correctly, self.model_weights[p].data = (self.model_weights[p].data/float(self.async_buffer_size)).to(dtype=d_type) divides the entire model weights by async_buffer_size, which might not be correct? We would want to divide just the accumulated gradients of this round by async_buffer_size, multiply by a learning rate, and then add that to the global model weights. Again, it could be that I'm not understanding the code correctly. 😄

ewenw (Contributor):

As for fixed number of steps vs len(data), I think it might make sense to have a max number of steps if possible. Then this value can be adjusted for different datasets based on use case.
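A rough sketch of that option, where max_local_steps is a hypothetical config knob and train_one_batch stands in for the client's per-batch training step (both names are assumptions, not FedScale's API):

def run_local_training(model, optimizer, train_loader, max_local_steps):
    # Train for at most max_local_steps mini-batch iterations, cycling over
    # the local data (i.e., across epochs) as needed.
    completed_steps = 0
    while completed_steps < max_local_steps:
        for batch in train_loader:
            train_one_batch(model, optimizer, batch)  # hypothetical helper
            completed_steps += 1
            if completed_steps >= max_local_steps:
                break
    return model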

fanlai0990 (Collaborator Author):

Oh! Yes! You are correct! This might even be the cause of the weird accuracy. Many thanks for helping me! Let me fix it and try!
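For context, a minimal sketch of the fix being discussed here, assuming accumulated_delta holds the sum of the staleness-weighted client deltas for the round (the function and argument names are illustrative, not FedScale's internals):

def apply_buffered_update(model_weights, accumulated_delta, async_buffer_size,
                          weight_tensor_type, server_lr=1.0):
    # Average only the deltas buffered this round, then add the result to the
    # global weights (optionally scaled by a server-side learning rate),
    # instead of dividing the full weights by the buffer size.
    for p in model_weights:
        d_type = weight_tensor_type[p]
        avg_delta = accumulated_delta[p] / float(async_buffer_size)
        model_weights[p].data = (model_weights[p].data + server_lr * avg_delta).to(dtype=d_type)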

# We need to keep the test model for specific round to avoid async mismatch
self.test_model = None

def tictak_client_tasks(self, sampled_clients, num_clients_to_collect):
ewenw (Contributor):

In this async aggregator implementation, is there a notion of concurrency? In the PAPAYA paper, concurrency is a hyper-parameter in addition to the buffer size.
[image attached]

AmberLJC (Member) Aug 15, 2022:

That's a good point, and sorry for not incorporating Papaya's design. We should do two more things:

  1. Add the concurrency hyper-parameter.
  2. Make sure that when assigning training tasks in tictak_client_tasks, the number of overlapping tasks doesn't exceed max_concurrency.

AmberLJC requested a review from ewenw on August 24, 2022 at 15:56.
ewenw (Contributor) commented Aug 24, 2022

Hi @AmberLJC do you have any tensorboard results for async handy? It would be helpful to attach it with the PR. Thanks!

# -results = {'clientId':clientId, 'update_weight': model_param, 'moving_loss': round_train_loss,
# 'trained_size': count, 'wall_duration': time_cost, 'success': is_success 'utility': utility}

# [Async] some clients are scheduled earlier, which should be aggregated in previous round but receive the result late
ewenw (Contributor):

For my understanding: why do we want to ignore clients that should have been aggregated in previous rounds? Don't we want to aggregate them anyway with a staleness factor?

AmberLJC (Member):

Yeah, that's also a solution 🤔, if we ignore the fact that the supposed end_time of the result has passed.
(The reason we receive "past" results is that we schedule training tasks based on their end_time, but we cannot control the order in which tasks finish.)
I will either fix the gRPC problem or just follow your suggestion. @fanlai0990, what do you think?

fanlai0990 (Collaborator Author):

I think we can aggregate it according to the staleness factor for now.
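As an illustration of such a staleness factor, a small sketch; the polynomial form below is one common choice in FedBuff-style systems and is an assumption here, not necessarily what this PR implements:

def staleness_importance(current_round, client_round, alpha=0.5):
    # Down-weight an update that was computed against an older model version.
    staleness = max(0, current_round - client_round)
    return 1.0 / ((1.0 + staleness) ** alpha)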

AmberLJC (Member):

Hi @AmberLJC do you have any tensorboard results for async handy? It would be helpful to attach it with the PR. Thanks!

[TensorBoard screenshot attached]

ewenw (Contributor) commented Aug 24, 2022

Do you know how the convergence over virtual clock time compares with synchronous training with similar parameters?

AmberLJC (Member):

Do you know how the convergence over virtual clock time compares with synchronous training with similar parameters?

Let me try it out. I previously had some results, but I want to make a fair comparison again.

end_j += 1
if concurreny_count > self.max_concurrency:
    end_list.pop()
    continue
ewenw (Contributor):

By breaking the loop here, are we limiting the number of participants for each model version to max_concurrency?

For example, if we have an aggregation target of 40, and the max concurrency is set to 20, would round_completion_handler() ever be called and new clients be sampled?

AmberLJC (Member) Aug 26, 2022:

I think that would not be a problem. Here I just make sure the number of concurrent participants is <= 20, while it can still launch > 40 clients over a larger time window. continue does not break the loop; instead it jumps to the next iteration.

(Though it depends on the traces, so I need to set a larger value here.)
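A small sketch of how such a cap on overlapping tasks can be checked while scheduling, assuming each task carries start_time and end_time attributes (the helper and its signature are illustrative, not the actual FedScale code):

def can_schedule(candidate, running_tasks, max_concurrency):
    # Allow the candidate only if the number of already-scheduled tasks whose
    # [start_time, end_time) windows overlap the candidate's window stays
    # below max_concurrency.
    overlapping = sum(
        1 for t in running_tasks
        if t.start_time < candidate.end_time and candidate.start_time < t.end_time
    )
    return overlapping < max_concurrency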

ewenw (Contributor):

I see, thanks. It seems like num_participants is no longer used in the code or did I miss it?

fanlai0990 (Collaborator Author):

Yes. I have removed it.

self.update_lock.release()
return remaining_task_num

def register_tasks(self, clientsToRun):
ewenw (Contributor):

Do the tasks need resorting based on end time when new tasks are registered, or is there some assumption that makes this unnecessary? For example, let's say aggregation_buffer_size=1; in round 1 we sample client_A [start=1, end=3] and client_B [start=2, end=10]; in round 2 we sample client_C [start=5, end=8]. In round 2, we would actually get the result of client_B and aggregate it before client_C because it is earlier in the queue, even though client_C should have finished earlier.

Also, with multiple executors, the tasks might return in a different order from the original queue here. How is this handled?
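One way to address the ordering concern is to keep the pending-task queue as a min-heap keyed on end_time; a sketch under that assumption (illustrative, not the current FedScale queue implementation):

import heapq

class EndTimeTaskQueue:
    # Tasks are popped in order of their simulated end_time, regardless of
    # the order in which they were registered.
    def __init__(self):
        self._heap = []

    def register_task(self, client_id, start_time, end_time):
        heapq.heappush(self._heap, (end_time, start_time, client_id))

    def get_next_task(self):
        if not self._heap:
            return None
        end_time, start_time, client_id = heapq.heappop(self._heap)
        return client_id, start_time, end_time

    def get_remaining(self):
        return len(self._heap)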

fanlai0990 (Collaborator Author) Aug 29, 2022:

These are pretty good points.

  1. We can mitigate the first concern by "prefetching" more client selections for future rounds (here in async_aggregator: select_participants(aggregation_buffer_size*10)).
  2. The second problem leads to a tradeoff between simulation efficiency and fidelity. One workaround is to stall get_next_task until the clients of the current buffer_size have completed.

I think we should support the 2nd point. Please let us know your feedback. Thanks!

ewenw (Contributor):

  1. That makes sense, thanks for clarifying. I think it's a valid workaround. My only concern is that with "prefetching", the round number will affect aggregation because of the staleness factor. Btw, is there a mechanism to keep the size of this queue from exploding since we are preselecting a lot?
  2. I think that's a good solution / option to have for correctness. Simulation speed is not a huge concern compared to the fidelity of the results. If one worker is somehow running slower than the others, then at least one result would be out of order each round, which could potentially change the results.

Thanks a lot for addressing my feedback so quickly!

fanlai0990 (Collaborator Author) Aug 29, 2022:

My pleasure!

  1. I am not quite sure whether I have understood your concern correctly, so please feel free to correct me if not.
    • I think "prefetching" is fine, since we track each client's correct (completion) round number, and thus its staleness, after sorting by completion time. Please note that the selection_round is different from the completion_round we use in aggregation. In fact, I guess we do not even care about the selection_round, if we ignore its impact on participant-selection decisions? Admittedly, we are sacrificing selection fidelity, but it is hard to circumvent this further in simulation.
    • IMO, exploding won't be a big concern. If we assume the async buffer size is 1K, then storing 1K*10/20 indices (int64) is not a big deal. Meanwhile, since the aggregator drains events on the fly, the event_queue won't pile up. But you are right that it would matter if we were storing thousands of client updates, and that's why we are developing Redis support now (Redis Support for FedScale #170).
  2. Sounds good. I will fix this soon.

Thanks a lot for your input! :)

fanlai0990 (Collaborator Author) commented Sep 2, 2022

@AmberLJC Can you please help to review and test the last commit? Training gets stuck as the model (version) is missing on executors. Thanks!

fanlai0990 merged commit 838fa81 into SymbioticLab:master on Sep 6, 2022.