Conversation

@Moonchild1227

Motivation

  • Add a FileStore persistence backend for KVCache disk caching.
  • Wait for cache_transfer_manager to finish initializing before use.
  • This adds a third KVCache storage backend alongside Mooncake and AttentionStore.

Modifications

  • cache_transfer_manager.py
  • file_store.py

Usage or Command

python -m fastdeploy.entrypoints.openai.api_server \
  --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
  --port 8300 \
  --metrics-port 8301 \
  --engine-worker-queue-port 8302 \
  --cache-queue-port 8303 \
  --max-model-len 32768 \
  --max-num-seqs 32 \
  --kvcache-storage-backend file \
  --enable-prefix-caching 

Accuracy Tests

None.

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code and run pre-commit before committing.
  • Add unit tests. If no unit tests are added, please state the reason in this PR.
  • Provide accuracy results.
  • If the current PR targets a release branch, make sure it has first been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

@paddle-bot

paddle-bot bot commented Jan 23, 2026

Thanks for your contribution!

Contributor

Copilot AI left a comment


Pull request overview

This PR adds a filesystem-persistence storage backend (FileStore) for KVCache disk caching, as a third optional backend alongside Mooncake / AttentionStore, and wires the read/write flow and parameter support into cache_transfer_manager.

Changes:

  • Add the FileStore KVCacheStorage implementation (local-file persist / read / cleanup / query).
  • Wire FileStore into cache_transfer_manager (new backend option and file_path parameter).
  • Extend the Engine CLI --kvcache-storage-backend option and update the example script.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 16 comments.

Summary per file:

  • fastdeploy/engine/args_utils.py: add the file option to the --kvcache-storage-backend CLI argument.
  • fastdeploy/cache_manager/transfer_factory/file_store/file_store.py: core implementation of the new FileStore backend.
  • fastdeploy/cache_manager/transfer_factory/file_store/__init__.py: expose FileStore / FileStoreConfig.
  • fastdeploy/cache_manager/transfer_factory/__init__.py: add FileStore to the transfer_factory exports.
  • fastdeploy/cache_manager/cache_transfer_manager.py: add file backend initialization, read/write branches, and the --kvcache_file_path argument.
  • examples/cache_storage/run.sh: the stop.sh call in the example script is commented out.

Comment on lines 1 to 5
import json
import os
import ctypes
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

Copilot AI Jan 23, 2026


file_store.py imports json at the top, but it is not used anywhere in the current file; with ruff/flake8 enabled in pre-commit this trips the unused-import check and fails CI. Suggest removing the unused import (or actually using it).

Suggested change

-import json
 import os
 import ctypes
 from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import List, Optional

Comment on lines 745 to 773
elif self.storage_backend_type == "file":
    logger.info(f"FileStore write_back_storage: writing {len(k_cache_keys)} blocks")

    keys = k_cache_keys + v_cache_keys
    target_locations = []

    for i, block_id in enumerate(gpu_block_ids):
        cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
        key_buf_ptr = self.storage_key_write_buffer + cpu_block_id * self.storage_buffer_stride_bytes
        val_buf_ptr = self.storage_value_write_buffer + cpu_block_id * self.storage_buffer_stride_bytes
        target_locations.extend([key_buf_ptr, val_buf_ptr])

    target_sizes = [self.storage_buffer_stride_bytes] * len(keys)

    start_time = time.time()
    results = self.storage_backend.batch_set(keys, target_locations, target_sizes)
    write_cost_time = time.time() - start_time

    # Count the blocks that were written successfully
    success_block_num = 0
    for i in range(len(k_cache_keys)):
        key_result = results[i]
        val_result = results[i + len(k_cache_keys)]
        if key_result == 0 and val_result == 0:
            success_block_num += 1

    logger.debug(f"_run_write_back_storage, FileStore write_cost_time: {write_cost_time:.6f}s, success_blocks: {success_block_num}")
    return success_block_num

Copilot AI Jan 23, 2026


The file backend branch of _run_write_back_storage is missing the step that swaps the GPU kv cache into storage_key_write_buffer / storage_value_write_buffer (the mooncake branch calls swap_cache_layout(mode=0) before batch_set). The current implementation writes uninitialized or stale pinned-buffer contents to disk, producing incorrect persisted data. Suggest calling swap_cache_layout twice (key and value, mode=0) in the file branch before calling batch_set.
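Concretely, the fixed branch might look like the following sketch. It mirrors the keyword-argument style of the read-path suggestion later in this review; the swap_cache_layout signature and the mode=0 (gpu => cpu) direction convention are assumptions borrowed from that suggestion, not verified against the real mooncake branch.

elif self.storage_backend_type == "file":
    # Stage the GPU kv cache into the pinned write buffers first; without
    # this, batch_set persists stale buffer contents (assumed mode=0: gpu => cpu).
    swap_cache_layout(
        src_buffer=self.storage_key_write_buffer,
        cache_kvs=self.gpu_cache_kvs,
        src_block_ids=gpu_block_ids,
        dst_block_ids=cpu_block_ids,
        num_layers=self.num_layers,
        num_extra_layers=self.num_extra_layers,
        is_key=True,
        mode=0,
    )
    swap_cache_layout(
        src_buffer=self.storage_value_write_buffer,
        cache_kvs=self.gpu_cache_kvs,
        src_block_ids=gpu_block_ids,
        dst_block_ids=cpu_block_ids,
        num_layers=self.num_layers,
        num_extra_layers=self.num_extra_layers,
        is_key=False,
        mode=0,
    )
    # ... then build keys / target_locations and call batch_set as in the
    # existing branch above.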

Comment on lines 109 to 122
if k_cache_keys is None and v_cache_keys is None:
    # 返回所有缓存项的计数
    if not os.path.exists(self.file_path):
        return 0
    count = 0
    for item in os.listdir(self.file_path):
        item_path = os.path.join(self.file_path, item)
        if os.path.isdir(item_path):
            count += 1
    logger.debug(f"FileStore query: found {count} cache entries")
    return count

# 使用现有的exists方法查询keys
all_keys = (k_cache_keys or []) + (v_cache_keys or [])

Copilot AI Jan 23, 2026


The comments here are in Chinese ("返回所有缓存项的计数 / 使用现有的exists方法查询keys"), but the repository convention is that in-code comments must be in English. Suggest rewriting them in English to avoid mixing languages.

Comment on lines +224 to +231
logger.error(f"Unsupported target_location type: {type(target_location)}")
return None
except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
logger.error(f"Failed to load tensor {key}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error loading tensor {key}: {e}")
return None

Copilot AI Jan 23, 2026


The exception branch in get() catches pickle.UnpicklingError, but this file never imports pickle; reaching that except clause therefore raises a NameError and get() fails outright. Suggest adding import pickle (or dropping that exception type), and making sure the exception handling does not introduce new runtime errors.
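As a minimal sketch, assuming the trimmed import block from the earlier suggestion, the fix is one added line:

import os
import ctypes
import pickle  # needed so the except clause can reference pickle.UnpicklingError
from dataclasses import dataclass
from typing import List, Optional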

Comment on lines 595 to 604
# Count the blocks that were read successfully
success_block_num = 0
for i in range(len(k_cache_keys)):
    key_result = results[i]
    val_result = results[i + len(k_cache_keys)]
    if key_result is not None and val_result is not None:
        success_block_num += 1

valid_gpu_block_ids = gpu_block_ids[:success_block_num]
logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")

Copilot AI Jan 23, 2026


The success-count logic for the file backend read path is wrong: when given a pointer, FileStore.get returns the number of bytes written (> 0 on success, possibly -1 on failure), so the "is not None" check here counts -1 as a success and inflates success_block_num. Suggest matching the mooncake branch: treat a return value > 0 as success and anything else as failure.

Suggested change

-# Count the blocks that were read successfully
-success_block_num = 0
-for i in range(len(k_cache_keys)):
-    key_result = results[i]
-    val_result = results[i + len(k_cache_keys)]
-    if key_result is not None and val_result is not None:
-        success_block_num += 1
-valid_gpu_block_ids = gpu_block_ids[:success_block_num]
-logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")
+# Count the blocks that were read successfully: FileStore.get/batch_get return
+# the number of bytes written; > 0 means success, failures may be -1 or None
+success_block_num = 0
+for i in range(len(k_cache_keys)):
+    key_result = results[i]
+    val_result = results[i + len(k_cache_keys)]
+    if (
+        key_result is not None
+        and val_result is not None
+        and key_result > 0
+        and val_result > 0
+    ):
+        success_block_num += 1
+valid_gpu_block_ids = gpu_block_ids[:success_block_num]
+logger.debug(
+    f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}"
+)

for k in keys:
    p = self._get_tensor_path(k)
    found = os.path.exists(p)
    logger.info(f"--- [CACHE_CHECK] Key: {k[:10]}... Path: {p} Found: {found} ---")

Copilot AI Jan 23, 2026


exists() logs the path and hit status of every key at logger.info; under prefix caching this is called at high frequency, which bloats the logs and hurts performance. MooncakeStore.exists (fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py:234-237) uses logger.debug by comparison, so suggest downgrading to debug here too, or sampling/aggregating the log output.

Suggested change

-    logger.info(f"--- [CACHE_CHECK] Key: {k[:10]}... Path: {p} Found: {found} ---")
+    logger.debug(f"--- [CACHE_CHECK] Key: {k[:10]}... Path: {p} Found: {found} ---")

Comment on lines 576 to 605

# Use FileStore's batch_get interface directly
keys = k_cache_keys + v_cache_keys
target_locations = []

# Compute a tensor target location for each block
for i, block_id in enumerate(gpu_block_ids):
    # Use the GPU cache directly as the target location
    cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
    key_buf_ptr = self.storage_key_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
    val_buf_ptr = self.storage_value_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
    target_locations.extend([key_buf_ptr, val_buf_ptr])

target_sizes = [self.storage_buffer_stride_bytes] * len(keys)

start_time = time.time()
results = self.storage_backend.batch_get(keys, target_locations, target_sizes)
read_cost_time = time.time() - start_time

# Count the blocks that were read successfully
success_block_num = 0
for i in range(len(k_cache_keys)):
    key_result = results[i]
    val_result = results[i + len(k_cache_keys)]
    if key_result is not None and val_result is not None:
        success_block_num += 1

valid_gpu_block_ids = gpu_block_ids[:success_block_num]
logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")


Copilot AI Jan 23, 2026


In the file backend branch, _run_read_storage only reads data into storage_key_read_buffer / storage_value_read_buffer; unlike the mooncake branch, it never calls swap_cache_layout to copy the data from the pinned buffers back into the GPU kv cache tensors, so a cache read "hits" without the GPU cache ever being updated. Suggest adding the missing swap_cache_layout calls in the file branch (one each for key and value), with mode=1 (cpu => gpu) and a consistent block-id mapping.

Suggested change

-# Use FileStore's batch_get interface directly
-keys = k_cache_keys + v_cache_keys
-target_locations = []
-# Compute a tensor target location for each block
-for i, block_id in enumerate(gpu_block_ids):
-    # Use the GPU cache directly as the target location
-    cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
-    key_buf_ptr = self.storage_key_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
-    val_buf_ptr = self.storage_value_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
-    target_locations.extend([key_buf_ptr, val_buf_ptr])
-target_sizes = [self.storage_buffer_stride_bytes] * len(keys)
-start_time = time.time()
-results = self.storage_backend.batch_get(keys, target_locations, target_sizes)
-read_cost_time = time.time() - start_time
-# Count the blocks that were read successfully
-success_block_num = 0
-for i in range(len(k_cache_keys)):
-    key_result = results[i]
-    val_result = results[i + len(k_cache_keys)]
-    if key_result is not None and val_result is not None:
-        success_block_num += 1
-valid_gpu_block_ids = gpu_block_ids[:success_block_num]
-logger.debug(f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}")
+# Use FileStore batch_get to read contiguous blocks into pinned buffers
+keys = k_cache_keys + v_cache_keys
+target_locations = []
+# For each block, compute the target position inside the pinned read buffers
+for i, block_id in enumerate(gpu_block_ids):
+    cpu_block_id = cpu_block_ids[i] if i < len(cpu_block_ids) else i
+    key_buf_ptr = self.storage_key_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
+    val_buf_ptr = self.storage_value_read_buffer + cpu_block_id * self.storage_buffer_stride_bytes
+    target_locations.extend([key_buf_ptr, val_buf_ptr])
+target_sizes = [self.storage_buffer_stride_bytes] * len(keys)
+start_time = time.time()
+results = self.storage_backend.batch_get(keys, target_locations, target_sizes)
+read_cost_time = time.time() - start_time
+# Count successfully read blocks
+success_block_num = 0
+for i in range(len(k_cache_keys)):
+    key_result = results[i]
+    val_result = results[i + len(k_cache_keys)]
+    if key_result is not None and val_result is not None:
+        success_block_num += 1
+valid_gpu_block_ids = gpu_block_ids[:success_block_num]
+# Sync successfully read blocks from CPU pinned buffers back to GPU kv cache
+if success_block_num > 0:
+    valid_cpu_block_ids = cpu_block_ids[:success_block_num]
+    # key cache: cpu (pinned buffer) => gpu (kv cache tensor)
+    swap_cache_layout(
+        src_buffer=self.storage_key_read_buffer,
+        cache_kvs=self.gpu_cache_kvs,
+        src_block_ids=valid_cpu_block_ids,
+        dst_block_ids=valid_gpu_block_ids,
+        num_layers=self.num_layers,
+        num_extra_layers=self.num_extra_layers,
+        is_key=True,
+        mode=1,
+    )
+    # value cache: cpu (pinned buffer) => gpu (kv cache tensor)
+    swap_cache_layout(
+        src_buffer=self.storage_value_read_buffer,
+        cache_kvs=self.gpu_cache_kvs,
+        src_block_ids=valid_cpu_block_ids,
+        dst_block_ids=valid_gpu_block_ids,
+        num_layers=self.num_layers,
+        num_extra_layers=self.num_extra_layers,
+        is_key=False,
+        mode=1,
+    )
+logger.debug(
+    f"_run_read_storage, FileStore read_cost_time: {read_cost_time:.6f}s, success_blocks: {success_block_num}"
+)

 )
-from fastdeploy.cache_manager.transfer_factory import AttentionStore, MooncakeStore
+from fastdeploy.cache_manager.transfer_factory import AttentionStore, MooncakeStore, FileStore
+from fastdeploy.cache_manager.transfer_factory.file_store.file_store import FileStoreConfig

Copilot AI Jan 23, 2026


cache_transfer_manager.py adds an import of FileStoreConfig, but the symbol is never used in this file, which will trip lint (ruff/flake8) and hurts readability. Suggest deleting the import, or importing consistently from fastdeploy.cache_manager.transfer_factory.file_store and actually using it.

Suggested change

-from fastdeploy.cache_manager.transfer_factory.file_store.file_store import FileStoreConfig

Comment on lines +261 to +270
elif args.kvcache_storage_backend == "file":
    logger.info("Start initialize file store...")
    self.storage_backend = FileStore(
        namespace=self.model_id,
        tp_rank=self.rank,
        tp_size=self.n_ranks,
        file_path=args.kvcache_file_path,
    )
    self._init_storage_buffer(args)
    logger.info("Initialized file store successfully")

Copilot AI Jan 23, 2026


The new file storage backend branch (initialization plus read/write logic) currently has no unit test coverage. The repository already has tests/cache_manager/test_cache_transfer_manager.py covering the other branches; suggest adding cases for kvcache_storage_backend == "file" (for example, mocking FileStore.batch_get/batch_set and the swap_cache_layout calls to verify the read/write flow and the success-block counting), as sketched below.
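One possible shape for such a test, sketched here rather than taken from the repository: a pytest-style FileStore set/get round trip through ctypes buffers. It relies only on the constructor arguments shown in the snippet above and on the return-value conventions described in the earlier comments (batch_set returns 0 per key on success; batch_get returns bytes written, > 0 on success); the key name and buffer sizes are illustrative.

import ctypes

from fastdeploy.cache_manager.transfer_factory import FileStore


def test_file_store_roundtrip(tmp_path):
    # Construct the backend the same way cache_transfer_manager does above,
    # but pointed at a pytest tmp_path so nothing persists between runs.
    store = FileStore(namespace="test-model", tp_rank=0, tp_size=1, file_path=str(tmp_path))

    src = ctypes.create_string_buffer(b"\x01" * 64, 64)
    dst = ctypes.create_string_buffer(64)

    # batch_set takes (keys, source pointers, sizes); 0 means success.
    set_results = store.batch_set(["k0"], [ctypes.addressof(src)], [64])
    assert all(r == 0 for r in set_results)

    # batch_get writes into the target pointer and returns bytes written (> 0).
    get_results = store.batch_get(["k0"], [ctypes.addressof(dst)], [64])
    assert all(r is not None and r > 0 for r in get_results)
    assert dst.raw == src.raw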

Comment on lines 68 to 70
# def _get_suffixed_key(self, key: str) -> str:
#     return key + self.config_suffix


Copilot AI Jan 23, 2026


This comment appears to contain commented-out code.

Suggested change

-# def _get_suffixed_key(self, key: str) -> str:
-#     return key + self.config_suffix


Labels

contributor External developers
