/* +----------------------------------------------------------------------+ | HipHop for PHP | +----------------------------------------------------------------------+ | Copyright (c) 2010- Facebook, Inc. (http://www.facebook.com) | +----------------------------------------------------------------------+ | This source file is subject to version 3.01 of the PHP license, | | that is bundled with this package in the file LICENSE, and is | | available through the world-wide-web at the following url: | | http://www.php.net/license/3_01.txt | | If you did not receive a copy of the PHP license and are unable to | | obtain it through the world-wide-web, please send a note to | | license@php.net so we can mail you a copy immediately. | +----------------------------------------------------------------------+ */ #include #include #include #include #include #include #include #include #include #include #include #include using std::set; using std::deque; namespace HPHP { /////////////////////////////////////////////////////////////////////////////// class PageletTransport : public Transport, public Synchronizable { public: PageletTransport(CStrRef url, CArrRef headers, CStrRef postData, CStrRef remoteHost, const set &rfc1867UploadedFiles, CArrRef files) : m_refCount(0), m_done(false), m_code(0) { gettime(CLOCK_MONOTONIC, &m_queueTime); m_threadType = PageletThread; m_url.append(url.data(), url.size()); m_remoteHost.append(remoteHost.data(), remoteHost.size()); for (ArrayIter iter(headers); iter; ++iter) { Variant key = iter.first(); String header = iter.second(); if (key.isString() && !key.toString().empty()) { m_requestHeaders[key.toString().data()].push_back(header.data()); } else { int pos = header.find(": "); if (pos >= 0) { string name = header.substr(0, pos).data(); string value = header.substr(pos + 2).data(); m_requestHeaders[name].push_back(value); } else { Logger::Error("throwing away bad header: %s", header.data()); } } } if (postData.empty()) { m_get = true; } else { m_get = false; m_postData.append(postData.data(), postData.size()); } disableCompression(); // so we don't have to decompress during sendImpl() m_rfc1867UploadedFiles = rfc1867UploadedFiles; m_files = (std::string) f_serialize(files); } /** * Implementing Transport... */ virtual const char *getUrl() { return m_url.c_str(); } virtual const char *getRemoteHost() { return m_remoteHost.c_str(); } virtual uint16_t getRemotePort() { return 0; } virtual const void *getPostData(int &size) { size = m_postData.size(); return m_postData.data(); } virtual Method getMethod() { return m_get ? Transport::GET : Transport::POST; } virtual std::string getHeader(const char *name) { assert(name && *name); HeaderMap::const_iterator iter = m_requestHeaders.find(name); if (iter != m_requestHeaders.end()) { return iter->second[0]; } return ""; } virtual void getHeaders(HeaderMap &headers) { headers = m_requestHeaders; } virtual void addHeaderImpl(const char *name, const char *value) { assert(name && *name); assert(value); m_responseHeaders[name].push_back(value); } virtual void removeHeaderImpl(const char *name) { assert(name && *name); m_responseHeaders.erase(name); } virtual void sendImpl(const void *data, int size, int code, bool chunked) { m_response.append((const char*)data, size); if (code) { m_code = code; } } virtual void onSendEndImpl() { Lock lock(this); m_done = true; notify(); } virtual bool isUploadedFile(CStrRef filename) { return m_rfc1867UploadedFiles.find(filename.c_str()) != m_rfc1867UploadedFiles.end(); } virtual bool moveUploadedFile(CStrRef filename, CStrRef destination) { if (!isUploadedFile(filename.c_str())) { Logger::Error("%s is not an uploaded file.", filename.c_str()); return false; } return moveUploadedFileHelper(filename, destination); } virtual bool getFiles(string &files) { files = m_files; return true; } // task interface bool isDone() { return m_done; } void addToPipeline(const string &s) { Lock lock(this); m_pipeline.push_back(s); notify(); } bool isPipelineEmpty() { Lock lock(this); return m_pipeline.empty(); } String getResults(Array &headers, int &code, int64_t timeout_ms) { { Lock lock(this); while (!m_done && m_pipeline.empty()) { if (timeout_ms > 0) { long seconds = timeout_ms / 1000; long long nanosecs = (timeout_ms % 1000) * 1000000; if (!wait(seconds, nanosecs)) { code = -1; return ""; } } else { wait(); } } if (!m_pipeline.empty()) { // intermediate results do not have headers and code string ret = m_pipeline.front(); m_pipeline.pop_front(); code = 0; return ret; } } String response(m_response.c_str(), m_response.size(), CopyString); headers = Array::Create(); for (HeaderMap::const_iterator iter = m_responseHeaders.begin(); iter != m_responseHeaders.end(); ++iter) { for (unsigned int i = 0; i < iter->second.size(); i++) { StringBuffer sb; sb.append(iter->first); sb.append(": "); sb.append(iter->second[i]); headers.append(sb.detach()); } } code = m_code; return response; } // ref counting void incRefCount() { atomic_inc(m_refCount); } void decRefCount() { assert(m_refCount); if (atomic_dec(m_refCount) == 0) { delete this; } } timespec getStartTimer() const { return m_queueTime; } private: int m_refCount; string m_url; HeaderMap m_requestHeaders; bool m_get; string m_postData; string m_remoteHost; bool m_done; HeaderMap m_responseHeaders; string m_response; int m_code; deque m_pipeline; // the intermediate pagelet results set m_rfc1867UploadedFiles; string m_files; // serialized to use as $_FILES }; /////////////////////////////////////////////////////////////////////////////// struct PageletWorker : JobQueueWorker { virtual void doJob(PageletTransport *job) { try { job->onRequestStart(job->getStartTimer()); HttpRequestHandler().handleRequest(job); job->decRefCount(); } catch (...) { Logger::Error("HttpRequestHandler leaked exceptions"); } } }; /////////////////////////////////////////////////////////////////////////////// class PageletTask : public SweepableResourceData { public: DECLARE_OBJECT_ALLOCATION(PageletTask) PageletTask(CStrRef url, CArrRef headers, CStrRef post_data, CStrRef remote_host, const std::set &rfc1867UploadedFiles, CArrRef files) { m_job = new PageletTransport(url, headers, remote_host, post_data, rfc1867UploadedFiles, files); m_job->incRefCount(); } ~PageletTask() { m_job->decRefCount(); } PageletTransport *getJob() { return m_job;} static StaticString s_class_name; // overriding ResourceData virtual CStrRef o_getClassNameHook() const { return s_class_name; } private: PageletTransport *m_job; }; IMPLEMENT_OBJECT_ALLOCATION(PageletTask) StaticString PageletTask::s_class_name("PageletTask"); /////////////////////////////////////////////////////////////////////////////// // implementing PageletServer static JobQueueDispatcher *s_dispatcher; bool PageletServer::Enabled() { return RuntimeOption::PageletServerThreadCount > 0; } void PageletServer::Restart() { if (s_dispatcher) { s_dispatcher->stop(); delete s_dispatcher; s_dispatcher = nullptr; } if (RuntimeOption::PageletServerThreadCount > 0) { s_dispatcher = new JobQueueDispatcher (RuntimeOption::PageletServerThreadCount, RuntimeOption::PageletServerThreadRoundRobin, RuntimeOption::PageletServerThreadDropCacheTimeoutSeconds, RuntimeOption::PageletServerThreadDropStack, nullptr); Logger::Info("pagelet server started"); s_dispatcher->start(); } } Object PageletServer::TaskStart(CStrRef url, CArrRef headers, CStrRef remote_host, CStrRef post_data /* = null_string */, CArrRef files /* = null_array */) { if (RuntimeOption::PageletServerThreadCount <= 0) { return null_object; } if (RuntimeOption::PageletServerQueueLimit > 0 && s_dispatcher->getQueuedJobs() > RuntimeOption::PageletServerQueueLimit) { return null_object; } PageletTask *task = NEWOBJ(PageletTask)(url, headers, remote_host, post_data, get_uploaded_files(), files); Object ret(task); PageletTransport *job = task->getJob(); job->incRefCount(); // paired with worker's decRefCount() assert(s_dispatcher); s_dispatcher->enqueue(job); return ret; } int64_t PageletServer::TaskStatus(CObjRef task) { PageletTask *ptask = task.getTyped(); PageletTransport *job = ptask->getJob(); if (!job->isPipelineEmpty()) { return PAGELET_READY; } if (job->isDone()) { return PAGELET_DONE; } return PAGELET_NOT_READY; } String PageletServer::TaskResult(CObjRef task, Array &headers, int &code, int64_t timeout_ms) { PageletTask *ptask = task.getTyped(); return ptask->getJob()->getResults(headers, code, timeout_ms); } void PageletServer::AddToPipeline(const string &s) { assert(!s.empty()); PageletTransport *job = dynamic_cast(g_context->getTransport()); assert(job); job->addToPipeline(s); } int PageletServer::GetActiveWorker() { return s_dispatcher->getActiveWorker(); } int PageletServer::GetQueuedJobs() { return s_dispatcher->getQueuedJobs(); } /////////////////////////////////////////////////////////////////////////////// }