[BugFix] fix cache transfer tasks failure after cache cleared #6201
Thanks for your contribution!
Pull request overview
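This PR fixes a bug in which cache transfer tasks fail after the cache has been cleared. The root cause is that transfer tasks still in progress while the cache is being cleared may access cache data that has already been cleared. The PR introduces a pause/resume mechanism that ensures all in-progress transfer tasks have finished before the cache is cleared and blocks new transfer tasks from starting while clearing is under way.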
Changes:
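- Added synchronization primitives (_pause_cond, is_paused, inflight) to track and control transfer task state
- Implemented a submit_task method that wraps task submission and supports pause/resume
- Implemented pause() and resume() methods to pause and resume the transfer manager
- Integrated pause/resume calls into the cache clearing flow so that no transfer operations run concurrently with cache clearing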
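A minimal sketch of how the pieces listed above could fit together, assuming `_pause_cond` is a `threading.Condition` that guards both `is_paused` and `inflight`. The class name `PausableSubmitter` is hypothetical, and the actual implementation in this PR may differ in detail.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class PausableSubmitter:
    """Hypothetical stand-in for the transfer manager's pause/resume logic."""

    def __init__(self):
        self._pause_cond = threading.Condition()
        self.is_paused = False
        self.inflight = 0

    def submit_task(self, thread_pool, task_fn, *args):
        def inflight_task(fn, *fn_args):
            try:
                return fn(*fn_args)
            finally:
                # Always decrement, even if the task raised, and wake
                # pause() once the last inflight task has finished.
                with self._pause_cond:
                    self.inflight -= 1
                    if self.inflight == 0:
                        self._pause_cond.notify_all()

        with self._pause_cond:
            # Block new submissions while the manager is paused.
            self._pause_cond.wait_for(lambda: not self.is_paused)
            self.inflight += 1
        # Pass the callable and its arguments; do not call it here.
        return thread_pool.submit(inflight_task, task_fn, *args)

    def pause(self):
        with self._pause_cond:
            self.is_paused = True
            # wait_for releases the lock while waiting, so finishing
            # tasks can still decrement the counter.
            self._pause_cond.wait_for(lambda: self.inflight == 0)

    def resume(self):
        with self._pause_cond:
            self.is_paused = False
            self._pause_cond.notify_all()
```

In this sketch, a caller that clears the cache would invoke `pause()` first, clear the cache, and then call `resume()`; any `submit_task` call made in between blocks until `resume()` runs.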
        time.sleep(0.1)

    def pause(self):
Copilot AI, Jan 24, 2026
The pause method lacks a docstring. Consider adding one explaining that the method blocks until all in-progress transfer tasks finish and that it puts the transfer manager into the paused state. This matters for understanding and maintaining the code.
Suggested change:
-    def pause(self):
+    def pause(self):
+        """Block until all inflight transfer tasks finish and mark the transfer manager as paused.
+
+        This method sets ``is_paused`` to True and then waits on the internal condition
+        variable until no inflight transfer tasks remain (i.e., ``self.inflight == 0``).
+        It is a blocking call and should be invoked from a thread that can safely wait.
+        """
        self.is_paused = True
        self._pause_cond.wait_for(lambda: self.inflight == 0)

    def resume(self):
Copilot AI, Jan 24, 2026
The resume method lacks a docstring. Consider adding one explaining that the method wakes all waiting task submissions and returns the transfer manager to normal operation.
Suggested change:
-    def resume(self):
+    def resume(self):
+        """Resume the transfer manager by clearing the paused state and waking all waiting transfer tasks."""
            self._pause_cond.wait_for(lambda: not self.is_paused)
            self.inflight += 1

        thread_pool.submit(inflight_task(task_fn, *args))
Copilot AI, Jan 24, 2026
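thread_pool.submit should be given a callable (a function reference), not the result of calling the function. The current code, inflight_task(task_fn, *args), runs the function immediately and then tries to submit its return value, so the task is never correctly submitted to the thread pool.
Defer execution by passing the function and its arguments separately, or by wrapping the call in a lambda expression or functools.partial:
thread_pool.submit(inflight_task, task_fn, *args) or thread_pool.submit(lambda: inflight_task(task_fn, *args))
For a correct usage in the same codebase, see prefix_cache_manager.py, line 878: self.executor_pool.submit(self.release_block_ids, task)
Suggested change:
-        thread_pool.submit(inflight_task(task_fn, *args))
+        thread_pool.submit(inflight_task, task_fn, *args)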
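The difference between the two forms can be seen in a small standalone sketch. The helper `double` and the simplified `inflight_task` below are illustrative assumptions, not code from this PR.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def inflight_task(task_fn, *args):
    # Report which thread actually ran the task, along with its result.
    return threading.current_thread().name, task_fn(*args)


def double(x):
    return 2 * x


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    # Buggy form: inflight_task runs right here, in the calling thread, and
    # its return value (a tuple, not a callable) is what gets submitted.
    bad = pool.submit(inflight_task(double, 21))
    bad_error = bad.exception()  # the worker fails when it tries to call a tuple

    # Fixed form: pass the callable and its arguments separately so the
    # pool invokes inflight_task on a worker thread.
    good = pool.submit(inflight_task, double, 21)
    ran_on, value = good.result()
```

Note that the buggy form does not raise at the call site: the error is stored in the returned future, so it goes unnoticed unless the future is inspected.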
            except Exception as e:
                logger.error(f"[RL] failed to restore caches: {e}")
Copilot AI, Jan 24, 2026
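The exception handler in the UPDATING block is missing a resume() call. If an exception occurs while restoring the caches, the transfer manager stays paused (pause() was called during the CLEARING phase), so all subsequent transfer tasks block forever.
Consider adding a self.resume() call in the exception handler so that the transfer manager returns to normal operation even when an error occurs.
Suggested change:
+                # Ensure transfer manager is resumed even if restore fails
+                self.resume()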
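One way to guarantee the resume on every path is a `finally` clause, which is a variation on the suggestion above rather than what the PR necessarily does. In this sketch, `ManagerStub` and `restore_with_resume` are hypothetical names, and the stub models only the paused flag.

```python
import logging

logger = logging.getLogger(__name__)


class ManagerStub:
    """Hypothetical stand-in that models only the paused flag."""

    def __init__(self):
        self.is_paused = False

    def pause(self):
        self.is_paused = True

    def resume(self):
        self.is_paused = False


def restore_with_resume(manager, restore_fn):
    """Run restore_fn and resume the manager whether or not it raises."""
    try:
        restore_fn()
        return True
    except Exception as e:
        logger.error(f"[RL] failed to restore caches: {e}")
        return False
    finally:
        # Runs on both the success and the failure path, so the manager
        # can never be left paused by a failed restore.
        manager.resume()
```

Placing the call in `finally` instead of only in the `except` block means the success path and the failure path share a single resume point.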
        thread_pool.submit(inflight_task(task_fn, *args))
Copilot AI, Jan 24, 2026
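Potential race condition: the inflight counter is incremented inside the lock (line 453), but the task is submitted outside the lock (line 455). If pause() is called after the lock is released and before the task is submitted, pause() may conclude that no tasks are in progress (because the task has not yet been submitted), even though inflight has already been incremented.
Consider moving the thread_pool.submit call inside the lock so that updating inflight and submitting the task are atomic. Note, however, that the submit operation should be non-blocking, which is safe when using ThreadPoolExecutor.
Suggested change:
-        thread_pool.submit(inflight_task(task_fn, *args))
+        thread_pool.submit(inflight_task, task_fn, *args)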
    def submit_task(self, thread_pool: concurrent.futures.ThreadPoolExecutor, task_fn, *args):
Copilot AI, Jan 24, 2026
The submit_task method lacks a docstring. By the conventions of this codebase, other public methods (such as check_work_status and do_data_transfer) have docstrings. Consider adding one that describes the method's purpose, parameters, and behavior, in particular its role in the pause/resume mechanism.
Suggested change:
-    def submit_task(self, thread_pool: concurrent.futures.ThreadPoolExecutor, task_fn, *args):
+    def submit_task(self, thread_pool: concurrent.futures.ThreadPoolExecutor, task_fn, *args):
+        """
+        Submit a transfer task to the thread pool with pause/resume support.
+
+        This method blocks task submission while the manager is paused and tracks
+        the number of inflight tasks. It cooperates with the pause/resume mechanism
+        by:
+        - Waiting on ``self._pause_cond`` until ``self.is_paused`` is False before
+          increasing the inflight counter.
+        - Decreasing ``self.inflight`` when the wrapped task finishes and notifying
+          all waiters on ``self._pause_cond`` when the inflight counter drops to 0.
+
+        Args:
+            thread_pool (concurrent.futures.ThreadPoolExecutor): Executor used to
+                run the submitted task asynchronously.
+            task_fn (Callable): The function to be executed in the thread pool.
+            *args: Positional arguments passed through to ``task_fn``.
+        """
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## release/2.4 #6201 +/- ##
==============================================
Coverage ? 58.67%
==============================================
Files ? 329
Lines ? 41073
Branches ? 6262
==============================================
Hits ? 24100
Misses ? 15086
Partials ? 1887
Merged commit 1c01c9b into PaddlePaddle:release/2.4