diff options
| author | cdzyura171 <[email protected]> | 2025-04-09 23:26:09 +0300 |
|---|---|---|
| committer | cdzyura171 <[email protected]> | 2025-04-09 23:44:51 +0300 |
| commit | 31b03d8d9aa830228c878c46c2f8e1f64f2cc872 (patch) | |
| tree | 6f1e72ee7319ccac0417cf79af2c2efa882da74c | |
| parent | b8acce08462fa7c4482205ca9bf0f2da7cf1cd36 (diff) | |
Add fmrrun, create file yt service, support multiple cluster connections
Add fmrrun, create file yt service and support multiple cluster conn, fix issues
commit_hash:d06778387ceec03820347f73b05a0281033804ed
39 files changed, 507 insertions, 232 deletions
diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.cpp b/yql/tools/yqlrun/lib/yqlrun_lib.cpp index 0eba257d9a2..aad7657ea4a 100644 --- a/yql/tools/yqlrun/lib/yqlrun_lib.cpp +++ b/yql/tools/yqlrun/lib/yqlrun_lib.cpp @@ -107,14 +107,17 @@ TYqlRunTool::TYqlRunTool() AddClusterMapping(TString{"plato"}, TString{YtProviderName}); AddProviderFactory([this]() -> NYql::TDataProviderInitializer { - auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_); - auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices); + auto ytNativeGateway = CreateYtGateway(); auto optimizerFactory = CreateCboFactory(); return GetYtNativeDataProviderInitializer(ytNativeGateway, optimizerFactory, {}); }); SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE); +} +IYtGateway::TPtr TYqlRunTool::CreateYtGateway() { + auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_); + return CreateYtFileGateway(yqlNativeServices); } IOptimizerFactory::TPtr TYqlRunTool::CreateCboFactory() { diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.h b/yql/tools/yqlrun/lib/yqlrun_lib.h index be33617c62e..ab358e28dbb 100644 --- a/yql/tools/yqlrun/lib/yqlrun_lib.h +++ b/yql/tools/yqlrun/lib/yqlrun_lib.h @@ -1,5 +1,6 @@ #pragma once +#include <yt/yql/providers/yt/provider/yql_yt_gateway.h> #include <yql/essentials/core/cbo/cbo_optimizer_new.h> #include <yql/essentials/tools/yql_facade_run/yql_facade_run.h> @@ -15,6 +16,8 @@ public: protected: virtual IOptimizerFactory::TPtr CreateCboFactory(); + virtual IYtGateway::TPtr CreateYtGateway(); + private: THashMap<TString, TString> TablesMapping_; THashMap<TString, TString> TablesDirMapping_; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index 2841e738332..fef5f4b110b 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -54,7 +54,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do .TaskType = taskType, .OperationParams = operationParams, .IdempotencyKey = "IdempotencyKey", - .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"} + .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} }; } @@ -67,7 +67,7 @@ std::vector<TStartOperationRequest> CreateSeveralOperationRequests( .TaskType = taskType, .OperationParams = operationParams, .IdempotencyKey = "IdempotencyKey_" + ToString(i), - .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn", .Token = "token"} + .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} }; } return startOperationRequests; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make index 5a16fbe0ce3..32ffb2ed14f 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make @@ -8,6 +8,7 @@ PEERDIR( library/cpp/random_provider library/cpp/threading/future library/cpp/yson/node + yt/cpp/mapreduce/common yt/yql/providers/yt/fmr/coordinator/interface yql/essentials/utils/log yql/essentials/utils diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index 1831e7f1acc..2d9a68042e5 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -1,4 +1,5 @@ #include <thread> +#include <yt/cpp/mapreduce/common/helpers.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/yql_panic.h> #include "yql_yt_coordinator_impl.h" @@ -37,7 +38,8 @@ public: RandomProvider_(settings.RandomProvider), StopCoordinator_(false), TimeToSleepBetweenClearKeyRequests_(settings.TimeToSleepBetweenClearKeyRequests), - IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime) + IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime), + DefaultFmrOperationSpec_(settings.DefaultFmrOperationSpec) { StartClearingIdempotencyKeys(); } @@ -62,15 +64,9 @@ public: } TString taskId = GenerateId(); - auto taskParams = MakeDefaultTaskParamsFromOperation(request.OperationParams); - TMaybe<NYT::TNode> jobSettings = Nothing(); - auto fmrOperationSpec = request.FmrOperationSpec; - if (fmrOperationSpec && fmrOperationSpec->IsMap() && fmrOperationSpec->HasKey("job_settings")) { - jobSettings = (*fmrOperationSpec)["job_settings"]; - } - TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnection, jobSettings); + TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnections, GetJobSettings(request.FmrOperationSpec)); Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId}; Operations_[operationId] = {.TaskIds = {taskId}, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId}; @@ -138,6 +134,8 @@ public: for (auto& requestTaskState: request.TaskStates) { auto taskId = requestTaskState->TaskId; + auto operationId = Tasks_[taskId].OperationId; + YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId); YQL_ENSURE(Tasks_.contains(taskId)); auto taskStatus = requestTaskState->TaskStatus; YQL_ENSURE(taskStatus != ETaskStatus::Accepted); @@ -156,6 +154,10 @@ public: tableStats.Rows >= curTableStats.Stats.Rows ); YQL_ENSURE(fmrTableId.PartId == curTableStats.PartId); + if (taskStatus == ETaskStatus::Completed) { + YQL_CLOG(DEBUG, FastMapReduce) << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats; + Cerr << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats; + } } FmrTableStatistics_[fmrTableId.TableId] = TCoordinatorFmrTableStats{ .Stats = tableStats, @@ -171,10 +173,6 @@ public: tasksToRun.emplace_back(taskToRunInfo.second.Task); } } - - for (auto& taskId: TaskToDeleteIds_) { - SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed); - } return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_}); } @@ -323,6 +321,28 @@ private: } } + TMaybe<NYT::TNode> GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) { + // TODO - check this works + TMaybe<NYT::TNode> defaultJobSettings = Nothing(), currentJobSettings = Nothing(); + if (currentFmrOperationSpec && currentFmrOperationSpec->HasKey("job_settings")) { + currentJobSettings = (*currentFmrOperationSpec)["job_settings"]; + } + if (DefaultFmrOperationSpec_ && DefaultFmrOperationSpec_->HasKey("job_settings")) { + defaultJobSettings = (*DefaultFmrOperationSpec_)["job_settings"]; + } + if (defaultJobSettings && !currentJobSettings) { + return defaultJobSettings; + } + if (currentJobSettings && !defaultJobSettings) { + return currentJobSettings; + } + if (!currentJobSettings && !defaultJobSettings) { + return Nothing(); + } + NYT::MergeNodes(*currentJobSettings, *defaultJobSettings); + return currentJobSettings; + } + std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it std::unordered_set<TString> TaskToDeleteIds_; // TaskIds we want to pass to worker for deletion std::unordered_map<TString, TOperationInfo> Operations_; // OperationId -> current info about it @@ -337,6 +357,7 @@ private: TDuration TimeToSleepBetweenClearKeyRequests_; TDuration IdempotencyKeyStoreTime_; std::unordered_map<TString, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics + TMaybe<NYT::TNode> DefaultFmrOperationSpec_; }; } // namespace diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h index b3c06dbe5c3..3fea218981f 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -14,6 +14,7 @@ struct TFmrCoordinatorSettings { TIntrusivePtr<IRandomProvider> RandomProvider; TDuration IdempotencyKeyStoreTime = TDuration::Seconds(10); TDuration TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1); + TMaybe<NYT::TNode> DefaultFmrOperationSpec = Nothing(); }; IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)}); diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp index 8c244c3f1aa..80367f0bce2 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp @@ -70,8 +70,10 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey); } protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries); - auto protoClusterConnection = ClusterConnectionToProto(startOperationRequest.ClusterConnection); - protoStartOperationRequest.MutableClusterConnection()->Swap(&protoClusterConnection); + auto clusterConnections = *protoStartOperationRequest.MutableClusterConnections(); + for (auto& [tableName, conn]: startOperationRequest.ClusterConnections) { + clusterConnections[tableName] = ClusterConnectionToProto(conn); + } if (startOperationRequest.FmrOperationSpec) { protoStartOperationRequest.SetFmrOperationSpec(NYT::NodeToYsonString(*startOperationRequest.FmrOperationSpec)); } @@ -87,7 +89,11 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey(); } startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries(); - startOperationRequest.ClusterConnection = ClusterConnectionFromProto(protoStartOperationRequest.GetClusterConnection()); + std::unordered_map<TString, TClusterConnection> startOperationRequestClusterConnections; + for (auto& [tableName, conn]: protoStartOperationRequest.GetClusterConnections()) { + startOperationRequestClusterConnections[tableName] = ClusterConnectionFromProto(conn); + } + startOperationRequest.ClusterConnections = startOperationRequestClusterConnections; if (protoStartOperationRequest.HasFmrOperationSpec()) { startOperationRequest.FmrOperationSpec = NYT::NodeFromYsonString(protoStartOperationRequest.GetFmrOperationSpec()); } diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h index 15ecd6c97a5..a045aa5bb01 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -25,7 +25,7 @@ struct TStartOperationRequest { TString SessionId; TMaybe<TString> IdempotencyKey = Nothing(); ui32 NumRetries = 1; // Not supported yet - TClusterConnection ClusterConnection = {}; // TODO - change to map + std::unordered_map<TString, TClusterConnection> ClusterConnections = {}; TMaybe<NYT::TNode> FmrOperationSpec = Nothing(); }; diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make new file mode 100644 index 00000000000..b0c5e5cdf4f --- /dev/null +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make @@ -0,0 +1,21 @@ +LIBRARY() + +SRCS( + yql_yt_fmr_initializer.cpp +) + +PEERDIR( + yt/yql/providers/yt/gateway/fmr + yt/yql/providers/yt/fmr/coordinator/client + yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/job/impl + yt/yql/providers/yt/fmr/job_factory/impl + yt/yql/providers/yt/fmr/table_data_service/local + yt/yql/providers/yt/fmr/worker/impl + yt/yql/providers/yt/fmr/yt_service/file + yt/yql/providers/yt/fmr/yt_service/impl +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp new file mode 100644 index 00000000000..3bc8e074a3a --- /dev/null +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp @@ -0,0 +1,37 @@ +#include "yql_yt_fmr_initializer.h" + +namespace NYql::NFmr { + +std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker, const TString& coordinatorServerUrl, bool isFileGateway) { + auto coordinator = MakeFmrCoordinator(); + if (!coordinatorServerUrl.empty()) { + TFmrCoordinatorClientSettings coordinatorClientSettings; + THttpURL parsedUrl; + if (parsedUrl.Parse(coordinatorServerUrl) != THttpURL::ParsedOK) { + ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters"; + } + coordinatorClientSettings.Port = parsedUrl.GetPort(); + coordinatorClientSettings.Host = parsedUrl.GetHost(); + coordinator = MakeFmrCoordinatorClient(coordinatorClientSettings); + } + + IFmrWorker::TPtr worker = nullptr; + if (!disableLocalFmrWorker) { + auto tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(3)); + auto fmrYtSerivce = isFileGateway ? MakeFileYtSerivce() : MakeFmrYtSerivce(); + + auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable { + return RunJob(task, tableDataService, fmrYtSerivce, cancelFlag); + }; + + NFmr::TFmrJobFactorySettings settings{.Function=func}; + auto jobFactory = MakeFmrJobFactory(settings); + NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(), + .TimeToSleepBetweenRequests=TDuration::Seconds(1)}; + worker = MakeFmrWorker(coordinator, jobFactory, workerSettings); + worker->Start(); + } + return std::pair<IYtGateway::TPtr, IFmrWorker::TPtr>{CreateYtFmrGateway(slave, coordinator), std::move(worker)}; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h new file mode 100644 index 00000000000..a87522fbd71 --- /dev/null +++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h @@ -0,0 +1,19 @@ +#pragma once + +#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h> +#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> +#include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h> +#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h> +#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h> + +namespace NYql::NFmr { + +constexpr TStringBuf FastMapReduceGatewayName = "fmr"; + +std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker = false, const TString& coordinatorServerUrl = TString(), bool isFileGateway = false); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 7b17dee907b..482225a0fce 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { TDownloadTaskParams params = TDownloadTaskParams(input, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - auto res = job->Download(params); + auto res = job->Download(params, {{"test_cluster.test_path", TClusterConnection()}}); auto err = std::get_if<TError>(&res); auto statistics = std::get_if<TStatistics>(&res); @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); - auto res = job->Upload(params); + auto res = job->Upload(params, {{"test_cluster.test_path", TClusterConnection()}}); auto err = std::get_if<TError>(&res); @@ -85,7 +85,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { } Y_UNIT_TEST(MergeMixedTables) { - ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); std::vector<TTableRange> ranges = {{"test_part_id"}}; @@ -111,7 +110,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1)); tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3)); - auto res = job->Merge(params); + auto res = job->Merge(params, {{"test_cluster.test_path", TClusterConnection()}}); auto err = std::get_if<TError>(&res); UNIT_ASSERT_C(!err, err->ErrorMessage); @@ -124,7 +123,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { Y_UNIT_TEST_SUITE(TaskRunTests) { Y_UNIT_TEST(RunDownloadTask) { - ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); TYtTableRef input = TYtTableRef("test_cluster", "test_path"); std::unordered_map<TYtTableRef, TString> inputTables{{input, TableContent_1}}; @@ -135,7 +133,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); TDownloadTaskParams params = TDownloadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id"); + TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); @@ -156,7 +154,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection()); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -167,7 +165,6 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { } Y_UNIT_TEST(RunUploadTaskWithNoTable) { - ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); std::unordered_map<TYtTableRef, TString> inputTables, outputTables; NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); @@ -178,15 +175,17 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection()); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); // No tables in tableDataService - ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; - UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed); + try { + RunJob(task, tableDataServicePtr, ytService, cancelFlag); + } catch(...) { + UNIT_ASSERT(CurrentExceptionMessage().Contains("No data for chunk:test_table_id:test_part_id")); + } } Y_UNIT_TEST(RunMergeTask) { - ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); std::vector<TTableRange> ranges = {{"test_part_id"}}; TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; @@ -205,7 +204,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection()); + TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); @@ -219,40 +218,6 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { UNIT_ASSERT_C(resultTableContent, "Result table content is empty"); UNIT_ASSERT_NO_DIFF(GetTextYson(*resultTableContent), TableContent_1 + TableContent_2 + TableContent_3); } - - Y_UNIT_TEST(RunMergeTaskWithNoTable) { - - ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1)); - std::unordered_map<TYtTableRef, TString> inputTables, outputTables; - NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables); - std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false); - - std::vector<TTableRange> ranges = {{"test_part_id"}}; - TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges}; - TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster"); - TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges}; - TTaskTableRef input_table_ref_1 = {input_1}; - TTaskTableRef input_table_ref_2 = {input_2}; - TTaskTableRef input_table_ref_3 = {input_3}; - TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id_output", "test_part_id"); - std::vector<TTaskTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3}; - auto params = TMergeTaskParams(inputs, output); - auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - - TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection()); - - auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); - auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); - tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1)); - // No table (input_2) in Yt - tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3)); - - ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; - - UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed); - auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync(); - UNIT_ASSERT(!resultTableContent); - } } } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index c8da1df4801..c1721456cdc 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -21,7 +21,7 @@ public: { } - virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection) override { + virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override { try { const auto ytTable = params.Input; const auto cluster = params.Input.Cluster; @@ -31,8 +31,8 @@ public: const auto partId = output.PartId; YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path; - - auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway + YQL_ENSURE(clusterConnections.size() == 1); + auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second); // TODO - pass YtReader settings from Gateway auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, GetFmrTableDataServiceWriterSettings()); ParseRecords(*ytTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); @@ -46,7 +46,7 @@ public: } } - virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override { + virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override { try { const auto ytTable = params.Output; const auto cluster = params.Output.Cluster; @@ -57,7 +57,8 @@ public: YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path; auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings()); - auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnection); + YQL_ENSURE(clusterConnections.size() == 1); + auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second); ParseRecords(tableDataServiceReader, *ytTableWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); ytTableWriter->Flush(); @@ -67,8 +68,7 @@ public: } } - virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override { - // TODO - unordered_map<ClusterConnection> + virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override { // расширить таск парамс. добавить туда мету try { const auto inputs = params.Input; @@ -81,7 +81,7 @@ public: if (CancelFlag_->load()) { return TError("Canceled"); } - auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnection); + auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnections); ParseRecords(*inputTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize); } tableDataServiceWriter.Flush(); @@ -93,10 +93,12 @@ public: } private: - NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const TClusterConnection& clusterConnection) { + NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TString, TClusterConnection>& clusterConnections) const { auto ytTable = std::get_if<TYtTableRef>(&tableRef); auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef); if (ytTable) { + TString tableId = ytTable->Cluster + "." + ytTable->Path; + auto clusterConnection = clusterConnections.at(tableId); return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway } else if (fmrTable) { return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings()); @@ -105,15 +107,15 @@ private: } } - TParseRecordSettings GetParseRecordSettings() { + TParseRecordSettings GetParseRecordSettings() const { return Settings_ ? Settings_->ParseRecordSettings : TParseRecordSettings(); } - TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() { + TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() const { return Settings_ ? Settings_->FmrTableDataServiceReaderSettings : TFmrTableDataServiceReaderSettings(); } - TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() { + TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() const { return Settings_ ? Settings_->FmrTableDataServiceWriterSettings : TFmrTableDataServiceWriterSettings(); } @@ -147,27 +149,23 @@ TJobResult RunJob( using T = std::decay_t<decltype(taskParams)>; if constexpr (std::is_same_v<T, TUploadTaskParams>) { - return job->Upload(taskParams, task->ClusterConnection); + return job->Upload(taskParams, task->ClusterConnections); } else if constexpr (std::is_same_v<T, TDownloadTaskParams>) { - return job->Download(taskParams, task->ClusterConnection); + return job->Download(taskParams, task->ClusterConnections); } else if constexpr (std::is_same_v<T, TMergeTaskParams>) { - return job->Merge(taskParams, task->ClusterConnection); + return job->Merge(taskParams, task->ClusterConnections); } else { throw std::runtime_error{"Unsupported task type"}; } }; std::variant<TError, TStatistics> taskResult = std::visit(processTask, task->TaskParams); - auto err = std::get_if<TError>(&taskResult); - if (err) { - YQL_CLOG(ERROR, FastMapReduce) << "Task failed: " << err->ErrorMessage; - return {ETaskStatus::Failed, TStatistics()}; + ythrow yexception() << "Job failed with error: " << err->ErrorMessage; } auto statistics = std::get_if<TStatistics>(&taskResult); - return {ETaskStatus::Completed, *statistics}; }; diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h index 4b9c13b04bf..022a6256db0 100644 --- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h +++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h @@ -10,11 +10,11 @@ public: virtual ~IFmrJob() = default; - virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; + virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0; - virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; + virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0; - virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0; + virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0; }; } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto index a104cf26eac..34e16ed26b5 100644 --- a/yt/yql/providers/yt/fmr/proto/coordinator.proto +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -21,7 +21,7 @@ message TStartOperationRequest { string SessionId = 3; optional string IdempotencyKey = 4; uint32 NumRetries = 5; - TClusterConnection ClusterConnection = 6; + map<string, TClusterConnection> ClusterConnections = 6; optional string FmrOperationSpec = 7; } diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index 4bd35eacaa4..62c917b4e50 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -46,6 +46,7 @@ message TFmrError { message TYtTableRef { string Path = 1; string Cluster = 2; + optional string FilePath = 3; } message TFmrTableRef { @@ -155,7 +156,7 @@ message TTask { TTaskParams TaskParams = 3; string SessionId = 4; optional uint32 NumRetries = 5; - TClusterConnection ClusterConnection = 6; + map<string, TClusterConnection> ClusterConnections = 6; optional string JobSettings = 7; } diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 0860c009962..20dfb5362a9 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -39,6 +39,9 @@ NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) { NProto::TYtTableRef protoYtTableRef; protoYtTableRef.SetPath(ytTableRef.Path); protoYtTableRef.SetCluster(ytTableRef.Cluster); + if (ytTableRef.FilePath) { + protoYtTableRef.SetFilePath(*ytTableRef.FilePath); + } return protoYtTableRef; } @@ -46,6 +49,9 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { TYtTableRef ytTableRef; ytTableRef.Path = protoYtTableRef.GetPath(); ytTableRef.Cluster = protoYtTableRef.GetCluster(); + if (protoYtTableRef.HasFilePath()) { + ytTableRef.FilePath = protoYtTableRef.GetFilePath(); + } return ytTableRef; } @@ -398,8 +404,10 @@ NProto::TTask TaskToProto(const TTask& task) { protoTask.MutableTaskParams()->Swap(&taskParams); protoTask.SetSessionId(task.SessionId); protoTask.SetNumRetries(task.NumRetries); - auto clusterConnection = ClusterConnectionToProto(task.ClusterConnection); - protoTask.MutableClusterConnection()->Swap(&clusterConnection); + auto clusterConnections = *protoTask.MutableClusterConnections(); + for (auto& [tableName, conn]: task.ClusterConnections) { + clusterConnections[tableName] = ClusterConnectionToProto(conn); + } if (task.JobSettings) { protoTask.SetJobSettings(NYT::NodeToYsonString(*task.JobSettings)); } @@ -413,7 +421,11 @@ TTask TaskFromProto(const NProto::TTask& protoTask) { task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams()); task.SessionId = protoTask.GetSessionId(); task.NumRetries = protoTask.GetNumRetries(); - task.ClusterConnection = ClusterConnectionFromProto(protoTask.GetClusterConnection()); + std::unordered_map<TString, TClusterConnection> taskClusterConnections; + for (auto& [tableName, conn]: protoTask.GetClusterConnections()) { + taskClusterConnections[tableName] = ClusterConnectionFromProto(conn); + } + task.ClusterConnections = taskClusterConnections; if (protoTask.HasJobSettings()) { task.JobSettings = NYT::NodeFromYsonString(protoTask.GetJobSettings()); } diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index 0dc3650855c..8a0bcde5751 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -2,8 +2,8 @@ namespace NYql::NFmr { -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode>& jobSettings) { - return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection, jobSettings); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode>& jobSettings) { + return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnections, jobSettings); } TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage, const TStatistics& stats) { @@ -31,3 +31,8 @@ template<> void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) { out << meta.ToString(); } + +template<> +void Out<NYql::NFmr::TTableStats>(IOutputStream& out, const NYql::NFmr::TTableStats& tableStats) { + out << tableStats.Chunks << " chunks, " << tableStats.Rows << " rows, " << tableStats.DataWeight << " data weight"; +} diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index de18d91fa09..bab3df01aae 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -57,6 +57,8 @@ struct TError { struct TYtTableRef { TString Path; TString Cluster; + TMaybe<TString> FilePath = Nothing(); + bool operator == (const TYtTableRef&) const = default; }; @@ -168,8 +170,8 @@ struct TClusterConnection { struct TTask: public TThrRefBase { TTask() = default; - TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1) - : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), JobSettings(jobSettings), NumRetries(numRetries) + TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1) + : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnections(clusterConnections), JobSettings(jobSettings), NumRetries(numRetries) { } @@ -177,7 +179,7 @@ struct TTask: public TThrRefBase { TString TaskId; TTaskParams TaskParams = {}; TString SessionId; - TClusterConnection ClusterConnection = {}; + std::unordered_map<TString, TClusterConnection> ClusterConnections = {}; TMaybe<NYT::TNode> JobSettings = {}; ui32 NumRetries; // Not supported yet @@ -199,7 +201,7 @@ struct TTaskState: public TThrRefBase { using TPtr = TIntrusivePtr<TTaskState>; }; -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{}, const TMaybe<NYT::TNode>& jobSettings = Nothing()); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}, const TMaybe<NYT::TNode>& jobSettings = Nothing()); TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing(), const TStatistics& stats = TStatistics()); diff --git a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp index bec70aa6ed4..85e9599f8f8 100644 --- a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp +++ b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp @@ -16,6 +16,7 @@ Y_UNIT_TEST_SUITE(ParseRecordTests) { std::unordered_map<TYtTableRef, TString> outputTables; auto ytService = MakeMockYtService(inputTables, outputTables); + auto reader = ytService->MakeReader(testYtTable, TClusterConnection()); auto writer = ytService->MakeWriter(testYtTable, TClusterConnection()); ParseRecords(*reader, *writer, 1, 10); diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index db1aa21c430..2007cec3011 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -20,7 +20,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do .TaskType = taskType, .OperationParams = operationParams, .IdempotencyKey = "IdempotencyKey", - .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"} + .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} }; } diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h index bff6ddff9e3..1e18f2aa5b2 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h @@ -1,3 +1,5 @@ +#pragma once + #include <library/cpp/random_provider/random_provider.h> #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> #include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make new file mode 100644 index 00000000000..6f4ff700b0d --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make @@ -0,0 +1,14 @@ +UNITTEST() + +SRCS( + yql_yt_file_yt_service_ut.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/yt_service/file + yt/yql/providers/yt/gateway/file +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp b/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp new file mode 100644 index 00000000000..acc5867c454 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp @@ -0,0 +1,32 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <util/stream/file.h> +#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> // TODO - REMOVE + +namespace NYql::NFmr { + +Y_UNIT_TEST_SUITE(FileYtServiceTest) { + Y_UNIT_TEST(CheckReaderAndWriter) { + TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n"; + + TTempFileHandle file{}; + TYtTableRef ytTable{.Path = "test_path", .Cluster = "hahn", .FilePath = file.Name()}; + + auto fileService = MakeFileYtSerivce(); + auto writer = fileService->MakeWriter(ytTable, TClusterConnection()); + writer->Write(inputYsonContent.data(), inputYsonContent.size()); + writer->Flush(); + + TFileInput input(file.Name()); + + auto reader = fileService->MakeReader(ytTable, TClusterConnection()); + TStringStream binaryYsonStream; + TStringStream textYsonStream; + binaryYsonStream << reader->ReadAll(); + NYson::ReformatYsonStream(&binaryYsonStream, &textYsonStream, NYson::EYsonFormat::Text, ::NYson::EYsonType::ListFragment); + UNIT_ASSERT(textYsonStream.ReadAll().Contains(inputYsonContent)); + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ya.make b/yt/yql/providers/yt/fmr/yt_service/file/ya.make new file mode 100644 index 00000000000..dcd0969e3e6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/file/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + yql_yt_file_yt_service.cpp +) + +PEERDIR( + library/cpp/yson + yt/yql/providers/yt/gateway/file + yt/yql/providers/yt/fmr/yt_service/interface + yt/yql/providers/yt/lib/yson_helpers + yql/essentials/utils +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS(ut) diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp new file mode 100644 index 00000000000..1c33ed430e9 --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp @@ -0,0 +1,68 @@ +#include "yql_yt_file_yt_service.h" +#include <library/cpp/yson/parser.h> +#include <util/stream/file.h> +#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> +#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h> +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql::NFmr { + +namespace { + +class TFileYtTableWriter: public NYT::TRawTableWriter { + public: + TFileYtTableWriter(const TString& filePath): FilePath_(filePath) {} + + void NotifyRowEnd() override { + } + + private: + void DoWrite(const void* buf, size_t len) override { + Buffer_.Append(static_cast<const char*>(buf), len); + } + + void DoFlush() override { + TMemoryInput input(Buffer_.data(), Buffer_.size()); + TFileOutput outputFileStream(FilePath_); + TDoubleHighPrecisionYsonWriter writer(&outputFileStream, ::NYson::EYsonType::ListFragment); + NYson::TYsonParser parser(&writer, &input, ::NYson::EYsonType::ListFragment); + parser.Parse(); + Buffer_.Clear(); + } + + TString FilePath_; + TBuffer Buffer_; + }; + + +class TFileYtService: public NYql::NFmr::IYtService { +public: + + NYT::TRawTableReaderPtr MakeReader( + const TYtTableRef& ytTable, + const TClusterConnection& /*clusterConnection*/, + const TYtReaderSettings& /*readerSettings*/ + ) override { + YQL_ENSURE(ytTable.FilePath); + auto textYsonInputs = NFile::MakeTextYsonInputs({{*ytTable.FilePath, NFile::TColumnsInfo{}}}, false); + return textYsonInputs[0]; + } + + NYT::TRawTableWriterPtr MakeWriter( + const TYtTableRef& ytTable, + const TClusterConnection& /*clusterConnection*/, + const TYtWriterSettings& /*writerSettings*/ + ) override { + YQL_ENSURE(ytTable.FilePath); + return MakeIntrusive<TFileYtTableWriter>(*ytTable.FilePath); + } + +}; + +} // namespace + +IYtService::TPtr MakeFileYtSerivce() { + return MakeIntrusive<TFileYtService>(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h new file mode 100644 index 00000000000..ea5e8027b9a --- /dev/null +++ b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h @@ -0,0 +1,7 @@ +#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h> + +namespace NYql::NFmr { + +IYtService::TPtr MakeFileYtSerivce(); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp index 429880e66e1..a3caeb351c2 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -1596,6 +1596,9 @@ private: return TClusterConnectionResult(); } + TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) override { + return Services_->GetTablePath(options.Cluster(), options.Path(), options.IsTemp()); + } private: TYtFileServices::TPtr Services_; diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp index ec42b70bfdf..7a84a895fab 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp @@ -10,16 +10,18 @@ namespace NYql::NFile { class TTextYsonInput: public NYT::TRawTableReader { public: - TTextYsonInput(const TString& file, const TColumnsInfo& columnsInfo) { + TTextYsonInput(const TString& file, const TColumnsInfo& columnsInfo, bool addRowIndex) { TIFStream in(file); TBinaryYsonWriter writer(&Input_, ::NYson::EYsonType::ListFragment); - writer.OnBeginAttributes(); - writer.OnKeyedItem("row_index"); - writer.OnInt64Scalar(0); - writer.OnEndAttributes(); - writer.OnEntity(); - writer.OnListItem(); + if (addRowIndex) { + writer.OnBeginAttributes(); + writer.OnKeyedItem("row_index"); + writer.OnInt64Scalar(0); + writer.OnEndAttributes(); + writer.OnEntity(); + writer.OnListItem(); + } NYT::NYson::IYsonConsumer* consumer = &writer; THolder<TColumnFilteringConsumer> filter; if (columnsInfo.Columns || columnsInfo.RenameColumns) { @@ -59,16 +61,16 @@ private: TBufferStream Input_; }; -TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files) { +TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files, bool addRowIndex) { TVector<NYT::TRawTableReaderPtr> rawReaders; for (auto& file: files) { if (!NFs::Exists(file.first)) { rawReaders.emplace_back(nullptr); continue; } - rawReaders.emplace_back(MakeIntrusive<TTextYsonInput>(file.first, file.second)); + rawReaders.emplace_back(MakeIntrusive<TTextYsonInput>(file.first, file.second, addRowIndex)); } return rawReaders; } -} //namespace NYql::NFile
\ No newline at end of file +} //namespace NYql::NFile diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h index 85b903c84b0..a28606598d3 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h @@ -9,6 +9,6 @@ struct TColumnsInfo { TMaybe<NYT::TRichYPath::TRenameColumnsDescriptor> RenameColumns; }; -TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files); +TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files, bool addRowIndex = true); -}//namespace NYql::NFile
\ No newline at end of file +} // namespace NYql::NFile diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index dd5160d07c2..5860a65366d 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -23,14 +23,14 @@ using namespace NYql::NNodes; namespace NYql::NFmr { -namespace { - enum class ETablePresenceStatus { OnlyInYt, OnlyInFmr, Both }; +namespace { + struct TFmrOperationResult: public NCommon::TOperationResult {}; class TFmrYtGateway final: public TYtForwardingGatewayBase { @@ -109,7 +109,7 @@ public: if (auto op = opBase.Maybe<TYtMerge>()) { auto ytMerge = op.Cast(); - std::vector<TYtTableRef> inputTables = GetMergeInputTables(ytMerge); + auto inputTables = GetMergeInputTables(ytMerge); TYtTableRef outputTable = GetMergeOutputTable(ytMerge); auto future = DoMerge(inputTables, outputTable, std::move(options)); return future.Apply([this, pos = nodePos, outputTable = std::move(outputTable), options = std::move(options)] (const TFuture<TFmrOperationResult>& f) { @@ -117,8 +117,7 @@ public: f.GetValue(); // rethrow error if any TString sessionId = options.SessionId(); auto config = options.Config(); - TString transformedOutputTableId = GetTransformedPath(outputTable.Path, sessionId, config); - TString fmrOutputTableId = outputTable.Cluster + "." + transformedOutputTableId; + TString fmrOutputTableId = outputTable.Cluster + "." + outputTable.Path; SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr); TRunResult result; result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); // TODO - add statistics? @@ -145,19 +144,32 @@ public: auto config = options.Config(); std::vector<TFuture<TFmrOperationResult>> uploadFmrTablesToYtFutures; + auto outputPath = publish.Publish().Name().StringValue(); + + bool isAnonymous = NYql::HasSetting(publish.Publish().Settings().Ref(), EYtSettingType::Anonymous); + std::vector<TString> currentAnonymousTableAliases; for (auto out: publish.Input()) { - auto outTableWithCluster = GetOutTableWithCluster(out); + auto inputCluster = GetOutTableWithCluster(out).second; auto outTable = GetOutTable(out).Cast<TYtOutTable>(); TStringBuf inputPath = outTable.Name().Value(); - TString transformedInputPath = GetTransformedPath(ToString(inputPath), sessionId, config); + if (isAnonymous) { + currentAnonymousTableAliases.emplace_back(inputCluster + "." + inputPath); + } auto outputBase = out.Operation().Cast<TYtOutputOpBase>().Ptr(); + uploadFmrTablesToYtFutures.emplace_back(DoUpload(inputCluster, TString(inputPath), sessionId, config, outputBase, ctx)); + } + + if (isAnonymous) { + YQL_CLOG(DEBUG, FastMapReduce) << "Table " << outputPath << " is anonymous, not uploading from fmr to yt"; + TString fmrOutputTableId = cluster + "." + outputPath; + SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr); - TFmrTableRef fmrTableRef = TFmrTableRef{outTableWithCluster.second + "." + transformedInputPath}; - uploadFmrTablesToYtFutures.emplace_back(DoUpload(fmrTableRef, sessionId, config, outputBase, ctx)); + // TODO - figure out what to do here in case of multiple inputs + SetFmrIdAlias(fmrOutputTableId, currentAnonymousTableAliases[0], sessionId); + return Slave_->Publish(node, ctx, std::move(options)); } - auto outputPath = publish.Publish().Name().StringValue(); auto idempotencyKey = GenerateId(); return WaitExceptionOrAll(uploadFmrTablesToYtFutures).Apply([&, pos = nodePos, curNode = std::move(node), options = std::move(options)] (const TFuture<void>& f) mutable { @@ -170,10 +182,6 @@ public: }); } - TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override { - return Slave_->GetClusterConnection(std::move(options)); - } - void OpenSession(TOpenSessionOptions&& options) final { TString sessionId = options.SessionId(); YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); @@ -230,23 +238,34 @@ private: return GetGuidAsString(RandomProvider_->GenGuid()); } - TString GetUsername(const TString& sessionId) { + TString GetRealTablePath(const TString& sessionId, const TString& cluster, const TString& path, TYtSettings::TConstPtr& config) { + auto richPath = Slave_->GetWriteTable(sessionId, cluster, path, GetTablesTmpFolder(*config)); + return richPath.Path_; + } + + void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) { with_lock(SessionStates_->Mutex) { - YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); - auto& session = SessionStates_->Sessions[sessionId]; - return session.UserName; + YQL_CLOG(DEBUG, FastMapReduce) << "Setting table presence status " << newStatus << " for table with id " << fmrTableId; + auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; + tablePresenceStatuses[fmrTableId] = newStatus; } } - TString GetTransformedPath(const TString& path, const TString& sessionId, TYtSettings::TConstPtr& config) { - TString username = GetUsername(sessionId); - return NYql::TransformPath(GetTablesTmpFolder(*config), path, true, username); + void SetFmrIdAlias(const TString& fmrTableId, const TString& alias, const TString& sessionId) { + with_lock(SessionStates_->Mutex) { + YQL_CLOG(DEBUG, FastMapReduce) << "Setting table fmr id alias " << alias << " for table with id " << fmrTableId; + auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases; + fmrIdAliases[fmrTableId] = alias; + } } - void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) { + TString GetFmrIdOrAlias(const TString& fmrTableId, const TString& sessionId) { with_lock(SessionStates_->Mutex) { - auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; - tablePresenceStatuses[fmrTableId] = newStatus; + auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases; + if (!fmrIdAliases.contains(fmrTableId)) { + return fmrTableId; + } + return fmrIdAliases[fmrTableId]; } } @@ -260,14 +279,14 @@ private: } } - std::vector<TYtTableRef> GetMergeInputTables(const TYtMerge& ytMerge) { + std::vector<std::pair<TYtTableRef, bool>> GetMergeInputTables(const TYtMerge& ytMerge) { auto input = ytMerge.Maybe<TYtTransientOpBase>().Cast().Input(); - std::vector<TYtTableRef> inputTables; + std::vector<std::pair<TYtTableRef, bool>> inputTables; for (auto section: input.Cast<TYtSectionList>()) { for (auto path: section.Paths()) { TYtPathInfo pathInfo(path); TYtTableRef ytTable{.Path = pathInfo.Table->Name, .Cluster = pathInfo.Table->Cluster}; - inputTables.emplace_back(ytTable); + inputTables.emplace_back(ytTable, pathInfo.Table->IsTemp); } } return inputTables; @@ -288,22 +307,7 @@ private: return outputTables[0]; } - TString GetClusterFromMergeTables(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable) { - std::unordered_set<TString> clusters; - for (auto& [path, cluster]: inputTables) { - clusters.emplace(cluster); - } - YQL_ENSURE(clusters.size() == 1); - TString cluster = *clusters.begin(); - if (outputTable.Cluster) { - YQL_ENSURE(outputTable.Cluster == cluster); - } else { - outputTable.Cluster = cluster; - } - return cluster; - } - - TClusterConnection GetTablesClusterConnection(const TString& cluster, const TString& sessionId, TYtSettings::TConstPtr& config) { + TClusterConnection GetTableClusterConnection(const TString& cluster, const TString& sessionId, TYtSettings::TConstPtr& config) { auto clusterConnectionOptions = TClusterConnectionOptions(sessionId).Cluster(cluster).Config(config); auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions)); return TClusterConnection{ @@ -330,33 +334,30 @@ private: return future; } - TFuture<TFmrOperationResult> DoUpload(const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) { + TFuture<TFmrOperationResult> DoUpload(const TString& outputCluster, const TString& outputPath, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); - std::vector<TString> ytTableInfo; - StringSplitter(fmrTableRef.TableId).SplitByString(".").AddTo(&ytTableInfo); - YQL_ENSURE(ytTableInfo.size() == 2); - TString outputCluster = ytTableInfo[0], outputPath = ytTableInfo[1]; + TFmrTableRef fmrTableRef{outputCluster + "." + outputPath}; auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.TableId, sessionId); + if (!tablePresenceStatus || *tablePresenceStatus != ETablePresenceStatus::OnlyInFmr) { YQL_CLOG(DEBUG, FastMapReduce) << " We assume table " << fmrTableRef.TableId << " should be present in yt, not uploading from fmr"; - TFmrOperationResult fmrOperationResult = TFmrOperationResult(); - fmrOperationResult.SetSuccess(); - return MakeFuture(fmrOperationResult); + return GetSuccessfulFmrOperationResult(); } - TUploadOperationParams uploadOperationParams{ - .Input = fmrTableRef, - .Output = TYtTableRef{.Path = outputPath, .Cluster = outputCluster} - }; + TString realPath = GetRealTablePath(sessionId, outputCluster, outputPath, config); + TYtTableRef outputTable{.Path = realPath, .Cluster = outputCluster}; + outputTable.FilePath = GetTableFilePath(TGetTableFilePathOptions(sessionId).Cluster(outputCluster).Path(outputPath).IsTemp(true)); + + TUploadOperationParams uploadOperationParams{.Input = fmrTableRef, .Output = outputTable}; - auto clusterConnection = GetTablesClusterConnection(outputCluster, sessionId, config); + auto clusterConnection = GetTableClusterConnection(outputCluster, sessionId, config); TStartOperationRequest uploadRequest{ .TaskType = ETaskType::Upload, .OperationParams = uploadOperationParams, .SessionId = sessionId, .IdempotencyKey = GenerateId(), .NumRetries=1, - .ClusterConnection = clusterConnection, + .ClusterConnections = std::unordered_map<TString, TClusterConnection>{{fmrTableRef.TableId, clusterConnection}}, .FmrOperationSpec = config->FmrOperationSpec.Get(outputCluster) }; @@ -364,9 +365,9 @@ private: .Config(config); auto prepareFuture = Slave_->Prepare(outputOpBase, ctx, std::move(prepareOptions)); - return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) { + return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) mutable { try { - f.GetValue(); // rethrow error if any + f.GetValue(); YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId; return GetRunningOperationFuture(uploadRequest, sessionId).Apply([this, sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableId)] (const TFuture<TFmrOperationResult>& f) { @@ -387,19 +388,22 @@ private: }); } - TFuture<TFmrOperationResult> DoMerge(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable, TRunOptions&& options) { + TFuture<TFmrOperationResult> DoMerge(const std::vector<std::pair<TYtTableRef, bool>>& inputTables, TYtTableRef& outputTable, TRunOptions&& options) { TString sessionId = options.SessionId(); YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); - auto cluster = GetClusterFromMergeTables(inputTables, outputTable); // Can set outputTable.Cluster if empty + if (outputTable.Cluster.empty()) { + outputTable.Cluster = inputTables[0].first.Cluster; + } - TString outputTableId = outputTable.Path, outputCluster = outputTable.Cluster; - TString transformedOutputTableId = GetTransformedPath(outputTableId, sessionId, options.Config()); - TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + transformedOutputTableId}; + TString outputCluster = outputTable.Cluster, outputPath = outputTable.Path; + TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + outputPath}; std::vector<TOperationTableRef> mergeInputTables; - for (auto& ytTable: inputTables) { - TString fmrTableId = ytTable.Cluster + "." + ytTable.Path; + std::unordered_map<TString, TClusterConnection> clusterConnections; + for (auto [ytTable, isTemp]: inputTables) { + TString inputCluster = ytTable.Cluster, inputPath = ytTable.Path; + TString fmrTableId = inputCluster + "." + inputPath; auto tablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId); if (!tablePresenceStatus) { SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::OnlyInYt); @@ -407,33 +411,40 @@ private: if (tablePresenceStatus && *tablePresenceStatus != ETablePresenceStatus::OnlyInYt) { // table is in fmr, do not download - mergeInputTables.emplace_back(TFmrTableRef{.TableId = fmrTableId}); + mergeInputTables.emplace_back(TFmrTableRef{.TableId = GetFmrIdOrAlias(fmrTableId, sessionId)}); } else { + ytTable.FilePath = GetTableFilePath(TGetTableFilePathOptions(sessionId).Cluster(inputCluster).Path(inputPath).IsTemp(isTemp)); mergeInputTables.emplace_back(ytTable); + clusterConnections.emplace(fmrTableId, GetTableClusterConnection(ytTable.Cluster, sessionId, options.Config())); } } TMergeOperationParams mergeOperationParams{.Input = mergeInputTables,.Output = fmrOutputTable}; - auto clusterConnection = GetTablesClusterConnection(cluster, sessionId, options.Config()); TStartOperationRequest mergeOperationRequest{ .TaskType = ETaskType::Merge, .OperationParams = mergeOperationParams, .SessionId = sessionId, .IdempotencyKey = GenerateId(), .NumRetries = 1, - .ClusterConnection = clusterConnection, + .ClusterConnections = clusterConnections, .FmrOperationSpec = options.Config()->FmrOperationSpec.Get(outputCluster) }; std::vector<TString> inputPaths; - std::transform(inputTables.begin(),inputTables.end(), std::back_inserter(inputPaths), [](const TYtTableRef& ytTableRef){ - return ytTableRef.Path;} + std::transform(inputTables.begin(),inputTables.end(), std::back_inserter(inputPaths), [](const std::pair<TYtTableRef, bool>& table){ + return table.first.Path;} ); YQL_CLOG(DEBUG, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end()); return GetRunningOperationFuture(mergeOperationRequest, sessionId); } + TFuture<TFmrOperationResult> GetSuccessfulFmrOperationResult() { + TFmrOperationResult fmrOperationResult = TFmrOperationResult(); + fmrOperationResult.SetSuccess(); + return MakeFuture(fmrOperationResult); + } + private: struct TFmrGatewayOperationsState { std::unordered_map<TString, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes @@ -443,6 +454,7 @@ private: TFmrGatewayOperationsState OperationStates; std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService TString UserName; + std::unordered_map<TString, TString> FmrIdAliases; }; struct TSession { @@ -465,3 +477,21 @@ IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPt } } // namespace NYql::NFmr + +template<> +void Out<NYql::NFmr::ETablePresenceStatus>(IOutputStream& out, NYql::NFmr::ETablePresenceStatus status) { + switch (status) { + case NYql::NFmr::ETablePresenceStatus::Both: { + out << "BOTH"; + return; + } + case NYql::NFmr::ETablePresenceStatus::OnlyInFmr: { + out << "ONLY IN FMR"; + return; + } + case NYql::NFmr::ETablePresenceStatus::OnlyInYt: { + out << "ONLY IN YT"; + return; + } + } +} diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 4f476876b2d..02152c47bba 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -5798,7 +5798,11 @@ private: TClusterConnectionResult clusterConnectionResult{}; clusterConnectionResult.TransactionId = GetGuidAsString(entry->Tx->GetId()); clusterConnectionResult.YtServerName = ytServer; - clusterConnectionResult.Token = options.Config()->Auth.Get(); + auto auth = options.Config()->Auth.Get(); + if (!auth || auth->empty()) { + auth = Clusters_->GetAuth(options.Cluster()); + } + clusterConnectionResult.Token = auth; clusterConnectionResult.SetSuccess(); return clusterConnectionResult; } catch (...) { @@ -5806,6 +5810,10 @@ private: } } + TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&&) override { + return Nothing(); + } + static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) { if (execCtx->Options_.PublicId().Empty()) { return; diff --git a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp index 2d204896f99..3c7f71daeb4 100644 --- a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp +++ b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp @@ -965,6 +965,10 @@ public: return Inner_->GetClusterConnection(std::move(options)); } + TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) override { + return Inner_->GetTableFilePath(std::move(options)); + } + private: const IYtGateway::TPtr Inner_; const TQContext QContext_; diff --git a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp index 42b30aafb3a..d3ef8db327e 100644 --- a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp @@ -137,4 +137,12 @@ void TYtForwardingGatewayBase::AddCluster(const TYtClusterConfig& config) { Slave_->AddCluster(config); } +IYtGateway::TClusterConnectionResult TYtForwardingGatewayBase::GetClusterConnection(const TClusterConnectionOptions&& options) { + return Slave_->GetClusterConnection(std::move(options)); +} + +TMaybe<TString> TYtForwardingGatewayBase::GetTableFilePath(const TGetTableFilePathOptions&& options) { + return Slave_->GetTableFilePath(std::move(options)); +} + } // namspace NYql diff --git a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h index 1687d400506..597557bc5ec 100644 --- a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h +++ b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h @@ -71,6 +71,10 @@ public: void AddCluster(const TYtClusterConfig& config) override; + TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override; + + TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) override; + protected: IYtGateway::TPtr Slave_; }; diff --git a/yt/yql/providers/yt/provider/yql_yt_gateway.h b/yt/yql/providers/yt/provider/yql_yt_gateway.h index 2c713ebe265..e1b49abf7b6 100644 --- a/yt/yql/providers/yt/provider/yql_yt_gateway.h +++ b/yt/yql/providers/yt/provider/yql_yt_gateway.h @@ -635,6 +635,19 @@ public: TMaybe<TString> Token; }; + struct TGetTableFilePathOptions: public TCommonOptions { + using TSelf = TGetTableFilePathOptions; + + TGetTableFilePathOptions(const TString& sessionId) + : TCommonOptions(sessionId) + { + } + + OPTION_FIELD(TString, Cluster) + OPTION_FIELD(TString, Path) + OPTION_FIELD(bool, IsTemp) + }; + public: virtual ~IYtGateway() = default; @@ -697,6 +710,9 @@ public: virtual void AddCluster(const TYtClusterConfig& cluster) = 0; virtual TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) = 0; + + virtual TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) = 0; + }; } diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make index 8f04604931c..fc10024b330 100644 --- a/yt/yql/tools/ytrun/lib/ya.make +++ b/yt/yql/tools/ytrun/lib/ya.make @@ -6,13 +6,7 @@ SRCS( PEERDIR( yt/yql/providers/yt/provider - yt/yql/providers/yt/fmr/coordinator/client - yt/yql/providers/yt/fmr/coordinator/impl - yt/yql/providers/yt/fmr/job/impl - yt/yql/providers/yt/fmr/job_factory/impl - yt/yql/providers/yt/fmr/table_data_service/local - yt/yql/providers/yt/fmr/worker/impl - yt/yql/providers/yt/fmr/yt_service/impl + yt/yql/providers/yt/fmr/fmr_tool_lib yt/yql/providers/yt/gateway/native yt/yql/providers/yt/gateway/fmr yt/yql/providers/yt/lib/config_clusters diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index 63e0c1cede8..260c8b69647 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -8,12 +8,7 @@ #include <yt/yql/providers/yt/lib/log/yt_logger.h> #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h> -#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h> -#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> -#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h> -#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> -#include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h> -#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h> +#include <yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h> #include <yql/essentials/providers/common/provider/yql_provider_names.h> #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> #include <yql/essentials/core/services/yql_transform_pipeline.h> @@ -155,12 +150,12 @@ TYtRunTool::TYtRunTool(TString name) DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig); - if (GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) { + if (GetRunOptions().GatewayTypes.contains(NFmr::FastMapReduceGatewayName)) { GetRunOptions().GatewayTypes.emplace(YtProviderName); } }); - GetRunOptions().SetSupportedGateways({TString{YtProviderName}, TString{FastMapReduceGatewayName}}); + GetRunOptions().SetSupportedGateways({TString{YtProviderName}, TString{NFmr::FastMapReduceGatewayName}}); GetRunOptions().GatewayTypes.emplace(YtProviderName); AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr { @@ -187,38 +182,13 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { services.FileStorage = GetFileStorage(); services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt()); auto ytGateway = CreateYtNativeGateway(services); - if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) { + if (!GetRunOptions().GatewayTypes.contains(NFmr::FastMapReduceGatewayName)) { return ytGateway; } - auto coordinator = NFmr::MakeFmrCoordinator(); - if (!FmrCoordinatorServerUrl_.empty()) { - NFmr::TFmrCoordinatorClientSettings coordinatorClientSettings; - THttpURL parsedUrl; - if (parsedUrl.Parse(FmrCoordinatorServerUrl_) != THttpURL::ParsedOK) { - ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters"; - } - coordinatorClientSettings.Port = parsedUrl.GetPort(); - coordinatorClientSettings.Host = parsedUrl.GetHost(); - coordinator = NFmr::MakeFmrCoordinatorClient(coordinatorClientSettings); - } - - if (!DisableLocalFmrWorker_) { - auto tableDataService = MakeLocalTableDataService(NFmr::TLocalTableDataServiceSettings(3)); - auto fmrYtSerivce = NFmr::MakeFmrYtSerivce(); - - auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable { - return NFmr::RunJob(task, tableDataService, fmrYtSerivce, cancelFlag); - }; - - NFmr::TFmrJobFactorySettings settings{.Function=func}; - auto jobFactory = MakeFmrJobFactory(settings); - NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(), - .TimeToSleepBetweenRequests=TDuration::Seconds(1)}; - FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings); - FmrWorker_->Start(); - } - return NFmr::CreateYtFmrGateway(ytGateway, coordinator); + auto [fmrGateway, worker] = NFmr::InitializeFmrGateway(ytGateway, DisableLocalFmrWorker_, FmrCoordinatorServerUrl_); + FmrWorker_ = std::move(worker); + return fmrGateway; } IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() { diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h index edbc7c12ec2..9fa4ee3df5a 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.h +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h @@ -12,8 +12,6 @@ namespace NYql { -constexpr TStringBuf FastMapReduceGatewayName = "fmr"; - class TYtRunTool: public TFacadeRunner { public: TYtRunTool(TString name = "ytrun"); |
