diff options
32 files changed, 795 insertions, 59 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt index 927b117349..3dc6b63a01 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin.txt @@ -126,4 +126,5 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp ) diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index d4495d7980..bb6c3dc35a 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -127,4 +127,5 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp ) diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt index d4495d7980..bb6c3dc35a 100644 --- a/ydb/core/grpc_services/CMakeLists.linux.txt +++ b/ydb/core/grpc_services/CMakeLists.linux.txt @@ -127,4 +127,5 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp ) diff --git a/ydb/core/grpc_services/query/query_helpers.h b/ydb/core/grpc_services/query/query_helpers.h new file mode 100644 index 0000000000..c706ee3416 --- /dev/null +++ b/ydb/core/grpc_services/query/query_helpers.h @@ -0,0 +1,89 @@ +#pragma once +#include <ydb/core/base/kikimr_issue.h> +#include <ydb/core/grpc_services/rpc_kqp_base.h> +#include <ydb/public/api/protos/draft/ydb_query.pb.h> + +#include <utility> + +namespace NKikimr::NGRpcService { + +namespace NQueryHelpersPrivate { + +inline bool FillQueryContent(const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) { + switch (req.query_case()) { + case Ydb::Query::ExecuteQueryRequest::kQueryContent: + if (!CheckQuery(req.query_content().text(), issues)) { + return false; + } + + kqpRequest.MutableRequest()->SetQuery(req.query_content().text()); + return true; + + default: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); + return false; + } +} + +inline bool FillQueryContent(const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) { + switch (req.script_case()) { + case Ydb::Query::ExecuteScriptRequest::kScriptContent: + if (!CheckQuery(req.script_content().text(), issues)) { + return false; + } + + kqpRequest.MutableRequest()->SetQuery(req.script_content().text()); + return true; + + case Ydb::Query::ExecuteScriptRequest::kScriptId: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Execution by script id is not supported yet")); + return false; + + default: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); + return false; + } +} + +inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteQueryRequest&) { + return NKikimrKqp::QUERY_TYPE_SQL_QUERY; +} + +inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteScriptRequest&) { + return NKikimrKqp::QUERY_TYPE_SQL_QUERY; // TODO: make new query type +} + +} // namespace NQueryHelpersPrivate + +template <class TRequestProto> +std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( + const TRequestProto& req, NKikimrKqp::TEvQueryRequest& kqpRequest) +{ + kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); + switch (req.exec_mode()) { + case Ydb::Query::EXEC_MODE_EXECUTE: + kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + break; + default: { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode")); + return {Ydb::StatusIds::BAD_REQUEST, issues}; + } + } + + kqpRequest.MutableRequest()->SetType(NQueryHelpersPrivate::GetQueryType(req)); + kqpRequest.MutableRequest()->SetKeepSession(false); + + // TODO: Use tx control from request. + kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + NYql::TIssues issues; + if (!NQueryHelpersPrivate::FillQueryContent(req, kqpRequest, issues)) { + return {Ydb::StatusIds::BAD_REQUEST, issues}; + } + + return {Ydb::StatusIds::SUCCESS, {}}; +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index e34e7f53ef..96271f2c68 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -1,4 +1,5 @@ #include "service_query.h" +#include "query_helpers.h" #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> @@ -19,49 +20,6 @@ using namespace NActors; using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQueryRequest, Ydb::Query::ExecuteQueryResponsePart>; -std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( - const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest) -{ - kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); - switch (req.exec_mode()) { - case Ydb::Query::EXEC_MODE_EXECUTE: - kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - break; - default: { - NYql::TIssues issues; - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode")); - return {Ydb::StatusIds::BAD_REQUEST, issues}; - } - } - - kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_QUERY); - kqpRequest.MutableRequest()->SetKeepSession(false); - - // TODO: Use tx control from request. - kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); - - switch (req.query_case()) { - case Ydb::Query::ExecuteQueryRequest::kQueryContent: { - NYql::TIssues issues; - if (!CheckQuery(req.query_content().text(), issues)) { - return {Ydb::StatusIds::BAD_REQUEST, issues}; - } - - kqpRequest.MutableRequest()->SetQuery(req.query_content().text()); - break; - } - - default: { - NYql::TIssues issues; - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); - return {Ydb::StatusIds::BAD_REQUEST, issues}; - } - } - - return {Ydb::StatusIds::SUCCESS, {}}; -} - class RpcFlowControlState { public: RpcFlowControlState(ui64 inflightLimitBytes) diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 83f3bd84b6..9ad4b5a033 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -1,13 +1,16 @@ #include "service_query.h" +#include "query_helpers.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/kikimr_issue.h> #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/rpc_kqp_base.h> +#include <ydb/core/kqp/common/kqp.h> #include <ydb/public/api/protos/draft/ydb_query.pb.h> #include <ydb/public/lib/operation_id/operation_id.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> namespace NKikimr::NGRpcService { @@ -29,21 +32,57 @@ public: {} void Bootstrap() { - Ydb::Operations::Operation operation; - operation.set_id(GetOperationId()); NYql::TIssues issues; - issues.AddIssue("Execute script started"); - Reply(Ydb::StatusIds::SUCCESS, std::move(operation), issues); + Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS; + if (auto scriptRequest = MakeScriptRequest(issues, status)) { + if (Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), scriptRequest.Release())) { + Become(&TExecuteScriptRPC::StateFunc); + } else { + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); + Reply(Ydb::StatusIds::INTERNAL_ERROR, issues); + } + } else { + Reply(status, issues); + } } - TString GetOperationId() const { - Ydb::TOperationId operationId; - operationId.SetKind(Ydb::TOperationId::SCRIPT); - NOperationId::AddOptionalValue(operationId, "id", "fake_execute_script_id"); - return NOperationId::ProtoToString(operationId); +private: + STRICT_STFUNC(StateFunc, + hFunc(NKqp::TEvKqp::TEvScriptResponse, Handle) + ) + + void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) { + Ydb::Operations::Operation operation; + operation.set_id(ev->Get()->OperationId); + Ydb::Query::ExecuteScriptMetadata metadata; + metadata.set_execution_id(ev->Get()->ExecutionId); + metadata.set_exec_status(ev->Get()->ExecStatus); + metadata.set_exec_mode(ev->Get()->ExecMode); + operation.mutable_metadata()->PackFrom(metadata); + Reply(ev->Get()->Status, std::move(operation), ev->Get()->Issues); + } + + THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const { + const auto* req = Request_->GetProtoRequest(); + const auto traceId = Request_->GetTraceId(); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>(); + + SetAuthToken(ev, *Request_); + SetDatabase(ev, *Request_); + SetRlPath(ev, *Request_); + + if (traceId) { + ev->Record.SetTraceId(traceId.GetRef()); + } + + std::tie(status, issues) = FillKqpRequest(*req, ev->Record); + if (status != Ydb::StatusIds::SUCCESS) { + return nullptr; + } + return ev; } -private: void Reply(Ydb::StatusIds::StatusCode status, Ydb::Operations::Operation&& result, const NYql::TIssues& issues = {}) { LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::RPC_REQUEST, "Execute script, status: " << Ydb::StatusIds::StatusCode_Name(status) << (issues ? ". Issues: " : "") << issues.ToOneLineString()); @@ -64,6 +103,12 @@ private: PassAway(); } + void Reply(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { + Ydb::Operations::Operation result; + result.set_ready(true); + Reply(status, std::move(result), issues); + } + private: std::unique_ptr<TEvExecuteScriptRequest> Request_; }; diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp new file mode 100644 index 0000000000..c4066df2d4 --- /dev/null +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -0,0 +1,159 @@ +#include "service_query.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/kikimr_issue.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/public/api/protos/draft/ydb_query.pb.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::FetchScriptResultsRequest, + Ydb::Query::FetchScriptResultsResponse>; + +constexpr i64 MAX_ROWS_LIMIT = 1000; + +class TFetchScriptResultsRPC : public TActorBootstrapped<TFetchScriptResultsRPC> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_REQ; + } + + TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request) + : Request_(request) + {} + + void Bootstrap() { + const auto* req = Request_->GetProtoRequest(); + if (!req) { + Reply(Ydb::StatusIds::INTERNAL_ERROR, "Internal error"); + return; + } + + const TString& executionId = req->execution_id(); + NActors::TActorId runScriptActor; + if (!runScriptActor.Parse(executionId.data(), executionId.size())) { + Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect execution id"); + return; + } + + if (req->rows_limit() <= 0) { + Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows limit"); + return; + } + + if (req->rows_offset() < 0) { + Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows offset"); + return; + } + + if (req->rows_limit() > MAX_ROWS_LIMIT) { + Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Rows limit is too large. Values <= " << MAX_ROWS_LIMIT << " are allowed"); + return; + } + + auto ev = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>(); + ev->Record.SetRowsOffset(req->rows_offset()); + ev->Record.SetRowsLimit(req->rows_limit()); + + ui64 flags = IEventHandle::FlagTrackDelivery; + if (runScriptActor.NodeId() != SelfId().NodeId()) { + flags |= IEventHandle::FlagSubscribeOnSession; + SubscribedOnSession = runScriptActor.NodeId(); + } + Send(runScriptActor, std::move(ev), flags); + + Become(&TFetchScriptResultsRPC::StateFunc); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle); + hFunc(NActors::TEvents::TEvUndelivered, Handle); + hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); + ) + + void Handle(NKqp::TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) { + Ydb::Query::FetchScriptResultsResponse resp; + resp.set_status(ev->Get()->Record.GetStatus()); + resp.mutable_issues()->Swap(ev->Get()->Record.MutableIssues()); + resp.set_result_set_index(static_cast<i64>(ev->Get()->Record.GetResultSetIndex())); + if (ev->Get()->Record.HasResultSet()) { + resp.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet()); + } + Reply(resp.status(), std::move(resp)); + } + + void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { + if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { + Reply(Ydb::StatusIds::NOT_FOUND, "No such execution"); + } else { + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + } + } + + void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + } + + void PassAway() override { + if (SubscribedOnSession) { + Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe()); + } + TActorBootstrapped<TFetchScriptResultsRPC>::PassAway(); + } + + void Reply(Ydb::StatusIds::StatusCode status, Ydb::Query::FetchScriptResultsResponse&& result, const NYql::TIssues& issues = {}) { + LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::RPC_REQUEST, "Fetch script results, status: " + << Ydb::StatusIds::StatusCode_Name(status) << (issues ? ". Issues: " : "") << issues.ToOneLineString()); + + for (const auto& issue : issues) { + auto item = result.add_issues(); + NYql::IssueToMessage(issue, item); + } + + result.set_status(status); + + TString serializedResult; + Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult); + + Request_->SendSerializedResult(std::move(serializedResult), status); + + PassAway(); + } + + void Reply(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { + Ydb::Query::FetchScriptResultsResponse result; + Reply(status, std::move(result), issues); + } + + void Reply(Ydb::StatusIds::StatusCode status, const TString& errorText) { + NYql::TIssues issues; + issues.AddIssue(errorText); + Reply(status, issues); + } + +private: + std::unique_ptr<TEvFetchScriptResultsRequest> Request_; + TMaybe<ui32> SubscribedOnSession; +}; + +} // namespace + +void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { + Y_UNUSED(f); + auto* req = dynamic_cast<TEvFetchScriptResultsRequest*>(p.release()); + Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + TActivationContext::AsActorContext().Register(new TFetchScriptResultsRPC(req)); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/service_query.h b/ydb/core/grpc_services/query/service_query.h index 2a02f52ea0..08924d84c3 100644 --- a/ydb/core/grpc_services/query/service_query.h +++ b/ydb/core/grpc_services/query/service_query.h @@ -10,5 +10,6 @@ class IFacilityProvider; void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); void DoExecuteScriptRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); +void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index a7eab0d176..170dd68c95 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -88,7 +88,7 @@ public: NKikimrKqp::EQueryType queryType; switch (query.query_case()) { - case Query::kYqlText: { + case Ydb::Table::Query::kYqlText: { NYql::TIssues issues; if (!CheckQuery(query.yql_text(), issues)) { return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); @@ -99,7 +99,7 @@ public: break; } - case Query::kId: { + case Ydb::Table::Query::kId: { if (query.id().empty()) { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty query id")); diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 8223279aaa..e1d19dac86 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -116,7 +116,7 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp:: auto& query = req.query(); switch (query.query_case()) { - case Query::kYqlText: { + case Ydb::Table::Query::kYqlText: { NYql::TIssues issues; if (!CheckQuery(query.yql_text(), issues)) { error = TParseRequestError(Ydb::StatusIds::BAD_REQUEST, issues); @@ -127,7 +127,7 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp:: break; } - case Query::kId: { + case Ydb::Table::Query::kId: { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Specifying query by ID not supported in scan execution.")); diff --git a/ydb/core/kqp/CMakeLists.darwin.txt b/ydb/core/kqp/CMakeLists.darwin.txt index 3eb7972cab..106d4102b4 100644 --- a/ydb/core/kqp/CMakeLists.darwin.txt +++ b/ydb/core/kqp/CMakeLists.darwin.txt @@ -20,6 +20,7 @@ add_subdirectory(provider) add_subdirectory(proxy_service) add_subdirectory(query_compiler) add_subdirectory(rm_service) +add_subdirectory(run_script_actor) add_subdirectory(runtime) add_subdirectory(session_actor) add_subdirectory(topics) diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt index c539f81bfa..071a51d8ab 100644 --- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ add_subdirectory(provider) add_subdirectory(proxy_service) add_subdirectory(query_compiler) add_subdirectory(rm_service) +add_subdirectory(run_script_actor) add_subdirectory(runtime) add_subdirectory(session_actor) add_subdirectory(topics) diff --git a/ydb/core/kqp/CMakeLists.linux.txt b/ydb/core/kqp/CMakeLists.linux.txt index c539f81bfa..071a51d8ab 100644 --- a/ydb/core/kqp/CMakeLists.linux.txt +++ b/ydb/core/kqp/CMakeLists.linux.txt @@ -20,6 +20,7 @@ add_subdirectory(provider) add_subdirectory(proxy_service) add_subdirectory(query_compiler) add_subdirectory(rm_service) +add_subdirectory(run_script_actor) add_subdirectory(runtime) add_subdirectory(session_actor) add_subdirectory(topics) diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 477ae39811..09d7ae7220 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -6,6 +6,7 @@ #include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/public/api/protos/draft/ydb_query.pb.h> #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/cancelation/cancelation.h> @@ -754,7 +755,43 @@ struct TEvKqp { {} }; + struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> { + TEvScriptRequest() = default; + + mutable NKikimrKqp::TEvQueryRequest Record; + }; + + struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> { + TEvScriptResponse(TString operationId, TString executionId, Ydb::Query::ExecStatus execStatus, Ydb::Query::ExecMode execMode) + : Status(Ydb::StatusIds::SUCCESS) + , OperationId(std::move(operationId)) + , ExecutionId(std::move(executionId)) + , ExecStatus(execStatus) + , ExecMode(execMode) + {} + + TEvScriptResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(std::move(issues)) + , ExecStatus(Ydb::Query::EXEC_STATUS_FAILED) + , ExecMode(Ydb::Query::EXEC_MODE_UNSPECIFIED) + {} + + const Ydb::StatusIds::StatusCode Status; + const NYql::TIssues Issues; + const TString OperationId; + const TString ExecutionId; + const Ydb::Query::ExecStatus ExecStatus; + const Ydb::Query::ExecMode ExecMode; + }; + using TEvAbortExecution = NYql::NDq::TEvDq::TEvAbortExecution; + + struct TEvFetchScriptResultsRequest : public TEventPB<TEvFetchScriptResultsRequest, NKikimrKqp::TEvFetchScriptResultsRequest, TKqpEvents::EvFetchScriptResultsRequest> { + }; + + struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> { + }; }; class TKqpRequestInfo { diff --git a/ydb/core/kqp/common/kqp_event_ids.h b/ydb/core/kqp/common/kqp_event_ids.h index 76bf41bffe..05012cbd21 100644 --- a/ydb/core/kqp/common/kqp_event_ids.h +++ b/ydb/core/kqp/common/kqp_event_ids.h @@ -30,6 +30,10 @@ struct TKqpEvents { EvDataQueryStreamPart, EvDataQueryStreamPartAck, EvRecompileRequest, + EvScriptRequest, + EvScriptResponse, + EvFetchScriptResultsRequest, + EvFetchScriptResultsResponse, }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt index 1c0bbdf2ff..24f5c082b8 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-cms-console core-kqp-common core-kqp-counters + core-kqp-run_script_actor ydb-core-mind ydb-core-protos ) diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt index 60d88c62cc..25cdfec4e6 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-cms-console core-kqp-common core-kqp-counters + core-kqp-run_script_actor ydb-core-mind ydb-core-protos ) diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux.txt index 60d88c62cc..25cdfec4e6 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-cms-console core-kqp-common core-kqp-counters + core-kqp-run_script_actor ydb-core-mind ydb-core-protos ) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index fa7d7fd773..007d00e21f 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -13,6 +13,7 @@ #include <ydb/core/kqp/session_actor/kqp_worker_common.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> +#include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h> #include <ydb/core/kqp/runtime/kqp_spilling_file.h> #include <ydb/core/kqp/runtime/kqp_spilling.h> #include <ydb/core/actorlib_impl/long_timer.h> @@ -561,6 +562,14 @@ public: KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "Sent request to target, requestId: " << requestId << ", targetId: " << targetId); } + void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) { + const NActors::TActorId actorId = Register(CreateRunScriptActor(ev->Get()->Record)); + Ydb::TOperationId operationId; + operationId.SetKind(Ydb::TOperationId::SCRIPT); + NOperationId::AddOptionalValue(operationId, "actor_id", actorId.ToString()); + Send(ev->Sender, new TEvKqp::TEvScriptResponse(NOperationId::ProtoToString(operationId), actorId.ToString(), Ydb::Query::EXEC_STATUS_STARTING, Ydb::Query::EXEC_MODE_EXECUTE)); + } + void Handle(TEvKqp::TEvCloseSessionRequest::TPtr& ev) { auto& event = ev->Get()->Record; auto& request = event.GetRequest(); @@ -994,6 +1003,7 @@ public: hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); hFunc(TEvKqp::TEvQueryRequest, Handle); + hFunc(TEvKqp::TEvScriptRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, Handle); hFunc(TEvKqp::TEvQueryResponse, ForwardEvent); hFunc(TEvKqp::TEvProcessResponse, ForwardEvent); diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt b/ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt new file mode 100644 index 0000000000..0359b1c93e --- /dev/null +++ b/ydb/core/kqp/run_script_actor/CMakeLists.darwin.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-run_script_actor) +target_compile_options(core-kqp-run_script_actor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-run_script_actor PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + ydb-core-protos + core-kqp-executer_actor + api-protos +) +target_sources(core-kqp-run_script_actor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +) diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..fb65e2fbd0 --- /dev/null +++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-run_script_actor) +target_compile_options(core-kqp-run_script_actor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-run_script_actor PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + ydb-core-protos + core-kqp-executer_actor + api-protos +) +target_sources(core-kqp-run_script_actor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +) diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux.txt new file mode 100644 index 0000000000..fb65e2fbd0 --- /dev/null +++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-run_script_actor) +target_compile_options(core-kqp-run_script_actor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-run_script_actor PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + ydb-core-protos + core-kqp-executer_actor + api-protos +) +target_sources(core-kqp-run_script_actor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +) diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.txt b/ydb/core/kqp/run_script_actor/CMakeLists.txt new file mode 100644 index 0000000000..5bb4faffb4 --- /dev/null +++ b/ydb/core/kqp/run_script_actor/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp new file mode 100644 index 0000000000..12fd725346 --- /dev/null +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -0,0 +1,172 @@ +#include "kqp_run_script_actor.h" + +#include <ydb/core/base/kikimr_issue.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/core/protos/issue_id.pb.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#include <util/generic/size_literals.h> + +#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); +#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); +#define LOG_I(stream) LOG_INFO_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); +#define LOG_W(stream) LOG_WARN_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); +#define LOG_E(stream) LOG_ERROR_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); + +namespace NKikimr::NKqp { + +namespace { + +constexpr ui64 RESULT_SIZE_LIMIT = 10_MB; +constexpr int RESULT_ROWS_LIMIT = 1000; + +class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { +public: + TRunScriptActor(const NKikimrKqp::TEvQueryRequest& request) + : Request(request) + {} + + static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR"; + + void Bootstrap() { + Become(&TRunScriptActor::StateFunc); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + ev->Record = Request; + + NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId()); + + if (!Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release())) { + Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); + Finish(Ydb::StatusIds::INTERNAL_ERROR); + } + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); + hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); + hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + hFunc(NKqp::TEvKqp::TEvFetchScriptResultsRequest, Handle); + ) + + void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { + ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); + LOG_D("ExecuterActorId: " << ExecuterActorId); + } + + void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(RESULT_SIZE_LIMIT); + + LOG_D("Send stream data ack" + << ", seqNo: " << ev->Get()->Record.GetSeqNo() + << ", to: " << ev->Sender); + + Send(ev->Sender, resp.Release()); + + if (!IsFinished() && !IsTruncated()) { + MergeResultSet(ev); + } + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + auto& record = ev->Get()->Record.GetRef(); + + const auto& issueMessage = record.GetResponse().GetQueryIssues(); + NYql::IssuesFromMessage(issueMessage, Issues); + + Finish(record.GetYdbStatus()); + } + + void Handle(NKqp::TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) { + auto resp = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsResponse>(); + if (!IsFinished()) { + resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); + resp->Record.AddIssues()->set_message("Results are not ready"); + } else { + if (!ResultSets.empty()) { + resp->Record.SetResultSetIndex(0); + resp->Record.MutableResultSet()->mutable_columns()->CopyFrom(ResultSets[0].columns()); + + const ui64 rowsOffset = ev->Get()->Record.GetRowsOffset(); + const ui64 rowsLimit = ev->Get()->Record.GetRowsLimit(); + ui64 rowsAdded = 0; + for (i64 row = static_cast<i64>(rowsOffset); row < ResultSets[0].rows_size(); ++row) { + if (rowsAdded >= rowsLimit) { + resp->Record.MutableResultSet()->set_truncated(true); + break; + } + resp->Record.MutableResultSet()->add_rows()->CopyFrom(ResultSets[0].rows(row)); + } + } + resp->Record.SetStatus(Status); + for (const auto& issue : Issues) { + auto item = resp->Record.add_issues(); + NYql::IssueToMessage(issue, item); + } + } + Send(ev->Sender, std::move(resp)); + } + + void MergeResultSet(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { + if (ResultSets.empty()) { + ResultSets.emplace_back(ev->Get()->Record.GetResultSet()); + return; + } + if (ResultSets[0].columns_size() != ev->Get()->Record.GetResultSet().columns_size()) { + Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); + Finish(Ydb::StatusIds::INTERNAL_ERROR); + return; + } + size_t rowsAdded = 0; + for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) { + ResultSets[0].add_rows()->Swap(&row); + ++rowsAdded; + if (ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT) { + break; + } + } + if (ev->Get()->Record.GetResultSet().truncated() || ResultSets[0].rows_size() >= RESULT_ROWS_LIMIT || ResultSets[0].ByteSizeLong() >= RESULT_SIZE_LIMIT) { + ResultSets[0].set_truncated(true); + } + LOG_D("Received partial result. Rows added: " << rowsAdded << ". Truncated: " << IsTruncated()); + } + + void Finish(Ydb::StatusIds::StatusCode status) { + Status = status; + } + + bool IsFinished() const { + return Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + } + + bool IsTruncated() const { + return !ResultSets.empty() && ResultSets[0].truncated(); + } + +private: + const NKikimrKqp::TEvQueryRequest Request; + + NActors::TActorId ExecuterActorId; + + // Result + NYql::TIssues Issues; + Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + std::vector<Ydb::ResultSet> ResultSets; +}; + +} // namespace + +NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request) { + return new TRunScriptActor(request); +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h new file mode 100644 index 0000000000..5f0ee7241b --- /dev/null +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h @@ -0,0 +1,14 @@ +#pragma once + +#include <ydb/core/protos/kqp.pb.h> + +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NKqp { + +struct TEvKqpRunScriptActor { +}; + +NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request); + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index d01e88f320..5db75604d8 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -90,10 +90,27 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); - auto result = db.ExecuteScript(R"( + auto executeScrptsResult = db.ExecuteScript(R"( SELECT 42 )").ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.Status().GetStatus(), EStatus::SUCCESS, result.Status().GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(executeScrptsResult.Status().GetStatus(), EStatus::SUCCESS, executeScrptsResult.Status().GetIssues().ToString()); + UNIT_ASSERT(executeScrptsResult.Metadata().ExecutionId); + + TMaybe<TFetchScriptResultsResult> results; + do { + Sleep(TDuration::MilliSeconds(50)); + TAsyncFetchScriptResultsResult future = db.FetchScriptResults(executeScrptsResult.Metadata().ExecutionId); + results.ConstructInPlace(future.ExtractValueSync()); + if (!results->IsSuccess()) { + UNIT_ASSERT_C(results->GetStatus() == NYdb::EStatus::BAD_REQUEST, results->GetStatus()); + UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "Results are not ready"); + } + } while (!results->HasResultSet()); + TResultSetParser resultSet(results->ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42); } } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 58c9ba8b75..5220ae755b 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -593,3 +593,15 @@ message TKqpStreamLookupSettings { optional bool ImmediateTx = 6; repeated string LookupKeyColumns = 7; } + +message TEvFetchScriptResultsRequest { + optional uint64 RowsOffset = 1; + optional uint64 RowsLimit = 2; +} + +message TEvFetchScriptResultsResponse { + optional Ydb.StatusIds.StatusCode Status = 1; + repeated Ydb.Issue.IssueMessage Issues = 2; + optional uint64 ResultSetIndex = 3; + optional Ydb.ResultSet ResultSet = 4; +} diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp index de36842701..b8c5c4f118 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp @@ -65,6 +65,53 @@ public: return promise.GetFuture(); } + TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings) { + using namespace Ydb::Query; + auto request = MakeRequest<FetchScriptResultsRequest>(); + request.set_execution_id(executionId); + if (settings.FetchToken_) { + request.set_fetch_token(settings.FetchToken_); + } + request.set_rows_offset(settings.RowsOffset_); + request.set_rows_limit(settings.RowsLimit_); + + auto promise = NThreading::NewPromise<TFetchScriptResultsResult>(); + + auto extractor = [promise] + (FetchScriptResultsResponse* response, TPlainStatus status) mutable { + if (response) { + NYql::TIssues opIssues; + NYql::IssuesFromMessage(response->issues(), opIssues); + TStatus st(static_cast<EStatus>(response->status()), std::move(opIssues)); + + promise.SetValue( + TFetchScriptResultsResult( + std::move(st), + TResultSet(std::move(*response->mutable_result_set())), + response->result_set_index(), + response->next_fetch_token() + ) + ); + } else { + TStatus st(std::move(status)); + promise.SetValue(TFetchScriptResultsResult(std::move(st))); + } + }; + + TRpcRequestSettings rpcSettings; + rpcSettings.ClientTimeout = TDuration::Seconds(60); + + Connections_->Run<V1::QueryService, FetchScriptResultsRequest, FetchScriptResultsResponse>( + std::move(request), + extractor, + &V1::QueryService::Stub::AsyncFetchScriptResults, + DbDriverState_, + rpcSettings, + TEndpointKey()); + + return promise.GetFuture(); + } + private: TClientSettings Settings_; }; @@ -92,4 +139,10 @@ TAsyncExecuteScriptResult TQueryClient::ExecuteScript(const TString& script, return Impl_->ExecuteScript(script, settings); } +TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& executionId, + const TFetchScriptResultsSettings& settings) +{ + return Impl_->FetchScriptResults(executionId, settings); +} + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h index fe702eb300..e3d073f5ad 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h @@ -30,6 +30,9 @@ public: TAsyncExecuteScriptResult ExecuteScript(const TString& script, const TExecuteScriptSettings& settings = TExecuteScriptSettings()); + TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, + const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings()); + private: class TImpl; std::shared_ptr<TImpl> Impl_; diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp index bf1a8e517b..146847bdba 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp @@ -14,4 +14,13 @@ TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const { return ResultSets_[resultIndex]; } +TExecuteScriptResult::TExecuteScriptResult(TStatus&& status, Ydb::Operations::Operation&& operation) + : TOperation(std::move(status), std::move(operation)) +{ + Ydb::Query::ExecuteScriptMetadata metadata; + GetProto().metadata().UnpackTo(&metadata); + + Metadata_.ExecutionId = metadata.execution_id(); +} + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h index 9e40a2ec72..6a442d8ee0 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h @@ -3,6 +3,7 @@ #include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_types/request_settings.h> #include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> @@ -85,9 +86,54 @@ struct TExecuteScriptSettings : public TOperationRequestSettings<TExecuteScriptS class TExecuteScriptResult : public TOperation { public: + struct TMetadata { + TString ExecutionId; + }; + using TOperation::TOperation; + TExecuteScriptResult(TStatus&& status, Ydb::Operations::Operation&& operation); + + const TMetadata& Metadata() const { + return Metadata_; + } + +private: + TMetadata Metadata_; }; using TAsyncExecuteScriptResult = NThreading::TFuture<TExecuteScriptResult>; +struct TFetchScriptResultsSettings : public TRequestSettings<TFetchScriptResultsSettings> { + FLUENT_SETTING(TString, FetchToken); + FLUENT_SETTING_DEFAULT(ui64, RowsOffset, 0); + FLUENT_SETTING_DEFAULT(ui64, RowsLimit, 1000); +}; + +class TFetchScriptResultsResult : public TStatus { +public: + bool HasResultSet() const { return ResultSet_.Defined(); } + ui64 GetResultSetIndex() const { return ResultSetIndex_; } + const TResultSet& GetResultSet() const { return *ResultSet_; } + TResultSet ExtractResultSet() { return std::move(*ResultSet_); } + const TString& NextPageToken() const { return NextPageToken_; } + + explicit TFetchScriptResultsResult(TStatus&& status) + : TStatus(std::move(status)) + {} + + TFetchScriptResultsResult(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, const TString& nextPageToken) + : TStatus(std::move(status)) + , ResultSet_(std::move(resultSet)) + , ResultSetIndex_(resultSetIndex) + , NextPageToken_(nextPageToken) + {} + +private: + TMaybe<TResultSet> ResultSet_; + i64 ResultSetIndex_ = 0; + TString NextPageToken_; +}; + +using TAsyncFetchScriptResultsResult = NThreading::TFuture<TFetchScriptResultsResult>; + } // namespace NYdb::NQuery diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp index d41fbe743c..698417e203 100644 --- a/ydb/services/ydb/ydb_query.cpp +++ b/ydb/services/ydb/ydb_query.cpp @@ -34,6 +34,12 @@ void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation> (ctx, &DoExecuteScriptRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); }) + + ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, { + ActorSystem_->Send(GRpcRequestProxyId_, + new TGrpcRequestNoOperationCall<FetchScriptResultsRequest, FetchScriptResultsResponse> + (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + }) #undef ADD_REQUEST } |