Add an option to hhvm to 503 requests if they have been sitting on the select queue for too long.

Add an option to hhvm to 503 requests if they have been sitting on the select
queue for too long. The threshold is configurable via a runtime option and is
off by default for now. This works in either LIFO or FIFO mode. Basically,
before a worker dequeues a request, it peeks at the beginning of the queue to
see if the oldest request has expired. If so, it takes that request off the
queue and aborts it without executing anything.
Esse commit está contido em:
Stephen Chen
2013-06-26 16:18:24 -07:00
commit de Sara Golemon
commit a112d17438
6 arquivos alterados com 293 adições e 56 exclusões
+3
Ver Arquivo
@@ -110,6 +110,7 @@ int RuntimeOption::ServerWarmupThrottleRequestCount =
kDefaultWarmupThrottleRequestCount;
int RuntimeOption::ServerThreadDropCacheTimeoutSeconds = 0;
int RuntimeOption::ServerThreadJobLIFOSwitchThreshold = INT_MAX;
int RuntimeOption::ServerThreadJobMaxQueuingMilliSeconds = -1;
bool RuntimeOption::ServerThreadDropStack = false;
bool RuntimeOption::ServerHttpSafeMode = false;
bool RuntimeOption::ServerStatCache = true;
@@ -698,6 +699,8 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */,
ServerThreadJobLIFOSwitchThreshold =
server["ThreadJobLIFOSwitchThreshold"].getInt32(
ServerThreadJobLIFOSwitchThreshold);
// Read as a 32-bit value, like ThreadJobLIFOSwitchThreshold above: the option
// is an int holding milliseconds, and a 16-bit read cannot represent any
// threshold above 32767 ms. -1 (the default) leaves the feature disabled.
ServerThreadJobMaxQueuingMilliSeconds =
  server["ThreadJobMaxQueuingMilliSeconds"].getInt32(-1);
ServerThreadDropStack = server["ThreadDropStack"].getBool();
ServerHttpSafeMode = server["HttpSafeMode"].getBool();
ServerStatCache = server["StatCache"].getBool(true);
+1
Ver Arquivo
@@ -102,6 +102,7 @@ public:
static bool ServerThreadRoundRobin;
static int ServerThreadDropCacheTimeoutSeconds;
static int ServerThreadJobLIFOSwitchThreshold;
static int ServerThreadJobMaxQueuingMilliSeconds;
static bool ServerThreadDropStack;
static bool ServerHttpSafeMode;
static bool ServerStatCache;
+20 -1
Ver Arquivo
@@ -84,6 +84,15 @@ LibEventWorker::~LibEventWorker() {
}
/**
 * Regular path for a dequeued job: delegate to doJobImpl() with the abort
 * flag cleared.
 */
void LibEventWorker::doJob(LibEventJobPtr job) {
  const bool abortRequest = false;
  doJobImpl(job, abortRequest);
}
/**
 * Path for a job that the queue reported as expired: delegate to doJobImpl()
 * with the abort flag set, then bump the "requests timed out on queue"
 * counter by one.
 */
