From 797a6822d0f230b235817e077f34c56374f65cbc Mon Sep 17 00:00:00 2001 From: Jan Oravec Date: Sat, 15 Jun 2013 19:57:55 -0700 Subject: [PATCH] Move external thread event queue logic from AsioSession to AsioExternalThreadEventQueue External thread event queueing logic is big enough that it deserves its own class. Move the code and rename members to better reflect the underlying operations. Implementation is otherwise unchanged. --- hphp/runtime/ext/asio/asio_context.cpp | 5 +- .../ext/asio/asio_external_thread_event.cpp | 4 +- .../ext/asio/asio_external_thread_event.h | 2 +- .../asio/asio_external_thread_event_queue.cpp | 78 +++++++++++++++++++ .../asio/asio_external_thread_event_queue.h | 55 +++++++++++++ hphp/runtime/ext/asio/asio_session.cpp | 52 +------------ hphp/runtime/ext/asio/asio_session.h | 22 ++---- .../external_thread_event_wait_handle.cpp | 5 +- 8 files changed, 148 insertions(+), 75 deletions(-) create mode 100644 hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp create mode 100644 hphp/runtime/ext/asio/asio_external_thread_event_queue.h diff --git a/hphp/runtime/ext/asio/asio_context.cpp b/hphp/runtime/ext/asio/asio_context.cpp index 0fc47a08e..dccf94306 100644 --- a/hphp/runtime/ext/asio/asio_context.cpp +++ b/hphp/runtime/ext/asio/asio_context.cpp @@ -17,6 +17,7 @@ #include "hphp/runtime/ext/asio/asio_context.h" #include "hphp/runtime/ext/ext_asio.h" +#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h" #include "hphp/runtime/ext/asio/asio_session.h" #include "hphp/system/systemlib.h" @@ -104,7 +105,7 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) { // process ready external thread events once per 256 other events // (when 8-bit check_ete_counter overflows) if (!++check_ete_counter) { - auto ete_wh = session->getReadyExternalThreadEvents(); + auto ete_wh = session->getExternalThreadEventQueue()->tryConsumeMulti(); while (ete_wh) { auto next_wh = ete_wh->getNextToProcess(); ete_wh->process(); @@ -134,7 +135,7 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) { // pending external thread events? wait for at least one to become ready if (!m_externalThreadEvents.empty()) { // all your wait time are belong to us - auto ete_wh = session->waitForExternalThreadEvents(); + auto ete_wh = session->getExternalThreadEventQueue()->consumeMulti(); while (ete_wh) { auto next_wh = ete_wh->getNextToProcess(); ete_wh->process(); diff --git a/hphp/runtime/ext/asio/asio_external_thread_event.cpp b/hphp/runtime/ext/asio/asio_external_thread_event.cpp index fa9458ed7..40bfc5b57 100644 --- a/hphp/runtime/ext/asio/asio_external_thread_event.cpp +++ b/hphp/runtime/ext/asio/asio_external_thread_event.cpp @@ -24,7 +24,7 @@ namespace HPHP { /////////////////////////////////////////////////////////////////////////////// AsioExternalThreadEvent::AsioExternalThreadEvent(ObjectData* priv_data) - : m_session(AsioSession::Get()), + : m_queue(AsioSession::Get()->getExternalThreadEventQueue()), m_state(Waiting) { m_waitHandle = c_ExternalThreadEventWaitHandle::Create(this, priv_data); } @@ -50,7 +50,7 @@ void AsioExternalThreadEvent::markAsFinished() { uint32_t/*state_t*/ expected(Waiting); if (m_state.compare_exchange_strong(expected, Finished)) { // transfer ownership - m_session->enqueueExternalThreadEvent(m_waitHandle); + m_queue->produce(m_waitHandle); } else { // web request died, destroy object assert(expected == Canceled); diff --git a/hphp/runtime/ext/asio/asio_external_thread_event.h b/hphp/runtime/ext/asio/asio_external_thread_event.h index 60747c76f..8b1a1fd8c 100644 --- a/hphp/runtime/ext/asio/asio_external_thread_event.h +++ b/hphp/runtime/ext/asio/asio_external_thread_event.h @@ -281,7 +281,7 @@ class AsioExternalThreadEvent { Abandoned, }; - AsioSession* m_session; + AsioExternalThreadEventQueue* m_queue; c_ExternalThreadEventWaitHandle* m_waitHandle; std::atomic m_state; }; diff --git a/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp b/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp new file mode 100644 index 000000000..5e50324e9 --- /dev/null +++ b/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp @@ -0,0 +1,78 @@ +/* + +----------------------------------------------------------------------+ + | HipHop for PHP | + +----------------------------------------------------------------------+ + | Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) | + | Copyright (c) 1997-2010 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ +*/ + +#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h" +#include "hphp/runtime/ext/ext_asio.h" +#include "hphp/system/systemlib.h" + +namespace HPHP { +/////////////////////////////////////////////////////////////////////////////// + +AsioExternalThreadEventQueue::AsioExternalThreadEventQueue() + : m_queue(nullptr), m_queueMutex(), m_queueCondition() { +} + +c_ExternalThreadEventWaitHandle* AsioExternalThreadEventQueue::consumeMulti() { + // try check for ready external thread events without grabbing lock + auto ready = m_queue.exchange(nullptr); + if (ready != nullptr) { + assert(ready != k_consumerWaiting); + return ready; + } + + // no ready external thread events available, synchronization needed + std::unique_lock lock(m_queueMutex); + + // transition from empty to WAITING + if (m_queue.compare_exchange_strong(ready, k_consumerWaiting)) { + // wait for transition from WAITING to non-empty + do { + m_queueCondition.wait(lock); + } while (m_queue.load() == k_consumerWaiting); + } else { + // external thread transitioned from empty to non-empty while grabbing lock + } + + ready = m_queue.exchange(nullptr); + assert(ready != nullptr); + assert(ready != k_consumerWaiting); + return ready; +} + +void AsioExternalThreadEventQueue::produce(c_ExternalThreadEventWaitHandle* wait_handle) { + auto next = m_queue.load(); + while (true) { + while (next != k_consumerWaiting) { + wait_handle->setNextToProcess(next); + if (m_queue.compare_exchange_weak(next, wait_handle)) { + return; + } + } + + // try to transition from WAITING to non-empty + wait_handle->setNextToProcess(nullptr); + if (m_queue.compare_exchange_weak(next, wait_handle)) { + // succeeded, notify condition + std::unique_lock lock(m_queueMutex); + m_queueCondition.notify_one(); + return; + } + } +} + +/////////////////////////////////////////////////////////////////////////////// +} diff --git a/hphp/runtime/ext/asio/asio_external_thread_event_queue.h b/hphp/runtime/ext/asio/asio_external_thread_event_queue.h new file mode 100644 index 000000000..787be95b9 --- /dev/null +++ b/hphp/runtime/ext/asio/asio_external_thread_event_queue.h @@ -0,0 +1,55 @@ +/* + +----------------------------------------------------------------------+ + | HipHop for PHP | + +----------------------------------------------------------------------+ + | Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) | + | Copyright (c) 1997-2010 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ +*/ + +#ifndef incl_HPHP_EXT_ASIO_EXTERNAL_THREAD_EVENT_QUEUE_H_ +#define incl_HPHP_EXT_ASIO_EXTERNAL_THREAD_EVENT_QUEUE_H_ + +#include "hphp/runtime/base/base_includes.h" +#include +#include +#include + +namespace HPHP { +/////////////////////////////////////////////////////////////////////////////// + +FORWARD_DECLARE_CLASS_BUILTIN(ExternalThreadEventWaitHandle); + +class AsioExternalThreadEventQueue { + public: + AsioExternalThreadEventQueue(); + + c_ExternalThreadEventWaitHandle* tryConsumeMulti() { + auto ready = m_queue.exchange(nullptr); + assert(ready != k_consumerWaiting); + return ready; + } + + c_ExternalThreadEventWaitHandle* consumeMulti(); + void produce(c_ExternalThreadEventWaitHandle* wait_handle); + + private: + static constexpr auto k_consumerWaiting = static_cast((void*)1L); + + std::atomic m_queue; + std::mutex m_queueMutex; + std::condition_variable m_queueCondition; +}; + +/////////////////////////////////////////////////////////////////////////////// +} + +#endif // incl_HPHP_EXT_ASIO_EXTERNAL_THREAD_EVENT_QUEUE_H_ diff --git a/hphp/runtime/ext/asio/asio_session.cpp b/hphp/runtime/ext/asio/asio_session.cpp index 5ea6267cd..d742ed1b3 100644 --- a/hphp/runtime/ext/asio/asio_session.cpp +++ b/hphp/runtime/ext/asio/asio_session.cpp @@ -33,9 +33,7 @@ void AsioSession::Init() { } AsioSession::AsioSession() - : m_contexts(), m_readyExternalThreadEvents(nullptr), - m_readyExternalThreadEventsMutex(), - m_readyExternalThreadEventsCondition() { + : m_contexts(), m_externalThreadEventQueue() { } void AsioSession::enterContext() { @@ -70,54 +68,6 @@ uint16_t AsioSession::getCurrentWaitHandleDepth() { return isInContext() ? getCurrentWaitHandle()->getDepth() : 0; } -c_ExternalThreadEventWaitHandle* AsioSession::waitForExternalThreadEvents() { - // try check for ready external thread events without grabbing lock - auto ready = m_readyExternalThreadEvents.exchange(nullptr); - if (ready != nullptr) { - assert(ready != k_waitingForExternalThreadEvents); - return ready; - } - - // no ready external thread events available, synchronization needed - std::unique_lock lock(m_readyExternalThreadEventsMutex); - - // transition from empty to WAITING - if (m_readyExternalThreadEvents.compare_exchange_strong(ready, k_waitingForExternalThreadEvents)) { - // wait for transition from WAITING to non-empty - do { - m_readyExternalThreadEventsCondition.wait(lock); - } while (m_readyExternalThreadEvents.load() == k_waitingForExternalThreadEvents); - } else { - // external thread transitioned from empty to non-empty while grabbing lock - } - - ready = m_readyExternalThreadEvents.exchange(nullptr); - assert(ready != nullptr); - assert(ready != k_waitingForExternalThreadEvents); - return ready; -} - -void AsioSession::enqueueExternalThreadEvent(c_ExternalThreadEventWaitHandle* wait_handle) { - auto next = m_readyExternalThreadEvents.load(); - while (true) { - while (next != k_waitingForExternalThreadEvents) { - wait_handle->setNextToProcess(next); - if (m_readyExternalThreadEvents.compare_exchange_weak(next, wait_handle)) { - return; - } - } - - // try to transition from WAITING to non-empty - wait_handle->setNextToProcess(nullptr); - if (m_readyExternalThreadEvents.compare_exchange_weak(next, wait_handle)) { - // succeeded, notify condition - std::unique_lock lock(m_readyExternalThreadEventsMutex); - m_readyExternalThreadEventsCondition.notify_one(); - return; - } - } -} - void AsioSession::initAbruptInterruptException() { assert(!hasAbruptInterruptException()); m_abruptInterruptException = SystemLib::AllocInvalidOperationExceptionObject( diff --git a/hphp/runtime/ext/asio/asio_session.h b/hphp/runtime/ext/asio/asio_session.h index 74bd8bd91..db59c2544 100644 --- a/hphp/runtime/ext/asio/asio_session.h +++ b/hphp/runtime/ext/asio/asio_session.h @@ -18,11 +18,9 @@ #ifndef incl_HPHP_EXT_ASIO_SESSION_H_ #define incl_HPHP_EXT_ASIO_SESSION_H_ -#include -#include -#include #include "hphp/runtime/base/base_includes.h" #include "hphp/runtime/ext/asio/asio_context.h" +#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h" #include "hphp/runtime/ext/ext_closure.h" namespace HPHP { @@ -33,7 +31,6 @@ FORWARD_DECLARE_CLASS_BUILTIN(GenArrayWaitHandle); FORWARD_DECLARE_CLASS_BUILTIN(GenVectorWaitHandle); FORWARD_DECLARE_CLASS_BUILTIN(SetResultToRefWaitHandle); FORWARD_DECLARE_CLASS_BUILTIN(ContinuationWaitHandle); -FORWARD_DECLARE_CLASS_BUILTIN(ExternalThreadEventWaitHandle); class AsioSession { public: @@ -73,16 +70,11 @@ class AsioSession { uint16_t getCurrentWaitHandleDepth(); - // external thread event - c_ExternalThreadEventWaitHandle* getReadyExternalThreadEvents() { - auto ready = m_readyExternalThreadEvents.exchange(nullptr); - assert(ready != k_waitingForExternalThreadEvents); - return ready; + // external thread events + AsioExternalThreadEventQueue* getExternalThreadEventQueue() { + return &m_externalThreadEventQueue; } - c_ExternalThreadEventWaitHandle* waitForExternalThreadEvents(); - void enqueueExternalThreadEvent(c_ExternalThreadEventWaitHandle* wait_handle); - // abrupt interrupt exception CObjRef getAbruptInterruptException() { return m_abruptInterruptException; @@ -163,15 +155,11 @@ class AsioSession { private: static DECLARE_THREAD_LOCAL_PROXY(AsioSession, false, s_current); - static constexpr c_ExternalThreadEventWaitHandle* k_waitingForExternalThreadEvents = static_cast((void*)1L); - AsioSession(); smart::vector m_contexts; - std::atomic m_readyExternalThreadEvents; - std::mutex m_readyExternalThreadEventsMutex; - std::condition_variable m_readyExternalThreadEventsCondition; + AsioExternalThreadEventQueue m_externalThreadEventQueue; Object m_abruptInterruptException; diff --git a/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp b/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp index b71551de4..736a9dfc6 100644 --- a/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp +++ b/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp @@ -17,6 +17,7 @@ #include "hphp/runtime/ext/ext_asio.h" #include "hphp/runtime/ext/asio/asio_external_thread_event.h" +#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h" #include "hphp/runtime/ext/asio/asio_session.h" #include "hphp/system/systemlib.h" @@ -43,10 +44,10 @@ void c_ExternalThreadEventWaitHandle::sweep() { } // event has finished, but process() was not called yet - auto session = AsioSession::Get(); + auto queue = AsioSession::Get()->getExternalThreadEventQueue(); bool done = false; do { - auto ete_wh = session->waitForExternalThreadEvents(); + auto ete_wh = queue->consumeMulti(); while (ete_wh) { done |= ete_wh == this; auto next_wh = ete_wh->getNextToProcess();