diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-05-26 19:24:16 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-05-26 19:24:16 +0300 |
commit | 7fb69b50f57423be98cc6d9b9cd15eb341080c59 (patch) | |
tree | fe07168ca1b3a38a7fa434d9c8d7dca814a9ee7f | |
parent | e0c56556489d46590343c4276a5444e7ca85d7c7 (diff) | |
download | ydb-7fb69b50f57423be98cc6d9b9cd15eb341080c59.tar.gz |
Fix cancelation for ScanExecuteYqlScript in case of operation timeout.
22 files changed, 374 insertions, 97 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt index d1e1087ee09..0e8dce5fa38 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC fq-libs-actors fq-libs-control_plane_proxy libs-control_plane_proxy-events + core-grpc_services-base core-grpc_services-counters core-grpc_services-local_rpc core-grpc_services-cancelation diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 591edc09801..2f5b3ef7d07 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -33,6 +33,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC fq-libs-actors fq-libs-control_plane_proxy libs-control_plane_proxy-events + core-grpc_services-base core-grpc_services-counters core-grpc_services-local_rpc core-grpc_services-cancelation diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt index 591edc09801..2f5b3ef7d07 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC fq-libs-actors fq-libs-control_plane_proxy libs-control_plane_proxy-events + core-grpc_services-base core-grpc_services-counters core-grpc_services-local_rpc core-grpc_services-cancelation diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt index d1e1087ee09..0e8dce5fa38 100644 --- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC fq-libs-actors fq-libs-control_plane_proxy libs-control_plane_proxy-events + core-grpc_services-base core-grpc_services-counters core-grpc_services-local_rpc core-grpc_services-cancelation diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 0f7ead8b574..7f3028da4e5 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -1,5 +1,7 @@ #pragma once +#include "iface.h" + #include <grpc++/support/byte_buffer.h> #include <grpc++/support/slice.h> @@ -246,22 +248,6 @@ public: } }; -class IRequestCtxBaseMtSafe { -public: - virtual TMaybe<TString> GetTraceId() const = 0; - // Returns client provided database name - virtual const TMaybe<TString> GetDatabaseName() const = 0; - // Returns "internal" token (result of ticket parser authentication) - virtual const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const = 0; - // Returns internal token as a serialized message. - virtual const TString& GetSerializedToken() const = 0; - virtual bool IsClientLost() const = 0; - // Is this call made from inside YDB? - virtual bool IsInternalCall() const { - return false; - } -}; - class IRequestCtxBase : public virtual IRequestCtxBaseMtSafe { public: virtual ~IRequestCtxBase() = default; @@ -389,19 +375,6 @@ public: virtual void Pass(const IFacilityProvider& facility) = 0; }; -// Provide methods which can be safely passed though actor system -// as part of event -class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe { -public: - virtual ~IRequestCtxMtSafe() = default; - virtual const google::protobuf::Message* GetRequest() const = 0; - virtual const TMaybe<TString> GetRequestType() const = 0; - // Implementation must be thread safe - virtual void SetClientLostAction(std::function<void()>&& cb) = 0; - // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety - virtual google::protobuf::Arena* GetArena() = 0; -}; - // Request context // The interface is used for rpc_ request actors class IRequestCtx @@ -626,7 +599,7 @@ private: const TActorId From_; NGrpc::TAuthState State_; TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_; - const TString EmptySerializedTokenMessage_; + inline static const TString EmptySerializedTokenMessage_; NYql::TIssueManager IssueManager_; }; @@ -848,7 +821,7 @@ public: private: TIntrusivePtr<IStreamCtx> Ctx_; TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_; - const TString EmptySerializedTokenMessage_; + inline static const TString EmptySerializedTokenMessage_; NYql::TIssueManager IssueManager_; TMaybe<NRpcService::TRlPath> RlPath_; bool RlAllowed_; @@ -964,6 +937,8 @@ public: using TRequest = TReq; using TResponse = TResp; + using TFinishWrapper = std::function<void(const NGrpc::IRequestContextBase::TAsyncFinishResult&)>; + TGRpcRequestWrapperImpl(NGrpc::IRequestContextBase* ctx) : Ctx_(ctx) { } @@ -1155,16 +1130,15 @@ public: Ctx_->SetNextReplyCallback(std::move(cb)); } - void SetClientLostAction(std::function<void()>&& cb) override { - auto shutdown = [cb = std::move(cb)](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) mutable { - Y_ASSERT(future.HasValue()); - if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) { - cb(); - } - }; + void SetFinishAction(std::function<void()>&& cb) override { + auto shutdown = FinishWrapper(std::move(cb)); Ctx_->GetFinishFuture().Subscribe(std::move(shutdown)); } + void SetCustomFinishWrapper(std::function<TFinishWrapper(std::function<void()>&&)> wrapper) { + FinishWrapper = wrapper; + } + bool IsClientLost() const override { return Ctx_->IsClientLost(); } @@ -1222,10 +1196,19 @@ private: return google::protobuf::Arena::CreateMessage<TResponse>(Ctx_->GetArena()); } + static TFinishWrapper GetStdFinishWrapper(std::function<void()>&& cb) { + return [cb = std::move(cb)](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) mutable { + Y_ASSERT(future.HasValue()); + if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) { + cb(); + } + }; + } + private: TIntrusivePtr<NGrpc::IRequestContextBase> Ctx_; TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_; - const TString EmptySerializedTokenMessage_; + inline static const TString EmptySerializedTokenMessage_; NYql::TIssueManager IssueManager; Ydb::CostInfo* CostInfo = nullptr; Ydb::QuotaExceeded* QuotaExceeded = nullptr; @@ -1233,6 +1216,7 @@ private: TRespHook RespHook; TMaybe<NRpcService::TRlPath> RlPath; IGRpcProxyCounters::TPtr Counters; + std::function<TFinishWrapper(std::function<void()>&&)> FinishWrapper = &GetStdFinishWrapper; }; template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived> diff --git a/ydb/core/grpc_services/base/iface.h b/ydb/core/grpc_services/base/iface.h new file mode 100644 index 00000000000..bc0a2728d59 --- /dev/null +++ b/ydb/core/grpc_services/base/iface.h @@ -0,0 +1,48 @@ +#pragma once + +#include <util/generic/fwd.h> + +namespace google::protobuf { +class Message; +class Arena; +} + +namespace NACLib { +class TUserToken; + +} +namespace NKikimr { + +namespace NGRpcService { + +class IRequestCtxBaseMtSafe { +public: + virtual TMaybe<TString> GetTraceId() const = 0; + // Returns client provided database name + virtual const TMaybe<TString> GetDatabaseName() const = 0; + // Returns "internal" token (result of ticket parser authentication) + virtual const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const = 0; + // Returns internal token as a serialized message. + virtual const TString& GetSerializedToken() const = 0; + virtual bool IsClientLost() const = 0; + // Is this call made from inside YDB? + virtual bool IsInternalCall() const { + return false; + } +}; + + +// Provide methods which can be safely passed though actor system // as part of event +class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe { +public: + virtual ~IRequestCtxMtSafe() = default; + virtual const google::protobuf::Message* GetRequest() const = 0; + virtual const TMaybe<TString> GetRequestType() const = 0; + // Implementation must be thread safe + virtual void SetFinishAction(std::function<void()>&& cb) = 0; + // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety + virtual google::protobuf::Arena* GetArena() = 0; +}; + +} +} diff --git a/ydb/core/grpc_services/cancelation/cancelation.cpp b/ydb/core/grpc_services/cancelation/cancelation.cpp index 37136078594..80eaaccfdc2 100644 --- a/ydb/core/grpc_services/cancelation/cancelation.cpp +++ b/ydb/core/grpc_services/cancelation/cancelation.cpp @@ -13,7 +13,7 @@ void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* reque NActors::TActorSystem* as) { auto subscriber = ActorIdFromProto(ev->Record.GetSubscriber()); - requestCtx->SetClientLostAction([subscriber, as]() { + requestCtx->SetFinishAction([subscriber, as]() { as->Send(subscriber, new TEvClientLost); }); } diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index fd90c59ed06..36229d5060e 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -162,7 +162,7 @@ public: return &Request; } - void SetClientLostAction(std::function<void()>&&) override {} + void SetFinishAction(std::function<void()>&&) override {} bool IsClientLost() const override { return false; } diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index a50eb52f262..7e3081ecb81 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -180,7 +180,7 @@ public: auto selfId = this->SelfId(); auto as = TActivationContext::ActorSystem(); - Request_->SetClientLostAction([selfId, as]() { + Request_->SetFinishAction([selfId, as]() { as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::ClientLostTag)); }); diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp index e16c2ed508c..81f6daaba8b 100644 --- a/ydb/core/grpc_services/rpc_create_session.cpp +++ b/ydb/core/grpc_services/rpc_create_session.cpp @@ -46,7 +46,7 @@ public: auto selfId = this->SelfId(); auto as = TActivationContext::ActorSystem(); - Request_->SetClientLostAction([selfId, as]() { + Request_->SetFinishAction([selfId, as]() { as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::WakeupTagClientLost)); }); diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h index 75d474b4167..a2f60085792 100644 --- a/ydb/core/grpc_services/rpc_deferrable.h +++ b/ydb/core/grpc_services/rpc_deferrable.h @@ -78,7 +78,7 @@ public: actorSystem->Send(selfId, new TRpcServices::TEvForgetOperation()); }; - Request_->SetClientLostAction(std::move(clientLostCb)); + Request_->SetFinishAction(std::move(clientLostCb)); } bool HasCancelOperation() { diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 6f0735714df..8d07dfad7d4 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -20,7 +20,13 @@ public: using TResult = Ydb::Scripting::ExecuteYqlResult; TExecuteYqlScriptRPC(IRequestOpCtx* msg) - : TBase(msg) {} + : TBase(msg) + { + // StreamExecuteYqlScript allows write in to table. + // But CanselAfter should not trigger cancelation if we chage table + // This logic is broken - just disable CancelAfter_ for a while + CancelAfter_ = TDuration(); + } void Bootstrap(const TActorContext &ctx) { TBase::Bootstrap(ctx); @@ -47,6 +53,8 @@ public: return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); } + ::Ydb::Operations::OperationParams operationParams; + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( NKikimrKqp::QUERY_ACTION_EXECUTE, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT, @@ -59,7 +67,9 @@ public: &req->parameters(), req->collect_stats(), nullptr, // query_cache_policy - req->has_operation_params() ? &req->operation_params() : nullptr + req->has_operation_params() ? &req->operation_params() : nullptr, + false, // keep session + false // use cancelAfter ); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index 4761e6a983e..ad888c08c67 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -153,7 +153,7 @@ public: LOG_WARN(*as, NKikimrServices::READ_TABLE_API, "ForgetAction occurred, send TEvPoisonPill"); as->Send(actorId, new TEvents::TEvPoisonPill()); }; - Request_->SetClientLostAction(std::move(clientLostCb)); + Request_->SetFinishAction(std::move(clientLostCb)); } void PassAway() override { diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 5337c436e9c..f93d44e8803 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -170,7 +170,7 @@ public: auto selfId = this->SelfId(); auto as = TActivationContext::ActorSystem(); - Request_->SetClientLostAction([selfId, as]() { + Request_->SetFinishAction([selfId, as]() { as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::ClientLostTag)); }); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index f837f343683..f1ac5f666fb 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -61,6 +61,18 @@ class TStreamExecuteYqlScriptRPC private: typedef TRpcRequestWithOperationParamsActor<TStreamExecuteYqlScriptRPC, TEvStreamExecuteYqlScriptRequest, false> TBase; + static std::function<TEvStreamExecuteYqlScriptRequest::TFinishWrapper(std::function<void()>&&)> + GetFinishWrapper(std::shared_ptr<std::atomic_bool> flag) { + return [flag](std::function<void()>&& cb) { + return [cb = std::move(cb), flag](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) mutable { + Y_ASSERT(future.HasValue()); + if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL || flag->load()) { + cb(); + } + }; + }; + } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_STREAM_REQ; @@ -69,7 +81,17 @@ public: TStreamExecuteYqlScriptRPC(IRequestNoOpCtx* request, ui64 rpcBufferSize) : TBase(request) , RpcBufferSize_(rpcBufferSize) - {} + , CancelationFlag(std::make_shared<std::atomic_bool>(false)) + { + // StreamExecuteYqlScript allows write in to table. + // But CanselAfter should not trigger cancelation if we chage table + // This logic is broken - just disable CancelAfter_ for a while + CancelAfter_ = TDuration(); + + auto call = dynamic_cast<TEvStreamExecuteYqlScriptRequest*>(request); + Y_VERIFY(call); + call->SetCustomFinishWrapper(GetFinishWrapper(CancelationFlag)); + } using TBase::Request_; @@ -89,7 +111,7 @@ public: auto selfId = this->SelfId(); auto as = TActivationContext::ActorSystem(); - RequestPtr()->SetClientLostAction([selfId, as]() { + RequestPtr()->SetFinishAction([selfId, as]() { as->Send(selfId, new TEvents::TEvWakeup(EStreamRpcWakeupTag::ClientLostTag)); }); @@ -127,7 +149,7 @@ private: return ReplyFinishStream("StreamExecuteYqlScript request is not supported"); } - const auto req = GetProtoRequest(); + auto req = GetProtoRequest(); const auto traceId = Request_->GetTraceId(); auto script = req->script(); @@ -137,6 +159,8 @@ private: return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, issues); } + ::Ydb::Operations::OperationParams operationParams; + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( NKikimrKqp::QUERY_ACTION_EXECUTE, NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING, @@ -149,7 +173,9 @@ private: &req->parameters(), req->collect_stats(), nullptr, // query_cache_policy - req->has_operation_params() ? &req->operation_params() : nullptr + req->has_operation_params() ? &req->operation_params() : nullptr, + false, // keep session + false // use cancelAfter ); if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { @@ -361,11 +387,7 @@ private: << " which exceeds client timeout " << InactiveClientTimeout_; LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, message); - if (GatewayRequestHandlerActorId_) { - auto timeoutEv = MakeHolder<NKqp::TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Client timeout"); - ctx.Send(GatewayRequestHandlerActorId_, timeoutEv.Release()); - } - + CancelationFlag->store(true); auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message); return ReplyFinishStream(StatusIds::TIMEOUT, issue); } @@ -381,11 +403,7 @@ private: void HandleOperationTimeout(const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << this->SelfId() << " Operation timeout."); - if (GatewayRequestHandlerActorId_) { - auto timeoutEv = MakeHolder<NKqp::TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Operation timeout"); - ctx.Send(GatewayRequestHandlerActorId_, timeoutEv.Release()); - } - + CancelationFlag->store(true); auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Operation timeout"); return ReplyFinishStream(StatusIds::TIMEOUT, issue); } @@ -455,6 +473,7 @@ private: ui64 ResultsReceived_ = 0; // DataQuery THolder<TDataQueryStreamContext> DataQueryStreamContext; + std::shared_ptr<std::atomic_bool> CancelationFlag; }; } // namespace diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 458e3c37424..7aca7f6fb16 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -31,7 +31,8 @@ public: const ::Ydb::Table::QueryStatsCollection::Mode collectStats, const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, const ::Ydb::Operations::OperationParams* operationParams, - bool keepSession = false); + bool keepSession = false, + bool useCancelAfter = true); TEvQueryRequest() = default; @@ -251,7 +252,7 @@ public: void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) { if (RequestCtx) { - RequestCtx->SetClientLostAction([actorId, as]() { + RequestCtx->SetFinishAction([actorId, as]() { as->Send(actorId, new NGRpcService::TEvClientLost()); }); } else if (Record.HasCancelationActor()) { @@ -281,7 +282,7 @@ private: const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr; const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr; - const ::Ydb::Operations::OperationParams* OperationParams = nullptr; + const bool HasOperationParams = false; bool KeepSession = false; TDuration OperationTimeout; TDuration CancelAfter; diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index b3c18172ba5..4c70e55e929 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -18,7 +18,8 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( const ::Ydb::Table::QueryStatsCollection::Mode collectStats, const ::Ydb::Table::QueryCachePolicy* queryCachePolicy, const ::Ydb::Operations::OperationParams* operationParams, - bool keepSession) + bool keepSession, + bool useCancelAfter) : RequestCtx(ctx) , RequestActorId(requestActorId) , Database(CanonizePath(ctx->GetDatabaseName().GetOrElse(""))) @@ -31,12 +32,14 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( , YdbParameters(ydbParameters) , CollectStats(collectStats) , QueryCachePolicy(queryCachePolicy) - , OperationParams(operationParams) + , HasOperationParams(operationParams) , KeepSession(keepSession) { - if (OperationParams) { - OperationTimeout = GetDuration(OperationParams->operation_timeout()); - CancelAfter = GetDuration(OperationParams->cancel_after()); + if (HasOperationParams) { + OperationTimeout = GetDuration(operationParams->operation_timeout()); + if (useCancelAfter) { + CancelAfter = GetDuration(operationParams->cancel_after()); + } } } @@ -85,7 +88,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); - if (OperationParams) { + if (HasOperationParams) { Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds()); Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds()); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c7117fdb0ff..ef2b5dc0cd6 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1798,6 +1798,7 @@ public: // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); + hFunc(TEvKqp::TEvQueryResponse, HandleNoop); default: UnexpectedEvent("CleanupState", ev); } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 9df8580af7b..351ef6a04f4 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -1,6 +1,7 @@ #include "kqp_ut_common.h" #include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> @@ -575,7 +576,7 @@ void PrintResultSet(const NYdb::TResultSet& resultSet, NYson::TYsonWriter& write } bool IsTimeoutError(NYdb::EStatus status) { - return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT; + return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT || status == NYdb::EStatus::CANCELLED; } template<typename TIterator> @@ -1046,5 +1047,15 @@ TVector<ui64> GetTableShards(Tests::TServer::TPtr server, return GetTableShards(server.Get(), sender, path); } +void WaitForZeroSessions(const NKqp::TKqpCounters& counters) { + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 0 && count) { + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); +} + } // namspace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index d3602977ba3..98b08b2e1c2 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -65,6 +65,7 @@ TString MakeQuery(const TString& tmpl) { namespace NKikimr { namespace NKqp { +class TKqpCounters; const TString KikimrDefaultUtDomainRoot = "Root"; TVector<NKikimrKqp::TKqpSetting> SyntaxV1Settings(); @@ -277,5 +278,7 @@ TVector<ui64> GetTableShards(Tests::TServer* server, TActorId sender, const TStr TVector<ui64> GetTableShards(Tests::TServer::TPtr server, TActorId sender, const TString &path); +void WaitForZeroSessions(const NKqp::TKqpCounters& counters); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index f0612e4aa26..98a25b8114f 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); } - Y_UNIT_TEST(StreamExecuteScanQueryTimeoutBruteForce) { + Y_UNIT_TEST(StreamExecuteScanQueryClientTimeoutBruteForce) { TKikimrRunner kikimr; NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); @@ -137,10 +137,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { for (int i = 1; i < maxTimeoutMs; i++) { auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"( SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; - )", - TStreamExecScanQuerySettings() - .ClientTimeout(TDuration::MilliSeconds(i)) - ).GetValueSync(); + )", TStreamExecScanQuerySettings().ClientTimeout(TDuration::MilliSeconds(i))).GetValueSync(); if (it.IsSuccess()) { try { @@ -156,13 +153,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } - int count = 60; - while (counters.GetActiveSessionActors()->Val() != 0 && count) { - count--; - Sleep(TDuration::Seconds(1)); - } - - UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); + WaitForZeroSessions(counters); } Y_UNIT_TEST(IsNull) { diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp index 1fa9cdbfcf3..df19a796c62 100644 --- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp @@ -15,6 +15,18 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NScripting; +static const TString EXPECTED_EIGHTSHARD_VALUE1 = R"( +[ + [[1];[101u];["Value1"]]; + [[2];[201u];["Value1"]]; + [[3];[301u];["Value1"]]; + [[1];[401u];["Value1"]]; + [[2];[501u];["Value1"]]; + [[3];[601u];["Value1"]]; + [[1];[701u];["Value1"]]; + [[2];[801u];["Value1"]] +])"; + Y_UNIT_TEST_SUITE(KqpScripting) { Y_UNIT_TEST(EndOfQueryCommit) { TKikimrRunner kikimr; @@ -583,7 +595,23 @@ Y_UNIT_TEST_SUITE(KqpScripting) { UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); } - Y_UNIT_TEST(StreamExecuteYqlScriptScanTimeoutBruteForce) { + std::function<TExecuteYqlRequestSettings(int)> GetExecuteYqlRequestSettingsFn(bool client, bool operation) { + return [client, operation](int i) { + TExecuteYqlRequestSettings settings; + if (client && operation) { + settings.ClientTimeout(TDuration::MilliSeconds(i)) + .UseClientTimeoutForOperation(true); + } else if (client) { + settings.ClientTimeout(TDuration::MilliSeconds(i)) + .UseClientTimeoutForOperation(false); + } else if (operation) { + settings.OperationTimeout(TDuration::MilliSeconds(i)); + } + return settings; + }; + } + + void DoStreamExecuteYqlScriptScanTimeoutBruteForce(bool clientTimeout, bool operationTimeout) { TKikimrRunner kikimr; NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); @@ -591,20 +619,22 @@ Y_UNIT_TEST_SUITE(KqpScripting) { int maxTimeoutMs = 1000; + auto getExecuteYqlRequestSettings = GetExecuteYqlRequestSettingsFn(clientTimeout, operationTimeout); + auto expected = TString("[") + EXPECTED_EIGHTSHARD_VALUE1 + "]"; + + int unsuccessStatus = 0; + for (int i = 1; i < maxTimeoutMs; i++) { auto it = client.StreamExecuteYqlScript(R"( SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; - )", - TExecuteYqlRequestSettings() - .ClientTimeout(TDuration::MilliSeconds(i)) - .UseClientTimeoutForOperation(false) - ).GetValueSync(); + )", getExecuteYqlRequestSettings(i)).GetValueSync(); if (it.IsSuccess()) { try { auto yson = StreamResultToYson(it, true); - CompareYson(R"([[[[1];[101u];["Value1"]];[[2];[201u];["Value1"]];[[3];[301u];["Value1"]];[[1];[401u];["Value1"]];[[2];[501u];["Value1"]];[[3];[601u];["Value1"]];[[1];[701u];["Value1"]];[[2];[801u];["Value1"]]]])", yson); + CompareYson(expected, yson); } catch (const TStreamReadError& ex) { + unsuccessStatus++; if (ex.Status != NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED && ex.Status != NYdb::EStatus::TIMEOUT) { TStringStream msg; msg << "unexpected status: " << ex.Status; @@ -618,15 +648,187 @@ Y_UNIT_TEST_SUITE(KqpScripting) { UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); } } + UNIT_ASSERT(unsuccessStatus); + WaitForZeroSessions(counters); + } - int count = 60; - while (counters.GetActiveSessionActors()->Val() != 0 && count) { - Cerr << "SESSIONS: " << counters.GetActiveSessionActors()->Val() << Endl; - count--; - Sleep(TDuration::Seconds(1)); + void DoStreamExecuteYqlScriptTimeoutBruteForce(bool clientTimeout, bool operationTimeout) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + TScriptingClient client(kikimr.GetDriver()); + + int maxTimeoutMs = 1000; + + auto getExecuteYqlRequestSettings = GetExecuteYqlRequestSettingsFn(clientTimeout, operationTimeout); + + for (int i = 1; i < maxTimeoutMs; i++) { + auto result = client.ExecuteYqlScript(R"( + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; + )", getExecuteYqlRequestSettings(i)).GetValueSync(); + + if (result.IsSuccess()) { + CompareYson(EXPECTED_EIGHTSHARD_VALUE1, FormatResultSetYson(result.GetResultSet(0))); + } else { + switch (result.GetStatus()) { + case NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED: + case NYdb::EStatus::TIMEOUT: + break; + default: { + TStringStream msg; + msg << "unexpected status: " << result.GetStatus(); + UNIT_ASSERT_C(false, msg.Str().data()); + } + } + } } - UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); + WaitForZeroSessions(counters); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelAfterBruteForce) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + TScriptingClient client(kikimr.GetDriver()); + + int maxTimeoutMs = 1000; + + auto expected = TString("[") + EXPECTED_EIGHTSHARD_VALUE1 + "]"; + + for (int i = 1; i < maxTimeoutMs; i++) { + auto it = client.StreamExecuteYqlScript(R"( + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key; + )", TExecuteYqlRequestSettings().CancelAfter(TDuration::MilliSeconds(i))).GetValueSync(); + + UNIT_ASSERT(it.IsSuccess()); + try { + auto yson = StreamResultToYson(it, true); + CompareYson(expected, yson); + } catch (const TStreamReadError& ex) { + if (ex.Status != NYdb::EStatus::CANCELLED) { + TStringStream msg; + msg << "unexpected status: " << ex.Status; + UNIT_ASSERT_C(false, msg.Str().data()); + } + } catch (const std::exception& ex) { + auto msg = TString("unknown exception during the test: ") + ex.what(); + UNIT_ASSERT_C(false, msg.data()); + } + } + + WaitForZeroSessions(counters); + } + + // Check in case of CANCELED status we have no made changes in the table + Y_UNIT_TEST(StreamExecuteYqlScriptScanWriteCancelAfterBruteForced) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + TScriptingClient client(kikimr.GetDriver()); + + int maxTimeoutMs = 1000; + + auto createKey = [](int id) -> ui64 { + return (1u << 29) + id; + }; + + auto createExpectedRow = [](ui64 key) -> TString { + return Sprintf(R"([[100500];[%luu];["newrecords"]])", key); + }; + + TString expected; + + for (int i = 1; i <= maxTimeoutMs; i++) { + auto it = client.StreamExecuteYqlScript(Sprintf(R"( + UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES (%lu, 100500, "newrecords"); + COMMIT; + SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key; + COMMIT; + )", createKey(i)), TExecuteYqlRequestSettings().CancelAfter(TDuration::MilliSeconds(i))).GetValueSync(); + + UNIT_ASSERT(it.IsSuccess()); + try { + auto yson = StreamResultToYson(it, true); + expected += createExpectedRow(createKey(i)); + if (i != maxTimeoutMs) + expected += ";"; + CompareYson(TString("[[") + expected + "]]", yson); + } catch (const TStreamReadError& ex) { + if (ex.Status != NYdb::EStatus::CANCELLED) { + TStringStream msg; + msg << "unexpected status: " << ex.Status; + UNIT_ASSERT_C(false, msg.Str().data()); + } + } catch (const std::exception& ex) { + auto msg = TString("unknown exception during the test: ") + ex.what(); + UNIT_ASSERT_C(false, msg.data()); + } + } + + WaitForZeroSessions(counters); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptWriteCancelAfterBruteForced) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + TScriptingClient client(kikimr.GetDriver()); + + int maxTimeoutMs = 1000; + + auto createKey = [](int id) -> ui64 { + return (1u << 29) + id; + }; + + auto createExpectedRow = [](ui64 key) -> TString { + return Sprintf(R"([[100500];[%luu];["newrecords"]])", key); + }; + + TString expected; + + for (int i = 1; i <= maxTimeoutMs; i++) { + auto result = client.ExecuteYqlScript(Sprintf(R"( + UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES (%lu, 100500, "newrecords"); + COMMIT; + SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key; + COMMIT; + )", createKey(i)), TExecuteYqlRequestSettings().CancelAfter(TDuration::MilliSeconds(i))).GetValueSync(); + + if (result.IsSuccess()) { + auto yson = FormatResultSetYson(result.GetResultSet(0)); + expected += createExpectedRow(createKey(i)); + if (i != maxTimeoutMs) + expected += ";"; + CompareYson(TString("[") + expected + "]", yson); + } + } + + WaitForZeroSessions(counters); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptScanClientTimeoutBruteForce) { + DoStreamExecuteYqlScriptScanTimeoutBruteForce(true, false); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptScanClientOperationTimeoutBruteForce) { + DoStreamExecuteYqlScriptScanTimeoutBruteForce(true, true); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptScanOperationTmeoutBruteForce) { + DoStreamExecuteYqlScriptScanTimeoutBruteForce(false, true); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptClientTimeoutBruteForce) { + DoStreamExecuteYqlScriptTimeoutBruteForce(true, false); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptClientOperationTimeoutBruteForce) { + DoStreamExecuteYqlScriptTimeoutBruteForce(true, true); + } + + Y_UNIT_TEST(StreamExecuteYqlScriptOperationTmeoutBruteForce) { + DoStreamExecuteYqlScriptTimeoutBruteForce(false, true); } Y_UNIT_TEST(StreamExecuteYqlScriptScanScalar) { |