diff --git a/projects/rccl/src/transport/p2p.cc b/projects/rccl/src/transport/p2p.cc index 61874c9d42..3ee052c0cf 100644 --- a/projects/rccl/src/transport/p2p.cc +++ b/projects/rccl/src/transport/p2p.cc @@ -20,6 +20,7 @@ #else #include "nvlink.h" #endif +#include "shm.h" struct p2pConnectInfo { int direct; @@ -27,17 +28,29 @@ struct p2pConnectInfo { void* directPtr; hipIpcMemHandle_t devIpc; }; + uint64_t pidHash; + int id; + int sendRank; + int recvRank; }; struct p2pSendResources { struct ncclSendMem* devMem; void* ipcPtr; uint32_t* next_hdp_reg; // Next GPU in ring (for p2p transport use only) + uint64_t* opCount; // opCount allocated in host memory + uint64_t* devOpCount; // device side pointer to opCount + uint64_t* remOpCount; // remote opCount allocated in host memory + uint64_t* devRemOpCount; // device side pointer to remote opCount }; struct p2pRecvResources { struct ncclRecvMem* devMem; void* ipcPtr; + uint64_t* opCount; // opCount allocated in host memory + uint64_t* devOpCount; // device side pointer to opCount + uint64_t* remOpCount; // remote opCount allocated in host memory + uint64_t* devRemOpCount; // device side pointer to remote opCount }; #include @@ -498,6 +511,8 @@ end: TRACE(P2P,"IPC: %016lx %016lx %016lx %016lx", devIpc[4], devIpc[5], devIpc[6], devIpc[7]); \ } while (0) +#define MAX_SHM_NAME_LEN 1024 + /* Send: Create and return connect structures for this peer to connect to me */ ncclResult_t p2pSendSetup(struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int buffSize, int channelId) { @@ -521,6 +536,16 @@ ncclResult_t p2pSendSetup(struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peer resources->next_hdp_reg = 0; struct p2pConnectInfo info; + info.id = channelId; + info.pidHash = myInfo->pidHash; + info.sendRank = myInfo->cudaDev; + info.recvRank = peerInfo->cudaDev; + + char shmName[MAX_SHM_NAME_LEN]; + sprintf(shmName, "nccl-p2p-send-opcount-%lx-%d-%d-%d", info.pidHash, info.id, info.sendRank, info.recvRank); + TRACE(NCCL_P2P,"Open shmName %s", shmName); + NCCLCHECK(shmOpen(shmName, sizeof(uint64_t), (void**)&resources->opCount, (void**)&resources->devOpCount, 1)); + if (myInfo->pidHash == peerInfo->pidHash) { info.direct = 1; info.directPtr = resources->devMem; @@ -571,6 +596,16 @@ ncclResult_t p2pRecvSetup(struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peer NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, recvSize, true)); struct p2pConnectInfo info; + info.id = channelId; + info.pidHash = myInfo->pidHash; + info.sendRank = peerInfo->cudaDev; + info.recvRank = myInfo->cudaDev; + + char shmName[MAX_SHM_NAME_LEN]; + sprintf(shmName, "nccl-p2p-recv-opcount-%lx-%d-%d-%d", info.pidHash, info.id, info.sendRank, info.recvRank); + TRACE(NCCL_P2P,"Open shmName %s", shmName); + NCCLCHECK(shmOpen(shmName, sizeof(uint64_t), (void**)&resources->opCount, (void**)&resources->devOpCount, 1)); + if (myInfo->pidHash == peerInfo->pidHash) { info.direct = 1; info.directPtr = resources->devMem; @@ -626,13 +661,20 @@ static ncclResult_t p2pSendConnect(struct ncclConnect* connectInfo, struct ncclC } } + char shmName[MAX_SHM_NAME_LEN]; + sprintf(shmName, "nccl-p2p-recv-opcount-%lx-%d-%d-%d", info->pidHash, info->id, info->sendRank, info->recvRank); + TRACE(NCCL_P2P,"Open shmName %s", shmName); + NCCLCHECK(shmOpen(shmName, sizeof(uint64_t), (void**)&resources->remOpCount, (void**)&resources->devRemOpCount, 0)); + // Remove the file to ensure proper clean-up + NCCLCHECK(shmUnlink(shmName)); + send->conn.buff = remDevMem->buff; send->conn.llBuff = remDevMem->llBuff; send->conn.tail = &remDevMem->tail; - send->conn.opCountRem = &remDevMem->opCount; + send->conn.opCountRem = resources->devRemOpCount; send->conn.head = &resources->devMem->head; send->conn.ptrExchange = &resources->devMem->ptrExchange; - send->conn.opCountLoc = &resources->devMem->opCount; + send->conn.opCountLoc = resources->devOpCount; send->conn.next_hdp_reg = resources->next_hdp_reg; return ncclSuccess; } @@ -657,12 +699,18 @@ ncclResult_t p2pRecvConnect(struct ncclConnect* connectInfo, struct ncclConnecto } } + char shmName[MAX_SHM_NAME_LEN]; + sprintf(shmName, "nccl-p2p-send-opcount-%lx-%d-%d-%d", info->pidHash, info->id, info->sendRank, info->recvRank); + TRACE(NCCL_P2P,"Open shmName %s", shmName); + NCCLCHECK(shmOpen(shmName, sizeof(uint64_t), (void**)&resources->remOpCount, (void**)&resources->devRemOpCount, 0)); + NCCLCHECK(shmUnlink(shmName)); + recv->conn.buff = resources->devMem->buff; recv->conn.llBuff = resources->devMem->llBuff; recv->conn.tail = &resources->devMem->tail; - recv->conn.opCountLoc = &resources->devMem->opCount; + recv->conn.opCountLoc = resources->devOpCount; recv->conn.head = &remDevMem->head; - recv->conn.opCountRem = &remDevMem->opCount; + recv->conn.opCountRem = resources->devRemOpCount; return ncclSuccess; } @@ -671,6 +719,8 @@ ncclResult_t p2pSendFree(void* resources) { if (sendRes->ipcPtr) CUDACHECK(hipIpcCloseMemHandle(sendRes->ipcPtr)); CUDACHECK(hipFree(sendRes->devMem)); + NCCLCHECK(shmClose(sendRes->opCount, sendRes->devOpCount, sizeof(uint64_t))); + NCCLCHECK(shmClose(sendRes->remOpCount, sendRes->devRemOpCount, sizeof(uint64_t))); free(sendRes); return ncclSuccess; } @@ -680,6 +730,8 @@ ncclResult_t p2pRecvFree(void* resources) { if (recvRes->ipcPtr) CUDACHECK(hipIpcCloseMemHandle(recvRes->ipcPtr)); CUDACHECK(hipFree(recvRes->devMem)); + NCCLCHECK(shmClose(recvRes->opCount, recvRes->devOpCount, sizeof(uint64_t))); + NCCLCHECK(shmClose(recvRes->remOpCount, recvRes->devRemOpCount, sizeof(uint64_t))); free(recvRes); return ncclSuccess; }