summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <[email protected]>2024-03-28 15:59:04 +0100
committerGitHub <[email protected]>2024-03-28 15:59:04 +0100
commit03802296567a7b6c033480b72b309ebfa46ecc74 (patch)
treee6ae6d581079a86fb5e0211535973ae5dde4832a
parent71ccd148087baeb233db4f6525dbef9b57e39f6f (diff)
[refactoring] Use query settings for kqp TEvQueryRequest (#3250)
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp11
-rw-r--r--ydb/core/grpc_services/rpc_execute_yql_script.cpp9
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp9
-rw-r--r--ydb/core/kqp/common/events/query.h42
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp13
5 files changed, 55 insertions, 29 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index ba8b42cf88b..ef52c40ceef 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -273,6 +273,12 @@ private:
auto cachePolicy = google::protobuf::Arena::CreateMessage<Ydb::Table::QueryCachePolicy>(Request_->GetArena());
cachePolicy->set_keep_in_cache(true);
+ auto settings = NKqp::NPrivateEvents::TQueryRequestSettings()
+ .SetKeepSession(false)
+ .SetUseCancelAfter(false)
+ .SetSyntax(syntax)
+ .SetSupportStreamTrailingResult(true);
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
QueryAction,
queryType,
@@ -286,10 +292,7 @@ private:
GetCollectStatsMode(req->stats_mode()),
cachePolicy,
nullptr, // operationParams
- false, // keepSession
- false, // useCancelAfter
- syntax,
- true); // trailing support
+ settings);
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
index 8f1a23be8a7..72b8350d8b7 100644
--- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
@@ -58,6 +58,11 @@ public:
::Ydb::Operations::OperationParams operationParams;
+ auto settings = NKqp::NPrivateEvents::TQueryRequestSettings()
+ .SetKeepSession(false)
+ .SetUseCancelAfter(false)
+ .SetSyntax(req->syntax());
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
NKikimrKqp::QUERY_ACTION_EXECUTE,
NKikimrKqp::QUERY_TYPE_SQL_SCRIPT,
@@ -71,9 +76,7 @@ public:
req->collect_stats(),
nullptr, // query_cache_policy
req->has_operation_params() ? &req->operation_params() : nullptr,
- false, // keep session
- false, // use cancelAfter
- req->syntax()
+ settings
);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId());
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index 3591bc330cc..6c85b8f9c30 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -165,6 +165,11 @@ private:
::Ydb::Operations::OperationParams operationParams;
+ auto settings = NKqp::NPrivateEvents::TQueryRequestSettings()
+ .SetKeepSession(false)
+ .SetUseCancelAfter(false)
+ .SetSyntax(req->syntax());
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
NKikimrKqp::QUERY_ACTION_EXECUTE,
NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING,
@@ -178,9 +183,7 @@ private:
req->collect_stats(),
nullptr, // query_cache_policy
req->has_operation_params() ? &req->operation_params() : nullptr,
- false, // keep session
- false, // use cancelAfter
- req->syntax()
+ settings
);
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index 71a4b63504d..7d01c9b28c5 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -18,6 +18,33 @@ struct TEvQueryRequestRemote: public TEventPB<TEvQueryRequestRemote, NKikimrKqp:
TKqpEvents::EvQueryRequest> {
};
+struct TQueryRequestSettings {
+ TQueryRequestSettings& SetKeepSession(bool flag) {
+ KeepSession = flag;
+ return *this;
+ }
+
+ TQueryRequestSettings& SetUseCancelAfter(bool flag) {
+ UseCancelAfter = flag;
+ return *this;
+ }
+
+ TQueryRequestSettings& SetSyntax(const ::Ydb::Query::Syntax& syntax) {
+ Syntax = syntax;
+ return *this;
+ }
+
+ TQueryRequestSettings& SetSupportStreamTrailingResult(bool flag) {
+ SupportsStreamTrailingResult = flag;
+ return *this;
+ }
+
+ bool KeepSession = false;
+ bool UseCancelAfter = true;
+ ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
+ bool SupportsStreamTrailingResult = false;
+};
+
struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
public:
TEvQueryRequest(
@@ -33,10 +60,7 @@ public:
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
const ::Ydb::Operations::OperationParams* operationParams,
- bool keepSession = false,
- bool useCancelAfter = true,
- const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED,
- bool supportsStreamTrailingResult = false);
+ const TQueryRequestSettings& querySettings = TQueryRequestSettings());
TEvQueryRequest() = default;
@@ -67,7 +91,7 @@ public:
}
bool GetKeepSession() const {
- return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession();
+ return RequestCtx ? QuerySettings.KeepSession : Record.GetRequest().GetKeepSession();
}
TDuration GetCancelAfter() const {
@@ -103,7 +127,7 @@ public:
}
Ydb::Query::Syntax GetSyntax() const {
- return RequestCtx ? Syntax : Record.GetRequest().GetSyntax();
+ return RequestCtx ? QuerySettings.Syntax : Record.GetRequest().GetSyntax();
}
bool HasPreparedQuery() const {
@@ -288,7 +312,7 @@ public:
}
bool GetSupportsStreamTrailingResult() const {
- return SupportsStreamTrailingResult;
+ return QuerySettings.SupportsStreamTrailingResult;
}
TDuration GetProgressStatsPeriod() const {
@@ -317,13 +341,11 @@ private:
const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr;
const bool HasOperationParams = false;
- bool KeepSession = false;
+ const TQueryRequestSettings QuerySettings;
TDuration OperationTimeout;
TDuration CancelAfter;
- const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
TIntrusivePtr<TUserRequestContext> UserRequestContext;
TDuration ProgressStatsPeriod;
- bool SupportsStreamTrailingResult = false;
};
struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index 4bc996a2d2f..478891c2d20 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -18,10 +18,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
const ::Ydb::Operations::OperationParams* operationParams,
- bool keepSession,
- bool useCancelAfter,
- const ::Ydb::Query::Syntax syntax,
- bool supportsStreamTrailingResult)
+ const TQueryRequestSettings& querySettings)
: RequestCtx(ctx)
, RequestActorId(requestActorId)
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
@@ -35,13 +32,11 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
, CollectStats(collectStats)
, QueryCachePolicy(queryCachePolicy)
, HasOperationParams(operationParams)
- , KeepSession(keepSession)
- , Syntax(syntax)
- , SupportsStreamTrailingResult(supportsStreamTrailingResult)
+ , QuerySettings(querySettings)
{
if (HasOperationParams) {
OperationTimeout = GetDuration(operationParams->operation_timeout());
- if (useCancelAfter) {
+ if (QuerySettings.UseCancelAfter) {
CancelAfter = GetDuration(operationParams->cancel_after());
}
}
@@ -92,7 +87,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
- Record.MutableRequest()->SetSyntax(Syntax);
+ Record.MutableRequest()->SetSyntax(QuerySettings.Syntax);
if (HasOperationParams) {
Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds());
Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds());