aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-04-15 17:28:48 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-04-15 17:39:27 +0300
commita004c2e3c2279e8f31e98d74ed3b7773a34a9557 (patch)
tree867a5833c9cd19e6b6d5e307e823d0646681b882
parent9dd7e9152369a7948af5bfd3a2b1e2a1f7cd32ee (diff)
downloadydb-a004c2e3c2279e8f31e98d74ed3b7773a34a9557.tar.gz
Intermediate changes
commit_hash:2f170b3ffc5c6d88add9677f8d5819e7bbd549fe
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp11
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp18
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h3
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp14
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp10
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h6
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto20
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp58
-rw-r--r--yt/yql/providers/yt/fmr/request_options/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp15
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h34
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp4
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp100
14 files changed, 195 insertions, 103 deletions
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index fef5f4b110b..c64bc88aaf2 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -46,7 +46,7 @@ private:
TDownloadOperationParams downloadOperationParams{
.Input = TYtTableRef{"Path","Cluster"},
- .Output = TFmrTableRef{"TableId"}
+ .Output = TFmrTableRef{{"TestCluster", "TestPath"}}
};
TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TOperationParams operationParams = downloadOperationParams) {
@@ -54,7 +54,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do
.TaskType = taskType,
.OperationParams = operationParams,
.IdempotencyKey = "IdempotencyKey",
- .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
+ .ClusterConnections = {{TFmrTableId("Cluster", "Path"), TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
};
}
@@ -67,7 +67,7 @@ std::vector<TStartOperationRequest> CreateSeveralOperationRequests(
.TaskType = taskType,
.OperationParams = operationParams,
.IdempotencyKey = "IdempotencyKey_" + ToString(i),
- .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
+ .ClusterConnections = {{TFmrTableId("Cluster", "Path"), TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
};
}
return startOperationRequests;
@@ -172,7 +172,10 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
auto startOperationResponse = coordinator->StartOperation(request).GetValueSync();
downloadOperationIds.emplace_back(startOperationResponse.OperationId);
}
- auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadOperationParams{});
+ auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadOperationParams{
+ {{"Cluster", "Path"}},
+ {}
+ });
auto uploadOperationResponse = coordinator->StartOperation(uploadOperationRequest).GetValueSync();
auto uploadOperationId = uploadOperationResponse.OperationId;
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index 2d9a68042e5..625f034c3fd 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -19,6 +19,7 @@ struct TOperationInfo {
EOperationStatus OperationStatus;
std::vector<TFmrError> ErrorMessages;
TString SessionId;
+ std::vector<TString> OutputTableIds = {};
};
struct TIdempotencyKeyInfo {
@@ -85,7 +86,11 @@ public:
auto& operationInfo = Operations_[operationId];
auto operationStatus = operationInfo.OperationStatus;
auto errorMessages = operationInfo.ErrorMessages;
- return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages));
+ std::vector<TTableStats> outputTablesStats;
+ for (auto& tableId : operationInfo.OutputTableIds) {
+ outputTablesStats.emplace_back(FmrTableStatistics_[tableId].Stats);
+ }
+ return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages, outputTablesStats));
}
NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& request) override {
@@ -159,6 +164,7 @@ public:
Cerr << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats;
}
}
+ Operations_[operationId].OutputTableIds.emplace_back(fmrTableId.TableId);
FmrTableStatistics_[fmrTableId.TableId] = TCoordinatorFmrTableStats{
.Stats = tableStats,
.PartId = fmrTableId.PartId
@@ -280,7 +286,7 @@ private:
if (const TUploadOperationParams* uploadOperationParams = std::get_if<TUploadOperationParams>(&operationParams)) {
TUploadTaskParams uploadTaskParams{};
uploadTaskParams.Output = uploadOperationParams->Output;
- TString inputTableId = uploadOperationParams->Input.TableId;
+ TString inputTableId = uploadOperationParams->Input.FmrTableId.Id;
TFmrTableInputRef fmrTableInput{
.TableId = inputTableId,
.TableRanges = {GetTableRangeFromId(inputTableId)}
@@ -290,7 +296,7 @@ private:
} else if (const TDownloadOperationParams* downloadOperationParams = std::get_if<TDownloadOperationParams>(&operationParams)) {
TDownloadTaskParams downloadTaskParams{};
downloadTaskParams.Input = downloadOperationParams->Input;
- TString outputTableId = downloadOperationParams->Output.TableId;
+ TString outputTableId = downloadOperationParams->Output.FmrTableId.Id;
TFmrTableOutputRef fmrTableOutput{
.TableId = outputTableId,
.PartId = GetTableRangeFromId(outputTableId).PartId
@@ -306,7 +312,7 @@ private:
mergeInputTasks.emplace_back(*ytTableRef);
} else {
TFmrTableRef fmrTableRef = std::get<TFmrTableRef>(elem);
- TString inputTableId = fmrTableRef.TableId;
+ TString inputTableId = fmrTableRef.FmrTableId.Id;
TFmrTableInputRef tableInput{
.TableId = inputTableId,
.TableRanges = {GetTableRangeFromId(inputTableId)}
@@ -316,7 +322,7 @@ private:
}
mergeTaskParams.Input = mergeInputTasks;
TFmrTableOutputRef outputTable;
- mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.TableId};
+ mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.FmrTableId.Id};
return mergeTaskParams;
}
}
@@ -356,7 +362,7 @@ private:
std::atomic<bool> StopCoordinator_;
TDuration TimeToSleepBetweenClearKeyRequests_;
TDuration IdempotencyKeyStoreTime_;
- std::unordered_map<TString, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics
+ std::unordered_map<TFmrTableId, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics
TMaybe<NYT::TNode> DefaultFmrOperationSpec_;
};
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
index 80367f0bce2..1e2407b1ea5 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
@@ -72,7 +72,7 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio
protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
auto clusterConnections = *protoStartOperationRequest.MutableClusterConnections();
for (auto& [tableName, conn]: startOperationRequest.ClusterConnections) {
- clusterConnections[tableName] = ClusterConnectionToProto(conn);
+ clusterConnections[tableName.Id] = ClusterConnectionToProto(conn);
}
if (startOperationRequest.FmrOperationSpec) {
protoStartOperationRequest.SetFmrOperationSpec(NYT::NodeToYsonString(*startOperationRequest.FmrOperationSpec));
@@ -89,7 +89,7 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat
startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey();
}
startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries();
- std::unordered_map<TString, TClusterConnection> startOperationRequestClusterConnections;
+ std::unordered_map<TFmrTableId, TClusterConnection> startOperationRequestClusterConnections;
for (auto& [tableName, conn]: protoStartOperationRequest.GetClusterConnections()) {
startOperationRequestClusterConnections[tableName] = ClusterConnectionFromProto(conn);
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
index a045aa5bb01..a42cb0d35a6 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -25,7 +25,7 @@ struct TStartOperationRequest {
TString SessionId;
TMaybe<TString> IdempotencyKey = Nothing();
ui32 NumRetries = 1; // Not supported yet
- std::unordered_map<TString, TClusterConnection> ClusterConnections = {};
+ std::unordered_map<TFmrTableId, TClusterConnection> ClusterConnections = {};
TMaybe<NYT::TNode> FmrOperationSpec = Nothing();
};
@@ -41,6 +41,7 @@ struct TGetOperationRequest {
struct TGetOperationResponse {
EOperationStatus Status;
std::vector<TFmrError> ErrorMessages = {};
+ std::vector<TTableStats> OutputTablesStats = {};
};
struct TDeleteOperationRequest {
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index 482225a0fce..11b30362ee7 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
TDownloadTaskParams params = TDownloadTaskParams(input, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- auto res = job->Download(params, {{"test_cluster.test_path", TClusterConnection()}});
+ auto res = job->Download(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
auto err = std::get_if<TError>(&res);
auto statistics = std::get_if<TStatistics>(&res);
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
- auto res = job->Upload(params, {{"test_cluster.test_path", TClusterConnection()}});
+ auto res = job->Upload(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
auto err = std::get_if<TError>(&res);
@@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1));
tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3));
- auto res = job->Merge(params, {{"test_cluster.test_path", TClusterConnection()}});
+ auto res = job->Merge(params, {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
auto err = std::get_if<TError>(&res);
UNIT_ASSERT_C(!err, err->ErrorMessage);
@@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
TDownloadTaskParams params = TDownloadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
+ TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
@@ -154,7 +154,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
+ TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -175,7 +175,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
+ TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
// No tables in tableDataService
try {
@@ -204,7 +204,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
+ TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index c1721456cdc..358c4215970 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -21,7 +21,7 @@ public:
{
}
- virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override {
+ virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override {
try {
const auto ytTable = params.Input;
const auto cluster = params.Input.Cluster;
@@ -46,7 +46,7 @@ public:
}
}
- virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override {
+ virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override {
try {
const auto ytTable = params.Output;
const auto cluster = params.Output.Cluster;
@@ -68,7 +68,7 @@ public:
}
}
- virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override {
+ virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) override {
// расширить таск парамс. добавить туда мету
try {
const auto inputs = params.Input;
@@ -93,11 +93,11 @@ public:
}
private:
- NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TString, TClusterConnection>& clusterConnections) const {
+ NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
auto ytTable = std::get_if<TYtTableRef>(&tableRef);
auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef);
if (ytTable) {
- TString tableId = ytTable->Cluster + "." + ytTable->Path;
+ TFmrTableId tableId = {ytTable->Cluster, ytTable->Path};
auto clusterConnection = clusterConnections.at(tableId);
return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
} else if (fmrTable) {
diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
index 022a6256db0..18d9553ba01 100644
--- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
+++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
@@ -10,11 +10,11 @@ public:
virtual ~IFmrJob() = default;
- virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0;
+ virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0;
- virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0;
+ virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0;
- virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0;
+ virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}) = 0;
};
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
index 62c917b4e50..60c8471846f 100644
--- a/yt/yql/providers/yt/fmr/proto/request_options.proto
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -35,12 +35,18 @@ enum EFmrComponent {
COMPONENT_JOB = 3;
}
+enum EFmrReason {
+ REASON_UNKNOWN = 0;
+ USER_ERROR = 1;
+}
+
message TFmrError {
EFmrComponent Component = 1;
- string ErrorMessage = 2;
- optional uint32 WorkerId = 3;
- optional string TaskId = 4;
- optional string OperationId = 5;
+ EFmrReason Reason = 2;
+ string ErrorMessage = 3;
+ optional uint32 WorkerId = 4;
+ optional string TaskId = 5;
+ optional string OperationId = 6;
}
message TYtTableRef {
@@ -49,8 +55,12 @@ message TYtTableRef {
optional string FilePath = 3;
}
+message TFmrTableId {
+ string Id = 1;
+}
+
message TFmrTableRef {
- string TableId = 1;
+ TFmrTableId FmrTableId = 1;
}
message TTableRange {
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index 20dfb5362a9..b0c368008dc 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -6,6 +6,7 @@ namespace NYql::NFmr {
NProto::TFmrError FmrErrorToProto(const TFmrError& error) {
NProto::TFmrError protoError;
protoError.SetComponent(static_cast<NProto::EFmrComponent>(error.Component));
+ protoError.SetReason(static_cast<NProto::EFmrReason>(error.Reason));
protoError.SetErrorMessage(error.ErrorMessage);
if (error.WorkerId) {
protoError.SetWorkerId(*error.WorkerId);
@@ -22,6 +23,7 @@ NProto::TFmrError FmrErrorToProto(const TFmrError& error) {
TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) {
TFmrError fmrError;
fmrError.Component = static_cast<EFmrComponent>(protoError.GetComponent());
+ fmrError.Reason = static_cast<EFmrErrorReason>(protoError.GetReason());
fmrError.ErrorMessage = protoError.GetErrorMessage();
if (protoError.HasWorkerId()) {
fmrError.WorkerId = protoError.GetWorkerId();
@@ -55,16 +57,26 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
return ytTableRef;
}
+NProto::TFmrTableId FmrTableIdToProto(const TFmrTableId& fmrTableId) {
+ NProto::TFmrTableId protoFmrTableId;
+ protoFmrTableId.SetId(fmrTableId.Id);
+ return protoFmrTableId;
+}
+
+TFmrTableId FmrTableIdFromProto(const NProto::TFmrTableId& protoFmrTableId) {
+ return TFmrTableId(protoFmrTableId.GetId());
+}
+
NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef) {
NProto::TFmrTableRef protoFmrTableRef;
- protoFmrTableRef.SetTableId(fmrTableRef.TableId);
+ auto protoFmrTableId = FmrTableIdToProto(fmrTableRef.FmrTableId);
+ protoFmrTableRef.MutableFmrTableId()->Swap(&protoFmrTableId);
return protoFmrTableRef;
}
TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef) {
- TFmrTableRef fmrTableRef;
- fmrTableRef.TableId = protoFmrTableRef.GetTableId();
- return fmrTableRef;
+ auto tableId = FmrTableIdFromProto(protoFmrTableRef.GetFmrTableId());
+ return TFmrTableRef(tableId);
}
NProto::TTableRange TableRangeToProto(const TTableRange& tableRange) {
@@ -227,10 +239,10 @@ NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploa
}
TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams) {
- TUploadOperationParams uploadOperationParams;
- uploadOperationParams.Input = FmrTableRefFromProto(protoUploadOperationParams.GetInput());
- uploadOperationParams.Output = YtTableRefFromProto(protoUploadOperationParams.GetOutput());
- return uploadOperationParams;
+ return TUploadOperationParams(
+ FmrTableRefFromProto(protoUploadOperationParams.GetInput()),
+ YtTableRefFromProto(protoUploadOperationParams.GetOutput())
+ );
}
TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) {
@@ -259,10 +271,10 @@ NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams&
}
TDownloadOperationParams DownloadOperationParamsFromProto(const NProto::TDownloadOperationParams& protoDownloadOperationParams) {
- TDownloadOperationParams downloadOperationParams;
- downloadOperationParams.Input = YtTableRefFromProto(protoDownloadOperationParams.GetInput());
- downloadOperationParams.Output = FmrTableRefFromProto(protoDownloadOperationParams.GetOutput());
- return downloadOperationParams;
+ return TDownloadOperationParams(
+ YtTableRefFromProto(protoDownloadOperationParams.GetInput()),
+ FmrTableRefFromProto(protoDownloadOperationParams.GetOutput())
+ );
}
TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) {
@@ -297,14 +309,14 @@ NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTas
}
TMergeOperationParams MergeOperationParamsFromProto(const NProto::TMergeOperationParams& protoMergeOperationParams) {
- TMergeOperationParams mergeOperationParams;
- std::vector<TOperationTableRef> input;
+ TMergeOperationParams mergeOperationParams(
+ {},
+ FmrTableRefFromProto(protoMergeOperationParams.GetOutput())
+ );
for (size_t i = 0; i < protoMergeOperationParams.InputSize(); ++i) {
TOperationTableRef inputTable = OperationTableRefFromProto(protoMergeOperationParams.GetInput(i));
- input.emplace_back(inputTable);
+ mergeOperationParams.Input.emplace_back(inputTable);
}
- mergeOperationParams.Input = input;
- mergeOperationParams.Output = FmrTableRefFromProto(protoMergeOperationParams.GetOutput());
return mergeOperationParams;
}
@@ -353,15 +365,13 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
}
TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams) {
- TOperationParams operationParams;
if (protoOperationParams.HasDownloadOperationParams()) {
- operationParams = DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams());
+ return DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams());
} else if (protoOperationParams.HasUploadOperationParams()) {
- operationParams = UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams());
+ return UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams());
} else {
- operationParams = MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams());
+ return MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams());
}
- return operationParams;
}
TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
@@ -406,7 +416,7 @@ NProto::TTask TaskToProto(const TTask& task) {
protoTask.SetNumRetries(task.NumRetries);
auto clusterConnections = *protoTask.MutableClusterConnections();
for (auto& [tableName, conn]: task.ClusterConnections) {
- clusterConnections[tableName] = ClusterConnectionToProto(conn);
+ clusterConnections[tableName.Id] = ClusterConnectionToProto(conn);
}
if (task.JobSettings) {
protoTask.SetJobSettings(NYT::NodeToYsonString(*task.JobSettings));
@@ -421,7 +431,7 @@ TTask TaskFromProto(const NProto::TTask& protoTask) {
task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams());
task.SessionId = protoTask.GetSessionId();
task.NumRetries = protoTask.GetNumRetries();
- std::unordered_map<TString, TClusterConnection> taskClusterConnections;
+ std::unordered_map<TFmrTableId, TClusterConnection> taskClusterConnections;
for (auto& [tableName, conn]: protoTask.GetClusterConnections()) {
taskClusterConnections[tableName] = ClusterConnectionFromProto(conn);
}
diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make
index df82ec258fe..4e74eb8b185 100644
--- a/yt/yql/providers/yt/fmr/request_options/ya.make
+++ b/yt/yql/providers/yt/fmr/request_options/ya.make
@@ -7,6 +7,7 @@ SRCS(
PEERDIR(
library/cpp/yson/node
library/cpp/threading/future
+ yql/essentials/public/issue
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index 8a0bcde5751..6da844918da 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -2,7 +2,15 @@
namespace NYql::NFmr {
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode>& jobSettings) {
+TFmrTableId::TFmrTableId(const TString& id): Id(id)
+{
+};
+
+TFmrTableId::TFmrTableId(const TString& cluster, const TString& path): Id(cluster + "." + path)
+{
+};
+
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode>& jobSettings) {
return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnections, jobSettings);
}
@@ -17,6 +25,11 @@ TString TFmrChunkMeta::ToString() const {
} // namespace NYql::NFmr
template<>
+void Out<NYql::NFmr::TFmrTableId>(IOutputStream& out, const NYql::NFmr::TFmrTableId& tableId) {
+ out << tableId.Id;
+}
+
+template<>
void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError& error) {
out << "FmrError[" << error.Component << "]";
if (error.Component == NYql::NFmr::EFmrComponent::Worker) {
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index bab3df01aae..855e6b8e7d8 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -41,8 +41,14 @@ enum class EFmrComponent {
Job
};
+enum class EFmrErrorReason {
+ ReasonUnknown,
+ UserError // TODO Add more reasons
+};
+
struct TFmrError {
EFmrComponent Component;
+ EFmrErrorReason Reason;
TString ErrorMessage;
TMaybe<ui32> WorkerId;
TMaybe<TString> TaskId;
@@ -62,8 +68,20 @@ struct TYtTableRef {
bool operator == (const TYtTableRef&) const = default;
};
+struct TFmrTableId {
+ TString Id;
+
+ TFmrTableId() = default;
+
+ TFmrTableId(const TString& id);
+
+ TFmrTableId(const TString& cluster, const TString& path);
+
+ bool operator == (const TFmrTableId&) const = default;
+};
+
struct TFmrTableRef {
- TString TableId;
+ TFmrTableId FmrTableId;
};
struct TTableRange {
@@ -102,6 +120,14 @@ struct TTableStats {
} // namespace NYql::NFmr
namespace std {
+
+ template<>
+ struct hash<NYql::NFmr::TFmrTableId> {
+ size_t operator()(const NYql::NFmr::TFmrTableId& tableId) const {
+ return hash<TString>()(tableId.Id);
+ }
+ };
+
template<>
struct hash<NYql::NFmr::TFmrTableOutputRef> {
size_t operator()(const NYql::NFmr::TFmrTableOutputRef& ref) const {
@@ -170,7 +196,7 @@ struct TClusterConnection {
struct TTask: public TThrRefBase {
TTask() = default;
- TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1)
+ TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1)
: TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnections(clusterConnections), JobSettings(jobSettings), NumRetries(numRetries)
{
}
@@ -179,7 +205,7 @@ struct TTask: public TThrRefBase {
TString TaskId;
TTaskParams TaskParams = {};
TString SessionId;
- std::unordered_map<TString, TClusterConnection> ClusterConnections = {};
+ std::unordered_map<TFmrTableId, TClusterConnection> ClusterConnections = {};
TMaybe<NYT::TNode> JobSettings = {};
ui32 NumRetries; // Not supported yet
@@ -201,7 +227,7 @@ struct TTaskState: public TThrRefBase {
using TPtr = TIntrusivePtr<TTaskState>;
};
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}, const TMaybe<NYT::TNode>& jobSettings = Nothing());
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, const TMaybe<NYT::TNode>& jobSettings = Nothing());
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing(), const TStatistics& stats = TStatistics());
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index 2007cec3011..f19c5365bdf 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -12,7 +12,7 @@ namespace NYql::NFmr {
TDownloadOperationParams downloadOperationParams{
.Input = TYtTableRef{"Path","Cluster"},
- .Output = TFmrTableRef{"TableId"}
+ .Output = TFmrTableRef{{"Cluster", "Path"}}
};
TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TOperationParams operationParams = downloadOperationParams) {
@@ -20,7 +20,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do
.TaskType = taskType,
.OperationParams = operationParams,
.IdempotencyKey = "IdempotencyKey",
- .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
+ .ClusterConnections = {{TFmrTableId("Cluster", "Path"), TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
};
}
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index 5860a65366d..e1302451382 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -31,7 +31,14 @@ enum class ETablePresenceStatus {
namespace {
-struct TFmrOperationResult: public NCommon::TOperationResult {};
+TIssue ToIssue(const TFmrError& error, const TPosition& pos){
+ return TIssue(pos, error.ErrorMessage);
+};
+
+struct TFmrOperationResult: public NCommon::TOperationResult {
+ std::vector<TFmrError> Errors = {};
+ std::vector<TTableStats> TablesStats = {};
+};
class TFmrYtGateway final: public TYtForwardingGatewayBase {
public:
@@ -45,29 +52,30 @@ public:
auto getOperationStatusesFunc = [&] {
while (!StopFmrGateway_) {
with_lock(SessionStates_->Mutex) {
- auto checkOperationStatuses = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses, const TString& sessionId) {
+ auto checkOperationStatuses = [&] (std::unordered_map<TFmrTableId, TPromise<TFmrOperationResult>>& operationStatuses, const TString& sessionId) {
for (auto& [operationId, promise]: operationStatuses) {
YQL_CLOG(TRACE, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId;
- auto getOperationFuture = Coordinator_->GetOperation({operationId});
+ auto getOperationFuture = Coordinator_->GetOperation({operationId.Id});
getOperationFuture.Subscribe([&, operationId, sessionId] (const auto& getFuture) {
auto getOperationResult = getFuture.GetValueSync();
auto getOperationStatus = getOperationResult.Status;
auto operationErrorMessages = getOperationResult.ErrorMessages;
+ auto operationOutputTablesStats = getOperationResult.OutputTablesStats;
with_lock(SessionStates_->Mutex) {
bool operationCompleted = getOperationStatus != EOperationStatus::Accepted && getOperationStatus != EOperationStatus::InProgress;
if (operationCompleted) {
// operation finished, set value in future returned in DoMerge / DoUpload
bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed;
+ TFmrOperationResult fmrOperationResult{};
+ fmrOperationResult.Errors = operationErrorMessages;
if (hasCompletedSuccessfully) {
- TFmrOperationResult fmrOperationResult{};
+ fmrOperationResult.TablesStats = operationOutputTablesStats;
fmrOperationResult.SetSuccess();
- promise.SetValue(fmrOperationResult);
- } else {
- promise.SetException(JoinRange(' ', operationErrorMessages.begin(), operationErrorMessages.end()));
}
- YQL_CLOG(DEBUG, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId;
- auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId});
+ promise.SetValue(fmrOperationResult);
+ YQL_CLOG(INFO, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId;
+ auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId.Id});
deleteOperationFuture.Subscribe([&, sessionId, operationId] (const auto& deleteFuture) {
auto deleteOperationResult = deleteFuture.GetValueSync();
auto deleteOperationStatus = deleteOperationResult.Status;
@@ -114,15 +122,28 @@ public:
auto future = DoMerge(inputTables, outputTable, std::move(options));
return future.Apply([this, pos = nodePos, outputTable = std::move(outputTable), options = std::move(options)] (const TFuture<TFmrOperationResult>& f) {
try {
- f.GetValue(); // rethrow error if any
+ auto fmrOperationResult = f.GetValue(); // rethrow error if any
TString sessionId = options.SessionId();
auto config = options.Config();
- TString fmrOutputTableId = outputTable.Cluster + "." + outputTable.Path;
+ TFmrTableId fmrOutputTableId = {outputTable.Cluster, outputTable.Path};
SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr);
TRunResult result;
- result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); // TODO - add statistics?
- result.OutTableStats.back().second->Id = "fmr_" + fmrOutputTableId;
- result.SetSuccess();
+ auto tableStats = fmrOperationResult.TablesStats.back();
+ result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>());
+ result.OutTableStats.back().second->Id = "fmr_" + fmrOutputTableId.Id;
+ result.OutTableStats.back().second->RecordsCount = tableStats.Rows;
+ result.OutTableStats.back().second->DataSize = tableStats.DataWeight;
+ result.OutTableStats.back().second->ChunkCount = tableStats.Chunks;
+ YQL_CLOG(INFO, FastMapReduce) << "Fmr output table info: RecordsCount = " << result.OutTableStats.back().second->RecordsCount << " DataSize = " << result.OutTableStats.back().second->DataSize << " ChunkCount = " << result.OutTableStats.back().second->ChunkCount;
+ auto operationErrors = fmrOperationResult.Errors;
+ TVector<TIssue> issues;
+ for (const auto& error : operationErrors) {
+ issues.emplace_back(ToIssue(error, pos));
+ }
+ result.AddIssues(issues);
+ if (fmrOperationResult.Success()) {
+ result.SetSuccess();
+ }
return MakeFuture<TRunResult>(std::move(result));
} catch (...) {
return MakeFuture(ResultFromCurrentException<TRunResult>(pos));
@@ -147,14 +168,14 @@ public:
auto outputPath = publish.Publish().Name().StringValue();
bool isAnonymous = NYql::HasSetting(publish.Publish().Settings().Ref(), EYtSettingType::Anonymous);
- std::vector<TString> currentAnonymousTableAliases;
+ std::vector<TFmrTableId> currentAnonymousTableAliases;
for (auto out: publish.Input()) {
- auto inputCluster = GetOutTableWithCluster(out).second;
+ TString inputCluster = GetOutTableWithCluster(out).second;
auto outTable = GetOutTable(out).Cast<TYtOutTable>();
- TStringBuf inputPath = outTable.Name().Value();
+ TString inputPath = ToString(outTable.Name().Value());
if (isAnonymous) {
- currentAnonymousTableAliases.emplace_back(inputCluster + "." + inputPath);
+ currentAnonymousTableAliases.emplace_back(TFmrTableId(inputCluster, inputPath));
}
auto outputBase = out.Operation().Cast<TYtOutputOpBase>().Ptr();
uploadFmrTablesToYtFutures.emplace_back(DoUpload(inputCluster, TString(inputPath), sessionId, config, outputBase, ctx));
@@ -162,7 +183,7 @@ public:
if (isAnonymous) {
YQL_CLOG(DEBUG, FastMapReduce) << "Table " << outputPath << " is anonymous, not uploading from fmr to yt";
- TString fmrOutputTableId = cluster + "." + outputPath;
+ TFmrTableId fmrOutputTableId = {cluster, outputPath};
SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr);
// TODO - figure out what to do here in case of multiple inputs
@@ -218,11 +239,11 @@ public:
YQL_ENSURE(sessions.contains(sessionId));
auto& operationStates = sessions[sessionId].OperationStates;
- auto cancelOperationsFunc = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses) {
+ auto cancelOperationsFunc = [&] (std::unordered_map<TFmrTableId, TPromise<TFmrOperationResult>>& operationStatuses) {
std::vector<TFuture<TDeleteOperationResponse>> cancelOperationsFutures;
for (auto& [operationId, promise]: operationStatuses) {
- cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId}));
+ cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId.Id}));
}
NThreading::WaitAll(cancelOperationsFutures).GetValueSync();
};
@@ -243,7 +264,7 @@ private:
return richPath.Path_;
}
- void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) {
+ void SetTablePresenceStatus(const TFmrTableId& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) {
with_lock(SessionStates_->Mutex) {
YQL_CLOG(DEBUG, FastMapReduce) << "Setting table presence status " << newStatus << " for table with id " << fmrTableId;
auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
@@ -251,7 +272,7 @@ private:
}
}
- void SetFmrIdAlias(const TString& fmrTableId, const TString& alias, const TString& sessionId) {
+ void SetFmrIdAlias(const TFmrTableId& fmrTableId, const TFmrTableId& alias, const TString& sessionId) {
with_lock(SessionStates_->Mutex) {
YQL_CLOG(DEBUG, FastMapReduce) << "Setting table fmr id alias " << alias << " for table with id " << fmrTableId;
auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases;
@@ -259,7 +280,7 @@ private:
}
}
- TString GetFmrIdOrAlias(const TString& fmrTableId, const TString& sessionId) {
+ TFmrTableId GetFmrIdOrAlias(const TFmrTableId& fmrTableId, const TString& sessionId) {
with_lock(SessionStates_->Mutex) {
auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases;
if (!fmrIdAliases.contains(fmrTableId)) {
@@ -269,7 +290,7 @@ private:
}
}
- TMaybe<ETablePresenceStatus> GetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId) {
+ TMaybe<ETablePresenceStatus> GetTablePresenceStatus(const TFmrTableId& fmrTableId, const TString& sessionId) {
with_lock(SessionStates_->Mutex) {
auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
if (!tablePresenceStatuses.contains(fmrTableId)) {
@@ -320,6 +341,7 @@ private:
TFuture<TFmrOperationResult> GetRunningOperationFuture(const TStartOperationRequest& startOperationRequest, const TString& sessionId) {
auto promise = NewPromise<TFmrOperationResult>();
auto future = promise.GetFuture();
+ YQL_CLOG(INFO, FastMapReduce) << "Starting " << startOperationRequest.TaskType << " operation";
auto startOperationResponseFuture = Coordinator_->StartOperation(startOperationRequest);
startOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& mergeFuture) {
TStartOperationResponse mergeOperationResponse = mergeFuture.GetValueSync();
@@ -336,11 +358,11 @@ private:
TFuture<TFmrOperationResult> DoUpload(const TString& outputCluster, const TString& outputPath, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
- TFmrTableRef fmrTableRef{outputCluster + "." + outputPath};
- auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.TableId, sessionId);
+ TFmrTableRef fmrTableRef{TFmrTableId(outputCluster, outputPath)};
+ auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.FmrTableId, sessionId);
if (!tablePresenceStatus || *tablePresenceStatus != ETablePresenceStatus::OnlyInFmr) {
- YQL_CLOG(DEBUG, FastMapReduce) << " We assume table " << fmrTableRef.TableId << " should be present in yt, not uploading from fmr";
+ YQL_CLOG(INFO, FastMapReduce) << " We assume table " << fmrTableRef.FmrTableId << " should be present in yt, not uploading from fmr";
return GetSuccessfulFmrOperationResult();
}
@@ -357,7 +379,7 @@ private:
.SessionId = sessionId,
.IdempotencyKey = GenerateId(),
.NumRetries=1,
- .ClusterConnections = std::unordered_map<TString, TClusterConnection>{{fmrTableRef.TableId, clusterConnection}},
+ .ClusterConnections = std::unordered_map<TFmrTableId, TClusterConnection>{{fmrTableRef.FmrTableId, clusterConnection}},
.FmrOperationSpec = config->FmrOperationSpec.Get(outputCluster)
};
@@ -365,11 +387,11 @@ private:
.Config(config);
auto prepareFuture = Slave_->Prepare(outputOpBase, ctx, std::move(prepareOptions));
- return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) mutable {
+ return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.FmrTableId)] (const TFuture<TRunResult>& f) mutable {
try {
f.GetValue();
YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
- YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId;
+ YQL_CLOG(INFO, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId;
return GetRunningOperationFuture(uploadRequest, sessionId).Apply([this, sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableId)] (const TFuture<TFmrOperationResult>& f) {
try {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
@@ -397,13 +419,13 @@ private:
}
TString outputCluster = outputTable.Cluster, outputPath = outputTable.Path;
- TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + outputPath};
+ TFmrTableRef fmrOutputTable{TFmrTableId(outputCluster, outputPath)};
std::vector<TOperationTableRef> mergeInputTables;
- std::unordered_map<TString, TClusterConnection> clusterConnections;
+ std::unordered_map<TFmrTableId, TClusterConnection> clusterConnections;
for (auto [ytTable, isTemp]: inputTables) {
TString inputCluster = ytTable.Cluster, inputPath = ytTable.Path;
- TString fmrTableId = inputCluster + "." + inputPath;
+ TFmrTableId fmrTableId = {inputCluster, inputPath};
auto tablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId);
if (!tablePresenceStatus) {
SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::OnlyInYt);
@@ -411,7 +433,7 @@ private:
if (tablePresenceStatus && *tablePresenceStatus != ETablePresenceStatus::OnlyInYt) {
// table is in fmr, do not download
- mergeInputTables.emplace_back(TFmrTableRef{.TableId = GetFmrIdOrAlias(fmrTableId, sessionId)});
+ mergeInputTables.emplace_back(TFmrTableRef(GetFmrIdOrAlias(fmrTableId, sessionId)));
} else {
ytTable.FilePath = GetTableFilePath(TGetTableFilePathOptions(sessionId).Cluster(inputCluster).Path(inputPath).IsTemp(isTemp));
mergeInputTables.emplace_back(ytTable);
@@ -435,7 +457,7 @@ private:
return table.first.Path;}
);
- YQL_CLOG(DEBUG, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end());
+ YQL_CLOG(INFO, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end());
return GetRunningOperationFuture(mergeOperationRequest, sessionId);
}
@@ -447,14 +469,14 @@ private:
private:
struct TFmrGatewayOperationsState {
- std::unordered_map<TString, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes
+ std::unordered_map<TFmrTableId, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes
};
struct TSessionInfo {
TFmrGatewayOperationsState OperationStates;
- std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService
+ std::unordered_map<TFmrTableId, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService
TString UserName;
- std::unordered_map<TString, TString> FmrIdAliases;
+ std::unordered_map<TFmrTableId, TFmrTableId> FmrIdAliases;
};
struct TSession {