summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <[email protected]>2024-09-19 00:03:20 +0300
committerbabenko <[email protected]>2024-09-19 00:16:35 +0300
commitff8fb64ae74790de8eae8e3a3be7b7e204387590 (patch)
tree3f11910b345f71bd82a531748c7154e4a0074ebb
parentd1a1efc07455cf12920c34ecc614573a978b3848 (diff)
Refactor and improve logging in HTTP client
commit_hash:f1cdaa7cad19759076e1635e041044afb79321b1
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp8
-rw-r--r--yt/yt/core/bus/tcp/connection.h2
-rw-r--r--yt/yt/core/crypto/tls.cpp101
-rw-r--r--yt/yt/core/http/client.cpp77
-rw-r--r--yt/yt/core/http/client.h10
-rw-r--r--yt/yt/core/http/connection_pool.cpp27
-rw-r--r--yt/yt/core/http/connection_reuse_helpers.h10
-rw-r--r--yt/yt/core/http/helpers.cpp2
-rw-r--r--yt/yt/core/http/http.h5
-rw-r--r--yt/yt/core/http/mock/http.h4
-rw-r--r--yt/yt/core/http/private.h8
-rw-r--r--yt/yt/core/http/public.h2
-rw-r--r--yt/yt/core/http/server.cpp34
-rw-r--r--yt/yt/core/http/stream.cpp20
-rw-r--r--yt/yt/core/http/stream.h16
-rw-r--r--yt/yt/core/http/unittests/http_ut.cpp13
-rw-r--r--yt/yt/core/net/connection.cpp47
-rw-r--r--yt/yt/core/net/connection.h6
-rw-r--r--yt/yt/core/net/dialer.cpp54
-rw-r--r--yt/yt/core/net/dialer.h10
-rw-r--r--yt/yt/core/net/public.h2
21 files changed, 260 insertions, 198 deletions
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index b71fbb9363b..83faf6cb42b 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -621,18 +621,18 @@ void TTcpConnection::ConnectSocket(const TNetworkAddress& address)
DialerSession_->Dial();
}
-void TTcpConnection::OnDialerFinished(const TErrorOr<SOCKET>& socketOrError)
+void TTcpConnection::OnDialerFinished(const TErrorOr<TFileDescriptor>& fdOrError)
{
YT_LOG_DEBUG("Dialer finished");
DialerSession_.Reset();
- if (!socketOrError.IsOK()) {
+ if (!fdOrError.IsOK()) {
Abort(TError(
NBus::EErrorCode::TransportError,
"Error connecting to %v",
EndpointDescription_)
- << socketOrError);
+ << fdOrError);
return;
}
@@ -643,7 +643,7 @@ void TTcpConnection::OnDialerFinished(const TErrorOr<SOCKET>& socketOrError)
return;
}
- Socket_ = socketOrError.Value();
+ Socket_ = fdOrError.Value();
auto tosLevel = TosLevel_.load();
if (tosLevel != DefaultTosLevel) {
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index d5d631f3b22..2948d20b676 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -297,7 +297,7 @@ private:
int GetSocketPort();
void ConnectSocket(const NNet::TNetworkAddress& address);
- void OnDialerFinished(const TErrorOr<SOCKET>& socketOrError);
+ void OnDialerFinished(const TErrorOr<TFileDescriptor>& fdOrError);
void ResolveAddress();
void OnAddressResolveFinished(const TErrorOr<NNet::TNetworkAddress>& result);
diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp
index ee3d1399b62..38779e21a4c 100644
--- a/yt/yt/core/crypto/tls.cpp
+++ b/yt/yt/core/crypto/tls.cpp
@@ -57,7 +57,7 @@ constexpr auto TlsBufferSize = 1_MB;
struct TSslContextImpl
: public TRefCounted
{
- SSL_CTX* Ctx = nullptr;
+ SSL_CTX* Context = nullptr;
TSslContextImpl()
{
@@ -66,36 +66,36 @@ struct TSslContextImpl
~TSslContextImpl()
{
- if (Ctx) {
- SSL_CTX_free(Ctx);
+ if (Context) {
+ SSL_CTX_free(Context);
}
- if (ActiveCtx_) {
- SSL_CTX_free(ActiveCtx_);
+ if (ActiveContext_) {
+ SSL_CTX_free(ActiveContext_);
}
}
void Reset()
{
- if (Ctx) {
- SSL_CTX_free(Ctx);
+ if (Context) {
+ SSL_CTX_free(Context);
}
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
- Ctx = SSL_CTX_new(TLS_method());
- if (!Ctx) {
+ Context = SSL_CTX_new(TLS_method());
+ if (!Context) {
THROW_ERROR_EXCEPTION("SSL_CTX_new(TLS_method()) failed")
<< GetSslErrors();
}
- if (SSL_CTX_set_min_proto_version(Ctx, TLS1_2_VERSION) == 0) {
+ if (SSL_CTX_set_min_proto_version(Context, TLS1_2_VERSION) == 0) {
THROW_ERROR_EXCEPTION("SSL_CTX_set_min_proto_version failed")
<< GetSslErrors();
}
- if (SSL_CTX_set_max_proto_version(Ctx, TLS1_2_VERSION) == 0) {
+ if (SSL_CTX_set_max_proto_version(Context, TLS1_2_VERSION) == 0) {
THROW_ERROR_EXCEPTION("SSL_CTX_set_max_proto_version failed")
<< GetSslErrors();
}
#else
Ctx = SSL_CTX_new(TLSv1_2_method());
- if (!Ctx) {
+ if (!Context) {
THROW_ERROR_EXCEPTION("SSL_CTX_new(TLSv1_2_method()) failed")
<< GetSslErrors();
}
@@ -104,17 +104,17 @@ struct TSslContextImpl
void Commit(TInstant time)
{
- SSL_CTX* oldCtx;
- YT_ASSERT(Ctx);
+ SSL_CTX* oldContext;
+ YT_ASSERT(Context);
{
auto guard = WriterGuard(Lock_);
- oldCtx = ActiveCtx_;
- ActiveCtx_ = Ctx;
- Ctx = nullptr;
+ oldContext = ActiveContext_;
+ ActiveContext_ = Context;
+ Context = nullptr;
CommitTime_ = time;
}
- if (oldCtx) {
- SSL_CTX_free(oldCtx);
+ if (oldContext) {
+ SSL_CTX_free(oldContext);
}
}
@@ -127,19 +127,19 @@ struct TSslContextImpl
SSL* NewSsl()
{
auto guard = ReaderGuard(Lock_);
- YT_ASSERT(ActiveCtx_);
- return SSL_new(ActiveCtx_);
+ YT_ASSERT(ActiveContext_);
+ return SSL_new(ActiveContext_);
}
bool IsActive(const SSL* ssl)
{
auto guard = ReaderGuard(Lock_);
- return SSL_get_SSL_CTX(ssl) == ActiveCtx_;
+ return SSL_get_SSL_CTX(ssl) == ActiveContext_;
}
private:
YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Lock_);
- SSL_CTX* ActiveCtx_ = nullptr;
+ SSL_CTX* ActiveContext_ = nullptr;
TInstant CommitTime_;
};
@@ -155,14 +155,14 @@ class TTlsConnection
{
public:
TTlsConnection(
- TSslContextImplPtr ctx,
+ TSslContextImplPtr context,
IPollerPtr poller,
IConnectionPtr connection)
- : Ctx_(std::move(ctx))
+ : Context_(std::move(context))
, Invoker_(CreateSerializedInvoker(poller->GetInvoker(), "crypto_tls_connection"))
, Underlying_(std::move(connection))
{
- Ssl_ = Ctx_->NewSsl();
+ Ssl_ = Context_->NewSsl();
if (!Ssl_) {
THROW_ERROR_EXCEPTION("SSL_new failed")
<< GetSslErrors();
@@ -229,14 +229,19 @@ public:
return Underlying_->GetWriteByteCount();
}
- const TNetworkAddress& LocalAddress() const override
+ TConnectionId GetId() const override
+ {
+ return Underlying_->GetId();
+ }
+
+ const TNetworkAddress& GetLocalAddress() const override
{
- return Underlying_->LocalAddress();
+ return Underlying_->GetLocalAddress();
}
- const TNetworkAddress& RemoteAddress() const override
+ const TNetworkAddress& GetRemoteAddress() const override
{
- return Underlying_->RemoteAddress();
+ return Underlying_->GetRemoteAddress();
}
TConnectionStatistics GetWriteStatistics() const override
@@ -356,7 +361,7 @@ public:
}
private:
- const TSslContextImplPtr Ctx_;
+ const TSslContextImplPtr Context_;
const IInvokerPtr Invoker_;
const IConnectionPtr Underlying_;
@@ -570,18 +575,18 @@ public:
TSslContextImplPtr ctx,
IDialerPtr dialer,
IPollerPtr poller)
- : Ctx_(std::move(ctx))
+ : Context_(std::move(ctx))
, Underlying_(std::move(dialer))
, Poller_(std::move(poller))
{ }
- TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote, TDialerContextPtr context) override
+ TFuture<IConnectionPtr> Dial(const TNetworkAddress& remoteAddress, TDialerContextPtr context) override
{
- return Underlying_->Dial(remote)
- .Apply(BIND([ctx = Ctx_, poller = Poller_, context = std::move(context)] (const IConnectionPtr& underlying) -> IConnectionPtr {
+ return Underlying_->Dial(remoteAddress)
+ .Apply(BIND([ctx = Context_, poller = Poller_, context = std::move(context)] (const IConnectionPtr& underlying) -> IConnectionPtr {
auto connection = New<TTlsConnection>(ctx, poller, underlying);
- if (context != nullptr && context->Host != std::nullopt) {
- connection->SetHost(*(context->Host));
+ if (context && context->Host) {
+ connection->SetHost(*context->Host);
}
connection->StartClient();
return connection;
@@ -589,7 +594,7 @@ public:
}
private:
- const TSslContextImplPtr Ctx_;
+ const TSslContextImplPtr Context_;
const IDialerPtr Underlying_;
const IPollerPtr Poller_;
};
@@ -658,12 +663,12 @@ TInstant TSslContext::GetCommitTime() const
void TSslContext::UseBuiltinOpenSslX509Store()
{
- SSL_CTX_set_cert_store(Impl_->Ctx, GetBuiltinOpenSslX509Store().Release());
+ SSL_CTX_set_cert_store(Impl_->Context, GetBuiltinOpenSslX509Store().Release());
}
void TSslContext::SetCipherList(const TString& list)
{
- if (SSL_CTX_set_cipher_list(Impl_->Ctx, list.data()) == 0) {
+ if (SSL_CTX_set_cipher_list(Impl_->Context, list.data()) == 0) {
THROW_ERROR_EXCEPTION("SSL_CTX_set_cipher_list failed")
<< TErrorAttribute("cipher_list", list)
<< GetSslErrors();
@@ -672,7 +677,7 @@ void TSslContext::SetCipherList(const TString& list)
void TSslContext::AddCertificateFromFile(const TString& path)
{
- if (SSL_CTX_use_certificate_file(Impl_->Ctx, path.c_str(), SSL_FILETYPE_PEM) != 1) {
+ if (SSL_CTX_use_certificate_file(Impl_->Context, path.c_str(), SSL_FILETYPE_PEM) != 1) {
THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate_file failed")
<< TErrorAttribute("path", path)
<< GetSslErrors();
@@ -681,7 +686,7 @@ void TSslContext::AddCertificateFromFile(const TString& path)
void TSslContext::AddCertificateChainFromFile(const TString& path)
{
- if (SSL_CTX_use_certificate_chain_file(Impl_->Ctx, path.c_str()) != 1) {
+ if (SSL_CTX_use_certificate_chain_file(Impl_->Context, path.c_str()) != 1) {
THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate_chain_file failed")
<< TErrorAttribute("path", path)
<< GetSslErrors();
@@ -690,7 +695,7 @@ void TSslContext::AddCertificateChainFromFile(const TString& path)
void TSslContext::AddPrivateKeyFromFile(const TString& path)
{
- if (SSL_CTX_use_PrivateKey_file(Impl_->Ctx, path.c_str(), SSL_FILETYPE_PEM) != 1) {
+ if (SSL_CTX_use_PrivateKey_file(Impl_->Context, path.c_str(), SSL_FILETYPE_PEM) != 1) {
THROW_ERROR_EXCEPTION("SSL_CTX_use_PrivateKey_file failed")
<< TErrorAttribute("path", path)
<< GetSslErrors();
@@ -714,12 +719,12 @@ void TSslContext::AddCertificateChain(const TString& certificateChain)
X509_free(certificateObject);
});
- if (SSL_CTX_use_certificate(Impl_->Ctx, certificateObject) != 1) {
+ if (SSL_CTX_use_certificate(Impl_->Context, certificateObject) != 1) {
THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate failed")
<< GetSslErrors();
}
- SSL_CTX_clear_chain_certs(Impl_->Ctx);
+ SSL_CTX_clear_chain_certs(Impl_->Context);
while (true) {
auto chainCertificateObject = PEM_read_bio_X509(bio, nullptr, nullptr, nullptr);
if (!chainCertificateObject) {
@@ -733,7 +738,7 @@ void TSslContext::AddCertificateChain(const TString& certificateChain)
<< GetSslErrors();
}
- int result = SSL_CTX_add0_chain_cert(Impl_->Ctx, chainCertificateObject);
+ int result = SSL_CTX_add0_chain_cert(Impl_->Context, chainCertificateObject);
if (!result) {
X509_free(chainCertificateObject);
THROW_ERROR_EXCEPTION("SSL_CTX_add0_chain_cert")
@@ -759,7 +764,7 @@ void TSslContext::AddCertificate(const TString& certificate)
X509_free(certificateObject);
});
- if (SSL_CTX_use_certificate(Impl_->Ctx, certificateObject) != 1) {
+ if (SSL_CTX_use_certificate(Impl_->Context, certificateObject) != 1) {
THROW_ERROR_EXCEPTION("SSL_CTX_use_certificate failed")
<< GetSslErrors();
}
@@ -782,7 +787,7 @@ void TSslContext::AddPrivateKey(const TString& privateKey)
EVP_PKEY_free(privateKeyObject);
});
- if (SSL_CTX_use_PrivateKey(Impl_->Ctx, privateKeyObject) != 1) {
+ if (SSL_CTX_use_PrivateKey(Impl_->Context, privateKeyObject) != 1) {
THROW_ERROR_EXCEPTION("SSL_CTX_use_PrivateKey failed")
<< GetSslErrors();
}
diff --git a/yt/yt/core/http/client.cpp b/yt/yt/core/http/client.cpp
index dc7bf3077df..d5e072025ef 100644
--- a/yt/yt/core/http/client.cpp
+++ b/yt/yt/core/http/client.cpp
@@ -12,6 +12,7 @@
#include <yt/yt/core/net/connection.h>
#include <yt/yt/core/concurrency/poller.h>
+
#include <util/string/cast.h>
namespace NYT::NHttp {
@@ -26,13 +27,13 @@ class TClient
{
public:
TClient(
- const TClientConfigPtr& config,
- const IDialerPtr& dialer,
- const IInvokerPtr& invoker)
- : Config_(config)
- , Dialer_(dialer)
- , Invoker_(invoker)
- , ConnectionPool_(New<TConnectionPool>(dialer, config, invoker))
+ TClientConfigPtr config,
+ IDialerPtr dialer,
+ IInvokerPtr invoker)
+ : Config_(std::move(config))
+ , Dialer_(std::move(dialer))
+ , Invoker_(std::move(invoker))
+ , ConnectionPool_(New<TConnectionPool>(Dialer_, Config_, Invoker_))
{ }
TFuture<IResponsePtr> Get(
@@ -109,7 +110,7 @@ private:
const TClientConfigPtr Config_;
const IDialerPtr Dialer_;
const IInvokerPtr Invoker_;
- TConnectionPoolPtr ConnectionPool_;
+ const TConnectionPoolPtr ConnectionPool_;
static int GetDefaultPort(const TUrlRef& parsedUrl)
{
@@ -125,9 +126,8 @@ private:
auto host = parsedUrl.Host;
TNetworkAddress address;
- auto tryIP = TNetworkAddress::TryParse(host);
- if (tryIP.IsOK()) {
- address = tryIP.Value();
+ if (auto ipOrError = TNetworkAddress::TryParse(host); ipOrError.IsOK()) {
+ address = ipOrError.Value();
} else {
auto asyncAddress = TAddressResolver::Get()->Resolve(ToString(host));
address = WaitFor(asyncAddress)
@@ -137,15 +137,17 @@ private:
return TNetworkAddress(address, parsedUrl.Port.value_or(GetDefaultPort(parsedUrl)));
}
- std::pair<THttpOutputPtr, THttpInputPtr> OpenHttp(const TUrlRef& urlRef)
+ std::pair<THttpOutputPtr, THttpInputPtr> Connect(const TUrlRef& urlRef)
{
auto context = New<TDialerContext>();
context->Host = urlRef.Host;
+
auto address = GetAddress(urlRef);
// TODO(aleexfi): Enable connection pool by default
if (Config_->MaxIdleConnections == 0) {
- auto connection = WaitFor(Dialer_->Dial(address, std::move(context))).ValueOrThrow();
+ auto connection = WaitFor(Dialer_->Dial(address, std::move(context)))
+ .ValueOrThrow();
auto input = New<THttpInput>(
connection,
@@ -161,9 +163,10 @@ private:
return {std::move(output), std::move(input)};
} else {
- auto connection = WaitFor(ConnectionPool_->Connect(address, std::move(context))).ValueOrThrow();
+ auto connection = WaitFor(ConnectionPool_->Connect(address, std::move(context)))
+ .ValueOrThrow();
- auto reuseSharedState = New<NDetail::TReusableConnectionState>(connection, ConnectionPool_);
+ auto reusableState = New<NDetail::TReusableConnectionState>(connection, ConnectionPool_);
auto input = New<NDetail::TConnectionReuseWrapper<THttpInput>>(
connection,
@@ -171,13 +174,13 @@ private:
Invoker_,
EMessageType::Response,
Config_);
- input->SetReusableState(reuseSharedState);
+ input->SetReusableState(reusableState);
auto output = New<NDetail::TConnectionReuseWrapper<THttpOutput>>(
connection,
EMessageType::Request,
Config_);
- output->SetReusableState(reuseSharedState);
+ output->SetReusableState(reusableState);
return {std::move(output), std::move(input)};
}
@@ -186,7 +189,7 @@ private:
template <typename T>
TFuture<T> WrapError(const TString& url, TCallback<T()> action)
{
- return BIND([=, this_ = MakeStrong(this)] {
+ return BIND([=, this_ = MakeStrong(this), action = std::move(action)] {
try {
return action();
} catch (const std::exception& ex) {
@@ -223,7 +226,7 @@ private:
// Waits for response headers internally.
Response_->GetStatusCode();
- return IResponsePtr{Response_};
+ return IResponsePtr(Response_);
}));
}
@@ -238,10 +241,10 @@ private:
}
private:
- THttpOutputPtr Request_;
- THttpInputPtr Response_;
- TIntrusivePtr<TClient> Client_;
- TString Url_;
+ const THttpOutputPtr Request_;
+ const THttpInputPtr Response_;
+ const TIntrusivePtr<TClient> Client_;
+ const TString Url_;
};
std::pair<THttpOutputPtr, THttpInputPtr> StartAndWriteHeaders(
@@ -254,7 +257,7 @@ private:
auto urlRef = ParseUrl(url);
- std::tie(request, response) = OpenHttp(urlRef);
+ std::tie(request, response) = Connect(urlRef);
request->SetHost(urlRef.Host, urlRef.PortStr);
if (headers) {
@@ -276,7 +279,7 @@ private:
{
return WrapError(url, BIND([=, this, this_ = MakeStrong(this)] {
auto [request, response] = StartAndWriteHeaders(method, url, headers);
- return IActiveRequestPtr{New<TActiveRequest>(request, response, this_, url)};
+ return IActiveRequestPtr(New<TActiveRequest>(request, response, this_, url));
}));
}
@@ -303,28 +306,32 @@ private:
return DoRequest(method, *redirectUrl, body, headers, redirectCount + 1);
}
- return IResponsePtr(response);
+ return response;
}
};
////////////////////////////////////////////////////////////////////////////////
IClientPtr CreateClient(
- const TClientConfigPtr& config,
- const IDialerPtr& dialer,
- const IInvokerPtr& invoker)
+ TClientConfigPtr config,
+ IDialerPtr dialer,
+ IInvokerPtr invoker)
{
- return New<TClient>(config, dialer, invoker);
+ return New<TClient>(
+ std::move(config),
+ std::move(dialer),
+ std::move(invoker));
}
IClientPtr CreateClient(
- const TClientConfigPtr& config,
- const IPollerPtr& poller)
+ TClientConfigPtr config,
+ IPollerPtr poller)
{
+ auto invoker = poller->GetInvoker();
return CreateClient(
- config,
- CreateDialer(New<TDialerConfig>(), poller, HttpLogger()),
- poller->GetInvoker());
+ std::move(config),
+ CreateDialer(New<TDialerConfig>(), std::move(poller), HttpLogger()),
+ std::move(invoker));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/http/client.h b/yt/yt/core/http/client.h
index a75df31142e..27457626332 100644
--- a/yt/yt/core/http/client.h
+++ b/yt/yt/core/http/client.h
@@ -78,12 +78,12 @@ DEFINE_REFCOUNTED_TYPE(IClient)
////////////////////////////////////////////////////////////////////////////////
IClientPtr CreateClient(
- const TClientConfigPtr& config,
- const NNet::IDialerPtr& dialer,
- const IInvokerPtr& invoker);
+ TClientConfigPtr config,
+ NNet::IDialerPtr dialer,
+ IInvokerPtr invoker);
IClientPtr CreateClient(
- const TClientConfigPtr& config,
- const NConcurrency::IPollerPtr& poller);
+ TClientConfigPtr config,
+ NConcurrency::IPollerPtr poller);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/http/connection_pool.cpp b/yt/yt/core/http/connection_pool.cpp
index a14891164c1..24dc08d5b23 100644
--- a/yt/yt/core/http/connection_pool.cpp
+++ b/yt/yt/core/http/connection_pool.cpp
@@ -1,5 +1,7 @@
#include "connection_pool.h"
+#include "private.h"
+
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/net/connection.h>
@@ -11,6 +13,10 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
+static constexpr auto& Logger = HttpLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
TDuration TIdleConnection::GetIdleTime() const
{
return TInstant::Now() - InsertionTime;
@@ -56,7 +62,11 @@ TFuture<IConnectionPtr> TConnectionPool::Connect(
while (auto item = Connections_.Extract(address)) {
if (item->GetIdleTime() < Config_->ConnectionIdleTimeout && item->IsOK()) {
- return MakeFuture<IConnectionPtr>(std::move(item->Connection));
+ auto&& connection = item->Connection;
+ YT_LOG_DEBUG("Connection is extracted from cache (Address: %v, ConnectionId: %v)",
+ address,
+ connection->GetId());
+ return MakeFuture<IConnectionPtr>(std::move(connection));
}
}
}
@@ -66,21 +76,28 @@ TFuture<IConnectionPtr> TConnectionPool::Connect(
void TConnectionPool::Release(const IConnectionPtr& connection)
{
+ YT_LOG_DEBUG("Connection is put to cache (Address: %v, ConnectionId: %v)",
+ connection->GetRemoteAddress(),
+ connection->GetId());
+
auto guard = Guard(SpinLock_);
- Connections_.Insert(connection->RemoteAddress(), {connection, TInstant::Now()});
+ Connections_.Insert(connection->GetRemoteAddress(), {connection, TInstant::Now()});
}
void TConnectionPool::DropExpiredConnections()
{
auto guard = Guard(SpinLock_);
- TMultiLruCache<TNetworkAddress, TIdleConnection> validConnections(
- Config_->MaxIdleConnections);
+ decltype(Connections_) validConnections(Config_->MaxIdleConnections);
while (Connections_.GetSize() > 0) {
auto idleConnection = Connections_.Pop();
if (idleConnection.GetIdleTime() < Config_->ConnectionIdleTimeout && idleConnection.IsOK()) {
- validConnections.Insert(idleConnection.Connection->RemoteAddress(), idleConnection);
+ validConnections.Insert(idleConnection.Connection->GetRemoteAddress(), idleConnection);
+ } else {
+ YT_LOG_DEBUG("Connection expired from cache (Address: %v, ConnectionId: %v)",
+ idleConnection.Connection->GetRemoteAddress(),
+ idleConnection.Connection->GetId());
}
}
diff --git a/yt/yt/core/http/connection_reuse_helpers.h b/yt/yt/core/http/connection_reuse_helpers.h
index bb3762bb6d6..5d7039ebad8 100644
--- a/yt/yt/core/http/connection_reuse_helpers.h
+++ b/yt/yt/core/http/connection_reuse_helpers.h
@@ -1,6 +1,6 @@
#pragma once
-#include "public.h"
+#include "private.h"
#include <yt/yt/core/net/public.h>
@@ -9,7 +9,7 @@ namespace NYT::NHttp::NDetail {
////////////////////////////////////////////////////////////////////////////////
//! Responsible for returning the connection to the owning pool
-//! if it could be reused
+//! if it could be reused.
struct TReusableConnectionState final
{
std::atomic<bool> Reusable = true;
@@ -20,10 +20,12 @@ struct TReusableConnectionState final
~TReusableConnectionState();
};
-using TReusableConnectionStatePtr = TIntrusivePtr<TReusableConnectionState>;
+DEFINE_REFCOUNTED_TYPE(TReusableConnectionState)
+
+////////////////////////////////////////////////////////////////////////////////
//! Reports to the shared state whether the connection could be reused
-//! (by calling T::IsSafeToReuse() in the destructor)
+//! (by calling T::IsSafeToReuse() in the destructor).
template <class T>
class TConnectionReuseWrapper
: public T
diff --git a/yt/yt/core/http/helpers.cpp b/yt/yt/core/http/helpers.cpp
index 96a9c672450..e5bead9511b 100644
--- a/yt/yt/core/http/helpers.cpp
+++ b/yt/yt/core/http/helpers.cpp
@@ -413,7 +413,7 @@ bool TryParseTraceParent(const TString& traceParent, NTracing::TSpanContext& spa
// Now we have exactly three parts: traceId-spanId-options.
// Parse trace context.
- if (!TGuid::FromStringHex32(parts[0], &spanContext.TraceId)) {
+ if (!NTracing::TTraceId::FromStringHex32(parts[0], &spanContext.TraceId)) {
return false;
}
diff --git a/yt/yt/core/http/http.h b/yt/yt/core/http/http.h
index afd6c061f61..0de762c235e 100644
--- a/yt/yt/core/http/http.h
+++ b/yt/yt/core/http/http.h
@@ -233,8 +233,9 @@ struct IRequest
virtual const NNet::TNetworkAddress& GetRemoteAddress() const = 0;
- virtual TGuid GetConnectionId() const = 0;
- virtual TGuid GetRequestId() const = 0;
+ virtual TConnectionId GetConnectionId() const = 0;
+ virtual TRequestId GetRequestId() const = 0;
+
virtual i64 GetReadByteCount() const = 0;
virtual TInstant GetStartTime() const = 0;
diff --git a/yt/yt/core/http/mock/http.h b/yt/yt/core/http/mock/http.h
index 27e62e561fb..84ee0356ea8 100644
--- a/yt/yt/core/http/mock/http.h
+++ b/yt/yt/core/http/mock/http.h
@@ -21,8 +21,8 @@ public:
MOCK_METHOD(const TUrlRef&, GetUrl, (), (override));
MOCK_METHOD(const THeadersPtr&, GetHeaders, (), (override));
MOCK_METHOD(const NNet::TNetworkAddress&, GetRemoteAddress, (), (const, override));
- MOCK_METHOD(TGuid, GetConnectionId, (), (const, override));
- MOCK_METHOD(TGuid, GetRequestId, (), (const, override));
+ MOCK_METHOD(TConnectionId, GetConnectionId, (), (const, override));
+ MOCK_METHOD(TRequestId, GetRequestId, (), (const, override));
MOCK_METHOD(i64, GetReadByteCount, (), (const, override));
MOCK_METHOD(TInstant, GetStartTime, (), (const, override));
MOCK_METHOD(int, GetPort, (), (const, override));
diff --git a/yt/yt/core/http/private.h b/yt/yt/core/http/private.h
index 691d022bbd9..a1563033503 100644
--- a/yt/yt/core/http/private.h
+++ b/yt/yt/core/http/private.h
@@ -20,4 +20,12 @@ DECLARE_REFCOUNTED_CLASS(THttpOutput)
////////////////////////////////////////////////////////////////////////////////
+namespace NDetail {
+
+DECLARE_REFCOUNTED_STRUCT(TReusableConnectionState)
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NHttp
diff --git a/yt/yt/core/http/public.h b/yt/yt/core/http/public.h
index 74f0e8d29d6..b904f89675c 100644
--- a/yt/yt/core/http/public.h
+++ b/yt/yt/core/http/public.h
@@ -33,6 +33,8 @@ DECLARE_REFCOUNTED_CLASS(TSharedRefOutputStream)
////////////////////////////////////////////////////////////////////////////////
using TContentEncoding = TString;
+using TConnectionId = TGuid;
+using TRequestId = TGuid;
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/http/server.cpp b/yt/yt/core/http/server.cpp
index 74648033071..ae7df4b6204 100644
--- a/yt/yt/core/http/server.cpp
+++ b/yt/yt/core/http/server.cpp
@@ -165,20 +165,19 @@ private:
ConnectionsDropped_.Increment();
ActiveConnections_--;
YT_LOG_WARNING("Server is over max active connection limit (RemoteAddress: %v)",
- connection->RemoteAddress());
+ connection->GetRemoteAddress());
return;
}
ConnectionsActive_.Update(count);
ConnectionsAccepted_.Increment();
- auto connectionId = TGuid::Create();
YT_LOG_DEBUG("Connection accepted (ConnectionId: %v, RemoteAddress: %v, LocalAddress: %v)",
- connectionId,
- connection->RemoteAddress(),
- connection->LocalAddress());
+ connection->GetId(),
+ connection->GetRemoteAddress(),
+ connection->GetLocalAddress());
Invoker_->Invoke(
- BIND(&TServer::HandleConnection, MakeStrong(this), std::move(connection), connectionId));
+ BIND(&TServer::HandleConnection, MakeStrong(this), std::move(connection)));
}
bool HandleRequest(const THttpInputPtr& request, const THttpOutputPtr& response)
@@ -271,10 +270,10 @@ private:
return true;
}
- void HandleConnection(const IConnectionPtr& connection, TGuid connectionId)
+ void HandleConnection(const IConnectionPtr& connection)
{
try {
- connection->SubscribePeerDisconnect(BIND([config = Config_, canceler = GetCurrentFiberCanceler(), connectionId = connectionId] {
+ connection->SubscribePeerDisconnect(BIND([config = Config_, canceler = GetCurrentFiberCanceler(), connectionId = connection->GetId()] {
YT_LOG_DEBUG("Client closed TCP socket (ConnectionId: %v)", connectionId);
if (config->CancelFiberOnConnectionClose.value_or(false)) {
@@ -291,17 +290,17 @@ private:
connection->SetNoDelay();
}
- DoHandleConnection(connection, connectionId);
+ DoHandleConnection(connection);
} catch (const std::exception& ex) {
- YT_LOG_ERROR(ex, "Unhandled exception (ConnectionId: %v)", connectionId);
+ YT_LOG_ERROR(ex, "Unhandled exception (ConnectionId: %v)", connection->GetId());
}
}
- void DoHandleConnection(const IConnectionPtr& connection, TGuid connectionId)
+ void DoHandleConnection(const IConnectionPtr& connection)
{
auto request = New<THttpInput>(
connection,
- connection->RemoteAddress(),
+ connection->GetRemoteAddress(),
GetCurrentInvoker(),
EMessageType::Request,
Config_);
@@ -317,11 +316,8 @@ private:
EMessageType::Response,
Config_);
- request->SetConnectionId(connectionId);
- response->SetConnectionId(connectionId);
-
while (true) {
- auto requestId = TGuid::Create();
+ auto requestId = TRequestId::Create();
request->SetRequestId(requestId);
response->SetRequestId(requestId);
@@ -332,7 +328,7 @@ private:
auto logDrop = [&] (auto reason) {
YT_LOG_DEBUG("Dropping HTTP connection (ConnectionId: %v, Reason: %v)",
- connectionId,
+ connection->GetId(),
reason);
};
@@ -381,10 +377,10 @@ private:
auto connectionResult = WaitFor(connection->Close());
if (connectionResult.IsOK()) {
YT_LOG_DEBUG("HTTP connection closed (ConnectionId: %v)",
- connectionId);
+ connection->GetId());
} else {
YT_LOG_DEBUG(connectionResult, "Error closing HTTP connection (ConnectionId: %v)",
- connectionId);
+ connection->GetId());
}
}
};
diff --git a/yt/yt/core/http/stream.cpp b/yt/yt/core/http/stream.cpp
index db16ffdf53a..632e035a4c9 100644
--- a/yt/yt/core/http/stream.cpp
+++ b/yt/yt/core/http/stream.cpp
@@ -295,22 +295,17 @@ const TNetworkAddress& THttpInput::GetRemoteAddress() const
return RemoteAddress_;
}
-TGuid THttpInput::GetConnectionId() const
+TConnectionId THttpInput::GetConnectionId() const
{
- return ConnectionId_;
+ return Connection_->GetId();
}
-void THttpInput::SetConnectionId(TGuid connectionId)
-{
- ConnectionId_ = connectionId;
-}
-
-TGuid THttpInput::GetRequestId() const
+TRequestId THttpInput::GetRequestId() const
{
return RequestId_;
}
-void THttpInput::SetRequestId(TGuid requestId)
+void THttpInput::SetRequestId(TRequestId requestId)
{
RequestId_ = requestId;
}
@@ -633,12 +628,7 @@ void THttpOutput::Reset()
Trailers_.Reset();
}
-void THttpOutput::SetConnectionId(TGuid connectionId)
-{
- ConnectionId_ = connectionId;
-}
-
-void THttpOutput::SetRequestId(TGuid requestId)
+void THttpOutput::SetRequestId(TRequestId requestId)
{
RequestId_ = requestId;
}
diff --git a/yt/yt/core/http/stream.h b/yt/yt/core/http/stream.h
index b01da702f16..9efcea7f7a0 100644
--- a/yt/yt/core/http/stream.h
+++ b/yt/yt/core/http/stream.h
@@ -105,11 +105,10 @@ public:
const NNet::TNetworkAddress& GetRemoteAddress() const override;
- TGuid GetConnectionId() const override;
- void SetConnectionId(TGuid connectionId);
+ TConnectionId GetConnectionId() const override;
- TGuid GetRequestId() const override;
- void SetRequestId(TGuid requestId);
+ TRequestId GetRequestId() const override;
+ void SetRequestId(TRequestId requestId);
i64 GetReadByteCount() const override;
@@ -149,8 +148,7 @@ private:
THeadersPtr Headers_;
// Debug.
- TGuid ConnectionId_;
- TGuid RequestId_;
+ TRequestId RequestId_;
i64 StartByteCount_ = 0;
NNet::TConnectionStatistics StartStatistics_;
TInstant LastProgressLogTime_;
@@ -215,8 +213,7 @@ public:
bool IsSafeToReuse() const;
void Reset();
- void SetConnectionId(TGuid connectionId);
- void SetRequestId(TGuid requestId);
+ void SetRequestId(TRequestId requestId);
i64 GetWriteByteCount() const override;
@@ -228,8 +225,7 @@ private:
TClosure OnWriteFinish_;
//! Debugging.
- TGuid ConnectionId_;
- TGuid RequestId_;
+ TRequestId RequestId_;
i64 StartByteCount_ = 0;
NNet::TConnectionStatistics StartStatistics_;
bool HeadersLogged_ = false;
diff --git a/yt/yt/core/http/unittests/http_ut.cpp b/yt/yt/core/http/unittests/http_ut.cpp
index e5bc967d730..cda7dea65f8 100644
--- a/yt/yt/core/http/unittests/http_ut.cpp
+++ b/yt/yt/core/http/unittests/http_ut.cpp
@@ -159,6 +159,11 @@ struct TFakeConnection
TString Input;
TString Output;
+ TConnectionId GetId() const override
+ {
+ return {};
+ }
+
bool SetNoDelay() override
{
return true;
@@ -216,12 +221,12 @@ struct TFakeConnection
THROW_ERROR_EXCEPTION("Not implemented");
}
- const TNetworkAddress& LocalAddress() const override
+ const TNetworkAddress& GetLocalAddress() const override
{
THROW_ERROR_EXCEPTION("Not implemented");
}
- const TNetworkAddress& RemoteAddress() const override
+ const TNetworkAddress& GetRemoteAddress() const override
{
THROW_ERROR_EXCEPTION("Not implemented");
}
@@ -1149,7 +1154,7 @@ TEST_P(THttpServerTest, ConnectionKeepAlive)
auto response = New<THttpInput>(
connection,
- connection->RemoteAddress(),
+ connection->GetRemoteAddress(),
Poller->GetInvoker(),
EMessageType::Response,
New<THttpIOConfig>());
@@ -1183,7 +1188,7 @@ TEST_P(THttpServerTest, ConnectionKeepAlive)
auto response = New<THttpInput>(
connection,
- connection->RemoteAddress(),
+ connection->GetRemoteAddress(),
Poller->GetInvoker(),
EMessageType::Response,
New<THttpIOConfig>());
diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp
index c18c6195c49..fa0c88d4d29 100644
--- a/yt/yt/core/net/connection.cpp
+++ b/yt/yt/core/net/connection.cpp
@@ -628,8 +628,7 @@ public:
TFuture<void> Close()
{
- auto error = TError("Connection closed")
- << TErrorAttribute("connection", Name_);
+ auto error = AnnotateError(TError("Connection closed"));
return AbortIO(error);
}
@@ -665,12 +664,17 @@ public:
return future;
}
- const TNetworkAddress& LocalAddress() const
+ TConnectionId GetId() const
+ {
+ return Id_;
+ }
+
+ const TNetworkAddress& GetLocalAddress() const
{
return LocalAddress_;
}
- const TNetworkAddress& RemoteAddress() const
+ const TNetworkAddress& GetRemoteAddress() const
{
return RemoteAddress_;
}
@@ -746,7 +750,8 @@ public:
}
private:
- const TString Name_;
+ const TConnectionId Id_ = TConnectionId::Create();
+ const TString Endpoint_;
const TString LoggingTag_;
const TNetworkAddress LocalAddress_;
const TNetworkAddress RemoteAddress_;
@@ -768,7 +773,8 @@ private:
TString filePath,
const IPollerPtr& poller,
bool useDeliveryFence)
- : Name_(Format("File{%v}", filePath))
+ : Endpoint_(Format("File{%v}", filePath))
+ , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_))
, FD_(fd)
, Poller_(std::move(poller))
, UseDeliveryFence_(useDeliveryFence)
@@ -780,8 +786,8 @@ private:
const TNetworkAddress& localAddress,
const TNetworkAddress& remoteAddress,
IPollerPtr poller)
- : Name_(Format("FD{%v<->%v}", localAddress, remoteAddress))
- , LoggingTag_(Format("ConnectionId: %v", Name_))
+ : Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress))
+ , LoggingTag_(Format("ConnectionId: %v %v", Id_, Endpoint_))
, LocalAddress_(localAddress)
, RemoteAddress_(remoteAddress)
, FD_(fd)
@@ -894,6 +900,13 @@ private:
TDelayedExecutorCookie ReadTimeoutCookie_;
TDelayedExecutorCookie WriteTimeoutCookie_;
+ TError AnnotateError(const TError& error) const
+ {
+ return error
+ << TErrorAttribute("connection_id", Id_)
+ << TErrorAttribute("connection_endpoint", Endpoint_);
+ }
+
TFuture<void> DoWrite(const TSharedRef& data)
{
auto write = std::make_unique<TWriteOperation>(data);
@@ -959,8 +972,7 @@ private:
if (error.IsOK()) {
if (direction->Operation) {
- THROW_ERROR_EXCEPTION("Another IO operation is in progress")
- << TErrorAttribute("connection", Name_);
+ THROW_ERROR(AnnotateError(TError("Another IO operation is in progress")));
}
YT_VERIFY(!direction->Running);
@@ -1010,7 +1022,7 @@ private:
if (result.IsOK()) {
direction->BytesTransferred += result.Value().ByteCount;
} else {
- result = result << TErrorAttribute("connection", Name_);
+ result = AnnotateError(result);
}
bool needUnregister = false;
@@ -1154,14 +1166,19 @@ public:
YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned")));
}
- const TNetworkAddress& LocalAddress() const override
+ TConnectionId GetId() const override
+ {
+ return Impl_->GetId();
+ }
+
+ const TNetworkAddress& GetLocalAddress() const override
{
- return Impl_->LocalAddress();
+ return Impl_->GetLocalAddress();
}
- const TNetworkAddress& RemoteAddress() const override
+ const TNetworkAddress& GetRemoteAddress() const override
{
- return Impl_->RemoteAddress();
+ return Impl_->GetRemoteAddress();
}
int GetHandle() const override
diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h
index 24b28872d5e..894b7482220 100644
--- a/yt/yt/core/net/connection.h
+++ b/yt/yt/core/net/connection.h
@@ -68,8 +68,10 @@ struct IConnection
: public IConnectionReader
, public IConnectionWriter
{
- virtual const TNetworkAddress& LocalAddress() const = 0;
- virtual const TNetworkAddress& RemoteAddress() const = 0;
+ virtual TConnectionId GetId() const = 0;
+
+ virtual const TNetworkAddress& GetLocalAddress() const = 0;
+ virtual const TNetworkAddress& GetRemoteAddress() const = 0;
// Returns true if connection is not is failed state and has no
// active IO operations.
diff --git a/yt/yt/core/net/dialer.cpp b/yt/yt/core/net/dialer.cpp
index d21b9b01402..5bc8d3db92d 100644
--- a/yt/yt/core/net/dialer.cpp
+++ b/yt/yt/core/net/dialer.cpp
@@ -1,6 +1,7 @@
#include "dialer.h"
#include "connection.h"
#include "config.h"
+#include "private.h"
#include <yt/yt/core/concurrency/pollable_detail.h>
@@ -15,6 +16,10 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
+static constexpr auto& Logger = NetLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
class TDialSession
: public TRefCounted
{
@@ -23,48 +28,55 @@ public:
const TNetworkAddress& remoteAddress,
const IAsyncDialerPtr& asyncDialer,
IPollerPtr poller)
- : Name_(Format("dialer[%v]", remoteAddress))
- , RemoteAddress_(remoteAddress)
+ : RemoteAddress_(remoteAddress)
, Poller_(std::move(poller))
, Session_(asyncDialer->CreateSession(
remoteAddress,
BIND(&TDialSession::OnDialerFinished, MakeWeak(this))))
+ { }
+
+ TFuture<IConnectionPtr> Run()
{
+ YT_LOG_DEBUG("Dial started (Address: %v)",
+ RemoteAddress_);
+
Session_->Dial();
Promise_.OnCanceled(BIND([this, this_ = MakeStrong(this)] (const TError& error) {
Promise_.TrySet(TError(NYT::EErrorCode::Canceled, "Dial canceled")
- << TErrorAttribute("dialer", Name_)
+ << TErrorAttribute("remote_address", ToString(RemoteAddress_))
<< error);
}));
- }
- TFuture<IConnectionPtr> GetFuture() const
- {
return Promise_.ToFuture();
}
private:
- const TString Name_;
const TNetworkAddress RemoteAddress_;
const IPollerPtr Poller_;
const IAsyncDialerSessionPtr Session_;
const TPromise<IConnectionPtr> Promise_ = NewPromise<IConnectionPtr>();
- void OnDialerFinished(const TErrorOr<SOCKET>& socketOrError)
+ void OnDialerFinished(const TErrorOr<TFileDescriptor>& fdOrError)
{
- if (socketOrError.IsOK()) {
- auto socket = socketOrError.Value();
- Promise_.TrySet(CreateConnectionFromFD(
- socket,
- GetSocketName(socket),
- RemoteAddress_,
- Poller_));
- } else {
- Promise_.TrySet(socketOrError
- << TErrorAttribute("dialer", Name_));
+ if (!fdOrError.IsOK()) {
+ Promise_.TrySet(TError("Dial failed")
+ << TErrorAttribute("remote_address", ToString(RemoteAddress_))
+ << fdOrError);
+ return;
}
+
+ auto socket = fdOrError.Value();
+ YT_LOG_DEBUG("Dial completed (Address: %v, FD: %v)",
+ RemoteAddress_,
+ socket);
+
+ Promise_.TrySet(CreateConnectionFromFD(
+ socket,
+ GetSocketName(socket),
+ RemoteAddress_,
+ Poller_));
}
};
@@ -86,14 +98,14 @@ public:
{ }
TFuture<IConnectionPtr> Dial(
- const TNetworkAddress& remote,
+ const TNetworkAddress& remoteAddress,
TDialerContextPtr /*context*/) override
{
auto session = New<TDialSession>(
- remote,
+ remoteAddress,
AsyncDialer_,
Poller_);
- return session->GetFuture();
+ return session->Run();
}
private:
diff --git a/yt/yt/core/net/dialer.h b/yt/yt/core/net/dialer.h
index b48c782cf5e..ac145d28a8d 100644
--- a/yt/yt/core/net/dialer.h
+++ b/yt/yt/core/net/dialer.h
@@ -4,6 +4,8 @@
#include <yt/yt/core/net/address.h>
+#include <yt/yt/core/misc/proc.h>
+
#include <library/cpp/yt/logging/public.h>
#include <library/cpp/yt/memory/ref.h>
@@ -14,8 +16,7 @@ namespace NYT::NNet {
////////////////////////////////////////////////////////////////////////////////
-struct TDialerContext
- : public TRefCounted
+struct TDialerContext final
{
//! Host is used for TlsDialer.
std::optional<TString> Host;
@@ -24,12 +25,11 @@ struct TDialerContext
DEFINE_REFCOUNTED_TYPE(TDialerContext)
//! Dialer establishes connection to a (resolved) network address.
-
struct IDialer
: public virtual TRefCounted
{
virtual TFuture<IConnectionPtr> Dial(
- const TNetworkAddress& remote,
+ const TNetworkAddress& remoteAddress,
TDialerContextPtr context = nullptr) = 0;
};
@@ -43,7 +43,7 @@ IDialerPtr CreateDialer(
////////////////////////////////////////////////////////////////////////////////
//! Async dialer notifies caller via callback for better performance.
-using TAsyncDialerCallback = TCallback<void(const TErrorOr<SOCKET>&)>;
+using TAsyncDialerCallback = TCallback<void(const TErrorOr<TFileDescriptor>&)>;
//! Dialer session interface.
//! Caller should hold a reference to a session until callback is called.
diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h
index b28a2f65d21..fde7eea47a6 100644
--- a/yt/yt/core/net/public.h
+++ b/yt/yt/core/net/public.h
@@ -12,6 +12,8 @@ class TNetworkAddress;
class TIP6Address;
class TIP6Network;
+using TConnectionId = TGuid;
+
DECLARE_REFCOUNTED_STRUCT(IConnection)
DECLARE_REFCOUNTED_STRUCT(IPacketConnection)
DECLARE_REFCOUNTED_STRUCT(IConnectionReader)