diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-04-24 20:24:22 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-04-24 20:24:22 +0300 |
commit | e4e423960ce04a66895607b49f17348193241699 (patch) | |
tree | 5555f72ac81fa3c9f4db677cef5b520dcd18290e | |
parent | c562747cc8a659d9306a4e3986225286a504671b (diff) | |
download | ydb-e4e423960ce04a66895607b49f17348193241699.tar.gz |
Improve cancelation for scan requests.
Old logic works only in case if there was at least one part (rpc layer saw executor id).
New logic - subscribe for ClientLost from sessionActor.
In this case session actor sends AbortExecution to executor to executor.
A bit more complex logic in case of scripting service. We have 2 session actors to handle this request.
Both of them are subscribed for cancelation.
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp | 57 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 64 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp | 39 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_failpoints.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_failpoints.h | 43 |
12 files changed, 187 insertions, 92 deletions
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 3ccad104efc..5337c436e9c 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -91,18 +91,12 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) { } } -bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, - TParseRequestError& error) +bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error) { - kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); switch (req.mode()) { case Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC: - kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - kqpRequest.MutableRequest()->SetStatsMode(GetKqpStatsMode(req.collect_stats())); - kqpRequest.MutableRequest()->SetCollectStats(req.collect_stats()); break; case Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN: - kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); break; default: { NYql::TIssues issues; @@ -111,8 +105,6 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp:: return false; } } - kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN); - kqpRequest.MutableRequest()->SetKeepSession(false); auto& query = req.query(); switch (query.query_case()) { @@ -122,8 +114,6 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp:: error = TParseRequestError(Ydb::StatusIds::BAD_REQUEST, issues); return false; } - - kqpRequest.MutableRequest()->SetQuery(query.yql_text()); break; } @@ -213,22 +203,31 @@ private: const auto req = Request_->GetProtoRequest(); const auto traceId = Request_->GetTraceId(); - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - SetAuthToken(ev, *Request_); - SetDatabase(ev, *Request_); - SetRlPath(ev, *Request_); - - if (traceId) { - ev->Record.SetTraceId(traceId.GetRef()); - } - - ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId()); - TParseRequestError parseError; - if (!FillKqpRequest(*req, ev->Record, parseError)) { + if (!CheckRequest(*req, parseError)) { return ReplyFinishStream(parseError.Status, parseError.Issues); } + auto action = (req->mode() == Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC) + ? NKikimrKqp::QUERY_ACTION_EXECUTE + : NKikimrKqp::QUERY_ACTION_EXPLAIN; + + auto text = req->query().yql_text(); + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( + action, + NKikimrKqp::QUERY_TYPE_SQL_SCAN, + SelfId(), + Request_, + TString(), //sessionId + std::move(text), + TString(), //queryId, + nullptr, //tx_control + &req->parameters(), + req->collect_stats(), + nullptr, // query_cache_policy + nullptr + ); + if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); @@ -402,13 +401,7 @@ private: } void HandleClientLost(const TActorContext& ctx) { - LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost, send abort event to executer " << ExecuterActorId_); - - if (ExecuterActorId_) { - auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here - - ctx.Send(ExecuterActorId_, abortEv.Release()); - } + LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost"); // We must try to finish stream otherwise grpc will not free allocated memory // If stream already scheduled to be finished (ReplyFinishStream already called) @@ -421,6 +414,8 @@ private: void HandleTimeout(const TActorContext& ctx) { TInstant now = TAppData::TimeProvider->Now(); TDuration timeout; + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Got timeout event, InactiveClientTimeout: " << InactiveClientTimeout_ + << " GRpcResponsesSizeQueue: " << GRpcResponsesSizeQueue_.size()); if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) { TDuration processTime = now - LastDataStreamTimestamp_; @@ -485,7 +480,7 @@ private: } private: - std::unique_ptr<TEvStreamExecuteScanQueryRequest> Request_; + std::shared_ptr<TEvStreamExecuteScanQueryRequest> Request_; const ui64 RpcBufferSize_; TDuration InactiveClientTimeout_; diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 21e03f99115..613fe6a7d89 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -53,28 +53,6 @@ namespace { ClientLostTag = 1, ClientTimeoutTag = 2 }; - - bool FillKqpRequest(const Ydb::Scripting::ExecuteYqlRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, - TParseRequestError& error) - { - kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); - - auto& script = req.script(); - NYql::TIssues issues; - if (!CheckQuery(script, issues)) { - error = TParseRequestError(Ydb::StatusIds::BAD_REQUEST, issues); - return false; - } - - kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING); - kqpRequest.MutableRequest()->SetStatsMode(GetKqpStatsMode(req.collect_stats())); - kqpRequest.MutableRequest()->SetCollectStats(req.collect_stats()); - kqpRequest.MutableRequest()->SetKeepSession(false); - kqpRequest.MutableRequest()->SetQuery(script); - - return true; - } } class TStreamExecuteYqlScriptRPC @@ -132,6 +110,7 @@ private: HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); // Overide default forget action which terminate this actor on client disconnect hFunc(TRpcServices::TEvForgetOperation, HandleForget); + hFunc(TEvSubscribeGrpcCancel, HandleSubscribeiGrpcCancel); default: { return ReplyFinishStream(TStringBuilder() << "Unexpected event received in TStreamExecuteYqlScriptRPC::StateWork: " << ev->GetTypeRewrite()); @@ -152,20 +131,30 @@ private: const auto req = GetProtoRequest(); const auto traceId = Request_->GetTraceId(); - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - SetAuthToken(ev, *Request_); - SetDatabase(ev, *Request_); + auto script = req->script(); - if (traceId) { - ev->Record.SetTraceId(traceId.GetRef()); + NYql::TIssues issues; + if (!CheckQuery(script, issues)) { + return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, issues); } + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( + NKikimrKqp::QUERY_ACTION_EXECUTE, + NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING, + SelfId(), + Request_, + TString(), //sessionId + std::move(script), + TString(), //queryId + nullptr, //tx_control + &req->parameters(), + req->collect_stats(), + nullptr, // query_cache_policy + req->has_operation_params() ? &req->operation_params() : nullptr + ); + ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId()); - TParseRequestError parseError; - if (!FillKqpRequest(*req, ev->Record, parseError)) { - return ReplyFinishStream(parseError.Status, parseError.Issues); - } if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { return ReplyFinishStream("Couldn't send request to KqpProxy"); } @@ -356,13 +345,7 @@ private: } void HandleClientLost(const TActorContext& ctx) { - LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost, send abort event to executer " << GatewayRequestHandlerActorId_); - - if (GatewayRequestHandlerActorId_) { - auto abortEv = NKqp::TEvKqp::TEvAbortExecution::Aborted("Client lost"); - - ctx.Send(GatewayRequestHandlerActorId_, abortEv.Release()); - } + LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost"); // We must try to finish stream otherwise grpc will not free allocated memory // If stream already scheduled to be finished (ReplyFinishStream already called) @@ -461,6 +444,11 @@ private: } private: + void HandleSubscribeiGrpcCancel(TEvSubscribeGrpcCancel::TPtr& ev) { + auto as = TActivationContext::ActorSystem(); + PassSubscription(ev->Get(), Request_.get(), as); + } + const ui64 RpcBufferSize_; TDuration InactiveClientTimeout_; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 2fe3213e249..e60e2f68ecf 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -1167,6 +1167,10 @@ const ::NMonitoring::TDynamicCounters::TCounterPtr TKqpCounters::RecompileReques return TKqpCountersBase::CompileRequestsRecompile; } +const ::NMonitoring::TDynamicCounters::TCounterPtr TKqpCounters::GetActiveSessionActors() const { + return TKqpCountersBase::ActiveSessionActors; +} + ::NMonitoring::TDynamicCounters::TCounterPtr TKqpCounters::GetQueryTypeCounter( NKikimrKqp::EQueryType queryType) { diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index bfeed50c1d2..1a0e8cb80c5 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -328,6 +328,7 @@ public: const ::NMonitoring::TDynamicCounters::TCounterPtr RecompileRequestGet() const; ::NMonitoring::TDynamicCounterPtr GetKqpCounters() const; ::NMonitoring::TDynamicCounterPtr GetQueryReplayCounters() const; + const ::NMonitoring::TDynamicCounters::TCounterPtr GetActiveSessionActors() const; ::NMonitoring::TDynamicCounters::TCounterPtr GetQueryTypeCounter(NKikimrKqp::EQueryType queryType); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index a721cf20664..21ce6c3ac31 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1959,6 +1959,8 @@ public: ev->Record.MutableRequest()->SetKeepSession(false); ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); + ActorIdToProto(target, ev->Record.MutableCancelationActor()); + FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); return SendKqpScanQueryStreamRequest(ev.Release(), target, diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 67406873e4b..a5e64c722a0 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -706,7 +706,6 @@ public: LocalSessions->StartIdleCheck(info, GetSessionIdleDuration()); } - LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters); Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie); TKqpRequestInfo requestInfo(proxyRequest->TraceId); @@ -1133,26 +1132,6 @@ private: } void LogResponse(const TKqpRequestInfo&, - const TEvKqp::TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>& holder, - TKqpDbCountersPtr dbCounters) - { - const auto& event = holder.GetRef(); - - Counters->ReportResponseStatus(dbCounters, event.ByteSize(), - event.GetYdbStatus()); - - for (auto& issue : event.GetResponse().GetQueryIssues()) { - Counters->ReportIssues(dbCounters, issue); - } - - ui64 resultsBytes = 0; - for (auto& result : event.GetResponse().GetResults()) { - resultsBytes += result.ByteSize(); - } - Counters->ReportResultsBytes(dbCounters, resultsBytes); - } - - void LogResponse(const TKqpRequestInfo&, const NKikimrKqp::TEvCreateSessionResponse& event, TKqpDbCountersPtr dbCounters) { Counters->ReportResponseStatus(dbCounters, event.ByteSize(), diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 66211acdd74..8aaa7698e3c 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -411,6 +411,7 @@ public: void CompileQuery() { YQL_ENSURE(QueryState); auto ev = QueryState->BuildCompileRequest(); + LOG_D("Sending CompileQuery request"); Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId); Become(&TKqpSessionActor::CompileState); } @@ -494,6 +495,7 @@ public: } void AcquirePersistentSnapshot() { + LOG_D("acquire persistent snapshot"); auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now(); auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); @@ -1573,6 +1575,7 @@ public: Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize()); } + auto workerId = WorkerId; if (WorkerId) { auto ev = std::make_unique<TEvKqp::TEvCloseSessionRequest>(); ev->Record.MutableRequest()->SetSessionId(SessionId); @@ -1593,7 +1596,8 @@ public: } LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} - << " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0)); + << " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0) + << " WorkerId: " << (workerId ? *workerId : TActorId())); if (CleanupCtx) { Become(&TKqpSessionActor::CleanupState); } else { diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index bcae28b42e9..41959f5bcdb 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -1,5 +1,6 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/tx/datashard/datashard_failpoints.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> @@ -93,6 +94,40 @@ void CreateNullSampleTables(TKikimrRunner& kikimr) { } // namespace Y_UNIT_TEST_SUITE(KqpScan) { + + Y_UNIT_TEST(StreamExecuteScanQueryCancelation) { + TKikimrRunner kikimr; + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + NDataShard::gSkipReadIteratorResultFailPoint.Enable(-1); + + { + auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"( + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1"; + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + // We must wait execution to be started + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 1 && count) { + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait second session actor (executing compiled program) start"); + } + + NDataShard::gSkipRepliesFailPoint.Disable(); + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 0 && count) { + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); + } + Y_UNIT_TEST(IsNull) { auto kikimr = DefaultKikimrRunner(); CreateNullSampleTables(kikimr); diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp index ab0bcb354ac..5474d8649be 100644 --- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp @@ -1,6 +1,10 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/core/kqp/counters/kqp_counters.h> + +#include <ydb/core/tx/datashard/datashard_failpoints.h> + #include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> #include <library/cpp/json/json_prettifier.h> @@ -524,6 +528,41 @@ Y_UNIT_TEST_SUITE(KqpScripting) { CompareYson(R"([[[8u]]])", StreamResultToYson(it)); } + Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelation) { + TKikimrRunner kikimr; + TScriptingClient client(kikimr.GetDriver()); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + NDataShard::gSkipReadIteratorResultFailPoint.Enable(-1); + + { + auto it = client.StreamExecuteYqlScript(R"( + PRAGMA kikimr.ScanQuery = "true"; + SELECT * FROM `/Root/EightShard` WHERE Text = "Value1"; + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + // We must wait execution to be started + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 2 && count) { + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait second session actor (executing compiled program) start"); + } + + NDataShard::gSkipReadIteratorResultFailPoint.Disable(); + int count = 60; + while (counters.GetActiveSessionActors()->Val() != 1 && count) { + count--; + Sleep(TDuration::Seconds(1)); + } + + UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work"); + } + Y_UNIT_TEST(StreamExecuteYqlScriptScanScalar) { TKikimrRunner kikimr; TScriptingClient client(kikimr.GetDriver()); diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index ff63be7746e..beae056e505 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1,3 +1,4 @@ +#include "datashard_failpoints.h" #include "datashard_impl.h" #include "datashard_read_operation.h" #include "setup_sys_locks.h" @@ -1404,7 +1405,10 @@ public: << ", firstUnprocessed# " << state.FirstUnprocessedQuery); Reader->FillResult(*Result, state); - Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + + if (!gSkipReadIteratorResultFailPoint.Check(Self->TabletID())) { + Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + } } void Complete(const TActorContext& ctx) override { diff --git a/ydb/core/tx/datashard/datashard_failpoints.cpp b/ydb/core/tx/datashard/datashard_failpoints.cpp index a8f785dab51..3c1d19aa064 100644 --- a/ydb/core/tx/datashard/datashard_failpoints.cpp +++ b/ydb/core/tx/datashard/datashard_failpoints.cpp @@ -5,5 +5,6 @@ namespace NDataShard { TCancelTxFailPoint gCancelTxFailPoint; TSkipRepliesFailPoint gSkipRepliesFailPoint; +TSkipReadIteratorResultFailPoint gSkipReadIteratorResultFailPoint; }} diff --git a/ydb/core/tx/datashard/datashard_failpoints.h b/ydb/core/tx/datashard/datashard_failpoints.h index f6b8cac58bd..0e3516da95b 100644 --- a/ydb/core/tx/datashard/datashard_failpoints.h +++ b/ydb/core/tx/datashard/datashard_failpoints.h @@ -119,7 +119,50 @@ struct TSkipRepliesFailPoint { } }; +// Allows to skip specified number of replies from datashard by TabletID and TxId +struct TSkipReadIteratorResultFailPoint { + TAtomic Enabled; + TSpinLock Lock; + ui64 TabletId; + + TSkipReadIteratorResultFailPoint() { + Disable(); + } + + void Enable(ui64 tabletId) { + Disable(); + + TGuard<TSpinLock> g(Lock); + TabletId = tabletId; + + AtomicSet(Enabled, 1); + } + + void Disable() { + TGuard<TSpinLock> g(Lock); + TabletId = 0; + + AtomicSet(Enabled, 0); + } + + bool Check(ui64 tabletId) { + if (!AtomicGet(Enabled)) { + return false; + } + + TGuard<TSpinLock> g(Lock); + + if (tabletId != TabletId && TabletId != (ui64)-1) { + return false; + } + + return true; + } +}; + + extern TCancelTxFailPoint gCancelTxFailPoint; extern TSkipRepliesFailPoint gSkipRepliesFailPoint; +extern TSkipReadIteratorResultFailPoint gSkipReadIteratorResultFailPoint; }} |