aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux.txt1
-rw-r--r--ydb/core/grpc_services/query/query_helpers.h89
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp44
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_script.cpp65
-rw-r--r--ydb/core/grpc_services/query/rpc_fetch_script_results.cpp159
-rw-r--r--ydb/core/grpc_services/query/service_query.h1
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp4
-rw-r--r--ydb/core/kqp/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/common/kqp.h37
-rw-r--r--ydb/core/kqp/common/kqp_event_ids.h4
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp10
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt25
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt26
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.linux.txt26
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.txt15
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp172
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.h14
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp21
-rw-r--r--ydb/core/protos/kqp.proto12
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp53
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.h3
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp9
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/query.h46
-rw-r--r--ydb/services/ydb/ydb_query.cpp6
32 files changed, 795 insertions, 59 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt
index 927b117349..3dc6b63a01 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin.txt
@@ -126,4 +126,5 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
)
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index d4495d7980..bb6c3dc35a 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -127,4 +127,5 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
)
diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt
index d4495d7980..bb6c3dc35a 100644
--- a/ydb/core/grpc_services/CMakeLists.linux.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux.txt
@@ -127,4 +127,5 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
)
diff --git a/ydb/core/grpc_services/query/query_helpers.h b/ydb/core/grpc_services/query/query_helpers.h
new file mode 100644
index 0000000000..c706ee3416
--- /dev/null
+++ b/ydb/core/grpc_services/query/query_helpers.h
@@ -0,0 +1,89 @@
+#pragma once
+#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/grpc_services/rpc_kqp_base.h>
+#include <ydb/public/api/protos/draft/ydb_query.pb.h>
+
+#include <utility>
+
+namespace NKikimr::NGRpcService {
+
+namespace NQueryHelpersPrivate {
+
+inline bool FillQueryContent(const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) {
+ switch (req.query_case()) {
+ case Ydb::Query::ExecuteQueryRequest::kQueryContent:
+ if (!CheckQuery(req.query_content().text(), issues)) {
+ return false;
+ }
+
+ kqpRequest.MutableRequest()->SetQuery(req.query_content().text());
+ return true;
+
+ default:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
+ return false;
+ }
+}
+
+inline bool FillQueryContent(const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) {
+ switch (req.script_case()) {
+ case Ydb::Query::ExecuteScriptRequest::kScriptContent:
+ if (!CheckQuery(req.script_content().text(), issues)) {
+ return false;
+ }
+
+ kqpRequest.MutableRequest()->SetQuery(req.script_content().text());
+ return true;
+
+ case Ydb::Query::ExecuteScriptRequest::kScriptId:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Execution by script id is not supported yet"));
+ return false;
+
+ default:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
+ return false;
+ }
+}
+
+inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteQueryRequest&) {
+ return NKikimrKqp::QUERY_TYPE_SQL_QUERY;
+}
+
+inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteScriptRequest&) {
+ return NKikimrKqp::QUERY_TYPE_SQL_QUERY; // TODO: make new query type
+}
+
+} // namespace NQueryHelpersPrivate
+
+template <class TRequestProto>
+std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
+ const TRequestProto& req, NKikimrKqp::TEvQueryRequest& kqpRequest)
+{
+ kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
+ switch (req.exec_mode()) {
+ case Ydb::Query::EXEC_MODE_EXECUTE:
+ kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ break;
+ default: {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode"));
+ return {Ydb::StatusIds::BAD_REQUEST, issues};
+ }
+ }
+
+ kqpRequest.MutableRequest()->SetType(NQueryHelpersPrivate::GetQueryType(req));
+ kqpRequest.MutableRequest()->SetKeepSession(false);
+
+ // TODO: Use tx control from request.
+ kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ NYql::TIssues issues;
+ if (!NQueryHelpersPrivate::FillQueryContent(req, kqpRequest, issues)) {
+ return {Ydb::StatusIds::BAD_REQUEST, issues};
+ }
+
+ return {Ydb::StatusIds::SUCCESS, {}};
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index e34e7f53ef..96271f2c68 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -1,4 +1,5 @@
#include "service_query.h"
+#include "query_helpers.h"
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
@@ -19,49 +20,6 @@ using namespace NActors;
using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQueryRequest,
Ydb::Query::ExecuteQueryResponsePart>;
-std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
- const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest)
-{
- kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
- switch (req.exec_mode()) {
- case Ydb::Query::EXEC_MODE_EXECUTE:
- kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- break;
- default: {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode"));
- return {Ydb::StatusIds::BAD_REQUEST, issues};
- }
- }
-
- kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_QUERY);
- kqpRequest.MutableRequest()->SetKeepSession(false);
-
- // TODO: Use tx control from request.
- kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
-
- switch (req.query_case()) {
- case Ydb::Query::ExecuteQueryRequest::kQueryContent: {
- NYql::TIssues issues;
- if (!CheckQuery(req.query_content().text(), issues)) {
- return {Ydb::StatusIds::BAD_REQUEST, issues};
- }
-
- kqpRequest.MutableRequest()->SetQuery(req.query_content().text());
- break;
- }
-
- default: {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
- return {Ydb::StatusIds::BAD_REQUEST, issues};
- }
- }
-
- return {Ydb::StatusIds::SUCCESS, {}};
-}
-
class RpcFlowControlState {
public:
RpcFlowControlState(ui64 inflightLimitBytes)
diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp
index 83f3bd84b6..9ad4b5a033 100644
--- a/ydb/core/grpc_services/query/rpc_execute_script.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp
@@ -1,13 +1,16 @@
#include "service_query.h"
+#include "query_helpers.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/kikimr_issue.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
+#include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/api/protos/draft/ydb_query.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
namespace NKikimr::NGRpcService {
@@ -29,21 +32,57 @@ public:
{}
void Bootstrap() {
- Ydb::Operations::Operation operation;
- operation.set_id(GetOperationId());
NYql::TIssues issues;
- issues.AddIssue("Execute script started");
- Reply(Ydb::StatusIds::SUCCESS, std::move(operation), issues);
+ Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
+ if (auto scriptRequest = MakeScriptRequest(issues, status)) {
+ if (Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), scriptRequest.Release())) {
+ Become(&TExecuteScriptRPC::StateFunc);
+ } else {
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
+ Reply(Ydb::StatusIds::INTERNAL_ERROR, issues);
+ }
+ } else {
+ Reply(status, issues);
+ }
}
- TString GetOperationId() const {
- Ydb::TOperationId operationId;
- operationId.SetKind(Ydb::TOperationId::SCRIPT);
- NOperationId::AddOptionalValue(operationId, "id", "fake_execute_script_id");
- return NOperationId::ProtoToString(operationId);
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
+ )
+
+ void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
+ Ydb::Operations::Operation operation;
+ operation.set_id(ev->Get()->OperationId);
+ Ydb::Query::ExecuteScriptMetadata metadata;
+ metadata.set_execution_id(ev->Get()->ExecutionId);
+ metadata.set_exec_status(ev->Get()->ExecStatus);
+ metadata.set_exec_mode(ev->Get()->ExecMode);
+ operation.mutable_metadata()->PackFrom(metadata);
+ Reply(ev->Get()->Status, std::move(operation), ev->Get()->Issues);
+ }
+
+ THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const {
+ const auto* req = Request_->GetProtoRequest();
+ const auto traceId = Request_->GetTraceId();
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>();
+
+ SetAuthToken(ev, *Request_);
+ SetDatabase(ev, *Request_);
+ SetRlPath(ev, *Request_);
+
+ if (traceId) {
+ ev->Record.SetTraceId(traceId.GetRef());
+ }
+
+ std::tie(status, issues) = FillKqpRequest(*req, ev->Record);
+ if (status != Ydb::StatusIds::SUCCESS) {
+ return nullptr;
+ }
+ return ev;
}
-private:
void Reply(Ydb::StatusIds::StatusCode status, Ydb::Operations::Operation&& result, const NYql::TIssues& issues = {}) {
LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::RPC_REQUEST, "Execute script, status: "
<< Ydb::StatusIds::StatusCode_Name(status) << (issues ? ". Issues: " : "") << issues.ToOneLineString());
@@ -64,6 +103,12 @@ private:
PassAway();
}
+ void Reply(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
+ Ydb::Operations::Operation result;
+ result.set_ready(true);
+ Reply(status, std::move(result), issues);
+ }
+
private:
std::unique_ptr<TEvExecuteScriptRequest> Request_;
};
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
new file mode 100644
index 0000000000..c4066df2d4
--- /dev/null
+++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
@@ -0,0 +1,159 @@
+#include "service_query.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/public/api/protos/draft/ydb_query.pb.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+
+namespace NKikimr::NGRpcService {
+
+namespace {
+
+using namespace NActors;
+
+using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::FetchScriptResultsRequest,
+ Ydb::Query::FetchScriptResultsResponse>;
+
+constexpr i64 MAX_ROWS_LIMIT = 1000;
+
+class TFetchScriptResultsRPC : public TActorBootstrapped<TFetchScriptResultsRPC> {
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::GRPC_REQ;
+ }
+
+ TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
+ : Request_(request)
+ {}
+
+ void Bootstrap() {
+ const auto* req = Request_->GetProtoRequest();
+ if (!req) {
+ Reply(Ydb::StatusIds::INTERNAL_ERROR, "Internal error");
+ return;
+ }
+
+ const TString& executionId = req->execution_id();
+ NActors::TActorId runScriptActor;
+ if (!runScriptActor.Parse(executionId.data(), executionId.size())) {
+ Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect execution id");
+ return;
+ }
+
+ if (req->rows_limit() <= 0) {
+ Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows limit");
+ return;
+ }
+
+ if (req->rows_offset() < 0) {
+ Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows offset");
+ return;
+ }
+
+ if (req->rows_limit() > MAX_ROWS_LIMIT) {
+ Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Rows limit is too large. Values <= " << MAX_ROWS_LIMIT << " are allowed");
+ return;
+ }
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>();
+ ev->Record.SetRowsOffset(req->rows_offset());
+ ev->Record.SetRowsLimit(req->rows_limit());
+
+ ui64 flags = IEventHandle::FlagTrackDelivery;
+ if (runScriptActor.NodeId() != SelfId().NodeId()) {
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ SubscribedOnSession = runScriptActor.NodeId();
+ }
+ Send(runScriptActor, std::move(ev), flags);
+
+ Become(&TFetchScriptResultsRPC::StateFunc);
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle);
+ hFunc(NActors::TEvents::TEvUndelivered, Handle);
+ hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
+ )
+
+ void Handle(NKqp::TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
+ Ydb::Query::FetchScriptResultsResponse resp;
+ resp.set_status(ev->Get()->Record.GetStatus());
+ resp.mutable_issues()->Swap(ev->Get()->Record.MutableIssues());
+ resp.set_result_set_index(static_cast<i64>(ev->Get()->Record.GetResultSetIndex()));
+ if (ev->Get()->Record.HasResultSet()) {
+ resp.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
+ }
+ Reply(resp.status(), std::move(resp));
+ }
+
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
+ Reply(Ydb::StatusIds::NOT_FOUND, "No such execution");
+ } else {
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ }
+ }
+
+ void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ }
+
+ void PassAway() override {
+ if (SubscribedOnSession) {
+ Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe());
+ }
+ TActorBootstrapped<TFetchScriptResultsRPC>::PassAway();
+ }
+
+ void Reply(Ydb::StatusIds::StatusCode status, Ydb::Query::FetchScriptResultsResponse&& result, const NYql::TIssues& issues = {}) {
+ LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::RPC_REQUEST, "Fetch script results, status: "
+ << Ydb::StatusIds::StatusCode_Name(status) << (issues ? ". Issues: " : "") << issues.ToOneLineString());
+
+ for (const auto& issue : issues) {
+ auto item = result.add_issues();
+ NYql::IssueToMessage(issue, item);
+ }
+
+ result.set_status(status);
+
+ TString serializedResult;
+ Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
+
+ Request_->SendSerializedResult(std::move(serializedResult), status);
+
+ PassAway();
+ }
+
+ void Reply(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
+ Ydb::Query::FetchScriptResultsResponse result;
+ Reply(status, std::move(result), issues);
+ }
+
+ void Reply(Ydb::StatusIds::StatusCode status, const TString& errorText) {
+ NYql::TIssues issues;
+ issues.AddIssue(errorText);
+ Reply(status, issues);
+ }
+
+private:
+ std::unique_ptr<TEvFetchScriptResultsRequest> Request_;
+ TMaybe<ui32> SubscribedOnSession;
+};
+
+} // namespace
+
+void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
+ Y_UNUSED(f);
+ auto* req = dynamic_cast<TEvFetchScriptResultsRequest*>(p.release());
+ Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
+ TActivationContext::AsActorContext().Register(new TFetchScriptResultsRPC(req));
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/query/service_query.h b/ydb/core/grpc_services/query/service_query.h
index 2a02f52ea0..08924d84c3 100644
--- a/ydb/core/grpc_services/query/service_query.h
+++ b/ydb/core/grpc_services/query/service_query.h
@@ -10,5 +10,6 @@ class IFacilityProvider;
void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
void DoExecuteScriptRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
+void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index a7eab0d176..170dd68c95 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -88,7 +88,7 @@ public:
NKikimrKqp::EQueryType queryType;
switch (query.query_case()) {
- case Query::kYqlText: {
+ case Ydb::Table::Query::kYqlText: {
NYql::TIssues issues;
if (!CheckQuery(query.yql_text(), issues)) {
return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
@@ -99,7 +99,7 @@ public:
break;
}
- case Query::kId: {
+ case Ydb::Table::Query::kId: {
if (query.id().empty()) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty query id"));
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 8223279aaa..e1d19dac86 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -116,7 +116,7 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp::
auto& query = req.query();
switch (query.query_case()) {
- case Query::kYqlText: {
+ case Ydb::Table::Query::kYqlText: {
NYql::TIssues issues;
if (!CheckQuery(query.yql_text(), issues)) {
error = TParseRequestError(Ydb::StatusIds::BAD_REQUEST, issues);
@@ -127,7 +127,7 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp::
break;
}
- case Query::kId: {
+ case Ydb::Table::Query::kId: {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Specifying query by ID not supported in scan execution."));
diff --git a/ydb/core/kqp/CMakeLists.darwin.txt b/ydb/core/kqp/CMakeLists.darwin.txt
index 3eb7972cab..106d4102b4 100644
--- a/ydb/core/kqp/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/CMakeLists.darwin.txt
@@ -20,6 +20,7 @@ add_subdirectory(provider)
add_subdirectory(proxy_service)
add_subdirectory(query_compiler)
add_subdirectory(rm_service)
+add_subdirectory(run_script_actor)
add_subdirectory(runtime)
add_subdirectory(session_actor)
add_subdirectory(topics)
diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
index c539f81bfa..071a51d8ab 100644
--- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
@@ -20,6 +20,7 @@ add_subdirectory(provider)
add_subdirectory(proxy_service)
add_subdirectory(query_compiler)
add_subdirectory(rm_service)
+add_subdirectory(run_script_actor)
add_subdirectory(runtime)
add_subdirectory(session_actor)
add_subdirectory(topics)
diff --git a/ydb/core/kqp/CMakeLists.linux.txt b/ydb/core/kqp/CMakeLists.linux.txt
index c539f81bfa..071a51d8ab 100644
--- a/ydb/core/kqp/CMakeLists.linux.txt
+++ b/ydb/core/kqp/CMakeLists.linux.txt
@@ -20,6 +20,7 @@ add_subdirectory(provider)
add_subdirectory(proxy_service)
add_subdirectory(query_compiler)
add_subdirectory(rm_service)
+add_subdirectory(run_script_actor)
add_subdirectory(runtime)
add_subdirectory(session_actor)
add_subdirectory(topics)
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 477ae39811..09d7ae7220 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -6,6 +6,7 @@
#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+#include <ydb/public/api/protos/draft/ydb_query.pb.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/cancelation/cancelation.h>
@@ -754,7 +755,43 @@ struct TEvKqp {
{}
};
+ struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
+ TEvScriptRequest() = default;
+
+ mutable NKikimrKqp::TEvQueryRequest Record;
+ };
+
+ struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {
+ TEvScriptResponse(TString operationId, TString executionId, Ydb::Query::ExecStatus execStatus, Ydb::Query::ExecMode execMode)
+ : Status(Ydb::StatusIds::SUCCESS)
+ , OperationId(std::move(operationId))
+ , ExecutionId(std::move(executionId))
+ , ExecStatus(execStatus)
+ , ExecMode(execMode)
+ {}
+
+ TEvScriptResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(std::move(issues))
+ , ExecStatus(Ydb::Query::EXEC_STATUS_FAILED)
+ , ExecMode(Ydb::Query::EXEC_MODE_UNSPECIFIED)
+ {}
+
+ const Ydb::StatusIds::StatusCode Status;
+ const NYql::TIssues Issues;
+ const TString OperationId;
+ const TString ExecutionId;
+ const Ydb::Query::ExecStatus ExecStatus;
+ const Ydb::Query::ExecMode ExecMode;
+ };
+
using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution;
+
+ struct TEvFetchScriptResultsRequest : public TEventPB<TEvFetchScriptResultsRequest, NKikimrKqp::TEvFetchScriptResultsRequest, TKqpEvents::EvFetchScriptResultsRequest> {
+ };
+
+ struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> {
+ };
};
class TKqpRequestInfo {
diff --git a/ydb/core/kqp/common/kqp_event_ids.h b/ydb/core/kqp/common/kqp_event_ids.h
index 76bf41bffe..05012cbd21 100644
--- a/ydb/core/kqp/common/kqp_event_ids.h
+++ b/ydb/core/kqp/common/kqp_event_ids.h
@@ -30,6 +30,10 @@ struct TKqpEvents {
EvDataQueryStreamPart,
EvDataQueryStreamPartAck,
EvRecompileRequest,
+ EvScriptRequest,
+ EvScriptResponse,
+ EvFetchScriptResultsRequest,
+ EvFetchScriptResultsResponse,
};
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt
index 1c0bbdf2ff..24f5c082b8 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-cms-console
core-kqp-common
core-kqp-counters
+ core-kqp-run_script_actor
ydb-core-mind
ydb-core-protos
)
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
index 60d88c62cc..25cdfec4e6 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
@@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-cms-console
core-kqp-common
core-kqp-counters
+ core-kqp-run_script_actor
ydb-core-mind
ydb-core-protos
)
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux.txt
index 60d88c62cc..25cdfec4e6 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux.txt
@@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-cms-console
core-kqp-common
core-kqp-counters
+ core-kqp-run_script_actor
ydb-core-mind
ydb-core-protos
)
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index fa7d7fd773..007d00e21f 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -13,6 +13,7 @@
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
+#include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h>
#include <ydb/core/kqp/runtime/kqp_spilling_file.h>
#include <ydb/core/kqp/runtime/kqp_spilling.h>
#include <ydb/core/actorlib_impl/long_timer.h>
@@ -561,6 +562,14 @@ public:
KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "Sent request to target, requestId: " << requestId << ", targetId: " << targetId);
}
+ void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) {
+ const NActors::TActorId actorId = Register(CreateRunScriptActor(ev->Get()->Record));
+ Ydb::TOperationId operationId;
+ operationId.SetKind(Ydb::TOperationId::SCRIPT);
+ NOperationId::AddOptionalValue(operationId, "actor_id", actorId.ToString());
+ Send(ev->Sender, new TEvKqp::TEvScriptResponse(NOperationId::ProtoToString(operationId), actorId.ToString(), Ydb::Query::EXEC_STATUS_STARTING, Ydb::Query::EXEC_MODE_EXECUTE));
+ }
+
void Handle(TEvKqp::TEvCloseSessionRequest::TPtr& ev) {
auto& event = ev->Get()->Record;
auto& request = event.GetRequest();
@@ -994,6 +1003,7 @@ public:
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
hFunc(TEvKqp::TEvQueryRequest, Handle);
+ hFunc(TEvKqp::TEvScriptRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
hFunc(TEvKqp::TEvQueryResponse, ForwardEvent);
hFunc(TEvKqp::TEvProcessResponse, ForwardEvent);
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt b/ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..0359b1c93e
--- /dev/null
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-run_script_actor)
+target_compile_options(core-kqp-run_script_actor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-run_script_actor PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ ydb-core-protos
+ core-kqp-executer_actor
+ api-protos
+)
+target_sources(core-kqp-run_script_actor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+)
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..fb65e2fbd0
--- /dev/null
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-run_script_actor)
+target_compile_options(core-kqp-run_script_actor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-run_script_actor PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ ydb-core-protos
+ core-kqp-executer_actor
+ api-protos
+)
+target_sources(core-kqp-run_script_actor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+)
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux.txt
new file mode 100644
index 0000000000..fb65e2fbd0
--- /dev/null
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-run_script_actor)
+target_compile_options(core-kqp-run_script_actor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-run_script_actor PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ ydb-core-protos
+ core-kqp-executer_actor
+ api-protos
+)
+target_sources(core-kqp-run_script_actor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+)
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.txt b/ydb/core/kqp/run_script_actor/CMakeLists.txt
new file mode 100644
index 0000000000..5bb4faffb4
--- /dev/null
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
new file mode 100644
index 0000000000..12fd725346
--- /dev/null
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -0,0 +1,172 @@
+#include "kqp_run_script_actor.h"
+
+#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/core/protos/issue_id.pb.h>
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <util/generic/size_literals.h>
+
+#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
+#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
+#define LOG_I(stream) LOG_INFO_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
+#define LOG_W(stream) LOG_WARN_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
+#define LOG_E(stream) LOG_ERROR_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
+
+namespace NKikimr::NKqp {
+
+namespace {
+
+constexpr ui64 RESULT_SIZE_LIMIT = 10_MB;
+constexpr int RESULT_ROWS_LIMIT = 1000;
+
+class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
+public:
+ TRunScriptActor(const NKikimrKqp::TEvQueryRequest& request)
+ : Request(request)
+ {}
+
+ static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR";
+
+ void Bootstrap() {
+ Become(&TRunScriptActor::StateFunc);
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ ev->Record = Request;
+
+ NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId());
+
+ if (!Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release())) {
+ Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
+ Finish(Ydb::StatusIds::INTERNAL_ERROR);
+ }
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
+ hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ hFunc(NKqp::TEvKqp::TEvFetchScriptResultsRequest, Handle);
+ )
+
+ void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
+ ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
+ LOG_D("ExecuterActorId: " << ExecuterActorId);
+ }
+
+ void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(RESULT_SIZE_LIMIT);
+
+ LOG_D("Send stream data ack"
+ << ", seqNo: " << ev->Get()->Record.GetSeqNo()
+ << ", to: " << ev->Sender);
+
+ Send(ev->Sender, resp.Release());
+
+ if (!IsFinished() && !IsTruncated()) {
+ MergeResultSet(ev);
+ }
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ auto& record = ev->Get()->Record.GetRef();
+
+ const auto& issueMessage = record.GetResponse().GetQueryIssues();
+ NYql::IssuesFromMessage(issueMessage, Issues);
+
+ Finish(record.GetYdbStatus());
+ }
+
+ void Handle(NKqp::TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) {
+ auto resp = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsResponse>();
+ if (!IsFinished()) {
+ resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
+ resp->Record.AddIssues()->set_message("Results are not ready");
+ } else {
+ if (!ResultSets.empty()) {
+ resp->Record.SetResultSetIndex(0);
+ resp->Record.MutableResultSet()->mutable_columns()->CopyFrom(ResultSets[0].columns());
+
+ const ui64 rowsOffset = ev->Get()->Record.GetRowsOffset();
+ const ui64 rowsLimit = ev->Get()->Record.GetRowsLimit();
+ ui64 rowsAdded = 0;
+ for (i64 row = static_cast<i64>(rowsOffset); row < ResultSets[0].rows_size(); ++row) {
+ if (rowsAdded >= rowsLimit) {
+ resp->Record.MutableResultSet()->set_truncated(true);
+ break;
+ }
+ resp->Record.MutableResultSet()->add_rows()->CopyFrom(ResultSets[0].rows(row));
+ }
+ }
+ resp->Record.SetStatus(Status);
+ for (const auto& issue : Issues) {
+ auto item = resp->Record.add_issues();
+ NYql::IssueToMessage(issue, item);
+ }
+ }
+ Send(ev->Sender, std::move(resp));
+ }
+
+ void MergeResultSet(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ if (ResultSets.empty()) {
+ ResultSets.emplace_back(ev->Get()->Record.GetResultSet());
+ return;
+ }
+ if (ResultSets[0].columns_size() != ev->Get()->Record.GetResultSet().columns_size()) {
+ Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
+ Finish(Ydb::StatusIds::INTERNAL_ERROR);
+ return;
+ }
+ size_t rowsAdded = 0;
+ for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
+ ResultSets[0].add_rows()->Swap(&row);
+ ++rowsAdded;
+ if (ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT) {
+ break;
+ }
+ }
+ if (ev->Get()->Record.GetResultSet().truncated() || ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT || ResultSets[0].ByteSizeLong() >= RESULT_SIZE_LIMIT) {
+ ResultSets[0].set_truncated(true);
+ }
+ LOG_D("Received partial result. Rows added: " << rowsAdded << ". Truncated: " << IsTruncated());
+ }
+
+ void Finish(Ydb::StatusIds::StatusCode status) {
+ Status = status;
+ }
+
+ bool IsFinished() const {
+ return Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ }
+
+ bool IsTruncated() const {
+ return !ResultSets.empty() && ResultSets[0].truncated();
+ }
+
+private:
+ const NKikimrKqp::TEvQueryRequest Request;
+
+ NActors::TActorId ExecuterActorId;
+
+ // Result
+ NYql::TIssues Issues;
+ Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ std::vector<Ydb::ResultSet> ResultSets;
+};
+
+} // namespace
+
+NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request) {
+ return new TRunScriptActor(request);
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
new file mode 100644
index 0000000000..5f0ee7241b
--- /dev/null
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <ydb/core/protos/kqp.pb.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr::NKqp {
+
+struct TEvKqpRunScriptActor {
+};
+
+NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request);
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index d01e88f320..5db75604d8 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -90,10 +90,27 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
- auto result = db.ExecuteScript(R"(
+ auto executeScrptsResult = db.ExecuteScript(R"(
SELECT 42
)").ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.Status().GetStatus(), EStatus::SUCCESS, result.Status().GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL_C(executeScrptsResult.Status().GetStatus(), EStatus::SUCCESS, executeScrptsResult.Status().GetIssues().ToString());
+ UNIT_ASSERT(executeScrptsResult.Metadata().ExecutionId);
+
+ TMaybe<TFetchScriptResultsResult> results;
+ do {
+ Sleep(TDuration::MilliSeconds(50));
+ TAsyncFetchScriptResultsResult future = db.FetchScriptResults(executeScrptsResult.Metadata().ExecutionId);
+ results.ConstructInPlace(future.ExtractValueSync());
+ if (!results->IsSuccess()) {
+ UNIT_ASSERT_C(results->GetStatus() == NYdb::EStatus::BAD_REQUEST, results->GetStatus());
+ UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "Results are not ready");
+ }
+ } while (!results->HasResultSet());
+ TResultSetParser resultSet(results->ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42);
}
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 58c9ba8b75..5220ae755b 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -593,3 +593,15 @@ message TKqpStreamLookupSettings {
optional bool ImmediateTx = 6;
repeated string LookupKeyColumns = 7;
}
+
+message TEvFetchScriptResultsRequest {
+ optional uint64 RowsOffset = 1;
+ optional uint64 RowsLimit = 2;
+}
+
+message TEvFetchScriptResultsResponse {
+ optional Ydb.StatusIds.StatusCode Status = 1;
+ repeated Ydb.Issue.IssueMessage Issues = 2;
+ optional uint64 ResultSetIndex = 3;
+ optional Ydb.ResultSet ResultSet = 4;
+}
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
index de36842701..b8c5c4f118 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
@@ -65,6 +65,53 @@ public:
return promise.GetFuture();
}
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings) {
+ using namespace Ydb::Query;
+ auto request = MakeRequest<FetchScriptResultsRequest>();
+ request.set_execution_id(executionId);
+ if (settings.FetchToken_) {
+ request.set_fetch_token(settings.FetchToken_);
+ }
+ request.set_rows_offset(settings.RowsOffset_);
+ request.set_rows_limit(settings.RowsLimit_);
+
+ auto promise = NThreading::NewPromise<TFetchScriptResultsResult>();
+
+ auto extractor = [promise]
+ (FetchScriptResultsResponse* response, TPlainStatus status) mutable {
+ if (response) {
+ NYql::TIssues opIssues;
+ NYql::IssuesFromMessage(response->issues(), opIssues);
+ TStatus st(static_cast<EStatus>(response->status()), std::move(opIssues));
+
+ promise.SetValue(
+ TFetchScriptResultsResult(
+ std::move(st),
+ TResultSet(std::move(*response->mutable_result_set())),
+ response->result_set_index(),
+ response->next_fetch_token()
+ )
+ );
+ } else {
+ TStatus st(std::move(status));
+ promise.SetValue(TFetchScriptResultsResult(std::move(st)));
+ }
+ };
+
+ TRpcRequestSettings rpcSettings;
+ rpcSettings.ClientTimeout = TDuration::Seconds(60);
+
+ Connections_->Run<V1::QueryService, FetchScriptResultsRequest, FetchScriptResultsResponse>(
+ std::move(request),
+ extractor,
+ &V1::QueryService::Stub::AsyncFetchScriptResults,
+ DbDriverState_,
+ rpcSettings,
+ TEndpointKey());
+
+ return promise.GetFuture();
+ }
+
private:
TClientSettings Settings_;
};
@@ -92,4 +139,10 @@ TAsyncExecuteScriptResult TQueryClient::ExecuteScript(const TString& script,
return Impl_->ExecuteScript(script, settings);
}
+TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& executionId,
+ const TFetchScriptResultsSettings& settings)
+{
+ return Impl_->FetchScriptResults(executionId, settings);
+}
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
index fe702eb300..e3d073f5ad 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
@@ -30,6 +30,9 @@ public:
TAsyncExecuteScriptResult ExecuteScript(const TString& script,
const TExecuteScriptSettings& settings = TExecuteScriptSettings());
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId,
+ const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings());
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl_;
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
index bf1a8e517b..146847bdba 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
@@ -14,4 +14,13 @@ TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const {
return ResultSets_[resultIndex];
}
+TExecuteScriptResult::TExecuteScriptResult(TStatus&& status, Ydb::Operations::Operation&& operation)
+ : TOperation(std::move(status), std::move(operation))
+{
+ Ydb::Query::ExecuteScriptMetadata metadata;
+ GetProto().metadata().UnpackTo(&metadata);
+
+ Metadata_.ExecutionId = metadata.execution_id();
+}
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
index 9e40a2ec72..6a442d8ee0 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
@@ -3,6 +3,7 @@
#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_types/request_settings.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
@@ -85,9 +86,54 @@ struct TExecuteScriptSettings : public TOperationRequestSettings<TExecuteScriptS
class TExecuteScriptResult : public TOperation {
public:
+ struct TMetadata {
+ TString ExecutionId;
+ };
+
using TOperation::TOperation;
+ TExecuteScriptResult(TStatus&& status, Ydb::Operations::Operation&& operation);
+
+ const TMetadata& Metadata() const {
+ return Metadata_;
+ }
+
+private:
+ TMetadata Metadata_;
};
using TAsyncExecuteScriptResult = NThreading::TFuture<TExecuteScriptResult>;
+struct TFetchScriptResultsSettings : public TRequestSettings<TFetchScriptResultsSettings> {
+ FLUENT_SETTING(TString, FetchToken);
+ FLUENT_SETTING_DEFAULT(ui64, RowsOffset, 0);
+ FLUENT_SETTING_DEFAULT(ui64, RowsLimit, 1000);
+};
+
+class TFetchScriptResultsResult : public TStatus {
+public:
+ bool HasResultSet() const { return ResultSet_.Defined(); }
+ ui64 GetResultSetIndex() const { return ResultSetIndex_; }
+ const TResultSet& GetResultSet() const { return *ResultSet_; }
+ TResultSet ExtractResultSet() { return std::move(*ResultSet_); }
+ const TString& NextPageToken() const { return NextPageToken_; }
+
+ explicit TFetchScriptResultsResult(TStatus&& status)
+ : TStatus(std::move(status))
+ {}
+
+ TFetchScriptResultsResult(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, const TString& nextPageToken)
+ : TStatus(std::move(status))
+ , ResultSet_(std::move(resultSet))
+ , ResultSetIndex_(resultSetIndex)
+ , NextPageToken_(nextPageToken)
+ {}
+
+private:
+ TMaybe<TResultSet> ResultSet_;
+ i64 ResultSetIndex_ = 0;
+ TString NextPageToken_;
+};
+
+using TAsyncFetchScriptResultsResult = NThreading::TFuture<TFetchScriptResultsResult>;
+
} // namespace NYdb::NQuery
diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp
index d41fbe743c..698417e203 100644
--- a/ydb/services/ydb/ydb_query.cpp
+++ b/ydb/services/ydb/ydb_query.cpp
@@ -34,6 +34,12 @@ void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation>
(ctx, &DoExecuteScriptRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
})
+
+ ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, {
+ ActorSystem_->Send(GRpcRequestProxyId_,
+ new TGrpcRequestNoOperationCall<FetchScriptResultsRequest, FetchScriptResultsResponse>
+ (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
+ })
#undef ADD_REQUEST
}