aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-04-26 19:06:50 +0300
committerdcherednik <dcherednik@ydb.tech>2023-04-26 19:06:50 +0300
commitab82746e7a7418011af7429a6f8ab5193fab9fea (patch)
treeaaf37a59f0e46a2ab315db7e464226b17e02d323
parentf83eaa2132d8892a0cbeaf87e149aba08c9577f9 (diff)
downloadydb-ab82746e7a7418011af7429a6f8ab5193fab9fea.tar.gz
Fix TEvQueryRequest usage for stream scripting.
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp2
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp1
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp6
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp20
4 files changed, 24 insertions, 5 deletions
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index 613fe6a7d8..90b61a5207 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -153,8 +153,6 @@ private:
req->has_operation_params() ? &req->operation_params() : nullptr
);
- ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId());
-
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
return ReplyFinishStream("Couldn't send request to KqpProxy");
}
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index 1088f60229..b3c18172ba 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -48,6 +48,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetDatabase(Database);
ActorIdToProto(RequestActorId, Record.MutableCancelationActor());
+ ActorIdToProto(RequestActorId, Record.MutableRequestActorId());
if (auto traceId = RequestCtx->GetTraceId()) {
Record.SetTraceId(traceId.GetRef());
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 8aaa7698e3..4243fe96ca 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -179,11 +179,11 @@ public:
}
bool ConvertParameters() {
- auto& event = QueryState->RequestEv->Record;
+ auto& proto = QueryState->RequestEv->Record;
- if (!event.GetRequest().HasParameters() && event.GetRequest().YdbParametersSize()) {
+ if (!proto.GetRequest().HasParameters() && QueryState->RequestEv->GetYdbParameters().size()) {
try {
- ConvertYdbParamsToMiniKQLParams(event.GetRequest().GetYdbParameters(), *event.MutableRequest()->MutableParameters());
+ ConvertYdbParamsToMiniKQLParams(QueryState->RequestEv->GetYdbParameters(), *proto.MutableRequest()->MutableParameters());
} catch (const std::exception& ex) {
TString message = TStringBuilder() << "Failed to parse query parameters. "<< ex.what();
ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, Ydb::StatusIds::BAD_REQUEST, message);
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index 3e4488a6ad..bab5024c1d 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -95,6 +95,26 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
UNIT_ASSERT_VALUES_EQUAL(rs0.ColumnParser(0).GetUint64(), 8u);
}
+ Y_UNIT_TEST(StreamScanQuery) {
+ TKikimrRunner kikimr;
+ TScriptingClient client(kikimr.GetDriver());
+
+ auto params = client.GetParamsBuilder()
+ .AddParam("$text").String("Value1").Build()
+ .Build();
+
+ auto it = client.StreamExecuteYqlScript(R"(
+ PRAGMA db.ScanQuery = "true";
+ DECLARE $text AS String;
+ SELECT COUNT(*) FROM `/Root/EightShard` WHERE Text = $text;
+ )", params).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ CompareYson(R"([
+ [[8u]]
+ ])", StreamResultToYson(it));
+ }
+
Y_UNIT_TEST(ScanQueryInvalid) {
TKikimrRunner kikimr;
TScriptingClient client(kikimr.GetDriver());