diff options
author | cdzyura171 <cdzyura171@yandex-team.com> | 2025-04-03 14:50:13 +0300 |
---|---|---|
committer | cdzyura171 <cdzyura171@yandex-team.com> | 2025-04-03 15:12:42 +0300 |
commit | f8722f9dd0311cb20863ccc2b382cd7db56aff43 (patch) | |
tree | 85163aa2d1a153df604b524635ecead51226347a | |
parent | 9aadc5b85f8524240e9f25315f51d47666a01e8b (diff) | |
download | ydb-f8722f9dd0311cb20863ccc2b382cd7db56aff43.tar.gz |
Add merge to fast map reduce gateway
Add merge to fast map reduce gateway
commit_hash:83b68d130a725a0ff04ee777063f130df571849e
25 files changed, 403 insertions, 196 deletions
diff --git a/yql/essentials/providers/common/provider/yql_provider.cpp b/yql/essentials/providers/common/provider/yql_provider.cpp index 7f3ef39e9a5..f5fdda40527 100644 --- a/yql/essentials/providers/common/provider/yql_provider.cpp +++ b/yql/essentials/providers/common/provider/yql_provider.cpp @@ -1084,7 +1084,7 @@ bool FillUsedFilesImpl( return childrenOk; } -static void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type) { +void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type) { auto separator = string.find(":"); const auto p0 = string.substr(0, separator); if (p0 == "api") { diff --git a/yql/essentials/providers/common/provider/yql_provider.h b/yql/essentials/providers/common/provider/yql_provider.h index efddf213ff8..4410f1cbc2c 100644 --- a/yql/essentials/providers/common/provider/yql_provider.h +++ b/yql/essentials/providers/common/provider/yql_provider.h @@ -216,6 +216,8 @@ void TransformerStatsToYson(const TString& name, const IGraphTransformer::TStati TString TransformerStatsToYson(const IGraphTransformer::TStatistics& stats, NYson::EYsonFormat format = NYson::EYsonFormat::Pretty); +void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type); + void FillSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& types, THashMap<TString, TString>& secureParams); bool FillUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const TUserDataTable& crutches = {}); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp index e8f019ea168..27d6a032a1c 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.cpp +++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp @@ -366,6 +366,13 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) OperationSpec[cluster] = spec; HybridDqExecution = false; }); + REGISTER_SETTING(*this, FmrOperationSpec) + .Parser([](const TString& v) { return NYT::NodeFromYsonString(v, ::NYson::EYsonType::Node); }) + .Validator([] (const TString&, const NYT::TNode& value) { + if (!value.IsMap()) { + throw yexception() << "Expected yson map, but got " << value.GetType(); + } + }); REGISTER_SETTING(*this, Annotations) .Parser([](const TString& v) { return NYT::NodeFromYsonString(v); }) .Validator([] (const TString&, const NYT::TNode& value) { diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index e2dd916629d..5a291b5dab2 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -183,6 +183,7 @@ struct TYtSettings { NCommon::TConfSetting<TString, true> DockerImage; NCommon::TConfSetting<NYT::TNode, true> JobEnv; NCommon::TConfSetting<NYT::TNode, true> OperationSpec; + NCommon::TConfSetting<NYT::TNode, true> FmrOperationSpec; NCommon::TConfSetting<NYT::TNode, true> Annotations; NCommon::TConfSetting<NYT::TNode, true> StartedBy; NCommon::TConfSetting<NYT::TNode, true> Description; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make index 11d323d1289..5a16fbe0ce3 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make @@ -7,6 +7,7 @@ SRCS( PEERDIR( library/cpp/random_provider library/cpp/threading/future + library/cpp/yson/node yt/yql/providers/yt/fmr/coordinator/interface yql/essentials/utils/log yql/essentials/utils diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index 9c14f304036..1831e7f1acc 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -64,7 +64,12 @@ public: TString taskId = GenerateId(); auto taskParams = MakeDefaultTaskParamsFromOperation(request.OperationParams); - TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnection); + TMaybe<NYT::TNode> jobSettings = Nothing(); + auto fmrOperationSpec = request.FmrOperationSpec; + if (fmrOperationSpec && fmrOperationSpec->IsMap() && fmrOperationSpec->HasKey("job_settings")) { + jobSettings = (*fmrOperationSpec)["job_settings"]; + } + TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnection, jobSettings); Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId}; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h index d8a526096ae..b3c06dbe5c3 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -1,6 +1,7 @@ #pragma once #include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/yson/node/node.h> #include <util/system/mutex.h> #include <util/system/guard.h> #include <util/generic/queue.h> diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make index 62df1953063..cd29edc85d7 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make @@ -5,6 +5,7 @@ SRCS( ) PEERDIR( + library/cpp/yson/node yt/yql/providers/yt/fmr/coordinator/interface yt/yql/providers/yt/fmr/proto yt/yql/providers/yt/fmr/request_options/proto_helpers diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp index fbb4a641a08..8c244c3f1aa 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp @@ -1,4 +1,5 @@ #include "yql_yt_coordinator_proto_helpers.h" +#include <library/cpp/yson/node/node_io.h> namespace NYql::NFmr { @@ -71,6 +72,9 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries); auto protoClusterConnection = ClusterConnectionToProto(startOperationRequest.ClusterConnection); protoStartOperationRequest.MutableClusterConnection()->Swap(&protoClusterConnection); + if (startOperationRequest.FmrOperationSpec) { + protoStartOperationRequest.SetFmrOperationSpec(NYT::NodeToYsonString(*startOperationRequest.FmrOperationSpec)); + } return protoStartOperationRequest; } @@ -84,6 +88,9 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat } startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries(); startOperationRequest.ClusterConnection = ClusterConnectionFromProto(protoStartOperationRequest.GetClusterConnection()); + if (protoStartOperationRequest.HasFmrOperationSpec()) { + startOperationRequest.FmrOperationSpec = NYT::NodeFromYsonString(protoStartOperationRequest.GetFmrOperationSpec()); + } return startOperationRequest; } diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h index 15a06b2d590..15ecd6c97a5 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -25,7 +25,8 @@ struct TStartOperationRequest { TString SessionId; TMaybe<TString> IdempotencyKey = Nothing(); ui32 NumRetries = 1; // Not supported yet - TClusterConnection ClusterConnection = {}; + TClusterConnection ClusterConnection = {}; // TODO - change to map + TMaybe<NYT::TNode> FmrOperationSpec = Nothing(); }; struct TStartOperationResponse { diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index ac3a9263c61..7b17dee907b 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -156,7 +156,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection()); auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -178,7 +178,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection()); // No tables in tableDataService ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -205,7 +205,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection()); auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); @@ -239,7 +239,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id"); + TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection()); auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index aa3d0b51122..c8da1df4801 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -16,15 +16,12 @@ namespace NYql::NFmr { class TFmrJob: public IFmrJob { public: - TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings) + TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings) : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings) { } - virtual std::variant<TError, TStatistics> Download( - const TDownloadTaskParams& params, - const TClusterConnection& clusterConnection - ) override { + virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection) override { try { const auto ytTable = params.Input; const auto cluster = params.Input.Cluster; @@ -36,9 +33,9 @@ public: YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path; auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway - auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, Settings_.FmrTableDataServiceWriterSettings); + auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, GetFmrTableDataServiceWriterSettings()); - ParseRecords(*ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.BlockCount, Settings_.ParseRecordSettings.BlockSize); + ParseRecords(*ytTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); tableDataServiceWriter.Flush(); TTableStats stats = tableDataServiceWriter.GetStats(); @@ -59,9 +56,9 @@ public: YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; - auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, Settings_.FmrTableDataServiceReaderSettings); - auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway - ParseRecords(tableDataServiceReader, *ytTableWriter, Settings_.ParseRecordSettings.BlockCount, Settings_.ParseRecordSettings.BlockSize); + auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings()); + auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnection); + ParseRecords(tableDataServiceReader, *ytTableWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); ytTableWriter->Flush(); return TStatistics(); @@ -71,6 +68,7 @@ public: } virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override { + // TODO - unordered_map<ClusterConnection> // расширить таск парамс. добавить туда мету try { const auto inputs = params.Input; @@ -78,13 +76,13 @@ public: YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs"; - auto tableDataServiceWriter = TFmrTableDataServiceWriter(output.TableId, output.PartId, TableDataService_, Settings_.FmrTableDataServiceWriterSettings); + auto tableDataServiceWriter = TFmrTableDataServiceWriter(output.TableId, output.PartId, TableDataService_, GetFmrTableDataServiceWriterSettings()); for (const auto& inputTableRef : inputs) { if (CancelFlag_->load()) { return TError("Canceled"); } auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnection); - ParseRecords(*inputTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.BlockCount, Settings_.ParseRecordSettings.BlockSize); + ParseRecords(*inputTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); } tableDataServiceWriter.Flush(); return TStatistics({{output, tableDataServiceWriter.GetStats()}}); @@ -101,17 +99,29 @@ private: if (ytTable) { return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway } else if (fmrTable) { - return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrTableDataServiceReaderSettings); + return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings()); } else { ythrow yexception() << "Unsupported table type"; } } + TParseRecordSettings GetParseRecordSettings() { + return Settings_ ? Settings_->ParseRecordSettings : TParseRecordSettings(); + } + + TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() { + return Settings_ ? Settings_->FmrTableDataServiceReaderSettings : TFmrTableDataServiceReaderSettings(); + } + + TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() { + return Settings_ ? Settings_->FmrTableDataServiceWriterSettings : TFmrTableDataServiceWriterSettings(); + } + private: ITableDataService::TPtr TableDataService_; IYtService::TPtr YtService_; std::shared_ptr<std::atomic<bool>> CancelFlag_; - const TFmrJobSettings Settings_; + TMaybe<TFmrJobSettings> Settings_; }; IFmrJob::TPtr MakeFmrJob( @@ -128,9 +138,10 @@ TJobResult RunJob( ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, - const TFmrJobSettings& settings + const TMaybe<TFmrJobSettings>& settings ) { - IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, settings); + TFmrJobSettings jobSettings = settings ? *settings : GetJobSettingsFromTask(task); + IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, jobSettings); auto processTask = [job, task] (auto&& taskParams) { using T = std::decay_t<decltype(taskParams)>; @@ -160,4 +171,36 @@ TJobResult RunJob( return {ETaskStatus::Completed, *statistics}; }; +TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) { + if (!task->JobSettings) { + return TFmrJobSettings(); + } + auto jobSettings = *task->JobSettings; + YQL_ENSURE(jobSettings.IsMap()); + TFmrJobSettings resultSettings{}; + if (jobSettings.HasKey("parse_record_settings")) { + auto& parseRecordSettings = jobSettings["parse_record_settings"]; + if (parseRecordSettings.HasKey("block_count")) { + resultSettings.ParseRecordSettings.BlockCount = parseRecordSettings["block_count"].AsInt64(); + } + if (parseRecordSettings.HasKey("block_size")) { + resultSettings.ParseRecordSettings.BlockSize = parseRecordSettings["block_size"].AsInt64(); + // TODO - support different formats (B, MB, ...) + } + } + if (jobSettings.HasKey("fmr_reader_settings")) { + auto& fmrReaderSettings = jobSettings["fmr_reader_settings"]; + if (fmrReaderSettings.HasKey("read_ahead_chunks")) { + resultSettings.FmrTableDataServiceReaderSettings.ReadAheadChunks = fmrReaderSettings["read_ahead_chunks"].AsInt64(); + } + } + if (jobSettings.HasKey("fmr_writer_settings")) { + auto& fmrWriterSettings = jobSettings["fmr_writer_settings"]; + if (fmrWriterSettings.HasKey("chunk_size")) { + resultSettings.FmrTableDataServiceWriterSettings.ChunkSize = fmrWriterSettings["chunk_size"].AsInt64(); + } + } + return resultSettings; +} + } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index 7010a2eb964..cb21e95f9c9 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -9,7 +9,7 @@ namespace NYql::NFmr { struct TParseRecordSettings { ui64 BlockCount = 1; - ui64 BlockSize = 1024 * 1024; // 1Mb + ui64 BlockSize = 1024 * 1024; }; struct TFmrJobSettings { @@ -20,6 +20,8 @@ struct TFmrJobSettings { IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {}); -TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {}); +TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings = Nothing()); + +TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task); } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h index a708b795432..2e27cd4f072 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h @@ -10,7 +10,7 @@ namespace NYql::NFmr { struct TFmrTableDataServiceWriterSettings { - ui64 ChunkSize = 1024 * 1024; // 1Mb + ui64 ChunkSize = 1024 * 1024; }; class TFmrTableDataServiceWriter: public NYT::TRawTableWriter { diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto index 99d0ca23617..a104cf26eac 100644 --- a/yt/yql/providers/yt/fmr/proto/coordinator.proto +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -22,6 +22,7 @@ message TStartOperationRequest { optional string IdempotencyKey = 4; uint32 NumRetries = 5; TClusterConnection ClusterConnection = 6; + optional string FmrOperationSpec = 7; } message TStartOperationResponse { diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index a2580eb39f8..4bd35eacaa4 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -156,6 +156,7 @@ message TTask { string SessionId = 4; optional uint32 NumRetries = 5; TClusterConnection ClusterConnection = 6; + optional string JobSettings = 7; } message TTaskState { diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 0ce6dea92da..0860c009962 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -1,4 +1,5 @@ #include "yql_yt_request_proto_helpers.h" +#include <library/cpp/yson/node/node_io.h> namespace NYql::NFmr { @@ -399,6 +400,9 @@ NProto::TTask TaskToProto(const TTask& task) { protoTask.SetNumRetries(task.NumRetries); auto clusterConnection = ClusterConnectionToProto(task.ClusterConnection); protoTask.MutableClusterConnection()->Swap(&clusterConnection); + if (task.JobSettings) { + protoTask.SetJobSettings(NYT::NodeToYsonString(*task.JobSettings)); + } return protoTask; } @@ -410,6 +414,9 @@ TTask TaskFromProto(const NProto::TTask& protoTask) { task.SessionId = protoTask.GetSessionId(); task.NumRetries = protoTask.GetNumRetries(); task.ClusterConnection = ClusterConnectionFromProto(protoTask.GetClusterConnection()); + if (protoTask.HasJobSettings()) { + task.JobSettings = NYT::NodeFromYsonString(protoTask.GetJobSettings()); + } return task; } diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make index 9e330848c29..df82ec258fe 100644 --- a/yt/yql/providers/yt/fmr/request_options/ya.make +++ b/yt/yql/providers/yt/fmr/request_options/ya.make @@ -5,6 +5,7 @@ SRCS( ) PEERDIR( + library/cpp/yson/node library/cpp/threading/future ) diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index 319cf208012..0dc3650855c 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -2,8 +2,8 @@ namespace NYql::NFmr { -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection) { - return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode>& jobSettings) { + return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection, jobSettings); } TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage, const TStatistics& stats) { diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index 2c2e94a057a..de18d91fa09 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -1,5 +1,6 @@ #pragma once +#include <library/cpp/yson/node/node.h> #include <util/digest/numeric.h> #include <util/generic/maybe.h> #include <util/generic/string.h> @@ -167,8 +168,8 @@ struct TClusterConnection { struct TTask: public TThrRefBase { TTask() = default; - TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, ui32 numRetries = 1) - : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), NumRetries(numRetries) + TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1) + : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), JobSettings(jobSettings), NumRetries(numRetries) { } @@ -177,6 +178,7 @@ struct TTask: public TThrRefBase { TTaskParams TaskParams = {}; TString SessionId; TClusterConnection ClusterConnection = {}; + TMaybe<NYT::TNode> JobSettings = {}; ui32 NumRetries; // Not supported yet using TPtr = TIntrusivePtr<TTask>; @@ -197,7 +199,7 @@ struct TTaskState: public TThrRefBase { using TPtr = TIntrusivePtr<TTaskState>; }; -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{}); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{}, const TMaybe<NYT::TNode>& jobSettings = Nothing()); TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing(), const TStatistics& stats = TStatistics()); diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp index c6e535fb715..d618766edeb 100644 --- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp +++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp @@ -28,13 +28,13 @@ public: NYT::TRawTableWriterPtr MakeWriter( const TYtTableRef& ytTable, const TClusterConnection& clusterConnection, - const TYtWriterSettings& writerSetttings + const TYtWriterSettings& /*writerSetttings*/ ) override { auto client = CreateClient(clusterConnection); auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId)); - auto path = NYT::TRichYPath(NYT::AddPathPrefix(ytTable.Path, "//")); - auto richPath = NYT::TRichYPath(path).Append(writerSetttings.AppendMode); - return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary()); + TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//"); + auto richPath = NYT::TRichYPath(ytPath).Append(true); + return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary()); // TODO - support writerOptions } private: diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h index 1661c188955..7ed2a47fc00 100644 --- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h +++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h @@ -11,7 +11,6 @@ struct TYtReaderSettings { }; struct TYtWriterSettings { - bool AppendMode = true; }; class IYtService: public TThrRefBase { diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp index 529e0806df0..429880e66e1 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -1593,7 +1593,7 @@ private: } TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& /*options*/) override { - ythrow yexception() << "GetClusterConnection should not be called for file gateway"; + return TClusterConnectionResult(); } diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make index c424b95d0b9..d00a1b6ee49 100644 --- a/yt/yql/providers/yt/gateway/fmr/ya.make +++ b/yt/yql/providers/yt/gateway/fmr/ya.make @@ -5,13 +5,16 @@ SRCS( ) PEERDIR( + yql/essentials/providers/common/codec yql/essentials/utils/log - yt/cpp/mapreduce/client + yt/cpp/mapreduce/common + yt/cpp/mapreduce/interface yt/yql/providers/yt/gateway/lib yt/yql/providers/yt/gateway/native yt/yql/providers/yt/expr_nodes yt/yql/providers/yt/fmr/coordinator/interface yt/yql/providers/yt/lib/config_clusters + yt/yql/providers/yt/lib/schema yt/yql/providers/yt/provider ) diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index aab5bdb6dde..dd5160d07c2 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -2,16 +2,20 @@ #include <thread> +#include <yt/cpp/mapreduce/common/helpers.h> #include <yt/cpp/mapreduce/interface/client.h> #include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <yt/yql/providers/yt/gateway/lib/yt_helpers.h> #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> +#include <yt/yql/providers/yt/lib/schema/schema.h> #include <yt/yql/providers/yt/provider/yql_yt_helpers.h> +#include <yql/essentials/providers/common/codec/yql_codec_type_flags.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/log/profile.h> #include <util/generic/ptr.h> +#include <util/string/split.h> #include <util/thread/pool.h> using namespace NThreading; @@ -27,7 +31,7 @@ enum class ETablePresenceStatus { Both }; -struct TDownloadTableToFmrResult: public NCommon::TOperationResult {}; // Download Yt -> Fmr TableDataService +struct TFmrOperationResult: public NCommon::TOperationResult {}; class TFmrYtGateway final: public TYtForwardingGatewayBase { public: @@ -41,7 +45,7 @@ public: auto getOperationStatusesFunc = [&] { while (!StopFmrGateway_) { with_lock(SessionStates_->Mutex) { - auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses, const TString& sessionId) { + auto checkOperationStatuses = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses, const TString& sessionId) { for (auto& [operationId, promise]: operationStatuses) { YQL_CLOG(TRACE, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId; @@ -53,9 +57,15 @@ public: with_lock(SessionStates_->Mutex) { bool operationCompleted = getOperationStatus != EOperationStatus::Accepted && getOperationStatus != EOperationStatus::InProgress; if (operationCompleted) { - // operation finished, set value in future returned in Publish / Download + // operation finished, set value in future returned in DoMerge / DoUpload bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed; - SendOperationCompletionSignal(promise, hasCompletedSuccessfully, operationErrorMessages); + if (hasCompletedSuccessfully) { + TFmrOperationResult fmrOperationResult{}; + fmrOperationResult.SetSuccess(); + promise.SetValue(fmrOperationResult); + } else { + promise.SetException(JoinRange(' ', operationErrorMessages.begin(), operationErrorMessages.end())); + } YQL_CLOG(DEBUG, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId; auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId}); deleteOperationFuture.Subscribe([&, sessionId, operationId] (const auto& deleteFuture) { @@ -66,8 +76,7 @@ public: YQL_ENSURE( SessionStates_->Sessions.contains(sessionId)); auto& sessionInfo = SessionStates_->Sessions[sessionId]; auto& operationStates = sessionInfo.OperationStates; - operationStates.DownloadOperationStatuses.erase(operationId); - operationStates.UploadOperationStatuses.erase(operationId); + operationStates.OperationStatuses.erase(operationId); } }); } @@ -78,8 +87,7 @@ public: for (auto [sessionId, sessionInfo]: SessionStates_->Sessions) { auto& operationStates = sessionInfo.OperationStates; - checkOperationStatuses(operationStates.DownloadOperationStatuses, sessionId); - checkOperationStatuses(operationStates.UploadOperationStatuses, sessionId); + checkOperationStatuses(operationStates.OperationStatuses, sessionId); } } Sleep(TimeToSleepBetweenGetOperationRequests_); @@ -93,138 +101,73 @@ public: GetOperationStatusesThread_.join(); } - TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final { + TFuture<TRunResult> Run(const TExprNode::TPtr& node, TExprContext& ctx, TRunOptions&& options) final { YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); - if (!Coordinator_) { - return Slave_->Publish(node, ctx, std::move(options)); - } - auto publish = TYtPublish(node); + auto nodePos = ctx.GetPosition(node->Pos()); + TYtOpBase opBase(node); TString sessionId = options.SessionId(); - auto cluster = publish.DataSink().Cluster().StringValue(); - auto token = options.Config()->Auth.Get(); - TString transformedInputPath; - TString userName = GetUsername(sessionId); - for (auto out: publish.Input()) { - auto outTable = GetOutTable(out).Cast<TYtOutTable>(); - TStringBuf inputPath = outTable.Name().Value(); - transformedInputPath = NYql::TransformPath(GetTablesTmpFolder(*options.Config()), inputPath, true, userName); - break; + if (auto op = opBase.Maybe<TYtMerge>()) { + auto ytMerge = op.Cast(); + std::vector<TYtTableRef> inputTables = GetMergeInputTables(ytMerge); + TYtTableRef outputTable = GetMergeOutputTable(ytMerge); + auto future = DoMerge(inputTables, outputTable, std::move(options)); + return future.Apply([this, pos = nodePos, outputTable = std::move(outputTable), options = std::move(options)] (const TFuture<TFmrOperationResult>& f) { + try { + f.GetValue(); // rethrow error if any + TString sessionId = options.SessionId(); + auto config = options.Config(); + TString transformedOutputTableId = GetTransformedPath(outputTable.Path, sessionId, config); + TString fmrOutputTableId = outputTable.Cluster + "." + transformedOutputTableId; + SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr); + TRunResult result; + result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); // TODO - add statistics? + result.OutTableStats.back().second->Id = "fmr_" + fmrOutputTableId; + result.SetSuccess(); + return MakeFuture<TRunResult>(std::move(result)); + } catch (...) { + return MakeFuture(ResultFromCurrentException<TRunResult>(pos)); + } + }); + } else { + return Slave_->Run(node, ctx, std::move(options)); } + } - // TODO - handle several inputs in Publish, use ColumnGroups, Run Merge - - auto outputPath = publish.Publish().Name().StringValue(); - auto idempotencyKey = GenerateId(); + TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final { + TString sessionId = options.SessionId(); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + auto nodePos = ctx.GetPosition(node->Pos()); + auto publish = TYtPublish(node); - auto fmrTableId = cluster + "." + outputPath; + auto cluster = publish.DataSink().Cluster().StringValue(); + std::vector<TFmrTableRef> fmrTableIds; + auto config = options.Config(); - TFuture<TDownloadTableToFmrResult> downloadToFmrFuture; - TFuture<void> downloadedSuccessfully; + std::vector<TFuture<TFmrOperationResult>> uploadFmrTablesToYtFutures; - with_lock(SessionStates_->Mutex) { - auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; + for (auto out: publish.Input()) { + auto outTableWithCluster = GetOutTableWithCluster(out); + auto outTable = GetOutTable(out).Cast<TYtOutTable>(); + TStringBuf inputPath = outTable.Name().Value(); + TString transformedInputPath = GetTransformedPath(ToString(inputPath), sessionId, config); + auto outputBase = out.Operation().Cast<TYtOutputOpBase>().Ptr(); - if (!tablePresenceStatuses.contains(fmrTableId)) { - TYtTableRef ytTable{.Path = transformedInputPath, .Cluster = cluster}; - TFmrTableRef fmrTable{.TableId = fmrTableId}; - tablePresenceStatuses[fmrTableId] = ETablePresenceStatus::Both; - downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, fmrTable, sessionId, options.Config()); - downloadedSuccessfully = downloadToFmrFuture.Apply([downloadedSuccessfully] (auto& downloadFuture) { - auto downloadResult = downloadFuture.GetValueSync(); - }); - } else { - downloadedSuccessfully = MakeFuture(); - } + TFmrTableRef fmrTableRef = TFmrTableRef{outTableWithCluster.second + "." + transformedInputPath}; + uploadFmrTablesToYtFutures.emplace_back(DoUpload(fmrTableRef, sessionId, config, outputBase, ctx)); } - downloadedSuccessfully.Wait(); // blocking until download to fmr finishes - - TUploadOperationParams uploadOperationParams{ - .Input = TFmrTableRef{fmrTableId}, - .Output = TYtTableRef{outputPath, cluster} - }; - - auto clusterConnectionOptions = TClusterConnectionOptions(options.SessionId()) - .Cluster(cluster).Config(options.Config()); - auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions)); - YQL_ENSURE(clusterConnection.Success()); - - TStartOperationRequest uploadRequest{ - .TaskType = ETaskType::Upload, - .OperationParams = uploadOperationParams, - .SessionId = sessionId, - .IdempotencyKey=idempotencyKey, - .NumRetries=1, - .ClusterConnection = TClusterConnection{ - .TransactionId = clusterConnection.TransactionId, - .YtServerName = clusterConnection.YtServerName, - .Token = clusterConnection.Token - } - }; - - auto promise = NewPromise<TPublishResult>(); - auto future = promise.GetFuture(); - YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload to yt table: " << cluster + "." + outputPath; - auto uploadOperationResponseFuture = Coordinator_->StartOperation(uploadRequest); - uploadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& uploadFuture) { - TStartOperationResponse startOperationResponse = uploadFuture.GetValueSync(); - TString operationId = startOperationResponse.OperationId; - with_lock(SessionStates_->Mutex) { - YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); - auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; - auto& uploadOperationStatuses = operationStates.UploadOperationStatuses; - YQL_ENSURE(!uploadOperationStatuses.contains(operationId)); - uploadOperationStatuses[operationId] = promise; - } - }); - return future; - } - - TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce( - const TYtTableRef& ytTableRef, const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config) - { - YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); - TString fmrTableId = fmrTableRef.TableId; - TDownloadOperationParams downloadOperationParams{ - .Input = ytTableRef, - .Output = {fmrTableId} - }; + auto outputPath = publish.Publish().Name().StringValue(); auto idempotencyKey = GenerateId(); - auto clusterConnectionOptions = TClusterConnectionOptions(sessionId) - .Cluster(ytTableRef.Cluster).Config(config); - auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions)); - YQL_ENSURE(clusterConnection.Success()); - TStartOperationRequest downloadRequest{ - .TaskType = ETaskType::Download, - .OperationParams = downloadOperationParams, - .SessionId = sessionId, - .IdempotencyKey = idempotencyKey, - .NumRetries=1, - .ClusterConnection = TClusterConnection{ - .TransactionId = clusterConnection.TransactionId, - .YtServerName = clusterConnection.YtServerName, - .Token = clusterConnection.Token - } - }; - YQL_CLOG(DEBUG, FastMapReduce) << "Starting download from yt table: " << fmrTableId; - - auto promise = NewPromise<TDownloadTableToFmrResult>(); - auto future = promise.GetFuture(); - - auto downloadOperationResponseFuture = Coordinator_->StartOperation(downloadRequest); - downloadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& downloadFuture) { - TStartOperationResponse downloadOperationResponse = downloadFuture.GetValueSync(); - TString operationId = downloadOperationResponse.OperationId; - with_lock(SessionStates_->Mutex) { - auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; - auto& downloadOperationStatuses = operationStates.DownloadOperationStatuses; - YQL_ENSURE(!downloadOperationStatuses.contains(operationId)); - downloadOperationStatuses[operationId] = promise; + return WaitExceptionOrAll(uploadFmrTablesToYtFutures).Apply([&, pos = nodePos, curNode = std::move(node), options = std::move(options)] (const TFuture<void>& f) mutable { + try { + f.GetValue(); // rethrow error if any + return Slave_->Publish(curNode, ctx, std::move(options)); + } catch (...) { + return MakeFuture(ResultFromCurrentException<TPublishResult>(pos)); } }); - return future; } TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { @@ -267,29 +210,233 @@ public: YQL_ENSURE(sessions.contains(sessionId)); auto& operationStates = sessions[sessionId].OperationStates; - auto cancelOperationsFunc = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) { + auto cancelOperationsFunc = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses) { std::vector<TFuture<TDeleteOperationResponse>> cancelOperationsFutures; for (auto& [operationId, promise]: operationStatuses) { cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId})); } NThreading::WaitAll(cancelOperationsFutures).GetValueSync(); - for (auto& [operationId, promise]: operationStatuses) { - SendOperationCompletionSignal(promise, false); - } }; - cancelOperationsFunc(operationStates.DownloadOperationStatuses); - cancelOperationsFunc(operationStates.UploadOperationStatuses); + cancelOperationsFunc(operationStates.OperationStatuses); } Slave_->CleanupSession(std::move(options)).Wait(); return MakeFuture(); } private: + TString GenerateId() { + return GetGuidAsString(RandomProvider_->GenGuid()); + } + + TString GetUsername(const TString& sessionId) { + with_lock(SessionStates_->Mutex) { + YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); + auto& session = SessionStates_->Sessions[sessionId]; + return session.UserName; + } + } + + TString GetTransformedPath(const TString& path, const TString& sessionId, TYtSettings::TConstPtr& config) { + TString username = GetUsername(sessionId); + return NYql::TransformPath(GetTablesTmpFolder(*config), path, true, username); + } + + void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) { + with_lock(SessionStates_->Mutex) { + auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; + tablePresenceStatuses[fmrTableId] = newStatus; + } + } + + TMaybe<ETablePresenceStatus> GetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId) { + with_lock(SessionStates_->Mutex) { + auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; + if (!tablePresenceStatuses.contains(fmrTableId)) { + return Nothing(); + } + return tablePresenceStatuses[fmrTableId]; + } + } + + std::vector<TYtTableRef> GetMergeInputTables(const TYtMerge& ytMerge) { + auto input = ytMerge.Maybe<TYtTransientOpBase>().Cast().Input(); + std::vector<TYtTableRef> inputTables; + for (auto section: input.Cast<TYtSectionList>()) { + for (auto path: section.Paths()) { + TYtPathInfo pathInfo(path); + TYtTableRef ytTable{.Path = pathInfo.Table->Name, .Cluster = pathInfo.Table->Cluster}; + inputTables.emplace_back(ytTable); + } + } + return inputTables; + } + + TYtTableRef GetMergeOutputTable(const TYtMerge& ytMerge) { + auto output = ytMerge.Maybe<TYtOutputOpBase>().Cast().Output(); + std::vector<TYtTableRef> outputTables; + for (auto table: output) { + TYtOutTableInfo tableInfo(table); + TString outTableName = tableInfo.Name; + if (outTableName.empty()) { + outTableName = TStringBuilder() << "tmp/" << GetGuidAsString(RandomProvider_->GenGuid()); + } + outputTables.emplace_back(outTableName, tableInfo.Cluster); + } + YQL_ENSURE(outputTables.size() == 1); + return outputTables[0]; + } + + TString GetClusterFromMergeTables(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable) { + std::unordered_set<TString> clusters; + for (auto& [path, cluster]: inputTables) { + clusters.emplace(cluster); + } + YQL_ENSURE(clusters.size() == 1); + TString cluster = *clusters.begin(); + if (outputTable.Cluster) { + YQL_ENSURE(outputTable.Cluster == cluster); + } else { + outputTable.Cluster = cluster; + } + return cluster; + } + + TClusterConnection GetTablesClusterConnection(const TString& cluster, const TString& sessionId, TYtSettings::TConstPtr& config) { + auto clusterConnectionOptions = TClusterConnectionOptions(sessionId).Cluster(cluster).Config(config); + auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions)); + return TClusterConnection{ + .TransactionId = clusterConnection.TransactionId, + .YtServerName = clusterConnection.YtServerName, + .Token = clusterConnection.Token + }; + } + + TFuture<TFmrOperationResult> GetRunningOperationFuture(const TStartOperationRequest& startOperationRequest, const TString& sessionId) { + auto promise = NewPromise<TFmrOperationResult>(); + auto future = promise.GetFuture(); + auto startOperationResponseFuture = Coordinator_->StartOperation(startOperationRequest); + startOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& mergeFuture) { + TStartOperationResponse mergeOperationResponse = mergeFuture.GetValueSync(); + TString operationId = mergeOperationResponse.OperationId; + with_lock(SessionStates_->Mutex) { + auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; + auto& operationStatuses = operationStates.OperationStatuses; + YQL_ENSURE(!operationStatuses.contains(operationId)); + operationStatuses[operationId] = promise; + } + }); + return future; + } + + TFuture<TFmrOperationResult> DoUpload(const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); + std::vector<TString> ytTableInfo; + StringSplitter(fmrTableRef.TableId).SplitByString(".").AddTo(&ytTableInfo); + YQL_ENSURE(ytTableInfo.size() == 2); + TString outputCluster = ytTableInfo[0], outputPath = ytTableInfo[1]; + auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.TableId, sessionId); + if (!tablePresenceStatus || *tablePresenceStatus != ETablePresenceStatus::OnlyInFmr) { + YQL_CLOG(DEBUG, FastMapReduce) << " We assume table " << fmrTableRef.TableId << " should be present in yt, not uploading from fmr"; + TFmrOperationResult fmrOperationResult = TFmrOperationResult(); + fmrOperationResult.SetSuccess(); + return MakeFuture(fmrOperationResult); + } + + TUploadOperationParams uploadOperationParams{ + .Input = fmrTableRef, + .Output = TYtTableRef{.Path = outputPath, .Cluster = outputCluster} + }; + + auto clusterConnection = GetTablesClusterConnection(outputCluster, sessionId, config); + TStartOperationRequest uploadRequest{ + .TaskType = ETaskType::Upload, + .OperationParams = uploadOperationParams, + .SessionId = sessionId, + .IdempotencyKey = GenerateId(), + .NumRetries=1, + .ClusterConnection = clusterConnection, + .FmrOperationSpec = config->FmrOperationSpec.Get(outputCluster) + }; + + auto prepareOptions = TPrepareOptions(sessionId) + .Config(config); + auto prepareFuture = Slave_->Prepare(outputOpBase, ctx, std::move(prepareOptions)); + + return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) { + try { + f.GetValue(); // rethrow error if any + YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); + YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId; + return GetRunningOperationFuture(uploadRequest, sessionId).Apply([this, sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableId)] (const TFuture<TFmrOperationResult>& f) { + try { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); + auto fmrUploadResult = f.GetValue(); + SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::Both); + return f; + } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << CurrentExceptionMessage(); + return MakeFuture(ResultFromCurrentException<TFmrOperationResult>()); + } + }); + } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << CurrentExceptionMessage(); + return MakeFuture(ResultFromCurrentException<TFmrOperationResult>()); + } + }); + } + + TFuture<TFmrOperationResult> DoMerge(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable, TRunOptions&& options) { + TString sessionId = options.SessionId(); + YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + auto cluster = GetClusterFromMergeTables(inputTables, outputTable); // Can set outputTable.Cluster if empty + + TString outputTableId = outputTable.Path, outputCluster = outputTable.Cluster; + TString transformedOutputTableId = GetTransformedPath(outputTableId, sessionId, options.Config()); + TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + transformedOutputTableId}; + + std::vector<TOperationTableRef> mergeInputTables; + for (auto& ytTable: inputTables) { + TString fmrTableId = ytTable.Cluster + "." + ytTable.Path; + auto tablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId); + if (!tablePresenceStatus) { + SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::OnlyInYt); + } + + if (tablePresenceStatus && *tablePresenceStatus != ETablePresenceStatus::OnlyInYt) { + // table is in fmr, do not download + mergeInputTables.emplace_back(TFmrTableRef{.TableId = fmrTableId}); + } else { + mergeInputTables.emplace_back(ytTable); + } + } + + TMergeOperationParams mergeOperationParams{.Input = mergeInputTables,.Output = fmrOutputTable}; + auto clusterConnection = GetTablesClusterConnection(cluster, sessionId, options.Config()); + TStartOperationRequest mergeOperationRequest{ + .TaskType = ETaskType::Merge, + .OperationParams = mergeOperationParams, + .SessionId = sessionId, + .IdempotencyKey = GenerateId(), + .NumRetries = 1, + .ClusterConnection = clusterConnection, + .FmrOperationSpec = options.Config()->FmrOperationSpec.Get(outputCluster) + }; + + std::vector<TString> inputPaths; + std::transform(inputTables.begin(),inputTables.end(), std::back_inserter(inputPaths), [](const TYtTableRef& ytTableRef){ + return ytTableRef.Path;} + ); + + YQL_CLOG(DEBUG, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end()); + return GetRunningOperationFuture(mergeOperationRequest, sessionId); + } + +private: struct TFmrGatewayOperationsState { - std::unordered_map<TString, TPromise<TPublishResult>> UploadOperationStatuses = {}; // operationId -> promise which we set when operation completes - std::unordered_map<TString, TPromise<TDownloadTableToFmrResult>> DownloadOperationStatuses = {}; + std::unordered_map<TString, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes }; struct TSessionInfo { @@ -309,31 +456,6 @@ private: TDuration TimeToSleepBetweenGetOperationRequests_; std::thread GetOperationStatusesThread_; std::atomic<bool> StopFmrGateway_; - - TString GenerateId() { - return GetGuidAsString(RandomProvider_->GenGuid()); - } - - template <std::derived_from<NCommon::TOperationResult> T> - void SendOperationCompletionSignal(TPromise<T> promise, bool completedSuccessfully = false, const std::vector<TFmrError>& errorMessages = {}) { - YQL_ENSURE(!promise.HasValue()); - T commonOperationResult{}; - if (completedSuccessfully) { - commonOperationResult.SetSuccess(); - } else if (!errorMessages.empty()) { - auto exception = yexception() << "Operation failed with errors: " << JoinSeq(" ", errorMessages); - commonOperationResult.SetException(exception); - } - promise.SetValue(commonOperationResult); - } - - TString GetUsername(const TString& sessionId) { - with_lock(SessionStates_->Mutex) { - YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); - auto& session = SessionStates_->Sessions[sessionId]; - return session.UserName; - } - } }; } // namespace |