Arquivos
Jordan DeLong fd00ef394e Remove includes of util.h from .cpp files in hphp/util
Replace some uses with folly functions or boost.  I kept
string_vsnprintf in its own header for now.  We could probably move it
to folly/String.h (folly has stringPrintf and such, just no vararg
version), but on the other hand using va_list is not very encouraged
so maybe we should just leave it here for these legacy uses.

Reviewed By: @ptarjan

Differential Revision: D1124742
2014-01-13 15:19:32 -08:00

473 linhas
15 KiB
C++

/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/
#include "hphp/util/db-conn.h"
#include <cstdlib>
#include <boost/algorithm/string.hpp>
#include "folly/Conv.h"
#include "hphp/util/db-query.h"
#include "hphp/util/db-mysql.h"
#include "hphp/util/exception.h"
#include "hphp/util/lock.h"
#include "hphp/util/async-job.h"
#include "hphp/util/alloc.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
DatabaseException::DatabaseException(int code,
const char *fmt, ...) : m_code(code) {
va_list ap; va_start(ap, fmt); format(fmt, ap); va_end(ap);
}
// Class ServerData
int ServerData::DefaultPort = 3306;
std::string ServerData::DefaultUsername = "root";
std::string ServerData::DefaultPassword = "";
class DBConnQueryJob {
public:
DBConnQueryJob(std::shared_ptr<ServerData> server, const std::string sql,
int index,
Mutex &mutex, DBDataSet &dsResult, bool retryQueryOnFail,
unsigned int readTimeout, unsigned int connectTimeout,
int maxRetryOpenOnFail, int maxRetryQueryOnFail)
: m_server(server), m_sql(sql), m_index(index),
m_affected(0), m_dsMutex(&mutex), m_dsResult(&dsResult),
m_retryQueryOnFail(retryQueryOnFail), m_connectTimeout(connectTimeout),
m_readTimeout(readTimeout),
m_maxRetryOpenOnFail(maxRetryOpenOnFail),
m_maxRetryQueryOnFail(maxRetryQueryOnFail) {}
std::shared_ptr<ServerData> m_server;
std::string m_sql;
int m_index;
int m_affected;
Mutex *m_dsMutex;
DBDataSet *m_dsResult;
DBConn::ErrorInfo m_error;
bool m_retryQueryOnFail;
int m_connectTimeout;
int m_readTimeout;
int m_maxRetryOpenOnFail;
int m_maxRetryQueryOnFail;
};
class DBConnQueryWorker {
public:
void onThreadEnter() {}
void doJob(std::shared_ptr<DBConnQueryJob> job);
void onThreadExit() { mysql_thread_end();}
};
static void parseColonPair(std::string &s, size_t pos,
std::string &part1, std::string &part2) {
std::string tmp = s.substr(0, pos);
s = s.substr(pos + 1);
pos = tmp.find(':');
if (pos == std::string::npos) {
part1 = tmp;
} else {
part1 = tmp.substr(0, pos);
part2 = tmp.substr(pos + 1);
}
}
std::shared_ptr<ServerData> ServerData::Create(const std::string &connection) {
auto server = std::make_shared<ServerData>();
std::string s = connection;
size_t pos = s.find('@');
if (pos != std::string::npos) {
parseColonPair(s, pos, server->m_username, server->m_password);
}
pos = s.find('/');
if (pos != std::string::npos) {
std::string port;
parseColonPair(s, pos, server->m_ip, port);
if (!port.empty()) server->m_port = std::atoi(port.c_str());
}
server->m_database = s;
return server;
}
ServerData::ServerData() : m_port(0) {
}
ServerData::ServerData(const char *ip, const char *database,
int port, const char *username,
const char *password,
const SessionVariableVec &sessionVariables) :
m_port(port), m_sessionVariables(sessionVariables) {
if (ip) m_ip = ip;
if (database) m_database = database;
if (username) m_username = username;
if (password) m_password = password;
}
int ServerData::getPort() const {
return m_port > 0 ? m_port : DefaultPort;
}
const std::string &ServerData::getUserName() const {
return m_username.empty() ? DefaultUsername : m_username;
}
const std::string &ServerData::getPassword() const {
return m_password.empty() ? DefaultPassword : m_password;
}
///////////////////////////////////////////////////////////////////////////////
// static members
unsigned int DBConn::DefaultWorkerCount = 50;
unsigned int DBConn::DefaultConnectTimeout = 1000;
unsigned int DBConn::DefaultReadTimeout = 1000;
Mutex DBConn::s_mutex;
DBConn::DatabaseMap DBConn::s_localDatabases;
void DBConn::ClearLocalDatabases() {
Lock lock(s_mutex);
s_localDatabases.clear();
}
void DBConn::AddLocalDB(int dbId, const char *ip, const char *db,
int port, const char *username, const char *password,
const SessionVariableVec &sessionVariables) {
Lock lock(s_mutex);
s_localDatabases[dbId] =
std::make_shared<ServerData>(ip, db, port, username, password,
sessionVariables);
}
///////////////////////////////////////////////////////////////////////////////
DBConn::DBConn(int maxRetryOpenOnFail, int maxRetryQueryOnFail)
: m_conn(nullptr), m_connectTimeout(DefaultConnectTimeout),
m_readTimeout(DefaultReadTimeout),
m_maxRetryOpenOnFail(maxRetryOpenOnFail) {
}
DBConn::~DBConn() {
close();
}
void DBConn::open(std::shared_ptr<ServerData> server,
int connectTimeout /* = -1 */,
int readTimeout /* = -1 */) {
if (isOpened()) {
close();
}
if (connectTimeout <= 0) connectTimeout = DefaultConnectTimeout;
if (readTimeout <= 0) readTimeout = DefaultReadTimeout;
m_conn = mysql_init(nullptr);
MySQLUtil::set_mysql_timeout(m_conn, MySQLUtil::ConnectTimeout,
connectTimeout);
MySQLUtil::set_mysql_timeout(m_conn, MySQLUtil::ReadTimeout, readTimeout);
MYSQL *ret = mysql_real_connect(m_conn, server->getIP().c_str(),
server->getUserName().c_str(),
server->getPassword().c_str(),
server->getDatabase().c_str(),
server->getPort(), nullptr, 0);
if (!ret) {
int code = mysql_errno(m_conn);
const char *msg = mysql_error(m_conn);
std::string smsg = msg ? msg : "";
mysql_close(m_conn);
m_conn = nullptr;
throw DBConnectionException(code, server->getIP().c_str(),
server->getDatabase().c_str(),
smsg.c_str());
}
// Setting session variables
if (server->getSessionVariables().size()) {
auto sessionCmd = std::string("SET ");
for (auto iter = server->getSessionVariables().begin();
iter != server->getSessionVariables().end();
iter++) {
if (iter != server->getSessionVariables().begin()) {
sessionCmd += ", ";
}
sessionCmd += std::string("SESSION ") + iter->first + std::string("=") +
iter->second;
}
char *sessionVarSQL = (char*)Util::safe_malloc(sessionCmd.length() * 2 + 1);
mysql_real_escape_string(m_conn, sessionVarSQL, sessionCmd.c_str(),
sessionCmd.length());
bool failure = mysql_query(m_conn, sessionVarSQL);
Util::safe_free(sessionVarSQL);
if (failure) {
int code = mysql_errno(m_conn);
throw DatabaseException(code, "Failed to execute SQL '%s': %s (%d)",
sessionCmd.c_str(), mysql_error(m_conn), code);
}
}
m_server = server;
m_connectTimeout = connectTimeout;
m_readTimeout = readTimeout;
}
void DBConn::close() {
if (isOpened()) {
mysql_close(m_conn);
m_conn = nullptr;
m_server.reset();
}
}
void DBConn::escapeString(const char *s, std::string &out) {
escapeString(s, strlen(s), out);
}
void DBConn::escapeString(const char *s, int len, std::string &out) {
assert(s);
assert(isOpened());
if (len) {
char *buffer = (char*)malloc(len * 2 + 1);
mysql_real_escape_string(m_conn, buffer, s, len);
out = buffer;
free(buffer);
}
}
int DBConn::execute(const std::string &sql, DBDataSet *ds /* = NULL */,
bool retryQueryOnFail /* = true */) {
return execute(sql.c_str(), ds, retryQueryOnFail);
}
int DBConn::execute(const char *sql, DBDataSet *ds /* = NULL */,
bool retryQueryOnFail /* = true */) {
assert(sql && *sql);
assert(isOpened());
{
bool failure;
if ((failure = mysql_query(m_conn, sql))) {
if (retryQueryOnFail) {
for (int count = 0; count < m_maxRetryOpenOnFail; count++) {
open(m_server, m_connectTimeout, m_readTimeout);
failure = mysql_query(m_conn, sql);
if (!failure) break;
}
}
if (failure) {
int code = mysql_errno(m_conn);
throw DatabaseException(code, "Failed to execute SQL '%s': %s (%d)",
sql, mysql_error(m_conn), code);
}
}
}
MYSQL_RES *result = mysql_store_result(m_conn);
if (!result) {
int code = mysql_errno(m_conn);
if (code) {
throw DatabaseException(code, "Failed to execute SQL '%s': %s (%d)", sql,
mysql_error(m_conn), code);
}
}
int affected = mysql_affected_rows(m_conn);
if (ds) {
ds->addResult(m_conn, result);
} else {
mysql_free_result(result);
}
return affected;
}
int DBConn::getLastInsertId() {
assert(isOpened());
return mysql_insert_id(m_conn);
}
int DBConn::parallelExecute(const char *sql, DBDataSet &ds,
ErrorInfoMap &errors, int maxThread,
bool retryQueryOnFail, int connectTimeout,
int readTimeout,
int maxRetryOpenOnFail,
int maxRetryQueryOnFail) {
assert(sql && *sql);
if (s_localDatabases.empty()) {
return -1;
}
std::vector<std::shared_ptr<DBConnQueryJob>> jobs;
Mutex mutex;
jobs.reserve(s_localDatabases.size());
std::string ssql = sql; // so we have copy-on-write in the loop
for (DatabaseMap::const_iterator iter = s_localDatabases.begin();
iter != s_localDatabases.end(); ++iter) {
jobs.push_back(std::make_shared<DBConnQueryJob>(
iter->second, ssql, iter->first,
mutex, ds,
retryQueryOnFail, connectTimeout,
readTimeout,
maxRetryOpenOnFail,
maxRetryQueryOnFail));
}
return parallelExecute(jobs, errors, maxThread);
}
int DBConn::parallelExecute(const ServerQueryVec &sqls, DBDataSet &ds,
ErrorInfoMap &errors, int maxThread,
bool retryQueryOnFail, int connectTimeout,
int readTimeout,
int maxRetryOpenOnFail,
int maxRetryQueryOnFail) {
if (sqls.empty()) {
return 0;
}
std::vector<std::shared_ptr<DBConnQueryJob>> jobs;
Mutex mutex;
jobs.reserve(sqls.size());
for (unsigned int i = 0; i < sqls.size(); i++) {
const ServerQuery &query = sqls[i];
auto job = std::make_shared<DBConnQueryJob>(
query.first, query.second, i, mutex, ds,
retryQueryOnFail, connectTimeout,
readTimeout,
maxRetryOpenOnFail, maxRetryQueryOnFail);
jobs.push_back(job);
}
return parallelExecute(jobs, errors, maxThread);
}
int DBConn::parallelExecute(const ServerQueryVec &sqls,
std::vector<std::shared_ptr<DBDataSet>> &dss,
ErrorInfoMap &errors, int maxThread,
bool retryQueryOnFail, int connectTimeout,
int readTimeout,
int maxRetryOpenOnFail,
int maxRetryQueryOnFail) {
assert(sqls.size() == dss.size());
if (sqls.empty()) {
return 0;
}
std::vector<std::shared_ptr<DBConnQueryJob>> jobs;
Mutex mutex;
jobs.reserve(sqls.size());
for (unsigned int i = 0; i < sqls.size(); i++) {
const ServerQuery &query = sqls[i];
auto job = std::make_shared<DBConnQueryJob>(
query.first, query.second, i, mutex,
*dss[i], retryQueryOnFail, connectTimeout,
readTimeout,
maxRetryOpenOnFail, maxRetryQueryOnFail);
jobs.push_back(job);
}
return parallelExecute(jobs, errors, maxThread);
}
int DBConn::parallelExecute(std::vector<std::shared_ptr<DBConnQueryJob>> &jobs,
ErrorInfoMap &errors,
int maxThread) {
if (maxThread <= 0) maxThread = DefaultWorkerCount;
JobDispatcher<DBConnQueryJob, DBConnQueryWorker>(jobs, maxThread).run();
int affected = 0;
for (unsigned int i = 0; i < jobs.size(); i++) {
auto job = jobs[i];
int count = job->m_affected;
if (count >= 0) {
affected += count;
} else {
errors[job->m_index] = job->m_error;
}
}
return affected;
}
void DBConnQueryWorker::doJob(std::shared_ptr<DBConnQueryJob> job) {
std::string &sql = job->m_sql;
boost::replace_all(sql, "INDEX",
folly::to<std::string>(job->m_index));
if (!job->m_server) {
job->m_affected = -1;
job->m_error.code = -1;
job->m_error.msg = "(server info missing)";
return;
}
try {
DBConn conn;
int count = 0;
retry:
try {
count++;
conn.open(job->m_server, job->m_connectTimeout, job->m_readTimeout);
} catch (DatabaseException &e) {
if (job->m_retryQueryOnFail &&
count <= job->m_maxRetryQueryOnFail) {
goto retry;
} else {
throw;
}
}
if (job->m_dsResult) {
DBDataSet ds;
job->m_affected = conn.execute(sql.c_str(), &ds,
job->m_retryQueryOnFail);
Lock lock(*job->m_dsMutex);
job->m_dsResult->addDataSet(ds);
} else {
job->m_affected = conn.execute(sql.c_str(), nullptr,
job->m_retryQueryOnFail);
}
} catch (DatabaseException &e) {
job->m_affected = -1;
job->m_error.code = e.m_code;
job->m_error.msg = e.getMessage();
} catch (Exception &e) {
job->m_affected = -1;
job->m_error.code = -1;
job->m_error.msg = e.getMessage();
} catch (std::exception &e) {
job->m_affected = -1;
job->m_error.code = -1;
job->m_error.msg = e.what();
} catch (...) {
job->m_affected = -1;
job->m_error.code = -1;
job->m_error.msg = "(unknown exception)";
}
}
///////////////////////////////////////////////////////////////////////////////
}