diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-27 16:27:23 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-27 17:00:48 +0300 |
commit | 6c35e9daf4dc86464e1a262236b8d3593e690ee5 (patch) | |
tree | 2727d5da08d12182c1a705ec7213a6b97f5da521 /yt | |
parent | 6fa64684d15341dd701bf1498ecafdd764d5f097 (diff) | |
download | ydb-6c35e9daf4dc86464e1a262236b8d3593e690ee5.tar.gz |
Intermediate changes
Diffstat (limited to 'yt')
25 files changed, 285 insertions, 66 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 22f1253e2e..4548577834 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -44,6 +44,8 @@ #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h> +#include <yt/yt/core/ytree/fluent.h> + #include <library/cpp/json/json_reader.h> #include <util/generic/algorithm.h> @@ -62,6 +64,32 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +THashMap<TString, TString> ParseProxyUrlAliasingRules(TString envConfig) +{ + if (envConfig.empty()) { + return {}; + } + return NYTree::ConvertTo<THashMap<TString, TString>>(NYson::TYsonString(envConfig)); +} + +void ApplyProxyUrlAliasingRules(TString& url) +{ + static auto rules = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG")); + if (auto ruleIt = rules.find(url); ruleIt != rules.end()) { + url = ruleIt->second; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + TClientBase::TClientBase( const TClientContext& context, const TTransactionId& transactionId, @@ -1333,8 +1361,10 @@ TClientPtr CreateClientImpl( context.ProxyAddress = options.ProxyAddress_; context.ServerName = serverName; + if (context.ServerName.find('.') == TString::npos && - context.ServerName.find(':') == TString::npos) + context.ServerName.find(':') == TString::npos && + context.ServerName.find("localhost") == TString::npos) { context.ServerName += ".yt.yandex.net"; } @@ -1419,6 +1449,8 @@ IClientPtr CreateClientFromEnv(const TCreateClientOptions& options) ythrow yexception() << "YT_PROXY is not set"; } + NDetail::ApplyProxyUrlAliasingRules(serverName); + return NDetail::CreateClientImpl(serverName, options); } diff --git a/yt/yt/client/api/internal_client.cpp b/yt/yt/client/api/internal_client.cpp index 2fb7da7506..8c0e2f4ee5 100644 --- a/yt/yt/client/api/internal_client.cpp +++ b/yt/yt/client/api/internal_client.cpp @@ -8,18 +8,26 @@ void TSerializableHunkDescriptor::Register(TRegistrar registrar) { registrar.BaseClassParameter("chunk_id", &TThis::ChunkId); registrar.BaseClassParameter("erasure_codec", &TThis::ErasureCodec) - .Optional(); + .Default(NErasure::ECodec::None); registrar.BaseClassParameter("block_index", &TThis::BlockIndex); registrar.BaseClassParameter("block_offset", &TThis::BlockOffset); registrar.BaseClassParameter("block_size", &TThis::BlockSize) - .Optional(); + .Default(std::nullopt); registrar.BaseClassParameter("length", &TThis::Length); } -TSerializableHunkDescriptor::TSerializableHunkDescriptor(const THunkDescriptor& descriptor) - : THunkDescriptor(descriptor) +TSerializableHunkDescriptorPtr CreateSerializableHunkDescriptor(const THunkDescriptor& descriptor) { - ::NYT::NYTree::TYsonStructRegistry::Get()->InitializeStruct(this); + auto serializableDescriptor = New<TSerializableHunkDescriptor>(); + serializableDescriptor->ChunkId = descriptor.ChunkId; + serializableDescriptor->ErasureCodec = descriptor.ErasureCodec; + serializableDescriptor->BlockIndex = descriptor.BlockIndex; + serializableDescriptor->BlockOffset = descriptor.BlockOffset; + serializableDescriptor->BlockSize = descriptor.BlockSize; + serializableDescriptor->Length = descriptor.Length; + serializableDescriptor->Postprocess(); + + return serializableDescriptor; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/internal_client.h b/yt/yt/client/api/internal_client.h index 01fa29340b..61a2f300ab 100644 --- a/yt/yt/client/api/internal_client.h +++ b/yt/yt/client/api/internal_client.h @@ -29,8 +29,6 @@ class TSerializableHunkDescriptor , public NYTree::TYsonStruct { public: - TSerializableHunkDescriptor(const THunkDescriptor& descriptor); - REGISTER_YSON_STRUCT(TSerializableHunkDescriptor); static void Register(TRegistrar registrar); @@ -38,6 +36,8 @@ public: using TSerializableHunkDescriptorPtr = TIntrusivePtr<TSerializableHunkDescriptor>; +TSerializableHunkDescriptorPtr CreateSerializableHunkDescriptor(const THunkDescriptor& descriptor); + //////////////////////////////////////////////////////////////////////////////// struct TReadHunksOptions diff --git a/yt/yt/client/driver/internal_commands.cpp b/yt/yt/client/driver/internal_commands.cpp index 889737f91a..a69011714e 100644 --- a/yt/yt/client/driver/internal_commands.cpp +++ b/yt/yt/client/driver/internal_commands.cpp @@ -75,7 +75,7 @@ void TWriteHunksCommand::DoExecute(ICommandContextPtr context) std::vector<NApi::TSerializableHunkDescriptorPtr> serializableDescriptors; serializableDescriptors.reserve(descriptors.size()); for (const auto& descriptor : descriptors) { - serializableDescriptors.push_back(New<NApi::TSerializableHunkDescriptor>(descriptor)); + serializableDescriptors.push_back(CreateSerializableHunkDescriptor(descriptor)); } context->ProduceOutputValue(BuildYsonStringFluently() diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp index 049727b860..6fe8a08856 100644 --- a/yt/yt/core/bus/tcp/client.cpp +++ b/yt/yt/core/bus/tcp/client.cpp @@ -175,16 +175,18 @@ public: auto id = TConnectionId::Create(); - YT_LOG_DEBUG("Connecting to server (Address: %v, ConnectionId: %v, MultiplexingBand: %v, EncryptionMode: %v)", + YT_LOG_DEBUG("Connecting to server (Address: %v, ConnectionId: %v, MultiplexingBand: %v, EncryptionMode: %v, VerificationMode: %v)", EndpointDescription_, id, options.MultiplexingBand, - Config_->EncryptionMode); + Config_->EncryptionMode, + Config_->VerificationMode); auto endpointAttributes = ConvertToAttributes(BuildYsonStringFluently() .BeginMap() .Items(*EndpointAttributes_) .Item("connection_id").Value(id) + .Item("connection_type").Value(EConnectionType::Client) .EndMap()); auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller(); diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp index 84e0f050e7..a235ec6290 100644 --- a/yt/yt/core/bus/tcp/connection.cpp +++ b/yt/yt/core/bus/tcp/connection.cpp @@ -121,11 +121,12 @@ TTcpConnection::TTcpConnection( , UnixDomainSocketPath_(unixDomainSocketPath) , Handler_(std::move(handler)) , Poller_(std::move(poller)) - , LoggingTag_(Format("ConnectionId: %v, ConnectionType: %v, RemoteAddress: %v, EncryptionMode: %v", + , LoggingTag_(Format("ConnectionId: %v, ConnectionType: %v, RemoteAddress: %v, EncryptionMode: %v, VerificationMode: %v", Id_, ConnectionType_, EndpointDescription_, - Config_->EncryptionMode)) + Config_->EncryptionMode, + Config_->VerificationMode)) , Logger(BusLogger.WithTag(LoggingTag_.c_str())) , GenerateChecksums_(Config_->GenerateChecksums) , Socket_(socket) @@ -287,11 +288,12 @@ void TTcpConnection::TryEnqueueHandshake() } NProto::THandshake handshake; - ToProto(handshake.mutable_foreign_connection_id(), Id_); + ToProto(handshake.mutable_connection_id(), Id_); if (ConnectionType_ == EConnectionType::Client) { handshake.set_multiplexing_band(ToProto<int>(MultiplexingBand_.load())); } - handshake.set_encryption_mode(static_cast<int>(EncryptionMode_)); + handshake.set_encryption_mode(ToProto<int>(EncryptionMode_)); + handshake.set_verification_mode(ToProto<int>(VerificationMode_)); auto message = MakeHandshakeMessage(handshake); auto messageSize = GetByteSize(message); @@ -500,6 +502,9 @@ void TTcpConnection::Abort(const TError& error) // Construct a detailed error. YT_VERIFY(!error.IsOK()); auto detailedError = error << *EndpointAttributes_; + if (PeerAttributes_) { + detailedError <<= *PeerAttributes_; + } { auto guard = Guard(Lock_); @@ -752,6 +757,9 @@ void TTcpConnection::Terminate(const TError& error) // Construct a detailed error. YT_VERIFY(!error.IsOK()); auto detailedError = error << *EndpointAttributes_; + if (PeerAttributes_) { + detailedError <<= *PeerAttributes_; + } auto guard = Guard(Lock_); @@ -1005,7 +1013,7 @@ ssize_t TTcpConnection::DoReadSocket(char* buffer, size_t size) case ESslState::Established: { auto result = SSL_read(Ssl_.get(), buffer, size); if (PendingSslHandshake_ && result > 0) { - YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_read (VerificationMode: %v)", VerificationMode_); + YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_read"); PendingSslHandshake_ = false; ReadyPromise_.TrySet(); } @@ -1178,10 +1186,18 @@ bool TTcpConnection::OnHandshakePacketReceived() ? std::make_optional(FromProto<EMultiplexingBand>(handshake.multiplexing_band())) : std::nullopt; - YT_LOG_DEBUG("Handshake received (ForeignConnectionId: %v, MultiplexingBand: %v, ForeignEncryptionMode: %v)", - FromProto<TConnectionId>(handshake.foreign_connection_id()), - optionalMultiplexingBand, - static_cast<EEncryptionMode>(handshake.encryption_mode())); + PeerAttributes_ = ConvertToAttributes(BuildYsonStringFluently() + .BeginMap() + .Item("peer_connection_id").Value(FromProto<TConnectionId>(handshake.connection_id())) + .Item("peer_encryption_mode").Value(FromProto<EEncryptionMode>(handshake.encryption_mode())) + .Item("peer_verification_mode").Value(FromProto<EVerificationMode>(handshake.verification_mode())) + .EndMap()); + + YT_LOG_DEBUG("Handshake received (PeerConnectionId: %v, PeerEncryptionMode: %v, PeerVerificationMode: %v, MultiplexingBand: %v)", + PeerAttributes_->Get<TString>("peer_connection_id"), + PeerAttributes_->Get<TString>("peer_encryption_mode"), + PeerAttributes_->Get<TString>("peer_verification_mode"), + optionalMultiplexingBand); if (ConnectionType_ == EConnectionType::Server && optionalMultiplexingBand) { auto guard = Guard(Lock_); @@ -1197,7 +1213,7 @@ bool TTcpConnection::OnHandshakePacketReceived() TryEnqueueHandshake(); } - auto otherEncryptionMode = handshake.has_encryption_mode() ? static_cast<EEncryptionMode>(handshake.encryption_mode()) : EEncryptionMode::Disabled; + auto otherEncryptionMode = handshake.has_encryption_mode() ? FromProto<EEncryptionMode>(handshake.encryption_mode()) : EEncryptionMode::Disabled; if (EncryptionMode_ == EEncryptionMode::Required || otherEncryptionMode == EEncryptionMode::Required) { if (EncryptionMode_ == EEncryptionMode::Disabled || otherEncryptionMode == EEncryptionMode::Disabled) { @@ -1299,7 +1315,7 @@ ssize_t TTcpConnection::DoWriteFragments(const std::vector<struct iovec>& vec) YT_ASSERT(vec.size() == 1); auto result = SSL_write(Ssl_.get(), vec[0].iov_base, vec[0].iov_len); if (PendingSslHandshake_ && result > 0) { - YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_write (VerificationMode: %v)", VerificationMode_); + YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_write"); PendingSslHandshake_ = false; ReadyPromise_.TrySet(); } @@ -1857,7 +1873,7 @@ bool TTcpConnection::DoSslHandshake() auto result = SSL_do_handshake(Ssl_.get()); switch (SSL_get_error(Ssl_.get(), result)) { case SSL_ERROR_NONE: - YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_do_handshake (VerificationMode %v)", VerificationMode_); + YT_LOG_DEBUG("TLS/SSL connection has been established by SSL_do_handshake"); MaxFragmentsPerWrite_ = 1; SslState_ = ESslState::Established; ReadyPromise_.TrySet(); @@ -1918,7 +1934,7 @@ void TTcpConnection::TryEstablishSslSession() return; } - YT_LOG_DEBUG("Starting TLS/SSL connection (VerificationMode: %v)", VerificationMode_); + YT_LOG_DEBUG("Starting TLS/SSL connection"); if (Config_->LoadCertsFromBusCertsDirectory && !TTcpDispatcher::TImpl::Get()->GetBusCertsDirectoryPath()) { Abort(TError(NBus::EErrorCode::SslError, "bus_certs_directory_path is not set in tcp_dispatcher config")); diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h index 39c5dea4d2..0f5d758152 100644 --- a/yt/yt/core/bus/tcp/connection.h +++ b/yt/yt/core/bus/tcp/connection.h @@ -277,6 +277,8 @@ private: const EEncryptionMode EncryptionMode_; const EVerificationMode VerificationMode_; + NYTree::IAttributeDictionaryPtr PeerAttributes_; + size_t MaxFragmentsPerWrite_ = 256; void Open(); diff --git a/yt/yt/core/crypto/tls.cpp b/yt/yt/core/crypto/tls.cpp index a79a0c9458..f2e9ae036a 100644 --- a/yt/yt/core/crypto/tls.cpp +++ b/yt/yt/core/crypto/tls.cpp @@ -54,6 +54,24 @@ struct TSslContextImpl TSslContextImpl() { + Reset(); + } + + ~TSslContextImpl() + { + if (Ctx) { + SSL_CTX_free(Ctx); + } + if (ActiveCtx_) { + SSL_CTX_free(ActiveCtx_); + } + } + + void Reset() + { + if (Ctx) { + SSL_CTX_free(Ctx); + } #if OPENSSL_VERSION_NUMBER >= 0x10100000L Ctx = SSL_CTX_new(TLS_method()); if (!Ctx) { @@ -77,12 +95,37 @@ struct TSslContextImpl #endif } - ~TSslContextImpl() + void Commit() { - if (Ctx) { - SSL_CTX_free(Ctx); + SSL_CTX* oldCtx; + YT_ASSERT(Ctx); + { + auto guard = WriterGuard(Lock_); + oldCtx = ActiveCtx_; + ActiveCtx_ = Ctx; + Ctx = nullptr; + } + if (oldCtx) { + SSL_CTX_free(oldCtx); } } + + SSL* NewSsl() + { + auto guard = ReaderGuard(Lock_); + YT_ASSERT(ActiveCtx_); + return SSL_new(ActiveCtx_); + } + + bool IsActive(const SSL* ssl) + { + auto guard = ReaderGuard(Lock_); + return SSL_get_SSL_CTX(ssl) == ActiveCtx_; + } + +private: + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, Lock_); + SSL_CTX* ActiveCtx_ = nullptr; }; DEFINE_REFCOUNTED_TYPE(TSslContextImpl) @@ -104,7 +147,7 @@ public: , Invoker_(CreateSerializedInvoker(poller->GetInvoker(), "crypto_tls_connection")) , Underlying_(std::move(connection)) { - Ssl_ = SSL_new(Ctx_->Ctx); + Ssl_ = Ctx_->NewSsl(); if (!Ssl_) { THROW_ERROR_EXCEPTION("SSL_new failed") << GetLastSslError(); @@ -124,6 +167,11 @@ public: OutputBuffer_ = TSharedMutableRef::Allocate<TTlsBufferTag>(TlsBufferSize); } + void SetHost(const TString& host) + { + SSL_set_tlsext_host_name(Ssl_, host.c_str()); + } + ~TTlsConnection() { SSL_free(Ssl_); @@ -496,12 +544,16 @@ public: , Poller_(std::move(poller)) { } - TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote) override + TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote, TRemoteContextPtr context) override { - return Underlying_->Dial(remote).Apply(BIND([ctx = Ctx_, poller = Poller_] (const IConnectionPtr& underlying) -> IConnectionPtr { - auto connection = New<TTlsConnection>(ctx, poller, underlying); - connection->StartClient(); - return connection; + return Underlying_->Dial(remote) + .Apply(BIND([ctx = Ctx_, poller = Poller_, context = std::move(context)](const IConnectionPtr& underlying) -> IConnectionPtr { + auto connection = New<TTlsConnection>(ctx, poller, underlying); + if (context != nullptr && context->Host != std::nullopt) { + connection->SetHost(*(context->Host)); + } + connection->StartClient(); + return connection; })); } @@ -558,6 +610,22 @@ TSslContext::TSslContext() : Impl_(New<TSslContextImpl>()) { } +void TSslContext::Reset() +{ + Impl_->Reset(); +} + +void TSslContext::Commit(TInstant time) +{ + CommitTime_ = time; + Impl_->Commit(); +} + +TInstant TSslContext::GetCommitTime() +{ + return CommitTime_; +} + void TSslContext::UseBuiltinOpenSslX509Store() { SSL_CTX_set_cert_store(Impl_->Ctx, GetBuiltinOpenSslX509Store().Release()); diff --git a/yt/yt/core/crypto/tls.h b/yt/yt/core/crypto/tls.h index 5844d3f021..bb9f85503f 100644 --- a/yt/yt/core/crypto/tls.h +++ b/yt/yt/core/crypto/tls.h @@ -20,6 +20,10 @@ class TSslContext public: TSslContext(); + void Reset(); + void Commit(TInstant time = TInstant::Zero()); + TInstant GetCommitTime(); + void UseBuiltinOpenSslX509Store(); void SetCipherList(const TString& list); @@ -48,6 +52,7 @@ public: private: const TIntrusivePtr<TSslContextImpl> Impl_; + TInstant CommitTime_; }; DEFINE_REFCOUNTED_TYPE(TSslContext) diff --git a/yt/yt/core/crypto/unittests/tls_ut.cpp b/yt/yt/core/crypto/unittests/tls_ut.cpp index 2981b24350..288994abaf 100644 --- a/yt/yt/core/crypto/unittests/tls_ut.cpp +++ b/yt/yt/core/crypto/unittests/tls_ut.cpp @@ -37,6 +37,7 @@ public: Context->AddCertificate(TestCertificate); Context->AddPrivateKey(TestCertificate); + Context->Commit(); Poller = CreateThreadPoolPoller(2, "TlsTest"); } @@ -78,7 +79,10 @@ TEST_F(TTlsTest, SimplePingPong) config->SetDefaults(); auto dialer = Context->CreateDialer(config, Poller, NetLogger); - auto asyncFirstSide = dialer->Dial(listener->GetAddress()); + auto context = New<TRemoteContext>(); + context->Host = "localhost"; + + auto asyncFirstSide = dialer->Dial(listener->GetAddress(), context); auto asyncSecondSide = listener->Accept(); auto firstSide = asyncFirstSide.Get().ValueOrThrow(); @@ -106,6 +110,7 @@ TEST(TTlsTestWithoutFixture, LoadCertificateChain) auto grpcLock = NRpc::NGrpc::TDispatcher::Get()->GetLibraryLock(); auto context = New<TSslContext>(); context->AddCertificateChain(TestCertificateChain); + context->Commit(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/http/client.cpp b/yt/yt/core/http/client.cpp index 49ab4286f7..b6eb5181be 100644 --- a/yt/yt/core/http/client.cpp +++ b/yt/yt/core/http/client.cpp @@ -126,11 +126,15 @@ private: return TNetworkAddress(address, parsedUrl.Port.value_or(GetDefaultPort(parsedUrl))); } - std::pair<THttpOutputPtr, THttpInputPtr> OpenHttp(const TNetworkAddress& address) + std::pair<THttpOutputPtr, THttpInputPtr> OpenHttp(const TUrlRef& urlRef) { + auto context = New<TRemoteContext>(); + context->Host = urlRef.Host; + auto address = GetAddress(urlRef); + // TODO(aleexfi): Enable connection pool by default if (Config_->MaxIdleConnections == 0) { - auto connection = WaitFor(Dialer_->Dial(address)).ValueOrThrow(); + auto connection = WaitFor(Dialer_->Dial(address, std::move(context))).ValueOrThrow(); auto input = New<THttpInput>( connection, @@ -146,7 +150,7 @@ private: return {std::move(output), std::move(input)}; } else { - auto connection = WaitFor(ConnectionPool_->Connect(address)).ValueOrThrow(); + auto connection = WaitFor(ConnectionPool_->Connect(address, std::move(context))).ValueOrThrow(); auto reuseSharedState = New<NDetail::TReusableConnectionState>(connection, ConnectionPool_); @@ -239,8 +243,8 @@ private: THttpInputPtr response; auto urlRef = ParseUrl(url); - auto address = GetAddress(urlRef); - std::tie(request, response) = OpenHttp(address); + + std::tie(request, response) = OpenHttp(urlRef); request->SetHost(urlRef.Host, urlRef.PortStr); if (headers) { diff --git a/yt/yt/core/http/connection_pool.cpp b/yt/yt/core/http/connection_pool.cpp index 9cb971b8ab..3684bda591 100644 --- a/yt/yt/core/http/connection_pool.cpp +++ b/yt/yt/core/http/connection_pool.cpp @@ -52,7 +52,9 @@ TConnectionPool::~TConnectionPool() YT_UNUSED_FUTURE(ExpiredConnectionsCollector_->Stop()); } -TFuture<IConnectionPtr> TConnectionPool::Connect(const TNetworkAddress& address) +TFuture<IConnectionPtr> TConnectionPool::Connect( + const TNetworkAddress& address, + TRemoteContextPtr context) { { auto guard = Guard(SpinLock_); @@ -64,7 +66,7 @@ TFuture<IConnectionPtr> TConnectionPool::Connect(const TNetworkAddress& address) } } - return Dialer_->Dial(address); + return Dialer_->Dial(address, std::move(context)); } void TConnectionPool::Release(const IConnectionPtr& connection) diff --git a/yt/yt/core/http/connection_pool.h b/yt/yt/core/http/connection_pool.h index b7c06245bd..026626967f 100644 --- a/yt/yt/core/http/connection_pool.h +++ b/yt/yt/core/http/connection_pool.h @@ -38,7 +38,9 @@ public: ~TConnectionPool(); - TFuture<NNet::IConnectionPtr> Connect(const NNet::TNetworkAddress& address); + TFuture<NNet::IConnectionPtr> Connect( + const NNet::TNetworkAddress& address, + NNet::TRemoteContextPtr context = nullptr); void Release(const NNet::IConnectionPtr& connection); diff --git a/yt/yt/core/https/client.cpp b/yt/yt/core/https/client.cpp index 2f4b415b89..8ce559a583 100644 --- a/yt/yt/core/https/client.cpp +++ b/yt/yt/core/https/client.cpp @@ -120,6 +120,7 @@ IClientPtr CreateClient( } else { sslContext->UseBuiltinOpenSslX509Store(); } + sslContext->Commit(); auto tlsDialer = sslContext->CreateDialer( New<TDialerConfig>(), diff --git a/yt/yt/core/https/config.cpp b/yt/yt/core/https/config.cpp index 49c59d5258..c41fde9624 100644 --- a/yt/yt/core/https/config.cpp +++ b/yt/yt/core/https/config.cpp @@ -10,6 +10,8 @@ void TServerCredentialsConfig::Register(TRegistrar registrar) .Optional(); registrar.Parameter("cert_chain", &TThis::CertChain) .Optional(); + registrar.Parameter("update_period", &TThis::UpdatePeriod) + .Optional(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/https/config.h b/yt/yt/core/https/config.h index fe4decc26e..eb6f010008 100644 --- a/yt/yt/core/https/config.h +++ b/yt/yt/core/https/config.h @@ -16,6 +16,7 @@ class TServerCredentialsConfig public: NCrypto::TPemBlobConfigPtr PrivateKey; NCrypto::TPemBlobConfigPtr CertChain; + TDuration UpdatePeriod; REGISTER_YSON_STRUCT(TServerCredentialsConfig); diff --git a/yt/yt/core/https/server.cpp b/yt/yt/core/https/server.cpp index e53f132c04..96fd155390 100644 --- a/yt/yt/core/https/server.cpp +++ b/yt/yt/core/https/server.cpp @@ -2,15 +2,23 @@ #include "config.h" #include <yt/yt/core/http/server.h> +#include <yt/yt/core/http/private.h> #include <yt/yt/core/crypto/tls.h> +#include <yt/yt/core/logging/log.h> + +#include <yt/yt/core/misc/fs.h> + #include <yt/yt/core/net/address.h> #include <yt/yt/core/concurrency/poller.h> +#include <yt/yt/core/concurrency/periodic_executor.h> namespace NYT::NHttps { +static const auto& Logger = NHttp::HttpLogger; + using namespace NNet; using namespace NHttp; using namespace NCrypto; @@ -22,8 +30,9 @@ class TServer : public IServer { public: - explicit TServer(IServerPtr underlying) + explicit TServer(IServerPtr underlying, TPeriodicExecutorPtr certificateUpdater) : Underlying_(std::move(underlying)) + , CertificateUpdater_(certificateUpdater) { } void AddHandler( @@ -42,12 +51,18 @@ public: void Start() override { Underlying_->Start(); + if (CertificateUpdater_) { + CertificateUpdater_->Start(); + } } //! Stops the server. void Stop() override { Underlying_->Stop(); + if (CertificateUpdater_) { + YT_UNUSED_FUTURE(CertificateUpdater_->Stop()); + } } void SetPathMatcher(const IRequestPathMatcherPtr& matcher) override @@ -62,28 +77,66 @@ public: private: const IServerPtr Underlying_; + const TPeriodicExecutorPtr CertificateUpdater_; }; -IServerPtr CreateServer( - const TServerConfigPtr& config, - const IPollerPtr& poller, - const IPollerPtr& acceptor) +static void ApplySslConfig(const TSslContextPtr& sslContext, const TServerCredentialsConfigPtr& sslConfig) { - auto sslContext = New<TSslContext>(); - if (config->Credentials->CertChain->FileName) { - sslContext->AddCertificateChainFromFile(*config->Credentials->CertChain->FileName); - } else if (config->Credentials->CertChain->Value) { - sslContext->AddCertificateChain(*config->Credentials->CertChain->Value); + if (sslConfig->CertChain->FileName) { + sslContext->AddCertificateChainFromFile(*sslConfig->CertChain->FileName); + } else if (sslConfig->CertChain->Value) { + sslContext->AddCertificateChain(*sslConfig->CertChain->Value); } else { YT_ABORT(); } - if (config->Credentials->PrivateKey->FileName) { - sslContext->AddPrivateKeyFromFile(*config->Credentials->PrivateKey->FileName); - } else if (config->Credentials->PrivateKey->Value) { - sslContext->AddPrivateKey(*config->Credentials->PrivateKey->Value); + if (sslConfig->PrivateKey->FileName) { + sslContext->AddPrivateKeyFromFile(*sslConfig->PrivateKey->FileName); + } else if (sslConfig->PrivateKey->Value) { + sslContext->AddPrivateKey(*sslConfig->PrivateKey->Value); } else { YT_ABORT(); } +} + +IServerPtr CreateServer( + const TServerConfigPtr& config, + const IPollerPtr& poller, + const IPollerPtr& acceptor) +{ + auto sslContext = New<TSslContext>(); + ApplySslConfig(sslContext, config->Credentials); + sslContext->Commit(); + + auto sslConfig = config->Credentials; + TPeriodicExecutorPtr certificateUpdater; + if (sslConfig->UpdatePeriod && + sslConfig->CertChain->FileName && + sslConfig->PrivateKey->FileName) + { + certificateUpdater = New<TPeriodicExecutor>( + poller->GetInvoker(), + BIND([=, serverName = config->ServerName] { + try { + auto modificationTime = Max( + NFS::GetPathStatistics(*sslConfig->CertChain->FileName).ModificationTime, + NFS::GetPathStatistics(*sslConfig->PrivateKey->FileName).ModificationTime); + + // Detect fresh and stable updates. + if (modificationTime > sslContext->GetCommitTime() && + modificationTime + sslConfig->UpdatePeriod <= TInstant::Now()) + { + YT_LOG_INFO("Updating TLS certificates (ServerName: %v, ModificationTime: %v)", serverName, modificationTime); + sslContext->Reset(); + ApplySslConfig(sslContext, sslConfig); + sslContext->Commit(modificationTime); + YT_LOG_INFO("TLS certificates updated (ServerName: %v)", serverName); + } + } catch (const std::exception& ex) { + YT_LOG_WARNING(ex, "Unexpected exception while updating TLS certificates (ServerName: %v)", serverName); + } + }), + sslConfig->UpdatePeriod); + } auto address = TNetworkAddress::CreateIPv6Any(config->Port); auto tlsListener = sslContext->CreateListener(address, poller, acceptor); @@ -92,7 +145,7 @@ IServerPtr CreateServer( configCopy->IsHttps = true; auto httpServer = NHttp::CreateServer(configCopy, tlsListener, poller, acceptor); - return New<TServer>(std::move(httpServer)); + return New<TServer>(std::move(httpServer), std::move(certificateUpdater)); } IServerPtr CreateServer(const TServerConfigPtr& config, const IPollerPtr& poller) diff --git a/yt/yt/core/net/dialer.cpp b/yt/yt/core/net/dialer.cpp index 1a23091b91..c27489d23b 100644 --- a/yt/yt/core/net/dialer.cpp +++ b/yt/yt/core/net/dialer.cpp @@ -85,7 +85,9 @@ public: , Poller_(std::move(poller)) { } - TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote) override + TFuture<IConnectionPtr> Dial( + const TNetworkAddress& remote, + TRemoteContextPtr /*context*/) override { auto session = New<TDialSession>( remote, diff --git a/yt/yt/core/net/dialer.h b/yt/yt/core/net/dialer.h index b5a6ce3b2a..8dbd67cc43 100644 --- a/yt/yt/core/net/dialer.h +++ b/yt/yt/core/net/dialer.h @@ -14,12 +14,25 @@ namespace NYT::NNet { //////////////////////////////////////////////////////////////////////////////// +//! Сontext that is passed to the Dialer. + +struct TRemoteContext + : public TRefCounted +{ + //! Host is used for TlsDialer. + std::optional<TString> Host; +}; + +DEFINE_REFCOUNTED_TYPE(TRemoteContext) + //! Dialer establishes connection to a (resolved) network address. struct IDialer : public virtual TRefCounted { - virtual TFuture<IConnectionPtr> Dial(const TNetworkAddress& remote) = 0; + virtual TFuture<IConnectionPtr> Dial( + const TNetworkAddress& remote, + TRemoteContextPtr context = nullptr) = 0; }; DEFINE_REFCOUNTED_TYPE(IDialer) diff --git a/yt/yt/core/net/mock/dialer.cpp b/yt/yt/core/net/mock/dialer.cpp index 6e30a087a5..4b3582b1b5 100644 --- a/yt/yt/core/net/mock/dialer.cpp +++ b/yt/yt/core/net/mock/dialer.cpp @@ -7,7 +7,7 @@ namespace NYT::NNet { TDialerMock::TDialerMock(IDialerPtr underlying) : Underlying_(std::move(underlying)) { - ON_CALL(*this, Dial).WillByDefault([this] (const TNetworkAddress& address) { + ON_CALL(*this, Dial).WillByDefault([this] (const TNetworkAddress& address, TRemoteContextPtr /*context*/) { return Underlying_->Dial(address); }); } diff --git a/yt/yt/core/net/mock/dialer.h b/yt/yt/core/net/mock/dialer.h index 707a7b3dad..10a9237f09 100644 --- a/yt/yt/core/net/mock/dialer.h +++ b/yt/yt/core/net/mock/dialer.h @@ -14,7 +14,7 @@ class TDialerMock public: explicit TDialerMock(IDialerPtr underlying); - MOCK_METHOD(TFuture<IConnectionPtr>, Dial, (const TNetworkAddress& remote), (override)); + MOCK_METHOD(TFuture<IConnectionPtr>, Dial, (const TNetworkAddress& remote, TRemoteContextPtr context), (override)); private: const IDialerPtr Underlying_; diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h index 05d3ec03de..42566ad7fa 100644 --- a/yt/yt/core/net/public.h +++ b/yt/yt/core/net/public.h @@ -17,6 +17,7 @@ DECLARE_REFCOUNTED_STRUCT(IPacketConnection) DECLARE_REFCOUNTED_STRUCT(IConnectionReader) DECLARE_REFCOUNTED_STRUCT(IConnectionWriter) DECLARE_REFCOUNTED_STRUCT(IListener) +DECLARE_REFCOUNTED_STRUCT(TRemoteContext) DECLARE_REFCOUNTED_STRUCT(IDialer) DECLARE_REFCOUNTED_STRUCT(IAsyncDialer) DECLARE_REFCOUNTED_STRUCT(IAsyncDialerSession) diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h index 10b6fd5b06..64723698f2 100644 --- a/yt/yt/library/program/config.h +++ b/yt/yt/library/program/config.h @@ -29,7 +29,6 @@ #include <library/cpp/yt/stockpile/stockpile.h> - namespace NYT { //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto b/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto index 874ec93151..e6aad0b3c0 100644 --- a/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto +++ b/yt/yt_proto/yt/client/chunk_client/proto/chunk_meta.proto @@ -146,11 +146,10 @@ message TStripedErasurePlacementExt // Sizes of the original blocks. repeated int64 block_sizes = 3; - // Some of the input blocks can be padded to fit into the segment. - repeated int64 block_padding_sizes = 5; - // Checksums of the original blocks. repeated fixed64 block_checksums = 4; + + reserved 5; } // TChunkMeta is stored in *.meta files on data nodes diff --git a/yt/yt_proto/yt/core/bus/proto/bus.proto b/yt/yt_proto/yt/core/bus/proto/bus.proto index daf9d5f083..96638368f5 100644 --- a/yt/yt_proto/yt/core/bus/proto/bus.proto +++ b/yt/yt_proto/yt/core/bus/proto/bus.proto @@ -4,10 +4,12 @@ import "yt_proto/yt/core/misc/proto/guid.proto"; message THandshake { - required NYT.NProto.TGuid foreign_connection_id = 1; + required NYT.NProto.TGuid connection_id = 1; // Only passed from client to server. optional int32 multiplexing_band = 2; // EMultiplexingBand optional int32 encryption_mode = 3; // EEncryptionMode + + optional int32 verification_mode = 4; // EVerificationMode } |