aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcdzyura171 <cdzyura171@yandex-team.com>2025-02-10 11:53:19 +0300
committercdzyura171 <cdzyura171@yandex-team.com>2025-02-10 12:12:39 +0300
commit24f0f77bb308a9675f1aae84708724718f12282f (patch)
tree8a574286a14121e448609918089730004528ebeb
parentb2fa5f3d3be16912e911d1c127db0c5a7b22c1a2 (diff)
downloadydb-24f0f77bb308a9675f1aae84708724718f12282f.tar.gz
Use coordinator and worker in fmrGateway
use coordinator and worker in fmrGateway commit_hash:a0977459dcbe041e5330c8959152be40fe18eea6
-rw-r--r--yql/essentials/tools/yql_facade_run/yql_facade_run.cpp6
-rw-r--r--yql/essentials/utils/log/log_component.h4
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make16
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp314
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ya.make21
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp246
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h18
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/ya.make14
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h68
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make15
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp64
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ya.make20
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp62
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h16
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/ya.make13
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h17
-rw-r--r--yt/yql/providers/yt/fmr/request_options/ya.make15
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp28
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h126
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/ya.make16
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp101
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ya.make20
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp139
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h14
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/ya.make15
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h19
-rw-r--r--yt/yql/providers/yt/gateway/fmr/ya.make2
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp266
-rw-r--r--yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h15
-rw-r--r--yt/yql/tools/ytrun/lib/ya.make3
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp23
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.h2
35 files changed, 1710 insertions, 11 deletions
diff --git a/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp
index 4cee954bd8..5a2dc645a6 100644
--- a/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp
+++ b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp
@@ -281,13 +281,13 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument().SetFlag(&ScanUdfs);
opts.AddLongOption("parse-only", "Parse program and exit").NoArgument().StoreValue(&Mode, ERunMode::Parse);
- opts.AddLongOption("compile-only", "Compiled program and exit").NoArgument().StoreValue(&Mode, ERunMode::Compile);
+ opts.AddLongOption("compile-only", "Compile program and exit").NoArgument().StoreValue(&Mode, ERunMode::Compile);
opts.AddLongOption("validate", "Validate program and exit").NoArgument().StoreValue(&Mode, ERunMode::Validate);
opts.AddLongOption("lineage", "Calculate program lineage and exit").NoArgument().StoreValue(&Mode, ERunMode::Lineage);
- opts.AddLongOption('O',"optimize", "Optimize program and exir").NoArgument().StoreValue(&Mode, ERunMode::Optimize);
+ opts.AddLongOption('O',"optimize", "Optimize program and exit").NoArgument().StoreValue(&Mode, ERunMode::Optimize);
opts.AddLongOption('D', "discover", "Discover tables in the program and exit").NoArgument().StoreValue(&Mode, ERunMode::Discover);
opts.AddLongOption("peephole", "Perform peephole program optimization and exit").NoArgument().StoreValue(&Mode, ERunMode::Peephole);
- opts.AddLongOption('R',"run", "Run progrum (use by default)").NoArgument().StoreValue(&Mode, ERunMode::Run);
+ opts.AddLongOption('R',"run", "Run program (use by default)").NoArgument().StoreValue(&Mode, ERunMode::Run);
opts.AddLongOption('L', "show-log", "Show transformation log").Optional().NoArgument().SetFlag(&ShowLog);
opts.AddLongOption('v', "verbosity", "Log verbosity level").Optional().RequiredArgument("LEVEL").StoreResult(&Verbosity);
diff --git a/yql/essentials/utils/log/log_component.h b/yql/essentials/utils/log/log_component.h
index 5cf9357ef4..be1df13863 100644
--- a/yql/essentials/utils/log/log_component.h
+++ b/yql/essentials/utils/log/log_component.h
@@ -83,7 +83,7 @@ struct EComponentHelpers {
case EComponent::ProviderGeneric: return TStringBuf("generic");
case EComponent::ProviderPg: return TStringBuf("PG");
case EComponent::ProviderPure: return TStringBuf("pure");
- case EComponent::FastMapReduce: return TStringBuf("fast map reduce");
+ case EComponent::FastMapReduce: return TStringBuf("FMR");
case EComponent::ProviderYtflow: return TStringBuf("YTFLOW");
default:
ythrow yexception() << "invalid log component value: "
@@ -119,7 +119,7 @@ struct EComponentHelpers {
if (str == TStringBuf("generic")) return EComponent::ProviderGeneric;
if (str == TStringBuf("PG")) return EComponent::ProviderPg;
if (str == TStringBuf("pure")) return EComponent::ProviderPure;
- if (str == TStringBuf("fast map reduce")) return EComponent::FastMapReduce;
+ if (str == TStringBuf("FMR")) return EComponent::FastMapReduce;
if (str == TStringBuf("YTFLOW")) return EComponent::ProviderYtflow;
ythrow yexception() << "unknown log component: '" << str << '\'';
}
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
new file mode 100644
index 0000000000..a661e2cb18
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make
@@ -0,0 +1,16 @@
+UNITTEST()
+
+SRCS(
+ yql_yt_coordinator_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/assert
+ yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/job_factory/impl
+ yt/yql/providers/yt/fmr/worker/impl
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
new file mode 100644
index 0000000000..9ba6dfed78
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -0,0 +1,314 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/future/async.h>
+#include <util/stream/output.h>
+#include <util/string/cast.h>
+#include <util/system/mutex.h>
+#include <util/thread/pool.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
+
+namespace NYql {
+
+// Test helper that owns a worker and is able to replace it with a newly created one, which lets
+// a test imitate a worker restart against the same coordinator and job factory.
+class TFmrWorkerProxy: public IFmrWorker {
+public:
+    TFmrWorkerProxy(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings)
+        : Coordinator_(coordinator)
+        , JobFactory_(jobFactory)
+        , WorkerSettings_(settings)
+        , WorkerCreationNum_(1)
+    {
+        // Worker_ is declared before WorkerSettings_, so it is created here rather than in the
+        // initializer list, once every setting it depends on has been stored.
+        Worker_ = MakeFmrWorker(Coordinator_, JobFactory_, WorkerSettings_);
+    }
+
+    // Stops the current worker, then creates and starts a replacement that uses a new
+    // deterministic random provider built from the incremented creation counter.
+    void ResetWorker() {
+        WorkerSettings_.RandomProvider = CreateDeterministicRandomProvider(++WorkerCreationNum_);
+
+        Stop();
+        Worker_ = MakeFmrWorker(Coordinator_, JobFactory_, WorkerSettings_);
+        Start();
+    }
+
+    // Forwards to the wrapped worker.
+    void Start() {
+        Worker_->Start();
+    }
+
+    // Forwards to the wrapped worker.
+    void Stop() {
+        Worker_->Stop();
+    }
+
+private:
+    IFmrCoordinator::TPtr Coordinator_;
+    IFmrJobFactory::TPtr JobFactory_;
+    IFmrWorker::TPtr Worker_;
+    TFmrWorkerSettings WorkerSettings_;
+    int WorkerCreationNum_; // how many workers this proxy has created so far
+};
+
+
+TDownloadTaskParams downloadTaskParams{ // default TaskParams argument of the request-building helpers in this file
+ .Input = TYtTableRef{"Path","Cluster","TransactionId"},
+ .Output = TFmrTableRef{"TableId"}
+};
+
+// Builds one start-operation request with a fixed session id and a fixed idempotency key.
+TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) {
+    TStartOperationRequest request{
+        .TaskType = taskType,
+        .TaskParams = taskParams,
+        .SessionId = "SessionId",
+        .IdempotencyKey = "IdempotencyKey"
+    };
+    return request;
+}
+
+// Builds numRequests start-operation requests that differ only in their idempotency key
+// ("IdempotencyKey_0", "IdempotencyKey_1", ...). SessionId is not set on these requests.
+std::vector<TStartOperationRequest> CreateSeveralOperationRequests(
+    ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams, int numRequests = 10)
+{
+    std::vector<TStartOperationRequest> requests(numRequests);
+    int requestIndex = 0;
+    for (auto& request: requests) {
+        request = TStartOperationRequest{
+            .TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey_" + ToString(requestIndex)
+        };
+        ++requestIndex;
+    }
+    return requests;
+}
+
+// Default job body used by the tests: reports Aborted when cancellation has already been
+// requested, otherwise sleeps for four seconds and reports Completed.
+auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+    if (cancelFlag->load()) {
+        return ETaskStatus::Aborted;
+    }
+    Sleep(TDuration::Seconds(4));
+    return ETaskStatus::Completed;
+};
+
+Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
+ Y_UNIT_TEST(StartOperation) {
+     // A newly submitted operation is reported as accepted.
+     auto coordinator = MakeFmrCoordinator();
+     const auto response = coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(response.Status, EOperationStatus::Accepted);
+ }
+ Y_UNIT_TEST(RetryAcceptedOperation) {
+     // Sending the same request twice (same idempotency key) must yield the same operation id.
+     auto coordinator = MakeFmrCoordinator();
+     const auto request = CreateOperationRequest();
+     const auto initialResponse = coordinator->StartOperation(request).GetValueSync();
+     const auto retriedResponse = coordinator->StartOperation(request).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(retriedResponse.Status, EOperationStatus::Accepted);
+     UNIT_ASSERT_NO_DIFF(initialResponse.OperationId, retriedResponse.OperationId);
+ }
+
+ Y_UNIT_TEST(DeleteNonexistentOperation) {
+     // Deleting an id the coordinator has never seen is answered with NotFound.
+     auto coordinator = MakeFmrCoordinator();
+     const auto response = coordinator->DeleteOperation({"delete_operation_id"}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(response.Status, EOperationStatus::NotFound);
+ }
+ Y_UNIT_TEST(DeleteOperationBeforeSendToWorker) {
+     // No worker is running here, so the operation is deleted while it is still only accepted.
+     auto coordinator = MakeFmrCoordinator();
+     const TString operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
+     const auto deleteResponse = coordinator->DeleteOperation({operationId}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(deleteResponse.Status, EOperationStatus::Aborted);
+ }
+ Y_UNIT_TEST(GetNonexistentOperation) {
+     // Querying an id the coordinator has never seen is answered with NotFound.
+     auto coordinator = MakeFmrCoordinator();
+     const auto response = coordinator->GetOperation({"get_operation_id"}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(response.Status, EOperationStatus::NotFound);
+ }
+ Y_UNIT_TEST(GetAcceptedOperationStatus) {
+     // Without a worker the operation stays in the Accepted state.
+     auto coordinator = MakeFmrCoordinator();
+     const TString operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
+     const auto getResponse = coordinator->GetOperation({operationId}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(getResponse.Status, EOperationStatus::Accepted);
+ }
+ Y_UNIT_TEST(GetRunningOperationStatus) {
+     // The job takes four seconds, so one second after the worker starts the operation
+     // is expected to be reported as in progress.
+     auto coordinator = MakeFmrCoordinator();
+     const TString operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
+
+     TFmrJobFactorySettings jobFactorySettings{.NumThreads = 3, .Function = defaultTaskFunction};
+     auto jobFactory = MakeFmrJobFactory(jobFactorySettings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     auto worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+     worker->Start();
+     Sleep(TDuration::Seconds(1));
+
+     const auto getResponse = coordinator->GetOperation({operationId}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(getResponse.Status, EOperationStatus::InProgress);
+ }
+ Y_UNIT_TEST(GetCompletedOperationStatuses) {
+     // Ten operations are submitted to a ten-thread job factory; after six seconds every
+     // four-second job is expected to be completed.
+     auto coordinator = MakeFmrCoordinator();
+     std::vector<TString> operationIds;
+     for (const auto& request: CreateSeveralOperationRequests()) {
+         operationIds.emplace_back(coordinator->StartOperation(request).GetValueSync().OperationId);
+     }
+
+     TFmrJobFactorySettings jobFactorySettings{.NumThreads = 10, .Function = defaultTaskFunction};
+     auto jobFactory = MakeFmrJobFactory(jobFactorySettings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     auto worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+     worker->Start();
+     Sleep(TDuration::Seconds(6));
+
+     for (const auto& operationId: operationIds) {
+         const auto getResponse = coordinator->GetOperation({operationId}).GetValueSync();
+         UNIT_ASSERT_VALUES_EQUAL(getResponse.Status, EOperationStatus::Completed);
+     }
+ }
+ Y_UNIT_TEST(GetCompletedAndFailedOperationStatuses) {
+     // Download operations must complete while the single upload operation must fail:
+     // the job body below reports Failed for upload task parameters only.
+     auto coordinator = MakeFmrCoordinator();
+     std::vector<TString> downloadOperationIds;
+     for (const auto& request: CreateSeveralOperationRequests()) {
+         downloadOperationIds.emplace_back(coordinator->StartOperation(request).GetValueSync().OperationId);
+     }
+     const auto uploadRequest = CreateOperationRequest(ETaskType::Upload, TUploadTaskParams{});
+     const auto uploadOperationId = coordinator->StartOperation(uploadRequest).GetValueSync().OperationId;
+
+     auto failUploadsOnly = [] (TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+         if (cancelFlag->load()) {
+             return ETaskStatus::Aborted;
+         }
+         Sleep(TDuration::Seconds(1));
+         const bool isUploadTask = std::visit([] (const auto& taskParams) {
+             return std::is_same_v<std::decay_t<decltype(taskParams)>, TUploadTaskParams>;
+         }, task->TaskParams);
+         if (isUploadTask) {
+             return ETaskStatus::Failed;
+         }
+         Sleep(TDuration::Seconds(1));
+         return ETaskStatus::Completed;
+     };
+
+     TFmrJobFactorySettings jobFactorySettings{.NumThreads = 10, .Function = failUploadsOnly};
+     auto jobFactory = MakeFmrJobFactory(jobFactorySettings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     auto worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+     worker->Start();
+     Sleep(TDuration::Seconds(5));
+
+     for (const auto& operationId: downloadOperationIds) {
+         const auto downloadResponse = coordinator->GetOperation({operationId}).GetValueSync();
+         UNIT_ASSERT_VALUES_EQUAL(downloadResponse.Status, EOperationStatus::Completed);
+     }
+     const auto uploadResponse = coordinator->GetOperation({uploadOperationId}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(uploadResponse.Status, EOperationStatus::Failed);
+ }
+ Y_UNIT_TEST(RetryRunningOperation) {
+     // Repeating a request while its operation is running returns the same id and the
+     // current (InProgress) status instead of starting a second operation.
+     auto coordinator = MakeFmrCoordinator();
+     const auto request = CreateOperationRequest();
+     const TString firstOperationId = coordinator->StartOperation(request).GetValueSync().OperationId;
+
+     TFmrJobFactorySettings jobFactorySettings{.NumThreads = 3, .Function = defaultTaskFunction};
+     auto jobFactory = MakeFmrJobFactory(jobFactorySettings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     auto worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+     worker->Start();
+
+     Sleep(TDuration::Seconds(1));
+     const auto retriedResponse = coordinator->StartOperation(request).GetValueSync();
+     UNIT_ASSERT_NO_DIFF(firstOperationId, retriedResponse.OperationId);
+     UNIT_ASSERT_VALUES_EQUAL(retriedResponse.Status, EOperationStatus::InProgress);
+ }
+ Y_UNIT_TEST(RetryRunningOperationAfterIdempotencyKeyClear) {
+     // Idempotency keys are kept for one second only, so a retry two seconds later must create
+     // a new operation while the first one is still running.
+     TFmrCoordinatorSettings coordinatorSettings{
+         .WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2), .IdempotencyKeyStoreTime = TDuration::Seconds(1)};
+     auto coordinator = MakeFmrCoordinator(coordinatorSettings);
+
+     TFmrJobFactorySettings jobFactorySettings{.NumThreads = 3, .Function = defaultTaskFunction};
+     auto jobFactory = MakeFmrJobFactory(jobFactorySettings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     auto worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+     worker->Start();
+
+     const auto request = CreateOperationRequest();
+     const TString firstOperationId = coordinator->StartOperation(request).GetValueSync().OperationId;
+
+     Sleep(TDuration::Seconds(2));
+     const auto retriedResponse = coordinator->StartOperation(request).GetValueSync();
+     const auto firstOperationResponse = coordinator->GetOperation({firstOperationId}).GetValueSync();
+
+     UNIT_ASSERT_VALUES_UNEQUAL(firstOperationId, retriedResponse.OperationId);
+     UNIT_ASSERT_VALUES_EQUAL(firstOperationResponse.Status, EOperationStatus::InProgress);
+     UNIT_ASSERT_VALUES_EQUAL(retriedResponse.Status, EOperationStatus::Accepted);
+ }
+ Y_UNIT_TEST(CancelTasksAfterVolatileIdReload) {
+     // The worker is replaced while its task is still running. The test expects the coordinator
+     // to fail the running operation with a coordinator-level "Max retries limit exceeded" error.
+     auto coordinator = MakeFmrCoordinator();
+     // Long-running job: polls the cancel flag once a second and completes only after 100 iterations.
+     auto longRunningJob = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+         int numIterations = 0;
+         while (!cancelFlag->load()) {
+             Sleep(TDuration::Seconds(1));
+             ++numIterations;
+             if (numIterations == 100) {
+                 return ETaskStatus::Completed;
+             }
+         }
+         return ETaskStatus::Aborted;
+     };
+     TFmrJobFactorySettings settings{.NumThreads = 3, .Function = longRunningJob};
+     auto factory = MakeFmrJobFactory(settings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     TFmrWorkerProxy workerProxy(coordinator, factory, workerSettings);
+
+     workerProxy.Start();
+     auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
+     Sleep(TDuration::Seconds(2));
+     workerProxy.ResetWorker();
+     Sleep(TDuration::Seconds(5));
+     workerProxy.Stop();
+
+     auto getOperationResult = coordinator->GetOperation({operationId}).GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL(getOperationResult.Status, EOperationStatus::Failed);
+     // Check that an error was actually recorded before indexing into the list, so that a
+     // regression shows up as an assertion failure instead of an out-of-range access.
+     UNIT_ASSERT(!getOperationResult.ErrorMessages.empty());
+     const auto& error = getOperationResult.ErrorMessages[0];
+     UNIT_ASSERT_VALUES_EQUAL(error.Component, EFmrComponent::Coordinator);
+     UNIT_ASSERT_NO_DIFF(error.ErrorMessage, "Max retries limit exceeded");
+     // Same reasoning for the operation id, which is dereferenced on the next line.
+     UNIT_ASSERT(error.OperationId);
+     UNIT_ASSERT_NO_DIFF(*error.OperationId, operationId);
+ }
+ Y_UNIT_TEST(HandleJobErrors) {
+     // A job that throws must turn its operation into Failed, with exactly one error that is
+     // attributed to the job and carries the exception text.
+     auto coordinator = MakeFmrCoordinator();
+     const TString operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
+
+     auto throwingJob = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) -> ETaskStatus {
+         if (cancelFlag->load()) {
+             return ETaskStatus::Aborted;
+         }
+         Sleep(TDuration::Seconds(2));
+         throw std::runtime_error{"Function crashed"};
+     };
+
+     TFmrJobFactorySettings jobFactorySettings{.NumThreads = 3, .Function = throwingJob};
+     auto jobFactory = MakeFmrJobFactory(jobFactorySettings);
+     TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+     auto worker = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+     worker->Start();
+     Sleep(TDuration::Seconds(4));
+
+     const auto getResponse = coordinator->GetOperation({operationId}).GetValueSync();
+     const std::vector<TFmrError>& errors = getResponse.ErrorMessages;
+     UNIT_ASSERT_VALUES_EQUAL(getResponse.Status, EOperationStatus::Failed);
+     UNIT_ASSERT(errors.size() == 1);
+     const auto& jobError = errors[0];
+     UNIT_ASSERT_NO_DIFF(jobError.ErrorMessage, "Function crashed");
+     UNIT_ASSERT_VALUES_EQUAL(jobError.Component, EFmrComponent::Job);
+ }
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
new file mode 100644
index 0000000000..11d323d128
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make
@@ -0,0 +1,21 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_coordinator_impl.cpp
+)
+
+PEERDIR(
+ library/cpp/random_provider
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/coordinator/interface
+ yql/essentials/utils/log
+ yql/essentials/utils
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
new file mode 100644
index 0000000000..b04814c354
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -0,0 +1,246 @@
+#include <thread>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/yql_panic.h>
+#include "yql_yt_coordinator_impl.h"
+
+namespace NYql {
+
+namespace {
+
+struct TCoordinatorTaskInfo { // what the coordinator stores for one task
+ TTask::TPtr Task; // the task object that is handed to a worker
+ ETaskStatus TaskStatus; // current status of the task as known to the coordinator
+ TString OperationId; // id of the operation this task belongs to
+};
+
+struct TOperationInfo { // what the coordinator stores for one operation
+ std::unordered_set<TString> TaskIds; // for now each operation consists only of one task, until partitioner is implemented
+ EOperationStatus OperationStatus; // current status of the operation
+ std::vector<TFmrError> ErrorMessages; // errors collected for the operation's tasks
+ TString SessionId; // session id taken from the start-operation request, used as the logging scope
+};
+
+struct TIdempotencyKeyInfo { // what the coordinator stores for one idempotency key
+ TString operationId; // id of the operation that was created for this key
+ TInstant OperationCreationTime; // when the key was registered; used to expire it
+};
+
+// In-memory implementation of IFmrCoordinator.
+//
+// Operations, their tasks and the idempotency keys live in hash maps guarded by Mutex_.
+// Workers report task states and receive new work through SendHeartbeatResponse(). A background
+// thread started by the constructor periodically drops idempotency keys that are older than
+// IdempotencyKeyStoreTime_, together with the finished operations they point to.
+class TFmrCoordinator: public IFmrCoordinator {
+public:
+    explicit TFmrCoordinator(const TFmrCoordinatorSettings& settings)
+        : WorkersNum_(settings.WorkersNum)
+        , RandomProvider_(settings.RandomProvider)
+        , StopCoordinator_(false)
+        , TimeToSleepBetweenClearKeyRequests_(settings.TimeToSleepBetweenClearKeyRequests)
+        , IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime)
+    {
+        StartClearingIdempotencyKeys();
+    }
+
+    // Asks the key-clearing thread to stop and waits for it to finish its current iteration.
+    ~TFmrCoordinator() {
+        StopCoordinator_ = true;
+        if (ClearIdempotencyKeysThread_.joinable()) {
+            ClearIdempotencyKeysThread_.join();
+        }
+    }
+
+    // Registers a new operation consisting of a single task and returns its id with status Accepted.
+    // If the request carries an idempotency key that is already known, no new operation is created:
+    // the id stored for the key is returned together with the current status of that operation, or
+    // with NotFound when the operation has been removed in the meantime.
+    NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& request) override {
+        YQL_LOG_CTX_ROOT_SESSION_SCOPE(request.SessionId);
+        TGuard<TMutex> guard(Mutex_);
+        const TMaybe<TString> idempotencyKey = request.IdempotencyKey;
+        if (idempotencyKey) {
+            auto keyIt = IdempotencyKeys_.find(*idempotencyKey);
+            if (keyIt != IdempotencyKeys_.end()) {
+                const TString knownOperationId = keyIt->second.operationId;
+                auto operationIt = Operations_.find(knownOperationId);
+                if (operationIt == Operations_.end()) {
+                    // The key outlived its operation (e.g. the operation was deleted). Looking it up
+                    // with operator[] would insert an operation without tasks, which later breaks the
+                    // single-task invariant checked by the key-clearing thread.
+                    return NThreading::MakeFuture(TStartOperationResponse(EOperationStatus::NotFound, knownOperationId));
+                }
+                return NThreading::MakeFuture(TStartOperationResponse(operationIt->second.OperationStatus, knownOperationId));
+            }
+        }
+
+        // Ids are generated in this order (operation first, then task) on purpose: with a
+        // deterministic random provider the order defines which id each entity gets.
+        const TString operationId = GenerateId();
+        if (idempotencyKey) {
+            IdempotencyKeys_[*idempotencyKey] = TIdempotencyKeyInfo{.operationId = operationId, .OperationCreationTime = TInstant::Now()};
+        }
+
+        const TString taskId = GenerateId();
+        TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, request.TaskParams, request.SessionId);
+
+        Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId};
+
+        Operations_[operationId] = {.TaskIds = {taskId}, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId};
+        YQL_CLOG(DEBUG, FastMapReduce) << "Starting operation with id " << operationId;
+        return NThreading::MakeFuture(TStartOperationResponse(EOperationStatus::Accepted, operationId));
+    }
+
+    // Returns the current status and the collected errors of an operation, or NotFound for an unknown id.
+    NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& request) override {
+        TGuard<TMutex> guard(Mutex_);
+        const auto& operationId = request.OperationId;
+        auto operationIt = Operations_.find(operationId);
+        if (operationIt == Operations_.end()) {
+            return NThreading::MakeFuture(TGetOperationResponse(EOperationStatus::NotFound));
+        }
+        const auto& operationInfo = operationIt->second;
+        YQL_LOG_CTX_ROOT_SESSION_SCOPE(operationInfo.SessionId);
+        YQL_CLOG(DEBUG, FastMapReduce) << "Getting operation status with id " << operationId;
+        return NThreading::MakeFuture(TGetOperationResponse(operationInfo.OperationStatus, operationInfo.ErrorMessages));
+    }
+
+    // Deletes an operation. A task that is running is only scheduled for cancellation (the worker is
+    // told on its next heartbeat); otherwise all information about the operation is removed at once.
+    // Returns NotFound for an unknown id and Aborted in every other case.
+    NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& request) override {
+        TGuard<TMutex> guard(Mutex_);
+        const auto& operationId = request.OperationId;
+        auto operationIt = Operations_.find(operationId);
+        if (operationIt == Operations_.end()) {
+            return NThreading::MakeFuture(TDeleteOperationResponse(EOperationStatus::NotFound));
+        }
+        const auto& operationInfo = operationIt->second;
+        YQL_LOG_CTX_ROOT_SESSION_SCOPE(operationInfo.SessionId);
+        YQL_CLOG(DEBUG, FastMapReduce) << "Deleting operation with id " << operationId;
+        YQL_ENSURE(operationInfo.TaskIds.size() == 1);
+        // Copy the id: ClearTask() erases the operation that owns the original string.
+        const TString taskId = *operationInfo.TaskIds.begin();
+        auto taskIt = Tasks_.find(taskId);
+        YQL_ENSURE(taskIt != Tasks_.end());
+
+        if (taskIt->second.TaskStatus == ETaskStatus::InProgress) {
+            TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel
+        } else {
+            ClearTask(taskId); // Task either hasn't begun running or finished, remove info
+        }
+
+        return NThreading::MakeFuture(TDeleteOperationResponse(EOperationStatus::Aborted));
+    }
+
+    // Processes a worker heartbeat:
+    //  1. if the worker reports a volatile id different from the one remembered for it, every task
+    //     that is in progress is failed and scheduled for cancellation;
+    //  2. task states reported by the worker are applied;
+    //  3. every accepted task is moved to InProgress and returned to the worker to be run;
+    //  4. every task scheduled for cancellation is marked Aborted (if not finished yet) and its id
+    //     is returned to the worker.
+    NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& request) override {
+        TGuard<TMutex> guard(Mutex_);
+
+        const ui32 workerId = request.WorkerId;
+        YQL_ENSURE(workerId >= 1 && workerId <= WorkersNum_);
+        auto volatileIdIt = WorkerToVolatileId_.find(workerId);
+        if (volatileIdIt == WorkerToVolatileId_.end()) {
+            WorkerToVolatileId_[workerId] = request.VolatileId;
+        } else if (request.VolatileId != volatileIdIt->second) {
+            volatileIdIt->second = request.VolatileId;
+            for (auto& [taskId, taskInfo]: Tasks_) {
+                if (taskInfo.TaskStatus != ETaskStatus::InProgress) {
+                    continue;
+                }
+                TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel
+                TFmrError error{
+                    .Component = EFmrComponent::Coordinator, .ErrorMessage = "Max retries limit exceeded", .OperationId = taskInfo.OperationId};
+                SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed, error);
+            }
+        }
+
+        for (auto& requestTaskState: request.TaskStates) {
+            const auto& taskId = requestTaskState->TaskId;
+            auto taskIt = Tasks_.find(taskId);
+            YQL_ENSURE(taskIt != Tasks_.end());
+            auto taskStatus = requestTaskState->TaskStatus;
+            YQL_ENSURE(taskStatus != ETaskStatus::Accepted);
+            SetUnfinishedTaskStatus(taskId, taskStatus, requestTaskState->TaskErrorMessage);
+            if (TaskToDeleteIds_.contains(taskId) && taskIt->second.TaskStatus != ETaskStatus::InProgress) {
+                ClearTask(taskId); // Task finished, so we don't need to cancel it, just remove info
+            }
+        }
+
+        std::vector<TTask::TPtr> tasksToRun;
+        for (auto& [taskId, taskInfo]: Tasks_) {
+            if (taskInfo.TaskStatus == ETaskStatus::Accepted) {
+                SetUnfinishedTaskStatus(taskId, ETaskStatus::InProgress);
+                tasksToRun.emplace_back(taskInfo.Task);
+            }
+        }
+
+        for (auto& taskId: TaskToDeleteIds_) {
+            SetUnfinishedTaskStatus(taskId, ETaskStatus::Aborted);
+        }
+        return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_});
+    }
+
+private:
+
+    // Starts the background thread that wakes up every TimeToSleepBetweenClearKeyRequests_ and
+    // removes expired idempotency keys until StopCoordinator_ is set.
+    void StartClearingIdempotencyKeys() {
+        ClearIdempotencyKeysThread_ = std::thread([this] () {
+            while (!StopCoordinator_) {
+                with_lock(Mutex_) {
+                    ClearExpiredIdempotencyKeys();
+                }
+                Sleep(TimeToSleepBetweenClearKeyRequests_);
+            }
+        });
+    }
+
+    // Drops every idempotency key older than IdempotencyKeyStoreTime_. The operation a dropped key
+    // points to is removed as well, unless it is still accepted or in progress.
+    // Must be called with Mutex_ held.
+    void ClearExpiredIdempotencyKeys() {
+        const auto currentTime = TInstant::Now();
+        for (auto it = IdempotencyKeys_.begin(); it != IdempotencyKeys_.end();) {
+            if (currentTime - it->second.OperationCreationTime <= IdempotencyKeyStoreTime_) {
+                ++it;
+                continue;
+            }
+            const TString operationId = it->second.operationId;
+            it = IdempotencyKeys_.erase(it);
+
+            auto operationIt = Operations_.find(operationId);
+            if (operationIt == Operations_.end()) {
+                continue;
+            }
+            const auto& operationInfo = operationIt->second;
+            YQL_ENSURE(operationInfo.TaskIds.size() == 1);
+            const auto operationStatus = operationInfo.OperationStatus;
+            if (operationStatus != EOperationStatus::Accepted && operationStatus != EOperationStatus::InProgress) {
+                // Copy the id: ClearTask() erases the operation that owns the original string.
+                const TString taskId = *operationInfo.TaskIds.begin();
+                ClearTask(taskId);
+            }
+        }
+    }
+
+    TString GenerateId() {
+        return GetGuidAsString(RandomProvider_->GenGuid());
+    }
+
+    // Removes a task together with its operation and its pending-cancellation mark.
+    // The task must exist.
+    void ClearTask(const TString& taskId) {
+        auto taskIt = Tasks_.find(taskId);
+        YQL_ENSURE(taskIt != Tasks_.end());
+        TaskToDeleteIds_.erase(taskId);
+        Operations_.erase(taskIt->second.OperationId);
+        Tasks_.erase(taskIt);
+    }
+
+    // Moves a task that is still Accepted or InProgress to newTaskStatus, refreshes the status of its
+    // operation and records taskErrorMessage, if any. A task in any other state is left untouched.
+    // Both the task and its operation must exist.
+    void SetUnfinishedTaskStatus(const TString& taskId, ETaskStatus newTaskStatus, const TMaybe<TFmrError>& taskErrorMessage = Nothing()) {
+        auto taskIt = Tasks_.find(taskId);
+        YQL_ENSURE(taskIt != Tasks_.end());
+        auto& taskInfo = taskIt->second;
+        auto operationIt = Operations_.find(taskInfo.OperationId);
+        YQL_ENSURE(operationIt != Operations_.end());
+        if (taskInfo.TaskStatus != ETaskStatus::Accepted && taskInfo.TaskStatus != ETaskStatus::InProgress) {
+            return;
+        }
+        taskInfo.TaskStatus = newTaskStatus;
+        auto& operationInfo = operationIt->second;
+        operationInfo.OperationStatus = GetOperationStatus(taskInfo.OperationId);
+        if (taskErrorMessage) {
+            operationInfo.ErrorMessages.emplace_back(*taskErrorMessage);
+        }
+    }
+
+    // Derives the status of an operation from the status of its only task.
+    // Returns NotFound for an unknown operation id.
+    EOperationStatus GetOperationStatus(const TString& operationId) {
+        auto operationIt = Operations_.find(operationId);
+        if (operationIt == Operations_.end()) {
+            return EOperationStatus::NotFound;
+        }
+        const auto& taskIds = operationIt->second.TaskIds;
+        YQL_ENSURE(taskIds.size() == 1);
+
+        auto taskIt = Tasks_.find(*taskIds.begin());
+        YQL_ENSURE(taskIt != Tasks_.end());
+        // NOTE(review): the cast assumes ETaskStatus and EOperationStatus use the same numeric
+        // values for matching states - confirm against the enum definitions.
+        return static_cast<EOperationStatus>(taskIt->second.TaskStatus);
+    }
+
+    std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it
+    std::unordered_set<TString> TaskToDeleteIds_; // TaskIds we want to pass to worker for deletion
+    std::unordered_map<TString, TOperationInfo> Operations_; // OperationId -> current info about it
+    std::unordered_map<TString, TIdempotencyKeyInfo> IdempotencyKeys_; // IdempotencyKey -> current info about it
+
+    TMutex Mutex_;
+    const ui32 WorkersNum_;
+    std::unordered_map<ui32, TString> WorkerToVolatileId_; // worker id -> volatile id
+    const TIntrusivePtr<IRandomProvider> RandomProvider_;
+    std::thread ClearIdempotencyKeysThread_;
+    std::atomic<bool> StopCoordinator_;
+    TDuration TimeToSleepBetweenClearKeyRequests_;
+    TDuration IdempotencyKeyStoreTime_;
+};
+
+} // namespace
+
+// Factory for the in-memory coordinator defined in this file.
+IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings) {
+    IFmrCoordinator::TPtr coordinator = MakeIntrusive<TFmrCoordinator>(settings);
+    return coordinator;
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
new file mode 100644
index 0000000000..39ff6b1ae6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <library/cpp/random_provider/random_provider.h>
+#include <util/system/mutex.h>
+#include <util/system/guard.h>
+#include <util/generic/queue.h>
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+
+namespace NYql {
+
+// Construction parameters for the coordinator returned by MakeFmrCoordinator().
+struct TFmrCoordinatorSettings {
+    ui32 WorkersNum; // Not supported yet
+    TIntrusivePtr<IRandomProvider> RandomProvider;
+    // NOTE(review): presumably how long an idempotency key is kept before being cleared - confirm
+    // against the coordinator implementation.
+    TDuration IdempotencyKeyStoreTime = TDuration::Seconds(10);
+    // NOTE(review): presumably the pause between two key-clearing passes - confirm.
+    TDuration TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1);
+};
+
+// Creates a coordinator. When `settings` is omitted, a single-worker configuration with a
+// deterministic random provider (seed 2) is used.
+IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)});
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/ya.make b/yt/yql/providers/yt/fmr/coordinator/interface/ya.make
new file mode 100644
index 0000000000..e6f5982117
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_coordinator.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/request_options
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp
new file mode 100644
index 0000000000..a4b91e2bd2
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp
@@ -0,0 +1 @@
+#include "yql_yt_coordinator.h"
diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
new file mode 100644
index 0000000000..306ccd7db7
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+#include <util/datetime/base.h>
+
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql {
+
+// Heartbeat payload passed to IFmrCoordinator::SendHeartbeatResponse().
+struct THeartbeatRequest {
+ ui32 WorkerId;
+ TString VolatileId;
+ std::vector<TTaskState::TPtr> TaskStates;
+ TStatistics Statistics;
+};
+// The worker sends heartbeat requests in a loop (or via long polling).
+
+// Reply to a heartbeat: tasks to start and ids of tasks to cancel.
+struct THeartbeatResponse {
+ std::vector<TTask::TPtr> TasksToRun;
+ std::unordered_set<TString> TaskToDeleteIds;
+};
+
+// Parameters of IFmrCoordinator::StartOperation().
+struct TStartOperationRequest {
+ ETaskType TaskType;
+ TTaskParams TaskParams;
+ TString SessionId;
+ TMaybe<TString> IdempotencyKey = Nothing();
+ ui32 NumRetries = 1; // Not supported yet
+};
+
+// Result of IFmrCoordinator::StartOperation().
+struct TStartOperationResponse {
+ EOperationStatus Status;
+ TString OperationId;
+};
+
+// Parameters of IFmrCoordinator::GetOperation().
+struct TGetOperationRequest {
+ TString OperationId;
+};
+
+// Result of IFmrCoordinator::GetOperation(): current status plus accumulated errors.
+struct TGetOperationResponse {
+ EOperationStatus Status;
+ std::vector<TFmrError> ErrorMessages = {};
+};
+
+// Parameters of IFmrCoordinator::DeleteOperation().
+struct TDeleteOperationRequest {
+ TString OperationId;
+};
+
+// Result of IFmrCoordinator::DeleteOperation().
+struct TDeleteOperationResponse {
+ EOperationStatus Status;
+};
+
+// Asynchronous coordinator interface; every call returns a future with its response.
+class IFmrCoordinator: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IFmrCoordinator>;
+
+ virtual ~IFmrCoordinator() = default;
+
+ // Starts an operation described by `request`; the response carries its OperationId.
+ virtual NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& request) = 0;
+
+ // Queries the status and error messages of the operation named in `request`.
+ virtual NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& request) = 0;
+
+ // Deletes the operation named in `request`.
+ virtual NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& request) = 0;
+
+ // Accepts a worker heartbeat and answers with tasks to run and task ids to delete.
+ virtual NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& request) = 0;
+};
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
new file mode 100644
index 0000000000..597cdac1a4
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make
@@ -0,0 +1,15 @@
+UNITTEST()
+
+SRCS(
+ yql_yt_job_factory_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/assert
+ yt/yql/providers/yt/fmr/job_factory/interface
+ yt/yql/providers/yt/fmr/job_factory/impl
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
new file mode 100644
index 0000000000..89369f7e60
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
@@ -0,0 +1,64 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/future/async.h>
+#include <util/stream/output.h>
+#include <util/string/cast.h>
+#include <util/system/mutex.h>
+#include <util/thread/pool.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+
+namespace NYql {
+
+Y_UNIT_TEST_SUITE(FmrFactoryTests) {
+    // A job whose cancel flag is never raised finishes with ETaskStatus::Completed.
+    Y_UNIT_TEST(GetSuccessfulTaskResult) {
+        auto operationResults = std::make_shared<TString>("no_result_yet");
+        auto jobBody = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+            if (cancelFlag->load()) {
+                return ETaskStatus::Aborted;
+            }
+            Sleep(TDuration::Seconds(1));
+            *operationResults = "operation_result";
+            return ETaskStatus::Completed;
+        };
+        TFmrJobFactorySettings factorySettings{.NumThreads = 3, .Function = jobBody};
+        auto jobFactory = MakeFmrJobFactory(factorySettings);
+        auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        auto jobFuture = jobFactory->StartJob(nullptr, cancelFlag);
+        auto jobResult = jobFuture.GetValueSync();
+
+        UNIT_ASSERT_VALUES_EQUAL(jobResult->TaskStatus, ETaskStatus::Completed);
+        UNIT_ASSERT_NO_DIFF(*operationResults, "operation_result");
+    }
+
+    // Raising the cancel flag while the job is running makes it finish with ETaskStatus::Aborted
+    // before it gets the chance to publish its final result.
+    Y_UNIT_TEST(CancelTask) {
+        auto operationResults = std::make_shared<TString>("no_result_yet");
+        auto jobBody = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+            *operationResults = "computing_result";
+            for (int iteration = 1; !cancelFlag->load(); ++iteration) {
+                Sleep(TDuration::Seconds(1));
+                if (iteration == 100) {
+                    *operationResults = "operation_result";
+                    return ETaskStatus::Completed;
+                }
+            }
+            return ETaskStatus::Aborted;
+        };
+        TFmrJobFactorySettings factorySettings{.NumThreads = 3, .Function = jobBody};
+        auto jobFactory = MakeFmrJobFactory(factorySettings);
+        auto cancelFlag = std::make_shared<std::atomic<bool>>(false);
+
+        auto jobFuture = jobFactory->StartJob(nullptr, cancelFlag);
+        Sleep(TDuration::Seconds(2));
+        cancelFlag->store(true);
+        auto jobResult = jobFuture.GetValueSync();
+
+        UNIT_ASSERT_VALUES_EQUAL(jobResult->TaskStatus, ETaskStatus::Aborted);
+        UNIT_ASSERT_NO_DIFF(*operationResults, "computing_result");
+    }
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ya.make
new file mode 100644
index 0000000000..afadad4a9b
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_job_factory_impl.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/job_factory/interface
+ yt/yql/providers/yt/fmr/request_options
+ yql/essentials/utils/log
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
new file mode 100644
index 0000000000..f2762857b0
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
@@ -0,0 +1,62 @@
+#include <util/system/mutex.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+
+namespace NYql {
+
+// Job factory that executes every job on a fixed-size thread pool.
+//
+// StartJob() schedules one invocation of the user-supplied function and returns a future that
+// is fulfilled with the job's final status once that function returns or throws.
+class TFmrJobFactory: public IFmrJobFactory {
+public:
+    explicit TFmrJobFactory(const TFmrJobFactorySettings& settings)
+        : NumThreads_(settings.NumThreads), Function_(settings.Function)
+    {
+        Start();
+    }
+
+    ~TFmrJobFactory() {
+        Stop();
+    }
+
+    // Schedules `task` for execution.
+    //
+    // `task` may be null (the job function then receives a null pointer); `cancelFlag` is handed
+    // to the job function unchanged. The returned future always receives a value: a throwing
+    // job is reported as ETaskStatus::Failed with the error attributed to EFmrComponent::Job.
+    NThreading::TFuture<TTaskResult::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) override {
+        auto promise = NThreading::NewPromise<TTaskResult::TPtr>();
+        auto future = promise.GetFuture();
+        auto startJobFunc = [this, task, cancelFlag, promise = std::move(promise)] () mutable {
+            ETaskStatus finalTaskStatus = ETaskStatus::Failed;
+            TMaybe<TFmrError> taskErrorMessage;
+            try {
+                // Read task fields only when a task was supplied; a null task must not be
+                // dereferenced just to produce the log line.
+                TString sessionId;
+                TString taskId;
+                if (task) {
+                    sessionId = task->SessionId;
+                    taskId = task->TaskId;
+                }
+                YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId);
+                YQL_CLOG(DEBUG, FastMapReduce) << "Starting job with taskId " << taskId;
+                finalTaskStatus = Function_(task, cancelFlag);
+            } catch (const std::exception& exc) {
+                finalTaskStatus = ETaskStatus::Failed;
+                taskErrorMessage = TFmrError{.Component = EFmrComponent::Job, .ErrorMessage = exc.what()};
+            } catch (...) {
+                // Anything not derived from std::exception would otherwise escape the pool
+                // thread and leave the promise unset forever.
+                finalTaskStatus = ETaskStatus::Failed;
+                taskErrorMessage = TFmrError{.Component = EFmrComponent::Job, .ErrorMessage = "Unknown exception"};
+            }
+            promise.SetValue(MakeTaskResult(finalTaskStatus, taskErrorMessage));
+        };
+        ThreadPool_->SafeAddFunc(std::move(startJobFunc));
+        return future;
+    }
+
+private:
+    // Creates the thread pool with NumThreads_ threads.
+    void Start() {
+        ThreadPool_ = CreateThreadPool(NumThreads_);
+    }
+
+    // Stops the thread pool.
+    void Stop() {
+        ThreadPool_->Stop();
+    }
+
+private:
+    THolder<IThreadPool> ThreadPool_;
+    ui32 NumThreads_; // same type as TFmrJobFactorySettings::NumThreads
+    std::function<ETaskStatus(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function_;
+};
+
+// Builds a TFmrJobFactory from `settings` and returns it through the interface pointer type,
+// spelled exactly as in the header declaration.
+IFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings) {
+    return MakeIntrusive<TFmrJobFactory>(settings);
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
new file mode 100644
index 0000000000..00a29f95a6
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <library/cpp/threading/future/async.h>
+#include <util/thread/pool.h>
+#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
+
+namespace NYql {
+
+// Settings consumed by MakeFmrJobFactory().
+struct TFmrJobFactorySettings {
+ ui32 NumThreads = 3; // number of threads requested for the factory's thread pool
+ std::function<ETaskStatus(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function; // job body: receives the task and its cancel flag, returns the final task status
+};
+
+IFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings);
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
new file mode 100644
index 0000000000..ed05358ed3
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_job_factory.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp
new file mode 100644
index 0000000000..8b1091674d
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp
@@ -0,0 +1 @@
+#include "yql_yt_job_factory.h"
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
new file mode 100644
index 0000000000..387286687b
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+
+namespace NYql {
+
+// Interface of a component that runs tasks asynchronously.
+class IFmrJobFactory: public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IFmrJobFactory>;
+
+ virtual ~IFmrJobFactory() = default;
+
+ // Starts a job for `task` and returns a future holding its result.
+ // `cancelFlag` is shared with the caller, which may set it to request cancellation.
+ virtual NThreading::TFuture<TTaskResult::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) = 0;
+};
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make
new file mode 100644
index 0000000000..0010183a60
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/request_options/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_request_options.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+YQL_LAST_ABI_VERSION()
+
+GENERATE_ENUM_SERIALIZATION(yql_yt_request_options.h)
+
+END()
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
new file mode 100644
index 0000000000..1e59e0dc8d
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp
@@ -0,0 +1,28 @@
+#include "yql_yt_request_options.h"
+
+namespace NYql {
+
+// Allocates a ref-counted TTask; the task's retry count is left at the constructor default.
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId) {
+ return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId);
+}
+
+// Allocates a ref-counted TTaskState for task `taskId`.
+TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage) {
+ return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage);
+}
+
+// Allocates a ref-counted TTaskResult.
+TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& taskErrorMessage) {
+ return MakeIntrusive<TTaskResult>(taskStatus, taskErrorMessage);
+}
+
+} // namespace NYql
+
+// Stream formatter for TFmrError: "FmrError[<component>]", then component-specific ids for
+// Worker and Coordinator errors, then the error message.
+template<>
+void Out<NYql::TFmrError>(IOutputStream& out, const NYql::TFmrError& error) {
+    out << "FmrError[" << error.Component << "]";
+    switch (error.Component) {
+        case NYql::EFmrComponent::Worker:
+            out << "(TaskId: " << error.TaskId << " WorkerId: " << error.WorkerId << ") ";
+            break;
+        case NYql::EFmrComponent::Coordinator:
+            out << "(OperationId: " << error.OperationId << ") ";
+            break;
+        default:
+            // Other components carry no extra identifiers.
+            break;
+    }
+    out << error.ErrorMessage;
+}
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
new file mode 100644
index 0000000000..d8f21e5f1e
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -0,0 +1,126 @@
+#pragma once
+
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+#include <vector>
+
+namespace NYql {
+
+// Status of an operation. NotFound is what the coordinator reports for an unknown operation id.
+// The coordinator converts ETaskStatus to this enum with a static_cast, so the enumerators
+// shared with ETaskStatus must keep the same order.
+enum class EOperationStatus {
+ Accepted,
+ InProgress,
+ Failed,
+ Completed,
+ Aborted,
+ NotFound
+};
+
+// Status of a single task; see the ordering note on EOperationStatus.
+enum class ETaskStatus {
+ Accepted,
+ InProgress,
+ Failed,
+ Completed,
+ Aborted
+};
+
+// Kind of work a task performs; each kind has a matching T*TaskParams struct below.
+enum class ETaskType {
+ Download,
+ Upload,
+ Merge
+};
+
+// Component an error is attributed to.
+enum class EFmrComponent {
+ Coordinator,
+ Worker,
+ Job
+};
+
+// Error description. The stream formatter prints WorkerId/TaskId for Worker errors and
+// OperationId for Coordinator errors.
+struct TFmrError {
+ EFmrComponent Component;
+ TString ErrorMessage;
+ TMaybe<ui32> WorkerId;
+ TMaybe<TString> TaskId;
+ TMaybe<TString> OperationId;
+};
+
+// Placeholder carried in heartbeat requests; currently has no fields.
+struct TStatistics {
+};
+
+// Reference to a table in Yt.
+struct TYtTableRef {
+ TString Path;
+ TString Cluster;
+ TString TransactionId;
+};
+
+// Reference to a table held by Fmr, identified by TableId.
+struct TFmrTableRef {
+ TString TableId;
+};
+
+// Reference to either a Yt table or an Fmr table.
+// NOTE(review): std::variant is used but <variant> is not included directly by this header;
+// it appears to rely on a transitive include - confirm.
+struct TTableRef {
+ std::variant<TYtTableRef, TFmrTableRef> TableRef;
+};
+
+// Upload: Fmr table -> Yt table.
+struct TUploadTaskParams {
+ TFmrTableRef Input;
+ TYtTableRef Output;
+};
+
+// Download: Yt table -> Fmr table.
+struct TDownloadTaskParams {
+ TYtTableRef Input;
+ TFmrTableRef Output;
+};
+
+// Merge: several Yt/Fmr tables -> one Fmr table.
+struct TMergeTaskParams {
+ std::vector<TTableRef> Input;
+ TFmrTableRef Output;
+};
+
+// Parameters of a task; the active alternative corresponds to the task type.
+using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>;
+
+// Ref-counted description of a task handed to a worker.
+struct TTask: public TThrRefBase {
+ TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, ui32 numRetries = 1)
+ : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), NumRetries(numRetries)
+ {
+ }
+
+ ETaskType TaskType;
+ TString TaskId;
+ TTaskParams TaskParams;
+ TString SessionId;
+ ui32 NumRetries; // Not supported yet
+
+ using TPtr = TIntrusivePtr<TTask>;
+};
+
+// Ref-counted status of a task identified by TaskId, with an optional error.
+struct TTaskState: public TThrRefBase {
+ TTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& errorMessage = Nothing())
+ : TaskStatus(taskStatus), TaskId(taskId), TaskErrorMessage(errorMessage)
+ {
+ }
+
+ ETaskStatus TaskStatus;
+ TString TaskId;
+ TMaybe<TFmrError> TaskErrorMessage;
+
+ using TPtr = TIntrusivePtr<TTaskState>;
+};
+
+// Ref-counted result of a job: status plus an optional error (no task id).
+struct TTaskResult: public TThrRefBase {
+ TTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& errorMessage = Nothing())
+ : TaskStatus(taskStatus), TaskErrorMessage(errorMessage)
+ {
+ }
+
+ ETaskStatus TaskStatus;
+ TMaybe<TFmrError> TaskErrorMessage;
+
+ using TPtr = TIntrusivePtr<TTaskResult>;
+};
+
+// Factory helpers returning intrusive pointers to the structs above.
+TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId);
+
+TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing());
+
+TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& taskErrorMessage = Nothing());
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
new file mode 100644
index 0000000000..3e12787010
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
@@ -0,0 +1,16 @@
+UNITTEST()
+
+SRCS(
+ yql_yt_worker_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/assert
+ yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/job_factory/impl
+ yt/yql/providers/yt/fmr/worker/impl
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
new file mode 100644
index 0000000000..8077c9abea
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -0,0 +1,101 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/future/async.h>
+#include <util/stream/output.h>
+#include <util/string/cast.h>
+#include <util/system/mutex.h>
+#include <util/thread/pool.h>
+#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+
+namespace NYql {
+
+// Default task parameters shared by the tests below.
+TDownloadTaskParams downloadTaskParams{
+ .Input = TYtTableRef{"Path","Cluster","TransactionId"},
+ .Output = TFmrTableRef{"TableId"}
+};
+
+// Builds a start-operation request with a fixed idempotency key; SessionId is left empty.
+TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) {
+ return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey"};
+}
+
+Y_UNIT_TEST_SUITE(FmrWorkerTests) {
+ // An operation started on the coordinator is picked up by the worker and its job runs.
+ Y_UNIT_TEST(GetSuccessfulOperationResult) {
+ auto coordinator = MakeFmrCoordinator();
+ auto operationResults = std::make_shared<TString>("no_result_yet");
+ auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ while (!cancelFlag->load()) {
+ *operationResults = "operation_result";
+ return ETaskStatus::Completed;
+ }
+ return ETaskStatus::Aborted;
+ };
+ TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func};
+
+ auto factory = MakeFmrJobFactory(settings);
+ TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
+ worker->Start();
+ coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
+ // Give the worker time to receive the task on a heartbeat and run it.
+ Sleep(TDuration::Seconds(2));
+ worker->Stop();
+ UNIT_ASSERT_NO_DIFF(*operationResults, "operation_result");
+ }
+
+ // Deleting a running operation makes the job observe its cancel flag and abort.
+ Y_UNIT_TEST(CancelOperation) {
+ auto coordinator = MakeFmrCoordinator();
+ auto operationResults = std::make_shared<TString>("no_result_yet");
+ auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ int numIterations = 0;
+ while (!cancelFlag->load()) {
+ Sleep(TDuration::Seconds(1));
+ ++numIterations;
+ if (numIterations == 100) {
+ *operationResults = "operation_result";
+ return ETaskStatus::Completed;
+ }
+ }
+ *operationResults = "operation_cancelled";
+ return ETaskStatus::Aborted;
+ };
+ TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
+ auto factory = MakeFmrJobFactory(settings);
+ TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
+ worker->Start();
+ auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
+ Sleep(TDuration::Seconds(2));
+ coordinator->DeleteOperation({operationId}).GetValueSync();
+ // Wait for the deletion to reach the worker on a later heartbeat.
+ Sleep(TDuration::Seconds(5));
+ worker->Stop();
+ UNIT_ASSERT_NO_DIFF(*operationResults, "operation_cancelled");
+ }
+ // With two workers attached to one coordinator, a single operation's job runs exactly once.
+ Y_UNIT_TEST(CreateSeveralWorkers) {
+ TFmrCoordinatorSettings coordinatorSettings{};
+ coordinatorSettings.WorkersNum = 2;
+ coordinatorSettings.RandomProvider = CreateDeterministicRandomProvider(3);
+ auto coordinator = MakeFmrCoordinator(coordinatorSettings);
+ std::shared_ptr<std::atomic<ui32>> operationResult = std::make_shared<std::atomic<ui32>>(0);
+ auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ while (!cancelFlag->load()) {
+ Sleep(TDuration::Seconds(1));
+ (*operationResult)++;
+ return ETaskStatus::Completed;
+ }
+ return ETaskStatus::Aborted;
+ };
+ TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
+ auto factory = MakeFmrJobFactory(settings);
+ TFmrWorkerSettings firstWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings secondWorkerSettings{.WorkerId = 2, .RandomProvider = CreateDeterministicRandomProvider(2)};
+ auto firstWorker = MakeFmrWorker(coordinator, factory, firstWorkerSettings);
+ auto secondWorker = MakeFmrWorker(coordinator, factory, secondWorkerSettings);
+ firstWorker->Start();
+ secondWorker->Start();
+ coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
+ Sleep(TDuration::Seconds(3));
+ // No explicit Stop(): the test does not call it for either worker.
+ UNIT_ASSERT_VALUES_EQUAL(operationResult->load(), 1);
+ }
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ya.make
new file mode 100644
index 0000000000..28cfcbea66
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/impl/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_worker_impl.cpp
+)
+
+PEERDIR(
+ library/cpp/random_provider
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/worker/interface
+ yql/essentials/utils
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
new file mode 100644
index 0000000000..00363e443f
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
@@ -0,0 +1,139 @@
+#include <library/cpp/threading/future/wait/wait.h>
+#include <thread>
+#include <ranges>
+#include <util/system/mutex.h>
+#include <yql/essentials/utils/yql_panic.h>
+#include "yql_yt_worker_impl.h"
+
+namespace NYql {
+
+namespace {
+
+// State shared between the worker and job-completion callbacks; callbacks hold it through a
+// weak_ptr. Both maps are accessed with Mutex held.
+struct TFmrWorkerState {
+ TMutex Mutex;
+ std::unordered_map<TString, TTaskResult::TPtr> TaskStatuses; // TaskId -> latest known result
+ std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>> TaskFutures; // TaskId -> future of a job that has not finished yet
+};
+
+// Worker that polls the coordinator with heartbeats, starts the tasks it is handed through
+// the job factory and reports task statuses back on subsequent heartbeats.
+class TFmrWorker: public IFmrWorker {
+public:
+    TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings)
+        : Coordinator_(coordinator),
+        JobFactory_(jobFactory),
+        WorkerState_(std::make_shared<TFmrWorkerState>()),
+        StopWorker_(false),
+        RandomProvider_(settings.RandomProvider),
+        WorkerId_(settings.WorkerId),
+        VolatileId_(GetGuidAsString(RandomProvider_->GenGuid())),
+        TimeToSleepBetweenRequests_(settings.TimeToSleepBetweenRequests)
+    {
+    }
+
+    ~TFmrWorker() {
+        Stop();
+    }
+
+    // Launches the polling thread: report task states, receive the heartbeat response, apply
+    // it, sleep, repeat until Stop() is requested.
+    void Start() override {
+        MainThread_ = std::thread([this] {
+            while (!StopWorker_) {
+                auto heartbeatRequest = THeartbeatRequest(
+                    WorkerId_,
+                    VolatileId_,
+                    CollectTaskStates(),
+                    TStatistics()
+                );
+                const auto heartbeatResponse = Coordinator_->SendHeartbeatResponse(heartbeatRequest).GetValueSync();
+                ApplyHeartbeatResponse(heartbeatResponse);
+                Sleep(TimeToSleepBetweenRequests_);
+            }
+        });
+    }
+
+    // Requests cancellation of every task, stops the polling thread and waits for all started
+    // jobs to finish. Safe to call more than once from the same thread.
+    void Stop() override {
+        StopWorker_ = true;
+        // Cancel right away so that running jobs wind down while the polling thread is joined.
+        CancelAllTasks();
+        if (MainThread_.joinable()) {
+            MainThread_.join();
+        }
+        // The polling thread may have started more jobs between the first cancellation and its
+        // exit. Only after the join is the task set final, so cancel again and collect the
+        // futures now; otherwise such jobs would be neither cancelled nor awaited.
+        CancelAllTasks();
+        std::vector<NThreading::TFuture<TTaskResult::TPtr>> taskFutures;
+        with_lock(WorkerState_->Mutex) {
+            taskFutures.reserve(WorkerState_->TaskFutures.size());
+            for (const auto& taskFuture: WorkerState_->TaskFutures) {
+                taskFutures.emplace_back(taskFuture.second);
+            }
+        }
+        NThreading::WaitAll(taskFutures).GetValueSync();
+    }
+
+private:
+    // Snapshots the state of every known task for the next heartbeat and forgets the tasks
+    // that are no longer in progress (their last status is part of the returned snapshot).
+    std::vector<TTaskState::TPtr> CollectTaskStates() {
+        std::vector<TTaskState::TPtr> taskStates;
+        with_lock(WorkerState_->Mutex) {
+            auto& taskStatuses = WorkerState_->TaskStatuses;
+            taskStates.reserve(taskStatuses.size());
+            for (auto it = taskStatuses.begin(); it != taskStatuses.end();) {
+                const auto& taskId = it->first;
+                const auto& taskResult = it->second;
+                taskStates.emplace_back(MakeTaskState(taskResult->TaskStatus, taskId, taskResult->TaskErrorMessage));
+                if (taskResult->TaskStatus != ETaskStatus::InProgress) {
+                    // Erase the cancel flag first: taskId refers into the node erased next.
+                    TasksCancelStatus_.erase(taskId);
+                    it = taskStatuses.erase(it);
+                } else {
+                    ++it;
+                }
+            }
+        }
+        return taskStates;
+    }
+
+    // Registers the tasks to run, raises the cancel flag of tasks to delete, then starts a job
+    // for every new task. A task listed both to run and to delete starts already cancelled.
+    void ApplyHeartbeatResponse(const THeartbeatResponse& heartbeatResponse) {
+        with_lock(WorkerState_->Mutex) {
+            for (const auto& task: heartbeatResponse.TasksToRun) {
+                const auto& taskId = task->TaskId;
+                YQL_ENSURE(!WorkerState_->TaskStatuses.contains(taskId));
+                WorkerState_->TaskStatuses[taskId] = MakeTaskResult(ETaskStatus::InProgress);
+                TasksCancelStatus_[taskId] = std::make_shared<std::atomic<bool>>(false);
+            }
+            for (const auto& taskToDeleteId: heartbeatResponse.TaskToDeleteIds) {
+                const auto cancelIt = TasksCancelStatus_.find(taskToDeleteId);
+                if (cancelIt != TasksCancelStatus_.end()) {
+                    cancelIt->second->store(true);
+                }
+            }
+
+            for (const auto& task: heartbeatResponse.TasksToRun) {
+                const auto& taskId = task->TaskId;
+                auto future = JobFactory_->StartJob(task, TasksCancelStatus_.at(taskId));
+                YQL_ENSURE(!WorkerState_->TaskFutures.contains(taskId));
+                // Register the future before subscribing: if the job has already finished, the
+                // callback erases the entry, which must exist by then or it would linger.
+                WorkerState_->TaskFutures[taskId] = future;
+                // NOTE(review): for an already finished job the callback may run inline while
+                // this thread holds the mutex; that relies on TMutex being recursive - confirm.
+                future.Subscribe([weakState = std::weak_ptr(WorkerState_), task](const auto& jobFuture) {
+                    auto finalTaskStatus = jobFuture.GetValue();
+                    std::shared_ptr<TFmrWorkerState> state = weakState.lock();
+                    if (state) {
+                        with_lock(state->Mutex) {
+                            YQL_ENSURE(state->TaskStatuses.contains(task->TaskId));
+                            state->TaskStatuses[task->TaskId] = finalTaskStatus;
+                            state->TaskFutures.erase(task->TaskId);
+                        }
+                    }
+                });
+            }
+        }
+    }
+
+    // Raises the cancel flag of every task currently tracked by the worker.
+    void CancelAllTasks() {
+        with_lock(WorkerState_->Mutex) {
+            for (auto& taskInfo: TasksCancelStatus_) {
+                taskInfo.second->store(true);
+            }
+        }
+    }
+
+private:
+
+    IFmrCoordinator::TPtr Coordinator_;
+    IFmrJobFactory::TPtr JobFactory_;
+    std::unordered_map<TString, std::shared_ptr<std::atomic<bool>>> TasksCancelStatus_; // TaskId -> cancel flag, guarded by WorkerState_->Mutex
+    std::shared_ptr<TFmrWorkerState> WorkerState_;
+    std::atomic<bool> StopWorker_;
+    const TIntrusivePtr<IRandomProvider> RandomProvider_;
+    const ui32 WorkerId_;
+    const TString VolatileId_;
+    std::thread MainThread_;
+    const TDuration TimeToSleepBetweenRequests_;
+};
+
+} // namespace
+
+// Constructs a TFmrWorker bound to `coordinator` and `jobFactory`; the worker does not poll
+// until Start() is called on it.
+IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) {
+ return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings);
+}
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
new file mode 100644
index 0000000000..dc43652dea
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <library/cpp/random_provider/random_provider.h>
+#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h>
+
+namespace NYql {
+
+// Construction parameters for the worker returned by MakeFmrWorker().
+struct TFmrWorkerSettings {
+    ui32 WorkerId; // id the worker reports in every heartbeat
+    TIntrusivePtr<IRandomProvider> RandomProvider; // source of the GUID used as the worker's volatile id
+    TDuration TimeToSleepBetweenRequests = TDuration::Seconds(1); // pause between two heartbeats
+};
+
+// Creates a worker that polls `coordinator` and runs received tasks through `jobFactory`.
+IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator,IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings);
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/fmr/worker/interface/ya.make b/yt/yql/providers/yt/fmr/worker/interface/ya.make
new file mode 100644
index 0000000000..2042b984d0
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/interface/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_worker.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+ yt/yql/providers/yt/fmr/coordinator/interface
+ yt/yql/providers/yt/fmr/job_factory/interface
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp
new file mode 100644
index 0000000000..4c6001a217
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp
@@ -0,0 +1 @@
+#include "yql_yt_worker.h"
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
new file mode 100644
index 0000000000..d3d5690fbd
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
+#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
+
+namespace NYql {
+
+// Interface of a worker with an explicit start/stop lifecycle.
+class IFmrWorker: public TThrRefBase {
+public:
+    using TPtr = TIntrusivePtr<IFmrWorker>;
+
+    virtual ~IFmrWorker() = default;
+
+    // Begins the worker's activity.
+    virtual void Start() = 0;
+
+    // Ends the worker's activity.
+    virtual void Stop() = 0;
+};
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make
index 4f805b1aca..bfc91df2f5 100644
--- a/yt/yql/providers/yt/gateway/fmr/ya.make
+++ b/yt/yql/providers/yt/gateway/fmr/ya.make
@@ -6,6 +6,8 @@ SRCS(
PEERDIR(
yql/essentials/utils/log
+ yt/yql/providers/yt/expr_nodes
+ yt/yql/providers/yt/fmr/coordinator/interface
yt/yql/providers/yt/provider
)
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
index 81d4224530..266f3c8174 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp
@@ -1,27 +1,285 @@
#include "yql_yt_fmr.h"
+#include <thread>
+
+#include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
+#include <yt/yql/providers/yt/provider/yql_yt_helpers.h>
+
+#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/log/profile.h>
#include <util/generic/ptr.h>
+#include <util/thread/pool.h>
using namespace NThreading;
+using namespace NYql::NNodes;
namespace NYql {
namespace {
+// NOTE(review): presumably records where a table's data currently lives (Yt, Fmr or both) -
+// confirm against the code that uses it.
+enum class ETablePresenceStatus {
+ OnlyInYt,
+ OnlyInFmr,
+ Both
+};
+
+struct TDownloadTableToFmrResult: public NCommon::TOperationResult {}; // Result of downloading a table from Yt into the Fmr TableDataService
+
class TFmrYtGateway final: public TYtForwardingGatewayBase {
public:
- TFmrYtGateway(IYtGateway::TPtr&& slave)
- : TYtForwardingGatewayBase(std::move(slave))
+    // Wraps `slave` and starts a background thread that polls `coordinator` for the status of
+    // every operation registered in SessionStates_. When an operation is neither Accepted nor
+    // InProgress any more, the corresponding promise is fulfilled, the operation is deleted from
+    // the coordinator and its entry is removed from the session state.
+    //
+    // The thread is joined in the destructor; the polling period is taken from
+    // settings.TimeToSleepBetweenGetOperationRequests.
+    TFmrYtGateway(IYtGateway::TPtr&& slave, IFmrCoordinator::TPtr coordinator, const TFmrYtGatewaySettings& settings)
+        : TYtForwardingGatewayBase(std::move(slave)),
+        Coordinator_(coordinator),
+        SessionStates_(std::make_shared<TSession>()),
+        RandomProvider_(settings.RandomProvider),
+        TimeToSleepBetweenGetOperationRequests_(settings.TimeToSleepBetweenGetOperationRequests),
+        StopFmrGateway_(false) // the polling loop reads the flag immediately, so it must start out well-defined
     {
+        auto getOperationStatusesFunc = [this] {
+            while (!StopFmrGateway_) {
+                with_lock(SessionStates_->Mutex) {
+                    auto checkOperationStatuses = [this] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) {
+                        std::vector<TString> completedOperationIds;
+                        for (auto& [operationId, promise]: operationStatuses) {
+                            // Wait for the response right here instead of subscribing a callback:
+                            // a callback would hold references to `completedOperationIds` and to the
+                            // map entry, both of which may be gone if the future completes later.
+                            auto getOperationResult = Coordinator_->GetOperation({operationId}).GetValueSync();
+                            auto getOperationStatus = getOperationResult.Status;
+                            bool operationCompleted = getOperationStatus != EOperationStatus::Accepted && getOperationStatus != EOperationStatus::InProgress;
+                            if (!operationCompleted) {
+                                continue;
+                            }
+                            // operation finished, set value in future returned in Publish / Download
+                            bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed;
+                            SendOperationCompletionSignal(promise, hasCompletedSuccessfully, getOperationResult.ErrorMessages);
+                            completedOperationIds.emplace_back(operationId);
+                        }
+
+                        // Erase only after the iteration above, so the map is not modified while it is traversed.
+                        for (auto& operationId: completedOperationIds) {
+                            Coordinator_->DeleteOperation({operationId}).GetValueSync();
+                            operationStatuses.erase(operationId);
+                        }
+                    };
+
+                    for (auto& [sessionId, sessionInfo]: SessionStates_->Sessions) {
+                        auto& operationStates = sessionInfo.OperationStates;
+                        checkOperationStatuses(operationStates.DownloadOperationStatuses);
+                        checkOperationStatuses(operationStates.UploadOperationStatuses);
+                    }
+                }
+                Sleep(TimeToSleepBetweenGetOperationRequests_);
+            }
+        };
+        GetOperationStatusesThread_ = std::thread(getOperationStatusesFunc);
+    }
+
+    // Asks the status-polling thread to finish and waits until it has exited.
+    ~TFmrYtGateway() {
+        StopFmrGateway_.store(true);
+        GetOperationStatusesThread_.join();
+    }
+
+    // Publishes a table through FMR. Without a coordinator the call is forwarded to the slave gateway.
+    //
+    // Otherwise:
+    //   1. if the table has not been seen in this session yet, a Download (yt -> fmr) operation is
+    //      started and the call blocks until it finishes;
+    //   2. an Upload (fmr -> yt) operation is started; the returned future is fulfilled by the
+    //      polling thread once the coordinator reports that the upload is no longer running.
+    TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final {
+        if (!Coordinator_) {
+            return Slave_->Publish(node, ctx, std::move(options));
+        }
+        auto publish = TYtPublish(node);
+
+        auto cluster = publish.DataSink().Cluster().StringValue();
+        auto outputPath = publish.Publish().Name().StringValue();
+        auto transactionId = GenerateId();
+        auto idempotencyKey = GenerateId();
+
+        auto fmrTableId = cluster + "." + outputPath;
+
+        TString sessionId = options.SessionId();
+        YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__, sessionId);
+
+        // Stays uninitialized when the table is already known in this session.
+        TFuture<TDownloadTableToFmrResult> downloadToFmrFuture;
+
+        with_lock(SessionStates_->Mutex) {
+            auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses;
+
+            if (!tablePresenceStatuses.contains(fmrTableId)) {
+                TYtTableRef ytTable{.Path = outputPath, .Cluster = cluster, .TransactionId = transactionId};
+                tablePresenceStatuses[fmrTableId] = ETablePresenceStatus::Both;
+                downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, sessionId);
+            }
+        }
+        if (downloadToFmrFuture.Initialized()) {
+            // blocking until download to fmr finishes
+            // NOTE(review): the download result is not inspected, so the upload is attempted even
+            // if the download failed - confirm this is intended.
+            downloadToFmrFuture.Wait();
+        }
+
+        YQL_CLOG(INFO, FastMapReduce) << "Uploading table with cluster " << cluster << " and path " << outputPath << " from fmr to yt";
+
+        TUploadTaskParams uploadTaskParams{
+            .Input = TFmrTableRef{fmrTableId},
+            .Output = TYtTableRef{outputPath, cluster, transactionId}
+        };
+
+        TStartOperationRequest uploadRequest{
+            .TaskType = ETaskType::Upload, .TaskParams = uploadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1
+        };
+
+        auto promise = NewPromise<TPublishResult>();
+        auto future = promise.GetFuture();
+
+        auto startOperationResponseFuture = Coordinator_->StartOperation(uploadRequest);
+        // sessionId is captured by value: the callback may run after Publish has returned,
+        // when the local variable no longer exists.
+        startOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId] (const auto& startFuture) {
+            TString operationId = startFuture.GetValueSync().OperationId;
+            with_lock(SessionStates_->Mutex) {
+                YQL_ENSURE(SessionStates_->Sessions.contains(sessionId));
+                auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
+                auto& uploadOperationStatuses = operationStates.UploadOperationStatuses;
+                YQL_ENSURE(!uploadOperationStatuses.contains(operationId));
+                // registered here, fulfilled later by the polling thread
+                uploadOperationStatuses[operationId] = promise;
+            }
+        });
+        return future;
+    }
+
+    // Starts a Download (yt -> fmr) operation for `ytTableRef` in the coordinator.
+    // The returned future is fulfilled by the polling thread once the coordinator reports
+    // that the operation is no longer Accepted / InProgress.
+    TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce(const TYtTableRef& ytTableRef, const TString& sessionId) {
+        YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+        TString fmrTableId = ytTableRef.Cluster + "." + ytTableRef.Path;
+        TDownloadTaskParams downloadTaskParams{
+            .Input = ytTableRef,
+            .Output = {fmrTableId}
+        };
+        auto idempotencyKey = GenerateId();
+        TStartOperationRequest downloadRequest{
+            .TaskType = ETaskType::Download, .TaskParams = downloadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1
+        };
+
+        YQL_CLOG(INFO, FastMapReduce) << "Downloading table with cluster " << ytTableRef.Cluster << " and path " << ytTableRef.Path << " from yt to fmr";
+
+        auto promise = NewPromise<TDownloadTableToFmrResult>();
+        auto future = promise.GetFuture();
+
+        auto startOperationResponseFuture = Coordinator_->StartOperation(downloadRequest);
+        // sessionId is copied into the callback: `sessionId` is a reference to an object owned by
+        // the caller, and nothing guarantees that object is still alive when the callback runs.
+        startOperationResponseFuture.Subscribe([this, promise = std::move(promise), sessionId = TString(sessionId)] (const auto& startFuture) {
+            TString operationId = startFuture.GetValueSync().OperationId;
+            with_lock(SessionStates_->Mutex) {
+                auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates;
+                auto& downloadOperationStatuses = operationStates.DownloadOperationStatuses;
+                YQL_ENSURE(!downloadOperationStatuses.contains(operationId));
+                // registered here, fulfilled later by the polling thread
+                downloadOperationStatuses[operationId] = promise;
+            }
+        });
+        return future;
+    }
+
+    // Opens the session in the slave gateway and registers an empty TSessionInfo for it.
+    // Throws yexception if a session with the same id is already registered.
+    void OpenSession(TOpenSessionOptions&& options) final {
+        // Read the id before `options` is handed over to the slave gateway.
+        TString sessionId = options.SessionId();
+        Slave_->OpenSession(std::move(options));
+
+        YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+        with_lock(SessionStates_->Mutex) {
+            // Must be a reference: working on a copy of the map would silently drop the new session.
+            auto& sessions = SessionStates_->Sessions;
+            if (sessions.contains(sessionId)) {
+                YQL_LOG_CTX_THROW yexception() << "Session already exists: " << sessionId;
+            }
+            sessions[sessionId] = TSessionInfo();
+        }
+    }
+
+    // Closes the session in the slave gateway (blocking until it is done) and forgets the
+    // gateway-side state of that session, if any. Returns an already completed future.
+    TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
+        // Read the id before `options` is handed over to the slave gateway.
+        TString sessionId = options.SessionId();
+        Slave_->CloseSession(std::move(options)).Wait();
+        YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+
+        with_lock(SessionStates_->Mutex) {
+            // erase by key is a no-op for an unknown session
+            SessionStates_->Sessions.erase(sessionId);
+        }
+        return MakeFuture();
+    }
+
+    // Cleans the session up in the slave gateway (blocking until it is done), then cancels every
+    // operation of the session that is still tracked by the gateway: the operation is deleted in
+    // the coordinator and its promise is fulfilled with a non-successful result.
+    // The session itself must be registered. Returns an already completed future.
+    TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
+        // Read the id before `options` is handed over to the slave gateway.
+        TString sessionId = options.SessionId();
+        Slave_->CleanupSession(std::move(options)).Wait();
+        YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+
+        with_lock(SessionStates_->Mutex) {
+            auto& sessions = SessionStates_->Sessions;
+            YQL_ENSURE(sessions.contains(sessionId));
+            auto& operationStates = sessions[sessionId].OperationStates;
+
+            auto cancelOperationsFunc = [this] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) {
+                std::vector<TFuture<TDeleteOperationResponse>> cancelOperationsFutures;
+                cancelOperationsFutures.reserve(operationStatuses.size());
+
+                for (auto& [operationId, promise]: operationStatuses) {
+                    cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId}));
+                }
+                NThreading::WaitAll(cancelOperationsFutures).GetValueSync();
+
+                for (auto& [operationId, promise]: operationStatuses) {
+                    SendOperationCompletionSignal(promise, false);
+                }
+                // The promises are fulfilled now; drop them so that the polling thread does not
+                // pick them up again and try to fulfil them a second time.
+                operationStatuses.clear();
+            };
+
+            cancelOperationsFunc(operationStates.DownloadOperationStatuses);
+            cancelOperationsFunc(operationStates.UploadOperationStatuses);
+        }
+
+        return MakeFuture();
+    }
+
+private:
+ // Promises of the operations started by the gateway, split by operation kind.
+ // Entries are added in the StartOperation callbacks and erased by the polling thread when the operation finishes.
+ struct TFmrGatewayOperationsState {
+ std::unordered_map<TString, TPromise<TPublishResult>> UploadOperationStatuses = {}; // operationId -> promise which we set when operation completes
+ std::unordered_map<TString, TPromise<TDownloadTableToFmrResult>> DownloadOperationStatuses = {};
+ };
+
+ // Everything the gateway tracks for a single session.
+ struct TSessionInfo {
+ TFmrGatewayOperationsState OperationStates;
+ std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService
+ };
+
+ // All sessions known to the gateway (session id -> session info) together with the mutex
+ // that is taken around every access to them.
+ struct TSession {
+ std::unordered_map<TString, TSessionInfo> Sessions;
+ TMutex Mutex = TMutex();
+ };
+
+    IFmrCoordinator::TPtr Coordinator_; // may be null, in which case Publish is forwarded to the slave gateway
+    std::shared_ptr<TSession> SessionStates_; // sessions and the mutex guarding them
+    const TIntrusivePtr<IRandomProvider> RandomProvider_; // source of guids for GenerateId()
+    TDuration TimeToSleepBetweenGetOperationRequests_; // pause between two polling rounds
+    std::thread GetOperationStatusesThread_; // polling thread, started in the constructor and joined in the destructor
+    // Set in the destructor to make the polling thread leave its loop. Initialized explicitly:
+    // the thread reads the flag as soon as it starts, and before C++20 a default-constructed
+    // std::atomic holds an indeterminate value.
+    std::atomic<bool> StopFmrGateway_{false};
+
+    // Produces a fresh guid from the configured random provider, rendered as a string.
+    TString GenerateId() {
+        const auto guid = RandomProvider_->GenGuid();
+        return GetGuidAsString(guid);
+    }
+
+    // Fulfils `promise` with a freshly built result of type T:
+    //   - marked successful when `completedSuccessfully` is set;
+    //   - carrying an exception that lists `errorMessages` when they are present;
+    //   - left default-constructed otherwise.
+    // The promise must not have a value yet.
+    template <std::derived_from<NCommon::TOperationResult> T>
+    void SendOperationCompletionSignal(TPromise<T> promise, bool completedSuccessfully = false, const std::vector<TFmrError>& errorMessages = {}) {
+        YQL_ENSURE(!promise.HasValue());
+        T operationResult;
+        if (completedSuccessfully) {
+            operationResult.SetSuccess();
+            promise.SetValue(operationResult);
+            return;
+        }
+        if (!errorMessages.empty()) {
+            auto failure = yexception() << "Operation failed with errors: " << JoinSeq(" ", errorMessages);
+            operationResult.SetException(failure);
+        }
+        promise.SetValue(operationResult);
     }
};
} // namespace
-IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave) {
- return MakeIntrusive<TFmrYtGateway>(std::move(slave));
+// Factory for the FMR gateway: wraps `slave` and attaches `coordinator` and `settings` to it.
+IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPtr coordinator, const TFmrYtGatewaySettings& settings) {
+    IYtGateway::TPtr gateway = MakeIntrusive<TFmrYtGateway>(std::move(slave), std::move(coordinator), settings);
+    return gateway;
}
} // namspace NYql
diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
index 97e26b63b1..df82d50527 100644
--- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
+++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h
@@ -1,9 +1,22 @@
#pragma once
+#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
#include <yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h>
namespace NYql {
-IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave);
+// Tunables of the FMR gateway.
+struct TFmrYtGatewaySettings {
+ TIntrusivePtr<IRandomProvider> RandomProvider; // source of the guids the gateway generates
+ TDuration TimeToSleepBetweenGetOperationRequests; // pause between two rounds of operation status polling
+};
+
+// Creates a gateway that wraps `slave`.
+// `coordinator` defaults to nullptr; `settings` defaults to the default random provider
+// and a one second pause between operation status requests.
+IYtGateway::TPtr CreateYtFmrGateway(
+ IYtGateway::TPtr slave,
+ IFmrCoordinator::TPtr coordinator = nullptr,
+ const TFmrYtGatewaySettings& settings = TFmrYtGatewaySettings{
+ .RandomProvider = CreateDefaultRandomProvider(),
+ .TimeToSleepBetweenGetOperationRequests = TDuration::Seconds(1)
+ }
+);
} // namspace NYql
diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make
index 12bcaa20fd..93b20d7ed0 100644
--- a/yt/yql/tools/ytrun/lib/ya.make
+++ b/yt/yql/tools/ytrun/lib/ya.make
@@ -6,6 +6,9 @@ SRCS(
PEERDIR(
yt/yql/providers/yt/provider
+ yt/yql/providers/yt/fmr/coordinator/impl
+ yt/yql/providers/yt/fmr/job_factory/impl
+ yt/yql/providers/yt/fmr/worker/impl
yt/yql/providers/yt/gateway/native
yt/yql/providers/yt/gateway/fmr
yt/yql/providers/yt/lib/config_clusters
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index 9a4b5d0377..c2064ebd85 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -8,6 +8,8 @@
#include <yt/yql/providers/yt/lib/log/yt_logger.h>
#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
@@ -170,7 +172,26 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
services.FileStorage = GetFileStorage();
services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
auto ytGateway = CreateYtNativeGateway(services);
- return GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName) ? CreateYtFmrGateway(ytGateway): ytGateway;
+ if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
+ return ytGateway;
+ }
+
+ auto coordinator = MakeFmrCoordinator();
+ auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ while (!cancelFlag->load()) {
+ Sleep(TDuration::Seconds(3));
+ return ETaskStatus::Completed;
+ }
+ return ETaskStatus::Aborted;
+ }; // TODO - use function which actually calls Downloader/Uploader based on task params
+
+ TFmrJobFactorySettings settings{.Function=func};
+ auto jobFactory = MakeFmrJobFactory(settings);
+ TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
+ .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
+ FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
+ FmrWorker_->Start();
+ return CreateYtFmrGateway(ytGateway, coordinator);
}
IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h
index de3c0dfd64..e724ce2ac6 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.h
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h
@@ -1,6 +1,7 @@
#pragma once
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
+#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
#include <yql/essentials/tools/yql_facade_run/yql_facade_run.h>
#include <yql/essentials/core/cbo/cbo_optimizer_new.h>
@@ -32,6 +33,7 @@ protected:
size_t NumThreads_ = 1;
bool KeepTemp_ = false;
TString DefYtServer_;
+ IFmrWorker::TPtr FmrWorker_;
};
} // NYql