Reduce AlltoAll port usage in send/recv proxy (#577)
* Reduce AlltoAll port usage when connecting proxy
Reuse socket ports when connecting proxies in AlltoAll.
Existing port usage in AlltoAll is O(n) for recv and O(n) for send,
reusing socket ports in server or client side will make one of them
O(1), reusing both will reduce the total port usage to O(1) and enables
AlltoAll in >64 MI200 nodes.
* Update changelog accordingly
Update changelog accordingly.
[ROCm/rccl commit: 80f53cc171]
Этот коммит содержится в:
@@ -13,6 +13,12 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
|
||||
refer to the interface documentation for details.
|
||||
- To avoid potential deadlocks, user might have to set an environment variables increasing
|
||||
the number of hardware queues (e.g. export GPU_MAX_HW_QUEUES=16)
|
||||
- Adding support for reusing ports in NET/IB channels
|
||||
- Opt-in with NCCL_IB_SOCK_CLIENT_PORT_REUSE=1 and NCCL_IB_SOCK_SERVER_PORT_REUSE=1
|
||||
- When "Call to bind failed : Address already in use" error happens in large-scale AlltoAll
|
||||
(e.g., >=64 MI200 nodes), users are suggested to opt-in either one or both of the options
|
||||
to resolve the massive port usage issue
|
||||
- Avoid using NCCL_IB_SOCK_SERVER_PORT_REUSE when NCCL_NCHANNELS_PER_NET_PEER is tuned >1
|
||||
### Removed
|
||||
- Removed experimental clique-based kernels
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNa
|
||||
// Create a listening socket. sock->addr can be pre-filled with IP & port info. sock->fd is set after a successful call
|
||||
ncclResult_t ncclSocketListen(struct ncclSocket* sock);
|
||||
// Connect to sock->addr. sock->fd is set after a successful call.
|
||||
ncclResult_t ncclSocketConnect(struct ncclSocket* sock);
|
||||
ncclResult_t ncclSocketConnect(struct ncclSocket* sock, int portReuse = 0);
|
||||
// Return socket connection state.
|
||||
ncclResult_t ncclGetSocketState(struct ncclSocket* sock, enum ncclSocketState* state);
|
||||
// Accept an incoming connection from listenSocket->fd and keep the file descriptor in sock->fd, with the remote side IP/port in sock->addr.
|
||||
|
||||
@@ -12,6 +12,11 @@
|
||||
#include <ifaddrs.h>
|
||||
#include <net/if.h>
|
||||
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
#include <unordered_set>
|
||||
static std::vector<std::pair<int, std::unordered_set<std::string>>> clientPortPool;
|
||||
|
||||
/* Format a string representation of a (union ncclSocketAddress *) socket address using getnameinfo()
|
||||
*
|
||||
* Output: "IPv4/IPv6 address<port>"
|
||||
@@ -388,7 +393,7 @@ ncclResult_t ncclGetSocketState(struct ncclSocket* sock, enum ncclSocketState* s
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclSocketConnect(struct ncclSocket* sock) {
|
||||
ncclResult_t ncclSocketConnect(struct ncclSocket* sock, int portReuse) {
|
||||
char line[SOCKET_NAME_MAXLEN+1];
|
||||
/* IPv4/IPv6 support */
|
||||
int family = sock->addr.sa.sa_family;
|
||||
@@ -418,6 +423,35 @@ ncclResult_t ncclSocketConnect(struct ncclSocket* sock) {
|
||||
SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*)&bufsize, sizeof(int)), "setsockopt");
|
||||
SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, sizeof(int)), "setsockopt");*/
|
||||
|
||||
if (portReuse) {
|
||||
// pre-define ports according to tid, to avoid extra lock for race condition
|
||||
if (clientPortPool.size() == 0) {
|
||||
for (int tid = syscall(SYS_gettid), i = 1; i < 5; i++) {
|
||||
clientPortPool.push_back(std::make_pair(60000 + i * 1000 + tid % 1000, std::unordered_set<std::string>()));
|
||||
}
|
||||
}
|
||||
// find a port without conflict (different remote peer) in best effort
|
||||
int reused_port = -1;
|
||||
std::string remote_peer(ncclSocketToString(&sock->addr, line));
|
||||
for (auto& port : clientPortPool) {
|
||||
if (port.second.find(remote_peer) == port.second.end()) {
|
||||
reused_port = port.first;
|
||||
port.second.insert(remote_peer);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// bind the port in fd for connect system call
|
||||
if (reused_port != -1) {
|
||||
int opt = 1;
|
||||
SYSCHECK(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
|
||||
struct sockaddr_in sin;
|
||||
sin.sin_family = family;
|
||||
sin.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
sin.sin_port = htons(reused_port);
|
||||
SYSCHECK(bind(fd, (struct sockaddr *)&sin, salen), "bind_client_port");
|
||||
}
|
||||
}
|
||||
|
||||
TRACE(NCCL_INIT|NCCL_NET,"Connecting to socket %s", ncclSocketToString(&sock->addr, line));
|
||||
|
||||
int ret;
|
||||
|
||||
@@ -83,6 +83,11 @@ NCCL_PARAM(IbTc, "IB_TC", 0);
|
||||
NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192);
|
||||
NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2);
|
||||
|
||||
NCCL_PARAM(IbSockClientPortReuse, "IB_SOCK_CLIENT_PORT_REUSE", 0);
|
||||
NCCL_PARAM(IbSockServerPortReuse, "IB_SOCK_SERVER_PORT_REUSE", 0);
|
||||
static thread_local union ncclSocketAddress reusedAddr;
|
||||
static thread_local int reusedSockfd = -1;
|
||||
|
||||
pthread_t ncclIbAsyncThread;
|
||||
static void* ncclIbAsyncThreadMain(void* args) {
|
||||
struct ibv_context* context = (struct ibv_context*)args;
|
||||
@@ -555,7 +560,19 @@ ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {
|
||||
memset(handle, 0, sizeof(struct ncclIbHandle));
|
||||
comm->dev = dev;
|
||||
NCCLCHECK(GetSocketAddr(&comm->sock.addr));
|
||||
NCCLCHECK(ncclSocketListen(&comm->sock));
|
||||
if (ncclParamIbSockServerPortReuse()) {
|
||||
// reuse the socket address and fd for listen system call
|
||||
if (reusedSockfd == -1) {
|
||||
NCCLCHECK(ncclSocketListen(&comm->sock));
|
||||
memcpy(&reusedAddr, &comm->sock.addr, sizeof(union ncclSocketAddress));
|
||||
reusedSockfd = comm->sock.fd;
|
||||
} else {
|
||||
memcpy(&comm->sock.addr, &reusedAddr, sizeof(union ncclSocketAddress));
|
||||
comm->sock.fd = reusedSockfd;
|
||||
}
|
||||
} else {
|
||||
NCCLCHECK(ncclSocketListen(&comm->sock));
|
||||
}
|
||||
memcpy(&handle->connectAddr, &comm->sock.addr, sizeof(union ncclSocketAddress));
|
||||
*listenComm = comm;
|
||||
return ncclSuccess;
|
||||
@@ -579,7 +596,7 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm) {
|
||||
NCCLCHECK(ncclSocketInit(&comm->sock, &handle->connectAddr, NULL, 1));
|
||||
stage->comm = comm;
|
||||
stage->state = ncclIbCommStateConnect;
|
||||
NCCLCHECK(ncclSocketConnect(&comm->sock));
|
||||
NCCLCHECK(ncclSocketConnect(&comm->sock, ncclParamIbSockClientPortReuse()));
|
||||
|
||||
ib_connect_check:
|
||||
/* since ncclSocketConnect is async, we must check if connection is complete */
|
||||
@@ -1268,7 +1285,7 @@ ncclResult_t ncclIbCloseRecv(void* recvComm) {
|
||||
ncclResult_t ncclIbCloseListen(void* listenComm) {
|
||||
struct ncclIbListenComm* comm = (struct ncclIbListenComm*)listenComm;
|
||||
if (comm) {
|
||||
close(comm->sock.fd);
|
||||
if (!ncclParamIbSockServerPortReuse() || reusedSockfd != comm->sock.fd) close(comm->sock.fd);
|
||||
free(comm);
|
||||
}
|
||||
return ncclSuccess;
|
||||
|
||||
Ссылка в новой задаче
Block a user