Update folly

Esse commit está contido em:
Sara Golemon
2013-07-01 13:06:55 -07:00
commit 252dfe987c
24 arquivos alterados com 1715 adições e 143 exclusões
+4 -1
Ver Arquivo
@@ -21,4 +21,7 @@ include_directories(${LIBGLOG_INCLUDE_DIR})
find_package(PThread REQUIRED)
include_directories(${LIBPTHREAD_INCLUDE_DIRS})
target_link_libraries(folly ${Boost_LIBRARIES} ${LIBGLOG_LIBRARY} ${LIBPTHREAD_LIBRARIES})
find_package(Libunwind REQUIRED)
include_directories(${LIBUNWIND_INCLUDE_DIR})
target_link_libraries(folly ${Boost_LIBRARIES} ${LIBGLOG_LIBRARY} ${LIBPTHREAD_LIBRARIES} ${LIBUNWIND_LIBRARY})
-11
Ver Arquivo
@@ -22,16 +22,6 @@
// popcnt
#ifndef __POPCNT__
// Clang doesn't support ifuncs. This also allows ifunc support to be explicitly
// passed in as a compile flag.
#ifndef FOLLY_HAVE_IFUNC
# ifdef __clang__
# define FOLLY_HAVE_IFUNC 0
# else
# define FOLLY_HAVE_IFUNC 1
# endif
#endif
namespace {
int popcount_builtin(unsigned int x) {
@@ -102,4 +92,3 @@ int popcountll(unsigned long long x)
} // namespace folly
#endif /* !__POPCNT__ */
+33
Ver Arquivo
@@ -0,0 +1,33 @@
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Wrapper around <chrono> that hides away some gcc 4.6 issues
#ifndef FOLLY_CHRONO_H_
#define FOLLY_CHRONO_H_
#include <chrono>
#include "folly/Portability.h"
// gcc 4.6 uses an obsolete name for steady_clock, although the implementation
// is the same
#if __GNUC_PREREQ(4, 6) && !__GNUC_PREREQ(4, 7)
namespace std { namespace chrono {
typedef monotonic_clock steady_clock;
}} // namespaces
#endif
#endif /* FOLLY_CHRONO_H_ */
+72
Ver Arquivo
@@ -22,6 +22,7 @@
#include "folly/dynamic.h"
namespace folly {
template <typename T> T convertTo(const dynamic&);
template <typename T> dynamic toDynamic(const T&);
}
/**
@@ -51,6 +52,7 @@ namespace dynamicconverter_detail {
BOOST_MPL_HAS_XXX_TRAIT_DEF(value_type);
BOOST_MPL_HAS_XXX_TRAIT_DEF(iterator);
BOOST_MPL_HAS_XXX_TRAIT_DEF(mapped_type);
template <typename T> struct class_is_container {
typedef std::reverse_iterator<T*> some_iterator;
@@ -59,6 +61,12 @@ template <typename T> struct class_is_container {
std::is_constructible<T, some_iterator, some_iterator>::value };
};
template <typename T> struct class_is_range {
enum { value = has_value_type<T>::value &&
has_iterator<T>::value };
};
template <typename T> struct is_container
: std::conditional<
std::is_class<T>::value,
@@ -66,6 +74,19 @@ template <typename T> struct is_container
std::false_type
>::type {};
template <typename T> struct is_range
: std::conditional<
std::is_class<T>::value,
class_is_range<T>,
std::false_type
>::type {};
template <typename T> struct is_associative_container
: std::integral_constant<
bool,
is_range<T>::value && has_mapped_type<T>::value
> {};
} // namespace dynamicconverter_detail
///////////////////////////////////////////////////////////////////////////////
@@ -240,6 +261,52 @@ struct DynamicConverter<C,
throw TypeError("object or array", d.type());
}
}
};
template <typename C, typename Enable = void>
struct DynamicConstructor {
static dynamic construct(const C& x) {
return dynamic(x);
}
};
template<typename C>
struct DynamicConstructor<C,
typename std::enable_if<
dynamicconverter_detail::is_associative_container<C>::value>::type> {
static dynamic construct(const C& x) {
dynamic d = dynamic::object;
for (auto& pair : x) {
d.insert(toDynamic(pair.first), toDynamic(pair.second));
}
return d;
}
};
template<typename C>
struct DynamicConstructor<C,
typename std::enable_if<
!dynamicconverter_detail::is_associative_container<C>::value &&
!std::is_constructible<StringPiece, const C&>::value &&
dynamicconverter_detail::is_range<C>::value>::type> {
static dynamic construct(const C& x) {
dynamic d = {};
for (auto& item : x) {
d.push_back(toDynamic(item));
}
return d;
}
};
template<typename A, typename B>
struct DynamicConstructor<std::pair<A, B>, void> {
static dynamic construct(const std::pair<A, B>& x) {
dynamic d = {};
d.push_back(toDynamic(x.first));
d.push_back(toDynamic(x.second));
return d;
}
};
///////////////////////////////////////////////////////////////////////////////
@@ -250,6 +317,11 @@ T convertTo(const dynamic& d) {
return DynamicConverter<typename std::remove_cv<T>::type>::convert(d);
}
template<typename T>
dynamic toDynamic(const T& x) {
return DynamicConstructor<typename std::remove_cv<T>::type>::construct(x);
}
} // namespace folly
#endif // DYNAMIC_CONVERTER_H
+6
Ver Arquivo
@@ -101,6 +101,10 @@
#include <limits>
#include <type_traits>
// Ignore shadowing warnings within this file, so includers can use -Wshadow.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#ifdef _LIBSTDCXX_FBSTRING
namespace std _GLIBCXX_VISIBILITY(default) {
_GLIBCXX_BEGIN_NAMESPACE_VERSION
@@ -2323,6 +2327,8 @@ _GLIBCXX_END_NAMESPACE_VERSION
} // namespace folly
#pragma GCC diagnostic pop
#ifndef _LIBSTDCXX_FBSTRING
namespace std {
+6 -1
Ver Arquivo
@@ -36,6 +36,10 @@
#include "folly/small_vector.h"
#include "folly/FormatArg.h"
// Ignore shadowing warnings within this file, so includers can use -Wshadow.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
namespace folly {
// forward declarations
@@ -268,5 +272,6 @@ void formatFormatter(const Formatter<containerMode, Args...>& formatter,
#include "folly/Format-inl.h"
#endif /* FOLLY_FORMAT_H_ */
#pragma GCC diagnostic pop
#endif /* FOLLY_FORMAT_H_ */
+852
Ver Arquivo
@@ -0,0 +1,852 @@
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <assert.h>
#include <boost/noncopyable.hpp>
#include <errno.h>
#include <limits>
#include <linux/futex.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <folly/Traits.h>
#include <folly/detail/Futex.h>
namespace folly {
namespace detail {
template<typename T, template<typename> class Atom>
class SingleElementQueue;
} // namespace detail
/// MPMCQueue<T> is a high-performance bounded concurrent queue that
/// supports multiple producers, multiple consumers, and optional blocking.
/// The queue has a fixed capacity, for which all memory will be allocated
/// up front. The bulk of the work of enqueuing and dequeuing can be
/// performed in parallel.
///
/// The underlying implementation uses a ticket dispenser for the head and
/// the tail, spreading accesses across N single-element queues to produce
/// a queue with capacity N. The ticket dispensers use atomic increment,
/// which is more robust to contention than a CAS loop. Each of the
/// single-element queues uses its own CAS to serialize access, with an
/// adaptive spin cutoff. When spinning fails on a single-element queue
/// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
/// even if multiple waiters are present on an individual queue (such as
/// when the MPMCQueue's capacity is smaller than the number of enqueuers
/// or dequeuers).
///
/// NOEXCEPT INTERACTION: Ticket-based queues separate the assignment
/// of In benchmarks (contained in tao/queues/ConcurrentQueueTests)
/// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
/// than any of the alternatives present in fbcode, for both small (~10)
/// and large capacities. In these benchmarks it is also faster than
/// tbb::concurrent_bounded_queue for all configurations. When there are
/// many more threads than cores, MPMCQueue is _much_ faster than the tbb
/// queue because it uses futex() to block and unblock waiting threads,
/// rather than spinning with sched_yield.
///
/// queue positions from the actual construction of the in-queue elements,
/// which means that the T constructor used during enqueue must not throw
/// an exception. This is enforced at compile time using type traits,
/// which requires that T be adorned with accurate noexcept information.
/// If your type does not use noexcept, you will have to wrap it in
/// something that provides the guarantee. We provide an alternate
/// safe implementation for types that don't use noexcept but that are
/// marked folly::IsRelocatable and boost::has_nothrow_constructor,
/// which is common for folly types. In particular, if you can declare
/// FOLLY_ASSUME_FBVECTOR_COMPATIBLE then your type can be put in
/// MPMCQueue.
template<typename T,
template<typename> class Atom = std::atomic,
typename = typename std::enable_if<
std::is_nothrow_constructible<T,T&&>::value ||
folly::IsRelocatable<T>::value>::type>
class MPMCQueue : boost::noncopyable {
public:
typedef T value_type;
explicit MPMCQueue(size_t capacity)
: capacity_(capacity)
, slots_(new detail::SingleElementQueue<T,Atom>[capacity +
2 * kSlotPadding])
, stride_(computeStride(capacity))
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
, popSpinCutoff_(0)
{
// ideally this would be a static assert, but g++ doesn't allow it
assert(alignof(MPMCQueue<T,Atom>) >= kFalseSharingRange);
}
/// A default-constructed queue is useful because a usable (non-zero
/// capacity) queue can be moved onto it or swapped with it
MPMCQueue() noexcept
: capacity_(0)
, slots_(nullptr)
, stride_(0)
, pushTicket_(0)
, popTicket_(0)
, pushSpinCutoff_(0)
, popSpinCutoff_(0)
{}
/// IMPORTANT: The move constructor is here to make it easier to perform
/// the initialization phase, it is not safe to use when there are any
/// concurrent accesses (this is not checked).
MPMCQueue(MPMCQueue<T,Atom>&& rhs) noexcept
: capacity_(rhs.capacity_)
, slots_(rhs.slots_)
, stride_(rhs.stride_)
, pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed))
, popTicket_(rhs.popTicket_.load(std::memory_order_relaxed))
, pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed))
, popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed))
{
// relaxed ops are okay for the previous reads, since rhs queue can't
// be in concurrent use
// zero out rhs
rhs.capacity_ = 0;
rhs.slots_ = nullptr;
rhs.stride_ = 0;
rhs.pushTicket_.store(0, std::memory_order_relaxed);
rhs.popTicket_.store(0, std::memory_order_relaxed);
rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
}
/// IMPORTANT: The move operator is here to make it easier to perform
/// the initialization phase, it is not safe to use when there are any
/// concurrent accesses (this is not checked).
MPMCQueue<T,Atom> const& operator= (MPMCQueue<T,Atom>&& rhs) {
if (this != &rhs) {
this->~MPMCQueue();
new (this) MPMCQueue(std::move(rhs));
}
return *this;
}
/// MPMCQueue can only be safely destroyed when there are no
/// pending enqueuers or dequeuers (this is not checked).
~MPMCQueue() {
delete[] slots_;
}
/// Returns the number of successful reads minus the number of successful
/// writes. Waiting blockingRead and blockingWrite calls are included,
/// so this value can be negative.
ssize_t size() const noexcept {
// since both pushes and pops increase monotonically, we can get a
// consistent snapshot either by bracketing a read of popTicket_ with
// two reads of pushTicket_ that return the same value, or the other
// way around. We maximize our chances by alternately attempting
// both bracketings.
uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
while (true) {
uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
if (pushes == nextPushes) {
// pushTicket_ didn't change from A (or the previous C) to C,
// so we can linearize at B (or D)
return pushes - pops;
}
pushes = nextPushes;
uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
if (pops == nextPops) {
// popTicket_ didn't chance from B (or the previous D), so we
// can linearize at C
return pushes - pops;
}
pops = nextPops;
}
}
/// Returns true if there are no items available for dequeue
bool isEmpty() const noexcept {
return size() <= 0;
}
/// Returns true if there is currently no empty space to enqueue
bool isFull() const noexcept {
// careful with signed -> unsigned promotion, since size can be negative
return size() >= static_cast<ssize_t>(capacity_);
}
/// Returns is a guess at size() for contexts that don't need a precise
/// value, such as stats.
uint64_t sizeGuess() const noexcept {
return writeCount() - readCount();
}
/// Doesn't change
size_t capacity() const noexcept {
return capacity_;
}
/// Returns the total number of calls to blockingWrite or successful
/// calls to write, including those blockingWrite calls that are
/// currently blocking
uint64_t writeCount() const noexcept {
return pushTicket_.load(std::memory_order_acquire);
}
/// Returns the total number of calls to blockingRead or successful
/// calls to read, including those blockingRead calls that are currently
/// blocking
uint64_t readCount() const noexcept {
return popTicket_.load(std::memory_order_acquire);
}
/// Enqueues a T constructed from args, blocking until space is
/// available. Note that this method signature allows enqueue via
/// move, if args is a T rvalue, via copy, if args is a T lvalue, or
/// via emplacement if args is an initializer list that can be passed
/// to a T constructor.
template <typename ...Args>
void blockingWrite(Args&&... args) noexcept {
enqueueWithTicket(pushTicket_++, std::forward<Args>(args)...);
}
/// If an item can be enqueued with no blocking, does so and returns
/// true, otherwise returns false. This method is similar to
/// writeIfNotFull, but if you don't have a specific need for that
/// method you should use this one.
///
/// One of the common usages of this method is to enqueue via the
/// move constructor, something like q.write(std::move(x)). If write
/// returns false because the queue is full then x has not actually been
/// consumed, which looks strange. To understand why it is actually okay
/// to use x afterward, remember that std::move is just a typecast that
/// provides an rvalue reference that enables use of a move constructor
/// or operator. std::move doesn't actually move anything. It could
/// more accurately be called std::rvalue_cast or std::move_permission.
template <typename ...Args>
bool write(Args&&... args) noexcept {
uint64_t ticket;
if (tryObtainReadyPushTicket(ticket)) {
// we have pre-validated that the ticket won't block
enqueueWithTicket(ticket, std::forward<Args>(args)...);
return true;
} else {
return false;
}
}
/// If the queue is not full, enqueues and returns true, otherwise
/// returns false. Unlike write this method can be blocked by another
/// thread, specifically a read that has linearized (been assigned
/// a ticket) but not yet completed. If you don't really need this
/// function you should probably use write.
///
/// MPMCQueue isn't lock-free, so just because a read operation has
/// linearized (and isFull is false) doesn't mean that space has been
/// made available for another write. In this situation write will
/// return false, but writeIfNotFull will wait for the dequeue to finish.
/// This method is required if you are composing queues and managing
/// your own wakeup, because it guarantees that after every successful
/// write a readIfNotFull will succeed.
template <typename ...Args>
bool writeIfNotFull(Args&&... args) noexcept {
uint64_t ticket;
if (tryObtainPromisedPushTicket(ticket)) {
// some other thread is already dequeuing the slot into which we
// are going to enqueue, but we might have to wait for them to finish
enqueueWithTicket(ticket, std::forward<Args>(args)...);
return true;
} else {
return false;
}
}
/// Moves a dequeued element onto elem, blocking until an element
/// is available
void blockingRead(T& elem) noexcept {
dequeueWithTicket(popTicket_++, elem);
}
/// If an item can be dequeued with no blocking, does so and returns
/// true, otherwise returns false.
bool read(T& elem) noexcept {
uint64_t ticket;
if (tryObtainReadyPopTicket(ticket)) {
// the ticket has been pre-validated to not block
dequeueWithTicket(ticket, elem);
return true;
} else {
return false;
}
}
/// If the queue is not empty, dequeues and returns true, otherwise
/// returns false. If the matching write is still in progress then this
/// method may block waiting for it. If you don't rely on being able
/// to dequeue (such as by counting completed write) then you should
/// prefer read.
bool readIfNotEmpty(T& elem) noexcept {
uint64_t ticket;
if (tryObtainPromisedPopTicket(ticket)) {
// the matching enqueue already has a ticket, but might not be done
dequeueWithTicket(ticket, elem);
return true;
} else {
return false;
}
}
private:
enum {
/// Once every kAdaptationFreq we will spin longer, to try to estimate
/// the proper spin backoff
kAdaptationFreq = 128,
/// Memory locations on the same cache line are subject to false
/// sharing, which is very bad for performance
kFalseSharingRange = 64,
/// To avoid false sharing in slots_ with neighboring memory
/// allocations, we pad it with this many SingleElementQueue-s at
/// each end
kSlotPadding = 1 +
(kFalseSharingRange - 1) / sizeof(detail::SingleElementQueue<T,Atom>)
};
#define FOLLY_ON_NEXT_CACHE_LINE __attribute__((aligned(kFalseSharingRange)))
/// The maximum number of items in the queue at once
size_t capacity_ FOLLY_ON_NEXT_CACHE_LINE;
/// An array of capacity_ SingleElementQueue-s, each of which holds
/// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
/// touch the slots at either end, to avoid false sharing
detail::SingleElementQueue<T,Atom>* slots_;
/// The number of slots_ indices that we advance for each ticket, to
/// avoid false sharing. Ideally slots_[i] and slots_[i + stride_]
/// aren't on the same cache line
int stride_;
/// Enqueuers get tickets from here
Atom<uint64_t> pushTicket_ FOLLY_ON_NEXT_CACHE_LINE;
/// Dequeuers get tickets from here
Atom<uint64_t> popTicket_ FOLLY_ON_NEXT_CACHE_LINE;
/// This is how many times we will spin before using FUTEX_WAIT when
/// the queue is full on enqueue, adaptively computed by occasionally
/// spinning for longer and smoothing with an exponential moving average
Atom<int> pushSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
/// The adaptive spin cutoff when the queue is empty on dequeue
Atom<int> popSpinCutoff_ FOLLY_ON_NEXT_CACHE_LINE;
/// Alignment doesn't avoid false sharing at the end of the struct,
/// so fill out the last cache line
char padding_[kFalseSharingRange - sizeof(Atom<int>)];
#undef FOLLY_ON_NEXT_CACHE_LINE
/// We assign tickets in increasing order, but we don't want to
/// access neighboring elements of slots_ because that will lead to
/// false sharing (multiple cores accessing the same cache line even
/// though they aren't accessing the same bytes in that cache line).
/// To avoid this we advance by stride slots per ticket.
///
/// We need gcd(capacity, stride) to be 1 so that we will use all
/// of the slots. We ensure this by only considering prime strides,
/// which either have no common divisors with capacity or else have
/// a zero remainder after dividing by capacity. That is sufficient
/// to guarantee correctness, but we also want to actually spread the
/// accesses away from each other to avoid false sharing (consider a
/// stride of 7 with a capacity of 8). To that end we try a few taking
/// care to observe that advancing by -1 is as bad as advancing by 1
/// when in comes to false sharing.
///
/// The simple way to avoid false sharing would be to pad each
/// SingleElementQueue, but since we have capacity_ of them that could
/// waste a lot of space.
static int computeStride(size_t capacity) noexcept {
static const int smallPrimes[] = { 2, 3, 5, 7, 11, 13, 17, 19, 23 };
int bestStride = 1;
size_t bestSep = 1;
for (int stride : smallPrimes) {
if ((stride % capacity) == 0 || (capacity % stride) == 0) {
continue;
}
size_t sep = stride % capacity;
sep = std::min(sep, capacity - sep);
if (sep > bestSep) {
bestStride = stride;
bestSep = sep;
}
}
return bestStride;
}
/// Returns the index into slots_ that should be used when enqueuing or
/// dequeuing with the specified ticket
size_t idx(uint64_t ticket) noexcept {
return ((ticket * stride_) % capacity_) + kSlotPadding;
}
/// Maps an enqueue or dequeue ticket to the turn should be used at the
/// corresponding SingleElementQueue
uint32_t turn(uint64_t ticket) noexcept {
return ticket / capacity_;
}
/// Tries to obtain a push ticket for which SingleElementQueue::enqueue
/// won't block. Returns true on immediate success, false on immediate
/// failure.
bool tryObtainReadyPushTicket(uint64_t& rv) noexcept {
auto ticket = pushTicket_.load(std::memory_order_acquire); // A
while (true) {
if (!slots_[idx(ticket)].mayEnqueue(turn(ticket))) {
// if we call enqueue(ticket, ...) on the SingleElementQueue
// right now it would block, but this might no longer be the next
// ticket. We can increase the chance of tryEnqueue success under
// contention (without blocking) by rechecking the ticket dispenser
auto prev = ticket;
ticket = pushTicket_.load(std::memory_order_acquire); // B
if (prev == ticket) {
// mayEnqueue was bracketed by two reads (A or prev B or prev
// failing CAS to B), so we are definitely unable to enqueue
return false;
}
} else {
// we will bracket the mayEnqueue check with a read (A or prev B
// or prev failing CAS) and the following CAS. If the CAS fails
// it will effect a load of pushTicket_
if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
rv = ticket;
return true;
}
}
}
}
/// Tries to obtain a push ticket which can be satisfied if all
/// in-progress pops complete. This function does not block, but
/// blocking may be required when using the returned ticket if some
/// other thread's pop is still in progress (ticket has been granted but
/// pop has not yet completed).
bool tryObtainPromisedPushTicket(uint64_t& rv) noexcept {
auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
while (true) {
auto numPops = popTicket_.load(std::memory_order_acquire); // B
// n will be negative if pops are pending
int64_t n = numPushes - numPops;
if (n >= static_cast<ssize_t>(capacity_)) {
// Full, linearize at B. We don't need to recheck the read we
// performed at A, because if numPushes was stale at B then the
// real numPushes value is even worse
return false;
}
if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
rv = numPushes;
return true;
}
}
}
/// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
/// won't block. Returns true on immediate success, false on immediate
/// failure.
bool tryObtainReadyPopTicket(uint64_t& rv) noexcept {
auto ticket = popTicket_.load(std::memory_order_acquire);
while (true) {
if (!slots_[idx(ticket)].mayDequeue(turn(ticket))) {
auto prev = ticket;
ticket = popTicket_.load(std::memory_order_acquire);
if (prev == ticket) {
return false;
}
} else {
if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
rv = ticket;
return true;
}
}
}
}
/// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
/// corresponding push ticket has already been handed out, rather than
/// returning one whose corresponding push ticket has already been
/// completed. This means that there is a possibility that the caller
/// will block when using the ticket, but it allows the user to rely on
/// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket
/// will return true. The "try" part of this is that we won't have
/// to block waiting for someone to call enqueue, although we might
/// have to block waiting for them to finish executing code inside the
/// MPMCQueue itself.
bool tryObtainPromisedPopTicket(uint64_t& rv) noexcept {
auto numPops = popTicket_.load(std::memory_order_acquire); // A
while (true) {
auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
if (numPops >= numPushes) {
// Empty, or empty with pending pops. Linearize at B. We don't
// need to recheck the read we performed at A, because if numPops
// is stale then the fresh value is larger and the >= is still true
return false;
}
if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
rv = numPops;
return true;
}
}
}
// Given a ticket, constructs an enqueued item using args
template <typename ...Args>
void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
slots_[idx(ticket)].enqueue(turn(ticket),
pushSpinCutoff_,
(ticket % kAdaptationFreq) == 0,
std::forward<Args>(args)...);
}
// Given a ticket, dequeues the corresponding element
void dequeueWithTicket(uint64_t ticket, T& elem) noexcept {
slots_[idx(ticket)].dequeue(turn(ticket),
popSpinCutoff_,
(ticket % kAdaptationFreq) == 0,
elem);
}
};
namespace detail {
/// A TurnSequencer allows threads to order their execution according to
/// a monotonically increasing (with wraparound) "turn" value. The two
/// operations provided are to wait for turn T, and to move to the next
/// turn. Every thread that is waiting for T must have arrived before
/// that turn is marked completed (for MPMCQueue only one thread waits
/// for any particular turn, so this is trivially true).
///
/// TurnSequencer's state_ holds 26 bits of the current turn (shifted
/// left by 6), along with a 6 bit saturating value that records the
/// maximum waiter minus the current turn. Wraparound of the turn space
/// is expected and handled. This allows us to atomically adjust the
/// number of outstanding waiters when we perform a FUTEX_WAKE operation.
/// Compare this strategy to sem_t's separate num_waiters field, which
/// isn't decremented until after the waiting thread gets scheduled,
/// during which time more enqueues might have occurred and made pointless
/// FUTEX_WAKE calls.
///
/// TurnSequencer uses futex() directly. It is optimized for the
/// case that the highest awaited turn is 32 or less higher than the
/// current turn. We use the FUTEX_WAIT_BITSET variant, which lets
/// us embed 32 separate wakeup channels in a single futex. See
/// http://locklessinc.com/articles/futex_cheat_sheet for a description.
///
/// We only need to keep exact track of the delta between the current
/// turn and the maximum waiter for the 32 turns that follow the current
/// one, because waiters at turn t+32 will be awoken at turn t. At that
/// point they can then adjust the delta using the higher base. Since we
/// need to encode waiter deltas of 0 to 32 inclusive, we use 6 bits.
/// We actually store waiter deltas up to 63, since that might reduce
/// the number of CAS operations a tiny bit.
///
/// To avoid some futex() calls entirely, TurnSequencer uses an adaptive
/// spin cutoff before waiting. The overheads (and convergence rate)
/// of separately tracking the spin cutoff for each TurnSequencer would
/// be prohibitive, so the actual storage is passed in as a parameter and
/// updated atomically. This also lets the caller use different adaptive
/// cutoffs for different operations (read versus write, for example).
/// To avoid contention, the spin cutoff is only updated when requested
/// by the caller.
template <template<typename> class Atom>
struct TurnSequencer {
explicit TurnSequencer(const uint32_t firstTurn = 0) noexcept
: state_(encode(firstTurn << kTurnShift, 0))
{}
/// Returns true iff a call to waitForTurn(turn, ...) won't block
bool isTurn(const uint32_t turn) const noexcept {
auto state = state_.load(std::memory_order_acquire);
return decodeCurrentSturn(state) == (turn << kTurnShift);
}
// Internally we always work with shifted turn values, which makes the
// truncation and wraparound work correctly. This leaves us bits at
// the bottom to store the number of waiters. We call shifted turns
// "sturns" inside this class.
/// Blocks the current thread until turn has arrived. If
/// updateSpinCutoff is true then this will spin for up to kMaxSpins tries
/// before blocking and will adjust spinCutoff based on the results,
/// otherwise it will spin for at most spinCutoff spins.
void waitForTurn(const uint32_t turn,
Atom<int>& spinCutoff,
const bool updateSpinCutoff) noexcept {
int prevThresh = spinCutoff.load(std::memory_order_relaxed);
const int effectiveSpinCutoff =
updateSpinCutoff || prevThresh == 0 ? kMaxSpins : prevThresh;
int tries;
const uint32_t sturn = turn << kTurnShift;
for (tries = 0; ; ++tries) {
uint32_t state = state_.load(std::memory_order_acquire);
uint32_t current_sturn = decodeCurrentSturn(state);
if (current_sturn == sturn) {
break;
}
// wrap-safe version of assert(current_sturn < sturn)
assert(sturn - current_sturn < std::numeric_limits<uint32_t>::max() / 2);
// the first effectSpinCutoff tries are spins, after that we will
// record ourself as a waiter and block with futexWait
if (tries < effectiveSpinCutoff) {
asm volatile ("pause");
continue;
}
uint32_t current_max_waiter_delta = decodeMaxWaitersDelta(state);
uint32_t our_waiter_delta = (sturn - current_sturn) >> kTurnShift;
uint32_t new_state;
if (our_waiter_delta <= current_max_waiter_delta) {
// state already records us as waiters, probably because this
// isn't our first time around this loop
new_state = state;
} else {
new_state = encode(current_sturn, our_waiter_delta);
if (state != new_state &&
!state_.compare_exchange_strong(state, new_state)) {
continue;
}
}
state_.futexWait(new_state, futexChannel(turn));
}
if (updateSpinCutoff || prevThresh == 0) {
// if we hit kMaxSpins then spinning was pointless, so the right
// spinCutoff is kMinSpins
int target;
if (tries >= kMaxSpins) {
target = kMinSpins;
} else {
// to account for variations, we allow ourself to spin 2*N when
// we think that N is actually required in order to succeed
target = std::min(int{kMaxSpins}, std::max(int{kMinSpins}, tries * 2));
}
if (prevThresh == 0) {
// bootstrap
spinCutoff = target;
} else {
// try once, keep moving if CAS fails. Exponential moving average
// with alpha of 7/8
spinCutoff.compare_exchange_weak(
prevThresh, prevThresh + (target - prevThresh) / 8);
}
}
}
/// Unblocks a thread running waitForTurn(turn + 1)
void completeTurn(const uint32_t turn) noexcept {
uint32_t state = state_.load(std::memory_order_acquire);
while (true) {
assert(state == encode(turn << kTurnShift, decodeMaxWaitersDelta(state)));
uint32_t max_waiter_delta = decodeMaxWaitersDelta(state);
uint32_t new_state = encode(
(turn + 1) << kTurnShift,
max_waiter_delta == 0 ? 0 : max_waiter_delta - 1);
if (state_.compare_exchange_strong(state, new_state)) {
if (max_waiter_delta != 0) {
state_.futexWake(std::numeric_limits<int>::max(),
futexChannel(turn + 1));
}
break;
}
// failing compare_exchange_strong updates first arg to the value
// that caused the failure, so no need to reread state_
}
}
/// Returns the least-most significant byte of the current uncompleted
/// turn. The full 32 bit turn cannot be recovered.
uint8_t uncompletedTurnLSB() const noexcept {
return state_.load(std::memory_order_acquire) >> kTurnShift;
}
private:
enum : uint32_t {
/// kTurnShift counts the bits that are stolen to record the delta
/// between the current turn and the maximum waiter. It needs to be big
/// enough to record wait deltas of 0 to 32 inclusive. Waiters more
/// than 32 in the future will be woken up 32*n turns early (since
/// their BITSET will hit) and will adjust the waiter count again.
/// We go a bit beyond and let the waiter count go up to 63, which
/// is free and might save us a few CAS
kTurnShift = 6,
kWaitersMask = (1 << kTurnShift) - 1,
/// The minimum spin count that we will adaptively select
kMinSpins = 20,
/// The maximum spin count that we will adaptively select, and the
/// spin count that will be used when probing to get a new data point
/// for the adaptation
kMaxSpins = 2000,
};
/// This holds both the current turn, and the highest waiting turn,
/// stored as (current_turn << 6) | min(63, max(waited_turn - current_turn))
Futex<Atom> state_;
/// Returns the bitmask to pass futexWait or futexWake when communicating
/// about the specified turn
int futexChannel(uint32_t turn) const noexcept {
return 1 << (turn & 31);
}
uint32_t decodeCurrentSturn(uint32_t state) const noexcept {
return state & ~kWaitersMask;
}
uint32_t decodeMaxWaitersDelta(uint32_t state) const noexcept {
return state & kWaitersMask;
}
uint32_t encode(uint32_t currentSturn, uint32_t maxWaiterD) const noexcept {
return currentSturn | std::min(uint32_t{ kWaitersMask }, maxWaiterD);
}
};
/// SingleElementQueue implements a blocking queue that holds at most one
/// item, and that requires its users to assign incrementing identifiers
/// (turns) to each enqueue and dequeue operation. Note that the turns
/// used by SingleElementQueue are doubled inside the TurnSequencer
template <typename T, template <typename> class Atom>
struct SingleElementQueue {
~SingleElementQueue() noexcept {
if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
// we are pending a dequeue, so we have a constructed item
destroyContents();
}
}
/// enqueue using in-place noexcept construction
template <typename ...Args,
typename = typename std::enable_if<
std::is_nothrow_constructible<T,Args...>::value>::type>
void enqueue(const uint32_t turn,
Atom<int>& spinCutoff,
const bool updateSpinCutoff,
Args&&... args) noexcept {
sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
new (contents_) T(std::forward<Args>(args)...);
sequencer_.completeTurn(turn * 2);
}
/// enqueue using move construction, either real (if
/// is_nothrow_move_constructible) or simulated using relocation and
/// default construction (if IsRelocatable and has_nothrow_constructor)
template <typename = typename std::enable_if<
(folly::IsRelocatable<T>::value &&
boost::has_nothrow_constructor<T>::value) ||
std::is_nothrow_constructible<T,T&&>::value>::type>
void enqueue(const uint32_t turn,
Atom<int>& spinCutoff,
const bool updateSpinCutoff,
T&& goner) noexcept {
if (std::is_nothrow_constructible<T,T&&>::value) {
// this is preferred
sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
new (contents_) T(std::move(goner));
sequencer_.completeTurn(turn * 2);
} else {
// simulate nothrow move with relocation, followed by default
// construction to fill the gap we created
sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
memcpy(contents_, &goner, sizeof(T));
sequencer_.completeTurn(turn * 2);
new (&goner) T();
}
}
bool mayEnqueue(const uint32_t turn) const noexcept {
return sequencer_.isTurn(turn * 2);
}
void dequeue(uint32_t turn,
Atom<int>& spinCutoff,
const bool updateSpinCutoff,
T& elem) noexcept {
if (folly::IsRelocatable<T>::value) {
// this version is preferred, because we do as much work as possible
// before waiting
try {
elem.~T();
} catch (...) {
// unlikely, but if we don't complete our turn the queue will die
}
sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
memcpy(&elem, contents_, sizeof(T));
sequencer_.completeTurn(turn * 2 + 1);
} else {
// use nothrow move assignment
sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
elem = std::move(*ptr());
destroyContents();
sequencer_.completeTurn(turn * 2 + 1);
}
}
bool mayDequeue(const uint32_t turn) const noexcept {
return sequencer_.isTurn(turn * 2 + 1);
}
private:
/// Storage for a T constructed with placement new
char contents_[sizeof(T)] __attribute__((aligned(alignof(T))));
/// Even turns are pushes, odd turns are pops
TurnSequencer<Atom> sequencer_;
T* ptr() noexcept {
return static_cast<T*>(static_cast<void*>(contents_));
}
void destroyContents() noexcept {
try {
ptr()->~T();
} catch (...) {
// g++ doesn't seem to have std::is_nothrow_destructible yet
}
#ifndef NDEBUG
memset(contents_, 'Q', sizeof(T));
#endif
}
};
} // namespace detail
} // namespace folly
+7 -6
Ver Arquivo
@@ -65,14 +65,15 @@ static_assert(kMinPageSize >= 16,
#define PAGE_FOR(addr) \
(reinterpret_cast<uintptr_t>(addr) / kMinPageSize)
inline size_t nextAlignedIndex(const char* arr) {
auto firstPossible = reinterpret_cast<uintptr_t>(arr) + 1;
return 1 + // add 1 because the index starts at 'arr'
((firstPossible + 15) & ~0xF) // round up to next multiple of 16
- firstPossible;
}
#if FOLLY_HAVE_EMMINTRIN_H
inline size_t nextAlignedIndex(const char* arr) {
auto firstPossible = reinterpret_cast<uintptr_t>(arr) + 1;
return 1 + // add 1 because the index starts at 'arr'
((firstPossible + 15) & ~0xF) // round up to next multiple of 16
- firstPossible;
}
// build sse4.2-optimized version even if -msse4.2 is not passed to GCC
size_t qfind_first_byte_of_needles16(const StringPiece& haystack,
const StringPiece& needles)
+47 -1
Ver Arquivo
@@ -35,6 +35,10 @@
#include "folly/Traits.h"
#include "folly/Likely.h"
// Ignore shadowing warnings within this file, so includers can use -Wshadow.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
namespace folly {
template <class T> class Range;
@@ -58,6 +62,15 @@ template <class T>
size_t qfind(const Range<T> & haystack,
const typename Range<T>::value_type& needle);
/**
* Finds the last occurrence of needle in haystack. The result is the
* offset reported to the beginning of haystack, or string::npos if
* needle wasn't found.
*/
template <class T>
size_t rfind(const Range<T> & haystack,
const typename Range<T>::value_type& needle);
/**
* Finds the first occurrence of any element of needle in
@@ -119,7 +132,8 @@ public:
typename std::iterator_traits<Iter>::reference>::type
value_type;
typedef typename std::iterator_traits<Iter>::reference reference;
typedef std::char_traits<value_type> traits_type;
typedef std::char_traits<typename std::remove_const<value_type>::type>
traits_type;
static const size_type npos;
@@ -387,6 +401,10 @@ public:
return qfind(*this, c);
}
size_type rfind(value_type c) const {
return folly::rfind(*this, c);
}
size_type find(value_type c, size_t pos) const {
if (pos > size()) return std::string::npos;
size_type ret = qfind(subpiece(pos), c);
@@ -668,6 +686,17 @@ size_t qfind(const Range<T>& haystack,
return pos == haystack.end() ? std::string::npos : pos - haystack.data();
}
template <class T>
size_t rfind(const Range<T>& haystack,
const typename Range<T>::value_type& needle) {
for (auto i = haystack.size(); i-- > 0; ) {
if (haystack[i] == needle) {
return i;
}
}
return std::string::npos;
}
// specialization for StringPiece
template <>
inline size_t qfind(const Range<const char*>& haystack, const char& needle) {
@@ -676,6 +705,13 @@ inline size_t qfind(const Range<const char*>& haystack, const char& needle) {
return pos == nullptr ? std::string::npos : pos - haystack.data();
}
template <>
inline size_t rfind(const Range<const char*>& haystack, const char& needle) {
auto pos = static_cast<const char*>(
::memrchr(haystack.data(), needle, haystack.size()));
return pos == nullptr ? std::string::npos : pos - haystack.data();
}
// specialization for ByteRange
template <>
inline size_t qfind(const Range<const unsigned char*>& haystack,
@@ -685,6 +721,14 @@ inline size_t qfind(const Range<const unsigned char*>& haystack,
return pos == nullptr ? std::string::npos : pos - haystack.data();
}
template <>
inline size_t rfind(const Range<const unsigned char*>& haystack,
const unsigned char& needle) {
auto pos = static_cast<const unsigned char*>(
::memrchr(haystack.data(), needle, haystack.size()));
return pos == nullptr ? std::string::npos : pos - haystack.data();
}
template <class T>
size_t qfind_first_of(const Range<T>& haystack,
const Range<T>& needles) {
@@ -707,6 +751,8 @@ inline size_t qfind_first_of(const Range<const unsigned char*>& haystack,
}
} // !namespace folly
#pragma GCC diagnostic pop
FOLLY_ASSUME_FBVECTOR_COMPATIBLE_1(folly::Range);
#endif // FOLLY_RANGE_H_
+5 -2
Ver Arquivo
@@ -241,12 +241,15 @@ fbstring errnoStr(int err) {
fbstring result;
// https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man3/strerror_r.3.html
// http://www.kernel.org/doc/man-pages/online/pages/man3/strerror.3.html
#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) && !_GNU_SOURCE
#if defined(__APPLE__) || defined(__FreeBSD__) || \
((_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) && !_GNU_SOURCE)
// Using XSI-compatible strerror_r
int r = strerror_r(err, buf, sizeof(buf));
if (r == -1) {
// OSX/FreeBSD use EINVAL and Linux uses -1 so just check for non-zero
if (r != 0) {
result = to<fbstring>(
"Unknown error ", err,
" (strerror_r failed with error ", errno, ")");
+18 -12
Ver Arquivo
@@ -617,12 +617,11 @@ bool discardRead(int fd) {
} // namespace
std::pair<std::string, std::string> Subprocess::communicate(
const CommunicateFlags& flags,
StringPiece data) {
IOBufQueue dataQueue;
dataQueue.wrapBuffer(data.data(), data.size());
StringPiece input) {
IOBufQueue inputQueue;
inputQueue.wrapBuffer(input.data(), input.size());
auto outQueues = communicateIOBuf(flags, std::move(dataQueue));
auto outQueues = communicateIOBuf(std::move(inputQueue));
auto outBufs = std::make_pair(outQueues.first.move(),
outQueues.second.move());
std::pair<std::string, std::string> out;
@@ -640,14 +639,21 @@ std::pair<std::string, std::string> Subprocess::communicate(
}
std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
const CommunicateFlags& flags,
IOBufQueue data) {
IOBufQueue input) {
// If the user supplied a non-empty input buffer, make sure
// that stdin is a pipe so we can write the data.
if (!input.empty()) {
// findByChildFd() will throw std::invalid_argument if no pipe for
// STDIN_FILENO exists
findByChildFd(STDIN_FILENO);
}
std::pair<IOBufQueue, IOBufQueue> out;
auto readCallback = [&] (int pfd, int cfd) -> bool {
if (cfd == 1 && flags.readStdout_) {
if (cfd == STDOUT_FILENO) {
return handleRead(pfd, out.first);
} else if (cfd == 2 && flags.readStderr_) {
} else if (cfd == STDERR_FILENO) {
return handleRead(pfd, out.second);
} else {
// Don't close the file descriptor, the child might not like SIGPIPE,
@@ -657,11 +663,11 @@ std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
};
auto writeCallback = [&] (int pfd, int cfd) -> bool {
if (cfd == 0 && flags.writeStdin_) {
return handleWrite(pfd, data);
if (cfd == STDIN_FILENO) {
return handleWrite(pfd, input);
} else {
// If we don't want to write to this fd, just close it.
return false;
return true;
}
};
+19 -52
Ver Arquivo
@@ -224,19 +224,23 @@ class Subprocess : private boost::noncopyable {
/**
* Shortcut to change the action for standard input.
*/
Options& stdin(int action) { return fd(0, action); }
Options& stdin(int action) { return fd(STDIN_FILENO, action); }
/**
* Shortcut to change the action for standard output.
*/
Options& stdout(int action) { return fd(1, action); }
Options& stdout(int action) { return fd(STDOUT_FILENO, action); }
/**
* Shortcut to change the action for standard error.
* Note that stderr(1) will redirect the standard error to the same
* file descriptor as standard output; the equivalent of bash's "2>&1"
*/
Options& stderr(int action) { return fd(2, action); }
Options& stderr(int action) { return fd(STDERR_FILENO, action); }
Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); }
Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); }
Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); }
/**
* Close all other fds (other than standard input, output, error,
@@ -310,19 +314,19 @@ class Subprocess : private boost::noncopyable {
const std::vector<std::string>* env = nullptr);
/**
* Append all data, close the stdin (to-child) fd, and read all data,
* except that this is done in a safe manner to prevent deadlocking.
* Communicate with the child until all pipes to/from the child are closed.
*
* If writeStdin() is given in flags, the process must have been opened with
* stdinFd=PIPE.
* The input buffer is written to the process' stdin pipe, and data is read
* from the stdout and stderr pipes. Non-blocking I/O is performed on all
* pipes simultaneously to avoid deadlocks.
*
* If readStdout() is given in flags, the first returned value will be the
* value read from the child's stdout; the child must have been opened with
* stdoutFd=PIPE.
* The stdin pipe will be closed after the full input buffer has been written.
* An error will be thrown if a non-empty input buffer is supplied but stdin
* was not configured as a pipe.
*
* If readStderr() is given in flags, the second returned value will be the
* value read from the child's stderr; the child must have been opened with
* stderrFd=PIPE.
* Returns a pair of buffers containing the data read from stdout and stderr.
* If stdout or stderr is not a pipe, an empty IOBuf queue will be returned
* for the respective buffer.
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
@@ -331,39 +335,11 @@ class Subprocess : private boost::noncopyable {
* that it won't try to allocate all data at once). communicate
* uses strings for simplicity.
*/
class CommunicateFlags : private boost::orable<CommunicateFlags> {
friend class Subprocess;
public:
CommunicateFlags()
: writeStdin_(false), readStdout_(false), readStderr_(false) { }
CommunicateFlags& writeStdin() { writeStdin_ = true; return *this; }
CommunicateFlags& readStdout() { readStdout_ = true; return *this; }
CommunicateFlags& readStderr() { readStderr_ = true; return *this; }
CommunicateFlags& operator|=(const CommunicateFlags& other);
private:
bool writeStdin_;
bool readStdout_;
bool readStderr_;
};
static CommunicateFlags writeStdin() {
return CommunicateFlags().writeStdin();
}
static CommunicateFlags readStdout() {
return CommunicateFlags().readStdout();
}
static CommunicateFlags readStderr() {
return CommunicateFlags().readStderr();
}
std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
const CommunicateFlags& flags = readStdout(),
IOBufQueue data = IOBufQueue());
IOBufQueue input = IOBufQueue());
std::pair<std::string, std::string> communicate(
const CommunicateFlags& flags = readStdout(),
StringPiece data = StringPiece());
StringPiece input = StringPiece());
/**
* Communicate with the child until all pipes to/from the child are closed.
@@ -546,15 +522,6 @@ inline Subprocess::Options& Subprocess::Options::operator|=(
return *this;
}
inline Subprocess::CommunicateFlags& Subprocess::CommunicateFlags::operator|=(
const Subprocess::CommunicateFlags& other) {
if (this == &other) return *this;
writeStdin_ |= other.writeStdin_;
readStdout_ |= other.readStdout_;
readStderr_ |= other.readStderr_;
return *this;
}
} // namespace folly
#endif /* FOLLY_SUBPROCESS_H_ */
+18 -18
Ver Arquivo
@@ -332,7 +332,7 @@ template <typename RHS, RHS rhs, typename LHS>
bool less_than_impl(
typename std::enable_if<
(rhs <= std::numeric_limits<LHS>::max()
&& rhs >= std::numeric_limits<LHS>::min()),
&& rhs > std::numeric_limits<LHS>::min()),
LHS
>::type const lhs
) {
@@ -352,39 +352,39 @@ bool less_than_impl(
template <typename RHS, RHS rhs, typename LHS>
bool less_than_impl(
typename std::enable_if<
(rhs < std::numeric_limits<LHS>::min()),
(rhs <= std::numeric_limits<LHS>::min()),
LHS
>::type const
) {
return false;
}
template <typename LHS, LHS lhs, typename RHS>
template <typename RHS, RHS rhs, typename LHS>
bool greater_than_impl(
typename std::enable_if<
(lhs <= std::numeric_limits<RHS>::max()
&& lhs >= std::numeric_limits<RHS>::min()),
RHS
>::type const rhs
(rhs <= std::numeric_limits<LHS>::max()
&& rhs >= std::numeric_limits<LHS>::min()),
LHS
>::type const lhs
) {
return lhs < rhs;
return lhs > rhs;
}
template <typename LHS, LHS lhs, typename RHS>
template <typename RHS, RHS rhs, typename LHS>
bool greater_than_impl(
typename std::enable_if<
(lhs > std::numeric_limits<RHS>::max()),
RHS
(rhs > std::numeric_limits<LHS>::max()),
LHS
>::type const
) {
return false;
}
template <typename LHS, LHS lhs, typename RHS>
template <typename RHS, RHS rhs, typename LHS>
bool greater_than_impl(
typename std::enable_if<
(lhs < std::numeric_limits<RHS>::min()),
RHS
(rhs < std::numeric_limits<LHS>::min()),
LHS
>::type const
) {
return true;
@@ -409,11 +409,11 @@ bool less_than(LHS const lhs) {
>(lhs);
}
template <typename LHS, LHS lhs, typename RHS>
bool greater_than(RHS const rhs) {
template <typename RHS, RHS rhs, typename LHS>
bool greater_than(LHS const lhs) {
return detail::greater_than_impl<
LHS, lhs, typename std::remove_reference<RHS>::type
>(rhs);
RHS, rhs, typename std::remove_reference<LHS>::type
>(lhs);
}
} // namespace folly
+84
Ver Arquivo
@@ -0,0 +1,84 @@
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <limits>
#include <assert.h>
#include <errno.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <boost/noncopyable.hpp>
namespace folly { namespace detail {
/**
* Futex is an atomic 32 bit unsigned integer that provides access to the
* futex() syscall on that value. It is templated in such a way that it
* can interact properly with DeterministicSchedule testing.
*
* If you don't know how to use futex(), you probably shouldn't be using
* this class. Even if you do know how, you should have a good reason
* (and benchmarks to back you up).
*/
template <template <typename> class Atom = std::atomic>
struct Futex : Atom<uint32_t>, boost::noncopyable {
explicit Futex(uint32_t init = 0) : Atom<uint32_t>(init) {}
/** Puts the thread to sleep if this->load() == expected. Returns true when
* it is returning because it has consumed a wake() event, false for any
* other return (signal, this->load() != expected, or spurious wakeup). */
bool futexWait(uint32_t expected, uint32_t waitMask = -1);
/** Wakens up to count waiters where (waitMask & wakeMask) != 0,
* returning the number of awoken threads. */
int futexWake(int count = std::numeric_limits<int>::max(),
uint32_t wakeMask = -1);
};
template <>
inline bool Futex<std::atomic>::futexWait(uint32_t expected,
uint32_t waitMask) {
assert(sizeof(*this) == sizeof(int));
int rv = syscall(SYS_futex,
this, /* addr1 */
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, /* op */
expected, /* val */
nullptr, /* timeout */
nullptr, /* addr2 */
waitMask); /* val3 */
assert(rv == 0 || (errno == EWOULDBLOCK || errno == EINTR));
return rv == 0;
}
template <>
inline int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
assert(sizeof(*this) == sizeof(int));
int rv = syscall(SYS_futex,
this, /* addr1 */
FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
count, /* val */
nullptr, /* timeout */
nullptr, /* addr2 */
wakeMask); /* val3 */
assert(rv >= 0);
return rv;
}
}}
+48 -19
Ver Arquivo
@@ -21,10 +21,14 @@
#include <syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <cassert>
#include <climits>
#include <atomic>
#include <thread>
#include "folly/Bits.h"
#include "folly/Likely.h"
namespace folly {
@@ -90,12 +94,12 @@ inline int futex(int* uaddr, int op, int val, const timespec* timeout,
*/
class EventCount {
public:
EventCount() noexcept : epoch_(0), waiters_(0) { }
EventCount() noexcept : val_(0) { }
class Key {
friend class EventCount;
explicit Key(int e) noexcept : epoch_(e) { }
int epoch_;
explicit Key(uint32_t e) noexcept : epoch_(e) { }
uint32_t epoch_;
};
void notify() noexcept;
@@ -118,8 +122,28 @@ class EventCount {
EventCount& operator=(const EventCount&) = delete;
EventCount& operator=(EventCount&&) = delete;
std::atomic<int> epoch_;
std::atomic<int> waiters_;
// This requires 64-bit
static_assert(sizeof(int) == 4, "bad platform");
static_assert(sizeof(uint32_t) == 4, "bad platform");
static_assert(sizeof(uint64_t) == 8, "bad platform");
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
static constexpr size_t kEpochOffset = 1;
#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
static constexpr size_t kEpochOffset = 0; // in units of sizeof(int)
#else
# error Your machine uses a weird endianness!
#endif
// val_ stores the epoch in the most significant 32 bits and the
// waiter count in the least significant 32 bits.
std::atomic<uint64_t> val_;
static constexpr uint64_t kAddWaiter = uint64_t(1);
static constexpr uint64_t kSubWaiter = uint64_t(-1);
static constexpr size_t kEpochShift = 32;
static constexpr uint64_t kAddEpoch = uint64_t(1) << kEpochShift;
static constexpr uint64_t kWaiterMask = kAddEpoch - 1;
};
inline void EventCount::notify() noexcept {
@@ -131,31 +155,36 @@ inline void EventCount::notifyAll() noexcept {
}
inline void EventCount::doNotify(int n) noexcept {
// The order is important: epoch_ is incremented before waiters_ is checked.
// prepareWait() increments waiters_ before checking epoch_, so it is
// impossible to miss a wakeup.
++epoch_;
if (waiters_ != 0) {
detail::futex(reinterpret_cast<int*>(&epoch_), FUTEX_WAKE, n, nullptr,
nullptr, 0);
uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_acq_rel);
if (UNLIKELY(prev & kWaiterMask)) {
detail::futex(reinterpret_cast<int*>(&val_) + kEpochOffset,
FUTEX_WAKE, n, nullptr, nullptr, 0);
}
}
inline EventCount::Key EventCount::prepareWait() noexcept {
++waiters_;
return Key(epoch_);
uint64_t prev = val_.fetch_add(kAddWaiter, std::memory_order_acq_rel);
return Key(prev >> kEpochShift);
}
inline void EventCount::cancelWait() noexcept {
--waiters_;
// memory_order_relaxed would suffice for correctness, but the faster
// #waiters gets to 0, the less likely it is that we'll do spurious wakeups
// (and thus system calls).
uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
assert((prev & kWaiterMask) != 0);
}
inline void EventCount::wait(Key key) noexcept {
while (epoch_ == key.epoch_) {
detail::futex(reinterpret_cast<int*>(&epoch_), FUTEX_WAIT, key.epoch_,
nullptr, nullptr, 0);
while ((val_.load(std::memory_order_acquire) >> kEpochShift) == key.epoch_) {
detail::futex(reinterpret_cast<int*>(&val_) + kEpochOffset,
FUTEX_WAIT, key.epoch_, nullptr, nullptr, 0);
}
--waiters_;
// memory_order_relaxed would suffice for correctness, but the faster
// #waiters gets to 0, the less likely it is that we'll do spurious wakeups
// (and thus system calls)
uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
assert((prev & kWaiterMask) != 0);
}
template <class Condition>
+6
Ver Arquivo
@@ -14,6 +14,10 @@
* limitations under the License.
*/
// Ignore shadowing warnings within this file, so includers can use -Wshadow.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
namespace folly { namespace gen {
/**
@@ -1829,3 +1833,5 @@ inline detail::Skip skip(size_t count) {
}
}} //folly::gen
#pragma GCC diagnostic pop
@@ -18,8 +18,12 @@
#include "folly/experimental/exception_tracer/StackTrace.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include "unwind.h"
#define UNW_LOCAL_ONLY 1
#include <libunwind.h>
struct Context {
StackTrace* trace;
@@ -27,43 +31,69 @@ struct Context {
size_t capacity;
};
static _Unwind_Reason_Code addIP(struct _Unwind_Context* ctx, void* varg) {
struct Context* arg = (struct Context*)varg;
static int checkError(const char* name, int err) {
if (err < 0) {
fprintf(stderr, "libunwind error: %s %d\n", name, err);
return -EINVAL;
}
return 0;
}
if (arg->skip) {
--arg->skip;
return _URC_NO_REASON;
static int addIP(struct Context* ctx, unw_cursor_t* cursor) {
if (ctx->skip) {
--ctx->skip;
return 0;
}
if (arg->trace->frameCount == arg->capacity) {
size_t newCapacity = (arg->capacity < 8 ? 8 : arg->capacity * 1.5);
unw_word_t ip;
int r = unw_get_reg(cursor, UNW_REG_IP, &ip);
int err = checkError("unw_get_reg", r);
if (err) return err;
if (ctx->trace->frameCount == ctx->capacity) {
size_t newCapacity = (ctx->capacity < 8 ? 8 : ctx->capacity * 1.5);
uintptr_t* newBlock =
realloc(arg->trace->frameIPs, newCapacity * sizeof(uintptr_t));
realloc(ctx->trace->frameIPs, newCapacity * sizeof(uintptr_t));
if (!newBlock) {
return _URC_FATAL_PHASE1_ERROR;
return -ENOMEM;
}
arg->trace->frameIPs = newBlock;
arg->capacity = newCapacity;
ctx->trace->frameIPs = newBlock;
ctx->capacity = newCapacity;
}
arg->trace->frameIPs[arg->trace->frameCount++] = _Unwind_GetIP(ctx);
return _URC_NO_REASON; /* success */
ctx->trace->frameIPs[ctx->trace->frameCount++] = ip;
return 0; /* success */
}
int getCurrentStackTrace(size_t skip, StackTrace* trace) {
trace->frameIPs = NULL;
trace->frameCount = 0;
struct Context ctx;
ctx.trace = trace;
ctx.skip = skip;
ctx.capacity = 0;
if (_Unwind_Backtrace(addIP, &ctx) == _URC_END_OF_STACK) {
return 0;
}
unw_context_t uctx;
int r = unw_getcontext(&uctx);
int err = checkError("unw_get_context", r);
if (err) return err;
destroyStackTrace(trace);
return -ENOMEM;
unw_cursor_t cursor;
r = unw_init_local(&cursor, &uctx);
err = checkError("unw_init_local", r);
if (err) return err;
while ((r = unw_step(&cursor)) > 0) {
if ((err = addIP(&ctx, &cursor)) != 0) {
destroyStackTrace(trace);
return err;
}
}
err = checkError("unw_step", r);
if (err) return err;
return 0;
}
void destroyStackTrace(StackTrace* trace) {
+6
Ver Arquivo
@@ -34,6 +34,10 @@
#include "folly/Range.h"
#include "folly/FBVector.h"
// Ignore shadowing warnings within this file, so includers can use -Wshadow.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
namespace folly {
/**
@@ -1214,4 +1218,6 @@ inline IOBuf::Iterator IOBuf::end() const { return cend(); }
} // folly
#pragma GCC diagnostic pop
#endif // FOLLY_IO_IOBUF_H_
+7
Ver Arquivo
@@ -222,6 +222,13 @@ class IOBufQueue {
return chainLength_;
}
/**
* Returns true iff the IOBuf chain length is 0.
*/
bool empty() const {
return !head_ || head_->empty();
}
const Options& options() const {
return options_;
}
+6
Ver Arquivo
@@ -64,6 +64,10 @@
# endif
#endif
// Ignore shadowing warnings within this file, so includers can use -Wshadow.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
namespace folly {
//////////////////////////////////////////////////////////////////////
@@ -1180,6 +1184,8 @@ void swap(small_vector<T,MaxInline,A,B,C>& a,
}
#pragma GCC diagnostic pop
#ifdef FB_PACKED
# undef FB_PACKED
#endif
+1 -1
Ver Arquivo
@@ -515,7 +515,7 @@ public:
return end();
}
size_type count(const key_type& key) {
size_type count(const key_type& key) const {
return find(key) == end() ? 0 : 1;
}
+4
Ver Arquivo
@@ -27,10 +27,14 @@
#include "folly/stats/Histogram.h"
#include "folly/stats/Histogram-defs.h"
#include "folly/stats/MultiLevelTimeSeries.h"
#include "folly/stats/MultiLevelTimeSeries-defs.h"
namespace folly {
template class BucketedTimeSeries<int64_t>;
template class Histogram<int64_t>;
template class detail::HistogramBuckets<int64_t, Histogram<int64_t>::Bucket>;
template class MultiLevelTimeSeries<int64_t>;
} // folly
@@ -0,0 +1,105 @@
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_STATS_MULTILEVELTIMESERIES_DEFS_H_
#define FOLLY_STATS_MULTILEVELTIMESERIES_DEFS_H_
#include <glog/logging.h>
namespace folly {
template <typename VT, typename TT>
MultiLevelTimeSeries<VT, TT>::MultiLevelTimeSeries(
size_t numBuckets,
size_t numLevels,
const TimeType levelDurations[])
: numBuckets_(numBuckets),
cachedTime_(0),
cachedSum_(0),
cachedCount_(0) {
CHECK_GT(numLevels, 0);
CHECK(levelDurations);
levels_.reserve(numLevels);
for (int i = 0; i < numLevels; ++i) {
if (levelDurations[i] == TT(0)) {
CHECK_EQ(i, numLevels - 1);
} else if (i > 0) {
CHECK(levelDurations[i-1] < levelDurations[i]);
}
levels_.emplace_back(numBuckets, levelDurations[i]);
}
}
template <typename VT, typename TT>
void MultiLevelTimeSeries<VT, TT>::addValue(TimeType now,
const ValueType& val) {
addValueAggregated(now, val, 1);
}
template <typename VT, typename TT>
void MultiLevelTimeSeries<VT, TT>::addValue(TimeType now,
const ValueType& val,
int64_t times) {
addValueAggregated(now, val * times, times);
}
template <typename VT, typename TT>
void MultiLevelTimeSeries<VT, TT>::addValueAggregated(TimeType now,
const ValueType& sum,
int64_t nsamples) {
if (cachedTime_ != now) {
flush();
cachedTime_ = now;
}
cachedSum_ += sum;
cachedCount_ += nsamples;
}
template <typename VT, typename TT>
void MultiLevelTimeSeries<VT, TT>::update(TimeType now) {
flush();
for (int i = 0; i < levels_.size(); ++i) {
levels_[i].update(now);
}
}
template <typename VT, typename TT>
void MultiLevelTimeSeries<VT, TT>::flush() {
// update all the underlying levels
if (cachedCount_ > 0) {
for (int i = 0; i < levels_.size(); ++i) {
levels_[i].addValueAggregated(cachedTime_, cachedSum_, cachedCount_);
}
cachedCount_ = 0;
cachedSum_ = 0;
}
}
template <typename VT, typename TT>
void MultiLevelTimeSeries<VT, TT>::clear() {
for (auto & level : levels_) {
level.clear();
}
cachedTime_ = TimeType(0);
cachedSum_ = 0;
cachedCount_ = 0;
}
} // folly
#endif // FOLLY_STATS_MULTILEVELTIMESERIES_DEFS_H_
+312
Ver Arquivo
@@ -0,0 +1,312 @@
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_STATS_MULTILEVELTIMESERIES_H_
#define FOLLY_STATS_MULTILEVELTIMESERIES_H_
#include <chrono>
#include <string>
#include <vector>
#include "folly/stats/BucketedTimeSeries.h"
namespace folly {
/*
* This class represents a timeseries which keeps several levels of data
* granularity (similar in principle to the loads reported by the UNIX
* 'uptime' command). It uses several instances (one per level) of
* BucketedTimeSeries as the underlying storage.
*
* This can easily be used to track sums (and thus rates or averages) over
* several predetermined time periods, as well as all-time sums. For example,
* you would use to it to track query rate or response speed over the last
* 5, 15, 30, and 60 minutes.
*
* The MultiLevelTimeSeries takes a list of level durations as an input; the
* durations must be strictly increasing. Furthermore a special level can be
* provided with a duration of '0' -- this will be an "all-time" level. If
* an all-time level is provided, it MUST be the last level present.
*
* The class assumes that time advances forward -- you can't retroactively add
* values for events in the past -- the 'now' argument is provided for better
* efficiency and ease of unittesting.
*
* The class is not thread-safe -- use your own synchronization!
*/
template <typename VT, typename TT=std::chrono::seconds>
class MultiLevelTimeSeries {
public:
typedef VT ValueType;
typedef TT TimeType;
typedef folly::BucketedTimeSeries<ValueType, TimeType> Level;
/*
* Create a new MultiLevelTimeSeries.
*
* This creates a new MultiLevelTimeSeries that tracks time series data at the
* specified time durations (level). The time series data tracked at each
* level is then further divided by numBuckets for memory efficiency.
*
* The durations must be strictly increasing. Furthermore a special level can
* be provided with a duration of '0' -- this will be an "all-time" level. If
* an all-time level is provided, it MUST be the last level present.
*/
MultiLevelTimeSeries(size_t numBuckets,
size_t numLevels,
const TimeType levelDurations[]);
/*
* Return the number of buckets used to track time series at each level.
*/
size_t numBuckets() const { return numBuckets_; }
/*
* Return the number of levels tracked by MultiLevelTimeSeries.
*/
size_t numLevels() const { return levels_.size(); }
/*
* Get the BucketedTimeSeries backing the specified level.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
const Level& getLevel(int level) const {
CHECK(level >= 0);
CHECK_LT(level, levels_.size());
return levels_[level];
}
/*
* Get the highest granularity level that is still large enough to contain
* data going back to the specified start time.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
const Level& getLevel(TimeType start) const {
for (const auto& level : levels_) {
if (level.isAllTime()) {
return level;
}
// Note that we use duration() here rather than elapsed().
// If duration is large enough to contain the start time then this level
// is good enough, even if elapsed() indicates that no data was recorded
// before the specified start time.
if (level.getLatestTime() - level.duration() <= start) {
return level;
}
}
// We should always have an all-time level, so this is never reached.
LOG(FATAL) << "No level of timeseries covers internval"
<< " from " << start.count() << " to now";
return levels_.back();
}
/*
* Return the sum of all the data points currently tracked at this level.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
ValueType sum(int level) const {
return getLevel(level).sum();
}
/*
* Return the average (sum / count) of all the data points currently tracked
* at this level.
*
* The return type may be specified to control whether floating-point or
* integer division should be performed.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
template <typename ReturnType=double>
ReturnType avg(int level) const {
return getLevel(level).template avg<ReturnType>();
}
/*
* Return the rate (sum divided by elaspsed time) of the all data points
* currently tracked at this level.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
template <typename ReturnType=double, typename Interval=TimeType>
ValueType rate(int level) const {
return getLevel(level).template rate<ReturnType, Interval>();
}
/*
* Return the number of data points currently tracked at this level.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
int64_t count(int level) const {
return getLevel(level).count();
}
/*
* Return the count divided by the elapsed time tracked at this level.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
template <typename ReturnType=double, typename Interval=TimeType>
ReturnType countRate(int level) const {
return getLevel(level).template countRate<ReturnType, Interval>();
}
/*
* Estimate the sum of the data points that occurred in the specified time
* period at this level.
*
* The range queried is [start, end).
* That is, start is inclusive, and end is exclusive.
*
* Note that data outside of the timeseries duration will no longer be
* available for use in the estimation. Specifying a start time earlier than
* getEarliestTime() will not have much effect, since only data points after
* that point in time will be counted.
*
* Note that the value returned is an estimate, and may not be precise.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
ValueType sum(TimeType start, TimeType end) const {
return getLevel(start).sum(start, end);
}
/*
* Estimate the average value during the specified time period.
*
* The same caveats documented in the sum(TimeType start, TimeType end)
* comments apply here as well.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
template <typename ReturnType=double>
ReturnType avg(TimeType start, TimeType end) const {
return getLevel(start).template avg<ReturnType>(start, end);
}
/*
* Estimate the rate during the specified time period.
*
* The same caveats documented in the sum(TimeType start, TimeType end)
* comments apply here as well.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
template <typename ReturnType=double>
ReturnType rate(TimeType start, TimeType end) const {
return getLevel(start).template rate<ReturnType>(start, end);
}
/*
* Estimate the count during the specified time period.
*
* The same caveats documented in the sum(TimeType start, TimeType end)
* comments apply here as well.
*
* Note: you should generally call update() or flush() before accessing the
* data. Otherwise you may be reading stale data if update() or flush() has
* not been called recently.
*/
int64_t count(TimeType start, TimeType end) const {
return getLevel(start).count(start, end);
}
/*
* Adds the value 'val' at time 'now' to all levels.
*
* Data points added at the same time point is cached internally here and not
* propagated to the underlying levels until either flush() is called or when
* update from a different time comes.
*
* This function expects time to always move forwards: it cannot be used to
* add historical data points that have occurred in the past. If now is
* older than the another timestamp that has already been passed to
* addValue() or update(), now will be ignored and the latest timestamp will
* be used.
*/
void addValue(TimeType now, const ValueType& val);
/*
* Adds the value 'val' at time 'now' to all levels.
*/
void addValue(TimeType now, const ValueType& val, int64_t times);
/*
* Adds the value 'val' at time 'now' to all levels as the sum of 'nsamples'
* samples.
*/
void addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
/*
* Update all the levels to the specified time, doing all the necessary
* work to rotate the buckets and remove any stale data points.
*
* When reading data from the timeseries, you should make sure to manually
* call update() before accessing the data. Otherwise you may be reading
* stale data if update() has not been called recently.
*/
void update(TimeType now);
/*
* Reset all the timeseries to an empty state as if no data points have ever
* been added to it.
*/
void clear();
/*
* Flush all cached updates.
*/
void flush();
private:
size_t numBuckets_;
std::vector<Level> levels_;
// Updates within the same time interval are cached
// They are flushed out when updates from a different time comes,
// or flush() is called.
TimeType cachedTime_;
ValueType cachedSum_;
int cachedCount_;
};
} // folly
#endif // FOLLY_STATS_MULTILEVELTIMESERIES_H_