aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapollo1321 <apollo1321@yandex-team.com>2024-10-24 20:38:44 +0300
committerapollo1321 <apollo1321@yandex-team.com>2024-10-24 20:49:36 +0300
commit71cfb7652c50b626f60cf145b918e4e91acc86d0 (patch)
treefb9d9571e96149a19754f03d302d0d8a953ef9a5
parent90b20905ab5a295ad19fb1f9cc73c6d17f8b1b89 (diff)
downloadydb-71cfb7652c50b626f60cf145b918e4e91acc86d0.tar.gz
YT-21709: Shuffle Service MVP, no TX
Алгоритм описан тут: <HIDDEN_URL> commit_hash:a93dc0df1cab8b523c165f4538be1d417741c161
-rw-r--r--yt/yt/client/api/rpc_proxy/client_impl.cpp80
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.cpp4
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.h2
-rw-r--r--yt/yt/client/driver/driver.cpp6
-rw-r--r--yt/yt/client/driver/shuffle_commands.cpp136
-rw-r--r--yt/yt/client/driver/shuffle_commands.h79
-rw-r--r--yt/yt/client/driver/ya.make5
7 files changed, 288 insertions, 24 deletions
diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp
index 430b366714..4289591079 100644
--- a/yt/yt/client/api/rpc_proxy/client_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp
@@ -3,6 +3,8 @@
#include "config.h"
#include "helpers.h"
#include "private.h"
+#include "row_batch_reader.h"
+#include "row_batch_writer.h"
#include "table_mount_cache.h"
#include "table_writer.h"
#include "timestamp_provider.h"
@@ -106,13 +108,13 @@ void TClient::Terminate()
////////////////////////////////////////////////////////////////////////////////
-IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(NRpc::IChannelPtr channel, bool retryProxyBanned) const
+IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(IChannelPtr channel, bool retryProxyBanned) const
{
const auto& config = Connection_->GetConfig();
bool retrySequoiaErrorsOnly = !config->EnableRetries;
// NB: even if client's retries are disabled Sequoia transient failures are
// still retriable. See IsRetriableError().
- return NRpc::CreateRetryingChannel(
+ return CreateRetryingChannel(
config->RetryingChannel,
std::move(channel),
BIND([=] (const TError& error) {
@@ -123,7 +125,7 @@ IChannelPtr TClient::CreateSequoiaAwareRetryingChannel(NRpc::IChannelPtr channel
IChannelPtr TClient::CreateNonRetryingChannelByAddress(const std::string& address) const
{
return CreateCredentialsInjectingChannel(
- Connection_->CreateChannelByAddress(address),
+ Connection_->CreateChannelByAddress(TString(address)),
ClientOptions_);
}
@@ -766,7 +768,7 @@ TFuture<ITableWriterPtr> TClient::CreateFragmentTableWriter(
FillRequest(req.Get(), cookie, options);
auto schema = New<TTableSchema>();
- return NRpc::CreateRpcClientOutputStream(
+ return CreateRpcClientOutputStream(
std::move(req),
BIND ([=] (const TSharedRef& metaRef) {
NApi::NRpcProxy::NProto::TWriteTableMeta meta;
@@ -2678,34 +2680,74 @@ TFuture<TGetFlowViewResult> TClient::GetFlowView(
}
TFuture<TShuffleHandlePtr> TClient::StartShuffle(
- const TString& /*account*/,
- int /*partitionCount*/,
- const TStartShuffleOptions& /*options*/)
+ const TString& account,
+ int partitionCount,
+ const TStartShuffleOptions& options)
{
- YT_UNIMPLEMENTED();
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.StartShuffle();
+ SetTimeoutOptions(*req, options);
+
+ req->set_account(account);
+ req->set_partition_count(partitionCount);
+
+ return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspStartShufflePtr& rsp) {
+ return ConvertTo<TShuffleHandlePtr>(TYsonString(rsp->shuffle_handle()));
+ }));
}
TFuture<void> TClient::FinishShuffle(
- const TShuffleHandlePtr& /*shuffleHandle*/,
- const TFinishShuffleOptions& /*options*/)
+ const TShuffleHandlePtr& shuffleHandle,
+ const TFinishShuffleOptions& options)
{
- YT_UNIMPLEMENTED();
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.FinishShuffle();
+ SetTimeoutOptions(*req, options);
+
+ req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
+
+ return req->Invoke().AsVoid();
}
TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
- const TShuffleHandlePtr& /*shuffleHandle*/,
- int /*partitionIndex*/,
- const TTableReaderConfigPtr& /*config*/)
+ const TShuffleHandlePtr& shuffleHandle,
+ int partitionIndex,
+ const TTableReaderConfigPtr& config)
{
- YT_UNIMPLEMENTED();
+ auto proxy = CreateApiServiceProxy();
+
+ auto req = proxy.ReadShuffleData();
+ InitStreamingRequest(*req);
+
+ req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
+ req->set_partition_index(partitionIndex);
+ req->set_reader_config(ConvertToYsonString(config).ToString());
+
+ return CreateRpcClientInputStream(std::move(req))
+ .ApplyUnique(BIND([] (IAsyncZeroCopyInputStreamPtr&& inputStream) {
+ return CreateRowBatchReader(std::move(inputStream), false);
+ }));
}
TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter(
- const TShuffleHandlePtr& /*shuffleHandle*/,
- const TString& /*partitionColumn*/,
- const TTableWriterConfigPtr& /*config*/)
+ const TShuffleHandlePtr& shuffleHandle,
+ const TString& partitionColumn,
+ const TTableWriterConfigPtr& config)
{
- YT_UNIMPLEMENTED();
+ auto proxy = CreateApiServiceProxy();
+ auto req = proxy.WriteShuffleData();
+ InitStreamingRequest(*req);
+
+ req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
+ req->set_partition_column(partitionColumn);
+ req->set_writer_config(ConvertToYsonString(config).ToString());
+
+ return CreateRpcClientOutputStream(std::move(req))
+ .ApplyUnique(BIND([] (IAsyncZeroCopyOutputStreamPtr&& outputStream) {
+ return CreateRowBatchWriter(std::move(outputStream));
+ }));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
index 5c2f4a7540..6c76bc35b6 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
@@ -290,9 +290,9 @@ IChannelPtr TConnection::CreateChannel(bool sticky)
return CreateRoamingChannel(std::move(provider));
}
-IChannelPtr TConnection::CreateChannelByAddress(const std::string& address)
+IChannelPtr TConnection::CreateChannelByAddress(const TString& address)
{
- return CachingChannelFactory_->CreateChannel(address);
+ return CachingChannelFactory_->CreateChannel(address.ConstRef());
}
TClusterTag TConnection::GetClusterTag() const
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.h b/yt/yt/client/api/rpc_proxy/connection_impl.h
index 92d1f5b4ec..8ccdf08e7d 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.h
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.h
@@ -27,7 +27,7 @@ public:
~TConnection();
NRpc::IChannelPtr CreateChannel(bool sticky);
- NRpc::IChannelPtr CreateChannelByAddress(const std::string& address);
+ NRpc::IChannelPtr CreateChannelByAddress(const TString& address);
const TConnectionConfigPtr& GetConfig();
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index e13634f539..3e57e388df 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -19,6 +19,7 @@
#include "scheduler_commands.h"
#include "table_commands.h"
#include "transaction_commands.h"
+#include "shuffle_commands.h"
#include <yt/yt/client/api/client_cache.h>
#include <yt/yt/client/api/connection.h>
@@ -386,6 +387,11 @@ public:
REGISTER (TGetPipelineStateCommand, "get_pipeline_state", Null, Structured, false, false, ApiVersion4);
REGISTER (TGetFlowViewCommand, "get_flow_view", Null, Structured, false, false, ApiVersion4);
+ REGISTER (TStartShuffleCommand, "start_shuffle", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TFinishShuffleCommand, "finish_shuffle", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TReadShuffleDataCommand, "read_shuffle_data", Null, Tabular, false, true, ApiVersion4);
+ REGISTER (TWriteShuffleDataCommand, "write_shuffle_data", Tabular, Structured, false, true, ApiVersion4);
+
if (Config_->EnableInternalCommands) {
REGISTER_ALL(TReadHunksCommand, "read_hunks", Null, Structured, false, true );
REGISTER_ALL(TWriteHunksCommand, "write_hunks", Null, Structured, true, true );
diff --git a/yt/yt/client/driver/shuffle_commands.cpp b/yt/yt/client/driver/shuffle_commands.cpp
new file mode 100644
index 0000000000..4e2ee805e8
--- /dev/null
+++ b/yt/yt/client/driver/shuffle_commands.cpp
@@ -0,0 +1,136 @@
+#include "shuffle_commands.h"
+
+#include <yt/yt/client/table_client/adapters.h>
+#include <yt/yt/client/table_client/table_output.h>
+#include <yt/yt/client/table_client/value_consumer.h>
+
+#include <yt/yt/client/driver/config.h>
+#include <yt/yt/client/formats/config.h>
+
+#include <yt/yt/library/formats/format.h>
+
+namespace NYT::NDriver {
+
+using namespace NConcurrency;
+using namespace NFormats;
+using namespace NTableClient;
+using namespace NYson;
+
+//////////////////////////////////////////////////////////////////////////////
+
+void TStartShuffleCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("account", &TThis::Account);
+ registrar.Parameter("partition_count", &TThis::PartitionCount);
+}
+
+void TStartShuffleCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+ auto asyncResult = client->StartShuffle(Account, PartitionCount, Options);
+ auto shuffleHandle = WaitFor(asyncResult).ValueOrThrow();
+
+ context->ProduceOutputValue(ConvertToYsonString(shuffleHandle));
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+void TFinishShuffleCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle);
+}
+
+void TFinishShuffleCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+ auto asyncResult = client->FinishShuffle(ShuffleHandle, Options);
+ WaitFor(asyncResult).ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+void TReadShuffleDataCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle);
+ registrar.Parameter("partition_index", &TThis::PartitionIndex);
+}
+
+void TReadShuffleDataCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+
+ auto reader = WaitFor(context->GetClient()->CreateShuffleReader(
+ ShuffleHandle,
+ PartitionIndex,
+ Options))
+ .ValueOrThrow();
+
+ auto format = context->GetOutputFormat();
+
+ auto writer = CreateStaticTableWriterForFormat(
+ format,
+ reader->GetNameTable(),
+ /*tableSchemas*/ {New<TTableSchema>()},
+ context->Request().OutputStream,
+ /*enableContextSaving*/ false,
+ New<TControlAttributesConfig>(),
+ /*keyColumnCount*/ 0);
+
+ NTableClient::TRowBatchReadOptions options{
+ .MaxRowsPerRead = context->GetConfig()->ReadBufferRowCount,
+ .Columnar = (format.GetType() == EFormatType::Arrow),
+ };
+
+ PipeReaderToWriterByBatches(
+ reader,
+ writer,
+ options);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+void TWriteShuffleDataCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle);
+ registrar.Parameter("partition_column", &TThis::PartitionColumn);
+ registrar.Parameter("max_row_buffer_size", &TThis::MaxRowBufferSize)
+ .Default(1_MB);
+}
+
+void TWriteShuffleDataCommand::DoExecute(ICommandContextPtr context)
+{
+ auto client = context->GetClient();
+
+ auto writer = WaitFor(context->GetClient()->CreateShuffleWriter(
+ ShuffleHandle,
+ PartitionColumn,
+ Options))
+ .ValueOrThrow();
+
+ auto schemalessWriter = CreateSchemalessFromApiWriterAdapter(std::move(writer));
+
+ TWritingValueConsumer valueConsumer(
+ schemalessWriter,
+ ConvertTo<TTypeConversionConfigPtr>(context->GetInputFormat().Attributes()),
+ MaxRowBufferSize);
+
+ TTableOutput output(CreateParserForFormat(
+ context->GetInputFormat(),
+ &valueConsumer));
+
+ PipeInputToOutput(context->Request().InputStream, &output);
+
+ WaitFor(valueConsumer.Flush())
+ .ThrowOnError();
+
+ WaitFor(schemalessWriter->Close())
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
+}
+
+//////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/shuffle_commands.h b/yt/yt/client/driver/shuffle_commands.h
new file mode 100644
index 0000000000..7b4394c49a
--- /dev/null
+++ b/yt/yt/client/driver/shuffle_commands.h
@@ -0,0 +1,79 @@
+#pragma once
+
+#include "command.h"
+
+#include <yt/yt/client/ypath/rich.h>
+
+namespace NYT::NDriver {
+
+////////////////////////////////////////////////////////////////////////////
+
+class TStartShuffleCommand
+ : public TTypedCommand<NApi::TStartShuffleOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TStartShuffleCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ TString Account;
+ int PartitionCount;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////
+
+class TFinishShuffleCommand
+ : public TTypedCommand<NApi::TFinishShuffleOptions>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TFinishShuffleCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NApi::TShuffleHandlePtr ShuffleHandle;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////
+
+class TReadShuffleDataCommand
+ : public TTypedCommand<NTableClient::TTableReaderConfigPtr>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TReadShuffleDataCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NApi::TShuffleHandlePtr ShuffleHandle;
+ int PartitionIndex;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////
+
+class TWriteShuffleDataCommand
+ : public TTypedCommand<NTableClient::TTableWriterConfigPtr>
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TWriteShuffleDataCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NApi::TShuffleHandlePtr ShuffleHandle;
+ TString PartitionColumn;
+ i64 MaxRowBufferSize;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDriver
diff --git a/yt/yt/client/driver/ya.make b/yt/yt/client/driver/ya.make
index f33485ce8f..0e6ee7b1fa 100644
--- a/yt/yt/client/driver/ya.make
+++ b/yt/yt/client/driver/ya.make
@@ -14,16 +14,17 @@ SRCS(
driver.cpp
etc_commands.cpp
file_commands.cpp
+ flow_commands.cpp
helpers.cpp
+ internal_commands.cpp
journal_commands.cpp
proxy_discovery_cache.cpp
query_commands.cpp
queue_commands.cpp
scheduler_commands.cpp
+ shuffle_commands.cpp
table_commands.cpp
transaction_commands.cpp
- internal_commands.cpp
- flow_commands.cpp
)
PEERDIR(