diff --git a/projects/rccl/CHANGELOG.md b/projects/rccl/CHANGELOG.md
index e0e7913240..189452d206 100644
--- a/projects/rccl/CHANGELOG.md
+++ b/projects/rccl/CHANGELOG.md
@@ -13,6 +13,12 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
   refer to the interface documentation for details.
 - To avoid potential deadlocks, user might have to set an environment variables increasing
   the number of hardware queues (e.g. export GPU_MAX_HW_QUEUES=16)
+- Adding support for reusing ports in NET/IB channels
+  - Opt in with NCCL_IB_SOCK_CLIENT_PORT_REUSE=1 and/or NCCL_IB_SOCK_SERVER_PORT_REUSE=1
+  - When the "Call to bind failed : Address already in use" error happens in large-scale AlltoAll
+    (e.g., >=64 MI200 nodes), users are advised to enable either or both of the options
+    to reduce the number of ports in use
+  - Avoid using NCCL_IB_SOCK_SERVER_PORT_REUSE when NCCL_NCHANNELS_PER_NET_PEER is set to a value >1
 
 ### Removed
 - Removed experimental clique-based kernels
diff --git a/projects/rccl/src/include/socket.h b/projects/rccl/src/include/socket.h
index d72480b6bb..e1886d69e4 100644
--- a/projects/rccl/src/include/socket.h
+++ b/projects/rccl/src/include/socket.h
@@ -51,7 +51,8 @@ int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNa
 // Create a listening socket. sock->addr can be pre-filled with IP & port info. sock->fd is set after a successful call
 ncclResult_t ncclSocketListen(struct ncclSocket* sock);
 // Connect to sock->addr. sock->fd is set after a successful call.
-ncclResult_t ncclSocketConnect(struct ncclSocket* sock);
+// If portReuse is non-zero, the socket is first bound (best effort) to a local port taken from a small reusable pool.
+ncclResult_t ncclSocketConnect(struct ncclSocket* sock, int portReuse = 0);
 // Return socket connection state.
 ncclResult_t ncclGetSocketState(struct ncclSocket* sock, enum ncclSocketState* state);
 // Accept an incoming connection from listenSocket->fd and keep the file descriptor in sock->fd, with the remote side IP/port in sock->addr.
diff --git a/projects/rccl/src/misc/socket.cc b/projects/rccl/src/misc/socket.cc
index ef2bea65a5..0d1f570bee 100644
--- a/projects/rccl/src/misc/socket.cc
+++ b/projects/rccl/src/misc/socket.cc
@@ -12,6 +12,17 @@
 #include <ifaddrs.h>
 #include <net/if.h>
 
+#include <sys/syscall.h>
+#include <string>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+// Candidate local ports for client sockets opened with portReuse. Each entry pairs a port with
+// the set of remote peers already connected from it (see ncclSocketConnect).
+// thread_local because the ports are derived from the caller's tid and the pool is mutated without a lock.
+static thread_local std::vector<std::pair<int, std::unordered_set<std::string>>> clientPortPool;
+
 /* Format a string representation of a (union ncclSocketAddress *) socket address using getnameinfo()
  *
  * Output: "IPv4/IPv6 address"
@@ -388,7 +399,7 @@ ncclResult_t ncclGetSocketState(struct ncclSocket* sock, enum ncclSocketState* s
   return ncclSuccess;
 }
 
-ncclResult_t ncclSocketConnect(struct ncclSocket* sock) {
+ncclResult_t ncclSocketConnect(struct ncclSocket* sock, int portReuse) {
   char line[SOCKET_NAME_MAXLEN+1];
   /* IPv4/IPv6 support */
   int family = sock->addr.sa.sa_family;
@@ -418,6 +429,50 @@ ncclResult_t ncclSocketConnect(struct ncclSocket* sock) {
     SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*)&bufsize, sizeof(int)), "setsockopt");
     SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, sizeof(int)), "setsockopt");*/
 
