diff options
author | blaze <blaze@yandex-team.ru> | 2022-02-10 16:50:31 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:31 +0300 |
commit | 8de5e9fef85b2ab655e3bc1d77ee2674f417cd15 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library | |
parent | 6813864abdb5ce336cde7a2e5cd80232ba54eef1 (diff) | |
download | ydb-8de5e9fef85b2ab655e3bc1d77ee2674f417cd15.tar.gz |
Restoring authorship annotation for <blaze@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/dnscachelib/dnscache.cpp | 404 | ||||
-rw-r--r-- | library/cpp/actors/dnscachelib/dnscache.h | 138 | ||||
-rw-r--r-- | library/cpp/actors/dnscachelib/timekeeper.h | 62 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/bucket_quoter.h | 224 | ||||
-rw-r--r-- | library/cpp/lfalloc/lf_allocX64.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/message.cpp | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/ya.make | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/ybus.h | 2 | ||||
-rw-r--r-- | library/cpp/monlib/counters/counters.h | 2 |
9 files changed, 424 insertions, 424 deletions
diff --git a/library/cpp/actors/dnscachelib/dnscache.cpp b/library/cpp/actors/dnscachelib/dnscache.cpp index 7ca64c0d09..649339ddb2 100644 --- a/library/cpp/actors/dnscachelib/dnscache.cpp +++ b/library/cpp/actors/dnscachelib/dnscache.cpp @@ -1,17 +1,17 @@ -#include "dnscache.h" +#include "dnscache.h" #include "probes.h" -#include "timekeeper.h" - -#include <contrib/libs/c-ares/ares.h> -#include <util/system/guard.h> +#include "timekeeper.h" + +#include <contrib/libs/c-ares/ares.h> +#include <util/system/guard.h> #include <util/datetime/systime.h> - + const TDnsCache::THost TDnsCache::NullHost; LWTRACE_USING(DNSCACHELIB_PROVIDER); static_assert(sizeof(ares_channel) == sizeof(void*), "expect sizeof(ares_channel) == sizeof(void *)"); - + TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg, ui32 timeout) : EntryLifetime(lifetime) , NegativeLifetime(neg) @@ -30,28 +30,28 @@ TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg } #endif - ares_channel chan; - - if (ares_init(&chan) != ARES_SUCCESS) { + ares_channel chan; + + if (ares_init(&chan) != ARES_SUCCESS) { LWPROBE(AresInitFailed); - ythrow yexception() << "ares_init() failed"; - } - Channel = chan; + ythrow yexception() << "ares_init() failed"; + } + Channel = chan; LWPROBE(Created); -} - +} + TDnsCache::~TDnsCache(void) { - ares_channel chan = static_cast<ares_channel>(Channel); - - ares_cancel(chan); - ares_destroy(chan); + ares_channel chan = static_cast<ares_channel>(Channel); + + ares_cancel(chan); + ares_destroy(chan); LWPROBE(Destroyed); #ifdef _win_ ares_library_cleanup(); #endif -} - +} + TString TDnsCache::GetHostByAddr(const NAddr::IRemoteAddr& addr) { in6_addr key; @@ -146,14 +146,14 @@ void TDnsCache::GetAllAddresses( void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses, ui64& ptr_cache_hits, ui64& ptr_cache_misses) { - TGuard<TMutex> lock(CacheMtx); - - a_cache_hits = ACacheHits; - a_cache_misses = ACacheMisses; - ptr_cache_hits = PtrCacheHits; - ptr_cache_misses = PtrCacheMisses; -} - + TGuard<TMutex> lock(CacheMtx); + + a_cache_hits = ACacheHits; + a_cache_misses = ACacheMisses; + ptr_cache_hits = PtrCacheHits; + ptr_cache_misses = PtrCacheMisses; +} + bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept { time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6; time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6; @@ -174,247 +174,247 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) { return NullHost; } - THostCache::iterator p; - + THostCache::iterator p; + Y_ASSERT(family == AF_INET || family == AF_INET6); - - { - TGuard<TMutex> lock(CacheMtx); - p = HostCache.find(hostname); - if (p != HostCache.end()) { + + { + TGuard<TMutex> lock(CacheMtx); + p = HostCache.find(hostname); + if (p != HostCache.end()) { if (!p->second.IsStale(family, this)) { - /* Recently resolved, just return cached value */ - ACacheHits += 1; + /* Recently resolved, just return cached value */ + ACacheHits += 1; THost& host = p->second; LWPROBE(ResolveFromCache, hostname, family, host.AddrsV4ToString(), host.AddrsV6ToString(), ACacheHits); return host; } else { LWPROBE(ResolveCacheTimeout, hostname); - } - } else { - /* Never resolved, create cache entry */ + } + } else { + /* Never resolved, create cache entry */ LWPROBE(ResolveCacheNew, hostname); p = HostCache.insert(std::make_pair(hostname, THost())).first; - } - ACacheMisses += 1; - } - + } + ACacheMisses += 1; + } + if (cacheOnly) return NullHost; TAtomic& inprogress = (family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6); - - { - /* This way only! CacheMtx should always be taken AFTER AresMtx, - * because later in ares_process it can only be done this way. - * Lock order reversal will cause deadlock in unfortunate monents. - */ - TGuard<TMutex> areslock(AresMtx); - TGuard<TMutex> cachelock(CacheMtx); - + + { + /* This way only! CacheMtx should always be taken AFTER AresMtx, + * because later in ares_process it can only be done this way. + * Lock order reversal will cause deadlock in unfortunate monents. + */ + TGuard<TMutex> areslock(AresMtx); + TGuard<TMutex> cachelock(CacheMtx); + if (!inprogress) { - ares_channel chan = static_cast<ares_channel>(Channel); + ares_channel chan = static_cast<ares_channel>(Channel); TGHBNContext* ctx = new TGHBNContext(); - ctx->Owner = this; - ctx->Hostname = hostname; - ctx->Family = family; - + ctx->Owner = this; + ctx->Hostname = hostname; + ctx->Family = family; + AtomicSet(inprogress, 1); - ares_gethostbyname(chan, hostname.c_str(), family, + ares_gethostbyname(chan, hostname.c_str(), family, &TDnsCache::GHBNCallback, ctx); - } - } - - WaitTask(inprogress); - + } + } + + WaitTask(inprogress); + LWPROBE(ResolveDone, hostname, family, p->second.AddrsV4ToString(), p->second.AddrsV6ToString()); - return p->second; -} - + return p->second; +} + bool TDnsCache::ValidateHName(const TString& name) const noexcept { return name.size() > 0; } const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) { - TAddrCache::iterator p; - - { - TGuard<TMutex> lock(CacheMtx); - p = AddrCache.find(addr); - if (p != AddrCache.end()) { + TAddrCache::iterator p; + + { + TGuard<TMutex> lock(CacheMtx); + p = AddrCache.find(addr); + if (p != AddrCache.end()) { if (TTimeKeeper::GetTime() - p->second.Resolved < EntryLifetime || TTimeKeeper::GetTime() - p->second.NotFound < NegativeLifetime) { - /* Recently resolved, just return cached value */ - PtrCacheHits += 1; - return p->second; - } - } else { - /* Never resolved, create cache entry */ - + /* Recently resolved, just return cached value */ + PtrCacheHits += 1; + return p->second; + } + } else { + /* Never resolved, create cache entry */ + p = AddrCache.insert(std::make_pair(addr, TAddr())).first; - } - PtrCacheMisses += 1; - } - - { - /* This way only! CacheMtx should always be taken AFTER AresMtx, - * because later in ares_process it can only be done this way. - * Lock order reversal will cause deadlock in unfortunate monents. - */ - TGuard<TMutex> areslock(AresMtx); - TGuard<TMutex> cachelock(CacheMtx); - + } + PtrCacheMisses += 1; + } + + { + /* This way only! CacheMtx should always be taken AFTER AresMtx, + * because later in ares_process it can only be done this way. + * Lock order reversal will cause deadlock in unfortunate monents. + */ + TGuard<TMutex> areslock(AresMtx); + TGuard<TMutex> cachelock(CacheMtx); + if (!p->second.InProgress) { - ares_channel chan = static_cast<ares_channel>(Channel); + ares_channel chan = static_cast<ares_channel>(Channel); TGHBAContext* ctx = new TGHBAContext(); - ctx->Owner = this; - ctx->Addr = addr; - + ctx->Owner = this; + ctx->Addr = addr; + AtomicSet(p->second.InProgress, 1); - ares_gethostbyaddr(chan, &addr, + ares_gethostbyaddr(chan, &addr, family == AF_INET ? sizeof(in_addr) : sizeof(in6_addr), family, &TDnsCache::GHBACallback, ctx); - } - } - - WaitTask(p->second.InProgress); - - return p->second; -} - + } + } + + WaitTask(p->second.InProgress); + + return p->second; +} + void TDnsCache::WaitTask(TAtomic& flag) { const TInstant start = TInstant(TTimeKeeper::GetTimeval()); - + while (AtomicGet(flag)) { - ares_channel chan = static_cast<ares_channel>(Channel); - - struct pollfd pfd[ARES_GETSOCK_MAXNUM]; - int nfds; - ares_socket_t socks[ARES_GETSOCK_MAXNUM]; - int bits; - - { - TGuard<TMutex> lock(AresMtx); - bits = ares_getsock(chan, socks, ARES_GETSOCK_MAXNUM); - if (bits == 0) { - /* other thread did our job */ - continue; - } - } - - for (nfds = 0; nfds < ARES_GETSOCK_MAXNUM; nfds++) { - pfd[nfds].events = 0; - pfd[nfds].revents = 0; - if (ARES_GETSOCK_READABLE(bits, nfds)) { - pfd[nfds].fd = socks[nfds]; + ares_channel chan = static_cast<ares_channel>(Channel); + + struct pollfd pfd[ARES_GETSOCK_MAXNUM]; + int nfds; + ares_socket_t socks[ARES_GETSOCK_MAXNUM]; + int bits; + + { + TGuard<TMutex> lock(AresMtx); + bits = ares_getsock(chan, socks, ARES_GETSOCK_MAXNUM); + if (bits == 0) { + /* other thread did our job */ + continue; + } + } + + for (nfds = 0; nfds < ARES_GETSOCK_MAXNUM; nfds++) { + pfd[nfds].events = 0; + pfd[nfds].revents = 0; + if (ARES_GETSOCK_READABLE(bits, nfds)) { + pfd[nfds].fd = socks[nfds]; pfd[nfds].events |= POLLRDNORM | POLLIN; - } - if (ARES_GETSOCK_WRITABLE(bits, nfds)) { - pfd[nfds].fd = socks[nfds]; + } + if (ARES_GETSOCK_WRITABLE(bits, nfds)) { + pfd[nfds].fd = socks[nfds]; pfd[nfds].events |= POLLWRNORM | POLLOUT; - } - if (pfd[nfds].events == 0) { - break; - } - } - + } + if (pfd[nfds].events == 0) { + break; + } + } + Y_ASSERT(nfds != 0); - + const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start; const TDuration wait = Max(Timeout - left, TDuration::Zero()); int rv = poll(pfd, nfds, wait.MilliSeconds()); - if (rv == -1) { - if (errno == EINTR) { - continue; - } - /* Unknown error in select, can't recover. Just pretend there was no reply */ - rv = 0; - } - - if (rv == 0) { - /* poll() timed out */ - TGuard<TMutex> lock(AresMtx); - ares_process_fd(chan, ARES_SOCKET_BAD, ARES_SOCKET_BAD); - } else { - for (int i = 0; i < nfds; i++) { - if (pfd[i].revents == 0) { - continue; - } - TGuard<TMutex> lock(AresMtx); - ares_process_fd(chan, + if (rv == -1) { + if (errno == EINTR) { + continue; + } + /* Unknown error in select, can't recover. Just pretend there was no reply */ + rv = 0; + } + + if (rv == 0) { + /* poll() timed out */ + TGuard<TMutex> lock(AresMtx); + ares_process_fd(chan, ARES_SOCKET_BAD, ARES_SOCKET_BAD); + } else { + for (int i = 0; i < nfds; i++) { + if (pfd[i].revents == 0) { + continue; + } + TGuard<TMutex> lock(AresMtx); + ares_process_fd(chan, pfd[i].revents & (POLLRDNORM | POLLIN) ? pfd[i].fd : ARES_SOCKET_BAD, pfd[i].revents & (POLLWRNORM | POLLOUT) ? pfd[i].fd : ARES_SOCKET_BAD); - } - } + } + } if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) { break; } - } -} - + } +} + void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) { THolder<TGHBNContext> ctx(static_cast<TGHBNContext*>(arg)); - TGuard<TMutex> lock(ctx->Owner->CacheMtx); - THostCache::iterator p = ctx->Owner->HostCache.find(ctx->Hostname); - + TGuard<TMutex> lock(ctx->Owner->CacheMtx); + THostCache::iterator p = ctx->Owner->HostCache.find(ctx->Hostname); + Y_ASSERT(p != ctx->Owner->HostCache.end()); - + time_t& resolved = (ctx->Family == AF_INET ? p->second.ResolvedV4 : p->second.ResolvedV6); time_t& notfound = (ctx->Family == AF_INET ? p->second.NotFoundV4 : p->second.NotFoundV6); TAtomic& inprogress = (ctx->Family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6); - - if (status == ARES_SUCCESS) { - if (info->h_addrtype == AF_INET) { - p->second.AddrsV4.clear(); + + if (status == ARES_SUCCESS) { + if (info->h_addrtype == AF_INET) { + p->second.AddrsV4.clear(); for (int i = 0; info->h_addr_list[i] != nullptr; i++) { p->second.AddrsV4.push_back(*(TIpHost*)(info->h_addr_list[i])); - } - /* It is possible to ask ares for IPv6 and have IPv4 addrs instead, - so take care and set V4 timers anyway. - */ - p->second.ResolvedV4 = TTimeKeeper::GetTime(); - p->second.ResolvedV4 = 0; + } + /* It is possible to ask ares for IPv6 and have IPv4 addrs instead, + so take care and set V4 timers anyway. + */ + p->second.ResolvedV4 = TTimeKeeper::GetTime(); + p->second.ResolvedV4 = 0; AtomicSet(p->second.InProgressV4, 0); - } else if (info->h_addrtype == AF_INET6) { - p->second.AddrsV6.clear(); + } else if (info->h_addrtype == AF_INET6) { + p->second.AddrsV6.clear(); for (int i = 0; info->h_addr_list[i] != nullptr; i++) { p->second.AddrsV6.push_back(*(struct in6_addr*)(info->h_addr_list[i])); - } - } else { + } + } else { Y_FAIL("unknown address type in ares callback"); - } - resolved = TTimeKeeper::GetTime(); - notfound = 0; - } else { - notfound = TTimeKeeper::GetTime(); - resolved = 0; - } + } + resolved = TTimeKeeper::GetTime(); + notfound = 0; + } else { + notfound = TTimeKeeper::GetTime(); + resolved = 0; + } AtomicSet(inprogress, 0); -} - +} + void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) { THolder<TGHBAContext> ctx(static_cast<TGHBAContext*>(arg)); - TGuard<TMutex> lock(ctx->Owner->CacheMtx); - TAddrCache::iterator p = ctx->Owner->AddrCache.find(ctx->Addr); - + TGuard<TMutex> lock(ctx->Owner->CacheMtx); + TAddrCache::iterator p = ctx->Owner->AddrCache.find(ctx->Addr); + Y_ASSERT(p != ctx->Owner->AddrCache.end()); - - if (status == ARES_SUCCESS) { - p->second.Hostname = info->h_name; - p->second.Resolved = TTimeKeeper::GetTime(); - p->second.NotFound = 0; - } else { - p->second.NotFound = TTimeKeeper::GetTime(); - p->second.Resolved = 0; - } + + if (status == ARES_SUCCESS) { + p->second.Hostname = info->h_name; + p->second.Resolved = TTimeKeeper::GetTime(); + p->second.NotFound = 0; + } else { + p->second.NotFound = TTimeKeeper::GetTime(); + p->second.Resolved = 0; + } AtomicSet(p->second.InProgress, 0); -} +} TString TDnsCache::THost::AddrsV4ToString() const { TStringStream ss; diff --git a/library/cpp/actors/dnscachelib/dnscache.h b/library/cpp/actors/dnscachelib/dnscache.h index a8331ad080..3313a251a1 100644 --- a/library/cpp/actors/dnscachelib/dnscache.h +++ b/library/cpp/actors/dnscachelib/dnscache.h @@ -1,57 +1,57 @@ -#pragma once - +#pragma once + #include <contrib/libs/c-ares/ares.h> -#include <util/generic/map.h> -#include <util/generic/vector.h> -#include <util/network/address.h> -#include <util/system/mutex.h> +#include <util/generic/map.h> +#include <util/generic/vector.h> +#include <util/network/address.h> +#include <util/system/mutex.h> #include <util/datetime/base.h> - -/** Asynchronous DNS resolver. - * - * This is NOT general purpose resolver! It is designed with very specific assumptions: - * 1) there is relatively small and rarely changed set of resolved names (like, server pool in cluster) - * 2) this names supposed to have addresses, absense of A record is equal to DNS error - * 3) most of the time IP addresses do not change - * 4) it's OK to return old IP address when DNS server not responding in time - */ - -class TDnsCache { -public: + +/** Asynchronous DNS resolver. + * + * This is NOT general purpose resolver! It is designed with very specific assumptions: + * 1) there is relatively small and rarely changed set of resolved names (like, server pool in cluster) + * 2) this names supposed to have addresses, absense of A record is equal to DNS error + * 3) most of the time IP addresses do not change + * 4) it's OK to return old IP address when DNS server not responding in time + */ + +class TDnsCache { +public: TDnsCache(bool allowIpv4 = true, bool allowIpv6 = true, time_t entry_lifetime = 1800, time_t neg_lifetime = 1, ui32 request_timeout = 500000); ~TDnsCache(); - + TString GetHostByAddr(const NAddr::IRemoteAddr&); - + // ip in network byte order TIpHost Get(const TString& host); - /* use with AF_INET, AF_INET6 or AF_UNSPEC */ + /* use with AF_INET, AF_INET6 or AF_UNSPEC */ NAddr::IRemoteAddrPtr GetAddr(const TString& host, int family, TIpPort port = 0, bool cacheOnly = false); - + void GetAllAddresses(const TString& host, TVector<NAddr::IRemoteAddrPtr>&); - + void GetStats(ui64& a_cache_hits, ui64& a_cache_misses, ui64& ptr_cache_hits, ui64& ptr_cache_misses); - + protected: bool ValidateHName(const TString& host) const noexcept; -private: - struct TGHBNContext { +private: + struct TGHBNContext { TDnsCache* Owner; TString Hostname; - int Family; - }; - - struct TGHBAContext { + int Family; + }; + + struct TGHBAContext { TDnsCache* Owner; - in6_addr Addr; - }; - + in6_addr Addr; + }; + struct THost { THost() noexcept { } @@ -60,7 +60,7 @@ private: time_t ResolvedV4 = 0; time_t NotFoundV4 = 0; TAtomic InProgressV4 = 0; - + TVector<in6_addr> AddrsV6; time_t ResolvedV6 = 0; time_t NotFoundV6 = 0; @@ -70,8 +70,8 @@ private: TString AddrsV6ToString() const; bool IsStale(int family, const TDnsCache* ctx) const noexcept; - }; - + }; + typedef TMap<TString, THost> THostCache; struct TAddr { @@ -79,54 +79,54 @@ private: time_t Resolved = 0; time_t NotFound = 0; TAtomic InProgress = 0; - }; - /* IRemoteAddr is annoingly hard to use, so I'll use in6_addr as key - * and put v4 addrs in it. - */ - struct TAddrCmp { + }; + /* IRemoteAddr is annoingly hard to use, so I'll use in6_addr as key + * and put v4 addrs in it. + */ + struct TAddrCmp { bool operator()(const in6_addr& left, const in6_addr& right) const { - for (size_t i = 0; i < sizeof(left); i++) { - if (left.s6_addr[i] < right.s6_addr[i]) { - return true; - } else if (left.s6_addr[i] > right.s6_addr[i]) { - return false; - } - } - // equal - return false; - } - }; + for (size_t i = 0; i < sizeof(left); i++) { + if (left.s6_addr[i] < right.s6_addr[i]) { + return true; + } else if (left.s6_addr[i] > right.s6_addr[i]) { + return false; + } + } + // equal + return false; + } + }; typedef TMap<in6_addr, TAddr, TAddrCmp> TAddrCache; - + const THost& Resolve(const TString&, int family, bool cacheOnly = false); const TAddr& ResolveAddr(const in6_addr&, int family); void WaitTask(TAtomic&); - + static void GHBNCallback(void* arg, int status, int timeouts, struct hostent* info); - + static void GHBACallback(void* arg, int status, int timeouts, struct hostent* info); - - const time_t EntryLifetime; - const time_t NegativeLifetime; + + const time_t EntryLifetime; + const time_t NegativeLifetime; const TDuration Timeout; const bool AllowIpV4; const bool AllowIpV6; - - TMutex CacheMtx; - THostCache HostCache; - TAddrCache AddrCache; - ui64 ACacheHits; - ui64 ACacheMisses; - ui64 PtrCacheHits; - ui64 PtrCacheMisses; - + + TMutex CacheMtx; + THostCache HostCache; + TAddrCache AddrCache; + ui64 ACacheHits; + ui64 ACacheMisses; + ui64 PtrCacheHits; + ui64 PtrCacheMisses; + const static THost NullHost; - TMutex AresMtx; + TMutex AresMtx; void* Channel; struct TAresLibInit { @@ -145,4 +145,4 @@ private: }; static TAresLibInit InitAresLib; -}; +}; diff --git a/library/cpp/actors/dnscachelib/timekeeper.h b/library/cpp/actors/dnscachelib/timekeeper.h index eb25b14b7a..0528d8549c 100644 --- a/library/cpp/actors/dnscachelib/timekeeper.h +++ b/library/cpp/actors/dnscachelib/timekeeper.h @@ -1,31 +1,31 @@ -#pragma once - +#pragma once + #include <util/datetime/base.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/string/cast.h> -#include <util/system/thread.h> +#include <util/system/thread.h> #include <util/system/event.h> #include <util/system/env.h> - + #include <cstdlib> -/* Keeps current time accurate up to 1/10 second */ - -class TTimeKeeper { -public: +/* Keeps current time accurate up to 1/10 second */ + +class TTimeKeeper { +public: static TInstant GetNow(void) { return TInstant::MicroSeconds(GetTime()); } - static time_t GetTime(void) { - return Singleton<TTimeKeeper>()->CurrentTime.tv_sec; - } - + static time_t GetTime(void) { + return Singleton<TTimeKeeper>()->CurrentTime.tv_sec; + } + static const struct timeval& GetTimeval(void) { - return Singleton<TTimeKeeper>()->CurrentTime; - } - - TTimeKeeper() + return Singleton<TTimeKeeper>()->CurrentTime; + } + + TTimeKeeper() : Thread(&TTimeKeeper::Worker, this) { ConstTime = !!GetEnv("TEST_TIME"); @@ -40,31 +40,31 @@ public: gettimeofday(&CurrentTime, nullptr); Thread.Start(); } - } - - ~TTimeKeeper() { + } + + ~TTimeKeeper() { if (!ConstTime) { Exit.Signal(); Thread.Join(); } - } - -private: - static const ui32 UpdateInterval = 100000; - struct timeval CurrentTime; + } + +private: + static const ui32 UpdateInterval = 100000; + struct timeval CurrentTime; bool ConstTime; TSystemEvent Exit; - TThread Thread; - + TThread Thread; + static void* Worker(void* arg) { TTimeKeeper* owner = static_cast<TTimeKeeper*>(arg); do { - /* Race condition may occur here but locking looks too expensive */ + /* Race condition may occur here but locking looks too expensive */ gettimeofday(&owner->CurrentTime, nullptr); } while (!owner->Exit.WaitT(TDuration::MicroSeconds(UpdateInterval))); - + return nullptr; - } -}; + } +}; diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h index 03bf7d7641..3d92ef8450 100644 --- a/library/cpp/bucket_quoter/bucket_quoter.h +++ b/library/cpp/bucket_quoter/bucket_quoter.h @@ -1,44 +1,44 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/system/mutex.h> +#pragma once + +#include <util/datetime/base.h> +#include <util/system/mutex.h> #include <util/system/hp_timer.h> - -/* Token bucket. - * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. - * Do not use for STRICT flow control. - */ - -/* samples: create and use quoter sending 1000 bytes per second on average, - with up to 60 seconds quota buildup. - - TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); - - for (;;) { - T *msg = get_message(); - - quoter.Sleep(); - quoter.Use(msg->GetSize()); - send_message(msg); - } - - ---------------------------- - - TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); - - for (;;) { - T *msg = get_message(); - - while (! quoter.IsAvail()) { - // do something else - } - - quoter.Use(msg->GetSize()); - send_message(msg); - } - -*/ - + +/* Token bucket. + * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. + * Do not use for STRICT flow control. + */ + +/* samples: create and use quoter sending 1000 bytes per second on average, + with up to 60 seconds quota buildup. + + TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); + + for (;;) { + T *msg = get_message(); + + quoter.Sleep(); + quoter.Use(msg->GetSize()); + send_message(msg); + } + + ---------------------------- + + TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); + + for (;;) { + T *msg = get_message(); + + while (! quoter.IsAvail()) { + // do something else + } + + quoter.Use(msg->GetSize()); + send_message(msg); + } + +*/ + struct TInstantTimerMs { using TTime = TInstant; static constexpr ui64 Resolution = 1000ull; // milliseconds @@ -69,8 +69,8 @@ struct THPTimerUs { }; template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs> -class TBucketQuoter { -public: +class TBucketQuoter { +public: using TTime = typename Timer::TTime; struct TResult { @@ -79,43 +79,43 @@ public: ui64 Seqno; }; - /* fixed quota */ + /* fixed quota */ TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr, StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) - : MsgPassed(msgPassed) - , BucketUnderflows(bucketUnderflows) - , TokensUsed(tokensUsed) + : MsgPassed(msgPassed) + , BucketUnderflows(bucketUnderflows) + , TokensUsed(tokensUsed) , UsecWaited(usecWaited) , AggregateInflow(aggregateInflow) , Bucket(fill ? capacity : 0) , LastAdd(Timer::Now()) - , InflowTokensPerSecond(&FixedInflow) - , BucketTokensCapacity(&FixedCapacity) - , FixedInflow(inflow) - , FixedCapacity(capacity) - { - /* no-op */ - } - - /* adjustable quotas */ + , InflowTokensPerSecond(&FixedInflow) + , BucketTokensCapacity(&FixedCapacity) + , FixedInflow(inflow) + , FixedCapacity(capacity) + { + /* no-op */ + } + + /* adjustable quotas */ TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr, StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) - : MsgPassed(msgPassed) - , BucketUnderflows(bucketUnderflows) - , TokensUsed(tokensUsed) + : MsgPassed(msgPassed) + , BucketUnderflows(bucketUnderflows) + , TokensUsed(tokensUsed) , UsecWaited(usecWaited) , AggregateInflow(aggregateInflow) , Bucket(fill ? AtomicGet(*capacity) : 0) , LastAdd(Timer::Now()) - , InflowTokensPerSecond(inflow) - , BucketTokensCapacity(capacity) - { - /* no-op */ - } - - bool IsAvail() { + , InflowTokensPerSecond(inflow) + , BucketTokensCapacity(capacity) + { + /* no-op */ + } + + bool IsAvail() { TGuard<Lock> g(BucketMutex); FillBucket(); if (Bucket < 0) { @@ -125,21 +125,21 @@ public: } return (Bucket >= 0); } - + bool IsAvail(TResult& res) { TGuard<Lock> g(BucketMutex); res.Before = Bucket; - FillBucket(); + FillBucket(); res.After = Bucket; res.Seqno = ++Seqno; - if (Bucket < 0) { - if (BucketUnderflows) { - (*BucketUnderflows)++; - } - } - return (Bucket >= 0); - } - + if (Bucket < 0) { + if (BucketUnderflows) { + (*BucketUnderflows)++; + } + } + return (Bucket >= 0); + } + ui64 GetAvail() { TGuard<Lock> g(BucketMutex); FillBucket(); @@ -158,8 +158,8 @@ public: void Use(ui64 tokens, bool sleep = false) { TGuard<Lock> g(BucketMutex); UseNoLock(tokens, sleep); - } - + } + void Use(ui64 tokens, TResult& res, bool sleep = false) { TGuard<Lock> g(BucketMutex); res.Before = Bucket; @@ -167,11 +167,11 @@ public: res.After = Bucket; res.Seqno = ++Seqno; } - + i64 UseAndFill(ui64 tokens) { TGuard<Lock> g(BucketMutex); UseNoLock(tokens); - FillBucket(); + FillBucket(); return Bucket; } @@ -192,14 +192,14 @@ public: TGuard<Lock> g(BucketMutex); FillBucket(); - if (Bucket >= 0) { - return 0; - } - - ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); - return usec; - } - + if (Bucket >= 0) { + return 0; + } + + ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); + return usec; + } + ui32 GetWaitTime(TResult& res) { TGuard<Lock> g(BucketMutex); res.Before = Bucket; @@ -213,22 +213,22 @@ public: return usec; } - void Sleep() { + void Sleep() { while (!IsAvail()) { - ui32 delay = GetWaitTime(); - if (delay != 0) { - usleep(delay); + ui32 delay = GetWaitTime(); + if (delay != 0) { + usleep(delay); if (UsecWaited) { (*UsecWaited) += delay; } - } - } - } - -private: - void FillBucket() { + } + } + } + +private: + void FillBucket() { TTime now = Timer::Now(); - + ui64 elapsed = Timer::Duration(LastAdd, now); if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) { ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution; @@ -236,14 +236,14 @@ private: *AggregateInflow += inflow; } Bucket += inflow; - if (Bucket > *BucketTokensCapacity) { - Bucket = *BucketTokensCapacity; - } - - LastAdd = now; - } - } - + if (Bucket > *BucketTokensCapacity) { + Bucket = *BucketTokensCapacity; + } + + LastAdd = now; + } + } + void UseNoLock(ui64 tokens, bool sleep = false) { if (sleep) Sleep(); @@ -268,14 +268,14 @@ private: StatCounter* TokensUsed; StatCounter* UsecWaited; StatCounter* AggregateInflow; - - i64 Bucket; + + i64 Bucket; TTime LastAdd; Lock BucketMutex; ui64 Seqno = 0; - + TAtomic* InflowTokensPerSecond; TAtomic* BucketTokensCapacity; - TAtomic FixedInflow; - TAtomic FixedCapacity; -}; + TAtomic FixedInflow; + TAtomic FixedCapacity; +}; diff --git a/library/cpp/lfalloc/lf_allocX64.cpp b/library/cpp/lfalloc/lf_allocX64.cpp index 3944c9a28f..2eb90761fe 100644 --- a/library/cpp/lfalloc/lf_allocX64.cpp +++ b/library/cpp/lfalloc/lf_allocX64.cpp @@ -49,10 +49,10 @@ extern "C" void* malloc(size_t size) { return LFAlloc(size); } -extern "C" void* valloc(size_t size) { +extern "C" void* valloc(size_t size) { return LFVAlloc(size); -} - +} + extern "C" int posix_memalign(void** memptr, size_t alignment, size_t size) { return LFPosixMemalign(memptr, alignment, size); } diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp index 8083df8261..bfa7ed8e9b 100644 --- a/library/cpp/messagebus/message.cpp +++ b/library/cpp/messagebus/message.cpp @@ -163,8 +163,8 @@ namespace NBus { } else { GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL); } - } - + } + void TBusMessage::SetCompressedResponse(bool v) { if (v) { GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE; diff --git a/library/cpp/messagebus/ya.make b/library/cpp/messagebus/ya.make index 7c95035e82..e13cf06dea 100644 --- a/library/cpp/messagebus/ya.make +++ b/library/cpp/messagebus/ya.make @@ -51,7 +51,7 @@ SRCS( ybus.h ) -PEERDIR( +PEERDIR( contrib/libs/sparsehash library/cpp/codecs library/cpp/deprecated/enum_codegen @@ -63,6 +63,6 @@ PEERDIR( library/cpp/messagebus/scheduler library/cpp/string_utils/indent_text library/cpp/threading/future -) - +) + END() diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h index 4fbaaf851f..de21ad8521 100644 --- a/library/cpp/messagebus/ybus.h +++ b/library/cpp/messagebus/ybus.h @@ -98,7 +98,7 @@ namespace NBus { return NCodecs::ICodec::GetInstance("snappy"); } }; - + class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> { friend class TBusMessageQueue; diff --git a/library/cpp/monlib/counters/counters.h b/library/cpp/monlib/counters/counters.h index 7bf3408a60..038b55f0c8 100644 --- a/library/cpp/monlib/counters/counters.h +++ b/library/cpp/monlib/counters/counters.h @@ -27,7 +27,7 @@ namespace NMonitoring { } #define OUTPUT_NAMED_COUNTER(var, name) out << name << ": \t" << var << NMonitoring::PrettyNum(var, prettyBuf, 32) << '\n' -#define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var); +#define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var); char* PrettyNumShort(i64 val, char* buf, size_t size); char* PrettyNum(i64 val, char* buf, size_t size); |