aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-04-24 20:24:22 +0300
committerdcherednik <dcherednik@ydb.tech>2023-04-24 20:24:22 +0300
commite4e423960ce04a66895607b49f17348193241699 (patch)
tree5555f72ac81fa3c9f4db677cef5b520dcd18290e
parentc562747cc8a659d9306a4e3986225286a504671b (diff)
downloadydb-e4e423960ce04a66895607b49f17348193241699.tar.gz
Improve cancelation for scan requests.
Old logic worked only if there had been at least one part (i.e. the rpc layer had seen the executor id). New logic: subscribe for ClientLost from the session actor; in this case the session actor sends AbortExecution to the executor. The logic is a bit more complex for the scripting service: two session actors handle such a request, and both of them are subscribed for cancelation.
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp57
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp64
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp4
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h1
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp21
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp6
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp35
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp39
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_failpoints.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_failpoints.h43
12 files changed, 187 insertions, 92 deletions
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 3ccad104efc..5337c436e9c 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -91,18 +91,12 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) {
}
}
-bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest,
- TParseRequestError& error)
+bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error)
{
- kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
switch (req.mode()) {
case Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC:
- kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- kqpRequest.MutableRequest()->SetStatsMode(GetKqpStatsMode(req.collect_stats()));
- kqpRequest.MutableRequest()->SetCollectStats(req.collect_stats());
break;
case Ydb::Table::ExecuteScanQueryRequest::MODE_EXPLAIN:
- kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
break;
default: {
NYql::TIssues issues;
@@ -111,8 +105,6 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp::
return false;
}
}
- kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN);
- kqpRequest.MutableRequest()->SetKeepSession(false);
auto& query = req.query();
switch (query.query_case()) {
@@ -122,8 +114,6 @@ bool FillKqpRequest(const Ydb::Table::ExecuteScanQueryRequest& req, NKikimrKqp::
error = TParseRequestError(Ydb::StatusIds::BAD_REQUEST, issues);
return false;
}
-
- kqpRequest.MutableRequest()->SetQuery(query.yql_text());
break;
}
@@ -213,22 +203,31 @@ private:
const auto req = Request_->GetProtoRequest();
const auto traceId = Request_->GetTraceId();
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
- SetAuthToken(ev, *Request_);
- SetDatabase(ev, *Request_);
- SetRlPath(ev, *Request_);
-
- if (traceId) {
- ev->Record.SetTraceId(traceId.GetRef());
- }
-
- ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId());
-
TParseRequestError parseError;
- if (!FillKqpRequest(*req, ev->Record, parseError)) {
+ if (!CheckRequest(*req, parseError)) {
return ReplyFinishStream(parseError.Status, parseError.Issues);
}
+ auto action = (req->mode() == Ydb::Table::ExecuteScanQueryRequest::MODE_EXEC)
+ ? NKikimrKqp::QUERY_ACTION_EXECUTE
+ : NKikimrKqp::QUERY_ACTION_EXPLAIN;
+
+ auto text = req->query().yql_text();
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
+ action,
+ NKikimrKqp::QUERY_TYPE_SQL_SCAN,
+ SelfId(),
+ Request_,
+ TString(), //sessionId
+ std::move(text),
+ TString(), //queryId,
+ nullptr, //tx_control
+ &req->parameters(),
+ req->collect_stats(),
+ nullptr, // query_cache_policy
+ nullptr
+ );
+
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
@@ -402,13 +401,7 @@ private:
}
void HandleClientLost(const TActorContext& ctx) {
- LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost, send abort event to executer " << ExecuterActorId_);
-
- if (ExecuterActorId_) {
- auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Client lost"); // any status code can be here
-
- ctx.Send(ExecuterActorId_, abortEv.Release());
- }
+ LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost");
// We must try to finish stream otherwise grpc will not free allocated memory
// If stream already scheduled to be finished (ReplyFinishStream already called)
@@ -421,6 +414,8 @@ private:
void HandleTimeout(const TActorContext& ctx) {
TInstant now = TAppData::TimeProvider->Now();
TDuration timeout;
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Got timeout event, InactiveClientTimeout: " << InactiveClientTimeout_
+ << " GRpcResponsesSizeQueue: " << GRpcResponsesSizeQueue_.size());
if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) {
TDuration processTime = now - LastDataStreamTimestamp_;
@@ -485,7 +480,7 @@ private:
}
private:
- std::unique_ptr<TEvStreamExecuteScanQueryRequest> Request_;
+ std::shared_ptr<TEvStreamExecuteScanQueryRequest> Request_;
const ui64 RpcBufferSize_;
TDuration InactiveClientTimeout_;
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index 21e03f99115..613fe6a7d89 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -53,28 +53,6 @@ namespace {
ClientLostTag = 1,
ClientTimeoutTag = 2
};
-
- bool FillKqpRequest(const Ydb::Scripting::ExecuteYqlRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest,
- TParseRequestError& error)
- {
- kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
-
- auto& script = req.script();
- NYql::TIssues issues;
- if (!CheckQuery(script, issues)) {
- error = TParseRequestError(Ydb::StatusIds::BAD_REQUEST, issues);
- return false;
- }
-
- kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING);
- kqpRequest.MutableRequest()->SetStatsMode(GetKqpStatsMode(req.collect_stats()));
- kqpRequest.MutableRequest()->SetCollectStats(req.collect_stats());
- kqpRequest.MutableRequest()->SetKeepSession(false);
- kqpRequest.MutableRequest()->SetQuery(script);
-
- return true;
- }
}
class TStreamExecuteYqlScriptRPC
@@ -132,6 +110,7 @@ private:
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
// Overide default forget action which terminate this actor on client disconnect
hFunc(TRpcServices::TEvForgetOperation, HandleForget);
+ hFunc(TEvSubscribeGrpcCancel, HandleSubscribeiGrpcCancel);
default: {
return ReplyFinishStream(TStringBuilder()
<< "Unexpected event received in TStreamExecuteYqlScriptRPC::StateWork: " << ev->GetTypeRewrite());
@@ -152,20 +131,30 @@ private:
const auto req = GetProtoRequest();
const auto traceId = Request_->GetTraceId();
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
- SetAuthToken(ev, *Request_);
- SetDatabase(ev, *Request_);
+ auto script = req->script();
- if (traceId) {
- ev->Record.SetTraceId(traceId.GetRef());
+ NYql::TIssues issues;
+ if (!CheckQuery(script, issues)) {
+ return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, issues);
}
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
+ NKikimrKqp::QUERY_ACTION_EXECUTE,
+ NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING,
+ SelfId(),
+ Request_,
+ TString(), //sessionId
+ std::move(script),
+ TString(), //queryId
+ nullptr, //tx_control
+ &req->parameters(),
+ req->collect_stats(),
+ nullptr, // query_cache_policy
+ req->has_operation_params() ? &req->operation_params() : nullptr
+ );
+
ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId());
- TParseRequestError parseError;
- if (!FillKqpRequest(*req, ev->Record, parseError)) {
- return ReplyFinishStream(parseError.Status, parseError.Issues);
- }
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
return ReplyFinishStream("Couldn't send request to KqpProxy");
}
@@ -356,13 +345,7 @@ private:
}
void HandleClientLost(const TActorContext& ctx) {
- LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost, send abort event to executer " << GatewayRequestHandlerActorId_);
-
- if (GatewayRequestHandlerActorId_) {
- auto abortEv = NKqp::TEvKqp::TEvAbortExecution::Aborted("Client lost");
-
- ctx.Send(GatewayRequestHandlerActorId_, abortEv.Release());
- }
+ LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost");
// We must try to finish stream otherwise grpc will not free allocated memory
// If stream already scheduled to be finished (ReplyFinishStream already called)
@@ -461,6 +444,11 @@ private:
}
private:
+ void HandleSubscribeiGrpcCancel(TEvSubscribeGrpcCancel::TPtr& ev) { // NOTE(review): stray 'i' in the name; it is also used in the hFunc registration in StateWork, so rename both together
+ auto as = TActivationContext::ActorSystem();
+ PassSubscription(ev->Get(), Request_.get(), as); // hands the subscription event over together with the underlying rpc request
+ }
+
const ui64 RpcBufferSize_;
TDuration InactiveClientTimeout_;
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 2fe3213e249..e60e2f68ecf 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -1167,6 +1167,10 @@ const ::NMonitoring::TDynamicCounters::TCounterPtr TKqpCounters::RecompileReques
return TKqpCountersBase::CompileRequestsRecompile;
}
+const ::NMonitoring::TDynamicCounters::TCounterPtr TKqpCounters::GetActiveSessionActors() const { // exposes the counter so the cancelation unit tests can poll it
+ return TKqpCountersBase::ActiveSessionActors;
+}
+
::NMonitoring::TDynamicCounters::TCounterPtr TKqpCounters::GetQueryTypeCounter(
NKikimrKqp::EQueryType queryType)
{
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index bfeed50c1d2..1a0e8cb80c5 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -328,6 +328,7 @@ public:
const ::NMonitoring::TDynamicCounters::TCounterPtr RecompileRequestGet() const;
::NMonitoring::TDynamicCounterPtr GetKqpCounters() const;
::NMonitoring::TDynamicCounterPtr GetQueryReplayCounters() const;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr GetActiveSessionActors() const;
::NMonitoring::TDynamicCounters::TCounterPtr GetQueryTypeCounter(NKikimrKqp::EQueryType queryType);
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index a721cf20664..21ce6c3ac31 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1959,6 +1959,8 @@ public:
ev->Record.MutableRequest()->SetKeepSession(false);
ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats);
+ ActorIdToProto(target, ev->Record.MutableCancelationActor());
+
FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters());
return SendKqpScanQueryStreamRequest(ev.Release(), target,
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 67406873e4b..a5e64c722a0 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -706,7 +706,6 @@ public:
LocalSessions->StartIdleCheck(info, GetSessionIdleDuration());
}
- LogResponse(proxyRequest->TraceId, ev->Get()->Record, proxyRequest->DbCounters);
Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie);
TKqpRequestInfo requestInfo(proxyRequest->TraceId);
@@ -1133,26 +1132,6 @@ private:
}
void LogResponse(const TKqpRequestInfo&,
- const TEvKqp::TProtoArenaHolder<NKikimrKqp::TEvQueryResponse>& holder,
- TKqpDbCountersPtr dbCounters)
- {
- const auto& event = holder.GetRef();
-
- Counters->ReportResponseStatus(dbCounters, event.ByteSize(),
- event.GetYdbStatus());
-
- for (auto& issue : event.GetResponse().GetQueryIssues()) {
- Counters->ReportIssues(dbCounters, issue);
- }
-
- ui64 resultsBytes = 0;
- for (auto& result : event.GetResponse().GetResults()) {
- resultsBytes += result.ByteSize();
- }
- Counters->ReportResultsBytes(dbCounters, resultsBytes);
- }
-
- void LogResponse(const TKqpRequestInfo&,
const NKikimrKqp::TEvCreateSessionResponse& event, TKqpDbCountersPtr dbCounters)
{
Counters->ReportResponseStatus(dbCounters, event.ByteSize(),
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 66211acdd74..8aaa7698e3c 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -411,6 +411,7 @@ public:
void CompileQuery() {
YQL_ENSURE(QueryState);
auto ev = QueryState->BuildCompileRequest();
+ LOG_D("Sending CompileQuery request");
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId);
Become(&TKqpSessionActor::CompileState);
}
@@ -494,6 +495,7 @@ public:
}
void AcquirePersistentSnapshot() {
+ LOG_D("acquire persistent snapshot");
auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now();
auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
@@ -1573,6 +1575,7 @@ public:
Counters->ReportTxAborted(Settings.DbCounters, Transactions.ToBeAbortedSize());
}
+ auto workerId = WorkerId;
if (WorkerId) {
auto ev = std::make_unique<TEvKqp::TEvCloseSessionRequest>();
ev->Record.MutableRequest()->SetSessionId(SessionId);
@@ -1593,7 +1596,8 @@ public:
}
LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
- << " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0));
+ << " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0)
+ << " WorkerId: " << (workerId ? *workerId : TActorId()));
if (CleanupCtx) {
Become(&TKqpSessionActor::CleanupState);
} else {
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index bcae28b42e9..41959f5bcdb 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -1,5 +1,6 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/tx/datashard/datashard_failpoints.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
@@ -93,6 +94,40 @@ void CreateNullSampleTables(TKikimrRunner& kikimr) {
} // namespace
Y_UNIT_TEST_SUITE(KqpScan) {
+
+ Y_UNIT_TEST(StreamExecuteScanQueryCancelation) {
+ TKikimrRunner kikimr;
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ NDataShard::gSkipReadIteratorResultFailPoint.Enable(-1); // (ui64)-1 matches every tablet: read iterator results are withheld
+
+ {
+ auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1";
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ // Poll (up to 60 one-second attempts) until exactly one session actor is active
+ int count = 60;
+ while (counters.GetActiveSessionActors()->Val() != 1 && count) {
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait session actor (executing compiled program) start");
+ } // NOTE(review): presumably destroying `it` here drops the client stream and triggers cancelation - confirm
+
+ NDataShard::gSkipReadIteratorResultFailPoint.Disable(); // undo the Enable(-1) above; the fail point is a process-wide global
+ int count = 60;
+ while (counters.GetActiveSessionActors()->Val() != 0 && count) {
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+ }
+
Y_UNIT_TEST(IsNull) {
auto kikimr = DefaultKikimrRunner();
CreateNullSampleTables(kikimr);
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index ab0bcb354ac..5474d8649be 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -1,6 +1,10 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
+#include <ydb/core/tx/datashard/datashard_failpoints.h>
+
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
#include <library/cpp/json/json_prettifier.h>
@@ -524,6 +528,41 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
CompareYson(R"([[[8u]]])", StreamResultToYson(it));
}
+ Y_UNIT_TEST(StreamExecuteYqlScriptScanCancelation) {
+ TKikimrRunner kikimr;
+ TScriptingClient client(kikimr.GetDriver());
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ NDataShard::gSkipReadIteratorResultFailPoint.Enable(-1); // (ui64)-1 matches every tablet: read iterator results are withheld
+
+ {
+ auto it = client.StreamExecuteYqlScript(R"(
+ PRAGMA kikimr.ScanQuery = "true";
+ SELECT * FROM `/Root/EightShard` WHERE Text = "Value1";
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ // Poll (up to 60 one-second attempts) until two session actors are active
+ int count = 60;
+ while (counters.GetActiveSessionActors()->Val() != 2 && count) {
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait second session actor (executing compiled program) start");
+ } // NOTE(review): presumably destroying `it` here drops the client stream and triggers cancelation - confirm
+
+ NDataShard::gSkipReadIteratorResultFailPoint.Disable(); // undo the Enable(-1) above
+ int count = 60; // NOTE(review): the loop below expects exactly one session actor to remain - confirm which one
+ while (counters.GetActiveSessionActors()->Val() != 1 && count) {
+ count--;
+ Sleep(TDuration::Seconds(1));
+ }
+
+ UNIT_ASSERT_C(count, "Unable to wait for proper active session count, it looks like cancelation doesn`t work");
+ }
+
Y_UNIT_TEST(StreamExecuteYqlScriptScanScalar) {
TKikimrRunner kikimr;
TScriptingClient client(kikimr.GetDriver());
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index ff63be7746e..beae056e505 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1,3 +1,4 @@
+#include "datashard_failpoints.h"
#include "datashard_impl.h"
#include "datashard_read_operation.h"
#include "setup_sys_locks.h"
@@ -1404,7 +1405,10 @@ public:
<< ", firstUnprocessed# " << state.FirstUnprocessedQuery);
Reader->FillResult(*Result, state);
- Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+
+ if (!gSkipReadIteratorResultFailPoint.Check(Self->TabletID())) {
+ Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+ }
}
void Complete(const TActorContext& ctx) override {
diff --git a/ydb/core/tx/datashard/datashard_failpoints.cpp b/ydb/core/tx/datashard/datashard_failpoints.cpp
index a8f785dab51..3c1d19aa064 100644
--- a/ydb/core/tx/datashard/datashard_failpoints.cpp
+++ b/ydb/core/tx/datashard/datashard_failpoints.cpp
@@ -5,5 +5,6 @@ namespace NDataShard {
TCancelTxFailPoint gCancelTxFailPoint;
TSkipRepliesFailPoint gSkipRepliesFailPoint;
+TSkipReadIteratorResultFailPoint gSkipReadIteratorResultFailPoint;
}}
diff --git a/ydb/core/tx/datashard/datashard_failpoints.h b/ydb/core/tx/datashard/datashard_failpoints.h
index f6b8cac58bd..0e3516da95b 100644
--- a/ydb/core/tx/datashard/datashard_failpoints.h
+++ b/ydb/core/tx/datashard/datashard_failpoints.h
@@ -119,7 +119,50 @@ struct TSkipRepliesFailPoint {
}
};
+// Allows to skip read iterator results from datashard by TabletID; (ui64)-1 matches any tablet. Stays active until Disable()
+struct TSkipReadIteratorResultFailPoint {
+ TAtomic Enabled; // read without Lock as a fast path in Check()
+ TSpinLock Lock; // guards TabletId
+ ui64 TabletId; // tablet to match; (ui64)-1 means every tablet
+
+ TSkipReadIteratorResultFailPoint() {
+ Disable();
+ }
+
+ void Enable(ui64 tabletId) {
+ Disable(); // called before taking Lock because Disable() acquires Lock itself
+
+ TGuard<TSpinLock> g(Lock);
+ TabletId = tabletId;
+
+ AtomicSet(Enabled, 1); // set only after TabletId is in place
+ }
+
+ void Disable() {
+ TGuard<TSpinLock> g(Lock);
+ TabletId = 0;
+
+ AtomicSet(Enabled, 0);
+ }
+
+ bool Check(ui64 tabletId) { // true when the result for tabletId should be skipped
+ if (!AtomicGet(Enabled)) {
+ return false;
+ }
+
+ TGuard<TSpinLock> g(Lock);
+
+ if (tabletId != TabletId && TabletId != (ui64)-1) {
+ return false;
+ }
+
+ return true;
+ }
+};
+
+
extern TCancelTxFailPoint gCancelTxFailPoint;
extern TSkipRepliesFailPoint gSkipRepliesFailPoint;
+extern TSkipReadIteratorResultFailPoint gSkipReadIteratorResultFailPoint;
}}