aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-06-26 20:20:47 +0300
committerhor911 <hor911@ydb.tech>2023-06-26 20:20:47 +0300
commitc9f2529bb452d88806592b231006347470252f5a (patch)
tree8779e48cc3798a22e1c1af28aa8a833becb017d6
parenta2346cb456fe01d95d8f02ee2db0b77ca2c427f6 (diff)
downloadydb-c9f2529bb452d88806592b231006347470252f5a.tar.gz
Save Script Results to Database
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h10
-rw-r--r--ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp2
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp2
-rw-r--r--ydb/core/grpc_services/query/rpc_fetch_script_results.cpp1
-rw-r--r--ydb/core/grpc_services/rpc_forget_operation.cpp6
-rw-r--r--ydb/core/kqp/common/events/script_executions.h26
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h4
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp418
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h4
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp184
-rw-r--r--ydb/core/kqp/run_script_actor/ya.make1
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp12
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp38
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/public/api/protos/draft/ydb_query.proto3
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp14
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.h4
21 files changed, 656 insertions, 78 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index 1c041a59d2..de40ba19cb 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -97,13 +97,15 @@ struct TEvPrivate {
};
struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> {
- TEvFetchScriptResultRequest(int64_t rowOffset, TString executionId)
- : RowOffset(rowOffset)
- , ExecutionId(std::move(executionId))
+ TEvFetchScriptResultRequest(TString executionId, int64_t resultSetId, int64_t rowOffset)
+ : ExecutionId(std::move(executionId))
+ , ResultSetId(resultSetId)
+ , RowOffset(rowOffset)
{}
- int64_t RowOffset = 0;
TString ExecutionId;
+ int64_t ResultSetId = 0;
+ int64_t RowOffset = 0;
};
struct TEvFetchScriptResultResponse : public NActors::TEventLocal<TEvFetchScriptResultResponse, EvFetchScriptResultResponse> {
diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
index 6f46f98cf8..04d5f1df55 100644
--- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
@@ -159,7 +159,7 @@ public:
auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT);
fetchScriptResultCounters->InFly->Inc();
StartTime = TInstant::Now();
- Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, Offset, ExecutionId));
+ Register(new TRetryActor<TEvPrivate::TEvFetchScriptResultRequest, TEvPrivate::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset));
}
Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) {
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
index 164fe79513..50a820d172 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -74,7 +74,7 @@ public:
NYdb::NQuery::TFetchScriptResultsSettings settings;
settings.RowsOffset(ev->Get()->RowOffset);
QueryClient
- ->FetchScriptResults(ev->Get()->ExecutionId, settings)
+ ->FetchScriptResults(ev->Get()->ExecutionId, ev->Get()->ResultSetId, settings)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
index 8a093102ce..a24d5ed9f1 100644
--- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
+++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
@@ -85,6 +85,7 @@ private:
const auto* userReq = GetProtoRequest();
auto req = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>();
+ req->Record.SetResultSetId(userReq->result_set_id());
req->Record.SetRowsOffset(userReq->rows_offset());
req->Record.SetRowsLimit(userReq->rows_limit());
diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp
index c41c0f56a9..fd7048416e 100644
--- a/ydb/core/grpc_services/rpc_forget_operation.cpp
+++ b/ydb/core/grpc_services/rpc_forget_operation.cpp
@@ -88,10 +88,10 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
Reply(record.GetStatus(), record.GetIssues());
}
- void Handle(NKqp::TEvForgetScriptExecutionOperationResponce::TPtr& ev) {
+ void Handle(NKqp::TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto;
NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto);
- LOG_D("Handle NKqp::TEvForgetScriptExecutionOperationResponce responce"
+ LOG_D("Handle NKqp::TEvForgetScriptExecutionOperationResponse response"
<< ": status# " << ev->Get()->Status);
Reply(ev->Get()->Status, issuesProto);
}
@@ -139,7 +139,7 @@ public:
hFunc(TEvExport::TEvForgetExportResponse, Handle);
hFunc(TEvImport::TEvForgetImportResponse, Handle);
hFunc(TEvIndexBuilder::TEvForgetResponse, Handle);
- hFunc(NKqp::TEvForgetScriptExecutionOperationResponce, Handle);
+ hFunc(NKqp::TEvForgetScriptExecutionOperationResponse, Handle);
default:
return StateBase(ev);
}
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index 6b3903ab3b..2ed8218fef 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -24,8 +24,8 @@ struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForget
NOperationId::TOperationId OperationId;
};
-struct TEvForgetScriptExecutionOperationResponce : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponce, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponce> {
- TEvForgetScriptExecutionOperationResponce(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {
+ TEvForgetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
, Issues(issues)
{
@@ -134,4 +134,26 @@ struct TEvScriptExecutionFinished : public NActors::TEventLocal<TEvScriptExecuti
NYql::TIssues Issues;
};
+struct TEvSaveScriptResultMetaFinished : public NActors::TEventLocal<TEvSaveScriptResultMetaFinished, TKqpScriptExecutionEvents::EvSaveScriptResultMetaFinished> {
+ TEvSaveScriptResultMetaFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
+ : Status(status)
+ , Issues(std::move(issues))
+ {
+ }
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
+struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptResultFinished, TKqpScriptExecutionEvents::EvSaveScriptResultFinished> {
+ TEvSaveScriptResultFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
+ : Status(status)
+ , Issues(std::move(issues))
+ {
+ }
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 21d543fdd8..678b748664 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -139,7 +139,9 @@ struct TKqpScriptExecutionEvents {
EvCancelScriptExecutionOperationResponse,
EvScriptExecutionFinished,
EvForgetScriptExecutionOperation,
- EvForgetScriptExecutionOperationResponce
+ EvForgetScriptExecutionOperationResponse,
+ EvSaveScriptResultMetaFinished,
+ EvSaveScriptResultFinished,
};
};
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index b05e3fb812..3347992604 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -315,6 +315,7 @@ public:
Become(&TScriptExecutionsTablesCreator::StateFunc);
RunCreateScriptExecutions();
RunCreateScriptExecutionLeases();
+ RunCreateScriptResultSets();
}
private:
@@ -330,6 +331,7 @@ private:
}
void RunCreateScriptExecutions() {
+ TablesCreating++;
Register(
new TTableCreator(
{ ".metadata", "script_executions" },
@@ -350,6 +352,7 @@ private:
Col("store_deadline", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
+ Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" }
)
@@ -357,6 +360,7 @@ private:
}
void RunCreateScriptExecutionLeases() {
+ TablesCreating++;
Register(
new TTableCreator(
{ ".metadata", "script_execution_leases" },
@@ -371,8 +375,27 @@ private:
);
}
+ void RunCreateScriptResultSets() {
+ TablesCreating++;
+ Register(
+ new TTableCreator(
+ { ".metadata", "result_sets" },
+ {
+ Col("database", NScheme::NTypeIds::Text),
+ Col("execution_id", NScheme::NTypeIds::Text),
+ Col("result_set_id", NScheme::NTypeIds::Int32),
+ Col("row_id", NScheme::NTypeIds::Int64),
+ Col("expire_at", NScheme::NTypeIds::Timestamp),
+ Col("result_set", NScheme::NTypeIds::String),
+ },
+ { "database", "execution_id", "result_set_id", "row_id" }
+ )
+ );
+ }
+
void Handle(TEvPrivate::TEvCreateTableResponse::TPtr&) {
- if (++TablesCreated == 2) {
+ Y_VERIFY(TablesCreating > 0);
+ if (--TablesCreating == 0) {
Send(Owner, std::move(ResultEvent));
PassAway();
}
@@ -385,7 +408,7 @@ private:
private:
THolder<NActors::IEventBase> ResultEvent;
NActors::TActorId Owner;
- size_t TablesCreated = 0;
+ size_t TablesCreating = 0;
};
class TCreateScriptOperationQuery : public TQueryBase {
@@ -1003,7 +1026,7 @@ public:
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- Send(Request->Sender, new TEvForgetScriptExecutionOperationResponce(status, std::move(issues)));
+ Send(Request->Sender, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
}
private:
@@ -1030,7 +1053,8 @@ public:
query_text,
syntax,
execution_mode,
- issues
+ issues,
+ result_set_metas
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;
@@ -1104,6 +1128,21 @@ public:
Issues = DeserializeIssues(*issuesSerialized);
}
+ const TMaybe<TString> serializedMetas = result.ColumnParser("result_set_metas").GetOptionalJsonDocument();
+ if (serializedMetas) {
+ NJson::TJsonValue value;
+ if (!NJson::ReadJsonTree(*serializedMetas, &value) || value.GetType() != NJson::JSON_ARRAY) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is corrupted");
+ return;
+ }
+
+ for (auto i = 0; i < value.GetIntegerRobust(); i++) {
+ const NJson::TJsonValue* metaValue;
+ value.GetValuePointer(i, &metaValue);
+ NProtobufJson::Json2Proto(*metaValue, *metadata.add_result_set_meta());
+ }
+ }
+
bool finishing = false;
if (!operationStatus) {
// Check lease deadline
@@ -1524,11 +1563,368 @@ private:
}
PassAway();
}
-
private:
TEvKqp::TEvGetRunScriptActorRequest::TPtr Request;
};
+class TSaveScriptExecutionResultMetaQuery : public TQueryBase {
+public:
+ TSaveScriptExecutionResultMetaQuery(const TString& database, const TString& executionId, const TString& serializedMetas)
+ : Database(database), ExecutionId(executionId), SerializedMetas(serializedMetas)
+ {
+ }
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $result_set_metas AS JsonDocument;
+
+ UPDATE `.metadata/script_executions`
+ SET result_set_metas = $result_set_metas
+ WHERE database = $database
+ AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build()
+ .AddParam("$result_set_metas")
+ .JsonDocument(SerializedMetas)
+ .Build();
+
+ RunDataQuery(sql, &params);
+ }
+
+ void OnQueryResult() override {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (status == Ydb::StatusIds::SUCCESS) {
+ Send(Owner, new TEvSaveScriptResultMetaFinished(status));
+ } else {
+ Send(Owner, new TEvSaveScriptResultMetaFinished(status, std::move(issues)));
+ }
+ }
+
+private:
+ const TString Database;
+ const TString ExecutionId;
+ const TString SerializedMetas;
+};
+
+class TSaveScriptExecutionResultMetaActor : public TActorBootstrapped<TSaveScriptExecutionResultMetaActor> {
+public:
+ TSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMetas)
+ : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), SerializedMetas(serializedMetas)
+ {
+ }
+
+ void Bootstrap() {
+ Register(new TSaveScriptExecutionResultMetaQuery(Database, ExecutionId, SerializedMetas));
+
+ Become(&TSaveScriptExecutionResultMetaActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvSaveScriptResultMetaFinished, Handle);
+ )
+
+ void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) {
+ Send(ev->Forward(ReplyActorId));
+ PassAway();
+ }
+
+private:
+ const NActors::TActorId ReplyActorId;
+ const TString Database;
+ const TString ExecutionId;
+ const TString SerializedMetas;
+};
+
+class TSaveScriptExecutionResultQuery : public TQueryBase {
+public:
+ TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows)
+ : Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), SerializedRows(std::move(serializedRows))
+ {
+ }
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $result_set_id AS Int32;
+ DECLARE $expire_at AS Timestamp;
+ DECLARE $items AS List<Struct<row_id:Int64,result_set:String>>;
+
+ UPSERT INTO `.metadata/result_sets`
+ SELECT $database as database, $execution_id as execution_id, $result_set_id as result_set_id,
+ T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set
+ FROM AS_TABLE($items) AS T;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build()
+ .AddParam("$result_set_id")
+ .Int32(ResultSetId)
+ .Build()
+ .AddParam("$expire_at")
+ .Timestamp(ExpireAt)
+ .Build();
+
+ auto& param = params
+ .AddParam("$items");
+
+ param
+ .BeginList();
+
+ auto row = FirstRow;
+ for(auto& serializedRow : SerializedRows) {
+ param
+ .AddListItem()
+ .BeginStruct()
+ .AddMember("row_id")
+ .Int64(row++)
+ .AddMember("result_set")
+ .String(serializedRow)
+ .EndStruct();
+ }
+ param
+ .EndList()
+ .Build();
+
+ RunDataQuery(sql, &params);
+ }
+
+ void OnQueryResult() override {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (status == Ydb::StatusIds::SUCCESS) {
+ Send(Owner, new TEvSaveScriptResultFinished(status));
+ } else {
+ Send(Owner, new TEvSaveScriptResultFinished(status, std::move(issues)));
+ }
+ }
+
+private:
+ const TString Database;
+ const TString ExecutionId;
+ const i32 ResultSetId;
+ const TInstant ExpireAt;
+ const i64 FirstRow;
+ const std::vector<TString> SerializedRows;
+};
+
+class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExecutionResultActor> {
+public:
+ TSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows)
+ : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), SerializedRows(std::move(serializedRows))
+ {
+ }
+
+ void Bootstrap() {
+ Register(new TSaveScriptExecutionResultQuery(Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, std::move(SerializedRows)));
+
+ Become(&TSaveScriptExecutionResultActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvSaveScriptResultFinished, Handle);
+ )
+
+ void Handle(TEvSaveScriptResultFinished::TPtr& ev) {
+ Send(ev->Forward(ReplyActorId));
+ PassAway();
+ }
+
+private:
+ const NActors::TActorId ReplyActorId;
+ const TString Database;
+ const TString ExecutionId;
+ const i32 ResultSetId;
+ const TInstant ExpireAt;
+ const i64 FirstRow;
+ std::vector<TString> SerializedRows;
+};
+
+class TGetScriptExecutionResultQuery : public TQueryBase {
+public:
+ TGetScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit)
+ : Database(database), ExecutionId(executionId), ResultSetId(resultSetId), Offset(offset), Limit(limit)
+ {
+ Response = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>();
+ Response->Record.SetResultSetIndex(ResultSetId);
+ }
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $result_set_id AS Int32;
+ DECLARE $offset AS Int64;
+ DECLARE $limit AS Uint64;
+
+ SELECT result_set_metas
+ FROM `.metadata/script_executions`
+ WHERE database = $database
+ AND execution_id = $execution_id;
+
+ SELECT row_id, result_set
+ FROM `.metadata/result_sets`
+ WHERE database = $database
+ AND execution_id = $execution_id
+ AND result_set_id = $result_set_id
+ AND row_id >= $offset
+ ORDER BY row_id
+ LIMIT $limit;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build()
+ .AddParam("$result_set_id")
+ .Int32(ResultSetId)
+ .Build()
+ .AddParam("$offset")
+ .Int64(Offset)
+ .Build()
+ .AddParam("$limit")
+ .Uint64(Limit)
+ .Build();
+
+ RunDataQuery(sql, &params);
+ }
+
+ void OnQueryResult() override {
+ if (ResultSets.size() != 2) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+
+ { // columns
+ NYdb::TResultSetParser result(ResultSets[0]);
+
+ if (!result.TryNextRow()) {
+ Finish(Ydb::StatusIds::BAD_REQUEST, "Script execution not found");
+ return;
+ }
+
+ const TMaybe<TString> serializedMetas = result.ColumnParser("result_set_metas").GetOptionalJsonDocument();
+ if (!serializedMetas) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is empty");
+ return;
+ }
+
+ NJson::TJsonValue value;
+ if (!NJson::ReadJsonTree(*serializedMetas, &value) || value.GetType() != NJson::JSON_ARRAY) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is corrupted");
+ return;
+ }
+
+ const NJson::TJsonValue* metaValue;
+ if (!value.GetValuePointer(ResultSetId, &metaValue)) {
+ Finish(Ydb::StatusIds::BAD_REQUEST, "Result set index is invalid");
+ return;
+ }
+
+ Ydb::Query::ResultSetMeta meta;
+ NProtobufJson::Json2Proto(*metaValue, meta);
+
+ *Response->Record.MutableResultSet()->mutable_columns() = meta.columns();
+ Response->Record.MutableResultSet()->set_truncated(meta.truncated());
+ }
+
+ { // rows
+ NYdb::TResultSetParser result(ResultSets[1]);
+
+ while (result.TryNextRow()) {
+ const TMaybe<TString> serializedRow = result.ColumnParser("result_set").GetOptionalString();
+
+ if (!serializedRow) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is empty");
+ return;
+ }
+
+ if (!Response->Record.MutableResultSet()->add_rows()->ParseFromString(*serializedRow)) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is corrupted");
+ return;
+ }
+ }
+ }
+
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ Response->Record.SetStatus(status);
+ if (status != Ydb::StatusIds::SUCCESS) {
+ Response->Record.MutableResultSet()->Clear();
+ }
+ if (issues) {
+ NYql::IssuesToMessage(issues, Response->Record.MutableIssues());
+ }
+ Send(Owner, std::move(Response));
+ }
+
+private:
+ const TString Database;
+ const TString ExecutionId;
+ const i32 ResultSetId;
+ const i64 Offset;
+ const i64 Limit;
+ THolder<TEvKqp::TEvFetchScriptResultsResponse> Response;
+};
+
+class TGetScriptExecutionResultActor : public TActorBootstrapped<TGetScriptExecutionResultActor> {
+public:
+ TGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit)
+ : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), ResultSetId(resultSetId), Offset(offset), Limit(limit)
+ {
+ }
+
+ void Bootstrap() {
+ Register(new TGetScriptExecutionResultQuery(Database, ExecutionId, ResultSetId, Offset, Limit));
+
+ Become(&TGetScriptExecutionResultActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvKqp::TEvFetchScriptResultsResponse, Handle);
+ )
+
+ void Handle(TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
+ Send(ev->Forward(ReplyActorId));
+ PassAway();
+ }
+
+private:
+ const NActors::TActorId ReplyActorId;
+ const TString Database;
+ const TString ExecutionId;
+ const i32 ResultSetId;
+ const i64 Offset;
+ const i64 Limit;
+};
+
} // anonymous namespace
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) {
@@ -1574,6 +1970,18 @@ NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId,
return new TScriptLeaseUpdateActor(runScriptActorId, database, executionId, leaseDeadline);
}
+NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMeta) {
+ return new TSaveScriptExecutionResultMetaActor(replyActorId, database, executionId, serializedMeta);
+}
+
+NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows) {
+ return new TSaveScriptExecutionResultActor(replyActorId, database, executionId, resultSetId, expireAt, firstRow, std::move(serializedRows));
+}
+
+NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit) {
+ return new TGetScriptExecutionResultActor(replyActorId, database, executionId, resultSetId, offset, limit);
+}
+
namespace NPrivate {
NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index 89dcc41743..940ff189ed 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -38,5 +38,9 @@ NActors::IActor* CreateScriptExecutionFinisher(
// Updates lease deadline in database.
NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, const TInstant& leaseDeadline);
+// Store and fetch results.
+NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta);
+NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows);
+NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt
index 10fc76583e..3c2be21b07 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-protobuf-json
ydb-core-base
ydb-core-protos
kqp-common-events
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
index 211a247244..b4e8d47592 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-protobuf-json
ydb-core-base
ydb-core-protos
kqp-common-events
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt
index 211a247244..b4e8d47592 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-protobuf-json
ydb-core-base
ydb-core-protos
kqp-common-events
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt
index 10fc76583e..3c2be21b07 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-protobuf-json
ydb-core-base
ydb-core-protos
kqp-common-events
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 40652c88a6..3c66dd8380 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -12,6 +12,8 @@
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/protobuf/json/json2proto.h>
+#include <library/cpp/protobuf/json/proto2json.h>
#include <util/generic/size_literals.h>
@@ -69,6 +71,8 @@ private:
hFunc(TEvKqp::TEvCancelScriptExecutionRequest, Handle);
hFunc(TEvScriptExecutionFinished, Handle);
hFunc(TEvScriptLeaseUpdateResponse, Handle);
+ hFunc(TEvSaveScriptResultMetaFinished, Handle);
+ hFunc(TEvSaveScriptResultFinished, Handle);
)
void SendToKqpProxy(THolder<NActors::IEventBase> ev) {
@@ -167,15 +171,91 @@ private:
LOG_D("Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
+ << ", queryResultIndex: " << ev->Get()->Record.GetQueryResultIndex()
<< ", to: " << ev->Sender);
Send(ev->Sender, resp.Release());
- if (IsExecuting() && !IsTruncated()) {
- MergeResultSet(ev);
+ auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
+
+ if (resultSetIndex > ExpireAt.size()) {
+ Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected result set index"));
+ Finish(Ydb::StatusIds::INTERNAL_ERROR);
+ return;
+ }
+
+ if (resultSetIndex == ExpireAt.size()) {
+ // next ResultSet arrived
+ ResultSetRowCount.resize(resultSetIndex + 1);
+ ResultSetByteCount.resize(resultSetIndex + 1);
+ Truncated.resize(resultSetIndex + 1);
+ ExpireAt.resize(resultSetIndex + 1);
+ ExpireAt[resultSetIndex] = TInstant::Now() + TDuration::Days(1);
+ }
+
+ if (IsExecuting() && !Truncated[resultSetIndex]) {
+ auto& rowCount = ResultSetRowCount[resultSetIndex];
+ auto& byteCount = ResultSetByteCount[resultSetIndex];
+ auto firstRow = rowCount;
+ std::vector<TString> serializedRows;
+
+ for (const auto& row : ev->Get()->Record.GetResultSet().rows()) {
+ if (rowCount > RESULT_ROWS_LIMIT) {
+ Truncated[resultSetIndex] = true;
+ break;
+ }
+
+ auto serializedSize = row.ByteSizeLong();
+ if (byteCount + serializedSize > RESULT_SIZE_LIMIT) {
+ Truncated[resultSetIndex] = true;
+ break;
+ }
+
+ rowCount++;
+ byteCount += serializedSize;
+ serializedRows.push_back(row.SerializeAsString());
+ }
+
+ if (firstRow == 0 || Truncated[resultSetIndex]) {
+ Ydb::Query::ResultSetMeta meta;
+ *meta.mutable_columns() = ev->Get()->Record.GetResultSet().columns();
+ meta.set_truncated(Truncated[resultSetIndex]);
+
+ NJson::TJsonValue* value;
+ if (firstRow == 0) {
+ value = &ResultSetMetas[resultSetIndex];
+ ResultSetMetaArray.push_back(value);
+ } else {
+ value = ResultSetMetaArray[resultSetIndex];
+ }
+ NProtobufJson::Proto2Json(meta, *value, NProtobufJson::TProto2JsonConfig());
+
+ // can't save meta when previous request is not completed for TLI reasons
+ if (SaveResultMetaInflight) {
+ PendingResultMeta = true;
+ } else {
+ SaveResultMeta();
+ SaveResultMetaInflight++;
+ }
+ }
+
+ if (!serializedRows.empty()) {
+ Register(
+ CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, resultSetIndex, ExpireAt[resultSetIndex], firstRow, std::move(serializedRows))
+ );
+ SaveResultInflight++;
+ }
}
}
+ void SaveResultMeta() {
+ NJsonWriter::TBuf sout;
+ sout.WriteJsonValue(&ResultSetMetas);
+ Register(
+ CreateSaveScriptExecutionResultMetaActor(SelfId(), Database, ExecutionId, sout.Str())
+ );
+ }
+
void Handle(TEvKqp::TEvQueryResponse::TPtr& ev) {
if (RunState != ERunState::Running) {
return;
@@ -198,27 +278,17 @@ private:
resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
resp->Record.AddIssues()->set_message("Script execution is cancelled");
}
- } else {
- if (!ResultSets.empty()) {
- resp->Record.SetResultSetIndex(0);
- resp->Record.MutableResultSet()->mutable_columns()->CopyFrom(ResultSets[0].columns());
-
- const ui64 rowsOffset = ev->Get()->Record.GetRowsOffset();
- const ui64 rowsLimit = ev->Get()->Record.GetRowsLimit();
- ui64 rowsAdded = 0;
- for (i64 row = static_cast<i64>(rowsOffset); row < ResultSets[0].rows_size(); ++row) {
- if (rowsAdded >= rowsLimit) {
- resp->Record.MutableResultSet()->set_truncated(true);
- break;
- }
- resp->Record.MutableResultSet()->add_rows()->CopyFrom(ResultSets[0].rows(row));
- }
- }
+ } else if (Status != Ydb::StatusIds::SUCCESS) {
resp->Record.SetStatus(Status);
for (const auto& issue : Issues) {
auto item = resp->Record.add_issues();
NYql::IssueToMessage(issue, item);
}
+ } else {
+ Register(
+ CreateGetScriptExecutionResultActor(ev->Sender, Database, ExecutionId, ev->Get()->Record.GetResultSetId(), ev->Get()->Record.GetRowsOffset(), ev->Get()->Record.GetRowsLimit())
+ );
+ return;
}
Send(ev->Sender, std::move(resp));
}
@@ -286,28 +356,28 @@ private:
CancelRequests.clear();
}
- void MergeResultSet(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
- if (ResultSets.empty()) {
- ResultSets.emplace_back(ev->Get()->Record.GetResultSet());
+ void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) {
+ if (PendingResultMeta) {
+ PendingResultMeta = false;
+ SaveResultMeta();
return;
}
- if (ResultSets[0].columns_size() != ev->Get()->Record.GetResultSet().columns_size()) {
- Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
- Finish(Ydb::StatusIds::INTERNAL_ERROR);
- return;
- }
- size_t rowsAdded = 0;
- for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
- ResultSets[0].add_rows()->Swap(&row);
- ++rowsAdded;
- if (ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT) {
- break;
- }
+
+ SaveResultMetaInflight--;
+ if (Status == Ydb::StatusIds::SUCCESS && ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
+ Status = ev->Get()->Status;
+ Issues.AddIssues(ev->Get()->Issues);
}
- if (ev->Get()->Record.GetResultSet().truncated() || ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT || ResultSets[0].ByteSizeLong() >= RESULT_SIZE_LIMIT) {
- ResultSets[0].set_truncated(true);
+ CheckSaveInflight();
+ }
+
+ void Handle(TEvSaveScriptResultFinished::TPtr& ev) {
+ SaveResultInflight--;
+ if (Status == Ydb::StatusIds::SUCCESS && ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
+ Status = ev->Get()->Status;
+ Issues.AddIssues(ev->Get()->Issues);
}
- LOG_D("Received partial result. Rows added: " << rowsAdded << ". Truncated: " << IsTruncated());
+ CheckSaveInflight();
}
static Ydb::Query::ExecStatus GetExecStatusFromStatusCode(Ydb::StatusIds::StatusCode status) {
@@ -320,17 +390,33 @@ private:
}
}
- void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) {
- RunState = runState;
- Status = status;
- Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues));
+ void CheckSaveInflight() {
+ if (Status == Ydb::StatusIds::SUCCESS && RunState == ERunState::Finishing && (SaveResultMetaInflight || SaveResultInflight)) {
+ // wait for save completions
+ return;
+ }
+
+ Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, Status, GetExecStatusFromStatusCode(Status), Issues));
if (RunState == ERunState::Cancelling) {
Issues.AddIssue("Script execution is cancelled");
- ResultSets.clear();
}
CloseSession();
}
+ void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) {
+ RunState = runState;
+ Status = status;
+
+ // if query has no results, save empty json array
+ if (ResultSetMetaArray.empty()) {
+ ResultSetMetas.SetType(NJson::JSON_ARRAY);
+ SaveResultMeta();
+ SaveResultMetaInflight++;
+ } else {
+ CheckSaveInflight();
+ }
+ }
+
void PassAway() override {
CloseSession();
NActors::TActorBootstrapped<TRunScriptActor>::PassAway();
@@ -343,10 +429,6 @@ private:
&& RunState != ERunState::Cancelling;
}
- bool IsTruncated() const {
- return !ResultSets.empty() && ResultSets[0].truncated();
- }
-
private:
const TString ExecutionId;
const NKikimrKqp::TEvQueryRequest Request;
@@ -361,7 +443,17 @@ private:
// Result
NYql::TIssues Issues;
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
- std::vector<Ydb::ResultSet> ResultSets;
+
+ // Result
+ std::vector<ui64> ResultSetRowCount;
+ std::vector<ui64> ResultSetByteCount;
+ std::vector<bool> Truncated;
+ std::vector<TInstant> ExpireAt;
+ std::vector<NJson::TJsonValue*> ResultSetMetaArray;
+ NJson::TJsonValue ResultSetMetas;
+ ui32 SaveResultInflight = 0;
+ ui32 SaveResultMetaInflight = 0;
+ bool PendingResultMeta = false;
};
} // namespace
diff --git a/ydb/core/kqp/run_script_actor/ya.make b/ydb/core/kqp/run_script_actor/ya.make
index 8374348187..5ca4b2184f 100644
--- a/ydb/core/kqp/run_script_actor/ya.make
+++ b/ydb/core/kqp/run_script_actor/ya.make
@@ -6,6 +6,7 @@ SRCS(
PEERDIR(
library/cpp/actors/core
+ library/cpp/protobuf/json
ydb/core/base
ydb/core/protos
ydb/core/kqp/common/events
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 36fc003d62..4fe2c48bf2 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
@@ -254,7 +254,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
@@ -328,7 +328,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
@@ -392,7 +392,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
@@ -471,7 +471,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
@@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 13daa1c6de..94a23904bd 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -292,7 +292,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_STRING_CONTAINS(readyOp.Metadata().ScriptContent.Text, "SELECT 42");
auto checkFetch = [&](const auto& executionOrOperation) {
- TFetchScriptResultsResult results = db.FetchScriptResults(executionOrOperation).ExtractValueSync();
+ TFetchScriptResultsResult results = db.FetchScriptResults(executionOrOperation, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
TResultSetParser resultSet(results.ExtractResultSet());
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
@@ -305,6 +305,42 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
checkFetch(scriptExecutionOperation);
}
+ Y_UNIT_TEST(ExecuteMultiScript) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto scriptExecutionOperation = db.ExecuteScript(R"(
+ SELECT 42; SELECT 101;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecMode, EExecMode::Execute);
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId);
+ UNIT_ASSERT_STRING_CONTAINS(readyOp.Metadata().ScriptContent.Text, "SELECT 42; SELECT 101;");
+
+ {
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42);
+ }
+ {
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 1).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 101);
+ }
+ }
+
Y_UNIT_TEST(ListScriptExecutions) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 605e01cb57..c8cbec780f 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -624,6 +624,7 @@ message TKqpStreamLookupSettings {
}
message TEvFetchScriptResultsRequest {
+ optional uint64 ResultSetId = 3;
optional uint64 RowsOffset = 1;
optional uint64 RowsLimit = 2;
}
diff --git a/ydb/public/api/protos/draft/ydb_query.proto b/ydb/public/api/protos/draft/ydb_query.proto
index ad69e1500a..42822027e3 100644
--- a/ydb/public/api/protos/draft/ydb_query.proto
+++ b/ydb/public/api/protos/draft/ydb_query.proto
@@ -179,6 +179,7 @@ message ExecuteQueryRequest {
message ResultSetMeta {
repeated Ydb.Column columns = 1;
+ bool truncated = 2;
}
message ExecuteQueryResponsePart {
@@ -264,6 +265,8 @@ message FetchScriptResultsRequest {
string operation_id = 5 [(Ydb.length).le = 1024];
}
+ int64 result_set_id = 6;
+
oneof fetch {
string fetch_token = 2 [(Ydb.length).le = 1024];
}
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
index 462756dc6b..deb7d9e175 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
@@ -70,15 +70,17 @@ public:
return promise.GetFuture();
}
- TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings) {
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, int64_t resultSetId, const TFetchScriptResultsSettings& settings) {
auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>();
request.set_execution_id(executionId);
+ request.set_result_set_id(resultSetId);
return FetchScriptResultsImpl(std::move(request), settings);
}
- TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, const TFetchScriptResultsSettings& settings) {
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, int64_t resultSetId, const TFetchScriptResultsSettings& settings) {
auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>();
request.set_operation_id(NKikimr::NOperationId::ProtoToString(scriptExecutionOperation.Id()));
+ request.set_result_set_id(resultSetId);
return FetchScriptResultsImpl(std::move(request), settings);
}
@@ -171,16 +173,16 @@ NThreading::TFuture<TScriptExecutionOperation> TQueryClient::ExecuteScript(const
return Impl_->ExecuteScript(script, settings);
}
-TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& executionId,
+TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& executionId, int64_t resultSetId,
const TFetchScriptResultsSettings& settings)
{
- return Impl_->FetchScriptResults(executionId, settings);
+ return Impl_->FetchScriptResults(executionId, resultSetId, settings);
}
-TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation,
+TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, int64_t resultSetId,
const TFetchScriptResultsSettings& settings)
{
- return Impl_->FetchScriptResults(scriptExecutionOperation, settings);
+ return Impl_->FetchScriptResults(scriptExecutionOperation, resultSetId, settings);
}
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
index ae27967ba4..76ee566ba4 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
@@ -38,10 +38,10 @@ public:
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script,
const TExecuteScriptSettings& settings = TExecuteScriptSettings());
- TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId,
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, int64_t resultSetId,
const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings());
- TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation,
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, int64_t resultSetId,
const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings());
private: