mojavy.com

スレッドプールの実装方法について

March 03, 2014 at 08:58 PM | categories: unix, programming |

スレッドプール(thread pool)を実装するには、暇なときはthreadを寝かせておいて必要なときに起こす、というイベント通知の仕組みが必要になる。 UnixでC/C++で実装するときはpthreadの条件変数を使うのが普通だと思われるが、適当なファイルディスクリプタをopenしておいてread等でブロックさせる方法でも実装できそう。

どのようなやり方が一般的なのか、いくつか有名どころのOSSの実装を調べてみた。

libuvの場合

https://github.com/joyent/libuv

単純にpthread_cond_waitをつかっている 1

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;

  (void) arg;

  for (;;) {
    uv_mutex_lock(&mutex);

    while (QUEUE_EMPTY(&wq))
      uv_cond_wait(&cond, &mutex);

    q = QUEUE_HEAD(&wq);

    if (q == &exit_message)
      uv_cond_signal(&cond);
    else {
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
                             executing. */
    }

    uv_mutex_unlock(&mutex);

    if (q == &exit_message)
      break;

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
  }
}

Boost.Asioの場合

http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio.html

Boost.Asioにスレッドプールそのものは提供されてないが以下のようにして簡単に実装することができる

#include <thread>
#include <functional>
#include <boost/asio.hpp>

int main ( int argc, char* argv[] ) {
    asio::io_service io_service;
    asio::io_service::work work(io_service);

    std::vector<std::thread> threadPool;

    for(size_t t = 0; t < std::thread::hardware_concurrency(); t++){
        threadPool.push_back(thread(std::bind(&asio::io_service::run, &io_service)));
    }

    io_service.post(std::bind(an_expensive_calculation, 42));
    io_service.post(std::bind(a_long_running_task, 123));

    //Do some things with the main thread

    io_service.stop();
    for(std::thread& t : threadPool) {
        t.join();
    }
}

http://stackoverflow.com/questions/14265676/using-boostasio-thread-pool-for-general-purpose-tasks

長くなるのでコードは省略するが、io_service::postするとunixの場合は最終的にはtask_io_service::wake_one_idle_thread_and_unlockからpthread_cond_signalが呼ばれる。

memcachedの場合

https://github.com/memcached/memcached

libeventのイベント通知機能を利用して実装している。それぞれのthread初期化の際にpipeをつくって、そのfdをlibeventに渡す。 2 libevent内部でそのfdをepollなりkqueueなりでブロックして待つ。

//
// memcached.c
//
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    uint8_t item_lock_type;     /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

//
// thread.c
//
void thread_init(int nthreads, struct event_base *main_base) {
//
// 中略
//
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
//
// さらに中略
//
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

pthread_cond_waitの実装

脱線するが、pthread_cond_waitがどのようにsleepにはいってるのか気になったので調べた。

https://sourceware.org/git/?p=glibc.git;a=tree;f=nptl;hb=HEAD

pthread_cond_waitのソースコードはglibcnptl以下にある。 __pthread_cond_waitlll_futex_waitを呼んでおり、これは以下のように実装されている。(以下はx86_64のもの)

#define lll_futex_wait(futex, val, private) \
  lll_futex_timed_wait(futex, val, NULL, private)

#define lll_futex_timed_wait(futex, val, timeout, private) \
  ({                                         \
    register const struct timespec *__to __asm ("r10") = timeout;          \
    int __status;                                \
    register __typeof (val) _val __asm ("edx") = (val);                \
    __asm __volatile ("syscall"                            \
             : "=a" (__status)                         \
             : "0" (SYS_futex), "D" (futex),                 \
           "S" (__lll_private_flag (FUTEX_WAIT, private)),         \
           "d" (_val), "r" (__to)                    \
             : "memory", "cc", "r11", "cx");                 \
    __status;                                    \
  })

上記アセンブラは大体以下のような意味3

futex(futex, FUTEX_WAIT, val, timeout, NULL, 0);  // 便宜上、上記コードの引数の変数名をそのままつかっているが、
                                                  // 1つめのfutexはシステムコールのfutexで、
                                                  // 2つめは引数のpthread_cond_tの__futexメンバ変数のアドレス

futex() システムコールは、 指定したアドレスの値が変更されるのをプログラムが待つ手段や 特定のアドレスに対して待機中のプロセスを wake (起床) させる手段を提供する

futex(2) http://linuxjm.sourceforge.jp/html/LDP_man-pages/man2/futex.2.html

とのこと。

まとめ

  • pthread_cond_waitをつかったもののほうが普通は高速なはず
  • memcachedのようなやりかただとユーザプロセス側でスレッドプール管理のための排他制御がほとんど不要になるので多少実装が簡単か

  1. pthread_cond_waitはunixの場合。windowsの場合はpSleepConditionVariableCS、これが使えない場合は疑似的に同様の動作をするようなラッパを定義している。 

  2. memcachedでは使用してないが、libeventはシグナルを通知する際もfdをつかう。Boost.Asioもシグナル通知はpipeを経由する。 

  3. 厳密には違う。 



About Me

pic
mojavy

Recent posts






Categories



Badges