Skip to content

feat: implement CpuTpBroadcaster for CPU-only tensor broadcasting#960

Open
Vinkle-hzt wants to merge 1 commit intoalibaba:mainfrom
Vinkle-hzt:feat/cpu_broadcast
Open

feat: implement CpuTpBroadcaster for CPU-only tensor broadcasting#960
Vinkle-hzt wants to merge 1 commit intoalibaba:mainfrom
Vinkle-hzt:feat/cpu_broadcast

Conversation

@Vinkle-hzt
Copy link
Copy Markdown
Collaborator

  • Added CpuTpBroadcaster class to facilitate CPU tensor broadcasting using Unix Domain Sockets, avoiding NCCL's cudaDeviceSynchronize stalls.
  • Introduced methods for initialization and broadcasting, ensuring thread safety and proper handling of intra-node tensor communication.
  • Updated relevant build files to include the new broadcaster and modified execBroadcastCpu to utilize it for CPU tensors.
  • Integrated initialization in Python to ensure proper setup across TP ranks.

@Vinkle-hzt Vinkle-hzt requested a review from LLLLKKKK as a code owner April 30, 2026 07:47
@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/3

lgtm ready to ci

Non-blocking Suggestions

P2

  • tpSyncModelInputs 末段 execBroadcast 未守卫 packed_buffers 为空 @ rtp_llm/cpp/models/ModelTypes.cc:303
    • 建议:把末段 NCCL 广播放进 if (!packed_buffers.empty()) 守卫;或当 packed_buffers 仅含 gpu 项时才保留 execBroadcast + cudaSyncAndCheck,CPU-only 分支应完全走 UDS 路径,避免空广播+同步。

P3

  • UDS base_path 仅依赖 os.getppid() 与 dp_rank,存在跨 server 实例与 reparent 冲突风险 @ rtp_llm/models_py/distributed/collective_torch.py:402
    • 建议:建议在路径中混入更具唯一性的标识,如 model_id、master_addr/port、随机会话 token 或 NCCL unique_id 哈希;或改用 abstract namespace UDS(首字符 \0)配合 master_port 做命名空间隔离。
  • CpuTpBroadcaster::broadcast 无超时,对端故障会无限阻塞引擎主线程 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:53
    • 建议:为 peer fd 设置 SO_SNDTIMEO/SO_RCVTIMEO(或在 writeAll/readAll 内部用 poll(POLLIN/POLLOUT, timeout) 包一层),超时即 RTP_LLM_FAIL 并打印对端 rank、累计字节数等诊断信息。
  • CpuTpBroadcaster::initialize 失败直接 RTP_LLM_FAIL,未实现到 NCCL 的软回退 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:115
    • 建议:把 initialize() 中 fatal CHECK 改为 LOG_WARN + 保持 initialized_=false,让 execBroadcastCpu 自动落回 NCCL 路径;或在 Python 端 try/except 包住 init_cpu_tp_broadcaster 并 fallback。
Review Checklist: 4 pass / 4 fail

General Principles Checklist

Failed

  • [FAIL] 6.1 Architecture: 状态不变量:广播路径在所有分支上保持一致的同步语义 Linked issue: tpSyncModelInputs 末段 execBroadcast 未守卫 packed_buffers 为空
  • [FAIL] 6.1 Tests: 新逻辑有针对性测试覆盖(UT/smoke/集成) Linked issue: CpuTpBroadcaster::initialize 失败直接 RTP_LLM_FAIL,未实现到 NCCL 的软回退

Passed

  • [PASS] 6.1 Software Engineering: SRP:CpuTpBroadcaster 单一职责,仅承担 intra-node CPU 广播 bootstrap 与传输
  • [PASS] 6.1 Quality: 改动与无关 formatting/移动隔离,commits 原子性可 bisect

RTP-LLM Checklist

Failed

  • [FAIL] D 性能与热路径: 热路径阻塞操作具备超时与诊断手段 Linked issue: CpuTpBroadcaster::broadcast 无超时,对端故障会无限阻塞引擎主线程

Passed

  • [PASS] A 兼容性与配置: 新增 pybind 入口与 Python bootstrap 的兼容性兜底(hasattr 检查、cross-node 回退)
  • [PASS] E 分布式与拓扑: 跨节点 TP 路径明确回退 NCCL,未破坏 PD 分离与 EP 路径

Python Static-First Checklist

Failed

  • [FAIL] P.B 命名与全局资源: 进程间共享资源(路径/socket/锁)的命名具备足够唯一性,避免跨实例冲突 Linked issue: UDS base_path 仅依赖 os.getppid() 与 dp_rank,存在跨 server 实例与 reparent 冲突风险

