diff options
author | apollo1321 <apollo1321@yandex-team.com> | 2024-10-24 20:38:44 +0300 |
---|---|---|
committer | apollo1321 <apollo1321@yandex-team.com> | 2024-10-24 20:49:36 +0300 |
commit | 71cfb7652c50b626f60cf145b918e4e91acc86d0 (patch) | |
tree | fb9d9571e96149a19754f03d302d0d8a953ef9a5 | |
parent | 90b20905ab5a295ad19fb1f9cc73c6d17f8b1b89 (diff) | |
download | ydb-71cfb7652c50b626f60cf145b918e4e91acc86d0.tar.gz |
YT-21709: Shuffle Service MVP, no TX
Алгоритм описан тут: <HIDDEN_URL>
commit_hash:a93dc0df1cab8b523c165f4538be1d417741c161
-rw-r--r-- | yt/yt/client/api/rpc_proxy/client_impl.cpp | 80 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/connection_impl.cpp | 4 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/connection_impl.h | 2 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/driver/shuffle_commands.cpp | 136 | ||||
-rw-r--r-- | yt/yt/client/driver/shuffle_commands.h | 79 | ||||
-rw-r--r-- | yt/yt/client/driver/ya.make | 5 |
7 files changed, 288 insertions, 24 deletions
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index 430b366714..4289591079 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -3,6 +3,8 @@ #include "config.h" #include "helpers.h" #include "private.h" +#include "row_batch_reader.h" +#include "row_batch_writer.h" #include "table_mount_cache.h" #include "table_writer.h" #include "timestamp_provider.h" @@ -106,13 +108,13 @@ void TClient::Terminate() //////////////////////////////////////////////////////////////////////////////// -IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(NRpc::IChannelPtr channel, bool retryProxyBanned) const +IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(IChannelPtr channel, bool retryProxyBanned) const { const auto& config = Connection_->GetConfig(); bool retrySequoiaErrorsOnly = !config->EnableRetries; // NB: even if client's retries are disabled Sequoia transient failures are // still retriable. See IsRetriableError(). - return NRpc::CreateRetryingChannel( + return CreateRetryingChannel( config->RetryingChannel, std::move(channel), BIND([=] (const TError& error) { @@ -123,7 +125,7 @@ IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(NRpc::IChannelPtr channel IChannelPtr TClient::CreateNonRetryingChannelByAddress(const std::string& address) const { return CreateCredentialsInjectingChannel( - Connection_->CreateChannelByAddress(address), + Connection_->CreateChannelByAddress(TString(address)), ClientOptions_); } @@ -766,7 +768,7 @@ TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter( FillRequest(req.Get(), cookie, options); auto schema = New<TTableSchema>(); - return NRpc::CreateRpcClientOutputStream( + return CreateRpcClientOutputStream( std::move(req), BIND ([=] (const TSharedRef& metaRef) { NApi::NRpcProxy::NProto::TWriteTableMeta meta; @@ -2678,34 +2680,74 @@ TFuture<TGetFlowViewResult> TClient::GetFlowView( } TFuture<TShuffleHandlePtr> TClient::StartShuffle( - const TString& /*account*/, - int /*partitionCount*/, - const TStartShuffleOptions& /*options*/) + const TString& account, + int partitionCount, + const TStartShuffleOptions& options) { - YT_UNIMPLEMENTED(); + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.StartShuffle(); + SetTimeoutOptions(*req, options); + + req->set_account(account); + req->set_partition_count(partitionCount); + + return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspStartShufflePtr& rsp) { + return ConvertTo<TShuffleHandlePtr>(TYsonString(rsp->shuffle_handle())); + })); } TFuture<void> TClient::FinishShuffle( - const TShuffleHandlePtr& /*shuffleHandle*/, - const TFinishShuffleOptions& /*options*/) + const TShuffleHandlePtr& shuffleHandle, + const TFinishShuffleOptions& options) { - YT_UNIMPLEMENTED(); + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.FinishShuffle(); + SetTimeoutOptions(*req, options); + + req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString()); + + return req->Invoke().AsVoid(); } TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader( - const TShuffleHandlePtr& /*shuffleHandle*/, - int /*partitionIndex*/, - const TTableReaderConfigPtr& /*config*/) + const TShuffleHandlePtr& shuffleHandle, + int partitionIndex, + const TTableReaderConfigPtr& config) { - YT_UNIMPLEMENTED(); + auto proxy = CreateApiServiceProxy(); + + auto req = proxy.ReadShuffleData(); + InitStreamingRequest(*req); + + req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString()); + req->set_partition_index(partitionIndex); + req->set_reader_config(ConvertToYsonString(config).ToString()); + + return CreateRpcClientInputStream(std::move(req)) + .ApplyUnique(BIND([] (IAsyncZeroCopyInputStreamPtr&& inputStream) { + return CreateRowBatchReader(std::move(inputStream), false); + })); } TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter( - const TShuffleHandlePtr& /*shuffleHandle*/, - const TString& /*partitionColumn*/, - const TTableWriterConfigPtr& /*config*/) + const TShuffleHandlePtr& shuffleHandle, + const TString& partitionColumn, + const TTableWriterConfigPtr& config) { - YT_UNIMPLEMENTED(); + auto proxy = CreateApiServiceProxy(); + auto req = proxy.WriteShuffleData(); + InitStreamingRequest(*req); + + req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString()); + req->set_partition_column(partitionColumn); + req->set_writer_config(ConvertToYsonString(config).ToString()); + + return CreateRpcClientOutputStream(std::move(req)) + .ApplyUnique(BIND([] (IAsyncZeroCopyOutputStreamPtr&& outputStream) { + return CreateRowBatchWriter(std::move(outputStream)); + })); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp index 5c2f4a7540..6c76bc35b6 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp @@ -290,9 +290,9 @@ IChannelPtr TConnection::CreateChannel(bool sticky) return CreateRoamingChannel(std::move(provider)); } -IChannelPtr TConnection::CreateChannelByAddress(const std::string& address) +IChannelPtr TConnection::CreateChannelByAddress(const TString& address) { - return CachingChannelFactory_->CreateChannel(address); + return CachingChannelFactory_->CreateChannel(address.ConstRef()); } TClusterTag TConnection::GetClusterTag() const diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h index 92d1f5b4ec..8ccdf08e7d 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.h +++ b/yt/yt/client/api/rpc_proxy/connection_impl.h @@ -27,7 +27,7 @@ public: ~TConnection(); NRpc::IChannelPtr CreateChannel(bool sticky); - NRpc::IChannelPtr CreateChannelByAddress(const std::string& address); + NRpc::IChannelPtr CreateChannelByAddress(const TString& address); const TConnectionConfigPtr& GetConfig(); diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index e13634f539..3e57e388df 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -19,6 +19,7 @@ #include "scheduler_commands.h" #include "table_commands.h" #include "transaction_commands.h" +#include "shuffle_commands.h" #include <yt/yt/client/api/client_cache.h> #include <yt/yt/client/api/connection.h> @@ -386,6 +387,11 @@ public: REGISTER (TGetPipelineStateCommand, "get_pipeline_state", Null, Structured, false, false, ApiVersion4); REGISTER (TGetFlowViewCommand, "get_flow_view", Null, Structured, false, false, ApiVersion4); + REGISTER (TStartShuffleCommand, "start_shuffle", Null, Structured, true, false, ApiVersion4); + REGISTER (TFinishShuffleCommand, "finish_shuffle", Null, Structured, true, false, ApiVersion4); + REGISTER (TReadShuffleDataCommand, "read_shuffle_data", Null, Tabular, false, true, ApiVersion4); + REGISTER (TWriteShuffleDataCommand, "write_shuffle_data", Tabular, Structured, false, true, ApiVersion4); + if (Config_->EnableInternalCommands) { REGISTER_ALL(TReadHunksCommand, "read_hunks", Null, Structured, false, true ); REGISTER_ALL(TWriteHunksCommand, "write_hunks", Null, Structured, true, true ); diff --git a/yt/yt/client/driver/shuffle_commands.cpp b/yt/yt/client/driver/shuffle_commands.cpp new file mode 100644 index 0000000000..4e2ee805e8 --- /dev/null +++ b/yt/yt/client/driver/shuffle_commands.cpp @@ -0,0 +1,136 @@ +#include "shuffle_commands.h" + +#include <yt/yt/client/table_client/adapters.h> +#include <yt/yt/client/table_client/table_output.h> +#include <yt/yt/client/table_client/value_consumer.h> + +#include <yt/yt/client/driver/config.h> +#include <yt/yt/client/formats/config.h> + +#include <yt/yt/library/formats/format.h> + +namespace NYT::NDriver { + +using namespace NConcurrency; +using namespace NFormats; +using namespace NTableClient; +using namespace NYson; + +////////////////////////////////////////////////////////////////////////////// + +void TStartShuffleCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("account", &TThis::Account); + registrar.Parameter("partition_count", &TThis::PartitionCount); +} + +void TStartShuffleCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + auto asyncResult = client->StartShuffle(Account, PartitionCount, Options); + auto shuffleHandle = WaitFor(asyncResult).ValueOrThrow(); + + context->ProduceOutputValue(ConvertToYsonString(shuffleHandle)); +} + +////////////////////////////////////////////////////////////////////////////// + +void TFinishShuffleCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle); +} + +void TFinishShuffleCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + auto asyncResult = client->FinishShuffle(ShuffleHandle, Options); + WaitFor(asyncResult).ThrowOnError(); + + ProduceEmptyOutput(context); +} + +////////////////////////////////////////////////////////////////////////////// + +void TReadShuffleDataCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle); + registrar.Parameter("partition_index", &TThis::PartitionIndex); +} + +void TReadShuffleDataCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + + auto reader = WaitFor(context->GetClient()->CreateShuffleReader( + ShuffleHandle, + PartitionIndex, + Options)) + .ValueOrThrow(); + + auto format = context->GetOutputFormat(); + + auto writer = CreateStaticTableWriterForFormat( + format, + reader->GetNameTable(), + /*tableSchemas*/ {New<TTableSchema>()}, + context->Request().OutputStream, + /*enableContextSaving*/ false, + New<TControlAttributesConfig>(), + /*keyColumnCount*/ 0); + + NTableClient::TRowBatchReadOptions options{ + .MaxRowsPerRead = context->GetConfig()->ReadBufferRowCount, + .Columnar = (format.GetType() == EFormatType::Arrow), + }; + + PipeReaderToWriterByBatches( + reader, + writer, + options); +} + +////////////////////////////////////////////////////////////////////////////// + +void TWriteShuffleDataCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle); + registrar.Parameter("partition_column", &TThis::PartitionColumn); + registrar.Parameter("max_row_buffer_size", &TThis::MaxRowBufferSize) + .Default(1_MB); +} + +void TWriteShuffleDataCommand::DoExecute(ICommandContextPtr context) +{ + auto client = context->GetClient(); + + auto writer = WaitFor(context->GetClient()->CreateShuffleWriter( + ShuffleHandle, + PartitionColumn, + Options)) + .ValueOrThrow(); + + auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(std::move(writer)); + + TWritingValueConsumer valueConsumer( + schemalessWriter, + ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()), + MaxRowBufferSize); + + TTableOutput output(CreateParserForFormat( + context->GetInputFormat(), + &valueConsumer)); + + PipeInputToOutput(context->Request().InputStream, &output); + + WaitFor(valueConsumer.Flush()) + .ThrowOnError(); + + WaitFor(schemalessWriter->Close()) + .ThrowOnError(); + + ProduceEmptyOutput(context); +} + +////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDriver diff --git a/yt/yt/client/driver/shuffle_commands.h b/yt/yt/client/driver/shuffle_commands.h new file mode 100644 index 0000000000..7b4394c49a --- /dev/null +++ b/yt/yt/client/driver/shuffle_commands.h @@ -0,0 +1,79 @@ +#pragma once + +#include "command.h" + +#include <yt/yt/client/ypath/rich.h> + +namespace NYT::NDriver { + +//////////////////////////////////////////////////////////////////////////// + +class TStartShuffleCommand + : public TTypedCommand<NApi::TStartShuffleOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TStartShuffleCommand); + + static void Register(TRegistrar registrar); + +private: + TString Account; + int PartitionCount; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////// + +class TFinishShuffleCommand + : public TTypedCommand<NApi::TFinishShuffleOptions> +{ +public: + REGISTER_YSON_STRUCT_LITE(TFinishShuffleCommand); + + static void Register(TRegistrar registrar); + +private: + NApi::TShuffleHandlePtr ShuffleHandle; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////// + +class TReadShuffleDataCommand + : public TTypedCommand<NTableClient::TTableReaderConfigPtr> +{ +public: + REGISTER_YSON_STRUCT_LITE(TReadShuffleDataCommand); + + static void Register(TRegistrar registrar); + +private: + NApi::TShuffleHandlePtr ShuffleHandle; + int PartitionIndex; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////// + +class TWriteShuffleDataCommand + : public TTypedCommand<NTableClient::TTableWriterConfigPtr> +{ +public: + REGISTER_YSON_STRUCT_LITE(TWriteShuffleDataCommand); + + static void Register(TRegistrar registrar); + +private: + NApi::TShuffleHandlePtr ShuffleHandle; + TString PartitionColumn; + i64 MaxRowBufferSize; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDriver diff --git a/yt/yt/client/driver/ya.make b/yt/yt/client/driver/ya.make index f33485ce8f..0e6ee7b1fa 100644 --- a/yt/yt/client/driver/ya.make +++ b/yt/yt/client/driver/ya.make @@ -14,16 +14,17 @@ SRCS( driver.cpp etc_commands.cpp file_commands.cpp + flow_commands.cpp helpers.cpp + internal_commands.cpp journal_commands.cpp proxy_discovery_cache.cpp query_commands.cpp queue_commands.cpp scheduler_commands.cpp + shuffle_commands.cpp table_commands.cpp transaction_commands.cpp - internal_commands.cpp - flow_commands.cpp ) PEERDIR( |