diff options
author | hor911 <hor911@ydb.tech> | 2023-06-26 20:20:47 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-06-26 20:20:47 +0300 |
commit | c9f2529bb452d88806592b231006347470252f5a (patch) | |
tree | 8779e48cc3798a22e1c1af28aa8a833becb017d6 | |
parent | a2346cb456fe01d95d8f02ee2db0b77ca2c427f6 (diff) | |
download | ydb-c9f2529bb452d88806592b231006347470252f5a.tar.gz |
Save Script Results to Database
21 files changed, 656 insertions, 78 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index 1c041a59d2..de40ba19cb 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -97,13 +97,15 @@ struct TEvPrivate { }; struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> { - TEvFetchScriptResultRequest(int64_t rowOffset, TString executionId) - : RowOffset(rowOffset) - , ExecutionId(std::move(executionId)) + TEvFetchScriptResultRequest(TString executionId, int64_t resultSetId, int64_t rowOffset) + : ExecutionId(std::move(executionId)) + , ResultSetId(resultSetId) + , RowOffset(rowOffset) {} - int64_t RowOffset = 0; TString ExecutionId; + int64_t ResultSetId = 0; + int64_t RowOffset = 0; }; struct TEvFetchScriptResultResponse : public NActors::TEventLocal<TEvFetchScriptResultResponse, EvFetchScriptResultResponse> { diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp index 6f46f98cf8..04d5f1df55 100644 --- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp @@ -159,7 +159,7 @@ public: auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT); fetchScriptResultCounters->InFly->Inc(); StartTime = TInstant::Now(); - Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, Offset, ExecutionId)); + Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset)); } Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 164fe79513..50a820d172 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -74,7 +74,7 @@ public: NYdb::NQuery::TFetchScriptResultsSettings settings; settings.RowsOffset(ev->Get()->RowOffset); QueryClient - ->FetchScriptResults(ev->Get()->ExecutionId, settings) + ->FetchScriptResults(ev->Get()->ExecutionId, ev->Get()->ResultSetId, settings) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index 8a093102ce..a24d5ed9f1 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -85,6 +85,7 @@ private: const auto* userReq = GetProtoRequest(); auto req = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>(); + req->Record.SetResultSetId(userReq->result_set_id()); req->Record.SetRowsOffset(userReq->rows_offset()); req->Record.SetRowsLimit(userReq->rows_limit()); diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp index c41c0f56a9..fd7048416e 100644 --- a/ydb/core/grpc_services/rpc_forget_operation.cpp +++ b/ydb/core/grpc_services/rpc_forget_operation.cpp @@ -88,10 +88,10 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, Reply(record.GetStatus(), record.GetIssues()); } - void Handle(NKqp::TEvForgetScriptExecutionOperationResponce::TPtr& ev) { + void Handle(NKqp::TEvForgetScriptExecutionOperationResponse::TPtr& ev) { google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto; NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto); - LOG_D("Handle NKqp::TEvForgetScriptExecutionOperationResponce responce" + LOG_D("Handle NKqp::TEvForgetScriptExecutionOperationResponse response" << ": status# " << ev->Get()->Status); Reply(ev->Get()->Status, issuesProto); } @@ -139,7 +139,7 @@ public: hFunc(TEvExport::TEvForgetExportResponse, Handle); hFunc(TEvImport::TEvForgetImportResponse, Handle); hFunc(TEvIndexBuilder::TEvForgetResponse, Handle); - hFunc(NKqp::TEvForgetScriptExecutionOperationResponce, Handle); + hFunc(NKqp::TEvForgetScriptExecutionOperationResponse, Handle); default: return StateBase(ev); } diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 6b3903ab3b..2ed8218fef 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -24,8 +24,8 @@ struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForget NOperationId::TOperationId OperationId; }; -struct TEvForgetScriptExecutionOperationResponce : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponce, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponce> { - TEvForgetScriptExecutionOperationResponce(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) +struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> { + TEvForgetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Status(status) , Issues(issues) { @@ -134,4 +134,26 @@ struct TEvScriptExecutionFinished : public NActors::TEventLocal<TEvScriptExecuti NYql::TIssues Issues; }; +struct TEvSaveScriptResultMetaFinished : public NActors::TEventLocal<TEvSaveScriptResultMetaFinished, TKqpScriptExecutionEvents::EvSaveScriptResultMetaFinished> { + TEvSaveScriptResultMetaFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + : Status(status) + , Issues(std::move(issues)) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + +struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptResultFinished, TKqpScriptExecutionEvents::EvSaveScriptResultFinished> { + TEvSaveScriptResultFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + : Status(status) + , Issues(std::move(issues)) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 21d543fdd8..678b748664 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -139,7 +139,9 @@ struct TKqpScriptExecutionEvents { EvCancelScriptExecutionOperationResponse, EvScriptExecutionFinished, EvForgetScriptExecutionOperation, - EvForgetScriptExecutionOperationResponce + EvForgetScriptExecutionOperationResponse, + EvSaveScriptResultMetaFinished, + EvSaveScriptResultFinished, }; }; diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index b05e3fb812..3347992604 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -315,6 +315,7 @@ public: Become(&TScriptExecutionsTablesCreator::StateFunc); RunCreateScriptExecutions(); RunCreateScriptExecutionLeases(); + RunCreateScriptResultSets(); } private: @@ -330,6 +331,7 @@ private: } void RunCreateScriptExecutions() { + TablesCreating++; Register( new TTableCreator( { ".metadata", "script_executions" }, @@ -350,6 +352,7 @@ private: Col("store_deadline", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. Col("meta", NScheme::NTypeIds::JsonDocument), Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage. + Col("result_set_metas", NScheme::NTypeIds::JsonDocument), }, { "database", "execution_id" } ) @@ -357,6 +360,7 @@ private: } void RunCreateScriptExecutionLeases() { + TablesCreating++; Register( new TTableCreator( { ".metadata", "script_execution_leases" }, @@ -371,8 +375,27 @@ private: ); } + void RunCreateScriptResultSets() { + TablesCreating++; + Register( + new TTableCreator( + { ".metadata", "result_sets" }, + { + Col("database", NScheme::NTypeIds::Text), + Col("execution_id", NScheme::NTypeIds::Text), + Col("result_set_id", NScheme::NTypeIds::Int32), + Col("row_id", NScheme::NTypeIds::Int64), + Col("expire_at", NScheme::NTypeIds::Timestamp), + Col("result_set", NScheme::NTypeIds::String), + }, + { "database", "execution_id", "result_set_id", "row_id" } + ) + ); + } + void Handle(TEvPrivate::TEvCreateTableResponse::TPtr&) { - if (++TablesCreated == 2) { + Y_VERIFY(TablesCreating > 0); + if (--TablesCreating == 0) { Send(Owner, std::move(ResultEvent)); PassAway(); } @@ -385,7 +408,7 @@ private: private: THolder<NActors::IEventBase> ResultEvent; NActors::TActorId Owner; - size_t TablesCreated = 0; + size_t TablesCreating = 0; }; class TCreateScriptOperationQuery : public TQueryBase { @@ -1003,7 +1026,7 @@ public: } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Request->Sender, new TEvForgetScriptExecutionOperationResponce(status, std::move(issues))); + Send(Request->Sender, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues))); } private: @@ -1030,7 +1053,8 @@ public: query_text, syntax, execution_mode, - issues + issues, + result_set_metas FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id; @@ -1104,6 +1128,21 @@ public: Issues = DeserializeIssues(*issuesSerialized); } + const TMaybe<TString> serializedMetas = result.ColumnParser("result_set_metas").GetOptionalJsonDocument(); + if (serializedMetas) { + NJson::TJsonValue value; + if (!NJson::ReadJsonTree(*serializedMetas, &value) || value.GetType() != NJson::JSON_ARRAY) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is corrupted"); + return; + } + + for (auto i = 0; i < value.GetIntegerRobust(); i++) { + const NJson::TJsonValue* metaValue; + value.GetValuePointer(i, &metaValue); + NProtobufJson::Json2Proto(*metaValue, *metadata.add_result_set_meta()); + } + } + bool finishing = false; if (!operationStatus) { // Check lease deadline @@ -1524,11 +1563,368 @@ private: } PassAway(); } - private: TEvKqp::TEvGetRunScriptActorRequest::TPtr Request; }; +class TSaveScriptExecutionResultMetaQuery : public TQueryBase { +public: + TSaveScriptExecutionResultMetaQuery(const TString& database, const TString& executionId, const TString& serializedMetas) + : Database(database), ExecutionId(executionId), SerializedMetas(serializedMetas) + { + } + + void OnRunQuery() override { + TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $result_set_metas AS JsonDocument; + + UPDATE `.metadata/script_executions` + SET result_set_metas = $result_set_metas + WHERE database = $database + AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build() + .AddParam("$result_set_metas") + .JsonDocument(SerializedMetas) + .Build(); + + RunDataQuery(sql, ¶ms); + } + + void OnQueryResult() override { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (status == Ydb::StatusIds::SUCCESS) { + Send(Owner, new TEvSaveScriptResultMetaFinished(status)); + } else { + Send(Owner, new TEvSaveScriptResultMetaFinished(status, std::move(issues))); + } + } + +private: + const TString Database; + const TString ExecutionId; + const TString SerializedMetas; +}; + +class TSaveScriptExecutionResultMetaActor : public TActorBootstrapped<TSaveScriptExecutionResultMetaActor> { +public: + TSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMetas) + : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), SerializedMetas(serializedMetas) + { + } + + void Bootstrap() { + Register(new TSaveScriptExecutionResultMetaQuery(Database, ExecutionId, SerializedMetas)); + + Become(&TSaveScriptExecutionResultMetaActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvSaveScriptResultMetaFinished, Handle); + ) + + void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) { + Send(ev->Forward(ReplyActorId)); + PassAway(); + } + +private: + const NActors::TActorId ReplyActorId; + const TString Database; + const TString ExecutionId; + const TString SerializedMetas; +}; + +class TSaveScriptExecutionResultQuery : public TQueryBase { +public: + TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows) + : Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), SerializedRows(std::move(serializedRows)) + { + } + + void OnRunQuery() override { + TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $result_set_id AS Int32; + DECLARE $expire_at AS Timestamp; + DECLARE $items AS List<Struct<row_id:Int64,result_set:String>>; + + UPSERT INTO `.metadata/result_sets` + SELECT $database as database, $execution_id as execution_id, $result_set_id as result_set_id, + T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set + FROM AS_TABLE($items) AS T; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build() + .AddParam("$result_set_id") + .Int32(ResultSetId) + .Build() + .AddParam("$expire_at") + .Timestamp(ExpireAt) + .Build(); + + auto& param = params + .AddParam("$items"); + + param + .BeginList(); + + auto row = FirstRow; + for(auto& serializedRow : SerializedRows) { + param + .AddListItem() + .BeginStruct() + .AddMember("row_id") + .Int64(row++) + .AddMember("result_set") + .String(serializedRow) + .EndStruct(); + } + param + .EndList() + .Build(); + + RunDataQuery(sql, ¶ms); + } + + void OnQueryResult() override { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (status == Ydb::StatusIds::SUCCESS) { + Send(Owner, new TEvSaveScriptResultFinished(status)); + } else { + Send(Owner, new TEvSaveScriptResultFinished(status, std::move(issues))); + } + } + +private: + const TString Database; + const TString ExecutionId; + const i32 ResultSetId; + const TInstant ExpireAt; + const i64 FirstRow; + const std::vector<TString> SerializedRows; +}; + +class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExecutionResultActor> { +public: + TSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows) + : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), SerializedRows(std::move(serializedRows)) + { + } + + void Bootstrap() { + Register(new TSaveScriptExecutionResultQuery(Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, std::move(SerializedRows))); + + Become(&TSaveScriptExecutionResultActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvSaveScriptResultFinished, Handle); + ) + + void Handle(TEvSaveScriptResultFinished::TPtr& ev) { + Send(ev->Forward(ReplyActorId)); + PassAway(); + } + +private: + const NActors::TActorId ReplyActorId; + const TString Database; + const TString ExecutionId; + const i32 ResultSetId; + const TInstant ExpireAt; + const i64 FirstRow; + std::vector<TString> SerializedRows; +}; + +class TGetScriptExecutionResultQuery : public TQueryBase { +public: + TGetScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit) + : Database(database), ExecutionId(executionId), ResultSetId(resultSetId), Offset(offset), Limit(limit) + { + Response = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>(); + Response->Record.SetResultSetIndex(ResultSetId); + } + + void OnRunQuery() override { + TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $result_set_id AS Int32; + DECLARE $offset AS Int64; + DECLARE $limit AS Uint64; + + SELECT result_set_metas + FROM `.metadata/script_executions` + WHERE database = $database + AND execution_id = $execution_id; + + SELECT row_id, result_set + FROM `.metadata/result_sets` + WHERE database = $database + AND execution_id = $execution_id + AND result_set_id = $result_set_id + AND row_id >= $offset + ORDER BY row_id + LIMIT $limit; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build() + .AddParam("$result_set_id") + .Int32(ResultSetId) + .Build() + .AddParam("$offset") + .Int64(Offset) + .Build() + .AddParam("$limit") + .Uint64(Limit) + .Build(); + + RunDataQuery(sql, ¶ms); + } + + void OnQueryResult() override { + if (ResultSets.size() != 2) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + + { // columns + NYdb::TResultSetParser result(ResultSets[0]); + + if (!result.TryNextRow()) { + Finish(Ydb::StatusIds::BAD_REQUEST, "Script execution not found"); + return; + } + + const TMaybe<TString> serializedMetas = result.ColumnParser("result_set_metas").GetOptionalJsonDocument(); + if (!serializedMetas) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is empty"); + return; + } + + NJson::TJsonValue value; + if (!NJson::ReadJsonTree(*serializedMetas, &value) || value.GetType() != NJson::JSON_ARRAY) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is corrupted"); + return; + } + + const NJson::TJsonValue* metaValue; + if (!value.GetValuePointer(ResultSetId, &metaValue)) { + Finish(Ydb::StatusIds::BAD_REQUEST, "Result set index is invalid"); + return; + } + + Ydb::Query::ResultSetMeta meta; + NProtobufJson::Json2Proto(*metaValue, meta); + + *Response->Record.MutableResultSet()->mutable_columns() = meta.columns(); + Response->Record.MutableResultSet()->set_truncated(meta.truncated()); + } + + { // rows + NYdb::TResultSetParser result(ResultSets[1]); + + while (result.TryNextRow()) { + const TMaybe<TString> serializedRow = result.ColumnParser("result_set").GetOptionalString(); + + if (!serializedRow) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is empty"); + return; + } + + if (!Response->Record.MutableResultSet()->add_rows()->ParseFromString(*serializedRow)) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is corrupted"); + return; + } + } + } + + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + Response->Record.SetStatus(status); + if (status != Ydb::StatusIds::SUCCESS) { + Response->Record.MutableResultSet()->Clear(); + } + if (issues) { + NYql::IssuesToMessage(issues, Response->Record.MutableIssues()); + } + Send(Owner, std::move(Response)); + } + +private: + const TString Database; + const TString ExecutionId; + const i32 ResultSetId; + const i64 Offset; + const i64 Limit; + THolder<TEvKqp::TEvFetchScriptResultsResponse> Response; +}; + +class TGetScriptExecutionResultActor : public TActorBootstrapped<TGetScriptExecutionResultActor> { +public: + TGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit) + : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), ResultSetId(resultSetId), Offset(offset), Limit(limit) + { + } + + void Bootstrap() { + Register(new TGetScriptExecutionResultQuery(Database, ExecutionId, ResultSetId, Offset, Limit)); + + Become(&TGetScriptExecutionResultActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvKqp::TEvFetchScriptResultsResponse, Handle); + ) + + void Handle(TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) { + Send(ev->Forward(ReplyActorId)); + PassAway(); + } + +private: + const NActors::TActorId ReplyActorId; + const TString Database; + const TString ExecutionId; + const i32 ResultSetId; + const i64 Offset; + const i64 Limit; +}; + } // anonymous namespace NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) { @@ -1574,6 +1970,18 @@ NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, return new TScriptLeaseUpdateActor(runScriptActorId, database, executionId, leaseDeadline); } +NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMeta) { + return new TSaveScriptExecutionResultMetaActor(replyActorId, database, executionId, serializedMeta); +} + +NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows) { + return new TSaveScriptExecutionResultActor(replyActorId, database, executionId, resultSetId, expireAt, firstRow, std::move(serializedRows)); +} + +NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit) { + return new TGetScriptExecutionResultActor(replyActorId, database, executionId, resultSetId, offset, limit); +} + namespace NPrivate { NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 89dcc41743..940ff189ed 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -38,5 +38,9 @@ NActors::IActor* CreateScriptExecutionFinisher( // Updates lease deadline in database. NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, const TInstant& leaseDeadline); +// Store and fetch results. +NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta); +NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows); +NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt index 10fc76583e..3c2be21b07 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-protobuf-json ydb-core-base ydb-core-protos kqp-common-events diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt index 211a247244..b4e8d47592 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-protobuf-json ydb-core-base ydb-core-protos kqp-common-events diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt index 211a247244..b4e8d47592 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-protobuf-json ydb-core-base ydb-core-protos kqp-common-events diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt index 10fc76583e..3c2be21b07 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-protobuf-json ydb-core-base ydb-core-protos kqp-common-events diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 40652c88a6..3c66dd8380 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -12,6 +12,8 @@ #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/protobuf/json/json2proto.h> +#include <library/cpp/protobuf/json/proto2json.h> #include <util/generic/size_literals.h> @@ -69,6 +71,8 @@ private: hFunc(TEvKqp::TEvCancelScriptExecutionRequest, Handle); hFunc(TEvScriptExecutionFinished, Handle); hFunc(TEvScriptLeaseUpdateResponse, Handle); + hFunc(TEvSaveScriptResultMetaFinished, Handle); + hFunc(TEvSaveScriptResultFinished, Handle); ) void SendToKqpProxy(THolder<NActors::IEventBase> ev) { @@ -167,15 +171,91 @@ private: LOG_D("Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() + << ", queryResultIndex: " << ev->Get()->Record.GetQueryResultIndex() << ", to: " << ev->Sender); Send(ev->Sender, resp.Release()); - if (IsExecuting() && !IsTruncated()) { - MergeResultSet(ev); + auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex(); + + if (resultSetIndex > ExpireAt.size()) { + Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected result set index")); + Finish(Ydb::StatusIds::INTERNAL_ERROR); + return; + } + + if (resultSetIndex == ExpireAt.size()) { + // next ResultSet arrived + ResultSetRowCount.resize(resultSetIndex + 1); + ResultSetByteCount.resize(resultSetIndex + 1); + Truncated.resize(resultSetIndex + 1); + ExpireAt.resize(resultSetIndex + 1); + ExpireAt[resultSetIndex] = TInstant::Now() + TDuration::Days(1); + } + + if (IsExecuting() && !Truncated[resultSetIndex]) { + auto& rowCount = ResultSetRowCount[resultSetIndex]; + auto& byteCount = ResultSetByteCount[resultSetIndex]; + auto firstRow = rowCount; + std::vector<TString> serializedRows; + + for (const auto& row : ev->Get()->Record.GetResultSet().rows()) { + if (rowCount > RESULT_ROWS_LIMIT) { + Truncated[resultSetIndex] = true; + break; + } + + auto serializedSize = row.ByteSizeLong(); + if (byteCount + serializedSize > RESULT_SIZE_LIMIT) { + Truncated[resultSetIndex] = true; + break; + } + + rowCount++; + byteCount += serializedSize; + serializedRows.push_back(row.SerializeAsString()); + } + + if (firstRow == 0 || Truncated[resultSetIndex]) { + Ydb::Query::ResultSetMeta meta; + *meta.mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + meta.set_truncated(Truncated[resultSetIndex]); + + NJson::TJsonValue* value; + if (firstRow == 0) { + value = &ResultSetMetas[resultSetIndex]; + ResultSetMetaArray.push_back(value); + } else { + value = ResultSetMetaArray[resultSetIndex]; + } + NProtobufJson::Proto2Json(meta, *value, NProtobufJson::TProto2JsonConfig()); + + // can't save meta when previous request is not completed for TLI reasons + if (SaveResultMetaInflight) { + PendingResultMeta = true; + } else { + SaveResultMeta(); + SaveResultMetaInflight++; + } + } + + if (!serializedRows.empty()) { + Register( + CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, resultSetIndex, ExpireAt[resultSetIndex], firstRow, std::move(serializedRows)) + ); + SaveResultInflight++; + } } } + void SaveResultMeta() { + NJsonWriter::TBuf sout; + sout.WriteJsonValue(&ResultSetMetas); + Register( + CreateSaveScriptExecutionResultMetaActor(SelfId(), Database, ExecutionId, sout.Str()) + ); + } + void Handle(TEvKqp::TEvQueryResponse::TPtr& ev) { if (RunState != ERunState::Running) { return; @@ -198,27 +278,17 @@ private: resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); resp->Record.AddIssues()->set_message("Script execution is cancelled"); } - } else { - if (!ResultSets.empty()) { - resp->Record.SetResultSetIndex(0); - resp->Record.MutableResultSet()->mutable_columns()->CopyFrom(ResultSets[0].columns()); - - const ui64 rowsOffset = ev->Get()->Record.GetRowsOffset(); - const ui64 rowsLimit = ev->Get()->Record.GetRowsLimit(); - ui64 rowsAdded = 0; - for (i64 row = static_cast<i64>(rowsOffset); row < ResultSets[0].rows_size(); ++row) { - if (rowsAdded >= rowsLimit) { - resp->Record.MutableResultSet()->set_truncated(true); - break; - } - resp->Record.MutableResultSet()->add_rows()->CopyFrom(ResultSets[0].rows(row)); - } - } + } else if (Status != Ydb::StatusIds::SUCCESS) { resp->Record.SetStatus(Status); for (const auto& issue : Issues) { auto item = resp->Record.add_issues(); NYql::IssueToMessage(issue, item); } + } else { + Register( + CreateGetScriptExecutionResultActor(ev->Sender, Database, ExecutionId, ev->Get()->Record.GetResultSetId(), ev->Get()->Record.GetRowsOffset(), ev->Get()->Record.GetRowsLimit()) + ); + return; } Send(ev->Sender, std::move(resp)); } @@ -286,28 +356,28 @@ private: CancelRequests.clear(); } - void MergeResultSet(TEvKqpExecuter::TEvStreamData::TPtr& ev) { - if (ResultSets.empty()) { - ResultSets.emplace_back(ev->Get()->Record.GetResultSet()); + void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) { + if (PendingResultMeta) { + PendingResultMeta = false; + SaveResultMeta(); return; } - if (ResultSets[0].columns_size() != ev->Get()->Record.GetResultSet().columns_size()) { - Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); - Finish(Ydb::StatusIds::INTERNAL_ERROR); - return; - } - size_t rowsAdded = 0; - for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) { - ResultSets[0].add_rows()->Swap(&row); - ++rowsAdded; - if (ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT) { - break; - } + + SaveResultMetaInflight--; + if (Status == Ydb::StatusIds::SUCCESS && ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + Status = ev->Get()->Status; + Issues.AddIssues(ev->Get()->Issues); } - if (ev->Get()->Record.GetResultSet().truncated() || ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT || ResultSets[0].ByteSizeLong() >= RESULT_SIZE_LIMIT) { - ResultSets[0].set_truncated(true); + CheckSaveInflight(); + } + + void Handle(TEvSaveScriptResultFinished::TPtr& ev) { + SaveResultInflight--; + if (Status == Ydb::StatusIds::SUCCESS && ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + Status = ev->Get()->Status; + Issues.AddIssues(ev->Get()->Issues); } - LOG_D("Received partial result. Rows added: " << rowsAdded << ". Truncated: " << IsTruncated()); + CheckSaveInflight(); } static Ydb::Query::ExecStatus GetExecStatusFromStatusCode(Ydb::StatusIds::StatusCode status) { @@ -320,17 +390,33 @@ private: } } - void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) { - RunState = runState; - Status = status; - Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues)); + void CheckSaveInflight() { + if (Status == Ydb::StatusIds::SUCCESS && RunState == ERunState::Finishing && (SaveResultMetaInflight || SaveResultInflight)) { + // wait for save completions + return; + } + + Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, Status, GetExecStatusFromStatusCode(Status), Issues)); if (RunState == ERunState::Cancelling) { Issues.AddIssue("Script execution is cancelled"); - ResultSets.clear(); } CloseSession(); } + void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) { + RunState = runState; + Status = status; + + // if query has no results, save empty json array + if (ResultSetMetaArray.empty()) { + ResultSetMetas.SetType(NJson::JSON_ARRAY); + SaveResultMeta(); + SaveResultMetaInflight++; + } else { + CheckSaveInflight(); + } + } + void PassAway() override { CloseSession(); NActors::TActorBootstrapped<TRunScriptActor>::PassAway(); @@ -343,10 +429,6 @@ private: && RunState != ERunState::Cancelling; } - bool IsTruncated() const { - return !ResultSets.empty() && ResultSets[0].truncated(); - } - private: const TString ExecutionId; const NKikimrKqp::TEvQueryRequest Request; @@ -361,7 +443,17 @@ private: // Result NYql::TIssues Issues; Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; - std::vector<Ydb::ResultSet> ResultSets; + + // Result + std::vector<ui64> ResultSetRowCount; + std::vector<ui64> ResultSetByteCount; + std::vector<bool> Truncated; + std::vector<TInstant> ExpireAt; + std::vector<NJson::TJsonValue*> ResultSetMetaArray; + NJson::TJsonValue ResultSetMetas; + ui32 SaveResultInflight = 0; + ui32 SaveResultMetaInflight = 0; + bool PendingResultMeta = false; }; } // namespace diff --git a/ydb/core/kqp/run_script_actor/ya.make b/ydb/core/kqp/run_script_actor/ya.make index 8374348187..5ca4b2184f 100644 --- a/ydb/core/kqp/run_script_actor/ya.make +++ b/ydb/core/kqp/run_script_actor/ya.make @@ -6,6 +6,7 @@ SRCS( PEERDIR( library/cpp/actors/core + library/cpp/protobuf/json ydb/core/base ydb/core/protos ydb/core/kqp/common/events diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 36fc003d62..4fe2c48bf2 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); @@ -254,7 +254,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); @@ -328,7 +328,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); @@ -392,7 +392,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); @@ -471,7 +471,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 13daa1c6de..94a23904bd 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -292,7 +292,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_STRING_CONTAINS(readyOp.Metadata().ScriptContent.Text, "SELECT 42"); auto checkFetch = [&](const auto& executionOrOperation) { - TFetchScriptResultsResult results = db.FetchScriptResults(executionOrOperation).ExtractValueSync(); + TFetchScriptResultsResult results = db.FetchScriptResults(executionOrOperation, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); TResultSetParser resultSet(results.ExtractResultSet()); UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); @@ -305,6 +305,42 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { checkFetch(scriptExecutionOperation); } + Y_UNIT_TEST(ExecuteMultiScript) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto scriptExecutionOperation = db.ExecuteScript(R"( + SELECT 42; SELECT 101; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecMode, EExecMode::Execute); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId); + UNIT_ASSERT_STRING_CONTAINS(readyOp.Metadata().ScriptContent.Text, "SELECT 42; SELECT 101;"); + + { + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42); + } + { + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 1).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 101); + } + } + Y_UNIT_TEST(ListScriptExecutions) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 605e01cb57..c8cbec780f 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -624,6 +624,7 @@ message TKqpStreamLookupSettings { } message TEvFetchScriptResultsRequest { + optional uint64 ResultSetId = 3; optional uint64 RowsOffset = 1; optional uint64 RowsLimit = 2; } diff --git a/ydb/public/api/protos/draft/ydb_query.proto b/ydb/public/api/protos/draft/ydb_query.proto index ad69e1500a..42822027e3 100644 --- a/ydb/public/api/protos/draft/ydb_query.proto +++ b/ydb/public/api/protos/draft/ydb_query.proto @@ -179,6 +179,7 @@ message ExecuteQueryRequest { message ResultSetMeta { repeated Ydb.Column columns = 1; + bool truncated = 2; } message ExecuteQueryResponsePart { @@ -264,6 +265,8 @@ message FetchScriptResultsRequest { string operation_id = 5 [(Ydb.length).le = 1024]; } + int64 result_set_id = 6; + oneof fetch { string fetch_token = 2 [(Ydb.length).le = 1024]; } diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp index 462756dc6b..deb7d9e175 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp @@ -70,15 +70,17 @@ public: return promise.GetFuture(); } - TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings) { + TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, int64_t resultSetId, const TFetchScriptResultsSettings& settings) { auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>(); request.set_execution_id(executionId); + request.set_result_set_id(resultSetId); return FetchScriptResultsImpl(std::move(request), settings); } - TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, const TFetchScriptResultsSettings& settings) { + TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, int64_t resultSetId, const TFetchScriptResultsSettings& settings) { auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>(); request.set_operation_id(NKikimr::NOperationId::ProtoToString(scriptExecutionOperation.Id())); + request.set_result_set_id(resultSetId); return FetchScriptResultsImpl(std::move(request), settings); } @@ -171,16 +173,16 @@ NThreading::TFuture<TScriptExecutionOperation> TQueryClient::ExecuteScript(const return Impl_->ExecuteScript(script, settings); } -TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& executionId, +TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& executionId, int64_t resultSetId, const TFetchScriptResultsSettings& settings) { - return Impl_->FetchScriptResults(executionId, settings); + return Impl_->FetchScriptResults(executionId, resultSetId, settings); } -TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, +TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, int64_t resultSetId, const TFetchScriptResultsSettings& settings) { - return Impl_->FetchScriptResults(scriptExecutionOperation, settings); + return Impl_->FetchScriptResults(scriptExecutionOperation, resultSetId, settings); } } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h index ae27967ba4..76ee566ba4 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h @@ -38,10 +38,10 @@ public: NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings = TExecuteScriptSettings()); - TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, + TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, int64_t resultSetId, const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings()); - TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, + TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, int64_t resultSetId, const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings()); private: |