diff options
author | robot-piglet <[email protected]> | 2025-09-18 11:59:31 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-09-18 12:17:20 +0300 |
commit | e2dc5e15bc02bff63e44ed9c74ff1093d87d845b (patch) | |
tree | edd3015ae492064effbe48767a36f53ca23f6710 | |
parent | d2d6681538770d0c2725615373a6c2e07adf653e (diff) |
Intermediate changes
commit_hash:1bb058fdc353cb6abf7a9f65e8c1a61a424274f3
15 files changed, 202 insertions, 77 deletions
diff --git a/yql/essentials/tests/common/test_framework/yqlrun.py b/yql/essentials/tests/common/test_framework/yqlrun.py index fdf5110f5f3..5629875a8ef 100644 --- a/yql/essentials/tests/common/test_framework/yqlrun.py +++ b/yql/essentials/tests/common/test_framework/yqlrun.py @@ -1,3 +1,4 @@ +import errno import os import shutil import yatest.common @@ -23,6 +24,25 @@ FIX_DIR_PREFIXES = { } +def safe_symlink(src, dst): + """ + If dst is an existing directory, creates a symlink inside it with the basename of src. + If dst is a file path (does not exist or is not a directory), creates the symlink at dst. + If the link already exists, does nothing. + """ + # If dst is an existing directory, append basename of src + if os.path.isdir(dst): + link_path = os.path.join(dst, os.path.basename(src)) + else: + link_path = dst + + try: + os.symlink(src, link_path) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + class YQLRun(object): def __init__(self, udfs_dir=None, prov='yt', use_sql2yql=False, keep_temp=True, binary=None, gateway_config=None, @@ -210,7 +230,7 @@ class YQLRun(object): copy_dest = os.path.join(res_dir, f) if not os.path.exists(os.path.dirname(copy_dest)): os.makedirs(os.path.dirname(copy_dest)) - shutil.copy2( + safe_symlink( real_path, copy_dest, ) @@ -232,9 +252,9 @@ class YQLRun(object): else: copy_dest = res_dir files[f] = os.path.basename(files[f]) - shutil.copy2(path_to_copy, copy_dest) + safe_symlink(path_to_copy, copy_dest) else: - shutil.copy2(files[f], res_dir) + safe_symlink(files[f], res_dir) files[f] = os.path.basename(files[f]) cmd += yql_utils.get_cmd_for_files('--file', files) diff --git a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp index 0cf9afdcdd9..9b0fe37913e 100644 --- a/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.cpp @@ -76,8 +76,9 @@ public: TStringStream outputStream; auto sendHeartbeatRequestFunc = [&]() { - httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, GetHeadersWithLogContext(Headers_, false)); + auto statusCode = httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, GetHeadersWithLogContext(Headers_, false)); TString serializedResponse = outputStream.ReadAll(); + HandleHttpError(statusCode, serializedResponse); NProto::THeartbeatResponse protoHeartbeatResponse; YQL_ENSURE(protoHeartbeatResponse.ParseFromString(serializedResponse)); return NThreading::MakeFuture(HeartbeatResponseFromProto(protoHeartbeatResponse)); @@ -132,6 +133,14 @@ private: std::function<void(const yexception&)> OnFail_ = [](const yexception& exc) { YQL_CLOG(DEBUG, FastMapReduce) << "Got exception, retrying: " << exc.what(); }; + + void HandleHttpError(TKeepAliveHttpClient::THttpCode statusCode, TString serializedResponse) { + if (statusCode = HTTP_OK) return; + NProto::TErrorResponse protoErrorResponse; + YQL_ENSURE(protoErrorResponse.ParseFromString(serializedResponse)); + ythrow yexception() << protoErrorResponse.GetErrorMessage(); + } + }; } // namespace diff --git a/yt/yql/providers/yt/fmr/proto/coordinator.proto b/yt/yql/providers/yt/fmr/proto/coordinator.proto index 1d159f4783e..91aac08c250 100644 --- a/yt/yql/providers/yt/fmr/proto/coordinator.proto +++ b/yt/yql/providers/yt/fmr/proto/coordinator.proto @@ -58,3 +58,7 @@ message TClearSessionRequest { message TClearSessionResponse { EOperationStatus Status = 1; }; + +message TErrorResponse { + string ErrorMessage = 1; +} diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp index aa9def483aa..2d6e16339d8 100644 --- a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp +++ b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp @@ -25,7 +25,8 @@ enum class ETableDataServiceRequestHandler { Get, Delete, DeleteGroups, - Clear + Clear, + Ping }; class TReplier: public TRequestReplier { @@ -70,6 +71,9 @@ private: } else if (queryPath == "clear") { YQL_ENSURE(httpRequest.Method == "POST"); return ETableDataServiceRequestHandler::Clear; + } else if (queryPath == "ping") { + YQL_ENSURE(httpRequest.Method == "GET"); + return ETableDataServiceRequestHandler::Ping; } return Nothing(); } @@ -80,11 +84,8 @@ public: TTableDataServiceServer(ILocalTableDataService::TPtr tableDataService, const TTableDataServiceServerSettings& settings) : TableDataService_(tableDataService), Host_(settings.Host), - Port_(settings.Port), - WorkerId_(settings.WorkerId), - WorkersNum_(settings.WorkersNum) + Port_(settings.Port) { - YQL_ENSURE(WorkerId_ >= 0 && WorkerId_ < WorkersNum_); THttpServer::TOptions opts; opts.AddBindAddress(Host_, Port_); HttpServer_ = MakeHolder<THttpServer>(this, opts.EnableKeepAlive(true).EnableCompression(true)); @@ -94,19 +95,21 @@ public: THandler deleteTableDataServiceHandler = std::bind(&TTableDataServiceServer::DeleteTableDataServiceHandler, this, std::placeholders::_1); THandler deleteGroupsTableDataServiceHandler = std::bind(&TTableDataServiceServer::DeleteGroupsTableDataServiceHandler, this, std::placeholders::_1); THandler clearTableDataServiceHandler = std::bind(&TTableDataServiceServer::ClearTableDataServiceHander, this, std::placeholders::_1); + THandler pingTableDataServiceHandler = std::bind(&TTableDataServiceServer::PingTableDataServiceHandler, this, std::placeholders::_1); Handlers_ = std::unordered_map<ETableDataServiceRequestHandler, THandler>{ {ETableDataServiceRequestHandler::Put, putTableDataServiceHandler}, {ETableDataServiceRequestHandler::Get, getTableDataServiceHandler}, {ETableDataServiceRequestHandler::Delete, deleteTableDataServiceHandler}, {ETableDataServiceRequestHandler::DeleteGroups, deleteGroupsTableDataServiceHandler}, - {ETableDataServiceRequestHandler::Clear, clearTableDataServiceHandler} + {ETableDataServiceRequestHandler::Clear, clearTableDataServiceHandler}, + {ETableDataServiceRequestHandler::Ping, pingTableDataServiceHandler} }; } void Start() override { HttpServer_->Start(); - Cerr << "Table data service server with id " << WorkerId_ << " is listnening on url " << "http://" + Host_ + ":" + ToString(Port_) << "\n"; + Cerr << "Table data service server is listnening on url " << "http://" + Host_ + ":" + ToString(Port_) << "\n"; } void Stop() override { @@ -127,8 +130,6 @@ private: ILocalTableDataService::TPtr TableDataService_; const TString Host_; const ui16 Port_; - const ui64 WorkerId_; - const ui64 WorkersNum_; struct TTableDataServiceKey { TString Group; @@ -191,6 +192,10 @@ private: TableDataService_->Clear().GetValueSync(); return THttpResponse(HTTP_OK); } + + THttpResponse PingTableDataServiceHandler(THttpInput& /*input*/) { + return THttpResponse(HTTP_OK); + } }; } // namespace diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h index 51ffe88a24d..d6300af6fc8 100644 --- a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h +++ b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h @@ -6,8 +6,6 @@ namespace NYql::NFmr { using IFmrServer = IRunnable; struct TTableDataServiceServerSettings { - ui64 WorkerId; - ui64 WorkersNum; TString Host = "localhost"; ui16 Port = 7000; }; diff --git a/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp b/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp index b9d238bb8a2..567d1d42bfa 100644 --- a/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp +++ b/yt/yql/providers/yt/fmr/test_tools/table_data_service/yql_yt_table_data_service_helpers.cpp @@ -12,7 +12,7 @@ TString WriteHostsToFile(TTempFileHandle& file, ui64 WorkersNum, const std::vect } IFmrServer::TPtr MakeTableDataServiceServer(ui16 port) { - TTableDataServiceServerSettings tableDataServiceWorkerSettings{.WorkerId = 0, .WorkersNum = 1, .Port = port}; + TTableDataServiceServerSettings tableDataServiceWorkerSettings{.Port = port}; auto tableDataServiceServer = MakeTableDataServiceServer(MakeLocalTableDataService(), tableDataServiceWorkerSettings); tableDataServiceServer->Start(); return tableDataServiceServer; diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make index 7bde0ed7f9b..e705903f4db 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make @@ -2,6 +2,7 @@ UNITTEST() SRCS( yql_yt_worker_ut.cpp + yql_yt_worker_status_ut.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp new file mode 100644 index 00000000000..6fd69b87227 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_status_ut.cpp @@ -0,0 +1,43 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/threading/future/async.h> +#include <util/system/mutex.h> +#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> + + +namespace NYql::NFmr { + +Y_UNIT_TEST_SUITE(FmrWorkerStatusTests) { + + Y_UNIT_TEST(WorkerStartTransition) { + auto coordinator = MakeFmrCoordinator(TFmrCoordinatorSettings(), MakeFileYtCoordinatorService()); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (!cancelFlag->load()) { + return TJobResult{.TaskStatus = ETaskStatus::Completed, .Stats = TStatistics()}; + } + return TJobResult{.TaskStatus = ETaskStatus::Failed, .Stats = TStatistics()}; + }; + TFmrJobFactorySettings settings{.NumThreads = 1, .Function = func}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + // Initially worker should be in Stopped state + UNIT_ASSERT(worker->GetWorkerState() == EFmrWorkerRuntimeState::Stopped); + // Start the worker + worker->Start(); + + // Give some time for the thread to start + Sleep(TDuration::MilliSeconds(100)); + + // Worker should be in Running state + UNIT_ASSERT(worker->GetWorkerState() == EFmrWorkerRuntimeState::Running); + + worker->Stop(); + UNIT_ASSERT(worker->GetWorkerState() == EFmrWorkerRuntimeState::Stopped); + } + +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index e24d5d62a62..9000e38faec 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -6,7 +6,7 @@ #include <util/thread/pool.h> #include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> -#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service//file/yql_yt_file_coordinator_service.h> +#include <yt/yql/providers/yt/fmr/coordinator/yt_coordinator_service/file/yql_yt_file_coordinator_service.h> #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> namespace NYql::NFmr { diff --git a/yt/yql/providers/yt/fmr/worker/impl/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ya.make index b5c8bb5ab97..1a0001fb28a 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ya.make +++ b/yt/yql/providers/yt/fmr/worker/impl/ya.make @@ -9,6 +9,7 @@ PEERDIR( library/cpp/threading/future yt/yql/providers/yt/fmr/coordinator/interface yt/yql/providers/yt/fmr/job_factory/interface + yt/yql/providers/yt/fmr/worker/interface yql/essentials/utils yql/essentials/utils/log ) diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp index 95d2463787c..f73f907116a 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp @@ -4,6 +4,7 @@ #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/yql_panic.h> #include "yql_yt_worker_impl.h" +#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h> namespace NYql::NFmr { @@ -12,6 +13,7 @@ namespace { struct TFmrWorkerState { TMutex Mutex; std::unordered_map<TString, TTaskState::TPtr> TaskStatuses; + std::atomic<EFmrWorkerRuntimeState> State = EFmrWorkerRuntimeState::Stopped; }; class TFmrWorker: public IFmrWorker { @@ -19,7 +21,7 @@ public: TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) : Coordinator_(coordinator), JobFactory_(jobFactory), - WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskState::TPtr>{})), + WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskState::TPtr>{}, EFmrWorkerRuntimeState::Stopped)), StopWorker_(false), RandomProvider_(settings.RandomProvider), WorkerId_(settings.WorkerId), @@ -34,71 +36,77 @@ public: void Start() override { auto mainThreadFunc = [&] () { + WorkerState_->State = EFmrWorkerRuntimeState::Running; + std::vector<TTaskState::TPtr> taskStates; + std::vector<TString> taskIdsToErase; + while (!StopWorker_) { - std::vector<TTaskState::TPtr> taskStates; - std::vector<TString> taskIdsToErase; - with_lock(WorkerState_->Mutex) { - for (auto& [taskId, taskState]: WorkerState_->TaskStatuses) { - auto taskStatus = taskState->TaskStatus; - if (taskStatus != ETaskStatus::InProgress) { - taskIdsToErase.emplace_back(taskId); + try { + with_lock(WorkerState_->Mutex) { + for (auto& [taskId, taskState]: WorkerState_->TaskStatuses) { + auto taskStatus = taskState->TaskStatus; + if (taskStatus != ETaskStatus::InProgress) { + taskIdsToErase.emplace_back(taskId); + } + taskStates.emplace_back(taskState); + } + for (auto& taskId: taskIdsToErase) { + WorkerState_->TaskStatuses.erase(taskId); + TasksCancelStatus_.erase(taskId); } - taskStates.emplace_back(taskState); - } - for (auto& taskId: taskIdsToErase) { - WorkerState_->TaskStatuses.erase(taskId); - TasksCancelStatus_.erase(taskId); } - } - ui64 maxParallelJobCount = JobFactory_->GetMaxParallelJobCount(); - YQL_ENSURE(maxParallelJobCount >= WorkerState_->TaskStatuses.size()); - ui64 availableSlots = maxParallelJobCount - WorkerState_->TaskStatuses.size(); - auto heartbeatRequest = THeartbeatRequest( - WorkerId_, - VolatileId_, - taskStates, - availableSlots - ); - auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest); - auto heartbeatResponse = heartbeatResponseFuture.GetValueSync(); - - if (heartbeatResponse.NeedToRestart) { - Restart(); - continue; - } + ui64 maxParallelJobCount = JobFactory_->GetMaxParallelJobCount(); + YQL_ENSURE(maxParallelJobCount >= WorkerState_->TaskStatuses.size()); + ui64 availableSlots = maxParallelJobCount - WorkerState_->TaskStatuses.size(); + auto heartbeatRequest = THeartbeatRequest( + WorkerId_, + VolatileId_, + taskStates, + availableSlots + ); + auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest); + auto heartbeatResponse = heartbeatResponseFuture.GetValueSync(); + + if (heartbeatResponse.NeedToRestart) { + Restart(); + continue; + } - std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun; - std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds; - YQL_ENSURE(tasksToRun.size() <= availableSlots); + std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun; + std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds; + YQL_ENSURE(tasksToRun.size() <= availableSlots); - with_lock(WorkerState_->Mutex) { - for (auto task: tasksToRun) { - auto taskId = task->TaskId; - YQL_ENSURE(!WorkerState_->TaskStatuses.contains(taskId)); - WorkerState_->TaskStatuses[taskId] = MakeTaskState(ETaskStatus::InProgress, taskId); - TasksCancelStatus_[taskId] = std::make_shared<std::atomic<bool>>(false); - } - for (auto& taskToDeleteId: taskToDeleteIds) { - if (TasksCancelStatus_.contains(taskToDeleteId)) { - TasksCancelStatus_[taskToDeleteId]->store(true); + with_lock(WorkerState_->Mutex) { + for (auto task: tasksToRun) { + auto taskId = task->TaskId; + YQL_ENSURE(!WorkerState_->TaskStatuses.contains(taskId)); + WorkerState_->TaskStatuses[taskId] = MakeTaskState(ETaskStatus::InProgress, taskId); + TasksCancelStatus_[taskId] = std::make_shared<std::atomic<bool>>(false); + } + for (auto& taskToDeleteId: taskToDeleteIds) { + if (TasksCancelStatus_.contains(taskToDeleteId)) { + TasksCancelStatus_[taskToDeleteId]->store(true); + } } - } - for (auto task: tasksToRun) { - auto taskId = task->TaskId; - auto future = JobFactory_->StartJob(task, TasksCancelStatus_[taskId]); - future.Subscribe([weakState = std::weak_ptr(WorkerState_), task](const auto& jobFuture) { - auto finalTaskState = jobFuture.GetValue(); - std::shared_ptr<TFmrWorkerState> state = weakState.lock(); - if (state) { - with_lock(state->Mutex) { - YQL_ENSURE(state->TaskStatuses.contains(task->TaskId)); - state->TaskStatuses[task->TaskId] = finalTaskState; + for (auto task: tasksToRun) { + auto taskId = task->TaskId; + auto future = JobFactory_->StartJob(task, TasksCancelStatus_[taskId]); + future.Subscribe([weakState = std::weak_ptr(WorkerState_), task](const auto& jobFuture) { + auto finalTaskState = jobFuture.GetValue(); + std::shared_ptr<TFmrWorkerState> state = weakState.lock(); + if (state) { + with_lock(state->Mutex) { + YQL_ENSURE(state->TaskStatuses.contains(task->TaskId)); + state->TaskStatuses[task->TaskId] = finalTaskState; + } } - } - }); + }); + } } + } catch (...) { + YQL_CLOG(ERROR, FastMapReduce) << "Error while processing heartbeat request: " << CurrentExceptionMessage(); } Sleep(TimeToSleepBetweenRequests_); } @@ -110,6 +118,7 @@ public: with_lock(WorkerState_->Mutex) { StopJobFactoryTasks(); StopWorker_ = true; + WorkerState_->State = EFmrWorkerRuntimeState::Stopped; } JobFactory_->Stop(); if (MainThread_.joinable()) { @@ -117,6 +126,10 @@ public: } } + EFmrWorkerRuntimeState GetWorkerState() const override { + return WorkerState_->State; + } + private: void Restart() { YQL_CLOG(INFO, FastMapReduce) << "Worker with id " << WorkerId_ << " is assumed dead by coordinator, restarting"; @@ -168,7 +181,7 @@ private: } // namespace IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) { - return MakeHolder<TFmrWorker>(coordinator, jobFactory, settings); + return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings); } } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h index 1e18f2aa5b2..ed6bb7fe4dc 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h @@ -3,7 +3,9 @@ #include <library/cpp/random_provider/random_provider.h> #include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> #include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> -#include <yql/essentials/utils/runnable.h> +#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h> + +#include <util/system/mutex.h> namespace NYql::NFmr { @@ -13,8 +15,6 @@ struct TFmrWorkerSettings { TDuration TimeToSleepBetweenRequests = TDuration::Seconds(1); }; -using IFmrWorker = IRunnable; - IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings); } // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/worker/interface/ya.make b/yt/yql/providers/yt/fmr/worker/interface/ya.make new file mode 100644 index 00000000000..62fc5f7cb90 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/interface/ya.make @@ -0,0 +1,9 @@ +LIBRARY() + +SRCS( + yql_yt_fmr_worker.cpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp new file mode 100644 index 00000000000..fb345dd1a53 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.cpp @@ -0,0 +1 @@ +#include "yql_yt_fmr_worker.h" diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h new file mode 100644 index 00000000000..0b5085a7d60 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_fmr_worker.h @@ -0,0 +1,21 @@ +#pragma once + +#include <yql/essentials/utils/runnable.h> + +namespace NYql::NFmr { + +enum class EFmrWorkerRuntimeState { + Stopped, + Running, +}; + +class IFmrWorker: public IRunnable { +public: + using TPtr = TIntrusivePtr<IFmrWorker>; + + virtual ~IFmrWorker() = default; + + virtual EFmrWorkerRuntimeState GetWorkerState() const = 0; +}; + +} // namespace NYql::NFmr |