diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-04-15 17:28:48 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-04-15 17:39:27 +0300 |
commit | a004c2e3c2279e8f31e98d74ed3b7773a34a9557 (patch) | |
tree | 867a5833c9cd19e6b6d5e307e823d0646681b882 | |
parent | 9dd7e9152369a7948af5bfd3a2b1e2a1f7cd32ee (diff) | |
download | ydb-a004c2e3c2279e8f31e98d74ed3b7773a34a9557.tar.gz |
Intermediate changes
commit_hash:2f170b3ffc5c6d88add9677f8d5819e7bbd549fe
14 files changed, 195 insertions, 103 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index fef5f4b110b..c64bc88aaf2 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -46,7 +46,7 @@ private: TDownloadOperationParams downloadOperationParams{ .Input = TYtTableRef{"Path","Cluster"}, - .Output = TFmrTableRef{"TableId"} + .Output = TFmrTableRef{{"TestCluster", "TestPath"}} }; TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TOperationParams operationParams = downloadOperationParams) { @@ -54,7 +54,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do .TaskType = taskType, .OperationParams = operationParams, .IdempotencyKey = "IdempotencyKey", - .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} + .ClusterConnections = {{TFmrTableId("Cluster", "Path"), TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} }; } @@ -67,7 +67,7 @@ std::vector<TStartOperationRequest> CreateSeveralOperationRequests( .TaskType = taskType, .OperationParams = operationParams, .IdempotencyKey = "IdempotencyKey_" + ToString(i), - .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} + .ClusterConnections = {{TFmrTableId("Cluster", "Path"), TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} }; } return startOperationRequests; @@ -172,7 +172,10 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { auto startOperationResponse = coordinator->StartOperation(request).GetValueSync(); downloadOperationIds.emplace_back(startOperationResponse.OperationId); } - auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadOperationParams{}); + auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadOperationParams{ + {{"Cluster", "Path"}}, + {} + }); auto uploadOperationResponse = coordinator->StartOperation(uploadOperationRequest).GetValueSync(); auto uploadOperationId = uploadOperationResponse.OperationId; diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index 2d9a68042e5..625f034c3fd 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -19,6 +19,7 @@ struct TOperationInfo { EOperationStatus OperationStatus; std::vector<TFmrError> ErrorMessages; TString SessionId; + std::vector<TString> OutputTableIds = {}; }; struct TIdempotencyKeyInfo { @@ -85,7 +86,11 @@ public: auto& operationInfo = Operations_[operationId]; auto operationStatus = operationInfo.OperationStatus; auto errorMessages = operationInfo.ErrorMessages; - return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages)); + std::vector<TTableStats> outputTablesStats; + for (auto& tableId : operationInfo.OutputTableIds) { + outputTablesStats.emplace_back(FmrTableStatistics_[tableId].Stats); + } + return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages, outputTablesStats)); } NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& request) override { @@ -159,6 +164,7 @@ public: Cerr << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats; } } + Operations_[operationId].OutputTableIds.emplace_back(fmrTableId.TableId); FmrTableStatistics_[fmrTableId.TableId] = TCoordinatorFmrTableStats{ .Stats = tableStats, .PartId = fmrTableId.PartId @@ -280,7 +286,7 @@ private: if (const TUploadOperationParams* uploadOperationParams = std::get_if<TUploadOperationParams>(&operationParams)) { TUploadTaskParams uploadTaskParams{}; uploadTaskParams.Output = uploadOperationParams->Output; - TString inputTableId = uploadOperationParams->Input.TableId; + TString inputTableId = uploadOperationParams->Input.FmrTableId.Id; TFmrTableInputRef fmrTableInput{ .TableId = inputTableId, .TableRanges = {GetTableRangeFromId(inputTableId)} @@ -290,7 +296,7 @@ private: } else if (const TDownloadOperationParams* downloadOperationParams = std::get_if<TDownloadOperationParams>(&operationParams)) { TDownloadTaskParams downloadTaskParams{}; downloadTaskParams.Input = downloadOperationParams->Input; - TString outputTableId = downloadOperationParams->Output.TableId; + TString outputTableId = downloadOperationParams->Output.FmrTableId.Id; TFmrTableOutputRef fmrTableOutput{ .TableId = outputTableId, .PartId = GetTableRangeFromId(outputTableId).PartId @@ -306,7 +312,7 @@ private: mergeInputTasks.emplace_back(*ytTableRef); } else { TFmrTableRef fmrTableRef = std::get<TFmrTableRef>(elem); - TString inputTableId = fmrTableRef.TableId; + TString inputTableId = fmrTableRef.FmrTableId.Id; TFmrTableInputRef tableInput{ .TableId = inputTableId, .TableRanges = {GetTableRangeFromId(inputTableId)} @@ -316,7 +322,7 @@ private: } mergeTaskParams.Input = mergeInputTasks; TFmrTableOutputRef outputTable; - mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.TableId}; + mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.FmrTableId.Id}; return mergeTaskParams; } } @@ -356,7 +362,7 @@ private: std::atomic<bool> StopCoordinator_; TDuration TimeToSleepBetweenClearKeyRequests_; TDuration IdempotencyKeyStoreTime_; - std::unordered_map<TString, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics + std::unordered_map<TFmrTableId, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics TMaybe<NYT::TNode> DefaultFmrOperationSpec_; }; diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp index 80367f0bce2..1e2407b1ea5 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp @@ -72,7 +72,7 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries); auto clusterConnections = *protoStartOperationRequest.MutableClusterConnections(); for (auto& [tableName, conn]: startOperationRequest.ClusterConnections) { - clusterConnections[tableName] = ClusterConnectionToProto(conn); + clusterConnections[tableName.Id] = ClusterConnectionToProto(conn); } if (startOperationRequest.FmrOperationSpec) { protoStartOperationRequest.SetFmrOperationSpec(NYT::NodeToYsonString(*startOperationRequest.FmrOperationSpec)); @@ -89,7 +89,7 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey(); } startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries(); - std::unordered_map<TString, TClusterConnection> startOperationRequestClusterConnections; + std::unordered_map<TFmrTableId, TClusterConnection> startOperationRequestClusterConnections; for (auto& [tableName, conn]: protoStartOperationRequest.GetClusterConnections()) { startOperationRequestClusterConnections[tableName] = ClusterConnectionFromProto(conn); } diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h index a045aa5bb01..a42cb0d35a6 100644 --- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -25,7 +25,7 @@ struct TStartOperationRequest { TString SessionId; TMaybe<TString> IdempotencyKey = Nothing(); ui32 NumRetries = 1; // Not supported yet - std::unordered_map<TString, TClusterConnection> ClusterConnections = {}; + std::unordered_map<TFmrTableId, TClusterConnection> ClusterConnections = {}; TMaybe<NYT::TNode> FmrOperationSpec = Nothing(); }; @@ -41,6 +41,7 @@ struct TGetOperationRequest { struct TGetOperationResponse { EOperationStatus Status; std::vector<TFmrError> ErrorMessages = {}; + std::vector<TTableStats> OutputTablesStats = {}; }; struct TDeleteOperationRequest { diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 482225a0fce..11b30362ee7 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { TDownloadTaskParams params = TDownloadTaskParams(input, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - auto res = job->Download(params, {{"test_cluster.test_path", TClusterConnection()}}); + auto res = job->Download(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); auto err = std::get_if<TError>(&res); auto statistics = std::get_if<TStatistics>(&res); @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); - auto res = job->Upload(params, {{"test_cluster.test_path", TClusterConnection()}}); + auto res = job->Upload(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); auto err = std::get_if<TError>(&res); @@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) { tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1)); tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3)); - auto res = job->Merge(params, {{"test_cluster.test_path", TClusterConnection()}}); + auto res = job->Merge(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); auto err = std::get_if<TError>(&res); UNIT_ASSERT_C(!err, err->ErrorMessage); @@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id"); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); TDownloadTaskParams params = TDownloadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); + TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed); @@ -154,7 +154,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0); tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1)); ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus; @@ -175,7 +175,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { TYtTableRef output = TYtTableRef("test_cluster", "test_path"); TUploadTaskParams params = TUploadTaskParams(input, output); - TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); + TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); // No tables in tableDataService try { @@ -204,7 +204,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) { auto params = TMergeTaskParams(inputs, output); auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0); - TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}}); + TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}}); auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0); auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0); diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index c1721456cdc..358c4215970 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -21,7 +21,7 @@ public: { } - virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override { + virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override { try { const auto ytTable = params.Input; const auto cluster = params.Input.Cluster; @@ -46,7 +46,7 @@ public: } } - virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override { + virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override { try { const auto ytTable = params.Output; const auto cluster = params.Output.Cluster; @@ -68,7 +68,7 @@ public: } } - virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override { + virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override { // расширить таск парамс. добавить туда мету try { const auto inputs = params.Input; @@ -93,11 +93,11 @@ public: } private: - NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TString, TClusterConnection>& clusterConnections) const { + NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const { auto ytTable = std::get_if<TYtTableRef>(&tableRef); auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef); if (ytTable) { - TString tableId = ytTable->Cluster + "." + ytTable->Path; + TFmrTableId tableId = {ytTable->Cluster, ytTable->Path}; auto clusterConnection = clusterConnections.at(tableId); return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway } else if (fmrTable) { diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h index 022a6256db0..18d9553ba01 100644 --- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h +++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h @@ -10,11 +10,11 @@ public: virtual ~IFmrJob() = default; - virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0; + virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0; - virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0; + virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0; - virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0; + virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0; }; } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index 62c917b4e50..60c8471846f 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -35,12 +35,18 @@ enum EFmrComponent { COMPONENT_JOB = 3; } +enum EFmrReason { + REASON_UNKNOWN = 0; + USER_ERROR = 1; +} + message TFmrError { EFmrComponent Component = 1; - string ErrorMessage = 2; - optional uint32 WorkerId = 3; - optional string TaskId = 4; - optional string OperationId = 5; + EFmrReason Reason = 2; + string ErrorMessage = 3; + optional uint32 WorkerId = 4; + optional string TaskId = 5; + optional string OperationId = 6; } message TYtTableRef { @@ -49,8 +55,12 @@ message TYtTableRef { optional string FilePath = 3; } +message TFmrTableId { + string Id = 1; +} + message TFmrTableRef { - string TableId = 1; + TFmrTableId FmrTableId = 1; } message TTableRange { diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 20dfb5362a9..b0c368008dc 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -6,6 +6,7 @@ namespace NYql::NFmr { NProto::TFmrError FmrErrorToProto(const TFmrError& error) { NProto::TFmrError protoError; protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component)); + protoError.SetReason(static_cast<NProto::EFmrReason>(error.Reason)); protoError.SetErrorMessage(error.ErrorMessage); if (error.WorkerId) { protoError.SetWorkerId(*error.WorkerId); @@ -22,6 +23,7 @@ NProto::TFmrError FmrErrorToProto(const TFmrError& error) { TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) { TFmrError fmrError; fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent()); + fmrError.Reason = static_cast<EFmrErrorReason>(protoError.GetReason()); fmrError.ErrorMessage = protoError.GetErrorMessage(); if (protoError.HasWorkerId()) { fmrError.WorkerId = protoError.GetWorkerId(); @@ -55,16 +57,26 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) { return ytTableRef; } +NProto::TFmrTableId FmrTableIdToProto(const TFmrTableId& fmrTableId) { + NProto::TFmrTableId protoFmrTableId; + protoFmrTableId.SetId(fmrTableId.Id); + return protoFmrTableId; +} + +TFmrTableId FmrTableIdFromProto(const NProto::TFmrTableId& protoFmrTableId) { + return TFmrTableId(protoFmrTableId.GetId()); +} + NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) { NProto::TFmrTableRef protoFmrTableRef; - protoFmrTableRef.SetTableId(fmrTableRef.TableId); + auto protoFmrTableId = FmrTableIdToProto(fmrTableRef.FmrTableId); + protoFmrTableRef.MutableFmrTableId()->Swap(&protoFmrTableId); return protoFmrTableRef; } TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) { - TFmrTableRef fmrTableRef; - fmrTableRef.TableId = protoFmrTableRef.GetTableId(); - return fmrTableRef; + auto tableId = FmrTableIdFromProto(protoFmrTableRef.GetFmrTableId()); + return TFmrTableRef(tableId); } NProto::TTableRange TableRangeToProto(const TTableRange& tableRange) { @@ -227,10 +239,10 @@ NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploa } TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams) { - TUploadOperationParams uploadOperationParams; - uploadOperationParams.Input = FmrTableRefFromProto(protoUploadOperationParams.GetInput()); - uploadOperationParams.Output = YtTableRefFromProto(protoUploadOperationParams.GetOutput()); - return uploadOperationParams; + return TUploadOperationParams( + FmrTableRefFromProto(protoUploadOperationParams.GetInput()), + YtTableRefFromProto(protoUploadOperationParams.GetOutput()) + ); } TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) { @@ -259,10 +271,10 @@ NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& } TDownloadOperationParams DownloadOperationParamsFromProto(const NProto::TDownloadOperationParams& protoDownloadOperationParams) { - TDownloadOperationParams downloadOperationParams; - downloadOperationParams.Input = YtTableRefFromProto(protoDownloadOperationParams.GetInput()); - downloadOperationParams.Output = FmrTableRefFromProto(protoDownloadOperationParams.GetOutput()); - return downloadOperationParams; + return TDownloadOperationParams( + YtTableRefFromProto(protoDownloadOperationParams.GetInput()), + FmrTableRefFromProto(protoDownloadOperationParams.GetOutput()) + ); } TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) { @@ -297,14 +309,14 @@ NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTas } TMergeOperationParams MergeOperationParamsFromProto(const NProto::TMergeOperationParams& protoMergeOperationParams) { - TMergeOperationParams mergeOperationParams; - std::vector<TOperationTableRef> input; + TMergeOperationParams mergeOperationParams( + {}, + FmrTableRefFromProto(protoMergeOperationParams.GetOutput()) + ); for (size_t i = 0; i < protoMergeOperationParams.InputSize(); ++i) { TOperationTableRef inputTable = OperationTableRefFromProto(protoMergeOperationParams.GetInput(i)); - input.emplace_back(inputTable); + mergeOperationParams.Input.emplace_back(inputTable); } - mergeOperationParams.Input = input; - mergeOperationParams.Output = FmrTableRefFromProto(protoMergeOperationParams.GetOutput()); return mergeOperationParams; } @@ -353,15 +365,13 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) { } TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams) { - TOperationParams operationParams; if (protoOperationParams.HasDownloadOperationParams()) { - operationParams = DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams()); + return DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams()); } else if (protoOperationParams.HasUploadOperationParams()) { - operationParams = UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams()); + return UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams()); } else { - operationParams = MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams()); + return MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams()); } - return operationParams; } TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) { @@ -406,7 +416,7 @@ NProto::TTask TaskToProto(const TTask& task) { protoTask.SetNumRetries(task.NumRetries); auto clusterConnections = *protoTask.MutableClusterConnections(); for (auto& [tableName, conn]: task.ClusterConnections) { - clusterConnections[tableName] = ClusterConnectionToProto(conn); + clusterConnections[tableName.Id] = ClusterConnectionToProto(conn); } if (task.JobSettings) { protoTask.SetJobSettings(NYT::NodeToYsonString(*task.JobSettings)); @@ -421,7 +431,7 @@ TTask TaskFromProto(const NProto::TTask& protoTask) { task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams()); task.SessionId = protoTask.GetSessionId(); task.NumRetries = protoTask.GetNumRetries(); - std::unordered_map<TString, TClusterConnection> taskClusterConnections; + std::unordered_map<TFmrTableId, TClusterConnection> taskClusterConnections; for (auto& [tableName, conn]: protoTask.GetClusterConnections()) { taskClusterConnections[tableName] = ClusterConnectionFromProto(conn); } diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make index df82ec258fe..4e74eb8b185 100644 --- a/yt/yql/providers/yt/fmr/request_options/ya.make +++ b/yt/yql/providers/yt/fmr/request_options/ya.make @@ -7,6 +7,7 @@ SRCS( PEERDIR( library/cpp/yson/node library/cpp/threading/future + yql/essentials/public/issue ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index 8a0bcde5751..6da844918da 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -2,7 +2,15 @@ namespace NYql::NFmr { -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode>& jobSettings) { +TFmrTableId::TFmrTableId(const TString& id): Id(id) +{ +}; + +TFmrTableId::TFmrTableId(const TString& cluster, const TString& path): Id(cluster + "." + path) +{ +}; + +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode>& jobSettings) { return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnections, jobSettings); } @@ -17,6 +25,11 @@ TString TFmrChunkMeta::ToString() const { } // namespace NYql::NFmr template<> +void Out<NYql::NFmr::TFmrTableId>(IOutputStream& out, const NYql::NFmr::TFmrTableId& tableId) { + out << tableId.Id; +} + +template<> void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& error) { out << "FmrError[" << error.Component << "]"; if (error.Component == NYql::NFmr::EFmrComponent::Worker) { diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index bab3df01aae..855e6b8e7d8 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -41,8 +41,14 @@ enum class EFmrComponent { Job }; +enum class EFmrErrorReason { + ReasonUnknown, + UserError // TODO Add more reasons +}; + struct TFmrError { EFmrComponent Component; + EFmrErrorReason Reason; TString ErrorMessage; TMaybe<ui32> WorkerId; TMaybe<TString> TaskId; @@ -62,8 +68,20 @@ struct TYtTableRef { bool operator == (const TYtTableRef&) const = default; }; +struct TFmrTableId { + TString Id; + + TFmrTableId() = default; + + TFmrTableId(const TString& id); + + TFmrTableId(const TString& cluster, const TString& path); + + bool operator == (const TFmrTableId&) const = default; +}; + struct TFmrTableRef { - TString TableId; + TFmrTableId FmrTableId; }; struct TTableRange { @@ -102,6 +120,14 @@ struct TTableStats { } // namespace NYql::NFmr namespace std { + + template<> + struct hash<NYql::NFmr::TFmrTableId> { + size_t operator()(const NYql::NFmr::TFmrTableId& tableId) const { + return hash<TString>()(tableId.Id); + } + }; + template<> struct hash<NYql::NFmr::TFmrTableOutputRef> { size_t operator()(const NYql::NFmr::TFmrTableOutputRef& ref) const { @@ -170,7 +196,7 @@ struct TClusterConnection { struct TTask: public TThrRefBase { TTask() = default; - TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1) + TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1) : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnections(clusterConnections), JobSettings(jobSettings), NumRetries(numRetries) { } @@ -179,7 +205,7 @@ struct TTask: public TThrRefBase { TString TaskId; TTaskParams TaskParams = {}; TString SessionId; - std::unordered_map<TString, TClusterConnection> ClusterConnections = {}; + std::unordered_map<TFmrTableId, TClusterConnection> ClusterConnections = {}; TMaybe<NYT::TNode> JobSettings = {}; ui32 NumRetries; // Not supported yet @@ -201,7 +227,7 @@ struct TTaskState: public TThrRefBase { using TPtr = TIntrusivePtr<TTaskState>; }; -TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}, const TMaybe<NYT::TNode>& jobSettings = Nothing()); +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, const TMaybe<NYT::TNode>& jobSettings = Nothing()); TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing(), const TStatistics& stats = TStatistics()); diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index 2007cec3011..f19c5365bdf 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -12,7 +12,7 @@ namespace NYql::NFmr { TDownloadOperationParams downloadOperationParams{ .Input = TYtTableRef{"Path","Cluster"}, - .Output = TFmrTableRef{"TableId"} + .Output = TFmrTableRef{{"Cluster", "Path"}} }; TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TOperationParams operationParams = downloadOperationParams) { @@ -20,7 +20,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do .TaskType = taskType, .OperationParams = operationParams, .IdempotencyKey = "IdempotencyKey", - .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} + .ClusterConnections = {{TFmrTableId("Cluster", "Path"), TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}} }; } diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index 5860a65366d..e1302451382 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -31,7 +31,14 @@ enum class ETablePresenceStatus { namespace { -struct TFmrOperationResult: public NCommon::TOperationResult {}; +TIssue ToIssue(const TFmrError& error, const TPosition& pos){ + return TIssue(pos, error.ErrorMessage); +}; + +struct TFmrOperationResult: public NCommon::TOperationResult { + std::vector<TFmrError> Errors = {}; + std::vector<TTableStats> TablesStats = {}; +}; class TFmrYtGateway final: public TYtForwardingGatewayBase { public: @@ -45,29 +52,30 @@ public: auto getOperationStatusesFunc = [&] { while (!StopFmrGateway_) { with_lock(SessionStates_->Mutex) { - auto checkOperationStatuses = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses, const TString& sessionId) { + auto checkOperationStatuses = [&] (std::unordered_map<TFmrTableId, TPromise<TFmrOperationResult>>& operationStatuses, const TString& sessionId) { for (auto& [operationId, promise]: operationStatuses) { YQL_CLOG(TRACE, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId; - auto getOperationFuture = Coordinator_->GetOperation({operationId}); + auto getOperationFuture = Coordinator_->GetOperation({operationId.Id}); getOperationFuture.Subscribe([&, operationId, sessionId] (const auto& getFuture) { auto getOperationResult = getFuture.GetValueSync(); auto getOperationStatus = getOperationResult.Status; auto operationErrorMessages = getOperationResult.ErrorMessages; + auto operationOutputTablesStats = getOperationResult.OutputTablesStats; with_lock(SessionStates_->Mutex) { bool operationCompleted = getOperationStatus != EOperationStatus::Accepted && getOperationStatus != EOperationStatus::InProgress; if (operationCompleted) { // operation finished, set value in future returned in DoMerge / DoUpload bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed; + TFmrOperationResult fmrOperationResult{}; + fmrOperationResult.Errors = operationErrorMessages; if (hasCompletedSuccessfully) { - TFmrOperationResult fmrOperationResult{}; + fmrOperationResult.TablesStats = operationOutputTablesStats; fmrOperationResult.SetSuccess(); - promise.SetValue(fmrOperationResult); - } else { - promise.SetException(JoinRange(' ', operationErrorMessages.begin(), operationErrorMessages.end())); } - YQL_CLOG(DEBUG, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId; - auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId}); + promise.SetValue(fmrOperationResult); + YQL_CLOG(INFO, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId; + auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId.Id}); deleteOperationFuture.Subscribe([&, sessionId, operationId] (const auto& deleteFuture) { auto deleteOperationResult = deleteFuture.GetValueSync(); auto deleteOperationStatus = deleteOperationResult.Status; @@ -114,15 +122,28 @@ public: auto future = DoMerge(inputTables, outputTable, std::move(options)); return future.Apply([this, pos = nodePos, outputTable = std::move(outputTable), options = std::move(options)] (const TFuture<TFmrOperationResult>& f) { try { - f.GetValue(); // rethrow error if any + auto fmrOperationResult = f.GetValue(); // rethrow error if any TString sessionId = options.SessionId(); auto config = options.Config(); - TString fmrOutputTableId = outputTable.Cluster + "." + outputTable.Path; + TFmrTableId fmrOutputTableId = {outputTable.Cluster, outputTable.Path}; SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr); TRunResult result; - result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); // TODO - add statistics? - result.OutTableStats.back().second->Id = "fmr_" + fmrOutputTableId; - result.SetSuccess(); + auto tableStats = fmrOperationResult.TablesStats.back(); + result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); + result.OutTableStats.back().second->Id = "fmr_" + fmrOutputTableId.Id; + result.OutTableStats.back().second->RecordsCount = tableStats.Rows; + result.OutTableStats.back().second->DataSize = tableStats.DataWeight; + result.OutTableStats.back().second->ChunkCount = tableStats.Chunks; + YQL_CLOG(INFO, FastMapReduce) << "Fmr output table info: RecordsCount = " << result.OutTableStats.back().second->RecordsCount << " DataSize = " << result.OutTableStats.back().second->DataSize << " ChunkCount = " << result.OutTableStats.back().second->ChunkCount; + auto operationErrors = fmrOperationResult.Errors; + TVector<TIssue> issues; + for (const auto& error : operationErrors) { + issues.emplace_back(ToIssue(error, pos)); + } + result.AddIssues(issues); + if (fmrOperationResult.Success()) { + result.SetSuccess(); + } return MakeFuture<TRunResult>(std::move(result)); } catch (...) { return MakeFuture(ResultFromCurrentException<TRunResult>(pos)); @@ -147,14 +168,14 @@ public: auto outputPath = publish.Publish().Name().StringValue(); bool isAnonymous = NYql::HasSetting(publish.Publish().Settings().Ref(), EYtSettingType::Anonymous); - std::vector<TString> currentAnonymousTableAliases; + std::vector<TFmrTableId> currentAnonymousTableAliases; for (auto out: publish.Input()) { - auto inputCluster = GetOutTableWithCluster(out).second; + TString inputCluster = GetOutTableWithCluster(out).second; auto outTable = GetOutTable(out).Cast<TYtOutTable>(); - TStringBuf inputPath = outTable.Name().Value(); + TString inputPath = ToString(outTable.Name().Value()); if (isAnonymous) { - currentAnonymousTableAliases.emplace_back(inputCluster + "." + inputPath); + currentAnonymousTableAliases.emplace_back(TFmrTableId(inputCluster, inputPath)); } auto outputBase = out.Operation().Cast<TYtOutputOpBase>().Ptr(); uploadFmrTablesToYtFutures.emplace_back(DoUpload(inputCluster, TString(inputPath), sessionId, config, outputBase, ctx)); @@ -162,7 +183,7 @@ public: if (isAnonymous) { YQL_CLOG(DEBUG, FastMapReduce) << "Table " << outputPath << " is anonymous, not uploading from fmr to yt"; - TString fmrOutputTableId = cluster + "." + outputPath; + TFmrTableId fmrOutputTableId = {cluster, outputPath}; SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr); // TODO - figure out what to do here in case of multiple inputs @@ -218,11 +239,11 @@ public: YQL_ENSURE(sessions.contains(sessionId)); auto& operationStates = sessions[sessionId].OperationStates; - auto cancelOperationsFunc = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses) { + auto cancelOperationsFunc = [&] (std::unordered_map<TFmrTableId, TPromise<TFmrOperationResult>>& operationStatuses) { std::vector<TFuture<TDeleteOperationResponse>> cancelOperationsFutures; for (auto& [operationId, promise]: operationStatuses) { - cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId})); + cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId.Id})); } NThreading::WaitAll(cancelOperationsFutures).GetValueSync(); }; @@ -243,7 +264,7 @@ private: return richPath.Path_; } - void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) { + void SetTablePresenceStatus(const TFmrTableId& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) { with_lock(SessionStates_->Mutex) { YQL_CLOG(DEBUG, FastMapReduce) << "Setting table presence status " << newStatus << " for table with id " << fmrTableId; auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; @@ -251,7 +272,7 @@ private: } } - void SetFmrIdAlias(const TString& fmrTableId, const TString& alias, const TString& sessionId) { + void SetFmrIdAlias(const TFmrTableId& fmrTableId, const TFmrTableId& alias, const TString& sessionId) { with_lock(SessionStates_->Mutex) { YQL_CLOG(DEBUG, FastMapReduce) << "Setting table fmr id alias " << alias << " for table with id " << fmrTableId; auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases; @@ -259,7 +280,7 @@ private: } } - TString GetFmrIdOrAlias(const TString& fmrTableId, const TString& sessionId) { + TFmrTableId GetFmrIdOrAlias(const TFmrTableId& fmrTableId, const TString& sessionId) { with_lock(SessionStates_->Mutex) { auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases; if (!fmrIdAliases.contains(fmrTableId)) { @@ -269,7 +290,7 @@ private: } } - TMaybe<ETablePresenceStatus> GetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId) { + TMaybe<ETablePresenceStatus> GetTablePresenceStatus(const TFmrTableId& fmrTableId, const TString& sessionId) { with_lock(SessionStates_->Mutex) { auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; if (!tablePresenceStatuses.contains(fmrTableId)) { @@ -320,6 +341,7 @@ private: TFuture<TFmrOperationResult> GetRunningOperationFuture(const TStartOperationRequest& startOperationRequest, const TString& sessionId) { auto promise = NewPromise<TFmrOperationResult>(); auto future = promise.GetFuture(); + YQL_CLOG(INFO, FastMapReduce) << "Starting " << startOperationRequest.TaskType << " operation"; auto startOperationResponseFuture = Coordinator_->StartOperation(startOperationRequest); startOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& mergeFuture) { TStartOperationResponse mergeOperationResponse = mergeFuture.GetValueSync(); @@ -336,11 +358,11 @@ private: TFuture<TFmrOperationResult> DoUpload(const TString& outputCluster, const TString& outputPath, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); - TFmrTableRef fmrTableRef{outputCluster + "." + outputPath}; - auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.TableId, sessionId); + TFmrTableRef fmrTableRef{TFmrTableId(outputCluster, outputPath)}; + auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.FmrTableId, sessionId); if (!tablePresenceStatus || *tablePresenceStatus != ETablePresenceStatus::OnlyInFmr) { - YQL_CLOG(DEBUG, FastMapReduce) << " We assume table " << fmrTableRef.TableId << " should be present in yt, not uploading from fmr"; + YQL_CLOG(INFO, FastMapReduce) << " We assume table " << fmrTableRef.FmrTableId << " should be present in yt, not uploading from fmr"; return GetSuccessfulFmrOperationResult(); } @@ -357,7 +379,7 @@ private: .SessionId = sessionId, .IdempotencyKey = GenerateId(), .NumRetries=1, - .ClusterConnections = std::unordered_map<TString, TClusterConnection>{{fmrTableRef.TableId, clusterConnection}}, + .ClusterConnections = std::unordered_map<TFmrTableId, TClusterConnection>{{fmrTableRef.FmrTableId, clusterConnection}}, .FmrOperationSpec = config->FmrOperationSpec.Get(outputCluster) }; @@ -365,11 +387,11 @@ private: .Config(config); auto prepareFuture = Slave_->Prepare(outputOpBase, ctx, std::move(prepareOptions)); - return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) mutable { + return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.FmrTableId)] (const TFuture<TRunResult>& f) mutable { try { f.GetValue(); YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); - YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId; + YQL_CLOG(INFO, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId; return GetRunningOperationFuture(uploadRequest, sessionId).Apply([this, sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableId)] (const TFuture<TFmrOperationResult>& f) { try { YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); @@ -397,13 +419,13 @@ private: } TString outputCluster = outputTable.Cluster, outputPath = outputTable.Path; - TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + outputPath}; + TFmrTableRef fmrOutputTable{TFmrTableId(outputCluster, outputPath)}; std::vector<TOperationTableRef> mergeInputTables; - std::unordered_map<TString, TClusterConnection> clusterConnections; + std::unordered_map<TFmrTableId, TClusterConnection> clusterConnections; for (auto [ytTable, isTemp]: inputTables) { TString inputCluster = ytTable.Cluster, inputPath = ytTable.Path; - TString fmrTableId = inputCluster + "." + inputPath; + TFmrTableId fmrTableId = {inputCluster, inputPath}; auto tablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId); if (!tablePresenceStatus) { SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::OnlyInYt); @@ -411,7 +433,7 @@ private: if (tablePresenceStatus && *tablePresenceStatus != ETablePresenceStatus::OnlyInYt) { // table is in fmr, do not download - mergeInputTables.emplace_back(TFmrTableRef{.TableId = GetFmrIdOrAlias(fmrTableId, sessionId)}); + mergeInputTables.emplace_back(TFmrTableRef(GetFmrIdOrAlias(fmrTableId, sessionId))); } else { ytTable.FilePath = GetTableFilePath(TGetTableFilePathOptions(sessionId).Cluster(inputCluster).Path(inputPath).IsTemp(isTemp)); mergeInputTables.emplace_back(ytTable); @@ -435,7 +457,7 @@ private: return table.first.Path;} ); - YQL_CLOG(DEBUG, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end()); + YQL_CLOG(INFO, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end()); return GetRunningOperationFuture(mergeOperationRequest, sessionId); } @@ -447,14 +469,14 @@ private: private: struct TFmrGatewayOperationsState { - std::unordered_map<TString, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes + std::unordered_map<TFmrTableId, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes }; struct TSessionInfo { TFmrGatewayOperationsState OperationStates; - std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService + std::unordered_map<TFmrTableId, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService TString UserName; - std::unordered_map<TString, TString> FmrIdAliases; + std::unordered_map<TFmrTableId, TFmrTableId> FmrIdAliases; }; struct TSession { |