Recover from C++ exceptions in AsioExternalThreadEvent::unserialize(), part 2

AsioExternalThreadEvent::unserialize() may legally throw PHP exceptions.
When that happens, construction of PHP exceptions reenters VM, surprised
flag is checked and pending C++ exceptions may be thrown.

Let's make sure the queue of received events in runUntil() is not lost.
Move this responsibility to the AsioExternalThreadEventQueue.
Esse commit está contido em:
Jan Oravec
2013-06-15 22:48:48 -07:00
commit de Sara Golemon
commit c6f4e6b3c7
5 arquivos alterados com 89 adições e 43 exclusões
+11 -11
Ver Arquivo
@@ -105,11 +105,10 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) {
// process ready external thread events once per 256 other events
// (when 8-bit check_ete_counter overflows)
if (!++check_ete_counter) {
auto ete_wh = session->getExternalThreadEventQueue()->tryConsumeMulti();
while (ete_wh) {
auto next_wh = ete_wh->getNextToProcess();
ete_wh->process();
ete_wh = next_wh;
// queue may contain received unprocessed events from failed runUntil()
auto queue = session->getExternalThreadEventQueue();
if (UNLIKELY(queue->hasReceived()) || queue->tryReceiveSome()) {
queue->processAllReceived();
}
}
@@ -134,13 +133,14 @@ void AsioContext::runUntil(c_WaitableWaitHandle* wait_handle) {
// pending external thread events? wait for at least one to become ready
if (!m_externalThreadEvents.empty()) {
// all your wait time are belong to us
auto ete_wh = session->getExternalThreadEventQueue()->consumeMulti();
while (ete_wh) {
auto next_wh = ete_wh->getNextToProcess();
ete_wh->process();
ete_wh = next_wh;
// queue may contain received unprocessed events from failed runUntil()
auto queue = session->getExternalThreadEventQueue();
if (LIKELY(!queue->hasReceived())) {
// all your wait time are belong to us
queue->receiveSome();
}
queue->processAllReceived();
continue;
}
@@ -50,7 +50,7 @@ void AsioExternalThreadEvent::markAsFinished() {
uint32_t/*state_t*/ expected(Waiting);
if (m_state.compare_exchange_strong(expected, Finished)) {
// transfer ownership
m_queue->produce(m_waitHandle);
m_queue->send(m_waitHandle);
} else {
// web request died, destroy object
assert(expected == Canceled);
@@ -23,22 +23,71 @@ namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
AsioExternalThreadEventQueue::AsioExternalThreadEventQueue()
: m_queue(nullptr), m_queueMutex(), m_queueCondition() {
: m_received(nullptr), m_queue(nullptr), m_queueMutex(),
m_queueCondition() {
}
c_ExternalThreadEventWaitHandle* AsioExternalThreadEventQueue::consumeMulti() {
// try check for ready external thread events without grabbing lock
auto ready = m_queue.exchange(nullptr);
if (ready != nullptr) {
assert(ready != k_consumerWaiting);
return ready;
/**
* Process all received finished events.
*
* May throw C++ exception that may leave some events unprocessed.
*/
void AsioExternalThreadEventQueue::processAllReceived() {
assert(m_received);
do {
auto ete_wh = m_received;
m_received = m_received->getNextToProcess();
ete_wh->process();
} while (m_received);
}
/**
* Abandon all received finished events.
*
* Returns true iff provided wait handle was abandoned.
*/
bool AsioExternalThreadEventQueue::abandonAllReceived(c_ExternalThreadEventWaitHandle* wait_handle) {
assert(m_received);
bool seen = false;
do {
auto ete_wh = m_received;
m_received = m_received->getNextToProcess();
ete_wh->abandon(true);
seen |= ete_wh == wait_handle;
} while (m_received);
return seen;
}
/**
* Try to receive finished events without blocking.
*
* Returns true iff at least one event was received.
*/
bool AsioExternalThreadEventQueue::tryReceiveSome() {
assert(!m_received);
m_received = m_queue.exchange(nullptr);
assert(m_received != k_consumerWaiting);
return m_received;
}
/**
* Receive at least one finished event. Block if necessary.
*/
void AsioExternalThreadEventQueue::receiveSome() {
assert(!m_received);
// try receive external thread events without grabbing lock
m_received = m_queue.exchange(nullptr);
if (m_received) {
assert(m_received != k_consumerWaiting);
return;
}
// no ready external thread events available, synchronization needed
// no external thread events received, synchronization needed
std::unique_lock<std::mutex> lock(m_queueMutex);
// transition from empty to WAITING
if (m_queue.compare_exchange_strong(ready, k_consumerWaiting)) {
if (m_queue.compare_exchange_strong(m_received, k_consumerWaiting)) {
// wait for transition from WAITING to non-empty
do {
m_queueCondition.wait(lock);
@@ -47,13 +96,15 @@ c_ExternalThreadEventWaitHandle* AsioExternalThreadEventQueue::consumeMulti() {
// external thread transitioned from empty to non-empty while grabbing lock
}
ready = m_queue.exchange(nullptr);
assert(ready != nullptr);
assert(ready != k_consumerWaiting);
return ready;
m_received = m_queue.exchange(nullptr);
assert(m_received);
assert(m_received != k_consumerWaiting);
}
void AsioExternalThreadEventQueue::produce(c_ExternalThreadEventWaitHandle* wait_handle) {
/**
* Send finished event from the processing thread to the web request thread.
*/
void AsioExternalThreadEventQueue::send(c_ExternalThreadEventWaitHandle* wait_handle) {
auto next = m_queue.load();
while (true) {
while (next != k_consumerWaiting) {
@@ -32,18 +32,18 @@ class AsioExternalThreadEventQueue {
public:
AsioExternalThreadEventQueue();
c_ExternalThreadEventWaitHandle* tryConsumeMulti() {
auto ready = m_queue.exchange(nullptr);
assert(ready != k_consumerWaiting);
return ready;
}
bool hasReceived() { return m_received; }
void processAllReceived();
bool abandonAllReceived(c_ExternalThreadEventWaitHandle* wait_handle);
c_ExternalThreadEventWaitHandle* consumeMulti();
void produce(c_ExternalThreadEventWaitHandle* wait_handle);
bool tryReceiveSome();
void receiveSome();
void send(c_ExternalThreadEventWaitHandle* wait_handle);
private:
static constexpr auto k_consumerWaiting = static_cast<c_ExternalThreadEventWaitHandle*>((void*)1L);
c_ExternalThreadEventWaitHandle* m_received;
std::atomic<c_ExternalThreadEventWaitHandle*> m_queue;
std::mutex m_queueMutex;
std::condition_variable m_queueCondition;
@@ -45,16 +45,11 @@ void c_ExternalThreadEventWaitHandle::sweep() {
// event has finished, but process() was not called yet
auto queue = AsioSession::Get()->getExternalThreadEventQueue();
bool done = false;
do {
auto ete_wh = queue->consumeMulti();
while (ete_wh) {
done |= ete_wh == this;
auto next_wh = ete_wh->getNextToProcess();
ete_wh->abandon(true);
ete_wh = next_wh;
}
} while (!done);
bool done = queue->hasReceived() && queue->abandonAllReceived(this);
while (!done) {
queue->receiveSome();
done = queue->abandonAllReceived(this);
}
}
void c_ExternalThreadEventWaitHandle::t___construct() {