Get rid of Synchronized<hphp_hash_map> from service_data

We can get rid of all the lock contention by using
tbb::concurrent_hash_map. Unfortunately tbb::concurrent_hash_map doesn't let us
iterate over it while we concurrently do inserts. Work around this limitation by
keeping a copy of all the counter keys in a concurrent_vector which does let us
concurrently insert and iterate.

On lookup / insert, we first look into concurrent_hash_map for the key. If key
already exists, return the value. Otherwise, we insert into both the map and the
list.

On export, we iterate over the list and look up each key from the map.
Esse commit está contido em:
Stephen Chen
2013-07-01 13:50:17 -07:00
commit de Sara Golemon
commit d4de02ba16
3 arquivos alterados com 133 adições e 89 exclusões
+114 -82
Ver Arquivo
@@ -18,6 +18,7 @@
#include <array>
#include <memory>
#include <tbb/concurrent_unordered_map.h>
#include "folly/Conv.h"
#include "folly/MapUtil.h"
@@ -100,91 +101,119 @@ void ExportedHistogram::exportAll(const std::string& prefix,
}
}
namespace detail {
template <class ClassWithPrivateDestructor>
class FriendDeleter {
public:
template <class... Args>
explicit FriendDeleter(Args&&... args)
: m_instance(new ClassWithPrivateDestructor(
std::forward<Args>(args)...)) {}
~FriendDeleter() { delete m_instance; }
ClassWithPrivateDestructor* get() const { return m_instance; }
ClassWithPrivateDestructor* release() {
auto r = m_instance;
m_instance = nullptr;
return r;
}
private:
ClassWithPrivateDestructor* m_instance;
};
} // namespace detail
namespace {
// Find 'key' in concurrent_unordered_map 'map'. Return true iff the key is
// found.
template<class Key, class Value>
bool concurrentMapGet(const tbb::concurrent_unordered_map<Key, Value>& map,
const Key& key,
Value& value) {
auto iterator = map.find(key);
if (iterator != map.end()) {
value = iterator->second;
return true;
}
return false;
}
// Find or insert 'key' into concurrent_unordered_map 'map'.
//
// Return the value pointer from 'map' if it exists. Otherwise, insert it into
// the map by creating a new object on the heap using the supplied arguments.
//
// Note that this function could be called concurrently. If the insertion to
// 'map' is successful, we release the ownership of value object from
// valuePtr. If the key is already in the map because someone else beat us to
// the insertion, we will return the existing value and delete the object we
// created.
//
template <class Key, class Value, class... Args>
Value* getOrCreateWithArgs(tbb::concurrent_unordered_map<Key, Value*>& map,
const Key& key,
Args&&... args) {
// Optimistic case: the object might already be created. Do a simple look
// up.
Value* ret = nullptr;
if (concurrentMapGet(map, key, ret)) {
return ret;
}
// We didn't find an existing value for the key. Create it. Hold the new
// object in a deleter and release it later if the insert is successful.
detail::FriendDeleter<Value> deleter(std::forward<Args>(args)...);
auto result = map.insert(std::make_pair(key, deleter.get()));
if (result.second) {
// insert successfully. release the memory.
deleter.release();
} else {
// key is already inserted. This can happen if two threads were racing
// to create the counter. In this case, nothing further needs to be done.
// valuePtr's object will get destroyed when we go out of scope.
}
return result.first->second;
}
class Impl {
public:
ExportedCounter* createCounter(const std::string& name) {
SYNCHRONIZED(m_counterMap) {
auto iterator = m_counterMap.find(name);
if (iterator == m_counterMap.end()) {
return (m_counterMap[name] = new ExportedCounter());
}
return iterator->second;
}
// make compiler happy.
return nullptr;
return getOrCreateWithArgs(m_counterMap, name);
}
ExportedTimeSeries* createTimeseries(
const std::string& name,
const std::vector<ServiceData::StatsType>& types,
const std::vector<std::chrono::seconds>& levels,
int numBuckets) {
SYNCHRONIZED(m_timeseriesMap) {
ExportedTimeSeries* counter = nullptr;
auto iterator = m_timeseriesMap.find(name);
if (iterator == m_timeseriesMap.end()) {
counter = new ExportedTimeSeries(numBuckets, levels, types);
m_timeseriesMap[name] = counter;
} else {
counter = iterator->second;
}
return counter;
}
// make compiler happy.
return nullptr;
const std::string& name,
const std::vector<ServiceData::StatsType>& types,
const std::vector<std::chrono::seconds>& levels,
int numBuckets) {
return getOrCreateWithArgs(
m_timeseriesMap, name, numBuckets, levels, types);
}
ExportedHistogram* createHistogram(
const std::string& name,
int64_t bucketSize,
int64_t min,
int64_t max,
const std::vector<double>& exportPercentiles) {
SYNCHRONIZED(m_histogramMap) {
ExportedHistogram* histogram;
auto iterator = m_histogramMap.find(name);
if (iterator == m_histogramMap.end()) {
histogram = new ExportedHistogram(bucketSize, min, max,
exportPercentiles);
m_histogramMap[name] = histogram;
} else {
histogram = iterator->second;
}
return histogram;
}
// make compiler happy.
return nullptr;
const std::string& name,
int64_t bucketSize,
int64_t min,
int64_t max,
const std::vector<double>& exportPercentiles) {
return getOrCreateWithArgs(
m_histogramMap, name, bucketSize, min, max, exportPercentiles);
}
void exportAll(std::map<std::string, int64_t>& statsMap) {
// make a copy of the counter map so we can't hold the lock on the map while
// we are exporting individual stats.
hphp_hash_map<std::string, ExportedCounter*> counters;
m_counterMap.copy(&counters);
for (auto iter : counters) {
statsMap.insert(std::make_pair(iter.first, iter.second->getValue()));
for (auto& counter : m_counterMap) {
statsMap.insert(std::make_pair(counter.first,
counter.second->getValue()));
}
// Same idea here. Make a copy first to iterate over so we don't hold the
// lock on the map while we export individual timeseries
hphp_hash_map<std::string, ExportedTimeSeries*> timeseries;
m_timeseriesMap.copy(&timeseries);
for (auto iter : timeseries) {
iter.second->exportAll(iter.first, statsMap);
for (auto& ts : m_timeseriesMap) {
ts.second->exportAll(ts.first, statsMap);
}
// And same here for histograms.
hphp_hash_map<std::string, ExportedHistogram*> histograms;
m_histogramMap.copy(&histograms);
for (auto iter : histograms) {
iter.second->exportAll(iter.first, statsMap);
for (auto& histogram : m_histogramMap) {
histogram.second->exportAll(histogram.first, statsMap);
}
}
@@ -202,13 +231,16 @@ class Impl {
}
}
typedef hphp_hash_map<std::string, ExportedCounter*> ExportedCounterMap;
typedef hphp_hash_map<std::string, ExportedTimeSeries*> ExportedTimeSeriesMap;
typedef hphp_hash_map<std::string, ExportedHistogram*> ExportedHistogramMap;
typedef tbb::concurrent_unordered_map<std::string, ExportedCounter*>
ExportedCounterMap;
typedef tbb::concurrent_unordered_map<std::string, ExportedTimeSeries*>
ExportedTimeSeriesMap;
typedef tbb::concurrent_unordered_map<std::string, ExportedHistogram*>
ExportedHistogramMap;
folly::Synchronized<ExportedCounterMap, folly::RWSpinLock> m_counterMap;
folly::Synchronized<ExportedTimeSeriesMap, folly::RWSpinLock> m_timeseriesMap;
folly::Synchronized<ExportedHistogramMap, folly::RWSpinLock> m_histogramMap;
ExportedCounterMap m_counterMap;
ExportedTimeSeriesMap m_timeseriesMap;
ExportedHistogramMap m_histogramMap;
};
// Implementation note:
@@ -248,20 +280,20 @@ ExportedCounter* createCounter(const std::string& name) {
}
ExportedTimeSeries* createTimeseries(
const std::string& name,
const std::vector<ServiceData::StatsType>& types,
const std::vector<std::chrono::seconds>& levels,
int numBuckets) {
const std::string& name,
const std::vector<ServiceData::StatsType>& types,
const std::vector<std::chrono::seconds>& levels,
int numBuckets) {
return getServiceDataInstance().createTimeseries(
name, types, levels, numBuckets);
}
ExportedHistogram* createHistogram(
const std::string& name,
int64_t bucketSize,
int64_t min,
int64_t max,
const std::vector<double>& exportPercentile) {
const std::string& name,
int64_t bucketSize,
int64_t min,
int64_t max,
const std::vector<double>& exportPercentile) {
return getServiceDataInstance().createHistogram(
name, bucketSize, min, max, exportPercentile);
}
+16 -6
Ver Arquivo
@@ -30,6 +30,7 @@
#include "folly/stats/MultiLevelTimeSeries.h"
namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
/*
@@ -97,6 +98,11 @@ class ExportedCounter;
class ExportedHistogram;
class ExportedTimeSeries;
namespace detail {
template <class ClassWithPrivateDestructor>
class FriendDeleter;
};
enum class StatsType { AVG, SUM, RATE, COUNT, PCT };
/*
@@ -167,6 +173,7 @@ void exportAll(std::map<std::string, int64_t>& statsMap);
// Interface for a flat counter. All methods are thread safe.
class ExportedCounter {
public:
ExportedCounter() : m_value(0) {}
void increment() { m_value.fetch_add(1, std::memory_order_relaxed); }
void decrement() { m_value.fetch_sub(1, std::memory_order_relaxed); }
void setValue(int64_t value) {
@@ -175,9 +182,10 @@ class ExportedCounter {
int64_t getValue() const { return m_value.load(std::memory_order_relaxed); }
private:
std::atomic_int_fast64_t m_value;
friend class detail::FriendDeleter<ExportedCounter>;
~ExportedCounter() {}
~ExportedCounter() = delete;
std::atomic_int_fast64_t m_value;
};
// Interface for timeseries data. All methods are thread safe.
@@ -195,11 +203,12 @@ class ExportedTimeSeries {
std::map<std::string, int64_t>& statsMap);
private:
friend class detail::FriendDeleter<ExportedTimeSeries>;
~ExportedTimeSeries() {}
folly::Synchronized<folly::MultiLevelTimeSeries<int64_t>,
folly::RWSpinLock > m_timeseries;
const std::vector<ServiceData::StatsType> m_exportTypes;
~ExportedTimeSeries() = delete;
};
// Interface for histogram data. All methods are thread safe.
@@ -213,10 +222,11 @@ class ExportedHistogram {
std::map<std::string, int64_t>& statsMap);
private:
friend class detail::FriendDeleter<ExportedHistogram>;
~ExportedHistogram() {}
folly::Synchronized<folly::Histogram<int64_t>, folly::RWSpinLock> m_histogram;
const std::vector<double> m_exportPercentiles;
~ExportedHistogram() = delete;
};
}; // namespace ServiceData
+3 -1
Ver Arquivo
@@ -14,6 +14,7 @@
+----------------------------------------------------------------------+
*/
#include <atomic>
#include "hphp/util/service_data.h"
#include "gtest/gtest.h"
namespace HPHP {
@@ -57,10 +58,11 @@ TEST(ServiceDataTest, CounterTest) {
counter1->setValue(5);
counter2->increment();
counter1->increment();
ServiceData::createCounter("c2")->increment();
{
std::map<std::string, int64_t> values;
ServiceData::exportAll(values);
EXPECT_EQ(7, values["c2"]);
EXPECT_EQ(8, values["c2"]);
}
}