summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-09-18 11:59:31 +0300
committerrobot-piglet <[email protected]>2025-09-18 12:17:20 +0300
commite2dc5e15bc02bff63e44ed9c74ff1093d87d845b (patch)
treeedd3015ae492064effbe48767a36f53ca23f6710
parentd2d6681538770d0c2725615373a6c2e07adf653e (diff)
Intermediate changes
commit_hash:1bb058fdc353cb6abf7a9f65e8c1a61a424274f3
-rw-r--r--yql/essentials/tests/common/test_framework/yqlrun.py26
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp11
-rw-r--r--yt/yql/providers/yt/fmr/proto/coordinator.proto4
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp23
-rw-r--r--yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h2
-rw-r--r--yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp43
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp2
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp127
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h6
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/ya.make9
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp1
-rw-r--r--yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h21
15 files changed, 202 insertions, 77 deletions
diff --git a/yql/essentials/tests/common/test_framework/yqlrun.py b/yql/essentials/tests/common/test_framework/yqlrun.py
index fdf5110f5f3..5629875a8ef 100644
--- a/yql/essentials/tests/common/test_framework/yqlrun.py
+++ b/yql/essentials/tests/common/test_framework/yqlrun.py
@@ -1,3 +1,4 @@
+import errno
import os
import shutil
import yatest.common
@@ -23,6 +24,25 @@ FIX_DIR_PREFIXES = {
}
+def safe_symlink(src, dst):
+ """
+ If dst is an existing directory, creates a symlink inside it with the basename of src.
+ If dst is a file path (does not exist or is not a directory), creates the symlink at dst.
+ If the link already exists, does nothing.
+ """
+ # If dst is an existing directory, append basename of src
+ if os.path.isdir(dst):
+ link_path = os.path.join(dst, os.path.basename(src))
+ else:
+ link_path = dst
+
+ try:
+ os.symlink(src, link_path)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise
+
+
class YQLRun(object):
def __init__(self, udfs_dir=None, prov='yt', use_sql2yql=False, keep_temp=True, binary=None, gateway_config=None,
@@ -210,7 +230,7 @@ class YQLRun(object):
copy_dest = os.path.join(res_dir, f)
if not os.path.exists(os.path.dirname(copy_dest)):
os.makedirs(os.path.dirname(copy_dest))
- shutil.copy2(
+ safe_symlink(
real_path,
copy_dest,
)
@@ -232,9 +252,9 @@ class YQLRun(object):
else:
copy_dest = res_dir
files[f] = os.path.basename(files[f])
- shutil.copy2(path_to_copy, copy_dest)
+ safe_symlink(path_to_copy, copy_dest)
else:
- shutil.copy2(files[f], res_dir)
+ safe_symlink(files[f], res_dir)
files[f] = os.path.basename(files[f])
cmd += yql_utils.get_cmd_for_files('--file', files)
diff --git a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp
index 0cf9afdcdd9..9b0fe37913e 100644
--- a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp
@@ -76,8 +76,9 @@ public:
TStringStream outputStream;
auto sendHeartbeatRequestFunc = [&]() {
- httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, GetHeadersWithLogContext(Headers_, false));
+ auto statusCode = httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, GetHeadersWithLogContext(Headers_, false));
TString serializedResponse = outputStream.ReadAll();
+ HandleHttpError(statusCode, serializedResponse);
NProto::THeartbeatResponse protoHeartbeatResponse;
YQL_ENSURE(protoHeartbeatResponse.ParseFromString(serializedResponse));
return NThreading::MakeFuture(HeartbeatResponseFromProto(protoHeartbeatResponse));
@@ -132,6 +133,14 @@ private:
std::function<void(const yexception&)> OnFail_ = [](const yexception& exc) {
YQL_CLOG(DEBUG, FastMapReduce) << "Got exception, retrying: " << exc.what();
};
+
+ void HandleHttpError(TKeepAliveHttpClient::THttpCode statusCode, TString serializedResponse) {
+ if (statusCode = HTTP_OK) return;
+ NProto::TErrorResponse protoErrorResponse;
+ YQL_ENSURE(protoErrorResponse.ParseFromString(serializedResponse));
+ ythrow yexception() << protoErrorResponse.GetErrorMessage();
+ }
+
};
} // namespace
diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto
index 1d159f4783e..91aac08c250 100644
--- a/yt/yql/providers/yt/fmr/proto/coordinator.proto
+++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto
@@ -58,3 +58,7 @@ message TClearSessionRequest {
message TClearSessionResponse {
EOperationStatus Status = 1;
};
+
+message TErrorResponse {
+ string ErrorMessage = 1;
+}
diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp
index aa9def483aa..2d6e16339d8 100644
--- a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp
+++ b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp
@@ -25,7 +25,8 @@ enum class ETableDataServiceRequestHandler {
Get,
Delete,
DeleteGroups,
- Clear
+ Clear,
+ Ping
};
class TReplier: public TRequestReplier {
@@ -70,6 +71,9 @@ private:
} else if (queryPath == "clear") {
YQL_ENSURE(httpRequest.Method == "POST");
return ETableDataServiceRequestHandler::Clear;
+ } else if (queryPath == "ping") {
+ YQL_ENSURE(httpRequest.Method == "GET");
+ return ETableDataServiceRequestHandler::Ping;
}
return Nothing();
}
@@ -80,11 +84,8 @@ public:
TTableDataServiceServer(ILocalTableDataService::TPtr tableDataService, const TTableDataServiceServerSettings& settings)
: TableDataService_(tableDataService),
Host_(settings.Host),
- Port_(settings.Port),
- WorkerId_(settings.WorkerId),
- WorkersNum_(settings.WorkersNum)
+ Port_(settings.Port)
{
- YQL_ENSURE(WorkerId_ >= 0 && WorkerId_ < WorkersNum_);
THttpServer::TOptions opts;
opts.AddBindAddress(Host_, Port_);
HttpServer_ = MakeHolder<THttpServer>(this, opts.EnableKeepAlive(true).EnableCompression(true));
@@ -94,19 +95,21 @@ public:
THandler deleteTableDataServiceHandler = std::bind(&TTableDataServiceServer::DeleteTableDataServiceHandler, this, std::placeholders::_1);
THandler deleteGroupsTableDataServiceHandler = std::bind(&TTableDataServiceServer::DeleteGroupsTableDataServiceHandler, this, std::placeholders::_1);
THandler clearTableDataServiceHandler = std::bind(&TTableDataServiceServer::ClearTableDataServiceHander, this, std::placeholders::_1);
+ THandler pingTableDataServiceHandler = std::bind(&TTableDataServiceServer::PingTableDataServiceHandler, this, std::placeholders::_1);
Handlers_ = std::unordered_map<ETableDataServiceRequestHandler, THandler>{
{ETableDataServiceRequestHandler::Put, putTableDataServiceHandler},
{ETableDataServiceRequestHandler::Get, getTableDataServiceHandler},
{ETableDataServiceRequestHandler::Delete, deleteTableDataServiceHandler},
{ETableDataServiceRequestHandler::DeleteGroups, deleteGroupsTableDataServiceHandler},
- {ETableDataServiceRequestHandler::Clear, clearTableDataServiceHandler}
+ {ETableDataServiceRequestHandler::Clear, clearTableDataServiceHandler},
+ {ETableDataServiceRequestHandler::Ping, pingTableDataServiceHandler}
};
}
void Start() override {
HttpServer_->Start();
- Cerr << "Table data service server with id " << WorkerId_ << " is listnening on url " << "http://" + Host_ + ":" + ToString(Port_) << "\n";
+ Cerr << "Table data service server is listnening on url " << "http://" + Host_ + ":" + ToString(Port_) << "\n";
}
void Stop() override {
@@ -127,8 +130,6 @@ private:
ILocalTableDataService::TPtr TableDataService_;
const TString Host_;
const ui16 Port_;
- const ui64 WorkerId_;
- const ui64 WorkersNum_;
struct TTableDataServiceKey {
TString Group;
@@ -191,6 +192,10 @@ private:
TableDataService_->Clear().GetValueSync();
return THttpResponse(HTTP_OK);
}
+
+ THttpResponse PingTableDataServiceHandler(THttpInput& /*input*/) {
+ return THttpResponse(HTTP_OK);
+ }
};
} // namespace
diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h
index 51ffe88a24d..d6300af6fc8 100644
--- a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h
+++ b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h
@@ -6,8 +6,6 @@ namespace NYql::NFmr {
using IFmrServer = IRunnable;
struct TTableDataServiceServerSettings {
- ui64 WorkerId;
- ui64 WorkersNum;
TString Host = "localhost";
ui16 Port = 7000;
};
diff --git a/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp b/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp
index b9d238bb8a2..567d1d42bfa 100644
--- a/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp
+++ b/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp
@@ -12,7 +12,7 @@ TString WriteHostsToFile(TTempFileHandle& file, ui64 WorkersNum, const std::vect
}
IFmrServer::TPtr MakeTableDataServiceServer(ui16 port) {
- TTableDataServiceServerSettings tableDataServiceWorkerSettings{.WorkerId = 0, .WorkersNum = 1, .Port = port};
+ TTableDataServiceServerSettings tableDataServiceWorkerSettings{.Port = port};
auto tableDataServiceServer = MakeTableDataServiceServer(MakeLocalTableDataService(), tableDataServiceWorkerSettings);
tableDataServiceServer->Start();
return tableDataServiceServer;
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
index 7bde0ed7f9b..e705903f4db 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make
@@ -2,6 +2,7 @@ UNITTEST()
SRCS(
yql_yt_worker_ut.cpp
+ yql_yt_worker_status_ut.cpp
)
PEERDIR(
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp
new file mode 100644
index 00000000000..6fd69b87227
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp
@@ -0,0 +1,43 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/threading/future/async.h>
+#include <util/system/mutex.h>
+#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h>
+#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
+
+
+namespace NYql::NFmr {
+
+Y_UNIT_TEST_SUITE(FmrWorkerStatusTests) {
+
+ Y_UNIT_TEST(WorkerStartTransition) {
+ auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService());
+ auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
+ while (!cancelFlag->load()) {
+ return TJobResult{.TaskStatus = ETaskStatus::Completed, .Stats = TStatistics()};
+ }
+ return TJobResult{.TaskStatus = ETaskStatus::Failed, .Stats = TStatistics()};
+ };
+ TFmrJobFactorySettings settings{.NumThreads = 1, .Function = func};
+ auto factory = MakeFmrJobFactory(settings);
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
+ // Initially worker should be in Stopped state
+ UNIT_ASSERT(worker->GetWorkerState() == EFmrWorkerRuntimeState::Stopped);
+ // Start the worker
+ worker->Start();
+
+ // Give some time for the thread to start
+ Sleep(TDuration::MilliSeconds(100));
+
+ // Worker should be in Running state
+ UNIT_ASSERT(worker->GetWorkerState() == EFmrWorkerRuntimeState::Running);
+
+ worker->Stop();
+ UNIT_ASSERT(worker->GetWorkerState() == EFmrWorkerRuntimeState::Stopped);
+ }
+
+}
+
+} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index e24d5d62a62..9000e38faec 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -6,7 +6,7 @@
#include <util/thread/pool.h>
#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h>
#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
-#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service//file/yql_yt_file_coordinator_service.h>
+#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h>
#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
namespace NYql::NFmr {
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ya.make
index b5c8bb5ab97..1a0001fb28a 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ya.make
+++ b/yt/yql/providers/yt/fmr/worker/impl/ya.make
@@ -9,6 +9,7 @@ PEERDIR(
library/cpp/threading/future
yt/yql/providers/yt/fmr/coordinator/interface
yt/yql/providers/yt/fmr/job_factory/interface
+ yt/yql/providers/yt/fmr/worker/interface
yql/essentials/utils
yql/essentials/utils/log
)
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
index 95d2463787c..f73f907116a 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
@@ -4,6 +4,7 @@
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/yql_panic.h>
#include "yql_yt_worker_impl.h"
+#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h>
namespace NYql::NFmr {
@@ -12,6 +13,7 @@ namespace {
struct TFmrWorkerState {
TMutex Mutex;
std::unordered_map<TString, TTaskState::TPtr> TaskStatuses;
+ std::atomic<EFmrWorkerRuntimeState> State = EFmrWorkerRuntimeState::Stopped;
};
class TFmrWorker: public IFmrWorker {
@@ -19,7 +21,7 @@ public:
TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings)
: Coordinator_(coordinator),
JobFactory_(jobFactory),
- WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskState::TPtr>{})),
+ WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskState::TPtr>{}, EFmrWorkerRuntimeState::Stopped)),
StopWorker_(false),
RandomProvider_(settings.RandomProvider),
WorkerId_(settings.WorkerId),
@@ -34,71 +36,77 @@ public:
void Start() override {
auto mainThreadFunc = [&] () {
+ WorkerState_->State = EFmrWorkerRuntimeState::Running;
+ std::vector<TTaskState::TPtr> taskStates;
+ std::vector<TString> taskIdsToErase;
+
while (!StopWorker_) {
- std::vector<TTaskState::TPtr> taskStates;
- std::vector<TString> taskIdsToErase;
- with_lock(WorkerState_->Mutex) {
- for (auto& [taskId, taskState]: WorkerState_->TaskStatuses) {
- auto taskStatus = taskState->TaskStatus;
- if (taskStatus != ETaskStatus::InProgress) {
- taskIdsToErase.emplace_back(taskId);
+ try {
+ with_lock(WorkerState_->Mutex) {
+ for (auto& [taskId, taskState]: WorkerState_->TaskStatuses) {
+ auto taskStatus = taskState->TaskStatus;
+ if (taskStatus != ETaskStatus::InProgress) {
+ taskIdsToErase.emplace_back(taskId);
+ }
+ taskStates.emplace_back(taskState);
+ }
+ for (auto& taskId: taskIdsToErase) {
+ WorkerState_->TaskStatuses.erase(taskId);
+ TasksCancelStatus_.erase(taskId);
}
- taskStates.emplace_back(taskState);
- }
- for (auto& taskId: taskIdsToErase) {
- WorkerState_->TaskStatuses.erase(taskId);
- TasksCancelStatus_.erase(taskId);
}
- }
- ui64 maxParallelJobCount = JobFactory_->GetMaxParallelJobCount();
- YQL_ENSURE(maxParallelJobCount >= WorkerState_->TaskStatuses.size());
- ui64 availableSlots = maxParallelJobCount - WorkerState_->TaskStatuses.size();
- auto heartbeatRequest = THeartbeatRequest(
- WorkerId_,
- VolatileId_,
- taskStates,
- availableSlots
- );
- auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest);
- auto heartbeatResponse = heartbeatResponseFuture.GetValueSync();
-
- if (heartbeatResponse.NeedToRestart) {
- Restart();
- continue;
- }
+ ui64 maxParallelJobCount = JobFactory_->GetMaxParallelJobCount();
+ YQL_ENSURE(maxParallelJobCount >= WorkerState_->TaskStatuses.size());
+ ui64 availableSlots = maxParallelJobCount - WorkerState_->TaskStatuses.size();
+ auto heartbeatRequest = THeartbeatRequest(
+ WorkerId_,
+ VolatileId_,
+ taskStates,
+ availableSlots
+ );
+ auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest);
+ auto heartbeatResponse = heartbeatResponseFuture.GetValueSync();
+
+ if (heartbeatResponse.NeedToRestart) {
+ Restart();
+ continue;
+ }
- std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun;
- std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds;
- YQL_ENSURE(tasksToRun.size() <= availableSlots);
+ std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun;
+ std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds;
+ YQL_ENSURE(tasksToRun.size() <= availableSlots);
- with_lock(WorkerState_->Mutex) {
- for (auto task: tasksToRun) {
- auto taskId = task->TaskId;
- YQL_ENSURE(!WorkerState_->TaskStatuses.contains(taskId));
- WorkerState_->TaskStatuses[taskId] = MakeTaskState(ETaskStatus::InProgress, taskId);
- TasksCancelStatus_[taskId] = std::make_shared<std::atomic<bool>>(false);
- }
- for (auto& taskToDeleteId: taskToDeleteIds) {
- if (TasksCancelStatus_.contains(taskToDeleteId)) {
- TasksCancelStatus_[taskToDeleteId]->store(true);
+ with_lock(WorkerState_->Mutex) {
+ for (auto task: tasksToRun) {
+ auto taskId = task->TaskId;
+ YQL_ENSURE(!WorkerState_->TaskStatuses.contains(taskId));
+ WorkerState_->TaskStatuses[taskId] = MakeTaskState(ETaskStatus::InProgress, taskId);
+ TasksCancelStatus_[taskId] = std::make_shared<std::atomic<bool>>(false);
+ }
+ for (auto& taskToDeleteId: taskToDeleteIds) {
+ if (TasksCancelStatus_.contains(taskToDeleteId)) {
+ TasksCancelStatus_[taskToDeleteId]->store(true);
+ }
}
- }
- for (auto task: tasksToRun) {
- auto taskId = task->TaskId;
- auto future = JobFactory_->StartJob(task, TasksCancelStatus_[taskId]);
- future.Subscribe([weakState = std::weak_ptr(WorkerState_), task](const auto& jobFuture) {
- auto finalTaskState = jobFuture.GetValue();
- std::shared_ptr<TFmrWorkerState> state = weakState.lock();
- if (state) {
- with_lock(state->Mutex) {
- YQL_ENSURE(state->TaskStatuses.contains(task->TaskId));
- state->TaskStatuses[task->TaskId] = finalTaskState;
+ for (auto task: tasksToRun) {
+ auto taskId = task->TaskId;
+ auto future = JobFactory_->StartJob(task, TasksCancelStatus_[taskId]);
+ future.Subscribe([weakState = std::weak_ptr(WorkerState_), task](const auto& jobFuture) {
+ auto finalTaskState = jobFuture.GetValue();
+ std::shared_ptr<TFmrWorkerState> state = weakState.lock();
+ if (state) {
+ with_lock(state->Mutex) {
+ YQL_ENSURE(state->TaskStatuses.contains(task->TaskId));
+ state->TaskStatuses[task->TaskId] = finalTaskState;
+ }
}
- }
- });
+ });
+ }
}
+ } catch (...) {
+ YQL_CLOG(ERROR, FastMapReduce) << "Error while processing heartbeat request: " << CurrentExceptionMessage();
}
Sleep(TimeToSleepBetweenRequests_);
}
@@ -110,6 +118,7 @@ public:
with_lock(WorkerState_->Mutex) {
StopJobFactoryTasks();
StopWorker_ = true;
+ WorkerState_->State = EFmrWorkerRuntimeState::Stopped;
}
JobFactory_->Stop();
if (MainThread_.joinable()) {
@@ -117,6 +126,10 @@ public:
}
}
+ EFmrWorkerRuntimeState GetWorkerState() const override {
+ return WorkerState_->State;
+ }
+
private:
void Restart() {
YQL_CLOG(INFO, FastMapReduce) << "Worker with id " << WorkerId_ << " is assumed dead by coordinator, restarting";
@@ -168,7 +181,7 @@ private:
} // namespace
IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) {
- return MakeHolder<TFmrWorker>(coordinator, jobFactory, settings);
+ return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings);
}
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
index 1e18f2aa5b2..ed6bb7fe4dc 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h
@@ -3,7 +3,9 @@
#include <library/cpp/random_provider/random_provider.h>
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h>
-#include <yql/essentials/utils/runnable.h>
+#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h>
+
+#include <util/system/mutex.h>
namespace NYql::NFmr {
@@ -13,8 +15,6 @@ struct TFmrWorkerSettings {
TDuration TimeToSleepBetweenRequests = TDuration::Seconds(1);
};
-using IFmrWorker = IRunnable;
-
IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings);
} // namespace NYql::NFmr
diff --git a/yt/yql/providers/yt/fmr/worker/interface/ya.make b/yt/yql/providers/yt/fmr/worker/interface/ya.make
new file mode 100644
index 00000000000..62fc5f7cb90
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/interface/ya.make
@@ -0,0 +1,9 @@
+LIBRARY()
+
+SRCS(
+ yql_yt_fmr_worker.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp
new file mode 100644
index 00000000000..fb345dd1a53
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp
@@ -0,0 +1 @@
+#include "yql_yt_fmr_worker.h"
diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h
new file mode 100644
index 00000000000..0b5085a7d60
--- /dev/null
+++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <yql/essentials/utils/runnable.h>
+
+namespace NYql::NFmr {
+
+enum class EFmrWorkerRuntimeState {
+ Stopped,
+ Running,
+};
+
+class IFmrWorker: public IRunnable {
+public:
+ using TPtr = TIntrusivePtr<IFmrWorker>;
+
+ virtual ~IFmrWorker() = default;
+
+ virtual EFmrWorkerRuntimeState GetWorkerState() const = 0;
+};
+
+} // namespace NYql::NFmr