Strengths

  • 在 collective_torch._register_process_groups_to_cpp 阶段集中 bootstrap,避开了 C++ lazy init 时 rank 1 抢跑 rank 0 bind 的竞态,思路清晰且有清晰注释。
  • execBroadcastCpu 在未初始化时回退到原 NCCL+cudaSyncAndCheck 路径,保住了跨节点 TP 的正确性边界。
  • UDS handshake 让 rank 0 主动学习每个 peer rank(peer_rank 范围检查 + 防重复),比按 accept 顺序赋 rank 更鲁棒。
  • execBroadcastCpu 中 contig.is_same(t) 判断避免了 pinned tensor 已经 contiguous 时的多余 copy_,是合理的 fast path。

@Vinkle-hzt
Copy link
Copy Markdown
Collaborator Author

Vinkle-hzt commented Apr 30, 2026

if (
    _parallelism_config is not None
    and _parallelism_config.tp_size > 1
    and _parallelism_config.tp_size <= _parallelism_config.local_world_size
    and hasattr(librtp_compute_ops, "init_cpu_tp_broadcaster")
):

当 tp_size <= local_world_size, 即只有机内tp才会采用UDS

@Vinkle-hzt Vinkle-hzt closed this Apr 30, 2026
@Vinkle-hzt Vinkle-hzt reopened this Apr 30, 2026
@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/1

lgtm ready to ci

Non-blocking Suggestions

P2

  • UDS 文件位于 /tmp 且未做权限/认证保护,本地用户可注入或截获 CPU 广播数据 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:159
    • 建议:二选一:1) bind 完成后立即 ::chmod(path.c_str(), 0700),并把 base_path 放到一个仅当前用户可访问的目录(例如 $XDG_RUNTIME_DIR/tmp/rtp_llm_<uid>/,先 mkdir 0700);2) 改用 Linux abstract namespace(addr.sun_path[0] = '\0')以避开文件系统命名空间。同时在 accept 后用 getsockopt(SO_PEERCRED) 校验对端 uid==自己。

P3

  • initialize() 中 30s connect 重试的注释引用了已不再相关的初始化时序 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:190
    • 建议:把注释改写为「重试只是为了容忍 rank K 比 rank 0 略早进入 init() 的极小窗口(毫秒级),600×50ms 是宽裕的安全边际」,避免引用 _create_process_groups 这个无关函数。
Review Checklist: 9 pass / 1 fail

General Principles Checklist

Passed

  • [PASS] 6.1 Software Engineering: SRP / 边界:新模块仅承担一个清晰职责
  • [PASS] 6.1 Architecture: 错误语义/降级路径显式
  • [PASS] 6.1 Tests: 新增逻辑由现有覆盖或新增测试覆盖
  • [PASS] 6.1 Quality: 单一变更不混入无关 diff,commit 与行为匹配

RTP-LLM Checklist

Failed

  • [FAIL] A Compatibility / Config: 新增 IPC 命名不与既有运行时冲突 Linked issue: UDS 文件位于 /tmp 且未做权限/认证保护,本地用户可注入或截获 CPU 广播数据

Passed

  • [PASS] C Concurrency: 共享状态在多线程访问下可见性正确
  • [PASS] E Distributed: intra-node / cross-node 路由正确
  • [PASS] F Cross-platform: 新代码在所有支持平台可构建/可运行

Python Static-First Checklist

Passed

  • [PASS] P.A Python Static Hygiene: 模块级名字(os, logging, librtp_compute_ops)在使用前已导入或保护
  • [PASS] P.C Python Type / API Contract: 调用 C++ binding 的参数类型与签名一致

Strengths

  • 通过单独的 execBroadcastCpu + UDS 路径绕开 NCCL 在小 CPU tensor 广播上引发的 cudaDeviceSynchronize 卡顿,避免了在主路径上引入 device-wide sync,对 mtp_stream_async 等异步路径友好。
  • 保留了 NCCL fallback:CpuTpBroadcaster 未初始化(跨节点 TP / 单卡)时 execBroadcastCpu 自动退化为 execBroadcast + cudaSyncAndCheck,对调用方契约不变;fallback 显式在 ExecOps.h 文档化。
  • init_cpu_tp_broadcaster 在 pybind 侧用 py::gil_scoped_release 包住阻塞 accept/connect,避免在握手期间持有 GIL 阻塞其他 Python 线程;理由在代码中写明,避免后续被无意改回。
  • initialize() 用 mutex + atomic + acquire/release 语义构造,broadcast() 通过 acquire load 看到完整的 peer_fds_ 状态;re-init 同参数幂等、不同参数显式 FAIL,contract 清晰。
  • ModelTypes.cc 中把 if (!packed_buffers.empty()) 加在 NCCL 调用前,避免 cpu_packed 走 UDS 后仅剩 gpu_packed 时仍然发起空 NCCL 广播 + cudaSync。

