Move external thread event queue logic from AsioSession to AsioExternalThreadEventQueue

External thread event queueing logic is big enough that it deserves its
own class. Move the code and rename members to better reflect the
underlying operations. Implementation is otherwise unchanged.
Esse commit está contido em:
Jan Oravec
2013-06-15 19:57:55 -07:00
commit de Sara Golemon
commit 797a6822d0
8 arquivos alterados com 148 adições e 75 exclusões
+3 -2
Ver Arquivo
@@ -17,6 +17,7 @@
#include "hphp/runtime/ext/asio/asio_context.h"
#include "hphp/runtime/ext/ext_asio.h"
#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h"
#include "hphp/runtime/ext/asio/asio_session.h"
#include "hphp/system/systemlib.h"
@@ -104,7 +105,7 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) {
// process ready external thread events once per 256 other events
// (when 8-bit check_ete_counter overflows)
if (!++check_ete_counter) {
auto ete_wh = session->getReadyExternalThreadEvents();
auto ete_wh = session->getExternalThreadEventQueue()->tryConsumeMulti();
while (ete_wh) {
auto next_wh = ete_wh->getNextToProcess();
ete_wh->process();
@@ -134,7 +135,7 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) {
// pending external thread events? wait for at least one to become ready
if (!m_externalThreadEvents.empty()) {
// all your wait time are belong to us
auto ete_wh = session->waitForExternalThreadEvents();
auto ete_wh = session->getExternalThreadEventQueue()->consumeMulti();
while (ete_wh) {
auto next_wh = ete_wh->getNextToProcess();
ete_wh->process();
@@ -24,7 +24,7 @@ namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
AsioExternalThreadEvent::AsioExternalThreadEvent(ObjectData* priv_data)
: m_session(AsioSession::Get()),
: m_queue(AsioSession::Get()->getExternalThreadEventQueue()),
m_state(Waiting) {
m_waitHandle = c_ExternalThreadEventWaitHandle::Create(this, priv_data);
}
@@ -50,7 +50,7 @@ void AsioExternalThreadEvent::markAsFinished() {
uint32_t/*state_t*/ expected(Waiting);
if (m_state.compare_exchange_strong(expected, Finished)) {
// transfer ownership
m_session->enqueueExternalThreadEvent(m_waitHandle);
m_queue->produce(m_waitHandle);
} else {
// web request died, destroy object
assert(expected == Canceled);
@@ -281,7 +281,7 @@ class AsioExternalThreadEvent {
Abandoned,
};
AsioSession* m_session;
AsioExternalThreadEventQueue* m_queue;
c_ExternalThreadEventWaitHandle* m_waitHandle;
std::atomic<uint32_t/*state_t*/> m_state;
};
@@ -0,0 +1,78 @@
/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
| Copyright (c) 1997-2010 The PHP Group |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/
#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h"
#include "hphp/runtime/ext/ext_asio.h"
#include "hphp/system/systemlib.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
AsioExternalThreadEventQueue::AsioExternalThreadEventQueue()
: m_queue(nullptr), m_queueMutex(), m_queueCondition() {
}
c_ExternalThreadEventWaitHandle* AsioExternalThreadEventQueue::consumeMulti() {
// try check for ready external thread events without grabbing lock
auto ready = m_queue.exchange(nullptr);
if (ready != nullptr) {
assert(ready != k_consumerWaiting);
return ready;
}
// no ready external thread events available, synchronization needed
std::unique_lock<std::mutex> lock(m_queueMutex);
// transition from empty to WAITING
if (m_queue.compare_exchange_strong(ready, k_consumerWaiting)) {
// wait for transition from WAITING to non-empty
do {
m_queueCondition.wait(lock);
} while (m_queue.load() == k_consumerWaiting);
} else {
// external thread transitioned from empty to non-empty while grabbing lock
}
ready = m_queue.exchange(nullptr);
assert(ready != nullptr);
assert(ready != k_consumerWaiting);
return ready;
}
void AsioExternalThreadEventQueue::produce(c_ExternalThreadEventWaitHandle* wait_handle) {
auto next = m_queue.load();
while (true) {
while (next != k_consumerWaiting) {
wait_handle->setNextToProcess(next);
if (m_queue.compare_exchange_weak(next, wait_handle)) {
return;
}
}
// try to transition from WAITING to non-empty
wait_handle->setNextToProcess(nullptr);
if (m_queue.compare_exchange_weak(next, wait_handle)) {
// succeeded, notify condition
std::unique_lock<std::mutex> lock(m_queueMutex);
m_queueCondition.notify_one();
return;
}
}
}
///////////////////////////////////////////////////////////////////////////////
}
@@ -0,0 +1,55 @@
/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
| Copyright (c) 1997-2010 The PHP Group |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/
#ifndef incl_HPHP_EXT_ASIO_EXTERNAL_THREAD_EVENT_QUEUE_H_
#define incl_HPHP_EXT_ASIO_EXTERNAL_THREAD_EVENT_QUEUE_H_
#include "hphp/runtime/base/base_includes.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
FORWARD_DECLARE_CLASS_BUILTIN(ExternalThreadEventWaitHandle);
class AsioExternalThreadEventQueue {
public:
AsioExternalThreadEventQueue();
c_ExternalThreadEventWaitHandle* tryConsumeMulti() {
auto ready = m_queue.exchange(nullptr);
assert(ready != k_consumerWaiting);
return ready;
}
c_ExternalThreadEventWaitHandle* consumeMulti();
void produce(c_ExternalThreadEventWaitHandle* wait_handle);
private:
static constexpr auto k_consumerWaiting = static_cast<c_ExternalThreadEventWaitHandle*>((void*)1L);
std::atomic<c_ExternalThreadEventWaitHandle*> m_queue;
std::mutex m_queueMutex;
std::condition_variable m_queueCondition;
};
///////////////////////////////////////////////////////////////////////////////
}
#endif // incl_HPHP_EXT_ASIO_EXTERNAL_THREAD_EVENT_QUEUE_H_
+1 -51
Ver Arquivo
@@ -33,9 +33,7 @@ void AsioSession::Init() {
}
AsioSession::AsioSession()
: m_contexts(), m_readyExternalThreadEvents(nullptr),
m_readyExternalThreadEventsMutex(),
m_readyExternalThreadEventsCondition() {
: m_contexts(), m_externalThreadEventQueue() {
}
void AsioSession::enterContext() {
@@ -70,54 +68,6 @@ uint16_t AsioSession::getCurrentWaitHandleDepth() {
return isInContext() ? getCurrentWaitHandle()->getDepth() : 0;
}
c_ExternalThreadEventWaitHandle* AsioSession::waitForExternalThreadEvents() {
// try check for ready external thread events without grabbing lock
auto ready = m_readyExternalThreadEvents.exchange(nullptr);
if (ready != nullptr) {
assert(ready != k_waitingForExternalThreadEvents);
return ready;
}
// no ready external thread events available, synchronization needed
std::unique_lock<std::mutex> lock(m_readyExternalThreadEventsMutex);
// transition from empty to WAITING
if (m_readyExternalThreadEvents.compare_exchange_strong(ready, k_waitingForExternalThreadEvents)) {
// wait for transition from WAITING to non-empty
do {
m_readyExternalThreadEventsCondition.wait(lock);
} while (m_readyExternalThreadEvents.load() == k_waitingForExternalThreadEvents);
} else {
// external thread transitioned from empty to non-empty while grabbing lock
}
ready = m_readyExternalThreadEvents.exchange(nullptr);
assert(ready != nullptr);
assert(ready != k_waitingForExternalThreadEvents);
return ready;
}
void AsioSession::enqueueExternalThreadEvent(c_ExternalThreadEventWaitHandle* wait_handle) {
auto next = m_readyExternalThreadEvents.load();
while (true) {
while (next != k_waitingForExternalThreadEvents) {
wait_handle->setNextToProcess(next);
if (m_readyExternalThreadEvents.compare_exchange_weak(next, wait_handle)) {
return;
}
}
// try to transition from WAITING to non-empty
wait_handle->setNextToProcess(nullptr);
if (m_readyExternalThreadEvents.compare_exchange_weak(next, wait_handle)) {
// succeeded, notify condition
std::unique_lock<std::mutex> lock(m_readyExternalThreadEventsMutex);
m_readyExternalThreadEventsCondition.notify_one();
return;
}
}
}
void AsioSession::initAbruptInterruptException() {
assert(!hasAbruptInterruptException());
m_abruptInterruptException = SystemLib::AllocInvalidOperationExceptionObject(
+5 -17
Ver Arquivo
@@ -18,11 +18,9 @@
#ifndef incl_HPHP_EXT_ASIO_SESSION_H_
#define incl_HPHP_EXT_ASIO_SESSION_H_
#include <atomic>
#include <condition_variable>
#include <mutex>
#include "hphp/runtime/base/base_includes.h"
#include "hphp/runtime/ext/asio/asio_context.h"
#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h"
#include "hphp/runtime/ext/ext_closure.h"
namespace HPHP {
@@ -33,7 +31,6 @@ FORWARD_DECLARE_CLASS_BUILTIN(GenArrayWaitHandle);
FORWARD_DECLARE_CLASS_BUILTIN(GenVectorWaitHandle);
FORWARD_DECLARE_CLASS_BUILTIN(SetResultToRefWaitHandle);
FORWARD_DECLARE_CLASS_BUILTIN(ContinuationWaitHandle);
FORWARD_DECLARE_CLASS_BUILTIN(ExternalThreadEventWaitHandle);
class AsioSession {
public:
@@ -73,16 +70,11 @@ class AsioSession {
uint16_t getCurrentWaitHandleDepth();
// external thread event
c_ExternalThreadEventWaitHandle* getReadyExternalThreadEvents() {
auto ready = m_readyExternalThreadEvents.exchange(nullptr);
assert(ready != k_waitingForExternalThreadEvents);
return ready;
// external thread events
AsioExternalThreadEventQueue* getExternalThreadEventQueue() {
return &m_externalThreadEventQueue;
}
c_ExternalThreadEventWaitHandle* waitForExternalThreadEvents();
void enqueueExternalThreadEvent(c_ExternalThreadEventWaitHandle* wait_handle);
// abrupt interrupt exception
CObjRef getAbruptInterruptException() {
return m_abruptInterruptException;
@@ -163,15 +155,11 @@ class AsioSession {
private:
static DECLARE_THREAD_LOCAL_PROXY(AsioSession, false, s_current);
static constexpr c_ExternalThreadEventWaitHandle* k_waitingForExternalThreadEvents = static_cast<c_ExternalThreadEventWaitHandle*>((void*)1L);
AsioSession();
smart::vector<AsioContext*> m_contexts;
std::atomic<c_ExternalThreadEventWaitHandle*> m_readyExternalThreadEvents;
std::mutex m_readyExternalThreadEventsMutex;
std::condition_variable m_readyExternalThreadEventsCondition;
AsioExternalThreadEventQueue m_externalThreadEventQueue;
Object m_abruptInterruptException;
@@ -17,6 +17,7 @@
#include "hphp/runtime/ext/ext_asio.h"
#include "hphp/runtime/ext/asio/asio_external_thread_event.h"
#include "hphp/runtime/ext/asio/asio_external_thread_event_queue.h"
#include "hphp/runtime/ext/asio/asio_session.h"
#include "hphp/system/systemlib.h"
@@ -43,10 +44,10 @@ void c_ExternalThreadEventWaitHandle::sweep() {
}
// event has finished, but process() was not called yet
auto session = AsioSession::Get();
auto queue = AsioSession::Get()->getExternalThreadEventQueue();
bool done = false;
do {
auto ete_wh = session->waitForExternalThreadEvents();
auto ete_wh = queue->consumeMulti();
while (ete_wh) {
done |= ete_wh == this;
auto next_wh = ete_wh->getNextToProcess();