diff options
author | hcpp <hcpp@ydb.tech> | 2023-07-13 15:25:59 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-07-13 15:25:59 +0300 |
commit | 5d053620e044a17ef2e04b26898e5387cd144fb9 (patch) | |
tree | 30aa856134bd7c8a90ca486a19e2b377d44f587f | |
parent | 69e3c03a79c3b497154302c24a79a5ab278e7904 (diff) | |
download | ydb-5d053620e044a17ef2e04b26898e5387cd144fb9.tar.gz |
start/finish time have been added
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/actors_factory.cpp | 6 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/actors_factory.h | 2 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/events/events.h | 11 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/initializer_actor.cpp | 125 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/initializer_actor.h | 18 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp | 15 |
12 files changed, 183 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt index a89b1eff250..3c9cb0121a1 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt @@ -38,6 +38,7 @@ target_sources(libs-compute-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt index fb3627b9597..28c4711a5f6 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt @@ -39,6 +39,7 @@ target_sources(libs-compute-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt index fb3627b9597..28c4711a5f6 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt @@ -39,6 +39,7 @@ target_sources(libs-compute-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt index a89b1eff250..3c9cb0121a1 100644 --- a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt @@ -38,6 +38,7 @@ target_sources(libs-compute-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp index f347334d463..7c0c6ceedb7 100644 --- a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp @@ -1,6 +1,7 @@ #include "actors_factory.h" #include "executer_actor.h" #include "finalizer_actor.h" +#include "initializer_actor.h" #include "resources_cleaner_actor.h" #include "result_writer_actor.h" #include "status_tracker_actor.h" @@ -37,6 +38,11 @@ struct TActorFactory : public IActorFactory { return CreateConnectorActor(Params); } + std::unique_ptr<NActors::IActor> CreateInitializer(const NActors::TActorId& parent, + const NActors::TActorId& pinger) const override { + return CreateInitializerActor(Params, parent, pinger, Counters); + } + std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent, const NActors::TActorId &connector, const NActors::TActorId &pinger) const override { diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h index afbdbb13ae0..d81da9727f2 100644 --- a/ydb/core/fq/libs/compute/ydb/actors_factory.h +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h @@ -16,6 +16,8 @@ struct IActorFactory : public TThrRefBase { virtual std::unique_ptr<NActors::IActor> CreatePinger(const NActors::TActorId& parent) const = 0; virtual std::unique_ptr<NActors::IActor> CreateConnector() const = 0; + virtual std::unique_ptr<NActors::IActor> CreateInitializer(const NActors::TActorId& parent, + const NActors::TActorId& pinger) const = 0; virtual std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent, const NActors::TActorId &connector, const NActors::TActorId &pinger) const = 0; diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index ffb01be9eb9..c7e99d2aae6 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -33,6 +33,7 @@ struct TEvYdbCompute { EvCreateDatabaseRequest, EvCreateDatabaseResponse, + EvInitializerResponse, EvExecuterResponse, EvStatusTrackerResponse, EvResultWriterResponse, @@ -203,6 +204,16 @@ struct TEvYdbCompute { NYql::TIssues Issues; }; + struct TEvInitializerResponse : public NActors::TEventLocal<TEvInitializerResponse, EvInitializerResponse> { + TEvInitializerResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(std::move(issues)) + , Status(status) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + }; + struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> { TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId) : OperationId(operationId) diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp index a199b6110bd..30f8d30a390 100644 --- a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp @@ -18,6 +18,7 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <google/protobuf/util/time_util.h> #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) #define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream) @@ -80,6 +81,7 @@ public: pingTaskRequest.mutable_result_id()->set_value(Params.ResultId); } pingTaskRequest.set_status(ExecStatus == NYdb::NQuery::EExecStatus::Completed || Params.Status == FederatedQuery::QueryMeta::COMPLETING ? ::FederatedQuery::QueryMeta::COMPLETED : ::FederatedQuery::QueryMeta::FAILED); + *pingTaskRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); } diff --git a/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp new file mode 100644 index 00000000000..f893a04ff15 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp @@ -0,0 +1,125 @@ +#include "base_compute_actor.h" + +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/compute/common/metrics.h> +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> +#include <ydb/core/fq/libs/compute/ydb/events/events.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/library/services/services.pb.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#include <google/protobuf/util/time_util.h> + +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream) + +namespace NFq { + +using namespace NActors; +using namespace NFq; + +class TInitializerActor : public TBaseComputeActor<TInitializerActor> { +public: + enum ERequestType { + RT_PING, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("Ping") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TInitializerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& pinger, const ::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseComputeActor(queryCounters, "Initializer") + , Params(params) + , Parent(parent) + , Pinger(pinger) + , Counters(GetStepCountersSubgroup()) + , StartTime(TInstant::Now()) + {} + + static constexpr char ActorName[] = "FQ_INITIALIZER_ACTOR"; + + void Start() { + LOG_I("Start initializer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); + if (!Params.RequestStartedAt) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Become(&TInitializerActor::StateFunc); + Fq::Private::PingTaskRequest pingTaskRequest; + *pingTaskRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + } else { + LOG_I("Query has been initialized (did nothing)"); + Send(Parent, new TEvYdbCompute::TEvInitializerResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvForwardPingResponse, Handle); + ) + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Dec(); + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Success) { + pingCounters->Ok->Inc(); + LOG_I("Query has been initialized"); + Send(Parent, new TEvYdbCompute::TEvInitializerResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } else { + pingCounters->Error->Inc(); + LOG_E("Error initialization query"); + Send(Parent, new TEvYdbCompute::TEvInitializerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + } + } + +private: + TRunActorParams Params; + TActorId Parent; + TActorId Pinger; + TCounters Counters; + TInstant StartTime; +}; + +std::unique_ptr<NActors::IActor> CreateInitializerActor(const TRunActorParams& params, + const TActorId& parent, + const TActorId& pinger, + const ::NYql::NCommon::TServiceCounters& queryCounters) { + return std::make_unique<TInitializerActor>(params, parent, pinger, queryCounters); +} + +} diff --git a/ydb/core/fq/libs/compute/ydb/initializer_actor.h b/ydb/core/fq/libs/compute/ydb/initializer_actor.h new file mode 100644 index 00000000000..392d455386a --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/initializer_actor.h @@ -0,0 +1,18 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/run_actor_params.h> + +#include <ydb/library/yql/providers/common/metrics/service_counters.h> + +#include <ydb/public/sdk/cpp/client/ydb_query/query.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NFq { + +std::unique_ptr<NActors::IActor> CreateInitializerActor(const TRunActorParams& params, + const NActors::TActorId& parent, + const NActors::TActorId& pinger, + const ::NYql::NCommon::TServiceCounters& queryCounters); + +} diff --git a/ydb/core/fq/libs/compute/ydb/ya.make b/ydb/core/fq/libs/compute/ydb/ya.make index 62fba34c4d1..6672a7fe616 100644 --- a/ydb/core/fq/libs/compute/ydb/ya.make +++ b/ydb/core/fq/libs/compute/ydb/ya.make @@ -4,6 +4,7 @@ SRCS( actors_factory.cpp executer_actor.cpp finalizer_actor.cpp + initializer_actor.cpp resources_cleaner_actor.cpp result_writer_actor.cpp status_tracker_actor.cpp diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index 877d6d28649..a2f727a61ae 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -61,6 +61,7 @@ public: } STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvInitializerResponse, Handle); hFunc(TEvYdbCompute::TEvExecuterResponse, Handle); hFunc(TEvYdbCompute::TEvStatusTrackerResponse, Handle); hFunc(TEvYdbCompute::TEvResultWriterResponse, Handle); @@ -69,6 +70,18 @@ public: hFunc(TEvYdbCompute::TEvStopperResponse, Handle); ) + void Handle(const TEvYdbCompute::TEvInitializerResponse::TPtr& ev) { + auto& response = *ev->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_I("InitializerResponse (failed). Issues: " << response.Issues.ToOneLineString()); + ResignAndPassAway(response.Issues); + return; + } + + LOG_I("InitializerResponse (success)"); + Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release()); + } + void Handle(const TEvYdbCompute::TEvExecuterResponse::TPtr& ev) { auto& response = *ev->Get(); if (!response.Success) { @@ -169,7 +182,7 @@ public: } break; case FederatedQuery::QueryMeta::STARTING: - Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release()); + Register(ActorFactory->CreateInitializer(SelfId(), Pinger).release()); break; case FederatedQuery::QueryMeta::RUNNING: Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); |