rdma.cc
相关结构体
1 | // structure to save the address of remote channels. |
RdmaAdapter
RdmaAdapter(const WorkerEnv* worker_env)
- dev_list = ibv_get_device_list(NULL);获取主机可用的VPI设备列表
- ib_dev = dev_list[0]默认取第一个设备
- ibv_context* context = ibv_open_device(ib_dev);打开设备获取context
- ibv_pd* pd = ibv_alloc_pd(context);创建Protection Domain
- workerenv\ (worker_env)拷贝工作环境参数
- eventchannel\ = ibv_create_compchannel(context\);创建完成通道,用于通知完成队列
- cq_ = ibv_createcq(context\, MAX_CONCURRENT_WRITES * 2, NULL, eventchannel, 0);创建完成队列
- ibv_req_notifycq(cq, 0)完成完成队列与完成通道的关联
- 启动处理线程Process_CQ()
~RdmaAdapter()
- ibv_destroycq(cq)
- ibv_destroy_comp_channel(eventchannel)
- ibv_deallocpd(pd)
- ibv_closedevice(context)
void Process_CQ()
- ibv_get_cq_event(eventchannel, &cq, &cq_context)阻塞等待队列中进入新元素
- ibv_ack_cq_events(cq, 1);确认收到的事件
- ibv_req_notifycq(cq, 0)重新注册,等待下次事件触发
int ne = ibv_poll_cq(cq\_, MAX_CONCURRENT_WRITES * 2, static_cast<ibv_wc*>(wc_));
从CQ队列中获取所有的事件,ne表示事件个数- 遍历每个cqe
- 判断wc_[i].status == IBV_WC_SUCCESS,检查wr的状态是否正确
- 若wc_[i].opcode == IBV_WC_RECV_RDMA_WITH_IMM
- RdmaChannel rc = reinterpret_cast<RdmaChannel>(wc_[i].wr_id);若是接收事件,则wr_id中存放本地Channel的指针
- rc->Recv();让channel做好接收的准备
- RdmaBuffer* rb = rc->FindBuffer(wc_[i].imm_data); 利用imm_data来寻找buffer地址
- RdmaMessage::ParseMessage(rm, rb->buffer_); 将buffer中的信息解析成Message
- 判断rm.type
- 若rm.type_ == RDMA_MESSAGE_ACK
- 将本地的tx_messagebuffer的remote状态设置为空闲。
- 本地tx_messagebuffer发送下一条Message
- 若rm.type_ == RDMA_MESSAGE_TENSOR_REQUEST
- 首先通过本地的tx_ackbuffer发送ack,使得对方释放它的message buffer。
- RdmaBuffer* tb = rc->FindOrCreateBuffer(rm.name_); 通过name来寻找buffer,此处的buffer为tensor buffer。
- string key_with_stepid = VerbsUtil::AppendStepidToKey(rm.name\, rm.stepid\); 生成一个类似”tx_tensor_buffer123456”的标识符。
- tb->EnqueueItem(key_with_step_id); 将此标识符放入处理队列中。
- workerenv->compute_pool->Schedule(tb\ { tb->SendNextItem(); }); 设置定时任务,使得tx_tensor_buffer开始发送数据。
- 若rm.type_ == RDMA_MESSAGE_BUFFER_IDLE
- 首先通过本地的tx_ackbuffer发送ack,使得对方释放它的message buffer。
- RdmaBuffer* tb = rc->FindBuffer(rm.name_); 通过name来寻找tensor buffer。
- 设置此tx_tensor_buffer的remote状态为空闲,表示对方已经就绪。
- 设置定时任务,使得tx_tensor_buffer开始发送数据。
- 若rm.type_ == RDMA_MESSAGE_BUFFER_REQUEST
- 首先通过本地的tx_ackbuffer发送ack,使得对方释放它的message buffer。
- 收到此消息时,表示发送方说“现有的tensor buffer不够大小了,我已经重新创建了,大小告诉你了,你也重新创一个,我们再建立连接。”
- RdmaBuffer* tb = rc->FindOrCreateBuffer(rm.name_, TENSOR); 寻找这个tensor buffer,找到以后进行空间处理(tb->CreateCPUBuffer(rm.buffersize\);)、连接处理(tb->SetRemoteMR(rmr, true);)。
- 创建成功后,回复发送者说,我创建好了,你来跟我建立连接吧。准备发送RDMA_MESSAGE_BUFFER_RESPONSE消息。
- 通过tx_message_buffer发送此消息,消息入队。
- tx_message_buffer开始发送下一条消息。
- 若rm.type_ == RDMA_MESSAGE_BUFFER_RESPONSE
- 首先通过本地的tx_ackbuffer发送ack,使得对方释放它的message buffer。
- 寻找本地的tx_tensor_buffer,来和接收者建立链接。
- 将tx_tensor_buffer的local和remote状态都设置为空闲,准备发送数据。
- 设置定时任务,使得tx_tensor_buffer开始发送数据。
- 若rm.type_ == RDMA_MESSAGE_TENSOR_WRITE
- 设置定时任务,通过key_with_step_id (ex: “tx_tensor_buffer123456”)来运行指定的callback函数。
- 若rm.type_ == RDMA_MESSAGE_ACK
- 若wc_[i].opcode == IBV_WC_RDMA_WRITE
- RdmaBuffer rb = reinterpret_cast<RdmaBuffer>(wc_[i].wr_id); 若为本地后台发来的消息,则wr_id中存放buffer地址。一般为tx_message_buffer。
- 将该buffer的local状态设置为空闲。
- 解析buffer中的消息。
- 若buffer中的消息类型是RDMA_MESSAGE_ACK,则不做任何处理。否则创建定时任务,使得tx_message_buffer开始发送下一条数据。
RdmaChannel
RdmaChannel(const RdmaAdapter* adapter, const string local_name, const string remotename)
- qp_ = ibv_createqp(adapter->pd_, &attr); 创建Queue Pair
- ibv_modifyqp(qp, &attr, mask) 初始化QP
- 创建4个buffer并建立hash,同时加入索引表,tx_messagebuffer = new RdmaMessageBuffer(this, buffer_names[0]);
- 执行100次Recv() (ibv_post_recv()),使得buffer准备好接收。
~RdmaChannel()
- ibv_destroyqp(qp) 销毁QP
- 销毁buffer
TensorFlow相关
类关系
1 | core:refCounts=>start: core:refCounts |
1 | RendezvousMgrInterface=>start: RendezvousMgrInterface(纯虚类) |
初始化过程
verbs_server_lib.cc文件中存在静态变量
static VerbsServerRegistrar registrar;
该静态变量的构造函数中包含VERBS_SERVER服务的注册,和VerbsServerFactory服务对象的创建。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21class VerbsServerRegistrar {
public:
VerbsServerRegistrar() {
gpr_allocation_functions alloc_fns;
alloc_fns.malloc_fn = port::Malloc;
alloc_fns.realloc_fn = port::Realloc;
alloc_fns.free_fn = port::Free;
gpr_set_allocation_functions(alloc_fns);
ServerFactory::Register("VERBS_SERVER", new VerbsServerFactory());
}
};
/* static */
void ServerFactory::Register(const string& server_type,
ServerFactory* factory) {
mutex_lock l(*get_server_factory_lock());
if (!server_factories()->insert({server_type, factory}).second) {
LOG(ERROR) << "Two server factories are being registered under "
<< server_type;
}
}VerbsServerFactory类的重写函数中包含VerbsServer的创建。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16std::unique_ptr<ServerInterface> svr;
TF_CHECK_OK(NewServer(server, &svr));
TF_CHECK_OK(svr->Start());
TF_CHECK_OK(svr->Join());
class VerbsServerFactory : public ServerFactory {
public:
bool AcceptsOptions(const ServerDef& server_def) override {
return server_def.protocol() == "grpc+verbs";
}
Status NewServer(const ServerDef& server_def,
std::unique_ptr<ServerInterface>* out_server) override {
return VerbsServer::Create(server_def, Env::Default(), out_server);
}
};VerbsServer::Create是静态函数,该函数中包含对VerbsService类的对象化、VerbsServer的对象化以及INIT和RdmaRendezvousMgr的对象化(RdmaRemoteRendezvous类被TF_DISALLOW_COPY_AND_ASSIGN修饰,导致RdmaRemoteRendezvous类的拷贝构造函数和复制构造函数为私有)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60/* static */
Status VerbsServer::Create(const ServerDef& server_def, Env* env,
std::unique_ptr<ServerInterface>* out_server) {
std::unique_ptr<VerbsServer> ret(new VerbsServer(server_def, Env::Default()));
ServiceInitFunction service_func = [&ret](const WorkerEnv* worker_env,
::grpc::ServerBuilder* builder) {
return SetNewVerbsService(&ret->verbs_service_, worker_env, builder);
};
TF_RETURN_IF_ERROR(ret->Init(service_func, NewRdmaRendezvousMgr));
*out_server = std::move(ret);
return Status::OK();
}
RendezvousMgrInterface* NewRdmaRendezvousMgr(const WorkerEnv* env) {
return new RdmaRendezvousMgr(env);
}
Status VerbsServer::Init(ServiceInitFunction service_func,
RendezvousMgrCreationFunction rendezvous_mgr_func) {
Status s = GrpcServer::Init(service_func, rendezvous_mgr_func);
{
mutex_lock l(mu_);
CHECK_EQ(verbs_state_, DISCONNECTED);
CHECK(ChannelCacheFactory(server_def(), &channel_cache_).ok());
rdma_mgr_ = new RdmaMgr(worker_env(), channel_cache_);
// set rdma_mgr for verbs_service and rdma_rendezvous_mgr
verbs_service_->SetRdmaMgr(rdma_mgr_);
dynamic_cast<RdmaRendezvousMgr*>(worker_env()->rendezvous_mgr)
->SetRdmaMgr(rdma_mgr_);
}
return s;
}
// Create a GrpcVerbsService, then assign it to a given handle.
void SetNewVerbsService(GrpcVerbsService** handle, const WorkerEnv* worker_env,
::grpc::ServerBuilder* builder) {
*handle = new GrpcVerbsService(worker_env, builder);
}
RdmaMgr::RdmaMgr(const WorkerEnv* const worker_env,
GrpcChannelCache* const channel_cache)
: worker_env_(worker_env), channel_cache_(channel_cache) {
rdma_adapter_ = new RdmaAdapter(worker_env_);
// hardcoded to default session (legacy_session_)
// TODO: use WorkerSessionForSession
// need to pass in session handle
local_worker_ = worker_env_->session_mgr->LegacySession()->worker_name;
std::vector<string> workers;
worker_env_->session_mgr->LegacySession()->worker_cache->ListWorkers(
&workers);
num_remote_workers_ = workers.size() - 1;
VLOG(2) << "rmda_mgr on local worker: " << local_worker_;
for (size_t i = 0; i < workers.size(); i++) {
if (local_worker_.compare(workers[i]) != 0) {
channel_table_.insert(
{workers[i],
new RdmaChannel(rdma_adapter_, local_worker_, workers[i])});
}
}
}VerbsServer的Start函数,包含了gRPC线程的创建和Channel的设置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36Status VerbsServer::Start() {
Status s = GrpcServer::Start();
{
mutex_lock l(mu_);
if (verbs_state_ == DISCONNECTED) {
// verbs_thread needs to be initiated
// before rdma_mgr sets up the rdma channels.
verbs_thread_.reset(worker_env()->env->StartThread(
ThreadOptions(), "TF_verbs_service",
[this] { verbs_service_->HandleRPCsLoop(); }));
rdma_mgr_->SetupChannels();
verbs_state_ = CONNECTED;
}
}
return s;
}
// This method blocks forever handling requests from the completion queue.
void GrpcVerbsService::HandleRPCsLoop() {
for (int i = 0; i < 10; ++i) {
ENQUEUE_REQUEST(GetRemoteAddress, false);
}
void* tag;
bool ok;
while (cq_->Next(&tag, &ok)) {
UntypedCall<GrpcVerbsService>::Tag* callback_tag =
static_cast<UntypedCall<GrpcVerbsService>::Tag*>(tag);
if (callback_tag) {
callback_tag->OnCompleted(this, ok);
} else {
cq_->Shutdown();
}
}
}VerbsServer的Join函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28Status VerbsServer::Join() {
Status s = GrpcServer::Join();
{
mutex_lock l(mu_);
if (verbs_state_ == CONNECTED) {
verbs_state_ = DISCONNECTED;
verbs_thread_.reset();
}
}
return s;
}
Status GrpcServer::Join() {
mutex_lock l(mu_);
switch (state_) {
case NEW:
// Prevent the server from being started subsequently.
state_ = STOPPED;
return Status::OK();
case STARTED:
case STOPPED:
master_thread_.reset();
worker_thread_.reset();
return Status::OK();
default:
CHECK(false);
}
}
Rendezvous
Rendezvous的基类为core::RefCounted,声明如下
1 | namespace core { |
Rendezvous 类声明
1 | // A Rendezvous is an abstraction for passing a Tensor |
rendezvous.cc中包含了class LocalRendezvousImpl实现,实现了Send和RecvAsync的具体实现。对外提供了如下接口:
1 | // 具体定义,在cc文件中 |
Rendezvous的一个子类RemoteRendezvous为纯虚类
1 | // RemoteRendezvous follow a 2-part initialization. First the objects are |
RemoteRendezvous类的子类BaseRemoteRendezvous
1 | // RemoteRendezvous is a Rendezvous which can handle either |
在rdma_rendezvous_mgr.cc文件中声明且定义了RdmaRemoteRendezvous子类,RdmaRemoteRendezvous不对外开放
1 | class RdmaRemoteRendezvous : public BaseRemoteRendezvous { |
RendezvousMgr
虚基类RendezvousMgrInterface,无具体实现
1 | // RendezvousMgr keeps track of a set of local rendezvous instances. |
RendezvousMgrInterface的子类 BaseRendezvousMgr
1 | // RendezvousMgr keeps track of a set of local rendezvous instances. |
BaseRendezvousMgr的子类RdmaRendezvousMgr
1 | // RendezvousMgr keeps track of a set of local rendezvous instances. |
发送
注:GraphMgr类中的ExecuteAsync函数进行Rendezvous的Init操作。
1 | void GraphMgr::ExecuteAsync(const string& handle, const int64 step_id,...) |
应用层先寻找Rendezvous
/tensorflow/core/distributed_runtime/graph_mgr.cc:418
1 | Status GraphMgr::SendInputs(const int64 step_id, const NamedTensors& in) { |
Rendezvous.send为纯虚函数,实现为BaseRemoteRendezvous::Send。Rdma中没有重写。
1 | // The caller is a tensor producer and it sends a message (a tensor |
接收
应用层先寻找Rendezvous
/tensorflow/core/distributed_runtime/graph_mgr.cc:425
1 | // 阻塞等待式接收 |
RdmaRemoteRendezvous中没有重写RecvAsync,故调用基类的RecvAsync函数
不管是阻塞式接收还是Callback式接收,都调用以下函数。阻塞式接收的Callback函数自动生成为notify函数。
1 | // This method is called only by the RecvOp. It tests to see |
若是本地操作,调用LocalRendezvousImpl类的RecvAsync。
若是远程操作,调用RDMA中的重写部分。
1 | void RdmaRemoteRendezvous::RecvFromRemoteAsync( |
1 | // Callback provided by a tensor consumer waiting on the rendezvous. |
1 | // Rdma-Write the content of the buffer |
接收详细操作:
1 | st=>start: start |
1 | Title: RdmaRemoteRendezvous::RecvFromRemoteAsync |
- 执行GraphMgr::RecvOutputsAsync,输入参数包含step_id、输出tensor和callback1。
- 执行GraphMgr::RecvOutputsFromRendezvousAsync,输入参数由step_id替换为从rendezvous_mgr找到的redezvous。函数中用callback2封装了callback1。
- 执行BaseRemoteRendezvous::RecvAsync,输入参数主要有parsed key和callback2。
- 判断src和dest。
- 若src和dest一致,则调用LocalRendezvousImpl::RecvAsync,参数主要有parsed key和callback3(封装callback2)。
- 若不一致,调用RdmaRemoteRendezvous::RecvFromRemoteAsync。参数不变。
- 接收方Recv函数:将CallBack2封装成Callback3后放入本地Channel中。
- 接收方Recv函数:将RDMA_MESSAGE_TENSOR_REQUEST消息放入tx_message_buffer。
- 接收方Message Buffer:发送消息。
- 发送方Process_CQ函数:收到RDMA_MESSAGE_TENSOR_REQUEST。
- 发送方Ack Buffer:回复RDMA_MESSAGE_ACK消息。
- 发送方Process_CQ函数:检测本地有无该Tensor(利用key搜索),将(key+step_id)组合入Queue,即将发送。
- 发送方Tensor Buffer:从Queue中获得key_with_step_id,同时创建新的rdma callback1。调用BaseRendezvousMgr::RecvLocalAsync(三参数),输入参数有step_id、通过key新生成的parsed 和 rdma callback1。
- BaseRendezvousMgr::RecvLocalAsync(三参数)通过step_id查找BaseRemoteRendezvous,同时将rdma callback1封装为rdma callback2。调用BaseRendezvousMgr::RecvLocalAsync(二参数)。
- BaseRendezvousMgr::RecvLocalAsync(二参数)将判断该Rendezvous是否被init,若未init,则将parsed和rdma callback2组装为DeferredCall放入对列中,直到Rendezvous被初始化。若已经INIT,则执行BaseRemoteRendezvous::RecvLocalAsyncInternal,输入参数不变(parsed和rdma callback2)。
- 调用LocalRendezvousImpl::RecvAsync,参数主要有parsed和rdma callback2。同4.1
- 在Table中寻找tensor。
- 未找到,存储回调,等待tensor。
- 若找到,则运行回调。若为RDMA回调,则运行如下:
- 将Tensor转为Proto。
- 若local和remote状态没准备好,则将(key+step_id)入队,对应4.2.7。
- 若local和remote状态就绪,但是tensor buffer空间不够。
- 自己创建足够大的buffer。
- 将(key+step_id)入队,对应4.2.7。
- 创建RDMA_MESSAGE_BUFFER_REQUEST消息,并发送之。
- 接收方收到后回复Ack
- …
- 若一切就绪。
- 创建RDMA_MESSAGE_TENSOR_WRITE消息,放入proto。
- 用IBV_WR_RDMA_WRITE_WITH_IMM写入远程信息。
LOG
1 |
|
结构体
WorkEnv
/tensorflow/core/distributed_runtime/worker_env.h
1 | // The worker environment class, which holds a bag of pointers to |
Item
/tensorflow/core/framework/rendezvous.cc
1 | struct Item { |
ParseKey
1 | // Constructs a rendezvous key for the tensor of "name" sent from |
调试
10.42.10.36
1 | #coding=utf-8 |
PS端
1 | docker run -it \ |
进入容器后执行
1 | cd /Example |
Worker端
1 | docker run -it \ |
进入容器后执行
1 | cd /Example |