aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorblaze <blaze@yandex-team.ru>2022-02-10 16:50:31 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:31 +0300
commit8de5e9fef85b2ab655e3bc1d77ee2674f417cd15 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library
parent6813864abdb5ce336cde7a2e5cd80232ba54eef1 (diff)
downloadydb-8de5e9fef85b2ab655e3bc1d77ee2674f417cd15.tar.gz
Restoring authorship annotation for <blaze@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/actors/dnscachelib/dnscache.cpp404
-rw-r--r--library/cpp/actors/dnscachelib/dnscache.h138
-rw-r--r--library/cpp/actors/dnscachelib/timekeeper.h62
-rw-r--r--library/cpp/bucket_quoter/bucket_quoter.h224
-rw-r--r--library/cpp/lfalloc/lf_allocX64.cpp6
-rw-r--r--library/cpp/messagebus/message.cpp4
-rw-r--r--library/cpp/messagebus/ya.make6
-rw-r--r--library/cpp/messagebus/ybus.h2
-rw-r--r--library/cpp/monlib/counters/counters.h2
9 files changed, 424 insertions, 424 deletions
diff --git a/library/cpp/actors/dnscachelib/dnscache.cpp b/library/cpp/actors/dnscachelib/dnscache.cpp
index 7ca64c0d09..649339ddb2 100644
--- a/library/cpp/actors/dnscachelib/dnscache.cpp
+++ b/library/cpp/actors/dnscachelib/dnscache.cpp
@@ -1,17 +1,17 @@
-#include "dnscache.h"
+#include "dnscache.h"
#include "probes.h"
-#include "timekeeper.h"
-
-#include <contrib/libs/c-ares/ares.h>
-#include <util/system/guard.h>
+#include "timekeeper.h"
+
+#include <contrib/libs/c-ares/ares.h>
+#include <util/system/guard.h>
#include <util/datetime/systime.h>
-
+
const TDnsCache::THost TDnsCache::NullHost;
LWTRACE_USING(DNSCACHELIB_PROVIDER);
static_assert(sizeof(ares_channel) == sizeof(void*), "expect sizeof(ares_channel) == sizeof(void *)");
-
+
TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg, ui32 timeout)
: EntryLifetime(lifetime)
, NegativeLifetime(neg)
@@ -30,28 +30,28 @@ TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg
}
#endif
- ares_channel chan;
-
- if (ares_init(&chan) != ARES_SUCCESS) {
+ ares_channel chan;
+
+ if (ares_init(&chan) != ARES_SUCCESS) {
LWPROBE(AresInitFailed);
- ythrow yexception() << "ares_init() failed";
- }
- Channel = chan;
+ ythrow yexception() << "ares_init() failed";
+ }
+ Channel = chan;
LWPROBE(Created);
-}
-
+}
+
TDnsCache::~TDnsCache(void) {
- ares_channel chan = static_cast<ares_channel>(Channel);
-
- ares_cancel(chan);
- ares_destroy(chan);
+ ares_channel chan = static_cast<ares_channel>(Channel);
+
+ ares_cancel(chan);
+ ares_destroy(chan);
LWPROBE(Destroyed);
#ifdef _win_
ares_library_cleanup();
#endif
-}
-
+}
+
TString TDnsCache::GetHostByAddr(const NAddr::IRemoteAddr& addr) {
in6_addr key;
@@ -146,14 +146,14 @@ void TDnsCache::GetAllAddresses(
void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
ui64& ptr_cache_hits, ui64& ptr_cache_misses) {
- TGuard<TMutex> lock(CacheMtx);
-
- a_cache_hits = ACacheHits;
- a_cache_misses = ACacheMisses;
- ptr_cache_hits = PtrCacheHits;
- ptr_cache_misses = PtrCacheMisses;
-}
-
+ TGuard<TMutex> lock(CacheMtx);
+
+ a_cache_hits = ACacheHits;
+ a_cache_misses = ACacheMisses;
+ ptr_cache_hits = PtrCacheHits;
+ ptr_cache_misses = PtrCacheMisses;
+}
+
bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept {
time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6;
time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6;
@@ -174,247 +174,247 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
return NullHost;
}
- THostCache::iterator p;
-
+ THostCache::iterator p;
+
Y_ASSERT(family == AF_INET || family == AF_INET6);
-
- {
- TGuard<TMutex> lock(CacheMtx);
- p = HostCache.find(hostname);
- if (p != HostCache.end()) {
+
+ {
+ TGuard<TMutex> lock(CacheMtx);
+ p = HostCache.find(hostname);
+ if (p != HostCache.end()) {
if (!p->second.IsStale(family, this)) {
- /* Recently resolved, just return cached value */
- ACacheHits += 1;
+ /* Recently resolved, just return cached value */
+ ACacheHits += 1;
THost& host = p->second;
LWPROBE(ResolveFromCache, hostname, family, host.AddrsV4ToString(), host.AddrsV6ToString(), ACacheHits);
return host;
} else {
LWPROBE(ResolveCacheTimeout, hostname);
- }
- } else {
- /* Never resolved, create cache entry */
+ }
+ } else {
+ /* Never resolved, create cache entry */
LWPROBE(ResolveCacheNew, hostname);
p = HostCache.insert(std::make_pair(hostname, THost())).first;
- }
- ACacheMisses += 1;
- }
-
+ }
+ ACacheMisses += 1;
+ }
+
if (cacheOnly)
return NullHost;
TAtomic& inprogress = (family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6);
-
- {
- /* This way only! CacheMtx should always be taken AFTER AresMtx,
- * because later in ares_process it can only be done this way.
- * Lock order reversal will cause deadlock in unfortunate monents.
- */
- TGuard<TMutex> areslock(AresMtx);
- TGuard<TMutex> cachelock(CacheMtx);
-
+
+ {
+ /* This way only! CacheMtx should always be taken AFTER AresMtx,
+ * because later in ares_process it can only be done this way.
+ * Lock order reversal will cause deadlock in unfortunate monents.
+ */
+ TGuard<TMutex> areslock(AresMtx);
+ TGuard<TMutex> cachelock(CacheMtx);
+
if (!inprogress) {
- ares_channel chan = static_cast<ares_channel>(Channel);
+ ares_channel chan = static_cast<ares_channel>(Channel);
TGHBNContext* ctx = new TGHBNContext();
- ctx->Owner = this;
- ctx->Hostname = hostname;
- ctx->Family = family;
-
+ ctx->Owner = this;
+ ctx->Hostname = hostname;
+ ctx->Family = family;
+
AtomicSet(inprogress, 1);
- ares_gethostbyname(chan, hostname.c_str(), family,
+ ares_gethostbyname(chan, hostname.c_str(), family,
&TDnsCache::GHBNCallback, ctx);
- }
- }
-
- WaitTask(inprogress);
-
+ }
+ }
+
+ WaitTask(inprogress);
+
LWPROBE(ResolveDone, hostname, family, p->second.AddrsV4ToString(), p->second.AddrsV6ToString());
- return p->second;
-}
-
+ return p->second;
+}
+
bool TDnsCache::ValidateHName(const TString& name) const noexcept {
return name.size() > 0;
}
const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) {
- TAddrCache::iterator p;
-
- {
- TGuard<TMutex> lock(CacheMtx);
- p = AddrCache.find(addr);
- if (p != AddrCache.end()) {
+ TAddrCache::iterator p;
+
+ {
+ TGuard<TMutex> lock(CacheMtx);
+ p = AddrCache.find(addr);
+ if (p != AddrCache.end()) {
if (TTimeKeeper::GetTime() - p->second.Resolved < EntryLifetime || TTimeKeeper::GetTime() - p->second.NotFound < NegativeLifetime) {
- /* Recently resolved, just return cached value */
- PtrCacheHits += 1;
- return p->second;
- }
- } else {
- /* Never resolved, create cache entry */
-
+ /* Recently resolved, just return cached value */
+ PtrCacheHits += 1;
+ return p->second;
+ }
+ } else {
+ /* Never resolved, create cache entry */
+
p = AddrCache.insert(std::make_pair(addr, TAddr())).first;
- }
- PtrCacheMisses += 1;
- }
-
- {
- /* This way only! CacheMtx should always be taken AFTER AresMtx,
- * because later in ares_process it can only be done this way.
- * Lock order reversal will cause deadlock in unfortunate monents.
- */
- TGuard<TMutex> areslock(AresMtx);
- TGuard<TMutex> cachelock(CacheMtx);
-
+ }
+ PtrCacheMisses += 1;
+ }
+
+ {
+ /* This way only! CacheMtx should always be taken AFTER AresMtx,
+ * because later in ares_process it can only be done this way.
+ * Lock order reversal will cause deadlock in unfortunate monents.
+ */
+ TGuard<TMutex> areslock(AresMtx);
+ TGuard<TMutex> cachelock(CacheMtx);
+
if (!p->second.InProgress) {
- ares_channel chan = static_cast<ares_channel>(Channel);
+ ares_channel chan = static_cast<ares_channel>(Channel);
TGHBAContext* ctx = new TGHBAContext();
- ctx->Owner = this;
- ctx->Addr = addr;
-
+ ctx->Owner = this;
+ ctx->Addr = addr;
+
AtomicSet(p->second.InProgress, 1);
- ares_gethostbyaddr(chan, &addr,
+ ares_gethostbyaddr(chan, &addr,
family == AF_INET ? sizeof(in_addr) : sizeof(in6_addr),
family, &TDnsCache::GHBACallback, ctx);
- }
- }
-
- WaitTask(p->second.InProgress);
-
- return p->second;
-}
-
+ }
+ }
+
+ WaitTask(p->second.InProgress);
+
+ return p->second;
+}
+
void TDnsCache::WaitTask(TAtomic& flag) {
const TInstant start = TInstant(TTimeKeeper::GetTimeval());
-
+
while (AtomicGet(flag)) {
- ares_channel chan = static_cast<ares_channel>(Channel);
-
- struct pollfd pfd[ARES_GETSOCK_MAXNUM];
- int nfds;
- ares_socket_t socks[ARES_GETSOCK_MAXNUM];
- int bits;
-
- {
- TGuard<TMutex> lock(AresMtx);
- bits = ares_getsock(chan, socks, ARES_GETSOCK_MAXNUM);
- if (bits == 0) {
- /* other thread did our job */
- continue;
- }
- }
-
- for (nfds = 0; nfds < ARES_GETSOCK_MAXNUM; nfds++) {
- pfd[nfds].events = 0;
- pfd[nfds].revents = 0;
- if (ARES_GETSOCK_READABLE(bits, nfds)) {
- pfd[nfds].fd = socks[nfds];
+ ares_channel chan = static_cast<ares_channel>(Channel);
+
+ struct pollfd pfd[ARES_GETSOCK_MAXNUM];
+ int nfds;
+ ares_socket_t socks[ARES_GETSOCK_MAXNUM];
+ int bits;
+
+ {
+ TGuard<TMutex> lock(AresMtx);
+ bits = ares_getsock(chan, socks, ARES_GETSOCK_MAXNUM);
+ if (bits == 0) {
+ /* other thread did our job */
+ continue;
+ }
+ }
+
+ for (nfds = 0; nfds < ARES_GETSOCK_MAXNUM; nfds++) {
+ pfd[nfds].events = 0;
+ pfd[nfds].revents = 0;
+ if (ARES_GETSOCK_READABLE(bits, nfds)) {
+ pfd[nfds].fd = socks[nfds];
pfd[nfds].events |= POLLRDNORM | POLLIN;
- }
- if (ARES_GETSOCK_WRITABLE(bits, nfds)) {
- pfd[nfds].fd = socks[nfds];
+ }
+ if (ARES_GETSOCK_WRITABLE(bits, nfds)) {
+ pfd[nfds].fd = socks[nfds];
pfd[nfds].events |= POLLWRNORM | POLLOUT;
- }
- if (pfd[nfds].events == 0) {
- break;
- }
- }
-
+ }
+ if (pfd[nfds].events == 0) {
+ break;
+ }
+ }
+
Y_ASSERT(nfds != 0);
-
+
const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start;
const TDuration wait = Max(Timeout - left, TDuration::Zero());
int rv = poll(pfd, nfds, wait.MilliSeconds());
- if (rv == -1) {
- if (errno == EINTR) {
- continue;
- }
- /* Unknown error in select, can't recover. Just pretend there was no reply */
- rv = 0;
- }
-
- if (rv == 0) {
- /* poll() timed out */
- TGuard<TMutex> lock(AresMtx);
- ares_process_fd(chan, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
- } else {
- for (int i = 0; i < nfds; i++) {
- if (pfd[i].revents == 0) {
- continue;
- }
- TGuard<TMutex> lock(AresMtx);
- ares_process_fd(chan,
+ if (rv == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ /* Unknown error in select, can't recover. Just pretend there was no reply */
+ rv = 0;
+ }
+
+ if (rv == 0) {
+ /* poll() timed out */
+ TGuard<TMutex> lock(AresMtx);
+ ares_process_fd(chan, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
+ } else {
+ for (int i = 0; i < nfds; i++) {
+ if (pfd[i].revents == 0) {
+ continue;
+ }
+ TGuard<TMutex> lock(AresMtx);
+ ares_process_fd(chan,
pfd[i].revents & (POLLRDNORM | POLLIN)
? pfd[i].fd
: ARES_SOCKET_BAD,
pfd[i].revents & (POLLWRNORM | POLLOUT)
? pfd[i].fd
: ARES_SOCKET_BAD);
- }
- }
+ }
+ }
if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) {
break;
}
- }
-}
-
+ }
+}
+
void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) {
THolder<TGHBNContext> ctx(static_cast<TGHBNContext*>(arg));
- TGuard<TMutex> lock(ctx->Owner->CacheMtx);
- THostCache::iterator p = ctx->Owner->HostCache.find(ctx->Hostname);
-
+ TGuard<TMutex> lock(ctx->Owner->CacheMtx);
+ THostCache::iterator p = ctx->Owner->HostCache.find(ctx->Hostname);
+
Y_ASSERT(p != ctx->Owner->HostCache.end());
-
+
time_t& resolved = (ctx->Family == AF_INET ? p->second.ResolvedV4 : p->second.ResolvedV6);
time_t& notfound = (ctx->Family == AF_INET ? p->second.NotFoundV4 : p->second.NotFoundV6);
TAtomic& inprogress = (ctx->Family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6);
-
- if (status == ARES_SUCCESS) {
- if (info->h_addrtype == AF_INET) {
- p->second.AddrsV4.clear();
+
+ if (status == ARES_SUCCESS) {
+ if (info->h_addrtype == AF_INET) {
+ p->second.AddrsV4.clear();
for (int i = 0; info->h_addr_list[i] != nullptr; i++) {
p->second.AddrsV4.push_back(*(TIpHost*)(info->h_addr_list[i]));
- }
- /* It is possible to ask ares for IPv6 and have IPv4 addrs instead,
- so take care and set V4 timers anyway.
- */
- p->second.ResolvedV4 = TTimeKeeper::GetTime();
- p->second.ResolvedV4 = 0;
+ }
+ /* It is possible to ask ares for IPv6 and have IPv4 addrs instead,
+ so take care and set V4 timers anyway.
+ */
+ p->second.ResolvedV4 = TTimeKeeper::GetTime();
+ p->second.ResolvedV4 = 0;
AtomicSet(p->second.InProgressV4, 0);
- } else if (info->h_addrtype == AF_INET6) {
- p->second.AddrsV6.clear();
+ } else if (info->h_addrtype == AF_INET6) {
+ p->second.AddrsV6.clear();
for (int i = 0; info->h_addr_list[i] != nullptr; i++) {
p->second.AddrsV6.push_back(*(struct in6_addr*)(info->h_addr_list[i]));
- }
- } else {
+ }
+ } else {
Y_FAIL("unknown address type in ares callback");
- }
- resolved = TTimeKeeper::GetTime();
- notfound = 0;
- } else {
- notfound = TTimeKeeper::GetTime();
- resolved = 0;
- }
+ }
+ resolved = TTimeKeeper::GetTime();
+ notfound = 0;
+ } else {
+ notfound = TTimeKeeper::GetTime();
+ resolved = 0;
+ }
AtomicSet(inprogress, 0);
-}
-
+}
+
void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) {
THolder<TGHBAContext> ctx(static_cast<TGHBAContext*>(arg));
- TGuard<TMutex> lock(ctx->Owner->CacheMtx);
- TAddrCache::iterator p = ctx->Owner->AddrCache.find(ctx->Addr);
-
+ TGuard<TMutex> lock(ctx->Owner->CacheMtx);
+ TAddrCache::iterator p = ctx->Owner->AddrCache.find(ctx->Addr);
+
Y_ASSERT(p != ctx->Owner->AddrCache.end());
-
- if (status == ARES_SUCCESS) {
- p->second.Hostname = info->h_name;
- p->second.Resolved = TTimeKeeper::GetTime();
- p->second.NotFound = 0;
- } else {
- p->second.NotFound = TTimeKeeper::GetTime();
- p->second.Resolved = 0;
- }
+
+ if (status == ARES_SUCCESS) {
+ p->second.Hostname = info->h_name;
+ p->second.Resolved = TTimeKeeper::GetTime();
+ p->second.NotFound = 0;
+ } else {
+ p->second.NotFound = TTimeKeeper::GetTime();
+ p->second.Resolved = 0;
+ }
AtomicSet(p->second.InProgress, 0);
-}
+}
TString TDnsCache::THost::AddrsV4ToString() const {
TStringStream ss;
diff --git a/library/cpp/actors/dnscachelib/dnscache.h b/library/cpp/actors/dnscachelib/dnscache.h
index a8331ad080..3313a251a1 100644
--- a/library/cpp/actors/dnscachelib/dnscache.h
+++ b/library/cpp/actors/dnscachelib/dnscache.h
@@ -1,57 +1,57 @@
-#pragma once
-
+#pragma once
+
#include <contrib/libs/c-ares/ares.h>
-#include <util/generic/map.h>
-#include <util/generic/vector.h>
-#include <util/network/address.h>
-#include <util/system/mutex.h>
+#include <util/generic/map.h>
+#include <util/generic/vector.h>
+#include <util/network/address.h>
+#include <util/system/mutex.h>
#include <util/datetime/base.h>
-
-/** Asynchronous DNS resolver.
- *
- * This is NOT general purpose resolver! It is designed with very specific assumptions:
- * 1) there is relatively small and rarely changed set of resolved names (like, server pool in cluster)
- * 2) this names supposed to have addresses, absense of A record is equal to DNS error
- * 3) most of the time IP addresses do not change
- * 4) it's OK to return old IP address when DNS server not responding in time
- */
-
-class TDnsCache {
-public:
+
+/** Asynchronous DNS resolver.
+ *
+ * This is NOT general purpose resolver! It is designed with very specific assumptions:
+ * 1) there is relatively small and rarely changed set of resolved names (like, server pool in cluster)
+ * 2) this names supposed to have addresses, absense of A record is equal to DNS error
+ * 3) most of the time IP addresses do not change
+ * 4) it's OK to return old IP address when DNS server not responding in time
+ */
+
+class TDnsCache {
+public:
TDnsCache(bool allowIpv4 = true, bool allowIpv6 = true, time_t entry_lifetime = 1800, time_t neg_lifetime = 1, ui32 request_timeout = 500000);
~TDnsCache();
-
+
TString GetHostByAddr(const NAddr::IRemoteAddr&);
-
+
// ip in network byte order
TIpHost Get(const TString& host);
- /* use with AF_INET, AF_INET6 or AF_UNSPEC */
+ /* use with AF_INET, AF_INET6 or AF_UNSPEC */
NAddr::IRemoteAddrPtr GetAddr(const TString& host,
int family,
TIpPort port = 0,
bool cacheOnly = false);
-
+
void GetAllAddresses(const TString& host, TVector<NAddr::IRemoteAddrPtr>&);
-
+
void GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
ui64& ptr_cache_hits, ui64& ptr_cache_misses);
-
+
protected:
bool ValidateHName(const TString& host) const noexcept;
-private:
- struct TGHBNContext {
+private:
+ struct TGHBNContext {
TDnsCache* Owner;
TString Hostname;
- int Family;
- };
-
- struct TGHBAContext {
+ int Family;
+ };
+
+ struct TGHBAContext {
TDnsCache* Owner;
- in6_addr Addr;
- };
-
+ in6_addr Addr;
+ };
+
struct THost {
THost() noexcept {
}
@@ -60,7 +60,7 @@ private:
time_t ResolvedV4 = 0;
time_t NotFoundV4 = 0;
TAtomic InProgressV4 = 0;
-
+
TVector<in6_addr> AddrsV6;
time_t ResolvedV6 = 0;
time_t NotFoundV6 = 0;
@@ -70,8 +70,8 @@ private:
TString AddrsV6ToString() const;
bool IsStale(int family, const TDnsCache* ctx) const noexcept;
- };
-
+ };
+
typedef TMap<TString, THost> THostCache;
struct TAddr {
@@ -79,54 +79,54 @@ private:
time_t Resolved = 0;
time_t NotFound = 0;
TAtomic InProgress = 0;
- };
- /* IRemoteAddr is annoingly hard to use, so I'll use in6_addr as key
- * and put v4 addrs in it.
- */
- struct TAddrCmp {
+ };
+ /* IRemoteAddr is annoingly hard to use, so I'll use in6_addr as key
+ * and put v4 addrs in it.
+ */
+ struct TAddrCmp {
bool operator()(const in6_addr& left, const in6_addr& right) const {
- for (size_t i = 0; i < sizeof(left); i++) {
- if (left.s6_addr[i] < right.s6_addr[i]) {
- return true;
- } else if (left.s6_addr[i] > right.s6_addr[i]) {
- return false;
- }
- }
- // equal
- return false;
- }
- };
+ for (size_t i = 0; i < sizeof(left); i++) {
+ if (left.s6_addr[i] < right.s6_addr[i]) {
+ return true;
+ } else if (left.s6_addr[i] > right.s6_addr[i]) {
+ return false;
+ }
+ }
+ // equal
+ return false;
+ }
+ };
typedef TMap<in6_addr, TAddr, TAddrCmp> TAddrCache;
-
+
const THost& Resolve(const TString&, int family, bool cacheOnly = false);
const TAddr& ResolveAddr(const in6_addr&, int family);
void WaitTask(TAtomic&);
-
+
static void GHBNCallback(void* arg, int status, int timeouts,
struct hostent* info);
-
+
static void GHBACallback(void* arg, int status, int timeouts,
struct hostent* info);
-
- const time_t EntryLifetime;
- const time_t NegativeLifetime;
+
+ const time_t EntryLifetime;
+ const time_t NegativeLifetime;
const TDuration Timeout;
const bool AllowIpV4;
const bool AllowIpV6;
-
- TMutex CacheMtx;
- THostCache HostCache;
- TAddrCache AddrCache;
- ui64 ACacheHits;
- ui64 ACacheMisses;
- ui64 PtrCacheHits;
- ui64 PtrCacheMisses;
-
+
+ TMutex CacheMtx;
+ THostCache HostCache;
+ TAddrCache AddrCache;
+ ui64 ACacheHits;
+ ui64 ACacheMisses;
+ ui64 PtrCacheHits;
+ ui64 PtrCacheMisses;
+
const static THost NullHost;
- TMutex AresMtx;
+ TMutex AresMtx;
void* Channel;
struct TAresLibInit {
@@ -145,4 +145,4 @@ private:
};
static TAresLibInit InitAresLib;
-};
+};
diff --git a/library/cpp/actors/dnscachelib/timekeeper.h b/library/cpp/actors/dnscachelib/timekeeper.h
index eb25b14b7a..0528d8549c 100644
--- a/library/cpp/actors/dnscachelib/timekeeper.h
+++ b/library/cpp/actors/dnscachelib/timekeeper.h
@@ -1,31 +1,31 @@
-#pragma once
-
+#pragma once
+
#include <util/datetime/base.h>
-#include <util/generic/singleton.h>
+#include <util/generic/singleton.h>
#include <util/string/cast.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
#include <util/system/event.h>
#include <util/system/env.h>
-
+
#include <cstdlib>
-/* Keeps current time accurate up to 1/10 second */
-
-class TTimeKeeper {
-public:
+/* Keeps current time accurate up to 1/10 second */
+
+class TTimeKeeper {
+public:
static TInstant GetNow(void) {
return TInstant::MicroSeconds(GetTime());
}
- static time_t GetTime(void) {
- return Singleton<TTimeKeeper>()->CurrentTime.tv_sec;
- }
-
+ static time_t GetTime(void) {
+ return Singleton<TTimeKeeper>()->CurrentTime.tv_sec;
+ }
+
static const struct timeval& GetTimeval(void) {
- return Singleton<TTimeKeeper>()->CurrentTime;
- }
-
- TTimeKeeper()
+ return Singleton<TTimeKeeper>()->CurrentTime;
+ }
+
+ TTimeKeeper()
: Thread(&TTimeKeeper::Worker, this)
{
ConstTime = !!GetEnv("TEST_TIME");
@@ -40,31 +40,31 @@ public:
gettimeofday(&CurrentTime, nullptr);
Thread.Start();
}
- }
-
- ~TTimeKeeper() {
+ }
+
+ ~TTimeKeeper() {
if (!ConstTime) {
Exit.Signal();
Thread.Join();
}
- }
-
-private:
- static const ui32 UpdateInterval = 100000;
- struct timeval CurrentTime;
+ }
+
+private:
+ static const ui32 UpdateInterval = 100000;
+ struct timeval CurrentTime;
bool ConstTime;
TSystemEvent Exit;
- TThread Thread;
-
+ TThread Thread;
+
static void* Worker(void* arg) {
TTimeKeeper* owner = static_cast<TTimeKeeper*>(arg);
do {
- /* Race condition may occur here but locking looks too expensive */
+ /* Race condition may occur here but locking looks too expensive */
gettimeofday(&owner->CurrentTime, nullptr);
} while (!owner->Exit.WaitT(TDuration::MicroSeconds(UpdateInterval)));
-
+
return nullptr;
- }
-};
+ }
+};
diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h
index 03bf7d7641..3d92ef8450 100644
--- a/library/cpp/bucket_quoter/bucket_quoter.h
+++ b/library/cpp/bucket_quoter/bucket_quoter.h
@@ -1,44 +1,44 @@
-#pragma once
-
-#include <util/datetime/base.h>
-#include <util/system/mutex.h>
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/system/mutex.h>
#include <util/system/hp_timer.h>
-
-/* Token bucket.
- * Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
- * Do not use for STRICT flow control.
- */
-
-/* samples: create and use quoter sending 1000 bytes per second on average,
- with up to 60 seconds quota buildup.
-
- TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
-
- for (;;) {
- T *msg = get_message();
-
- quoter.Sleep();
- quoter.Use(msg->GetSize());
- send_message(msg);
- }
-
- ----------------------------
-
- TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
-
- for (;;) {
- T *msg = get_message();
-
- while (! quoter.IsAvail()) {
- // do something else
- }
-
- quoter.Use(msg->GetSize());
- send_message(msg);
- }
-
-*/
-
+
+/* Token bucket.
+ * Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
+ * Do not use for STRICT flow control.
+ */
+
+/* samples: create and use quoter sending 1000 bytes per second on average,
+ with up to 60 seconds quota buildup.
+
+ TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+
+ for (;;) {
+ T *msg = get_message();
+
+ quoter.Sleep();
+ quoter.Use(msg->GetSize());
+ send_message(msg);
+ }
+
+ ----------------------------
+
+ TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+
+ for (;;) {
+ T *msg = get_message();
+
+ while (! quoter.IsAvail()) {
+ // do something else
+ }
+
+ quoter.Use(msg->GetSize());
+ send_message(msg);
+ }
+
+*/
+
struct TInstantTimerMs {
using TTime = TInstant;
static constexpr ui64 Resolution = 1000ull; // milliseconds
@@ -69,8 +69,8 @@ struct THPTimerUs {
};
template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
-class TBucketQuoter {
-public:
+class TBucketQuoter {
+public:
using TTime = typename Timer::TTime;
struct TResult {
@@ -79,43 +79,43 @@ public:
ui64 Seqno;
};
- /* fixed quota */
+ /* fixed quota */
TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
- : MsgPassed(msgPassed)
- , BucketUnderflows(bucketUnderflows)
- , TokensUsed(tokensUsed)
+ : MsgPassed(msgPassed)
+ , BucketUnderflows(bucketUnderflows)
+ , TokensUsed(tokensUsed)
, UsecWaited(usecWaited)
, AggregateInflow(aggregateInflow)
, Bucket(fill ? capacity : 0)
, LastAdd(Timer::Now())
- , InflowTokensPerSecond(&FixedInflow)
- , BucketTokensCapacity(&FixedCapacity)
- , FixedInflow(inflow)
- , FixedCapacity(capacity)
- {
- /* no-op */
- }
-
- /* adjustable quotas */
+ , InflowTokensPerSecond(&FixedInflow)
+ , BucketTokensCapacity(&FixedCapacity)
+ , FixedInflow(inflow)
+ , FixedCapacity(capacity)
+ {
+ /* no-op */
+ }
+
+ /* adjustable quotas */
TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
- : MsgPassed(msgPassed)
- , BucketUnderflows(bucketUnderflows)
- , TokensUsed(tokensUsed)
+ : MsgPassed(msgPassed)
+ , BucketUnderflows(bucketUnderflows)
+ , TokensUsed(tokensUsed)
, UsecWaited(usecWaited)
, AggregateInflow(aggregateInflow)
, Bucket(fill ? AtomicGet(*capacity) : 0)
, LastAdd(Timer::Now())
- , InflowTokensPerSecond(inflow)
- , BucketTokensCapacity(capacity)
- {
- /* no-op */
- }
-
- bool IsAvail() {
+ , InflowTokensPerSecond(inflow)
+ , BucketTokensCapacity(capacity)
+ {
+ /* no-op */
+ }
+
+ bool IsAvail() {
TGuard<Lock> g(BucketMutex);
FillBucket();
if (Bucket < 0) {
@@ -125,21 +125,21 @@ public:
}
return (Bucket >= 0);
}
-
+
bool IsAvail(TResult& res) {
TGuard<Lock> g(BucketMutex);
res.Before = Bucket;
- FillBucket();
+ FillBucket();
res.After = Bucket;
res.Seqno = ++Seqno;
- if (Bucket < 0) {
- if (BucketUnderflows) {
- (*BucketUnderflows)++;
- }
- }
- return (Bucket >= 0);
- }
-
+ if (Bucket < 0) {
+ if (BucketUnderflows) {
+ (*BucketUnderflows)++;
+ }
+ }
+ return (Bucket >= 0);
+ }
+
ui64 GetAvail() {
TGuard<Lock> g(BucketMutex);
FillBucket();
@@ -158,8 +158,8 @@ public:
void Use(ui64 tokens, bool sleep = false) {
TGuard<Lock> g(BucketMutex);
UseNoLock(tokens, sleep);
- }
-
+ }
+
void Use(ui64 tokens, TResult& res, bool sleep = false) {
TGuard<Lock> g(BucketMutex);
res.Before = Bucket;
@@ -167,11 +167,11 @@ public:
res.After = Bucket;
res.Seqno = ++Seqno;
}
-
+
i64 UseAndFill(ui64 tokens) {
TGuard<Lock> g(BucketMutex);
UseNoLock(tokens);
- FillBucket();
+ FillBucket();
return Bucket;
}
@@ -192,14 +192,14 @@ public:
TGuard<Lock> g(BucketMutex);
FillBucket();
- if (Bucket >= 0) {
- return 0;
- }
-
- ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
- return usec;
- }
-
+ if (Bucket >= 0) {
+ return 0;
+ }
+
+ ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
+ return usec;
+ }
+
ui32 GetWaitTime(TResult& res) {
TGuard<Lock> g(BucketMutex);
res.Before = Bucket;
@@ -213,22 +213,22 @@ public:
return usec;
}
- void Sleep() {
+ void Sleep() {
while (!IsAvail()) {
- ui32 delay = GetWaitTime();
- if (delay != 0) {
- usleep(delay);
+ ui32 delay = GetWaitTime();
+ if (delay != 0) {
+ usleep(delay);
if (UsecWaited) {
(*UsecWaited) += delay;
}
- }
- }
- }
-
-private:
- void FillBucket() {
+ }
+ }
+ }
+
+private:
+ void FillBucket() {
TTime now = Timer::Now();
-
+
ui64 elapsed = Timer::Duration(LastAdd, now);
if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
@@ -236,14 +236,14 @@ private:
*AggregateInflow += inflow;
}
Bucket += inflow;
- if (Bucket > *BucketTokensCapacity) {
- Bucket = *BucketTokensCapacity;
- }
-
- LastAdd = now;
- }
- }
-
+ if (Bucket > *BucketTokensCapacity) {
+ Bucket = *BucketTokensCapacity;
+ }
+
+ LastAdd = now;
+ }
+ }
+
void UseNoLock(ui64 tokens, bool sleep = false) {
if (sleep)
Sleep();
@@ -268,14 +268,14 @@ private:
StatCounter* TokensUsed;
StatCounter* UsecWaited;
StatCounter* AggregateInflow;
-
- i64 Bucket;
+
+ i64 Bucket;
TTime LastAdd;
Lock BucketMutex;
ui64 Seqno = 0;
-
+
TAtomic* InflowTokensPerSecond;
TAtomic* BucketTokensCapacity;
- TAtomic FixedInflow;
- TAtomic FixedCapacity;
-};
+ TAtomic FixedInflow;
+ TAtomic FixedCapacity;
+};
diff --git a/library/cpp/lfalloc/lf_allocX64.cpp b/library/cpp/lfalloc/lf_allocX64.cpp
index 3944c9a28f..2eb90761fe 100644
--- a/library/cpp/lfalloc/lf_allocX64.cpp
+++ b/library/cpp/lfalloc/lf_allocX64.cpp
@@ -49,10 +49,10 @@ extern "C" void* malloc(size_t size) {
return LFAlloc(size);
}
-extern "C" void* valloc(size_t size) {
+extern "C" void* valloc(size_t size) {
return LFVAlloc(size);
-}
-
+}
+
extern "C" int posix_memalign(void** memptr, size_t alignment, size_t size) {
return LFPosixMemalign(memptr, alignment, size);
}
diff --git a/library/cpp/messagebus/message.cpp b/library/cpp/messagebus/message.cpp
index 8083df8261..bfa7ed8e9b 100644
--- a/library/cpp/messagebus/message.cpp
+++ b/library/cpp/messagebus/message.cpp
@@ -163,8 +163,8 @@ namespace NBus {
} else {
GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
}
- }
-
+ }
+
void TBusMessage::SetCompressedResponse(bool v) {
if (v) {
GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;
diff --git a/library/cpp/messagebus/ya.make b/library/cpp/messagebus/ya.make
index 7c95035e82..e13cf06dea 100644
--- a/library/cpp/messagebus/ya.make
+++ b/library/cpp/messagebus/ya.make
@@ -51,7 +51,7 @@ SRCS(
ybus.h
)
-PEERDIR(
+PEERDIR(
contrib/libs/sparsehash
library/cpp/codecs
library/cpp/deprecated/enum_codegen
@@ -63,6 +63,6 @@ PEERDIR(
library/cpp/messagebus/scheduler
library/cpp/string_utils/indent_text
library/cpp/threading/future
-)
-
+)
+
END()
diff --git a/library/cpp/messagebus/ybus.h b/library/cpp/messagebus/ybus.h
index 4fbaaf851f..de21ad8521 100644
--- a/library/cpp/messagebus/ybus.h
+++ b/library/cpp/messagebus/ybus.h
@@ -98,7 +98,7 @@ namespace NBus {
return NCodecs::ICodec::GetInstance("snappy");
}
};
-
+
class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> {
friend class TBusMessageQueue;
diff --git a/library/cpp/monlib/counters/counters.h b/library/cpp/monlib/counters/counters.h
index 7bf3408a60..038b55f0c8 100644
--- a/library/cpp/monlib/counters/counters.h
+++ b/library/cpp/monlib/counters/counters.h
@@ -27,7 +27,7 @@ namespace NMonitoring {
}
#define OUTPUT_NAMED_COUNTER(var, name) out << name << ": \t" << var << NMonitoring::PrettyNum(var, prettyBuf, 32) << '\n'
-#define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var);
+#define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var);
char* PrettyNumShort(i64 val, char* buf, size_t size);
char* PrettyNum(i64 val, char* buf, size_t size);