diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-19 18:11:41 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2025-02-19 18:30:12 +0300 |
commit | a3f26a347afa5953c2844c2cbe088d1198e733fe (patch) | |
tree | 25332166965cd21ced66ba529cab012cb4b28be7 | |
parent | 52daccf61e2e827114cfb3372071cddaec7974ba (diff) | |
download | ydb-a3f26a347afa5953c2844c2cbe088d1198e733fe.tar.gz |
Intermediate changes
commit_hash:d1545de5448760526e5b01472ffac8185967334a
11 files changed, 43 insertions, 49 deletions
diff --git a/yql/essentials/utils/runnable.h b/yql/essentials/utils/runnable.h index 8179aa7496..1c00894a31 100644 --- a/yql/essentials/utils/runnable.h +++ b/yql/essentials/utils/runnable.h @@ -1,3 +1,5 @@ +#pragma once + #include <util/generic/ptr.h> namespace NYql { diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp index a934fcc37b..36d7020adc 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -70,7 +70,7 @@ auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic Sleep(TDuration::Seconds(4)); return ETaskStatus::Completed; } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); Sleep(TDuration::Seconds(1)); @@ -146,7 +146,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { } TFmrJobFactorySettings settings{.NumThreads = 10, .Function = defaultTaskFunction}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); Sleep(TDuration::Seconds(6)); @@ -184,12 +184,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { Sleep(TDuration::Seconds(1)); return ETaskStatus::Completed; } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads = 10, .Function = func}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); Sleep(TDuration::Seconds(5)); @@ -211,7 +211,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); @@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); @@ -259,11 +259,11 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { return ETaskStatus::Completed; } } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; TFmrWorkerProxy workerProxy(coordinator, factory, workerSettings); workerProxy.Start(); @@ -290,12 +290,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { Sleep(TDuration::Seconds(2)); throw std::runtime_error{"Function crashed"}; } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); Sleep(TDuration::Seconds(4)); diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index 9205eae70d..bec951c180 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -21,7 +21,7 @@ struct TOperationInfo { }; struct TIdempotencyKeyInfo { - TString operationId; + TString OperationId; TInstant OperationCreationTime; }; @@ -47,13 +47,13 @@ public: TGuard<TMutex> guard(Mutex_); TMaybe<TString> IdempotencyKey = request.IdempotencyKey; if (IdempotencyKey && IdempotencyKeys_.contains(*IdempotencyKey)) { - auto operationId = IdempotencyKeys_[*IdempotencyKey].operationId; + auto operationId = IdempotencyKeys_[*IdempotencyKey].OperationId; auto& operationInfo = Operations_[operationId]; return NThreading::MakeFuture(TStartOperationResponse(operationInfo.OperationStatus, operationId)); } auto operationId = GenerateId(); if (IdempotencyKey) { - IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.operationId = operationId, .OperationCreationTime=TInstant::Now()}; + IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.OperationId = operationId, .OperationCreationTime=TInstant::Now()}; } TString taskId = GenerateId(); @@ -107,17 +107,16 @@ public: TGuard<TMutex> guard(Mutex_); ui32 workerId = request.WorkerId; - YQL_ENSURE(workerId >= 1 && workerId <= WorkersNum_); - if (! workerToVolatileId_.contains(workerId)) { - workerToVolatileId_[workerId] = request.VolatileId; - } else if (request.VolatileId != workerToVolatileId_[workerId]) { - workerToVolatileId_[workerId] = request.VolatileId; + YQL_ENSURE(workerId >= 0 && workerId < WorkersNum_); + if (! WorkerToVolatileId_.contains(workerId)) { + WorkerToVolatileId_[workerId] = request.VolatileId; + } else if (request.VolatileId != WorkerToVolatileId_[workerId]) { + WorkerToVolatileId_[workerId] = request.VolatileId; for (auto& [taskId, taskInfo]: Tasks_) { auto taskStatus = Tasks_[taskId].TaskStatus; auto operationId = Tasks_[taskId].OperationId; if (taskStatus == ETaskStatus::InProgress) { TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel - TString sessionId = Operations_[operationId].SessionId; TFmrError error{ .Component = EFmrComponent::Coordinator, .ErrorMessage = "Max retries limit exceeded", .OperationId = operationId}; SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed, error); @@ -145,7 +144,7 @@ public: } for (auto& taskId: TaskToDeleteIds_) { - SetUnfinishedTaskStatus(taskId, ETaskStatus::Aborted); + SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed); } return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_}); } @@ -159,7 +158,7 @@ private: auto currentTime = TInstant::Now(); for (auto it = IdempotencyKeys_.begin(); it != IdempotencyKeys_.end();) { auto operationCreationTime = it->second.OperationCreationTime; - auto operationId = it->second.operationId; + auto operationId = it->second.OperationId; if (currentTime - operationCreationTime > IdempotencyKeyStoreTime_) { it = IdempotencyKeys_.erase(it); if (Operations_.contains(operationId)) { @@ -229,7 +228,7 @@ private: TMutex Mutex_; const ui32 WorkersNum_; - std::unordered_map<ui32, TString> workerToVolatileId_; // worker id -> volatile id + std::unordered_map<ui32, TString> WorkerToVolatileId_; // worker id -> volatile id const TIntrusivePtr<IRandomProvider> RandomProvider_; std::thread ClearIdempotencyKeysThread_; std::atomic<bool> StopCoordinator_; diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp index 89e95debdd..da2706c49c 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp @@ -18,7 +18,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) { *operationResults = "operation_result"; return ETaskStatus::Completed; } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; auto factory = MakeFmrJobFactory(settings); @@ -44,7 +44,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) { return ETaskStatus::Completed; } } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; @@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) { cancelFlag->store(true); auto taskResult = futureTaskStatus.GetValueSync(); ETaskStatus taskStatus = taskResult->TaskStatus; - UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Aborted); + UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Failed); UNIT_ASSERT_NO_DIFF(*operationResults, "computing_result"); } } diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp index af154cb442..a82b8b8551 100644 --- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp @@ -40,12 +40,11 @@ public: return future; } -private: - void Start() { + void Start() override { ThreadPool_ = CreateThreadPool(NumThreads_); } - void Stop() { + void Stop() override { ThreadPool_->Stop(); } diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make index ed05358ed3..32ecd264ae 100644 --- a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make +++ b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( library/cpp/threading/future + yql/essentials/utils ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h index d7c29fc818..4a489a6cad 100644 --- a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h +++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h @@ -2,10 +2,11 @@ #include <library/cpp/threading/future/core/future.h> #include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> +#include <yql/essentials/utils/runnable.h> namespace NYql::NFmr { -class IFmrJobFactory: public TThrRefBase { +class IFmrJobFactory: public IRunnable { public: using TPtr = TIntrusivePtr<IFmrJobFactory>; diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index 9d69d008b2..4e14478472 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -21,8 +21,7 @@ enum class ETaskStatus { Accepted, InProgress, Failed, - Completed, - Aborted + Completed }; enum class ETaskType { diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp index c5af15bb3a..4996c932c4 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -28,12 +28,12 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) { *operationResults = "operation_result"; return ETaskStatus::Completed; } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); @@ -56,11 +56,11 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) { } } *operationResults = "operation_cancelled"; - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; auto worker = MakeFmrWorker(coordinator, factory, workerSettings); worker->Start(); auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId; @@ -82,12 +82,12 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) { (*operationResult)++; return ETaskStatus::Completed; } - return ETaskStatus::Aborted; + return ETaskStatus::Failed; }; TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; auto factory = MakeFmrJobFactory(settings); - TFmrWorkerSettings firstWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; - TFmrWorkerSettings secondWorkerSettings{.WorkerId = 2, .RandomProvider = CreateDeterministicRandomProvider(2)}; + TFmrWorkerSettings firstWorkerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings secondWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(2)}; auto firstWorker = MakeFmrWorker(coordinator, factory, firstWorkerSettings); auto secondWorker = MakeFmrWorker(coordinator, factory, secondWorkerSettings); firstWorker->Start(); diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp index a1a3a186fe..b5eaaae7f1 100644 --- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp @@ -1,6 +1,5 @@ #include <library/cpp/threading/future/wait/wait.h> #include <thread> -#include <ranges> #include <util/system/mutex.h> #include <yql/essentials/utils/yql_panic.h> #include "yql_yt_worker_impl.h" @@ -12,7 +11,6 @@ namespace { struct TFmrWorkerState { TMutex Mutex; std::unordered_map<TString, TTaskResult::TPtr> TaskStatuses; - std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>> TaskFutures; }; class TFmrWorker: public IFmrWorker { @@ -20,7 +18,7 @@ public: TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) : Coordinator_(coordinator), JobFactory_(jobFactory), - WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{}, std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>>{})), + WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{})), StopWorker_(false), RandomProvider_(settings.RandomProvider), WorkerId_(settings.WorkerId), @@ -86,12 +84,9 @@ public: with_lock(state->Mutex) { YQL_ENSURE(state->TaskStatuses.contains(task->TaskId)); state->TaskStatuses[task->TaskId] = finalTaskStatus; - state->TaskFutures.erase(task->TaskId); } } }); - YQL_ENSURE(!WorkerState_->TaskFutures.contains(taskId)); - WorkerState_->TaskFutures[taskId] = future; } } Sleep(TimeToSleepBetweenRequests_); @@ -107,10 +102,8 @@ public: taskInfo.second->store(true); } StopWorker_ = true; - auto futuresView = std::views::values(WorkerState_->TaskFutures); - taskFutures = std::vector<NThreading::TFuture<TTaskResult::TPtr>>{futuresView.begin(), futuresView.end()}; } - NThreading::WaitAll(taskFutures).GetValueSync(); + JobFactory_->Stop(); if (MainThread_.joinable()) { MainThread_.join(); } diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index 9726d00d5a..dcd374a2c3 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -202,7 +202,7 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { Sleep(TDuration::Seconds(3)); return NFmr::ETaskStatus::Completed; } - return NFmr::ETaskStatus::Aborted; + return NFmr::ETaskStatus::Failed; }; // TODO - use function which actually calls Downloader/Uploader based on task params NFmr::TFmrJobFactorySettings settings{.Function=func}; |