libuv之线程池以及线程间通信源码解析

theanarkh 2019/10/13 12:06

libuv实现了一个线程池,该线程池在用户提交了第一个任务的时候初始化,而不是系统启动的时候就初始化。入口代码如下。

static void init_once(void) {
#ifndef _WIN32
  /* Re-initialize the threadpool after fork.
   * Note that this discards the global mutex and condition as well
   * as the work queue.
   */

  if (pthread_atfork(NULLNULL, &reset_once))
    abort();
#endif
  init_threads();
}

// 给线程池提交一个任务
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)
(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)
{
  // 保证已经初始化线程,并只执行一次
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}

下面我们直接看一下线程池的初始化函数。

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }
  // 初始化条件变量
  if (uv_cond_init(&cond))
    abort();
  // 初始化互斥变量
  if (uv_mutex_init(&mutex))
    abort();

  // 初始化三个队列
  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  // 初始化信号量变量,值为0
  if (uv_sem_init(&sem, 0))
    abort();
  // 创建多个线程
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();
  // 等待sem信号量为非0的时候减去一,为0则阻塞
  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

大致上就是初始化各种变量,根据配置创建多个线程,每个线程的工作函数是worker。到这,就完成了线程池的创建,接下来我们看一下如何给线程池提交一个任务。有两种方式,libuv内部使用的是uv__work_submit函数。

// 给线程池提交一个任务
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)
(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)
{
  // 保证已经初始化线程,并只执行一次
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq, kind);
}

用户使用的是uv_queue_work

int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb)
 
{
  if (work_cb == NULL)
    return UV_EINVAL;

  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  uv__queue_done);
  return 0;
}

不过这两种方式区别不大,只是做了些封装。可以这两种方式最后都是通过post函数进行提交任务的。所以我们继续看post的代码。

// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  // 类型是慢IO
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    // 插入慢IO对应的队列
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    /*
      有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message,
      说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。
      需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入
      主队列 
    */

    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */

      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }
  // 把节点插入主队列,可能是慢IO消息节点或者一般任务
  QUEUE_INSERT_TAIL(&wq, q);
  // 有空闲线程则唤醒他
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

post根据任务的类型,把节点插入到相应的队列。到这里,就完成了任务的提交,接下来就是等待任务的执行和完成。每个线程的工作函数是worker。我们看看代码。

static void worker(void* arg) {
  struct uv__workw;
  QUEUE* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */

    /*
      如果队列为空,或者不为空,但是队列里只有慢IO任务且正在执行的慢IO任务个数达到阈值,
      则空闲函数加一,等待费慢IO任务,防止慢IO占用资源线程,导致其他快的任务无法得到执行
    */

    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      // 阻塞,等待唤醒
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
    // 取出头结点,头指点可能是退出消息、慢IO,一般请求
    q = QUEUE_HEAD(&wq);
    // 如果头结点是退出消息,则结束线程
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }
    // 移除节点 
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    // 如果当前节点等于慢IO节点
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */

      // 遇到阈值,重新入队 
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */

      // 没有慢IO任务则继续
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;
      // 处理慢IO任务
      is_slow_work = 1;
      // 正在处理慢IO任务的个数累加,用于其他线程判断慢IO任务个数是否达到阈值
      slow_io_work_running++;
      // 取出任务
      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      // 取出一个任务后,如果还有满IO任务则把慢IO标记节点重新入队,表示还有满IO任务,因为上面把该标记节点出队了 
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        // 有空闲线程则唤醒他,因为还有任务处理
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);
    // q是慢IO或者一般任务
    w = QUEUE_DATA(q, struct uv__work, wq);
    // 执行业务回调,该函数一般会阻塞
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */

    // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 通知loop的wq_async节点
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */

    uv_mutex_lock(&mutex);
    // 执行完满IO任务,记录正在执行的慢IO个数变量减1
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}

worker函数就是死循环从队列里取出任务执行,执行完之后通知主线程。还有一些关于慢IO任务的优化。因为提交任务有两种方式,所以执行任务时对应的函数也不一样。如果是用户通过uv_queue_work提交的,对应的的执行函数就是。

static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  req->work_cb(req);
}

其实也只是做了层封装,wrok_cb对应的就是用户设置的函数。工作函数一般是阻塞的,所以会导致线程的阻塞,这就是线程池的意义。一个线程挂起,另一个可以继续执行任务。等待阻塞返回时,线程会通知主线程。重点work函数里的这两句代码。

    // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 通知loop的wq_async节点
    uv_async_send(&w->loop->wq_async);

我们看一下uv_async_send的代码。

int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;
  // 如pending是0,则设置为1,返回0,如果是1则返回1,所以如果多次调用该函数是会被合并的
  if (cmpxchgi(&handle->pending, 01) == 0)
    uv__async_send(handle->loop);

  return 0;
}


static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  // 说明用的是eventfd而不是管道
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    // 见uv__async_start
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif
  // 通知读端
  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

子线程给主线程的wq_async对应的IO观察者发送消息。然后在poll IO阶段,epoll_wai就会监听到这个事件,从而执行回调函数。由uv_loop_init初始化时代码可知,回调函数是uv__work_done。

 uv_loop_init函数: err = uv_async_init(loop, &loop->wq_async, uv__work_done);

void uv__work_done(uv_async_t* handle) {
  struct uv__workw;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);

  uv_mutex_lock(&loop->wq_mutex);
  // 把loop->wq队列的节点全部移到wp变量中,wq的队列在线程处理函数work里进行设置
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    // 执行回调
    w->done(w, err);
  }
}

这里又和提交任务的方式有关,如果是用户通过uv_queue_work提交的任务,这时候的回调uv__queue_done。该函数也是做了一些封装。

static void uv__queue_done(struct uv__work* w, int err) {
  uv_work_t* req;

  req = container_of(w, uv_work_t, work_req);
  uv__req_unregister(req->loop, req);

  if (req->after_work_cb == NULL)
    return;
  // 用户设置的回调
  req->after_work_cb(req, err);
}

如果是libuv本身提交的任务。则回调函数即libuv设置的,没有经过封装。最后提一下libuv提供的取消任务函数。

static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
  int cancelled;
  // 加锁,为了把节点移出队列
  uv_mutex_lock(&mutex);
  // 加锁,为了判断w->wq是否为空
  uv_mutex_lock(&w->loop->wq_mutex);
  // w在一个队列中并work不为空,则可取消
  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
  // 删除该节点
  if (cancelled)
    QUEUE_REMOVE(&w->wq);

  uv_mutex_unlock(&w->loop->wq_mutex);
  uv_mutex_unlock(&mutex);

  if (!cancelled)
    return UV_EBUSY;
  // 重置回调函数
  w->work = uv__cancelled;

  uv_mutex_lock(&loop->wq_mutex);
   // 插入loop的wq队列
  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
  // 通知主线程执行任务回调
  uv_async_send(&loop->wq_async);
  uv_mutex_unlock(&loop->wq_mutex);

  return 0;
}

到这,libuv的线程池就解析完了。


theanarkh
+ 关注