aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-07-13 15:25:59 +0300
committerhcpp <hcpp@ydb.tech>2023-07-13 15:25:59 +0300
commit5d053620e044a17ef2e04b26898e5387cd144fb9 (patch)
tree30aa856134bd7c8a90ca486a19e2b377d44f587f
parent69e3c03a79c3b497154302c24a79a5ab278e7904 (diff)
downloadydb-5d053620e044a17ef2e04b26898e5387cd144fb9.tar.gz
start/finish time have been added
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/compute/ydb/actors_factory.cpp6
-rw-r--r--ydb/core/fq/libs/compute/ydb/actors_factory.h2
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h11
-rw-r--r--ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp2
-rw-r--r--ydb/core/fq/libs/compute/ydb/initializer_actor.cpp125
-rw-r--r--ydb/core/fq/libs/compute/ydb/initializer_actor.h18
-rw-r--r--ydb/core/fq/libs/compute/ydb/ya.make1
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp15
12 files changed, 183 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
index a89b1eff250..3c9cb0121a1 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.darwin-x86_64.txt
@@ -38,6 +38,7 @@ target_sources(libs-compute-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
index fb3627b9597..28c4711a5f6 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-aarch64.txt
@@ -39,6 +39,7 @@ target_sources(libs-compute-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
index fb3627b9597..28c4711a5f6 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.linux-x86_64.txt
@@ -39,6 +39,7 @@ target_sources(libs-compute-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
diff --git a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
index a89b1eff250..3c9cb0121a1 100644
--- a/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/compute/ydb/CMakeLists.windows-x86_64.txt
@@ -38,6 +38,7 @@ target_sources(libs-compute-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
index f347334d463..7c0c6ceedb7 100644
--- a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
+++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
@@ -1,6 +1,7 @@
#include "actors_factory.h"
#include "executer_actor.h"
#include "finalizer_actor.h"
+#include "initializer_actor.h"
#include "resources_cleaner_actor.h"
#include "result_writer_actor.h"
#include "status_tracker_actor.h"
@@ -37,6 +38,11 @@ struct TActorFactory : public IActorFactory {
return CreateConnectorActor(Params);
}
+ std::unique_ptr<NActors::IActor> CreateInitializer(const NActors::TActorId& parent,
+ const NActors::TActorId& pinger) const override {
+ return CreateInitializerActor(Params, parent, pinger, Counters);
+ }
+
std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent,
const NActors::TActorId &connector,
const NActors::TActorId &pinger) const override {
diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h
index afbdbb13ae0..d81da9727f2 100644
--- a/ydb/core/fq/libs/compute/ydb/actors_factory.h
+++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h
@@ -16,6 +16,8 @@ struct IActorFactory : public TThrRefBase {
virtual std::unique_ptr<NActors::IActor> CreatePinger(const NActors::TActorId& parent) const = 0;
virtual std::unique_ptr<NActors::IActor> CreateConnector() const = 0;
+ virtual std::unique_ptr<NActors::IActor> CreateInitializer(const NActors::TActorId& parent,
+ const NActors::TActorId& pinger) const = 0;
virtual std::unique_ptr<NActors::IActor> CreateExecuter(const NActors::TActorId &parent,
const NActors::TActorId &connector,
const NActors::TActorId &pinger) const = 0;
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index ffb01be9eb9..c7e99d2aae6 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -33,6 +33,7 @@ struct TEvYdbCompute {
EvCreateDatabaseRequest,
EvCreateDatabaseResponse,
+ EvInitializerResponse,
EvExecuterResponse,
EvStatusTrackerResponse,
EvResultWriterResponse,
@@ -203,6 +204,16 @@ struct TEvYdbCompute {
NYql::TIssues Issues;
};
+ struct TEvInitializerResponse : public NActors::TEventLocal<TEvInitializerResponse, EvInitializerResponse> {
+ TEvInitializerResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(std::move(issues))
+ , Status(status)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ };
+
struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> {
TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId)
: OperationId(operationId)
diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
index a199b6110bd..30f8d30a390 100644
--- a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
@@ -18,6 +18,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <google/protobuf/util/time_util.h>
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Finalizer] QueryId: " << Params.QueryId << " " << stream)
@@ -80,6 +81,7 @@ public:
pingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
}
pingTaskRequest.set_status(ExecStatus == NYdb::NQuery::EExecStatus::Completed || Params.Status == FederatedQuery::QueryMeta::COMPLETING ? ::FederatedQuery::QueryMeta::COMPLETED : ::FederatedQuery::QueryMeta::FAILED);
+ *pingTaskRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
}
diff --git a/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
new file mode 100644
index 00000000000..f893a04ff15
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/initializer_actor.cpp
@@ -0,0 +1,125 @@
+#include "base_compute_actor.h"
+
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/compute/common/metrics.h>
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+#include <ydb/core/fq/libs/compute/ydb/events/events.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/library/services/services.pb.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <google/protobuf/util/time_util.h>
+
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [Initializer] QueryId: " << Params.QueryId << " " << stream)
+
+namespace NFq {
+
+using namespace NActors;
+using namespace NFq;
+
+class TInitializerActor : public TBaseComputeActor<TInitializerActor> {
+public:
+ enum ERequestType {
+ RT_PING,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TInitializerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& pinger, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "Initializer")
+ , Params(params)
+ , Parent(parent)
+ , Pinger(pinger)
+ , Counters(GetStepCountersSubgroup())
+ , StartTime(TInstant::Now())
+ {}
+
+ static constexpr char ActorName[] = "FQ_INITIALIZER_ACTOR";
+
+ void Start() {
+ LOG_I("Start initializer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
+ if (!Params.RequestStartedAt) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Become(&TInitializerActor::StateFunc);
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ *pingTaskRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
+ } else {
+ LOG_I("Query has been initialized (did nothing)");
+ Send(Parent, new TEvYdbCompute::TEvInitializerResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ }
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvents::TEvForwardPingResponse, Handle);
+ )
+
+ void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Dec();
+ pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Success) {
+ pingCounters->Ok->Inc();
+ LOG_I("Query has been initialized");
+ Send(Parent, new TEvYdbCompute::TEvInitializerResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ } else {
+ pingCounters->Error->Inc();
+ LOG_E("Error initialization query");
+ Send(Parent, new TEvYdbCompute::TEvInitializerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error moving the query to the terminal state"}}, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ }
+ }
+
+private:
+ TRunActorParams Params;
+ TActorId Parent;
+ TActorId Pinger;
+ TCounters Counters;
+ TInstant StartTime;
+};
+
+std::unique_ptr<NActors::IActor> CreateInitializerActor(const TRunActorParams& params,
+ const TActorId& parent,
+ const TActorId& pinger,
+ const ::NYql::NCommon::TServiceCounters& queryCounters) {
+ return std::make_unique<TInitializerActor>(params, parent, pinger, queryCounters);
+}
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/initializer_actor.h b/ydb/core/fq/libs/compute/ydb/initializer_actor.h
new file mode 100644
index 00000000000..392d455386a
--- /dev/null
+++ b/ydb/core/fq/libs/compute/ydb/initializer_actor.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
+
+#include <ydb/library/yql/providers/common/metrics/service_counters.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_query/query.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NFq {
+
+std::unique_ptr<NActors::IActor> CreateInitializerActor(const TRunActorParams& params,
+ const NActors::TActorId& parent,
+ const NActors::TActorId& pinger,
+ const ::NYql::NCommon::TServiceCounters& queryCounters);
+
+}
diff --git a/ydb/core/fq/libs/compute/ydb/ya.make b/ydb/core/fq/libs/compute/ydb/ya.make
index 62fba34c4d1..6672a7fe616 100644
--- a/ydb/core/fq/libs/compute/ydb/ya.make
+++ b/ydb/core/fq/libs/compute/ydb/ya.make
@@ -4,6 +4,7 @@ SRCS(
actors_factory.cpp
executer_actor.cpp
finalizer_actor.cpp
+ initializer_actor.cpp
resources_cleaner_actor.cpp
result_writer_actor.cpp
status_tracker_actor.cpp
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
index 877d6d28649..a2f727a61ae 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
@@ -61,6 +61,7 @@ public:
}
STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvInitializerResponse, Handle);
hFunc(TEvYdbCompute::TEvExecuterResponse, Handle);
hFunc(TEvYdbCompute::TEvStatusTrackerResponse, Handle);
hFunc(TEvYdbCompute::TEvResultWriterResponse, Handle);
@@ -69,6 +70,18 @@ public:
hFunc(TEvYdbCompute::TEvStopperResponse, Handle);
)
+ void Handle(const TEvYdbCompute::TEvInitializerResponse::TPtr& ev) {
+ auto& response = *ev->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_I("InitializerResponse (failed). Issues: " << response.Issues.ToOneLineString());
+ ResignAndPassAway(response.Issues);
+ return;
+ }
+
+ LOG_I("InitializerResponse (success)");
+ Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release());
+ }
+
void Handle(const TEvYdbCompute::TEvExecuterResponse::TPtr& ev) {
auto& response = *ev->Get();
if (!response.Success) {
@@ -169,7 +182,7 @@ public:
}
break;
case FederatedQuery::QueryMeta::STARTING:
- Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release());
+ Register(ActorFactory->CreateInitializer(SelfId(), Pinger).release());
break;
case FederatedQuery::QueryMeta::RUNNING:
Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());