diff options
author | and42 <and42@yandex-team.ru> | 2022-02-10 16:47:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:12 +0300 |
commit | 4fc9a1a64db469cc94894abfec740aa5c9e9789b (patch) | |
tree | 525f0b4c196e68c46ae8411cddc11211ef5670c1 | |
parent | 287d7d8c4ffc811d1e51c756ecfb13b78f4ee62d (diff) | |
download | ydb-4fc9a1a64db469cc94894abfec740aa5c9e9789b.tar.gz |
Restoring authorship annotation for <and42@yandex-team.ru>. Commit 1 of 2.
-rw-r--r-- | contrib/libs/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/sockpool.cpp | 104 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/sockpool.h | 12 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/dns/README.md | 18 | ||||
-rw-r--r-- | library/cpp/dns/cache.cpp | 196 | ||||
-rw-r--r-- | library/cpp/dns/cache.h | 34 | ||||
-rw-r--r-- | library/cpp/dns/magic.cpp | 4 | ||||
-rw-r--r-- | library/cpp/dns/magic.h | 2 | ||||
-rw-r--r-- | library/cpp/dns/thread.cpp | 42 | ||||
-rw-r--r-- | library/cpp/dns/thread.h | 20 | ||||
-rw-r--r-- | library/cpp/dns/ya.make | 22 | ||||
-rw-r--r-- | library/cpp/http/io/stream.cpp | 8 | ||||
-rw-r--r-- | library/cpp/http/server/http_ut.cpp | 190 | ||||
-rw-r--r-- | library/cpp/logger/element.h | 2 | ||||
-rw-r--r-- | util/network/endpoint.cpp | 104 | ||||
-rw-r--r-- | util/network/endpoint.h | 100 | ||||
-rw-r--r-- | util/network/endpoint_ut.cpp | 116 | ||||
-rw-r--r-- | util/system/atexit.cpp | 8 | ||||
-rw-r--r-- | util/system/event.cpp | 20 | ||||
-rw-r--r-- | util/system/event_ut.cpp | 26 |
21 files changed, 516 insertions, 516 deletions
diff --git a/contrib/libs/ya.make b/contrib/libs/ya.make index 9c4640fdcf..38f192ce0c 100644 --- a/contrib/libs/ya.make +++ b/contrib/libs/ya.make @@ -257,7 +257,7 @@ RECURSE( openmp openssl opentracing-cpp - opus + opus ortools ortools/proto osrm diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp index b9482e780f..01f8c35a4d 100644 --- a/library/cpp/coroutine/engine/sockpool.cpp +++ b/library/cpp/coroutine/engine/sockpool.cpp @@ -1,58 +1,58 @@ -#include "sockpool.h" - -void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) { - SetSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1); - - if (!sa || sa->sa_family == AF_INET) { - sockaddr_in s_in; - s_in.sin_family = AF_INET; - s_in.sin_addr.s_addr = INADDR_ANY; - s_in.sin_port = 0; - - if (bind(sock, (struct sockaddr*)&s_in, sizeof(s_in)) == -1) { - warn("bind"); - } - } else if (sa->sa_family == AF_INET6) { +#include "sockpool.h" + +void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) { + SetSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1); + + if (!sa || sa->sa_family == AF_INET) { + sockaddr_in s_in; + s_in.sin_family = AF_INET; + s_in.sin_addr.s_addr = INADDR_ANY; + s_in.sin_port = 0; + + if (bind(sock, (struct sockaddr*)&s_in, sizeof(s_in)) == -1) { + warn("bind"); + } + } else if (sa->sa_family == AF_INET6) { sockaddr_in6 s_in6(*(const sockaddr_in6*)sa); - Zero(s_in6.sin6_addr); - s_in6.sin6_port = 0; - - if (bind(sock, (const struct sockaddr*)&s_in6, sizeof s_in6) == -1) { - warn("bind6"); - } - } else { + Zero(s_in6.sin6_addr); + s_in6.sin6_port = 0; + + if (bind(sock, (const struct sockaddr*)&s_in6, sizeof s_in6) == -1) { + warn("bind6"); + } + } else { Y_ASSERT(0); - } - - SetNoDelay(sock, true); -} - -TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) { - TCont* cont = conn->Cont; - - while (true) { + } + + SetNoDelay(sock, true); +} + +TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) { + TCont* cont = conn->Cont; + + while (true) { TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0)); - - if (s == INVALID_SOCKET) { + + if (s == INVALID_SOCKET) { ythrow TSystemError(errno) << TStringBuf("can not create socket"); - } - - SetCommonSockOpts(s, Addr_->Addr()); - SetZeroLinger(s); - + } + + SetCommonSockOpts(s, Addr_->Addr()); + SetZeroLinger(s); + const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine); - - if (ret == EINTR) { - continue; - } else if (ret) { + + if (ret == EINTR) { + continue; + } else if (ret) { ythrow TSystemError(ret) << TStringBuf("can not connect(") << cont->Name() << ')'; - } - - THolder<TPooledSocket::TImpl> res(new TPooledSocket::TImpl(s, this)); - s.Release(); - - if (res->IsOpen()) { - return res.Release(); - } - } -} + } + + THolder<TPooledSocket::TImpl> res(new TPooledSocket::TImpl(s, this)); + s.Release(); + + if (res->IsOpen()) { + return res.Release(); + } + } +} diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h index 1ebb7e7b38..da1f3b3a04 100644 --- a/library/cpp/coroutine/engine/sockpool.h +++ b/library/cpp/coroutine/engine/sockpool.h @@ -1,11 +1,11 @@ #pragma once -#include "impl.h" +#include "impl.h" #include "network.h" -#include <util/network/address.h> -#include <util/network/socket.h> -#include <util/system/mutex.h> +#include <util/network/address.h> +#include <util/network/socket.h> +#include <util/system/mutex.h> extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr); @@ -143,7 +143,7 @@ class TSocketPool { public: typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; - + TSocketPool(int ip, int port) : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) { @@ -153,7 +153,7 @@ public: : Addr_(addr) { } - + void EraseStale(const TInstant& maxAge) noexcept { TSockets toDelete; diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make index 8c20b9afc3..b81b35476e 100644 --- a/library/cpp/coroutine/engine/ya.make +++ b/library/cpp/coroutine/engine/ya.make @@ -20,7 +20,7 @@ SRCS( iostatus.cpp network.cpp poller.cpp - sockpool.cpp + sockpool.cpp stack/stack.cpp stack/stack_allocator.cpp stack/stack_guards.cpp diff --git a/library/cpp/dns/README.md b/library/cpp/dns/README.md index 6659d73ea2..81184e2948 100644 --- a/library/cpp/dns/README.md +++ b/library/cpp/dns/README.md @@ -1,9 +1,9 @@ -Overview -=== -Библиотека кеширующего resolving-а - изначально писалась для имплементации neh http протокола, использующей корутины. -Для предотвращения пробоя короткого стека корутин есть метод, предусматривающий вынос в отдельный тред собственно вызов функции резолвинга. -Для предотвращения обращения к DNS серверам (использования вместо этого заранее заданных ip-адресов), -предусмотрена ручка добавления alias-ов hosname -> ip-address (требование от метапоискового движка). - -Из-за того, что библиотка разрабатывалась под задачу максимально быстрого резолвинга добавлены слои кеширования результатов -resoving-а, - возможности сбросить кеш для того, чтобы получить более свежие адреса для указанного host-а _нет_. +Overview +=== +Библиотека кеширующего resolving-а - изначально писалась для имплементации neh http протокола, использующей корутины. +Для предотвращения пробоя короткого стека корутин есть метод, предусматривающий вынос в отдельный тред собственно вызов функции резолвинга. +Для предотвращения обращения к DNS серверам (использования вместо этого заранее заданных ip-адресов), +предусмотрена ручка добавления alias-ов hosname -> ip-address (требование от метапоискового движка). + +Из-за того, что библиотка разрабатывалась под задачу максимально быстрого резолвинга добавлены слои кеширования результатов +resoving-а, - возможности сбросить кеш для того, чтобы получить более свежие адреса для указанного host-а _нет_. diff --git a/library/cpp/dns/cache.cpp b/library/cpp/dns/cache.cpp index 05c14e82fc..74c52ba493 100644 --- a/library/cpp/dns/cache.cpp +++ b/library/cpp/dns/cache.cpp @@ -1,103 +1,103 @@ -#include "cache.h" +#include "cache.h" -#include "thread.h" - -#include <util/system/tls.h> -#include <util/system/info.h> -#include <util/system/rwlock.h> +#include "thread.h" + +#include <util/system/tls.h> +#include <util/system/info.h> +#include <util/system/rwlock.h> #include <util/thread/singleton.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/generic/hash.h> -using namespace NDns; +using namespace NDns; namespace { - struct TResolveTask { - enum EMethod { + struct TResolveTask { + enum EMethod { Normal, Threaded - }; + }; - inline TResolveTask(const TResolveInfo& info, EMethod method) - : Info(info) - , Method(method) - { - } + inline TResolveTask(const TResolveInfo& info, EMethod method) + : Info(info) + , Method(method) + { + } - const TResolveInfo& Info; - const EMethod Method; - }; + const TResolveInfo& Info; + const EMethod Method; + }; - class IDns { - public: + class IDns { + public: virtual ~IDns() = default; - virtual const TResolvedHost* Resolve(const TResolveTask&) = 0; - }; - + virtual const TResolvedHost* Resolve(const TResolveTask&) = 0; + }; + typedef TAtomicSharedPtr<TResolvedHost> TResolvedHostPtr; - struct THashResolveInfo { - inline size_t operator()(const TResolveInfo& ri) const { + struct THashResolveInfo { + inline size_t operator()(const TResolveInfo& ri) const { return ComputeHash(ri.Host) ^ ri.Port; - } - }; + } + }; - struct TCompareResolveInfo { - inline bool operator()(const NDns::TResolveInfo& x, const NDns::TResolveInfo& y) const { - return x.Host == y.Host && x.Port == y.Port; - } - }; + struct TCompareResolveInfo { + inline bool operator()(const NDns::TResolveInfo& x, const NDns::TResolveInfo& y) const { + return x.Host == y.Host && x.Port == y.Port; + } + }; - class TGlobalCachedDns: public IDns, public TNonCopyable { - public: + class TGlobalCachedDns: public IDns, public TNonCopyable { + public: const TResolvedHost* Resolve(const TResolveTask& rt) override { - //2. search host in cache - { - TReadGuard guard(L_); + //2. search host in cache + { + TReadGuard guard(L_); - TCache::const_iterator it = C_.find(rt.Info); + TCache::const_iterator it = C_.find(rt.Info); - if (it != C_.end()) { - return it->second.Get(); + if (it != C_.end()) { + return it->second.Get(); } } - TResolvedHostPtr res = ResolveA(rt); + TResolvedHostPtr res = ResolveA(rt); - //update cache - { - TWriteGuard guard(L_); + //update cache + { + TWriteGuard guard(L_); std::pair<TCache::iterator, bool> updateResult = C_.insert(std::make_pair(TResolveInfo(res->Host, rt.Info.Port), res)); TResolvedHost* rh = updateResult.first->second.Get(); - if (updateResult.second) { - //fresh resolved host, set cache record id for it - rh->Id = C_.size() - 1; - } + if (updateResult.second) { + //fresh resolved host, set cache record id for it + rh->Id = C_.size() - 1; + } - return rh; + return rh; } - } + } void AddAlias(const TString& host, const TString& alias) noexcept { - TWriteGuard guard(LA_); + TWriteGuard guard(LA_); - A_[host] = alias; + A_[host] = alias; } - static inline TGlobalCachedDns* Instance() { - return SingletonWithPriority<TGlobalCachedDns, 65530>(); + static inline TGlobalCachedDns* Instance() { + return SingletonWithPriority<TGlobalCachedDns, 65530>(); } private: - inline TResolvedHostPtr ResolveA(const TResolveTask& rt) { + inline TResolvedHostPtr ResolveA(const TResolveTask& rt) { TString originalHost(rt.Info.Host); TString host(originalHost); - //3. replace host to alias, if exist - if (A_.size()) { - TReadGuard guard(LA_); + //3. replace host to alias, if exist + if (A_.size()) { + TReadGuard guard(LA_); TStringBuf names[] = {"*", host}; for (const auto& name : names) { @@ -109,32 +109,32 @@ namespace { } } - if (host.length() > 2 && host[0] == '[') { + if (host.length() > 2 && host[0] == '[') { TString unbracedIpV6(host.data() + 1, host.size() - 2); - host.swap(unbracedIpV6); - } - - TAutoPtr<TNetworkAddress> na; - - //4. getaddrinfo (direct or in separate thread) - if (rt.Method == TResolveTask::Normal) { - na.Reset(new TNetworkAddress(host, rt.Info.Port)); - } else if (rt.Method == TResolveTask::Threaded) { - na = ThreadedResolve(host, rt.Info.Port); - } else { + host.swap(unbracedIpV6); + } + + TAutoPtr<TNetworkAddress> na; + + //4. getaddrinfo (direct or in separate thread) + if (rt.Method == TResolveTask::Normal) { + na.Reset(new TNetworkAddress(host, rt.Info.Port)); + } else if (rt.Method == TResolveTask::Threaded) { + na = ThreadedResolve(host, rt.Info.Port); + } else { Y_ASSERT(0); throw yexception() << TStringBuf("invalid resolve method"); - } + } - return new TResolvedHost(originalHost, *na); + return new TResolvedHost(originalHost, *na); } typedef THashMap<TResolveInfo, TResolvedHostPtr, THashResolveInfo, TCompareResolveInfo> TCache; - TCache C_; - TRWMutex L_; + TCache C_; + TRWMutex L_; typedef THashMap<TString, TString> TAliases; - TAliases A_; - TRWMutex LA_; + TAliases A_; + TRWMutex LA_; }; class TCachedDns: public IDns { @@ -145,18 +145,18 @@ namespace { } const TResolvedHost* Resolve(const TResolveTask& rt) override { - //1. search in local thread cache + //1. search in local thread cache { - TCache::const_iterator it = C_.find(rt.Info); + TCache::const_iterator it = C_.find(rt.Info); if (it != C_.end()) { return it->second; } } - const TResolvedHost* res = S_->Resolve(rt); + const TResolvedHost* res = S_->Resolve(rt); - C_[TResolveInfo(res->Host, rt.Info.Port)] = res; + C_[TResolveInfo(res->Host, rt.Info.Port)] = res; return res; } @@ -167,9 +167,9 @@ namespace { IDns* S_; }; - struct TThreadedDns: public TCachedDns { + struct TThreadedDns: public TCachedDns { inline TThreadedDns() - : TCachedDns(TGlobalCachedDns::Instance()) + : TCachedDns(TGlobalCachedDns::Instance()) { } }; @@ -179,20 +179,20 @@ namespace { } } -namespace NDns { - const TResolvedHost* CachedResolve(const TResolveInfo& ri) { - TResolveTask rt(ri, TResolveTask::Normal); - - return ThrDns()->Resolve(rt); - } - - const TResolvedHost* CachedThrResolve(const TResolveInfo& ri) { - TResolveTask rt(ri, TResolveTask::Threaded); - - return ThrDns()->Resolve(rt); - } - +namespace NDns { + const TResolvedHost* CachedResolve(const TResolveInfo& ri) { + TResolveTask rt(ri, TResolveTask::Normal); + + return ThrDns()->Resolve(rt); + } + + const TResolvedHost* CachedThrResolve(const TResolveInfo& ri) { + TResolveTask rt(ri, TResolveTask::Threaded); + + return ThrDns()->Resolve(rt); + } + void AddHostAlias(const TString& host, const TString& alias) { - TGlobalCachedDns::Instance()->AddAlias(host, alias); - } + TGlobalCachedDns::Instance()->AddAlias(host, alias); + } } diff --git a/library/cpp/dns/cache.h b/library/cpp/dns/cache.h index eda5dc4070..a29379cb5e 100644 --- a/library/cpp/dns/cache.h +++ b/library/cpp/dns/cache.h @@ -4,7 +4,7 @@ #include <util/generic/strbuf.h> #include <util/generic/string.h> -namespace NDns { +namespace NDns { struct TResolveInfo { inline TResolveInfo(const TStringBuf& host, ui16 port) : Host(host) @@ -18,28 +18,28 @@ namespace NDns { struct TResolvedHost { inline TResolvedHost(const TString& host, const TNetworkAddress& addr) noexcept - : Host(host) - , Addr(addr) - , Id(0) + : Host(host) + , Addr(addr) + , Id(0) { } TString Host; //resolved hostname (from TResolveInfo, - before aliasing) TNetworkAddress Addr; - size_t Id; //cache record id + size_t Id; //cache record id }; - // Resolving order: - // 1. check local thread cache, return if found - // 2. check global cache, return if found - // 3. search alias for hostname, if found, continue resolving alias - // 4. normal resolver - const TResolvedHost* CachedResolve(const TResolveInfo& ri); - - //like previous, but at stage 4 use separate thread for resolving (created on first usage) - //useful in green-threads with tiny stack - const TResolvedHost* CachedThrResolve(const TResolveInfo& ri); - - //create alias for host, which can be used for static resolving (when alias is ip address) + // Resolving order: + // 1. check local thread cache, return if found + // 2. check global cache, return if found + // 3. search alias for hostname, if found, continue resolving alias + // 4. normal resolver + const TResolvedHost* CachedResolve(const TResolveInfo& ri); + + //like previous, but at stage 4 use separate thread for resolving (created on first usage) + //useful in green-threads with tiny stack + const TResolvedHost* CachedThrResolve(const TResolveInfo& ri); + + //create alias for host, which can be used for static resolving (when alias is ip address) void AddHostAlias(const TString& host, const TString& alias); } diff --git a/library/cpp/dns/magic.cpp b/library/cpp/dns/magic.cpp index b93792146f..12b21d4156 100644 --- a/library/cpp/dns/magic.cpp +++ b/library/cpp/dns/magic.cpp @@ -2,7 +2,7 @@ #include <util/generic/yexception.h> -using namespace NDns; +using namespace NDns; namespace { namespace NX { @@ -21,7 +21,7 @@ namespace { } } -IErrorRef NDns::SaveError() { +IErrorRef NDns::SaveError() { using namespace NX; return new NX::TError(); diff --git a/library/cpp/dns/magic.h b/library/cpp/dns/magic.h index d52cde0a6c..64a98fc9cc 100644 --- a/library/cpp/dns/magic.h +++ b/library/cpp/dns/magic.h @@ -3,7 +3,7 @@ #include <util/generic/yexception.h> #include <util/generic/ptr.h> -namespace NDns { +namespace NDns { class IError { public: virtual ~IError() = default; diff --git a/library/cpp/dns/thread.cpp b/library/cpp/dns/thread.cpp index 8b27d2d527..d091b0f144 100644 --- a/library/cpp/dns/thread.cpp +++ b/library/cpp/dns/thread.cpp @@ -1,27 +1,27 @@ -#include "thread.h" - +#include "thread.h" + #include "magic.h" -#include <util/network/socket.h> +#include <util/network/socket.h> #include <util/thread/factory.h> -#include <util/thread/lfqueue.h> -#include <util/system/event.h> +#include <util/thread/lfqueue.h> +#include <util/system/event.h> #include <util/generic/vector.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> -using namespace NDns; +using namespace NDns; namespace { class TThreadedResolver: public IThreadFactory::IThreadAble, public TNonCopyable { struct TResolveRequest { inline TResolveRequest(const TString& host, ui16 port) - : Host(host) - , Port(port) + : Host(host) + , Port(port) { } - inline TNetworkAddressPtr Wait() { - E.Wait(); + inline TNetworkAddressPtr Wait() { + E.Wait(); if (!Error) { if (!Result) { @@ -38,7 +38,7 @@ namespace { inline void Resolve() noexcept { try { - Result = new TNetworkAddress(Host, Port); + Result = new TNetworkAddress(Host, Port); } catch (...) { Error = SaveError(); } @@ -47,20 +47,20 @@ namespace { } inline void Wake() noexcept { - E.Signal(); + E.Signal(); } TString Host; - ui16 Port; + ui16 Port; TManualEvent E; - TNetworkAddressPtr Result; + TNetworkAddressPtr Result; IErrorRef Error; }; public: - inline TThreadedResolver() + inline TThreadedResolver() : E_(TSystemEvent::rAuto) - { + { T_.push_back(SystemThreadFactory()->Run(this)); } @@ -87,7 +87,7 @@ namespace { } inline TNetworkAddressPtr Resolve(const TString& host, ui16 port) { - TResolveRequest rr(host, port); + TResolveRequest rr(host, port); Schedule(&rr); @@ -124,10 +124,10 @@ namespace { typedef TAutoPtr<IThreadFactory::IThread> IThreadRef; TVector<IThreadRef> T_; }; -} +} -namespace NDns { +namespace NDns { TNetworkAddressPtr ThreadedResolve(const TString& host, ui16 port) { - return TThreadedResolver::Instance()->Resolve(host, port); + return TThreadedResolver::Instance()->Resolve(host, port); } } diff --git a/library/cpp/dns/thread.h b/library/cpp/dns/thread.h index 06b41d78ce..16b63844ac 100644 --- a/library/cpp/dns/thread.h +++ b/library/cpp/dns/thread.h @@ -1,12 +1,12 @@ -#pragma once - -#include <util/network/socket.h> - +#pragma once + +#include <util/network/socket.h> + #include <util/generic/string.h> -#include <util/generic/ptr.h> - -namespace NDns { - typedef TAutoPtr<TNetworkAddress> TNetworkAddressPtr; - +#include <util/generic/ptr.h> + +namespace NDns { + typedef TAutoPtr<TNetworkAddress> TNetworkAddressPtr; + TNetworkAddressPtr ThreadedResolve(const TString& host, ui16 port); -} +} diff --git a/library/cpp/dns/ya.make b/library/cpp/dns/ya.make index 0cd83332aa..7fb9372138 100644 --- a/library/cpp/dns/ya.make +++ b/library/cpp/dns/ya.make @@ -1,11 +1,11 @@ -LIBRARY() - -OWNER(and42) - -SRCS( - cache.cpp - thread.cpp - magic.cpp -) - -END() +LIBRARY() + +OWNER(and42) + +SRCS( + cache.cpp + thread.cpp + magic.cpp +) + +END() diff --git a/library/cpp/http/io/stream.cpp b/library/cpp/http/io/stream.cpp index 6689be684f..13920e3776 100644 --- a/library/cpp/http/io/stream.cpp +++ b/library/cpp/http/io/stream.cpp @@ -384,7 +384,7 @@ private: bool HasContentLength_; ui64 ContentLength_; - + bool ContentEncoded_; bool Expect100Continue_; }; @@ -445,9 +445,9 @@ bool THttpInput::GetContentLength(ui64& value) const noexcept { } bool THttpInput::ContentEncoded() const noexcept { - return Impl_->ContentEncoded(); -} - + return Impl_->ContentEncoded(); +} + bool THttpInput::HasContent() const noexcept { return Impl_->HasContent(); } diff --git a/library/cpp/http/server/http_ut.cpp b/library/cpp/http/server/http_ut.cpp index cc62bb988e..b1532e5915 100644 --- a/library/cpp/http/server/http_ut.cpp +++ b/library/cpp/http/server/http_ut.cpp @@ -6,13 +6,13 @@ #include <util/generic/cast.h> #include <util/stream/output.h> -#include <util/stream/zlib.h> +#include <util/stream/zlib.h> #include <util/system/datetime.h> #include <util/system/sem.h> Y_UNIT_TEST_SUITE(THttpServerTest) { class TEchoServer: public THttpServer::ICallBack { - class TRequest: public THttpClientRequestEx { + class TRequest: public THttpClientRequestEx { public: inline TRequest(TEchoServer* parent) : Parent_(parent) @@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { Output() << Parent_->Res_; } Output().Finish(); - + return true; } @@ -138,13 +138,13 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { static const TString CrLf = "\r\n"; - struct TTestRequest { + struct TTestRequest { TTestRequest(ui16 port, TString content = TString()) - : Port(port) + : Port(port) , Content(std::move(content)) - { - } - + { + } + void CheckContinue(TSocketInput& si) { if (Expect100Continue) { TStringStream ss; @@ -164,43 +164,43 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TString Execute() { TSocket* s = nullptr; - THolder<TSocket> singleReqSocket; - if (KeepAliveConnection) { + THolder<TSocket> singleReqSocket; + if (KeepAliveConnection) { if (!KeepAlivedSocket) { KeepAlivedSocket = MakeHolder<TSocket>(TNetworkAddress("localhost", Port), TDuration::Seconds(10)); } s = KeepAlivedSocket.Get(); - } else { - TNetworkAddress addr("localhost", Port); - singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10))); - s = singleReqSocket.Get(); - } - bool isPost = Type == "POST"; + } else { + TNetworkAddress addr("localhost", Port); + singleReqSocket.Reset(new TSocket(addr, TDuration::Seconds(10))); + s = singleReqSocket.Get(); + } + bool isPost = Type == "POST"; TSocketInput si(*s); - - if (UseHttpOutput) { - TSocketOutput so(*s); - THttpOutput output(&so); - - output.EnableKeepAlive(KeepAliveConnection); - output.EnableCompression(EnableResponseEncoding); - - TStringStream r; - r << Type << " / HTTP/1.1" << CrLf; - r << "Host: localhost:" + ToString(Port) << CrLf; - if (isPost) { + + if (UseHttpOutput) { + TSocketOutput so(*s); + THttpOutput output(&so); + + output.EnableKeepAlive(KeepAliveConnection); + output.EnableCompression(EnableResponseEncoding); + + TStringStream r; + r << Type << " / HTTP/1.1" << CrLf; + r << "Host: localhost:" + ToString(Port) << CrLf; + if (isPost) { if (ContentEncoding.size()) { - r << "Content-Encoding: " << ContentEncoding << CrLf; - } else { - r << "Transfer-Encoding: chunked" << CrLf; - } + r << "Content-Encoding: " << ContentEncoding << CrLf; + } else { + r << "Transfer-Encoding: chunked" << CrLf; + } if (Expect100Continue) { r << "Expect: 100-continue" << CrLf; } - } - - r << CrLf; - if (isPost) { + } + + r << CrLf; + if (isPost) { output.Write(r.Str()); output.Flush(); CheckContinue(si); @@ -209,74 +209,74 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { } else { output.Write(r.Str()); output.Finish(); - } - } else { - TStringStream r; - r << Type << " / HTTP/1.1" << CrLf; - r << "Host: localhost:" + ToString(Port) << CrLf; - if (KeepAliveConnection) { - r << "Connection: Keep-Alive" << CrLf; - } else { - r << "Connection: Close" << CrLf; - } - if (EnableResponseEncoding) { - r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf; - } + } + } else { + TStringStream r; + r << Type << " / HTTP/1.1" << CrLf; + r << "Host: localhost:" + ToString(Port) << CrLf; + if (KeepAliveConnection) { + r << "Connection: Keep-Alive" << CrLf; + } else { + r << "Connection: Close" << CrLf; + } + if (EnableResponseEncoding) { + r << "Accept-Encoding: gzip, deflate, x-gzip, x-deflate, y-lzo, y-lzf, y-lzq, y-bzip2, y-lzma" << CrLf; + } if (isPost && Expect100Continue) { r << "Expect: 100-continue" << CrLf; } if (isPost && ContentEncoding.size() && Content.size()) { - r << "Content-Encoding: " << ContentEncoding << CrLf; - TStringStream compressedContent; - { - TZLibCompress zlib(&compressedContent); + r << "Content-Encoding: " << ContentEncoding << CrLf; + TStringStream compressedContent; + { + TZLibCompress zlib(&compressedContent); zlib.Write(Content.data(), Content.size()); - zlib.Flush(); - zlib.Finish(); - } + zlib.Flush(); + zlib.Finish(); + } r << "Content-Length: " << compressedContent.Size() << CrLf; - r << CrLf; + r << CrLf; s->Send(r.Data(), r.Size()); CheckContinue(si); Hdr = r.Str(); TString tosend = compressedContent.Str(); s->Send(tosend.data(), tosend.size()); - } else { - if (isPost) { + } else { + if (isPost) { r << "Content-Length: " << Content.size() << CrLf; - r << CrLf; + r << CrLf; s->Send(r.Data(), r.Size()); CheckContinue(si); Hdr = r.Str(); s->Send(Content.data(), Content.size()); - } else { - r << CrLf; + } else { + r << CrLf; Hdr = r.Str(); s->Send(r.Data(), r.Size()); - } - } - } - - THttpInput input(&si); - TStringStream ss; - TransferData(&input, &ss); - + } + } + } + + THttpInput input(&si); + TStringStream ss; + TransferData(&input, &ss); + return ss.Str(); - } - + } + TString GetDescription() const { - if (UseHttpOutput) { - TStringStream ss; - ss << (KeepAliveConnection ? "keep-alive " : "") << Type; + if (UseHttpOutput) { + TStringStream ss; + ss << (KeepAliveConnection ? "keep-alive " : "") << Type; if (ContentEncoding.size()) { ss << " with encoding=" << ContentEncoding; - } + } return ss.Str(); - } else { - return Hdr; - } - } - + } else { + return Hdr; + } + } + ui16 Port = 0; bool UseHttpOutput = true; TString Type = "GET"; @@ -287,8 +287,8 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { bool EnableResponseEncoding = false; TString Hdr; bool Expect100Continue = false; - }; - + }; + class TFailingMtpQueue: public TSimpleThreadPool { private: bool FailOnAdd_ = false; @@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { TString res = TestData(); TPortManager pm; const ui16 port = pm.GetPort(); - const bool trueFalse[] = {true, false}; + const bool trueFalse[] = {true, false}; TEchoServer serverImpl(res); THttpServer server(&serverImpl, THttpServer::TOptions(port).EnableKeepAlive(true).EnableCompression(true)); @@ -332,36 +332,36 @@ Y_UNIT_TEST_SUITE(THttpServerTest) { for (int i = 0; i < 2; ++i) { UNIT_ASSERT(server.Start()); - TTestRequest r(port); - r.Content = res; + TTestRequest r(port); + r.Content = res; for (bool keepAlive : trueFalse) { - r.KeepAliveConnection = keepAlive; + r.KeepAliveConnection = keepAlive; // THttpOutput use chunked stream, else use Content-Length for (bool useHttpOutput : trueFalse) { - r.UseHttpOutput = useHttpOutput; + r.UseHttpOutput = useHttpOutput; for (bool enableResponseEncoding : trueFalse) { - r.EnableResponseEncoding = enableResponseEncoding; + r.EnableResponseEncoding = enableResponseEncoding; const TString reqTypes[] = {"GET", "POST"}; for (const TString& reqType : reqTypes) { - r.Type = reqType; + r.Type = reqType; const TString encoders[] = {"", "deflate"}; for (const TString& encoder : encoders) { - r.ContentEncoding = encoder; + r.ContentEncoding = encoder; for (bool expect100Continue : trueFalse) { r.Expect100Continue = expect100Continue; TString resp = r.Execute(); UNIT_ASSERT_C(resp == res, "diff echo response for request:\n" + r.GetDescription()); } - } - } - } - } + } + } + } + } } server.Stop(); diff --git a/library/cpp/logger/element.h b/library/cpp/logger/element.h index fc9bff851f..412c6cfb23 100644 --- a/library/cpp/logger/element.h +++ b/library/cpp/logger/element.h @@ -38,7 +38,7 @@ public: ELogPriority Priority() const noexcept { return Priority_; } - + protected: void DoFlush() override; diff --git a/util/network/endpoint.cpp b/util/network/endpoint.cpp index 9acdd06940..80dc18d63e 100644 --- a/util/network/endpoint.cpp +++ b/util/network/endpoint.cpp @@ -1,67 +1,67 @@ -#include "endpoint.h" +#include "endpoint.h" #include "sock.h" - -TEndpoint::TEndpoint(const TEndpoint::TAddrRef& addr) - : Addr_(addr) -{ - const sockaddr* sa = Addr_->Addr(); - + +TEndpoint::TEndpoint(const TEndpoint::TAddrRef& addr) + : Addr_(addr) +{ + const sockaddr* sa = Addr_->Addr(); + if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6 && sa->sa_family != AF_UNIX) { ythrow yexception() << TStringBuf("endpoint can contain only ipv4, ipv6 or unix address"); - } -} - -TEndpoint::TEndpoint() - : Addr_(new NAddr::TIPv4Addr(TIpAddress(TIpHost(0), TIpPort(0)))) -{ -} - -void TEndpoint::SetPort(ui16 port) { + } +} + +TEndpoint::TEndpoint() + : Addr_(new NAddr::TIPv4Addr(TIpAddress(TIpHost(0), TIpPort(0)))) +{ +} + +void TEndpoint::SetPort(ui16 port) { if (Port() == port || Addr_->Addr()->sa_family == AF_UNIX) { - return; - } - - NAddr::TOpaqueAddr* oa = new NAddr::TOpaqueAddr(Addr_.Get()); - Addr_.Reset(oa); - sockaddr* sa = oa->MutableAddr(); - - if (sa->sa_family == AF_INET) { - ((sockaddr_in*)sa)->sin_port = HostToInet(port); - } else { - ((sockaddr_in6*)sa)->sin6_port = HostToInet(port); - } -} - + return; + } + + NAddr::TOpaqueAddr* oa = new NAddr::TOpaqueAddr(Addr_.Get()); + Addr_.Reset(oa); + sockaddr* sa = oa->MutableAddr(); + + if (sa->sa_family == AF_INET) { + ((sockaddr_in*)sa)->sin_port = HostToInet(port); + } else { + ((sockaddr_in6*)sa)->sin6_port = HostToInet(port); + } +} + ui16 TEndpoint::Port() const noexcept { if (Addr_->Addr()->sa_family == AF_UNIX) { return 0; } - const sockaddr* sa = Addr_->Addr(); - - if (sa->sa_family == AF_INET) { - return InetToHost(((const sockaddr_in*)sa)->sin_port); - } else { - return InetToHost(((const sockaddr_in6*)sa)->sin6_port); - } -} - -size_t TEndpoint::Hash() const { - const sockaddr* sa = Addr_->Addr(); - - if (sa->sa_family == AF_INET) { - const sockaddr_in* sa4 = (const sockaddr_in*)sa; - - return IntHash((((ui64)sa4->sin_addr.s_addr) << 16) ^ sa4->sin_port); + const sockaddr* sa = Addr_->Addr(); + + if (sa->sa_family == AF_INET) { + return InetToHost(((const sockaddr_in*)sa)->sin_port); + } else { + return InetToHost(((const sockaddr_in6*)sa)->sin6_port); + } +} + +size_t TEndpoint::Hash() const { + const sockaddr* sa = Addr_->Addr(); + + if (sa->sa_family == AF_INET) { + const sockaddr_in* sa4 = (const sockaddr_in*)sa; + + return IntHash((((ui64)sa4->sin_addr.s_addr) << 16) ^ sa4->sin_port); } else if (sa->sa_family == AF_INET6) { - const sockaddr_in6* sa6 = (const sockaddr_in6*)sa; - const ui64* ptr = (const ui64*)&sa6->sin6_addr; - - return IntHash(ptr[0] ^ ptr[1] ^ sa6->sin6_port); + const sockaddr_in6* sa6 = (const sockaddr_in6*)sa; + const ui64* ptr = (const ui64*)&sa6->sin6_addr; + + return IntHash(ptr[0] ^ ptr[1] ^ sa6->sin6_port); } else { const sockaddr_un* un = (const sockaddr_un*)sa; THash<TString> strHash; return strHash(un->sun_path); - } -} + } +} diff --git a/util/network/endpoint.h b/util/network/endpoint.h index a3e59b4925..fd98d331af 100644 --- a/util/network/endpoint.h +++ b/util/network/endpoint.h @@ -1,61 +1,61 @@ -#pragma once - -#include "address.h" - -#include <util/str_stl.h> - -//some equivalent boost::asio::ip::endpoint (easy for using pair ip:port) -class TEndpoint { -public: +#pragma once + +#include "address.h" + +#include <util/str_stl.h> + +//some equivalent boost::asio::ip::endpoint (easy for using pair ip:port) +class TEndpoint { +public: using TAddrRef = NAddr::IRemoteAddrRef; - - TEndpoint(const TAddrRef& addr); - TEndpoint(); - + + TEndpoint(const TAddrRef& addr); + TEndpoint(); + inline const TAddrRef& Addr() const noexcept { - return Addr_; - } - inline const sockaddr* SockAddr() const { - return Addr_->Addr(); - } - inline socklen_t SockAddrLen() const { - return Addr_->Len(); - } - - inline bool IsIpV4() const { - return Addr_->Addr()->sa_family == AF_INET; - } - inline bool IsIpV6() const { - return Addr_->Addr()->sa_family == AF_INET6; - } + return Addr_; + } + inline const sockaddr* SockAddr() const { + return Addr_->Addr(); + } + inline socklen_t SockAddrLen() const { + return Addr_->Len(); + } + + inline bool IsIpV4() const { + return Addr_->Addr()->sa_family == AF_INET; + } + inline bool IsIpV6() const { + return Addr_->Addr()->sa_family == AF_INET6; + } inline bool IsUnix() const { return Addr_->Addr()->sa_family == AF_UNIX; } - + inline TString IpToString() const { - return NAddr::PrintHost(*Addr_); - } - - void SetPort(ui16 port); + return NAddr::PrintHost(*Addr_); + } + + void SetPort(ui16 port); ui16 Port() const noexcept; - - size_t Hash() const; - -private: - TAddrRef Addr_; -}; - + + size_t Hash() const; + +private: + TAddrRef Addr_; +}; + template <> -struct THash<TEndpoint> { +struct THash<TEndpoint> { inline size_t operator()(const TEndpoint& ep) const { - return ep.Hash(); - } -}; - + return ep.Hash(); + } +}; + inline bool operator==(const TEndpoint& l, const TEndpoint& r) { - try { + try { return NAddr::IsSame(*l.Addr(), *r.Addr()) && l.Port() == r.Port(); - } catch (...) { - return false; - } -} + } catch (...) { + return false; + } +} diff --git a/util/network/endpoint_ut.cpp b/util/network/endpoint_ut.cpp index d5e40dd6e1..30becbfe77 100644 --- a/util/network/endpoint_ut.cpp +++ b/util/network/endpoint_ut.cpp @@ -1,25 +1,25 @@ -#include "endpoint.h" - +#include "endpoint.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/hash_set.h> -#include <util/generic/strbuf.h> - + +#include <util/generic/hash_set.h> +#include <util/generic/strbuf.h> + Y_UNIT_TEST_SUITE(TEndpointTest) { Y_UNIT_TEST(TestSimple) { TVector<TNetworkAddress> addrs; - TEndpoint ep0; - - UNIT_ASSERT(ep0.IsIpV4()); - UNIT_ASSERT_VALUES_EQUAL(0, ep0.Port()); - UNIT_ASSERT_VALUES_EQUAL("0.0.0.0", ep0.IpToString()); - + TEndpoint ep0; + + UNIT_ASSERT(ep0.IsIpV4()); + UNIT_ASSERT_VALUES_EQUAL(0, ep0.Port()); + UNIT_ASSERT_VALUES_EQUAL("0.0.0.0", ep0.IpToString()); + TEndpoint ep1; - + try { TNetworkAddress na1("25.26.27.28", 24242); - + addrs.push_back(na1); ep1 = TEndpoint(new NAddr::TAddrInfo(&*na1.Begin())); @@ -35,56 +35,56 @@ Y_UNIT_TEST_SUITE(TEndpointTest) { ep1 = TEndpoint(new NAddr::TAddrInfo(&*n.Begin())); } - ep0.SetPort(12345); - - TEndpoint ep2(ep0); - - ep0.SetPort(0); - - UNIT_ASSERT_VALUES_EQUAL(12345, ep2.Port()); - - TEndpoint ep2_; - - ep2_.SetPort(12345); - - UNIT_ASSERT(ep2 == ep2_); - - TNetworkAddress na3("2a02:6b8:0:1410::5f6c:f3c2", 54321); - TEndpoint ep3(new NAddr::TAddrInfo(&*na3.Begin())); - - UNIT_ASSERT(ep3.IsIpV6()); + ep0.SetPort(12345); + + TEndpoint ep2(ep0); + + ep0.SetPort(0); + + UNIT_ASSERT_VALUES_EQUAL(12345, ep2.Port()); + + TEndpoint ep2_; + + ep2_.SetPort(12345); + + UNIT_ASSERT(ep2 == ep2_); + + TNetworkAddress na3("2a02:6b8:0:1410::5f6c:f3c2", 54321); + TEndpoint ep3(new NAddr::TAddrInfo(&*na3.Begin())); + + UNIT_ASSERT(ep3.IsIpV6()); UNIT_ASSERT(ep3.IpToString().StartsWith(TStringBuf("2a02:6b8:0:1410:"))); UNIT_ASSERT(ep3.IpToString().EndsWith(TStringBuf(":5f6c:f3c2"))); - UNIT_ASSERT_VALUES_EQUAL(54321, ep3.Port()); - - TNetworkAddress na4("2a02:6b8:0:1410:0::5f6c:f3c2", 1); - TEndpoint ep4(new NAddr::TAddrInfo(&*na4.Begin())); - - TEndpoint ep3_ = ep4; - - ep3_.SetPort(54321); - + UNIT_ASSERT_VALUES_EQUAL(54321, ep3.Port()); + + TNetworkAddress na4("2a02:6b8:0:1410:0::5f6c:f3c2", 1); + TEndpoint ep4(new NAddr::TAddrInfo(&*na4.Begin())); + + TEndpoint ep3_ = ep4; + + ep3_.SetPort(54321); + THashSet<TEndpoint> he; - - he.insert(ep0); - he.insert(ep1); - he.insert(ep2); - + + he.insert(ep0); + he.insert(ep1); + he.insert(ep2); + UNIT_ASSERT_VALUES_EQUAL(3u, he.size()); - - he.insert(ep2_); - + + he.insert(ep2_); + UNIT_ASSERT_VALUES_EQUAL(3u, he.size()); - - he.insert(ep3); - he.insert(ep3_); - + + he.insert(ep3); + he.insert(ep3_); + UNIT_ASSERT_VALUES_EQUAL(4u, he.size()); - - he.insert(ep4); - + + he.insert(ep4); + UNIT_ASSERT_VALUES_EQUAL(5u, he.size()); - } + } Y_UNIT_TEST(TestEqual) { const TString ip1 = "2a02:6b8:0:1410::5f6c:f3c2"; @@ -120,4 +120,4 @@ Y_UNIT_TEST_SUITE(TEndpointTest) { UNIT_ASSERT(!ep2.IsUnix()); UNIT_ASSERT(ep2.SockAddr()->sa_family != AF_UNIX); } -} +} diff --git a/util/system/atexit.cpp b/util/system/atexit.cpp index 74fb10b6b1..25dc673b5f 100644 --- a/util/system/atexit.cpp +++ b/util/system/atexit.cpp @@ -45,15 +45,15 @@ namespace { Y_ASSERT(c); Items_.pop(); - + { auto unguard = Unguard(guard); - + try { - c->Func(c->Ctx); + c->Func(c->Ctx); } catch (...) { // ¯\_(ツ)_/¯ - } + } } } } diff --git a/util/system/event.cpp b/util/system/event.cpp index 79b3cdb291..9771e44a89 100644 --- a/util/system/event.cpp +++ b/util/system/event.cpp @@ -3,7 +3,7 @@ #include <cstdio> -#include "atomic.h" +#include "atomic.h" #include "event.h" #include "mutex.h" #include "condvar.h" @@ -47,17 +47,17 @@ public: } #else inline TEvImpl(ResetMode rmode) - : Manual(rmode == rManual ? true : false) + : Manual(rmode == rManual ? true : false) { } inline void Signal() noexcept { - if (Manual && AtomicGet(Signaled)) { + if (Manual && AtomicGet(Signaled)) { return; // shortcut } with_lock (Mutex) { - AtomicSet(Signaled, 1); + AtomicSet(Signaled, 1); } if (Manual) { @@ -68,27 +68,27 @@ public: } inline void Reset() noexcept { - AtomicSet(Signaled, 0); + AtomicSet(Signaled, 0); } inline bool WaitD(TInstant deadLine) noexcept { - if (Manual && AtomicGet(Signaled)) { + if (Manual && AtomicGet(Signaled)) { return true; // shortcut } bool resSignaled = true; with_lock (Mutex) { - while (!AtomicGet(Signaled)) { + while (!AtomicGet(Signaled)) { if (!Cond.WaitD(Mutex, deadLine)) { - resSignaled = AtomicGet(Signaled); // timed out, but Signaled could have been set + resSignaled = AtomicGet(Signaled); // timed out, but Signaled could have been set break; } } if (!Manual) { - AtomicSet(Signaled, 0); + AtomicSet(Signaled, 0); } } @@ -102,7 +102,7 @@ private: #else TCondVar Cond; TMutex Mutex; - TAtomic Signaled = 0; + TAtomic Signaled = 0; bool Manual; #endif }; diff --git a/util/system/event_ut.cpp b/util/system/event_ut.cpp index 2506cb7a91..d6f14c04cb 100644 --- a/util/system/event_ut.cpp +++ b/util/system/event_ut.cpp @@ -96,22 +96,22 @@ Y_UNIT_TEST_SUITE(EventTest) { } Y_UNIT_TEST(ConcurrentSignalAndWaitTest) { - // test for problem detected by thread-sanitizer (signal/wait race) SEARCH-2113 - const size_t limit = 200; - TManualEvent event[limit]; + // test for problem detected by thread-sanitizer (signal/wait race) SEARCH-2113 + const size_t limit = 200; + TManualEvent event[limit]; TThreadPool queue; - queue.Start(limit); + queue.Start(limit); TVector<THolder<IObjectInQueue>> tasks; - for (size_t i = 0; i < limit; ++i) { + for (size_t i = 0; i < limit; ++i) { tasks.emplace_back(MakeHolder<TSignalTask>(event[i])); - UNIT_ASSERT(queue.Add(tasks.back().Get())); - } - for (size_t i = limit; i != 0; --i) { - UNIT_ASSERT(event[i - 1].WaitT(TDuration::Seconds(90))); - } - queue.Stop(); - } - + UNIT_ASSERT(queue.Add(tasks.back().Get())); + } + for (size_t i = limit; i != 0; --i) { + UNIT_ASSERT(event[i - 1].WaitT(TDuration::Seconds(90))); + } + queue.Stop(); + } + /** Test for a problem: http://nga.at.yandex-team.ru/5772 */ Y_UNIT_TEST(DestructorBeforeSignalFinishTest) { return; |