diff options
author | hcpp <hcpp@ydb.tech> | 2023-07-24 13:02:40 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-07-24 13:02:40 +0300 |
commit | 17c4bafd1ceef077f18d709106abf746aead4267 (patch) | |
tree | e4d94c1b31a29b1e34a171efa6d5cede712a30ce | |
parent | 9908c93a1412479fe215dbfa2e10b17e1e7cd0f8 (diff) | |
download | ydb-17c4bafd1ceef077f18d709106abf746aead4267.tar.gz |
several result sets have been supported
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/base_compute_actor.h | 14 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/events/events.h | 22 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp | 211 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp | 2 |
4 files changed, 196 insertions, 53 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h index 8148271ec2c..a4fa8f363e3 100644 --- a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h +++ b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h @@ -20,7 +20,14 @@ public: using TBase::PassAway; TBaseComputeActor(const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName) - : Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName))) + : BaseCounters(queryCounters.Counters) + , Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName))) + , TotalStartTime(TInstant::Now()) + {} + + TBaseComputeActor(const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName) + : BaseCounters(baseCounters) + , Counters(MakeIntrusive<TComputeRequestCounters>("Total", baseCounters->GetSubgroup("step", stepName))) , TotalStartTime(TInstant::Now()) {} @@ -53,7 +60,12 @@ public: return Counters->Counters; } + ::NMonitoring::TDynamicCounterPtr GetBaseCounters() const { + return BaseCounters; + } + private: + ::NMonitoring::TDynamicCounterPtr BaseCounters; TComputeRequestCountersPtr Counters; TInstant TotalStartTime; }; diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index c7e99d2aae6..d3c97ac2147 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -37,6 +37,7 @@ struct TEvYdbCompute { EvExecuterResponse, EvStatusTrackerResponse, EvResultWriterResponse, + EvResultSetWriterResponse, EvResourcesCleanerResponse, EvFinalizerResponse, EvStopperResponse, @@ -89,13 +90,15 @@ struct TEvYdbCompute { , Status(status) {} - TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, NYql::TIssues issues) + TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, NYql::TIssues issues) : ExecStatus(execStatus) + , ResultSetsMeta(resultSetsMeta) , Issues(std::move(issues)) , Status(NYdb::EStatus::SUCCESS) {} NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; + TVector<Ydb::Query::ResultSetMeta> ResultSetsMeta; NYql::TIssues Issues; NYdb::EStatus Status; }; @@ -283,6 +286,23 @@ struct TEvYdbCompute { NYql::TIssues Issues; NYdb::EStatus Status; }; + + + struct TEvResultSetWriterResponse : public NActors::TEventLocal<TEvResultSetWriterResponse, EvResultSetWriterResponse> { + TEvResultSetWriterResponse(NYql::TIssues issues, NYdb::EStatus status) + : Issues(std::move(issues)) + , Status(status) + {} + + TEvResultSetWriterResponse(int64_t rowsCount) + : Status(NYdb::EStatus::SUCCESS) + , RowsCount(rowsCount) + {} + + NYql::TIssues Issues; + NYdb::EStatus Status; + int64_t RowsCount = 0; + }; }; } diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp index 43bb445fc88..0f142492ac6 100644 --- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp @@ -33,20 +33,18 @@ namespace NFq { using namespace NActors; using namespace NFq; -class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> { +class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> { public: enum ERequestType { RT_FETCH_SCRIPT_RESULT, - RT_WRITE_RESULT, - RT_PING, + RT_WRITE_RESULT_SET, RT_MAX }; class TCounters: public virtual TThrRefBase { std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ { MakeIntrusive<TComputeRequestCounters>("FetchScriptResult") }, - { MakeIntrusive<TComputeRequestCounters>("WriteResult") }, - { MakeIntrusive<TComputeRequestCounters>("Ping") } + { MakeIntrusive<TComputeRequestCounters>("WriteResultSet") } }); ::NMonitoring::TDynamicCounterPtr Counters; @@ -65,52 +63,34 @@ public: } }; - TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NKikimr::NOperationId::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) - : TBaseComputeActor(queryCounters, "ResultWriter") + TResultSetWriterActor(const TRunActorParams& params, int64_t resultSetId, const TActorId& parent, const TActorId& connector, const NKikimr::NOperationId::TOperationId& operationId, const ::NMonitoring::TDynamicCounterPtr& counters) + : TBaseComputeActor(counters, "ResultSetWriter") , Params(params) + , ResultSetId(resultSetId) , Parent(parent) , Connector(connector) - , Pinger(pinger) , OperationId(operationId) , Counters(GetStepCountersSubgroup()) {} - static constexpr char ActorName[] = "FQ_RESULT_WRITER_ACTOR"; + static constexpr char ActorName[] = "FQ_RESULT_SET_WRITER_ACTOR"; void Start() { - LOG_I("Start result writer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); - Become(&TResultWriterActor::StateFunc); + LOG_I("Start result set writer actor, id: " << ResultSetId); + Become(&TResultSetWriterActor::StateFunc); SendFetchScriptResultRequest(); } STRICT_STFUNC(StateFunc, hFunc(TEvYdbCompute::TEvFetchScriptResultResponse, Handle); hFunc(NFq::TEvInternalService::TEvWriteResultResponse, Handle); - hFunc(TEvents::TEvForwardPingResponse, Handle); ) - void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Dec(); - pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); - if (ev.Get()->Get()->Success) { - pingCounters->Ok->Inc(); - LOG_I("The result has been moved"); - Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS)); - CompleteAndPassAway(); - } else { - pingCounters->Error->Inc(); - LOG_E("Move result error"); - Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. OperationId: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR)); - FailedAndPassAway(); - } - } - void Handle(const TEvYdbCompute::TEvFetchScriptResultResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR)); + Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); return; } @@ -124,27 +104,20 @@ public: auto chunk = CreateProtoRequestWithoutResultSet(Offset); Offset += response.ResultSet->RowsCount(); *chunk.mutable_result_set() = resultSetProto; - auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT); + auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET); writeResultCounters->InFly->Inc(); Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk))); } if (!FetchToken) { - PingTaskRequest.mutable_result_id()->set_value(Params.ResultId); - PingTaskRequest.set_result_set_count(1); - auto& resultSetMeta = *PingTaskRequest.add_result_set_meta(); - resultSetMeta.set_rows_count(Offset); - for (const auto& column: resultSetProto.columns()) { - *resultSetMeta.add_column() = column; - } if (emptyResultSet) { - SendFinalPingRequest(); + SendReplyAndPassAway(); } } } void Handle(const NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) { - auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT); + auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET); writeResultCounters->InFly->Dec(); writeResultCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); if (ev.Get()->Get()->Status.IsSuccess()) { @@ -153,12 +126,12 @@ public: if (FetchToken) { SendFetchScriptResultRequest(); } else { - SendFinalPingRequest(); + SendReplyAndPassAway(); } } else { writeResultCounters->Error->Inc(); LOG_E("Error writing result for offset " << Offset); - Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR)); + Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); } } @@ -167,36 +140,174 @@ public: auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT); fetchScriptResultCounters->InFly->Inc(); StartTime = TInstant::Now(); - Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, NKikimr::NOperationId::TOperationId, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, 0, FetchToken)); + Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, NKikimr::NOperationId::TOperationId, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, ResultSetId, FetchToken)); } - void SendFinalPingRequest() { - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Inc(); - Send(Pinger, new TEvents::TEvForwardPingRequest(PingTaskRequest)); + void SendReplyAndPassAway() { + Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(Offset)); + PassAway(); } Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { Fq::Private::WriteTaskResultRequest protoReq; protoReq.set_owner_id(Params.Owner); protoReq.mutable_result_id()->set_value(Params.ResultId); + protoReq.set_result_set_id(ResultSetId); protoReq.set_offset(startRowIndex); - protoReq.set_result_set_id(0); - protoReq.set_request_id(0); *protoReq.mutable_deadline() = NProtoInterop::CastToProto(Params.Deadline); return protoReq; } private: TRunActorParams Params; + uint32_t ResultSetId = 0; TActorId Parent; TActorId Connector; - TActorId Pinger; NKikimr::NOperationId::TOperationId OperationId; TCounters Counters; TInstant StartTime; int64_t Offset = 0; TString FetchToken; +}; + +class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> { +public: + enum ERequestType { + RT_GET_OPERATION, + RT_PING, + RT_MAX + }; + + class TCounters: public virtual TThrRefBase { + std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({ + { MakeIntrusive<TComputeRequestCounters>("GetOperation") }, + { MakeIntrusive<TComputeRequestCounters>("Ping") } + }); + + ::NMonitoring::TDynamicCounterPtr Counters; + + public: + explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + { + for (auto& request: Requests) { + request->Register(Counters); + } + } + + TComputeRequestCountersPtr GetCounters(ERequestType type) { + return Requests[type]; + } + }; + + TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NKikimr::NOperationId::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseComputeActor(queryCounters, "ResultWriter") + , Params(params) + , Parent(parent) + , Connector(connector) + , Pinger(pinger) + , OperationId(operationId) + , Counters(GetStepCountersSubgroup()) + {} + + static constexpr char ActorName[] = "FQ_RESULT_WRITER_ACTOR"; + + void Start() { + LOG_I("Start result writer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status)); + Become(&TResultWriterActor::StateFunc); + SendGetOperation(); + } + + void SendGetOperation() { + Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), SelfId(), Connector, OperationId)); + } + + void WriteNextResultSet() { + if (CurrentResultSetId < (int64_t)PingTaskRequest.result_set_meta_size()) { + Register(new TResultSetWriterActor(Params, + CurrentResultSetId++, + SelfId(), + Connector, + OperationId, + GetBaseCounters())); + return; + } + + SendFinalPingRequest(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvYdbCompute::TEvGetOperationResponse, Handle); + hFunc(TEvYdbCompute::TEvResultSetWriterResponse, Handle); + hFunc(TEvents::TEvForwardPingResponse, Handle); + ) + + void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, ev->Get()->Status)); + FailedAndPassAway(); + return; + } + + for (const auto& resultSetMeta: ev.Get()->Get()->ResultSetsMeta) { + auto& meta = *PingTaskRequest.add_result_set_meta(); + for (const auto& column: resultSetMeta.columns()) { + *meta.add_column() = column; + } + meta.set_truncated(resultSetMeta.truncated()); + } + + WriteNextResultSet(); + } + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Dec(); + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (ev.Get()->Get()->Success) { + pingCounters->Ok->Inc(); + LOG_I("The result has been moved"); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS)); + CompleteAndPassAway(); + } else { + pingCounters->Error->Inc(); + LOG_E("Move result error"); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. OperationId: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + } + } + + void Handle(const TEvYdbCompute::TEvResultSetWriterResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS) { + LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString()); + Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR)); + FailedAndPassAway(); + return; + } + PingTaskRequest.mutable_result_set_meta(CurrentResultSetId - 1)->set_rows_count(ev->Get()->RowsCount); + WriteNextResultSet(); + } + + void SendFinalPingRequest() { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + PingTaskRequest.set_result_set_count(PingTaskRequest.result_set_meta_size()); + Send(Pinger, new TEvents::TEvForwardPingRequest(PingTaskRequest)); + } + +private: + TRunActorParams Params; + int64_t CurrentResultSetId = 0; + TActorId Parent; + TActorId Connector; + TActorId Pinger; + NKikimr::NOperationId::TOperationId OperationId; + TCounters Counters; + TInstant StartTime; + TString FetchToken; Fq::Private::PingTaskRequest PingTaskRequest; }; diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 46af0743fe7..4765e00b236 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -66,7 +66,7 @@ public: try { auto response = future.ExtractValueSync(); if (response.Status().IsSuccess()) { - actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Metadata().ResultSetsMeta, response.Status().GetIssues()), 0, cookie); } else { actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); } |