diff options
author | cdzyura171 <cdzyura171@yandex-team.com> | 2025-02-10 11:53:19 +0300 |
---|---|---|
committer | cdzyura171 <cdzyura171@yandex-team.com> | 2025-02-10 12:12:39 +0300 |
commit | 24f0f77bb308a9675f1aae84708724718f12282f (patch) | |
tree | 8a574286a14121e448609918089730004528ebeb | |
parent | b2fa5f3d3be16912e911d1c127db0c5a7b22c1a2 (diff) | |
download | ydb-24f0f77bb308a9675f1aae84708724718f12282f.tar.gz |
Use coordinator and worker in fmrGateway
use coordinator and worker in fmrGateway
commit_hash:a0977459dcbe041e5330c8959152be40fe18eea6
35 files changed, 1710 insertions, 11 deletions
diff --git a/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp index 4cee954bd8..5a2dc645a6 100644 --- a/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp +++ b/yql/essentials/tools/yql_facade_run/yql_facade_run.cpp @@ -281,13 +281,13 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) { opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument().SetFlag(&ScanUdfs); opts.AddLongOption("parse-only", "Parse program and exit").NoArgument().StoreValue(&Mode, ERunMode::Parse); - opts.AddLongOption("compile-only", "Compiled program and exit").NoArgument().StoreValue(&Mode, ERunMode::Compile); + opts.AddLongOption("compile-only", "Compile program and exit").NoArgument().StoreValue(&Mode, ERunMode::Compile); opts.AddLongOption("validate", "Validate program and exit").NoArgument().StoreValue(&Mode, ERunMode::Validate); opts.AddLongOption("lineage", "Calculate program lineage and exit").NoArgument().StoreValue(&Mode, ERunMode::Lineage); - opts.AddLongOption('O',"optimize", "Optimize program and exir").NoArgument().StoreValue(&Mode, ERunMode::Optimize); + opts.AddLongOption('O',"optimize", "Optimize program and exit").NoArgument().StoreValue(&Mode, ERunMode::Optimize); opts.AddLongOption('D', "discover", "Discover tables in the program and exit").NoArgument().StoreValue(&Mode, ERunMode::Discover); opts.AddLongOption("peephole", "Perform peephole program optimization and exit").NoArgument().StoreValue(&Mode, ERunMode::Peephole); - opts.AddLongOption('R',"run", "Run progrum (use by default)").NoArgument().StoreValue(&Mode, ERunMode::Run); + opts.AddLongOption('R',"run", "Run program (use by default)").NoArgument().StoreValue(&Mode, ERunMode::Run); opts.AddLongOption('L', "show-log", "Show transformation log").Optional().NoArgument().SetFlag(&ShowLog); opts.AddLongOption('v', "verbosity", "Log verbosity level").Optional().RequiredArgument("LEVEL").StoreResult(&Verbosity); diff --git a/yql/essentials/utils/log/log_component.h b/yql/essentials/utils/log/log_component.h index 5cf9357ef4..be1df13863 100644 --- a/yql/essentials/utils/log/log_component.h +++ b/yql/essentials/utils/log/log_component.h @@ -83,7 +83,7 @@ struct EComponentHelpers { case EComponent::ProviderGeneric: return TStringBuf("generic"); case EComponent::ProviderPg: return TStringBuf("PG"); case EComponent::ProviderPure: return TStringBuf("pure"); - case EComponent::FastMapReduce: return TStringBuf("fast map reduce"); + case EComponent::FastMapReduce: return TStringBuf("FMR"); case EComponent::ProviderYtflow: return TStringBuf("YTFLOW"); default: ythrow yexception() << "invalid log component value: " @@ -119,7 +119,7 @@ struct EComponentHelpers { if (str == TStringBuf("generic")) return EComponent::ProviderGeneric; if (str == TStringBuf("PG")) return EComponent::ProviderPg; if (str == TStringBuf("pure")) return EComponent::ProviderPure; - if (str == TStringBuf("fast map reduce")) return EComponent::FastMapReduce; + if (str == TStringBuf("FMR")) return EComponent::FastMapReduce; if (str == TStringBuf("YTFLOW")) return EComponent::ProviderYtflow; ythrow yexception() << "unknown log component: '" << str << '\''; } diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make new file mode 100644 index 0000000000..a661e2cb18 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make @@ -0,0 +1,16 @@ +UNITTEST() + +SRCS( + yql_yt_coordinator_ut.cpp +) + +PEERDIR( + library/cpp/yt/assert + yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/job_factory/impl + yt/yql/providers/yt/fmr/worker/impl +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp new file mode 100644 index 0000000000..9ba6dfed78 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ut/yql_yt_coordinator_ut.cpp @@ -0,0 +1,314 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/threading/future/async.h> +#include <util/stream/output.h> +#include <util/string/cast.h> +#include <util/system/mutex.h> +#include <util/thread/pool.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> +#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> + +namespace NYql { + +class TFmrWorkerProxy: public IFmrWorker { +public: + TFmrWorkerProxy(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings): + Coordinator_(coordinator), JobFactory_(jobFactory), WorkerSettings_(settings), WorkerCreationNum_(1) + { + Worker_ = MakeFmrWorker(Coordinator_, JobFactory_, WorkerSettings_); + } + + void ResetWorker() { + ++WorkerCreationNum_; + WorkerSettings_.RandomProvider = CreateDeterministicRandomProvider(WorkerCreationNum_); + + Stop(); + Worker_ = MakeFmrWorker(Coordinator_, JobFactory_, WorkerSettings_); + Start(); + } + + void Start() { + Worker_->Start(); + } + + void Stop() { + Worker_->Stop(); + } + +private: + IFmrCoordinator::TPtr Coordinator_; + IFmrJobFactory::TPtr JobFactory_; + IFmrWorker::TPtr Worker_; + TFmrWorkerSettings WorkerSettings_; + int WorkerCreationNum_; +}; + + +TDownloadTaskParams downloadTaskParams{ + .Input = TYtTableRef{"Path","Cluster","TransactionId"}, + .Output = TFmrTableRef{"TableId"} +}; + +TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) { + return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, .SessionId = "SessionId", .IdempotencyKey = "IdempotencyKey"}; +} + +std::vector<TStartOperationRequest> CreateSeveralOperationRequests( + ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams, int numRequests = 10) +{ + std::vector<TStartOperationRequest> startOperationRequests(numRequests); + for (int i = 0; i < numRequests; ++i) { + startOperationRequests[i] = TStartOperationRequest{ + .TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey_" + ToString(i) + }; + } + return startOperationRequests; +} + +auto defaultTaskFunction = [] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(4)); + return ETaskStatus::Completed; + } + return ETaskStatus::Aborted; +}; + +Y_UNIT_TEST_SUITE(FmrCoordinatorTests) { + Y_UNIT_TEST(StartOperation) { + auto coordinator = MakeFmrCoordinator(); + auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + auto status = startOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted); + } + Y_UNIT_TEST(RetryAcceptedOperation) { + auto coordinator = MakeFmrCoordinator(); + auto downloadRequest = CreateOperationRequest(); + auto firstResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); + auto firstOperationId = firstResponse.OperationId; + auto sameRequest = coordinator->StartOperation(downloadRequest); + auto secondResponse = sameRequest.GetValueSync(); + auto secondOperationId = secondResponse.OperationId; + auto status = secondResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted); + UNIT_ASSERT_NO_DIFF(firstOperationId, secondOperationId); + } + + Y_UNIT_TEST(DeleteNonexistentOperation) { + auto coordinator = MakeFmrCoordinator(); + auto deleteOperationResponse = coordinator->DeleteOperation({"delete_operation_id"}).GetValueSync(); + EOperationStatus status = deleteOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::NotFound); + } + Y_UNIT_TEST(DeleteOperationBeforeSendToWorker) { + auto coordinator = MakeFmrCoordinator(); + auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + TString operationId = startOperationResponse.OperationId; + auto deleteOperationResponse = coordinator->DeleteOperation({operationId}).GetValueSync(); + EOperationStatus status = deleteOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Aborted); + } + Y_UNIT_TEST(GetNonexistentOperation) { + auto coordinator = MakeFmrCoordinator(); + auto getOperationResponse = coordinator->GetOperation({"get_operation_id"}).GetValueSync(); + EOperationStatus status = getOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::NotFound); + } + Y_UNIT_TEST(GetAcceptedOperationStatus) { + auto coordinator = MakeFmrCoordinator(); + auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + TString operationId = startOperationResponse.OperationId; + auto getOperationResponse = coordinator->GetOperation({operationId}).GetValueSync(); + EOperationStatus status = getOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Accepted); + } + Y_UNIT_TEST(GetRunningOperationStatus) { + auto coordinator = MakeFmrCoordinator(); + auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + TString operationId = startOperationResponse.OperationId; + + TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + Sleep(TDuration::Seconds(1)); + auto getOperationResponse = coordinator->GetOperation({operationId}).GetValueSync(); + EOperationStatus status = getOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::InProgress); + } + Y_UNIT_TEST(GetCompletedOperationStatuses) { + auto coordinator = MakeFmrCoordinator(); + auto startOperationRequests = CreateSeveralOperationRequests(); + std::vector<TString> operationIds; + for (auto& request: startOperationRequests) { + auto startOperationResponse = coordinator->StartOperation(request).GetValueSync(); + operationIds.emplace_back(startOperationResponse.OperationId); + } + TFmrJobFactorySettings settings{.NumThreads = 10, .Function = defaultTaskFunction}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + Sleep(TDuration::Seconds(6)); + for (auto& operationId: operationIds) { + auto getOperationResponse = coordinator->GetOperation({operationId}).GetValueSync(); + EOperationStatus status = getOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Completed); + } + } + Y_UNIT_TEST(GetCompletedAndFailedOperationStatuses) { + auto coordinator = MakeFmrCoordinator(); + auto downloadOperationRequests = CreateSeveralOperationRequests(); + std::vector<TString> downloadOperationIds; + for (auto& request: downloadOperationRequests) { + auto startOperationResponse = coordinator->StartOperation(request).GetValueSync(); + downloadOperationIds.emplace_back(startOperationResponse.OperationId); + } + auto uploadOperationRequest = CreateOperationRequest(ETaskType::Upload, TUploadTaskParams{}); + auto uploadOperationResponse = coordinator->StartOperation(uploadOperationRequest).GetValueSync(); + auto uploadOperationId = uploadOperationResponse.OperationId; + + auto func = [&] (TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (! cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + ETaskStatus taskStatus = std::visit([] (auto&& taskParams) { + using T = std::decay_t<decltype(taskParams)>; + if constexpr (std::is_same_v<T, TUploadTaskParams>) { + return ETaskStatus::Failed; + } + return ETaskStatus::Completed; + }, task->TaskParams); + if (taskStatus == ETaskStatus::Failed) { + return taskStatus; + } + Sleep(TDuration::Seconds(1)); + return ETaskStatus::Completed; + } + return ETaskStatus::Aborted; + }; + + TFmrJobFactorySettings settings{.NumThreads = 10, .Function = func}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + Sleep(TDuration::Seconds(5)); + + for (auto& operationId: downloadOperationIds) { + auto getDownloadOperationResponse = coordinator->GetOperation({operationId}).GetValueSync(); + EOperationStatus status = getDownloadOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Completed); + } + auto getUploadOperationResponse = coordinator->GetOperation({uploadOperationId}).GetValueSync(); + EOperationStatus uploadStatus = getUploadOperationResponse.Status; + UNIT_ASSERT_VALUES_EQUAL(uploadStatus, EOperationStatus::Failed); + } + Y_UNIT_TEST(RetryRunningOperation) { + auto coordinator = MakeFmrCoordinator(); + auto downloadRequest = CreateOperationRequest(); + auto startOperationResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); + TString firstOperationId = startOperationResponse.OperationId; + + TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + + Sleep(TDuration::Seconds(1)); + auto secondStartOperationResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); + EOperationStatus status = secondStartOperationResponse.Status; + TString secondOperationId = secondStartOperationResponse.OperationId; + UNIT_ASSERT_NO_DIFF(firstOperationId, secondOperationId); + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::InProgress); + } + Y_UNIT_TEST(RetryRunningOperationAfterIdempotencyKeyClear) { + TFmrCoordinatorSettings coordinatorSettings{ + .WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2), .IdempotencyKeyStoreTime = TDuration::Seconds(1)}; + auto coordinator = MakeFmrCoordinator(coordinatorSettings); + + TFmrJobFactorySettings settings{.NumThreads = 3, .Function = defaultTaskFunction}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + + auto downloadRequest = CreateOperationRequest(); + auto startOperationResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); + TString firstOperationId = startOperationResponse.OperationId; + + Sleep(TDuration::Seconds(2)); + auto secondStartOperationResponse = coordinator->StartOperation(downloadRequest).GetValueSync(); + EOperationStatus secondOperationStatus = secondStartOperationResponse.Status; + TString secondOperationId = secondStartOperationResponse.OperationId; + auto getFirstOperationResponse = coordinator->GetOperation({firstOperationId}).GetValueSync(); + EOperationStatus firstOperationStatus = getFirstOperationResponse.Status; + + UNIT_ASSERT_VALUES_UNEQUAL(firstOperationId, secondOperationId); + UNIT_ASSERT_VALUES_EQUAL(firstOperationStatus, EOperationStatus::InProgress); + UNIT_ASSERT_VALUES_EQUAL(secondOperationStatus, EOperationStatus::Accepted); + } + Y_UNIT_TEST(CancelTasksAfterVolatileIdReload) { + auto coordinator = MakeFmrCoordinator(); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + int numIterations = 0; + while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + ++numIterations; + if (numIterations == 100) { + return ETaskStatus::Completed; + } + } + return ETaskStatus::Aborted; + }; + TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerProxy workerProxy(coordinator, factory, workerSettings); + + workerProxy.Start(); + auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId; + Sleep(TDuration::Seconds(2)); + workerProxy.ResetWorker(); + Sleep(TDuration::Seconds(5)); + workerProxy.Stop(); + auto getOperationResult = coordinator->GetOperation({operationId}).GetValueSync(); + auto getOperationStatus = getOperationResult.Status; + UNIT_ASSERT_VALUES_EQUAL(getOperationStatus, EOperationStatus::Failed); + auto error = getOperationResult.ErrorMessages[0]; + UNIT_ASSERT_VALUES_EQUAL(error.Component, EFmrComponent::Coordinator); + UNIT_ASSERT_NO_DIFF(error.ErrorMessage, "Max retries limit exceeded"); + UNIT_ASSERT_NO_DIFF(*error.OperationId, operationId); + } + Y_UNIT_TEST(HandleJobErrors) { + auto coordinator = MakeFmrCoordinator(); + auto startOperationResponse = coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + TString operationId = startOperationResponse.OperationId; + + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (! cancelFlag->load()) { + Sleep(TDuration::Seconds(2)); + throw std::runtime_error{"Function crashed"}; + } + return ETaskStatus::Aborted; + }; + + TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + Sleep(TDuration::Seconds(4)); + auto getOperationResponse = coordinator->GetOperation({operationId}).GetValueSync(); + + EOperationStatus status = getOperationResponse.Status; + std::vector<TFmrError> errorMessages = getOperationResponse.ErrorMessages; + UNIT_ASSERT_VALUES_EQUAL(status, EOperationStatus::Failed); + UNIT_ASSERT(errorMessages.size() == 1); + auto& error = errorMessages[0]; + UNIT_ASSERT_NO_DIFF(error.ErrorMessage, "Function crashed"); + UNIT_ASSERT_VALUES_EQUAL(error.Component, EFmrComponent::Job); + } +} + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/ya.make b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make new file mode 100644 index 0000000000..11d323d128 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/ya.make @@ -0,0 +1,21 @@ +LIBRARY() + +SRCS( + yql_yt_coordinator_impl.cpp +) + +PEERDIR( + library/cpp/random_provider + library/cpp/threading/future + yt/yql/providers/yt/fmr/coordinator/interface + yql/essentials/utils/log + yql/essentials/utils +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp new file mode 100644 index 0000000000..b04814c354 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -0,0 +1,246 @@ +#include <thread> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/yql_panic.h> +#include "yql_yt_coordinator_impl.h" + +namespace NYql { + +namespace { + +struct TCoordinatorTaskInfo { + TTask::TPtr Task; + ETaskStatus TaskStatus; + TString OperationId; +}; + +struct TOperationInfo { + std::unordered_set<TString> TaskIds; // for now each operation consists only of one task, until paritioner is implemented + EOperationStatus OperationStatus; + std::vector<TFmrError> ErrorMessages; + TString SessionId; +}; + +struct TIdempotencyKeyInfo { + TString operationId; + TInstant OperationCreationTime; +}; + +class TFmrCoordinator: public IFmrCoordinator { +public: + TFmrCoordinator(const TFmrCoordinatorSettings& settings) + : WorkersNum_(settings.WorkersNum), + RandomProvider_(settings.RandomProvider), + StopCoordinator_(false), + TimeToSleepBetweenClearKeyRequests_(settings.TimeToSleepBetweenClearKeyRequests), + IdempotencyKeyStoreTime_(settings.IdempotencyKeyStoreTime) + { + StartClearingIdempotencyKeys(); + } + + ~TFmrCoordinator() { + StopCoordinator_ = true; + ClearIdempotencyKeysThread_.join(); + } + + NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& request) override { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(request.SessionId); + TGuard<TMutex> guard(Mutex_); + TMaybe<TString> IdempotencyKey = request.IdempotencyKey; + if (IdempotencyKey && IdempotencyKeys_.contains(*IdempotencyKey)) { + auto operationId = IdempotencyKeys_[*IdempotencyKey].operationId; + auto& operationInfo = Operations_[operationId]; + return NThreading::MakeFuture(TStartOperationResponse(operationInfo.OperationStatus, operationId)); + } + auto operationId = GenerateId(); + if (IdempotencyKey) { + IdempotencyKeys_[*IdempotencyKey] = TIdempotencyKeyInfo{.operationId = operationId, .OperationCreationTime=TInstant::Now()}; + } + + TString taskId = GenerateId(); + TTask::TPtr createdTask = MakeTask(request.TaskType, taskId, request.TaskParams, request.SessionId); + + Tasks_[taskId] = TCoordinatorTaskInfo{.Task = createdTask, .TaskStatus = ETaskStatus::Accepted, .OperationId = operationId}; + + Operations_[operationId] = {.TaskIds = {taskId}, .OperationStatus = EOperationStatus::Accepted, .SessionId = request.SessionId}; + YQL_CLOG(DEBUG, FastMapReduce) << "Starting operation with id " << operationId; + return NThreading::MakeFuture(TStartOperationResponse(EOperationStatus::Accepted, operationId)); + } + + NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& request) override { + TGuard<TMutex> guard(Mutex_); + auto operationId = request.OperationId; + if (!Operations_.contains(operationId)) { + return NThreading::MakeFuture(TGetOperationResponse(EOperationStatus::NotFound)); + } + YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId); + YQL_CLOG(DEBUG, FastMapReduce) << "Getting operation status with id " << operationId; + auto& operationInfo = Operations_[operationId]; + auto operationStatus = operationInfo.OperationStatus; + auto errorMessages = operationInfo.ErrorMessages; + return NThreading::MakeFuture(TGetOperationResponse(operationStatus, errorMessages)); + } + + NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& request) override { + TGuard<TMutex> guard(Mutex_); + auto operationId = request.OperationId; + if (!Operations_.contains(operationId)) { + return NThreading::MakeFuture(TDeleteOperationResponse(EOperationStatus::NotFound)); + } + YQL_LOG_CTX_ROOT_SESSION_SCOPE(Operations_[operationId].SessionId); + YQL_CLOG(DEBUG, FastMapReduce) << "Deleting operation with id " << operationId; + auto taskIds = Operations_[operationId].TaskIds; + YQL_ENSURE(taskIds.size() == 1); + auto taskId = *taskIds.begin(); + YQL_ENSURE(Tasks_.contains(taskId)); + + auto taskStatus = Tasks_[taskId].TaskStatus; + if (taskStatus == ETaskStatus::InProgress) { + TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel + } else { + ClearTask(taskId); // Task either hasn't begun running or finished, remove info + } + + return NThreading::MakeFuture(TDeleteOperationResponse(EOperationStatus::Aborted)); + } + + NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& request) override { + TGuard<TMutex> guard(Mutex_); + + ui32 workerId = request.WorkerId; + YQL_ENSURE(workerId >= 1 && workerId <= WorkersNum_); + if (! workerToVolatileId_.contains(workerId)) { + workerToVolatileId_[workerId] = request.VolatileId; + } else if (request.VolatileId != workerToVolatileId_[workerId]) { + workerToVolatileId_[workerId] = request.VolatileId; + for (auto& [taskId, taskInfo]: Tasks_) { + auto taskStatus = Tasks_[taskId].TaskStatus; + auto operationId = Tasks_[taskId].OperationId; + if (taskStatus == ETaskStatus::InProgress) { + TaskToDeleteIds_.insert(taskId); // Task is currently running, send signal to worker to cancel + TString sessionId = Operations_[operationId].SessionId; + TFmrError error{ + .Component = EFmrComponent::Coordinator, .ErrorMessage = "Max retries limit exceeded", .OperationId = operationId}; + SetUnfinishedTaskStatus(taskId, ETaskStatus::Failed, error); + } + } + } + + for (auto& requestTaskState: request.TaskStates) { + auto taskId = requestTaskState->TaskId; + YQL_ENSURE(Tasks_.contains(taskId)); + auto taskStatus = requestTaskState->TaskStatus; + YQL_ENSURE(taskStatus != ETaskStatus::Accepted); + SetUnfinishedTaskStatus(taskId, taskStatus, requestTaskState->TaskErrorMessage); + if (TaskToDeleteIds_.contains(taskId) && Tasks_[taskId].TaskStatus != ETaskStatus::InProgress) { + ClearTask(taskId); // Task finished, so we don't need to cancel it, just remove info + } + } + + std::vector<TTask::TPtr> tasksToRun; + for (auto& taskToRunInfo: Tasks_) { + if (taskToRunInfo.second.TaskStatus == ETaskStatus::Accepted) { + SetUnfinishedTaskStatus(taskToRunInfo.first, ETaskStatus::InProgress); + tasksToRun.emplace_back(taskToRunInfo.second.Task); + } + } + + for (auto& taskId: TaskToDeleteIds_) { + SetUnfinishedTaskStatus(taskId, ETaskStatus::Aborted); + } + return NThreading::MakeFuture(THeartbeatResponse{.TasksToRun = tasksToRun, .TaskToDeleteIds = TaskToDeleteIds_}); + } + +private: + + void StartClearingIdempotencyKeys() { + auto ClearIdempotencyKeysFunc = [&] () { + while (!StopCoordinator_) { + with_lock(Mutex_) { + auto currentTime = TInstant::Now(); + for (auto it = IdempotencyKeys_.begin(); it != IdempotencyKeys_.end();) { + auto operationCreationTime = it->second.OperationCreationTime; + auto operationId = it->second.operationId; + if (currentTime - operationCreationTime > IdempotencyKeyStoreTime_) { + it = IdempotencyKeys_.erase(it); + if (Operations_.contains(operationId)) { + auto& operationInfo = Operations_[operationId]; + auto operationStatus = operationInfo.OperationStatus; + auto& taskIds = operationInfo.TaskIds; + YQL_ENSURE(taskIds.size() == 1); + auto taskId = *operationInfo.TaskIds.begin(); + if (operationStatus != EOperationStatus::Accepted && operationStatus != EOperationStatus::InProgress) { + ClearTask(taskId); + } + } + } else { + ++it; + } + } + } + Sleep(TimeToSleepBetweenClearKeyRequests_); + } + }; + ClearIdempotencyKeysThread_ = std::thread(ClearIdempotencyKeysFunc); + } + + TString GenerateId() { + return GetGuidAsString(RandomProvider_->GenGuid()); + } + + void ClearTask(const TString& taskId) { + YQL_ENSURE(Tasks_.contains(taskId)); + auto& taskInfo = Tasks_[taskId]; + TaskToDeleteIds_.erase(taskId); + Operations_.erase(taskInfo.OperationId); + Tasks_.erase(taskId); + } + + void SetUnfinishedTaskStatus(const TString& taskId, ETaskStatus newTaskStatus, const TMaybe<TFmrError>& taskErrorMessage = Nothing()) { + auto& taskInfo = Tasks_[taskId]; + YQL_ENSURE(Operations_.contains(taskInfo.OperationId)); + auto& operationInfo = Operations_[taskInfo.OperationId]; + if (taskInfo.TaskStatus != ETaskStatus::Accepted && taskInfo.TaskStatus != ETaskStatus::InProgress) { + return; + } + taskInfo.TaskStatus = newTaskStatus; + operationInfo.OperationStatus = GetOperationStatus(taskInfo.OperationId); + if (taskErrorMessage) { + auto& errorMessages = operationInfo.ErrorMessages; + errorMessages.emplace_back(*taskErrorMessage); + } + } + + EOperationStatus GetOperationStatus(const TString& operationId) { + if (! Operations_.contains(operationId)) { + return EOperationStatus::NotFound; + } + std::unordered_set<TString> taskIds = Operations_[operationId].TaskIds; + YQL_ENSURE(taskIds.size() == 1); + + auto taskId = *taskIds.begin(); + ETaskStatus taskStatus = Tasks_[taskId].TaskStatus; + return static_cast<EOperationStatus>(taskStatus); + } + + std::unordered_map<TString, TCoordinatorTaskInfo> Tasks_; // TaskId -> current info about it + std::unordered_set<TString> TaskToDeleteIds_; // TaskIds we want to pass to worker for deletion + std::unordered_map<TString, TOperationInfo> Operations_; // OperationId -> current info about it + std::unordered_map<TString, TIdempotencyKeyInfo> IdempotencyKeys_; // IdempotencyKey -> current info about it + + TMutex Mutex_; + const ui32 WorkersNum_; + std::unordered_map<ui32, TString> workerToVolatileId_; // worker id -> volatile id + const TIntrusivePtr<IRandomProvider> RandomProvider_; + std::thread ClearIdempotencyKeysThread_; + std::atomic<bool> StopCoordinator_; + TDuration TimeToSleepBetweenClearKeyRequests_; + TDuration IdempotencyKeyStoreTime_; +}; + +} // namespace + +IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings) { + return MakeIntrusive<TFmrCoordinator>(settings); +} + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h new file mode 100644 index 0000000000..39ff6b1ae6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h @@ -0,0 +1,18 @@ +#include <library/cpp/random_provider/random_provider.h> +#include <util/system/mutex.h> +#include <util/system/guard.h> +#include <util/generic/queue.h> +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> + +namespace NYql { + +struct TFmrCoordinatorSettings { + ui32 WorkersNum; // Not supported yet + TIntrusivePtr<IRandomProvider> RandomProvider; + TDuration IdempotencyKeyStoreTime = TDuration::Seconds(10); + TDuration TimeToSleepBetweenClearKeyRequests = TDuration::Seconds(1); +}; + +IFmrCoordinator::TPtr MakeFmrCoordinator(const TFmrCoordinatorSettings& settings = {.WorkersNum = 1, .RandomProvider = CreateDeterministicRandomProvider(2)}); + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/ya.make b/yt/yql/providers/yt/fmr/coordinator/interface/ya.make new file mode 100644 index 0000000000..e6f5982117 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/interface/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + yql_yt_coordinator.cpp +) + +PEERDIR( + library/cpp/threading/future + yt/yql/providers/yt/fmr/request_options +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp new file mode 100644 index 0000000000..a4b91e2bd2 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.cpp @@ -0,0 +1 @@ +#include "yql_yt_coordinator.h" diff --git a/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h new file mode 100644 index 0000000000..306ccd7db7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h @@ -0,0 +1,68 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> +#include <util/datetime/base.h> + +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql { + +struct THeartbeatRequest { + ui32 WorkerId; + TString VolatileId; + std::vector<TTaskState::TPtr> TaskStates; + TStatistics Statistics; +}; +// Worker sends requests in loop or long polling + +struct THeartbeatResponse { + std::vector<TTask::TPtr> TasksToRun; + std::unordered_set<TString> TaskToDeleteIds; +}; + +struct TStartOperationRequest { + ETaskType TaskType; + TTaskParams TaskParams; + TString SessionId; + TMaybe<TString> IdempotencyKey = Nothing(); + ui32 NumRetries = 1; // Not supported yet +}; + +struct TStartOperationResponse { + EOperationStatus Status; + TString OperationId; +}; + +struct TGetOperationRequest { + TString OperationId; +}; + +struct TGetOperationResponse { + EOperationStatus Status; + std::vector<TFmrError> ErrorMessages = {}; +}; + +struct TDeleteOperationRequest { + TString OperationId; +}; + +struct TDeleteOperationResponse { + EOperationStatus Status; +}; + +class IFmrCoordinator: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IFmrCoordinator>; + + virtual ~IFmrCoordinator() = default; + + virtual NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& request) = 0; + + virtual NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& request) = 0; + + virtual NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& request) = 0; + + virtual NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& request) = 0; +}; + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make new file mode 100644 index 0000000000..597cdac1a4 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/ya.make @@ -0,0 +1,15 @@ +UNITTEST() + +SRCS( + yql_yt_job_factory_ut.cpp +) + +PEERDIR( + library/cpp/yt/assert + yt/yql/providers/yt/fmr/job_factory/interface + yt/yql/providers/yt/fmr/job_factory/impl +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp new file mode 100644 index 0000000000..89369f7e60 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ut/yql_yt_job_factory_ut.cpp @@ -0,0 +1,64 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/threading/future/async.h> +#include <util/stream/output.h> +#include <util/string/cast.h> +#include <util/system/mutex.h> +#include <util/thread/pool.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> + +namespace NYql { + +Y_UNIT_TEST_SUITE(FmrFactoryTests) { + Y_UNIT_TEST(GetSuccessfulTaskResult) { + auto operationResults = std::make_shared<TString>("no_result_yet"); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (! cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + *operationResults = "operation_result"; + return ETaskStatus::Completed; + } + return ETaskStatus::Aborted; + }; + TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; + auto factory = MakeFmrJobFactory(settings); + auto cancelFlag = std::make_shared<std::atomic<bool>>(false); + + auto futureTaskStatus = factory->StartJob(nullptr, cancelFlag); + auto taskResult = futureTaskStatus.GetValueSync(); + ETaskStatus taskStatus = taskResult->TaskStatus; + + UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Completed); + UNIT_ASSERT_NO_DIFF(*operationResults, "operation_result"); + } + Y_UNIT_TEST(CancelTask) { + auto operationResults = std::make_shared<TString>("no_result_yet"); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + int numIterations = 0; + *operationResults = "computing_result"; + while (! cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + ++numIterations; + if (numIterations == 100) { + *operationResults = "operation_result"; + return ETaskStatus::Completed; + } + } + return ETaskStatus::Aborted; + }; + TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; + + auto factory = MakeFmrJobFactory(settings); + auto cancelFlag = std::make_shared<std::atomic<bool>>(false); + auto futureTaskStatus = factory->StartJob( + nullptr, cancelFlag); + Sleep(TDuration::Seconds(2)); + cancelFlag->store(true); + auto taskResult = futureTaskStatus.GetValueSync(); + ETaskStatus taskStatus = taskResult->TaskStatus; + UNIT_ASSERT_VALUES_EQUAL(taskStatus, ETaskStatus::Aborted); + UNIT_ASSERT_NO_DIFF(*operationResults, "computing_result"); + } +} + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/ya.make b/yt/yql/providers/yt/fmr/job_factory/impl/ya.make new file mode 100644 index 0000000000..afadad4a9b --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/impl/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + yql_yt_job_factory_impl.cpp +) + +PEERDIR( + library/cpp/threading/future + yt/yql/providers/yt/fmr/job_factory/interface + yt/yql/providers/yt/fmr/request_options + yql/essentials/utils/log +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp new file mode 100644 index 0000000000..f2762857b0 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.cpp @@ -0,0 +1,62 @@ +#include <util/system/mutex.h> +#include <yql/essentials/utils/log/log.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> + +namespace NYql { + +class TFmrJobFactory: public IFmrJobFactory { +public: + TFmrJobFactory(const TFmrJobFactorySettings& settings) + : NumThreads_(settings.NumThreads), Function_(settings.Function) + { + Start(); + } + + ~TFmrJobFactory() { + Stop(); + } + + NThreading::TFuture<TTaskResult::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) override { + auto promise = NThreading::NewPromise<TTaskResult::TPtr>(); + auto future = promise.GetFuture(); + auto startJobFunc = [&, task, cancelFlag, promise = std::move(promise)] () mutable { + ETaskStatus finalTaskStatus; + TMaybe<TFmrError> taskErrorMessage; + try { + TString sessionId; + if (task) { + sessionId = task->SessionId; + } + YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId); + YQL_CLOG(DEBUG, FastMapReduce) << "Starting job with taskId " << task->TaskId; + finalTaskStatus = Function_(task, cancelFlag); + } catch (const std::exception& exc) { + finalTaskStatus = ETaskStatus::Failed; + taskErrorMessage = TFmrError{.Component = EFmrComponent::Job, .ErrorMessage = exc.what()}; + } + promise.SetValue(MakeTaskResult(finalTaskStatus, taskErrorMessage)); + }; + ThreadPool_->SafeAddFunc(startJobFunc); + return future; + } + +private: + void Start() { + ThreadPool_ = CreateThreadPool(NumThreads_); + } + + void Stop() { + ThreadPool_->Stop(); + } + +private: + THolder<IThreadPool> ThreadPool_; + i32 NumThreads_; + std::function<ETaskStatus(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function_; +}; + +TFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings) { + return MakeIntrusive<TFmrJobFactory>(settings); +} + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h new file mode 100644 index 0000000000..00a29f95a6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h @@ -0,0 +1,16 @@ +#pragma once + +#include <library/cpp/threading/future/async.h> +#include <util/thread/pool.h> +#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> + +namespace NYql { + +struct TFmrJobFactorySettings { + ui32 NumThreads = 3; + std::function<ETaskStatus(TTask::TPtr, std::shared_ptr<std::atomic<bool>>)> Function; +}; + +IFmrJobFactory::TPtr MakeFmrJobFactory(const TFmrJobFactorySettings& settings); + +} // namepspace NYql diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/ya.make b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make new file mode 100644 index 0000000000..ed05358ed3 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/interface/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_yt_job_factory.cpp +) + +PEERDIR( + library/cpp/threading/future +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp new file mode 100644 index 0000000000..8b1091674d --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.cpp @@ -0,0 +1 @@ +#include "yql_yt_job_factory.h" diff --git a/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h new file mode 100644 index 0000000000..387286687b --- /dev/null +++ b/yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h @@ -0,0 +1,17 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> +#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h> + +namespace NYql { + +class IFmrJobFactory: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IFmrJobFactory>; + + virtual ~IFmrJobFactory() = default; + + virtual NThreading::TFuture<TTaskResult::TPtr> StartJob(TTask::TPtr task, std::shared_ptr<std::atomic<bool>> cancelFlag) = 0; +}; + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/request_options/ya.make b/yt/yql/providers/yt/fmr/request_options/ya.make new file mode 100644 index 0000000000..0010183a60 --- /dev/null +++ b/yt/yql/providers/yt/fmr/request_options/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + yql_yt_request_options.cpp +) + +PEERDIR( + library/cpp/threading/future +) + +YQL_LAST_ABI_VERSION() + +GENERATE_ENUM_SERIALIZATION(yql_yt_request_options.h) + +END() diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp new file mode 100644 index 0000000000..1e59e0dc8d --- /dev/null +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -0,0 +1,28 @@ +#include "yql_yt_request_options.h" + +namespace NYql { + +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId) { + return MakeIntrusive<TTask>(taskType, taskId, taskParams, sessionId); +} + +TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage) { + return MakeIntrusive<TTaskState>(taskStatus, taskId, taskErrorMessage); +} + +TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& taskErrorMessage) { + return MakeIntrusive<TTaskResult>(taskStatus, taskErrorMessage); +} + +} // namepsace NYql + +template<> +void Out<NYql::TFmrError>(IOutputStream& out, const NYql::TFmrError& error) { + out << "FmrError[" << error.Component << "]"; + if (error.Component == NYql::EFmrComponent::Worker) { + out << "(TaskId: " << error.TaskId << " WorkerId: " << error.WorkerId << ") "; + } else if (error.Component == NYql::EFmrComponent::Coordinator) { + out << "(OperationId: " << error.OperationId <<") "; + } + out << error.ErrorMessage; +} diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h new file mode 100644 index 0000000000..d8f21e5f1e --- /dev/null +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -0,0 +1,126 @@ +#pragma once + +#include <util/generic/maybe.h> +#include <util/generic/string.h> +#include <vector> + +namespace NYql { + +enum class EOperationStatus { + Accepted, + InProgress, + Failed, + Completed, + Aborted, + NotFound +}; + +enum class ETaskStatus { + Accepted, + InProgress, + Failed, + Completed, + Aborted +}; + +enum class ETaskType { + Download, + Upload, + Merge +}; + +enum class EFmrComponent { + Coordinator, + Worker, + Job +}; + +struct TFmrError { + EFmrComponent Component; + TString ErrorMessage; + TMaybe<ui32> WorkerId; + TMaybe<TString> TaskId; + TMaybe<TString> OperationId; +}; + +struct TStatistics { +}; + +struct TYtTableRef { + TString Path; + TString Cluster; + TString TransactionId; +}; + +struct TFmrTableRef { + TString TableId; +}; + +struct TTableRef { + std::variant<TYtTableRef, TFmrTableRef> TableRef; +}; + +struct TUploadTaskParams { + TFmrTableRef Input; + TYtTableRef Output; +}; + +struct TDownloadTaskParams { + TYtTableRef Input; + TFmrTableRef Output; +}; + +struct TMergeTaskParams { + std::vector<TTableRef> Input; + TFmrTableRef Output; +}; + +using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>; + +struct TTask: public TThrRefBase { + TTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId, ui32 numRetries = 1) + : TaskType(taskType), TaskId(taskId), TaskParams(taskParams), SessionId(sessionId), NumRetries(numRetries) + { + } + + ETaskType TaskType; + TString TaskId; + TTaskParams TaskParams; + TString SessionId; + ui32 NumRetries; // Not supported yet + + using TPtr = TIntrusivePtr<TTask>; +}; + +struct TTaskState: public TThrRefBase { + TTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& errorMessage = Nothing()) + : TaskStatus(taskStatus), TaskId(taskId), TaskErrorMessage(errorMessage) + { + } + + ETaskStatus TaskStatus; + TString TaskId; + TMaybe<TFmrError> TaskErrorMessage; + + using TPtr = TIntrusivePtr<TTaskState>; +}; + +struct TTaskResult: public TThrRefBase { + TTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& errorMessage = Nothing()) + : TaskStatus(taskStatus), TaskErrorMessage(errorMessage) + { + } + + ETaskStatus TaskStatus; + TMaybe<TFmrError> TaskErrorMessage; + + using TPtr = TIntrusivePtr<TTaskResult>; +}; + +TTask::TPtr MakeTask(ETaskType taskType, const TString& taskId, const TTaskParams& taskParams, const TString& sessionId); + +TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, const TMaybe<TFmrError>& taskErrorMessage = Nothing()); + +TTaskResult::TPtr MakeTaskResult(ETaskStatus taskStatus, const TMaybe<TFmrError>& taskErrorMessage = Nothing()); + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make new file mode 100644 index 0000000000..3e12787010 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/ya.make @@ -0,0 +1,16 @@ +UNITTEST() + +SRCS( + yql_yt_worker_ut.cpp +) + +PEERDIR( + library/cpp/yt/assert + yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/job_factory/impl + yt/yql/providers/yt/fmr/worker/impl +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp new file mode 100644 index 0000000000..8077c9abea --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/impl/ut/yql_yt_worker_ut.cpp @@ -0,0 +1,101 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/threading/future/async.h> +#include <util/stream/output.h> +#include <util/string/cast.h> +#include <util/system/mutex.h> +#include <util/thread/pool.h> +#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> + +namespace NYql { + +TDownloadTaskParams downloadTaskParams{ + .Input = TYtTableRef{"Path","Cluster","TransactionId"}, + .Output = TFmrTableRef{"TableId"} +}; + +TStartOperationRequest CreateOperationRequest(ETaskType taskType = ETaskType::Download, TTaskParams taskParams = downloadTaskParams) { + return TStartOperationRequest{.TaskType = taskType, .TaskParams = taskParams, .IdempotencyKey = "IdempotencyKey"}; +} + +Y_UNIT_TEST_SUITE(FmrWorkerTests) { + Y_UNIT_TEST(GetSuccessfulOperationResult) { + auto coordinator = MakeFmrCoordinator(); + auto operationResults = std::make_shared<TString>("no_result_yet"); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (!cancelFlag->load()) { + *operationResults = "operation_result"; + return ETaskStatus::Completed; + } + return ETaskStatus::Aborted; + }; + TFmrJobFactorySettings settings{.NumThreads = 3, .Function = func}; + + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + Sleep(TDuration::Seconds(2)); + worker->Stop(); + UNIT_ASSERT_NO_DIFF(*operationResults, "operation_result"); + } + + Y_UNIT_TEST(CancelOperation) { + auto coordinator = MakeFmrCoordinator(); + auto operationResults = std::make_shared<TString>("no_result_yet"); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + int numIterations = 0; + while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + ++numIterations; + if (numIterations == 100) { + *operationResults = "operation_result"; + return ETaskStatus::Completed; + } + } + *operationResults = "operation_cancelled"; + return ETaskStatus::Aborted; + }; + TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + auto worker = MakeFmrWorker(coordinator, factory, workerSettings); + worker->Start(); + auto operationId = coordinator->StartOperation(CreateOperationRequest()).GetValueSync().OperationId; + Sleep(TDuration::Seconds(2)); + coordinator->DeleteOperation({operationId}).GetValueSync(); + Sleep(TDuration::Seconds(5)); + worker->Stop(); + UNIT_ASSERT_NO_DIFF(*operationResults, "operation_cancelled"); + } + Y_UNIT_TEST(CreateSeveralWorkers) { + TFmrCoordinatorSettings coordinatorSettings{}; + coordinatorSettings.WorkersNum = 2; + coordinatorSettings.RandomProvider = CreateDeterministicRandomProvider(3); + auto coordinator = MakeFmrCoordinator(coordinatorSettings); + std::shared_ptr<std::atomic<ui32>> operationResult = std::make_shared<std::atomic<ui32>>(0); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(1)); + (*operationResult)++; + return ETaskStatus::Completed; + } + return ETaskStatus::Aborted; + }; + TFmrJobFactorySettings settings{.NumThreads =3, .Function=func}; + auto factory = MakeFmrJobFactory(settings); + TFmrWorkerSettings firstWorkerSettings{.WorkerId = 1, .RandomProvider = CreateDeterministicRandomProvider(1)}; + TFmrWorkerSettings secondWorkerSettings{.WorkerId = 2, .RandomProvider = CreateDeterministicRandomProvider(2)}; + auto firstWorker = MakeFmrWorker(coordinator, factory, firstWorkerSettings); + auto secondWorker = MakeFmrWorker(coordinator, factory, secondWorkerSettings); + firstWorker->Start(); + secondWorker->Start(); + coordinator->StartOperation(CreateOperationRequest()).GetValueSync(); + Sleep(TDuration::Seconds(3)); + UNIT_ASSERT_VALUES_EQUAL(operationResult->load(), 1); + } +} + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/worker/impl/ya.make b/yt/yql/providers/yt/fmr/worker/impl/ya.make new file mode 100644 index 0000000000..28cfcbea66 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/impl/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + yql_yt_worker_impl.cpp +) + +PEERDIR( + library/cpp/random_provider + library/cpp/threading/future + yt/yql/providers/yt/fmr/worker/interface + yql/essentials/utils +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp new file mode 100644 index 0000000000..00363e443f --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.cpp @@ -0,0 +1,139 @@ +#include <library/cpp/threading/future/wait/wait.h> +#include <thread> +#include <ranges> +#include <util/system/mutex.h> +#include <yql/essentials/utils/yql_panic.h> +#include "yql_yt_worker_impl.h" + +namespace NYql { + +namespace { + +struct TFmrWorkerState { + TMutex Mutex; + std::unordered_map<TString, TTaskResult::TPtr> TaskStatuses; + std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>> TaskFutures; +}; + +class TFmrWorker: public IFmrWorker { +public: + TFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) + : Coordinator_(coordinator), + JobFactory_(jobFactory), + WorkerState_(std::make_shared<TFmrWorkerState>(TMutex(), std::unordered_map<TString, TTaskResult::TPtr>{}, std::unordered_map<TString, NThreading::TFuture<TTaskResult::TPtr>>{})), + StopWorker_(false), + RandomProvider_(settings.RandomProvider), + WorkerId_(settings.WorkerId), + VolatileId_(GetGuidAsString(RandomProvider_->GenGuid())), + TimeToSleepBetweenRequests_(settings.TimeToSleepBetweenRequests) +{ +} + + ~TFmrWorker() { + Stop(); + } + + void Start() override { + auto mainThreadFunc = [&] () { + while (!StopWorker_) { + std::vector<TTaskState::TPtr> taskStates; + std::vector<TString> taskIdsToErase; + with_lock(WorkerState_->Mutex) { + for (auto& [taskId, taskResult]: WorkerState_->TaskStatuses) { + auto taskStatus = taskResult->TaskStatus; + if (taskStatus != ETaskStatus::InProgress) { + taskIdsToErase.emplace_back(taskId); + } + taskStates.emplace_back(MakeTaskState(taskStatus, taskId, taskResult->TaskErrorMessage)); + } + for (auto& taskId: taskIdsToErase) { + WorkerState_->TaskStatuses.erase(taskId); + TasksCancelStatus_.erase(taskId); + } + } + + auto heartbeatRequest = THeartbeatRequest( + WorkerId_, + VolatileId_, + taskStates, + TStatistics() + ); + auto heartbeatResponseFuture = Coordinator_->SendHeartbeatResponse(heartbeatRequest); + auto heartbeatResponse = heartbeatResponseFuture.GetValueSync(); + std::vector<TTask::TPtr> tasksToRun = heartbeatResponse.TasksToRun; + std::unordered_set<TString> taskToDeleteIds = heartbeatResponse.TaskToDeleteIds; + + with_lock(WorkerState_->Mutex) { + for (auto task: tasksToRun) { + auto taskId = task->TaskId; + YQL_ENSURE(!WorkerState_->TaskStatuses.contains(taskId)); + WorkerState_->TaskStatuses[taskId] = MakeTaskResult(ETaskStatus::InProgress); + TasksCancelStatus_[taskId] = std::make_shared<std::atomic<bool>>(false); + } + for (auto& taskToDeleteId: taskToDeleteIds) { + if (TasksCancelStatus_.contains(taskToDeleteId)) { + TasksCancelStatus_[taskToDeleteId]->store(true); + } + } + + for (auto task: tasksToRun) { + auto taskId = task->TaskId; + auto future = JobFactory_->StartJob(task, TasksCancelStatus_[taskId]); + future.Subscribe([weakState = std::weak_ptr(WorkerState_), task](const auto& jobFuture) { + auto finalTaskStatus = jobFuture.GetValue(); + std::shared_ptr<TFmrWorkerState> state = weakState.lock(); + if (state) { + with_lock(state->Mutex) { + YQL_ENSURE(state->TaskStatuses.contains(task->TaskId)); + state->TaskStatuses[task->TaskId] = finalTaskStatus; + state->TaskFutures.erase(task->TaskId); + } + } + }); + YQL_ENSURE(!WorkerState_->TaskFutures.contains(taskId)); + WorkerState_->TaskFutures[taskId] = future; + } + } + Sleep(TimeToSleepBetweenRequests_); + } + }; + MainThread_ = std::thread(mainThreadFunc); + } + + void Stop() override { + std::vector<NThreading::TFuture<TTaskResult::TPtr>> taskFutures; + with_lock(WorkerState_->Mutex) { + for (auto& taskInfo: TasksCancelStatus_) { + taskInfo.second->store(true); + } + StopWorker_ = true; + auto futuresView = std::views::values(WorkerState_->TaskFutures); + taskFutures = std::vector<NThreading::TFuture<TTaskResult::TPtr>>{futuresView.begin(), futuresView.end()}; + } + NThreading::WaitAll(taskFutures).GetValueSync(); + if (MainThread_.joinable()) { + MainThread_.join(); + } + } + +private: + + IFmrCoordinator::TPtr Coordinator_; + IFmrJobFactory::TPtr JobFactory_; + std::unordered_map<TString, std::shared_ptr<std::atomic<bool>>> TasksCancelStatus_; + std::shared_ptr<TFmrWorkerState> WorkerState_; + std::atomic<bool> StopWorker_; + const TIntrusivePtr<IRandomProvider> RandomProvider_; + const ui32 WorkerId_; + const TString VolatileId_; + std::thread MainThread_; + const TDuration TimeToSleepBetweenRequests_; +}; + +} // namespace + +IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator, IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings) { + return MakeIntrusive<TFmrWorker>(coordinator, jobFactory, settings); +} + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h new file mode 100644 index 0000000000..dc43652dea --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h @@ -0,0 +1,14 @@ +#include <library/cpp/random_provider/random_provider.h> +#include <yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h> + +namespace NYql { + +struct TFmrWorkerSettings { + ui32 WorkerId; + TIntrusivePtr<IRandomProvider> RandomProvider; + TDuration TimeToSleepBetweenRequests = TDuration::Seconds(1); +}; + +IFmrWorker::TPtr MakeFmrWorker(IFmrCoordinator::TPtr coordinator,IFmrJobFactory::TPtr jobFactory, const TFmrWorkerSettings& settings); + +} // namspace NYql diff --git a/yt/yql/providers/yt/fmr/worker/interface/ya.make b/yt/yql/providers/yt/fmr/worker/interface/ya.make new file mode 100644 index 0000000000..2042b984d0 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/interface/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + yql_yt_worker.cpp +) + +PEERDIR( + library/cpp/threading/future + yt/yql/providers/yt/fmr/coordinator/interface + yt/yql/providers/yt/fmr/job_factory/interface +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp new file mode 100644 index 0000000000..4c6001a217 --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.cpp @@ -0,0 +1 @@ +#include "yql_yt_worker.h" diff --git a/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h new file mode 100644 index 0000000000..d3d5690fbd --- /dev/null +++ b/yt/yql/providers/yt/fmr/worker/interface/yql_yt_worker.h @@ -0,0 +1,19 @@ +#include <library/cpp/threading/future/core/future.h> + +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> +#include <yt/yql/providers/yt/fmr/job_factory/interface/yql_yt_job_factory.h> + +namespace NYql { + +class IFmrWorker: public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IFmrWorker>; + + virtual ~IFmrWorker() = default; + + virtual void Start() = 0; + + virtual void Stop() = 0; +}; + +} // namspace NYql diff --git a/yt/yql/providers/yt/gateway/fmr/ya.make b/yt/yql/providers/yt/gateway/fmr/ya.make index 4f805b1aca..bfc91df2f5 100644 --- a/yt/yql/providers/yt/gateway/fmr/ya.make +++ b/yt/yql/providers/yt/gateway/fmr/ya.make @@ -6,6 +6,8 @@ SRCS( PEERDIR( yql/essentials/utils/log + yt/yql/providers/yt/expr_nodes + yt/yql/providers/yt/fmr/coordinator/interface yt/yql/providers/yt/provider ) diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index 81d4224530..266f3c8174 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -1,27 +1,285 @@ #include "yql_yt_fmr.h" +#include <thread> + +#include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> +#include <yt/yql/providers/yt/provider/yql_yt_helpers.h> + +#include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/log/profile.h> #include <util/generic/ptr.h> +#include <util/thread/pool.h> using namespace NThreading; +using namespace NYql::NNodes; namespace NYql { namespace { +enum class ETablePresenceStatus { + OnlyInYt, + OnlyInFmr, + Both +}; + +struct TDownloadTableToFmrResult: public NCommon::TOperationResult {}; // Download Yt -> Fmr TableDataService + class TFmrYtGateway final: public TYtForwardingGatewayBase { public: - TFmrYtGateway(IYtGateway::TPtr&& slave) - : TYtForwardingGatewayBase(std::move(slave)) + TFmrYtGateway(IYtGateway::TPtr&& slave, IFmrCoordinator::TPtr coordinator, const TFmrYtGatewaySettings& settings) + : TYtForwardingGatewayBase(std::move(slave)), + Coordinator_(coordinator), + SessionStates_(std::make_shared<TSession>(TSession())), + RandomProvider_(settings.RandomProvider), + TimeToSleepBetweenGetOperationRequests_(settings.TimeToSleepBetweenGetOperationRequests) { + auto getOperationStatusesFunc = [&] { + while (!StopFmrGateway_) { + with_lock(SessionStates_->Mutex) { + auto checkOperationStatuses = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) { + std::vector<TString> completedOperationIds; + for (auto& [operationId, promise]: operationStatuses) { + auto getOperationFuture = Coordinator_->GetOperation({operationId}); + getOperationFuture.Subscribe([&, operationId] (const auto& getFuture) { + auto getOperationResult = getFuture.GetValueSync(); + auto getOperationStatus = getOperationResult.Status; + auto operationErrorMessages = getOperationResult.ErrorMessages; + with_lock(SessionStates_->Mutex) { + bool operationCompleted = getOperationStatus != EOperationStatus::Accepted && getOperationStatus != EOperationStatus::InProgress; + if (operationCompleted) { + // operation finished, set value in future returned in Publish / Download + bool hasCompletedSuccessfully = getOperationStatus == EOperationStatus::Completed; + SendOperationCompletionSignal(promise, hasCompletedSuccessfully, operationErrorMessages); + completedOperationIds.emplace_back(operationId); + } + } + }); + } + + for (auto& operationId: completedOperationIds) { + Coordinator_->DeleteOperation({operationId}).GetValueSync(); + operationStatuses.erase(operationId); + } + }; + + for (auto& [sessionId, sessionInfo]: SessionStates_->Sessions) { + auto& operationStates = sessionInfo.OperationStates; + checkOperationStatuses(operationStates.DownloadOperationStatuses); + checkOperationStatuses(operationStates.UploadOperationStatuses); + } + } + Sleep(TimeToSleepBetweenGetOperationRequests_); + } + }; + GetOperationStatusesThread_ = std::thread(getOperationStatusesFunc); + } + + ~TFmrYtGateway() { + StopFmrGateway_ = true; + GetOperationStatusesThread_.join(); + } + + TFuture<TPublishResult> Publish(const TExprNode::TPtr& node, TExprContext& ctx, TPublishOptions&& options) final { + if (!Coordinator_) { + return Slave_->Publish(node, ctx, std::move(options)); + } + auto publish = TYtPublish(node); + + auto cluster = publish.DataSink().Cluster().StringValue(); + auto outputPath = publish.Publish().Name().StringValue(); + auto transactionId = GenerateId(); + auto idempotencyKey = GenerateId(); + + auto fmrTableId = cluster + "." + outputPath; + + TFuture<TDownloadTableToFmrResult> downloadToFmrFuture; + TFuture<void> downloadedSuccessfully; + + TString sessionId = options.SessionId(); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__, sessionId); + + with_lock(SessionStates_->Mutex) { + auto& tablePresenceStatuses = SessionStates_->Sessions[sessionId].TablePresenceStatuses; + + if (!tablePresenceStatuses.contains(fmrTableId)) { + TYtTableRef ytTable{.Path = outputPath, .Cluster = cluster, .TransactionId = transactionId}; + tablePresenceStatuses[fmrTableId] = ETablePresenceStatus::Both; + downloadToFmrFuture = DownloadToFmrTableDataSerivce(ytTable, sessionId); + downloadedSuccessfully = downloadToFmrFuture.Apply([downloadedSuccessfully] (auto& downloadFuture) { + auto downloadResult = downloadFuture.GetValueSync(); + }); + } else { + downloadedSuccessfully = MakeFuture(); + } + } + downloadedSuccessfully.Wait(); // blocking until download to fmr finishes + + YQL_CLOG(INFO, FastMapReduce) << "Uploading table with cluster " << cluster << " and path " << outputPath << " from fmr to yt"; + + TUploadTaskParams uploadTaskParams{ + .Input = TFmrTableRef{fmrTableId}, + .Output = TYtTableRef{outputPath, cluster, transactionId} + }; + + TStartOperationRequest uploadRequest{ + .TaskType = ETaskType::Upload, .TaskParams = uploadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1 + }; + + auto promise = NewPromise<TPublishResult>(); + auto future = promise.GetFuture(); + + auto startOperationResponseFuture = Coordinator_->StartOperation(uploadRequest); + startOperationResponseFuture.Subscribe([this, promise = std::move(promise), &sessionId] (const auto& startFuture) { + TString operationId = startFuture.GetValueSync().OperationId; + with_lock(SessionStates_->Mutex) { + YQL_ENSURE(SessionStates_->Sessions.contains(sessionId)); + auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; + auto& uploadOperationStatuses = operationStates.UploadOperationStatuses; + YQL_ENSURE(!uploadOperationStatuses.contains(operationId)); + uploadOperationStatuses[operationId] = promise; + } + }); + return future; + } + + TFuture<TDownloadTableToFmrResult> DownloadToFmrTableDataSerivce(const TYtTableRef& ytTableRef, const TString& sessionId) { + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + TString fmrTableId = ytTableRef.Cluster + "." + ytTableRef.Path; + TDownloadTaskParams downloadTaskParams{ + .Input = ytTableRef, + .Output = {fmrTableId} + }; + auto idempotencyKey = GenerateId(); + TStartOperationRequest downloadRequest{ + .TaskType = ETaskType::Download, .TaskParams = downloadTaskParams, .SessionId = sessionId, .IdempotencyKey=idempotencyKey, .NumRetries=1 + }; + + YQL_CLOG(INFO, FastMapReduce) << "Downloading table with cluster " << ytTableRef.Cluster << " and path " << ytTableRef.Path << " from yt to fmr"; + + auto promise = NewPromise<TDownloadTableToFmrResult>(); + auto future = promise.GetFuture(); + + auto startOperationResponseFuture = Coordinator_->StartOperation(downloadRequest); + startOperationResponseFuture.Subscribe([this, promise = std::move(promise), &sessionId] (const auto& startFuture) { + TString operationId = startFuture.GetValueSync().OperationId; + with_lock(SessionStates_->Mutex) { + auto& operationStates = SessionStates_->Sessions[sessionId].OperationStates; + auto& downloadOperationStatuses = operationStates.DownloadOperationStatuses; + YQL_ENSURE(!downloadOperationStatuses.contains(operationId)); + downloadOperationStatuses[operationId] = promise; + } + }); + return future; + } + + void OpenSession(TOpenSessionOptions&& options) final { + Slave_->OpenSession(std::move(options)); + + TString sessionId = options.SessionId(); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + with_lock(SessionStates_->Mutex) { + auto sessions = SessionStates_->Sessions; + if (sessions.contains(sessionId)) { + YQL_LOG_CTX_THROW yexception() << "Session already exists: " << sessionId; + } + sessions[sessionId] = TSessionInfo(); + } + } + + TFuture<void> CloseSession(TCloseSessionOptions&& options) final { + Slave_->CloseSession(std::move(options)).Wait(); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + + with_lock(SessionStates_->Mutex) { + auto& sessions = SessionStates_->Sessions; + auto it = sessions.find(options.SessionId()); + if (it != sessions.end()) { + sessions.erase(it); + } + } + return MakeFuture(); + } + + TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final { + Slave_->CleanupSession(std::move(options)).Wait(); + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + + TString sessionId = options.SessionId(); + with_lock(SessionStates_->Mutex) { + auto& sessions = SessionStates_->Sessions; + YQL_ENSURE(sessions.contains(sessionId)); + auto& operationStates = sessions[sessionId].OperationStates; + + auto cancelOperationsFunc = [&] <typename T> (std::unordered_map<TString, TPromise<T>>& operationStatuses) { + std::vector<TFuture<TDeleteOperationResponse>> cancelOperationsFutures; + + for (auto& [operationId, promise]: operationStatuses) { + cancelOperationsFutures.emplace_back(Coordinator_->DeleteOperation({operationId})); + } + NThreading::WaitAll(cancelOperationsFutures).GetValueSync(); + std::vector<TFuture<T>> resultFutures; + + for (auto& [operationId, promise]: operationStatuses) { + SendOperationCompletionSignal(promise, false); + resultFutures.emplace_back(promise.GetFuture()); + } + NThreading::WaitAll(resultFutures).GetValueSync(); + }; + + cancelOperationsFunc(operationStates.DownloadOperationStatuses); + cancelOperationsFunc(operationStates.UploadOperationStatuses); + } + + return MakeFuture(); + } + +private: + struct TFmrGatewayOperationsState { + std::unordered_map<TString, TPromise<TPublishResult>> UploadOperationStatuses = {}; // operationId -> promise which we set when operation completes + std::unordered_map<TString, TPromise<TDownloadTableToFmrResult>> DownloadOperationStatuses = {}; + }; + + struct TSessionInfo { + TFmrGatewayOperationsState OperationStates; + std::unordered_map<TString, ETablePresenceStatus> TablePresenceStatuses; // yt cluster and path -> is it In Yt, Fmr TableDataService + }; + + struct TSession { + std::unordered_map<TString, TSessionInfo> Sessions; + TMutex Mutex = TMutex(); + }; + + IFmrCoordinator::TPtr Coordinator_; + std::shared_ptr<TSession> SessionStates_; + const TIntrusivePtr<IRandomProvider> RandomProvider_; + TDuration TimeToSleepBetweenGetOperationRequests_; + std::thread GetOperationStatusesThread_; + std::atomic<bool> StopFmrGateway_; + + TString GenerateId() { + return GetGuidAsString(RandomProvider_->GenGuid()); + } + + template <std::derived_from<NCommon::TOperationResult> T> + void SendOperationCompletionSignal(TPromise<T> promise, bool completedSuccessfully = false, const std::vector<TFmrError>& errorMessages = {}) { + YQL_ENSURE(!promise.HasValue()); + T commonOperationResult; + if (completedSuccessfully) { + commonOperationResult.SetSuccess(); + } else if (!errorMessages.empty()) { + auto exception = yexception() << "Operation failed with errors: " << JoinSeq(" ", errorMessages); + commonOperationResult.SetException(exception); + } + promise.SetValue(commonOperationResult); } }; } // namespace -IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave) { - return MakeIntrusive<TFmrYtGateway>(std::move(slave)); +IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave, IFmrCoordinator::TPtr coordinator, const TFmrYtGatewaySettings& settings) { + return MakeIntrusive<TFmrYtGateway>(std::move(slave), std::move(coordinator), settings); } } // namspace NYql diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h index 97e26b63b1..df82d50527 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h @@ -1,9 +1,22 @@ #pragma once +#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h> #include <yt/yql/providers/yt/provider/yql_yt_forwarding_gateway.h> namespace NYql { -IYtGateway::TPtr CreateYtFmrGateway(IYtGateway::TPtr slave); +struct TFmrYtGatewaySettings { + TIntrusivePtr<IRandomProvider> RandomProvider; + TDuration TimeToSleepBetweenGetOperationRequests; +}; + +IYtGateway::TPtr CreateYtFmrGateway( + IYtGateway::TPtr slave, + IFmrCoordinator::TPtr coordinator = nullptr, + const TFmrYtGatewaySettings& settings = TFmrYtGatewaySettings{ + .RandomProvider = CreateDefaultRandomProvider(), + .TimeToSleepBetweenGetOperationRequests = TDuration::Seconds(1) + } +); } // namspace NYql diff --git a/yt/yql/tools/ytrun/lib/ya.make b/yt/yql/tools/ytrun/lib/ya.make index 12bcaa20fd..93b20d7ed0 100644 --- a/yt/yql/tools/ytrun/lib/ya.make +++ b/yt/yql/tools/ytrun/lib/ya.make @@ -6,6 +6,9 @@ SRCS( PEERDIR( yt/yql/providers/yt/provider + yt/yql/providers/yt/fmr/coordinator/impl + yt/yql/providers/yt/fmr/job_factory/impl + yt/yql/providers/yt/fmr/worker/impl yt/yql/providers/yt/gateway/native yt/yql/providers/yt/gateway/fmr yt/yql/providers/yt/lib/config_clusters diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp index 9a4b5d0377..c2064ebd85 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.cpp +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.cpp @@ -8,6 +8,8 @@ #include <yt/yql/providers/yt/lib/log/yt_logger.h> #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h> +#include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h> +#include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h> #include <yql/essentials/providers/common/provider/yql_provider_names.h> #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h> #include <yql/essentials/core/services/yql_transform_pipeline.h> @@ -170,7 +172,26 @@ IYtGateway::TPtr TYtRunTool::CreateYtGateway() { services.FileStorage = GetFileStorage(); services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt()); auto ytGateway = CreateYtNativeGateway(services); - return GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName) ? CreateYtFmrGateway(ytGateway): ytGateway; + if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) { + return ytGateway; + } + + auto coordinator = MakeFmrCoordinator(); + auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) { + while (!cancelFlag->load()) { + Sleep(TDuration::Seconds(3)); + return ETaskStatus::Completed; + } + return ETaskStatus::Aborted; + }; // TODO - use function which actually calls Downloader/Uploader based on task params + + TFmrJobFactorySettings settings{.Function=func}; + auto jobFactory = MakeFmrJobFactory(settings); + TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(), + .TimeToSleepBetweenRequests=TDuration::Seconds(1)}; + FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings); + FmrWorker_->Start(); + return CreateYtFmrGateway(ytGateway, coordinator); } IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() { diff --git a/yt/yql/tools/ytrun/lib/ytrun_lib.h b/yt/yql/tools/ytrun/lib/ytrun_lib.h index de3c0dfd64..e724ce2ac6 100644 --- a/yt/yql/tools/ytrun/lib/ytrun_lib.h +++ b/yt/yql/tools/ytrun/lib/ytrun_lib.h @@ -1,6 +1,7 @@ #pragma once #include <yt/yql/providers/yt/provider/yql_yt_gateway.h> +#include <yt/yql/providers/yt/fmr/worker/impl/yql_yt_worker_impl.h> #include <yql/essentials/tools/yql_facade_run/yql_facade_run.h> #include <yql/essentials/core/cbo/cbo_optimizer_new.h> @@ -32,6 +33,7 @@ protected: size_t NumThreads_ = 1; bool KeepTemp_ = false; TString DefYtServer_; + IFmrWorker::TPtr FmrWorker_; }; } // NYql |