void LibEventWorker::abortJob(LibEventJobPtr job) {
  const bool abortRequest = true;
  doJobImpl(job, abortRequest);
  // Counted only after the abort path has finished.
  m_requestsTimedOutOnQueue->addValue(1);
}
void LibEventWorker::doJobImpl(LibEventJobPtr job, bool abort) {
job->stopTimer();
evhttp_request *request = job->request;
assert(m_opaque);
@@ -97,6 +106,12 @@ void LibEventWorker::doJob(LibEventJobPtr job) {
#endif
bool error = true;
std::string errorMsg;
if (abort) {
transport.sendString("Service Unavailable", 503);
return;
}
try {
std::string cmd = transport.getCommand();
cmd = std::string("/") + cmd;
@@ -136,6 +151,9 @@ void LibEventWorker::onThreadEnter() {
Eval::Debugger::RegisterThread();
}
m_handler = server->createRequestHandler();
m_requestsTimedOutOnQueue =
ServiceData::createTimeseries("requests_timed_out_on_queue",
{ServiceData::StatsType::COUNT});
}
void LibEventWorker::onThreadExit() {
@@ -154,7 +172,8 @@ LibEventServer::LibEventServer(const std::string &address, int port,
m_dispatcher(thread, RuntimeOption::ServerThreadRoundRobin,
RuntimeOption::ServerThreadDropCacheTimeoutSeconds,
RuntimeOption::ServerThreadDropStack,
this, RuntimeOption::ServerThreadJobLIFOSwitchThreshold),
this, RuntimeOption::ServerThreadJobLIFOSwitchThreshold,
RuntimeOption::ServerThreadJobMaxQueuingMilliSeconds),
m_dispatcherThread(this, &LibEventServer::dispatch) {
m_eventBase = event_base_new();
m_server = evhttp_new(m_eventBase);
+4
Ver Arquivo
@@ -22,6 +22,7 @@
#include "hphp/runtime/server/job_queue_vm_stack.h"
#include "hphp/util/job_queue.h"
#include "hphp/util/process.h"
#include "hphp/util/service_data.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
@@ -59,6 +60,7 @@ struct LibEventWorker
* Request handler called by LibEventServer.
*/
virtual void doJob(LibEventJobPtr job);
virtual void abortJob(LibEventJobPtr job);
/**
* Called when thread enters and exits.
@@ -67,7 +69,9 @@ struct LibEventWorker
virtual void onThreadExit();
private:
void doJobImpl(LibEventJobPtr job, bool abort);
std::unique_ptr<RequestHandler> m_handler;
ServiceData::ExportedTimeSeries* m_requestsTimedOutOnQueue;
};
/**
+156 -51
Ver Arquivo
@@ -17,14 +17,17 @@
#ifndef incl_HPHP_UTIL_JOB_QUEUE_H_
#define incl_HPHP_UTIL_JOB_QUEUE_H_
#include <time.h>
#include <vector>
#include <set>
#include "hphp/util/async_func.h"
#include "hphp/util/synchronizable_multi.h"
#include "hphp/util/lock.h"
#include "hphp/util/atomic.h"
#include "hphp/util/alloc.h"
#include "hphp/util/async_func.h"
#include "hphp/util/atomic.h"
#include "hphp/util/compatibility.h"
#include "hphp/util/exception.h"
#include "hphp/util/lock.h"
#include "hphp/util/logger.h"
#include "hphp/util/synchronizable_multi.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
@@ -103,19 +106,24 @@ public:
* Constructor.
*/
JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
bool dropStack, int lifoSwitchThreshold=INT_MAX)
bool dropStack, int lifoSwitchThreshold=INT_MAX,
int maxJobQueuingMs=-1)
: SynchronizableMulti(threadRoundRobin ? 1 : threadCount),
m_jobCount(0), m_stopped(false), m_workerCount(0),
m_dropCacheTimeout(dropCacheTimeout), m_dropStack(dropStack),
m_lifoSwitchThreshold(lifoSwitchThreshold) {
m_lifoSwitchThreshold(lifoSwitchThreshold),
m_maxJobQueuingMs(maxJobQueuingMs),
m_jobReaperId(-1) {
}
/**
* Put a job into the queue and notify a worker to pick it up.
*/
void enqueue(TJob job) {
timespec enqueueTime;
clock_gettime(CLOCK_MONOTONIC, &enqueueTime);
Lock lock(this);
m_jobs.push_back(job);
m_jobs.emplace_back(job, enqueueTime);
m_jobCount = m_jobs.size();
notify();
}
@@ -125,39 +133,14 @@ public:
* by this queue class, it's up to a worker class on whether to deallocate
* the job object correctly.
*/
TJob dequeue(int id, bool inc = false) {
Lock lock(this);
bool flushed = false;
while (m_jobs.empty()) {
if (m_stopped) {
throw StopSignal();
}
if (m_dropCacheTimeout <= 0 || flushed) {
wait(id, false);
} else if (!wait(id, true, m_dropCacheTimeout)) {
// since we timed out, maybe we can turn idle without holding memory
if (m_jobs.empty()) {
ScopedUnlock unlock(this);
Util::flush_thread_caches();
if (m_dropStack && Util::s_stackLimit) {
Util::flush_thread_stack();
}
DropCachePolicy::dropCache();
flushed = true;
}
}
TJob dequeueMaybeExpired(int id, bool inc, bool* expired) {
if (id == m_jobReaperId.load()) {
*expired = true;
return dequeueOnlyExpiredImpl(id, inc);
}
if (inc) incActiveWorker();
m_jobCount = m_jobs.size() - 1;
if (m_jobCount >= m_lifoSwitchThreshold) {
TJob job = m_jobs.back();
m_jobs.pop_back();
return job;
}
TJob job = m_jobs.front();
m_jobs.pop_front();
return job;
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
return dequeueMaybeExpiredImpl(id, inc, now, expired);
}
/**
@@ -192,25 +175,125 @@ public:
return m_jobCount;
}
/**
 * Designates the worker with the given id as the job reaper.
 *
 * The job reaper only ever checks whether the oldest job on the queue has
 * expired and, if so, takes it so it can be terminated without being
 * processed. When the job reaper worker calls dequeueMaybeExpired(), it gets
 * back the oldest job and only once that job has expired; until then the call
 * blocks.
 *
 * Requires a positive m_maxJobQueuingMs (asserted).
 */
void setJobReaperId(int id) {
  assert(m_maxJobQueuingMs > 0);
  m_jobReaperId = id;
}
/**
 * Returns the id of the worker currently designated as the job reaper, as
 * last stored by setJobReaperId() (the constructor initializes it to -1).
 */
int getJobReaperId() const {
  const int reaperId = m_jobReaperId.load();
  return reaperId;
}
private:
friend class JobQueue_Expiration_Test;
/**
 * Blocks until the queue is non-empty, then removes and returns one job.
 *
 * Selection order:
 *   1. If expiration is enabled (m_maxJobQueuingMs > 0) and the job at the
 *      front of the queue has been queued longer than the limit as measured
 *      against `now`, that job is returned and *expired is set to true.
 *   2. Otherwise, if the remaining job count is at or above
 *      m_lifoSwitchThreshold, the job at the back is returned (LIFO).
 *   3. Otherwise the job at the front is returned (FIFO).
 *
 * @param id      worker id, forwarded to wait().
 * @param inc     when true, incActiveWorker() is called before a job is taken.
 * @param now     timestamp used for the expiration check. It is supplied by
 *                the caller rather than sampled here, so it reflects the time
 *                before any blocking done in this function; tests use this to
 *                inject arbitrary times.
 * @param expired out-parameter; false unless case 1 above applies.
 * @return the dequeued job.
 * @throws StopSignal if the queue is empty and has been stopped.
 */
TJob dequeueMaybeExpiredImpl(int id, bool inc, const timespec& now,
                             bool* expired) {
  *expired = false;
  Lock lock(this);
  bool flushed = false;
  while (m_jobs.empty()) {
    if (m_stopped) {
      throw StopSignal();
    }
    if (m_dropCacheTimeout <= 0 || flushed) {
      wait(id, false);
    } else if (!wait(id, true, m_dropCacheTimeout)) {
      // since we timed out, maybe we can turn idle without holding memory
      if (m_jobs.empty()) {
        ScopedUnlock unlock(this);
        Util::flush_thread_caches();
        if (m_dropStack && Util::s_stackLimit) {
          Util::flush_thread_stack();
        }
        DropCachePolicy::dropCache();
        flushed = true;
      }
    }
  }
  if (inc) incActiveWorker();
  // The published count already excludes the job that is about to be removed.
  m_jobCount = m_jobs.size() - 1;

  // Widen before converting ms -> us so that large thresholds cannot overflow
  // a 32-bit int.
  const int64_t maxQueuingUs = static_cast<int64_t>(m_maxJobQueuingMs) * 1000;

  // peek at the beginning of the queue to see if the request has already
  // timed out.
  if (m_maxJobQueuingMs > 0 &&
      gettime_diff_us(m_jobs.front().second, now) > maxQueuingUs) {
    *expired = true;
    TJob job = m_jobs.front().first;
    m_jobs.pop_front();
    return job;
  }

  if (m_jobCount >= m_lifoSwitchThreshold) {
    TJob job = m_jobs.back().first;
    m_jobs.pop_back();
    return job;
  }
  TJob job = m_jobs.front().first;
  m_jobs.pop_front();
  return job;
}
/**
 * Dequeue path used by the job reaper: returns the job at the front of the
 * queue, but only once it has been queued for longer than m_maxJobQueuingMs.
 * A job that has not expired is never returned from here; the call keeps
 * waiting instead.
 *
 * @param id  worker id, forwarded to wait().
 * @param inc when true, incActiveWorker() is called before a job is taken.
 * @return the expired job taken from the front of the queue.
 * @throws StopSignal once the queue has been stopped.
 */
TJob dequeueOnlyExpiredImpl(int id, bool inc) {
  Lock lock(this);
  assert(m_maxJobQueuingMs > 0);
  // Widen before converting ms -> us so that large thresholds cannot overflow
  // a 32-bit int.
  const int64_t maxQueuingUs = static_cast<int64_t>(m_maxJobQueuingMs) * 1000;
  while (!m_stopped) {
    // With an empty queue nothing can expire sooner than one full threshold.
    long waitTimeUs = maxQueuingUs;
    if (!m_jobs.empty()) {
      timespec now;
      clock_gettime(CLOCK_MONOTONIC, &now);
      int64_t queuedTimeUs = gettime_diff_us(m_jobs.front().second, now);
      if (queuedTimeUs > maxQueuingUs) {
        if (inc) incActiveWorker();
        m_jobCount = m_jobs.size() - 1;
        TJob job = m_jobs.front().first;
        m_jobs.pop_front();
        return job;
      }
      // oldest job hasn't expired yet. wake us up when it will.
      waitTimeUs = maxQueuingUs - queuedTimeUs;
    }
    // NOTE(review): the last argument is a remainder in microseconds; confirm
    // that this matches the unit wait() expects for that parameter.
    if (wait(id, false, waitTimeUs / 1000000, waitTimeUs % 1000000)) {
      // We got woken up by somebody calling notify (as opposed to timeout),
      // then some work might be on the queue. We only expire things here, so
      // let's notify somebody else as well.
      notify();
    }
  }
  throw StopSignal();
}
int m_jobCount;
std::deque<TJob> m_jobs;
std::deque<std::pair<TJob, timespec>> m_jobs;
bool m_stopped;
int m_workerCount;
int m_dropCacheTimeout;
bool m_dropStack;
int m_lifoSwitchThreshold;
const int m_dropCacheTimeout;
const bool m_dropStack;
const int m_lifoSwitchThreshold;
const int m_maxJobQueuingMs;
std::atomic<int> m_jobReaperId;
};
template<class TJob, class Policy>
struct JobQueue<TJob,true,Policy> : JobQueue<TJob,false,Policy> {
JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
bool dropStack, int lifoSwitchThreshold=INT_MAX) :
bool dropStack, int lifoSwitchThreshold=INT_MAX,
int maxJobQueuingMs=-1) :
JobQueue<TJob,false,Policy>(threadCount,
threadRoundRobin,
dropCacheTimeout,
dropStack,
lifoSwitchThreshold) {
lifoSwitchThreshold,
maxJobQueuingMs) {
pthread_cond_init(&m_cond, nullptr);
}
~JobQueue() {
@@ -279,6 +362,9 @@ public:
* The only functions a subclass needs to implement.
*/
virtual void doJob(TJob job) = 0;
/**
 * Called by the worker loop instead of doJob() when the queue reports the
 * dequeued job as expired. This default implementation only logs a warning
 * and does nothing with `job`; subclasses may override it.
 */
virtual void abortJob(TJob job) {
Logger::Warning("Job dropped by JobQueueDispatcher because of timeout.");
}
virtual void onThreadEnter() {}
virtual void onThreadExit() {}
@@ -290,8 +376,13 @@ public:
onThreadEnter();
while (!m_stopped) {
try {
TJob job = m_queue->dequeue(m_id, countActive);
doJob(job);
bool expired = false;
TJob job = m_queue->dequeueMaybeExpired(m_id, countActive, &expired);
if (expired) {
abortJob(job);
} else {
doJob(job);
}
if (countActive) {
if (!m_queue->decActiveWorker() && waitable) {
Lock lock(m_queue);
@@ -338,11 +429,13 @@ public:
*/
JobQueueDispatcher(int threadCount, bool threadRoundRobin,
int dropCacheTimeout, bool dropStack, void *opaque,
int lifoSwitchThreshold = INT_MAX)
int lifoSwitchThreshold = INT_MAX,
int maxJobQueuingMs = -1)
: m_stopped(true), m_id(0), m_opaque(opaque),
m_maxThreadCount(threadCount),
m_queue(threadCount, threadRoundRobin, dropCacheTimeout, dropStack,
lifoSwitchThreshold) {
lifoSwitchThreshold, maxJobQueuingMs),
m_startReaperThread(maxJobQueuingMs > 0) {
assert(threadCount >= 1);
if (!TWorker::CountActive) {
// If TWorker does not support counting the number of
@@ -400,6 +493,14 @@ public:
(*iter)->start();
}
m_stopped = false;
if (m_startReaperThread) {
// If we have set a max timeout for requests on the queue, start a reaper
// thread just for expiring off old requests so we guarantee requests are
// taken off the queue as soon as possible when they expire even if all
// other worker threads are stalled.
m_queue.setJobReaperId(addWorkerImpl(true));
}
}
/**
@@ -506,17 +607,21 @@ private:
Mutex m_mutex;
std::set<TWorker*> m_workers;
std::set<AsyncFunc<TWorker> *> m_funcs;
const bool m_startReaperThread;
void addWorkerImpl(bool start) {
// return the id for the worker.
int addWorkerImpl(bool start) {
TWorker *worker = new TWorker();
AsyncFunc<TWorker> *func = new AsyncFunc<TWorker>(worker, &TWorker::start);
m_workers.insert(worker);
m_funcs.insert(func);
worker->create(m_id++, &m_queue, func, m_opaque);
int id = m_id++;
worker->create(id, &m_queue, func, m_opaque);
if (start) {
func->start();
}
return id;
}
};
+109 -4
Ver Arquivo
@@ -14,6 +14,8 @@
+----------------------------------------------------------------------+
*/
#include "hphp/util/job_queue.h"
#include <thread>
#include "gtest/gtest.h"
namespace HPHP {
@@ -28,8 +30,9 @@ TEST(JobQueue, Ordering) {
EXPECT_EQ(100, job_queue.getQueuedJobs());
bool expired;
for (int i = 0; i < 100; ++i) {
EXPECT_EQ(i, job_queue.dequeue(0));
EXPECT_EQ(i, job_queue.dequeueMaybeExpired(0, false, &expired));
}
}
@@ -42,8 +45,9 @@ TEST(JobQueue, Ordering) {
EXPECT_EQ(100, job_queue.getQueuedJobs());
bool expired;
for (int i = 0; i < 100; ++i) {
EXPECT_EQ(100 - i - 1, job_queue.dequeue(0));
EXPECT_EQ(100 - i - 1, job_queue.dequeueMaybeExpired(0, false, &expired));
}
}
@@ -56,13 +60,114 @@ TEST(JobQueue, Ordering) {
EXPECT_EQ(100, job_queue.getQueuedJobs());
bool expired;
for (int i = 0; i < 50; ++i) {
EXPECT_EQ(100 - i - 1, job_queue.dequeue(0));
EXPECT_EQ(100 - i - 1, job_queue.dequeueMaybeExpired(0, false, &expired));
}
for (int i = 0; i < 50; ++i) {
EXPECT_EQ(i, job_queue.dequeue(0));
EXPECT_EQ(i, job_queue.dequeueMaybeExpired(0, false, &expired));
}
}
}
// Exercises request expiration in FIFO mode, in LIFO mode, and with a
// designated job reaper. All queues use a 30000 ms queuing limit.
TEST(JobQueue, Expiration) {
// timeOk is "now"; timeExpired is 31 s later, i.e. past the 30 s limit for
// anything enqueued around timeOk.
timespec timeOk;
clock_gettime(CLOCK_MONOTONIC, &timeOk);
timespec timeExpired = timeOk;
timeExpired.tv_sec += 31;
{
// FIFO: the LIFO switch threshold is INT_MAX, so jobs come off the front.
JobQueue<int> fifo_queue(1, false, 0, false, INT_MAX, 30000);
fifo_queue.enqueue(1);
fifo_queue.enqueue(2);
fifo_queue.enqueue(3);
bool expired = false;
EXPECT_EQ(1, fifo_queue.dequeueMaybeExpiredImpl(0, true, timeOk, &expired));
EXPECT_FALSE(expired);
EXPECT_EQ(2, fifo_queue.dequeueMaybeExpiredImpl(0, true, timeExpired,
&expired));
EXPECT_TRUE(expired);
EXPECT_EQ(3, fifo_queue.dequeueMaybeExpiredImpl(0, true, timeOk, &expired));
EXPECT_FALSE(expired);
}
{
// LIFO: the LIFO switch threshold is 0, so non-expired jobs come off the back.
JobQueue<int> lifo_queue(1, false, 0, false, 0, 30000);
lifo_queue.enqueue(1);
lifo_queue.enqueue(2);
lifo_queue.enqueue(3);
bool expired = false;
EXPECT_EQ(3, lifo_queue.dequeueMaybeExpiredImpl(0, true, timeOk, &expired));
EXPECT_FALSE(expired);
// now we should get a job from the beginning of the queue even though we
// are in lifo mode, because request expiration is enabled and the oldest
// job has expired.
EXPECT_EQ(1, lifo_queue.dequeueMaybeExpiredImpl(0, true, timeExpired,
&expired));
EXPECT_TRUE(expired);
EXPECT_EQ(2, lifo_queue.dequeueMaybeExpiredImpl(0, true, timeOk, &expired));
EXPECT_FALSE(expired);
}
{
// job reaper: worker id 1 is the reaper, worker id 0 is a normal worker.
JobQueue<int> lifo_queue(1, false, 0, false, 0, 30000);
lifo_queue.enqueue(1);
lifo_queue.enqueue(2);
lifo_queue.enqueue(3);
lifo_queue.enqueue(4);
lifo_queue.enqueue(5);
lifo_queue.setJobReaperId(1);
// manipulate m_jobs timestamp to simulate time passing: jobs 1 and 2 now
// look 32 s and 31 s old respectively, both past the 30 s limit.
lifo_queue.m_jobs[0].second.tv_sec -= 32;
lifo_queue.m_jobs[1].second.tv_sec -= 31;
// having job reaper should not affect anything other threads are doing:
// a normal worker still picks up an expired job from the front.
bool expired = false;
EXPECT_EQ(1, lifo_queue.dequeueMaybeExpired(0, true, &expired));
EXPECT_TRUE(expired);
// the reaper (id 1) takes the next expired job.
EXPECT_EQ(2, lifo_queue.dequeueMaybeExpired(1, true, &expired));
EXPECT_TRUE(expired);
// now no more jobs are expired. job reaper would block, so run it on its
// own thread; value stays -1 until the reaper returns a job.
std::atomic<int> value(-1);
std::thread thread([&]() {
bool expired;
value.store(lifo_queue.dequeueMaybeExpired(1, true, &expired));
});
EXPECT_EQ(-1, value.load());
// even if you notify it.
lifo_queue.notify();
EXPECT_EQ(-1, value.load());
// but normal workers should proceed (LIFO, so the newest job comes back).
EXPECT_EQ(5, lifo_queue.dequeueMaybeExpired(0, true, &expired));
EXPECT_FALSE(expired);
// now set the first job to be expired.
// NOTE(review): m_jobs is modified here without taking the queue lock while
// the reaper thread is running -- confirm this cannot race with its read of
// the front timestamp.
lifo_queue.m_jobs[0].second.tv_sec -= 32;
lifo_queue.notify();
// join() blocks until the reaper thread has dequeued the expired job and
// stored it in value.
thread.join();
EXPECT_EQ(3, value.load());
// stop case: a blocked (or about to block) reaper must exit with StopSignal
// once the queue is stopped.
bool exceptionCaught = false;
std::thread thread1([&]() {
bool expired;
try {
lifo_queue.dequeueMaybeExpired(1, true, &expired);
} catch (const JobQueue<int>::StopSignal&) {
exceptionCaught = true;
}
});
lifo_queue.stop();
thread1.join();
EXPECT_TRUE(exceptionCaught);
}
}
}