From 090ca545ff08d946e8465dd666b4924682ff43c7 Mon Sep 17 00:00:00 2001 From: mwilliams Date: Sun, 23 Jun 2013 16:57:06 -0700 Subject: [PATCH] Fix shutdown crashes in xbox/pagelet server Various code tested to see if xbox was enabled by the config, but what it really cared about was whether there was an xbox dispatcher. During shutdown we stop the XboxServer before the main server, so there's a short window where the XboxServer is enabled by the config, but there is no dispatcher. If a running thread tries to run xbox code, it would crash. Switch the tests to test s_dispatcher and hold a lock, rather than testing the config. Fix similar issues with PageletServer, (although we don't currently shut it down until process exit - I'll post a follow up diff to fix that). --- hphp/runtime/base/server/pagelet_server.cpp | 53 +++++++----- hphp/runtime/base/server/xbox_server.cpp | 92 ++++++++++++--------- hphp/runtime/base/server/xbox_server.h | 5 -- 3 files changed, 84 insertions(+), 66 deletions(-) diff --git a/hphp/runtime/base/server/pagelet_server.cpp b/hphp/runtime/base/server/pagelet_server.cpp index 8017db1d4..22794e0ff 100644 --- a/hphp/runtime/base/server/pagelet_server.cpp +++ b/hphp/runtime/base/server/pagelet_server.cpp @@ -282,20 +282,24 @@ StaticString PageletTask::s_class_name("PageletTask"); // implementing PageletServer static JobQueueDispatcher *s_dispatcher; +static Mutex s_dispatchMutex; bool PageletServer::Enabled() { - return RuntimeOption::PageletServerThreadCount > 0; + return s_dispatcher; } void PageletServer::Restart() { Stop(); if (RuntimeOption::PageletServerThreadCount > 0) { - s_dispatcher = new JobQueueDispatcher - (RuntimeOption::PageletServerThreadCount, - RuntimeOption::PageletServerThreadRoundRobin, - RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds, - RuntimeOption::PageletServerThreadDropStack, - nullptr); + { + Lock l(s_dispatchMutex); + s_dispatcher = new JobQueueDispatcher + (RuntimeOption::PageletServerThreadCount, + RuntimeOption::PageletServerThreadRoundRobin, + RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds, + RuntimeOption::PageletServerThreadDropStack, + nullptr); + } Logger::Info("pagelet server started"); s_dispatcher->start(); } @@ -304,6 +308,7 @@ void PageletServer::Restart() { void PageletServer::Stop() { if (s_dispatcher) { s_dispatcher->stop(); + Lock l(s_dispatchMutex); delete s_dispatcher; s_dispatcher = nullptr; } @@ -313,22 +318,28 @@ Object PageletServer::TaskStart(CStrRef url, CArrRef headers, CStrRef remote_host, CStrRef post_data /* = null_string */, CArrRef files /* = null_array */) { - if (RuntimeOption::PageletServerThreadCount <= 0) { - return null_object; - } - if (RuntimeOption::PageletServerQueueLimit > 0 && - s_dispatcher->getQueuedJobs() > RuntimeOption::PageletServerQueueLimit) { - return null_object; + { + Lock l(s_dispatchMutex); + if (!s_dispatcher) { + return null_object; + } + if (RuntimeOption::PageletServerQueueLimit > 0 && + s_dispatcher->getQueuedJobs() > + RuntimeOption::PageletServerQueueLimit) { + return null_object; + } } PageletTask *task = NEWOBJ(PageletTask)(url, headers, remote_host, post_data, get_uploaded_files(), files); Object ret(task); PageletTransport *job = task->getJob(); - job->incRefCount(); // paired with worker's decRefCount() - assert(s_dispatcher); - s_dispatcher->enqueue(job); - - return ret; + Lock l(s_dispatchMutex); + if (s_dispatcher) { + job->incRefCount(); // paired with worker's decRefCount() + s_dispatcher->enqueue(job); + return ret; + } + return null_object; } int64_t PageletServer::TaskStatus(CObjRef task) { @@ -358,11 +369,13 @@ void PageletServer::AddToPipeline(const string &s) { } int PageletServer::GetActiveWorker() { - return s_dispatcher->getActiveWorker(); + Lock l(s_dispatchMutex); + return s_dispatcher ? s_dispatcher->getActiveWorker() : 0; } int PageletServer::GetQueuedJobs() { - return s_dispatcher->getQueuedJobs(); + Lock l(s_dispatchMutex); + return s_dispatcher ? s_dispatcher->getQueuedJobs() : 0; } /////////////////////////////////////////////////////////////////////////////// diff --git a/hphp/runtime/base/server/xbox_server.cpp b/hphp/runtime/base/server/xbox_server.cpp index dac9f2732..62a85aa53 100644 --- a/hphp/runtime/base/server/xbox_server.cpp +++ b/hphp/runtime/base/server/xbox_server.cpp @@ -202,17 +202,21 @@ private: /////////////////////////////////////////////////////////////////////////////// static JobQueueDispatcher *s_dispatcher; +static Mutex s_dispatchMutex; void XboxServer::Restart() { Stop(); if (RuntimeOption::XboxServerThreadCount > 0) { - s_dispatcher = new JobQueueDispatcher - (RuntimeOption::XboxServerThreadCount, - RuntimeOption::ServerThreadRoundRobin, - RuntimeOption::ServerThreadDropCacheTimeoutSeconds, - RuntimeOption::ServerThreadDropStack, - nullptr); + { + Lock l(s_dispatchMutex); + s_dispatcher = new JobQueueDispatcher + (RuntimeOption::XboxServerThreadCount, + RuntimeOption::ServerThreadRoundRobin, + RuntimeOption::ServerThreadDropCacheTimeoutSeconds, + RuntimeOption::ServerThreadDropStack, + nullptr); + } if (RuntimeOption::XboxServerLogInfo) { Logger::Info("xbox server started"); } @@ -223,6 +227,8 @@ void XboxServer::Restart() { void XboxServer::Stop() { if (s_dispatcher) { s_dispatcher->stop(); + + Lock l(s_dispatchMutex); delete s_dispatcher; s_dispatcher = nullptr; } @@ -242,17 +248,20 @@ const StaticString bool XboxServer::SendMessage(CStrRef message, Variant &ret, int timeout_ms, CStrRef host /* = "localhost" */) { if (isLocalHost(host)) { + XboxTransport *job; + { + Lock l(s_dispatchMutex); + if (!s_dispatcher) { + return false; + } - if (RuntimeOption::XboxServerThreadCount <= 0) { - return false; + job = new XboxTransport(message); + job->incRefCount(); // paired with worker's decRefCount() + job->incRefCount(); // paired with decRefCount() at below + assert(s_dispatcher); + s_dispatcher->enqueue(job); } - XboxTransport *job = new XboxTransport(message); - job->incRefCount(); // paired with worker's decRefCount() - job->incRefCount(); // paired with decRefCount() at below - assert(s_dispatcher); - s_dispatcher->enqueue(job); - if (timeout_ms <= 0) { timeout_ms = RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds; } @@ -314,8 +323,8 @@ bool XboxServer::SendMessage(CStrRef message, Variant &ret, int timeout_ms, bool XboxServer::PostMessage(CStrRef message, CStrRef host /* = "localhost" */) { if (isLocalHost(host)) { - - if (RuntimeOption::XboxServerThreadCount <= 0) { + Lock l(s_dispatchMutex); + if (!s_dispatcher) { return false; } @@ -381,36 +390,37 @@ StaticString XboxTask::s_class_name("XboxTask"); /////////////////////////////////////////////////////////////////////////////// -bool XboxServer::Available() { - return s_dispatcher->getActiveWorker() < +Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) { + { + Lock l(s_dispatchMutex); + if (s_dispatcher && + (s_dispatcher->getActiveWorker() < RuntimeOption::XboxServerThreadCount || s_dispatcher->getQueuedJobs() < - RuntimeOption::XboxServerMaxQueueLength; -} + RuntimeOption::XboxServerMaxQueueLength)) { + XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc); + Object ret(task); + XboxTransport *job = task->getJob(); + job->incRefCount(); // paired with worker's decRefCount() + Transport *transport = g_context->getTransport(); + if (transport) { + job->setHost(transport->getHeader("Host")); + } + assert(s_dispatcher); + s_dispatcher->enqueue(job); -Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) { - bool xboxEnabled = (RuntimeOption::XboxServerThreadCount > 0); - if (!xboxEnabled || !Available()) { - const char* errMsg = (xboxEnabled ? - "Cannot create new Xbox task because the Xbox queue has " - "reached maximum capacity" : - "Cannot create new Xbox task because the Xbox is not enabled"); - Object e = SystemLib::AllocExceptionObject(errMsg); - throw_exception(e); - return Object(); + return ret; + } } - XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc); - Object ret(task); - XboxTransport *job = task->getJob(); - job->incRefCount(); // paired with worker's decRefCount() - Transport *transport = g_context->getTransport(); - if (transport) { - job->setHost(transport->getHeader("Host")); - } - assert(s_dispatcher); - s_dispatcher->enqueue(job); + const char* errMsg = + (RuntimeOption::XboxServerThreadCount > 0 ? + "Cannot create new Xbox task because the Xbox queue has " + "reached maximum capacity" : + "Cannot create new Xbox task because the Xbox is not enabled"); - return ret; + Object e = SystemLib::AllocExceptionObject(errMsg); + throw_exception(e); + return Object(); } bool XboxServer::TaskStatus(CObjRef task) { diff --git a/hphp/runtime/base/server/xbox_server.h b/hphp/runtime/base/server/xbox_server.h index 113116baf..be9be6379 100644 --- a/hphp/runtime/base/server/xbox_server.h +++ b/hphp/runtime/base/server/xbox_server.h @@ -44,11 +44,6 @@ public: CStrRef host = "localhost"); static bool PostMessage(CStrRef message, CStrRef host = "localhost"); - /** - * Check whether all the the xbox threads are busy - */ - static bool Available(); - /** * Local tasklet for parallel processing. */