Support for UNIX sockets

Add a new config param, Server.FileSocket. When
Server.FileSocket
is set it will be used inplace of a network socket for the primary
server. This uses a new parameter to ServerOptions, m_useFileSocket,
to toggle between treating the address as a socket path or a network
address.

To initialize a socket connection thrift expects the socket file to not
exist. To support this the 'something nice' retry in startServer will
unlink an existing socket only if fuser claims it is unused.
Server.EvilShutdown enables unlinking the socket regardless of current
users.

Closes #1594

Reviewed By: @ptarjan

Differential Revision: D1135876

Pulled By: @sgolemon
Esse commit está contido em:
Erik
2014-01-28 15:33:17 -08:00
commit de Sara Golemon
commit b538a34e89
7 arquivos alterados com 71 adições e 19 exclusões
+2
Ver Arquivo
@@ -107,6 +107,7 @@ std::string RuntimeOption::Host;
std::string RuntimeOption::DefaultServerNameSuffix; std::string RuntimeOption::DefaultServerNameSuffix;
std::string RuntimeOption::ServerType = "libevent"; std::string RuntimeOption::ServerType = "libevent";
std::string RuntimeOption::ServerIP; std::string RuntimeOption::ServerIP;
std::string RuntimeOption::ServerFileSocket;
std::string RuntimeOption::ServerPrimaryIP; std::string RuntimeOption::ServerPrimaryIP;
int RuntimeOption::ServerPort; int RuntimeOption::ServerPort;
int RuntimeOption::ServerPortFd = -1; int RuntimeOption::ServerPortFd = -1;
@@ -721,6 +722,7 @@ void RuntimeOption::Load(Hdf &config,
DefaultServerNameSuffix = server["DefaultServerNameSuffix"].getString(); DefaultServerNameSuffix = server["DefaultServerNameSuffix"].getString();
ServerType = server["Type"].getString(ServerType); ServerType = server["Type"].getString(ServerType);
ServerIP = server["IP"].getString(); ServerIP = server["IP"].getString();
ServerFileSocket = server["FileSocket"].getString();
ServerPrimaryIP = Util::GetPrimaryIP(); ServerPrimaryIP = Util::GetPrimaryIP();
ServerPort = server["Port"].getUInt16(80); ServerPort = server["Port"].getUInt16(80);
ServerBacklog = server["Backlog"].getInt16(128); ServerBacklog = server["Backlog"].getInt16(128);
+1
Ver Arquivo
@@ -109,6 +109,7 @@ public:
static std::string DefaultServerNameSuffix; static std::string DefaultServerNameSuffix;
static std::string ServerType; static std::string ServerType;
static std::string ServerIP; static std::string ServerIP;
static std::string ServerFileSocket;
static std::string ServerPrimaryIP; static std::string ServerPrimaryIP;
static int ServerPort; static int ServerPort;
static int ServerPortFd; static int ServerPortFd;
@@ -25,7 +25,8 @@ public:
virtual ServerPtr createServer(const ServerOptions& options) override { virtual ServerPtr createServer(const ServerOptions& options) override {
return std::make_shared<FastCGIServer>(options.m_address, return std::make_shared<FastCGIServer>(options.m_address,
options.m_port, options.m_port,
options.m_numThreads); options.m_numThreads,
options.m_useFileSocket);
} }
}; };
+11 -4
Ver Arquivo
@@ -183,7 +183,8 @@ void FastCGIConnection::handleRequest(int transport_id) {
FastCGIServer::FastCGIServer(const std::string &address, FastCGIServer::FastCGIServer(const std::string &address,
int port, int port,
int workers) int workers,
bool useFileSocket)
: Server(address, port, workers), : Server(address, port, workers),
m_worker(&m_eventBaseManager), m_worker(&m_eventBaseManager),
m_dispatcher(workers, m_dispatcher(workers,
@@ -195,7 +196,9 @@ FastCGIServer::FastCGIServer(const std::string &address,
RuntimeOption::ServerThreadJobMaxQueuingMilliSeconds, RuntimeOption::ServerThreadJobMaxQueuingMilliSeconds,
RequestPriority::k_numPriorities) { RequestPriority::k_numPriorities) {
TSocketAddress sock_addr; TSocketAddress sock_addr;
if (address.empty()) { if (useFileSocket) {
sock_addr.setFromPath(address);
} else if (address.empty()) {
sock_addr.setFromLocalPort(port); sock_addr.setFromLocalPort(port);
} else { } else {
sock_addr.setFromHostPort(address, port); sock_addr.setFromHostPort(address, port);
@@ -226,8 +229,12 @@ void FastCGIServer::start() {
m_socket->bind(m_socketConfig.getAddress()); m_socket->bind(m_socketConfig.getAddress());
} catch (const apache::thrift::transport::TTransportException& ex) { } catch (const apache::thrift::transport::TTransportException& ex) {
LOG(ERROR) << ex.what(); LOG(ERROR) << ex.what();
throw FailedToListenException(m_socketConfig.getAddress().getAddressStr(), if (m_socketConfig.getAddress().getFamily() == AF_UNIX) {
m_socketConfig.getAddress().getPort()); throw FailedToListenException(m_socketConfig.getAddress().getPath());
} else {
throw FailedToListenException(m_socketConfig.getAddress().getAddressStr(),
m_socketConfig.getAddress().getPort());
}
} }
m_acceptor.reset(new FastCGIAcceptor(m_socketConfig, this)); m_acceptor.reset(new FastCGIAcceptor(m_socketConfig, this));
m_acceptor->init(m_socket.get(), m_worker.getEventBase()); m_acceptor->init(m_socket.get(), m_worker.getEventBase());
+2 -1
Ver Arquivo
@@ -128,7 +128,8 @@ class FastCGIServer : public Server,
public: public:
FastCGIServer(const std::string &address, FastCGIServer(const std::string &address,
int port, int port,
int workers); int workers,
bool useFileSocket);
~FastCGIServer() { ~FastCGIServer() {
if (!m_done) { if (!m_done) {
waitForEnd(); waitForEnd();
+47 -12
Ver Arquivo
@@ -77,9 +77,11 @@ HttpServer::HttpServer()
auto serverFactory = ServerFactoryRegistry::getInstance()->getFactory auto serverFactory = ServerFactoryRegistry::getInstance()->getFactory
(RuntimeOption::ServerType); (RuntimeOption::ServerType);
ServerOptions options const std::string address = RuntimeOption::ServerFileSocket.empty()
(RuntimeOption::ServerIP, RuntimeOption::ServerPort, ? RuntimeOption::ServerIP : RuntimeOption::ServerFileSocket;
startingThreadCount); ServerOptions options(
address, RuntimeOption::ServerPort, startingThreadCount);
options.m_useFileSocket = !RuntimeOption::ServerFileSocket.empty();
options.m_serverFD = RuntimeOption::ServerPortFd; options.m_serverFD = RuntimeOption::ServerPortFd;
options.m_sslFD = RuntimeOption::SSLPortFd; options.m_sslFD = RuntimeOption::SSLPortFd;
options.m_takeoverFilename = RuntimeOption::TakeoverFilename; options.m_takeoverFilename = RuntimeOption::TakeoverFilename;
@@ -498,7 +500,12 @@ bool HttpServer::startServer(bool pageServer) {
} }
if (errno == EACCES) { if (errno == EACCES) {
Logger::Error("Permission denied listening on port %d", port); if (pageServer && !RuntimeOption::ServerFileSocket.empty()) {
Logger::Error("Permission denied opening socket at %s",
RuntimeOption::ServerFileSocket.c_str());
} else {
Logger::Error("Permission denied listening on port %d", port);
}
return false; return false;
} }
@@ -511,6 +518,22 @@ bool HttpServer::startServer(bool pageServer) {
StringBuffer response; StringBuffer response;
http.get(url.c_str(), response); http.get(url.c_str(), response);
if (pageServer && !RuntimeOption::ServerFileSocket.empty()) {
if (i == 0) {
Logger::Info("Unlinking unused socket at %s",
RuntimeOption::ServerFileSocket.c_str());
}
struct stat stat_buf;
if (stat(RuntimeOption::ServerFileSocket.c_str(), &stat_buf) == 0
&& S_ISSOCK(stat_buf.st_mode)) {
std::string cmd = "bash -c '! fuser ";
cmd += RuntimeOption::ServerFileSocket;
cmd += "'";
if (Util::ssystem(cmd.c_str()) == 0) {
unlink(RuntimeOption::ServerFileSocket.c_str());
}
}
}
sleep(1); sleep(1);
} }
} }
@@ -546,15 +569,27 @@ bool HttpServer::startServer(bool pageServer) {
} }
return true; return true;
} catch (FailedToListenException &e) { } catch (FailedToListenException &e) {
if (i == 0) { if (pageServer && !RuntimeOption::ServerFileSocket.empty()) {
Logger::Info("killing anything listening on port %d", port); if (i == 0) {
Logger::Info("unlinking socket at %s",
RuntimeOption::ServerFileSocket.c_str());
}
struct stat stat_buf;
if (stat(RuntimeOption::ServerFileSocket.c_str(), &stat_buf) == 0
&& S_ISSOCK(stat_buf.st_mode)) {
unlink(RuntimeOption::ServerFileSocket.c_str());
}
} else {
if (i == 0) {
Logger::Info("killing anything listening on port %d", port);
}
std::string cmd = "lsof -t -i :";
cmd += lexical_cast<std::string>(port);
cmd += " | xargs kill -9";
Util::ssystem(cmd.c_str());
} }
std::string cmd = "lsof -t -i :";
cmd += lexical_cast<std::string>(port);
cmd += " | xargs kill -9";
Util::ssystem(cmd.c_str());
sleep(1); sleep(1);
} }
} }
+6 -1
Ver Arquivo
@@ -265,7 +265,8 @@ public:
m_numThreads(numThreads), m_numThreads(numThreads),
m_serverFD(-1), m_serverFD(-1),
m_sslFD(-1), m_sslFD(-1),
m_takeoverFilename() { m_takeoverFilename(),
m_useFileSocket(false) {
} }
std::string m_address; std::string m_address;
@@ -274,6 +275,7 @@ public:
int m_serverFD; int m_serverFD;
int m_sslFD; int m_sslFD;
std::string m_takeoverFilename; std::string m_takeoverFilename;
bool m_useFileSocket;
}; };
/** /**
@@ -327,6 +329,9 @@ public:
class FailedToListenException : public ServerException { class FailedToListenException : public ServerException {
public: public:
explicit FailedToListenException(const std::string &addr)
: ServerException("Failed to listen to unix socket at %s", addr.c_str()) {
}
FailedToListenException(const std::string &addr, int port) FailedToListenException(const std::string &addr, int port)
: ServerException("Failed to listen on %s:%d", addr.c_str(), port) { : ServerException("Failed to listen on %s:%d", addr.c_str(), port) {
} }