aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-27 16:27:23 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-02-27 17:00:48 +0300
commit6c35e9daf4dc86464e1a262236b8d3593e690ee5 (patch)
tree2727d5da08d12182c1a705ec7213a6b97f5da521 /yt
parent6fa64684d15341dd701bf1498ecafdd764d5f097 (diff)
downloadydb-6c35e9daf4dc86464e1a262236b8d3593e690ee5.tar.gz
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp34
-rw-r--r--yt/yt/client/api/internal_client.cpp18
-rw-r--r--yt/yt/client/api/internal_client.h4
-rw-r--r--yt/yt/client/driver/internal_commands.cpp2
-rw-r--r--yt/yt/core/bus/tcp/client.cpp6
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp42
-rw-r--r--yt/yt/core/bus/tcp/connection.h2
-rw-r--r--yt/yt/core/crypto/tls.cpp86
-rw-r--r--yt/yt/core/crypto/tls.h5
-rw-r--r--yt/yt/core/crypto/unittests/tls_ut.cpp7
-rw-r--r--yt/yt/core/http/client.cpp14
-rw-r--r--yt/yt/core/http/connection_pool.cpp6
-rw-r--r--yt/yt/core/http/connection_pool.h4
-rw-r--r--yt/yt/core/https/client.cpp1
-rw-r--r--yt/yt/core/https/config.cpp2
-rw-r--r--yt/yt/core/https/config.h1
-rw-r--r--yt/yt/core/https/server.cpp83
-rw-r--r--yt/yt/core/net/dialer.cpp4
-rw-r--r--yt/yt/core/net/dialer.h15
-rw-r--r--yt/yt/core/net/mock/dialer.cpp2
-rw-r--r--yt/yt/core/net/mock/dialer.h2
-rw-r--r--yt/yt/core/net/public.h1
-rw-r--r--yt/yt/library/program/config.h1
-rw-r--r--yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto5
-rw-r--r--yt/yt_proto/yt/core/bus/proto/bus.proto4
25 files changed, 285 insertions, 66 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 22f1253e2e..4548577834 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -44,6 +44,8 @@
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
+#include <yt/yt/core/ytree/fluent.h>
+
#include <library/cpp/json/json_reader.h>
#include <util/generic/algorithm.h>
@@ -62,6 +64,32 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig)
+{
+ if (envConfig.empty()) {
+ return {};
+ }
+ return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig));
+}
+
+void ApplyProxyUrlAliasingRules(TString& url)
+{
+ static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
+ if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
+ url = ruleIt->second;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
TClientBase::TClientBase(
const TClientContext& context,
const TTransactionId& transactionId,
@@ -1333,8 +1361,10 @@ TClientPtr CreateClientImpl(
context.ProxyAddress = options.ProxyAddress_;
context.ServerName = serverName;
+
if (context.ServerName.find('.') == TString::npos &&
- context.ServerName.find(':') == TString::npos)
+ context.ServerName.find(':') == TString::npos &&
+ context.ServerName.find("localhost") == TString::npos)
{
context.ServerName += ".yt.yandex.net";
}
@@ -1419,6 +1449,8 @@ IClientPtr CreateClientFromEnv(const TCreateClientOptions& options)
ythrow yexception() << "YT_PROXY is not set";
}
+ NDetail::ApplyProxyUrlAliasingRules(serverName);
+
return NDetail::CreateClientImpl(serverName, options);
}
diff --git a/yt/yt/client/api/internal_client.cpp b/yt/yt/client/api/internal_client.cpp
index 2fb7da7506..8c0e2f4ee5 100644
--- a/yt/yt/client/api/internal_client.cpp
+++ b/yt/yt/client/api/internal_client.cpp
@@ -8,18 +8,26 @@ void TSerializableHunkDescriptor::Register(TRegistrar registrar)
{
registrar.BaseClassParameter("chunk_id", &TThis::ChunkId);
registrar.BaseClassParameter("erasure_codec", &TThis::ErasureCodec)
- .Optional();
+ .Default(NErasure::ECodec::None);
registrar.BaseClassParameter("block_index", &TThis::BlockIndex);
registrar.BaseClassParameter("block_offset", &TThis::BlockOffset);
registrar.BaseClassParameter("block_size", &TThis::BlockSize)
- .Optional();
+ .Default(std::nullopt);
registrar.BaseClassParameter("length", &TThis::Length);
}
-TSerializableHunkDescriptor::TSerializableHunkDescriptor(const THunkDescriptor& descriptor)
- : THunkDescriptor(descriptor)
+TSerializableHunkDescriptorPtr CreateSerializableHunkDescriptor(const THunkDescriptor& descriptor)
{
- ::NYT::NYTree::TYsonStructRegistry::Get()->InitializeStruct(this);
+ auto serializableDescriptor = New<TSerializableHunkDescriptor>();
+ serializableDescriptor->ChunkId = descriptor.ChunkId;
+ serializableDescriptor->ErasureCodec = descriptor.ErasureCodec;
+ serializableDescriptor->BlockIndex = descriptor.BlockIndex;
+ serializableDescriptor->BlockOffset = descriptor.BlockOffset;
+ serializableDescriptor->BlockSize = descriptor.BlockSize;
+ serializableDescriptor->Length = descriptor.Length;
+ serializableDescriptor->Postprocess();
+
+ return serializableDescriptor;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h
index 01fa29340b..61a2f300ab 100644
--- a/yt/yt/client/api/internal_client.h
+++ b/yt/yt/client/api/internal_client.h
@@ -29,8 +29,6 @@ class TSerializableHunkDescriptor
, public NYTree::TYsonStruct
{
public:
- TSerializableHunkDescriptor(const THunkDescriptor& descriptor);
-
REGISTER_YSON_STRUCT(TSerializableHunkDescriptor);
static void Register(TRegistrar registrar);
@@ -38,6 +36,8 @@ public:
using TSerializableHunkDescriptorPtr = TIntrusivePtr<TSerializableHunkDescriptor>;
+TSerializableHunkDescriptorPtr CreateSerializableHunkDescriptor(const THunkDescriptor& descriptor);
+
////////////////////////////////////////////////////////////////////////////////
struct TReadHunksOptions
diff --git a/yt/yt/client/driver/internal_commands.cpp b/yt/yt/client/driver/internal_commands.cpp
index 889737f91a..a69011714e 100644
--- a/yt/yt/client/driver/internal_commands.cpp
+++ b/yt/yt/client/driver/internal_commands.cpp
@@ -75,7 +75,7 @@ void TWriteHunksCommand::DoExecute(ICommandContextPtr context)
std::vector<NApi::TSerializableHunkDescriptorPtr> serializableDescriptors;
serializableDescriptors.reserve(descriptors.size());
for (const auto& descriptor : descriptors) {
- serializableDescriptors.push_back(New<NApi::TSerializableHunkDescriptor>(descriptor));
+ serializableDescriptors.push_back(CreateSerializableHunkDescriptor(descriptor));
}
context->ProduceOutputValue(BuildYsonStringFluently()
diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp
index 049727b860..6fe8a08856 100644
--- a/yt/yt/core/bus/tcp/client.cpp
+++ b/yt/yt/core/bus/tcp/client.cpp
@@ -175,16 +175,18 @@ public:
auto id = TConnectionId::Create();
- YT_LOG_DEBUG("Connecting to server (Address: %v, ConnectionId: %v, MultiplexingBand: %v, EncryptionMode: %v)",
+ YT_LOG_DEBUG("Connecting to server (Address: %v, ConnectionId: %v, MultiplexingBand: %v, EncryptionMode: %v, VerificationMode: %v)",
EndpointDescription_,
id,
options.MultiplexingBand,
- Config_->EncryptionMode);
+ Config_->EncryptionMode,
+ Config_->VerificationMode);
auto endpointAttributes = ConvertToAttributes(BuildYsonStringFluently()
.BeginMap()
.Items(*EndpointAttributes_)
.Item("connection_id").Value(id)
+ .Item("connection_type").Value(EConnectionType::Client)
.EndMap());
auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller();
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index 84e0f050e7..a235ec6290 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -121,11 +121,12 @@ TTcpConnection::TTcpConnection(
, UnixDomainSocketPath_(unixDomainSocketPath)
, Handler_(std::move(handler))
, Poller_(std::move(poller))
- , LoggingTag_(Format("ConnectionId: %v, ConnectionType: %v, RemoteAddress: %v, EncryptionMode: %v",
+ , LoggingTag_(Format("ConnectionId: %v, ConnectionType: %v, RemoteAddress: %v, EncryptionMode: %v, VerificationMode: %v",
Id_,
ConnectionType_,
EndpointDescription_,
- Config_->EncryptionMode))
+ Config_->EncryptionMode,
+ Config_->VerificationMode))
, Logger(BusLogger.WithTag(LoggingTag_.c_str()))
, GenerateChecksums_(Config_->GenerateChecksums)
, Socket_(socket)
@@ -287,11 +288,12 @@ void TTcpConnection::TryEnqueueHandshake()
}
NProto::THandshake handshake;
- ToProto(handshake.mutable_foreign_connection_id(), Id_);
+ ToProto(handshake.mutable_connection_id(), Id_);
if (ConnectionType_ == EConnectionType::Client) {
handshake.set_multiplexing_band(ToProto<int>(MultiplexingBand_.load()));
}
- handshake.set_encryption_mode(static_cast<int>(EncryptionMode_));
+ handshake.set_encryption_mode(ToProto<int>(EncryptionMode_));
+ handshake.set_verification_mode(ToProto<int>(VerificationMode_));
auto message = MakeHandshakeMessage(handshake);
auto messageSize = GetByteSize(message);
@@ -500,6 +502,9 @@ void TTcpConnection::Abort(const TError& error)
// Construct a detailed error.
YT_VERIFY(!error.IsOK());
auto detailedError = error << *EndpointAttributes_;
+ if (PeerAttributes_) {
+ detailedError <<= *PeerAttributes_;
+ }
{
auto guard = Guard(Lock_);
@@ -752,6 +757,9 @@ void TTcpConnection::Terminate(const TError& error)
// Construct a detailed error.
YT_VERIFY(!error.IsOK());
auto detailedError = error << *EndpointAttributes_;
+ if (PeerAttributes_) {
+ detailedError <<= *PeerAttributes_;
+ }
auto guard = Guard(Lock_);
@@ -1005,7 +1013,7 @@ ssize_t TTcpConnection::DoReadSocket(char* buffer, size_t size)
case ESslState::Established: {
auto result = SSL_read(Ssl_.get(), buffer, size);
if (PendingSslHandshake_ && result > 0) {
- YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_read (VerificationMode: %v)", VerificationMode_);
+ YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_read");
PendingSslHandshake_ = false;
ReadyPromise_.TrySet();
}
@@ -1178,10 +1186,18 @@ bool TTcpConnection::OnHandshakePacketReceived()
? std::make_optional(FromProto<EMultiplexingBand>(handshake.multiplexing_band()))
: std::nullopt;
- YT_LOG_DEBUG("Handshake received (ForeignConnectionId: %v, MultiplexingBand: %v, ForeignEncryptionMode: %v)",
- FromProto<TConnectionId>(handshake.foreign_connection_id()),
- optionalMultiplexingBand,
- static_cast<EEncryptionMode>(handshake.encryption_mode()));
+ PeerAttributes_ = ConvertToAttributes(BuildYsonStringFluently()
+ .BeginMap()
+ .Item("peer_connection_id").Value(FromProto<TConnectionId>(handshake.connection_id()))
+ .Item("peer_encryption_mode").Value(FromProto<EEncryptionMode>(handshake.encryption_mode()))
+ .Item("peer_verification_mode").Value(FromProto<EVerificationMode>(handshake.verification_mode()))
+ .EndMap());
+
+ YT_LOG_DEBUG("Handshake received (PeerConnectionId: %v, PeerEncryptionMode: %v, PeerVerificationMode: %v, MultiplexingBand: %v)",
+ PeerAttributes_->Get<TString>("peer_connection_id"),
+ PeerAttributes_->Get<TString>("peer_encryption_mode"),
+ PeerAttributes_->Get<TString>("peer_verification_mode"),
+ optionalMultiplexingBand);
if (ConnectionType_ == EConnectionType::Server && optionalMultiplexingBand) {
auto guard = Guard(Lock_);
@@ -1197,7 +1213,7 @@ bool TTcpConnection::OnHandshakePacketReceived()
TryEnqueueHandshake();
}
- auto otherEncryptionMode = handshake.has_encryption_mode() ? static_cast<EEncryptionMode>(handshake.encryption_mode()) : EEncryptionMode::Disabled;
+ auto otherEncryptionMode = handshake.has_encryption_mode() ? FromProto<EEncryptionMode>(handshake.encryption_mode()) : EEncryptionMode::Disabled;
if (EncryptionMode_ == EEncryptionMode::Required || otherEncryptionMode == EEncryptionMode::Required) {
if (EncryptionMode_ == EEncryptionMode::Disabled || otherEncryptionMode == EEncryptionMode::Disabled) {
@@ -1299,7 +1315,7 @@ ssize_t TTcpConnection::DoWriteFragments(const std::vector<struct iovec>& vec)
YT_ASSERT(vec.size() == 1);
auto result = SSL_write(Ssl_.get(), vec[0].iov_base, vec[0].iov_len);
if (PendingSslHandshake_ && result > 0) {
- YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_write (VerificationMode: %v)", VerificationMode_);
+ YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_write");
PendingSslHandshake_ = false;
ReadyPromise_.TrySet();
}
@@ -1857,7 +1873,7 @@ bool TTcpConnection::DoSslHandshake()
auto result = SSL_do_handshake(Ssl_.get());
switch (SSL_get_error(Ssl_.get(), result)) {
case SSL_ERROR_NONE:
- YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_do_handshake (VerificationMode %v)", VerificationMode_);
+ YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_do_handshake");
MaxFragmentsPerWrite_ = 1;
SslState_ = ESslState::Established;
ReadyPromise_.TrySet();
@@ -1918,7 +1934,7 @@ void TTcpConnection::TryEstablishSslSession()
return;
}
- YT_LOG_DEBUG("Starting TLS/SSL connection (VerificationMode: %v)", VerificationMode_);
+ YT_LOG_DEBUG("Starting TLS/SSL connection");
if (Config_->LoadCertsFromBusCertsDirectory && !TTcpDispatcher::TImpl::Get()->GetBusCertsDirectoryPath()) {
Abort(TError(NBus::EErrorCode::SslError, "bus_certs_directory_path is not set in tcp_dispatcher config"));
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index 39c5dea4d2..0f5d758152 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -277,6 +277,8 @@ private:
const EEncryptionMode EncryptionMode_;
const EVerificationMode VerificationMode_;
+ NYTree::IAttributeDictionaryPtr PeerAttributes_;
+
size_t MaxFragmentsPerWrite_ = 256;
void Open();
diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp
index a79a0c9458..f2e9ae036a 100644
--- a/yt/yt/core/crypto/tls.cpp
+++ b/yt/yt/core/crypto/tls.cpp
@@ -54,6 +54,24 @@ struct TSslContextImpl
TSslContextImpl()
{
+ Reset();
+ }
+
+ ~TSslContextImpl()
+ {
+ if (Ctx) {
+ SSL_CTX_free(Ctx);
+ }
+ if (ActiveCtx_) {
+ SSL_CTX_free(ActiveCtx_);
+ }
+ }
+
+ void Reset()
+ {
+ if (Ctx) {
+ SSL_CTX_free(Ctx);
+ }
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
Ctx = SSL_CTX_new(TLS_method());
if (!Ctx) {
@@ -77,12 +95,37 @@ struct TSslContextImpl
#endif
}
- ~TSslContextImpl()
+ void Commit()
{
- if (Ctx) {
- SSL_CTX_free(Ctx);
+ SSL_CTX* oldCtx;
+ YT_ASSERT(Ctx);
+ {
+ auto guard = WriterGuard(Lock_);
+ oldCtx = ActiveCtx_;
+ ActiveCtx_ = Ctx;
+ Ctx = nullptr;
+ }
+ if (oldCtx) {
+ SSL_CTX_free(oldCtx);
}
}
+
+ SSL* NewSsl()
+ {
+ auto guard = ReaderGuard(Lock_);
+ YT_ASSERT(ActiveCtx_);
+ return SSL_new(ActiveCtx_);
+ }
+
+ bool IsActive(const SSL* ssl)
+ {
+ auto guard = ReaderGuard(Lock_);
+ return SSL_get_SSL_CTX(ssl) == ActiveCtx_;
+ }
+
+private:
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Lock_);
+ SSL_CTX* ActiveCtx_ = nullptr;
};
DEFINE_REFCOUNTED_TYPE(TSslContextImpl)
@@ -104,7 +147,7 @@ public:
, Invoker_(CreateSerializedInvoker(poller->GetInvoker(), "crypto_tls_connection"))
, Underlying_(std::move(connection))
{
- Ssl_ = SSL_new(Ctx_->Ctx);
+ Ssl_ = Ctx_->NewSsl();
if (!Ssl_) {
THROW_ERROR_EXCEPTION("SSL_new failed")
<< GetLastSslError();
@@ -124,6 +167,11 @@ public:
OutputBuffer_ = TSharedMutableRef::Allocate<TTlsBufferTag>(TlsBufferSize);
}
+ void SetHost(const TString& host)
+ {
+ SSL_set_tlsext_host_name(Ssl_, host.c_str());
+ }
+
~TTlsConnection()
{
SSL_free(Ssl_);
@@ -496,12 +544,16 @@ public:
, Poller_(std::move(poller))
{ }
- TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote) override
+ TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote, TRemoteContextPtr context) override
{
- return Underlying_->Dial(remote).Apply(BIND([ctx = Ctx_, poller = Poller_] (const IConnectionPtr& underlying) -> IConnectionPtr {
- auto connection = New<TTlsConnection>(ctx, poller, underlying);
- connection->StartClient();
- return connection;
+ return Underlying_->Dial(remote)
+ .Apply(BIND([ctx = Ctx_, poller = Poller_, context = std::move(context)](const IConnectionPtr& underlying) -> IConnectionPtr {
+ auto connection = New<TTlsConnection>(ctx, poller, underlying);
+ if (context != nullptr && context->Host != std::nullopt) {
+ connection->SetHost(*(context->Host));
+ }
+ connection->StartClient();
+ return connection;
}));
}
@@ -558,6 +610,22 @@ TSslContext::TSslContext()
: Impl_(New<TSslContextImpl>())
{ }
+void TSslContext::Reset()
+{
+ Impl_->Reset();
+}
+
+void TSslContext::Commit(TInstant time)
+{
+ CommitTime_ = time;
+ Impl_->Commit();
+}
+
+TInstant TSslContext::GetCommitTime()
+{
+ return CommitTime_;
+}
+
void TSslContext::UseBuiltinOpenSslX509Store()
{
SSL_CTX_set_cert_store(Impl_->Ctx, GetBuiltinOpenSslX509Store().Release());
diff --git a/yt/yt/core/crypto/tls.h b/yt/yt/core/crypto/tls.h
index 5844d3f021..bb9f85503f 100644
--- a/yt/yt/core/crypto/tls.h
+++ b/yt/yt/core/crypto/tls.h
@@ -20,6 +20,10 @@ class TSslContext
public:
TSslContext();
+ void Reset();
+ void Commit(TInstant time = TInstant::Zero());
+ TInstant GetCommitTime();
+
void UseBuiltinOpenSslX509Store();
void SetCipherList(const TString& list);
@@ -48,6 +52,7 @@ public:
private:
const TIntrusivePtr<TSslContextImpl> Impl_;
+ TInstant CommitTime_;
};
DEFINE_REFCOUNTED_TYPE(TSslContext)
diff --git a/yt/yt/core/crypto/unittests/tls_ut.cpp b/yt/yt/core/crypto/unittests/tls_ut.cpp
index 2981b24350..288994abaf 100644
--- a/yt/yt/core/crypto/unittests/tls_ut.cpp
+++ b/yt/yt/core/crypto/unittests/tls_ut.cpp
@@ -37,6 +37,7 @@ public:
Context->AddCertificate(TestCertificate);
Context->AddPrivateKey(TestCertificate);
+ Context->Commit();
Poller = CreateThreadPoolPoller(2, "TlsTest");
}
@@ -78,7 +79,10 @@ TEST_F(TTlsTest, SimplePingPong)
config->SetDefaults();
auto dialer = Context->CreateDialer(config, Poller, NetLogger);
- auto asyncFirstSide = dialer->Dial(listener->GetAddress());
+ auto context = New<TRemoteContext>();
+ context->Host = "localhost";
+
+ auto asyncFirstSide = dialer->Dial(listener->GetAddress(), context);
auto asyncSecondSide = listener->Accept();
auto firstSide = asyncFirstSide.Get().ValueOrThrow();
@@ -106,6 +110,7 @@ TEST(TTlsTestWithoutFixture, LoadCertificateChain)
auto grpcLock = NRpc::NGrpc::TDispatcher::Get()->GetLibraryLock();
auto context = New<TSslContext>();
context->AddCertificateChain(TestCertificateChain);
+ context->Commit();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/http/client.cpp b/yt/yt/core/http/client.cpp
index 49ab4286f7..b6eb5181be 100644
--- a/yt/yt/core/http/client.cpp
+++ b/yt/yt/core/http/client.cpp
@@ -126,11 +126,15 @@ private:
return TNetworkAddress(address, parsedUrl.Port.value_or(GetDefaultPort(parsedUrl)));
}
- std::pair<THttpOutputPtr, THttpInputPtr> OpenHttp(const TNetworkAddress& address)
+ std::pair<THttpOutputPtr, THttpInputPtr> OpenHttp(const TUrlRef& urlRef)
{
+ auto context = New<TRemoteContext>();
+ context->Host = urlRef.Host;
+ auto address = GetAddress(urlRef);
+
// TODO(aleexfi): Enable connection pool by default
if (Config_->MaxIdleConnections == 0) {
- auto connection = WaitFor(Dialer_->Dial(address)).ValueOrThrow();
+ auto connection = WaitFor(Dialer_->Dial(address, std::move(context))).ValueOrThrow();
auto input = New<THttpInput>(
connection,
@@ -146,7 +150,7 @@ private:
return {std::move(output), std::move(input)};
} else {
- auto connection = WaitFor(ConnectionPool_->Connect(address)).ValueOrThrow();
+ auto connection = WaitFor(ConnectionPool_->Connect(address, std::move(context))).ValueOrThrow();
auto reuseSharedState = New<NDetail::TReusableConnectionState>(connection, ConnectionPool_);
@@ -239,8 +243,8 @@ private:
THttpInputPtr response;
auto urlRef = ParseUrl(url);
- auto address = GetAddress(urlRef);
- std::tie(request, response) = OpenHttp(address);
+
+ std::tie(request, response) = OpenHttp(urlRef);
request->SetHost(urlRef.Host, urlRef.PortStr);
if (headers) {
diff --git a/yt/yt/core/http/connection_pool.cpp b/yt/yt/core/http/connection_pool.cpp
index 9cb971b8ab..3684bda591 100644
--- a/yt/yt/core/http/connection_pool.cpp
+++ b/yt/yt/core/http/connection_pool.cpp
@@ -52,7 +52,9 @@ TConnectionPool::~TConnectionPool()
YT_UNUSED_FUTURE(ExpiredConnectionsCollector_->Stop());
}
-TFuture<IConnectionPtr> TConnectionPool::Connect(const TNetworkAddress& address)
+TFuture<IConnectionPtr> TConnectionPool::Connect(
+ const TNetworkAddress& address,
+ TRemoteContextPtr context)
{
{
auto guard = Guard(SpinLock_);
@@ -64,7 +66,7 @@ TFuture<IConnectionPtr> TConnectionPool::Connect(const TNetworkAddress& address)
}
}
- return Dialer_->Dial(address);
+ return Dialer_->Dial(address, std::move(context));
}
void TConnectionPool::Release(const IConnectionPtr& connection)
diff --git a/yt/yt/core/http/connection_pool.h b/yt/yt/core/http/connection_pool.h
index b7c06245bd..026626967f 100644
--- a/yt/yt/core/http/connection_pool.h
+++ b/yt/yt/core/http/connection_pool.h
@@ -38,7 +38,9 @@ public:
~TConnectionPool();
- TFuture<NNet::IConnectionPtr> Connect(const NNet::TNetworkAddress& address);
+ TFuture<NNet::IConnectionPtr> Connect(
+ const NNet::TNetworkAddress& address,
+ NNet::TRemoteContextPtr context = nullptr);
void Release(const NNet::IConnectionPtr& connection);
diff --git a/yt/yt/core/https/client.cpp b/yt/yt/core/https/client.cpp
index 2f4b415b89..8ce559a583 100644
--- a/yt/yt/core/https/client.cpp
+++ b/yt/yt/core/https/client.cpp
@@ -120,6 +120,7 @@ IClientPtr CreateClient(
} else {
sslContext->UseBuiltinOpenSslX509Store();
}
+ sslContext->Commit();
auto tlsDialer = sslContext->CreateDialer(
New<TDialerConfig>(),
diff --git a/yt/yt/core/https/config.cpp b/yt/yt/core/https/config.cpp
index 49c59d5258..c41fde9624 100644
--- a/yt/yt/core/https/config.cpp
+++ b/yt/yt/core/https/config.cpp
@@ -10,6 +10,8 @@ void TServerCredentialsConfig::Register(TRegistrar registrar)
.Optional();
registrar.Parameter("cert_chain", &TThis::CertChain)
.Optional();
+ registrar.Parameter("update_period", &TThis::UpdatePeriod)
+ .Optional();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/https/config.h b/yt/yt/core/https/config.h
index fe4decc26e..eb6f010008 100644
--- a/yt/yt/core/https/config.h
+++ b/yt/yt/core/https/config.h
@@ -16,6 +16,7 @@ class TServerCredentialsConfig
public:
NCrypto::TPemBlobConfigPtr PrivateKey;
NCrypto::TPemBlobConfigPtr CertChain;
+ TDuration UpdatePeriod;
REGISTER_YSON_STRUCT(TServerCredentialsConfig);
diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp
index e53f132c04..96fd155390 100644
--- a/yt/yt/core/https/server.cpp
+++ b/yt/yt/core/https/server.cpp
@@ -2,15 +2,23 @@
#include "config.h"
#include <yt/yt/core/http/server.h>
+#include <yt/yt/core/http/private.h>
#include <yt/yt/core/crypto/tls.h>
+#include <yt/yt/core/logging/log.h>
+
+#include <yt/yt/core/misc/fs.h>
+
#include <yt/yt/core/net/address.h>
#include <yt/yt/core/concurrency/poller.h>
+#include <yt/yt/core/concurrency/periodic_executor.h>
namespace NYT::NHttps {
+static const auto& Logger = NHttp::HttpLogger;
+
using namespace NNet;
using namespace NHttp;
using namespace NCrypto;
@@ -22,8 +30,9 @@ class TServer
: public IServer
{
public:
- explicit TServer(IServerPtr underlying)
+ explicit TServer(IServerPtr underlying, TPeriodicExecutorPtr certificateUpdater)
: Underlying_(std::move(underlying))
+ , CertificateUpdater_(certificateUpdater)
{ }
void AddHandler(
@@ -42,12 +51,18 @@ public:
void Start() override
{
Underlying_->Start();
+ if (CertificateUpdater_) {
+ CertificateUpdater_->Start();
+ }
}
//! Stops the server.
void Stop() override
{
Underlying_->Stop();
+ if (CertificateUpdater_) {
+ YT_UNUSED_FUTURE(CertificateUpdater_->Stop());
+ }
}
void SetPathMatcher(const IRequestPathMatcherPtr& matcher) override
@@ -62,28 +77,66 @@ public:
private:
const IServerPtr Underlying_;
+ const TPeriodicExecutorPtr CertificateUpdater_;
};
-IServerPtr CreateServer(
- const TServerConfigPtr& config,
- const IPollerPtr& poller,
- const IPollerPtr& acceptor)
+static void ApplySslConfig(const TSslContextPtr& sslContext, const TServerCredentialsConfigPtr& sslConfig)
{
- auto sslContext = New<TSslContext>();
- if (config->Credentials->CertChain->FileName) {
- sslContext->AddCertificateChainFromFile(*config->Credentials->CertChain->FileName);
- } else if (config->Credentials->CertChain->Value) {
- sslContext->AddCertificateChain(*config->Credentials->CertChain->Value);
+ if (sslConfig->CertChain->FileName) {
+ sslContext->AddCertificateChainFromFile(*sslConfig->CertChain->FileName);
+ } else if (sslConfig->CertChain->Value) {
+ sslContext->AddCertificateChain(*sslConfig->CertChain->Value);
} else {
YT_ABORT();
}
- if (config->Credentials->PrivateKey->FileName) {
- sslContext->AddPrivateKeyFromFile(*config->Credentials->PrivateKey->FileName);
- } else if (config->Credentials->PrivateKey->Value) {
- sslContext->AddPrivateKey(*config->Credentials->PrivateKey->Value);
+ if (sslConfig->PrivateKey->FileName) {
+ sslContext->AddPrivateKeyFromFile(*sslConfig->PrivateKey->FileName);
+ } else if (sslConfig->PrivateKey->Value) {
+ sslContext->AddPrivateKey(*sslConfig->PrivateKey->Value);
} else {
YT_ABORT();
}
+}
+
+IServerPtr CreateServer(
+ const TServerConfigPtr& config,
+ const IPollerPtr& poller,
+ const IPollerPtr& acceptor)
+{
+ auto sslContext = New<TSslContext>();
+ ApplySslConfig(sslContext, config->Credentials);
+ sslContext->Commit();
+
+ auto sslConfig = config->Credentials;
+ TPeriodicExecutorPtr certificateUpdater;
+ if (sslConfig->UpdatePeriod &&
+ sslConfig->CertChain->FileName &&
+ sslConfig->PrivateKey->FileName)
+ {
+ certificateUpdater = New<TPeriodicExecutor>(
+ poller->GetInvoker(),
+ BIND([=, serverName = config->ServerName] {
+ try {
+ auto modificationTime = Max(
+ NFS::GetPathStatistics(*sslConfig->CertChain->FileName).ModificationTime,
+ NFS::GetPathStatistics(*sslConfig->PrivateKey->FileName).ModificationTime);
+
+ // Detect fresh and stable updates.
+ if (modificationTime > sslContext->GetCommitTime() &&
+ modificationTime + sslConfig->UpdatePeriod <= TInstant::Now())
+ {
+ YT_LOG_INFO("Updating TLS certificates (ServerName: %v, ModificationTime: %v)", serverName, modificationTime);
+ sslContext->Reset();
+ ApplySslConfig(sslContext, sslConfig);
+ sslContext->Commit(modificationTime);
+ YT_LOG_INFO("TLS certificates updated (ServerName: %v)", serverName);
+ }
+ } catch (const std::exception& ex) {
+ YT_LOG_WARNING(ex, "Unexpected exception while updating TLS certificates (ServerName: %v)", serverName);
+ }
+ }),
+ sslConfig->UpdatePeriod);
+ }
auto address = TNetworkAddress::CreateIPv6Any(config->Port);
auto tlsListener = sslContext->CreateListener(address, poller, acceptor);
@@ -92,7 +145,7 @@ IServerPtr CreateServer(
configCopy->IsHttps = true;
auto httpServer = NHttp::CreateServer(configCopy, tlsListener, poller, acceptor);
- return New<TServer>(std::move(httpServer));
+ return New<TServer>(std::move(httpServer), std::move(certificateUpdater));
}
IServerPtr CreateServer(const TServerConfigPtr& config, const IPollerPtr& poller)
diff --git a/yt/yt/core/net/dialer.cpp b/yt/yt/core/net/dialer.cpp
index 1a23091b91..c27489d23b 100644
--- a/yt/yt/core/net/dialer.cpp
+++ b/yt/yt/core/net/dialer.cpp
@@ -85,7 +85,9 @@ public:
, Poller_(std::move(poller))
{ }
- TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote) override
+ TFuture<IConnectionPtr> Dial(
+ const TNetworkAddress& remote,
+ TRemoteContextPtr /*context*/) override
{
auto session = New<TDialSession>(
remote,
diff --git a/yt/yt/core/net/dialer.h b/yt/yt/core/net/dialer.h
index b5a6ce3b2a..8dbd67cc43 100644
--- a/yt/yt/core/net/dialer.h
+++ b/yt/yt/core/net/dialer.h
@@ -14,12 +14,25 @@ namespace NYT::NNet {
////////////////////////////////////////////////////////////////////////////////
+//! Сontext that is passed to the Dialer.
+
+struct TRemoteContext
+ : public TRefCounted
+{
+ //! Host is used for TlsDialer.
+ std::optional<TString> Host;
+};
+
+DEFINE_REFCOUNTED_TYPE(TRemoteContext)
+
//! Dialer establishes connection to a (resolved) network address.
struct IDialer
: public virtual TRefCounted
{
- virtual TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote) = 0;
+ virtual TFuture<IConnectionPtr> Dial(
+ const TNetworkAddress& remote,
+ TRemoteContextPtr context = nullptr) = 0;
};
DEFINE_REFCOUNTED_TYPE(IDialer)
diff --git a/yt/yt/core/net/mock/dialer.cpp b/yt/yt/core/net/mock/dialer.cpp
index 6e30a087a5..4b3582b1b5 100644
--- a/yt/yt/core/net/mock/dialer.cpp
+++ b/yt/yt/core/net/mock/dialer.cpp
@@ -7,7 +7,7 @@ namespace NYT::NNet {
TDialerMock::TDialerMock(IDialerPtr underlying)
: Underlying_(std::move(underlying))
{
- ON_CALL(*this, Dial).WillByDefault([this] (const TNetworkAddress& address) {
+ ON_CALL(*this, Dial).WillByDefault([this] (const TNetworkAddress& address, TRemoteContextPtr /*context*/) {
return Underlying_->Dial(address);
});
}
diff --git a/yt/yt/core/net/mock/dialer.h b/yt/yt/core/net/mock/dialer.h
index 707a7b3dad..10a9237f09 100644
--- a/yt/yt/core/net/mock/dialer.h
+++ b/yt/yt/core/net/mock/dialer.h
@@ -14,7 +14,7 @@ class TDialerMock
public:
explicit TDialerMock(IDialerPtr underlying);
- MOCK_METHOD(TFuture<IConnectionPtr>, Dial, (const TNetworkAddress& remote), (override));
+ MOCK_METHOD(TFuture<IConnectionPtr>, Dial, (const TNetworkAddress& remote, TRemoteContextPtr context), (override));
private:
const IDialerPtr Underlying_;
diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h
index 05d3ec03de..42566ad7fa 100644
--- a/yt/yt/core/net/public.h
+++ b/yt/yt/core/net/public.h
@@ -17,6 +17,7 @@ DECLARE_REFCOUNTED_STRUCT(IPacketConnection)
DECLARE_REFCOUNTED_STRUCT(IConnectionReader)
DECLARE_REFCOUNTED_STRUCT(IConnectionWriter)
DECLARE_REFCOUNTED_STRUCT(IListener)
+DECLARE_REFCOUNTED_STRUCT(TRemoteContext)
DECLARE_REFCOUNTED_STRUCT(IDialer)
DECLARE_REFCOUNTED_STRUCT(IAsyncDialer)
DECLARE_REFCOUNTED_STRUCT(IAsyncDialerSession)
diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h
index 10b6fd5b06..64723698f2 100644
--- a/yt/yt/library/program/config.h
+++ b/yt/yt/library/program/config.h
@@ -29,7 +29,6 @@
#include <library/cpp/yt/stockpile/stockpile.h>
-
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto b/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto
index 874ec93151..e6aad0b3c0 100644
--- a/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto
+++ b/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto
@@ -146,11 +146,10 @@ message TStripedErasurePlacementExt
// Sizes of the original blocks.
repeated int64 block_sizes = 3;
- // Some of the input blocks can be padded to fit into the segment.
- repeated int64 block_padding_sizes = 5;
-
// Checksums of the original blocks.
repeated fixed64 block_checksums = 4;
+
+ reserved 5;
}
// TChunkMeta is stored in *.meta files on data nodes
diff --git a/yt/yt_proto/yt/core/bus/proto/bus.proto b/yt/yt_proto/yt/core/bus/proto/bus.proto
index daf9d5f083..96638368f5 100644
--- a/yt/yt_proto/yt/core/bus/proto/bus.proto
+++ b/yt/yt_proto/yt/core/bus/proto/bus.proto
@@ -4,10 +4,12 @@ import "yt_proto/yt/core/misc/proto/guid.proto";
message THandshake
{
- required NYT.NProto.TGuid foreign_connection_id = 1;
+ required NYT.NProto.TGuid connection_id = 1;
// Only passed from client to server.
optional int32 multiplexing_band = 2; // EMultiplexingBand
optional int32 encryption_mode = 3; // EEncryptionMode
+
+ optional int32 verification_mode = 4; // EVerificationMode
}