GenMapWaitHandle - a BlockableWaitHandle that wait for a Map of WaitHandles
add GenMapWaitHandle, a compliment to GenVectorWaitHandle and GenArrayWaitHandle.
Esse commit está contido em:
@@ -148,6 +148,17 @@ void AsioSession::onGenArrayCreate(c_GenArrayWaitHandle* wait_handle, CVarRef de
|
||||
}
|
||||
}
|
||||
|
||||
void AsioSession::onGenMapCreate(c_GenMapWaitHandle* wait_handle, CVarRef dependencies) {
|
||||
assert(m_onGenMapCreateCallback.get());
|
||||
try {
|
||||
vm_call_user_func(
|
||||
m_onGenMapCreateCallback,
|
||||
CREATE_VECTOR2(wait_handle, dependencies));
|
||||
} catch (const Object& callback_exception) {
|
||||
raise_warning("[asio] Ignoring exception thrown by GenMapWaitHandle::onCreate callback");
|
||||
}
|
||||
}
|
||||
|
||||
void AsioSession::onGenVectorCreate(c_GenVectorWaitHandle* wait_handle, CVarRef dependencies) {
|
||||
assert(m_onGenVectorCreateCallback.get());
|
||||
try {
|
||||
|
||||
@@ -28,6 +28,7 @@ namespace HPHP {
|
||||
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(WaitHandle);
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(GenArrayWaitHandle);
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(GenMapWaitHandle);
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(GenVectorWaitHandle);
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(SetResultToRefWaitHandle);
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(ContinuationWaitHandle);
|
||||
@@ -135,6 +136,14 @@ class AsioSession {
|
||||
bool hasOnGenArrayCreateCallback() { return m_onGenArrayCreateCallback.get(); }
|
||||
void onGenArrayCreate(c_GenArrayWaitHandle* wait_handle, CVarRef dependencies);
|
||||
|
||||
// GenMapWaitHandle callbacks:
|
||||
void setOnGenMapCreateCallback(ObjectData* on_create) {
|
||||
assert(!on_create || on_create->instanceof(c_Closure::s_cls));
|
||||
m_onGenMapCreateCallback = on_create;
|
||||
}
|
||||
bool hasOnGenMapCreateCallback() { return m_onGenMapCreateCallback.get(); }
|
||||
void onGenMapCreate(c_GenMapWaitHandle* wait_handle, CVarRef dependencies);
|
||||
|
||||
// GenVectorWaitHandle callbacks:
|
||||
void setOnGenVectorCreateCallback(ObjectData* on_create) {
|
||||
assert(!on_create || on_create->instanceof(c_Closure::s_cls));
|
||||
@@ -168,6 +177,7 @@ class AsioSession {
|
||||
Object m_onContinuationSuccessCallback;
|
||||
Object m_onContinuationFailCallback;
|
||||
Object m_onGenArrayCreateCallback;
|
||||
Object m_onGenMapCreateCallback;
|
||||
Object m_onGenVectorCreateCallback;
|
||||
Object m_onSetResultToRefCreateCallback;
|
||||
Object m_onJoinCallback;
|
||||
|
||||
@@ -0,0 +1,240 @@
|
||||
/*
|
||||
+----------------------------------------------------------------------+
|
||||
| HipHop for PHP |
|
||||
+----------------------------------------------------------------------+
|
||||
| Copyright (c) 2010- Facebook, Inc. (http://www.facebook.com) |
|
||||
| Copyright (c) 1997-2010 The PHP Group |
|
||||
+----------------------------------------------------------------------+
|
||||
| This source file is subject to version 3.01 of the PHP license, |
|
||||
| that is bundled with this package in the file LICENSE, and is |
|
||||
| available through the world-wide-web at the following url: |
|
||||
| http://www.php.net/license/3_01.txt |
|
||||
| If you did not receive a copy of the PHP license and are unable to |
|
||||
| obtain it through the world-wide-web, please send a note to |
|
||||
| license@php.net so we can mail you a copy immediately. |
|
||||
+----------------------------------------------------------------------+
|
||||
*/
|
||||
|
||||
#include <hphp/runtime/ext/ext_collections.h>
|
||||
#include <hphp/runtime/ext/ext_asio.h>
|
||||
#include <hphp/runtime/ext/ext_closure.h>
|
||||
#include <hphp/runtime/ext/asio/asio_context.h>
|
||||
#include <hphp/runtime/ext/asio/asio_session.h>
|
||||
#include <hphp/system/systemlib.h>
|
||||
|
||||
namespace HPHP {
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
namespace {
|
||||
StaticString s_genMap("<gen-map>");
|
||||
|
||||
void putException(Object& exception_field, ObjectData* new_exception) {
|
||||
assert(new_exception);
|
||||
assert(new_exception->instanceof(SystemLib::s_ExceptionClass));
|
||||
|
||||
if (exception_field.isNull()) {
|
||||
exception_field = new_exception;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c_GenMapWaitHandle::c_GenMapWaitHandle(Class* cb)
|
||||
: c_BlockableWaitHandle(cb), m_exception() {
|
||||
}
|
||||
|
||||
c_GenMapWaitHandle::~c_GenMapWaitHandle() {
|
||||
}
|
||||
|
||||
void c_GenMapWaitHandle::t___construct() {
|
||||
Object e(SystemLib::AllocInvalidOperationExceptionObject(
|
||||
"Use GenMapWaitHandle::create() instead of constructor"));
|
||||
throw e;
|
||||
}
|
||||
|
||||
void c_GenMapWaitHandle::ti_setoncreatecallback(CVarRef callback) {
|
||||
if (!callback.isNull() && !callback.instanceof(c_Closure::s_cls)) {
|
||||
Object e(SystemLib::AllocInvalidArgumentExceptionObject(
|
||||
"Unable to set GenMapWaitHandle::onCreate: on_create_cb not a closure"));
|
||||
throw e;
|
||||
}
|
||||
AsioSession::Get()->setOnGenMapCreateCallback(callback.getObjectDataOrNull());
|
||||
}
|
||||
|
||||
Object c_GenMapWaitHandle::ti_create(CVarRef dependencies) {
|
||||
if (UNLIKELY(!dependencies.instanceof(c_Map::s_cls))) {
|
||||
Object e(SystemLib::AllocInvalidArgumentExceptionObject(
|
||||
"Expected dependencies to be an instance of Map"));
|
||||
throw e;
|
||||
}
|
||||
assert(dynamic_cast<c_Map*>(dependencies.getObjectData()));
|
||||
p_Map deps = static_cast<c_Map*>(dependencies.getObjectData())->clone();
|
||||
for (ssize_t iter_pos = deps->iter_begin();
|
||||
iter_pos;
|
||||
iter_pos = deps->iter_next(iter_pos)) {
|
||||
|
||||
TypedValue* current = deps->iter_value(iter_pos);
|
||||
if (UNLIKELY(!c_WaitHandle::fromTypedValue(current))) {
|
||||
Object e(SystemLib::AllocInvalidArgumentExceptionObject(
|
||||
"Expected dependencies to be a map of WaitHandle instances"));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
Object exception;
|
||||
for (ssize_t iter_pos = deps->iter_begin();
|
||||
iter_pos;
|
||||
iter_pos = deps->iter_next(iter_pos)) {
|
||||
|
||||
TypedValue* current = deps->iter_value(iter_pos);
|
||||
assert(current->m_type == KindOfObject);
|
||||
assert(dynamic_cast<c_WaitHandle*>(current->m_data.pobj));
|
||||
auto child = static_cast<c_WaitHandle*>(current->m_data.pobj);
|
||||
|
||||
if (child->isSucceeded()) {
|
||||
tvSetIgnoreRef(child->getResult(), current);
|
||||
} else if (child->isFailed()) {
|
||||
putException(exception, child->getException());
|
||||
} else {
|
||||
assert(dynamic_cast<c_WaitableWaitHandle*>(child));
|
||||
auto child_wh = static_cast<c_WaitableWaitHandle*>(child);
|
||||
|
||||
p_GenMapWaitHandle my_wh = NEWOBJ(c_GenMapWaitHandle)();
|
||||
my_wh->initialize(exception, deps.get(), iter_pos, child_wh);
|
||||
|
||||
AsioSession* session = AsioSession::Get();
|
||||
if (UNLIKELY(session->hasOnGenMapCreateCallback())) {
|
||||
session->onGenMapCreate(my_wh.get(), dependencies);
|
||||
}
|
||||
|
||||
return my_wh;
|
||||
}
|
||||
}
|
||||
|
||||
if (exception.isNull()) {
|
||||
TypedValue tv;
|
||||
tv.m_type = KindOfObject;
|
||||
tv.m_data.pobj = deps.get();
|
||||
return c_StaticResultWaitHandle::Create(&tv);
|
||||
} else {
|
||||
return c_StaticExceptionWaitHandle::Create(exception.get());
|
||||
}
|
||||
}
|
||||
|
||||
void c_GenMapWaitHandle::initialize(CObjRef exception, c_Map* deps, ssize_t iter_pos, c_WaitableWaitHandle* child) {
|
||||
m_exception = exception;
|
||||
m_deps = deps;
|
||||
m_iterPos = iter_pos;
|
||||
try {
|
||||
blockOn(child);
|
||||
} catch (const Object& cycle_exception) {
|
||||
putException(m_exception, cycle_exception.get());
|
||||
m_iterPos = m_deps->iter_next(m_iterPos);
|
||||
onUnblocked();
|
||||
}
|
||||
}
|
||||
|
||||
void c_GenMapWaitHandle::onUnblocked() {
|
||||
for (;
|
||||
m_iterPos;
|
||||
m_iterPos = m_deps->iter_next(m_iterPos)) {
|
||||
|
||||
TypedValue* current = m_deps->iter_value(m_iterPos);
|
||||
assert(current->m_type == KindOfObject);
|
||||
assert(dynamic_cast<c_WaitHandle*>(current->m_data.pobj));
|
||||
auto child = static_cast<c_WaitHandle*>(current->m_data.pobj);
|
||||
|
||||
if (child->isSucceeded()) {
|
||||
tvSetIgnoreRef(child->getResult(), current);
|
||||
} else if (child->isFailed()) {
|
||||
putException(m_exception, child->getException());
|
||||
} else {
|
||||
assert(dynamic_cast<c_WaitableWaitHandle*>(child));
|
||||
auto child_wh = static_cast<c_WaitableWaitHandle*>(child);
|
||||
|
||||
try {
|
||||
blockOn(child_wh);
|
||||
return;
|
||||
} catch (const Object& cycle_exception) {
|
||||
putException(m_exception, cycle_exception.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (m_exception.isNull()) {
|
||||
TypedValue result;
|
||||
result.m_type = KindOfObject;
|
||||
result.m_data.pobj = m_deps.get();
|
||||
setResult(&result);
|
||||
m_deps = nullptr;
|
||||
} else {
|
||||
setException(m_exception.get());
|
||||
m_exception = nullptr;
|
||||
m_deps = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
String c_GenMapWaitHandle::getName() {
|
||||
return s_genMap;
|
||||
}
|
||||
|
||||
c_WaitableWaitHandle* c_GenMapWaitHandle::getChild() {
|
||||
assert(getState() == STATE_BLOCKED);
|
||||
return static_cast<c_WaitableWaitHandle*>(
|
||||
m_deps->iter_value(m_iterPos)->m_data.pobj);
|
||||
}
|
||||
|
||||
void c_GenMapWaitHandle::enterContext(context_idx_t ctx_idx) {
|
||||
assert(AsioSession::Get()->getContext(ctx_idx));
|
||||
|
||||
// stop before corrupting unioned data
|
||||
if (isFinished()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// already in the more specific context?
|
||||
if (LIKELY(getContextIdx() >= ctx_idx)) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert(getState() == STATE_BLOCKED);
|
||||
|
||||
// recursively import current child
|
||||
{
|
||||
assert(m_iterPos);
|
||||
TypedValue* current = m_deps->iter_value(m_iterPos);
|
||||
|
||||
assert(current->m_type == KindOfObject);
|
||||
assert(dynamic_cast<c_WaitableWaitHandle*>(current->m_data.pobj));
|
||||
auto child_wh = static_cast<c_WaitableWaitHandle*>(current->m_data.pobj);
|
||||
child_wh->enterContext(ctx_idx);
|
||||
}
|
||||
|
||||
// import ourselves
|
||||
setContextIdx(ctx_idx);
|
||||
|
||||
// try to import other children
|
||||
try {
|
||||
for (ssize_t iter_pos = m_deps->iter_next(m_iterPos);
|
||||
iter_pos;
|
||||
iter_pos = m_deps->iter_next(iter_pos)) {
|
||||
|
||||
TypedValue* current = m_deps->iter_value(iter_pos);
|
||||
assert(current->m_type == KindOfObject);
|
||||
assert(dynamic_cast<c_WaitHandle*>(current->m_data.pobj));
|
||||
auto child = static_cast<c_WaitHandle*>(current->m_data.pobj);
|
||||
|
||||
if (child->isFinished()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(dynamic_cast<c_WaitableWaitHandle*>(child));
|
||||
auto child_wh = static_cast<c_WaitableWaitHandle*>(child);
|
||||
child_wh->enterContext(ctx_idx);
|
||||
}
|
||||
} catch (const Object& cycle_exception) {
|
||||
// exception will be eventually processed by onUnblocked()
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
}
|
||||
@@ -46,6 +46,7 @@ void f_asio_set_on_started_callback(CVarRef on_started_cb);
|
||||
* BlockableWaitHandle - wait handle that can be blocked by other WH
|
||||
* ContinuationWaitHandle - Continuation-powered asynchronous execution
|
||||
* GenArrayWaitHandle - wait handle representing an array of WHs
|
||||
* GenMapWaitHandle - wait handle representing an Map of WHs
|
||||
* GenVectorWaitHandle - wait handle representing an Vector of WHs
|
||||
* SetResultToRefWaitHandle - wait handle that sets result to reference
|
||||
* RescheduleWaitHandle - wait handle that reschedules execution
|
||||
@@ -344,7 +345,45 @@ class c_GenArrayWaitHandle : public c_BlockableWaitHandle {
|
||||
Array m_deps;
|
||||
ssize_t m_iterPos;
|
||||
};
|
||||
//
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// class GenMapWaitHandle
|
||||
|
||||
/**
|
||||
* A wait handle that waits for a map of wait handles. The wait handle
|
||||
* finishes once all wait handles in the map are finished. The result value
|
||||
* preserves the keys of the original map. If one of the wait handles failed,
|
||||
* the exception is propagated by failure.
|
||||
*/
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(GenMapWaitHandle);
|
||||
FORWARD_DECLARE_CLASS_BUILTIN(Map);
|
||||
class c_GenMapWaitHandle : public c_BlockableWaitHandle {
|
||||
public:
|
||||
DECLARE_CLASS(GenMapWaitHandle, GenMapWaitHandle, BlockableWaitHandle)
|
||||
|
||||
// need to implement
|
||||
public: c_GenMapWaitHandle(Class* cls = c_GenMapWaitHandle::s_cls);
|
||||
public: ~c_GenMapWaitHandle();
|
||||
public: void t___construct();
|
||||
public: static void ti_setoncreatecallback(CVarRef callback);
|
||||
public: static Object ti_create(CVarRef dependencies);
|
||||
|
||||
public:
|
||||
String getName();
|
||||
void enterContext(context_idx_t ctx_idx);
|
||||
|
||||
protected:
|
||||
void onUnblocked();
|
||||
c_WaitableWaitHandle* getChild();
|
||||
|
||||
private:
|
||||
void initialize(CObjRef exception, c_Map* deps, int64_t iter_pos, c_WaitableWaitHandle* child);
|
||||
|
||||
Object m_exception;
|
||||
p_Map m_deps;
|
||||
ssize_t m_iterPos;
|
||||
};
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// class GenVectorWaitHandle
|
||||
|
||||
|
||||
@@ -485,6 +485,7 @@ class c_Map : public ExtObjectDataFlags<ObjectData::MapAttrInit|
|
||||
friend class c_Vector;
|
||||
friend class c_StableMap;
|
||||
friend class ArrayIter;
|
||||
friend class c_GenMapWaitHandle;
|
||||
};
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -640,6 +640,68 @@
|
||||
"consts": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "GenMapWaitHandle",
|
||||
"parent": "BlockableWaitHandle",
|
||||
"desc": "A wait handle representing a map of asynchronous operations",
|
||||
"flags": [
|
||||
"HasDocComment"
|
||||
],
|
||||
"funcs": [
|
||||
{
|
||||
"name": "__construct",
|
||||
"flags": [
|
||||
"IsPrivate",
|
||||
"HasDocComment"
|
||||
],
|
||||
"return": {
|
||||
"type": null
|
||||
},
|
||||
"args": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "create",
|
||||
"desc": "Create a wait handle that waits for a given map of dependencies",
|
||||
"flags": [
|
||||
"IsStatic",
|
||||
"HasDocComment"
|
||||
],
|
||||
"return": {
|
||||
"type": "Object",
|
||||
"desc": "A WaitHandle that will wait for a given map of dependencies and return their results"
|
||||
},
|
||||
"args": [
|
||||
{
|
||||
"name": "dependencies",
|
||||
"type": "Variant",
|
||||
"desc": "A map of dependencies to wait for"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "setOnCreateCallback",
|
||||
"desc": "Set callback for when a GenMapWaitHandle is created",
|
||||
"flags": [
|
||||
"IsStatic",
|
||||
"HasDocComment",
|
||||
"HipHopSpecific"
|
||||
],
|
||||
"return": {
|
||||
"type": null
|
||||
},
|
||||
"args": [
|
||||
{
|
||||
"name": "on_create_cb",
|
||||
"type": "Variant",
|
||||
"desc": "A Closure to be called on creation"
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"consts": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "GenVectorWaitHandle",
|
||||
"parent": "BlockableWaitHandle",
|
||||
|
||||
Referência em uma Nova Issue
Bloquear um usuário