summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcdzyura171 <[email protected]>2025-04-09 23:26:09 +0300
committercdzyura171 <[email protected]>2025-04-09 23:44:51 +0300
commit31b03d8d9aa830228c878c46c2f8e1f64f2cc872 (patch)
tree6f1e72ee7319ccac0417cf79af2c2efa882da74c
parentb8acce08462fa7c4482205ca9bf0f2da7cf1cd36 (diff)
Add fmrrun, create file yt service, support multiple cluster connections
Add fmrrun, create file yt service and support multiple cluster conn, fix issues commit_hash:d06778387ceec03820347f73b05a0281033804ed
-rw-r--r--yql/tools/yqlrun/lib/yqlrun_lib.cpp7
-rw-r--r--yql/tools/yqlrun/lib/yqlrun_lib.h3
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp45
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp12
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h2
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make21
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp37
-rw-r--r--yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h19
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp59
-rw-r--r--yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp38
-rw-r--r--yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h6
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto2
-rw-r--r--yt/yql/providers/yt/fmr/proto/request_options.proto3
-rw-r--r--yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp18
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp9
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h10
-rw-r--r--yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h2
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make14
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp32
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/ya.make19
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp68
-rw-r--r--yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h7
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file.cpp3
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp22
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h4
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp172
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp10
-rw-r--r--yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp8
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_gateway.h16
-rw-r--r--yt/yql/tools/ytrun/lib/ya.make8
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp44
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.h2
39 files changed, 507 insertions, 232 deletions
diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.cpp b/yql/tools/yqlrun/lib/yqlrun_lib.cpp
index 0eba257d9a2..aad7657ea4a 100644
--- a/yql/tools/yqlrun/lib/yqlrun_lib.cpp
+++ b/yql/tools/yqlrun/lib/yqlrun_lib.cpp
@@ -107,14 +107,17 @@ TYqlRunTool::TYqlRunTool()
AddClusterMapping(TString{"plato"}, TString{YtProviderName});
AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
- auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
- auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices);
+ auto ytNativeGateway = CreateYtGateway();
auto optimizerFactory = CreateCboFactory();
return GetYtNativeDataProviderInitializer(ytNativeGateway, optimizerFactory, {});
});
SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
+}
+IYtGateway::TPtr TYqlRunTool::CreateYtGateway() {
+ auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
+ return CreateYtFileGateway(yqlNativeServices);
}
IOptimizerFactory::TPtr TYqlRunTool::CreateCboFactory() {
diff --git a/yql/tools/yqlrun/lib/yqlrun_lib.h b/yql/tools/yqlrun/lib/yqlrun_lib.h
index be33617c62e..ab358e28dbb 100644
--- a/yql/tools/yqlrun/lib/yqlrun_lib.h
+++ b/yql/tools/yqlrun/lib/yqlrun_lib.h
@@ -1,5 +1,6 @@
#pragma once
+#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
#include <yql/essentials/core/cbo/cbo_optimizer_new.h>
#include <yql/essentials/tools/yql_facade_run/yql_facade_run.h>
@@ -15,6 +16,8 @@ public:
protected:
virtual IOptimizerFactory::TPtr CreateCboFactory();
+ virtual IYtGateway::TPtr CreateYtGateway();
+
private:
THashMap<TString, TString> TablesMapping_;
THashMap<TString, TString> TablesDirMapping_;
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index 2841e738332..fef5f4b110b 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -54,7 +54,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do
.TaskType = taskType,
.OperationParams = operationParams,
.IdempotencyKey = "IdempotencyKey",
- .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}
+ .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
};
}
@@ -67,7 +67,7 @@ std::vector<TStartOperationRequest> CreateSeveralOperationRequests(
.TaskType = taskType,
.OperationParams = operationParams,
.IdempotencyKey = "IdempotencyKey_" + ToString(i),
- .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn", .Token = "token"}
+ .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
};
}
return startOperationRequests;
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
index 5a16fbe0ce3..32ffb2ed14f 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
@@ -8,6 +8,7 @@ PEERDIR(
library/cpp/random_provider
library/cpp/threading/future
library/cpp/yson/node
+ yt/cpp/mapreduce/common
yt/yql/providers/yt/fmr/coordinator/interface
yql/essentials/utils/log
yql/essentials/utils
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index 1831e7f1acc..2d9a68042e5 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -1,4 +1,5 @@
#include <thread>
+#include <yt/cpp/mapreduce/common/helpers.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>
#include "yql_yt_coordinator_impl.h"
@@ -37,7 +38,8 @@ public:
RandomProvider_(settings.RandomProvider),
StopCoordinator_(false),
TimeToSleepBetweenClearKeyRequests_(settings.TimeToSleepBetweenClearKeyRequests),
- IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime)
+ IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime),
+ DefaultFmrOperationSpec_(settings.DefaultFmrOperationSpec)
{
StartClearingIdempotencyKeys();
}
@@ -62,15 +64,9 @@ public:
}
TString taskId = GenerateId();
-
auto taskParams = MakeDefaultTaskParamsFromOperation(request.OperationParams);
- TMaybe<NYT::TNode> jobSettings = Nothing();
- auto fmrOperationSpec = request.FmrOperationSpec;
- if (fmrOperationSpec && fmrOperationSpec->IsMap() && fmrOperationSpec->HasKey("job_settings")) {
- jobSettings = (*fmrOperationSpec)["job_settings"];
- }
- TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnection, jobSettings);
+ TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, taskParams, request.SessionId, request.ClusterConnections, GetJobSettings(request.FmrOperationSpec));
Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId};
Operations_[operationId] = {.TaskIds = {taskId}, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId};
@@ -138,6 +134,8 @@ public:
for (auto& requestTaskState: request.TaskStates) {
auto taskId = requestTaskState->TaskId;
+ auto operationId = Tasks_[taskId].OperationId;
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId);
YQL_ENSURE(Tasks_.contains(taskId));
auto taskStatus = requestTaskState->TaskStatus;
YQL_ENSURE(taskStatus != ETaskStatus::Accepted);
@@ -156,6 +154,10 @@ public:
tableStats.Rows >= curTableStats.Stats.Rows
);
YQL_ENSURE(fmrTableId.PartId == curTableStats.PartId);
+ if (taskStatus == ETaskStatus::Completed) {
+ YQL_CLOG(DEBUG, FastMapReduce) << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats;
+ Cerr << "Current statistic from table with id" << fmrTableId.TableId << "_" << fmrTableId.PartId << ": " << tableStats;
+ }
}
FmrTableStatistics_[fmrTableId.TableId] = TCoordinatorFmrTableStats{
.Stats = tableStats,
@@ -171,10 +173,6 @@ public:
tasksToRun.emplace_back(taskToRunInfo.second.Task);
}
}
-
- for (auto& taskId: TaskToDeleteIds_) {
- SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed);
- }
return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_});
}
@@ -323,6 +321,28 @@ private:
}
}
+ TMaybe<NYT::TNode> GetJobSettings(const TMaybe<NYT::TNode>& currentFmrOperationSpec) {
+ // TODO - check this works
+ TMaybe<NYT::TNode> defaultJobSettings = Nothing(), currentJobSettings = Nothing();
+ if (currentFmrOperationSpec && currentFmrOperationSpec->HasKey("job_settings")) {
+ currentJobSettings = (*currentFmrOperationSpec)["job_settings"];
+ }
+ if (DefaultFmrOperationSpec_ && DefaultFmrOperationSpec_->HasKey("job_settings")) {
+ defaultJobSettings = (*DefaultFmrOperationSpec_)["job_settings"];
+ }
+ if (defaultJobSettings && !currentJobSettings) {
+ return defaultJobSettings;
+ }
+ if (currentJobSettings && !defaultJobSettings) {
+ return currentJobSettings;
+ }
+ if (!currentJobSettings && !defaultJobSettings) {
+ return Nothing();
+ }
+ NYT::MergeNodes(*currentJobSettings, *defaultJobSettings);
+ return currentJobSettings;
+ }
+
std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it
std::unordered_set<TString> TaskToDeleteIds_; // TaskIds we want to pass to worker for deletion
std::unordered_map<TString, TOperationInfo> Operations_; // OperationId -> current info about it
@@ -337,6 +357,7 @@ private:
TDuration TimeToSleepBetweenClearKeyRequests_;
TDuration IdempotencyKeyStoreTime_;
std::unordered_map<TString, TCoordinatorFmrTableStats> FmrTableStatistics_; // TableId -> Statistics
+ TMaybe<NYT::TNode> DefaultFmrOperationSpec_;
};
} // namespace
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
index b3c06dbe5c3..3fea218981f 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -14,6 +14,7 @@ struct TFmrCoordinatorSettings {
TIntrusivePtr<IRandomProvider> RandomProvider;
TDuration IdempotencyKeyStoreTime = TDuration::Seconds(10);
TDuration TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1);
+ TMaybe<NYT::TNode> DefaultFmrOperationSpec = Nothing();
};
IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)});
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
index 8c244c3f1aa..80367f0bce2 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.cpp
@@ -70,8 +70,10 @@ NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperatio
protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey);
}
protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
- auto protoClusterConnection = ClusterConnectionToProto(startOperationRequest.ClusterConnection);
- protoStartOperationRequest.MutableClusterConnection()->Swap(&protoClusterConnection);
+ auto clusterConnections = *protoStartOperationRequest.MutableClusterConnections();
+ for (auto& [tableName, conn]: startOperationRequest.ClusterConnections) {
+ clusterConnections[tableName] = ClusterConnectionToProto(conn);
+ }
if (startOperationRequest.FmrOperationSpec) {
protoStartOperationRequest.SetFmrOperationSpec(NYT::NodeToYsonString(*startOperationRequest.FmrOperationSpec));
}
@@ -87,7 +89,11 @@ TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperat
startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey();
}
startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries();
- startOperationRequest.ClusterConnection = ClusterConnectionFromProto(protoStartOperationRequest.GetClusterConnection());
+ std::unordered_map<TString, TClusterConnection> startOperationRequestClusterConnections;
+ for (auto& [tableName, conn]: protoStartOperationRequest.GetClusterConnections()) {
+ startOperationRequestClusterConnections[tableName] = ClusterConnectionFromProto(conn);
+ }
+ startOperationRequest.ClusterConnections = startOperationRequestClusterConnections;
if (protoStartOperationRequest.HasFmrOperationSpec()) {
startOperationRequest.FmrOperationSpec = NYT::NodeFromYsonString(protoStartOperationRequest.GetFmrOperationSpec());
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
index 15ecd6c97a5..a045aa5bb01 100644
--- a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -25,7 +25,7 @@ struct TStartOperationRequest {
TString SessionId;
TMaybe<TString> IdempotencyKey = Nothing();
ui32 NumRetries = 1; // Not supported yet
- TClusterConnection ClusterConnection = {}; // TODO - change to map
+ std::unordered_map<TString, TClusterConnection> ClusterConnections = {};
TMaybe<NYT::TNode> FmrOperationSpec = Nothing();
};
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make
new file mode 100644
index 00000000000..b0c5e5cdf4f
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/ya.make
@@ -0,0 +1,21 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_fmr_initializer.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/gateway/fmr
+ yt/yql/providers/yt/fmr/coordinator/client
+ yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/job/impl
+ yt/yql/providers/yt/fmr/job_factory/impl
+ yt/yql/providers/yt/fmr/table_data_service/local
+ yt/yql/providers/yt/fmr/worker/impl
+ yt/yql/providers/yt/fmr/yt_service/file
+ yt/yql/providers/yt/fmr/yt_service/impl
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
new file mode 100644
index 00000000000..3bc8e074a3a
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.cpp
@@ -0,0 +1,37 @@
+#include "yql_yt_fmr_initializer.h"
+
+namespace NYql::NFmr {
+
+std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker, const TString& coordinatorServerUrl, bool isFileGateway) {
+ auto coordinator = MakeFmrCoordinator();
+ if (!coordinatorServerUrl.empty()) {
+ TFmrCoordinatorClientSettings coordinatorClientSettings;
+ THttpURL parsedUrl;
+ if (parsedUrl.Parse(coordinatorServerUrl) != THttpURL::ParsedOK) {
+ ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters";
+ }
+ coordinatorClientSettings.Port = parsedUrl.GetPort();
+ coordinatorClientSettings.Host = parsedUrl.GetHost();
+ coordinator = MakeFmrCoordinatorClient(coordinatorClientSettings);
+ }
+
+ IFmrWorker::TPtr worker = nullptr;
+ if (!disableLocalFmrWorker) {
+ auto tableDataService = MakeLocalTableDataService(TLocalTableDataServiceSettings(3));
+ auto fmrYtSerivce = isFileGateway ? MakeFileYtSerivce() : MakeFmrYtSerivce();
+
+ auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable {
+ return RunJob(task, tableDataService, fmrYtSerivce, cancelFlag);
+ };
+
+ NFmr::TFmrJobFactorySettings settings{.Function=func};
+ auto jobFactory = MakeFmrJobFactory(settings);
+ NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(),
+ .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
+ worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+ worker->Start();
+ }
+ return std::pair<IYtGateway::TPtr, IFmrWorker::TPtr>{CreateYtFmrGateway(slave, coordinator), std::move(worker)};
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
new file mode 100644
index 00000000000..a87522fbd71
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
+#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+#include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h>
+#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h>
+#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h>
+
+namespace NYql::NFmr {
+
+constexpr TStringBuf FastMapReduceGatewayName = "fmr";
+
+std::pair<IYtGateway::TPtr, IFmrWorker::TPtr> InitializeFmrGateway(IYtGateway::TPtr slave, bool disableLocalFmrWorker = false, const TString& coordinatorServerUrl = TString(), bool isFileGateway = false);
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
index 7b17dee907b..482225a0fce 100644
--- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp
@@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
TDownloadTaskParams params = TDownloadTaskParams(input, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- auto res = job->Download(params);
+ auto res = job->Download(params, {{"test_cluster.test_path", TClusterConnection()}});
auto err = std::get_if<TError>(&res);
auto statistics = std::get_if<TStatistics>(&res);
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
- auto res = job->Upload(params);
+ auto res = job->Upload(params, {{"test_cluster.test_path", TClusterConnection()}});
auto err = std::get_if<TError>(&res);
@@ -85,7 +85,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
}
Y_UNIT_TEST(MergeMixedTables) {
-
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
std::vector<TTableRange> ranges = {{"test_part_id"}};
@@ -111,7 +110,7 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1));
tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3));
- auto res = job->Merge(params);
+ auto res = job->Merge(params, {{"test_cluster.test_path", TClusterConnection()}});
auto err = std::get_if<TError>(&res);
UNIT_ASSERT_C(!err, err->ErrorMessage);
@@ -124,7 +123,6 @@ Y_UNIT_TEST_SUITE(FmrJobTests) {
Y_UNIT_TEST_SUITE(TaskRunTests) {
Y_UNIT_TEST(RunDownloadTask) {
-
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
TYtTableRef input = TYtTableRef("test_cluster", "test_path");
std::unordered_map<TYtTableRef, TString> inputTables{{input, TableContent_1}};
@@ -135,7 +133,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id", "test_part_id");
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
TDownloadTaskParams params = TDownloadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id");
+ TTask::TPtr task = MakeTask(ETaskType::Download, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
UNIT_ASSERT_EQUAL(status, ETaskStatus::Completed);
@@ -156,7 +154,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection());
+ TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
auto key = GetTableDataServiceKey(input.TableId, "test_part_id", 0);
tableDataServicePtr->Put(key, GetBinaryYson(TableContent_1));
ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
@@ -167,7 +165,6 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
}
Y_UNIT_TEST(RunUploadTaskWithNoTable) {
-
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
std::unordered_map<TYtTableRef, TString> inputTables, outputTables;
NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
@@ -178,15 +175,17 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
TYtTableRef output = TYtTableRef("test_cluster", "test_path");
TUploadTaskParams params = TUploadTaskParams(input, output);
- TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", TClusterConnection());
+ TTask::TPtr task = MakeTask(ETaskType::Upload, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
// No tables in tableDataService
- ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
- UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed);
+ try {
+ RunJob(task, tableDataServicePtr, ytService, cancelFlag);
+ } catch(...) {
+ UNIT_ASSERT(CurrentExceptionMessage().Contains("No data for chunk:test_table_id:test_part_id"));
+ }
}
Y_UNIT_TEST(RunMergeTask) {
-
ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
std::vector<TTableRange> ranges = {{"test_part_id"}};
TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
@@ -205,7 +204,7 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
auto params = TMergeTaskParams(inputs, output);
auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
- TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection());
+ TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", {{"test_cluster.test_path", TClusterConnection()}});
auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
@@ -219,40 +218,6 @@ Y_UNIT_TEST_SUITE(TaskRunTests) {
UNIT_ASSERT_C(resultTableContent, "Result table content is empty");
UNIT_ASSERT_NO_DIFF(GetTextYson(*resultTableContent), TableContent_1 + TableContent_2 + TableContent_3);
}
-
- Y_UNIT_TEST(RunMergeTaskWithNoTable) {
-
- ITableDataService::TPtr tableDataServicePtr = MakeLocalTableDataService(TLocalTableDataServiceSettings(1));
- std::unordered_map<TYtTableRef, TString> inputTables, outputTables;
- NYql::NFmr::IYtService::TPtr ytService = MakeMockYtService(inputTables, outputTables);
- std::shared_ptr<std::atomic<bool>> cancelFlag = std::make_shared<std::atomic<bool>>(false);
-
- std::vector<TTableRange> ranges = {{"test_part_id"}};
- TFmrTableInputRef input_1 = TFmrTableInputRef{.TableId = "test_table_id_1", .TableRanges = ranges};
- TYtTableRef input_2 = TYtTableRef("test_path", "test_cluster");
- TFmrTableInputRef input_3 = TFmrTableInputRef{.TableId = "test_table_id_3", .TableRanges = ranges};
- TTaskTableRef input_table_ref_1 = {input_1};
- TTaskTableRef input_table_ref_2 = {input_2};
- TTaskTableRef input_table_ref_3 = {input_3};
- TFmrTableOutputRef output = TFmrTableOutputRef("test_table_id_output", "test_part_id");
- std::vector<TTaskTableRef> inputs = {input_table_ref_1, input_table_ref_2, input_table_ref_3};
- auto params = TMergeTaskParams(inputs, output);
- auto tableDataServiceExpectedOutputKey = GetTableDataServiceKey(output.TableId, output.PartId, 0);
-
- TTask::TPtr task = MakeTask(ETaskType::Merge, "test_task_id", params, "test_session_id", TClusterConnection());
-
- auto key_1 = GetTableDataServiceKey(input_1.TableId, "test_part_id", 0);
- auto key_3 = GetTableDataServiceKey(input_3.TableId, "test_part_id", 0);
- tableDataServicePtr->Put(key_1, GetBinaryYson(TableContent_1));
- // No table (input_2) in Yt
- tableDataServicePtr->Put(key_3, GetBinaryYson(TableContent_3));
-
- ETaskStatus status = RunJob(task, tableDataServicePtr, ytService, cancelFlag).TaskStatus;
-
- UNIT_ASSERT_EQUAL(status, ETaskStatus::Failed);
- auto resultTableContent = tableDataServicePtr->Get(tableDataServiceExpectedOutputKey).GetValueSync();
- UNIT_ASSERT(!resultTableContent);
- }
}
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
index c8da1df4801..c1721456cdc 100644
--- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp
@@ -21,7 +21,7 @@ public:
{
}
- virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection) override {
+ virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override {
try {
const auto ytTable = params.Input;
const auto cluster = params.Input.Cluster;
@@ -31,8 +31,8 @@ public:
const auto partId = output.PartId;
YQL_CLOG(DEBUG, FastMapReduce) << "Downloading " << cluster << '.' << path;
-
- auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
+ YQL_ENSURE(clusterConnections.size() == 1);
+ auto ytTableReader = YtService_->MakeReader(ytTable, clusterConnections.begin()->second); // TODO - pass YtReader settings from Gateway
auto tableDataServiceWriter = TFmrTableDataServiceWriter(tableId, partId, TableDataService_, GetFmrTableDataServiceWriterSettings());
ParseRecords(*ytTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
@@ -46,7 +46,7 @@ public:
}
}
- virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection) override {
+ virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override {
try {
const auto ytTable = params.Output;
const auto cluster = params.Output.Cluster;
@@ -57,7 +57,8 @@ public:
YQL_CLOG(DEBUG, FastMapReduce) << "Uploading " << cluster << '.' << path;
auto tableDataServiceReader = TFmrTableDataServiceReader(tableId, tableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings());
- auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnection);
+ YQL_ENSURE(clusterConnections.size() == 1);
+ auto ytTableWriter = YtService_->MakeWriter(ytTable, clusterConnections.begin()->second);
ParseRecords(tableDataServiceReader, *ytTableWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
ytTableWriter->Flush();
@@ -67,8 +68,7 @@ public:
}
}
- virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection) override {
- // TODO - unordered_map<ClusterConnection>
+ virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections) override {
// расширить таск парамс. добавить туда мету
try {
const auto inputs = params.Input;
@@ -81,7 +81,7 @@ public:
if (CancelFlag_->load()) {
return TError("Canceled");
}
- auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnection);
+ auto inputTableReader = GetTableInputStream(inputTableRef, clusterConnections);
ParseRecords(*inputTableReader, tableDataServiceWriter, GetParseRecordSettings().BlockCount, GetParseRecordSettings().BlockSize);
}
tableDataServiceWriter.Flush();
@@ -93,10 +93,12 @@ public:
}
private:
- NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const TClusterConnection& clusterConnection) {
+ NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TString, TClusterConnection>& clusterConnections) const {
auto ytTable = std::get_if<TYtTableRef>(&tableRef);
auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef);
if (ytTable) {
+ TString tableId = ytTable->Cluster + "." + ytTable->Path;
+ auto clusterConnection = clusterConnections.at(tableId);
return YtService_->MakeReader(*ytTable, clusterConnection); // TODO - pass YtReader settings from Gateway
} else if (fmrTable) {
return MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, GetFmrTableDataServiceReaderSettings());
@@ -105,15 +107,15 @@ private:
}
}
- TParseRecordSettings GetParseRecordSettings() {
+ TParseRecordSettings GetParseRecordSettings() const {
return Settings_ ? Settings_->ParseRecordSettings : TParseRecordSettings();
}
- TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() {
+ TFmrTableDataServiceReaderSettings GetFmrTableDataServiceReaderSettings() const {
return Settings_ ? Settings_->FmrTableDataServiceReaderSettings : TFmrTableDataServiceReaderSettings();
}
- TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() {
+ TFmrTableDataServiceWriterSettings GetFmrTableDataServiceWriterSettings() const {
return Settings_ ? Settings_->FmrTableDataServiceWriterSettings : TFmrTableDataServiceWriterSettings();
}
@@ -147,27 +149,23 @@ TJobResult RunJob(
using T = std::decay_t<decltype(taskParams)>;
if constexpr (std::is_same_v<T, TUploadTaskParams>) {
- return job->Upload(taskParams, task->ClusterConnection);
+ return job->Upload(taskParams, task->ClusterConnections);
} else if constexpr (std::is_same_v<T, TDownloadTaskParams>) {
- return job->Download(taskParams, task->ClusterConnection);
+ return job->Download(taskParams, task->ClusterConnections);
} else if constexpr (std::is_same_v<T, TMergeTaskParams>) {
- return job->Merge(taskParams, task->ClusterConnection);
+ return job->Merge(taskParams, task->ClusterConnections);
} else {
throw std::runtime_error{"Unsupported task type"};
}
};
std::variant<TError, TStatistics> taskResult = std::visit(processTask, task->TaskParams);
-
auto err = std::get_if<TError>(&taskResult);
-
if (err) {
- YQL_CLOG(ERROR, FastMapReduce) << "Task failed: " << err->ErrorMessage;
- return {ETaskStatus::Failed, TStatistics()};
+ ythrow yexception() << "Job failed with error: " << err->ErrorMessage;
}
auto statistics = std::get_if<TStatistics>(&taskResult);
-
return {ETaskStatus::Completed, *statistics};
};
diff --git a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
index 4b9c13b04bf..022a6256db0 100644
--- a/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
+++ b/yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h
@@ -10,11 +10,11 @@ public:
virtual ~IFmrJob() = default;
- virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+ virtual std::variant<TError, TStatistics> Download(const TDownloadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0;
- virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+ virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0;
- virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const TClusterConnection& clusterConnection = TClusterConnection()) = 0;
+ virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}) = 0;
};
} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
index a104cf26eac..34e16ed26b5 100644
--- a/yt/yql/providers/yt/fmr/proto/coordinator.proto
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -21,7 +21,7 @@ message TStartOperationRequest {
string SessionId = 3;
optional string IdempotencyKey = 4;
uint32 NumRetries = 5;
- TClusterConnection ClusterConnection = 6;
+ map<string, TClusterConnection> ClusterConnections = 6;
optional string FmrOperationSpec = 7;
}
diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto
index 4bd35eacaa4..62c917b4e50 100644
--- a/yt/yql/providers/yt/fmr/proto/request_options.proto
+++ b/yt/yql/providers/yt/fmr/proto/request_options.proto
@@ -46,6 +46,7 @@ message TFmrError {
message TYtTableRef {
string Path = 1;
string Cluster = 2;
+ optional string FilePath = 3;
}
message TFmrTableRef {
@@ -155,7 +156,7 @@ message TTask {
TTaskParams TaskParams = 3;
string SessionId = 4;
optional uint32 NumRetries = 5;
- TClusterConnection ClusterConnection = 6;
+ map<string, TClusterConnection> ClusterConnections = 6;
optional string JobSettings = 7;
}
diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
index 0860c009962..20dfb5362a9 100644
--- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp
@@ -39,6 +39,9 @@ NProto::TYtTableRef YtTableRefToProto(const TYtTableRef& ytTableRef) {
NProto::TYtTableRef protoYtTableRef;
protoYtTableRef.SetPath(ytTableRef.Path);
protoYtTableRef.SetCluster(ytTableRef.Cluster);
+ if (ytTableRef.FilePath) {
+ protoYtTableRef.SetFilePath(*ytTableRef.FilePath);
+ }
return protoYtTableRef;
}
@@ -46,6 +49,9 @@ TYtTableRef YtTableRefFromProto(const NProto::TYtTableRef protoYtTableRef) {
TYtTableRef ytTableRef;
ytTableRef.Path = protoYtTableRef.GetPath();
ytTableRef.Cluster = protoYtTableRef.GetCluster();
+ if (protoYtTableRef.HasFilePath()) {
+ ytTableRef.FilePath = protoYtTableRef.GetFilePath();
+ }
return ytTableRef;
}
@@ -398,8 +404,10 @@ NProto::TTask TaskToProto(const TTask& task) {
protoTask.MutableTaskParams()->Swap(&taskParams);
protoTask.SetSessionId(task.SessionId);
protoTask.SetNumRetries(task.NumRetries);
- auto clusterConnection = ClusterConnectionToProto(task.ClusterConnection);
- protoTask.MutableClusterConnection()->Swap(&clusterConnection);
+ auto clusterConnections = *protoTask.MutableClusterConnections();
+ for (auto& [tableName, conn]: task.ClusterConnections) {
+ clusterConnections[tableName] = ClusterConnectionToProto(conn);
+ }
if (task.JobSettings) {
protoTask.SetJobSettings(NYT::NodeToYsonString(*task.JobSettings));
}
@@ -413,7 +421,11 @@ TTask TaskFromProto(const NProto::TTask& protoTask) {
task.TaskParams = TaskParamsFromProto(protoTask.GetTaskParams());
task.SessionId = protoTask.GetSessionId();
task.NumRetries = protoTask.GetNumRetries();
- task.ClusterConnection = ClusterConnectionFromProto(protoTask.GetClusterConnection());
+ std::unordered_map<TString, TClusterConnection> taskClusterConnections;
+ for (auto& [tableName, conn]: protoTask.GetClusterConnections()) {
+ taskClusterConnections[tableName] = ClusterConnectionFromProto(conn);
+ }
+ task.ClusterConnections = taskClusterConnections;
if (protoTask.HasJobSettings()) {
task.JobSettings = NYT::NodeFromYsonString(protoTask.GetJobSettings());
}
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
index 0dc3650855c..8a0bcde5751 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -2,8 +2,8 @@
namespace NYql::NFmr {
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode>& jobSettings) {
- return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnection, jobSettings);
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode>& jobSettings) {
+ return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId, clusterConnections, jobSettings);
}
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage, const TStatistics& stats) {
@@ -31,3 +31,8 @@ template<>
void Out<NYql::NFmr::TFmrChunkMeta>(IOutputStream& out, const NYql::NFmr::TFmrChunkMeta& meta) {
out << meta.ToString();
}
+
+template<>
+void Out<NYql::NFmr::TTableStats>(IOutputStream& out, const NYql::NFmr::TTableStats& tableStats) {
+ out << tableStats.Chunks << " chunks, " << tableStats.Rows << " rows, " << tableStats.DataWeight << " data weight";
+}
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index de18d91fa09..bab3df01aae 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -57,6 +57,8 @@ struct TError {
struct TYtTableRef {
TString Path;
TString Cluster;
+ TMaybe<TString> FilePath = Nothing();
+
bool operator == (const TYtTableRef&) const = default;
};
@@ -168,8 +170,8 @@ struct TClusterConnection {
struct TTask: public TThrRefBase {
TTask() = default;
- TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1)
- : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnection(clusterConnection), JobSettings(jobSettings), NumRetries(numRetries)
+ TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections, const TMaybe<NYT::TNode> & jobSettings = Nothing(), ui32 numRetries = 1)
+ : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), ClusterConnections(clusterConnections), JobSettings(jobSettings), NumRetries(numRetries)
{
}
@@ -177,7 +179,7 @@ struct TTask: public TThrRefBase {
TString TaskId;
TTaskParams TaskParams = {};
TString SessionId;
- TClusterConnection ClusterConnection = {};
+ std::unordered_map<TString, TClusterConnection> ClusterConnections = {};
TMaybe<NYT::TNode> JobSettings = {};
ui32 NumRetries; // Not supported yet
@@ -199,7 +201,7 @@ struct TTaskState: public TThrRefBase {
using TPtr = TIntrusivePtr<TTaskState>;
};
-TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const TClusterConnection& clusterConnection = TClusterConnection{}, const TMaybe<NYT::TNode>& jobSettings = Nothing());
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, const std::unordered_map<TString, TClusterConnection>& clusterConnections = {}, const TMaybe<NYT::TNode>& jobSettings = Nothing());
TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing(), const TStatistics& stats = TStatistics());
diff --git a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
index bec70aa6ed4..85e9599f8f8 100644
--- a/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
+++ b/yt/yql/providers/yt/fmr/utils/ut/yql_yt_parse_records_ut.cpp
@@ -16,6 +16,7 @@ Y_UNIT_TEST_SUITE(ParseRecordTests) {
std::unordered_map<TYtTableRef, TString> outputTables;
auto ytService = MakeMockYtService(inputTables, outputTables);
+
auto reader = ytService->MakeReader(testYtTable, TClusterConnection());
auto writer = ytService->MakeWriter(testYtTable, TClusterConnection());
ParseRecords(*reader, *writer, 1, 10);
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index db1aa21c430..2007cec3011 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -20,7 +20,7 @@ TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Do
.TaskType = taskType,
.OperationParams = operationParams,
.IdempotencyKey = "IdempotencyKey",
- .ClusterConnection = TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}
+ .ClusterConnections = {{"Cluster.Path", TClusterConnection{.TransactionId = "transaction_id", .YtServerName = "hahn.yt.yandex.net", .Token = "token"}}}
};
}
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
index bff6ddff9e3..1e18f2aa5b2 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <library/cpp/random_provider/random_provider.h>
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make b/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make
new file mode 100644
index 00000000000..6f4ff700b0d
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/file/ut/ya.make
@@ -0,0 +1,14 @@
+UNITTEST()
+
+SRCS(
+ yql_yt_file_yt_service_ut.cpp
+)
+
+PEERDIR(
+ yt/yql/providers/yt/fmr/yt_service/file
+ yt/yql/providers/yt/gateway/file
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp b/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp
new file mode 100644
index 00000000000..acc5867c454
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/file/ut/yql_yt_file_yt_service_ut.cpp
@@ -0,0 +1,32 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/stream/file.h>
+#include <yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h> // TODO - REMOVE
+
+namespace NYql::NFmr {
+
+Y_UNIT_TEST_SUITE(FileYtServiceTest) {
+ Y_UNIT_TEST(CheckReaderAndWriter) {
+ TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n"
+ "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n";
+
+ TTempFileHandle file{};
+ TYtTableRef ytTable{.Path = "test_path", .Cluster = "hahn", .FilePath = file.Name()};
+
+ auto fileService = MakeFileYtSerivce();
+ auto writer = fileService->MakeWriter(ytTable, TClusterConnection());
+ writer->Write(inputYsonContent.data(), inputYsonContent.size());
+ writer->Flush();
+
+ TFileInput input(file.Name());
+
+ auto reader = fileService->MakeReader(ytTable, TClusterConnection());
+ TStringStream binaryYsonStream;
+ TStringStream textYsonStream;
+ binaryYsonStream << reader->ReadAll();
+ NYson::ReformatYsonStream(&binaryYsonStream, &textYsonStream, NYson::EYsonFormat::Text, ::NYson::EYsonType::ListFragment);
+ UNIT_ASSERT(textYsonStream.ReadAll().Contains(inputYsonContent));
+ }
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/ya.make b/yt/yql/providers/yt/fmr/yt_service/file/ya.make
new file mode 100644
index 00000000000..dcd0969e3e6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/file/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_file_yt_service.cpp
+)
+
+PEERDIR(
+ library/cpp/yson
+ yt/yql/providers/yt/gateway/file
+ yt/yql/providers/yt/fmr/yt_service/interface
+ yt/yql/providers/yt/lib/yson_helpers
+ yql/essentials/utils
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp
new file mode 100644
index 00000000000..1c33ed430e9
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.cpp
@@ -0,0 +1,68 @@
+#include "yql_yt_file_yt_service.h"
+#include <library/cpp/yson/parser.h>
+#include <util/stream/file.h>
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h>
+#include <yt/yql/providers/yt/lib/yson_helpers/yson_helpers.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql::NFmr {
+
+namespace {
+
+class TFileYtTableWriter: public NYT::TRawTableWriter {
+ public:
+ TFileYtTableWriter(const TString& filePath): FilePath_(filePath) {}
+
+ void NotifyRowEnd() override {
+ }
+
+ private:
+ void DoWrite(const void* buf, size_t len) override {
+ Buffer_.Append(static_cast<const char*>(buf), len);
+ }
+
+ void DoFlush() override {
+ TMemoryInput input(Buffer_.data(), Buffer_.size());
+ TFileOutput outputFileStream(FilePath_);
+ TDoubleHighPrecisionYsonWriter writer(&outputFileStream, ::NYson::EYsonType::ListFragment);
+ NYson::TYsonParser parser(&writer, &input, ::NYson::EYsonType::ListFragment);
+ parser.Parse();
+ Buffer_.Clear();
+ }
+
+ TString FilePath_;
+ TBuffer Buffer_;
+ };
+
+
+class TFileYtService: public NYql::NFmr::IYtService {
+public:
+
+ NYT::TRawTableReaderPtr MakeReader(
+ const TYtTableRef& ytTable,
+ const TClusterConnection& /*clusterConnection*/,
+ const TYtReaderSettings& /*readerSettings*/
+ ) override {
+ YQL_ENSURE(ytTable.FilePath);
+ auto textYsonInputs = NFile::MakeTextYsonInputs({{*ytTable.FilePath, NFile::TColumnsInfo{}}}, false);
+ return textYsonInputs[0];
+ }
+
+ NYT::TRawTableWriterPtr MakeWriter(
+ const TYtTableRef& ytTable,
+ const TClusterConnection& /*clusterConnection*/,
+ const TYtWriterSettings& /*writerSettings*/
+ ) override {
+ YQL_ENSURE(ytTable.FilePath);
+ return MakeIntrusive<TFileYtTableWriter>(*ytTable.FilePath);
+ }
+
+};
+
+} // namespace
+
+IYtService::TPtr MakeFileYtSerivce() {
+ return MakeIntrusive<TFileYtService>();
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h
new file mode 100644
index 00000000000..ea5e8027b9a
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/yt_service/file/yql_yt_file_yt_service.h
@@ -0,0 +1,7 @@
+#include <yt/yql/providers/yt/fmr/yt_service/interface/yql_yt_yt_service.h>
+
+namespace NYql::NFmr {
+
+IYtService::TPtr MakeFileYtSerivce();
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
index 429880e66e1..a3caeb351c2 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp
@@ -1596,6 +1596,9 @@ private:
return TClusterConnectionResult();
}
+ TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) override {
+ return Services_->GetTablePath(options.Cluster(), options.Path(), options.IsTemp());
+ }
private:
TYtFileServices::TPtr Services_;
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp
index ec42b70bfdf..7a84a895fab 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.cpp
@@ -10,16 +10,18 @@ namespace NYql::NFile {
class TTextYsonInput: public NYT::TRawTableReader {
public:
- TTextYsonInput(const TString& file, const TColumnsInfo& columnsInfo) {
+ TTextYsonInput(const TString& file, const TColumnsInfo& columnsInfo, bool addRowIndex) {
TIFStream in(file);
TBinaryYsonWriter writer(&Input_, ::NYson::EYsonType::ListFragment);
- writer.OnBeginAttributes();
- writer.OnKeyedItem("row_index");
- writer.OnInt64Scalar(0);
- writer.OnEndAttributes();
- writer.OnEntity();
- writer.OnListItem();
+ if (addRowIndex) {
+ writer.OnBeginAttributes();
+ writer.OnKeyedItem("row_index");
+ writer.OnInt64Scalar(0);
+ writer.OnEndAttributes();
+ writer.OnEntity();
+ writer.OnListItem();
+ }
NYT::NYson::IYsonConsumer* consumer = &writer;
THolder<TColumnFilteringConsumer> filter;
if (columnsInfo.Columns || columnsInfo.RenameColumns) {
@@ -59,16 +61,16 @@ private:
TBufferStream Input_;
};
-TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files) {
+TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files, bool addRowIndex) {
TVector<NYT::TRawTableReaderPtr> rawReaders;
for (auto& file: files) {
if (!NFs::Exists(file.first)) {
rawReaders.emplace_back(nullptr);
continue;
}
- rawReaders.emplace_back(MakeIntrusive<TTextYsonInput>(file.first, file.second));
+ rawReaders.emplace_back(MakeIntrusive<TTextYsonInput>(file.first, file.second, addRowIndex));
}
return rawReaders;
}
-} //namespace NYql::NFile \ No newline at end of file
+} //namespace NYql::NFile
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h
index 85b903c84b0..a28606598d3 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_text_yson.h
@@ -9,6 +9,6 @@ struct TColumnsInfo {
TMaybe<NYT::TRichYPath::TRenameColumnsDescriptor> RenameColumns;
};
-TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files);
+TVector<NYT::TRawTableReaderPtr> MakeTextYsonInputs(const TVector<std::pair<TString, TColumnsInfo>>& files, bool addRowIndex = true);
-}//namespace NYql::NFile \ No newline at end of file
+} // namespace NYql::NFile
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index dd5160d07c2..5860a65366d 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -23,14 +23,14 @@ using namespace NYql::NNodes;
namespace NYql::NFmr {
-namespace {
-
enum class ETablePresenceStatus {
OnlyInYt,
OnlyInFmr,
Both
};
+namespace {
+
struct TFmrOperationResult: public NCommon::TOperationResult {};
class TFmrYtGateway final: public TYtForwardingGatewayBase {
@@ -109,7 +109,7 @@ public:
if (auto op = opBase.Maybe<TYtMerge>()) {
auto ytMerge = op.Cast();
- std::vector<TYtTableRef> inputTables = GetMergeInputTables(ytMerge);
+ auto inputTables = GetMergeInputTables(ytMerge);
TYtTableRef outputTable = GetMergeOutputTable(ytMerge);
auto future = DoMerge(inputTables, outputTable, std::move(options));
return future.Apply([this, pos = nodePos, outputTable = std::move(outputTable), options = std::move(options)] (const TFuture<TFmrOperationResult>& f) {
@@ -117,8 +117,7 @@ public:
f.GetValue(); // rethrow error if any
TString sessionId = options.SessionId();
auto config = options.Config();
- TString transformedOutputTableId = GetTransformedPath(outputTable.Path, sessionId, config);
- TString fmrOutputTableId = outputTable.Cluster + "." + transformedOutputTableId;
+ TString fmrOutputTableId = outputTable.Cluster + "." + outputTable.Path;
SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr);
TRunResult result;
result.OutTableStats.emplace_back(outputTable.Path, MakeIntrusive<TYtTableStatInfo>()); // TODO - add statistics?
@@ -145,19 +144,32 @@ public:
auto config = options.Config();
std::vector<TFuture<TFmrOperationResult>> uploadFmrTablesToYtFutures;
+ auto outputPath = publish.Publish().Name().StringValue();
+
+ bool isAnonymous = NYql::HasSetting(publish.Publish().Settings().Ref(), EYtSettingType::Anonymous);
+ std::vector<TString> currentAnonymousTableAliases;
for (auto out: publish.Input()) {
- auto outTableWithCluster = GetOutTableWithCluster(out);
+ auto inputCluster = GetOutTableWithCluster(out).second;
auto outTable = GetOutTable(out).Cast<TYtOutTable>();
TStringBuf inputPath = outTable.Name().Value();
- TString transformedInputPath = GetTransformedPath(ToString(inputPath), sessionId, config);
+ if (isAnonymous) {
+ currentAnonymousTableAliases.emplace_back(inputCluster + "." + inputPath);
+ }
auto outputBase = out.Operation().Cast<TYtOutputOpBase>().Ptr();
+ uploadFmrTablesToYtFutures.emplace_back(DoUpload(inputCluster, TString(inputPath), sessionId, config, outputBase, ctx));
+ }
+
+ if (isAnonymous) {
+ YQL_CLOG(DEBUG, FastMapReduce) << "Table " << outputPath << " is anonymous, not uploading from fmr to yt";
+ TString fmrOutputTableId = cluster + "." + outputPath;
+ SetTablePresenceStatus(fmrOutputTableId, sessionId, ETablePresenceStatus::OnlyInFmr);
- TFmrTableRef fmrTableRef = TFmrTableRef{outTableWithCluster.second + "." + transformedInputPath};
- uploadFmrTablesToYtFutures.emplace_back(DoUpload(fmrTableRef, sessionId, config, outputBase, ctx));
+ // TODO - figure out what to do here in case of multiple inputs
+ SetFmrIdAlias(fmrOutputTableId, currentAnonymousTableAliases[0], sessionId);
+ return Slave_->Publish(node, ctx, std::move(options));
}
- auto outputPath = publish.Publish().Name().StringValue();
auto idempotencyKey = GenerateId();
return WaitExceptionOrAll(uploadFmrTablesToYtFutures).Apply([&, pos = nodePos, curNode = std::move(node), options = std::move(options)] (const TFuture<void>& f) mutable {
@@ -170,10 +182,6 @@ public:
});
}
- TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override {
- return Slave_->GetClusterConnection(std::move(options));
- }
-
void OpenSession(TOpenSessionOptions&& options) final {
TString sessionId = options.SessionId();
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
@@ -230,23 +238,34 @@ private:
return GetGuidAsString(RandomProvider_->GenGuid());
}
- TString GetUsername(const TString& sessionId) {
+ TString GetRealTablePath(const TString& sessionId, const TString& cluster, const TString& path, TYtSettings::TConstPtr& config) {
+ auto richPath = Slave_->GetWriteTable(sessionId, cluster, path, GetTablesTmpFolder(*config));
+ return richPath.Path_;
+ }
+
+ void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) {
with_lock(SessionStates_->Mutex) {
- YQL_ENSURE(SessionStates_->Sessions.contains(sessionId));
- auto& session = SessionStates_->Sessions[sessionId];
- return session.UserName;
+ YQL_CLOG(DEBUG, FastMapReduce) << "Setting table presence status " << newStatus << " for table with id " << fmrTableId;
+ auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
+ tablePresenceStatuses[fmrTableId] = newStatus;
}
}
- TString GetTransformedPath(const TString& path, const TString& sessionId, TYtSettings::TConstPtr& config) {
- TString username = GetUsername(sessionId);
- return NYql::TransformPath(GetTablesTmpFolder(*config), path, true, username);
+ void SetFmrIdAlias(const TString& fmrTableId, const TString& alias, const TString& sessionId) {
+ with_lock(SessionStates_->Mutex) {
+ YQL_CLOG(DEBUG, FastMapReduce) << "Setting table fmr id alias " << alias << " for table with id " << fmrTableId;
+ auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases;
+ fmrIdAliases[fmrTableId] = alias;
+ }
}
- void SetTablePresenceStatus(const TString& fmrTableId, const TString& sessionId, ETablePresenceStatus newStatus) {
+ TString GetFmrIdOrAlias(const TString& fmrTableId, const TString& sessionId) {
with_lock(SessionStates_->Mutex) {
- auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
- tablePresenceStatuses[fmrTableId] = newStatus;
+ auto& fmrIdAliases = SessionStates_->Sessions[sessionId].FmrIdAliases;
+ if (!fmrIdAliases.contains(fmrTableId)) {
+ return fmrTableId;
+ }
+ return fmrIdAliases[fmrTableId];
}
}
@@ -260,14 +279,14 @@ private:
}
}
- std::vector<TYtTableRef> GetMergeInputTables(const TYtMerge& ytMerge) {
+ std::vector<std::pair<TYtTableRef, bool>> GetMergeInputTables(const TYtMerge& ytMerge) {
auto input = ytMerge.Maybe<TYtTransientOpBase>().Cast().Input();
- std::vector<TYtTableRef> inputTables;
+ std::vector<std::pair<TYtTableRef, bool>> inputTables;
for (auto section: input.Cast<TYtSectionList>()) {
for (auto path: section.Paths()) {
TYtPathInfo pathInfo(path);
TYtTableRef ytTable{.Path = pathInfo.Table->Name, .Cluster = pathInfo.Table->Cluster};
- inputTables.emplace_back(ytTable);
+ inputTables.emplace_back(ytTable, pathInfo.Table->IsTemp);
}
}
return inputTables;
@@ -288,22 +307,7 @@ private:
return outputTables[0];
}
- TString GetClusterFromMergeTables(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable) {
- std::unordered_set<TString> clusters;
- for (auto& [path, cluster]: inputTables) {
- clusters.emplace(cluster);
- }
- YQL_ENSURE(clusters.size() == 1);
- TString cluster = *clusters.begin();
- if (outputTable.Cluster) {
- YQL_ENSURE(outputTable.Cluster == cluster);
- } else {
- outputTable.Cluster = cluster;
- }
- return cluster;
- }
-
- TClusterConnection GetTablesClusterConnection(const TString& cluster, const TString& sessionId, TYtSettings::TConstPtr& config) {
+ TClusterConnection GetTableClusterConnection(const TString& cluster, const TString& sessionId, TYtSettings::TConstPtr& config) {
auto clusterConnectionOptions = TClusterConnectionOptions(sessionId).Cluster(cluster).Config(config);
auto clusterConnection = GetClusterConnection(std::move(clusterConnectionOptions));
return TClusterConnection{
@@ -330,33 +334,30 @@ private:
return future;
}
- TFuture<TFmrOperationResult> DoUpload(const TFmrTableRef& fmrTableRef, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) {
+ TFuture<TFmrOperationResult> DoUpload(const TString& outputCluster, const TString& outputPath, const TString& sessionId, TYtSettings::TConstPtr& config, TExprNode::TPtr outputOpBase, TExprContext& ctx) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
- std::vector<TString> ytTableInfo;
- StringSplitter(fmrTableRef.TableId).SplitByString(".").AddTo(&ytTableInfo);
- YQL_ENSURE(ytTableInfo.size() == 2);
- TString outputCluster = ytTableInfo[0], outputPath = ytTableInfo[1];
+ TFmrTableRef fmrTableRef{outputCluster + "." + outputPath};
auto tablePresenceStatus = GetTablePresenceStatus(fmrTableRef.TableId, sessionId);
+
if (!tablePresenceStatus || *tablePresenceStatus != ETablePresenceStatus::OnlyInFmr) {
YQL_CLOG(DEBUG, FastMapReduce) << " We assume table " << fmrTableRef.TableId << " should be present in yt, not uploading from fmr";
- TFmrOperationResult fmrOperationResult = TFmrOperationResult();
- fmrOperationResult.SetSuccess();
- return MakeFuture(fmrOperationResult);
+ return GetSuccessfulFmrOperationResult();
}
- TUploadOperationParams uploadOperationParams{
- .Input = fmrTableRef,
- .Output = TYtTableRef{.Path = outputPath, .Cluster = outputCluster}
- };
+ TString realPath = GetRealTablePath(sessionId, outputCluster, outputPath, config);
+ TYtTableRef outputTable{.Path = realPath, .Cluster = outputCluster};
+ outputTable.FilePath = GetTableFilePath(TGetTableFilePathOptions(sessionId).Cluster(outputCluster).Path(outputPath).IsTemp(true));
+
+ TUploadOperationParams uploadOperationParams{.Input = fmrTableRef, .Output = outputTable};
- auto clusterConnection = GetTablesClusterConnection(outputCluster, sessionId, config);
+ auto clusterConnection = GetTableClusterConnection(outputCluster, sessionId, config);
TStartOperationRequest uploadRequest{
.TaskType = ETaskType::Upload,
.OperationParams = uploadOperationParams,
.SessionId = sessionId,
.IdempotencyKey = GenerateId(),
.NumRetries=1,
- .ClusterConnection = clusterConnection,
+ .ClusterConnections = std::unordered_map<TString, TClusterConnection>{{fmrTableRef.TableId, clusterConnection}},
.FmrOperationSpec = config->FmrOperationSpec.Get(outputCluster)
};
@@ -364,9 +365,9 @@ private:
.Config(config);
auto prepareFuture = Slave_->Prepare(outputOpBase, ctx, std::move(prepareOptions));
- return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) {
+ return prepareFuture.Apply([this, uploadRequest = std::move(uploadRequest), sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableRef.TableId)] (const TFuture<TRunResult>& f) mutable {
try {
- f.GetValue(); // rethrow error if any
+ f.GetValue();
YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
YQL_CLOG(DEBUG, FastMapReduce) << "Starting upload from fmr to yt for table: " << fmrTableId;
return GetRunningOperationFuture(uploadRequest, sessionId).Apply([this, sessionId = std::move(sessionId), fmrTableId = std::move(fmrTableId)] (const TFuture<TFmrOperationResult>& f) {
@@ -387,19 +388,22 @@ private:
});
}
- TFuture<TFmrOperationResult> DoMerge(const std::vector<TYtTableRef>& inputTables, TYtTableRef& outputTable, TRunOptions&& options) {
+ TFuture<TFmrOperationResult> DoMerge(const std::vector<std::pair<TYtTableRef, bool>>& inputTables, TYtTableRef& outputTable, TRunOptions&& options) {
TString sessionId = options.SessionId();
YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
- auto cluster = GetClusterFromMergeTables(inputTables, outputTable); // Can set outputTable.Cluster if empty
+ if (outputTable.Cluster.empty()) {
+ outputTable.Cluster = inputTables[0].first.Cluster;
+ }
- TString outputTableId = outputTable.Path, outputCluster = outputTable.Cluster;
- TString transformedOutputTableId = GetTransformedPath(outputTableId, sessionId, options.Config());
- TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + transformedOutputTableId};
+ TString outputCluster = outputTable.Cluster, outputPath = outputTable.Path;
+ TFmrTableRef fmrOutputTable{.TableId = outputCluster + "." + outputPath};
std::vector<TOperationTableRef> mergeInputTables;
- for (auto& ytTable: inputTables) {
- TString fmrTableId = ytTable.Cluster + "." + ytTable.Path;
+ std::unordered_map<TString, TClusterConnection> clusterConnections;
+ for (auto [ytTable, isTemp]: inputTables) {
+ TString inputCluster = ytTable.Cluster, inputPath = ytTable.Path;
+ TString fmrTableId = inputCluster + "." + inputPath;
auto tablePresenceStatus = GetTablePresenceStatus(fmrTableId, sessionId);
if (!tablePresenceStatus) {
SetTablePresenceStatus(fmrTableId, sessionId, ETablePresenceStatus::OnlyInYt);
@@ -407,33 +411,40 @@ private:
if (tablePresenceStatus && *tablePresenceStatus != ETablePresenceStatus::OnlyInYt) {
// table is in fmr, do not download
- mergeInputTables.emplace_back(TFmrTableRef{.TableId = fmrTableId});
+ mergeInputTables.emplace_back(TFmrTableRef{.TableId = GetFmrIdOrAlias(fmrTableId, sessionId)});
} else {
+ ytTable.FilePath = GetTableFilePath(TGetTableFilePathOptions(sessionId).Cluster(inputCluster).Path(inputPath).IsTemp(isTemp));
mergeInputTables.emplace_back(ytTable);
+ clusterConnections.emplace(fmrTableId, GetTableClusterConnection(ytTable.Cluster, sessionId, options.Config()));
}
}
TMergeOperationParams mergeOperationParams{.Input = mergeInputTables,.Output = fmrOutputTable};
- auto clusterConnection = GetTablesClusterConnection(cluster, sessionId, options.Config());
TStartOperationRequest mergeOperationRequest{
.TaskType = ETaskType::Merge,
.OperationParams = mergeOperationParams,
.SessionId = sessionId,
.IdempotencyKey = GenerateId(),
.NumRetries = 1,
- .ClusterConnection = clusterConnection,
+ .ClusterConnections = clusterConnections,
.FmrOperationSpec = options.Config()->FmrOperationSpec.Get(outputCluster)
};
std::vector<TString> inputPaths;
- std::transform(inputTables.begin(),inputTables.end(), std::back_inserter(inputPaths), [](const TYtTableRef& ytTableRef){
- return ytTableRef.Path;}
+ std::transform(inputTables.begin(),inputTables.end(), std::back_inserter(inputPaths), [](const std::pair<TYtTableRef, bool>& table){
+ return table.first.Path;}
);
YQL_CLOG(DEBUG, FastMapReduce) << "Starting merge from yt tables: " << JoinRange(' ', inputPaths.begin(), inputPaths.end());
return GetRunningOperationFuture(mergeOperationRequest, sessionId);
}
+ TFuture<TFmrOperationResult> GetSuccessfulFmrOperationResult() {
+ TFmrOperationResult fmrOperationResult = TFmrOperationResult();
+ fmrOperationResult.SetSuccess();
+ return MakeFuture(fmrOperationResult);
+ }
+
private:
struct TFmrGatewayOperationsState {
std::unordered_map<TString, TPromise<TFmrOperationResult>> OperationStatuses = {}; // operationId -> promise which we set when operation completes
@@ -443,6 +454,7 @@ private:
TFmrGatewayOperationsState OperationStates;
std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService
TString UserName;
+ std::unordered_map<TString, TString> FmrIdAliases;
};
struct TSession {
@@ -465,3 +477,21 @@ IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPt
}
} // namespace NYql::NFmr
+
+template<>
+void Out<NYql::NFmr::ETablePresenceStatus>(IOutputStream& out, NYql::NFmr::ETablePresenceStatus status) {
+ switch (status) {
+ case NYql::NFmr::ETablePresenceStatus::Both: {
+ out << "BOTH";
+ return;
+ }
+ case NYql::NFmr::ETablePresenceStatus::OnlyInFmr: {
+ out << "ONLY IN FMR";
+ return;
+ }
+ case NYql::NFmr::ETablePresenceStatus::OnlyInYt: {
+ out << "ONLY IN YT";
+ return;
+ }
+ }
+}
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 4f476876b2d..02152c47bba 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -5798,7 +5798,11 @@ private:
TClusterConnectionResult clusterConnectionResult{};
clusterConnectionResult.TransactionId = GetGuidAsString(entry->Tx->GetId());
clusterConnectionResult.YtServerName = ytServer;
- clusterConnectionResult.Token = options.Config()->Auth.Get();
+ auto auth = options.Config()->Auth.Get();
+ if (!auth || auth->empty()) {
+ auth = Clusters_->GetAuth(options.Cluster());
+ }
+ clusterConnectionResult.Token = auth;
clusterConnectionResult.SetSuccess();
return clusterConnectionResult;
} catch (...) {
@@ -5806,6 +5810,10 @@ private:
}
}
+ TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&&) override {
+ return Nothing();
+ }
+
static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) {
if (execCtx->Options_.PublicId().Empty()) {
return;
diff --git a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
index 2d204896f99..3c7f71daeb4 100644
--- a/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
+++ b/yt/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
@@ -965,6 +965,10 @@ public:
return Inner_->GetClusterConnection(std::move(options));
}
+ TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) override {
+ return Inner_->GetTableFilePath(std::move(options));
+ }
+
private:
const IYtGateway::TPtr Inner_;
const TQContext QContext_;
diff --git a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp
index 42b30aafb3a..d3ef8db327e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.cpp
@@ -137,4 +137,12 @@ void TYtForwardingGatewayBase::AddCluster(const TYtClusterConfig& config) {
Slave_->AddCluster(config);
}
+IYtGateway::TClusterConnectionResult TYtForwardingGatewayBase::GetClusterConnection(const TClusterConnectionOptions&& options) {
+ return Slave_->GetClusterConnection(std::move(options));
+}
+
+TMaybe<TString> TYtForwardingGatewayBase::GetTableFilePath(const TGetTableFilePathOptions&& options) {
+ return Slave_->GetTableFilePath(std::move(options));
+}
+
} // namspace NYql
diff --git a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h
index 1687d400506..597557bc5ec 100644
--- a/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h
+++ b/yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h
@@ -71,6 +71,10 @@ public:
void AddCluster(const TYtClusterConfig& config) override;
+ TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) override;
+
+ TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) override;
+
protected:
IYtGateway::TPtr Slave_;
};
diff --git a/yt/yql/providers/yt/provider/yql_yt_gateway.h b/yt/yql/providers/yt/provider/yql_yt_gateway.h
index 2c713ebe265..e1b49abf7b6 100644
--- a/yt/yql/providers/yt/provider/yql_yt_gateway.h
+++ b/yt/yql/providers/yt/provider/yql_yt_gateway.h
@@ -635,6 +635,19 @@ public:
TMaybe<TString> Token;
};
+ struct TGetTableFilePathOptions: public TCommonOptions {
+ using TSelf = TGetTableFilePathOptions;
+
+ TGetTableFilePathOptions(const TString& sessionId)
+ : TCommonOptions(sessionId)
+ {
+ }
+
+ OPTION_FIELD(TString, Cluster)
+ OPTION_FIELD(TString, Path)
+ OPTION_FIELD(bool, IsTemp)
+ };
+
public:
virtual ~IYtGateway() = default;
@@ -697,6 +710,9 @@ public:
virtual void AddCluster(const TYtClusterConfig& cluster) = 0;
virtual TClusterConnectionResult GetClusterConnection(const TClusterConnectionOptions&& options) = 0;
+
+ virtual TMaybe<TString> GetTableFilePath(const TGetTableFilePathOptions&& options) = 0;
+
};
}
diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make
index 8f04604931c..fc10024b330 100644
--- a/yt/yql/tools/ytrun/lib/ya.make
+++ b/yt/yql/tools/ytrun/lib/ya.make
@@ -6,13 +6,7 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/provider
- yt/yql/providers/yt/fmr/coordinator/client
- yt/yql/providers/yt/fmr/coordinator/impl
- yt/yql/providers/yt/fmr/job/impl
- yt/yql/providers/yt/fmr/job_factory/impl
- yt/yql/providers/yt/fmr/table_data_service/local
- yt/yql/providers/yt/fmr/worker/impl
- yt/yql/providers/yt/fmr/yt_service/impl
+ yt/yql/providers/yt/fmr/fmr_tool_lib
yt/yql/providers/yt/gateway/native
yt/yql/providers/yt/gateway/fmr
yt/yql/providers/yt/lib/config_clusters
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index 63e0c1cede8..260c8b69647 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -8,12 +8,7 @@
#include <yt/yql/providers/yt/lib/log/yt_logger.h>
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
-#include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h>
-#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
-#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
-#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
-#include <yt/yql/providers/yt/fmr/table_data_service/local/yql_yt_table_data_service_local.h>
-#include <yt/yql/providers/yt/fmr/yt_service/impl/yql_yt_yt_service_impl.h>
+#include <yt/yql/providers/yt/fmr/fmr_tool_lib/yql_yt_fmr_initializer.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
@@ -155,12 +150,12 @@ TYtRunTool::TYtRunTool(TString name)
DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
- if (GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
+ if (GetRunOptions().GatewayTypes.contains(NFmr::FastMapReduceGatewayName)) {
GetRunOptions().GatewayTypes.emplace(YtProviderName);
}
});
- GetRunOptions().SetSupportedGateways({TString{YtProviderName}, TString{FastMapReduceGatewayName}});
+ GetRunOptions().SetSupportedGateways({TString{YtProviderName}, TString{NFmr::FastMapReduceGatewayName}});
GetRunOptions().GatewayTypes.emplace(YtProviderName);
AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
@@ -187,38 +182,13 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
services.FileStorage = GetFileStorage();
services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
auto ytGateway = CreateYtNativeGateway(services);
- if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
+ if (!GetRunOptions().GatewayTypes.contains(NFmr::FastMapReduceGatewayName)) {
return ytGateway;
}
- auto coordinator = NFmr::MakeFmrCoordinator();
- if (!FmrCoordinatorServerUrl_.empty()) {
- NFmr::TFmrCoordinatorClientSettings coordinatorClientSettings;
- THttpURL parsedUrl;
- if (parsedUrl.Parse(FmrCoordinatorServerUrl_) != THttpURL::ParsedOK) {
- ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters";
- }
- coordinatorClientSettings.Port = parsedUrl.GetPort();
- coordinatorClientSettings.Host = parsedUrl.GetHost();
- coordinator = NFmr::MakeFmrCoordinatorClient(coordinatorClientSettings);
- }
-
- if (!DisableLocalFmrWorker_) {
- auto tableDataService = MakeLocalTableDataService(NFmr::TLocalTableDataServiceSettings(3));
- auto fmrYtSerivce = NFmr::MakeFmrYtSerivce();
-
- auto func = [tableDataService, fmrYtSerivce] (NFmr::TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) mutable {
- return NFmr::RunJob(task, tableDataService, fmrYtSerivce, cancelFlag);
- };
-
- NFmr::TFmrJobFactorySettings settings{.Function=func};
- auto jobFactory = MakeFmrJobFactory(settings);
- NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(),
- .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
- FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
- FmrWorker_->Start();
- }
- return NFmr::CreateYtFmrGateway(ytGateway, coordinator);
+ auto [fmrGateway, worker] = NFmr::InitializeFmrGateway(ytGateway, DisableLocalFmrWorker_, FmrCoordinatorServerUrl_);
+ FmrWorker_ = std::move(worker);
+ return fmrGateway;
}
IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h
index edbc7c12ec2..9fa4ee3df5a 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.h
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h
@@ -12,8 +12,6 @@
namespace NYql {
-constexpr TStringBuf FastMapReduceGatewayName = "fmr";
-
class TYtRunTool: public TFacadeRunner {
public:
TYtRunTool(TString name = "ytrun");