Add an option to allow hphp's request queue dynamically switch between FIFO and LIFO.
Add an option to allow hphp's request queue dynamically switch between FIFO and LIFO. By default everything is still FIFO like today. Setting ServerThreadJobLIFO to true turns everything to LIFO like today. If ServerThreadJobLIFOThreshold is also set, then we do FIFO up until the queue length hits the threshold. Once the threshold is crossed, workers will take quests from the end of the queue. Once the queue size shrink below the threshold, we resume FIFO order. This behavior helps us prioritize newer requests when the server is loaded.
Esse commit está contido em:
@@ -108,7 +108,7 @@ constexpr int kDefaultWarmupThrottleRequestCount = 0;
|
||||
int RuntimeOption::ServerWarmupThrottleRequestCount =
|
||||
kDefaultWarmupThrottleRequestCount;
|
||||
int RuntimeOption::ServerThreadDropCacheTimeoutSeconds = 0;
|
||||
bool RuntimeOption::ServerThreadJobLIFO = false;
|
||||
int RuntimeOption::ServerThreadJobLIFOSwitchThreshold = INT_MAX;
|
||||
bool RuntimeOption::ServerThreadDropStack = false;
|
||||
bool RuntimeOption::ServerHttpSafeMode = false;
|
||||
bool RuntimeOption::ServerStatCache = true;
|
||||
@@ -689,7 +689,12 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */,
|
||||
);
|
||||
ServerThreadDropCacheTimeoutSeconds =
|
||||
server["ThreadDropCacheTimeoutSeconds"].getInt32(0);
|
||||
ServerThreadJobLIFO = server["ThreadJobLIFO"].getBool();
|
||||
if (server["ThreadJobLIFO"].getBool()) {
|
||||
ServerThreadJobLIFOSwitchThreshold = 0;
|
||||
}
|
||||
ServerThreadJobLIFOSwitchThreshold =
|
||||
server["ThreadJobLIFOSwitchThreshold"].getInt32(
|
||||
ServerThreadJobLIFOSwitchThreshold);
|
||||
ServerThreadDropStack = server["ThreadDropStack"].getBool();
|
||||
ServerHttpSafeMode = server["HttpSafeMode"].getBool();
|
||||
ServerStatCache = server["StatCache"].getBool(true);
|
||||
|
||||
@@ -100,7 +100,7 @@ public:
|
||||
static int ServerWarmupThrottleRequestCount;
|
||||
static bool ServerThreadRoundRobin;
|
||||
static int ServerThreadDropCacheTimeoutSeconds;
|
||||
static bool ServerThreadJobLIFO;
|
||||
static int ServerThreadJobLIFOSwitchThreshold;
|
||||
static bool ServerThreadDropStack;
|
||||
static bool ServerHttpSafeMode;
|
||||
static bool ServerStatCache;
|
||||
|
||||
@@ -158,7 +158,7 @@ LibEventServer::LibEventServer(const std::string &address, int port,
|
||||
m_dispatcher(thread, RuntimeOption::ServerThreadRoundRobin,
|
||||
RuntimeOption::ServerThreadDropCacheTimeoutSeconds,
|
||||
RuntimeOption::ServerThreadDropStack,
|
||||
this, RuntimeOption::ServerThreadJobLIFO),
|
||||
this, RuntimeOption::ServerThreadJobLIFOSwitchThreshold),
|
||||
m_dispatcherThread(this, &LibEventServer::dispatch) {
|
||||
m_eventBase = event_base_new();
|
||||
m_server = evhttp_new(m_eventBase);
|
||||
|
||||
+25
-8
@@ -63,6 +63,22 @@ namespace HPHP {
|
||||
* store prepared jobs. With JobQueueDispatcher, job queue is normally empty
|
||||
* initially and new jobs are pushed into the queue over time. Also, workers
|
||||
* can be stopped individually.
|
||||
*
|
||||
* Job process ordering
|
||||
* ====================
|
||||
* By default, requests are processed in FIFO order.
|
||||
*
|
||||
* In addition, we support an option where the request processing order can flip
|
||||
* between FIFO or LIFO based on the length of the queue. This can be enabled by
|
||||
* setting the 'lifoSwitchThreshold' parameter. If the job queue is configured
|
||||
* to be in FIFO mode, and the current queue length exceeds
|
||||
* lifoSwitchThreshold, then the workers will begin work on requests in LIFO
|
||||
* order until the queue size is below the threshold in which case we resume in
|
||||
* FIFO order. Setting the queue to be in LIFO mode initially will have the
|
||||
* opposite behavior. This is useful when we are in a loaded situation and we
|
||||
* want to prioritize the newest requests.
|
||||
*
|
||||
* You can configure a LIFO ordered queue by setting lifoSwitchThreshold to 0.
|
||||
*/
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
@@ -87,11 +103,11 @@ public:
|
||||
* Constructor.
|
||||
*/
|
||||
JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
|
||||
bool dropStack, bool lifo)
|
||||
bool dropStack, int lifoSwitchThreshold=INT_MAX)
|
||||
: SynchronizableMulti(threadRoundRobin ? 1 : threadCount),
|
||||
m_jobCount(0), m_stopped(false), m_workerCount(0),
|
||||
m_dropCacheTimeout(dropCacheTimeout), m_dropStack(dropStack),
|
||||
m_lifo(lifo) {
|
||||
m_lifoSwitchThreshold(lifoSwitchThreshold) {
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -133,7 +149,8 @@ public:
|
||||
}
|
||||
if (inc) incActiveWorker();
|
||||
m_jobCount = m_jobs.size() - 1;
|
||||
if (m_lifo) {
|
||||
|
||||
if (m_jobCount >= m_lifoSwitchThreshold) {
|
||||
TJob job = m_jobs.back();
|
||||
m_jobs.pop_back();
|
||||
return job;
|
||||
@@ -182,18 +199,18 @@ public:
|
||||
int m_workerCount;
|
||||
int m_dropCacheTimeout;
|
||||
bool m_dropStack;
|
||||
bool m_lifo;
|
||||
int m_lifoSwitchThreshold;
|
||||
};
|
||||
|
||||
template<class TJob, class Policy>
|
||||
struct JobQueue<TJob,true,Policy> : JobQueue<TJob,false,Policy> {
|
||||
JobQueue(int threadCount, bool threadRoundRobin, int dropCacheTimeout,
|
||||
bool dropStack, bool lifo) :
|
||||
bool dropStack, int lifoSwitchThreshold=INT_MAX) :
|
||||
JobQueue<TJob,false,Policy>(threadCount,
|
||||
threadRoundRobin,
|
||||
dropCacheTimeout,
|
||||
dropStack,
|
||||
lifo) {
|
||||
lifoSwitchThreshold) {
|
||||
pthread_cond_init(&m_cond, nullptr);
|
||||
}
|
||||
~JobQueue() {
|
||||
@@ -321,11 +338,11 @@ public:
|
||||
*/
|
||||
JobQueueDispatcher(int threadCount, bool threadRoundRobin,
|
||||
int dropCacheTimeout, bool dropStack, void *opaque,
|
||||
bool lifo = false)
|
||||
int lifoSwitchThreshold = INT_MAX)
|
||||
: m_stopped(true), m_id(0), m_opaque(opaque),
|
||||
m_maxThreadCount(threadCount),
|
||||
m_queue(threadCount, threadRoundRobin, dropCacheTimeout, dropStack,
|
||||
lifo) {
|
||||
lifoSwitchThreshold) {
|
||||
assert(threadCount >= 1);
|
||||
if (!TWorker::CountActive) {
|
||||
// If TWorker does not support counting the number of
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
+----------------------------------------------------------------------+
|
||||
| HipHop for PHP |
|
||||
+----------------------------------------------------------------------+
|
||||
| Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
|
||||
+----------------------------------------------------------------------+
|
||||
| This source file is subject to version 3.01 of the PHP license, |
|
||||
| that is bundled with this package in the file LICENSE, and is |
|
||||
| available through the world-wide-web at the following url: |
|
||||
| http://www.php.net/license/3_01.txt |
|
||||
| If you did not receive a copy of the PHP license and are unable to |
|
||||
| obtain it through the world-wide-web, please send a note to |
|
||||
| license@php.net so we can mail you a copy immediately. |
|
||||
+----------------------------------------------------------------------+
|
||||
*/
|
||||
#include "hphp/util/job_queue.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
namespace HPHP {
|
||||
|
||||
TEST(JobQueue, Ordering) {
|
||||
{
|
||||
// FIFO only.
|
||||
JobQueue<int> job_queue(1, false, 0, false);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
job_queue.enqueue(i);
|
||||
}
|
||||
|
||||
EXPECT_EQ(100, job_queue.getQueuedJobs());
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
EXPECT_EQ(i, job_queue.dequeue(0));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// LIFO only.
|
||||
JobQueue<int> job_queue(1, false, 0, false, 0);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
job_queue.enqueue(i);
|
||||
}
|
||||
|
||||
EXPECT_EQ(100, job_queue.getQueuedJobs());
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
EXPECT_EQ(100 - i - 1, job_queue.dequeue(0));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Hybrid. First do 50 LIFO, then 50 FIFO.
|
||||
JobQueue<int> job_queue(1, false, 0, false, 50);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
job_queue.enqueue(i);
|
||||
}
|
||||
|
||||
EXPECT_EQ(100, job_queue.getQueuedJobs());
|
||||
|
||||
for (int i = 0; i < 50; ++i) {
|
||||
EXPECT_EQ(100 - i - 1, job_queue.dequeue(0));
|
||||
}
|
||||
for (int i = 0; i < 50; ++i) {
|
||||
EXPECT_EQ(i, job_queue.dequeue(0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Referência em uma Nova Issue
Bloquear um usuário