diff options
| author | Daniil Cherednik <[email protected]> | 2024-03-28 15:59:04 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-03-28 15:59:04 +0100 |
| commit | 03802296567a7b6c033480b72b309ebfa46ecc74 (patch) | |
| tree | e6ae6d581079a86fb5e0211535973ae5dde4832a | |
| parent | 71ccd148087baeb233db4f6525dbef9b57e39f6f (diff) | |
[refactoring] Use query settings for kqp TEvQueryRequest (#3250)
| -rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 11 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_execute_yql_script.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/kqp/common/events/query.h | 42 | ||||
| -rw-r--r-- | ydb/core/kqp/common/kqp_event_impl.cpp | 13 |
5 files changed, 55 insertions, 29 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index ba8b42cf88b..ef52c40ceef 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -273,6 +273,12 @@ private: auto cachePolicy = google::protobuf::Arena::CreateMessage<Ydb::Table::QueryCachePolicy>(Request_->GetArena()); cachePolicy->set_keep_in_cache(true); + auto settings = NKqp::NPrivateEvents::TQueryRequestSettings() + .SetKeepSession(false) + .SetUseCancelAfter(false) + .SetSyntax(syntax) + .SetSupportStreamTrailingResult(true); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( QueryAction, queryType, @@ -286,10 +292,7 @@ private: GetCollectStatsMode(req->stats_mode()), cachePolicy, nullptr, // operationParams - false, // keepSession - false, // useCancelAfter - syntax, - true); // trailing support + settings); if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { NYql::TIssues issues; diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 8f1a23be8a7..72b8350d8b7 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -58,6 +58,11 @@ public: ::Ydb::Operations::OperationParams operationParams; + auto settings = NKqp::NPrivateEvents::TQueryRequestSettings() + .SetKeepSession(false) + .SetUseCancelAfter(false) + .SetSyntax(req->syntax()); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( NKikimrKqp::QUERY_ACTION_EXECUTE, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, @@ -71,9 +76,7 @@ public: req->collect_stats(), nullptr, // query_cache_policy req->has_operation_params() ? &req->operation_params() : nullptr, - false, // keep session - false, // use cancelAfter - req->syntax() + settings ); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId()); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 3591bc330cc..6c85b8f9c30 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -165,6 +165,11 @@ private: ::Ydb::Operations::OperationParams operationParams; + auto settings = NKqp::NPrivateEvents::TQueryRequestSettings() + .SetKeepSession(false) + .SetUseCancelAfter(false) + .SetSyntax(req->syntax()); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( NKikimrKqp::QUERY_ACTION_EXECUTE, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING, @@ -178,9 +183,7 @@ private: req->collect_stats(), nullptr, // query_cache_policy req->has_operation_params() ? &req->operation_params() : nullptr, - false, // keep session - false, // use cancelAfter - req->syntax() + settings ); if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 71a4b63504d..7d01c9b28c5 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -18,6 +18,33 @@ struct TEvQueryRequestRemote: public TEventPB<TEvQueryRequestRemote, NKikimrKqp: TKqpEvents::EvQueryRequest> { }; +struct TQueryRequestSettings { + TQueryRequestSettings& SetKeepSession(bool flag) { + KeepSession = flag; + return *this; + } + + TQueryRequestSettings& SetUseCancelAfter(bool flag) { + UseCancelAfter = flag; + return *this; + } + + TQueryRequestSettings& SetSyntax(const ::Ydb::Query::Syntax& syntax) { + Syntax = syntax; + return *this; + } + + TQueryRequestSettings& SetSupportStreamTrailingResult(bool flag) { + SupportsStreamTrailingResult = flag; + return *this; + } + + bool KeepSession = false; + bool UseCancelAfter = true; + ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED; + bool SupportsStreamTrailingResult = false; +}; + struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { public: TEvQueryRequest( @@ -33,10 +60,7 @@ public: const ::Ydb::Table::QueryStatsCollection::Mode collectStats, const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, const ::Ydb::Operations::OperationParams* operationParams, - bool keepSession = false, - bool useCancelAfter = true, - const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED, - bool supportsStreamTrailingResult = false); + const TQueryRequestSettings& querySettings = TQueryRequestSettings()); TEvQueryRequest() = default; @@ -67,7 +91,7 @@ public: } bool GetKeepSession() const { - return RequestCtx ? KeepSession : Record.GetRequest().GetKeepSession(); + return RequestCtx ? QuerySettings.KeepSession : Record.GetRequest().GetKeepSession(); } TDuration GetCancelAfter() const { @@ -103,7 +127,7 @@ public: } Ydb::Query::Syntax GetSyntax() const { - return RequestCtx ? Syntax : Record.GetRequest().GetSyntax(); + return RequestCtx ? QuerySettings.Syntax : Record.GetRequest().GetSyntax(); } bool HasPreparedQuery() const { @@ -288,7 +312,7 @@ public: } bool GetSupportsStreamTrailingResult() const { - return SupportsStreamTrailingResult; + return QuerySettings.SupportsStreamTrailingResult; } TDuration GetProgressStatsPeriod() const { @@ -317,13 +341,11 @@ private: const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr; const bool HasOperationParams = false; - bool KeepSession = false; + const TQueryRequestSettings QuerySettings; TDuration OperationTimeout; TDuration CancelAfter; - const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED; TIntrusivePtr<TUserRequestContext> UserRequestContext; TDuration ProgressStatsPeriod; - bool SupportsStreamTrailingResult = false; }; struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart, diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index 4bc996a2d2f..478891c2d20 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -18,10 +18,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( const ::Ydb::Table::QueryStatsCollection::Mode collectStats, const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, const ::Ydb::Operations::OperationParams* operationParams, - bool keepSession, - bool useCancelAfter, - const ::Ydb::Query::Syntax syntax, - bool supportsStreamTrailingResult) + const TQueryRequestSettings& querySettings) : RequestCtx(ctx) , RequestActorId(requestActorId) , Database(CanonizePath(ctx->GetDatabaseName().GetOrElse(""))) @@ -35,13 +32,11 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( , CollectStats(collectStats) , QueryCachePolicy(queryCachePolicy) , HasOperationParams(operationParams) - , KeepSession(keepSession) - , Syntax(syntax) - , SupportsStreamTrailingResult(supportsStreamTrailingResult) + , QuerySettings(querySettings) { if (HasOperationParams) { OperationTimeout = GetDuration(operationParams->operation_timeout()); - if (useCancelAfter) { + if (QuerySettings.UseCancelAfter) { CancelAfter = GetDuration(operationParams->cancel_after()); } } @@ -92,7 +87,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); - Record.MutableRequest()->SetSyntax(Syntax); + Record.MutableRequest()->SetSyntax(QuerySettings.Syntax); if (HasOperationParams) { Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds()); Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds()); |
