aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-05-26 19:24:16 +0300
committerdcherednik <dcherednik@ydb.tech>2023-05-26 19:24:16 +0300
commit7fb69b50f57423be98cc6d9b9cd15eb341080c59 (patch)
treefe07168ca1b3a38a7fa434d9c8d7dca814a9ee7f
parente0c56556489d46590343c4276a5444e7ca85d7c7 (diff)
downloadydb-7fb69b50f57423be98cc6d9b9cd15eb341080c59.tar.gz
Fix cancelation for ScanExecuteYqlScript in case of operation timeout.
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/base/base.h62
-rw-r--r--ydb/core/grpc_services/base/iface.h48
-rw-r--r--ydb/core/grpc_services/cancelation/cancelation.cpp2
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h2
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_create_session.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_deferrable.h2
-rw-r--r--ydb/core/grpc_services/rpc_execute_yql_script.cpp14
-rw-r--r--ydb/core/grpc_services/rpc_read_table.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp47
-rw-r--r--ydb/core/kqp/common/events/query.h7
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp15
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp1
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp13
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h3
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp15
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp228
22 files changed, 374 insertions, 97 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
index d1e1087ee09..0e8dce5fa38 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
fq-libs-actors
fq-libs-control_plane_proxy
libs-control_plane_proxy-events
+ core-grpc_services-base
core-grpc_services-counters
core-grpc_services-local_rpc
core-grpc_services-cancelation
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index 591edc09801..2f5b3ef7d07 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -33,6 +33,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
fq-libs-actors
fq-libs-control_plane_proxy
libs-control_plane_proxy-events
+ core-grpc_services-base
core-grpc_services-counters
core-grpc_services-local_rpc
core-grpc_services-cancelation
diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
index 591edc09801..2f5b3ef7d07 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
fq-libs-actors
fq-libs-control_plane_proxy
libs-control_plane_proxy-events
+ core-grpc_services-base
core-grpc_services-counters
core-grpc_services-local_rpc
core-grpc_services-cancelation
diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
index d1e1087ee09..0e8dce5fa38 100644
--- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
fq-libs-actors
fq-libs-control_plane_proxy
libs-control_plane_proxy-events
+ core-grpc_services-base
core-grpc_services-counters
core-grpc_services-local_rpc
core-grpc_services-cancelation
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 0f7ead8b574..7f3028da4e5 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -1,5 +1,7 @@
#pragma once
+#include "iface.h"
+
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/slice.h>
@@ -246,22 +248,6 @@ public:
}
};
-class IRequestCtxBaseMtSafe {
-public:
- virtual TMaybe<TString> GetTraceId() const = 0;
- // Returns client provided database name
- virtual const TMaybe<TString> GetDatabaseName() const = 0;
- // Returns "internal" token (result of ticket parser authentication)
- virtual const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const = 0;
- // Returns internal token as a serialized message.
- virtual const TString& GetSerializedToken() const = 0;
- virtual bool IsClientLost() const = 0;
- // Is this call made from inside YDB?
- virtual bool IsInternalCall() const {
- return false;
- }
-};
-
class IRequestCtxBase : public virtual IRequestCtxBaseMtSafe {
public:
virtual ~IRequestCtxBase() = default;
@@ -389,19 +375,6 @@ public:
virtual void Pass(const IFacilityProvider& facility) = 0;
};
-// Provide methods which can be safely passed though actor system
-// as part of event
-class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe {
-public:
- virtual ~IRequestCtxMtSafe() = default;
- virtual const google::protobuf::Message* GetRequest() const = 0;
- virtual const TMaybe<TString> GetRequestType() const = 0;
- // Implementation must be thread safe
- virtual void SetClientLostAction(std::function<void()>&& cb) = 0;
- // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety
- virtual google::protobuf::Arena* GetArena() = 0;
-};
-
// Request context
// The interface is used for rpc_ request actors
class IRequestCtx
@@ -626,7 +599,7 @@ private:
const TActorId From_;
NGrpc::TAuthState State_;
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_;
- const TString EmptySerializedTokenMessage_;
+ inline static const TString EmptySerializedTokenMessage_;
NYql::TIssueManager IssueManager_;
};
@@ -848,7 +821,7 @@ public:
private:
TIntrusivePtr<IStreamCtx> Ctx_;
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_;
- const TString EmptySerializedTokenMessage_;
+ inline static const TString EmptySerializedTokenMessage_;
NYql::TIssueManager IssueManager_;
TMaybe<NRpcService::TRlPath> RlPath_;
bool RlAllowed_;
@@ -964,6 +937,8 @@ public:
using TRequest = TReq;
using TResponse = TResp;
+ using TFinishWrapper = std::function<void(const NGrpc::IRequestContextBase::TAsyncFinishResult&)>;
+
TGRpcRequestWrapperImpl(NGrpc::IRequestContextBase* ctx)
: Ctx_(ctx)
{ }
@@ -1155,16 +1130,15 @@ public:
Ctx_->SetNextReplyCallback(std::move(cb));
}
- void SetClientLostAction(std::function<void()>&& cb) override {
- auto shutdown = [cb = std::move(cb)](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) mutable {
- Y_ASSERT(future.HasValue());
- if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) {
- cb();
- }
- };
+ void SetFinishAction(std::function<void()>&& cb) override {
+ auto shutdown = FinishWrapper(std::move(cb));
Ctx_->GetFinishFuture().Subscribe(std::move(shutdown));
}
+ void SetCustomFinishWrapper(std::function<TFinishWrapper(std::function<void()>&&)> wrapper) {
+ FinishWrapper = wrapper;
+ }
+
bool IsClientLost() const override {
return Ctx_->IsClientLost();
}
@@ -1222,10 +1196,19 @@ private:
return google::protobuf::Arena::CreateMessage<TResponse>(Ctx_->GetArena());
}
+ static TFinishWrapper GetStdFinishWrapper(std::function<void()>&& cb) {
+ return [cb = std::move(cb)](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) mutable {
+ Y_ASSERT(future.HasValue());
+ if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL) {
+ cb();
+ }
+ };
+ }
+
private:
TIntrusivePtr<NGrpc::IRequestContextBase> Ctx_;
TIntrusiveConstPtr<NACLib::TUserToken> InternalToken_;
- const TString EmptySerializedTokenMessage_;
+ inline static const TString EmptySerializedTokenMessage_;
NYql::TIssueManager IssueManager;
Ydb::CostInfo* CostInfo = nullptr;
Ydb::QuotaExceeded* QuotaExceeded = nullptr;
@@ -1233,6 +1216,7 @@ private:
TRespHook RespHook;
TMaybe<NRpcService::TRlPath> RlPath;
IGRpcProxyCounters::TPtr Counters;
+ std::function<TFinishWrapper(std::function<void()>&&)> FinishWrapper = &GetStdFinishWrapper;
};
template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>
diff --git a/ydb/core/grpc_services/base/iface.h b/ydb/core/grpc_services/base/iface.h
new file mode 100644
index 00000000000..bc0a2728d59
--- /dev/null
+++ b/ydb/core/grpc_services/base/iface.h
@@ -0,0 +1,48 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+
+namespace google::protobuf {
+class Message;
+class Arena;
+}
+
+namespace NACLib {
+class TUserToken;
+
+}
+namespace NKikimr {
+
+namespace NGRpcService {
+
+class IRequestCtxBaseMtSafe {
+public:
+ virtual TMaybe<TString> GetTraceId() const = 0;
+ // Returns client provided database name
+ virtual const TMaybe<TString> GetDatabaseName() const = 0;
+ // Returns "internal" token (result of ticket parser authentication)
+ virtual const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const = 0;
+ // Returns internal token as a serialized message.
+ virtual const TString& GetSerializedToken() const = 0;
+ virtual bool IsClientLost() const = 0;
+ // Is this call made from inside YDB?
+ virtual bool IsInternalCall() const {
+ return false;
+ }
+};
+
+
+// Provide methods which can be safely passed though actor system // as part of event
+class IRequestCtxMtSafe : public virtual IRequestCtxBaseMtSafe {
+public:
+ virtual ~IRequestCtxMtSafe() = default;
+ virtual const google::protobuf::Message* GetRequest() const = 0;
+ virtual const TMaybe<TString> GetRequestType() const = 0;
+ // Implementation must be thread safe
+ virtual void SetFinishAction(std::function<void()>&& cb) = 0;
+ // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety
+ virtual google::protobuf::Arena* GetArena() = 0;
+};
+
+}
+}
diff --git a/ydb/core/grpc_services/cancelation/cancelation.cpp b/ydb/core/grpc_services/cancelation/cancelation.cpp
index 37136078594..80eaaccfdc2 100644
--- a/ydb/core/grpc_services/cancelation/cancelation.cpp
+++ b/ydb/core/grpc_services/cancelation/cancelation.cpp
@@ -13,7 +13,7 @@ void PassSubscription(const TEvSubscribeGrpcCancel* ev, IRequestCtxMtSafe* reque
NActors::TActorSystem* as)
{
auto subscriber = ActorIdFromProto(ev->Record.GetSubscriber());
- requestCtx->SetClientLostAction([subscriber, as]() {
+ requestCtx->SetFinishAction([subscriber, as]() {
as->Send(subscriber, new TEvClientLost);
});
}
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index fd90c59ed06..36229d5060e 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -162,7 +162,7 @@ public:
return &Request;
}
- void SetClientLostAction(std::function<void()>&&) override {}
+ void SetFinishAction(std::function<void()>&&) override {}
bool IsClientLost() const override { return false; }
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index a50eb52f262..7e3081ecb81 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -180,7 +180,7 @@ public:
auto selfId = this->SelfId();
auto as = TActivationContext::ActorSystem();
- Request_->SetClientLostAction([selfId, as]() {
+ Request_->SetFinishAction([selfId, as]() {
as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::ClientLostTag));
});
diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp
index e16c2ed508c..81f6daaba8b 100644
--- a/ydb/core/grpc_services/rpc_create_session.cpp
+++ b/ydb/core/grpc_services/rpc_create_session.cpp
@@ -46,7 +46,7 @@ public:
auto selfId = this->SelfId();
auto as = TActivationContext::ActorSystem();
- Request_->SetClientLostAction([selfId, as]() {
+ Request_->SetFinishAction([selfId, as]() {
as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::WakeupTagClientLost));
});
diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h
index 75d474b4167..a2f60085792 100644
--- a/ydb/core/grpc_services/rpc_deferrable.h
+++ b/ydb/core/grpc_services/rpc_deferrable.h
@@ -78,7 +78,7 @@ public:
actorSystem->Send(selfId, new TRpcServices::TEvForgetOperation());
};
- Request_->SetClientLostAction(std::move(clientLostCb));
+ Request_->SetFinishAction(std::move(clientLostCb));
}
bool HasCancelOperation() {
diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
index 6f0735714df..8d07dfad7d4 100644
--- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
@@ -20,7 +20,13 @@ public:
using TResult = Ydb::Scripting::ExecuteYqlResult;
TExecuteYqlScriptRPC(IRequestOpCtx* msg)
- : TBase(msg) {}
+ : TBase(msg)
+ {
+ // StreamExecuteYqlScript allows write in to table.
+ // But CanselAfter should not trigger cancelation if we chage table
+ // This logic is broken - just disable CancelAfter_ for a while
+ CancelAfter_ = TDuration();
+ }
void Bootstrap(const TActorContext &ctx) {
TBase::Bootstrap(ctx);
@@ -47,6 +53,8 @@ public:
return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
}
+ ::Ydb::Operations::OperationParams operationParams;
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
NKikimrKqp::QUERY_ACTION_EXECUTE,
NKikimrKqp::QUERY_TYPE_SQL_SCRIPT,
@@ -59,7 +67,9 @@ public:
&req->parameters(),
req->collect_stats(),
nullptr, // query_cache_policy
- req->has_operation_params() ? &req->operation_params() : nullptr
+ req->has_operation_params() ? &req->operation_params() : nullptr,
+ false, // keep session
+ false // use cancelAfter
);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp
index 4761e6a983e..ad888c08c67 100644
--- a/ydb/core/grpc_services/rpc_read_table.cpp
+++ b/ydb/core/grpc_services/rpc_read_table.cpp
@@ -153,7 +153,7 @@ public:
LOG_WARN(*as, NKikimrServices::READ_TABLE_API, "ForgetAction occurred, send TEvPoisonPill");
as->Send(actorId, new TEvents::TEvPoisonPill());
};
- Request_->SetClientLostAction(std::move(clientLostCb));
+ Request_->SetFinishAction(std::move(clientLostCb));
}
void PassAway() override {
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 5337c436e9c..f93d44e8803 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -170,7 +170,7 @@ public:
auto selfId = this->SelfId();
auto as = TActivationContext::ActorSystem();
- Request_->SetClientLostAction([selfId, as]() {
+ Request_->SetFinishAction([selfId, as]() {
as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::ClientLostTag));
});
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index f837f343683..f1ac5f666fb 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -61,6 +61,18 @@ class TStreamExecuteYqlScriptRPC
private:
typedef TRpcRequestWithOperationParamsActor<TStreamExecuteYqlScriptRPC, TEvStreamExecuteYqlScriptRequest, false> TBase;
+ static std::function<TEvStreamExecuteYqlScriptRequest::TFinishWrapper(std::function<void()>&&)>
+ GetFinishWrapper(std::shared_ptr<std::atomic_bool> flag) {
+ return [flag](std::function<void()>&& cb) {
+ return [cb = std::move(cb), flag](const NGrpc::IRequestContextBase::TAsyncFinishResult& future) mutable {
+ Y_ASSERT(future.HasValue());
+ if (future.GetValue() == NGrpc::IRequestContextBase::EFinishStatus::CANCEL || flag->load()) {
+ cb();
+ }
+ };
+ };
+ }
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::GRPC_STREAM_REQ;
@@ -69,7 +81,17 @@ public:
TStreamExecuteYqlScriptRPC(IRequestNoOpCtx* request, ui64 rpcBufferSize)
: TBase(request)
, RpcBufferSize_(rpcBufferSize)
- {}
+ , CancelationFlag(std::make_shared<std::atomic_bool>(false))
+ {
+ // StreamExecuteYqlScript allows write in to table.
+ // But CanselAfter should not trigger cancelation if we chage table
+ // This logic is broken - just disable CancelAfter_ for a while
+ CancelAfter_ = TDuration();
+
+ auto call = dynamic_cast<TEvStreamExecuteYqlScriptRequest*>(request);
+ Y_VERIFY(call);
+ call->SetCustomFinishWrapper(GetFinishWrapper(CancelationFlag));
+ }
using TBase::Request_;
@@ -89,7 +111,7 @@ public:
auto selfId = this->SelfId();
auto as = TActivationContext::ActorSystem();
- RequestPtr()->SetClientLostAction([selfId, as]() {
+ RequestPtr()->SetFinishAction([selfId, as]() {
as->Send(selfId, new TEvents::TEvWakeup(EStreamRpcWakeupTag::ClientLostTag));
});
@@ -127,7 +149,7 @@ private:
return ReplyFinishStream("StreamExecuteYqlScript request is not supported");
}
- const auto req = GetProtoRequest();
+ auto req = GetProtoRequest();
const auto traceId = Request_->GetTraceId();
auto script = req->script();
@@ -137,6 +159,8 @@ private:
return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, issues);
}
+ ::Ydb::Operations::OperationParams operationParams;
+
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
NKikimrKqp::QUERY_ACTION_EXECUTE,
NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING,
@@ -149,7 +173,9 @@ private:
&req->parameters(),
req->collect_stats(),
nullptr, // query_cache_policy
- req->has_operation_params() ? &req->operation_params() : nullptr
+ req->has_operation_params() ? &req->operation_params() : nullptr,
+ false, // keep session
+ false // use cancelAfter
);
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
@@ -361,11 +387,7 @@ private:
<< " which exceeds client timeout " << InactiveClientTimeout_;
LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, message);
- if (GatewayRequestHandlerActorId_) {
- auto timeoutEv = MakeHolder<NKqp::TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Client timeout");
- ctx.Send(GatewayRequestHandlerActorId_, timeoutEv.Release());
- }
-
+ CancelationFlag->store(true);
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message);
return ReplyFinishStream(StatusIds::TIMEOUT, issue);
}
@@ -381,11 +403,7 @@ private:
void HandleOperationTimeout(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::RPC_REQUEST, TStringBuilder() << this->SelfId() << " Operation timeout.");
- if (GatewayRequestHandlerActorId_) {
- auto timeoutEv = MakeHolder<NKqp::TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, "Operation timeout");
- ctx.Send(GatewayRequestHandlerActorId_, timeoutEv.Release());
- }
-
+ CancelationFlag->store(true);
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Operation timeout");
return ReplyFinishStream(StatusIds::TIMEOUT, issue);
}
@@ -455,6 +473,7 @@ private:
ui64 ResultsReceived_ = 0;
// DataQuery
THolder<TDataQueryStreamContext> DataQueryStreamContext;
+ std::shared_ptr<std::atomic_bool> CancelationFlag;
};
} // namespace
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index 458e3c37424..7aca7f6fb16 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -31,7 +31,8 @@ public:
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
const ::Ydb::Operations::OperationParams* operationParams,
- bool keepSession = false);
+ bool keepSession = false,
+ bool useCancelAfter = true);
TEvQueryRequest() = default;
@@ -251,7 +252,7 @@ public:
void SetClientLostAction(TActorId actorId, NActors::TActorSystem* as) {
if (RequestCtx) {
- RequestCtx->SetClientLostAction([actorId, as]() {
+ RequestCtx->SetFinishAction([actorId, as]() {
as->Send(actorId, new NGRpcService::TEvClientLost());
});
} else if (Record.HasCancelationActor()) {
@@ -281,7 +282,7 @@ private:
const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* YdbParameters = nullptr;
const ::Ydb::Table::QueryStatsCollection::Mode CollectStats = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
const ::Ydb::Table::QueryCachePolicy* QueryCachePolicy = nullptr;
- const ::Ydb::Operations::OperationParams* OperationParams = nullptr;
+ const bool HasOperationParams = false;
bool KeepSession = false;
TDuration OperationTimeout;
TDuration CancelAfter;
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index b3c18172ba5..4c70e55e929 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -18,7 +18,8 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
const ::Ydb::Operations::OperationParams* operationParams,
- bool keepSession)
+ bool keepSession,
+ bool useCancelAfter)
: RequestCtx(ctx)
, RequestActorId(requestActorId)
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
@@ -31,12 +32,14 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
, YdbParameters(ydbParameters)
, CollectStats(collectStats)
, QueryCachePolicy(queryCachePolicy)
- , OperationParams(operationParams)
+ , HasOperationParams(operationParams)
, KeepSession(keepSession)
{
- if (OperationParams) {
- OperationTimeout = GetDuration(OperationParams->operation_timeout());
- CancelAfter = GetDuration(OperationParams->cancel_after());
+ if (HasOperationParams) {
+ OperationTimeout = GetDuration(operationParams->operation_timeout());
+ if (useCancelAfter) {
+ CancelAfter = GetDuration(operationParams->cancel_after());
+ }
}
}
@@ -85,7 +88,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
- if (OperationParams) {
+ if (HasOperationParams) {
Record.MutableRequest()->SetCancelAfterMs(CancelAfter.MilliSeconds());
Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds());
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c7117fdb0ff..ef2b5dc0cd6 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1798,6 +1798,7 @@ public:
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
+ hFunc(TEvKqp::TEvQueryResponse, HandleNoop);
default:
UnexpectedEvent("CleanupState", ev);
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 9df8580af7b..351ef6a04f4 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -1,6 +1,7 @@
#include "kqp_ut_common.h"
#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -575,7 +576,7 @@ void PrintResultSet(const NYdb::TResultSet& resultSet, NYson::TYsonWriter& write
}
bool IsTimeoutError(NYdb::EStatus status) {
- return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT;
+ return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT || status == NYdb::EStatus::CANCELLED;
}
template<typename TIterator>
@@ -1046,5 +1047,15 @@ TVector<ui64> GetTableShards(Tests::TServer::TPtr server,
return GetTableShards(server.Get(), sender, path);
}
+void WaitForZeroSessions(const NKqp::TKqpCounters& counters) {
+ int count = 60;
+ while (counters.GetActiveSessionActors()->Val() != 0 && count) {
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+}
+
} // namspace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index d3602977ba3..98b08b2e1c2 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -65,6 +65,7 @@ TString MakeQuery(const TString& tmpl) {
namespace NKikimr {
namespace NKqp {
+class TKqpCounters;
const TString KikimrDefaultUtDomainRoot = "Root";
TVector<NKikimrKqp::TKqpSetting> SyntaxV1Settings();
@@ -277,5 +278,7 @@ TVector<ui64> GetTableShards(Tests::TServer* server, TActorId sender, const TStr
TVector<ui64> GetTableShards(Tests::TServer::TPtr server, TActorId sender, const TString &path);
+void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index f0612e4aa26..98a25b8114f 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
}
- Y_UNIT_TEST(StreamExecuteScanQueryTimeoutBruteForce) {
+ Y_UNIT_TEST(StreamExecuteScanQueryClientTimeoutBruteForce) {
TKikimrRunner kikimr;
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -137,10 +137,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
for (int i = 1; i < maxTimeoutMs; i++) {
auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
- )",
- TStreamExecScanQuerySettings()
- .ClientTimeout(TDuration::MilliSeconds(i))
- ).GetValueSync();
+ )", TStreamExecScanQuerySettings().ClientTimeout(TDuration::MilliSeconds(i))).GetValueSync();
if (it.IsSuccess()) {
try {
@@ -156,13 +153,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
}
- int count = 60;
- while (counters.GetActiveSessionActors()->Val() != 0 && count) {
- count--;
- Sleep(TDuration::Seconds(1));
- }
-
- UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+ WaitForZeroSessions(counters);
}
Y_UNIT_TEST(IsNull) {
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index 1fa9cdbfcf3..df19a796c62 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -15,6 +15,18 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NScripting;
+static const TString EXPECTED_EIGHTSHARD_VALUE1 = R"(
+[
+ [[1];[101u];["Value1"]];
+ [[2];[201u];["Value1"]];
+ [[3];[301u];["Value1"]];
+ [[1];[401u];["Value1"]];
+ [[2];[501u];["Value1"]];
+ [[3];[601u];["Value1"]];
+ [[1];[701u];["Value1"]];
+ [[2];[801u];["Value1"]]
+])";
+
Y_UNIT_TEST_SUITE(KqpScripting) {
Y_UNIT_TEST(EndOfQueryCommit) {
TKikimrRunner kikimr;
@@ -583,7 +595,23 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
}
- Y_UNIT_TEST(StreamExecuteYqlScriptScanTimeoutBruteForce) {
+ std::function<TExecuteYqlRequestSettings(int)> GetExecuteYqlRequestSettingsFn(bool client, bool operation) {
+ return [client, operation](int i) {
+ TExecuteYqlRequestSettings settings;
+ if (client && operation) {
+ settings.ClientTimeout(TDuration::MilliSeconds(i))
+ .UseClientTimeoutForOperation(true);
+ } else if (client) {
+ settings.ClientTimeout(TDuration::MilliSeconds(i))
+ .UseClientTimeoutForOperation(false);
+ } else if (operation) {
+ settings.OperationTimeout(TDuration::MilliSeconds(i));
+ }
+ return settings;
+ };
+ }
+
+ void DoStreamExecuteYqlScriptScanTimeoutBruteForce(bool clientTimeout, bool operationTimeout) {
TKikimrRunner kikimr;
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -591,20 +619,22 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
int maxTimeoutMs = 1000;
+ auto getExecuteYqlRequestSettings = GetExecuteYqlRequestSettingsFn(clientTimeout, operationTimeout);
+ auto expected = TString("[") + EXPECTED_EIGHTSHARD_VALUE1 + "]";
+
+ int unsuccessStatus = 0;
+
for (int i = 1; i < maxTimeoutMs; i++) {
auto it = client.StreamExecuteYqlScript(R"(
SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
- )",
- TExecuteYqlRequestSettings()
- .ClientTimeout(TDuration::MilliSeconds(i))
- .UseClientTimeoutForOperation(false)
- ).GetValueSync();
+ )", getExecuteYqlRequestSettings(i)).GetValueSync();
if (it.IsSuccess()) {
try {
auto yson = StreamResultToYson(it, true);
- CompareYson(R"([[[[1];[101u];["Value1"]];[[2];[201u];["Value1"]];[[3];[301u];["Value1"]];[[1];[401u];["Value1"]];[[2];[501u];["Value1"]];[[3];[601u];["Value1"]];[[1];[701u];["Value1"]];[[2];[801u];["Value1"]]]])", yson);
+ CompareYson(expected, yson);
} catch (const TStreamReadError& ex) {
+ unsuccessStatus++;
if (ex.Status != NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED && ex.Status != NYdb::EStatus::TIMEOUT) {
TStringStream msg;
msg << "unexpected status: " << ex.Status;
@@ -618,15 +648,187 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
UNIT_ASSERT_VALUES_EQUAL(it.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
}
}
+ UNIT_ASSERT(unsuccessStatus);
+ WaitForZeroSessions(counters);
+ }
- int count = 60;
- while (counters.GetActiveSessionActors()->Val() != 0 && count) {
- Cerr << "SESSIONS: " << counters.GetActiveSessionActors()->Val() << Endl;
- count--;
- Sleep(TDuration::Seconds(1));
+ void DoStreamExecuteYqlScriptTimeoutBruteForce(bool clientTimeout, bool operationTimeout) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ TScriptingClient client(kikimr.GetDriver());
+
+ int maxTimeoutMs = 1000;
+
+ auto getExecuteYqlRequestSettings = GetExecuteYqlRequestSettingsFn(clientTimeout, operationTimeout);
+
+ for (int i = 1; i < maxTimeoutMs; i++) {
+ auto result = client.ExecuteYqlScript(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )", getExecuteYqlRequestSettings(i)).GetValueSync();
+
+ if (result.IsSuccess()) {
+ CompareYson(EXPECTED_EIGHTSHARD_VALUE1, FormatResultSetYson(result.GetResultSet(0)));
+ } else {
+ switch (result.GetStatus()) {
+ case NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED:
+ case NYdb::EStatus::TIMEOUT:
+ break;
+ default: {
+ TStringStream msg;
+ msg << "unexpected status: " << result.GetStatus();
+ UNIT_ASSERT_C(false, msg.Str().data());
+ }
+ }
+ }
}
- UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+ WaitForZeroSessions(counters);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelAfterBruteForce) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ TScriptingClient client(kikimr.GetDriver());
+
+ int maxTimeoutMs = 1000;
+
+ auto expected = TString("[") + EXPECTED_EIGHTSHARD_VALUE1 + "]";
+
+ for (int i = 1; i < maxTimeoutMs; i++) {
+ auto it = client.StreamExecuteYqlScript(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
+ )", TExecuteYqlRequestSettings().CancelAfter(TDuration::MilliSeconds(i))).GetValueSync();
+
+ UNIT_ASSERT(it.IsSuccess());
+ try {
+ auto yson = StreamResultToYson(it, true);
+ CompareYson(expected, yson);
+ } catch (const TStreamReadError& ex) {
+ if (ex.Status != NYdb::EStatus::CANCELLED) {
+ TStringStream msg;
+ msg << "unexpected status: " << ex.Status;
+ UNIT_ASSERT_C(false, msg.Str().data());
+ }
+ } catch (const std::exception& ex) {
+ auto msg = TString("unknown exception during the test: ") + ex.what();
+ UNIT_ASSERT_C(false, msg.data());
+ }
+ }
+
+ WaitForZeroSessions(counters);
+ }
+
+ // Check in case of CANCELED status we have no made changes in the table
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanWriteCancelAfterBruteForced) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ TScriptingClient client(kikimr.GetDriver());
+
+ int maxTimeoutMs = 1000;
+
+ auto createKey = [](int id) -> ui64 {
+ return (1u << 29) + id;
+ };
+
+ auto createExpectedRow = [](ui64 key) -> TString {
+ return Sprintf(R"([[100500];[%luu];["newrecords"]])", key);
+ };
+
+ TString expected;
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto it = client.StreamExecuteYqlScript(Sprintf(R"(
+ UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES (%lu, 100500, "newrecords");
+ COMMIT;
+ SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key;
+ COMMIT;
+ )", createKey(i)), TExecuteYqlRequestSettings().CancelAfter(TDuration::MilliSeconds(i))).GetValueSync();
+
+ UNIT_ASSERT(it.IsSuccess());
+ try {
+ auto yson = StreamResultToYson(it, true);
+ expected += createExpectedRow(createKey(i));
+ if (i != maxTimeoutMs)
+ expected += ";";
+ CompareYson(TString("[[") + expected + "]]", yson);
+ } catch (const TStreamReadError& ex) {
+ if (ex.Status != NYdb::EStatus::CANCELLED) {
+ TStringStream msg;
+ msg << "unexpected status: " << ex.Status;
+ UNIT_ASSERT_C(false, msg.Str().data());
+ }
+ } catch (const std::exception& ex) {
+ auto msg = TString("unknown exception during the test: ") + ex.what();
+ UNIT_ASSERT_C(false, msg.data());
+ }
+ }
+
+ WaitForZeroSessions(counters);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptWriteCancelAfterBruteForced) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ TScriptingClient client(kikimr.GetDriver());
+
+ int maxTimeoutMs = 1000;
+
+ auto createKey = [](int id) -> ui64 {
+ return (1u << 29) + id;
+ };
+
+ auto createExpectedRow = [](ui64 key) -> TString {
+ return Sprintf(R"([[100500];[%luu];["newrecords"]])", key);
+ };
+
+ TString expected;
+
+ for (int i = 1; i <= maxTimeoutMs; i++) {
+ auto result = client.ExecuteYqlScript(Sprintf(R"(
+ UPSERT INTO `/Root/EightShard` (Key, Data, Text) VALUES (%lu, 100500, "newrecords");
+ COMMIT;
+ SELECT * FROM `/Root/EightShard` WHERE Text = "newrecords" ORDER BY Key;
+ COMMIT;
+ )", createKey(i)), TExecuteYqlRequestSettings().CancelAfter(TDuration::MilliSeconds(i))).GetValueSync();
+
+ if (result.IsSuccess()) {
+ auto yson = FormatResultSetYson(result.GetResultSet(0));
+ expected += createExpectedRow(createKey(i));
+ if (i != maxTimeoutMs)
+ expected += ";";
+ CompareYson(TString("[") + expected + "]", yson);
+ }
+ }
+
+ WaitForZeroSessions(counters);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanClientTimeoutBruteForce) {
+ DoStreamExecuteYqlScriptScanTimeoutBruteForce(true, false);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanClientOperationTimeoutBruteForce) {
+ DoStreamExecuteYqlScriptScanTimeoutBruteForce(true, true);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanOperationTmeoutBruteForce) {
+ DoStreamExecuteYqlScriptScanTimeoutBruteForce(false, true);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptClientTimeoutBruteForce) {
+ DoStreamExecuteYqlScriptTimeoutBruteForce(true, false);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptClientOperationTimeoutBruteForce) {
+ DoStreamExecuteYqlScriptTimeoutBruteForce(true, true);
+ }
+
+ Y_UNIT_TEST(StreamExecuteYqlScriptOperationTmeoutBruteForce) {
+ DoStreamExecuteYqlScriptTimeoutBruteForce(false, true);
}
Y_UNIT_TEST(StreamExecuteYqlScriptScanScalar) {