@liyonghua0910
Collaborator

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)

Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

@paddle-bot
paddle-bot bot commented Jan 24, 2026

Thanks for your contribution!

Contributor

Copilot AI left a comment

Pull request overview

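This PR fixes a bug in which cache transfer tasks fail after the cache is cleared. The root cause is that a transfer task still in progress while the cache is being cleared may access cache data that has already been cleared. The PR introduces a pause/resume mechanism that waits for all in-flight transfer tasks to finish before the cache is cleared and blocks new transfer tasks from starting while the clear is under way.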

Changes:

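  • Added synchronization primitives (_pause_cond, is_paused, inflight) to track and control the state of transfer tasks
  • Implemented a submit_task method that wraps task submission and supports pause/resume
  • Implemented pause() and resume() methods to pause and resume the transfer manager
  • Integrated the pause/resume calls into the cache-clearing flow so that no transfer operations run concurrently with a cache clear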
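
The changes listed above could be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the class name PausableSubmitter is hypothetical, the attribute and method names are taken from the summary, and the sketch assumes _pause_cond is a threading.Condition. It also submits to the pool while still holding the condition's lock, which is only one possible design.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class PausableSubmitter:
    """Hypothetical stand-in for the transfer manager's pause/resume gate."""

    def __init__(self):
        self._pause_cond = threading.Condition()
        self.is_paused = False
        self.inflight = 0

    def submit_task(self, thread_pool, task_fn, *args):
        def inflight_task(fn, *fn_args):
            try:
                return fn(*fn_args)
            finally:
                # Runs on the worker thread once the task is done.
                with self._pause_cond:
                    self.inflight -= 1
                    if self.inflight == 0:
                        self._pause_cond.notify_all()

        with self._pause_cond:
            # Block new submissions while the gate is paused.
            self._pause_cond.wait_for(lambda: not self.is_paused)
            self.inflight += 1
            # Pass the callable and its arguments; do not call it here.
            return thread_pool.submit(inflight_task, task_fn, *args)

    def pause(self):
        with self._pause_cond:
            self.is_paused = True
            # wait_for releases the lock while waiting, so workers can finish.
            self._pause_cond.wait_for(lambda: self.inflight == 0)

    def resume(self):
        with self._pause_cond:
            self.is_paused = False
            self._pause_cond.notify_all()
```

A caller would bracket the cache clear with pause() and resume(); any submit_task call made in between blocks until resume() runs.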


time.sleep(0.1)

def pause(self):

Copilot AI Jan 24, 2026

The pause method has no docstring. Consider adding one explaining that the method blocks until all in-flight transfer tasks have finished and that it puts the transfer manager into the paused state. This matters for understanding and maintaining the code.

Suggested change
def pause(self):
def pause(self):
    """Block until all inflight transfer tasks finish and mark the transfer manager as paused.
    This method sets ``is_paused`` to True and then waits on the internal condition
    variable until no inflight transfer tasks remain (i.e., ``self.inflight == 0``).
    It is a blocking call and should be invoked from a thread that can safely wait.
    """

self.is_paused = True
self._pause_cond.wait_for(lambda: self.inflight == 0)

def resume(self):

Copilot AI Jan 24, 2026

The resume method has no docstring. Consider adding one explaining that the method wakes all pending task submissions and returns the transfer manager to normal operation.

Suggested change
def resume(self):
def resume(self):
    """Resume the transfer manager by clearing the paused state and waking all waiting transfer tasks."""

self._pause_cond.wait_for(lambda: not self.is_paused)
self.inflight += 1

thread_pool.submit(inflight_task(task_fn, *args))

Copilot AI Jan 24, 2026

thread_pool.submit must be given a callable (a function reference), not the result of calling the function. As written, inflight_task(task_fn, *args) runs immediately and its return value is what gets submitted, so the task is never correctly handed to the thread pool.

Pass the callable and its arguments separately, or wrap the call in a lambda or functools.partial to defer execution:

  • thread_pool.submit(inflight_task, task_fn, *args)
  • thread_pool.submit(lambda: inflight_task(task_fn, *args))

For correct usage elsewhere in this codebase, see line 878 of prefix_cache_manager.py: self.executor_pool.submit(self.release_block_ids, task)

Suggested change
thread_pool.submit(inflight_task(task_fn, *args))
thread_pool.submit(inflight_task, task_fn, *args)
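
The difference between the two forms can be observed with a small standalone script. This is an illustrative sketch only: record, calls, and the "worker" thread-name prefix are hypothetical and do not come from the PR, and the error behavior described in the comments is what CPython's ThreadPoolExecutor does.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []  # (tag, name of the thread that ran record)


def record(tag):
    calls.append((tag, threading.current_thread().name))


caller_name = threading.current_thread().name

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    # Buggy form: record("eager") runs right here on the calling thread,
    # and its return value (None) is what the pool receives.
    bad = pool.submit(record("eager"))
    # Fixed form: the pool receives the callable plus its arguments.
    good = pool.submit(record, "deferred")

    # The worker's attempt to call None fails; the error is stored on the future.
    bad_error = bad.exception()
    good.result()

print(calls)
print(type(bad_error).__name__)
```

In the buggy form the work has already happened on the caller's thread by the time submit is reached, so any bookkeeping that assumes the task runs in the pool is off, and the failure surfaces only if someone inspects the returned future.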


except Exception as e:
logger.error(f"[RL] failed to restore caches: {e}")

Copilot AI Jan 24, 2026

The exception handler for the UPDATING state does not call resume(). If an exception is raised while the caches are being restored, the transfer manager stays paused (pause() was called during the CLEARING stage), so every subsequent transfer task blocks forever.

Consider adding a self.resume() call in the exception handler so that the transfer manager returns to normal operation even when an error occurs.

Suggested change
# Ensure transfer manager is resumed even if restore fails
self.resume()
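
One way to guarantee the resume is sketched below, under stated assumptions: PauseGate and restore_with_resume are hypothetical names, restore_fn stands in for the cache-restore step, and the sketch uses finally rather than placing the call inside the except block as the suggestion does, so the gate is released on both the success and the failure path. Whether the real success path already calls resume() is not visible in this excerpt.

```python
import logging
import threading

logger = logging.getLogger(__name__)


class PauseGate:
    """Hypothetical minimal gate; only the paused flag matters here."""

    def __init__(self):
        self._cond = threading.Condition()
        self.is_paused = False

    def pause(self):
        with self._cond:
            self.is_paused = True

    def resume(self):
        with self._cond:
            self.is_paused = False
            self._cond.notify_all()


def restore_with_resume(gate, restore_fn):
    """Run restore_fn and resume the gate whether it succeeds or fails."""
    try:
        restore_fn()
        return True
    except Exception as e:
        logger.error(f"[RL] failed to restore caches: {e}")
        return False
    finally:
        # Executed on both paths, so the gate never stays paused.
        gate.resume()
```

Because resume() in this sketch only clears a flag and notifies waiters, calling it a second time is harmless.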

Comment on lines 454 to 455

thread_pool.submit(inflight_task(task_fn, *args))
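
Copilot AI Jan 24, 2026

There is a potential race condition: the inflight counter is incremented under the lock (line 453), but the task is submitted outside the lock (line 455). If pause() is called after the lock is released and before the task is submitted, pause() may conclude that no tasks are in flight (because the task has not been submitted yet) even though inflight has already been incremented.

Consider moving the thread_pool.submit call inside the lock so that incrementing inflight and submitting the task happen atomically. Note that submit must then be non-blocking, which holds for ThreadPoolExecutor.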

Suggested change
thread_pool.submit(inflight_task(task_fn, *args))
thread_pool.submit(inflight_task, task_fn, *args)

Comment on lines +440 to +441
def submit_task(self, thread_pool: concurrent.futures.ThreadPoolExecutor, task_fn, *args):

Copilot AI Jan 24, 2026

The submit_task method has no docstring. By the codebase's convention, other public methods (such as check_work_status and do_data_transfer) have one. Consider adding a docstring that describes the method's purpose, parameters, and behavior, in particular its role in the pause/resume mechanism.

Suggested change
def submit_task(self, thread_pool: concurrent.futures.ThreadPoolExecutor, task_fn, *args):
def submit_task(self, thread_pool: concurrent.futures.ThreadPoolExecutor, task_fn, *args):
    """
    Submit a transfer task to the thread pool with pause/resume support.
    This method blocks task submission while the manager is paused and tracks
    the number of inflight tasks. It cooperates with the pause/resume mechanism
    by:
    - Waiting on ``self._pause_cond`` until ``self.is_paused`` is False before
      increasing the inflight counter.
    - Decreasing ``self.inflight`` when the wrapped task finishes and notifying
      all waiters on ``self._pause_cond`` when the inflight counter drops to 0.
    Args:
        thread_pool (concurrent.futures.ThreadPoolExecutor): Executor used to
            run the submitted task asynchronously.
        task_fn (Callable): The function to be executed in the thread pool.
        *args: Positional arguments passed through to ``task_fn``.
    """

@codecov-commenter

Codecov Report

❌ Patch coverage is 20.68966% with 23 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (release/2.4@9a48206). Learn more about missing BASE report.

Files with missing lines | Patch % | Lines
fastdeploy/cache_manager/cache_transfer_manager.py | 20.68% | 23 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff               @@
##             release/2.4    #6201   +/-   ##
==============================================
  Coverage               ?   58.67%           
==============================================
  Files                  ?      329           
  Lines                  ?    41073           
  Branches               ?     6262           
==============================================
  Hits                   ?    24100           
  Misses                 ?    15086           
  Partials               ?     1887           
Flag | Coverage Δ
GPU | 58.67% <20.68%> (?)

@Jiang-Jia-Jun Jiang-Jia-Jun merged commit 1c01c9b into PaddlePaddle:release/2.4 Jan 26, 2026
21 of 28 checks passed

3 participants