diff --git a/hphp/runtime/base/server/pagelet_server.cpp b/hphp/runtime/base/server/pagelet_server.cpp index 8017db1d4..22794e0ff 100644 --- a/hphp/runtime/base/server/pagelet_server.cpp +++ b/hphp/runtime/base/server/pagelet_server.cpp @@ -282,20 +282,24 @@ StaticString PageletTask::s_class_name("PageletTask"); // implementing PageletServer static JobQueueDispatcher *s_dispatcher; +static Mutex s_dispatchMutex; bool PageletServer::Enabled() { - return RuntimeOption::PageletServerThreadCount > 0; + return s_dispatcher; } void PageletServer::Restart() { Stop(); if (RuntimeOption::PageletServerThreadCount > 0) { - s_dispatcher = new JobQueueDispatcher - (RuntimeOption::PageletServerThreadCount, - RuntimeOption::PageletServerThreadRoundRobin, - RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds, - RuntimeOption::PageletServerThreadDropStack, - nullptr); + { + Lock l(s_dispatchMutex); + s_dispatcher = new JobQueueDispatcher + (RuntimeOption::PageletServerThreadCount, + RuntimeOption::PageletServerThreadRoundRobin, + RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds, + RuntimeOption::PageletServerThreadDropStack, + nullptr); + } Logger::Info("pagelet server started"); s_dispatcher->start(); } @@ -304,6 +308,7 @@ void PageletServer::Restart() { void PageletServer::Stop() { if (s_dispatcher) { s_dispatcher->stop(); + Lock l(s_dispatchMutex); delete s_dispatcher; s_dispatcher = nullptr; } @@ -313,22 +318,28 @@ Object PageletServer::TaskStart(CStrRef url, CArrRef headers, CStrRef remote_host, CStrRef post_data /* = null_string */, CArrRef files /* = null_array */) { - if (RuntimeOption::PageletServerThreadCount <= 0) { - return null_object; - } - if (RuntimeOption::PageletServerQueueLimit > 0 && - s_dispatcher->getQueuedJobs() > RuntimeOption::PageletServerQueueLimit) { - return null_object; + { + Lock l(s_dispatchMutex); + if (!s_dispatcher) { + return null_object; + } + if (RuntimeOption::PageletServerQueueLimit > 0 && + s_dispatcher->getQueuedJobs() > + RuntimeOption::PageletServerQueueLimit) { + return null_object; + } } PageletTask *task = NEWOBJ(PageletTask)(url, headers, remote_host, post_data, get_uploaded_files(), files); Object ret(task); PageletTransport *job = task->getJob(); - job->incRefCount(); // paired with worker's decRefCount() - assert(s_dispatcher); - s_dispatcher->enqueue(job); - - return ret; + Lock l(s_dispatchMutex); + if (s_dispatcher) { + job->incRefCount(); // paired with worker's decRefCount() + s_dispatcher->enqueue(job); + return ret; + } + return null_object; } int64_t PageletServer::TaskStatus(CObjRef task) { @@ -358,11 +369,13 @@ void PageletServer::AddToPipeline(const string &s) { } int PageletServer::GetActiveWorker() { - return s_dispatcher->getActiveWorker(); + Lock l(s_dispatchMutex); + return s_dispatcher ? s_dispatcher->getActiveWorker() : 0; } int PageletServer::GetQueuedJobs() { - return s_dispatcher->getQueuedJobs(); + Lock l(s_dispatchMutex); + return s_dispatcher ? s_dispatcher->getQueuedJobs() : 0; } /////////////////////////////////////////////////////////////////////////////// diff --git a/hphp/runtime/base/server/xbox_server.cpp b/hphp/runtime/base/server/xbox_server.cpp index dac9f2732..62a85aa53 100644 --- a/hphp/runtime/base/server/xbox_server.cpp +++ b/hphp/runtime/base/server/xbox_server.cpp @@ -202,17 +202,21 @@ private: /////////////////////////////////////////////////////////////////////////////// static JobQueueDispatcher *s_dispatcher; +static Mutex s_dispatchMutex; void XboxServer::Restart() { Stop(); if (RuntimeOption::XboxServerThreadCount > 0) { - s_dispatcher = new JobQueueDispatcher - (RuntimeOption::XboxServerThreadCount, - RuntimeOption::ServerThreadRoundRobin, - RuntimeOption::ServerThreadDropCacheTimeoutSeconds, - RuntimeOption::ServerThreadDropStack, - nullptr); + { + Lock l(s_dispatchMutex); + s_dispatcher = new JobQueueDispatcher + (RuntimeOption::XboxServerThreadCount, + RuntimeOption::ServerThreadRoundRobin, + RuntimeOption::ServerThreadDropCacheTimeoutSeconds, + RuntimeOption::ServerThreadDropStack, + nullptr); + } if (RuntimeOption::XboxServerLogInfo) { Logger::Info("xbox server started"); } @@ -223,6 +227,8 @@ void XboxServer::Restart() { void XboxServer::Stop() { if (s_dispatcher) { s_dispatcher->stop(); + + Lock l(s_dispatchMutex); delete s_dispatcher; s_dispatcher = nullptr; } @@ -242,17 +248,20 @@ const StaticString bool XboxServer::SendMessage(CStrRef message, Variant &ret, int timeout_ms, CStrRef host /* = "localhost" */) { if (isLocalHost(host)) { + XboxTransport *job; + { + Lock l(s_dispatchMutex); + if (!s_dispatcher) { + return false; + } - if (RuntimeOption::XboxServerThreadCount <= 0) { - return false; + job = new XboxTransport(message); + job->incRefCount(); // paired with worker's decRefCount() + job->incRefCount(); // paired with decRefCount() at below + assert(s_dispatcher); + s_dispatcher->enqueue(job); } - XboxTransport *job = new XboxTransport(message); - job->incRefCount(); // paired with worker's decRefCount() - job->incRefCount(); // paired with decRefCount() at below - assert(s_dispatcher); - s_dispatcher->enqueue(job); - if (timeout_ms <= 0) { timeout_ms = RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds; } @@ -314,8 +323,8 @@ bool XboxServer::SendMessage(CStrRef message, Variant &ret, int timeout_ms, bool XboxServer::PostMessage(CStrRef message, CStrRef host /* = "localhost" */) { if (isLocalHost(host)) { - - if (RuntimeOption::XboxServerThreadCount <= 0) { + Lock l(s_dispatchMutex); + if (!s_dispatcher) { return false; } @@ -381,36 +390,37 @@ StaticString XboxTask::s_class_name("XboxTask"); /////////////////////////////////////////////////////////////////////////////// -bool XboxServer::Available() { - return s_dispatcher->getActiveWorker() < +Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) { + { + Lock l(s_dispatchMutex); + if (s_dispatcher && + (s_dispatcher->getActiveWorker() < RuntimeOption::XboxServerThreadCount || s_dispatcher->getQueuedJobs() < - RuntimeOption::XboxServerMaxQueueLength; -} + RuntimeOption::XboxServerMaxQueueLength)) { + XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc); + Object ret(task); + XboxTransport *job = task->getJob(); + job->incRefCount(); // paired with worker's decRefCount() + Transport *transport = g_context->getTransport(); + if (transport) { + job->setHost(transport->getHeader("Host")); + } + assert(s_dispatcher); + s_dispatcher->enqueue(job); -Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) { - bool xboxEnabled = (RuntimeOption::XboxServerThreadCount > 0); - if (!xboxEnabled || !Available()) { - const char* errMsg = (xboxEnabled ? - "Cannot create new Xbox task because the Xbox queue has " - "reached maximum capacity" : - "Cannot create new Xbox task because the Xbox is not enabled"); - Object e = SystemLib::AllocExceptionObject(errMsg); - throw_exception(e); - return Object(); + return ret; + } } - XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc); - Object ret(task); - XboxTransport *job = task->getJob(); - job->incRefCount(); // paired with worker's decRefCount() - Transport *transport = g_context->getTransport(); - if (transport) { - job->setHost(transport->getHeader("Host")); - } - assert(s_dispatcher); - s_dispatcher->enqueue(job); + const char* errMsg = + (RuntimeOption::XboxServerThreadCount > 0 ? + "Cannot create new Xbox task because the Xbox queue has " + "reached maximum capacity" : + "Cannot create new Xbox task because the Xbox is not enabled"); - return ret; + Object e = SystemLib::AllocExceptionObject(errMsg); + throw_exception(e); + return Object(); } bool XboxServer::TaskStatus(CObjRef task) { diff --git a/hphp/runtime/base/server/xbox_server.h b/hphp/runtime/base/server/xbox_server.h index 113116baf..be9be6379 100644 --- a/hphp/runtime/base/server/xbox_server.h +++ b/hphp/runtime/base/server/xbox_server.h @@ -44,11 +44,6 @@ public: CStrRef host = "localhost"); static bool PostMessage(CStrRef message, CStrRef host = "localhost"); - /** - * Check whether all the the xbox threads are busy - */ - static bool Available(); - /** * Local tasklet for parallel processing. */