aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcdzyura171 <cdzyura171@yandex-team.com>2025-04-03 14:50:13 +0300
committercdzyura171 <cdzyura171@yandex-team.com>2025-04-03 15:12:42 +0300
commitf8722f9dd0311cb20863ccc2b382cd7db56aff43 (patch)
tree85163aa2d1a153df604b524635ecead51226347a
parent9aadc5b85f8524240e9f25315f51d47666a01e8b (diff)
downloadydb-f8722f9dd0311cb20863ccc2b382cd7db56aff43.tar.gz
Add merge to fast map reduce gateway
Add merge to fast map reduce gateway commit_hash:83b68d130a725a0ff04ee777063f130df571849e
-rw-r--r--yql/essentials/providers/common/provider/yql_provider.cpp2
-rw-r--r--yql/essentials/providers/common/provider/yql_provider.h2
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.cpp7
-rw-r--r--yt/yql/providers/yt/common/yql_yt_settings.h1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp7
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp7
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h3
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp8
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp75
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h6
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h2
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto1
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto1
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp7
-rw-r--r--yt/yql/providers/yt/fmr/request_options/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h8
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp8
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h1
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file.cpp2
-rw-r--r--yt/yql/providers/yt/gateway/fmr/ya.make5
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp438
25 files changed, 403 insertions, 196 deletions
diff --git a/yql/essentials/providers/common/provider/yql_provider.cpp b/yql/essentials/providers/common/provider/yql_provider.cpp
index 7f3ef39e9a5..f5fdda40527 100644
--- a/yql/essentials/providers/common/provider/yql_provider.cpp
+++ b/yql/essentials/providers/common/provider/yql_provider.cpp
@@ -1084,7 +1084,7 @@ bool FillUsedFilesImpl(
return childrenOk;
}
-static void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type) {
+void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type) {
auto separator = string.find(":");
const auto p0 = string.substr(0, separator);
if (p0 == "api") {
diff --git a/yql/essentials/providers/common/provider/yql_provider.h b/yql/essentials/providers/common/provider/yql_provider.h
index efddf213ff8..4410f1cbc2c 100644
--- a/yql/essentials/providers/common/provider/yql_provider.h
+++ b/yql/essentials/providers/common/provider/yql_provider.h
@@ -216,6 +216,8 @@ void TransformerStatsToYson(const TString& name, const IGraphTransformer::TStati
TString TransformerStatsToYson(const IGraphTransformer::TStatistics& stats, NYson::EYsonFormat format
= NYson::EYsonFormat::Pretty);
+void GetToken(const TString& string, TString& out, const TTypeAnnotationContext& type);
+
void FillSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& types, THashMap<TString, TString>& secureParams);
bool FillUsedFiles(const TExprNode& node, TUserDataTable& files, const TTypeAnnotationContext& types, TExprContext& ctx, const TUserDataTable& crutches = {});
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp
index e8f019ea168..27d6a032a1c 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp
@@ -366,6 +366,13 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
OperationSpec[cluster] = spec;
HybridDqExecution = false;
});
+ REGISTER_SETTING(*this, FmrOperationSpec)
+ .Parser([](const TString& v) { return NYT::NodeFromYsonString(v, ::NYson::EYsonType::Node); })
+ .Validator([] (const TString&, const NYT::TNode& value) {
+ if (!value.IsMap()) {
+ throw yexception() << "Expected yson map, but got " << value.GetType();
+ }
+ });
REGISTER_SETTING(*this, Annotations)
.Parser([](const TString& v) { return NYT::NodeFromYsonString(v); })
.Validator([] (const TString&, const NYT::TNode& value) {
diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h
index e2dd916629d..5a291b5dab2 100644
--- a/yt/yql/providers/yt/common/yql_yt_settings.h
+++ b/yt/yql/providers/yt/common/yql_yt_settings.h
@@ -183,6 +183,7 @@ struct TYtSettings {
NCommon::TConfSetting<TString, true> DockerImage;
NCommon::TConfSetting<NYT::TNode, true> JobEnv;
NCommon::TConfSetting<NYT::TNode, true> OperationSpec;
+ NCommon::TConfSetting<NYT::TNode, true> FmrOperationSpec;
NCommon::TConfSetting<NYT::TNode, true> Annotations;
NCommon::TConfSetting<NYT::TNode, true> StartedBy;
NCommon::TConfSetting<NYT::TNode, true> Description;
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
index 11d323d1289..5a16fbe0ce3 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
@@ -7,6 +7,7 @@ SRCS(
PEERDIR(
library/cpp/random_provider
library/cpp/threading/future
+ library/cpp/yson/node
yt/yql/providers/yt/fmr/coordinator/interface
yql/essentials/utils/log
yql/essentials/utils
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index 9c14f304036..1831e7f1acc 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -64,7 +64,12 @@ public:
TString taskId = GenerateId();
auto taskParams = MakeDefaultTaskParamsFromOperation(request.OperationParams);
- TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnection);
+ TMaybe<NYT::TNode> jobSettings = Nothing();
+ auto fmrOperationSpec = request.FmrOperationSpec;
+ if (fmrOperationSpec && fmrOperationSpec->IsMap() && fmrOperationSpec->HasKey("job_settings")) {
+ jobSettings = (*fmrOperationSpec)["job_settings"];
+ }
+ TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnection, jobSettings);
Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId};
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
index d8a526096ae..b3c06dbe5c3 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -1,6 +1,7 @@
#pragma once
#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/yson/node/node.h>
#include <util/system/mutex.h>
#include <util/system/guard.h>
#include <util/generic/queue.h>
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make
index 62df1953063..cd29edc85d7 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/ya.make
@@ -5,6 +5,7 @@ SRCS(
)
PEERDIR(
+ library/cpp/yson/node
yt/yql/providers/yt/fmr/coordinator/interface
yt/yql/providers/yt/fmr/proto
yt/yql/providers/yt/fmr/request_options/proto_helpers
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
index fbb4a641a08..8c244c3f1aa 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
@@ -1,4 +1,5 @@
#include "yql_yt_coordinator_proto_helpers.h"
+#include <library/cpp/yson/node/node_io.h>
namespace NYql::NFmr {
@@ -71,6 +72,9 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio
protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
auto protoClusterConnection = ClusterConnectionToProto(startOperationRequest.ClusterConnection);
protoStartOperationRequest.MutableClusterConnection()->Swap(&protoClusterConnection);
+ if (startOperationRequest.FmrOperationSpec) {
+ protoStartOperationRequest.SetFmrOperationSpec(NYT::NodeToYsonString(*startOperationRequest.FmrOperationSpec));
+ }
return protoStartOperationRequest;
}
@@ -84,6 +88,9 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat
}
startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries();
startOperationRequest.ClusterConnection = ClusterConnectionFromProto(protoStartOperationRequest.GetClusterConnection());
+ if (protoStartOperationRequest.HasFmrOperationSpec()) {
+ startOperationRequest.FmrOperationSpec = NYT::NodeFromYsonString(protoStartOperationRequest.GetFmrOperationSpec());
+ }
return startOperationRequest;
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
index 15a06b2d590..15ecd6c97a5 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -25,7 +25,8 @@ struct TStartOperationRequest {
TString SessionId;
TMaybe<TString> IdempotencyKey = Nothing();
ui32 NumRetries = 1; // Not supported yet
- TClusterConnection ClusterConnection = {};
+ TClusterConnection ClusterConnection = {}; // TODO - change to map
+ TMaybe<NYT::TNode> FmrOperationSpec = Nothing();
};
struct TStartOperationResponse {
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index ac3a9263c61..7b17dee907b 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -156,7 +156,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
+ TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection());
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -178,7 +178,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
+ TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection());
// No tables in tableDataService
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -205,7 +205,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
+ TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection());
auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
@@ -239,7 +239,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id");
+ TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection());
auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index aa3d0b51122..c8da1df4801 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -16,15 +16,12 @@ namespace NYql::NFmr {
class TFmrJob: public IFmrJob {
public:
- TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings)
+ TFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings)
: TableDataService_(tableDataService), YtService_(ytService), CancelFlag_(cancelFlag), Settings_(settings)
{
}
- virtual std::variant<TError, TStatistics> Download(
- const TDownloadTaskParams& params,
- const TClusterConnection& clusterConnection
- ) override {
+ virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection) override {
try {
const auto ytTable = params.Input;
const auto cluster = params.Input.Cluster;
@@ -36,9 +33,9 @@ public:
YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path;
auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
- auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, Settings_.FmrTableDataServiceWriterSettings);
+ auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, GetFmrTableDataServiceWriterSettings());
- ParseRecords(*ytTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.BlockCount, Settings_.ParseRecordSettings.BlockSize);
+ ParseRecords(*ytTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
tableDataServiceWriter.Flush();
TTableStats stats = tableDataServiceWriter.GetStats();
@@ -59,9 +56,9 @@ public:
YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
- auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, Settings_.FmrTableDataServiceReaderSettings);
- auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
- ParseRecords(tableDataServiceReader, *ytTableWriter, Settings_.ParseRecordSettings.BlockCount, Settings_.ParseRecordSettings.BlockSize);
+ auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings());
+ auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnection);
+ ParseRecords(tableDataServiceReader, *ytTableWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
ytTableWriter->Flush();
return TStatistics();
@@ -71,6 +68,7 @@ public:
}
virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override {
+ // TODO - unordered_map<ClusterConnection>
// расширить таск парамс. добавить туда мету
try {
const auto inputs = params.Input;
@@ -78,13 +76,13 @@ public:
YQL_CLOG(DEBUG, FastMapReduce) << "Merging " << inputs.size() << " inputs";
- auto tableDataServiceWriter = TFmrTableDataServiceWriter(output.TableId, output.PartId, TableDataService_, Settings_.FmrTableDataServiceWriterSettings);
+ auto tableDataServiceWriter = TFmrTableDataServiceWriter(output.TableId, output.PartId, TableDataService_, GetFmrTableDataServiceWriterSettings());
for (const auto& inputTableRef : inputs) {
if (CancelFlag_->load()) {
return TError("Canceled");
}
auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnection);
- ParseRecords(*inputTableReader, tableDataServiceWriter, Settings_.ParseRecordSettings.BlockCount, Settings_.ParseRecordSettings.BlockSize);
+ ParseRecords(*inputTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
}
tableDataServiceWriter.Flush();
return TStatistics({{output, tableDataServiceWriter.GetStats()}});
@@ -101,17 +99,29 @@ private:
if (ytTable) {
return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
} else if (fmrTable) {
- return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrTableDataServiceReaderSettings);
+ return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings());
} else {
ythrow yexception() << "Unsupported table type";
}
}
+ TParseRecordSettings GetParseRecordSettings() {
+ return Settings_ ? Settings_->ParseRecordSettings : TParseRecordSettings();
+ }
+
+ TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() {
+ return Settings_ ? Settings_->FmrTableDataServiceReaderSettings : TFmrTableDataServiceReaderSettings();
+ }
+
+ TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() {
+ return Settings_ ? Settings_->FmrTableDataServiceWriterSettings : TFmrTableDataServiceWriterSettings();
+ }
+
private:
ITableDataService::TPtr TableDataService_;
IYtService::TPtr YtService_;
std::shared_ptr<std::atomic<bool>> CancelFlag_;
- const TFmrJobSettings Settings_;
+ TMaybe<TFmrJobSettings> Settings_;
};
IFmrJob::TPtr MakeFmrJob(
@@ -128,9 +138,10 @@ TJobResult RunJob(
ITableDataService::TPtr tableDataService,
IYtService::TPtr ytService,
std::shared_ptr<std::atomic<bool>> cancelFlag,
- const TFmrJobSettings& settings
+ const TMaybe<TFmrJobSettings>& settings
) {
- IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, settings);
+ TFmrJobSettings jobSettings = settings ? *settings : GetJobSettingsFromTask(task);
+ IFmrJob::TPtr job = MakeFmrJob(tableDataService, ytService, cancelFlag, jobSettings);
auto processTask = [job, task] (auto&& taskParams) {
using T = std::decay_t<decltype(taskParams)>;
@@ -160,4 +171,36 @@ TJobResult RunJob(
return {ETaskStatus::Completed, *statistics};
};
+TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) {
+ if (!task->JobSettings) {
+ return TFmrJobSettings();
+ }
+ auto jobSettings = *task->JobSettings;
+ YQL_ENSURE(jobSettings.IsMap());
+ TFmrJobSettings resultSettings{};
+ if (jobSettings.HasKey("parse_record_settings")) {
+ auto& parseRecordSettings = jobSettings["parse_record_settings"];
+ if (parseRecordSettings.HasKey("block_count")) {
+ resultSettings.ParseRecordSettings.BlockCount = parseRecordSettings["block_count"].AsInt64();
+ }
+ if (parseRecordSettings.HasKey("block_size")) {
+ resultSettings.ParseRecordSettings.BlockSize = parseRecordSettings["block_size"].AsInt64();
+ // TODO - support different formats (B, MB, ...)
+ }
+ }
+ if (jobSettings.HasKey("fmr_reader_settings")) {
+ auto& fmrReaderSettings = jobSettings["fmr_reader_settings"];
+ if (fmrReaderSettings.HasKey("read_ahead_chunks")) {
+ resultSettings.FmrTableDataServiceReaderSettings.ReadAheadChunks = fmrReaderSettings["read_ahead_chunks"].AsInt64();
+ }
+ }
+ if (jobSettings.HasKey("fmr_writer_settings")) {
+ auto& fmrWriterSettings = jobSettings["fmr_writer_settings"];
+ if (fmrWriterSettings.HasKey("chunk_size")) {
+ resultSettings.FmrTableDataServiceWriterSettings.ChunkSize = fmrWriterSettings["chunk_size"].AsInt64();
+ }
+ }
+ return resultSettings;
+}
+
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
index 7010a2eb964..cb21e95f9c9 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h
@@ -9,7 +9,7 @@ namespace NYql::NFmr {
struct TParseRecordSettings {
ui64 BlockCount = 1;
- ui64 BlockSize = 1024 * 1024; // 1Mb
+ ui64 BlockSize = 1024 * 1024;
};
struct TFmrJobSettings {
@@ -20,6 +20,8 @@ struct TFmrJobSettings {
IFmrJob::TPtr MakeFmrJob(ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {});
-TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TFmrJobSettings& settings = {});
+TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IYtService::TPtr ytService, std::shared_ptr<std::atomic<bool>> cancelFlag, const TMaybe<TFmrJobSettings>& settings = Nothing());
+
+TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task);
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
index a708b795432..2e27cd4f072 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h
@@ -10,7 +10,7 @@
namespace NYql::NFmr {
struct TFmrTableDataServiceWriterSettings {
- ui64 ChunkSize = 1024 * 1024; // 1Mb
+ ui64 ChunkSize = 1024 * 1024;
};
class TFmrTableDataServiceWriter: public NYT::TRawTableWriter {
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
index 99d0ca23617..a104cf26eac 100644
--- a/yt/yql/providers/yt/fmr/proto/coordinator.proto
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -22,6 +22,7 @@ message TStartOperationRequest {
optional string IdempotencyKey = 4;
uint32 NumRetries = 5;
TClusterConnection ClusterConnection = 6;
+ optional string FmrOperationSpec = 7;
}
message TStartOperationResponse {
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
index a2580eb39f8..4bd35eacaa4 100644
--- a/yt/yql/providers/yt/fmr/proto/request_options.proto
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -156,6 +156,7 @@ message TTask {
string SessionId = 4;
optional uint32 NumRetries = 5;
TClusterConnection ClusterConnection = 6;
+ optional string JobSettings = 7;
}
message TTaskState {
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index 0ce6dea92da..0860c009962 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -1,4 +1,5 @@
#include "yql_yt_request_proto_helpers.h"
+#include <library/cpp/yson/node/node_io.h>
namespace NYql::NFmr {
@@ -399,6 +400,9 @@ NProto::TTask TaskToProto(const TTask& task) {
protoTask.SetNumRetries(task.NumRetries);
auto clusterConnection = ClusterConnectionToProto(task.ClusterConnection);
protoTask.MutableClusterConnection()->Swap(&clusterConnection);
+ if (task.JobSettings) {
+ protoTask.SetJobSettings(NYT::NodeToYsonString(*task.JobSettings));
+ }
return protoTask;
}
@@ -410,6 +414,9 @@ TTask TaskFromProto(const NProto::TTask& protoTask) {
task.SessionId = protoTask.GetSessionId();
task.NumRetries = protoTask.GetNumRetries();
task.ClusterConnection = ClusterConnectionFromProto(protoTask.GetClusterConnection());
+ if (protoTask.HasJobSettings()) {
+ task.JobSettings = NYT::NodeFromYsonString(protoTask.GetJobSettings());
+ }
return task;
}
diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make
index 9e330848c29..df82ec258fe 100644
--- a/yt/yql/providers/yt/fmr/request_options/ya.make
+++ b/yt/yql/providers/yt/fmr/request_options/ya.make
@@ -5,6 +5,7 @@ SRCS(
)
PEERDIR(
+ library/cpp/yson/node
library/cpp/threading/future
)
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index 319cf208012..0dc3650855c 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -2,8 +2,8 @@
namespace NYql::NFmr {
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection) {
- return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection);
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode>& jobSettings) {
+ return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection, jobSettings);
}
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage, const TStatistics& stats) {
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index 2c2e94a057a..de18d91fa09 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -1,5 +1,6 @@
#pragma once
+#include <library/cpp/yson/node/node.h>
#include <util/digest/numeric.h>
#include <util/generic/maybe.h>
#include <util/generic/string.h>
@@ -167,8 +168,8 @@ struct TClusterConnection {
struct TTask: public TThrRefBase {
TTask() = default;
- TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, ui32 numRetries = 1)
- : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), NumRetries(numRetries)
+ TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1)
+ : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), JobSettings(jobSettings), NumRetries(numRetries)
{
}
@@ -177,6 +178,7 @@ struct TTask: public TThrRefBase {
TTaskParams TaskParams = {};
TString SessionId;
TClusterConnection ClusterConnection = {};
+ TMaybe<NYT::TNode> JobSettings = {};
ui32 NumRetries; // Not supported yet
using TPtr = TIntrusivePtr<TTask>;
@@ -197,7 +199,7 @@ struct TTaskState: public TThrRefBase {
using TPtr = TIntrusivePtr<TTaskState>;
};
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{});
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{}, const TMaybe<NYT::TNode>& jobSettings = Nothing());
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing(), const TStatistics& stats = TStatistics());
diff --git a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
index c6e535fb715..d618766edeb 100644
--- a/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
+++ b/yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.cpp
@@ -28,13 +28,13 @@ public:
NYT::TRawTableWriterPtr MakeWriter(
const TYtTableRef& ytTable,
const TClusterConnection& clusterConnection,
- const TYtWriterSettings& writerSetttings
+ const TYtWriterSettings& /*writerSetttings*/
) override {
auto client = CreateClient(clusterConnection);
auto transaction = client->AttachTransaction(GetGuid(clusterConnection.TransactionId));
- auto path = NYT::TRichYPath(NYT::AddPathPrefix(ytTable.Path, "//"));
- auto richPath = NYT::TRichYPath(path).Append(writerSetttings.AppendMode);
- return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary());
+ TString ytPath = NYT::AddPathPrefix(ytTable.Path, "//");
+ auto richPath = NYT::TRichYPath(ytPath).Append(true);
+ return transaction->CreateRawWriter(richPath, NYT::TFormat::YsonBinary()); // TODO - support writerOptions
}
private:
diff --git a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
index 1661c188955..7ed2a47fc00 100644
--- a/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
+++ b/yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h
@@ -11,7 +11,6 @@ struct TYtReaderSettings {
};
struct TYtWriterSettings {
- bool AppendMode = true;
};
class IYtService: public TThrRefBase {
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
index 529e0806df0..429880e66e1 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
@@ -1593,7 +1593,7 @@ private:
}
TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& /*options*/) override {
- ythrow yexception() << "GetClusterConnection should not be called for file gateway";
+ return TClusterConnectionResult();
}
diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make
index c424b95d0b9..d00a1b6ee49 100644
--- a/yt/yql/providers/yt/gateway/fmr/ya.make
+++ b/yt/yql/providers/yt/gateway/fmr/ya.make
@@ -5,13 +5,16 @@ SRCS(
)
PEERDIR(
+ yql/essentials/providers/common/codec
yql/essentials/utils/log
- yt/cpp/mapreduce/client
+ yt/cpp/mapreduce/common
+ yt/cpp/mapreduce/interface
yt/yql/providers/yt/gateway/lib
yt/yql/providers/yt/gateway/native
yt/yql/providers/yt/expr_nodes
yt/yql/providers/yt/fmr/coordinator/interface
yt/yql/providers/yt/lib/config_clusters
+ yt/yql/providers/yt/lib/schema
yt/yql/providers/yt/provider
)
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index aab5bdb6dde..dd5160d07c2 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -2,16 +2,20 @@
#include <thread>
+#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <yt/yql/providers/yt/gateway/lib/yt_helpers.h>
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <yt/yql/providers/yt/lib/schema/schema.h>
#include <yt/yql/providers/yt/provider/yql_yt_helpers.h>
+#include <yql/essentials/providers/common/codec/yql_codec_type_flags.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/log/profile.h>
#include <util/generic/ptr.h>
+#include <util/string/split.h>
#include <util/thread/pool.h>
using namespace NThreading;
@@ -27,7 +31,7 @@ enum class ETablePresenceStatus {
Both
};
-struct TDownloadTableToFmrResult: public NCommon::TOperationResult {}; // Download Yt -> Fmr TableDataService
+struct TFmrOperationResult: public NCommon::TOperationResult {};
class TFmrYtGateway final: public TYtForwardingGatewayBase {
public:
@@ -41,7 +45,7 @@ public:
auto getOperationStatusesFunc = [&] {
while (!StopFmrGateway_) {
with_lock(SessionStates_->Mutex) {
- auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses, const TString& sessionId) {
+ auto checkOperationStatuses = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses, const TString& sessionId) {
for (auto& [operationId, promise]: operationStatuses) {
YQL_CLOG(TRACE, FastMapReduce) << "Sending get operation request to coordinator with operationId: " << operationId;
@@ -53,9 +57,15 @@ public:
with_lock(SessionStates_->Mutex) {
bool operationCompleted = getOperationStatus != EOperationStatus::Accepted && getOperationStatus != EOperationStatus::InProgress;
if (operationCompleted) {
- // operation finished, set value in future returned in Publish / Download
+ // operation finished, set value in future returned in DoMerge / DoUpload
bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed;
- SendOperationCompletionSignal(promise, hasCompletedSuccessfully, operationErrorMessages);
+ if (hasCompletedSuccessfully) {
+ TFmrOperationResult fmrOperationResult{};
+ fmrOperationResult.SetSuccess();
+ promise.SetValue(fmrOperationResult);
+ } else {
+ promise.SetException(JoinRange(' ', operationErrorMessages.begin(), operationErrorMessages.end()));
+ }
YQL_CLOG(DEBUG, FastMapReduce) << "Sending delete operation request to coordinator with operationId: " << operationId;
auto deleteOperationFuture = Coordinator_->DeleteOperation({operationId});
deleteOperationFuture.Subscribe([&, sessionId, operationId] (const auto& deleteFuture) {
@@ -66,8 +76,7 @@ public:
YQL_ENSURE( SessionStates_->Sessions.contains(sessionId));
auto& sessionInfo = SessionStates_->Sessions[sessionId];
auto& operationStates = sessionInfo.OperationStates;
- operationStates.DownloadOperationStatuses.erase(operationId);
- operationStates.UploadOperationStatuses.erase(operationId);
+ operationStates.OperationStatuses.erase(operationId);
}
});
}
@@ -78,8 +87,7 @@ public:
for (auto [sessionId, sessionInfo]: SessionStates_->Sessions) {
auto& operationStates = sessionInfo.OperationStates;
- checkOperationStatuses(operationStates.DownloadOperationStatuses, sessionId);
- checkOperationStatuses(operationStates.UploadOperationStatuses, sessionId);
+ checkOperationStatuses(operationStates.OperationStatuses, sessionId);
}
}
Sleep(TimeToSleepBetweenGetOperationRequests_);
@@ -93,138 +101,73 @@ public:
GetOperationStatusesThread_.join();
}
- TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final {
+ TFuture<TRunResult> Run(const TExprNode::TPtr& node, TExprContext& ctx, TRunOptions&& options) final {
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
- if (!Coordinator_) {
- return Slave_->Publish(node, ctx, std::move(options));
- }
- auto publish = TYtPublish(node);
+ auto nodePos = ctx.GetPosition(node->Pos());
+ TYtOpBase opBase(node);
TString sessionId = options.SessionId();
- auto cluster = publish.DataSink().Cluster().StringValue();
- auto token = options.Config()->Auth.Get();
- TString transformedInputPath;
- TString userName = GetUsername(sessionId);
- for (auto out: publish.Input()) {
- auto outTable = GetOutTable(out).Cast<TYtOutTable>();
- TStringBuf inputPath = outTable.Name().Value();
- transformedInputPath = NYql::TransformPath(GetTablesTmpFolder(*options.Config()), inputPath, true, userName);
- break;
+ if (auto op = opBase.Maybe<TYtMerge>()) {
+ auto ytMerge = op.Cast();
+ std::vector<TYtTableRef> inputTables = GetMergeInputTables(ytMerge);
+ TYtTableRef outputTable = GetMergeOutputTable(ytMerge);
+ auto future = DoMerge(inputTables, outputTable, std::move(options));
+ return future.Apply([this, pos = nodePos, outputTable = std::move(outputTable), options = std::move(options)] (const TFuture<TFmrOperationResult>& f) {
+ try {
+ f.GetValue(); // rethrow error if any
+ TString sessionId = options.SessionId();
+ auto config = options.Config();
+ TString transformedOutputTableId = GetTransformedPath(outputTable.Path, sessionId, config);
+ TString fmrOutputTableId = outputTable.Cluster + "." + transformedOutputTableId;
+ SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr);
+ TRunResult result;
+ result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); // TODO - add statistics?
+ result.OutTableStats.back().second->Id = "fmr_" + fmrOutputTableId;
+ result.SetSuccess();
+ return MakeFuture<TRunResult>(std::move(result));
+ } catch (...) {
+ return MakeFuture(ResultFromCurrentException<TRunResult>(pos));
+ }
+ });
+ } else {
+ return Slave_->Run(node, ctx, std::move(options));
}
+ }
- // TODO - handle several inputs in Publish, use ColumnGroups, Run Merge
-
- auto outputPath = publish.Publish().Name().StringValue();
- auto idempotencyKey = GenerateId();
+ TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final {
+ TString sessionId = options.SessionId();
+ YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+ auto nodePos = ctx.GetPosition(node->Pos());
+ auto publish = TYtPublish(node);
- auto fmrTableId = cluster + "." + outputPath;
+ auto cluster = publish.DataSink().Cluster().StringValue();
+ std::vector<TFmrTableRef> fmrTableIds;
+ auto config = options.Config();
- TFuture<TDownloadTableToFmrResult> downloadToFmrFuture;
- TFuture<void> downloadedSuccessfully;
+ std::vector<TFuture<TFmrOperationResult>> uploadFmrTablesToYtFutures;
- with_lock(SessionStates_->Mutex) {
- auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
+ for (auto out: publish.Input()) {
+ auto outTableWithCluster = GetOutTableWithCluster(out);
+ auto outTable = GetOutTable(out).Cast<TYtOutTable>();
+ TStringBuf inputPath = outTable.Name().Value();
+ TString transformedInputPath = GetTransformedPath(ToString(inputPath), sessionId, config);
+ auto outputBase = out.Operation().Cast<TYtOutputOpBase>().Ptr();
- if (!tablePresenceStatuses.contains(fmrTableId)) {
- TYtTableRef ytTable{.Path = transformedInputPath, .Cluster = cluster};
- TFmrTableRef fmrTable{.TableId = fmrTableId};
- tablePresenceStatuses[fmrTableId] = ETablePresenceStatus::Both;
- downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, fmrTable, sessionId, options.Config());
- downloadedSuccessfully = downloadToFmrFuture.Apply([downloadedSuccessfully] (auto& downloadFuture) {
- auto downloadResult = downloadFuture.GetValueSync();
- });
- } else {
- downloadedSuccessfully = MakeFuture();
- }
+ TFmrTableRef fmrTableRef = TFmrTableRef{outTableWithCluster.second + "." + transformedInputPath};
+ uploadFmrTablesToYtFutures.emplace_back(DoUpload(fmrTableRef, sessionId, config, outputBase, ctx));
}
- downloadedSuccessfully.Wait(); // blocking until download to fmr finishes
-
- TUploadOperationParams uploadOperationParams{
- .Input = TFmrTableRef{fmrTableId},
- .Output = TYtTableRef{outputPath, cluster}
- };
-
- auto clusterConnectionOptions = TClusterConnectionOptions(options.SessionId())
- .Cluster(cluster).Config(options.Config());
- auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions));
- YQL_ENSURE(clusterConnection.Success());
-
- TStartOperationRequest uploadRequest{
- .TaskType = ETaskType::Upload,
- .OperationParams = uploadOperationParams,
- .SessionId = sessionId,
- .IdempotencyKey=idempotencyKey,
- .NumRetries=1,
- .ClusterConnection = TClusterConnection{
- .TransactionId = clusterConnection.TransactionId,
- .YtServerName = clusterConnection.YtServerName,
- .Token = clusterConnection.Token
- }
- };
-
- auto promise = NewPromise<TPublishResult>();
- auto future = promise.GetFuture();
- YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload to yt table: " << cluster + "." + outputPath;
- auto uploadOperationResponseFuture = Coordinator_->StartOperation(uploadRequest);
- uploadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& uploadFuture) {
- TStartOperationResponse startOperationResponse = uploadFuture.GetValueSync();
- TString operationId = startOperationResponse.OperationId;
- with_lock(SessionStates_->Mutex) {
- YQL_ENSURE(SessionStates_->Sessions.contains(sessionId));
- auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
- auto& uploadOperationStatuses = operationStates.UploadOperationStatuses;
- YQL_ENSURE(!uploadOperationStatuses.contains(operationId));
- uploadOperationStatuses[operationId] = promise;
- }
- });
- return future;
- }
-
- TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce(
- const TYtTableRef& ytTableRef, const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config)
- {
- YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
- TString fmrTableId = fmrTableRef.TableId;
- TDownloadOperationParams downloadOperationParams{
- .Input = ytTableRef,
- .Output = {fmrTableId}
- };
+ auto outputPath = publish.Publish().Name().StringValue();
auto idempotencyKey = GenerateId();
- auto clusterConnectionOptions = TClusterConnectionOptions(sessionId)
- .Cluster(ytTableRef.Cluster).Config(config);
- auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions));
- YQL_ENSURE(clusterConnection.Success());
- TStartOperationRequest downloadRequest{
- .TaskType = ETaskType::Download,
- .OperationParams = downloadOperationParams,
- .SessionId = sessionId,
- .IdempotencyKey = idempotencyKey,
- .NumRetries=1,
- .ClusterConnection = TClusterConnection{
- .TransactionId = clusterConnection.TransactionId,
- .YtServerName = clusterConnection.YtServerName,
- .Token = clusterConnection.Token
- }
- };
- YQL_CLOG(DEBUG, FastMapReduce) << "Starting download from yt table: " << fmrTableId;
-
- auto promise = NewPromise<TDownloadTableToFmrResult>();
- auto future = promise.GetFuture();
-
- auto downloadOperationResponseFuture = Coordinator_->StartOperation(downloadRequest);
- downloadOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& downloadFuture) {
- TStartOperationResponse downloadOperationResponse = downloadFuture.GetValueSync();
- TString operationId = downloadOperationResponse.OperationId;
- with_lock(SessionStates_->Mutex) {
- auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
- auto& downloadOperationStatuses = operationStates.DownloadOperationStatuses;
- YQL_ENSURE(!downloadOperationStatuses.contains(operationId));
- downloadOperationStatuses[operationId] = promise;
+ return WaitExceptionOrAll(uploadFmrTablesToYtFutures).Apply([&, pos = nodePos, curNode = std::move(node), options = std::move(options)] (const TFuture<void>& f) mutable {
+ try {
+ f.GetValue(); // rethrow error if any
+ return Slave_->Publish(curNode, ctx, std::move(options));
+ } catch (...) {
+ return MakeFuture(ResultFromCurrentException<TPublishResult>(pos));
}
});
- return future;
}
TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override {
@@ -267,29 +210,233 @@ public:
YQL_ENSURE(sessions.contains(sessionId));
auto& operationStates = sessions[sessionId].OperationStates;
- auto cancelOperationsFunc = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) {
+ auto cancelOperationsFunc = [&] (std::unordered_map<TString, TPromise<TFmrOperationResult>>& operationStatuses) {
std::vector<TFuture<TDeleteOperationResponse>> cancelOperationsFutures;
for (auto& [operationId, promise]: operationStatuses) {
cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId}));
}
NThreading::WaitAll(cancelOperationsFutures).GetValueSync();
- for (auto& [operationId, promise]: operationStatuses) {
- SendOperationCompletionSignal(promise, false);
- }
};
- cancelOperationsFunc(operationStates.DownloadOperationStatuses);
- cancelOperationsFunc(operationStates.UploadOperationStatuses);
+ cancelOperationsFunc(operationStates.OperationStatuses);
}
Slave_->CleanupSession(std::move(options)).Wait();
return MakeFuture();
}
private:
+ TString GenerateId() {
+ return GetGuidAsString(RandomProvider_->GenGuid());
+ }
+
+ TString GetUsername(const TString& sessionId) {
+ with_lock(SessionStates_->Mutex) {
+ YQL_ENSURE(SessionStates_->Sessions.contains(sessionId));
+ auto& session = SessionStates_->Sessions[sessionId];
+ return session.UserName;
+ }
+ }
+
+ TString GetTransformedPath(const TString& path, const TString& sessionId, TYtSettings::TConstPtr& config) {
+ TString username = GetUsername(sessionId);
+ return NYql::TransformPath(GetTablesTmpFolder(*config), path, true, username);
+ }
+
+ void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) {
+ with_lock(SessionStates_->Mutex) {
+ auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
+ tablePresenceStatuses[fmrTableId] = newStatus;
+ }
+ }
+
+ TMaybe<ETablePresenceStatus> GetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId) {
+ with_lock(SessionStates_->Mutex) {
+ auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
+ if (!tablePresenceStatuses.contains(fmrTableId)) {
+ return Nothing();
+ }
+ return tablePresenceStatuses[fmrTableId];
+ }
+ }
+
+ std::vector<TYtTableRef> GetMergeInputTables(const TYtMerge& ytMerge) {
+ auto input = ytMerge.Maybe<TYtTransientOpBase>().Cast().Input();
+ std::vector<TYtTableRef> inputTables;
+ for (auto section: input.Cast<TYtSectionList>()) {
+ for (auto path: section.Paths()) {
+ TYtPathInfo pathInfo(path);
+ TYtTableRef ytTable{.Path = pathInfo.Table->Name, .Cluster = pathInfo.Table->Cluster};
+ inputTables.emplace_back(ytTable);
+ }
+ }
+ return inputTables;
+ }
+
+ TYtTableRef GetMergeOutputTable(const TYtMerge& ytMerge) {
+ auto output = ytMerge.Maybe<TYtOutputOpBase>().Cast().Output();
+ std::vector<TYtTableRef> outputTables;
+ for (auto table: output) {
+ TYtOutTableInfo tableInfo(table);
+ TString outTableName = tableInfo.Name;
+ if (outTableName.empty()) {
+ outTableName = TStringBuilder() << "tmp/" << GetGuidAsString(RandomProvider_->GenGuid());
+ }
+ outputTables.emplace_back(outTableName, tableInfo.Cluster);
+ }
+ YQL_ENSURE(outputTables.size() == 1);
+ return outputTables[0];
+ }
+
+ TString GetClusterFromMergeTables(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable) {
+ std::unordered_set<TString> clusters;
+ for (auto& [path, cluster]: inputTables) {
+ clusters.emplace(cluster);
+ }
+ YQL_ENSURE(clusters.size() == 1);
+ TString cluster = *clusters.begin();
+ if (outputTable.Cluster) {
+ YQL_ENSURE(outputTable.Cluster == cluster);
+ } else {
+ outputTable.Cluster = cluster;
+ }
+ return cluster;
+ }
+
+ TClusterConnection GetTablesClusterConnection(const TString& cluster, const TString& sessionId, TYtSettings::TConstPtr& config) {
+ auto clusterConnectionOptions = TClusterConnectionOptions(sessionId).Cluster(cluster).Config(config);
+ auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions));
+ return TClusterConnection{
+ .TransactionId = clusterConnection.TransactionId,
+ .YtServerName = clusterConnection.YtServerName,
+ .Token = clusterConnection.Token
+ };
+ }
+
+ TFuture<TFmrOperationResult> GetRunningOperationFuture(const TStartOperationRequest& startOperationRequest, const TString& sessionId) {
+ auto promise = NewPromise<TFmrOperationResult>();
+ auto future = promise.GetFuture();
+ auto startOperationResponseFuture = Coordinator_->StartOperation(startOperationRequest);
+ startOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& mergeFuture) {
+ TStartOperationResponse mergeOperationResponse = mergeFuture.GetValueSync();
+ TString operationId = mergeOperationResponse.OperationId;
+ with_lock(SessionStates_->Mutex) {
+ auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
+ auto& operationStatuses = operationStates.OperationStatuses;
+ YQL_ENSURE(!operationStatuses.contains(operationId));
+ operationStatuses[operationId] = promise;
+ }
+ });
+ return future;
+ }
+
+ TFuture<TFmrOperationResult> DoUpload(const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+ std::vector<TString> ytTableInfo;
+ StringSplitter(fmrTableRef.TableId).SplitByString(".").AddTo(&ytTableInfo);
+ YQL_ENSURE(ytTableInfo.size() == 2);
+ TString outputCluster = ytTableInfo[0], outputPath = ytTableInfo[1];
+ auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.TableId, sessionId);
+ if (!tablePresenceStatus || *tablePresenceStatus != ETablePresenceStatus::OnlyInFmr) {
+ YQL_CLOG(DEBUG, FastMapReduce) << " We assume table " << fmrTableRef.TableId << " should be present in yt, not uploading from fmr";
+ TFmrOperationResult fmrOperationResult = TFmrOperationResult();
+ fmrOperationResult.SetSuccess();
+ return MakeFuture(fmrOperationResult);
+ }
+
+ TUploadOperationParams uploadOperationParams{
+ .Input = fmrTableRef,
+ .Output = TYtTableRef{.Path = outputPath, .Cluster = outputCluster}
+ };
+
+ auto clusterConnection = GetTablesClusterConnection(outputCluster, sessionId, config);
+ TStartOperationRequest uploadRequest{
+ .TaskType = ETaskType::Upload,
+ .OperationParams = uploadOperationParams,
+ .SessionId = sessionId,
+ .IdempotencyKey = GenerateId(),
+ .NumRetries=1,
+ .ClusterConnection = clusterConnection,
+ .FmrOperationSpec = config->FmrOperationSpec.Get(outputCluster)
+ };
+
+ auto prepareOptions = TPrepareOptions(sessionId)
+ .Config(config);
+ auto prepareFuture = Slave_->Prepare(outputOpBase, ctx, std::move(prepareOptions));
+
+ return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) {
+ try {
+ f.GetValue(); // rethrow error if any
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+ YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId;
+ return GetRunningOperationFuture(uploadRequest, sessionId).Apply([this, sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableId)] (const TFuture<TFmrOperationResult>& f) {
+ try {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+ auto fmrUploadResult = f.GetValue();
+ SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::Both);
+ return f;
+ } catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << CurrentExceptionMessage();
+ return MakeFuture(ResultFromCurrentException<TFmrOperationResult>());
+ }
+ });
+ } catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << CurrentExceptionMessage();
+ return MakeFuture(ResultFromCurrentException<TFmrOperationResult>());
+ }
+ });
+ }
+
+ TFuture<TFmrOperationResult> DoMerge(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable, TRunOptions&& options) {
+ TString sessionId = options.SessionId();
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+ YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+ auto cluster = GetClusterFromMergeTables(inputTables, outputTable); // Can set outputTable.Cluster if empty
+
+ TString outputTableId = outputTable.Path, outputCluster = outputTable.Cluster;
+ TString transformedOutputTableId = GetTransformedPath(outputTableId, sessionId, options.Config());
+ TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + transformedOutputTableId};
+
+ std::vector<TOperationTableRef> mergeInputTables;
+ for (auto& ytTable: inputTables) {
+ TString fmrTableId = ytTable.Cluster + "." + ytTable.Path;
+ auto tablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId);
+ if (!tablePresenceStatus) {
+ SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::OnlyInYt);
+ }
+
+ if (tablePresenceStatus && *tablePresenceStatus != ETablePresenceStatus::OnlyInYt) {
+ // table is in fmr, do not download
+ mergeInputTables.emplace_back(TFmrTableRef{.TableId = fmrTableId});
+ } else {
+ mergeInputTables.emplace_back(ytTable);
+ }
+ }
+
+ TMergeOperationParams mergeOperationParams{.Input = mergeInputTables,.Output = fmrOutputTable};
+ auto clusterConnection = GetTablesClusterConnection(cluster, sessionId, options.Config());
+ TStartOperationRequest mergeOperationRequest{
+ .TaskType = ETaskType::Merge,
+ .OperationParams = mergeOperationParams,
+ .SessionId = sessionId,
+ .IdempotencyKey = GenerateId(),
+ .NumRetries = 1,
+ .ClusterConnection = clusterConnection,
+ .FmrOperationSpec = options.Config()->FmrOperationSpec.Get(outputCluster)
+ };
+
+ std::vector<TString> inputPaths;
+ std::transform(inputTables.begin(),inputTables.end(), std::back_inserter(inputPaths), [](const TYtTableRef& ytTableRef){
+ return ytTableRef.Path;}
+ );
+
+ YQL_CLOG(DEBUG, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end());
+ return GetRunningOperationFuture(mergeOperationRequest, sessionId);
+ }
+
+private:
struct TFmrGatewayOperationsState {
- std::unordered_map<TString, TPromise<TPublishResult>> UploadOperationStatuses = {}; // operationId -> promise which we set when operation completes
- std::unordered_map<TString, TPromise<TDownloadTableToFmrResult>> DownloadOperationStatuses = {};
+ std::unordered_map<TString, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes
};
struct TSessionInfo {
@@ -309,31 +456,6 @@ private:
TDuration TimeToSleepBetweenGetOperationRequests_;
std::thread GetOperationStatusesThread_;
std::atomic<bool> StopFmrGateway_;
-
- TString GenerateId() {
- return GetGuidAsString(RandomProvider_->GenGuid());
- }
-
- template <std::derived_from<NCommon::TOperationResult> T>
- void SendOperationCompletionSignal(TPromise<T> promise, bool completedSuccessfully = false, const std::vector<TFmrError>& errorMessages = {}) {
- YQL_ENSURE(!promise.HasValue());
- T commonOperationResult{};
- if (completedSuccessfully) {
- commonOperationResult.SetSuccess();
- } else if (!errorMessages.empty()) {
- auto exception = yexception() << "Operation failed with errors: " << JoinSeq(" ", errorMessages);
- commonOperationResult.SetException(exception);
- }
- promise.SetValue(commonOperationResult);
- }
-
- TString GetUsername(const TString& sessionId) {
- with_lock(SessionStates_->Mutex) {
- YQL_ENSURE(SessionStates_->Sessions.contains(sessionId));
- auto& session = SessionStates_->Sessions[sessionId];
- return session.UserName;
- }
- }
};
} // namespace