Only let one thread fadvise at a time

On Linux, fadvise forces unwritten pages to be flushed to disk before
returning. This means that under heavy disk IO, all threads could be
waiting on the fadvise call.

This diff uses atomic operations to ensure that only one thread can
do IO at any given time. In addition, it makes any fadvise calls
tail the end of the file by 1 MB. This reduces the chance that
fadvise will trigger IO at all.
Esse commit está contido em:
Ben Maurer
2013-07-22 11:01:12 -07:00
commit de Sara Golemon
commit ef1e94eee2
9 arquivos alterados com 158 adições e 80 exclusões
+1 -1
Ver Arquivo
@@ -608,7 +608,7 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */,
if (LogFile[0] == '|') Logger::IsPipeOutput = true;
LogFileSymLink = logger["SymLink"].getString();
}
Logger::DropCacheChunkSize =
LogFileFlusher::DropCacheChunkSize =
logger["DropCacheChunkSize"].getInt32(1 << 20);
AlwaysEscapeLog = logger["AlwaysEscapeLog"].getBool(false);
RuntimeOption::LogHeaderMangle = logger["HeaderMangle"].getInt32(0);
+4 -16
Ver Arquivo
@@ -114,12 +114,8 @@ void AccessLog::log(Transport *transport, const VirtualHost *vhost) {
AccessLog::ThreadData *threadData = m_fGetThreadData();
FILE *threadLog = threadData->log;
if (threadLog) {
threadData->bytesWritten +=
writeLog(transport, vhost, threadLog, m_defaultFormat.c_str());
threadData->prevBytesWritten =
Logger::checkDropCache(threadData->bytesWritten,
threadData->prevBytesWritten,
threadLog);
int bytes = writeLog(transport, vhost, threadLog, m_defaultFormat.c_str());
threadData->flusher.recordWriteAndMaybeDropCaches(threadLog, bytes);
}
if (Logger::UseCronolog) {
for (uint i = 0; i < m_cronOutput.size(); ++i) {
@@ -128,11 +124,7 @@ void AccessLog::log(Transport *transport, const VirtualHost *vhost) {
if (!outFile) continue;
const char *format = m_files[i].format.c_str();
int bytes = writeLog(transport, vhost, outFile, format);
cronOutput.m_bytesWritten.fetch_add(bytes, std::memory_order_relaxed);
cronOutput.m_prevBytesWritten = Logger::checkDropCache(
cronOutput.m_bytesWritten.load(std::memory_order_relaxed),
cronOutput.m_prevBytesWritten,
outFile);
cronOutput.flusher.recordWriteAndMaybeDropCaches(outFile, bytes);
}
} else {
for (uint i = 0; i < m_output.size(); ++i) {
@@ -141,12 +133,8 @@ void AccessLog::log(Transport *transport, const VirtualHost *vhost) {
if (!outFile) continue;
const char *format = m_files[i].format.c_str();
int bytes = writeLog(transport, vhost, outFile, format);
output.bytesWritten.fetch_add(bytes, std::memory_order_relaxed);
if (m_files[i].file[0] != '|') {
output.prevBytesWritten =
Logger::checkDropCache(output.bytesWritten,
output.prevBytesWritten,
outFile);
output.flusher.recordWriteAndMaybeDropCaches(outFile, bytes);
}
}
}
+18 -3
Ver Arquivo
@@ -21,10 +21,26 @@
#include "hphp/util/logger.h"
#include "hphp/util/lock.h"
#include "hphp/util/cronolog.h"
#include "hphp/util/util.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
class LogFileData {
public:
LogFileData() : log(nullptr) {}
explicit LogFileData(FILE *f) : log(f) {}
LogFileData(const LogFileData& rhs) : log(rhs.log) {
}
LogFileData& operator=(const LogFileData& rhs) {
log = rhs.log;
return *this;
}
FILE *log;
LogFileFlusher flusher;
};
class AccessLogFileData {
public:
AccessLogFileData(const std::string &fil,
@@ -40,11 +56,10 @@ class AccessLog {
public:
class ThreadData {
public:
ThreadData() : log(nullptr), bytesWritten(0), prevBytesWritten(0) {}
ThreadData() : log(nullptr) {}
FILE *log;
int64_t startTime;
int bytesWritten;
int prevBytesWritten;
LogFileFlusher flusher;
};
typedef ThreadData* (*GetThreadDataFunc)();
explicit AccessLog(GetThreadDataFunc f) :
+3 -5
Ver Arquivo
@@ -21,6 +21,7 @@
#include <string>
#include "hphp/util/cronoutils.h"
#include "hphp/util/lock.h"
#include "hphp/util/util.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
@@ -39,9 +40,7 @@ public:
m_timeOffset(0),
m_nextPeriod(0),
m_prevFile(nullptr),
m_file(nullptr),
m_bytesWritten(0),
m_prevBytesWritten(0) {}
m_file(nullptr) {}
~Cronolog() {
if (m_prevFile) fclose(m_prevFile);
if (m_file) fclose(m_file);
@@ -65,8 +64,7 @@ public:
time_t m_nextPeriod;
FILE *m_prevFile;
FILE *m_file;
std::atomic<int> m_bytesWritten;
int m_prevBytesWritten;
LogFileFlusher flusher;
Mutex m_mutex;
private:
+5 -21
Ver Arquivo
@@ -50,12 +50,10 @@ bool Logger::UseSyslog = false;
bool Logger::UseLogFile = true;
bool Logger::UseCronolog = true;
bool Logger::IsPipeOutput = false;
int Logger::DropCacheChunkSize = (1 << 20);
FILE *Logger::Output = nullptr;
Cronolog Logger::cronOutput;
Logger::LogLevelType Logger::LogLevel = LogInfo;
std::atomic<int> Logger::bytesWritten(0);
int Logger::prevBytesWritten = 0;
LogFileFlusher Logger::flusher;
bool Logger::LogHeader = false;
bool Logger::LogNativeStackTrace = true;
std::string Logger::ExtraHeader;
@@ -185,16 +183,13 @@ void Logger::log(LogLevelType level, const std::string &msg,
} else {
bytes = fprintf(f, "%s%s%s", sheader.c_str(), escaped, ending);
}
bytesWritten.fetch_add(bytes, std::memory_order_relaxed);
FILE *tf = threadData->log;
if (tf) {
threadData->bytesWritten +=
int threadBytes =
fprintf(tf, "%s%s%s", header.c_str(), escaped, ending);
fflush(tf);
threadData->prevBytesWritten =
checkDropCache(threadData->bytesWritten,
threadData->prevBytesWritten,
tf);
threadData->flusher.recordWriteAndMaybeDropCaches(tf, threadBytes);
}
if (threadData->hook) {
threadData->hook(header.c_str(), msg.c_str(), ending,
@@ -206,9 +201,7 @@ void Logger::log(LogLevelType level, const std::string &msg,
fflush(f);
if (UseCronolog || (Output && !Logger::IsPipeOutput)) {
prevBytesWritten =
checkDropCache(bytesWritten.load(std::memory_order_relaxed),
prevBytesWritten, f);
flusher.recordWriteAndMaybeDropCaches(f, bytes);
}
}
}
@@ -263,15 +256,6 @@ char *Logger::EscapeString(const std::string &msg) {
return new_str;
}
int Logger::checkDropCache(int bytesWritten, int prevBytesWritten,
FILE *f) {
if (bytesWritten - prevBytesWritten > Logger::DropCacheChunkSize) {
Util::drop_cache(f);
return bytesWritten;
}
return prevBytesWritten;
}
bool Logger::SetThreadLog(const char *file) {
return (s_threadData->log = fopen(file, "a")) != nullptr;
}
+4 -33
Ver Arquivo
@@ -21,6 +21,7 @@
#include <string>
#include <stdarg.h>
#include "hphp/util/thread_local.h"
#include "hphp/util/util.h"
#include "hphp/util/cronolog.h"
namespace HPHP {
@@ -29,30 +30,6 @@ namespace HPHP {
class StackTrace;
class Exception;
class LogFileData {
public:
LogFileData() : log(nullptr), bytesWritten(0), prevBytesWritten(0) {}
explicit LogFileData(FILE *f)
: log(f)
, bytesWritten(0)
, prevBytesWritten(0)
{}
LogFileData(const LogFileData& rhs) :
log(rhs.log), prevBytesWritten(rhs.prevBytesWritten) {
bytesWritten.store(rhs.bytesWritten.load());
}
LogFileData& operator=(const LogFileData& rhs) {
log = rhs.log;
prevBytesWritten = rhs.prevBytesWritten;
bytesWritten.store(rhs.bytesWritten.load());
return *this;
}
FILE *log;
std::atomic<int> bytesWritten;
int prevBytesWritten;
};
class Logger {
public:
enum LogLevelType {
@@ -67,12 +44,10 @@ public:
static bool UseLogFile;
static bool IsPipeOutput;
static bool UseCronolog;
static int DropCacheChunkSize;
static FILE *Output;
static Cronolog cronOutput;
static LogLevelType LogLevel;
static std::atomic<int> bytesWritten;
static int prevBytesWritten;
static LogFileFlusher flusher;
static bool LogHeader;
static bool LogNativeStackTrace;
static std::string ExtraHeader;
@@ -109,8 +84,6 @@ public:
}
}
static int checkDropCache(int bytesWritten, int prevBytesWritten,
FILE *f);
static char *EscapeString(const std::string &msg);
virtual ~Logger() { }
@@ -118,12 +91,10 @@ public:
protected:
class ThreadData {
public:
ThreadData() : request(0), message(0), bytesWritten(0), prevBytesWritten(0),
log(nullptr), hook(nullptr) {}
ThreadData() : request(0), message(0), log(nullptr), hook(nullptr) {}
int request;
int message;
int bytesWritten;
int prevBytesWritten;
LogFileFlusher flusher;
FILE *log;
PFUNC_LOG hook;
void *hookData;
+76
Ver Arquivo
@@ -0,0 +1,76 @@
/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010-2013 Facebook, Inc. (http://www.facebook.com) |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/
#include "hphp/util/logger.h"
#include "gtest/gtest.h"
namespace HPHP {
class MockLogFileFlusher : public LogFileFlusher {
public:
MockLogFileFlusher() : last_flush(0) {}
off_t last_flush;
protected:
void dropCache(int fd, off_t off) {
last_flush = off;
}
};
static void doWrite(LogFileFlusher* flusher, int fd, int bytes) {
char* buf = new char[bytes];
write(fd, buf, bytes);
delete[] buf;
flusher->recordWriteAndMaybeDropCaches(fd, bytes);
}
TEST(LogFileFlusherTest, flush) {
MockLogFileFlusher flusher;
LogFileFlusher::DropCacheChunkSize = 1 << 19;
ASSERT_EQ(LogFileFlusher::kDropCacheTail, 1 << 20);
char tmpl[] = "/tmp/LogFileFlusherTest.XXXXXX";
int fd = mkstemp(tmpl);
ASSERT_NE(fd, 0);
unlink (tmpl);
// 100 bytes written, not enough to flush
doWrite(&flusher, fd, 100);
EXPECT_EQ(flusher.last_flush, 0);
// 512KB written, but file is not 1 MB large yet so no fadvise
doWrite(&flusher, fd, (1 << 19));
EXPECT_EQ(flusher.last_flush, 0);
// Another 512KB, after one more byte a fadvise will be called
doWrite(&flusher, fd, (1 << 19));
EXPECT_EQ(flusher.last_flush, 0);
// fadvise called
doWrite(&flusher, fd, 1);
EXPECT_EQ(flusher.last_flush, 101);
// Almost ready for another flush
doWrite(&flusher, fd, (1 << 19));
EXPECT_EQ(flusher.last_flush, 101);
// Another flush occurs
doWrite(&flusher, fd, 1);
EXPECT_EQ(flusher.last_flush, (1 << 19) + 100 + 2);
}
}
+19
Ver Arquivo
@@ -303,6 +303,25 @@ int Util::drop_cache(FILE *f, off_t len /* = 0 */) {
return drop_cache(fileno(f), len);
}
int LogFileFlusher::DropCacheChunkSize = (1 << 20);
void LogFileFlusher::recordWriteAndMaybeDropCaches(int fd, int bytes) {
int oldBytesWritten = m_bytesWritten.fetch_add(bytes);
int newBytesWritten = oldBytesWritten + bytes;
if (!(newBytesWritten > DropCacheChunkSize &&
oldBytesWritten <= DropCacheChunkSize)) {
return;
}
off_t offset = lseek(fd, 0, SEEK_CUR);
if (offset > kDropCacheTail) {
dropCache(fd, offset - kDropCacheTail);
}
m_bytesWritten = 0;
}
int Util::directCopy(const char *srcfile, const char *dstfile) {
int srcFd = open(srcfile, O_RDONLY);
if (srcFd == -1) return -1;
+28 -1
Ver Arquivo
@@ -17,6 +17,7 @@
#ifndef incl_HPHP_UTIL_H_
#define incl_HPHP_UTIL_H_
#include <atomic>
#include <vector>
#include <string>
#include <map>
@@ -390,6 +391,32 @@ struct Nuller : private boost::noncopyable {
};
///////////////////////////////////////////////////////////////////////////////
}}
}
class LogFileFlusher {
public:
LogFileFlusher() : m_bytesWritten(0) {}
virtual ~LogFileFlusher() {}
void recordWriteAndMaybeDropCaches(int fd, int bytes);
inline void recordWriteAndMaybeDropCaches(FILE* f, int bytes) {
recordWriteAndMaybeDropCaches(fileno(f), bytes);
}
enum {
kDropCacheTail = 1 * 1024 * 1024
};
static int DropCacheChunkSize;
protected:
// For testing
virtual void dropCache(int fd, off_t len) {
Util::drop_cache(fd, len);
}
private:
std::atomic<int> m_bytesWritten;
};
}
#endif