diff --git a/hphp/runtime/base/runtime_option.cpp b/hphp/runtime/base/runtime_option.cpp index 842abfaa7..5d511f2db 100644 --- a/hphp/runtime/base/runtime_option.cpp +++ b/hphp/runtime/base/runtime_option.cpp @@ -100,6 +100,9 @@ int RuntimeOption::ServerBacklog = 128; int RuntimeOption::ServerConnectionLimit = 0; int RuntimeOption::ServerThreadCount = 50; bool RuntimeOption::ServerThreadRoundRobin = false; +constexpr int kDefaultWarmupThrottleRequestCount = 0; +int RuntimeOption::ServerWarmupThrottleRequestCount = + kDefaultWarmupThrottleRequestCount; int RuntimeOption::ServerThreadDropCacheTimeoutSeconds = 0; bool RuntimeOption::ServerThreadJobLIFO = false; bool RuntimeOption::ServerThreadDropStack = false; @@ -664,6 +667,10 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */, ServerConnectionLimit = server["ConnectionLimit"].getInt16(0); ServerThreadCount = server["ThreadCount"].getInt32(50); ServerThreadRoundRobin = server["ThreadRoundRobin"].getBool(); + ServerWarmupThrottleRequestCount = + server["WarmupThrottleRequestCount"].getInt32( + kDefaultWarmupThrottleRequestCount + ); ServerThreadDropCacheTimeoutSeconds = server["ThreadDropCacheTimeoutSeconds"].getInt32(0); ServerThreadJobLIFO = server["ThreadJobLIFO"].getBool(); diff --git a/hphp/runtime/base/runtime_option.h b/hphp/runtime/base/runtime_option.h index 09b5a4466..fef5d7878 100644 --- a/hphp/runtime/base/runtime_option.h +++ b/hphp/runtime/base/runtime_option.h @@ -96,6 +96,7 @@ public: static int ServerBacklog; static int ServerConnectionLimit; static int ServerThreadCount; + static int ServerWarmupThrottleRequestCount; static bool ServerThreadRoundRobin; static int ServerThreadDropCacheTimeoutSeconds; static bool ServerThreadJobLIFO; diff --git a/hphp/runtime/base/server/http_server.cpp b/hphp/runtime/base/server/http_server.cpp index 0c42bb6fb..ad5d0973e 100644 --- a/hphp/runtime/base/server/http_server.cpp +++ b/hphp/runtime/base/server/http_server.cpp @@ -45,6 +45,8 @@ namespace HPHP { HttpServerPtr HttpServer::Server; time_t HttpServer::StartTime; +const int kNumProcessors = sysconf(_SC_NPROCESSORS_ONLN); + /////////////////////////////////////////////////////////////////////////////// HttpServer::HttpServer(void *sslCTX /* = NULL */) @@ -54,27 +56,48 @@ HttpServer::HttpServer(void *sslCTX /* = NULL */) // enabling mutex profiling, but it's not turned on LockProfiler::s_pfunc_profile = server_stats_log_mutex; + bool const useWarmupThrottle = + RuntimeOption::ServerWarmupThrottleRequestCount > 0 && + RuntimeOption::ServerThreadCount > kNumProcessors; + + auto maybeEnableThrottle = [&] (LibEventServer* s) { + if (!useWarmupThrottle) return; + + s->enableWarmupThrottle(RuntimeOption::ServerThreadCount - kNumProcessors, + RuntimeOption::ServerWarmupThrottleRequestCount); + Logger::Info("Starting with %d threads for the first %d requests\n", + kNumProcessors, + RuntimeOption::ServerWarmupThrottleRequestCount); + }; + + int const startingThreadCount = !useWarmupThrottle + ? RuntimeOption::ServerThreadCount + : kNumProcessors; + if (RuntimeOption::ServerPortFd != -1 || RuntimeOption::SSLPortFd != -1) { LibEventServerWithFd* server = (new TypedServer (RuntimeOption::ServerIP, RuntimeOption::ServerPort, - RuntimeOption::ServerThreadCount, + startingThreadCount, RuntimeOption::RequestTimeoutSeconds)); + maybeEnableThrottle(server); server->setServerSocketFd(RuntimeOption::ServerPortFd); server->setSSLSocketFd(RuntimeOption::SSLPortFd); m_pageServer = ServerPtr(server); } else if (RuntimeOption::TakeoverFilename.empty()) { - m_pageServer = ServerPtr - (new TypedServer + auto const server = new TypedServer (RuntimeOption::ServerIP, RuntimeOption::ServerPort, - RuntimeOption::ServerThreadCount, - RuntimeOption::RequestTimeoutSeconds)); + startingThreadCount, + RuntimeOption::RequestTimeoutSeconds); + maybeEnableThrottle(server); + m_pageServer = ServerPtr(server); } else { LibEventServerWithTakeover* server = (new TypedServer (RuntimeOption::ServerIP, RuntimeOption::ServerPort, - RuntimeOption::ServerThreadCount, + startingThreadCount, RuntimeOption::RequestTimeoutSeconds)); + maybeEnableThrottle(server); server->setTransferFilename(RuntimeOption::TakeoverFilename); server->addTakeoverListener(this); m_pageServer = ServerPtr(server); diff --git a/hphp/runtime/base/server/libevent_server.cpp b/hphp/runtime/base/server/libevent_server.cpp index 4976b33cd..4bf2c23b8 100644 --- a/hphp/runtime/base/server/libevent_server.cpp +++ b/hphp/runtime/base/server/libevent_server.cpp @@ -14,6 +14,8 @@ +----------------------------------------------------------------------+ */ +#include + #include #include #include @@ -87,6 +89,8 @@ void LibEventWorker::doJob(LibEventJobPtr job) { assert(m_opaque); LibEventServer *server = (LibEventServer*)m_opaque; + server->bumpReqCount(); + if (m_handler == nullptr || server->supportReset()) { m_handler = server->createRequestHandler(); assert(m_handler); @@ -153,6 +157,9 @@ void LibEventWorker::onThreadExit() { LibEventServer::LibEventServer(const std::string &address, int port, int thread, int timeoutSeconds) : Server(address, port, thread), + m_warmup_thread_slack(0), + m_req_number(0), + m_warmup_req_threshold(0), m_accept_sock(-1), m_accept_sock_ssl(-1), m_timeoutThreadData(timeoutSeconds), @@ -204,6 +211,24 @@ int LibEventServer::getLibEventConnectionCount() { return evhttp_get_connection_count(m_server); } +void LibEventServer::enableWarmupThrottle(int threadSlack, int reqCount) { + m_warmup_thread_slack = threadSlack; + const_cast(m_warmup_req_threshold) = reqCount; +} + +void LibEventServer::bumpReqCount() { + auto const oldReqNum = m_req_number.fetch_add(1, std::memory_order_relaxed); + if (oldReqNum == m_warmup_req_threshold) { + if (auto const num = m_warmup_thread_slack.load()) { + Logger::Info("Finished warmup; adding %d new worker threads\n", num); + m_dispatcher.addWorkers(num); + + // Set to zero so we can't do it if the req counter wraps. + m_warmup_thread_slack.store(0); + } + } +} + void LibEventServer::start() { if (getStatus() == RUNNING) return; diff --git a/hphp/runtime/base/server/libevent_server.h b/hphp/runtime/base/server/libevent_server.h index 607b56415..30d7b59fb 100644 --- a/hphp/runtime/base/server/libevent_server.h +++ b/hphp/runtime/base/server/libevent_server.h @@ -131,7 +131,15 @@ public: int timeoutSeconds); ~LibEventServer(); - // implemting Server + /* + * Function to enable reducing the thread count during initial + * warmup requests. + * + * See RuntimeOption::ServerWarmupThrottleRequestCount. + */ + void enableWarmupThrottle(int threadSlack, int reqCount); + + // implementing Server virtual void start(); virtual void waitForEnd(); virtual void stop(); @@ -175,6 +183,18 @@ protected: virtual int getAcceptSocket(); virtual int getAcceptSocketSSL(); +private: + // Number of threads to start when warmup request counter passes the + // throttled request threshold. + friend class LibEventWorker; + void bumpReqCount(); + +private: + std::atomic m_warmup_thread_slack; + std::atomic m_req_number; + int const m_warmup_req_threshold; + +protected: int m_accept_sock; int m_accept_sock_ssl; event_base *m_eventBase; diff --git a/hphp/util/job_queue.h b/hphp/util/job_queue.h index 3e7366160..0c3f65fbc 100644 --- a/hphp/util/job_queue.h +++ b/hphp/util/job_queue.h @@ -353,9 +353,11 @@ public: int getActiveWorker() { return m_queue.getActiveWorker(); } + int getQueuedJobs() { return m_queue.getQueuedJobs(); } + int getTargetNumWorkers() { if (TWorker::CountActive) { int target = getActiveWorker() + getQueuedJobs(); @@ -406,6 +408,17 @@ public: } } + /* + * Add N new worker threads. + */ + void addWorkers(int n) { + Lock lock(m_mutex); + if (m_stopped) return; + for (int i = 0; i < n; ++i) { + addWorkerImpl(true); + } + } + void getWorkers(std::vector &workers) { Lock lock(m_mutex); workers.insert(workers.end(), m_workers.begin(), m_workers.end());