From c6f4e6b3c7fe238a52a00f0aab8603b4824ea7c2 Mon Sep 17 00:00:00 2001 From: Jan Oravec Date: Sat, 15 Jun 2013 22:48:48 -0700 Subject: [PATCH] Recover from C++ exceptions in AsioExternalThreadEvent::unserialize(), part 2 AsioExternalThreadEvent::unserialize() may legally throw PHP exceptions. When that happens, construction of PHP exceptions reenters VM, surprised flag is checked and pending C++ exceptions may be thrown. Let's make sure the queue of received events in runUntil() is not lost. Move this responsibility to the AsioExternalThreadEventQueue. --- hphp/runtime/ext/asio/asio_context.cpp | 22 +++--- .../ext/asio/asio_external_thread_event.cpp | 2 +- .../asio/asio_external_thread_event_queue.cpp | 79 +++++++++++++++---- .../asio/asio_external_thread_event_queue.h | 14 ++-- .../external_thread_event_wait_handle.cpp | 15 ++-- 5 files changed, 89 insertions(+), 43 deletions(-) diff --git a/hphp/runtime/ext/asio/asio_context.cpp b/hphp/runtime/ext/asio/asio_context.cpp index dccf94306..a1d7a2939 100644 --- a/hphp/runtime/ext/asio/asio_context.cpp +++ b/hphp/runtime/ext/asio/asio_context.cpp @@ -105,11 +105,10 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) { // process ready external thread events once per 256 other events // (when 8-bit check_ete_counter overflows) if (!++check_ete_counter) { - auto ete_wh = session->getExternalThreadEventQueue()->tryConsumeMulti(); - while (ete_wh) { - auto next_wh = ete_wh->getNextToProcess(); - ete_wh->process(); - ete_wh = next_wh; + // queue may contain received unprocessed events from failed runUntil() + auto queue = session->getExternalThreadEventQueue(); + if (UNLIKELY(queue->hasReceived()) || queue->tryReceiveSome()) { + queue->processAllReceived(); } } @@ -134,13 +133,14 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) { // pending external thread events? wait for at least one to become ready if (!m_externalThreadEvents.empty()) { - // all your wait time are belong to us - auto ete_wh = session->getExternalThreadEventQueue()->consumeMulti(); - while (ete_wh) { - auto next_wh = ete_wh->getNextToProcess(); - ete_wh->process(); - ete_wh = next_wh; + // queue may contain received unprocessed events from failed runUntil() + auto queue = session->getExternalThreadEventQueue(); + if (LIKELY(!queue->hasReceived())) { + // all your wait time are belong to us + queue->receiveSome(); } + + queue->processAllReceived(); continue; } diff --git a/hphp/runtime/ext/asio/asio_external_thread_event.cpp b/hphp/runtime/ext/asio/asio_external_thread_event.cpp index 40bfc5b57..d52cd9e32 100644 --- a/hphp/runtime/ext/asio/asio_external_thread_event.cpp +++ b/hphp/runtime/ext/asio/asio_external_thread_event.cpp @@ -50,7 +50,7 @@ void AsioExternalThreadEvent::markAsFinished() { uint32_t/*state_t*/ expected(Waiting); if (m_state.compare_exchange_strong(expected, Finished)) { // transfer ownership - m_queue->produce(m_waitHandle); + m_queue->send(m_waitHandle); } else { // web request died, destroy object assert(expected == Canceled); diff --git a/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp b/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp index 5e50324e9..3c14ba7ca 100644 --- a/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp +++ b/hphp/runtime/ext/asio/asio_external_thread_event_queue.cpp @@ -23,22 +23,71 @@ namespace HPHP { /////////////////////////////////////////////////////////////////////////////// AsioExternalThreadEventQueue::AsioExternalThreadEventQueue() - : m_queue(nullptr), m_queueMutex(), m_queueCondition() { + : m_received(nullptr), m_queue(nullptr), m_queueMutex(), + m_queueCondition() { } -c_ExternalThreadEventWaitHandle* AsioExternalThreadEventQueue::consumeMulti() { - // try check for ready external thread events without grabbing lock - auto ready = m_queue.exchange(nullptr); - if (ready != nullptr) { - assert(ready != k_consumerWaiting); - return ready; +/** + * Process all received finished events. + * + * May throw C++ exception that may leave some events unprocessed. + */ +void AsioExternalThreadEventQueue::processAllReceived() { + assert(m_received); + do { + auto ete_wh = m_received; + m_received = m_received->getNextToProcess(); + ete_wh->process(); + } while (m_received); +} + +/** + * Abandon all received finished events. + * + * Returns true iff provided wait handle was abandoned. + */ +bool AsioExternalThreadEventQueue::abandonAllReceived(c_ExternalThreadEventWaitHandle* wait_handle) { + assert(m_received); + bool seen = false; + do { + auto ete_wh = m_received; + m_received = m_received->getNextToProcess(); + ete_wh->abandon(true); + seen |= ete_wh == wait_handle; + } while (m_received); + return seen; +} + +/** + * Try to receive finished events without blocking. + * + * Returns true iff at least one event was received. + */ +bool AsioExternalThreadEventQueue::tryReceiveSome() { + assert(!m_received); + m_received = m_queue.exchange(nullptr); + assert(m_received != k_consumerWaiting); + return m_received; +} + +/** + * Receive at least one finished event. Block if necessary. + */ +void AsioExternalThreadEventQueue::receiveSome() { + assert(!m_received); + + // try receive external thread events without grabbing lock + m_received = m_queue.exchange(nullptr); + if (m_received) { + assert(m_received != k_consumerWaiting); + return; } - // no ready external thread events available, synchronization needed + // no external thread events received, synchronization needed std::unique_lock lock(m_queueMutex); // transition from empty to WAITING - if (m_queue.compare_exchange_strong(ready, k_consumerWaiting)) { + if (m_queue.compare_exchange_strong(m_received, k_consumerWaiting)) { // wait for transition from WAITING to non-empty do { m_queueCondition.wait(lock); @@ -47,13 +96,15 @@ c_ExternalThreadEventWaitHandle* AsioExternalThreadEventQueue::consumeMulti() { // external thread transitioned from empty to non-empty while grabbing lock } - ready = m_queue.exchange(nullptr); - assert(ready != nullptr); - assert(ready != k_consumerWaiting); - return ready; + m_received = m_queue.exchange(nullptr); + assert(m_received); + assert(m_received != k_consumerWaiting); } -void AsioExternalThreadEventQueue::produce(c_ExternalThreadEventWaitHandle* wait_handle) { +/** + * Send finished event from the processing thread to the web request thread. + */ +void AsioExternalThreadEventQueue::send(c_ExternalThreadEventWaitHandle* wait_handle) { auto next = m_queue.load(); while (true) { while (next != k_consumerWaiting) { diff --git a/hphp/runtime/ext/asio/asio_external_thread_event_queue.h b/hphp/runtime/ext/asio/asio_external_thread_event_queue.h index 787be95b9..4aac6ff6f 100644 --- a/hphp/runtime/ext/asio/asio_external_thread_event_queue.h +++ b/hphp/runtime/ext/asio/asio_external_thread_event_queue.h @@ -32,18 +32,18 @@ class AsioExternalThreadEventQueue { public: AsioExternalThreadEventQueue(); - c_ExternalThreadEventWaitHandle* tryConsumeMulti() { - auto ready = m_queue.exchange(nullptr); - assert(ready != k_consumerWaiting); - return ready; - } + bool hasReceived() { return m_received; } + void processAllReceived(); + bool abandonAllReceived(c_ExternalThreadEventWaitHandle* wait_handle); - c_ExternalThreadEventWaitHandle* consumeMulti(); - void produce(c_ExternalThreadEventWaitHandle* wait_handle); + bool tryReceiveSome(); + void receiveSome(); + void send(c_ExternalThreadEventWaitHandle* wait_handle); private: static constexpr auto k_consumerWaiting = static_cast((void*)1L); + c_ExternalThreadEventWaitHandle* m_received; std::atomic m_queue; std::mutex m_queueMutex; std::condition_variable m_queueCondition; diff --git a/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp b/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp index 736a9dfc6..a6e7b149d 100644 --- a/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp +++ b/hphp/runtime/ext/asio/external_thread_event_wait_handle.cpp @@ -45,16 +45,11 @@ void c_ExternalThreadEventWaitHandle::sweep() { // event has finished, but process() was not called yet auto queue = AsioSession::Get()->getExternalThreadEventQueue(); - bool done = false; - do { - auto ete_wh = queue->consumeMulti(); - while (ete_wh) { - done |= ete_wh == this; - auto next_wh = ete_wh->getNextToProcess(); - ete_wh->abandon(true); - ete_wh = next_wh; - } - } while (!done); + bool done = queue->hasReceived() && queue->abandonAllReceived(this); + while (!done) { + queue->receiveSome(); + done = queue->abandonAllReceived(this); + } } void c_ExternalThreadEventWaitHandle::t___construct() {