aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-07-24 13:02:40 +0300
committerhcpp <hcpp@ydb.tech>2023-07-24 13:02:40 +0300
commit17c4bafd1ceef077f18d709106abf746aead4267 (patch)
treee4d94c1b31a29b1e34a171efa6d5cede712a30ce
parent9908c93a1412479fe215dbfa2e10b17e1e7cd0f8 (diff)
downloadydb-17c4bafd1ceef077f18d709106abf746aead4267.tar.gz
several result sets have been supported
-rw-r--r--ydb/core/fq/libs/compute/ydb/base_compute_actor.h14
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h22
-rw-r--r--ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp211
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp2
4 files changed, 196 insertions, 53 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
index 8148271ec2c..a4fa8f363e3 100644
--- a/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
+++ b/ydb/core/fq/libs/compute/ydb/base_compute_actor.h
@@ -20,7 +20,14 @@ public:
using TBase::PassAway;
TBaseComputeActor(const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName)
- : Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName)))
+ : BaseCounters(queryCounters.Counters)
+ , Counters(MakeIntrusive<TComputeRequestCounters>("Total", queryCounters.Counters->GetSubgroup("step", stepName)))
+ , TotalStartTime(TInstant::Now())
+ {}
+
+ TBaseComputeActor(const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName)
+ : BaseCounters(baseCounters)
+ , Counters(MakeIntrusive<TComputeRequestCounters>("Total", baseCounters->GetSubgroup("step", stepName)))
, TotalStartTime(TInstant::Now())
{}
@@ -53,7 +60,12 @@ public:
return Counters->Counters;
}
+ ::NMonitoring::TDynamicCounterPtr GetBaseCounters() const {
+ return BaseCounters;
+ }
+
private:
+ ::NMonitoring::TDynamicCounterPtr BaseCounters;
TComputeRequestCountersPtr Counters;
TInstant TotalStartTime;
};
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index c7e99d2aae6..d3c97ac2147 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -37,6 +37,7 @@ struct TEvYdbCompute {
EvExecuterResponse,
EvStatusTrackerResponse,
EvResultWriterResponse,
+ EvResultSetWriterResponse,
EvResourcesCleanerResponse,
EvFinalizerResponse,
EvStopperResponse,
@@ -89,13 +90,15 @@ struct TEvYdbCompute {
, Status(status)
{}
- TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, NYql::TIssues issues)
+ TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, NYql::TIssues issues)
: ExecStatus(execStatus)
+ , ResultSetsMeta(resultSetsMeta)
, Issues(std::move(issues))
, Status(NYdb::EStatus::SUCCESS)
{}
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
+ TVector<Ydb::Query::ResultSetMeta> ResultSetsMeta;
NYql::TIssues Issues;
NYdb::EStatus Status;
};
@@ -283,6 +286,23 @@ struct TEvYdbCompute {
NYql::TIssues Issues;
NYdb::EStatus Status;
};
+
+
+ struct TEvResultSetWriterResponse : public NActors::TEventLocal<TEvResultSetWriterResponse, EvResultSetWriterResponse> {
+ TEvResultSetWriterResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Issues(std::move(issues))
+ , Status(status)
+ {}
+
+ TEvResultSetWriterResponse(int64_t rowsCount)
+ : Status(NYdb::EStatus::SUCCESS)
+ , RowsCount(rowsCount)
+ {}
+
+ NYql::TIssues Issues;
+ NYdb::EStatus Status;
+ int64_t RowsCount = 0;
+ };
};
}
diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
index 43bb445fc88..0f142492ac6 100644
--- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
@@ -33,20 +33,18 @@ namespace NFq {
using namespace NActors;
using namespace NFq;
-class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {
+class TResultSetWriterActor : public TBaseComputeActor<TResultSetWriterActor> {
public:
enum ERequestType {
RT_FETCH_SCRIPT_RESULT,
- RT_WRITE_RESULT,
- RT_PING,
+ RT_WRITE_RESULT_SET,
RT_MAX
};
class TCounters: public virtual TThrRefBase {
std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
{ MakeIntrusive<TComputeRequestCounters>("FetchScriptResult") },
- { MakeIntrusive<TComputeRequestCounters>("WriteResult") },
- { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ { MakeIntrusive<TComputeRequestCounters>("WriteResultSet") }
});
::NMonitoring::TDynamicCounterPtr Counters;
@@ -65,52 +63,34 @@ public:
}
};
- TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NKikimr::NOperationId::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
- : TBaseComputeActor(queryCounters, "ResultWriter")
+ TResultSetWriterActor(const TRunActorParams& params, int64_t resultSetId, const TActorId& parent, const TActorId& connector, const NKikimr::NOperationId::TOperationId& operationId, const ::NMonitoring::TDynamicCounterPtr& counters)
+ : TBaseComputeActor(counters, "ResultSetWriter")
, Params(params)
+ , ResultSetId(resultSetId)
, Parent(parent)
, Connector(connector)
- , Pinger(pinger)
, OperationId(operationId)
, Counters(GetStepCountersSubgroup())
{}
- static constexpr char ActorName[] = "FQ_RESULT_WRITER_ACTOR";
+ static constexpr char ActorName[] = "FQ_RESULT_SET_WRITER_ACTOR";
void Start() {
- LOG_I("Start result writer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
- Become(&TResultWriterActor::StateFunc);
+ LOG_I("Start result set writer actor, id: " << ResultSetId);
+ Become(&TResultSetWriterActor::StateFunc);
SendFetchScriptResultRequest();
}
STRICT_STFUNC(StateFunc,
hFunc(TEvYdbCompute::TEvFetchScriptResultResponse, Handle);
hFunc(NFq::TEvInternalService::TEvWriteResultResponse, Handle);
- hFunc(TEvents::TEvForwardPingResponse, Handle);
)
- void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
- pingCounters->InFly->Dec();
- pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
- if (ev.Get()->Get()->Success) {
- pingCounters->Ok->Inc();
- LOG_I("The result has been moved");
- Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS));
- CompleteAndPassAway();
- } else {
- pingCounters->Error->Inc();
- LOG_E("Move result error");
- Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. OperationId: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR));
- FailedAndPassAway();
- }
- }
-
void Handle(const TEvYdbCompute::TEvFetchScriptResultResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
+ Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
return;
}
@@ -124,27 +104,20 @@ public:
auto chunk = CreateProtoRequestWithoutResultSet(Offset);
Offset += response.ResultSet->RowsCount();
*chunk.mutable_result_set() = resultSetProto;
- auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT);
+ auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET);
writeResultCounters->InFly->Inc();
Send(NFq::MakeInternalServiceActorId(), new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)));
}
if (!FetchToken) {
- PingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
- PingTaskRequest.set_result_set_count(1);
- auto& resultSetMeta = *PingTaskRequest.add_result_set_meta();
- resultSetMeta.set_rows_count(Offset);
- for (const auto& column: resultSetProto.columns()) {
- *resultSetMeta.add_column() = column;
- }
if (emptyResultSet) {
- SendFinalPingRequest();
+ SendReplyAndPassAway();
}
}
}
void Handle(const NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) {
- auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT);
+ auto writeResultCounters = Counters.GetCounters(ERequestType::RT_WRITE_RESULT_SET);
writeResultCounters->InFly->Dec();
writeResultCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
if (ev.Get()->Get()->Status.IsSuccess()) {
@@ -153,12 +126,12 @@ public:
if (FetchToken) {
SendFetchScriptResultRequest();
} else {
- SendFinalPingRequest();
+ SendReplyAndPassAway();
}
} else {
writeResultCounters->Error->Inc();
LOG_E("Error writing result for offset " << Offset);
- Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR));
+ Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error writing result for offset " << Offset}}, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
}
}
@@ -167,36 +140,174 @@ public:
auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT);
fetchScriptResultCounters->InFly->Inc();
StartTime = TInstant::Now();
- Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, NKikimr::NOperationId::TOperationId, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, 0, FetchToken));
+ Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, NKikimr::NOperationId::TOperationId, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, ResultSetId, FetchToken));
}
- void SendFinalPingRequest() {
- auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
- pingCounters->InFly->Inc();
- Send(Pinger, new TEvents::TEvForwardPingRequest(PingTaskRequest));
+ void SendReplyAndPassAway() {
+ Send(Parent, new TEvYdbCompute::TEvResultSetWriterResponse(Offset));
+ PassAway();
}
Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) {
Fq::Private::WriteTaskResultRequest protoReq;
protoReq.set_owner_id(Params.Owner);
protoReq.mutable_result_id()->set_value(Params.ResultId);
+ protoReq.set_result_set_id(ResultSetId);
protoReq.set_offset(startRowIndex);
- protoReq.set_result_set_id(0);
- protoReq.set_request_id(0);
*protoReq.mutable_deadline() = NProtoInterop::CastToProto(Params.Deadline);
return protoReq;
}
private:
TRunActorParams Params;
+ uint32_t ResultSetId = 0;
TActorId Parent;
TActorId Connector;
- TActorId Pinger;
NKikimr::NOperationId::TOperationId OperationId;
TCounters Counters;
TInstant StartTime;
int64_t Offset = 0;
TString FetchToken;
+};
+
+class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {
+public:
+ enum ERequestType {
+ RT_GET_OPERATION,
+ RT_PING,
+ RT_MAX
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ std::array<TComputeRequestCountersPtr, RT_MAX> Requests = CreateArray<RT_MAX, TComputeRequestCountersPtr>({
+ { MakeIntrusive<TComputeRequestCounters>("GetOperation") },
+ { MakeIntrusive<TComputeRequestCounters>("Ping") }
+ });
+
+ ::NMonitoring::TDynamicCounterPtr Counters;
+
+ public:
+ explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {
+ for (auto& request: Requests) {
+ request->Register(Counters);
+ }
+ }
+
+ TComputeRequestCountersPtr GetCounters(ERequestType type) {
+ return Requests[type];
+ }
+ };
+
+ TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NKikimr::NOperationId::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
+ : TBaseComputeActor(queryCounters, "ResultWriter")
+ , Params(params)
+ , Parent(parent)
+ , Connector(connector)
+ , Pinger(pinger)
+ , OperationId(operationId)
+ , Counters(GetStepCountersSubgroup())
+ {}
+
+ static constexpr char ActorName[] = "FQ_RESULT_WRITER_ACTOR";
+
+ void Start() {
+ LOG_I("Start result writer actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
+ Become(&TResultWriterActor::StateFunc);
+ SendGetOperation();
+ }
+
+ void SendGetOperation() {
+ Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), SelfId(), Connector, OperationId));
+ }
+
+ void WriteNextResultSet() {
+ if (CurrentResultSetId < (int64_t)PingTaskRequest.result_set_meta_size()) {
+ Register(new TResultSetWriterActor(Params,
+ CurrentResultSetId++,
+ SelfId(),
+ Connector,
+ OperationId,
+ GetBaseCounters()));
+ return;
+ }
+
+ SendFinalPingRequest();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvYdbCompute::TEvGetOperationResponse, Handle);
+ hFunc(TEvYdbCompute::TEvResultSetWriterResponse, Handle);
+ hFunc(TEvents::TEvForwardPingResponse, Handle);
+ )
+
+ void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, ev->Get()->Status));
+ FailedAndPassAway();
+ return;
+ }
+
+ for (const auto& resultSetMeta: ev.Get()->Get()->ResultSetsMeta) {
+ auto& meta = *PingTaskRequest.add_result_set_meta();
+ for (const auto& column: resultSetMeta.columns()) {
+ *meta.add_column() = column;
+ }
+ meta.set_truncated(resultSetMeta.truncated());
+ }
+
+ WriteNextResultSet();
+ }
+
+ void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Dec();
+ pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
+ if (ev.Get()->Get()->Success) {
+ pingCounters->Ok->Inc();
+ LOG_I("The result has been moved");
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS));
+ CompleteAndPassAway();
+ } else {
+ pingCounters->Error->Inc();
+ LOG_E("Move result error");
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Move result error. OperationId: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ }
+ }
+
+ void Handle(const TEvYdbCompute::TEvResultSetWriterResponse::TPtr& ev) {
+ const auto& response = *ev.Get()->Get();
+ if (response.Status != NYdb::EStatus::SUCCESS) {
+ LOG_E("Can't fetch script result: " << ev->Get()->Issues.ToOneLineString());
+ Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, NYdb::EStatus::INTERNAL_ERROR));
+ FailedAndPassAway();
+ return;
+ }
+ PingTaskRequest.mutable_result_set_meta(CurrentResultSetId - 1)->set_rows_count(ev->Get()->RowsCount);
+ WriteNextResultSet();
+ }
+
+ void SendFinalPingRequest() {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ PingTaskRequest.set_result_set_count(PingTaskRequest.result_set_meta_size());
+ Send(Pinger, new TEvents::TEvForwardPingRequest(PingTaskRequest));
+ }
+
+private:
+ TRunActorParams Params;
+ int64_t CurrentResultSetId = 0;
+ TActorId Parent;
+ TActorId Connector;
+ TActorId Pinger;
+ NKikimr::NOperationId::TOperationId OperationId;
+ TCounters Counters;
+ TInstant StartTime;
+ TString FetchToken;
Fq::Private::PingTaskRequest PingTaskRequest;
};
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
index 46af0743fe7..4765e00b236 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -66,7 +66,7 @@ public:
try {
auto response = future.ExtractValueSync();
if (response.Status().IsSuccess()) {
- actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Status().GetIssues()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Metadata().ResultSetsMeta, response.Status().GetIssues()), 0, cookie);
} else {
actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
}