+  if (portReuse) {
+    // Derive the candidate ports from the calling thread's tid so that each thread works on its
+    // own (thread_local) pool and no lock is needed.
+    if (clientPortPool.empty()) {
+      const int tid = syscall(SYS_gettid);
+      for (int i = 1; i < 5; i++) {
+        clientPortPool.emplace_back(60000 + i * 1000 + tid % 1000, std::unordered_set<std::string>());
+      }
+    }
+    // Best effort: take the first port not yet used towards this remote peer, so that no two
+    // connections from this pool share the same (local port, remote peer) pair.
+    int reused_port = -1;
+    std::string remote_peer(ncclSocketToString(&sock->addr, line));
+    for (auto& port : clientPortPool) {
+      if (port.second.insert(remote_peer).second) {
+        reused_port = port.first;
+        break;
+      }
+    }
+    // Bind the chosen local port before connect(). When every pooled port is already taken for
+    // this peer, the socket is left unbound.
+    if (reused_port != -1) {
+      int opt = 1;
+      // SO_REUSEADDR and SO_REUSEPORT are distinct option names, not bit flags: set them one by one.
+      SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
+      SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
+      // Zero-initialised wildcard address of the same family as the peer; sockaddr_storage can
+      // hold either sockaddr_in or sockaddr_in6, so bind() never reads past the object.
+      struct sockaddr_storage local = {};
+      if (family == AF_INET6) {
+        struct sockaddr_in6* sin6 = (struct sockaddr_in6*)&local;
+        sin6->sin6_family = AF_INET6;
+        sin6->sin6_addr = in6addr_any;
+        sin6->sin6_port = htons(reused_port);
+      } else {
+        struct sockaddr_in* sin = (struct sockaddr_in*)&local;
+        sin->sin_family = AF_INET;
+        sin->sin_addr.s_addr = htonl(INADDR_ANY);
+        sin->sin_port = htons(reused_port);
+      }
+      SYSCHECK(bind(fd, (struct sockaddr*)&local, salen), "bind_client_port");
+    }
+  }
+
   TRACE(NCCL_INIT|NCCL_NET,"Connecting to socket %s", ncclSocketToString(&sock->addr, line));
 
   int ret;
diff --git a/projects/rccl/src/transport/net_ib.cc b/projects/rccl/src/transport/net_ib.cc
index 559d79526d..ed0e5e7fbc 100644
--- a/projects/rccl/src/transport/net_ib.cc
+++ b/projects/rccl/src/transport/net_ib.cc
@@ -83,6 +83,13 @@ NCCL_PARAM(IbTc, "IB_TC", 0);
 NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192);
 NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2);
 
+NCCL_PARAM(IbSockClientPortReuse, "IB_SOCK_CLIENT_PORT_REUSE", 0);
+NCCL_PARAM(IbSockServerPortReuse, "IB_SOCK_SERVER_PORT_REUSE", 0);
+// Listening socket shared by every ncclIbListen() call made on this thread when
+// NCCL_IB_SOCK_SERVER_PORT_REUSE is set; reusedSockfd == -1 means it has not been created yet.
+static thread_local union ncclSocketAddress reusedAddr;
+static thread_local int reusedSockfd = -1;
+
 pthread_t ncclIbAsyncThread;
 static void* ncclIbAsyncThreadMain(void* args) {
   struct ibv_context* context = (struct ibv_context*)args;
@@ -555,7 +562,19 @@ ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {
   memset(handle, 0, sizeof(struct ncclIbHandle));
   comm->dev = dev;
   NCCLCHECK(GetSocketAddr(&comm->sock.addr));
-  NCCLCHECK(ncclSocketListen(&comm->sock));
+  if (ncclParamIbSockServerPortReuse()) {
+    // reuse the socket address and fd for listen system call
+    if (reusedSockfd == -1) {
+      NCCLCHECK(ncclSocketListen(&comm->sock));
+      memcpy(&reusedAddr, &comm->sock.addr, sizeof(union ncclSocketAddress));
+      reusedSockfd = comm->sock.fd;
+    } else {
+      memcpy(&comm->sock.addr, &reusedAddr, sizeof(union ncclSocketAddress));
+      comm->sock.fd = reusedSockfd;
+    }
+  } else {
+    NCCLCHECK(ncclSocketListen(&comm->sock));
+  }
   memcpy(&handle->connectAddr, &comm->sock.addr, sizeof(union ncclSocketAddress));
   *listenComm = comm;
   return ncclSuccess;
@@ -579,7 +598,7 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm) {
   NCCLCHECK(ncclSocketInit(&comm->sock, &handle->connectAddr, NULL, 1));
   stage->comm = comm;
   stage->state = ncclIbCommStateConnect;
-  NCCLCHECK(ncclSocketConnect(&comm->sock));
+  NCCLCHECK(ncclSocketConnect(&comm->sock, ncclParamIbSockClientPortReuse()));
 
 ib_connect_check:
   /* since ncclSocketConnect is async, we must check if connection is complete */
@@ -1268,7 +1287,8 @@ ncclResult_t ncclIbCloseRecv(void* recvComm) {
 ncclResult_t ncclIbCloseListen(void* listenComm) {
   struct ncclIbListenComm* comm = (struct ncclIbListenComm*)listenComm;
   if (comm) {
-    close(comm->sock.fd);
+    // The fd shared through reusedSockfd is left open: other listen comms on this thread may still use it.
+    if (!ncclParamIbSockServerPortReuse() || reusedSockfd != comm->sock.fd) close(comm->sock.fd);
     free(comm);
   }
   return ncclSuccess;