diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-04-08 23:11:49 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-04-08 23:22:05 +0300 |
commit | 27b0bd14698f3db17c8a2c3fd040daeab91c3075 (patch) | |
tree | b5f8afc6ec69c32d6bd77cc2f6a15902e22eb5e7 | |
parent | c94b2ebe613bc396a23432873400286cfc7f7ef1 (diff) | |
download | ydb-27b0bd14698f3db17c8a2c3fd040daeab91c3075.tar.gz |
Intermediate changes
commit_hash:bf469eb9f85927cfef8fb4151f740c34dd092add
-rw-r--r-- | yt/yt/client/driver/distributed_table_commands.cpp | 53 | ||||
-rw-r--r-- | yt/yt/client/driver/distributed_table_commands.h | 10 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 9 |
3 files changed, 44 insertions, 28 deletions
diff --git a/yt/yt/client/driver/distributed_table_commands.cpp b/yt/yt/client/driver/distributed_table_commands.cpp index 0f26f86cbf0..c40869201ed 100644 --- a/yt/yt/client/driver/distributed_table_commands.cpp +++ b/yt/yt/client/driver/distributed_table_commands.cpp @@ -12,6 +12,10 @@ #include <yt/yt/client/signature/signature.h> #include <yt/yt/client/signature/validator.h> +#include <yt/yt/client/table_client/adapters.h> +#include <yt/yt/client/table_client/table_output.h> +#include <yt/yt/client/table_client/value_consumer.h> + #include <yt/yt/client/ypath/public.h> #include <yt/yt/library/formats/format.h> @@ -25,6 +29,7 @@ namespace NYT::NDriver { using namespace NApi; using namespace NConcurrency; using namespace NFormats; +using namespace NTableClient; using namespace NTracing; using namespace NYTree; using namespace NYson; @@ -35,6 +40,12 @@ using namespace NYPath; void TStartDistributedWriteSessionCommand::Register(TRegistrar registrar) { registrar.Parameter("path", &TThis::Path); + registrar.ParameterWithUniversalAccessor<int>( + "cookie_count", + [] (TThis* command) -> auto& { + return command->Options.CookieCount; + }) + .Default(); } // -> DistributedWriteSession @@ -91,23 +102,18 @@ void TFinishDistributedWriteSessionCommand::DoExecute(ICommandContextPtr context WaitFor(context->GetClient()->FinishDistributedWriteSession(sessionWithResults, Options)) .ThrowOnError(); - - ProduceEmptyOutput(context); } //////////////////////////////////////////////////////////////////////////////// -void TWriteTableFragmentCommand::Execute(ICommandContextPtr context) -{ - TTypedCommand<NApi::TTableFragmentWriterOptions>::Execute(std::move(context)); -} - void TWriteTableFragmentCommand::Register(TRegistrar registrar) { registrar.Parameter("cookie", &TThis::Cookie); + registrar.Parameter("max_row_buffer_size", &TThis::MaxRowBufferSize) + .Default(1_MB); } -NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter( +ITableFragmentWriterPtr TWriteTableFragmentCommand::CreateTableWriter( const ICommandContextPtr& context) { PutMethodInfoInTraceContext("write_table_fragment"); @@ -125,14 +131,12 @@ NApi::ITableWriterPtr TWriteTableFragmentCommand::CreateTableWriter( << TErrorAttribute("cookie_id", concreteCookie.CookieId); } - auto tableWriter = WaitFor(context + return WaitFor(context ->GetClient() ->CreateTableFragmentWriter( signedCookie, TTypedCommand<TTableFragmentWriterOptions>::Options)) .ValueOrThrow(); - TableWriter = tableWriter; - return tableWriter; } // -> Cookie @@ -140,12 +144,31 @@ void TWriteTableFragmentCommand::DoExecute(ICommandContextPtr context) { auto cookie = ConvertTo<TSignedWriteFragmentCookiePtr>(Cookie); - DoExecuteImpl(context); + auto tableWriter = CreateTableWriter(context); + + // NB(pavook): we shouldn't ping transaction here, as this method is executed in parallel + // and pinging the transaction could cause substantial master load. + + auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(static_cast<ITableWriterPtr>(tableWriter)); - // Sadly, we are plagued by virtual bases :/. - auto writer = DynamicPointerCast<NApi::ITableFragmentWriter>(TableWriter); + TWritingValueConsumer valueConsumer( + schemalessWriter, + ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()), + MaxRowBufferSize); + + TTableOutput output(CreateParserForFormat( + context->GetInputFormat(), + &valueConsumer)); + + PipeInputToOutput(context->Request().InputStream, &output); + + WaitFor(valueConsumer.Flush()) + .ThrowOnError(); + + WaitFor(schemalessWriter->Close()) + .ThrowOnError(); - auto signedWriteResult = writer->GetWriteFragmentResult(); + auto signedWriteResult = tableWriter->GetWriteFragmentResult(); context->GetDriver()->GetSignatureGenerator()->Resign(signedWriteResult.Underlying()); ProduceOutput(context, [result = std::move(signedWriteResult)] (IYsonConsumer* consumer) { diff --git a/yt/yt/client/driver/distributed_table_commands.h b/yt/yt/client/driver/distributed_table_commands.h index 57fcfd7c6df..36bd404e6ac 100644 --- a/yt/yt/client/driver/distributed_table_commands.h +++ b/yt/yt/client/driver/distributed_table_commands.h @@ -48,13 +48,8 @@ private: // -> Cookie class TWriteTableFragmentCommand : public TTypedCommand<NApi::TTableFragmentWriterOptions> - , private TWriteTableCommand { public: - // Shadow normal execute in order to fix - // ambiguity in dispatch. - void Execute(ICommandContextPtr context) override; - REGISTER_YSON_STRUCT_LITE(TWriteTableFragmentCommand); static void Register(TRegistrar registrar); @@ -63,10 +58,9 @@ private: using TBase = TWriteTableCommand; NYTree::INodePtr Cookie; - TRefCountedPtr TableWriter; + i64 MaxRowBufferSize; - NApi::ITableWriterPtr CreateTableWriter( - const ICommandContextPtr& context) override; + NApi::ITableFragmentWriterPtr CreateTableWriter(const ICommandContextPtr& context); void DoExecute(ICommandContextPtr context) override; }; diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index 0fa6393238e..b21748f9439 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -398,6 +398,10 @@ public: REGISTER (TReadShuffleDataCommand, "read_shuffle_data", Null, Tabular, false, true, ApiVersion4); REGISTER (TWriteShuffleDataCommand, "write_shuffle_data", Tabular, Structured, false, true, ApiVersion4); + REGISTER (TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false, ApiVersion4); + REGISTER (TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false, ApiVersion4); + REGISTER (TWriteTableFragmentCommand, "write_table_fragment", Tabular, Structured, true, true, ApiVersion4); + if (Config_->EnableInternalCommands) { REGISTER_ALL(TReadHunksCommand, "read_hunks", Null, Structured, false, true ); REGISTER_ALL(TWriteHunksCommand, "write_hunks", Null, Structured, true, true ); @@ -408,11 +412,6 @@ public: REGISTER_ALL(TRevokeLeaseCommand, "revoke_lease", Null, Structured, true, false); REGISTER_ALL(TReferenceLeaseCommand, "reference_lease", Null, Structured, true, false); REGISTER_ALL(TUnreferenceLeaseCommand, "unreference_lease", Null, Structured, true, false); - - // TODO(arkady-e1ppa): flags past command name might be complete rubbish -- think them through later. - REGISTER_ALL(TStartDistributedWriteSessionCommand, "start_distributed_write_session", Null, Structured, true, false); - REGISTER_ALL(TFinishDistributedWriteSessionCommand, "finish_distributed_write_session", Null, Null, true, false); - REGISTER_ALL(TWriteTableFragmentCommand, "distributed_write_table_partition", Tabular, Structured, true, true ); REGISTER_ALL(TForsakeChaosCoordinator, "forsake_chaos_coordinator", Null, Null, true, true); } |