aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-02-19 18:11:41 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-02-19 18:30:12 +0300
commita3f26a347afa5953c2844c2cbe088d1198e733fe (patch)
tree25332166965cd21ced66ba529cab012cb4b28be7
parent52daccf61e2e827114cfb3372071cddaec7974ba (diff)
downloadydb-a3f26a347afa5953c2844c2cbe088d1198e733fe.tar.gz
Intermediate changes
commit_hash:d1545de5448760526e5b01472ffac8185967334a
-rw-r--r--yql/essentials/utils/runnable.h2
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp22
-rw-r--r--yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp23
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp6
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp5
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/ya.make1
-rw-r--r--yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h3
-rw-r--r--yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h3
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp14
-rw-r--r--yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp11
-rw-r--r--yt/yql/tools/ytrun/lib/ytrun_lib.cpp2
11 files changed, 43 insertions, 49 deletions
diff --git a/yql/essentials/utils/runnable.h b/yql/essentials/utils/runnable.h
index 8179aa7496..1c00894a31 100644
--- a/yql/essentials/utils/runnable.h
+++ b/yql/essentials/utils/runnable.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <util/generic/ptr.h>
namespace NYql {
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
index a934fcc37b..36d7020adc 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp
@@ -70,7 +70,7 @@ auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic
Sleep(TDuration::Seconds(4));
return ETaskStatus::Completed;
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
@@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
Sleep(TDuration::Seconds(1));
@@ -146,7 +146,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
}
TFmrJobFactorySettings settings{.NumThreads = 10, .Function = defaultTaskFunction};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
Sleep(TDuration::Seconds(6));
@@ -184,12 +184,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
Sleep(TDuration::Seconds(1));
return ETaskStatus::Completed;
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads = 10, .Function = func};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
Sleep(TDuration::Seconds(5));
@@ -211,7 +211,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
@@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
@@ -259,11 +259,11 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
return ETaskStatus::Completed;
}
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
TFmrWorkerProxy workerProxy(coordinator, factory, workerSettings);
workerProxy.Start();
@@ -290,12 +290,12 @@ Y_UNIT_TEST_SUITE(FmrCoordinatorTests) {
Sleep(TDuration::Seconds(2));
throw std::runtime_error{"Function crashed"};
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
Sleep(TDuration::Seconds(4));
diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
index 9205eae70d..bec951c180 100644
--- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
+++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp
@@ -21,7 +21,7 @@ struct TOperationInfo {
};
struct TIdempotencyKeyInfo {
- TString operationId;
+ TString OperationId;
TInstant OperationCreationTime;
};
@@ -47,13 +47,13 @@ public:
TGuard<TMutex> guard(Mutex_);
TMaybe<TString> IdempotencyKey = request.IdempotencyKey;
if (IdempotencyKey && IdempotencyKeys_.contains(*IdempotencyKey)) {
- auto operationId = IdempotencyKeys_[*IdempotencyKey].operationId;
+ auto operationId = IdempotencyKeys_[*IdempotencyKey].OperationId;
auto& operationInfo = Operations_[operationId];
return NThreading::MakeFuture(TStartOperationResponse(operationInfo.OperationStatus, operationId));
}
auto operationId = GenerateId();
if (IdempotencyKey) {
- IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.operationId = operationId, .OperationCreationTime=TInstant::Now()};
+ IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.OperationId = operationId, .OperationCreationTime=TInstant::Now()};
}
TString taskId = GenerateId();
@@ -107,17 +107,16 @@ public:
TGuard<TMutex> guard(Mutex_);
ui32 workerId = request.WorkerId;
- YQL_ENSURE(workerId >= 1 && workerId <= WorkersNum_);
- if (! workerToVolatileId_.contains(workerId)) {
- workerToVolatileId_[workerId] = request.VolatileId;
- } else if (request.VolatileId != workerToVolatileId_[workerId]) {
- workerToVolatileId_[workerId] = request.VolatileId;
+ YQL_ENSURE(workerId >= 0 && workerId < WorkersNum_);
+ if (! WorkerToVolatileId_.contains(workerId)) {
+ WorkerToVolatileId_[workerId] = request.VolatileId;
+ } else if (request.VolatileId != WorkerToVolatileId_[workerId]) {
+ WorkerToVolatileId_[workerId] = request.VolatileId;
for (auto& [taskId, taskInfo]: Tasks_) {
auto taskStatus = Tasks_[taskId].TaskStatus;
auto operationId = Tasks_[taskId].OperationId;
if (taskStatus == ETaskStatus::InProgress) {
TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel
- TString sessionId = Operations_[operationId].SessionId;
TFmrError error{
.Component = EFmrComponent::Coordinator, .ErrorMessage = "Max retries limit exceeded", .OperationId = operationId};
SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed, error);
@@ -145,7 +144,7 @@ public:
}
for (auto& taskId: TaskToDeleteIds_) {
- SetUnfinishedTaskStatus(taskId, ETaskStatus::Aborted);
+ SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed);
}
return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_});
}
@@ -159,7 +158,7 @@ private:
auto currentTime = TInstant::Now();
for (auto it = IdempotencyKeys_.begin(); it != IdempotencyKeys_.end();) {
auto operationCreationTime = it->second.OperationCreationTime;
- auto operationId = it->second.operationId;
+ auto operationId = it->second.OperationId;
if (currentTime - operationCreationTime > IdempotencyKeyStoreTime_) {
it = IdempotencyKeys_.erase(it);
if (Operations_.contains(operationId)) {
@@ -229,7 +228,7 @@ private:
TMutex Mutex_;
const ui32 WorkersNum_;
- std::unordered_map<ui32, TString> workerToVolatileId_; // worker id -> volatile id
+ std::unordered_map<ui32, TString> WorkerToVolatileId_; // worker id -> volatile id
const TIntrusivePtr<IRandomProvider> RandomProvider_;
std::thread ClearIdempotencyKeysThread_;
std::atomic<bool> StopCoordinator_;
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
index 89e95debdd..da2706c49c 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp
@@ -18,7 +18,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
*operationResults = "operation_result";
return ETaskStatus::Completed;
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
auto factory = MakeFmrJobFactory(settings);
@@ -44,7 +44,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
return ETaskStatus::Completed;
}
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
@@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(FmrFactoryTests) {
cancelFlag->store(true);
auto taskResult = futureTaskStatus.GetValueSync();
ETaskStatus taskStatus = taskResult->TaskStatus;
- UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Aborted);
+ UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Failed);
UNIT_ASSERT_NO_DIFF(*operationResults, "computing_result");
}
}
diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
index af154cb442..a82b8b8551 100644
--- a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
+++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp
@@ -40,12 +40,11 @@ public:
return future;
}
-private:
- void Start() {
+ void Start() override {
ThreadPool_ = CreateThreadPool(NumThreads_);
}
- void Stop() {
+ void Stop() override {
ThreadPool_->Stop();
}
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
index ed05358ed3..32ecd264ae 100644
--- a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make
@@ -6,6 +6,7 @@ SRCS(
PEERDIR(
library/cpp/threading/future
+ yql/essentials/utils
)
YQL_LAST_ABI_VERSION()
diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
index d7c29fc818..4a489a6cad 100644
--- a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
+++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h
@@ -2,10 +2,11 @@
#include <library/cpp/threading/future/core/future.h>
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
+#include <yql/essentials/utils/runnable.h>
namespace NYql::NFmr {
-class IFmrJobFactory: public TThrRefBase {
+class IFmrJobFactory: public IRunnable {
public:
using TPtr = TIntrusivePtr<IFmrJobFactory>;
diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
index 9d69d008b2..4e14478472 100644
--- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
+++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h
@@ -21,8 +21,7 @@ enum class ETaskStatus {
Accepted,
InProgress,
Failed,
- Completed,
- Aborted
+ Completed
};
enum class ETaskType {
diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
index c5af15bb3a..4996c932c4 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp
@@ -28,12 +28,12 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
*operationResults = "operation_result";
return ETaskStatus::Completed;
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
coordinator->StartOperation(CreateOperationRequest()).GetValueSync();
@@ -56,11 +56,11 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
}
}
*operationResults = "operation_cancelled";
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
auto worker = MakeFmrWorker(coordinator, factory, workerSettings);
worker->Start();
auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId;
@@ -82,12 +82,12 @@ Y_UNIT_TEST_SUITE(FmrWorkerTests) {
(*operationResult)++;
return ETaskStatus::Completed;
}
- return ETaskStatus::Aborted;
+ return ETaskStatus::Failed;
};
TFmrJobFactorySettings settings{.NumThreads =3, .Function=func};
auto factory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings firstWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)};
- TFmrWorkerSettings secondWorkerSettings{.WorkerId = 2, .RandomProvider = CreateDeterministicRandomProvider(2)};
+ TFmrWorkerSettings firstWorkerSettings{.WorkerId = 0, .RandomProvider = CreateDeterministicRandomProvider(1)};
+ TFmrWorkerSettings secondWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(2)};
auto firstWorker = MakeFmrWorker(coordinator, factory, firstWorkerSettings);
auto secondWorker = MakeFmrWorker(coordinator, factory, secondWorkerSettings);
firstWorker->Start();
diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
index a1a3a186fe..b5eaaae7f1 100644
--- a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
+++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp
@@ -1,6 +1,5 @@
#include <library/cpp/threading/future/wait/wait.h>
#include <thread>
-#include <ranges>
#include <util/system/mutex.h>
#include <yql/essentials/utils/yql_panic.h>
#include "yql_yt_worker_impl.h"
@@ -12,7 +11,6 @@ namespace {
struct TFmrWorkerState {
TMutex Mutex;
std::unordered_map<TString, TTaskResult::TPtr> TaskStatuses;
- std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>> TaskFutures;
};
class TFmrWorker: public IFmrWorker {
@@ -20,7 +18,7 @@ public:
TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings)
: Coordinator_(coordinator),
JobFactory_(jobFactory),
- WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{}, std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>>{})),
+ WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{})),
StopWorker_(false),
RandomProvider_(settings.RandomProvider),
WorkerId_(settings.WorkerId),
@@ -86,12 +84,9 @@ public:
with_lock(state->Mutex) {
YQL_ENSURE(state->TaskStatuses.contains(task->TaskId));
state->TaskStatuses[task->TaskId] = finalTaskStatus;
- state->TaskFutures.erase(task->TaskId);
}
}
});
- YQL_ENSURE(!WorkerState_->TaskFutures.contains(taskId));
- WorkerState_->TaskFutures[taskId] = future;
}
}
Sleep(TimeToSleepBetweenRequests_);
@@ -107,10 +102,8 @@ public:
taskInfo.second->store(true);
}
StopWorker_ = true;
- auto futuresView = std::views::values(WorkerState_->TaskFutures);
- taskFutures = std::vector<NThreading::TFuture<TTaskResult::TPtr>>{futuresView.begin(), futuresView.end()};
}
- NThreading::WaitAll(taskFutures).GetValueSync();
+ JobFactory_->Stop();
if (MainThread_.joinable()) {
MainThread_.join();
}
diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
index 9726d00d5a..dcd374a2c3 100644
--- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
+++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp
@@ -202,7 +202,7 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
Sleep(TDuration::Seconds(3));
return NFmr::ETaskStatus::Completed;
}
- return NFmr::ETaskStatus::Aborted;
+ return NFmr::ETaskStatus::Failed;
}; // TODO - use function which actually calls Downloader/Uploader based on task params
NFmr::TFmrJobFactorySettings settings{.Function=func};