diff options
author | pavook <pavook@yandex-team.com> | 2025-02-03 18:37:14 +0300 |
---|---|---|
committer | pavook <pavook@yandex-team.com> | 2025-02-03 18:59:06 +0300 |
commit | ca1b99bb0935aa604330db8d93bb366bc4bb10c9 (patch) | |
tree | 924bfc7b8ff6a857ba981d4cfdf05101ca15598b | |
parent | dee5ef359ee7e109cc0112fbfa66ab696ecf489e (diff) | |
download | ydb-ca1b99bb0935aa604330db8d93bb366bc4bb10c9.tar.gz |
YT-22307: Support signatures in distributed table commands
commit_hash:2adc5c269658ca17ee322b5ca1079c7562d9d22e
-rw-r--r-- | yt/yt/client/api/distributed_table_client.cpp | 19 | ||||
-rw-r--r-- | yt/yt/client/api/distributed_table_client.h | 7 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_base.cpp | 8 | ||||
-rw-r--r-- | yt/yt/client/driver/distributed_table_commands.cpp | 65 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 |
5 files changed, 82 insertions, 18 deletions
diff --git a/yt/yt/client/api/distributed_table_client.cpp b/yt/yt/client/api/distributed_table_client.cpp new file mode 100644 index 0000000000..b3bdd3eb58 --- /dev/null +++ b/yt/yt/client/api/distributed_table_client.cpp @@ -0,0 +1,19 @@ +#include "distributed_table_client.h" + +#include <yt/yt/client/signature/signature.h> + +#include <yt/yt/core/ytree/fluent.h> + +namespace NYT::NApi { + +/////////////////////////////////////////////////////////////////////////////// + +void TDistributedWriteSessionWithCookies::Register(TRegistrar registrar) +{ + registrar.Parameter("session", &TThis::Session); + registrar.Parameter("cookies", &TThis::Cookies); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NApi diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h index 3cb3640cb0..4286e68ee9 100644 --- a/yt/yt/client/api/distributed_table_client.h +++ b/yt/yt/client/api/distributed_table_client.h @@ -6,16 +6,23 @@ #include <yt/yt/client/table_client/config.h> +#include <yt/yt/core/ytree/yson_struct.h> + namespace NYT::NApi { //////////////////////////////////////////////////////////////////////////////// struct TDistributedWriteSessionWithCookies + : public NYTree::TYsonStructLite { // TDistributedWriteSession. TSignedDistributedWriteSessionPtr Session; // std::vector<TWriteFragmentCookie>. std::vector<TSignedWriteFragmentCookiePtr> Cookies; + + REGISTER_YSON_STRUCT_LITE(TDistributedWriteSessionWithCookies) + + static void Register(TRegistrar registrar); }; struct TDistributedWriteSessionWithResults diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp index bfeda9ed87..5fd70dd7c3 100644 --- a/yt/yt/client/api/rpc_proxy/client_base.cpp +++ b/yt/yt/client/api/rpc_proxy/client_base.cpp @@ -814,10 +814,10 @@ TFuture<TDistributedWriteSessionWithCookies> TClientBase::StartDistributedWriteS for (const auto& cookie : result->signed_cookies()) { cookies.push_back(ConvertTo<TSignedWriteFragmentCookiePtr>(TYsonString(cookie))); } - return TDistributedWriteSessionWithCookies{ - .Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(result->signed_session())), - .Cookies = std::move(cookies), - }; + TDistributedWriteSessionWithCookies sessionWithCookies; + sessionWithCookies.Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(result->signed_session())), + sessionWithCookies.Cookies = std::move(cookies); + return std::move(sessionWithCookies); })); } diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp index bfb81c20c3..80812a03af 100644 --- a/yt/yt/client/driver/distributed_table_commands.cpp +++ b/yt/yt/client/driver/distributed_table_commands.cpp @@ -8,7 +8,9 @@ #include <yt/yt/client/formats/config.h> #include <yt/yt/client/formats/parser.h> +#include <yt/yt/client/signature/generator.h> #include <yt/yt/client/signature/signature.h> +#include <yt/yt/client/signature/validator.h> #include <yt/yt/client/ypath/public.h> @@ -40,9 +42,14 @@ void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context) { auto transaction = AttachTransaction(context, /*required*/ false); - auto sessionAndCookies = WaitFor(context->GetClient()->StartDistributedWriteSession( - Path, - Options)); + auto signatureGenerator = context->GetDriver()->GetSignatureGenerator(); + auto sessionAndCookies = WaitFor(context->GetClient()->StartDistributedWriteSession(Path, Options)) + .ValueOrThrow(); + + signatureGenerator->Sign(sessionAndCookies.Session.Underlying()); + for (const auto& cookie : sessionAndCookies.Cookies) { + signatureGenerator->Sign(cookie.Underlying()); + } ProduceOutput(context, [sessionAndCookies = std::move(sessionAndCookies)] (IYsonConsumer* consumer) { Serialize(sessionAndCookies, consumer); @@ -63,13 +70,27 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context auto session = ConvertTo<TSignedDistributedWriteSessionPtr>(Session); auto results = ConvertTo<std::vector<NTableClient::TSignedWriteFragmentResultPtr>>(Results); - WaitFor(context->GetClient()->FinishDistributedWriteSession( - TDistributedWriteSessionWithResults{ - .Session = std::move(session), - .Results = std::move(results), - }, - Options)) - .ThrowOnError(); + auto validator = context->GetDriver()->GetSignatureValidator(); + std::vector<TFuture<bool>> validationFutures; + validationFutures.reserve(1 + results.size()); + validationFutures.emplace_back(validator->Validate(session.Underlying())); + for (const auto& result : results) { + validationFutures.emplace_back(validator->Validate(result.Underlying())); + } + + auto validationResults = WaitFor(AllSucceeded(std::move(validationFutures))) + .ValueOrThrow(); + bool allValid = std::all_of(validationResults.begin(), validationResults.end(), [] (bool value) { + return value; + }); + THROW_ERROR_EXCEPTION_UNLESS( + allValid, + "Signature validation failed for distributed write session finish"); + + TDistributedWriteSessionWithResults sessionWithResults(std::move(session), std::move(results)); + + WaitFor(context->GetClient()->FinishDistributedWriteSession(sessionWithResults, Options)) + .ThrowOnError(); ProduceEmptyOutput(context); } @@ -91,13 +112,25 @@ NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter( { PutMethodInfoInTraceContext("write_table_fragment"); + auto signedCookie = ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie); + auto validationSuccessful = WaitFor(context->GetDriver()->GetSignatureValidator()->Validate(signedCookie.Underlying())) + .ValueOrThrow(); + + if (!validationSuccessful) { + auto concreteCookie = ConvertTo<TWriteFragmentCookie>(signedCookie.Underlying()->Payload()); + + THROW_ERROR_EXCEPTION( + "Signature validation failed for write table fragment") + << TErrorAttribute("session_id", concreteCookie.SessionId) + << TErrorAttribute("cookie_id", concreteCookie.CookieId); + } + auto tableWriter = WaitFor(context ->GetClient() ->CreateTableFragmentWriter( - ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie), + signedCookie, TTypedCommand<TTableFragmentWriterOptions>::Options)) - .ValueOrThrow(); - + .ValueOrThrow(); TableWriter = tableWriter; return tableWriter; } @@ -111,7 +144,11 @@ void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context) // Sadly, we are plagued by virtual bases :/. auto writer = DynamicPointerCast<NApi::ITableFragmentWriter>(TableWriter); - ProduceOutput(context, [result = writer->GetWriteFragmentResult()] (IYsonConsumer* consumer) { + + auto signedWriteResult = writer->GetWriteFragmentResult(); + context->GetDriver()->GetSignatureGenerator()->Sign(signedWriteResult.Underlying()); + + ProduceOutput(context, [result = std::move(signedWriteResult)] (IYsonConsumer* consumer) { Serialize( *result.Underlying(), consumer); diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index 72b5671542..ddc3439867 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -14,6 +14,7 @@ SRCS( api/delegating_client.cpp api/delegating_transaction.cpp api/distributed_table_session.cpp + api/distributed_table_client.cpp api/etc_client.cpp api/journal_client.cpp api/operation_client.cpp |