aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpavook <pavook@yandex-team.com>2025-02-03 18:37:14 +0300
committerpavook <pavook@yandex-team.com>2025-02-03 18:59:06 +0300
commitca1b99bb0935aa604330db8d93bb366bc4bb10c9 (patch)
tree924bfc7b8ff6a857ba981d4cfdf05101ca15598b
parentdee5ef359ee7e109cc0112fbfa66ab696ecf489e (diff)
downloadydb-ca1b99bb0935aa604330db8d93bb366bc4bb10c9.tar.gz
YT-22307: Support signatures in distributed table commands
commit_hash:2adc5c269658ca17ee322b5ca1079c7562d9d22e
-rw-r--r--yt/yt/client/api/distributed_table_client.cpp19
-rw-r--r--yt/yt/client/api/distributed_table_client.h7
-rw-r--r--yt/yt/client/api/rpc_proxy/client_base.cpp8
-rw-r--r--yt/yt/client/driver/distributed_table_commands.cpp65
-rw-r--r--yt/yt/client/ya.make1
5 files changed, 82 insertions, 18 deletions
diff --git a/yt/yt/client/api/distributed_table_client.cpp b/yt/yt/client/api/distributed_table_client.cpp
new file mode 100644
index 0000000000..b3bdd3eb58
--- /dev/null
+++ b/yt/yt/client/api/distributed_table_client.cpp
@@ -0,0 +1,19 @@
+#include "distributed_table_client.h"
+
+#include <yt/yt/client/signature/signature.h>
+
+#include <yt/yt/core/ytree/fluent.h>
+
+namespace NYT::NApi {
+
+///////////////////////////////////////////////////////////////////////////////
+
+void TDistributedWriteSessionWithCookies::Register(TRegistrar registrar)
+{
+ registrar.Parameter("session", &TThis::Session);
+ registrar.Parameter("cookies", &TThis::Cookies);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NApi
diff --git a/yt/yt/client/api/distributed_table_client.h b/yt/yt/client/api/distributed_table_client.h
index 3cb3640cb0..4286e68ee9 100644
--- a/yt/yt/client/api/distributed_table_client.h
+++ b/yt/yt/client/api/distributed_table_client.h
@@ -6,16 +6,23 @@
#include <yt/yt/client/table_client/config.h>
+#include <yt/yt/core/ytree/yson_struct.h>
+
namespace NYT::NApi {
////////////////////////////////////////////////////////////////////////////////
struct TDistributedWriteSessionWithCookies
+ : public NYTree::TYsonStructLite
{
// TDistributedWriteSession.
TSignedDistributedWriteSessionPtr Session;
// std::vector<TWriteFragmentCookie>.
std::vector<TSignedWriteFragmentCookiePtr> Cookies;
+
+ REGISTER_YSON_STRUCT_LITE(TDistributedWriteSessionWithCookies)
+
+ static void Register(TRegistrar registrar);
};
struct TDistributedWriteSessionWithResults
diff --git a/yt/yt/client/api/rpc_proxy/client_base.cpp b/yt/yt/client/api/rpc_proxy/client_base.cpp
index bfeda9ed87..5fd70dd7c3 100644
--- a/yt/yt/client/api/rpc_proxy/client_base.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_base.cpp
@@ -814,10 +814,10 @@ TFuture<TDistributedWriteSessionWithCookies> TClientBase::StartDistributedWriteS
for (const auto& cookie : result->signed_cookies()) {
cookies.push_back(ConvertTo<TSignedWriteFragmentCookiePtr>(TYsonString(cookie)));
}
- return TDistributedWriteSessionWithCookies{
- .Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(result->signed_session())),
- .Cookies = std::move(cookies),
- };
+ TDistributedWriteSessionWithCookies sessionWithCookies;
+ sessionWithCookies.Session = ConvertTo<TSignedDistributedWriteSessionPtr>(TYsonString(result->signed_session())),
+ sessionWithCookies.Cookies = std::move(cookies);
+ return std::move(sessionWithCookies);
}));
}
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp
index bfb81c20c3..80812a03af 100644
--- a/yt/yt/client/driver/distributed_table_commands.cpp
+++ b/yt/yt/client/driver/distributed_table_commands.cpp
@@ -8,7 +8,9 @@
#include <yt/yt/client/formats/config.h>
#include <yt/yt/client/formats/parser.h>
+#include <yt/yt/client/signature/generator.h>
#include <yt/yt/client/signature/signature.h>
+#include <yt/yt/client/signature/validator.h>
#include <yt/yt/client/ypath/public.h>
@@ -40,9 +42,14 @@ void TStartDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context)
{
auto transaction = AttachTransaction(context, /*required*/ false);
- auto sessionAndCookies = WaitFor(context->GetClient()->StartDistributedWriteSession(
- Path,
- Options));
+ auto signatureGenerator = context->GetDriver()->GetSignatureGenerator();
+ auto sessionAndCookies = WaitFor(context->GetClient()->StartDistributedWriteSession(Path, Options))
+ .ValueOrThrow();
+
+ signatureGenerator->Sign(sessionAndCookies.Session.Underlying());
+ for (const auto& cookie : sessionAndCookies.Cookies) {
+ signatureGenerator->Sign(cookie.Underlying());
+ }
ProduceOutput(context, [sessionAndCookies = std::move(sessionAndCookies)] (IYsonConsumer* consumer) {
Serialize(sessionAndCookies, consumer);
@@ -63,13 +70,27 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context
auto session = ConvertTo<TSignedDistributedWriteSessionPtr>(Session);
auto results = ConvertTo<std::vector<NTableClient::TSignedWriteFragmentResultPtr>>(Results);
- WaitFor(context->GetClient()->FinishDistributedWriteSession(
- TDistributedWriteSessionWithResults{
- .Session = std::move(session),
- .Results = std::move(results),
- },
- Options))
- .ThrowOnError();
+ auto validator = context->GetDriver()->GetSignatureValidator();
+ std::vector<TFuture<bool>> validationFutures;
+ validationFutures.reserve(1 + results.size());
+ validationFutures.emplace_back(validator->Validate(session.Underlying()));
+ for (const auto& result : results) {
+ validationFutures.emplace_back(validator->Validate(result.Underlying()));
+ }
+
+ auto validationResults = WaitFor(AllSucceeded(std::move(validationFutures)))
+ .ValueOrThrow();
+ bool allValid = std::all_of(validationResults.begin(), validationResults.end(), [] (bool value) {
+ return value;
+ });
+ THROW_ERROR_EXCEPTION_UNLESS(
+ allValid,
+ "Signature validation failed for distributed write session finish");
+
+ TDistributedWriteSessionWithResults sessionWithResults(std::move(session), std::move(results));
+
+ WaitFor(context->GetClient()->FinishDistributedWriteSession(sessionWithResults, Options))
+ .ThrowOnError();
ProduceEmptyOutput(context);
}
@@ -91,13 +112,25 @@ NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter(
{
PutMethodInfoInTraceContext("write_table_fragment");
+ auto signedCookie = ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie);
+ auto validationSuccessful = WaitFor(context->GetDriver()->GetSignatureValidator()->Validate(signedCookie.Underlying()))
+ .ValueOrThrow();
+
+ if (!validationSuccessful) {
+ auto concreteCookie = ConvertTo<TWriteFragmentCookie>(signedCookie.Underlying()->Payload());
+
+ THROW_ERROR_EXCEPTION(
+ "Signature validation failed for write table fragment")
+ << TErrorAttribute("session_id", concreteCookie.SessionId)
+ << TErrorAttribute("cookie_id", concreteCookie.CookieId);
+ }
+
auto tableWriter = WaitFor(context
->GetClient()
->CreateTableFragmentWriter(
- ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie),
+ signedCookie,
TTypedCommand<TTableFragmentWriterOptions>::Options))
- .ValueOrThrow();
-
+ .ValueOrThrow();
TableWriter = tableWriter;
return tableWriter;
}
@@ -111,7 +144,11 @@ void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context)
// Sadly, we are plagued by virtual bases :/.
auto writer = DynamicPointerCast<NApi::ITableFragmentWriter>(TableWriter);
- ProduceOutput(context, [result = writer->GetWriteFragmentResult()] (IYsonConsumer* consumer) {
+
+ auto signedWriteResult = writer->GetWriteFragmentResult();
+ context->GetDriver()->GetSignatureGenerator()->Sign(signedWriteResult.Underlying());
+
+ ProduceOutput(context, [result = std::move(signedWriteResult)] (IYsonConsumer* consumer) {
Serialize(
*result.Underlying(),
consumer);
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index 72b5671542..ddc3439867 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -14,6 +14,7 @@ SRCS(
api/delegating_client.cpp
api/delegating_transaction.cpp
api/distributed_table_session.cpp
+ api/distributed_table_client.cpp
api/etc_client.cpp
api/journal_client.cpp
api/operation_client.cpp