因为最近的工作要和Horovod打交道,所以分析了Horovod的源码。在这里记一笔。
Horovod有几个亮点,第一,它不依托于某个框架,自己通过MPI建立了一套分布式系统,完成了allreduce, allgather等collective operations通信工作. 第二,发现了Tensor fusion, 梯度传递的时候可以将小的tensor合并成一个大的tensor再进行传递,从而减小每一次操作的额外开销(overhead). 第二点比较好理解,本文讲一下horovod的通信策略。
为了简洁,以下称Horovod为hvd。
hvd将通信和计算框架分离之后,计算框架只需要直接调用hvd接口,如HorovodAllreduceOp来进行梯度求平均即可。但是,因为计算框架往往采用多线程执行训练的计算图,所以在多节点情况下,拿allreduce操作来举例,我们不能保证每个节点上的allreduce请求是有序的。因此MPI_Allreduce并不能直接用。
为了解决这一个问题,hvd 设计了一个主从模式,rank0为master节点,rank 1-n为worker节点。除此之外,我们需要了解hvd在每个节点上定义的数据结构,每个worker节点上都有一个消息队列,而在master节点上除了一个消息队列,还有一个消息map。
每当计算框架发来通信请求时,hvd并不直接执行MPI,而是封装了这个消息并推入自己的消息队列。我们通常在python文件中使用hvd.init()来初始化hvd,实际上是开了一个后台线程和一个MPI线程。后台线程采用定时轮询的方式访问自己的消息队列,如果非空,worker会将自己收到的所有tensor通信请求都发给master。因为是同步MPI,所以每个节点会阻塞等待MPI完成。master收到worker的消息后,会记录到自己的消息map中。如果一个tensor的通信请求出现了n次,也就意味着,所有的节点都已经发出了对该tensor的通信请求,那这个tensor就需要且能够进行通信。master节点会挑选出所有符合要求的tensor进行MPI通信。不符合要求的tensor继续留在消息map中,等待条件符合。
决定了tensor以后,master又会将可以进行通信的tensor 名字和顺序发还给各个节点。至此,所有的节点都得到了即将进行的MPI的tensor和顺序,MPI通信得以进行。
以上所有操作。除了MPI通信外,均在后台进程中执行。
hvd的核心在文件中,也是本文介绍的内容。而其对Tensorflow, Pytorch的接口定义分别在horovod/tensorflow, horovod/pytorch文件夹中。
知乎link: https://zhuanlan.zhihu.com/p/45439173