aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-05-29 13:34:22 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-05-29 13:53:52 +0300
commitfdbc38349df2ee0ddc678fa2bffe84786f9639a3 (patch)
treebf49a9cbcd326a82b5380c4042b8277863db0ea5
parent25151c4698d52226d8b4882d2ef7fb2f04587b7c (diff)
downloadydb-fdbc38349df2ee0ddc678fa2bffe84786f9639a3.tar.gz
Intermediate changes
commit_hash:6dbef13d0dcaf09696934ec231fa4610d7edfec1
-rw-r--r--ya.conf1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson12
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp84
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp220
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ya.make5
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp441
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h7
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp168
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h66
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp15
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h3
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make13
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp43
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make16
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp59
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h7
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make13
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp96
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h7
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make14
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h25
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make6
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp8
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h7
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/ya.make3
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp71
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp24
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp92
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h6
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp16
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h20
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp16
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h7
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/ya.make4
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp5
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp6
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h2
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h2
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto2
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto50
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp271
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h28
-rw-r--r--yt/yql/providers/yt/fmr/request_options/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp58
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h55
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp9
-rw-r--r--yt/yql/providers/yt/fmr/utils/ut/ya.make2
-rw-r--r--yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp14
-rw-r--r--yt/yql/providers/yt/fmr/utils/ya.make4
-rw-r--r--yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp14
-rw-r--r--yt/yql/providers/yt/fmr/utils/yql_yt_client.h10
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp9
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp7
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make (renamed from yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make)4
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/file/ut/yql_yt_file_yt_job_service_ut.cpp (renamed from yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp)12
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/file/ya.make (renamed from yt/yql/providers/yt/fmr/yt_service/file/ya.make)4
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp68
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h7
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make18
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.cpp (renamed from yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp)34
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h7
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/interface/ya.make (renamed from yt/yql/providers/yt/fmr/yt_service/interface/ya.make)2
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h (renamed from yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h)13
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/mock/ya.make (renamed from yt/yql/providers/yt/fmr/yt_service/mock/ya.make)5
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.cpp (renamed from yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.cpp)25
-rw-r--r--yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h9
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp68
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h7
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/ya.make15
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h7
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h9
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h1
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp1
81 files changed, 1901 insertions, 570 deletions
diff --git a/ya.conf b/ya.conf
index d73ed1b7a42..752dc171bb8 100644
--- a/ya.conf
+++ b/ya.conf
@@ -18,6 +18,7 @@ tools_cache_master = true
use_atd_revisions_info = true
use_jstyle_server = true
use_command_file_in_testtool = true
+use_universal_fetcher_everywhere = true
# ===== opensource only table params =====
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson
index 203b3d1ce66..ef865b7f5f7 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/default_coordinator_settings.yson
@@ -23,4 +23,14 @@
"max_row_weight" = 16777216;
};
};
-} \ No newline at end of file
+ "partition" = {
+ "yt_table" = {
+ "max_data_weight_per_part" = 104857600;
+ "max_parts" = 100;
+ };
+ "fmr_table" = {
+ "max_data_weight_per_part" = 104857600;
+ "max_parts" = 100;
+ };
+ }
+}
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
index 2f67f6d05ef..179b3e230f0 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
@@ -2,10 +2,12 @@ UNITTEST()
SRCS(
yql_yt_coordinator_ut.cpp
+ yql_yt_partitioner_ut.cpp
)
PEERDIR(
yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file
yt/yql/providers/yt/fmr/job_factory/impl
yt/yql/providers/yt/fmr/worker/impl
)
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index 0de9cb8fda0..ae3505fdd7b 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -7,6 +7,7 @@
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h>
namespace NYql::NFmr {
@@ -45,10 +46,12 @@ private:
TDownloadOperationParams downloadOperationParams{
- .Input = TYtTableRef{"Path","Cluster"},
+ .Input = TYtTableRef{.Path = "Path", .Cluster = "Cluster", .FilePath = "File_path"},
.Output = TFmrTableRef{{"TestCluster", "TestPath"}}
};
+// TODO - создать общий файл на все тесты, наполнить его чем-то
+
TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TOperationParams operationParams = downloadOperationParams) {
return TStartOperationRequest{
.TaskType = taskType,
@@ -83,13 +86,13 @@ auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic
Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
Y_UNIT_TEST(StartOperation) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
auto status = startOperationResponse.Status;
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted);
}
Y_UNIT_TEST(RetryAcceptedOperation) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto downloadRequest = CreateOperationRequest();
auto firstResponse = coordinator->StartOperation(downloadRequest).GetValueSync();
auto firstOperationId = firstResponse.OperationId;
@@ -102,13 +105,13 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
}
Y_UNIT_TEST(DeleteNonexistentOperation) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto deleteOperationResponse = coordinator->DeleteOperation({"delete_operation_id"}).GetValueSync();
EOperationStatus status = deleteOperationResponse.Status;
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::NotFound);
}
Y_UNIT_TEST(DeleteOperationBeforeSendToWorker) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
TString operationId = startOperationResponse.OperationId;
auto deleteOperationResponse = coordinator->DeleteOperation({operationId}).GetValueSync();
@@ -116,13 +119,13 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Aborted);
}
Y_UNIT_TEST(GetNonexistentOperation) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto getOperationResponse = coordinator->GetOperation({"get_operation_id"}).GetValueSync();
EOperationStatus status = getOperationResponse.Status;
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::NotFound);
}
Y_UNIT_TEST(GetAcceptedOperationStatus) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
TString operationId = startOperationResponse.OperationId;
auto getOperationResponse = coordinator->GetOperation({operationId}).GetValueSync();
@@ -130,7 +133,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted);
}
Y_UNIT_TEST(GetRunningOperationStatus) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
TString operationId = startOperationResponse.OperationId;
@@ -145,7 +148,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::InProgress);
}
Y_UNIT_TEST(GetCompletedOperationStatuses) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto startOperationRequests = CreateSeveralOperationRequests();
std::vector<TString> operationIds;
for (auto& request: startOperationRequests) {
@@ -165,34 +168,27 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
}
}
Y_UNIT_TEST(GetCompletedAndFailedOperationStatuses) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto downloadOperationRequests = CreateSeveralOperationRequests();
std::vector<TString> downloadOperationIds;
for (auto& request: downloadOperationRequests) {
auto startOperationResponse = coordinator->StartOperation(request).GetValueSync();
downloadOperationIds.emplace_back(startOperationResponse.OperationId);
}
- auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadOperationParams{
- {{"Cluster", "Path"}},
- {}
+ auto badDownloadRequest = CreateOperationRequest(ETaskType::Download, TDownloadOperationParams{
+ .Input = TYtTableRef{.Path = "bad_path", .Cluster = "bad_cluster", .FilePath = "bad_file_path"},
+ .Output = TFmrTableRef{{"bad_cluster", "bad_path"}}
});
- auto uploadOperationResponse = coordinator->StartOperation(uploadOperationRequest).GetValueSync();
- auto uploadOperationId = uploadOperationResponse.OperationId;
+ auto badDownloadOperationResponse = coordinator->StartOperation(badDownloadRequest).GetValueSync();
+ auto badDownloadOperationId = badDownloadOperationResponse.OperationId;
auto func = [&] (TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) {
while (! cancelFlag->load()) {
Sleep(TDuration::Seconds(1));
- ETaskStatus taskStatus = std::visit([] (auto&& taskParams) {
- using T = std::decay_t<decltype(taskParams)>;
- if constexpr (std::is_same_v<T, TUploadTaskParams>) {
- return ETaskStatus::Failed;
- }
- return ETaskStatus::Completed;
- }, task->TaskParams);
- if (taskStatus == ETaskStatus::Failed) {
+ TDownloadTaskParams downloadTaskParams = std::get<TDownloadTaskParams>(task->TaskParams);
+ if (downloadTaskParams.Output.TableId.Contains("bad_path")) {
return TJobResult{.TaskStatus = ETaskStatus::Failed, .Stats = TStatistics()};
}
- Sleep(TDuration::Seconds(1));
return TJobResult{.TaskStatus = ETaskStatus::Completed, .Stats = TStatistics()};
}
return TJobResult{.TaskStatus = ETaskStatus::Failed, .Stats = TStatistics()};
@@ -210,12 +206,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
EOperationStatus status = getDownloadOperationResponse.Status;
UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Completed);
}
- auto getUploadOperationResponse = coordinator->GetOperation({uploadOperationId}).GetValueSync();
- EOperationStatus uploadStatus = getUploadOperationResponse.Status;
- UNIT_ASSERT_VALUES_EQUAL(uploadStatus, EOperationStatus::Failed);
+ auto getBadDownloadOperationResponse = coordinator->GetOperation({badDownloadOperationId}).GetValueSync();
+ EOperationStatus badDownloadStatus = getBadDownloadOperationResponse.Status;
+ UNIT_ASSERT_VALUES_EQUAL(badDownloadStatus, EOperationStatus::Failed);
}
Y_UNIT_TEST(RetryRunningOperation) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto downloadRequest = CreateOperationRequest();
auto startOperationResponse = coordinator->StartOperation(downloadRequest).GetValueSync();
TString firstOperationId = startOperationResponse.OperationId;
@@ -236,7 +232,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
Y_UNIT_TEST(RetryRunningOperationAfterIdempotencyKeyClear) {
auto coordinatorSettings = TFmrCoordinatorSettings();
coordinatorSettings.IdempotencyKeyStoreTime = TDuration::Seconds(1);
- auto coordinator = MakeFmrCoordinator(coordinatorSettings);
+ auto coordinator = MakeFmrCoordinator(coordinatorSettings, MakeFileYtCoordinatorService());
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
auto factory = MakeFmrJobFactory(settings);
@@ -260,7 +256,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
UNIT_ASSERT_VALUES_EQUAL(secondOperationStatus, EOperationStatus::Accepted);
}
Y_UNIT_TEST(CancelTasksAfterVolatileIdReload) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
int numIterations = 0;
while (!cancelFlag->load()) {
@@ -292,7 +288,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
UNIT_ASSERT_NO_DIFF(*error.OperationId, operationId);
}
Y_UNIT_TEST(HandleJobErrors) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
TString operationId = startOperationResponse.OperationId;
@@ -323,13 +319,21 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
}
Y_UNIT_TEST(GetFmrTableInfo) {
- auto coordinator = MakeFmrCoordinator();
- TTableStats tableStats = {.Chunks = 1, .Rows = 2, .DataWeight = 3};
- TString tableId = "test_table";
- TFmrTableOutputRef fmrTableOutputRef{.TableId = tableId, .PartId = "test_part_id"};
- std::unordered_map<TFmrTableOutputRef, TTableStats> outputTables{{fmrTableOutputRef, tableStats}};
- auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
+ ui64 totalChunkCount = 10, chunkRowCount = 1, chunkDataWeight = 2;
+ TString tableId = "TestCluster.TestPath"; // corresponds to CreateOperationRequest()
+ auto func = [&] (TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) {
while (!cancelFlag->load()) {
+ Sleep(TDuration::Seconds(1));
+ TDownloadTaskParams downloadTaskParams = std::get<TDownloadTaskParams>(task->TaskParams);
+ TString partId = downloadTaskParams.Output.PartId;
+ TFmrTableOutputRef fmrTableOutputRef{.TableId = tableId, .PartId = partId};
+ TTableChunkStats tableChunkStats{
+ .PartId = partId,
+ .PartIdChunkStats = std::vector<TChunkStats>(totalChunkCount, TChunkStats{.Rows = chunkRowCount, .DataWeight = chunkDataWeight})
+ };
+ std::unordered_map<TFmrTableOutputRef, TTableChunkStats> outputTables{{fmrTableOutputRef, tableChunkStats}};
+
return TJobResult{.TaskStatus = ETaskStatus::Completed, .Stats = TStatistics{
.OutputTables = outputTables
}};
@@ -346,9 +350,9 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
Sleep(TDuration::Seconds(3));
auto response = coordinator->GetFmrTableInfo({tableId}).GetValueSync();
worker->Stop();
- UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Chunks, tableStats.Chunks);
- UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Rows, tableStats.Rows);
- UNIT_ASSERT_VALUES_EQUAL(response.TableStats.DataWeight, tableStats.DataWeight);
+ UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Chunks, totalChunkCount);
+ UNIT_ASSERT_VALUES_EQUAL(response.TableStats.Rows, totalChunkCount * chunkRowCount);
+ UNIT_ASSERT_VALUES_EQUAL(response.TableStats.DataWeight, totalChunkCount * chunkDataWeight);
}
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp
new file mode 100644
index 00000000000..87d491afd60
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_partitioner_ut.cpp
@@ -0,0 +1,220 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h>
+
+namespace NYql::NFmr {
+
+const TString FirstPartId = "test_part_id_0", SecondPartId = "test_part_id_1";
+
+std::unordered_map<TFmrTableId, std::vector<TString>> GetTestPartIdsForTable(const TFmrTableId& fmrId) {
+ return std::unordered_map<TFmrTableId, std::vector<TString>>{{fmrId, std::vector<TString>{FirstPartId, SecondPartId}}};
+}
+
+std::unordered_map<TString, std::vector<TChunkStats>> GetTestPartIdStats() {
+ const std::vector<TChunkStats> firstPartitionChunkStats{
+ TChunkStats{.DataWeight = 30},
+ TChunkStats{.DataWeight = 30},
+ TChunkStats{.DataWeight = 10},
+ TChunkStats{.DataWeight = 20},
+ };
+ const std::vector<TChunkStats> secondPartitionChunkStats{
+ TChunkStats{.DataWeight = 40},
+ TChunkStats{.DataWeight = 15},
+ };
+
+ return std::unordered_map<TString, std::vector<TChunkStats>>{
+ {FirstPartId, firstPartitionChunkStats},
+ {SecondPartId, secondPartitionChunkStats}
+ };
+}
+
+const TString FirstTablePartId = "first_table_part_id", SecondTablePartId = "sec_table_part_id", ThirdTablePartId = "third_table_part_id";
+
+std::unordered_map<TFmrTableId, std::vector<TString>> GetTestPartIdsForMultipleTables(std::vector<TFmrTableId>& fmrTableIds) {
+ UNIT_ASSERT_VALUES_EQUAL(fmrTableIds.size(), 3);
+ return std::unordered_map<TFmrTableId, std::vector<TString>>{
+ {fmrTableIds[0], std::vector<TString>{FirstTablePartId}},
+ {fmrTableIds[1], std::vector<TString>{SecondTablePartId}},
+ {fmrTableIds[2], std::vector<TString>{ThirdTablePartId}}
+ };
+}
+
+std::unordered_map<TString, std::vector<TChunkStats>> GetTestPartIdStatsForMultipleTables() {
+ const std::vector<TChunkStats> firstPartitionChunkStats{
+ TChunkStats{.DataWeight = 40},
+ TChunkStats{.DataWeight = 20},
+ };
+ const std::vector<TChunkStats> secondPartitionChunkStats{
+ TChunkStats{.DataWeight = 40},
+ TChunkStats{.DataWeight = 15},
+ TChunkStats{.DataWeight = 5},
+ };
+
+ const std::vector<TChunkStats> thirdPartitionChunkStats{
+ TChunkStats{.DataWeight = 20},
+ TChunkStats{.DataWeight = 30},
+ TChunkStats{.DataWeight = 60},
+ };
+
+ return std::unordered_map<TString, std::vector<TChunkStats>>{
+ {FirstTablePartId, firstPartitionChunkStats},
+ {SecondTablePartId, secondPartitionChunkStats},
+ {ThirdTablePartId, thirdPartitionChunkStats}
+ };
+}
+
+std::vector<std::vector<TFmrTableInputRef>> ChangeGottenTasksFormat(const std::vector<TTaskTableInputRef>& inputTasks) {
+ // needed for testing so we can check resulting vectors for equality.
+ std::vector<std::vector<TFmrTableInputRef>> resultTasks;
+ for (auto& task: inputTasks) {
+ std::vector<TFmrTableInputRef> curTask;
+ std::transform(task.Inputs.begin(),task.Inputs.end(), std::back_inserter(curTask), [](const TTaskTableRef& tablePart){
+ return std::get<TFmrTableInputRef>(tablePart);
+ });
+ resultTasks.emplace_back(curTask);
+ }
+ return resultTasks;
+}
+
+Y_UNIT_TEST_SUITE(PartitionerTests) {
+ Y_UNIT_TEST(PartitionFmrTable) {
+ auto fmrTableId = TFmrTableId("test_cluster", "test_path");
+ TFmrTableRef fmrTable = TFmrTableRef{fmrTableId};
+
+ auto partIdsForTables = GetTestPartIdsForTable(fmrTableId);
+ auto partIdStats = GetTestPartIdStats();
+ TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 50, .MaxParts = 100};
+ TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings);
+
+ auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks({fmrTable});
+ UNIT_ASSERT_VALUES_EQUAL(status, true);
+
+ std::vector<std::vector<TFmrTableInputRef>> expectedTasks = {
+ {TFmrTableInputRef{
+ .TableId = fmrTableId.Id,
+ .TableRanges = {
+ TTableRange{.PartId = FirstPartId, .MinChunk = 0, .MaxChunk = 1}
+ }
+ }},
+ {TFmrTableInputRef{
+ .TableId = fmrTableId.Id,
+ .TableRanges = {
+ TTableRange{.PartId = FirstPartId, .MinChunk = 1, .MaxChunk = 3}
+ }
+ }},
+ {TFmrTableInputRef{
+ .TableId = fmrTableId.Id,
+ .TableRanges = {
+ TTableRange{.PartId = SecondPartId, .MinChunk = 0, .MaxChunk = 1}
+ }
+ }},
+ {TFmrTableInputRef{
+ .TableId = fmrTableId.Id,
+ .TableRanges = {
+ TTableRange{.PartId = FirstPartId, .MinChunk = 3, .MaxChunk = 4},
+ TTableRange{.PartId = SecondPartId, .MinChunk = 1, .MaxChunk = 2},
+ }
+ }},
+ };
+ UNIT_ASSERT_VALUES_EQUAL(ChangeGottenTasksFormat(gottenTasks), expectedTasks);
+ }
+ Y_UNIT_TEST(MaxPartsNumExceeded) {
+ auto fmrTableId = TFmrTableId("test_cluster", "test_path");
+ TFmrTableRef fmrTable = TFmrTableRef{fmrTableId};
+
+ auto partIdsForTables = GetTestPartIdsForTable(fmrTableId);
+ auto partIdStats = GetTestPartIdStats();
+ TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 50, .MaxParts = 2};
+ TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings);
+
+ auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks({fmrTable});
+ UNIT_ASSERT_VALUES_EQUAL(status, false);
+ }
+ Y_UNIT_TEST(SeveralFullPartitionsInTask) {
+ auto fmrTableId = TFmrTableId("test_cluster", "test_path");
+ TFmrTableRef fmrTable = TFmrTableRef{fmrTableId};
+
+ auto partIdsForTables = GetTestPartIdsForTable(fmrTableId);
+ auto partIdStats = GetTestPartIdStats();
+ TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 1000000, .MaxParts = 1};
+ TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings);
+
+ auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks({fmrTable});
+ UNIT_ASSERT_VALUES_EQUAL(status, true);
+
+ std::vector<std::vector<TFmrTableInputRef>> expectedTasks = {
+ {
+ TFmrTableInputRef{
+ .TableId = fmrTableId.Id,
+ .TableRanges = {
+ TTableRange{.PartId = FirstPartId, .MinChunk = 0, .MaxChunk = 4},
+ TTableRange{.PartId = SecondPartId, .MinChunk = 0, .MaxChunk = 2},
+ }
+ }
+ }
+ };
+ UNIT_ASSERT_VALUES_EQUAL(ChangeGottenTasksFormat(gottenTasks), expectedTasks);
+ }
+ Y_UNIT_TEST(SeveralInputTables) {
+ std::vector<TFmrTableId> inputFmrTableIds{
+ TFmrTableId("test_cluster_1", "test_path_1"),
+ TFmrTableId("test_cluster_2", "test_path_2"),
+ TFmrTableId("test_cluster_3", "test_path_3"),
+ };
+ std::vector<TFmrTableRef> inputTables;
+ for (auto& id: inputFmrTableIds) {
+ inputTables.emplace_back(TFmrTableRef{.FmrTableId = id});
+ }
+
+ auto partIdsForTables = GetTestPartIdsForMultipleTables(inputFmrTableIds);
+ auto partIdStats = GetTestPartIdStatsForMultipleTables();
+ TFmrPartitionerSettings settings{.MaxDataWeightPerPart = 50, .MaxParts = 1000};
+ TFmrPartitioner partitioner(partIdsForTables, partIdStats, settings);
+ auto [gottenTasks, status] = partitioner.PartitionFmrTablesIntoTasks(inputTables);
+ UNIT_ASSERT_VALUES_EQUAL(status, true);
+
+ std::vector<std::vector<TFmrTableInputRef>> expectedTasks = {
+ {TFmrTableInputRef{
+ .TableId = TFmrTableId("test_cluster_1", "test_path_1").Id,
+ .TableRanges = {
+ TTableRange{.PartId = FirstTablePartId, .MinChunk = 0, .MaxChunk = 1}
+ }
+ }},
+ {TFmrTableInputRef{
+ .TableId = TFmrTableId("test_cluster_2", "test_path_2").Id,
+ .TableRanges = {
+ TTableRange{.PartId = SecondTablePartId, .MinChunk = 0, .MaxChunk = 1}
+ }
+ }},
+
+ {TFmrTableInputRef{
+ .TableId = TFmrTableId("test_cluster_3", "test_path_3").Id,
+ .TableRanges = {
+ TTableRange{.PartId = ThirdTablePartId, .MinChunk = 0, .MaxChunk = 2}
+ }
+ }},
+ {TFmrTableInputRef{
+ .TableId = TFmrTableId("test_cluster_3", "test_path_3").Id,
+ .TableRanges = {
+ TTableRange{.PartId = ThirdTablePartId, .MinChunk = 2, .MaxChunk = 3}
+ }
+ }},
+ {
+ TFmrTableInputRef{
+ .TableId = TFmrTableId("test_cluster_1", "test_path_1").Id,
+ .TableRanges = {
+ TTableRange{.PartId = FirstTablePartId, .MinChunk = 1, .MaxChunk = 2}
+ }
+ },
+ TFmrTableInputRef{
+ .TableId = TFmrTableId("test_cluster_2", "test_path_2").Id,
+ .TableRanges = {
+ TTableRange{.PartId = SecondTablePartId, .MinChunk = 1, .MaxChunk = 3}
+ }
+ }
+ }
+ };
+ UNIT_ASSERT_VALUES_EQUAL(ChangeGottenTasksFormat(gottenTasks), expectedTasks);
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
index 20abd2bbc84..5d0057b51f8 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
SRCS(
yql_yt_coordinator_impl.cpp
+ yql_yt_partitioner.cpp
)
PEERDIR(
@@ -11,8 +12,10 @@ PEERDIR(
library/cpp/yson/node
yt/cpp/mapreduce/common
yt/yql/providers/yt/fmr/coordinator/interface
- yql/essentials/utils/log
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl
yql/essentials/utils
+ yql/essentials/utils/log
)
RESOURCE(
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index 10ce8469243..22d9ebc6da2 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -1,6 +1,7 @@
#include <thread>
#include <library/cpp/resource/resource.h>
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>
#include "yql_yt_coordinator_impl.h"
@@ -17,39 +18,16 @@ TFmrCoordinatorSettings::TFmrCoordinatorSettings() {
namespace {
-struct TCoordinatorTaskInfo {
- TTask::TPtr Task;
- ETaskStatus TaskStatus;
- TString OperationId;
-};
-
-struct TOperationInfo {
- std::unordered_set<TString> TaskIds; // for now each operation consists only of one task, until paritioner is implemented
- EOperationStatus OperationStatus;
- std::vector<TFmrError> ErrorMessages;
- TString SessionId;
- std::vector<TString> OutputTableIds = {};
-};
-
-struct TIdempotencyKeyInfo {
- TString OperationId;
- TInstant OperationCreationTime;
-};
-
-struct TCoordinatorFmrTableStats {
- TTableStats Stats;
- TString PartId; // only one PartId for now
-};
-
class TFmrCoordinator: public IFmrCoordinator {
public:
- TFmrCoordinator(const TFmrCoordinatorSettings& settings)
+ TFmrCoordinator(const TFmrCoordinatorSettings& settings, IYtCoordinatorService::TPtr ytCoordinatorService)
: WorkersNum_(settings.WorkersNum),
RandomProvider_(settings.RandomProvider),
StopCoordinator_(false),
TimeToSleepBetweenClearKeyRequests_(settings.TimeToSleepBetweenClearKeyRequests),
IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime),
- DefaultFmrOperationSpec_(settings.DefaultFmrOperationSpec)
+ DefaultFmrOperationSpec_(settings.DefaultFmrOperationSpec),
+ YtCoordinatorService_(ytCoordinatorService)
{
StartClearingIdempotencyKeys();
}
@@ -73,13 +51,20 @@ public:
IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.OperationId = operationId, .OperationCreationTime=TInstant::Now()};
}
- TString taskId = GenerateId();
- auto taskParams = MakeDefaultTaskParamsFromOperation(request.OperationParams);
+ auto fmrOperationSpec = GetMergedFmrOperationSpec(request.FmrOperationSpec);
+ auto taskParams = PartitionOperationIntoSeveralTasks(request.OperationParams, fmrOperationSpec, request.ClusterConnections);
- TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnections, GetJobSettings(request.FmrOperationSpec));
- Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId};
+ std::unordered_set<TString> taskIds;
+
+ for (auto& currentTaskParams: taskParams) {
+ TString taskId = GenerateId();
+ TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, currentTaskParams, request.SessionId, request.ClusterConnections, fmrOperationSpec);
+ Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId};
+ TasksToRun_.emplace(createdTask, taskId);
+ taskIds.emplace(taskId);
+ }
- Operations_[operationId] = {.TaskIds = {taskId}, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId};
+ Operations_[operationId] = {.TaskIds = taskIds, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId};
YQL_CLOG(DEBUG, FastMapReduce) << "Starting operation with id " << operationId;
return NThreading::MakeFuture(TStartOperationResponse(EOperationStatus::Accepted, operationId));
}
@@ -96,8 +81,11 @@ public:
auto operationStatus = operationInfo.OperationStatus;
auto errorMessages = operationInfo.ErrorMessages;
std::vector<TTableStats> outputTablesStats;
- for (auto& tableId : operationInfo.OutputTableIds) {
- outputTablesStats.emplace_back(FmrTableStatistics_[tableId].Stats);
+ if (operationStatus == EOperationStatus::Completed) {
+ // Calculating output table stats only in case of successful completion of opereation
+ for (auto& tableId : operationInfo.OutputTableIds) {
+ outputTablesStats.emplace_back(CalculateTableStats(tableId));
+ }
}
return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages, outputTablesStats));
}
@@ -111,15 +99,14 @@ public:
YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId);
YQL_CLOG(DEBUG, FastMapReduce) << "Deleting operation with id " << operationId;
auto taskIds = Operations_[operationId].TaskIds;
- YQL_ENSURE(taskIds.size() == 1);
- auto taskId = *taskIds.begin();
- YQL_ENSURE(Tasks_.contains(taskId));
-
- auto taskStatus = Tasks_[taskId].TaskStatus;
- if (taskStatus == ETaskStatus::InProgress) {
- TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel
- } else {
- ClearTask(taskId); // Task either hasn't begun running or finished, remove info
+ for (auto& taskId: taskIds){
+ YQL_ENSURE(Tasks_.contains(taskId));
+ auto taskStatus = Tasks_[taskId].TaskStatus;
+ if (taskStatus == ETaskStatus::InProgress) {
+ TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel
+ } else {
+ ClearTask(taskId); // Task either hasn't begun running or finished, remove info
+ }
}
return NThreading::MakeFuture(TDeleteOperationResponse(EOperationStatus::Aborted));
@@ -148,9 +135,9 @@ public:
for (auto& requestTaskState: request.TaskStates) {
auto taskId = requestTaskState->TaskId;
+ YQL_ENSURE(Tasks_.contains(taskId));
auto operationId = Tasks_[taskId].OperationId;
YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId);
- YQL_ENSURE(Tasks_.contains(taskId));
auto taskStatus = requestTaskState->TaskStatus;
YQL_ENSURE(taskStatus != ETaskStatus::Accepted);
SetUnfinishedTaskStatus(taskId, taskStatus, requestTaskState->TaskErrorMessage);
@@ -159,48 +146,52 @@ public:
}
auto statistics = requestTaskState->Stats;
+ YQL_CLOG(TRACE, FastMapReduce) << " Task with id " << taskId << " has current status " << taskStatus << Endl;
+ bool isOperationCompleted = (GetOperationStatus(operationId) == EOperationStatus::Completed);
for (auto& [fmrTableId, tableStats]: statistics.OutputTables) {
- if (FmrTableStatistics_.contains(fmrTableId.TableId)) {
- auto curTableStats = FmrTableStatistics_[fmrTableId.TableId];
- YQL_ENSURE(
- tableStats.Chunks >= curTableStats.Stats.Chunks &&
- tableStats.DataWeight >= curTableStats.Stats.DataWeight &&
- tableStats.Rows >= curTableStats.Stats.Rows
- );
- YQL_ENSURE(fmrTableId.PartId == curTableStats.PartId);
- if (taskStatus == ETaskStatus::Completed) {
- YQL_CLOG(DEBUG, FastMapReduce) << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats;
- }
+ Operations_[operationId].OutputTableIds.emplace(fmrTableId.TableId);
+ PartIdStats_[fmrTableId.PartId] = tableStats.PartIdChunkStats;
+ if (isOperationCompleted) {
+ YQL_CLOG(INFO, FastMapReduce) << "Operation with id " << operationId << " has finished successfully";
+ CalculateTableStats(fmrTableId.TableId, true);
}
- Operations_[operationId].OutputTableIds.emplace_back(fmrTableId.TableId);
- FmrTableStatistics_[fmrTableId.TableId] = TCoordinatorFmrTableStats{
- .Stats = tableStats,
- .PartId = fmrTableId.PartId
- };
+ // TODO - проверка на валидность возвращаемой воркером статистики?
}
}
- std::vector<TTask::TPtr> tasksToRun;
- for (auto& taskToRunInfo: Tasks_) {
- if (taskToRunInfo.second.TaskStatus == ETaskStatus::Accepted) {
- SetUnfinishedTaskStatus(taskToRunInfo.first, ETaskStatus::InProgress);
- tasksToRun.emplace_back(taskToRunInfo.second.Task);
+ std::vector<TTask::TPtr> currentTasksToRun;
+ ui64 filledSlots = 0;
+ while (filledSlots < request.AvailableSlots) {
+ if (TasksToRun_.empty()) {
+ break;
}
+ auto [task, taskId] = TasksToRun_.front();
+ TasksToRun_.pop();
+ if (!Tasks_.contains(taskId)) {
+ continue;
+ }
+ auto& taskInfo = Tasks_[taskId];
+ YQL_ENSURE(taskInfo.TaskStatus == ETaskStatus::Accepted);
+ SetUnfinishedTaskStatus(taskId, ETaskStatus::InProgress);
+ SetPartIdsForTask(task);
+ currentTasksToRun.emplace_back(task);
+ ++filledSlots;
}
- return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_});
+
+ return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = currentTasksToRun, .TaskToDeleteIds = TaskToDeleteIds_});
}
NThreading::TFuture<TGetFmrTableInfoResponse> GetFmrTableInfo(const TGetFmrTableInfoRequest& request) override {
TGuard<TMutex> guard(Mutex_);
TGetFmrTableInfoResponse response;
auto tableId = request.TableId;
- if (!FmrTableStatistics_.contains(tableId)) {
+ if (!PartIdsForTables_.contains(tableId)) {
response.ErrorMessages = {TFmrError{
.Component = EFmrComponent::Coordinator, .ErrorMessage = "Fmr table id " + tableId + " was not found"
}};
return NThreading::MakeFuture(response);
}
- response.TableStats = FmrTableStatistics_[tableId].Stats;
+ response.TableStats = CalculateTableStats(tableId);
return NThreading::MakeFuture(response);
}
@@ -219,11 +210,11 @@ private:
if (Operations_.contains(operationId)) {
auto& operationInfo = Operations_[operationId];
auto operationStatus = operationInfo.OperationStatus;
- auto& taskIds = operationInfo.TaskIds;
- YQL_ENSURE(taskIds.size() == 1);
- auto taskId = *operationInfo.TaskIds.begin();
if (operationStatus != EOperationStatus::Accepted && operationStatus != EOperationStatus::InProgress) {
- ClearTask(taskId);
+ auto& taskIds = operationInfo.TaskIds;
+ for (auto& taskId: taskIds) {
+ ClearTask(taskId);
+ }
}
}
} else {
@@ -245,7 +236,14 @@ private:
YQL_ENSURE(Tasks_.contains(taskId));
auto& taskInfo = Tasks_[taskId];
TaskToDeleteIds_.erase(taskId);
- Operations_.erase(taskInfo.OperationId);
+
+ YQL_ENSURE(Operations_.contains(taskInfo.OperationId));
+ auto& currentTaskIdsForOperation = Operations_[taskInfo.OperationId];
+ currentTaskIdsForOperation.TaskIds.erase(taskId);
+ if (currentTaskIdsForOperation.TaskIds.empty()) {
+ // All task for operation are cleared, can clear it
+ Operations_.erase(taskInfo.OperationId);
+ }
Tasks_.erase(taskId);
}
@@ -256,6 +254,7 @@ private:
if (taskInfo.TaskStatus != ETaskStatus::Accepted && taskInfo.TaskStatus != ETaskStatus::InProgress) {
return;
}
+ YQL_CLOG(TRACE, FastMapReduce) << "Setting task status for task id" << taskId << " from " << taskInfo.TaskStatus << " to new Task status " << newTaskStatus << "\n";
taskInfo.TaskStatus = newTaskStatus;
operationInfo.OperationStatus = GetOperationStatus(taskInfo.OperationId);
if (taskErrorMessage) {
@@ -265,102 +264,89 @@ private:
}
EOperationStatus GetOperationStatus(const TString& operationId) {
- if (! Operations_.contains(operationId)) {
+ if (!Operations_.contains(operationId)) {
return EOperationStatus::NotFound;
}
std::unordered_set<TString> taskIds = Operations_[operationId].TaskIds;
- YQL_ENSURE(taskIds.size() == 1);
+ std::unordered_set<ETaskStatus> taskStatuses;
+
+ for (auto& taskId: taskIds) {
+ taskStatuses.emplace(Tasks_[taskId].TaskStatus);
+ }
+ YQL_ENSURE(!taskStatuses.contains(ETaskStatus::Unknown));
- auto taskId = *taskIds.begin();
- ETaskStatus taskStatus = Tasks_[taskId].TaskStatus;
- return static_cast<EOperationStatus>(taskStatus);
+ if (taskStatuses.contains(ETaskStatus::Failed)) {
+ return EOperationStatus::Failed;
+ }
+ if (taskStatuses.contains(ETaskStatus::InProgress)) {
+ return EOperationStatus::InProgress;
+ }
+ if (taskStatuses.contains(ETaskStatus::InProgress)) {
+ return EOperationStatus::InProgress;
+ }
+ if (taskStatuses.contains(ETaskStatus::Accepted)) {
+ return EOperationStatus::Accepted;
+ }
+ return EOperationStatus::Completed;
}
- TTableRange GetTableRangeFromId(const TString& tableId) {
- if (!FmrTableStatistics_.contains(tableId)) {
- TString partId = GenerateId();
- FmrTableStatistics_[tableId] = TCoordinatorFmrTableStats{.Stats=TTableStats{}, .PartId=partId};
- return TTableRange{.PartId = partId};
- }
- auto fmrTableStats = FmrTableStatistics_[tableId];
- return TTableRange{
- .PartId = fmrTableStats.PartId,
- .MinChunk = 0,
- .MaxChunk = fmrTableStats.Stats.Chunks
- };
+ TFmrPartitionerSettings GetFmrPartitionerSettings(const NYT::TNode& fmrOperationSpec) {
+ TFmrPartitionerSettings settings;
+ auto& fmrPartitionSettings = fmrOperationSpec["partition"]["fmr_table"];
+ settings.MaxDataWeightPerPart = fmrPartitionSettings["max_data_weight_per_part"].AsInt64();
+ settings.MaxParts = fmrPartitionSettings["max_parts"].AsInt64();
+ return settings;
}
- std::vector<TTaskTableRef> TaskInputTablesFromOperationInputTables(const std::vector<TOperationTableRef>& operationTables) {
- std::vector<TTaskTableRef> taskInputTables;
- for (auto& elem: operationTables) {
- if (const TYtTableRef* ytTableRef = std::get_if<TYtTableRef>(&elem)) {
- taskInputTables.emplace_back(*ytTableRef);
- } else {
- TFmrTableRef fmrTableRef = std::get<TFmrTableRef>(elem);
- TString inputTableId = fmrTableRef.FmrTableId.Id;
- TFmrTableInputRef tableInput{
- .TableId = inputTableId,
- .TableRanges = {GetTableRangeFromId(inputTableId)}
- };
- taskInputTables.emplace_back(tableInput);
- }
+ TYtPartitionerSettings GetYtPartitionerSettings(const NYT::TNode& fmrOperationSpec) {
+ TYtPartitionerSettings settings;
+ auto& ytPartitionSettings = fmrOperationSpec["partition"]["yt_table"];
+ settings.MaxDataWeightPerPart = ytPartitionSettings["max_data_weight_per_part"].AsInt64();
+ settings.MaxParts = ytPartitionSettings["max_parts"].AsInt64();
+ return settings;
+ }
+
+ std::vector<TTaskParams> PartitionOperationIntoSeveralTasks(const TOperationParams& operationParams, const NYT::TNode& fmrOperationSpec, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) {
+ auto fmrPartitionerSettings = GetFmrPartitionerSettings(fmrOperationSpec);
+ auto ytPartitionerSettings = GetYtPartitionerSettings(fmrOperationSpec);
+ auto fmrPartitioner = TFmrPartitioner(PartIdsForTables_,PartIdStats_, fmrPartitionerSettings); // TODO - fix this
+
+ std::vector<TYtTableRef> ytInputTables;
+ std::vector<TFmrTableRef> fmrInputTables;
+ GetOperationInputTables(ytInputTables, fmrInputTables, operationParams);
+
+ TPartitionResult partitionResult = PartitionInputTablesIntoTasks(ytInputTables, fmrInputTables, fmrPartitioner, YtCoordinatorService_, clusterConnections, ytPartitionerSettings);
+ if (!partitionResult.PartitionStatus) {
+ ythrow yexception() << "Failed to partition input tables into tasks";
+ // TODO - return FAILED_PARTITIONING status instead.
}
- return taskInputTables;
+ return GetOutputTaskParams(partitionResult, operationParams);
}
- std::vector<TFmrTableOutputRef> TaskOutputTablesFromOperationOutputTables(const std::vector<TFmrTableRef>& operationTables) {
- std::vector<TFmrTableOutputRef> taskOutputTables;
- for (auto& fmrTableRef: operationTables) {
- TString outputTableId = fmrTableRef.FmrTableId.Id;
- TFmrTableOutputRef tableOutput{
- .TableId = outputTableId,
- .PartId = GetTableRangeFromId(outputTableId).PartId
- };
- taskOutputTables.emplace_back(tableOutput);
+ void GetOperationInputTables(std::vector<TYtTableRef>& ytInputTables, std::vector<TFmrTableRef>& fmrInputTables, const TOperationParams& operationParams) {
+ TOperationInputTablesGetter tablesGetter{};
+ std::visit(tablesGetter, operationParams);
+
+ auto& inputTables = tablesGetter.OperationTableRef;
+ for (auto& table: inputTables) {
+ auto ytTable = std::get_if<TYtTableRef>(&table);
+ auto fmrTable = std::get_if<TFmrTableRef>(&table);
+ if (ytTable) {
+ ytInputTables.emplace_back(*ytTable);
+ } else {
+ fmrInputTables.emplace_back(*fmrTable);
}
- return taskOutputTables;
+ }
}
- TTaskParams MakeDefaultTaskParamsFromOperation(const TOperationParams& operationParams) {
- if (const TUploadOperationParams* uploadOperationParams = std::get_if<TUploadOperationParams>(&operationParams)) {
- TUploadTaskParams uploadTaskParams{};
- uploadTaskParams.Output = uploadOperationParams->Output;
- TString inputTableId = uploadOperationParams->Input.FmrTableId.Id;
- TFmrTableInputRef fmrTableInput{
- .TableId = inputTableId,
- .TableRanges = {GetTableRangeFromId(inputTableId)}
- };
- uploadTaskParams.Input = fmrTableInput;
- return uploadTaskParams;
- } else if (const TDownloadOperationParams* downloadOperationParams = std::get_if<TDownloadOperationParams>(&operationParams)) {
- TDownloadTaskParams downloadTaskParams{};
- downloadTaskParams.Input = downloadOperationParams->Input;
- TString outputTableId = downloadOperationParams->Output.FmrTableId.Id;
- TFmrTableOutputRef fmrTableOutput{
- .TableId = outputTableId,
- .PartId = GetTableRangeFromId(outputTableId).PartId
- };
- downloadTaskParams.Output = fmrTableOutput;
- return downloadTaskParams;
- } else if (const TMergeOperationParams* mergeOperationParams = std::get_if<TMergeOperationParams>(&operationParams)) {
- TMergeTaskParams mergeTaskParams;
- mergeTaskParams.Input = TaskInputTablesFromOperationInputTables(mergeOperationParams->Input);
- TFmrTableOutputRef outputTable;
- mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams->Output.FmrTableId.Id};
- return mergeTaskParams;
- } else if (const TMapOperationParams* mapOperationParams = std::get_if<TMapOperationParams>(&operationParams)) {
- TMapTaskParams mapTaskParams;
- mapTaskParams.Input = TaskInputTablesFromOperationInputTables(mapOperationParams->Input);
- mapTaskParams.Output = TaskOutputTablesFromOperationOutputTables(mapOperationParams->Output);
- mapTaskParams.Executable = mapOperationParams->Executable;
- return mapTaskParams;
- } else {
- ythrow yexception() << "Unknown operation params";
- }
+ std::vector<TTaskParams> GetOutputTaskParams(const TPartitionResult& partitionResult, const TOperationParams& operationParams) {
+ TOutputTaskParamsGetter taskGetter{.PartitionResult = partitionResult};
+ std::visit(taskGetter, operationParams);
+ return taskGetter.TaskParams;
}
- NYT::TNode GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) {
- // For now fmr operation spec only consists of job settings
+ NYT::TNode GetMergedFmrOperationSpec(const TMaybe<NYT::TNode>& currentFmrOperationSpec) {
+ // just pass whole merged operation spec for simplicity here
if (!currentFmrOperationSpec) {
return DefaultFmrOperationSpec_;
}
@@ -369,7 +355,76 @@ private:
return resultFmrOperationSpec;
}
+ void SetPartIdsForTask(TTask::TPtr task) {
+ // TODO - add failover, clearing previous partId if exists
+ TString partId = GenerateId();
+
+ auto* downloadTaskParams = std::get_if<TDownloadTaskParams>(&task->TaskParams);
+ auto* mergeTaskParams = std::get_if<TMergeTaskParams>(&task->TaskParams);
+ auto* mapTaskParams = std::get_if<TMapTaskParams>(&task->TaskParams);
+ if (downloadTaskParams) {
+ TString tableId = downloadTaskParams->Output.TableId;
+ downloadTaskParams->Output.PartId = partId;
+ PartIdsForTables_[tableId].emplace_back(partId);
+ } else if (mergeTaskParams) {
+ TString tableId = mergeTaskParams->Output.TableId;
+ mergeTaskParams->Output.PartId = partId;
+ PartIdsForTables_[tableId].emplace_back(partId);
+ } else if (mapTaskParams) {
+ for (auto& fmrTableOutputRef: mapTaskParams->Output) {
+ TString tableId = fmrTableOutputRef.TableId;
+ fmrTableOutputRef.PartId = partId;
+ PartIdsForTables_[tableId].emplace_back(partId);
+ }
+ }
+ }
+
+ TTableStats CalculateTableStats(const TString& tableId, bool isOperationFinished = false) {
+ if (OperationTableStats_.contains(tableId)) {
+ return OperationTableStats_[tableId];
+ }
+ TTableStats tableStats{};
+ auto& partIds = PartIdsForTables_.at(tableId);
+ YQL_CLOG(DEBUG, FastMapReduce) << "Calculating table stats for table with id " << tableId << " with " << partIds.size() << " part ids";
+ for (auto& part: partIds) {
+ auto& partStats = PartIdStats_[part];
+ tableStats.Chunks += partStats.size();
+ YQL_CLOG(DEBUG, FastMapReduce) << " Gotten " << partStats.size() << " chunks for part id " << part;
+ for (auto& chunkStats: PartIdStats_[part]) {
+ tableStats.DataWeight += chunkStats.DataWeight;
+ tableStats.Rows += chunkStats.Rows;
+ }
+ }
+ if (isOperationFinished) {
+ // Stats for table won't change, inserting into map for caching
+ OperationTableStats_[tableId] = tableStats;
+ }
+ return tableStats;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TCoordinatorTaskInfo {
+ TTask::TPtr Task;
+ ETaskStatus TaskStatus;
+ TString OperationId;
+ };
+
+ struct TOperationInfo {
+ std::unordered_set<TString> TaskIds;
+ EOperationStatus OperationStatus;
+ std::vector<TFmrError> ErrorMessages;
+ TString SessionId;
+ std::unordered_set<TString> OutputTableIds = {};
+ };
+
+ struct TIdempotencyKeyInfo {
+ TString OperationId;
+ TInstant OperationCreationTime;
+ };
+
std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it
+ std::queue<std::pair<TTask::TPtr, TString>> TasksToRun_; // Task, and TaskId
std::unordered_set<TString> TaskToDeleteIds_; // TaskIds we want to pass to worker for deletion
std::unordered_map<TString, TOperationInfo> Operations_; // OperationId -> current info about it
std::unordered_map<TString, TIdempotencyKeyInfo> IdempotencyKeys_; // IdempotencyKey -> current info about it
@@ -382,14 +437,90 @@ private:
std::atomic<bool> StopCoordinator_;
TDuration TimeToSleepBetweenClearKeyRequests_;
TDuration IdempotencyKeyStoreTime_;
- std::unordered_map<TFmrTableId, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics
+
+ std::unordered_map<TFmrTableId, std::vector<TString>> PartIdsForTables_; // TableId -> List of all corresponding partIds
+ std::unordered_map<TString, std::vector<TChunkStats>> PartIdStats_; // PartId -> Detailed statistic for each chunk
+ std::unordered_map<TString, TTableStats> OperationTableStats_; // TableId -> Statistic for fmr table, filled when operation completes
+
+
NYT::TNode DefaultFmrOperationSpec_;
+ IYtCoordinatorService::TPtr YtCoordinatorService_; // Needed for partitioning of yt tables
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // Helper structs for partitioning operation into tasks
+
+ struct TOperationInputTablesGetter {
+ std::vector<TOperationTableRef> OperationTableRef; // will be filled when std::visit is called
+
+ void operator () (const TUploadOperationParams& uploadOperationParams) {
+ OperationTableRef.emplace_back(uploadOperationParams.Input);
+ }
+ void operator () (const TDownloadOperationParams& downloadOperationParams) {
+ OperationTableRef.emplace_back(downloadOperationParams.Input);
+ }
+ void operator () (const TMergeOperationParams& mergeOperationParams) {
+ OperationTableRef = mergeOperationParams.Input;
+ }
+ void operator () (const TMapOperationParams& mapOperationParams) {
+ OperationTableRef = mapOperationParams.Input;
+ }
+ };
+
+ struct TOutputTaskParamsGetter {
+ std::vector<TTaskParams> TaskParams; // Will be filled when std::visit is called
+ TPartitionResult PartitionResult;
+
+ void operator () (const TUploadOperationParams& uploadOperationParams) {
+ for (auto& task: PartitionResult.TaskInputs) {
+ TUploadTaskParams uploadTaskParams;
+ YQL_ENSURE(task.Inputs.size() == 1, "Upload task should have exactly one fmr table partition input");
+ auto& fmrTablePart = task.Inputs[0];
+ uploadTaskParams.Input = std::get<TFmrTableInputRef>(fmrTablePart);
+ uploadTaskParams.Output = uploadOperationParams.Output;
+ TaskParams.emplace_back(uploadTaskParams);
+ }
+ }
+ void operator () (const TDownloadOperationParams& downloadOperationParams) {
+ for (auto& task: PartitionResult.TaskInputs) {
+ TDownloadTaskParams downloadTaskParams;
+ YQL_ENSURE(task.Inputs.size() == 1, "Download task should have exactly one yt table partition input");
+ auto& ytTablePart = task.Inputs[0];
+ downloadTaskParams.Input = std::get<TYtTableTaskRef>(ytTablePart);
+ downloadTaskParams.Output = TFmrTableOutputRef{.TableId = downloadOperationParams.Output.FmrTableId.Id};
+ // PartId for tasks which write to table data service will be set later
+ TaskParams.emplace_back(downloadTaskParams);
+ }
+ }
+ void operator () (const TMergeOperationParams& mergeOperationParams) {
+ for (auto& task: PartitionResult.TaskInputs) {
+ TMergeTaskParams mergeTaskParams;
+ mergeTaskParams.Input = task;
+ mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.FmrTableId.Id};
+ TaskParams.emplace_back(mergeTaskParams);
+ }
+ }
+ void operator () (const TMapOperationParams& mapOperationParams) {
+ for (auto& task: PartitionResult.TaskInputs) {
+ TMapTaskParams mapTaskParams;
+ mapTaskParams.Input = task;
+ std::vector<TFmrTableOutputRef> fmrTableOutputRefs;
+ std::transform(mapOperationParams.Output.begin(), mapOperationParams.Output.end(), std::back_inserter(fmrTableOutputRefs), [] (const TFmrTableRef& fmrTableRef) {
+ return TFmrTableOutputRef{.TableId = fmrTableRef.FmrTableId.Id};
+ });
+
+ mapTaskParams.Output = fmrTableOutputRefs;
+ mapTaskParams.Executable = mapOperationParams.Executable; // TODO - change Executable to mapper
+ TaskParams.emplace_back(mapTaskParams);
+ }
+ }
+ };
};
} // namespace
-IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings) {
- return MakeIntrusive<TFmrCoordinator>(settings);
+IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings, IYtCoordinatorService::TPtr ytCoordinatorService) {
+ return MakeIntrusive<TFmrCoordinator>(settings, ytCoordinatorService);
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
index 563752dfc7b..ceac2a7b8cc 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -6,6 +6,8 @@
#include <util/system/guard.h>
#include <util/generic/queue.h>
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h>
namespace NYql::NFmr {
@@ -19,6 +21,9 @@ struct TFmrCoordinatorSettings {
TFmrCoordinatorSettings();
};
-IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = TFmrCoordinatorSettings());
+IFmrCoordinator::TPtr MakeFmrCoordinator(
+ const TFmrCoordinatorSettings& settings = TFmrCoordinatorSettings(),
+ IYtCoordinatorService::TPtr ytCoordinatorService = MakeYtCoordinatorService()
+);
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp
new file mode 100644
index 00000000000..9f3f305130a
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.cpp
@@ -0,0 +1,168 @@
+#include "yql_yt_partitioner.h"
+#include <library/cpp/iterator/enumerate.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/yql_panic.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+namespace NYql::NFmr {
+
+TFmrPartitioner::TFmrPartitioner(
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+ const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+ const TFmrPartitionerSettings& settings
+)
+ : PartIdsForTables_(partIdsForTables), PartIdStats_(partIdStats), Settings_(settings)
+{
+}
+
+std::pair<std::vector<TTaskTableInputRef>, bool> TFmrPartitioner::PartitionFmrTablesIntoTasks(const std::vector<TFmrTableRef>& fmrTables) {
+ // TODO - return matrix with ranges for all tables, in order to support table_index correctly.
+ if (fmrTables.empty()) {
+ return {{}, true};
+ }
+ const ui64 maxDataWeightPerPart = Settings_.MaxDataWeightPerPart;
+ std::vector<TTaskTableInputRef> currentFmrTasks;
+ std::vector<TLeftoverRange> leftoverRanges;
+ // First try to create tasks in which all chunks have the same partId, then handle leftovers (end of chunks for each partId)
+ for (const auto& fmrTable: fmrTables) {
+ YQL_ENSURE(PartIdsForTables_.contains(fmrTable.FmrTableId));
+ auto partIds = PartIdsForTables_.at(fmrTable.FmrTableId);
+ for (auto& partId: partIds) {
+ std::vector<TChunkStats> stats = PartIdStats_.at(partId);
+ HandleFmrPartition(fmrTable.FmrTableId, partId, stats, maxDataWeightPerPart, currentFmrTasks, leftoverRanges);
+ if (!CheckMaxTasksSize(currentFmrTasks)) {
+ return {{}, false};
+ }
+ }
+ }
+ HandleFmrLeftoverRanges(maxDataWeightPerPart, currentFmrTasks, leftoverRanges);
+ if (!CheckMaxTasksSize(currentFmrTasks)) {
+ return {{}, false};
+ }
+ return {currentFmrTasks, true};
+}
+
+void TFmrPartitioner::HandleFmrPartition(
+ const TFmrTableId& fmrTable,
+ const TString& partId,
+ const std::vector<TChunkStats> stats,
+ ui64 maxDataWeightPerPart,
+ std::vector<TTaskTableInputRef>& currentFmrTasks,
+ std::vector<TLeftoverRange>& leftoverRanges
+) {
+ ui64 curDataWeight = 0;
+ i64 curMinChunk = -1;
+
+ for (ui64 i = 0; i < stats.size();) {
+ if (curDataWeight + stats[i].DataWeight <= maxDataWeightPerPart) {
+ // check if we can add this chunk to current task, or have to split
+ curDataWeight += stats[i].DataWeight;
+ if (curMinChunk == -1) {
+ curMinChunk = i;
+ }
+ ++i;
+ } else {
+ if (curMinChunk != -1) {
+ std::vector<TTableRange> tableRange{TTableRange{.PartId = partId, .MinChunk = static_cast<ui64>(curMinChunk), .MaxChunk = i}};
+ TFmrTableInputRef fmrTableInput{.TableId = fmrTable.Id, .TableRanges = tableRange};
+ currentFmrTasks.emplace_back(TTaskTableInputRef{.Inputs = {fmrTableInput}});
+ }
+ curMinChunk = -1;
+ curDataWeight = 0;
+ ui64 j = i;
+ while (j < stats.size()) {
+ // iterate to create separate tasks for all chunks which are larger then maxDataWeight
+ if (stats[j].DataWeight < maxDataWeightPerPart) {
+ break;
+ }
+ std::vector<TTableRange> tableRange{TTableRange{.PartId = partId, .MinChunk = j, .MaxChunk = j + 1}};
+ TFmrTableInputRef fmrTableInput{.TableId = fmrTable.Id, .TableRanges = tableRange};
+ currentFmrTasks.emplace_back(TTaskTableInputRef{.Inputs = {fmrTableInput}});
+ ++j;
+ }
+ i = j;
+ }
+ }
+
+ if (curMinChunk != -1) {
+ TTableRange leftoverTableRange{.PartId = partId, .MinChunk = static_cast<ui64>(curMinChunk), .MaxChunk = stats.size()};
+ leftoverRanges.emplace_back(TLeftoverRange{.TableId = fmrTable.Id, .TableRange = leftoverTableRange, .DataWeight = curDataWeight});
+ }
+}
+
+void TFmrPartitioner::HandleFmrLeftoverRanges(
+ ui64 maxDataWeightPerPart,
+ std::vector<TTaskTableInputRef>& fmrTasks,
+ std::vector<TLeftoverRange>& leftoverRanges
+) {
+ TTaskTableInputRef currentTask{};
+ ui64 curDataWeight = 0;
+ TFmrTableInputRef curFmrTable;
+ TString curTableId;
+ for (auto& range: leftoverRanges) {
+ if (curDataWeight + range.DataWeight > maxDataWeightPerPart) {
+ if (curFmrTable != TFmrTableInputRef()) {
+ currentTask.Inputs.emplace_back(curFmrTable);
+ curFmrTable = TFmrTableInputRef();
+ curTableId = range.TableId;
+ }
+ fmrTasks.emplace_back(currentTask);
+ currentTask = TTaskTableInputRef();
+ curDataWeight = 0;
+ }
+ if (range.TableId != curTableId && curFmrTable != TFmrTableInputRef()) {
+ currentTask.Inputs.emplace_back(curFmrTable);
+ curFmrTable = TFmrTableInputRef();
+ }
+ curTableId = range.TableId;
+ curFmrTable.TableId = curTableId;
+ curFmrTable.TableRanges.emplace_back(range.TableRange);
+ curDataWeight += range.DataWeight;
+ }
+
+ currentTask.Inputs.emplace_back(curFmrTable);
+ fmrTasks.emplace_back(currentTask);
+}
+
+bool TFmrPartitioner::CheckMaxTasksSize(const std::vector<TTaskTableInputRef>& currentFmrTasks) {
+ return currentFmrTasks.size() <= Settings_.MaxParts;
+}
+
+TPartitionResult PartitionInputTablesIntoTasks(
+ const std::vector<TYtTableRef>& ytInputTables,
+ const std::vector<TFmrTableRef> fmrInputTables,
+ TFmrPartitioner& partitioner,
+ IYtCoordinatorService::TPtr ytCoordinatorService,
+ const std::unordered_map<TFmrTableId, TClusterConnection> &clusterConnections,
+ const TYtPartitionerSettings& ytPartitionSettings
+) {
+
+ std::vector<TTaskTableRef> tasks;
+ std::vector<TTaskTableInputRef> currentTasks;
+
+ auto [gottenFmrTasks, fmrPartitionStatus] = partitioner.PartitionFmrTablesIntoTasks(fmrInputTables);
+ if (!fmrPartitionStatus) {
+ return TPartitionResult{.PartitionStatus = false};
+ }
+ YQL_CLOG(INFO, FastMapReduce) << "Successfully partitioned input fmr tables into " << gottenFmrTasks.size() << " tasks";
+ for (auto& fmrTask: gottenFmrTasks) {
+ YQL_CLOG(DEBUG, FastMapReduce) << fmrTask;
+ currentTasks.emplace_back(fmrTask);
+ }
+ if (ytInputTables.empty()) {
+ return TPartitionResult{.TaskInputs = currentTasks, .PartitionStatus = true};
+ }
+ auto settings = ytPartitionSettings;
+ if (settings.MaxParts <= gottenFmrTasks.size()) {
+ return TPartitionResult{.PartitionStatus = false};
+ }
+ settings.MaxParts = ytPartitionSettings.MaxParts - gottenFmrTasks.size();
+ auto [gottenYtTasks, ytPartitionStatus] = ytCoordinatorService->PartitionYtTables(ytInputTables, clusterConnections, settings);
+ for (auto& ytTask: gottenYtTasks) {
+ currentTasks.emplace_back(TTaskTableInputRef{.Inputs = {ytTask}});
+ }
+ YQL_CLOG(INFO, FastMapReduce) << "Gotten " << currentTasks.size() << " yt and fmr tasks to run from operation input tables";
+ return TPartitionResult{.TaskInputs = currentTasks, .PartitionStatus = ytPartitionStatus};
+}
+
+}
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h
new file mode 100644
index 00000000000..d053ecbcfdd
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_partitioner.h
@@ -0,0 +1,66 @@
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
+
+namespace NYql::NFmr {
+
+struct TPartitionResult {
+ std::vector<TTaskTableInputRef> TaskInputs;
+ bool PartitionStatus = false;
+};
+
+struct TFmrPartitionerSettings {
+ ui64 MaxDataWeightPerPart = 0;
+ ui64 MaxParts = 0;
+};
+
+class TFmrPartitioner {
+public:
+ TFmrPartitioner(
+ const std::unordered_map<TFmrTableId, std::vector<TString>>& partIdsForTables,
+ const std::unordered_map<TString, std::vector<TChunkStats>>& partIdStats,
+ const TFmrPartitionerSettings& settings
+ );
+
+ std::pair<std::vector<TTaskTableInputRef>, bool> PartitionFmrTablesIntoTasks(const std::vector<TFmrTableRef>& fmrTables);
+
+private:
+ struct TLeftoverRange {
+ TString TableId;
+ TTableRange TableRange;
+ ui64 DataWeight;
+ };
+
+ void HandleFmrPartition(
+ const TFmrTableId& fmrTable,
+ const TString& partId,
+ const std::vector<TChunkStats> stats,
+ ui64 maxDataWeightPerPart,
+ std::vector<TTaskTableInputRef>& currentFmrTasks,
+ std::vector<TLeftoverRange>& leftoverRanges
+ );
+
+ void HandleFmrLeftoverRanges(
+ ui64 maxDataWeightPerPart,
+ std::vector<TTaskTableInputRef>& fmrTasks,
+ std::vector<TLeftoverRange>& leftoverRanges
+ );
+
+ bool CheckMaxTasksSize(const std::vector<TTaskTableInputRef>& currentFmrTasks);
+
+private:
+ const std::unordered_map<TFmrTableId, std::vector<TString>> PartIdsForTables_; // TableId -> all corresponding part ids.
+ const std::unordered_map<TString, std::vector<TChunkStats>> PartIdStats_; // PartId -> statistics for all existing chunks in it.
+ const TFmrPartitionerSettings Settings_;
+};
+
+TPartitionResult PartitionInputTablesIntoTasks(
+ const std::vector<TYtTableRef>& ytInputTables,
+ const std::vector<TFmrTableRef> fmrInputTables,
+ TFmrPartitioner& partitioner,
+ IYtCoordinatorService::TPtr ytCoordinatorService,
+ const std::unordered_map<TFmrTableId, TClusterConnection> &clusterConnections,
+ const TYtPartitionerSettings& ytPartitionSettings
+);
+
+} // namespace NYql::NFmr
+
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
index 1e2407b1ea5..d237adab447 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
@@ -13,6 +13,7 @@ NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heart
protoHeartbeatRequest.AddTaskStates();
protoHeartbeatRequest.MutableTaskStates(i)->Swap(&protoTaskState);
}
+ protoHeartbeatRequest.SetAvailableSlots(heartbeatRequest.AvailableSlots);
return protoHeartbeatRequest;
}
@@ -26,6 +27,7 @@ THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest prot
taskStates.emplace_back(TIntrusivePtr<TTaskState>(new TTaskState(curTaskState)));
}
heartbeatRequest.TaskStates = taskStates;
+ heartbeatRequest.AvailableSlots = protoHeartbeatRequest.GetAvailableSlots();
return heartbeatRequest;
}
@@ -70,7 +72,7 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio
protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey);
}
protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
- auto clusterConnections = *protoStartOperationRequest.MutableClusterConnections();
+ auto& clusterConnections = *protoStartOperationRequest.MutableClusterConnections();
for (auto& [tableName, conn]: startOperationRequest.ClusterConnections) {
clusterConnections[tableName.Id] = ClusterConnectionToProto(conn);
}
@@ -122,6 +124,11 @@ NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationRes
auto protoError = FmrErrorToProto(errorMessage);
curError->Swap(&protoError);
}
+ for (auto& tableStats: getOperationResponse.OutputTablesStats) {
+ auto* curTableStats = protoGetOperationResponse.AddTableStats();
+ auto protoTableStats = TableStatsToProto(tableStats);
+ curTableStats->Swap(&protoTableStats);
+ }
return protoGetOperationResponse;
}
@@ -129,11 +136,17 @@ TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationR
TGetOperationResponse getOperationResponse;
getOperationResponse.Status = static_cast<EOperationStatus>(protoGetOperationReponse.GetStatus());
std::vector<TFmrError> errorMessages;
+ std::vector<TTableStats> outputTableStats;
for (size_t i = 0; i < protoGetOperationReponse.ErrorMessagesSize(); ++i) {
TFmrError errorMessage = FmrErrorFromProto(protoGetOperationReponse.GetErrorMessages(i));
errorMessages.emplace_back(errorMessage);
}
+ for (size_t i = 0; i < protoGetOperationReponse.TableStatsSize(); ++i) {
+ TTableStats tableStats = TableStatsFromProto(protoGetOperationReponse.GetTableStats(i));
+ outputTableStats.emplace_back(tableStats);
+ }
getOperationResponse.ErrorMessages = errorMessages;
+ getOperationResponse.OutputTablesStats = outputTableStats;
return getOperationResponse;
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
index a42cb0d35a6..be016c230b2 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -11,6 +11,7 @@ struct THeartbeatRequest {
ui32 WorkerId;
TString VolatileId;
std::vector<TTaskState::TPtr> TaskStates;
+ ui64 AvailableSlots = 0;
};
// Worker sends requests in loop or long polling
@@ -57,7 +58,7 @@ struct TGetFmrTableInfoRequest {
};
struct TGetFmrTableInfoResponse {
- TTableStats TableStats; // for only one PartId
+ TTableStats TableStats;
std::vector<TFmrError> ErrorMessages = {};
};
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make
new file mode 100644
index 00000000000..0f2075ad1b3
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/ya.make
@@ -0,0 +1,13 @@
+UNITTEST()
+
+SRCS(
+ yql_yt_coordinator_service_ut.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp
new file mode 100644
index 00000000000..75ade8a9e05
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ut/yql_yt_coordinator_service_ut.cpp
@@ -0,0 +1,43 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/stream/file.h>
+#include <util/system/tempfile.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h>
+
+namespace NYql::NFmr {
+
+Y_UNIT_TEST_SUITE(FileYtCoordinatorServiceTests) {
+ Y_UNIT_TEST(PartitionFiles) {
+ const i64 FileNums = 5;
+
+ std::vector<THolder<TTempFileHandle>> fileHandles(FileNums);
+ std::vector<TYtTableRef> ytTables(FileNums, TYtTableRef());
+ std::vector<i64> fileLengths = {30, 10, 20, 5, 40};
+
+ for (int i = 0; i < FileNums; ++i) {
+ fileHandles[i] = MakeHolder<TTempFileHandle>();
+ auto curFileName= fileHandles[i]->Name();
+ ytTables[i].FilePath = curFileName;
+ TFileOutput writer(curFileName);
+ writer.Write(TString("1") * fileLengths[i]);
+ writer.Flush();
+ }
+
+ auto fileService = MakeFileYtCoordinatorService();
+ auto settings = TYtPartitionerSettings{.MaxDataWeightPerPart = 50, .MaxParts = 100};
+ auto [gottenPartitions, status] = fileService->PartitionYtTables(ytTables, {}, settings);
+ UNIT_ASSERT_VALUES_EQUAL(status, true);
+
+ std::vector<std::vector<TString>> expectedFilePartitions = {
+ {fileHandles[0]->Name(), fileHandles[1]->Name()},
+ {fileHandles[2]->Name(), fileHandles[3]->Name()},
+ {fileHandles[4]->Name()}
+ };
+ std::vector<std::vector<TString>> gottenFileParititons;
+ for (auto& part: gottenPartitions) {
+ gottenFileParititons.emplace_back(part.FilePaths);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(gottenFileParititons, expectedFilePartitions);
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make
new file mode 100644
index 00000000000..b307ab11fae
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_file_coordinator_service.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/common
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp
new file mode 100644
index 00000000000..1d5dfb2d019
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.cpp
@@ -0,0 +1,59 @@
+#include "yql_yt_file_coordinator_service.h"
+
+#include <library/cpp/yson/parser.h>
+#include <util/stream/file.h>
+#include <util/system/fstat.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h>
+#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql::NFmr {
+
+namespace {
+
+class TFileYtCoordinatorService: public IYtCoordinatorService {
+public:
+
+ std::pair<std::vector<TYtTableTaskRef>, bool> PartitionYtTables(
+ const std::vector<TYtTableRef>& ytTables,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& /*clusterConnections*/,
+ const TYtPartitionerSettings& settings
+ ) override {
+ const i64 maxDataWeightPerPart = settings.MaxDataWeightPerPart;
+ std::vector<TYtTableTaskRef> ytPartitions;
+ TYtTableTaskRef curYtTableTaskRef{};
+ i64 curFileLength = 0;
+ for (auto& ytTable: ytTables) {
+ YQL_ENSURE(ytTable.FilePath);
+ auto fileLength = GetFileLength(*ytTable.FilePath);
+ if (fileLength + curFileLength > maxDataWeightPerPart) {
+ ytPartitions.emplace_back(curYtTableTaskRef);
+ if (ytPartitions.size() > settings.MaxParts) {
+ return {{}, false};
+ }
+ curYtTableTaskRef = TYtTableTaskRef{};
+ curFileLength = 0;
+ }
+ TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//");
+ auto richPath = NYT::TRichYPath(ytPath).Append(true);
+ // append RichPath just in case, TODO - figure out if we actually need to use it somewhere
+ curYtTableTaskRef.RichPaths.emplace_back(richPath);
+ curYtTableTaskRef.FilePaths.emplace_back(*ytTable.FilePath);
+ curFileLength += fileLength;
+ }
+ ytPartitions.emplace_back(curYtTableTaskRef);
+ if (ytPartitions.size() > settings.MaxParts) {
+ return {{}, false};
+ }
+ return {ytPartitions, true};
+ }
+};
+
+} // namespace
+
+IYtCoordinatorService::TPtr MakeFileYtCoordinatorService() {
+ return MakeIntrusive<TFileYtCoordinatorService>();
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h
new file mode 100644
index 00000000000..7d5ca59e547
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h
@@ -0,0 +1,7 @@
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
+
+namespace NYql::NFmr {
+
+IYtCoordinatorService::TPtr MakeFileYtCoordinatorService();
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make
new file mode 100644
index 00000000000..bea26d3843a
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_coordinator_service_impl.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp
new file mode 100644
index 00000000000..bb556251ed9
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.cpp
@@ -0,0 +1,96 @@
+#include "yql_yt_coordinator_service_impl.h"
+
+#include <library/cpp/yt/error/error.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/yql/providers/yt/fmr/utils/yql_yt_client.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql::NFmr {
+
+namespace {
+
+class TYtCoordinatorService: public IYtCoordinatorService {
+public:
+
+ std::pair<std::vector<TYtTableTaskRef>, bool> PartitionYtTables(
+ const std::vector<TYtTableRef>& ytTables,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+ const TYtPartitionerSettings& settings
+ ) override {
+ auto getTablePartitionsOptions = NYT::TGetTablePartitionsOptions()
+ .PartitionMode(NYT::ETablePartitionMode::Unordered)
+ .DataWeightPerPartition(settings.MaxDataWeightPerPart)
+ .MaxPartitionCount(settings.MaxParts)
+ .AdjustDataWeightPerPartition(false); // TODO - add adjust data weight into partitioner settings
+
+ std::vector<TYtTableTaskRef> ytPartitions;
+ auto groupedYtTables = GroupYtTables(ytTables, clusterConnections);
+ for (auto& [ytTables, clusterConnection]: groupedYtTables) {
+ auto client = CreateClient(clusterConnection);
+ auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
+ TVector<NYT::TRichYPath> richPaths;
+ for (auto& ytTable: ytTables ) {
+ TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//");
+ richPaths.emplace_back(NYT::TRichYPath(ytPath).Cluster(ytTable.Cluster));
+ }
+ try {
+ NYT::TMultiTablePartitions partitions = transaction->GetTablePartitions(richPaths, getTablePartitionsOptions);
+
+ for (const auto& partition : partitions.Partitions) {
+ TYtTableTaskRef ytTableTaskRef{};
+ for (const auto& richPath : partition.TableRanges) {
+ ytTableTaskRef.RichPaths.emplace_back(richPath);
+ }
+ ytPartitions.emplace_back(ytTableTaskRef);
+ }
+ } catch (NYT::TErrorException& ex) {
+ YQL_CLOG(INFO, FastMapReduce) << "Failed to partition yt tables with message: " << CurrentExceptionMessage();
+ return {{}, false};
+ }
+ }
+ YQL_CLOG(INFO, FastMapReduce) << "partitioned input yt tables into " << ytPartitions.size() << " tasks";
+ for (auto& task: ytPartitions) {
+ YQL_CLOG(DEBUG, FastMapReduce) << task;
+ }
+ return {ytPartitions, true};
+ }
+
+private:
+ struct TGroupedYtTablesByCluster {
+ std::vector<TYtTableRef> YtTables;
+ TClusterConnection ClusterConnection;
+ };
+
+ std::vector<TGroupedYtTablesByCluster> GroupYtTables(
+ const std::vector<TYtTableRef>& ytTables,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections
+ ) {
+ std::vector<TGroupedYtTablesByCluster> tableGroups;
+ std::unordered_map<TString, ui64> ytServerToGroups;
+ for (auto& ytTable: ytTables) {
+ auto fmrTableId = TFmrTableId(ytTable.Cluster, ytTable.Path);
+ auto clusterConnection = clusterConnections.at(fmrTableId);
+ auto ytServerName = clusterConnection.YtServerName;
+ if (!ytServerToGroups.contains(ytServerName)) {
+ tableGroups.emplace_back(TGroupedYtTablesByCluster{
+ .YtTables = {ytTable},
+ .ClusterConnection = clusterConnection
+ });
+ ytServerToGroups[ytServerName] = tableGroups.size() - 1;
+ } else {
+ auto index = ytServerToGroups[ytServerName];
+ tableGroups[index].YtTables.emplace_back(ytTable);
+ }
+ }
+ return tableGroups;
+ }
+};
+
+} // namespace
+
+IYtCoordinatorService::TPtr MakeYtCoordinatorService() {
+ return MakeIntrusive<TYtCoordinatorService>();
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h
new file mode 100644
index 00000000000..44a95233140
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h
@@ -0,0 +1,7 @@
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h>
+
+namespace NYql::NFmr {
+
+IYtCoordinatorService::TPtr MakeYtCoordinatorService();
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make
new file mode 100644
index 00000000000..612f56f02d0
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_coordinator_service_interface.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/interface
+ yt/yql/providers/yt/fmr/request_options
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp
new file mode 100644
index 00000000000..450c685eced
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.cpp
@@ -0,0 +1 @@
+#include "yql_yt_coordinator_service_interface.h"
diff --git a/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h
new file mode 100644
index 00000000000..13c01f5d8a8
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/interface/yql_yt_coordinator_service_interface.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql::NFmr {
+
+struct TYtPartitionerSettings {
+ ui64 MaxDataWeightPerPart = 0;
+ ui64 MaxParts = 0;
+};
+
+class IYtCoordinatorService: public TThrRefBase {
+public:
+ virtual ~IYtCoordinatorService() = default;
+
+ using TPtr = TIntrusivePtr<IYtCoordinatorService>;
+
+ virtual std::pair<std::vector<TYtTableTaskRef>, bool> PartitionYtTables(
+ const std::vector<TYtTableRef>& ytTables,
+ const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
+ const TYtPartitionerSettings& settings
+ ) = 0;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make
index b0c5e5cdf4f..dbada26432e 100644
--- a/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make
@@ -8,12 +8,14 @@ PEERDIR(
yt/yql/providers/yt/gateway/fmr
yt/yql/providers/yt/fmr/coordinator/client
yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl
yt/yql/providers/yt/fmr/job/impl
yt/yql/providers/yt/fmr/job_factory/impl
yt/yql/providers/yt/fmr/table_data_service/local
yt/yql/providers/yt/fmr/worker/impl
- yt/yql/providers/yt/fmr/yt_service/file
- yt/yql/providers/yt/fmr/yt_service/impl
+ yt/yql/providers/yt/fmr/yt_job_service/file
+ yt/yql/providers/yt/fmr/yt_job_service/impl
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
index 1d6bc504a5e..7849baa2517 100644
--- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
@@ -11,7 +11,7 @@ std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::T
coordinatorSettings.DefaultFmrOperationSpec = fmrOperationSpec;
}
- auto coordinator = MakeFmrCoordinator(coordinatorSettings);
+ auto coordinator = isFileGateway ? MakeFmrCoordinator(coordinatorSettings, MakeFileYtCoordinatorService()) : MakeFmrCoordinator(coordinatorSettings, MakeYtCoordinatorService());
if (!coordinatorServerUrl.empty()) {
TFmrCoordinatorClientSettings coordinatorClientSettings;
THttpURL parsedUrl;
@@ -26,10 +26,10 @@ std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::T
IFmrWorker::TPtr worker = nullptr;
if (!disableLocalFmrWorker) {
auto tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(3));
- auto fmrYtSerivce = isFileGateway ? MakeFileYtSerivce() : MakeFmrYtSerivce();
+ auto fmrYtJobSerivce = isFileGateway ? MakeFileYtJobSerivce() : MakeYtJobSerivce();
- auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable {
- return RunJob(task, tableDataService, fmrYtSerivce, cancelFlag);
+ auto func = [tableDataService, fmrYtJobSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable {
+ return RunJob(task, tableDataService, fmrYtJobSerivce, cancelFlag);
};
NFmr::TFmrJobFactorySettings settings{.Function=func};
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
index 24f17928e17..743f856eb8c 100644
--- a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
@@ -4,11 +4,14 @@
#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h>
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/impl/yql_yt_coordinator_service_impl.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
#include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h>
-#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h>
-#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h>
+#include <yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h>
+#include <yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h>
+
namespace NYql::NFmr {
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
index a997d1c1983..6b372d758b3 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
@@ -7,8 +7,9 @@ SRCS(
)
PEERDIR(
+ yt/cpp/mapreduce/common
yt/yql/providers/yt/fmr/job/impl
- yt/yql/providers/yt/fmr/yt_service/mock
+ yt/yql/providers/yt/fmr/yt_job_service/mock
yt/yql/providers/yt/fmr/table_data_service/local
yql/essentials/utils/log
yql/essentials/parser/pg_wrapper
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index 6b9ce068652..614fe30a44d 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -1,8 +1,9 @@
#include <library/cpp/testing/unittest/registar.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h>
-#include <yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h>
+#include <yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h>
namespace NYql::NFmr {
@@ -36,12 +37,13 @@ TString GetTextYson(const TString& binaryYsonContent) {
Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST(DownloadTable) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TYtTableRef input = TYtTableRef("test_cluster", "test_path");
- std::unordered_map<TYtTableRef, TString> inputTables{{input, TableContent_1}};
+ auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster");
+ TYtTableTaskRef input = TYtTableTaskRef{.RichPaths = {richPath}};
+ std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_1}};
std::unordered_map<TYtTableRef, TString> outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytJobService);
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
TDownloadTaskParams params = TDownloadTaskParams(input, output);
@@ -53,7 +55,10 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto statistics = std::get_if<TStatistics>(&res);
UNIT_ASSERT_C(!err, err->ErrorMessage);
- UNIT_ASSERT_EQUAL(statistics->OutputTables.at(output).Rows, 4);
+ auto detailedChunkStats = statistics->OutputTables.at(output).PartIdChunkStats;
+ UNIT_ASSERT_VALUES_EQUAL(detailedChunkStats.size(), 1); // coordinator settings taken from file with default values, so large chunk size
+ UNIT_ASSERT_VALUES_EQUAL(detailedChunkStats[0].Rows, 4);
+
auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
UNIT_ASSERT_NO_DIFF(*resultTableContent, GetBinaryYson(TableContent_1));
@@ -61,11 +66,12 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST(UploadTable) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- std::unordered_map<TYtTableRef, TString> inputTables, outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ std::unordered_map<TString, TString> inputTables;
+ std::unordered_map<TYtTableRef, TString> outputTables;
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytJobService);
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
std::vector<TTableRange> ranges = {{"test_part_id"}};
@@ -89,20 +95,21 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
std::vector<TTableRange> ranges = {{"test_part_id"}};
TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
- TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
+ auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster");
+ TYtTableTaskRef input_2 = TYtTableTaskRef{.RichPaths = {richPath}};
TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
- std::unordered_map<TYtTableRef, TString> inputTables{{input_2, TableContent_2}};
+ std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_2}};
std::unordered_map<TYtTableRef, TString> outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytJobService);
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id_output", "test_part_id");
std::vector<TTaskTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
- auto params = TMergeTaskParams(inputs, output);
+ auto params = TMergeTaskParams{.Input = TTaskTableInputRef{.Inputs = inputs}, .Output = output};
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
@@ -130,17 +137,19 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST_SUITE(TaskRunTests) {
Y_UNIT_TEST(RunDownloadTask) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- TYtTableRef input = TYtTableRef("test_cluster", "test_path");
- std::unordered_map<TYtTableRef, TString> inputTables{{input, TableContent_1}};
+ auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster");
+ TYtTableTaskRef input = TYtTableTaskRef{.RichPaths = {richPath}};
+ TFmrTableId inputFmrId("test_cluster", "test_path");
+ std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_1}};
std::unordered_map<TYtTableRef, TString> outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
TDownloadTaskParams params = TDownloadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
- ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
+ ETaskStatus status = RunJob(task, tableDataServicePtr, ytJobService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
@@ -151,8 +160,9 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
Y_UNIT_TEST(RunUploadTask) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- std::unordered_map<TYtTableRef, TString> inputTables, outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ std::unordered_map<TString, TString> inputTables;
+ std::unordered_map<TYtTableRef, TString> outputTables;
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
std::vector<TTableRange> ranges = {{"test_part_id"}};
@@ -163,7 +173,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
- ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
+ ETaskStatus status = RunJob(task, tableDataServicePtr, ytJobService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
UNIT_ASSERT(outputTables.contains(output));
@@ -172,8 +182,9 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
Y_UNIT_TEST(RunUploadTaskWithNoTable) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- std::unordered_map<TYtTableRef, TString> inputTables, outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ std::unordered_map<TString, TString> inputTables;
+ std::unordered_map<TYtTableRef, TString> outputTables;
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
std::vector<TTableRange> ranges = {{"test_part_id"}};
@@ -185,7 +196,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
// No tables in tableDataService
UNIT_ASSERT_EXCEPTION_CONTAINS(
- RunJob(task, tableDataServicePtr, ytService, cancelFlag),
+ RunJob(task, tableDataServicePtr, ytJobService, cancelFlag),
yexception,
"No data for chunk:test_table_id:test_part_id"
);
@@ -195,11 +206,13 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
std::vector<TTableRange> ranges = {{"test_part_id"}};
TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
- TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
+ auto richPath = NYT::TRichYPath("//test_path").Cluster("test_cluster");
+ TYtTableTaskRef input_2 = TYtTableTaskRef{.RichPaths = {richPath}};
+ TFmrTableId inputFmrId_2("test_cluster", "test_path");
TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
- std::unordered_map<TYtTableRef, TString> inputTables{{input_2, TableContent_2}};
+ std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), TableContent_2}};
std::unordered_map<TYtTableRef, TString> outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
+ NYql::NFmr::IYtJobService::TPtr ytJobService = MakeMockYtJobService(inputTables, outputTables);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
TTaskTableRef input_table_ref_1 = {input_1};
@@ -207,7 +220,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTaskTableRef input_table_ref_3 = {input_3};
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id_output", "test_part_id");
std::vector<TTaskTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
- auto params = TMergeTaskParams(inputs, output);
+ auto params = TMergeTaskParams{.Input = TTaskTableInputRef{.Inputs = inputs}, .Output = output};
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{TFmrTableId("test_cluster", "test_path"), TClusterConnection()}});
@@ -217,7 +230,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1));
tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3));
- ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
+ ETaskStatus status = RunJob(task, tableDataServicePtr, ytJobService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
auto resultTableContentMaybe = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp
index 925cdfff125..4d1a9860e48 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_table_data_service_writer_ut.cpp
@@ -12,7 +12,7 @@ const std::vector<TString> TableYsonRows = {
"{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"
};
-TTableStats WriteDataToTableDataSerice(
+TTableChunkStats WriteDataToTableDataSerice(
ITableDataService::TPtr tableDataService,
const std::vector<TString>& tableYsonRows,
ui64 chunkSize,
@@ -34,18 +34,28 @@ TTableStats WriteDataToTableDataSerice(
Y_UNIT_TEST_SUITE(FmrWriterTests) {
Y_UNIT_TEST(WriteYsonRows) {
- ui64 totalSize = 0;
- for (auto& row: TableYsonRows) {
+ ui64 totalSize = 0, firstPartSize = 0, secPartSize = 0;
+ for (ui64 i = 0; i < TableYsonRows.size(); ++i) {
+ auto& row = TableYsonRows[i];
totalSize += row.size();
+ if (i < 2) {
+ firstPartSize += row.size();
+ } else {
+ secPartSize += row.size();
+ }
}
ui64 chunkSize = totalSize / 2;
ITableDataService::TPtr tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
auto stats = WriteDataToTableDataSerice(tableDataService, TableYsonRows, chunkSize);
- auto realChunks = stats.Chunks;
- auto realDataWeight =stats.DataWeight;
- UNIT_ASSERT_VALUES_EQUAL(realChunks, 2);
- UNIT_ASSERT_VALUES_EQUAL(realDataWeight, totalSize);
+ UNIT_ASSERT_VALUES_EQUAL(stats.PartId, "partId");
+ std::vector<TChunkStats> gottenPartIdChunkStats = stats.PartIdChunkStats;
+ std::vector<TChunkStats> expectedChunkStats = {
+ TChunkStats{.Rows = 2, .DataWeight = firstPartSize},
+ TChunkStats{.Rows = 2, .DataWeight = secPartSize}
+ };
+ UNIT_ASSERT(gottenPartIdChunkStats == expectedChunkStats);
TString expectedFirstChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin(), TableYsonRows.begin() + 2);
TString expectedSecondChunkTableContent = JoinRange(TStringBuf(), TableYsonRows.begin() + 2, TableYsonRows.end());
diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make
index 02b5058984a..dc29cebd265 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ya.make
@@ -11,10 +11,10 @@ PEERDIR(
library/cpp/yson/node
yt/cpp/mapreduce/interface
yt/yql/providers/yt/fmr/job/interface
- yt/yql/providers/yt/fmr/request_options
yt/yql/providers/yt/fmr/utils
yt/yql/providers/yt/fmr/table_data_service/interface
yql/essentials/utils
+ yql/essentials/utils/log
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index e3ceec83dc7..24926bb415a 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -3,12 +3,13 @@
#include <util/stream/file.h>
+#include <yt/cpp/mapreduce/common/helpers.h> // Для логов, потом мб убрать
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h>
-#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h>
+#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h>
#include <yql/essentials/utils/log/log.h>
@@ -17,8 +18,8 @@ namespace NYql::NFmr {
class TFmrJob: public IFmrJob {
public:
- TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings)
- : TableDataService_(tableDataService), YtService_(ytService), Settings_(settings)
+ TFmrJob(ITableDataService::TPtr tableDataService, IYtJobService::TPtr ytJobService, const TFmrJobSettings& settings)
+ : TableDataService_(tableDataService), YtJobService_(ytJobService), Settings_(settings)
{
}
@@ -28,25 +29,26 @@ public:
std::shared_ptr<std::atomic<bool>> cancelFlag
) override {
try {
- const auto ytTable = params.Input;
- const auto cluster = params.Input.Cluster;
- const auto path = params.Input.Path;
+ const auto ytTableTaskRef = params.Input;
const auto output = params.Output;
const auto tableId = output.TableId;
const auto partId = output.PartId;
- YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path;
YQL_ENSURE(clusterConnections.size() == 1);
- auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second, Settings_.YtReaderSettings);
+
+ std::vector<NYT::TRawTableReaderPtr> ytTableReaders = GetYtTableReaders(ytTableTaskRef, clusterConnections);
auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(tableId, partId, TableDataService_, Settings_.FmrWriterSettings);
- ParseRecords(ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.DonwloadReadBlockCount, Settings_.ParseRecordSettings.DonwloadReadBlockSize, cancelFlag);
+ for (auto& ytTableReader: ytTableReaders) {
+ ParseRecords(ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.DonwloadReadBlockCount, Settings_.ParseRecordSettings.DonwloadReadBlockSize, cancelFlag);
+ }
tableDataServiceWriter->Flush();
- TTableStats stats = tableDataServiceWriter->GetStats();
+ TTableChunkStats stats = tableDataServiceWriter->GetStats();
auto statistics = TStatistics({{output, stats}});
return statistics;
} catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << "Gotten error inside download: " << CurrentExceptionMessage();
return TError(CurrentExceptionMessage());
}
}
@@ -63,16 +65,15 @@ public:
const auto tableId = params.Input.TableId;
const auto tableRanges = params.Input.TableRanges;
- YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
-
auto tableDataServiceReader = MakeIntrusive<TFmrTableDataServiceReader>(tableId, tableRanges, TableDataService_, Settings_.FmrReaderSettings);
YQL_ENSURE(clusterConnections.size() == 1);
- auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second, Settings_.YtWriterSettings);
+ auto ytTableWriter = YtJobService_->MakeWriter(ytTable, clusterConnections.begin()->second, Settings_.YtWriterSettings);
ParseRecords(tableDataServiceReader, ytTableWriter, Settings_.ParseRecordSettings.UploadReadBlockCount, Settings_.ParseRecordSettings.UploadReadBlockSize, cancelFlag);
ytTableWriter->Flush();
return TStatistics();
} catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << "Gotten error inside upload: " << CurrentExceptionMessage();
return TError(CurrentExceptionMessage());
}
}
@@ -83,19 +84,25 @@ public:
std::shared_ptr<std::atomic<bool>> cancelFlag
) override {
try {
- const auto inputs = params.Input;
+ const auto taskTableInputRef = params.Input;
const auto output = params.Output;
- YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
auto& parseRecordSettings = Settings_.ParseRecordSettings;
auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(output.TableId, output.PartId, TableDataService_, Settings_.FmrWriterSettings);
auto threadPool = CreateThreadPool(parseRecordSettings.MergeNumThreads);
TMaybe<TMutex> mutex = TMutex();
- for (const auto& inputTableRef : inputs) {
- auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnections);
- threadPool->SafeAddFunc([&, inputTableReader] {
- ParseRecords(inputTableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex);
+ for (const auto& inputTableRef : taskTableInputRef.Inputs) {
+ threadPool->SafeAddFunc([&, tableDataServiceWriter] {
+ try {
+ auto inputTableReaders = GetTableInputStreams(inputTableRef, clusterConnections);
+ for (auto& tableReader: inputTableReaders) {
+ ParseRecords(tableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex);
+ }
+ } catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << CurrentExceptionMessage();
+ throw yexception() << CurrentExceptionMessage();
+ }
});
}
threadPool->Stop();
@@ -103,6 +110,7 @@ public:
tableDataServiceWriter->Flush();
return TStatistics({{output, tableDataServiceWriter->GetStats()}});
} catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << "Gotten error inside merge: " << CurrentExceptionMessage();
return TError(CurrentExceptionMessage());
}
}
@@ -112,48 +120,68 @@ public:
const std::unordered_map<TFmrTableId, TClusterConnection>& /* clusterConnections */,
std::shared_ptr<std::atomic<bool>> /* cancelFlag */
) override {
- Cerr << "MAP NOT IMPLEMENTED" << Endl;
YQL_CLOG(ERROR, FastMapReduce) << "MAP NOT IMPLEMENTED";
ythrow yexception() << "Not implemented";
}
private:
- NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
- auto ytTable = std::get_if<TYtTableRef>(&tableRef);
+ std::vector<NYT::TRawTableReaderPtr> GetTableInputStreams(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
+ auto ytTableTaskRef = std::get_if<TYtTableTaskRef>(&tableRef);
auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef);
- if (ytTable) {
- TFmrTableId tableId = {ytTable->Cluster, ytTable->Path};
- auto clusterConnection = clusterConnections.at(tableId);
- return YtService_->MakeReader(*ytTable, clusterConnection, Settings_.YtReaderSettings);
+ if (ytTableTaskRef) {
+ return GetYtTableReaders(*ytTableTaskRef, clusterConnections);
} else if (fmrTable) {
- return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings);
+ return {MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings)};
} else {
ythrow yexception() << "Unsupported table type";
}
}
+ std::vector<NYT::TRawTableReaderPtr> GetYtTableReaders(const TYtTableTaskRef& ytTableTaskRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
+ std::vector<NYT::TRawTableReaderPtr> ytTableReaders;
+ if (!ytTableTaskRef.FilePaths.empty()) {
+ // underlying gateway is file, so create readers from filepaths.
+ for (auto& filePath: ytTableTaskRef.FilePaths) {
+ ytTableReaders.emplace_back(YtJobService_->MakeReader(filePath));
+ YQL_CLOG(DEBUG, FastMapReduce) << "Creating reader for file path " << filePath;
+ }
+ } else {
+ for (auto& richPath: ytTableTaskRef.RichPaths) {
+ YQL_ENSURE(richPath.Cluster_);
+
+ // TODO - вместо этого написать нормальные хелперы из RichPath в структуры и назад
+ TStringBuf choppedPath;
+ YQL_ENSURE(TStringBuf(richPath.Path_).AfterPrefix("//", choppedPath));
+ auto fmrTableId = TFmrTableId(*richPath.Cluster_, TString(choppedPath));
+ auto clusterConnection = clusterConnections.at(fmrTableId);
+ ytTableReaders.emplace_back(YtJobService_->MakeReader(richPath, clusterConnection, Settings_.YtReaderSettings));
+ }
+ }
+ return ytTableReaders;
+ }
+
private:
ITableDataService::TPtr TableDataService_;
- IYtService::TPtr YtService_;
+ IYtJobService::TPtr YtJobService_;
TFmrJobSettings Settings_;
};
IFmrJob::TPtr MakeFmrJob(
ITableDataService::TPtr tableDataService,
- IYtService::TPtr ytService,
+ IYtJobService::TPtr ytJobService,
const TFmrJobSettings& settings
) {
- return MakeIntrusive<TFmrJob>(tableDataService, ytService, settings);
+ return MakeIntrusive<TFmrJob>(tableDataService, ytJobService, settings);
}
TJobResult RunJob(
TTask::TPtr task,
ITableDataService::TPtr tableDataService,
- IYtService::TPtr ytService,
+ IYtJobService::TPtr ytJobService,
std::shared_ptr<std::atomic<bool>> cancelFlag
) {
TFmrJobSettings jobSettings = GetJobSettingsFromTask(task);
- IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, jobSettings);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytJobService, jobSettings);
auto processTask = [job, task, cancelFlag] (auto&& taskParams) {
using T = std::decay_t<decltype(taskParams)>;
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
index 09b17090427..93affcaf4ed 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -4,7 +4,7 @@
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h>
#include <yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h>
-#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
+#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
namespace NYql::NFmr {
@@ -28,9 +28,9 @@ struct TFmrJobSettings {
ui64 NumThreads = 0;
};
-IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, const TFmrJobSettings& settings = {});
+IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtJobService::TPtr ytJobService, const TFmrJobSettings& settings = {});
-TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtJobService::TPtr ytJobService, std::shared_ptr<std::atomic<bool>> cancelFlag);
TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task);
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp
index 9b2caf44aa4..48476ef9d91 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.cpp
@@ -16,6 +16,7 @@ TFmrTableDataServiceReader::TFmrTableDataServiceReader(
TableDataService_(tableDataService),
ReadAheadChunks_(settings.ReadAheadChunks)
{
+ SetMinChunkInNewRange();
ReadAhead();
}
@@ -40,12 +41,12 @@ size_t TFmrTableDataServiceReader::DoRead(void* buf, size_t len) {
try {
data = chunk.Data.GetValueSync();
} catch (...) {
- ythrow yexception() << "Error reading chunk: " << chunk.Meta << "Error: " << CurrentExceptionMessage();
+ ythrow yexception() << "Error reading chunk: " << chunk.Meta.ToString() << "Error: " << CurrentExceptionMessage();
}
if (data) {
DataBuffer_.Assign(data->data(), data->size());
} else {
- ythrow yexception() << "No data for chunk:" << chunk.Meta;
+ ythrow yexception() << "No data for chunk:" << chunk.Meta.ToString();
}
PendingChunks_.pop();
@@ -71,10 +72,17 @@ void TFmrTableDataServiceReader::ReadAhead() {
CurrentChunk_++;
} else {
CurrentRange_++;
+ SetMinChunkInNewRange();
}
}
}
+void TFmrTableDataServiceReader::SetMinChunkInNewRange() {
+ if (CurrentRange_ < TableRanges_.size()) {
+ CurrentChunk_ = TableRanges_[0].MinChunk;
+ }
+}
+
bool TFmrTableDataServiceReader::Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) {
return false;
}
@@ -85,4 +93,8 @@ bool TFmrTableDataServiceReader::HasRangeIndices() const {
return false;
}
+TString TFmrTableDataServiceReader::TFmrChunkMeta::ToString() const {
+ return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk);
+}
+
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h
index d56bb0eb0d7..822865b58d7 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_reader.h
@@ -13,11 +13,6 @@ struct TFmrReaderSettings {
ui64 ReadAheadChunks = 1;
};
-struct TPendingFmrChunk {
- NThreading::TFuture<TMaybe<TString>> Data;
- TFmrChunkMeta Meta;
-};
-
class TFmrTableDataServiceReader: public NYT::TRawTableReader {
public:
TFmrTableDataServiceReader(
@@ -34,9 +29,24 @@ public:
bool HasRangeIndices() const override;
private:
+ struct TFmrChunkMeta {
+ TString TableId;
+ TString PartId;
+ ui64 Chunk = 0;
+
+ TString ToString() const;
+ };
+
+ struct TPendingFmrChunk {
+ NThreading::TFuture<TMaybe<TString>> Data;
+ TFmrChunkMeta Meta;
+ };
+
size_t DoRead(void* buf, size_t len) override;
void ReadAhead();
+ void SetMinChunkInNewRange();
+
const TString TableId_;
std::vector<TTableRange> TableRanges_;
ITableDataService::TPtr TableDataService_;
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp
index 139e57368cf..9063c48da33 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.cpp
@@ -1,6 +1,7 @@
#include "yql_yt_table_data_service_writer.h"
#include <library/cpp/threading/future/wait/wait.h>
#include <util/string/join.h>
+#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>
@@ -27,7 +28,7 @@ void TFmrTableDataServiceWriter::DoWrite(const void* buf, size_t len) {
}
void TFmrTableDataServiceWriter::NotifyRowEnd() {
- ++Rows_;
+ ++CurrentChunkRows_;
if (TableContent_.size() >= MaxRowWeight_) {
ythrow yexception() << "Current row size: " << TableContent_.size() << " is larger than max row weight: " << MaxRowWeight_;
}
@@ -74,17 +75,16 @@ void TFmrTableDataServiceWriter::PutRows() {
}
}
);
- ++ChunkCount_;
DataWeight_ += TableContent_.Size();
+ PartIdChunkStats_.emplace_back(TChunkStats{.Rows = CurrentChunkRows_, .DataWeight = TableContent_.Size()});
+ CurrentChunkRows_ = 0;
+ ++ChunkCount_;
TableContent_.Clear();
}
-TTableStats TFmrTableDataServiceWriter::GetStats() {
- return TTableStats{
- .Chunks = ChunkCount_,
- .Rows = Rows_,
- .DataWeight = DataWeight_,
- };
+TTableChunkStats TFmrTableDataServiceWriter::GetStats() {
+ YQL_CLOG(DEBUG, FastMapReduce) << " Finished writing to table data service for table Id: " << TableId_ << " and part Id " << PartId_ ;
+ return TTableChunkStats{.PartId = PartId_, .PartIdChunkStats = PartIdChunkStats_};
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
index 544b9fc3bd1..ca42498b276 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
@@ -10,8 +10,6 @@
namespace NYql::NFmr {
-
-
struct TFmrWriterSettings {
ui64 ChunkSize = 1024 * 1024;
ui64 MaxInflightChunks = 1;
@@ -27,7 +25,7 @@ public:
const TFmrWriterSettings& settings = TFmrWriterSettings()
);
- TTableStats GetStats();
+ TTableChunkStats GetStats();
void NotifyRowEnd() override;
@@ -44,7 +42,7 @@ private:
const TString PartId_;
ITableDataService::TPtr TableDataService_;
ui64 DataWeight_ = 0;
- ui64 Rows_ = 0;
+ ui64 CurrentChunkRows_ = 0;
TBuffer TableContent_;
const ui64 ChunkSize_; // size at which we push to table data service
@@ -52,6 +50,7 @@ private:
const ui64 MaxRowWeight_;
ui64 ChunkCount_ = 0;
+ std::vector<TChunkStats> PartIdChunkStats_;
struct TFmrWriterState {
ui64 CurInflightChunks = 0;
diff --git a/yt/yql/providers/yt/fmr/job/interface/ya.make b/yt/yql/providers/yt/fmr/job/interface/ya.make
index 7d256622f7e..c2439772033 100644
--- a/yt/yql/providers/yt/fmr/job/interface/ya.make
+++ b/yt/yql/providers/yt/fmr/job/interface/ya.make
@@ -4,6 +4,10 @@ SRCS(
yql_yt_job.cpp
)
+PEERDIR(
+ yt/yql/providers/yt/fmr/request_options
+)
+
YQL_LAST_ABI_VERSION()
END()
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
index bcf702709c7..5977af62134 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
@@ -10,7 +10,10 @@
namespace NYql::NFmr {
TTask::TPtr CreateTestTask() {
- auto input = TYtTableRef("test_cluster", "test_path");
+ auto input = TYtTableTaskRef{
+ .RichPaths = {NYT::TRichYPath("test_path").Cluster("test_cluster")},
+ .FilePaths = {"test_file_path"}
+ };
auto output = TFmrTableOutputRef("test_table_id");
auto params = TDownloadTaskParams(input, output);
return MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id");
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
index 9c6c0b5b054..610d4f3b7cb 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
@@ -42,6 +42,10 @@ public:
return future;
}
+ ui64 GetMaxParallelJobCount() override {
+ return NumThreads_;
+ }
+
void Start() override {
ThreadPool_ = CreateThreadPool(NumThreads_);
}
@@ -52,7 +56,7 @@ public:
private:
THolder<IThreadPool> ThreadPool_;
- i32 NumThreads_;
+ ui64 NumThreads_;
std::function<TJobResult(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function_;
const TIntrusivePtr<IRandomProvider> RandomProvider_;
};
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
index 68ef1a5c379..77d41106e30 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
@@ -13,7 +13,7 @@ struct TJobResult {
};
struct TFmrJobFactorySettings {
- ui32 NumThreads = 3;
+ ui64 NumThreads = 3;
std::function<TJobResult(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function;
TIntrusivePtr<IRandomProvider> RandomProvider = CreateDefaultRandomProvider();
};
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
index 32ecd264ae5..bef9aae0b2d 100644
--- a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
@@ -7,6 +7,7 @@ SRCS(
PEERDIR(
library/cpp/threading/future
yql/essentials/utils
+ yt/yql/providers/yt/fmr/request_options
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
index 27393ff3399..cfb2370d320 100644
--- a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
@@ -13,6 +13,8 @@ public:
virtual ~IFmrJobFactory() = default;
virtual NThreading::TFuture<TTaskState::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) = 0;
+
+ virtual ui64 GetMaxParallelJobCount() = 0;
};
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
index 34e16ed26b5..d3a9307dcd1 100644
--- a/yt/yql/providers/yt/fmr/proto/coordinator.proto
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -8,6 +8,7 @@ message THeartbeatRequest {
uint32 WorkerId = 1;
string VolatileId = 2;
repeated TTaskState TaskStates = 3;
+ uint64 AvailableSlots = 4;
}
message THeartbeatResponse {
@@ -33,6 +34,7 @@ message TStartOperationResponse {
message TGetOperationResponse {
EOperationStatus Status = 1;
repeated TFmrError ErrorMessages = 2;
+ repeated TTableStats TableStats = 3;
}
message TDeleteOperationResponse {
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
index 60c8471846f..92db8d0b0e7 100644
--- a/yt/yql/providers/yt/fmr/proto/request_options.proto
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -18,7 +18,6 @@ enum ETaskStatus {
TASK_IN_PROGRESS = 2;
TASK_FAILED = 3;
TASK_COMPLETED = 4;
- TASK_ABORTED = 5;
}
enum ETaskType {
@@ -26,6 +25,7 @@ enum ETaskType {
TASK_TYPE_DOWNLOAD = 1;
TASK_TYPE_UPLOAD = 2;
TASK_TYPE_MERGE = 3;
+ TASK_TYPE_MAP = 4;
}
enum EFmrComponent {
@@ -47,6 +47,7 @@ message TFmrError {
optional uint32 WorkerId = 4;
optional string TaskId = 5;
optional string OperationId = 6;
+ optional string JobId = 7;
}
message TYtTableRef {
@@ -55,6 +56,11 @@ message TYtTableRef {
optional string FilePath = 3;
}
+message TYtTableTaskRef {
+ repeated string RichPath = 1;
+ repeated string FilePath = 2;
+}
+
message TFmrTableId {
string Id = 1;
}
@@ -85,13 +91,23 @@ message TTableStats {
uint64 DataWeight = 3;
}
-message TFmrStatisticsObject {
- TFmrTableOutputRef Table = 1;
- TTableStats Statistic = 2;
+message TChunkStats {
+ uint64 Rows = 1;
+ uint64 DataWeight = 2;
+}
+
+message TTableChunkStats {
+ string PartId = 1;
+ repeated TChunkStats PartIdChunkStats = 2;
+}
+
+message TStatisticsObject {
+ TFmrTableOutputRef FmrTableOutputRef = 1;
+ TTableChunkStats TableChunkStats = 2;
}
message TStatistics {
- repeated TFmrStatisticsObject OutputTables = 1;
+ repeated TStatisticsObject OutputTables = 1;
}
message TOperationTableRef {
@@ -103,11 +119,15 @@ message TOperationTableRef {
message TTaskTableRef {
oneof TaskTableRef {
- TYtTableRef YtTableRef = 1;
+ TYtTableTaskRef YtTableTaskRef = 1;
TFmrTableInputRef FmrTableInputRef = 2;
}
}
+message TTaskTableInputRef {
+ repeated TTaskTableRef Inputs = 1;
+}
+
message TUploadOperationParams {
TFmrTableRef Input = 1;
TYtTableRef Output = 2;
@@ -124,7 +144,7 @@ message TDownloadOperationParams {
}
message TDownloadTaskParams {
- TYtTableRef Input = 1;
+ TYtTableTaskRef Input = 1;
TFmrTableOutputRef Output = 2;
}
@@ -134,15 +154,28 @@ message TMergeOperationParams {
}
message TMergeTaskParams {
- repeated TTaskTableRef Input = 1;
+ TTaskTableInputRef Input = 1;
TFmrTableOutputRef Output = 2;
}
+message TMapOperationParams {
+ repeated TOperationTableRef Input = 1;
+ repeated TFmrTableRef Output = 2;
+ string Executable = 3;
+}
+
+message TMapTaskParams {
+ TTaskTableInputRef Input = 1;
+ repeated TFmrTableOutputRef Output = 2;
+ string Executable = 3;
+}
+
message TOperationParams {
oneof TOperationParams {
TUploadOperationParams UploadOperationParams = 1;
TDownloadOperationParams DownloadOperationParams = 2;
TMergeOperationParams MergeOperationParams = 3;
+ TMapOperationParams MapOperationParams = 4;
}
}
@@ -151,6 +184,7 @@ message TTaskParams {
TUploadTaskParams UploadTaskParams = 1;
TDownloadTaskParams DownloadTaskParams = 2;
TMergeTaskParams MergeTaskParams = 3;
+ TMapTaskParams MapTaskParams = 4;
}
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make
index 36f1b3ebcaf..5e1ec3d0437 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/ya.make
@@ -5,6 +5,7 @@ SRCS(
)
PEERDIR(
+ yt/cpp/mapreduce/common
yt/yql/providers/yt/fmr/proto
)
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index b0c368008dc..34f01f4884d 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -1,5 +1,7 @@
#include "yql_yt_request_proto_helpers.h"
#include <library/cpp/yson/node/node_io.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/interface/serialize.h>
namespace NYql::NFmr {
@@ -17,6 +19,7 @@ NProto::TFmrError FmrErrorToProto(const TFmrError& error) {
if (error.OperationId) {
protoError.SetOperationId(*error.OperationId);
}
+ protoError.SetJobId(*error.JobId);
return protoError;
}
@@ -34,6 +37,7 @@ TFmrError FmrErrorFromProto(const NProto::TFmrError& protoError) {
if (protoError.HasOperationId()) {
fmrError.OperationId = protoError.GetOperationId();
}
+ fmrError.JobId = protoError.GetJobId();
return fmrError;
}
@@ -57,6 +61,32 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
return ytTableRef;
}
+NProto::TYtTableTaskRef YtTableTaskRefToProto(const TYtTableTaskRef& ytTableTaskRef) {
+ NProto::TYtTableTaskRef protoYtTableTaskRef;
+ for (auto& richPath: ytTableTaskRef.RichPaths) {
+ TString serializedRichPath = NYT::NodeToYsonString(NYT::PathToNode(richPath));
+ protoYtTableTaskRef.AddRichPath(serializedRichPath);
+ }
+ for (auto& filePath: ytTableTaskRef.FilePaths) {
+ protoYtTableTaskRef.AddFilePath(filePath);
+ }
+ return protoYtTableTaskRef;
+}
+
+TYtTableTaskRef YtTableTaskRefFromProto(const NProto::TYtTableTaskRef protoYtTableTaskRef) {
+ TYtTableTaskRef ytTableTaskRef;
+ for (auto& serializedPath: protoYtTableTaskRef.GetRichPath()) {
+ auto node = NYT::NodeFromYsonString(serializedPath);
+ NYT::TRichYPath richPath;
+ NYT::Deserialize(richPath, node);
+ ytTableTaskRef.RichPaths.emplace_back(richPath);
+ }
+ for (auto& filePath: protoYtTableTaskRef.GetFilePath()) {
+ ytTableTaskRef.FilePaths.emplace_back(filePath);
+ }
+ return ytTableTaskRef;
+}
+
NProto::TFmrTableId FmrTableIdToProto(const TFmrTableId& fmrTableId) {
NProto::TFmrTableId protoFmrTableId;
protoFmrTableId.SetId(fmrTableId.Id);
@@ -148,14 +178,47 @@ TTableStats TableStatsFromProto(const NProto::TTableStats& protoTableStats) {
};
}
+NProto::TChunkStats ChunkStatsToProto(const TChunkStats& chunkStats) {
+ NProto::TChunkStats protoChunkStats;
+ protoChunkStats.SetRows(chunkStats.Rows);
+ protoChunkStats.SetDataWeight(chunkStats.DataWeight);
+ return protoChunkStats;
+}
+
+TChunkStats ChunkStatsFromProto(const NProto::TChunkStats& protoChunkStats) {
+ return TChunkStats{.Rows = protoChunkStats.GetRows(), .DataWeight = protoChunkStats.GetDataWeight()};
+}
+
+NProto::TTableChunkStats TableChunkStatsToProto(const TTableChunkStats& tableChunkStats) {
+ NProto::TTableChunkStats protoTableChunkStats;
+ protoTableChunkStats.SetPartId(tableChunkStats.PartId);
+ for (auto& chunkStats: tableChunkStats.PartIdChunkStats) {
+ NProto::TChunkStats protoChunkStats = ChunkStatsToProto(chunkStats);
+ auto* curPartIdChunkStats = protoTableChunkStats.AddPartIdChunkStats();
+ curPartIdChunkStats->Swap(&protoChunkStats);
+ }
+ return protoTableChunkStats;
+}
+
+TTableChunkStats TableChunkStatsFromProto(const NProto::TTableChunkStats& protoTableChunkStats) {
+ TTableChunkStats tableChunkStats;
+ tableChunkStats.PartId = protoTableChunkStats.GetPartId();
+ std::vector<TChunkStats> partIdChunkStats;
+ for (auto& stat: protoTableChunkStats.GetPartIdChunkStats()) {
+ partIdChunkStats.emplace_back(ChunkStatsFromProto(stat));
+ }
+ tableChunkStats.PartIdChunkStats = partIdChunkStats;
+ return tableChunkStats;
+}
+
NProto::TStatistics StatisticsToProto(const TStatistics& stats) {
NProto::TStatistics protoStatistics;
- for (auto& [fmrTableOutputRef, tableStat]: stats.OutputTables) {
- NProto::TFmrTableOutputRef protoFmrTableOutputref = FmrTableOutputRefToProto(fmrTableOutputRef);
- NProto::TTableStats protoStats = TableStatsToProto(tableStat);
- NProto::TFmrStatisticsObject statTableObject;
- statTableObject.MutableTable()->Swap(&protoFmrTableOutputref);
- statTableObject.MutableStatistic()->Swap(&protoStats);
+ for (auto& [fmrTableOutputRef, tableChunkStats]: stats.OutputTables) {
+ NProto::TFmrTableOutputRef protoFmrTableOutputRef = FmrTableOutputRefToProto(fmrTableOutputRef);
+ NProto::TTableChunkStats protoTableChunkStats = TableChunkStatsToProto(tableChunkStats);
+ NProto::TStatisticsObject statTableObject;
+ statTableObject.MutableFmrTableOutputRef()->Swap(&protoFmrTableOutputRef);
+ statTableObject.MutableTableChunkStats()->Swap(&protoTableChunkStats);
auto* curOutputTable = protoStatistics.AddOutputTables();
curOutputTable->Swap(&statTableObject);
}
@@ -163,12 +226,12 @@ NProto::TStatistics StatisticsToProto(const TStatistics& stats) {
}
TStatistics StatisticsFromProto(const NProto::TStatistics& protoStats) {
- std::unordered_map<TFmrTableOutputRef, TTableStats> outputTables;
+ std::unordered_map<TFmrTableOutputRef, TTableChunkStats> outputTables;
for (size_t i = 0; i < protoStats.OutputTablesSize(); ++i) {
- NProto::TFmrStatisticsObject protoStatTableObject = protoStats.GetOutputTables(i);
- TFmrTableOutputRef fmrTableOutputRef = FmrTableOutputRefFromProto(protoStatTableObject.GetTable());
- TTableStats tableStats = TableStatsFromProto(protoStatTableObject.GetStatistic());
- outputTables[fmrTableOutputRef] = tableStats;
+ NProto::TStatisticsObject protoStatTableObject = protoStats.GetOutputTables(i);
+ TFmrTableOutputRef fmrTableOutputRef = FmrTableOutputRefFromProto(protoStatTableObject.GetFmrTableOutputRef());
+ TTableChunkStats tableChunkStats = TableChunkStatsFromProto(protoStatTableObject.GetTableChunkStats());
+ outputTables[fmrTableOutputRef] = tableChunkStats;
}
return TStatistics{.OutputTables = outputTables};
}
@@ -193,14 +256,14 @@ TOperationTableRef OperationTableRefFromProto(const NProto::TOperationTableRef&
} else {
tableRef = FmrTableRefFromProto(protoOperationTableRef.GetFmrTableRef());
}
- return {tableRef};
+ return tableRef;
}
NProto::TTaskTableRef TaskTableRefToProto(const TTaskTableRef& taskTableRef) {
NProto::TTaskTableRef protoTaskTableRef;
- if (auto* ytTableRefPtr = std::get_if<TYtTableRef>(&taskTableRef)) {
- NProto::TYtTableRef protoYtTableRef = YtTableRefToProto(*ytTableRefPtr);
- protoTaskTableRef.MutableYtTableRef()->Swap(&protoYtTableRef);
+ if (auto* ytTableTaskRefPtr = std::get_if<TYtTableTaskRef>(&taskTableRef)) {
+ NProto::TYtTableTaskRef protoYtTableTaskRef = YtTableTaskRefToProto(*ytTableTaskRefPtr);
+ protoTaskTableRef.MutableYtTableTaskRef()->Swap(&protoYtTableTaskRef);
} else {
auto* fmrTableInputRefPtr = std::get_if<TFmrTableInputRef>(&taskTableRef);
NProto::TFmrTableInputRef protoFmrTableInputRef = FmrTableInputRefToProto(*fmrTableInputRefPtr);
@@ -211,13 +274,31 @@ NProto::TTaskTableRef TaskTableRefToProto(const TTaskTableRef& taskTableRef) {
}
TTaskTableRef TaskTableRefFromProto(const NProto::TTaskTableRef& protoTaskTableRef) {
- std::variant<TYtTableRef, TFmrTableInputRef> tableRef;
- if (protoTaskTableRef.HasYtTableRef()) {
- tableRef = YtTableRefFromProto(protoTaskTableRef.GetYtTableRef());
+ std::variant<TYtTableTaskRef, TFmrTableInputRef> tableRef;
+ if (protoTaskTableRef.HasYtTableTaskRef()) {
+ tableRef = YtTableTaskRefFromProto(protoTaskTableRef.GetYtTableTaskRef());
} else {
tableRef = FmrTableInputRefFromProto(protoTaskTableRef.GetFmrTableInputRef());
}
- return {tableRef};
+ return tableRef;
+}
+
+NProto::TTaskTableInputRef TaskTableInputRefToProto(const TTaskTableInputRef& taskTableInputRef) {
+ NProto::TTaskTableInputRef protoTaskTableInputRef;
+ for (auto& taskTableRef: taskTableInputRef.Inputs) {
+ auto protoTaskTableRef = TaskTableRefToProto(taskTableRef);
+ auto* curInput = protoTaskTableInputRef.AddInputs();
+ curInput->Swap(&protoTaskTableRef);
+ }
+ return protoTaskTableInputRef;
+}
+
+TTaskTableInputRef TaskTableInputRefFromProto(const NProto::TTaskTableInputRef& protoTaskTableInputRef) {
+ std::vector<TTaskTableRef> inputs;
+ for (auto& protoTaskTableRef: protoTaskTableInputRef.GetInputs()) {
+ inputs.emplace_back(TaskTableRefFromProto(protoTaskTableRef));
+ }
+ return TTaskTableInputRef{.Inputs = inputs};
}
NProto::TUploadOperationParams UploadOperationParamsToProto(const TUploadOperationParams& uploadOperationParams) {
@@ -229,6 +310,13 @@ NProto::TUploadOperationParams UploadOperationParamsToProto(const TUploadOperati
return protoUploadOperationParams;
}
+TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams) {
+ return TUploadOperationParams(
+ FmrTableRefFromProto(protoUploadOperationParams.GetInput()),
+ YtTableRefFromProto(protoUploadOperationParams.GetOutput())
+ );
+}
+
NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploadTaskParams) {
NProto::TUploadTaskParams protoUploadTaskParams;
auto input = FmrTableInputRefToProto(uploadTaskParams.Input);
@@ -238,13 +326,6 @@ NProto::TUploadTaskParams UploadTaskParamsToProto(const TUploadTaskParams& uploa
return protoUploadTaskParams;
}
-TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams) {
- return TUploadOperationParams(
- FmrTableRefFromProto(protoUploadOperationParams.GetInput()),
- YtTableRefFromProto(protoUploadOperationParams.GetOutput())
- );
-}
-
TUploadTaskParams UploadTaskParamsFromProto(const NProto::TUploadTaskParams& protoUploadTaskParams) {
TUploadTaskParams uploadTaskParams;
uploadTaskParams.Input = FmrTableInputRefFromProto(protoUploadTaskParams.GetInput());
@@ -261,25 +342,25 @@ NProto::TDownloadOperationParams DownloadOperationParamsToProto(const TDownloadO
return protoDownloadOperationParams;
}
+TDownloadOperationParams DownloadOperationParamsFromProto(const NProto::TDownloadOperationParams& protoDownloadOperationParams) {
+ return TDownloadOperationParams(
+ YtTableRefFromProto(protoDownloadOperationParams.GetInput()),
+ FmrTableRefFromProto(protoDownloadOperationParams.GetOutput())
+ );
+}
+
NProto::TDownloadTaskParams DownloadTaskParamsToProto(const TDownloadTaskParams& downloadTaskParams) {
NProto::TDownloadTaskParams protoDownloadTaskParams;
- auto input = YtTableRefToProto(downloadTaskParams.Input);
+ auto input = YtTableTaskRefToProto(downloadTaskParams.Input);
auto output = FmrTableOutputRefToProto(downloadTaskParams.Output);
protoDownloadTaskParams.MutableInput()->Swap(&input);
protoDownloadTaskParams.MutableOutput()->Swap(&output);
return protoDownloadTaskParams;
}
-TDownloadOperationParams DownloadOperationParamsFromProto(const NProto::TDownloadOperationParams& protoDownloadOperationParams) {
- return TDownloadOperationParams(
- YtTableRefFromProto(protoDownloadOperationParams.GetInput()),
- FmrTableRefFromProto(protoDownloadOperationParams.GetOutput())
- );
-}
-
TDownloadTaskParams DownloadTaskParamsFromProto(const NProto::TDownloadTaskParams& protoDownloadTaskParams) {
TDownloadTaskParams downloadTaskParams;
- downloadTaskParams.Input = YtTableRefFromProto(protoDownloadTaskParams.GetInput());
+ downloadTaskParams.Input = YtTableTaskRefFromProto(protoDownloadTaskParams.GetInput());
downloadTaskParams.Output = FmrTableOutputRefFromProto(protoDownloadTaskParams.GetOutput());
return downloadTaskParams;
}
@@ -296,18 +377,6 @@ NProto::TMergeOperationParams MergeOperationParamsToProto(const TMergeOperationP
return protoMergeOperationParams;
}
-NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) {
- NProto::TMergeTaskParams protoMergeTaskParams;
- for (size_t i = 0; i < mergeTaskParams.Input.size(); ++i) {
- auto inputTable = TaskTableRefToProto(mergeTaskParams.Input[i]);
- auto* curInput = protoMergeTaskParams.AddInput();
- curInput->Swap(&inputTable);
- }
- auto outputTable = FmrTableOutputRefToProto(mergeTaskParams.Output);
- protoMergeTaskParams.MutableOutput()->Swap(&outputTable);
- return protoMergeTaskParams;
-}
-
TMergeOperationParams MergeOperationParamsFromProto(const NProto::TMergeOperationParams& protoMergeOperationParams) {
TMergeOperationParams mergeOperationParams(
{},
@@ -320,18 +389,72 @@ TMergeOperationParams MergeOperationParamsFromProto(const NProto::TMergeOperatio
return mergeOperationParams;
}
+NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTaskParams) {
+ NProto::TMergeTaskParams protoMergeTaskParams;
+ auto inputTables = TaskTableInputRefToProto(mergeTaskParams.Input);
+ protoMergeTaskParams.MutableInput()->Swap(&inputTables);
+ auto outputTable = FmrTableOutputRefToProto(mergeTaskParams.Output);
+ protoMergeTaskParams.MutableOutput()->Swap(&outputTable);
+ return protoMergeTaskParams;
+}
+
TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams) {
TMergeTaskParams mergeTaskParams;
- std::vector<TTaskTableRef> input;
- for (size_t i = 0; i < protoMergeTaskParams.InputSize(); ++i) {
- TTaskTableRef inputTable = TaskTableRefFromProto(protoMergeTaskParams.GetInput(i));
- input.emplace_back(inputTable);
- }
- mergeTaskParams.Input = input;
+ mergeTaskParams.Input = TaskTableInputRefFromProto(protoMergeTaskParams.GetInput());
mergeTaskParams.Output = FmrTableOutputRefFromProto(protoMergeTaskParams.GetOutput());
return mergeTaskParams;
}
+NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams& mapOperationParams) {
+ NProto::TMapOperationParams protoMapOperationParams;
+ for (auto& operationTableRef: mapOperationParams.Input) {
+ auto protoOperationTableRef = OperationTableRefToProto(operationTableRef);
+ protoMapOperationParams.AddInput()->Swap(&protoOperationTableRef);
+ }
+ for (auto& fmrTableRef: mapOperationParams.Output) {
+ auto protoFmrTableRef = FmrTableRefToProto(fmrTableRef);
+ protoMapOperationParams.AddOutput()->Swap(&protoFmrTableRef);
+ }
+ protoMapOperationParams.SetExecutable(mapOperationParams.Executable);
+ return protoMapOperationParams;
+}
+
+TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParams& protoMapOperationParams) {
+ std::vector<TOperationTableRef> inputTables;
+ std::vector<TFmrTableRef> outputTables;
+ for (auto& protoOperationTableRef: protoMapOperationParams.GetInput()) {
+ inputTables.emplace_back(OperationTableRefFromProto(protoOperationTableRef));
+ }
+ for (auto& protoFmrTableRef: protoMapOperationParams.GetOutput()) {
+ outputTables.emplace_back(FmrTableRefFromProto(protoFmrTableRef));
+ }
+ return TMapOperationParams{.Input = inputTables, .Output = outputTables, .Executable = protoMapOperationParams.GetExecutable()};
+}
+
+NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) {
+ NProto::TMapTaskParams protoMapTaskParams;
+ auto protoTaskTableInputRef = TaskTableInputRefToProto(mapTaskParams.Input);
+ protoMapTaskParams.MutableInput()->Swap(&protoTaskTableInputRef);
+ for (auto& fmrTableOutputRef: mapTaskParams.Output) {
+ auto protoFmrTableOutputRef = FmrTableOutputRefToProto(fmrTableOutputRef);
+ protoMapTaskParams.AddOutput()->Swap(&protoFmrTableOutputRef);
+ }
+ protoMapTaskParams.SetExecutable(mapTaskParams.Executable);
+ return protoMapTaskParams;
+}
+
+TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTaskParams) {
+ TMapTaskParams mapTaskParams;
+ mapTaskParams.Input = TaskTableInputRefFromProto(protoMapTaskParams.GetInput());
+ std::vector<TFmrTableOutputRef> outputTables;
+ for (auto& protoFmrTableOutputRef: protoMapTaskParams.GetOutput()) {
+ outputTables.emplace_back(FmrTableOutputRefFromProto(protoFmrTableOutputRef));
+ }
+ mapTaskParams.Output = outputTables;
+ mapTaskParams.Executable = protoMapTaskParams.GetExecutable();
+ return mapTaskParams;
+}
+
NProto::TOperationParams OperationParamsToProto(const TOperationParams& operationParams) {
NProto::TOperationParams protoOperationParams;
if (auto* uploadOperationParamsPtr = std::get_if<TUploadOperationParams>(&operationParams)) {
@@ -340,14 +463,29 @@ NProto::TOperationParams OperationParamsToProto(const TOperationParams& operatio
} else if (auto* downloadOperationParamsPtr = std::get_if<TDownloadOperationParams>(&operationParams)) {
NProto::TDownloadOperationParams protoDownloadOperationParams = DownloadOperationParamsToProto(*downloadOperationParamsPtr);
protoOperationParams.MutableDownloadOperationParams()->Swap(&protoDownloadOperationParams);
- } else {
- auto* mergeOperationParamsPtr = std::get_if<TMergeOperationParams>(&operationParams);
+ } else if (auto* mergeOperationParamsPtr = std::get_if<TMergeOperationParams>(&operationParams)) {
NProto::TMergeOperationParams protoMergeOperationParams = MergeOperationParamsToProto(*mergeOperationParamsPtr);
protoOperationParams.MutableMergeOperationParams()->Swap(&protoMergeOperationParams);
+ } else {
+ auto* mapOperationParamsPtr = std::get_if<TMapOperationParams>(&operationParams);
+ NProto::TMapOperationParams protoMapOperationParams = MapOperationParamsToProto(*mapOperationParamsPtr);
+ protoOperationParams.MutableMapOperationParams()->Swap(&protoMapOperationParams);
}
return protoOperationParams;
}
+TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams) {
+ if (protoOperationParams.HasDownloadOperationParams()) {
+ return DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams());
+ } else if (protoOperationParams.HasUploadOperationParams()) {
+ return UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams());
+ } else if (protoOperationParams.HasMergeOperationParams()) {
+ return MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams());
+ } else {
+ return MapOperationParamsFromProto(protoOperationParams.GetMapOperationParams());
+ }
+}
+
NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
NProto::TTaskParams protoTaskParams;
if (auto* uploadTaskParamsPtr = std::get_if<TUploadTaskParams>(&taskParams)) {
@@ -356,22 +494,15 @@ NProto::TTaskParams TaskParamsToProto(const TTaskParams& taskParams) {
} else if (auto* downloadTaskParamsPtr = std::get_if<TDownloadTaskParams>(&taskParams)) {
NProto::TDownloadTaskParams protoDownloadTaskParams = DownloadTaskParamsToProto(*downloadTaskParamsPtr);
protoTaskParams.MutableDownloadTaskParams()->Swap(&protoDownloadTaskParams);
- } else {
- auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams);
+ } else if (auto* mergeTaskParamsPtr = std::get_if<TMergeTaskParams>(&taskParams)) {
NProto::TMergeTaskParams protoMergeTaskParams = MergeTaskParamsToProto(*mergeTaskParamsPtr);
protoTaskParams.MutableMergeTaskParams()->Swap(&protoMergeTaskParams);
- }
- return protoTaskParams;
-}
-
-TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams) {
- if (protoOperationParams.HasDownloadOperationParams()) {
- return DownloadOperationParamsFromProto(protoOperationParams.GetDownloadOperationParams());
- } else if (protoOperationParams.HasUploadOperationParams()) {
- return UploadOperationParamsFromProto(protoOperationParams.GetUploadOperationParams());
} else {
- return MergeOperationParamsFromProto(protoOperationParams.GetMergeOperationParams());
+ auto* mapTaskParamsPtr = std::get_if<TMapTaskParams>(&taskParams);
+ NProto::TMapTaskParams protoMapTaskParams = MapTaskParamsToProto(*mapTaskParamsPtr);
+ protoTaskParams.MutableMapTaskParams()->Swap(&protoMapTaskParams);
}
+ return protoTaskParams;
}
TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
@@ -380,8 +511,10 @@ TTaskParams TaskParamsFromProto(const NProto::TTaskParams& protoTaskParams) {
taskParams = DownloadTaskParamsFromProto(protoTaskParams.GetDownloadTaskParams());
} else if (protoTaskParams.HasUploadTaskParams()) {
taskParams = UploadTaskParamsFromProto(protoTaskParams.GetUploadTaskParams());
- } else {
+ } else if (protoTaskParams.HasMergeTaskParams()) {
taskParams = MergeTaskParamsFromProto(protoTaskParams.GetMergeTaskParams());
+ } else {
+ taskParams = MapTaskParamsFromProto(protoTaskParams.GetMapTaskParams());
}
return taskParams;
}
@@ -414,7 +547,7 @@ NProto::TTask TaskToProto(const TTask& task) {
protoTask.MutableTaskParams()->Swap(&taskParams);
protoTask.SetSessionId(task.SessionId);
protoTask.SetNumRetries(task.NumRetries);
- auto clusterConnections = *protoTask.MutableClusterConnections();
+ auto& clusterConnections = *protoTask.MutableClusterConnections();
for (auto& [tableName, conn]: task.ClusterConnections) {
clusterConnections[tableName.Id] = ClusterConnectionToProto(conn);
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
index 3f7ee93725b..cfeb5a424e9 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h
@@ -13,6 +13,14 @@ NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef);
TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef);
+NProto::TYtTableTaskRef YtTableTaskRefToProto(const TYtTableTaskRef& ytTableTaskRef);
+
+TYtTableTaskRef YtTableTaskRefFromProto(const NProto::TYtTableTaskRef protoYtTableTaskRef);
+
+NProto::TFmrTableId FmrTableIdToProto(const TFmrTableId& fmrTableId);
+
+TFmrTableId FmrTableIdFromProto(const NProto::TFmrTableId& protoFmrTableId);
+
NProto::TFmrTableRef FmrTableRefToProto(const TFmrTableRef& fmrTableRef);
TFmrTableRef FmrTableRefFromProto(const NProto::TFmrTableRef protoFmrTableRef);
@@ -33,6 +41,14 @@ NProto::TTableStats TableStatsToProto(const TTableStats& tableStats);
TTableStats TableStatsFromProto(const NProto::TTableStats& protoTableStats);
+NProto::TChunkStats ChunkStatsToProto(const TChunkStats& chunkStats);
+
+TChunkStats ChunkStatsFromProto(const NProto::TChunkStats& protoChunkStats);
+
+NProto::TTableChunkStats TableChunkStatsToProto(const TTableChunkStats& tableChunkStats);
+
+TTableChunkStats TableChunkStatsFromProto(const NProto::TTableChunkStats& protoTableChunkStats);
+
NProto::TStatistics StatisticsToProto(const TStatistics& stats);
TStatistics StatisticsFromProto(const NProto::TStatistics& protoStats);
@@ -45,6 +61,10 @@ NProto::TTaskTableRef TaskTableRefToProto(const TTaskTableRef& taskTableRef);
TTaskTableRef TaskTableRefFromProto(const NProto::TTaskTableRef& protoTaskTableRef);
+NProto::TTaskTableInputRef TaskTableInputRefToProto(const TTaskTableInputRef& taskTableInputRef);
+
+TTaskTableInputRef TaskTableInputRefFromProto(const NProto::TTaskTableInputRef& protoTaskTableInputRef);
+
NProto::TUploadOperationParams UploadOperationParamsToProto(const TUploadOperationParams& uploadOperationParams);
TUploadOperationParams UploadOperationParamsFromProto(const NProto::TUploadOperationParams& protoUploadOperationParams);
@@ -69,6 +89,14 @@ NProto::TMergeTaskParams MergeTaskParamsToProto(const TMergeTaskParams& mergeTas
TMergeTaskParams MergeTaskParamsFromProto(const NProto::TMergeTaskParams& protoMergeTaskParams);
+NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams& mapOperationParams);
+
+TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParams& protoMapOperationParams);
+
+NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams);
+
+TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTaskParams);
+
NProto::TOperationParams OperationParamsToProto(const TOperationParams& operationParams);
TOperationParams OperationParamsFromProto(const NProto::TOperationParams& protoOperationParams);
diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make
index 4e74eb8b185..ec57848ad29 100644
--- a/yt/yql/providers/yt/fmr/request_options/ya.make
+++ b/yt/yql/providers/yt/fmr/request_options/ya.make
@@ -7,6 +7,8 @@ SRCS(
PEERDIR(
library/cpp/yson/node
library/cpp/threading/future
+ yt/cpp/mapreduce/common
+ yt/cpp/mapreduce/interface
yql/essentials/public/issue
)
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index 6da844918da..a013f67329a 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -1,4 +1,5 @@
#include "yql_yt_request_options.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
namespace NYql::NFmr {
@@ -18,10 +19,6 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co
return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage, stats);
}
-TString TFmrChunkMeta::ToString() const {
- return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk);
-}
-
} // namespace NYql::NFmr
template<>
@@ -41,11 +38,56 @@ void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError&
}
template<>
-void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) {
- out << meta.ToString();
+void Out<NYql::NFmr::TTableStats>(IOutputStream& out, const NYql::NFmr::TTableStats& tableStats) {
+ out << tableStats.Chunks << " chunks, " << tableStats.Rows << " rows, " << tableStats.DataWeight << " data weight";
}
template<>
-void Out<NYql::NFmr::TTableStats>(IOutputStream& out, const NYql::NFmr::TTableStats& tableStats) {
- out << tableStats.Chunks << " chunks, " << tableStats.Rows << " rows, " << tableStats.DataWeight << " data weight";
+void Out<NYql::NFmr::TTableRange>(IOutputStream& out, const NYql::NFmr::TTableRange& range) {
+ out << "TableRange with part id: " << range.PartId << " , min chunk: " << range.MinChunk << " , max chunk: " << range.MaxChunk << "\n";
+}
+
+template<>
+void Out<NYql::NFmr::TFmrTableInputRef>(IOutputStream& out, const NYql::NFmr::TFmrTableInputRef& inputRef) {
+ out << "FmrTableInputRef consisting of " << inputRef.TableRanges.size() << " table ranges:\n";
+ out << "TableId: " << inputRef.TableId << "\n";
+ for (auto& range: inputRef.TableRanges) {
+ out << range;
+ }
+}
+
+template<>
+void Out<NYql::NFmr::TYtTableTaskRef>(IOutputStream& out, const NYql::NFmr::TYtTableTaskRef& ytTableTaskRef) {
+ if (!ytTableTaskRef.FilePaths.empty()) {
+ out << "YtTableTaskRef consisting of " << ytTableTaskRef.FilePaths.size() << " file paths:\n";
+ for (auto& filePath: ytTableTaskRef.FilePaths) {
+ out << filePath << " ";
+ }
+ } else {
+ out << "YtTableTaskRef consisting of " << ytTableTaskRef.RichPaths.size() << " rich yt paths:\n";
+ for (auto& richPath: ytTableTaskRef.RichPaths) {
+ out << NodeToYsonString(NYT::PathToNode(richPath)) << "\n";
+ }
+ }
+}
+
+template<>
+void Out<NYql::NFmr::TTaskTableRef>(IOutputStream& out, const NYql::NFmr::TTaskTableRef& taskTableRef) {
+ if (auto* ytTableTaskRef = std::get_if<NYql::NFmr::TYtTableTaskRef>(&taskTableRef)) {
+ out << *ytTableTaskRef;
+ } else {
+ out << std::get<NYql::NFmr::TFmrTableInputRef>(taskTableRef);
+ }
+}
+
+template<>
+void Out<NYql::NFmr::TTaskTableInputRef>(IOutputStream& out, const NYql::NFmr::TTaskTableInputRef& taskTableInputRef) {
+ for (auto& taskTableRef: taskTableInputRef.Inputs) {
+ out << taskTableRef;
+ }
+}
+
+template<>
+void Out<NYql::NFmr::TChunkStats>(IOutputStream& out, const NYql::NFmr::TChunkStats& chunkStats) {
+ out << chunkStats.Rows << " rows " << chunkStats.DataWeight << " dataWeight\n";
}
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index 743c20ea339..53bf1d3d4b5 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -7,6 +7,8 @@
#include <util/string/builder.h>
#include <vector>
+#include <yt/cpp/mapreduce/interface/common.h>
+
namespace NYql::NFmr {
enum class EOperationStatus {
@@ -44,7 +46,8 @@ enum class EFmrComponent {
enum class EFmrErrorReason {
ReasonUnknown,
- UserError // TODO Add more reasons
+ UserError
+ // TODO - return FallbackQuery or FallbackOperation instead of UserError, pass info to gateway.
};
struct TFmrError {
@@ -66,9 +69,18 @@ struct TYtTableRef {
TString Cluster;
TMaybe<TString> FilePath = Nothing();
+ // TODO - maybe just use TRichYPath here also instead of Path and Cluster?
+
bool operator == (const TYtTableRef&) const = default;
};
+struct TYtTableTaskRef {
+ std::vector<NYT::TRichYPath> RichPaths;
+ std::vector<TString> FilePaths;
+
+ bool operator == (const TYtTableTaskRef&) const = default;
+}; // corresponds to a partition of several yt input tables.
+
struct TFmrTableId {
TString Id;
@@ -83,26 +95,21 @@ struct TFmrTableId {
struct TFmrTableRef {
TFmrTableId FmrTableId;
+ bool operator == (const TFmrTableRef&) const = default;
};
struct TTableRange {
TString PartId;
ui64 MinChunk = 0;
ui64 MaxChunk = 1;
-};
-
-struct TFmrChunkMeta {
- TString TableId;
- TString PartId;
- ui64 Chunk = 0;
-
- TString ToString() const;
-};
+ bool operator == (const TTableRange&) const = default;
+}; // Corresnponds to range [MinChunk, MaxChunk)
struct TFmrTableInputRef {
TString TableId;
std::vector<TTableRange> TableRanges;
-};
+ bool operator == (const TFmrTableInputRef&) const = default;
+}; // Corresponds to part of table with fixed TableId but several PartIds, Empty TablesRanges means that this table is not present in task.
struct TFmrTableOutputRef {
TString TableId;
@@ -118,6 +125,18 @@ struct TTableStats {
bool operator == (const TTableStats&) const = default;
};
+struct TChunkStats {
+ ui64 Rows = 0;
+ ui64 DataWeight = 0;
+ bool operator == (const TChunkStats&) const = default;
+};
+
+struct TTableChunkStats {
+ TString PartId;
+ std::vector<TChunkStats> PartIdChunkStats;
+ bool operator == (const TTableChunkStats&) const = default;
+}; // detailed statistics for all chunks in partition
+
} // namespace NYql::NFmr
namespace std {
@@ -147,12 +166,12 @@ namespace std {
namespace NYql::NFmr {
struct TStatistics {
- std::unordered_map<TFmrTableOutputRef, TTableStats> OutputTables;
+ std::unordered_map<TFmrTableOutputRef, TTableChunkStats> OutputTables;
};
using TOperationTableRef = std::variant<TYtTableRef, TFmrTableRef>;
-using TTaskTableRef = std::variant<TYtTableRef, TFmrTableInputRef>;
+using TTaskTableRef = std::variant<TYtTableTaskRef, TFmrTableInputRef>;
struct TUploadOperationParams {
TFmrTableRef Input;
@@ -170,7 +189,7 @@ struct TDownloadOperationParams {
};
struct TDownloadTaskParams {
- TYtTableRef Input;
+ TYtTableTaskRef Input;
TFmrTableOutputRef Output;
};
@@ -179,8 +198,12 @@ struct TMergeOperationParams {
TFmrTableRef Output;
};
+struct TTaskTableInputRef {
+ std::vector<TTaskTableRef> Inputs;
+}; // Corresponds to task input tables, which can consist parts of either fmr or yt input tables.
+
struct TMergeTaskParams {
- std::vector<TTaskTableRef> Input;
+ TTaskTableInputRef Input;
TFmrTableOutputRef Output;
};
@@ -191,7 +214,7 @@ struct TMapOperationParams {
};
struct TMapTaskParams {
- std::vector<TTaskTableRef> Input;
+ TTaskTableInputRef Input;
std::vector<TFmrTableOutputRef> Output;
TString Executable;
};
diff --git a/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp b/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp
index cd1e1a21699..b901fd3de89 100644
--- a/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp
+++ b/yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.cpp
@@ -1,5 +1,7 @@
#include "yql_yt_table_data_service_local.h"
+#include <util/system/mutex.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h>
+#include <yql/essentials/utils/log/log.h>
namespace NYql::NFmr {
@@ -12,6 +14,8 @@ public:
}
NThreading::TFuture<void> Put(const TString& key, const TString& value) {
+ TGuard<TMutex> guard(Mutex_);
+ YQL_CLOG(TRACE, FastMapReduce) << "Putting key " << key << " to local table data service";
auto& map = Data_[std::hash<TString>()(key) % NumParts_];
auto it = map.find(key);
if (it != map.end()) {
@@ -22,6 +26,8 @@ public:
}
NThreading::TFuture<TMaybe<TString>> Get(const TString& key) {
+ TGuard<TMutex> guard(Mutex_);
+ YQL_CLOG(TRACE, FastMapReduce) << "Getting key " << key << " from local table data service";
TMaybe<TString> value = Nothing();
auto& map = Data_[std::hash<TString>()(key) % NumParts_];
auto it = map.find(key);
@@ -32,6 +38,8 @@ public:
}
NThreading::TFuture<void> Delete(const TString& key) {
+ TGuard<TMutex> guard(Mutex_);
+ YQL_CLOG(TRACE, FastMapReduce) << "Deleting key " << key << " from local table data service";
auto& map = Data_[std::hash<TString>()(key) % NumParts_];
auto it = map.find(key);
if (it == map.end()) {
@@ -44,6 +52,7 @@ public:
private:
std::vector<std::unordered_map<TString, TString>> Data_;
const ui32 NumParts_;
+ TMutex Mutex_ = TMutex();
};
} // namespace
diff --git a/yt/yql/providers/yt/fmr/utils/ut/ya.make b/yt/yql/providers/yt/fmr/utils/ut/ya.make
index 2e7c9b94a65..b2c94c2502a 100644
--- a/yt/yql/providers/yt/fmr/utils/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/utils/ut/ya.make
@@ -7,7 +7,7 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/fmr/utils
yt/yql/providers/yt/fmr/request_options
- yt/yql/providers/yt/fmr/yt_service/mock
+ yt/yql/providers/yt/fmr/yt_job_service/mock
yql/essentials/parser/pg_wrapper
yql/essentials/parser/pg_wrapper/interface
yql/essentials/public/udf
diff --git a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
index a0736aa96f3..d642ca7a6f8 100644
--- a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
+++ b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
@@ -1,9 +1,9 @@
#include <library/cpp/testing/unittest/registar.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h>
-#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h>
+#include <yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
-#include <yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h>
using namespace NYql::NFmr;
@@ -12,13 +12,15 @@ Y_UNIT_TEST_SUITE(ParseRecordTests) {
TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n"
"{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n";
TYtTableRef testYtTable = TYtTableRef{.Path = "test_path", .Cluster = "hahn"};
- std::unordered_map<TYtTableRef, TString> inputTables{{testYtTable, inputYsonContent}};
+ auto richPath = NYT::TRichYPath("test_path").Cluster("test_cluster");
+ std::unordered_map<TString, TString> inputTables{{NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath)), inputYsonContent}};
+
std::unordered_map<TYtTableRef, TString> outputTables;
- auto ytService = MakeMockYtService(inputTables, outputTables);
+ auto ytJobService = MakeMockYtJobService(inputTables, outputTables);
- auto reader = ytService->MakeReader(testYtTable, TClusterConnection());
- auto writer = ytService->MakeWriter(testYtTable, TClusterConnection());
+ auto reader = ytJobService->MakeReader(richPath);
+ auto writer = ytJobService->MakeWriter(testYtTable, TClusterConnection());
auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
ParseRecords(reader, writer, 1, 10, cancelFlag);
writer->Flush();
diff --git a/yt/yql/providers/yt/fmr/utils/ya.make b/yt/yql/providers/yt/fmr/utils/ya.make
index 0d683128179..8bd317b26d5 100644
--- a/yt/yql/providers/yt/fmr/utils/ya.make
+++ b/yt/yql/providers/yt/fmr/utils/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ yql_yt_client.cpp
yql_yt_log_context.cpp
yql_yt_parse_records.cpp
yql_yt_table_data_service_key.cpp
@@ -8,8 +9,9 @@ SRCS(
PEERDIR(
library/cpp/http/io
+ yt/cpp/mapreduce/client
yt/cpp/mapreduce/interface
- yt/yql/providers/yt/fmr/yt_service/impl
+ yt/yql/providers/yt/fmr/request_options
yt/yql/providers/yt/codec
yql/essentials/utils
)
diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp
new file mode 100644
index 00000000000..3eaeba98d9f
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/utils/yql_yt_client.cpp
@@ -0,0 +1,14 @@
+#include "yql_yt_client.h"
+
+namespace NYql::NFmr {
+
+NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection) {
+ NYT::TCreateClientOptions createOpts;
+ auto token = clusterConnection.Token;
+ if (token) {
+ createOpts.Token(*token);
+ }
+ return NYT::CreateClient(clusterConnection.YtServerName, createOpts);
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_client.h b/yt/yql/providers/yt/fmr/utils/yql_yt_client.h
new file mode 100644
index 00000000000..3d4f2f07224
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/utils/yql_yt_client.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql::NFmr {
+
+NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
index 790f845c3e8..7bde0ed7f9b 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
@@ -6,6 +6,7 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file
yt/yql/providers/yt/fmr/job_factory/impl
yt/yql/providers/yt/fmr/worker/impl
)
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index f19c5365bdf..79ccfa46f70 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -6,12 +6,13 @@
#include <util/thread/pool.h>
#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service//file/yql_yt_file_coordinator_service.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
namespace NYql::NFmr {
TDownloadOperationParams downloadOperationParams{
- .Input = TYtTableRef{"Path","Cluster"},
+ .Input = TYtTableRef{"Path","Cluster", "FilePath"},
.Output = TFmrTableRef{{"Cluster", "Path"}}
};
@@ -26,7 +27,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do
Y_UNIT_TEST_SUITE(FmrWorkerTests) {
Y_UNIT_TEST(GetSuccessfulOperationResult) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto operationResults = std::make_shared<TString>("no_result_yet");
auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
while (!cancelFlag->load()) {
@@ -48,7 +49,7 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
}
Y_UNIT_TEST(CancelOperation) {
- auto coordinator = MakeFmrCoordinator();
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
auto operationResults = std::make_shared<TString>("no_result_yet");
auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
int numIterations = 0;
@@ -79,7 +80,7 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
TFmrCoordinatorSettings coordinatorSettings{};
coordinatorSettings.WorkersNum = 2;
coordinatorSettings.RandomProvider = CreateDeterministicRandomProvider(3);
- auto coordinator = MakeFmrCoordinator(coordinatorSettings);
+ auto coordinator = MakeFmrCoordinator(coordinatorSettings, MakeFileYtCoordinatorService());
std::shared_ptr<std::atomic<ui32>> operationResult = std::make_shared<std::atomic<ui32>>(0);
auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
while (!cancelFlag->load()) {
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
index ce06e6d0783..ee8ce692f00 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
@@ -50,15 +50,20 @@ public:
}
}
+ ui64 maxParallelJobCount = JobFactory_->GetMaxParallelJobCount();
+ YQL_ENSURE(maxParallelJobCount >= WorkerState_->TaskStatuses.size());
+ ui64 availableSlots = maxParallelJobCount - WorkerState_->TaskStatuses.size();
auto heartbeatRequest = THeartbeatRequest(
WorkerId_,
VolatileId_,
- taskStates
+ taskStates,
+ availableSlots
);
auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest);
auto heartbeatResponse = heartbeatResponseFuture.GetValueSync();
std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun;
std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds;
+ YQL_ENSURE(tasksToRun.size() <= availableSlots);
with_lock(WorkerState_->Mutex) {
for (auto task: tasksToRun) {
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make
index 6f4ff700b0d..884cda6f788 100644
--- a/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/ya.make
@@ -1,11 +1,11 @@
UNITTEST()
SRCS(
- yql_yt_file_yt_service_ut.cpp
+ yql_yt_file_yt_job_service_ut.cpp
)
PEERDIR(
- yt/yql/providers/yt/fmr/yt_service/file
+ yt/yql/providers/yt/fmr/yt_job_service/file
yt/yql/providers/yt/gateway/file
)
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/yql_yt_file_yt_job_service_ut.cpp
index acc5867c454..6a013963cd7 100644
--- a/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp
+++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ut/yql_yt_file_yt_job_service_ut.cpp
@@ -1,26 +1,26 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/stream/file.h>
-#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h>
-#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> // TODO - REMOVE
+#include <yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h>
namespace NYql::NFmr {
-Y_UNIT_TEST_SUITE(FileYtServiceTest) {
- Y_UNIT_TEST(CheckReaderAndWriter) {
+Y_UNIT_TEST_SUITE(FileYtServiceTests) {
+ Y_UNIT_TEST(CheckFileReaderAndWriter) {
TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n"
"{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n";
TTempFileHandle file{};
TYtTableRef ytTable{.Path = "test_path", .Cluster = "hahn", .FilePath = file.Name()};
- auto fileService = MakeFileYtSerivce();
+ auto fileService = MakeFileYtJobSerivce();
auto writer = fileService->MakeWriter(ytTable, TClusterConnection());
writer->Write(inputYsonContent.data(), inputYsonContent.size());
writer->Flush();
TFileInput input(file.Name());
- auto reader = fileService->MakeReader(ytTable, TClusterConnection());
+ auto reader = fileService->MakeReader(file.Name());
TStringStream binaryYsonStream;
TStringStream textYsonStream;
binaryYsonStream << reader->ReadAll();
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/file/ya.make
index dcd0969e3e6..13cfe7ebd3d 100644
--- a/yt/yql/providers/yt/fmr/yt_service/file/ya.make
+++ b/yt/yql/providers/yt/fmr/yt_job_service/file/ya.make
@@ -1,13 +1,13 @@
LIBRARY()
SRCS(
- yql_yt_file_yt_service.cpp
+ yql_yt_file_yt_job_service.cpp
)
PEERDIR(
library/cpp/yson
yt/yql/providers/yt/gateway/file
- yt/yql/providers/yt/fmr/yt_service/interface
+ yt/yql/providers/yt/fmr/yt_job_service/interface
yt/yql/providers/yt/lib/yson_helpers
yql/essentials/utils
)
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp
new file mode 100644
index 00000000000..d72f76bd419
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.cpp
@@ -0,0 +1,68 @@
+#include "yql_yt_file_yt_job_service.h"
+#include <library/cpp/yson/parser.h>
+#include <util/stream/file.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h>
+#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql::NFmr {
+
+namespace {
+
+class TFileYtTableWriter: public NYT::TRawTableWriter {
+public:
+ TFileYtTableWriter(const TString& filePath): FilePath_(filePath) {}
+
+ void NotifyRowEnd() override {
+ }
+
+private:
+ void DoWrite(const void* buf, size_t len) override {
+ Buffer_.Append(static_cast<const char*>(buf), len);
+ }
+
+ void DoFlush() override {
+ TMemoryInput input(Buffer_.data(), Buffer_.size());
+ TFile outputFile(FilePath_, OpenAlways | WrOnly | ForAppend);
+ TFileOutput outputFileStream(outputFile);
+ TDoubleHighPrecisionYsonWriter writer(&outputFileStream, ::NYson::EYsonType::ListFragment);
+ NYson::TYsonParser parser(&writer, &input, ::NYson::EYsonType::ListFragment);
+ parser.Parse();
+ Buffer_.Clear();
+ }
+
+ TString FilePath_;
+ TBuffer Buffer_;
+};
+
+class TFileYtJobService: public NYql::NFmr::IYtJobService {
+public:
+
+NYT::TRawTableReaderPtr MakeReader(
+ const std::variant<NYT::TRichYPath, TString>& inputTableRef,
+ const TClusterConnection& /*clusterConnection*/,
+ const TYtReaderSettings& /*readerSettings*/
+) override {
+ TString filePath = std::get<TString>(inputTableRef);
+ auto textYsonInputs = NFile::MakeTextYsonInputs({{filePath, NFile::TColumnsInfo{}}}, false);
+ return textYsonInputs[0];
+ }
+
+ NYT::TRawTableWriterPtr MakeWriter(
+ const TYtTableRef& ytTable,
+ const TClusterConnection& /*clusterConnection*/,
+ const TYtWriterSettings& /*writerSettings*/
+ ) override {
+ YQL_ENSURE(ytTable.FilePath);
+ return MakeIntrusive<TFileYtTableWriter>(*ytTable.FilePath);
+ }
+};
+
+} // namespace
+
+IYtJobService::TPtr MakeFileYtJobSerivce() {
+ return MakeIntrusive<TFileYtJobService>();
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h
new file mode 100644
index 00000000000..a4f7c2426ce
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h
@@ -0,0 +1,7 @@
+#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h>
+
+namespace NYql::NFmr {
+
+IYtJobService::TPtr MakeFileYtJobSerivce();
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make
new file mode 100644
index 00000000000..53303655e61
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_job_service_impl.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/error
+ yt/cpp/mapreduce/client
+ yt/cpp/mapreduce/common
+ yt/yql/providers/yt/fmr/yt_job_service/interface
+ yql/essentials/utils
+ yql/essentials/utils/log
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.cpp
index 4634a76031b..3c21287dc5b 100644
--- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
+++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.cpp
@@ -1,28 +1,36 @@
+#include <library/cpp/yt/error/error.h>
#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/yql/providers/yt/fmr/utils/yql_yt_client.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/yql_panic.h>
-#include "yql_yt_yt_service_impl.h"
+#include "yql_yt_job_service_impl.h"
namespace NYql::NFmr {
namespace {
-class TFmrYtService: public NYql::NFmr::IYtService {
+class TFmrYtJobService: public IYtJobService {
public:
NYT::TRawTableReaderPtr MakeReader(
- const TYtTableRef& ytTable,
+ const std::variant<NYT::TRichYPath, TString>& inputTableRef,
const TClusterConnection& clusterConnection,
const TYtReaderSettings& readerSettings
) override {
+ auto richPath = std::get<NYT::TRichYPath>(inputTableRef);
+ //YQL_CLOG(DEBUG, FastMapReduce) << "Creating reader for input yt table with path " << NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath));
+ YQL_ENSURE(richPath.Cluster_);
+ TFmrTableId fmrId(*richPath.Cluster_, richPath.Path_);
auto client = CreateClient(clusterConnection);
auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
- auto path = NYT::TRichYPath(NYT::AddPathPrefix(ytTable.Path, "//"));
+
auto controlAttributes = NYT::TControlAttributes();
if (!readerSettings.WithAttributes) {
controlAttributes.EnableRangeIndex(false).EnableRowIndex(false);
}
auto readerOptions = NYT::TTableReaderOptions().ControlAttributes(controlAttributes);
- return transaction->CreateRawReader(path, NYT::TFormat::YsonBinary(), readerOptions);
+ return transaction->CreateRawReader(richPath, NYT::TFormat::YsonBinary(), readerOptions);
}
NYT::TRawTableWriterPtr MakeWriter(
@@ -33,29 +41,19 @@ public:
auto client = CreateClient(clusterConnection);
auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//");
- auto richPath = NYT::TRichYPath(ytPath).Append(true);
+ auto richPath = NYT::TRichYPath(ytPath).Cluster(ytTable.Cluster).Append(true);
auto writerOptions = NYT::TTableWriterOptions();
if (writerSetttings.MaxRowWeight) {
writerOptions.Config(NYT::TNode()("max_row_weight", *writerSetttings.MaxRowWeight));
}
return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary(), writerOptions);
}
-
-private:
- NYT::IClientPtr CreateClient(const TClusterConnection& clusterConnection) {
- NYT::TCreateClientOptions createOpts;
- auto token = clusterConnection.Token;
- if (token) {
- createOpts.Token(*token);
- }
- return NYT::CreateClient(clusterConnection.YtServerName, createOpts);
- }
};
} // namespace
-IYtService::TPtr MakeFmrYtSerivce() {
- return MakeIntrusive<TFmrYtService>();
+IYtJobService::TPtr MakeYtJobSerivce() {
+ return MakeIntrusive<TFmrYtJobService>();
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h
new file mode 100644
index 00000000000..bac4abf6ab7
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h
@@ -0,0 +1,7 @@
+#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h>
+
+namespace NYql::NFmr {
+
+IYtJobService::TPtr MakeYtJobSerivce();
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/interface/ya.make
index 032f645e022..a824de924f5 100644
--- a/yt/yql/providers/yt/fmr/yt_service/interface/ya.make
+++ b/yt/yql/providers/yt/fmr/yt_job_service/interface/ya.make
@@ -1,7 +1,7 @@
LIBRARY()
SRCS(
- yql_yt_yt_service.cpp
+ yql_yt_job_service.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp
new file mode 100644
index 00000000000..7b3bac7a269
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.cpp
@@ -0,0 +1 @@
+#include "yql_yt_job_service.h"
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h
index 77204493e70..0975fe729c7 100644
--- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
+++ b/yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h
@@ -7,22 +7,23 @@
namespace NYql::NFmr {
struct TYtReaderSettings {
- bool WithAttributes = false; // Enable RowIndex and RangeIndex
+ bool WithAttributes = false; // Enable RowIndex and RangeIndex, for now only mode = false is supported.
};
struct TYtWriterSettings {
TMaybe<ui64> MaxRowWeight = Nothing();
};
-class IYtService: public TThrRefBase {
+class IYtJobService: public TThrRefBase {
public:
- virtual ~IYtService() = default;
+ virtual ~IYtJobService() = default;
- using TPtr = TIntrusivePtr<IYtService>;
+ using TPtr = TIntrusivePtr<IYtJobService>;
+ // Either RichPath to actual Yt table or filepath is passed depending on type of underlying gateway.
virtual NYT::TRawTableReaderPtr MakeReader(
- const TYtTableRef& ytTable,
- const TClusterConnection& clusterConnection,
+ const std::variant<NYT::TRichYPath, TString>& inputTableRef,
+ const TClusterConnection& clusterConnection = TClusterConnection(),
const TYtReaderSettings& settings = TYtReaderSettings()
) = 0;
diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/ya.make b/yt/yql/providers/yt/fmr/yt_job_service/mock/ya.make
index e586d08ec3a..4896db2ef75 100644
--- a/yt/yql/providers/yt/fmr/yt_service/mock/ya.make
+++ b/yt/yql/providers/yt/fmr/yt_job_service/mock/ya.make
@@ -1,12 +1,13 @@
LIBRARY()
SRCS(
- yql_yt_yt_service_mock.cpp
+ yql_yt_job_service_mock.cpp
)
PEERDIR(
+ yt/cpp/mapreduce/common
yt/cpp/mapreduce/interface
- yt/yql/providers/yt/fmr/yt_service/interface
+ yt/yql/providers/yt/fmr/yt_job_service/interface
yql/essentials/utils
)
diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.cpp b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.cpp
index 337c82d2b31..e31b7a22c17 100644
--- a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.cpp
+++ b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.cpp
@@ -1,5 +1,6 @@
-#include "yql_yt_yt_service_mock.h"
+#include "yql_yt_job_service_mock.h"
+#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <yql/essentials/utils/yql_panic.h>
@@ -58,14 +59,20 @@ private:
TBuffer Buffer_;
};
-class TMockYtService: public NYql::NFmr::IYtService {
+class TMockYtJobService: public NYql::NFmr::IYtJobService {
public:
- TMockYtService(const std::unordered_map<TYtTableRef, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables)
+ TMockYtJobService(const std::unordered_map<TString, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables)
: InputTables_(inputTables), OutputTables_(outputTables) {}
- NYT::TRawTableReaderPtr MakeReader(const TYtTableRef& ytTableRef, const TClusterConnection&, const TYtReaderSettings&) override {
- YQL_ENSURE(InputTables_.contains(ytTableRef));
- return MakeIntrusive<TMockYtTableReader>(InputTables_[ytTableRef]);
+ virtual NYT::TRawTableReaderPtr MakeReader(
+ const std::variant<NYT::TRichYPath, TString>& inputTableRef,
+ const TClusterConnection& /*clusterConnection*/,
+ const TYtReaderSettings& /*settings*/
+ ) override {
+ auto richPath = std::get<NYT::TRichYPath>(inputTableRef);
+ TString richPathStr = NYT::NodeToCanonicalYsonString(NYT::PathToNode(richPath));
+ YQL_ENSURE(InputTables_.contains(richPathStr));
+ return {MakeIntrusive<TMockYtTableReader>(InputTables_[richPathStr])};
}
NYT::TRawTableWriterPtr MakeWriter(const TYtTableRef& ytTableRef, const TClusterConnection&, const TYtWriterSettings&) override {
@@ -76,14 +83,14 @@ public:
}
private:
- std::unordered_map<TYtTableRef, TString> InputTables_; // table -> textYsonContent
+ std::unordered_map<TString, TString> InputTables_; // rich yt path in string form -> total textYsonContent of it
std::unordered_map<TYtTableRef, TString>& OutputTables_;
};
} // namespace
-IYtService::TPtr MakeMockYtService(const std::unordered_map<TYtTableRef, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables) {
- return MakeIntrusive<TMockYtService>(inputTables, outputTables);
+IYtJobService::TPtr MakeMockYtJobService(const std::unordered_map<TString, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables) {
+ return MakeIntrusive<TMockYtJobService>(inputTables, outputTables);
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h
new file mode 100644
index 00000000000..f13dc79de1d
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_job_service/mock/yql_yt_job_service_mock.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h>
+
+namespace NYql::NFmr {
+
+IYtJobService::TPtr MakeMockYtJobService(const std::unordered_map<TString, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp
deleted file mode 100644
index 1c33ed430e9..00000000000
--- a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-#include "yql_yt_file_yt_service.h"
-#include <library/cpp/yson/parser.h>
-#include <util/stream/file.h>
-#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h>
-#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h>
-#include <yql/essentials/utils/yql_panic.h>
-
-namespace NYql::NFmr {
-
-namespace {
-
-class TFileYtTableWriter: public NYT::TRawTableWriter {
- public:
- TFileYtTableWriter(const TString& filePath): FilePath_(filePath) {}
-
- void NotifyRowEnd() override {
- }
-
- private:
- void DoWrite(const void* buf, size_t len) override {
- Buffer_.Append(static_cast<const char*>(buf), len);
- }
-
- void DoFlush() override {
- TMemoryInput input(Buffer_.data(), Buffer_.size());
- TFileOutput outputFileStream(FilePath_);
- TDoubleHighPrecisionYsonWriter writer(&outputFileStream, ::NYson::EYsonType::ListFragment);
- NYson::TYsonParser parser(&writer, &input, ::NYson::EYsonType::ListFragment);
- parser.Parse();
- Buffer_.Clear();
- }
-
- TString FilePath_;
- TBuffer Buffer_;
- };
-
-
-class TFileYtService: public NYql::NFmr::IYtService {
-public:
-
- NYT::TRawTableReaderPtr MakeReader(
- const TYtTableRef& ytTable,
- const TClusterConnection& /*clusterConnection*/,
- const TYtReaderSettings& /*readerSettings*/
- ) override {
- YQL_ENSURE(ytTable.FilePath);
- auto textYsonInputs = NFile::MakeTextYsonInputs({{*ytTable.FilePath, NFile::TColumnsInfo{}}}, false);
- return textYsonInputs[0];
- }
-
- NYT::TRawTableWriterPtr MakeWriter(
- const TYtTableRef& ytTable,
- const TClusterConnection& /*clusterConnection*/,
- const TYtWriterSettings& /*writerSettings*/
- ) override {
- YQL_ENSURE(ytTable.FilePath);
- return MakeIntrusive<TFileYtTableWriter>(*ytTable.FilePath);
- }
-
-};
-
-} // namespace
-
-IYtService::TPtr MakeFileYtSerivce() {
- return MakeIntrusive<TFileYtService>();
-}
-
-} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h
deleted file mode 100644
index ea5e8027b9a..00000000000
--- a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
-
-namespace NYql::NFmr {
-
-IYtService::TPtr MakeFileYtSerivce();
-
-} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/ya.make b/yt/yql/providers/yt/fmr/yt_service/impl/ya.make
deleted file mode 100644
index 1b37156ed2f..00000000000
--- a/yt/yql/providers/yt/fmr/yt_service/impl/ya.make
+++ /dev/null
@@ -1,15 +0,0 @@
-LIBRARY()
-
-SRCS(
- yql_yt_yt_service_impl.cpp
-)
-
-PEERDIR(
- yt/cpp/mapreduce/client
- yt/cpp/mapreduce/common
- yt/yql/providers/yt/fmr/yt_service/interface
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h
deleted file mode 100644
index bf1962cac5a..00000000000
--- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h
+++ /dev/null
@@ -1,7 +0,0 @@
-#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
-
-namespace NYql::NFmr {
-
-IYtService::TPtr MakeFmrYtSerivce();
-
-} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp
deleted file mode 100644
index fbcafbc3a27..00000000000
--- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "yql_yt_yt_service.h"
diff --git a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h b/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h
deleted file mode 100644
index b10426c2f03..00000000000
--- a/yt/yql/providers/yt/fmr/yt_service/mock/yql_yt_yt_service_mock.h
+++ /dev/null
@@ -1,9 +0,0 @@
-#pragma once
-
-#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
-
-namespace NYql::NFmr {
-
-IYtService::TPtr MakeMockYtService(const std::unordered_map<TYtTableRef, TString>& inputTables, std::unordered_map<TYtTableRef, TString>& outputTables);
-
-} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h
index a28606598d3..01c400d8527 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h
@@ -1,4 +1,5 @@
#pragma once
+
#include <yt/cpp/mapreduce/interface/fwd.h>
#include <yt/cpp/mapreduce/interface/common.h>
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index ad261da0d62..6b2d145b24d 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -134,6 +134,7 @@ public:
TString sessionId = options.SessionId();
auto config = options.Config();
TRunResult result;
+ YQL_ENSURE(fmrOperationResult.TablesStats.size() == outputTables.size());
for (size_t i = 0; i < outputTables.size(); ++i) {
auto outputTable = outputTables[i];
TFmrTableId fmrOutputTableId = {outputTable.Cluster, outputTable.Path};