SDK 参考¶
PeerCache 既可作为 SGLang 的即插即用后端,也可作为库使用。本页记录公开的 Python API 以及 C++ 数据面绑定。
peercache.store.PeerCacheStore¶
SGLang 的 HiCacheStorage 后端。通常由 SGLang 的 dynamic backend 工厂实例化,也可
直接驱动。
from types import SimpleNamespace
from peercache.store import PeerCacheStore
storage_config = SimpleNamespace(
tp_rank=0, tp_size=1, pp_rank=0, pp_size=1, is_mla_model=False,
extra_config={"discovery_addr": "10.0.0.1:31998", "protocol": "rdma",
"device_name": "mlx5_0", "global_segment_size": "8gb"},
)
store = PeerCacheStore(storage_config)
store.register_mem_pool_host(mem_pool_host) # SGLang 会调用它
主要方法¶
| 方法 | 说明 |
|---|---|
register_mem_pool_host(mem_pool_host) |
注册接收 MR(主机 KV 缓冲区),并分配 + 注册发布池。 |
register_mem_host_pool_v2(host_pool, name) |
为 v2 传输注册额外的(hybrid)内存池。 |
batch_set_v1(keys, host_indices) |
本地发布页面 + 在目录记录位置。按页返回 list[bool]。 |
batch_get_v1(keys, host_indices) |
查询位置 + RDMA READ 进主机缓冲区。按页返回 list[bool]。 |
batch_exists(keys) |
从头开始连续存在的页面数量。 |
batch_set_v2 / batch_get_v2 / batch_exists_v2 |
Hybrid 模型(KV + sidecar 池)路径。 |
set / get / batch_set / batch_get / exists |
单 key / 批量零拷贝(ptr+size)API。 |
clear() |
清除本地所有已发布条目(池 + 目录)。 |
close() |
拆除服务发现、RPC 服务器与传输。 |
peercache.config.PeerCacheConfig¶
from peercache.config import PeerCacheConfig
cfg = PeerCacheConfig(
discovery_addr="10.0.0.1:31998",
protocol="rdma", # 或 "tcp"
device_name="mlx5_0",
ib_port=1, gid_index=3,
global_segment_size="8gb", # 接受 int 或 "4gb"/"512mb"
vnodes=160,
directory_replicas=2,
max_masters=3, # head 加最多 2 个 host 作为发现 master
disk_enabled=True,
disk_path="/data/peercache/",
disk_size="100gb",
)
# 或从 SGLang extra_config 构造:
cfg = PeerCacheConfig.from_extra_config({"discovery_addr": "10.0.0.1:31998"})
peercache.server.NodeRuntime¶
为单个节点串联控制面 + 数据面:传输、控制 RPC 服务器 + 目录分片、服务发现客户端、 哈希环以及目录客户端。
from peercache.server import NodeRuntime
rt = NodeRuntime(cfg)
rt.start()
print(rt.node_id, rt.local_rdma_endpoint)
print(rt.is_meta) # 本节点是否承担了内嵌 meta
rt.directory.put({...}) # peercache.directory.DirectoryClient
rt.transport.batch_read([...]) # peercache.transport.Transport
rt.stop()
peercache.transport¶
通用数据面接口,含两种实现。
from peercache.transport import create_transport, Mr, ReadOp
t = create_transport(cfg) # RdmaTransport,或 TcpTransport 回退
mr: Mr = t.register_mr(addr, length) # -> Mr(addr, rkey, lkey)
ok: list[bool] = t.batch_read([
ReadOp(remote_endpoint="host:port", local_addr=dst,
remote_addr=src, rkey=mr.rkey, length=n),
])
endpoint = t.local_endpoint() # 对外公告的 "host:port"
RdmaTransport封装 C++ 的_peercache.TransferEngine(单边 RDMA READ)。TcpTransport在 TCP 之上镜像同一套 API,用于无 RDMA 时的测试。
peercache.directory¶
from peercache.directory import DirectoryServer, DirectoryClient
server = DirectoryServer(); server.attach(rpc_server) # 承载一个分片
client = DirectoryClient(ring, resolve_control, replicas=1)
client.put({key: DataLocation(...)})
locs = client.get([key]) # list[DataLocation | None]
present = client.exists([key]) # list[bool]
client.delete([key])
peercache.discovery¶
from peercache.discovery import DiscoveryServer, DiscoveryClient
meta = DiscoveryServer("0.0.0.0", 31998); meta.start()
client = DiscoveryClient(addr, node_info, on_members=cb, heartbeat_interval=2.0)
client.start() # 注册 + 心跳 + 刷新成员列表
client.members() # dict[node_id, NodeInfo]
注意:通常你无需手动创建
DiscoveryServer——NodeRuntime会在 IP 等于discovery_addr的节点上自动启动它。
peercache.hashring.ConsistentHashRing¶
from peercache.hashring import ConsistentHashRing
ring = ConsistentHashRing(vnodes=160)
ring.set_nodes(["n1", "n2", "n3"])
ring.get_node("key") # 归属节点 node_id
ring.get_nodes("key", 2) # 副本集合,顺时针,互不相同
peercache.pool.PublishedPool¶
from peercache.pool import PublishedPool
pool = PublishedPool(base_addr, capacity, rkey, on_evict=cb)
remote_addr = pool.publish(key, src_ptr, length) # 本地 memcpy;过大则返回 None
pool.address_of(key) # (remote_addr, length) | None
pool.remove(key)
peercache.diskstore.DiskStore¶
L4 磁盘分层:异步写透、按 LRU 约束容量、重启安全的索引。
from peercache.diskstore import DiskStore
d = DiskStore("/data/peercache", max_bytes=100 << 30,
on_evict=lambda keys: None, node_id="node-0")
d.put("key", b"...page bytes...") # 异步写透(幂等)
d.get("key") # bytes | None(命中后移到 MRU)
d.exists("key") # bool
used_bytes, num_keys = d.stats()
d.remove("key"); d.close()
peercache.metrics¶
from peercache.metrics import Metrics, MetricsServer
m = Metrics(node_id="node-0")
m.record_read(hit=True, nbytes=4096, seconds=0.0003, source="remote") # local/remote/disk
m.record_write(nbytes=4096, seconds=0.0002)
m.set_gauge_provider("pool_bytes_used", lambda: pool.bytes_used)
text = m.render_prometheus() # Prometheus 文本格式
srv = MetricsServer(m, "0.0.0.0", 31997, dashboard=True)
srv.start() # GET /metrics, GET /(可视化页面), GET /healthz
srv.stop()
PeerCacheStore 会自动接入:落盘到 DiskStore、注册内存池/磁盘/成员数 gauge,并
运行 MetricsServer(参见 extra_config 中的 disk_* 与 metrics_* 键)。
peercache.types¶
DataLocation(node_id, rdma_endpoint, remote_addr, rkey, length)—— 一条目录 值;to_dict()/from_dict()用于线缆序列化。NodeInfo(node_id, control_host, control_port, rdma_host, rdma_port)—— 节点向 服务发现公告的信息。
C++ 绑定:_peercache.TransferEngine¶
通过 pybind11 从 cpp/ 构建(仅在装有 libibverbs/librdmacm 的主机上)。
import _peercache
assert _peercache.HAS_RDMA
eng = _peercache.TransferEngine(device_name="mlx5_0", ib_port=1, gid_index=3,
bind_host="0.0.0.0", bind_port=0)
mr = eng.register_mr(addr, length) # -> MrHandle(addr, length, lkey, rkey)
reqs = [_peercache.ReadRequest(remote_node="host:port", local_addr=dst,
remote_addr=src, rkey=mr.rkey, length=n)]
ok = eng.batch_read(reqs) # list[bool]
endpoint = eng.local_endpoint()
在不带 RDMA 构建时(PEERCACHE_NO_RDMA=ON),该模块以 HAS_RDMA == False 导入,
create_transport 会自动选择 TCP 回退。