[Feature] [KVCache] support file_store kv cache backend #6188
base: develop
Conversation
Thanks for your contribution!
Pull request overview
This PR adds a filesystem-backed persistent storage backend (FileStore) for the KVCache disk cache, as a third optional backend alongside Mooncake / AttentionStore, and wires the read/write flow and parameter support into cache_transfer_manager.
Changes:
- Add the FileStore KVCacheStorage implementation (local file write / read / cleanup / query).
- Wire FileStore into cache_transfer_manager (new backend option and file_path parameter).
- Extend the Engine CLI `--kvcache-storage-backend` option and update the example scripts.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 16 comments.

| File | Description |
|---|---|
| fastdeploy/engine/args_utils.py | Add a `file` choice to the `--kvcache-storage-backend` CLI argument. |
| fastdeploy/cache_manager/transfer_factory/file_store/file_store.py | Core implementation of the new FileStore backend. |
| fastdeploy/cache_manager/transfer_factory/file_store/__init__.py | Expose FileStore / FileStoreConfig. |
| fastdeploy/cache_manager/transfer_factory/__init__.py | Add FileStore to the transfer_factory exports. |
| fastdeploy/cache_manager/cache_transfer_manager.py | Add `file` backend initialization, read/write branches, and the `--kvcache_file_path` argument. |
| examples/cache_storage/run.sh | The stop.sh call in the example script is commented out. |
```python
import json
import os
import ctypes
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
```
Copilot AI · Jan 23, 2026
file_store.py imports `json` at the top of the file, but it is never used; with ruff/flake8 enabled in pre-commit, the unused-import check will fail CI. Suggest removing the unused import (or adding an actual use).
Suggested change:

```diff
-import json
 import os
 import ctypes
 from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import List, Optional
```
```python
elif self.storage_backend_type == "file":
    logger.info(f"FileStore write_back_storage: writing {len(k_cache_keys)} blocks")

    keys = k_cache_keys + v_cache_keys
    target_locations = []

    for i, block_id in enumerate(gpu_block_ids):
        cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
        key_buf_ptr = self.storage_key_write_buffer + cpu_block_id * self.storage_buffer_stride_bytes
        val_buf_ptr = self.storage_value_write_buffer + cpu_block_id * self.storage_buffer_stride_bytes
        target_locations.extend([key_buf_ptr, val_buf_ptr])

    target_sizes = [self.storage_buffer_stride_bytes] * len(keys)

    start_time = time.time()
    results = self.storage_backend.batch_set(keys, target_locations, target_sizes)
    write_cost_time = time.time() - start_time

    # Count successfully written blocks
    success_block_num = 0
    for i in range(len(k_cache_keys)):
        key_result = results[i]
        val_result = results[i + len(k_cache_keys)]
        if key_result == 0 and val_result == 0:
            success_block_num += 1

    logger.debug(f"_run_write_back_storage, FileStore write_cost_time: {write_cost_time:.6f}s, success_blocks: {success_block_num}")
    return success_block_num
```
Copilot AI · Jan 23, 2026
The `file` backend branch of `_run_write_back_storage` is missing the step that swaps the GPU kv cache into `storage_key_write_buffer` / `storage_value_write_buffer` (the mooncake branch calls `swap_cache_layout(mode=0)` before `batch_set`). As written, it persists uninitialized or stale pinned-buffer contents, so the data written to disk is incorrect. Suggest running `swap_cache_layout` twice (key and value, mode=0) in the `file` branch before calling `batch_set`.
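No suggested patch accompanies this comment, so here is a minimal sketch of the ordering it asks for. `write_back_file_store`, `fake_swap`, and `fake_batch_set` below are hypothetical stand-ins for the real `swap_cache_layout` op and `FileStore.batch_set`; only the call order and the success counting are the point:

```python
def write_back_file_store(keys, target_locations, target_sizes,
                          swap_cache_layout, batch_set):
    # Stage the GPU kv cache into the pinned write buffers first (mode=0:
    # gpu => cpu), once for the key cache and once for the value cache,
    # mirroring the mooncake branch; only then persist the staged buffers.
    swap_cache_layout(is_key=True, mode=0)
    swap_cache_layout(is_key=False, mode=0)
    results = batch_set(keys, target_locations, target_sizes)
    # A block counts as written only if both its key and value entries
    # returned 0 (the batch_set success code).
    num_blocks = len(keys) // 2
    return sum(
        1
        for i in range(num_blocks)
        if results[i] == 0 and results[i + num_blocks] == 0
    )
```

Wired into the real branch, the two staging swaps would replace the current direct jump to `batch_set`.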
```python
if k_cache_keys is None and v_cache_keys is None:
    # 返回所有缓存项的计数
    if not os.path.exists(self.file_path):
        return 0
    count = 0
    for item in os.listdir(self.file_path):
        item_path = os.path.join(self.file_path, item)
        if os.path.isdir(item_path):
            count += 1
    logger.debug(f"FileStore query: found {count} cache entries")
    return count

# 使用现有的exists方法查询keys
all_keys = (k_cache_keys or []) + (v_cache_keys or [])
```
Copilot AI · Jan 23, 2026
The comments here are in Chinese ("返回所有缓存项的计数" / "使用现有的exists方法查询keys"), but the repository convention is English comments in code. Suggest rewriting them in English to avoid mixing languages.
```python
        logger.error(f"Unsupported target_location type: {type(target_location)}")
        return None
    except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
        logger.error(f"Failed to load tensor {key}: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error loading tensor {key}: {e}")
        return None
```
Copilot AI · Jan 23, 2026
The exception branch of get() catches pickle.UnpicklingError, but this file never imports pickle; reaching the except clause therefore raises NameError and makes get() fail outright. Suggest adding `import pickle` (or dropping that exception type) and making sure the handler itself cannot introduce a new runtime error.
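A minimal sketch of the fix, assuming values are pickled to per-key files; `load_tensor` is a hypothetical stand-in for the path-based branch of `get()`:

```python
import pickle  # required so the except clause below can name UnpicklingError

def load_tensor(path):
    """Load a pickled object from disk, returning None on any expected
    failure; without the pickle import, evaluating the except clause
    would itself raise NameError."""
    try:
        with open(path, "rb") as f:
            return pickle.load(f)
    except (FileNotFoundError, pickle.UnpicklingError, ValueError):
        return None
```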
```python
# Count successfully read blocks
success_block_num = 0
for i in range(len(k_cache_keys)):
    key_result = results[i]
    val_result = results[i + len(k_cache_keys)]
    if key_result is not None and val_result is not None:
        success_block_num += 1

valid_gpu_block_ids = gpu_block_ids[:success_block_num]
logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")
```
Copilot AI · Jan 23, 2026
The success-count logic for the `file` backend read path is wrong: when given a pointer, FileStore.get returns the number of bytes written (> 0 on success, possibly -1 on failure), but the "is not None" check here counts -1 as a success and inflates success_block_num. Suggest matching the mooncake branch: count a block as successful only when the return value is > 0, otherwise treat it as failed.
Suggested change:

```diff
-# Count successfully read blocks
+# Count successfully read blocks: FileStore.get/batch_get return the number
+# of bytes written; > 0 means success, failures may be -1 or None
 success_block_num = 0
 for i in range(len(k_cache_keys)):
     key_result = results[i]
     val_result = results[i + len(k_cache_keys)]
-    if key_result is not None and val_result is not None:
+    if (
+        key_result is not None
+        and val_result is not None
+        and key_result > 0
+        and val_result > 0
+    ):
         success_block_num += 1
 valid_gpu_block_ids = gpu_block_ids[:success_block_num]
-logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")
+logger.debug(
+    f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}"
+)
```
```python
for k in keys:
    p = self._get_tensor_path(k)
    found = os.path.exists(p)
    logger.info(f"--- [CACHE_CHECK] Key: {k[:10]}... Path: {p} Found: {found} ---")
```
Copilot AI · Jan 23, 2026
exists() logs the path and hit status of every key at logger.info level; under prefix caching this method is called at high frequency, which will bloat the logs and hurt performance. Compare MooncakeStore.exists (fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py:234-237), which uses logger.debug; suggest downgrading to debug here as well, or sampling/aggregating the log output.
Suggested change:

```diff
-logger.info(f"--- [CACHE_CHECK] Key: {k[:10]}... Path: {p} Found: {found} ---")
+logger.debug(f"--- [CACHE_CHECK] Key: {k[:10]}... Path: {p} Found: {found} ---")
```
```python
# Use the FileStore batch_get interface directly
keys = k_cache_keys + v_cache_keys
target_locations = []

# Compute the target location for each block
for i, block_id in enumerate(gpu_block_ids):
    # Use the pinned read buffers as target locations
    cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
    key_buf_ptr = self.storage_key_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
    val_buf_ptr = self.storage_value_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
    target_locations.extend([key_buf_ptr, val_buf_ptr])

target_sizes = [self.storage_buffer_stride_bytes] * len(keys)

start_time = time.time()
results = self.storage_backend.batch_get(keys, target_locations, target_sizes)
read_cost_time = time.time() - start_time

# Count successfully read blocks
success_block_num = 0
for i in range(len(k_cache_keys)):
    key_result = results[i]
    val_result = results[i + len(k_cache_keys)]
    if key_result is not None and val_result is not None:
        success_block_num += 1

valid_gpu_block_ids = gpu_block_ids[:success_block_num]
logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")
```
Copilot AI · Jan 23, 2026
In the `file` backend branch, _run_read_storage only reads data into storage_key_read_buffer / storage_value_read_buffer; unlike the mooncake branch, it never calls swap_cache_layout to move the data from the pinned buffers back into the GPU kv cache tensors, so a read hit leaves the GPU cache stale. Suggest adding the two swap_cache_layout calls (key and value) to the `file` branch, with mode=1 (cpu => gpu) and a consistent block-id mapping.
Suggested change:

```python
# Use FileStore batch_get to read contiguous blocks into pinned buffers
keys = k_cache_keys + v_cache_keys
target_locations = []
# For each block, compute the target position inside the pinned read buffers
for i, block_id in enumerate(gpu_block_ids):
    cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
    key_buf_ptr = self.storage_key_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
    val_buf_ptr = self.storage_value_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
    target_locations.extend([key_buf_ptr, val_buf_ptr])
target_sizes = [self.storage_buffer_stride_bytes] * len(keys)
start_time = time.time()
results = self.storage_backend.batch_get(keys, target_locations, target_sizes)
read_cost_time = time.time() - start_time
# Count successfully read blocks
success_block_num = 0
for i in range(len(k_cache_keys)):
    key_result = results[i]
    val_result = results[i + len(k_cache_keys)]
    if key_result is not None and val_result is not None:
        success_block_num += 1
valid_gpu_block_ids = gpu_block_ids[:success_block_num]
# Sync successfully read blocks from CPU pinned buffers back to GPU kv cache
if success_block_num > 0:
    valid_cpu_block_ids = cpu_block_ids[:success_block_num]
    # key cache: cpu (pinned buffer) => gpu (kv cache tensor)
    swap_cache_layout(
        src_buffer=self.storage_key_read_buffer,
        cache_kvs=self.gpu_cache_kvs,
        src_block_ids=valid_cpu_block_ids,
        dst_block_ids=valid_gpu_block_ids,
        num_layers=self.num_layers,
        num_extra_layers=self.num_extra_layers,
        is_key=True,
        mode=1,
    )
    # value cache: cpu (pinned buffer) => gpu (kv cache tensor)
    swap_cache_layout(
        src_buffer=self.storage_value_read_buffer,
        cache_kvs=self.gpu_cache_kvs,
        src_block_ids=valid_cpu_block_ids,
        dst_block_ids=valid_gpu_block_ids,
        num_layers=self.num_layers,
        num_extra_layers=self.num_extra_layers,
        is_key=False,
        mode=1,
    )
logger.debug(
    f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}"
)
```
```diff
 )
-from fastdeploy.cache_manager.transfer_factory import AttentionStore, MooncakeStore
+from fastdeploy.cache_manager.transfer_factory import AttentionStore, MooncakeStore, FileStore
+from fastdeploy.cache_manager.transfer_factory.file_store.file_store import FileStoreConfig
```
Copilot AI · Jan 23, 2026
cache_transfer_manager.py adds an import of FileStoreConfig, but the symbol is never used in this file, which will trip lint (ruff/flake8) and hurts readability. Suggest deleting the import, or importing everything from fastdeploy.cache_manager.transfer_factory.file_store in one place and actually using it.
Suggested change:

```diff
-from fastdeploy.cache_manager.transfer_factory.file_store.file_store import FileStoreConfig
```
```python
elif args.kvcache_storage_backend == "file":
    logger.info("Start initialize file store...")
    self.storage_backend = FileStore(
        namespace=self.model_id,
        tp_rank=self.rank,
        tp_size=self.n_ranks,
        file_path=args.kvcache_file_path,
    )
    self._init_storage_buffer(args)
    logger.info("Initialized file store successfully")
```
Copilot AI · Jan 23, 2026
The new `file` storage backend branch (initialization plus the read/write logic) currently has no unit-test coverage. The repository already covers the other branches in tests/cache_manager/test_cache_transfer_manager.py; suggest adding cases for kvcache_storage_backend == "file" (for example, mock FileStore.batch_get/batch_set and the swap_cache_layout calls to verify the read/write flow and the success-block counting).
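A sketch of what such a test could look like, assuming pytest-style tests and unittest.mock; `count_successful_blocks` and the test name are hypothetical illustrations mirroring the counting logic in _run_read_storage, not code from the PR:

```python
from unittest import mock

def count_successful_blocks(results, num_blocks):
    # batch_get returns bytes written per entry: > 0 means success,
    # -1 or None means failure (per the review comment on the read path)
    ok = 0
    for i in range(num_blocks):
        key_r, val_r = results[i], results[i + num_blocks]
        if key_r is not None and val_r is not None and key_r > 0 and val_r > 0:
            ok += 1
    return ok

def test_file_backend_counts_only_positive_results():
    # Mock the backend so no real files or GPU buffers are needed
    backend = mock.Mock()
    backend.batch_get.return_value = [4096, -1, 4096, None]  # keys, then values
    results = backend.batch_get(["k0", "k1", "v0", "v1"], [0] * 4, [4096] * 4)
    # Block 0 has both key and value reads > 0; block 1 fails both checks
    assert count_successful_blocks(results, num_blocks=2) == 1
    backend.batch_get.assert_called_once()
```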
```python
# def _get_suffixed_key(self, key: str) -> str:
#     return key + self.config_suffix
```
Copilot AI · Jan 23, 2026
This comment appears to contain commented-out code.
Suggested change:

```diff
-# def _get_suffixed_key(self, key: str) -> str:
-#     return key + self.config_suffix
```
Motivation
Modifications
Usage or Command
```shell
python -m fastdeploy.entrypoints.openai.api_server \
    --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
    --port 8300 \
    --metrics-port 8301 \
    --engine-worker-queue-port 8302 \
    --cache-queue-port 8303 \
    --max-model-len 32768 \
    --max-num-seqs 32 \
    --kvcache-storage-backend file \
    --enable-prefix-caching
```

Accuracy Tests
none
Checklist
- Add at least one tag to the PR title, e.g. [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax].
- Format your code: run `pre-commit` before commit.
- If submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.