@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/3 · P3/0

Blocking Issues

P1

  • rank 0 accept() 无超时,peer init 失败时引擎启动死锁 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:167
    • 建议:为 listen_fd_ 设置 SO_RCVTIMEO 或在 accept 前 poll() 一段时间(例如同样 ~30s),失败时 RTP_LLM_FAIL 抛错,至少给运维一个明确的失败信号;并在退出失败路径时清理已 accept 的 fd 与 my_uds_path_。

Non-blocking Suggestions

P2

  • execBroadcastCpu UDS 路径静默忽略 params.mode @ rtp_llm/models_py/bindings/core/ExecOps.cc:443
    • 建议:在 UDS 分支入口 RTP_LLM_CHECK_WITH_INFO(params.mode == ParallelMode::TP, ...),或在 ExecOps.h 注释中增加显式断言要求;同时考虑将 broadcaster key 化(按 mode 维护多份单例)以便后续扩展。
  • UDS 路径 /tmp/rtp_llm_tp__dp 可预测且无身份校验 @ rtp_llm/models_py/distributed/collective_torch.py:545
    • 建议:socket 创建后用 fchmod(fd,0600) 限制文件权限,accept 后用 SO_PEERCRED 校验对端 uid 与本进程一致;或将 socket 路径放到 /run/user/$UID/ 等私有目录。同时在 base_path 加入随机 nonce(通过 NCCL store 在 ranks 间分发)以防止可预测路径。
  • CpuTpBroadcaster 无专门的功能/边界单元测试 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:1
    • 建议:在 rtp_llm/cpp/distribute/test/ 下添加 cc_test:用 socketpair+fork 或 std::thread 模拟两 rank,覆盖正常 broadcast、reinit 一致/不一致、handshake 错误、fallback 与 isInitialized 状态机;并在 collective_torch 测试中加入 init_cpu_tp_broadcaster 的最小双进程用例。

