aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-04-08 23:11:49 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-04-08 23:22:05 +0300
commit27b0bd14698f3db17c8a2c3fd040daeab91c3075 (patch)
treeb5f8afc6ec69c32d6bd77cc2f6a15902e22eb5e7
parentc94b2ebe613bc396a23432873400286cfc7f7ef1 (diff)
downloadydb-27b0bd14698f3db17c8a2c3fd040daeab91c3075.tar.gz
Intermediate changes
commit_hash:bf469eb9f85927cfef8fb4151f740c34dd092add
-rw-r--r--yt/yt/client/driver/distributed_table_commands.cpp53
-rw-r--r--yt/yt/client/driver/distributed_table_commands.h10
-rw-r--r--yt/yt/client/driver/driver.cpp9
3 files changed, 44 insertions, 28 deletions
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp
index 0f26f86cbf0..c40869201ed 100644
--- a/yt/yt/client/driver/distributed_table_commands.cpp
+++ b/yt/yt/client/driver/distributed_table_commands.cpp
@@ -12,6 +12,10 @@
#include <yt/yt/client/signature/signature.h>
#include <yt/yt/client/signature/validator.h>
+#include <yt/yt/client/table_client/adapters.h>
+#include <yt/yt/client/table_client/table_output.h>
+#include <yt/yt/client/table_client/value_consumer.h>
+
#include <yt/yt/client/ypath/public.h>
#include <yt/yt/library/formats/format.h>
@@ -25,6 +29,7 @@ namespace NYT::NDriver {
using namespace NApi;
using namespace NConcurrency;
using namespace NFormats;
+using namespace NTableClient;
using namespace NTracing;
using namespace NYTree;
using namespace NYson;
@@ -35,6 +40,12 @@ using namespace NYPath;
void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar)
{
registrar.Parameter("path", &TThis::Path);
+ registrar.ParameterWithUniversalAccessor<int>(
+ "cookie_count",
+ [] (TThis* command) -> auto& {
+ return command->Options.CookieCount;
+ })
+ .Default();
}
// -> DistributedWriteSession
@@ -91,23 +102,18 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context
WaitFor(context->GetClient()->FinishDistributedWriteSession(sessionWithResults, Options))
.ThrowOnError();
-
- ProduceEmptyOutput(context);
}
////////////////////////////////////////////////////////////////////////////////
-void TWriteTableFragmentCommand::Execute(ICommandContextPtr context)
-{
- TTypedCommand<NApi::TTableFragmentWriterOptions>::Execute(std::move(context));
-}
-
void TWriteTableFragmentCommand::Register(TRegistrar registrar)
{
registrar.Parameter("cookie", &TThis::Cookie);
+ registrar.Parameter("max_row_buffer_size", &TThis::MaxRowBufferSize)
+ .Default(1_MB);
}
-NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter(
+ITableFragmentWriterPtr TWriteTableFragmentCommand::CreateTableWriter(
const ICommandContextPtr& context)
{
PutMethodInfoInTraceContext("write_table_fragment");
@@ -125,14 +131,12 @@ NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter(
<< TErrorAttribute("cookie_id", concreteCookie.CookieId);
}
- auto tableWriter = WaitFor(context
+ return WaitFor(context
->GetClient()
->CreateTableFragmentWriter(
signedCookie,
TTypedCommand<TTableFragmentWriterOptions>::Options))
.ValueOrThrow();
- TableWriter = tableWriter;
- return tableWriter;
}
// -> Cookie
@@ -140,12 +144,31 @@ void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context)
{
auto cookie = ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie);
- DoExecuteImpl(context);
+ auto tableWriter = CreateTableWriter(context);
+
+ // NB(pavook): we shouldn't ping transaction here, as this method is executed in parallel
+ // and pinging the transaction could cause substantial master load.
+
+ auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(static_cast<ITableWriterPtr>(tableWriter));
- // Sadly, we are plagued by virtual bases :/.
- auto writer = DynamicPointerCast<NApi::ITableFragmentWriter>(TableWriter);
+ TWritingValueConsumer valueConsumer(
+ schemalessWriter,
+ ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()),
+ MaxRowBufferSize);
+
+ TTableOutput output(CreateParserForFormat(
+ context->GetInputFormat(),
+ &valueConsumer));
+
+ PipeInputToOutput(context->Request().InputStream, &output);
+
+ WaitFor(valueConsumer.Flush())
+ .ThrowOnError();
+
+ WaitFor(schemalessWriter->Close())
+ .ThrowOnError();
- auto signedWriteResult = writer->GetWriteFragmentResult();
+ auto signedWriteResult = tableWriter->GetWriteFragmentResult();
context->GetDriver()->GetSignatureGenerator()->Resign(signedWriteResult.Underlying());
ProduceOutput(context, [result = std::move(signedWriteResult)] (IYsonConsumer* consumer) {
diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h
index 57fcfd7c6df..36bd404e6ac 100644
--- a/yt/yt/client/driver/distributed_table_commands.h
+++ b/yt/yt/client/driver/distributed_table_commands.h
@@ -48,13 +48,8 @@ private:
// -> Cookie
class TWriteTableFragmentCommand
: public TTypedCommand<NApi::TTableFragmentWriterOptions>
- , private TWriteTableCommand
{
public:
- // Shadow normal execute in order to fix
- // ambiguity in dispatch.
- void Execute(ICommandContextPtr context) override;
-
REGISTER_YSON_STRUCT_LITE(TWriteTableFragmentCommand);
static void Register(TRegistrar registrar);
@@ -63,10 +58,9 @@ private:
using TBase = TWriteTableCommand;
NYTree::INodePtr Cookie;
- TRefCountedPtr TableWriter;
+ i64 MaxRowBufferSize;
- NApi::ITableWriterPtr CreateTableWriter(
- const ICommandContextPtr& context) override;
+ NApi::ITableFragmentWriterPtr CreateTableWriter(const ICommandContextPtr& context);
void DoExecute(ICommandContextPtr context) override;
};
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index 0fa6393238e..b21748f9439 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -398,6 +398,10 @@ public:
REGISTER (TReadShuffleDataCommand, "read_shuffle_data", Null, Tabular, false, true, ApiVersion4);
REGISTER (TWriteShuffleDataCommand, "write_shuffle_data", Tabular, Structured, false, true, ApiVersion4);
+ REGISTER (TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false, ApiVersion4);
+ REGISTER (TWriteTableFragmentCommand, "write_table_fragment", Tabular, Structured, true, true, ApiVersion4);
+
if (Config_->EnableInternalCommands) {
REGISTER_ALL(TReadHunksCommand, "read_hunks", Null, Structured, false, true );
REGISTER_ALL(TWriteHunksCommand, "write_hunks", Null, Structured, true, true );
@@ -408,11 +412,6 @@ public:
REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false);
REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false);
REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false);
-
- // TODO(arkady-e1ppa): flags past command name might be complete rubbish -- think them through later.
- REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false);
- REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false);
- REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true );
REGISTER_ALL(TForsakeChaosCoordinator, "forsake_chaos_coordinator", Null, Null, true, true);
}