Fix shutdown crashes in xbox/pagelet server

Various code tested to see if xbox was enabled by the
config, but what it really cared about was whether there was
an xbox dispatcher. During shutdown we stop the XboxServer before
the main server, so there's a short window where the XboxServer
is enabled by the config, but there is no dispatcher. If a running
thread tries to run xbox code, it would crash.

Switch the tests to test s_dispatcher and hold a lock, rather than
testing the config.

Fix similar issues with PageletServer, (although we don't currently
shut it down until process exit - I'll post a follow up diff to fix that).
Esse commit está contido em:
mwilliams
2013-06-23 16:57:06 -07:00
commit de Sara Golemon
commit 090ca545ff
3 arquivos alterados com 84 adições e 66 exclusões
+33 -20
Ver Arquivo
@@ -282,20 +282,24 @@ StaticString PageletTask::s_class_name("PageletTask");
// implementing PageletServer
static JobQueueDispatcher<PageletTransport*, PageletWorker> *s_dispatcher;
static Mutex s_dispatchMutex;
bool PageletServer::Enabled() {
return RuntimeOption::PageletServerThreadCount > 0;
return s_dispatcher;
}
void PageletServer::Restart() {
Stop();
if (RuntimeOption::PageletServerThreadCount > 0) {
s_dispatcher = new JobQueueDispatcher<PageletTransport*, PageletWorker>
(RuntimeOption::PageletServerThreadCount,
RuntimeOption::PageletServerThreadRoundRobin,
RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds,
RuntimeOption::PageletServerThreadDropStack,
nullptr);
{
Lock l(s_dispatchMutex);
s_dispatcher = new JobQueueDispatcher<PageletTransport*, PageletWorker>
(RuntimeOption::PageletServerThreadCount,
RuntimeOption::PageletServerThreadRoundRobin,
RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds,
RuntimeOption::PageletServerThreadDropStack,
nullptr);
}
Logger::Info("pagelet server started");
s_dispatcher->start();
}
@@ -304,6 +308,7 @@ void PageletServer::Restart() {
void PageletServer::Stop() {
if (s_dispatcher) {
s_dispatcher->stop();
Lock l(s_dispatchMutex);
delete s_dispatcher;
s_dispatcher = nullptr;
}
@@ -313,22 +318,28 @@ Object PageletServer::TaskStart(CStrRef url, CArrRef headers,
CStrRef remote_host,
CStrRef post_data /* = null_string */,
CArrRef files /* = null_array */) {
if (RuntimeOption::PageletServerThreadCount <= 0) {
return null_object;
}
if (RuntimeOption::PageletServerQueueLimit > 0 &&
s_dispatcher->getQueuedJobs() > RuntimeOption::PageletServerQueueLimit) {
return null_object;
{
Lock l(s_dispatchMutex);
if (!s_dispatcher) {
return null_object;
}
if (RuntimeOption::PageletServerQueueLimit > 0 &&
s_dispatcher->getQueuedJobs() >
RuntimeOption::PageletServerQueueLimit) {
return null_object;
}
}
PageletTask *task = NEWOBJ(PageletTask)(url, headers, remote_host, post_data,
get_uploaded_files(), files);
Object ret(task);
PageletTransport *job = task->getJob();
job->incRefCount(); // paired with worker's decRefCount()
assert(s_dispatcher);
s_dispatcher->enqueue(job);
return ret;
Lock l(s_dispatchMutex);
if (s_dispatcher) {
job->incRefCount(); // paired with worker's decRefCount()
s_dispatcher->enqueue(job);
return ret;
}
return null_object;
}
int64_t PageletServer::TaskStatus(CObjRef task) {
@@ -358,11 +369,13 @@ void PageletServer::AddToPipeline(const string &s) {
}
int PageletServer::GetActiveWorker() {
return s_dispatcher->getActiveWorker();
Lock l(s_dispatchMutex);
return s_dispatcher ? s_dispatcher->getActiveWorker() : 0;
}
int PageletServer::GetQueuedJobs() {
return s_dispatcher->getQueuedJobs();
Lock l(s_dispatchMutex);
return s_dispatcher ? s_dispatcher->getQueuedJobs() : 0;
}
///////////////////////////////////////////////////////////////////////////////
+51 -41
Ver Arquivo
@@ -202,17 +202,21 @@ private:
///////////////////////////////////////////////////////////////////////////////
static JobQueueDispatcher<XboxTransport*, XboxWorker> *s_dispatcher;
static Mutex s_dispatchMutex;
void XboxServer::Restart() {
Stop();
if (RuntimeOption::XboxServerThreadCount > 0) {
s_dispatcher = new JobQueueDispatcher<XboxTransport*, XboxWorker>
(RuntimeOption::XboxServerThreadCount,
RuntimeOption::ServerThreadRoundRobin,
RuntimeOption::ServerThreadDropCacheTimeoutSeconds,
RuntimeOption::ServerThreadDropStack,
nullptr);
{
Lock l(s_dispatchMutex);
s_dispatcher = new JobQueueDispatcher<XboxTransport*, XboxWorker>
(RuntimeOption::XboxServerThreadCount,
RuntimeOption::ServerThreadRoundRobin,
RuntimeOption::ServerThreadDropCacheTimeoutSeconds,
RuntimeOption::ServerThreadDropStack,
nullptr);
}
if (RuntimeOption::XboxServerLogInfo) {
Logger::Info("xbox server started");
}
@@ -223,6 +227,8 @@ void XboxServer::Restart() {
void XboxServer::Stop() {
if (s_dispatcher) {
s_dispatcher->stop();
Lock l(s_dispatchMutex);
delete s_dispatcher;
s_dispatcher = nullptr;
}
@@ -242,17 +248,20 @@ const StaticString
bool XboxServer::SendMessage(CStrRef message, Variant &ret, int timeout_ms,
CStrRef host /* = "localhost" */) {
if (isLocalHost(host)) {
XboxTransport *job;
{
Lock l(s_dispatchMutex);
if (!s_dispatcher) {
return false;
}
if (RuntimeOption::XboxServerThreadCount <= 0) {
return false;
job = new XboxTransport(message);
job->incRefCount(); // paired with worker's decRefCount()
job->incRefCount(); // paired with decRefCount() at below
assert(s_dispatcher);
s_dispatcher->enqueue(job);
}
XboxTransport *job = new XboxTransport(message);
job->incRefCount(); // paired with worker's decRefCount()
job->incRefCount(); // paired with decRefCount() at below
assert(s_dispatcher);
s_dispatcher->enqueue(job);
if (timeout_ms <= 0) {
timeout_ms = RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds;
}
@@ -314,8 +323,8 @@ bool XboxServer::SendMessage(CStrRef message, Variant &ret, int timeout_ms,
bool XboxServer::PostMessage(CStrRef message,
CStrRef host /* = "localhost" */) {
if (isLocalHost(host)) {
if (RuntimeOption::XboxServerThreadCount <= 0) {
Lock l(s_dispatchMutex);
if (!s_dispatcher) {
return false;
}
@@ -381,36 +390,37 @@ StaticString XboxTask::s_class_name("XboxTask");
///////////////////////////////////////////////////////////////////////////////
bool XboxServer::Available() {
return s_dispatcher->getActiveWorker() <
Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) {
{
Lock l(s_dispatchMutex);
if (s_dispatcher &&
(s_dispatcher->getActiveWorker() <
RuntimeOption::XboxServerThreadCount ||
s_dispatcher->getQueuedJobs() <
RuntimeOption::XboxServerMaxQueueLength;
}
RuntimeOption::XboxServerMaxQueueLength)) {
XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc);
Object ret(task);
XboxTransport *job = task->getJob();
job->incRefCount(); // paired with worker's decRefCount()
Transport *transport = g_context->getTransport();
if (transport) {
job->setHost(transport->getHeader("Host"));
}
assert(s_dispatcher);
s_dispatcher->enqueue(job);
Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) {
bool xboxEnabled = (RuntimeOption::XboxServerThreadCount > 0);
if (!xboxEnabled || !Available()) {
const char* errMsg = (xboxEnabled ?
"Cannot create new Xbox task because the Xbox queue has "
"reached maximum capacity" :
"Cannot create new Xbox task because the Xbox is not enabled");
Object e = SystemLib::AllocExceptionObject(errMsg);
throw_exception(e);
return Object();
return ret;
}
}
XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc);
Object ret(task);
XboxTransport *job = task->getJob();
job->incRefCount(); // paired with worker's decRefCount()
Transport *transport = g_context->getTransport();
if (transport) {
job->setHost(transport->getHeader("Host"));
}
assert(s_dispatcher);
s_dispatcher->enqueue(job);
const char* errMsg =
(RuntimeOption::XboxServerThreadCount > 0 ?
"Cannot create new Xbox task because the Xbox queue has "
"reached maximum capacity" :
"Cannot create new Xbox task because the Xbox is not enabled");
return ret;
Object e = SystemLib::AllocExceptionObject(errMsg);
throw_exception(e);
return Object();
}
bool XboxServer::TaskStatus(CObjRef task) {
-5
Ver Arquivo
@@ -44,11 +44,6 @@ public:
CStrRef host = "localhost");
static bool PostMessage(CStrRef message, CStrRef host = "localhost");
/**
* Check whether all the the xbox threads are busy
*/
static bool Available();
/**
* Local tasklet for parallel processing.
*/