Added a thread pool to WorkerPool for Linux that dynamically adds threads as needed.

Review URL: http://codereview.chromium.org/39102

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12414 0039d316-1c4b-4281-b951-d872f2087c98
Esse commit está contido em:
willchan@chromium.org
2009-03-24 23:41:53 +00:00
commit 141da8c993
6 arquivos alterados com 509 adições e 24 exclusões
+2
Ver Arquivo
@@ -303,6 +303,7 @@
'word_iterator.h',
'worker_pool.h',
'worker_pool_linux.cc',
'worker_pool_linux.h',
'worker_pool_mac.mm',
'worker_pool_win.cc',
],
@@ -602,6 +603,7 @@
# Linux has an implementation of idle_timer, but it's unclear
# if we want it yet, so leave it 'unported' for now.
'idletimer_unittest.cc',
'worker_pool_linux_unittest.cc',
],
'dependencies': [
'../build/linux/system.gyp:gtk',
+1
Ver Arquivo
@@ -402,6 +402,7 @@ if env.Bit('linux'):
'sys_string_conversions_linux.cc',
'test_file_util_linux.cc',
'worker_pool_linux.cc',
'worker_pool_linux.h',
])
linux_version = env.Command('$BASE_DIR/file_version_info_linux.h',
+2
Ver Arquivo
@@ -119,6 +119,7 @@ input_files = ChromeFileList([
'win_util_unittest.cc',
'wmi_util_unittest.cc',
'word_iterator_unittest.cc',
'worker_pool_unittest.cc'
]),
MSVSFilter('gfx_tests', [
@@ -141,6 +142,7 @@ if env.Bit('posix'):
if env.Bit('linux'):
input_files.Append(
'data_pack_unittest.cc',
'worker_pool_linux_unittest.cc'
)
if env.Bit('mac'):
+148 -24
Ver Arquivo
@@ -3,41 +3,165 @@
// found in the LICENSE file.
#include "base/worker_pool.h"
#include "base/worker_pool_linux.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/platform_thread.h"
#include "base/ref_counted.h"
#include "base/string_util.h"
#include "base/task.h"
namespace {
void* PThreadCallback(void* param) {
Task* task = static_cast<Task*>(param);
task->Run();
delete task;
return 0;
const int kIdleSecondsBeforeExit = 10 * 60;
const int kWorkerThreadStackSize = 64 * 1024;
class WorkerPoolImpl {
public:
WorkerPoolImpl();
~WorkerPoolImpl();
void PostTask(const tracked_objects::Location& from_here, Task* task,
bool task_is_slow);
private:
scoped_refptr<base::LinuxDynamicThreadPool> pool_;
};
WorkerPoolImpl::WorkerPoolImpl()
: pool_(new base::LinuxDynamicThreadPool(
"WorkerPool", kIdleSecondsBeforeExit)) {}
WorkerPoolImpl::~WorkerPoolImpl() {
pool_->Terminate();
}
void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow) {
task->SetBirthPlace(from_here);
pool_->PostTask(task);
}
base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool(base::LINKER_INITIALIZED);
class WorkerThread : public PlatformThread::Delegate {
public:
WorkerThread(const std::string& name_prefix, int idle_seconds_before_exit,
base::LinuxDynamicThreadPool* pool)
: name_prefix_(name_prefix),
idle_seconds_before_exit_(idle_seconds_before_exit),
pool_(pool) {}
virtual void ThreadMain();
private:
const std::string name_prefix_;
const int idle_seconds_before_exit_;
scoped_refptr<base::LinuxDynamicThreadPool> pool_;
DISALLOW_COPY_AND_ASSIGN(WorkerThread);
};
void WorkerThread::ThreadMain() {
const std::string name =
StringPrintf("%s/%d", name_prefix_.c_str(),
IntToString(PlatformThread::CurrentId()).c_str());
PlatformThread::SetName(name.c_str());
for (;;) {
Task* task = pool_->WaitForTask();
if (!task)
break;
task->Run();
delete task;
}
// The WorkerThread is non-joinable, so it deletes itself.
delete this;
}
} // namespace
bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
Task* task, bool task_is_slow) {
task->SetBirthPlace(from_here);
pthread_t thread;
pthread_attr_t attr;
// POSIX does not have a worker thread pool implementation. For now we just
// create a thread for each task, and ignore |task_is_slow|.
// TODO(dsh): Implement thread reuse.
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err = pthread_create(&thread, &attr, PThreadCallback, task);
pthread_attr_destroy(&attr);
if (err) {
DLOG(ERROR) << "pthread_create failed: " << err;
delete task;
return false;
}
g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
return true;
}
namespace base {
LinuxDynamicThreadPool::LinuxDynamicThreadPool(
const std::string& name_prefix,
int idle_seconds_before_exit)
: name_prefix_(name_prefix),
idle_seconds_before_exit_(idle_seconds_before_exit),
tasks_available_cv_(&lock_),
num_idle_threads_(0),
terminated_(false),
num_idle_threads_cv_(NULL) {}
LinuxDynamicThreadPool::~LinuxDynamicThreadPool() {
while (!tasks_.empty()) {
Task* task = tasks_.front();
tasks_.pop();
delete task;
}
}
void LinuxDynamicThreadPool::Terminate() {
{
AutoLock locked(lock_);
DCHECK(!terminated_) << "Thread pool is already terminated.";
terminated_ = true;
}
tasks_available_cv_.Broadcast();
}
void LinuxDynamicThreadPool::PostTask(Task* task) {
AutoLock locked(lock_);
DCHECK(!terminated_) <<
"This thread pool is already terminated. Do not post new tasks.";
tasks_.push(task);
// We have enough worker threads.
if (static_cast<size_t>(num_idle_threads_) >= tasks_.size()) {
tasks_available_cv_.Signal();
} else {
// The new PlatformThread will take ownership of the WorkerThread object,
// which will delete itself on exit.
WorkerThread* worker =
new WorkerThread(name_prefix_, idle_seconds_before_exit_, this);
PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker);
}
}
Task* LinuxDynamicThreadPool::WaitForTask() {
AutoLock locked(lock_);
if (terminated_)
return NULL;
if (tasks_.empty()) { // No work available, wait for work.
num_idle_threads_++;
if (num_idle_threads_cv_.get())
num_idle_threads_cv_->Signal();
tasks_available_cv_.TimedWait(
TimeDelta::FromSeconds(kIdleSecondsBeforeExit));
num_idle_threads_--;
if (num_idle_threads_cv_.get())
num_idle_threads_cv_->Signal();
if (tasks_.empty()) {
// We waited for work, but there's still no work. Return NULL to signal
// the thread to terminate.
return NULL;
}
}
Task* task = tasks_.front();
tasks_.pop();
return task;
}
} // namespace base
+88
Ver Arquivo
@@ -0,0 +1,88 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
// The thread pool used in the Linux implementation of WorkerPool dynamically
// adds threads as necessary to handle all tasks. It keeps old threads around
// for a period of time to allow them to be reused. After this waiting period,
// the threads exit. This thread pool uses non-joinable threads, therefore
// worker threads are not joined during process shutdown. This means that
// potentially long running tasks (such as DNS lookup) do not block process
// shutdown, but also means that process shutdown may "leak" objects. Note that
// although LinuxDynamicThreadPool spawns the worker threads and manages the
// task queue, it does not own the worker threads. The worker threads ask the
// LinuxDynamicThreadPool for work and eventually clean themselves up. The
// worker threads all maintain scoped_refptrs to the LinuxDynamicThreadPool
// instance, which prevents LinuxDynamicThreadPool from disappearing before all
// worker threads exit. The owner of LinuxDynamicThreadPool should likewise
// maintain a scoped_refptr to the LinuxDynamicThreadPool instance.
//
// NOTE: The classes defined in this file are only meant for use by the Linux
// implementation of WorkerPool. No one else should be using these classes.
// These symbols are exported in a header purely for testing purposes.
#ifndef BASE_WORKER_POOL_LINUX_H_
#define BASE_WORKER_POOL_LINUX_H_
#include <queue>
#include <string>
#include "base/basictypes.h"
#include "base/condition_variable.h"
#include "base/lock.h"
#include "base/platform_thread.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
class Task;
namespace base {
class LinuxDynamicThreadPool
: public RefCountedThreadSafe<LinuxDynamicThreadPool> {
public:
class LinuxDynamicThreadPoolPeer;
// All worker threads will share the same |name_prefix|. They will exit after
// |idle_seconds_before_exit|.
LinuxDynamicThreadPool(const std::string& name_prefix,
int idle_seconds_before_exit);
~LinuxDynamicThreadPool();
// Indicates that the thread pool is going away. Stops handing out tasks to
// worker threads. Wakes up all the idle threads to let them exit.
void Terminate();
// Adds |task| to the thread pool. LinuxDynamicThreadPool assumes ownership
// of |task|.
void PostTask(Task* task);
// Worker thread method to wait for up to |idle_seconds_before_exit| for more
// work from the thread pool. Returns NULL if no work is available.
Task* WaitForTask();
private:
friend class LinuxDynamicThreadPoolPeer;
const std::string name_prefix_;
const int idle_seconds_before_exit_;
Lock lock_; // Protects all the variables below.
// Signal()s worker threads to let them know more tasks are available.
// Also used for Broadcast()'ing to worker threads to let them know the pool
// is being deleted and they can exit.
ConditionVariable tasks_available_cv_;
int num_idle_threads_;
std::queue<Task*> tasks_;
bool terminated_;
// Only used for tests to ensure correct thread ordering. It will always be
// NULL in non-test code.
scoped_ptr<ConditionVariable> num_idle_threads_cv_;
DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPool);
};
} // namespace base
#endif // BASE_WORKER_POOL_LINUX_H_
+268
Ver Arquivo
@@ -0,0 +1,268 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/worker_pool_linux.h"
#include <set>
#include "base/condition_variable.h"
#include "base/lock.h"
#include "base/platform_thread.h"
#include "base/task.h"
#include "base/waitable_event.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
// Peer class to provide passthrough access to LinuxDynamicThreadPool internals.
class LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer {
public:
explicit LinuxDynamicThreadPoolPeer(LinuxDynamicThreadPool* pool)
: pool_(pool) {}
Lock* lock() { return &pool_->lock_; }
ConditionVariable* tasks_available_cv() {
return &pool_->tasks_available_cv_;
}
const std::queue<Task*>& tasks() const { return pool_->tasks_; }
int num_idle_threads() const { return pool_->num_idle_threads_; }
ConditionVariable* num_idle_threads_cv() {
return pool_->num_idle_threads_cv_.get();
}
void set_num_idle_threads_cv(ConditionVariable* cv) {
pool_->num_idle_threads_cv_.reset(cv);
}
private:
LinuxDynamicThreadPool* pool_;
DISALLOW_COPY_AND_ASSIGN(LinuxDynamicThreadPoolPeer);
};
} // namespace base
namespace {
// IncrementingTask's main purpose is to increment a counter. It also updates a
// set of unique thread ids, and signals a ConditionVariable on completion.
// Note that since it does not block, there is no way to control the number of
// threads used if more than one IncrementingTask is consecutively posted to the
// thread pool, since the first one might finish executing before the subsequent
// PostTask() calls get invoked.
class IncrementingTask : public Task {
public:
IncrementingTask(Lock* counter_lock,
int* counter,
Lock* unique_threads_lock,
std::set<PlatformThreadId>* unique_threads)
: counter_lock_(counter_lock),
unique_threads_lock_(unique_threads_lock),
unique_threads_(unique_threads),
counter_(counter) {}
virtual void Run() {
AddSelfToUniqueThreadSet();
AutoLock locked(*counter_lock_);
(*counter_)++;
}
void AddSelfToUniqueThreadSet() {
AutoLock locked(*unique_threads_lock_);
unique_threads_->insert(PlatformThread::CurrentId());
}
private:
Lock* counter_lock_;
Lock* unique_threads_lock_;
std::set<PlatformThreadId>* unique_threads_;
int* counter_;
DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
};
// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
class BlockingIncrementingTask : public Task {
public:
BlockingIncrementingTask(Lock* counter_lock,
int* counter,
Lock* unique_threads_lock,
std::set<PlatformThreadId>* unique_threads,
Lock* num_waiting_to_start_lock,
int* num_waiting_to_start,
ConditionVariable* num_waiting_to_start_cv,
base::WaitableEvent* start)
: incrementer_(
counter_lock, counter, unique_threads_lock, unique_threads),
num_waiting_to_start_lock_(num_waiting_to_start_lock),
num_waiting_to_start_(num_waiting_to_start),
num_waiting_to_start_cv_(num_waiting_to_start_cv),
start_(start) {}
virtual void Run() {
{
AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
(*num_waiting_to_start_)++;
}
num_waiting_to_start_cv_->Signal();
CHECK(start_->Wait());
incrementer_.Run();
}
private:
IncrementingTask incrementer_;
Lock* num_waiting_to_start_lock_;
int* num_waiting_to_start_;
ConditionVariable* num_waiting_to_start_cv_;
base::WaitableEvent* start_;
DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
};
class LinuxDynamicThreadPoolTest : public testing::Test {
protected:
LinuxDynamicThreadPoolTest()
: pool_(new base::LinuxDynamicThreadPool("dynamic_pool", 60*60)),
peer_(pool_.get()),
counter_(0),
num_waiting_to_start_(0),
num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
start_(true, false) {}
virtual void SetUp() {
peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
}
virtual void TearDown() {
// Wake up the idle threads so they can terminate.
if (pool_.get()) pool_->Terminate();
}
void WaitForTasksToStart(int num_tasks) {
AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
while (num_waiting_to_start_ < num_tasks) {
num_waiting_to_start_cv_.Wait();
}
}
void WaitForIdleThreads(int num_idle_threads) {
AutoLock pool_locked(*peer_.lock());
while (peer_.num_idle_threads() < num_idle_threads) {
peer_.num_idle_threads_cv()->Wait();
}
}
Task* CreateNewIncrementingTask() {
return new IncrementingTask(&counter_lock_, &counter_,
&unique_threads_lock_, &unique_threads_);
}
Task* CreateNewBlockingIncrementingTask() {
return new BlockingIncrementingTask(
&counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
&num_waiting_to_start_lock_, &num_waiting_to_start_,
&num_waiting_to_start_cv_, &start_);
}
scoped_refptr<base::LinuxDynamicThreadPool> pool_;
base::LinuxDynamicThreadPool::LinuxDynamicThreadPoolPeer peer_;
Lock counter_lock_;
int counter_;
Lock unique_threads_lock_;
std::set<PlatformThreadId> unique_threads_;
Lock num_waiting_to_start_lock_;
int num_waiting_to_start_;
ConditionVariable num_waiting_to_start_cv_;
base::WaitableEvent start_;
};
TEST_F(LinuxDynamicThreadPoolTest, Basic) {
EXPECT_EQ(0, peer_.num_idle_threads());
EXPECT_EQ(0U, unique_threads_.size());
EXPECT_EQ(0U, peer_.tasks().size());
// Add one task and wait for it to be completed.
pool_->PostTask(CreateNewIncrementingTask());
WaitForIdleThreads(1);
EXPECT_EQ(1U, unique_threads_.size()) <<
"There should be only one thread allocated for one task.";
EXPECT_EQ(1, peer_.num_idle_threads());
EXPECT_EQ(1, counter_);
}
TEST_F(LinuxDynamicThreadPoolTest, ReuseIdle) {
// Add one task and wait for it to be completed.
pool_->PostTask(CreateNewIncrementingTask());
WaitForIdleThreads(1);
// Add another 2 tasks. One should reuse the existing worker thread.
pool_->PostTask(CreateNewBlockingIncrementingTask());
pool_->PostTask(CreateNewBlockingIncrementingTask());
WaitForTasksToStart(2);
start_.Signal();
WaitForIdleThreads(2);
EXPECT_EQ(2U, unique_threads_.size());
EXPECT_EQ(2, peer_.num_idle_threads());
EXPECT_EQ(3, counter_);
}
TEST_F(LinuxDynamicThreadPoolTest, TwoActiveTasks) {
// Add two blocking tasks.
pool_->PostTask(CreateNewBlockingIncrementingTask());
pool_->PostTask(CreateNewBlockingIncrementingTask());
EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
WaitForTasksToStart(2);
start_.Signal();
WaitForIdleThreads(2);
EXPECT_EQ(2U, unique_threads_.size());
EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
EXPECT_EQ(2, counter_);
}
TEST_F(LinuxDynamicThreadPoolTest, Complex) {
// Add two non blocking tasks and wait for them to finish.
pool_->PostTask(CreateNewIncrementingTask());
WaitForIdleThreads(1);
// Add two blocking tasks, start them simultaneously, and wait for them to
// finish.
pool_->PostTask(CreateNewBlockingIncrementingTask());
pool_->PostTask(CreateNewBlockingIncrementingTask());
WaitForTasksToStart(2);
start_.Signal();
WaitForIdleThreads(2);
EXPECT_EQ(3, counter_);
EXPECT_EQ(2, peer_.num_idle_threads());
EXPECT_EQ(2U, unique_threads_.size());
// Wake up all idle threads so they can exit.
{
AutoLock locked(*peer_.lock());
while (peer_.num_idle_threads() > 0) {
peer_.tasks_available_cv()->Signal();
peer_.num_idle_threads_cv()->Wait();
}
}
// Add another non blocking task. There are no threads to reuse.
pool_->PostTask(CreateNewIncrementingTask());
WaitForIdleThreads(1);
EXPECT_EQ(3U, unique_threads_.size());
EXPECT_EQ(1, peer_.num_idle_threads());
EXPECT_EQ(4, counter_);
}
} // namespace