Checklist Violations (6 fail / 112 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue rank 0 accept() 无超时,peer init 失败时引擎启动死锁
    rank 0 accept() 路径无超时,peer init 失败/掉线时 rank 0 永久阻塞且 initialized 永远为 false,没有失败回滚也没有失败信号。_
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue execBroadcastCpu UDS 路径静默忽略 params.mode
    rank K connect 有 30s retry 后 fail-fast,但 rank 0 accept 是无限静默阻塞;execBroadcastCpu 对 params.mode 也是静默忽略而非显式拒绝。两处错误语义不对称。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue CpuTpBroadcaster 无专门的功能/边界单元测试
    ~225 行新通信原语无 cc_test/py_test 直接覆盖,只能靠在线 smoke 间接覆盖;handshake/EINTR/reinit/fallback 路径无回归保护。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue CpuTpBroadcaster 无专门的功能/边界单元测试
    分布式 broadcast 改造无双 rank UT;user_buffers_test 改动只覆盖 user_buffers,未触达 CpuTpBroadcaster。

RTP-LLM Checklist

  • [B] 正确性与逻辑 — Bypass/shortcut 路径包含新增变换步骤 → issue execBroadcastCpu UDS 路径静默忽略 params.mode
    execBroadcastCpu 是 NCCL 的 bypass/shortcut,但忽略 params.mode 字段,未把『按 mode 选择 process group』这一变换继承下来;当前只有 TP mode 调用安全,对未来扩展不安全。
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue CpuTpBroadcaster 无专门的功能/边界单元测试
    新增 ~225 行 C++ 通信原语缺乏直接 UT,handshake/EINTR/reinit/fallback 路径无回归保护。

Strengths

  • socket fd 用 RAII 风格在析构里清理,rank 0 在 bind 前 unlink 处理掉了上次 crash 残留的 stale socket
  • execBroadcastCpu 在未 init 时透明回退到 NCCL+cudaSyncAndCheck,跨节点 TP 与单 rank 走原有路径,向后兼容
  • init_cpu_tp_broadcaster 在 pybind 层 py::gil_scoped_release,避免 accept/connect 阻塞导致 GIL 拖死其它 Python 线程
  • writeAll/readAll 正确处理 EINTR 与短写/短读
  • _register_process_groups_to_cpp 中通过 hasattr 与 tp_size<=local_world_size 双重门控保证只在节点内 TP 才初始化 broadcaster
  • user_buffers_test.py 改用 PortManager 分配端口、把活进程显式 terminate+join,避免后续 case 继承端口/socket

@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/3 · P3/1

Blocking Issues

P1

  • execBroadcastCpu fallback 丢失 execSyncCommunication(false) @ rtp_llm/models_py/bindings/core/ExecOps.cc:655
    • 建议:在 execBroadcastCpu fallback 中保留 execSyncCommunication(false) 调用,或在 PR 描述/代码注释中明确论证为何 cudaSyncAndCheck 已完整覆盖原 sync 语义

Non-blocking Suggestions

P2

  • 新增 CpuTpBroadcaster 缺乏独立单元测试 @ rtp_llm/cpp/distribute/BUILD:14
    • 建议:增加 cc_test:fork 两个进程跑 initialize+broadcast 正常路径;mock UDS 路径跑 timeout 与 re-init mismatch;确保 cleanupRank0State 在 accept 失败后正确释放 listen_fd 与 unlink
  • execBroadcastCpu 对 root!=0 校验在 init/fallback 不一致 @ rtp_llm/models_py/bindings/core/ExecOps.cc:631
    • 建议:在 execBroadcastCpu 入口统一校验 params.root == 0 并 fail-fast,或在 ExecOps.h 文档注明仅支持 root=0 并在两条路径都强制
  • collective_torch.py bootstrap 段 base_path None 兜底为死代码 @ rtp_llm/models_py/distributed/collective_torch.py:765
    • 建议:直接使用 _cpu_tp_broadcaster_base_path 并删除不可达 fallback 块;或把入口处的赋值放到必要的 if/else 分支以保留 fallback 语义

P3

  • /tmp 路径硬编码、未走 TMPDIR @ rtp_llm/models_py/distributed/collective_torch.py:730
    • 建议:使用 tempfile.gettempdir() 或 os.environ.get("TMPDIR", "/tmp") 作为前缀;或将 base 目录暴露为 ParallelismConfig/env var 以便部署侧覆盖

Checklist Violations (4 fail / 98 total)

General Principles Checklist

  • [6.1] Software Engineering — KISS/YAGNI:无投机性抽象 → issue collective_torch.py bootstrap 段 base_path None 兜底为死代码
    _collective_torch.py 同时存在两处 base_path 推导逻辑:入口处 make_cpu_tp_broadcaster_base_path 与 bootstrap 段 if base_path is None fallback;后者实际不可达。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue execBroadcastCpu 对 root!=0 校验在 init/fallback 不一致
    execBroadcastCpu 在 broadcaster 已初始化时通过 CpuTpBroadcaster::broadcast 强制 root==0 抛错;fallback 走 execBroadcast 不做校验。配置错误在 init/fallback 路径下行为分裂、被静默掩盖。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 新增 CpuTpBroadcaster 缺乏独立单元测试
    rtp_llm/cpp/distribute/BUILD 无 cc_test。新增 410 行 socket/handshake 逻辑(accept timeout、re-init mismatch、handshake bad rank、cleanupRank0State 异常路径)仅靠 user_buffers_test 端到端覆盖。

RTP-LLM Checklist

  • [E] 分布式 — 隐式同步依赖 / broadcast 后 syncAndCheck → issue execBroadcastCpu fallback 丢失 execSyncCommunication(false)
    原 tpSyncModelInputs 在 shape_hints 与 mm_features 两处广播后均执行 execBroadcast → execSyncCommunication(false) → cudaSyncAndCheck()。新代码改 execBroadcastCpu,其 fallback 仅 execBroadcast + cudaSyncAndCheck(),跨节点 TP 路径丢失 execSyncCommunication 的等待语义。

Strengths

  • RAII 资源清理完整:closeFd/cleanupRank0State 覆盖 accept/handshake throw 路径
  • accept 与 connect 均带 init 阶段 timeout(kInitTimeoutMs=120s),避免 bootstrap 死锁
  • rank0 handshake 校验 peer_rank 边界与重复,防止脏 fd 串话
  • pybind init_cpu_tp_broadcaster 显式 py::gil_scoped_release,避免 accept/connect 长 block 时阻塞其他 Python 线程
  • _run_multi_process_test 补 terminate + join cleanup,防止失败子进程残留 UDS socket 影响后续 case

@Vinkle-hzt Vinkle-hzt force-pushed the feat/cpu_broadcast branch 2 times, most recently from d77c36b to 230436d Compare April 30, 2026 15:25
@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

1 similar comment
@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/3 · P3/2

Blocking Issues

P1

  • CpuTpBroadcaster::broadcast 写 UDS 无 SIGPIPE 防护 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:144
    • 建议:改用 ::send(fd, p, left, MSG_NOSIGNAL) 代替 ::write 进行所有 UDS socket 写;writeAllWithTimeout 同改。或在 CpuTpBroadcaster::initialize 首次调用时 signal(SIGPIPE, SIG_IGN)。这样 peer 崩溃时返回 EPIPE 即可走 RTP_LLM_CHECK_WITH_INFO 抛异常,保留原始堆栈与日志。

Non-blocking Suggestions

P2

  • _normalize_parallelism_ranks 静默覆盖调用方 tp_rank/dp_rank @ rtp_llm/models_py/distributed/collective_torch.py:750
    • 建议:若入参已设置 tp_rank/dp_rank 且与计算值不一致,assert 报错而不是覆盖;或在覆盖时 logging.warning 标明旧值/新值。也可以将函数重命名为 _enforce_canonical_ranks 并在 docstring 明确「调用方不要预先设定」。
  • init_distributed_environment 重复调用仍会产生副作用 @ rtp_llm/models_py/distributed/collective_torch.py:770
    • 建议:把 _normalize_parallelism_ranks 与 _cpu_tp_broadcaster_base_path 计算移动到 if _initialized and torch.distributed.is_initialized(): return 之后,或在最初判断「真正会执行 init 的分支」内执行;并在文档/注释中说明 broadcaster 单例不可重置,以避免与 destroy_distributed_environment 配合时陷入 mismatch。
  • 新建 CpuTpBroadcaster 缺少独立单元测试 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:233
    • 建议:新增 cc_test_wrapper(name='cpu_tp_broadcaster_test'),用 fork+UDS 模拟 rank0/rankK 完成下面 case:(1) 双 rank 正常 init+broadcast 多次往返;(2) rank0 init 时遇到 stale socket;(3) rankK connect 重试后成功;(4) re-init 同参数 no-op、不同参数抛异常;(5) broadcast nbytes=0 短路。也可在 user_buffers_test 中通过 mock librtp_compute_ops.init_cpu_tp_broadcaster 验证 collective_torch 的入口条件。

P3

  • ExecOps.cc execBroadcastCpu fallback 比原 callsite 多一次 execSyncCommunication @ rtp_llm/models_py/bindings/core/ExecOps.cc:660
    • 建议:在 execBroadcastCpu 注释里明确「fallback 路径恒定附加 execSyncCommunication+cudaSyncAndCheck,可能比原 callsite 多一次同步」,或拆出 execBroadcastCpuStrict / execBroadcastCpuLoose 让 callsite 选择是否需要 sync,以保留语义对等。
  • RTP_LLM_CPU_TP_BROADCASTER_ID 未做路径字符限制 @ rtp_llm/models_py/distributed/collective_torch.py:734
    • 建议:对 session_id 走 re.sub(r'[^A-Za-z0-9._-]', '_', session_id),并校验最终 path 长度 < sizeof(sockaddr_un.sun_path)(在 Python 端先报错,比 C++ assert 更友好)。

Checklist Violations (7 fail / 85 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue init_distributed_environment 重复调用仍会产生副作用
    _rank 0 init 失败时调用 cleanupRank0State 恢复局部状态,但 initialized 仍为 false;然而 collective_torch 端在『重复 init』路径会先无条件重写 cpu_tp_broadcaster_base_path,与 destroy 后再 init 的不变量耦合不一致(详见 issues 第 3 条)。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue CpuTpBroadcaster::broadcast 写 UDS 无 SIGPIPE 防护
    execBroadcastCpu 的 fallback 路径(execBroadcast+execSyncCommunication+cudaSyncAndCheck)隐含一次额外 sync,与原 callsite 行为不完全一致,未在文档显式说明;CpuTpBroadcaster 写 UDS 又缺少对 SIGPIPE 的显式处理,最终错误语义可能从『抛异常』退化为『进程被信号终止』。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 新建 CpuTpBroadcaster 缺少独立单元测试
    新增 CpuTpBroadcaster 与 execBroadcastCpu 没有 cc_test 覆盖;user_buffers_test.py 改的是端口管理与子进程清理,并不直接验证新代码路径。
  • [6.1] Tests — 边界 case 覆盖(空、单元素、最大值) → issue 新建 CpuTpBroadcaster 缺少独立单元测试
    broadcast(nbytes=0) 短路、tp_size==1 no-op、stale socket re-bind、rankK connect 重试、handshake bad peer_rank 等边界 case 均无测试覆盖。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue 新建 CpuTpBroadcaster 缺少独立单元测试
    PR 直接影响 TP 分布式数据一致性,但既无 multi-rank ut,也未在 smoke 中显式声明 TP > 1 case。CUDA/ROCm/PPU 的差异路径未单独验证(broadcaster 本身设备无关,但 callsite 涉及 cudaSyncAndCheck fallback)。

RTP-LLM Checklist

  • [E] 分布式 — 隐式同步依赖 / broadcast 后 syncAndCheck → issue ExecOps.cc execBroadcastCpu fallback 比原 callsite 多一次 execSyncCommunication
    原 callsite 是 execBroadcast→execSyncCommunication→cudaSyncAndCheck;新 UDS 路径下 cpu_packed/shape_hints 不再走 cudaSyncAndCheck(因 CPU 无 GPU 流),fallback 路径在 execBroadcastCpu 内统一补 sync,但 ModelTypes 第三处 callsite 比原行为多一次 execSyncCommunication,需在文档/注释中显式说明。
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 新建 CpuTpBroadcaster 缺少独立单元测试
    新增 broadcaster 既无 cc_test,又无端到端的 multi-rank 自检 case,只通过 user_buffers_test 覆盖原 NCCL 路径(且改动还集中在端口/进程清理),覆盖不足。

Strengths

  • execBroadcastCpu 内置「未初始化 → 走原 NCCL+execSyncCommunication+cudaSyncAndCheck」fallback,跨节点 TP 与单 rank 行为不变,降级路径清晰。
  • CpuTpBroadcaster 头文件明确写出线程安全模型(initialize 加锁、broadcast 仅由引擎主线程调用)与 star topology root=0 的契约,便于调用方理解边界。
  • init_cpu_tp_broadcaster pybind 包装显式 py::gil_scoped_release,避免 accept/connect 阻塞期间持锁导致 Python 侧死锁,符合 RTP-LLM G 节跨语言陷阱原则。
  • 测试 _run_multi_process_test 改写后先全量 join 再聚合 failures,并在 finally 中 terminate 残留子进程,避免单 case 失败污染后续 case 的 socket/port。

@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/3 · P3/0

Blocking Issues

P1

  • destroy_distributed_environment 未清理 C++ CpuTpBroadcaster 单例,重新初始化会失败 @ rtp_llm/models_py/distributed/collective_torch.py:546
    • 建议:在 ExecOps 暴露 destroy_cpu_tp_broadcaster 接口(关闭 fds、unlink、清空 initialized_/peer_fds_),并在 destroy_distributed_environment 调用;或在 C++ initialize 检测到 base_path 变化时主动 teardown 而非 FAIL。

Non-blocking Suggestions

P2

  • Cross-node fallback 路径将原本打包的 cpu_packed/gpu_packed 拆成两次 NCCL broadcast @ rtp_llm/cpp/models/ModelTypes.cc:303
    • 建议:在 fallback 路径或 ModelTypes.cc 内保持原有合并广播:当 isInitialized() 返回 false 时,将 cpu_packed/gpu_packed 一并交给 execBroadcast 一次广播;或在 ModelTypes 中根据 broadcaster.isInitialized() 选择拆分/合并策略。
  • _normalize_parallelism_ranks 静默覆盖调用方显式传入的 tp_rank/dp_rank @ rtp_llm/models_py/distributed/collective_torch.py:782
    • 建议:将 normalize 改为校验:发现冲突时抛出 ValueError 让调用方显式修复;如确实需要兼容旧调用,加 strict 参数控制并把日志升级为 ERROR;同时记录这一行为在 docstring。
  • 新增 CpuTpBroadcaster 缺少专项单元测试,bootstrap/握手/fallback 路径未被覆盖 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:1
    • 建议:在 rtp_llm/cpp/distribute/test/ 下增加多进程或 fork-based 单测覆盖:rank 0/K 握手、超时、断链清理、单 rank 短路、init 不一致触发 FAIL;并加一条多进程 Python 测试覆盖 fallback 路径(mock isInitialized=false)。

Checklist Violations (9 fail / 85 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue destroy_distributed_environment 未清理 C++ CpuTpBroadcaster 单例,重新初始化会失败
    destroy_distributed_environment 仅清理 Python 状态,C++ 单例保留 initialized/peer_fds_,再 init 触发 FAIL 或使用陈旧 fd。_
  • [6.1] Architecture — 兼容性:公开 API/持久数据/配置/环境迁移安全 → issue _normalize_parallelism_ranks 静默覆盖调用方显式传入的 tp_rank/dp_rank
    _normalize_parallelism_ranks 静默改写 ParallelismConfig.tp_rank/dp_rank,会破坏依赖原有显式 rank 设置的调用方。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 新增 CpuTpBroadcaster 缺少专项单元测试,bootstrap/握手/fallback 路径未被覆盖
    新增 CpuTpBroadcaster 与 execBroadcastCpu fallback 没有任何专项单测;user_buffers_test 仅改端口分配。
  • [6.1] Tests — 边界 case 覆盖(空、单元素、最大值) → issue 新增 CpuTpBroadcaster 缺少专项单元测试,bootstrap/握手/fallback 路径未被覆盖
    tp_size<=1 短路、nbytes==0 短路、超时、断链等边界路径无测试。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue 新增 CpuTpBroadcaster 缺少专项单元测试,bootstrap/握手/fallback 路径未被覆盖
    intra-node UDS 路径与 cross-node fallback 路径均无回归覆盖。

RTP-LLM Checklist

  • [D] 性能 — 通信 / buffer 开销评估 → issue Cross-node fallback 路径将原本打包的 cpu_packed/gpu_packed 拆成两次 NCCL broadcast
    cross-node fallback 把原本一次合并的 cpu_packed/gpu_packed 拆为两次 NCCL broadcast + 两次 cudaSyncAndCheck,每步多 1 次 launch。
  • [E] 分布式 — C++/Python 重复状态标志一致性 → issue destroy_distributed_environment 未清理 C++ CpuTpBroadcaster 单例,重新初始化会失败
    Python destroy_distributed_environment 重置 cpu_tp_broadcaster_base_path,但 C++ 单例的 initialized/peer_fds/base_path_ 不会清零,导致两侧状态不一致。_
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 新增 CpuTpBroadcaster 缺少专项单元测试,bootstrap/握手/fallback 路径未被覆盖
    约 420 行新代码(CpuTpBroadcaster + execBroadcastCpu)无任何专项单测或端到端测试。

Python Static-First Checklist

  • [P.A] 静态结构与类型纪律 — 禁止 hasattr 做控制流分支 → checklist-only
    collective_torch.py 用 hasattr(librtp_compute_ops, 'init_cpu_tp_broadcaster') 控制是否 bootstrap broadcaster。

Strengths

  • UDS star 拓扑 + handshake + link probe 设计清晰,绕开 NCCL cudaDeviceSynchronize stall 的性能动机有据可循(注释引用 m2.md / mtp_stream_async)
  • initialize() 在 rank 0 异常路径用 cleanupRank0State 统一释放 listen_fd_/peer_fds_/UDS 文件,避免半初始化状态残留
  • init_cpu_tp_broadcaster pybind 显式 py::gil_scoped_release,避免 accept/connect 期间阻塞其他 Python 线程
  • Python 端 _make_cpu_tp_broadcaster_base_path 用 _UDS_SUN_PATH_LIMIT 与 C++ sockaddr_un 限制对齐,提前抛错
  • execBroadcastCpu fallback 保留 execSyncCommunication+cudaSyncAndCheck 契约,跨节点正确性有兜底
  • user_buffers_test._run_multi_process_test 改造保证子进程异常或超时也会被 terminate,避免 socket 残留影响后续用例

@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/2 · P3/0

Blocking Issues

P1

  • 缺少 CpuTpBroadcaster 单元测试 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:241
    • 建议:补一个 cc_test,至少覆盖:(1) tp_size=2/4 的 happy-path 广播;(2) rank0 收到非法 peer_rank 抛错;(3) 重复 peer_rank 抛错;(4) link probe 字节不匹配抛错;(5) reset 后用新的 base_path 再 init 成功。可走 fork() 多进程 + tmp UDS dir。

Non-blocking Suggestions

P2

  • 跨节点 TP fallback 把 1 次 NCCL 广播拆成 2 次 @ rtp_llm/cpp/models/ModelTypes.cc:300
    • 建议:在 fallback 路径上仍把 cpu_packed 重新合并进 packed_buffers 一起广播(例如让 execBroadcastCpu 在未 init 时直接 return 并由调用方决定是否合并),或在跨节点场景跳过 cpu/gpu 拆分;同时在 PR 描述/注释里明确跨节点回退的代价。
  • CpuTpBroadcaster::broadcast 运行时无超时无观测,peer 崩溃将静默挂死 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:430
    • 建议:为 broadcast 增加可配置的软超时(例如默认数十秒)+ 周期性 RTP_LLM_LOG_WARNING;或至少加 watchdog 定时检测 peer_fds_ EPOLLHUP/EPOLLERR 并在 rank 异常退出时 fail-fast 抛 timeout,避免静默挂死。

Checklist Violations (5 fail / 72 total)

General Principles Checklist

  • [6.1] Architecture — 可观测性:日志/指标/超时可操作、非噪声 → issue CpuTpBroadcaster::broadcast 运行时无超时无观测,peer 崩溃将静默挂死
    broadcast 运行时无超时无 watchdog,peer 崩溃将静默挂死,无任何告警日志。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 缺少 CpuTpBroadcaster 单元测试
    CpuTpBroadcaster 协议级实现 ~490 行无任何 cc_test/py_test。
  • [6.1] Tests — 边界 case 覆盖(空、单元素、最大值) → issue 缺少 CpuTpBroadcaster 单元测试
    广播路径对 nbytes=0、tp_size=1 短路有处理但无测试覆盖;handshake 越界/重复 peer_rank 仅有 RTP_LLM_CHECK 抛错,无测试。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue 缺少 CpuTpBroadcaster 单元测试
    intra-node TP 主路径 (UDS) 无测试,cross-node TP fallback 路径无测试。

RTP-LLM Checklist

  • [D] 性能 — 通信 / buffer 开销评估 → issue 跨节点 TP fallback 把 1 次 NCCL 广播拆成 2 次
    cross-node TP fallback 把原本 1 次合并 NCCL 广播拆为 2 次,且每次都附 execSyncCommunication+cudaSyncAndCheck,跨节点路径净退化。

Strengths

  • UDS bootstrap 路径细节扎实:accept/connect 双向超时、handshake 校验 peer_rank 范围与重复、链路探测 token 双向验证、bootstrap 异常路径 cleanupRank0State 释放 listen_fd 与 unlink socket 文件,failure semantics 清晰
  • execBroadcastCpu 通过『未初始化即 fallback 到 execBroadcast + execSyncCommunication + cudaSyncAndCheck』保留了 tpSyncModelInputs 旧 callsite 的即时可读语义,回退路径对正确性零退化
  • Python 侧 _make_cpu_tp_broadcaster_base_path 对 sun_path 长度做了显式校验、以 0o700 创建 per-uid 目录、用 ppid+nccl_init_port+dp_rank 组合做 session 标识并提供 RTP_LLM_CPU_TP_BROADCASTER_ID/DIR 环境变量逃生通道,避免多实例冲突
  • init_cpu_tp_broadcaster pybind 绑定显式 gil_scoped_release,避免 rank0 accept 阻塞期间死锁其它 Python 线程
  • user_buffers_test.py 子进程清理改为 join→收集失败→finally terminate,且 NCCL_PORT_COUNT=12 避免相邻测试端口踩踏,鲁棒性提升

- Added CpuTpBroadcaster class to facilitate CPU tensor broadcasting using Unix Domain Sockets, avoiding NCCL's cudaDeviceSynchronize stalls.
- Introduced methods for initialization and broadcasting, ensuring thread safety and proper handling of intra-node tensor communication.
- Updated relevant build files to include the new broadcaster and modified execBroadcastCpu to utilize it for CPU tensors.
- Integrated initialization in Python to ensure proper setup across TP ranks.
@LLLLKKKK
Copy link
Copy Markdown
Collaborator

AI Code Review - PR #960

Status: LGTM

Summary: P0/0 · P1/0 · P2/0 · P3/2

lgtm ready to ci

Non-blocking Suggestions

P3

  • 链路探针错误信息混淆读失败与字节不匹配 @ rtp_llm/cpp/distribute/CpuTpBroadcaster.cc:428
    • 建议:拆成两次 RTP_LLM_CHECK_WITH_INFO:先校验 n,再校验 probe 值(错误信息显式打印 probe 实际值与期望值),不再依赖 strerror。
  • _make_cpu_tp_broadcaster_base_path 在 broadcaster 不会启用时仍创建目录 @ rtp_llm/models_py/distributed/collective_torch.py:60
    • 建议:把 base_path 的构造与目录创建延迟到 _register_process_groups_to_cpp 中、与 hasattr/tp_size 守卫同位置,避免无关 side-effect。

Checklist Violations (1 fail / 74 total)

General Principles Checklist

  • [6.1] Architecture — 可观测性:日志/指标/超时可操作、非噪声 → issue 链路探针错误信息混淆读失败与字节不匹配
    rank0 接受 peer / 非根连接成功路径都有 RTP_LLM_LOG_INFO;但 link probe 失败的错误信息把读失败与字节不匹配混合并 strerror 一个不相关 errno,对运维定位有迷惑性。

Strengths

  • rank0 部分 accept 失败有完整 cleanup:cleanupRank0State + 内部 try/catch 通过 close_accepted 标志精确管理 fd 所有权切换
  • broadcast 线程安全契约(仅引擎主线程调用)显式写在头文件注释中,避免后续误用
  • init_cpu_tp_broadcaster / destroy 在 pybind 层用 py::gil_scoped_release,避免 accept/connect 阻塞期间持锁拖死 Python 其他线程
  • 跨节点 TP 自动 fallback 到 NCCL,并在 fallback 路径里完整保留 execBroadcast + execSyncCommunication(false) + cudaSyncAndCheck 旧契约
  • 测试覆盖 happy path(TP=2/4)、bad peer rank、duplicate rank、bad link probe、reset 复用,并用 fork+alarm(30) 隔离子进程
  • UDS 路径用 (ppid, nccl_init_port, dp_rank) 命名空间避免跨 init / 跨 DP 组冲突,rank0 启动时 ::unlink 清理上次崩溃残留
  • Python 与 C++ 两侧 UDS sun_path 长度上限对齐(_UDS_SUN_PATH_LIMIT=108 vs sizeof(sun_path))

@wht21
Copy link
Copy Markdown
Collaborator

wht21 commented Apr 30, 2026

internal source has been updated, please review the changes!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants