aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-03-13 17:13:29 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-03-13 17:25:23 +0300
commit2096b0e1fd226062be688c1d9c0cc1a2f3a633e7 (patch)
tree252549f660307039bb359e835cfb5325cc3b5b5e
parentf32f9dee5085a51ac5ef9d05db59ca8e12c31921 (diff)
downloadydb-2096b0e1fd226062be688c1d9c0cc1a2f3a633e7.tar.gz
Intermediate changes
commit_hash:4b64d98e7898a081dd87badffe9fcb56bc6f38f8
-rw-r--r--contrib/tools/cython/Cython/Compiler/Main.py6
-rw-r--r--contrib/tools/cython/patches/search-pxd.patch15
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp186
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp98
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp118
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h8
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp83
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h45
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp9
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h9
12 files changed, 391 insertions, 188 deletions
diff --git a/contrib/tools/cython/Cython/Compiler/Main.py b/contrib/tools/cython/Cython/Compiler/Main.py
index 5ec52f6520..3f03b66ac9 100644
--- a/contrib/tools/cython/Cython/Compiler/Main.py
+++ b/contrib/tools/cython/Cython/Compiler/Main.py
@@ -822,12 +822,6 @@ def search_include_directories(dirs, qualified_name, suffix, pos, include=False)
module_filename = module_name + suffix
package_filename = "__init__" + suffix
- # Fix from https://github.com/cython/cython/pull/2738
- if pos:
- path = os.path.join(os.path.dirname(pos[0].filename), module_filename)
- if os.path.exists(path):
- return path
-
for dirname in dirs:
path = os.path.join(dirname, dotted_filename)
if os.path.exists(path):
diff --git a/contrib/tools/cython/patches/search-pxd.patch b/contrib/tools/cython/patches/search-pxd.patch
deleted file mode 100644
index f125fc5387..0000000000
--- a/contrib/tools/cython/patches/search-pxd.patch
+++ /dev/null
@@ -1,15 +0,0 @@
---- contrib/tools/cython/Cython/Compiler/Main.py (575a8ed5bc91c246ebc298ce6de0fe4d891b3594)
-+++ contrib/tools/cython/Cython/Compiler/Main.py (working tree)
-@@ -822,6 +822,12 @@ def search_include_directories(dirs, qualified_name, suffix, pos, include=False)
- module_filename = module_name + suffix
- package_filename = "__init__" + suffix
-
-+ # Fix from https://github.com/cython/cython/pull/2738
-+ if pos:
-+ path = os.path.join(os.path.dirname(pos[0].filename), module_filename)
-+ if os.path.exists(path):
-+ return path
-+
- for dirname in dirs:
- path = os.path.join(dirname, dotted_filename)
- if os.path.exists(path):
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
index cb2ee1f19b..78813e529d 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make
@@ -3,6 +3,7 @@ UNITTEST()
SRCS(
yql_yt_job_ut.cpp
yql_yt_output_stream_ut.cpp
+ yql_yt_raw_table_reader_ut.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index 4b4399dcca..7be6348426 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -6,10 +6,18 @@
namespace NYql::NFmr {
-TString TableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+TString TableContent_1 = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+TString TableContent_2 = "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+TString TableContent_3 = "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST(DownloadTable) {
@@ -17,14 +25,15 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
TYtTableRef input = TYtTableRef("test_cluster", "test_path");
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
TDownloadTaskParams params = TDownloadTaskParams(input, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- ytUploadedTablesMock->AddTable(input, TableContent);
+ ytUploadedTablesMock->AddTable(input, TableContent_1);
auto res = job->Download(params);
@@ -35,57 +44,47 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
UNIT_ASSERT_EQUAL(statistics->OutputTables.at(output).Rows, 4);
auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
- UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent);
+ UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1);
}
Y_UNIT_TEST(UploadTable) {
- TString ytTableContent = TableContent;
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
- TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges};
auto params = TUploadTaskParams(input, output);
- tableDataServicePtr->Put(input.TableId, ytTableContent);
+ auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key, TableContent_1);
auto res = job->Upload(params);
auto err = std::get_if<TError>(&res);
UNIT_ASSERT_C(!err,err->ErrorMessage);
- UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent);
+ UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1);
}
Y_UNIT_TEST(MergeFmrTables) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
- TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2"};
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
+ TFmrTableInputRef input_2 = TFmrTableInputRef{.TableId = "test_table_id_2", .TableRanges = ranges};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -94,9 +93,12 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
- tableDataServicePtr->Put(input_2.TableId, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_2 = GetTableDataServiceKey(input_2.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
+ tableDataServicePtr->Put(key_2, TableContent_2);
+ tableDataServicePtr->Put(key_3, TableContent_3);
auto res = job->Merge(params);
@@ -109,31 +111,19 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
}
Y_UNIT_TEST(MergeMixedTables) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag);
+ TFmrJobSettings settings = {1};
+ IFmrJob::TPtr job = MakeFmrJob(tableDataServicePtr, ytService, cancelFlag, settings);
+
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -142,9 +132,11 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
ytUploadedTablesMock->AddTable(input_2, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ tableDataServicePtr->Put(key_3, TableContent_3);
auto res = job->Merge(params);
@@ -159,11 +151,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST_SUITE(TaskRunTests) {
Y_UNIT_TEST(RunDownloadTask) {
- TString ytTableContent =
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
@@ -174,7 +161,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- ytUploadedTablesMock->AddTable(input, ytTableContent);
+ ytUploadedTablesMock->AddTable(input, TableContent_1);
TDownloadTaskParams params = TDownloadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id");
@@ -183,55 +170,50 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
- UNIT_ASSERT_NO_DIFF(*resultTableContent, ytTableContent);
+ UNIT_ASSERT_NO_DIFF(*resultTableContent, TableContent_1);
}
Y_UNIT_TEST(RunUploadTask) {
- TString ytTableContent =
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges};
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
- tableDataServicePtr->Put(input.TableId, ytTableContent);
+ auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
+
+ tableDataServicePtr->Put(key, TableContent_1);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
- UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), ytTableContent);
+ UNIT_ASSERT_NO_DIFF(ytUploadedTablesMock->GetTableContent(output), TableContent_1);
}
Y_UNIT_TEST(RunUploadTaskWithNoTable) {
- TString ytTableContent =
- "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"}";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input = TFmrTableInputRef{.TableId = "test_table_id", .TableRanges = ranges};
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
// No table in tableDataServicePtr
- // tableDataServicePtr->Put(input.TableId, ytTableContent);
+ // auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
+ // tableDataServicePtr->Put(key, ytTableContent);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -240,30 +222,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
}
Y_UNIT_TEST(RunMergeTask) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -274,9 +242,11 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
ytUploadedTablesMock->AddTable(input_2, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ tableDataServicePtr->Put(key_3, TableContent_3);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -287,30 +257,16 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
}
Y_UNIT_TEST(RunMergeTaskWithNoTable) {
- TString TableContent_1 =
- "{\"key\"=\"1\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"2\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"3\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"4\";\"subkey\"=\"4\";\"value\"=\"qzz\"};)";
- TString TableContent_2 =
- "{\"key\"=\"5\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"6\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"7\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"8\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
- TString TableContent_3 =
- "{\"key\"=\"9\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
- "{\"key\"=\"10\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
- "{\"key\"=\"11\";\"subkey\"=\"3\";\"value\"=\"q\"};"
- "{\"key\"=\"12\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtUploadedTablesMock::TPtr ytUploadedTablesMock = MakeYtUploadedTablesMock();
NYql::NFmr::IYtService::TPtr ytService = MakeYtServiceMock(ytUploadedTablesMock);
std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1"};
+ std::vector<TTableRange> ranges = {{"test_part_id"}};
+ TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3"};
+ TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
TTaskTableRef input_table_ref_1 = {input_1};
TTaskTableRef input_table_ref_2 = {input_2};
TTaskTableRef input_table_ref_3 = {input_3};
@@ -321,10 +277,12 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
- tableDataServicePtr->Put(input_1.TableId, TableContent_1);
+ auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
+ auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
+ tableDataServicePtr->Put(key_1, TableContent_1);
// No table in Yt
// ytUploadedTablesMock->AddTable(input_2, TableContent_2);
- tableDataServicePtr->Put(input_3.TableId, TableContent_3);
+ tableDataServicePtr->Put(key_3, TableContent_3);
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp
new file mode 100644
index 0000000000..4d45e054b6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_reader_ut.cpp
@@ -0,0 +1,98 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/local/table_data_service.h>
+
+namespace NYql::NFmr {
+
+TString originalTableContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};"
+ "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};"
+ "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};"
+ "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};";
+
+Y_UNIT_TEST_SUITE(FmrRawTableReaderTests) {
+ Y_UNIT_TEST(ReadOneChunkSmallPart) {
+ size_t chunkSize = 1024;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{1};
+ std::vector<TTableRange> tableRanges = {{"partId", 0, 1}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ char buffer[10];
+ reader.Read(buffer, 10);
+ TString readTableContentPart = {buffer, 10};
+ auto originalTableContentPart = originalTableContent.substr(0, 10);
+ UNIT_ASSERT_NO_DIFF(readTableContentPart, originalTableContentPart);
+ }
+
+ Y_UNIT_TEST(ReadAllOneChunk) {
+ size_t chunkSize = 1024;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{1};
+ std::vector<TTableRange> tableRanges = {{"partId", 0, 1}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ auto readTableContent = reader.ReadAll();
+ UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent);
+ }
+
+ Y_UNIT_TEST(ReadAllMultipleChunks) {
+ size_t chunkSize = 32;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{1};
+
+ auto maxChunk = originalTableContent.size() / chunkSize + 1;
+ std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ auto readTableContent = reader.ReadAll();
+ UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent);
+ }
+
+ Y_UNIT_TEST(ReadAllMultipleChunksBigReadAhead) {
+ size_t chunkSize = 32;
+
+ ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
+
+ TFmrOutputStreamSettings settings{chunkSize};
+ TFmrOutputStream outputStream("tableId", "partId", tableDataServicePtr, settings);
+
+ outputStream.Write(originalTableContent.data(), originalTableContent.size());
+ outputStream.Flush();
+
+ TFmrRawTableReaderSettings readerSettings{5};
+
+ auto maxChunk = originalTableContent.size() / chunkSize + 1;
+ std::vector<TTableRange> tableRanges = {{"partId", 0, maxChunk}};
+ TFmrRawTableReader reader("tableId", tableRanges, tableDataServicePtr, readerSettings);
+
+ auto readTableContent = reader.ReadAll();
+ UNIT_ASSERT_NO_DIFF(readTableContent, originalTableContent);
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make
index 89865c44a4..969c305ed1 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/job/impl/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
yql_yt_job_impl.cpp
yql_yt_output_stream.cpp
+ yql_yt_raw_table_reader.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index 4e18a287a1..40d0a26760 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -4,6 +4,7 @@
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_output_stream.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
@@ -15,8 +16,8 @@ namespace NYql::NFmr {
class TFmrJob: public IFmrJob {
public:
- TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag)
- : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag)
+ TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings)
+ : TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings)
{
}
@@ -57,60 +58,68 @@ public:
}
virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override {
- const auto ytTable = params.Output;
- const auto cluster = params.Output.Cluster;
- const auto path = params.Output.Path;
- const auto tableId = params.Input.TableId;
-
- YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
+ try {
+ const auto ytTable = params.Output;
+ const auto cluster = params.Output.Cluster;
+ const auto path = params.Output.Path;
+ const auto tableId = params.Input.TableId;
+ const auto tableRanges = params.Input.TableRanges;
- TMaybe<TString> getResult = TableDataService_->Get(tableId).GetValueSync();
+ YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
- if (!getResult) {
- YQL_CLOG(ERROR, FastMapReduce) << "Table " << tableId << " not found";
- return TError("Table not found");
- }
+ auto res = GetFmrTableStream(params.Input);
+ auto err = std::get_if<TError>(&res);
+ if (err) {
+ return *err;
+ }
+ auto inputStream = std::get_if<THolder<IInputStream>>(&res);
- TString tableContent = getResult.GetRef();
- TStringInput inputStream(tableContent);
+ // How to raise if not found
- YtService_->Upload(ytTable, inputStream, clusterConnection);
+ YtService_->Upload(ytTable, *inputStream->get(), clusterConnection);
- return TStatistics();
+ return TStatistics();
+ } catch (...) {
+ return TError(CurrentExceptionMessage());
+ }
}
virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override {
// расширить таск парамс. добавить туда мету
- const auto inputs = params.Input;
- const auto output = params.Output;
+ try {
+ const auto inputs = params.Input;
+ const auto output = params.Output;
- YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
+ YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
- TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_);
+ TFmrOutputStream outputStream(output.TableId, output.PartId, TableDataService_);
- ui32 totalRowsCount = 0;
+ ui32 totalRowsCount = 0;
- for (const auto& inputTableRef : inputs) {
- if (CancelFlag_->load()) {
- return TError("Canceled");
- }
- ui64 rowsCount = 0; // TMP Todo get rows count from input stats
- auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection);
- totalRowsCount += rowsCount;
+ for (const auto& inputTableRef : inputs) {
+ if (CancelFlag_->load()) {
+ return TError("Canceled");
+ }
+ ui64 rowsCount = 0; // TMP Todo get rows count from input stats
+ auto res = GetTableInputStream(inputTableRef, rowsCount, clusterConnection);
+ totalRowsCount += rowsCount;
- auto err = std::get_if<TError>(&res);
- if (err) {
- return *err;
+ auto err = std::get_if<TError>(&res);
+ if (err) {
+ return *err;
+ }
+ auto inputStream = std::get_if<THolder<IInputStream>>(&res);
+ TransferData(inputStream->get(), &outputStream);
}
- auto inputStream = std::get_if<THolder<IInputStream>>(&res);
- TransferData(inputStream->get(), &outputStream);
- }
- outputStream.Flush();
+ outputStream.Flush();
- TTableStats stats = outputStream.GetStats();
- stats.Rows = totalRowsCount;
+ TTableStats stats = outputStream.GetStats();
+ stats.Rows = totalRowsCount;
- return TStatistics({{output, stats}});
+ return TStatistics({{output, stats}});
+ } catch (...) {
+ return TError(CurrentExceptionMessage());
+ }
}
private:
@@ -141,33 +150,40 @@ private:
}
std::variant<THolder<IInputStream>, TError> GetFmrTableStream(const TFmrTableInputRef& fmrTable) {
- auto res = TableDataService_->Get(fmrTable.TableId).GetValueSync();
- if (!res) {
- return TError("Table not found");
- }
- auto tableContent = *res;
- TStringStream stream;
- stream << tableContent;
- return MakeHolder<TStringStream>(stream);
+
+ auto settings = TFmrRawTableReaderSettings(Settings_.ReadAheadChunks);
+ return MakeHolder<TFmrRawTableReader>(TFmrRawTableReader(
+ fmrTable.TableId,
+ fmrTable.TableRanges,
+ TableDataService_,
+ settings
+ ));
}
private:
ITableDataService::TPtr TableDataService_;
IYtService::TPtr YtService_;
std::shared_ptr<std::atomic<bool>> CancelFlag_;
+ const TFmrJobSettings Settings_;
};
-IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag) {
- return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag);
+IFmrJob::TPtr MakeFmrJob(
+ ITableDataService::TPtr tableDataService,
+ IYtService::TPtr ytService,
+ std::shared_ptr<std::atomic<bool>> cancelFlag,
+ const TFmrJobSettings& settings
+) {
+ return MakeIntrusive<TFmrJob>(tableDataService, ytService, cancelFlag, settings);
}
TJobResult RunJob(
TTask::TPtr task,
ITableDataService::TPtr tableDataService,
IYtService::TPtr ytService,
- std::shared_ptr<std::atomic<bool>> cancelFlag
+ std::shared_ptr<std::atomic<bool>> cancelFlag,
+ const TFmrJobSettings& settings
) {
- IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, settings);
auto processTask = [job, task] (auto&& taskParams) {
using T = std::decay_t<decltype(taskParams)>;
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
index 1da096ee23..284b68040d 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -6,8 +6,12 @@
namespace NYql::NFmr {
-IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+struct TFmrJobSettings {
+ ui64 ReadAheadChunks = 1;
+};
-TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag);
+IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings);
+
+TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {});
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp
new file mode 100644
index 0000000000..c458be66b6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.cpp
@@ -0,0 +1,83 @@
+#include "yql_yt_raw_table_reader.h"
+#include <yql/essentials/utils/log/log.h>
+#include <yt/yql/providers/yt/fmr/utils/table_data_service_key.h>
+
+namespace NYql::NFmr {
+
+TFmrRawTableReader::TFmrRawTableReader(
+ const TString& tableId,
+ const std::vector<TTableRange>& tableRanges,
+ ITableDataService::TPtr tableDataService,
+ const TFmrRawTableReaderSettings& settings
+)
+ : TableId_(tableId)
+ , TableRanges_(tableRanges)
+ , TableDataService_(tableDataService)
+ , Settings_(settings)
+{
+ ReadAhead();
+}
+
+size_t TFmrRawTableReader::DoRead(void* buf, size_t len) {
+ ui64 totalRead = 0;
+ char* output = static_cast<char*>(buf);
+
+ while (len > 0) {
+ ui64 available = DataBuffer_.size() - CurrentPosition_;
+ if (available > 0) {
+ ui64 toCopy = std::min(available, len);
+
+ auto start = DataBuffer_.Begin() + CurrentPosition_;
+ auto end = start + toCopy;
+ std::copy(start, end, output);
+
+ CurrentPosition_ += toCopy;
+ output += toCopy;
+ len -= toCopy;
+ totalRead += toCopy;
+ } else if (!PendingChunks_.empty()) {
+ auto chunk = PendingChunks_.front();
+ TMaybe<TString> data;
+ try {
+ data = chunk.Data.GetValueSync();
+ } catch (...) {
+ ythrow yexception() << "Error reading chunk:" << chunk.Meta << "Error: " << CurrentExceptionMessage();
+ }
+
+ if (data) {
+ DataBuffer_.Assign(data->data(), data->size());
+ } else {
+ ythrow yexception() << "No data for chunk:" << chunk.Meta;
+ }
+
+ PendingChunks_.pop();
+ CurrentPosition_ = 0;
+ available = DataBuffer_.size();
+
+ ReadAhead();
+ } else {
+ break;
+ }
+ }
+ return totalRead;
+}
+
+void TFmrRawTableReader::ReadAhead() {
+ while (PendingChunks_.size() < Settings_.ReadAheadChunks) {
+ if (CurrentRange_ < TableRanges_.size()) {
+ auto currentPartId = TableRanges_[CurrentRange_].PartId;
+ if (CurrentChunk_ < TableRanges_[CurrentRange_].MaxChunk) {
+ auto key = GetTableDataServiceKey(TableId_, currentPartId, CurrentChunk_);
+ PendingChunks_.push({TableDataService_->Get(key), {TableId_, currentPartId, CurrentChunk_}});
+ CurrentChunk_++;
+ } else {
+ CurrentRange_++;
+ }
+ }
+ else {
+ break;
+ }
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h
new file mode 100644
index 0000000000..ccebc661b4
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_reader.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <queue>
+#include <util/generic/buffer.h>
+#include <util/stream/input.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/interface/table_data_service.h>
+
+namespace NYql::NFmr {
+
+struct TFmrRawTableReaderSettings {
+ ui64 ReadAheadChunks = 1;
+};
+
+struct TPendingFmrChunk {
+ NThreading::TFuture<TMaybe<TString>> Data;
+ TFmrChunkMeta Meta;
+};
+
+class TFmrRawTableReader: public IInputStream {
+ public:
+ TFmrRawTableReader(
+ const TString& tableId,
+ const std::vector<TTableRange>& tableRanges,
+ ITableDataService::TPtr tableDataService,
+ const TFmrRawTableReaderSettings& settings
+ );
+ protected:
+ size_t DoRead(void* buf, size_t len) override;
+ private:
+ void ReadAhead();
+ private:
+ const TString TableId_;
+ const std::vector<TTableRange> TableRanges_;
+ ITableDataService::TPtr TableDataService_;
+ const TFmrRawTableReaderSettings Settings_;
+
+ ui64 CurrentRange_ = 0;
+ ui64 CurrentChunk_ = 0;
+ ui64 CurrentPosition_ = 0;
+ TBuffer DataBuffer_;
+ std::queue<TPendingFmrChunk> PendingChunks_;
+};
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index c3a182b486..319cf20801 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -10,6 +10,10 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co
return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage, stats);
}
+TString TFmrChunkMeta::ToString() const {
+ return TStringBuilder() << TableId << ":" << PartId << ":" << std::to_string(Chunk);
+}
+
} // namespace NYql::NFmr
template<>
@@ -22,3 +26,8 @@ void Out<NYql::NFmr::TFmrError>(IOutputStream& out, const NYql::NFmr::TFmrError&
}
out << error.ErrorMessage;
}
+
+template<>
+void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) {
+ out << meta.ToString();
+}
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index a4b1ebf37c..4c5bd1252f 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -3,6 +3,7 @@
#include <util/digest/numeric.h>
#include <util/generic/maybe.h>
#include <util/generic/string.h>
+#include <util/string/builder.h>
#include <vector>
namespace NYql::NFmr {
@@ -66,6 +67,14 @@ struct TTableRange {
ui64 MaxChunk = 1; // Пока такой дефолт
};
+struct TFmrChunkMeta {
+ TString TableId;
+ TString PartId;
+ ui64 Chunk = 0; // сделать out метод
+
+ TString ToString() const;
+};
+
struct TFmrTableInputRef {
TString TableId;
std::vector<TTableRange> TableRanges;