aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-08-24 16:26:26 +0300
committersnaury <snaury@ydb.tech>2023-08-24 16:45:45 +0300
commita62381eb07390ecd888e550fe69845f46ba4f72b (patch)
tree5b4155cf1ab40815e98f94d74e0e2a29768f12fb
parent890fb1bb552aca548086b4a920cb89f1af9eb114 (diff)
downloadydb-a62381eb07390ecd888e550fe69845f46ba4f72b.tar.gz
Subscribe to overload in bulk upsert and retry when it goes down KIKIMR-19021
-rw-r--r--ydb/core/base/tablet_pipe.h5
-rw-r--r--ydb/core/grpc_services/rpc_load_rows.cpp22
-rw-r--r--ydb/core/protos/tx_datashard.proto15
-rw-r--r--ydb/core/tablet/tablet_pipe_server.cpp2
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp50
-rw-r--r--ydb/core/tablet_flat/flat_executor.h4
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.cpp8
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h2
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/datashard.cpp84
-rw-r--r--ydb/core/tx/datashard/datashard.h30
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp49
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h38
-rw-r--r--ydb/core/tx/datashard/datashard_overload.cpp106
-rw-r--r--ydb/core/tx/datashard/datashard_ut_upload_rows.cpp80
-rw-r--r--ydb/core/tx/datashard/reject_reason.h58
-rw-r--r--ydb/core/tx/datashard/ya.make1
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h144
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_ut.cpp7
22 files changed, 630 insertions, 79 deletions
diff --git a/ydb/core/base/tablet_pipe.h b/ydb/core/base/tablet_pipe.h
index d92d4a32cc..42287e10d4 100644
--- a/ydb/core/base/tablet_pipe.h
+++ b/ydb/core/base/tablet_pipe.h
@@ -141,15 +141,18 @@ namespace NKikimr {
};
struct TEvServerConnected : public TEventLocal<TEvServerConnected, EvServerConnected> {
- TEvServerConnected(ui64 tabletId, const TActorId& clientId, const TActorId& serverId)
+ TEvServerConnected(ui64 tabletId, const TActorId& clientId, const TActorId& serverId,
+ const TActorId& interconnectSession = {})
: TabletId(tabletId)
, ClientId(clientId)
, ServerId(serverId)
+ , InterconnectSession(interconnectSession)
{}
const ui64 TabletId;
const TActorId ClientId;
const TActorId ServerId;
+ const TActorId InterconnectSession;
};
struct TEvClientDestroyed : public TEventLocal<TEvClientDestroyed, EvClientDestroyed> {
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index 6f3f442d4d..7030002aff 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -122,6 +122,17 @@ public:
{}
private:
+ void OnBeforeStart(const TActorContext& ctx) override {
+ Request->SetFinishAction([selfId = ctx.SelfID, as = ctx.ExecutorThread.ActorSystem]() {
+ as->Send(selfId, new TEvents::TEvPoison);
+ });
+ }
+
+ void OnBeforePoison(const TActorContext&) override {
+ // Client is gone, but we need to "reply" anyway?
+ Request->SendResult(Ydb::StatusIds::CANCELLED, {});
+ }
+
bool ReportCostInfoEnabled() const {
return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
}
@@ -274,6 +285,17 @@ public:
{}
private:
+ void OnBeforeStart(const TActorContext& ctx) override {
+ Request->SetFinishAction([selfId = ctx.SelfID, as = ctx.ExecutorThread.ActorSystem]() {
+ as->Send(selfId, new TEvents::TEvPoison);
+ });
+ }
+
+ void OnBeforePoison(const TActorContext&) override {
+ // Client is gone, but we need to "reply" anyway?
+ Request->SendResult(Ydb::StatusIds::CANCELLED, {});
+ }
+
bool ReportCostInfoEnabled() const {
return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index a176f797ac..948ea0d2e0 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -865,12 +865,14 @@ message TEvUploadRowsRequest {
repeated TSerializedRowToLoad Rows = 3;
optional uint64 CancelDeadlineMs = 4; // Wallclock timestamp (not duration)
optional bool WriteToTableShadow = 5;
+ optional uint64 OverloadSubscribe = 7;
}
message TEvUploadRowsResponse {
optional uint64 TabletID = 1;
optional uint32 Status = 2;
optional string ErrorDescription = 3;
+ optional uint64 OverloadSubscribed = 4;
}
message TEvReadColumnsRequest {
@@ -1297,6 +1299,7 @@ message TEvEraseRowsRequest {
repeated uint32 KeyColumnIds = 2;
repeated bytes KeyColumns = 3; // SerilializedCellVector
optional uint64 SchemaVersion = 5;
+ optional uint64 OverloadSubscribe = 6;
oneof Condition {
TExpirationCondition Expiration = 4;
@@ -1328,6 +1331,7 @@ message TEvEraseRowsResponse {
optional uint64 TabletID = 1;
optional EStatus Status = 2;
optional string ErrorDescription = 3;
+ optional uint64 OverloadSubscribed = 4;
}
message TEvConditionalEraseRowsRequest {
@@ -1905,3 +1909,14 @@ message TTxVolatileDetails {
// When true all preceding transactions are dependencies
optional bool CommitOrdered = 8;
}
+
+// Sent by datashard when some overload reason stopped being relevant
+message TEvOverloadReady {
+ optional uint64 TabletID = 1;
+ optional uint64 SeqNo = 2;
+}
+
+// Sent by overload subscribers to unsubscribe and free memory
+message TEvOverloadUnsubscribe {
+ optional uint64 SeqNo = 1;
+}
diff --git a/ydb/core/tablet/tablet_pipe_server.cpp b/ydb/core/tablet/tablet_pipe_server.cpp
index a1e829a30b..0fceaab97d 100644
--- a/ydb/core/tablet/tablet_pipe_server.cpp
+++ b/ydb/core/tablet/tablet_pipe_server.cpp
@@ -211,7 +211,7 @@ namespace NTabletPipe {
void OnConnected(const TActorContext& ctx) {
Become(&TThis::StateActive);
SendToClient(ctx, new TEvTabletPipe::TEvConnectResult(NKikimrProto::OK, TabletId, ClientId, ctx.SelfID, Leader, Generation), IEventHandle::FlagTrackDelivery, ConnectCookie);
- ctx.Send(RecipientId, new TEvTabletPipe::TEvServerConnected(TabletId, ClientId, ctx.SelfID));
+ ctx.Send(RecipientId, new TEvTabletPipe::TEvServerConnected(TabletId, ClientId, ctx.SelfID, InterconnectSession));
Connected = true;
}
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 4851cde9e5..bc2c81c8e3 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -44,6 +44,8 @@
namespace NKikimr {
namespace NTabletFlatExecutor {
+static constexpr ui64 MaxTxInFly = 10000;
+
LWTRACE_USING(TABLET_FLAT_PROVIDER)
struct TCompactionChangesCtx {
@@ -226,7 +228,14 @@ void TExecutor::ReflectSchemeSettings() noexcept
}
void TExecutor::OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) {
+ size_t oldMoveCount = Stats->YellowMoveChannels.size();
+ size_t oldStopCount = Stats->YellowStopChannels.size();
CheckYellow(std::move(yellowMoveChannels), std::move(yellowStopChannels));
+ if (oldMoveCount != Stats->YellowMoveChannels.size() ||
+ oldStopCount != Stats->YellowStopChannels.size())
+ {
+ Owner->OnYellowChannelsChanged();
+ }
}
void TExecutor::CheckYellow(TVector<ui32> &&yellowMoveChannels, TVector<ui32> &&yellowStopChannels, bool terminal) {
@@ -328,12 +337,20 @@ void TExecutor::Handle(TEvTablet::TEvCheckBlobstorageStatusResult::TPtr &ev) {
}
}
+ auto prevMoveChannels = Stats->YellowMoveChannels;
+ auto prevStopChannels = Stats->YellowStopChannels;
Stats->YellowMoveChannels.clear();
Stats->YellowStopChannels.clear();
Stats->IsAnyChannelYellowMove = false;
Stats->IsAnyChannelYellowStop = false;
CheckYellow(std::move(lightYellowMoveChannels), std::move(yellowStopChannels));
+
+ if (prevMoveChannels != Stats->YellowMoveChannels ||
+ prevStopChannels != Stats->YellowStopChannels)
+ {
+ Owner->OnYellowChannelsChanged();
+ }
}
void TExecutor::ActivateFollower(const TActorContext &ctx) {
@@ -1221,8 +1238,11 @@ void TExecutor::AdvancePendingPartSwitches() {
}
}
- if (PendingPartSwitches.empty()) // could be border change
+ // could be border change
+ if (PendingPartSwitches.empty()) {
PlanTransactionActivation();
+ MaybeRelaxRejectProbability();
+ }
}
bool TExecutor::ApplyReadyPartSwitches() {
@@ -2341,6 +2361,8 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
ResourceMetrics->CPU.Increment(bookkeepingTimeuS + execTimeuS, Time->Now());
ResourceMetrics->TryUpdate(ctx);
}
+
+ MaybeRelaxRejectProbability();
}
void TExecutor::MakeLogSnapshot() {
@@ -2820,6 +2842,8 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
ActiveTransaction = false;
PlanTransactionActivation();
+
+ MaybeRelaxRejectProbability();
}
void TExecutor::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) {
@@ -3375,6 +3399,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
}
Owner->CompactionComplete(tableId, OwnerCtx());
+ MaybeRelaxRejectProbability();
ActiveTransaction = false;
@@ -3535,8 +3560,10 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
float TExecutor::GetRejectProbability() const {
// Limit number of in-flight TXs
// TODO: make configurable
- if (Stats->TxInFly > 10000)
+ if (Stats->TxInFly > MaxTxInFly) {
+ HadRejectProbabilityByTxInFly = true;
return 1.0;
+ }
// Followers do not control compaction so let's always allow to read the data from follower
if (Stats->IsFollower)
@@ -3561,9 +3588,28 @@ float TExecutor::GetRejectProbability() const {
const float overloadFactor = CompactionLogic->GetOverloadFactor();
const float rejectProbability = calcProbability(overloadFactor);
+ if (rejectProbability > 0.0f) {
+ HadRejectProbabilityByOverload = true;
+ }
+
return rejectProbability;
}
+void TExecutor::MaybeRelaxRejectProbability() {
+ if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= MaxTxInFly ||
+ HadRejectProbabilityByOverload)
+ {
+ HadRejectProbabilityByTxInFly = false;
+ HadRejectProbabilityByOverload = false;
+ GetRejectProbability();
+ if (!HadRejectProbabilityByTxInFly &&
+ !HadRejectProbabilityByOverload)
+ {
+ Owner->OnRejectProbabilityRelaxed();
+ }
+ }
+}
+
TString TExecutor::BorrowSnapshot(ui32 table, const TTableSnapshotContext &snap, TRawVals from, TRawVals to, ui64 loaner) const
{
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index 8906408299..ae49cb4710 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -460,6 +460,9 @@ class TExecutor
bool HadFollowerAttached = false;
bool NeedFollowerSnapshot = false;
+ mutable bool HadRejectProbabilityByTxInFly = false;
+ mutable bool HadRejectProbabilityByOverload = false;
+
THashMap<ui32, TIntrusivePtr<TBarrier>> InFlyCompactionGcBarriers;
TDeque<THolder<TEvTablet::TFUpdateBody>> PostponedFollowerUpdates;
THashMap<ui32, TVector<TIntrusivePtr<TBarrier>>> InFlySnapCollectionBarriers;
@@ -694,6 +697,7 @@ public:
ui64 TabletId() const { return Owner->TabletID(); }
float GetRejectProbability() const override;
+ void MaybeRelaxRejectProbability();
TActorId GetLauncher() const { return Launcher; }
};
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp
index 258addbf7f..ebd5eca21d 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.cpp
+++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp
@@ -37,6 +37,14 @@ namespace NFlatExecutorSetup {
return true;
}
+ void ITablet::OnYellowChannelsChanged() {
+ // nothing by default
+ }
+
+ void ITablet::OnRejectProbabilityRelaxed() {
+ // nothing by default
+ }
+
void ITablet::UpdateTabletInfo(TIntrusivePtr<TTabletStorageInfo> info, const TActorId& launcherID) {
if (info)
TabletInfo = info;
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index 9b611f793e..768857db77 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -455,6 +455,8 @@ namespace NFlatExecutorSetup {
virtual void ScanComplete(NTable::EAbort status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx);
virtual bool ReassignChannelsEnabled() const;
+ virtual void OnYellowChannelsChanged();
+ virtual void OnRejectProbabilityRelaxed();
// memory usage excluding transactions and executor cache.
virtual ui64 GetMemoryUsage() const { return 50 << 10; }
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index fc6df7e8de..2a8ad90687 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -233,6 +233,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 877fc708c7..0622573166 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -234,6 +234,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 877fc708c7..0622573166 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -234,6 +234,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index 645ad6538e..4a4986e6f8 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -234,6 +234,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 0f4dca03a2..dbe1709fd8 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -916,6 +916,8 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
+
+ CheckChangesQueueNoOverflow();
}
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records) {
@@ -2445,11 +2447,13 @@ void TDataShard::Handle(TEvDataShard::TEvStateChangedResult::TPtr& ev, const TAc
bool TDataShard::CheckDataTxReject(const TString& opDescr,
const TActorContext &ctx,
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus &rejectStatus,
- TString &reason)
+ ERejectReasons &rejectReasons,
+ TString &rejectDescription)
{
bool reject = false;
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
- TVector<TString> rejectReasons;
+ rejectReasons = ERejectReasons::None;
+ TVector<TString> rejectDescriptions;
// In v0.5 reject all transactions on split Src after receiving EvSplit
if (State == TShardState::SplitSrcWaitForNoTxInFlight ||
@@ -2457,37 +2461,43 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
State == TShardState::SplitSrcSendingSnapshot ||
State == TShardState::SplitSrcWaitForPartitioningChanged) {
reject = true;
- rejectReasons.push_back(TStringBuilder()
+ rejectReasons |= ERejectReasons::WrongState;
+ rejectDescriptions.push_back(TStringBuilder()
<< "is in process of split opId " << SrcSplitOpId
<< " state " << DatashardStateName(State)
<< " (wrong shard state)");
} else if (State == TShardState::SplitDstReceivingSnapshot) {
reject = true;
- rejectReasons.push_back(TStringBuilder()
+ rejectReasons |= ERejectReasons::WrongState;
+ rejectDescriptions.push_back(TStringBuilder()
<< "is in process of split opId " << DstSplitOpId
<< " state " << DatashardStateName(State));
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
reject = true;
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
- rejectReasons.push_back("is in a pre/offline state assuming this is due to a finished split (wrong shard state)");
+ rejectReasons |= ERejectReasons::WrongState;
+ rejectDescriptions.push_back("is in a pre/offline state assuming this is due to a finished split (wrong shard state)");
} else if (MvccSwitchState == TSwitchState::SWITCHING) {
reject = true;
- rejectReasons.push_back(TStringBuilder()
+ rejectReasons |= ERejectReasons::WrongState;
+ rejectDescriptions.push_back(TStringBuilder()
<< "is in process of mvcc state change"
<< " state " << DatashardStateName(State));
}
if (Pipeline.HasDrop()) {
reject = true;
- rejectReasons.push_back("is in process of drop");
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
+ rejectReasons |= ERejectReasons::Dropping;
+ rejectDescriptions.push_back("is in process of drop");
}
ui64 txInfly = TxInFly();
TDuration lag = GetDataTxCompleteLag();
if (txInfly > 1 && lag > TDuration::MilliSeconds(MaxTxLagMilliseconds)) {
reject = true;
- rejectReasons.push_back(TStringBuilder()
+ rejectReasons |= ERejectReasons::OverloadByLag;
+ rejectDescriptions.push_back(TStringBuilder()
<< "lags behind, lag: " << lag
<< " in-flight tx count: " << txInfly);
}
@@ -2496,8 +2506,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
if (!reject && rejectProbabilty > 0) {
float rnd = AppData(ctx)->RandomProvider->GenRandReal2();
reject |= (rnd < rejectProbabilty);
- if (reject)
- rejectReasons.push_back("decided to reject due to given RejectProbability");
+ if (reject) {
+ rejectReasons |= ERejectReasons::OverloadByProbability;
+ rejectDescriptions.push_back("decided to reject due to given RejectProbability");
+ }
}
size_t totalInFly =
@@ -2505,30 +2517,33 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
+ ProposeQueue.Size() + TxWaiting();
if (totalInFly > GetMaxTxInFly()) {
reject = true;
- rejectReasons.push_back("MaxTxInFly was exceeded");
+ rejectReasons |= ERejectReasons::OverloadByTxInFly;
+ rejectDescriptions.push_back("MaxTxInFly was exceeded");
}
if (!reject && Stopping) {
reject = true;
- rejectReasons.push_back("is restarting");
+ rejectReasons |= ERejectReasons::WrongState;
+ rejectDescriptions.push_back("is restarting");
}
if (!reject) {
for (auto& it : TableInfos) {
if (it.second->IsBackup) {
reject = true;
- rejectReasons.push_back("is a backup table");
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
+ rejectReasons |= ERejectReasons::WrongState;
+ rejectDescriptions.push_back("is a backup table");
break;
}
}
}
if (reject) {
- reason = TStringBuilder()
+ rejectDescription = TStringBuilder()
<< "Rejecting " << opDescr
<< " because datashard " << TabletID() << ": "
- << JoinSeq("; ", rejectReasons);
+ << JoinSeq("; ", rejectDescriptions);
}
return reject;
@@ -2550,8 +2565,9 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction*
TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId();
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
- TString rejectReason;
- bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReason);
+ ERejectReasons rejectReasons;
+ TString rejectDescription;
+ bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription);
if (reject) {
LWTRACK(ProposeTransactionReject, msg->Orbit);
@@ -2561,8 +2577,8 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction*
msg->GetTxId(),
rejectStatus));
- result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
- LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
+ result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectDescription);
+ LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);
ctx.Send(msg->GetSource(), result.Release());
IncCounter(COUNTER_PREPARE_OVERLOADED);
@@ -3031,23 +3047,38 @@ void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64
}
void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev); Y_UNUSED(ctx);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server connected at "
<< (Executor()->GetStats().IsFollower ? "follower " : "leader ")
<< "tablet# " << ev->Get()->TabletId
<< ", clientId# " << ev->Get()->ClientId
<< ", serverId# " << ev->Get()->ServerId
<< ", sessionId# " << ev->InterconnectSession);
+
+ auto res = PipeServers.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(ev->Get()->ServerId),
+ std::forward_as_tuple());
+ Y_VERIFY_DEBUG_S(res.second,
+ "Unexpected TEvServerConnected for " << ev->Get()->ServerId);
+
+ res.first->second.InterconnectSession = ev->Get()->InterconnectSession;
}
void TDataShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) {
- Y_UNUSED(ev); Y_UNUSED(ctx);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server disconnected at "
<< (Executor()->GetStats().IsFollower ? "follower " : "leader ")
<< "tablet# " << ev->Get()->TabletId
<< ", clientId# " << ev->Get()->ClientId
<< ", serverId# " << ev->Get()->ServerId
<< ", sessionId# " << ev->InterconnectSession);
+
+ auto it = PipeServers.find(ev->Get()->ServerId);
+ Y_VERIFY_DEBUG_S(it != PipeServers.end(),
+ "Unexpected TEvServerDisconnected for " << ev->Get()->ServerId);
+
+ DiscardOverloadSubscribers(it->second);
+
+ PipeServers.erase(it);
}
void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx) {
@@ -3161,6 +3192,17 @@ bool TDataShard::CheckChangesQueueOverflow() const {
return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;
}
+void TDataShard::CheckChangesQueueNoOverflow() {
+ if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) {
+ const auto* appData = AppData();
+ const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
+ const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
+ if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) {
+ NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow);
+ }
+ }
+}
+
void TDataShard::DoPeriodicTasks(const TActorContext &ctx) {
UpdateLagCounters(ctx);
UpdateTableStats(ctx);
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index ae17c3af96..695fd692b9 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -323,6 +323,9 @@ struct TEvDataShard {
EvCdcStreamScanRequest,
EvCdcStreamScanResponse,
+ EvOverloadReady,
+ EvOverloadUnsubscribe,
+
EvEnd
};
@@ -880,6 +883,33 @@ struct TEvDataShard {
}
};
+ struct TEvOverloadReady
+ : public TEventPB<
+ TEvOverloadReady,
+ NKikimrTxDataShard::TEvOverloadReady,
+ EvOverloadReady>
+ {
+ TEvOverloadReady() = default;
+
+ explicit TEvOverloadReady(ui64 tabletId, ui64 seqNo) {
+ Record.SetTabletID(tabletId);
+ Record.SetSeqNo(seqNo);
+ }
+ };
+
+ struct TEvOverloadUnsubscribe
+ : public TEventPB<
+ TEvOverloadUnsubscribe,
+ NKikimrTxDataShard::TEvOverloadUnsubscribe,
+ EvOverloadUnsubscribe>
+ {
+ TEvOverloadUnsubscribe() = default;
+
+ explicit TEvOverloadUnsubscribe(ui64 seqNo) {
+ Record.SetSeqNo(seqNo);
+ }
+ };
+
// In most cases this event is local, thus users must
// use Keys, Ranges and Program struct members instead of corresponding
// protobuf members. In case of remote event these struct members will
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 43857d2f98..c48239868d 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -158,45 +158,66 @@ template <typename TEvResponse>
using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&);
template <typename TEvResponse, typename TEvRequest>
-static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, const TString& reason,
+static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
+ ERejectReasons rejectReasons, const TString& rejectDescription,
TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx)
{
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard"
<< ": tablet# " << self->TabletID()
- << ", error# " << reason);
+ << ", error# " << rejectDescription);
auto response = MakeHolder<TEvResponse>();
setStatusFunc(response->Record);
response->Record.SetTabletID(self->TabletID());
- response->Record.SetErrorDescription(reason);
+ response->Record.SetErrorDescription(rejectDescription);
+
+ if (ev->Get()->Record.HasOverloadSubscribe() && self->HasPipeServer(ev->Recipient)) {
+ ui64 seqNo = ev->Get()->Record.GetOverloadSubscribe();
+ auto allowed = (
+ ERejectReasons::OverloadByProbability |
+ ERejectReasons::YellowChannels |
+ ERejectReasons::ChangesQueueOverflow);
+ if ((rejectReasons & allowed) != ERejectReasons::None &&
+ (rejectReasons - allowed) == ERejectReasons::None)
+ {
+ if (self->AddOverloadSubscriber(ev->Recipient, ev->Sender, seqNo, rejectReasons)) {
+ response->Record.SetOverloadSubscribed(seqNo);
+ }
+ }
+ }
+
ctx.Send(ev->Sender, std::move(response));
}
template <typename TEvResponse, typename TEvRequest>
static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) {
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
- TString rejectReason;
- if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReason)) {
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &WrongShardState, ctx);
+ ERejectReasons rejectReasons = ERejectReasons::None;
+ TString rejectDescription;
+ if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReasons, rejectDescription)) {
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx);
return true;
}
if (self->CheckChangesQueueOverflow()) {
- rejectReason = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID();
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &Overloaded, ctx);
+ rejectReasons = ERejectReasons::ChangesQueueOverflow;
+ rejectDescription = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID();
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &Overloaded, ctx);
return true;
}
if (isWrite) {
if (self->IsAnyChannelYellowStop()) {
self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
- rejectReason = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID();
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &OutOfSpace, ctx);
+ rejectReasons = ERejectReasons::YellowChannels;
+ rejectDescription = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID();
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &OutOfSpace, ctx);
return true;
} else if (self->IsSubDomainOutOfSpace()) {
self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
- rejectReason = "Cannot perform writes: database is out of disk space";
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &DiskSpaceExhausted, ctx);
+ rejectReasons = ERejectReasons::DiskSpace;
+ rejectDescription = "Cannot perform writes: database is out of disk space";
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &DiskSpaceExhausted, ctx);
return true;
}
}
@@ -212,7 +233,7 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert",
- "Can't execute bulk upsert at replicated table", &ReadOnly, ctx);
+ ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx);
}
if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) {
Executor()->Execute(new TTxUploadRows(this, ev), ctx);
@@ -229,7 +250,7 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase",
- "Can't execute erase at replicated table", &ExecError, ctx);
+ ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx);
}
if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) {
Executor()->Execute(new TTxEraseRows(this, ev), ctx);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 533d59bc55..2ea68071a8 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -22,6 +22,7 @@
#include "read_iterator.h"
#include "volatile_tx.h"
#include "conflicts_cache.h"
+#include "reject_reason.h"
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
@@ -1216,6 +1217,7 @@ class TDataShard
void HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDataShard::TEvOverloadUnsubscribe::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvConditionalEraseRowsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvConditionalEraseRowsRegistered::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx);
@@ -1510,7 +1512,8 @@ public:
bool CheckDataTxReject(const TString& opDescr,
const TActorContext &ctx,
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus& rejectStatus,
- TString &reason);
+ ERejectReasons& rejectReasons,
+ TString& rejectDescription);
bool CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction* msg, const TActorContext& ctx);
TSysLocks& SysLocksTable() { return SysLocks; }
@@ -1665,8 +1668,14 @@ public:
void ScanComplete(NTable::EAbort status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx) override;
bool ReassignChannelsEnabled() const override;
+ void OnYellowChannelsChanged() override;
+ void OnRejectProbabilityRelaxed() override;
ui64 GetMemoryUsage() const override;
+ bool HasPipeServer(const TActorId& pipeServerId);
+ bool AddOverloadSubscriber(const TActorId& pipeServerId, const TActorId& actorId, ui64 seqNo, ERejectReasons reasons);
+ void NotifyOverloadSubscribers(ERejectReason reason);
+
bool HasSharedBlobs() const;
void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx);
void CheckStateChange(const TActorContext& ctx);
@@ -1921,6 +1930,7 @@ public:
bool CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const;
bool CheckChangesQueueOverflow() const;
+ void CheckChangesQueueNoOverflow();
void DeleteReadIterator(TReadIteratorsMap::iterator it);
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
@@ -2302,6 +2312,31 @@ private:
TTxProgressIdempotentScalarScheduleQueue<TEvPrivate::TEvCleanupTransaction> CleanupQueue;
TTxProgressQueue<ui64, TNoOpDestroy, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;
+ struct TPipeServerInfoOverloadSubscribersTag {};
+
+ struct TOverloadSubscriber {
+ ui64 SeqNo = 0;
+ ERejectReasons Reasons = ERejectReasons::None;
+ };
+
+ struct TPipeServerInfo
+ : public TIntrusiveListItem<TPipeServerInfo, TPipeServerInfoOverloadSubscribersTag>
+ {
+ TPipeServerInfo() = default;
+
+ TActorId InterconnectSession;
+ THashMap<TActorId, TOverloadSubscriber> OverloadSubscribers;
+ };
+
+ using TPipeServers = THashMap<TActorId, TPipeServerInfo>;
+ using TPipeServersWithOverloadSubscribers = TIntrusiveList<TPipeServerInfo, TPipeServerInfoOverloadSubscribersTag>;
+
+ TPipeServers PipeServers;
+ TPipeServersWithOverloadSubscribers PipeServersWithOverloadSubscribers;
+ size_t OverloadSubscribersByReason[RejectReasonCount] = { 0 };
+
+ void DiscardOverloadSubscribers(TPipeServerInfo& pipeServer);
+
class TProposeQueue : private TTxProgressIdempotentScalarQueue<TEvPrivate::TEvDelayedProposeTransaction> {
public:
struct TItem : public TMoveOnly {
@@ -2831,6 +2866,7 @@ protected:
HFunc(TEvDataShard::TEvKqpScan, Handle);
HFunc(TEvDataShard::TEvUploadRowsRequest, Handle);
HFunc(TEvDataShard::TEvEraseRowsRequest, Handle);
+ HFunc(TEvDataShard::TEvOverloadUnsubscribe, Handle);
HFunc(TEvDataShard::TEvConditionalEraseRowsRequest, Handle);
HFunc(TEvPrivate::TEvConditionalEraseRowsRegistered, Handle);
HFunc(TEvDataShard::TEvRead, Handle);
diff --git a/ydb/core/tx/datashard/datashard_overload.cpp b/ydb/core/tx/datashard/datashard_overload.cpp
new file mode 100644
index 0000000000..4724a93a95
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_overload.cpp
@@ -0,0 +1,106 @@
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+void TDataShard::OnYellowChannelsChanged() {
+ if (!IsAnyChannelYellowStop()) {
+ NotifyOverloadSubscribers(ERejectReason::YellowChannels);
+ }
+}
+
+void TDataShard::OnRejectProbabilityRelaxed() {
+ NotifyOverloadSubscribers(ERejectReason::OverloadByProbability);
+}
+
+bool TDataShard::HasPipeServer(const TActorId& pipeServerId) {
+ return PipeServers.contains(pipeServerId);
+}
+
+bool TDataShard::AddOverloadSubscriber(const TActorId& pipeServerId, const TActorId& actorId, ui64 seqNo, ERejectReasons reasons) {
+ auto it = PipeServers.find(pipeServerId);
+ if (it != PipeServers.end()) {
+ bool wasEmpty = it->second.OverloadSubscribers.empty();
+ auto& entry = it->second.OverloadSubscribers[actorId];
+ if (entry.SeqNo <= seqNo) {
+ entry.SeqNo = seqNo;
+ // Increment counter for every new reason
+ EnumerateRejectReasons(reasons - entry.Reasons, [&](ERejectReason reason) {
+ OverloadSubscribersByReason[RejectReasonIndex(reason)]++;
+ });
+ entry.Reasons |= reasons;
+ }
+ if (wasEmpty) {
+ PipeServersWithOverloadSubscribers.PushBack(&it->second);
+ }
+ return true;
+ }
+ return false;
+}
+
+void TDataShard::NotifyOverloadSubscribers(ERejectReason reason) {
+ if (OverloadSubscribersByReason[RejectReasonIndex(reason)] == 0) {
+ // Avoid spending time when we know it is pointless
+ return;
+ }
+ ERejectReasons reasons = MakeRejectReasons(reason);
+
+ TPipeServersWithOverloadSubscribers left;
+ while (!PipeServersWithOverloadSubscribers.Empty()) {
+ TPipeServerInfo* pipeServer = PipeServersWithOverloadSubscribers.PopFront();
+ for (auto it = pipeServer->OverloadSubscribers.begin(); it != pipeServer->OverloadSubscribers.end();) {
+ auto current = it++;
+ const TActorId& actorId = current->first;
+ TOverloadSubscriber& entry = current->second;
+ if ((entry.Reasons & reasons) != reasons) {
+ // Reasons don't match
+ continue;
+ }
+ entry.Reasons -= reasons;
+ OverloadSubscribersByReason[RejectReasonIndex(reason)]--;
+ if (entry.Reasons == ERejectReasons::None) {
+ SendViaSession(
+ pipeServer->InterconnectSession,
+ actorId,
+ SelfId(),
+ new TEvDataShard::TEvOverloadReady(TabletID(), entry.SeqNo));
+ pipeServer->OverloadSubscribers.erase(current);
+ }
+ }
+ if (!pipeServer->OverloadSubscribers.empty()) {
+ left.PushBack(pipeServer);
+ }
+ }
+ PipeServersWithOverloadSubscribers.Append(left);
+}
+
+void TDataShard::DiscardOverloadSubscribers(TPipeServerInfo& pipeServer) {
+ for (auto it = pipeServer.OverloadSubscribers.begin(); it != pipeServer.OverloadSubscribers.end(); ++it) {
+ TOverloadSubscriber& entry = it->second;
+ EnumerateRejectReasons(entry.Reasons, [&](ERejectReason reason) {
+ OverloadSubscribersByReason[RejectReasonIndex(reason)]--;
+ });
+ }
+ pipeServer.OverloadSubscribers.clear();
+ PipeServersWithOverloadSubscribers.Remove(&pipeServer);
+}
+
+void TDataShard::Handle(TEvDataShard::TEvOverloadUnsubscribe::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+ auto* msg = ev->Get();
+ if (auto* pipeServer = PipeServers.FindPtr(ev->Recipient)) {
+ auto it = pipeServer->OverloadSubscribers.find(ev->Sender);
+ if (it != pipeServer->OverloadSubscribers.end()) {
+ if (it->second.SeqNo == msg->Record.GetSeqNo()) {
+ EnumerateRejectReasons(it->second.Reasons, [&](ERejectReason reason) {
+ OverloadSubscribersByReason[RejectReasonIndex(reason)]--;
+ });
+ pipeServer->OverloadSubscribers.erase(it);
+ if (pipeServer->OverloadSubscribers.empty()) {
+ PipeServersWithOverloadSubscribers.Remove(pipeServer);
+ }
+ }
+ }
+ }
+}
+
+} // namespae NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
index 49e6af420d..d81812c13c 100644
--- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
@@ -744,7 +744,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::GENERIC_ERROR);
}
- Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) {
+ void DoShouldRejectOnChangeQueueOverflow(bool overloadSubscribe) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
@@ -771,16 +771,86 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
})
);
- runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) {
- return TTestActorRuntime::EEventAction::DROP;
+ TVector<ui32> observedUploadStatus;
+ TVector<THolder<IEventHandle>> blockedEnqueueRecords;
+ auto prevObserverFunc = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case NDataShard::TEvChangeExchange::EvEnqueueRecords:
+ blockedEnqueueRecords.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ case TEvDataShard::TEvUploadRowsRequest::EventType:
+ if (!overloadSubscribe) {
+ ev->Get<TEvDataShard::TEvUploadRowsRequest>()->Record.ClearOverloadSubscribe();
+ }
+ break;
+ case TEvDataShard::TEvUploadRowsResponse::EventType:
+ observedUploadStatus.push_back(ev->Get<TEvDataShard::TEvUploadRowsResponse>()->Record.GetStatus());
+ break;
}
return TTestActorRuntime::EEventAction::PROCESS;
});
DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::SUCCESS);
- DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::OVERLOADED);
+
+ UNIT_ASSERT(!observedUploadStatus.empty());
+ UNIT_ASSERT(observedUploadStatus.back() == NKikimrTxDataShard::TError::OK);
+ observedUploadStatus.clear();
+
+ if (!overloadSubscribe) {
+ DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::OVERLOADED);
+ return;
+ }
+
+ TVector<THolder<TEvTxUserProxy::TEvUploadRowsResponse>> responses;
+ auto responseAwaiter = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxUserProxy::TEvUploadRowsResponse::EventType: {
+ auto msg = ev->Release<TEvTxUserProxy::TEvUploadRowsResponse>();
+ responses.push_back(std::move(msg));
+ break;
+ }
+ }
+ }));
+
+ DoStartUploadTestRows(server, responseAwaiter, "/Root/table-1", Ydb::Type::UINT32);
+
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ UNIT_ASSERT(!blockedEnqueueRecords.empty());
+ UNIT_ASSERT(!observedUploadStatus.empty());
+ UNIT_ASSERT(observedUploadStatus.back() == NKikimrTxDataShard::TError::SHARD_IS_BLOCKED);
+ observedUploadStatus.clear();
+ UNIT_ASSERT(responses.empty());
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : blockedEnqueueRecords) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ blockedEnqueueRecords.clear();
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ waitFor([&]{ return !responses.empty(); }, "upload rows response");
+
+ UNIT_ASSERT_VALUES_EQUAL(responses.back()->Status, Ydb::StatusIds::SUCCESS);
+ }
+
+ Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) {
+ DoShouldRejectOnChangeQueueOverflow(false);
+ }
+
+ Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflowAndRetry) {
+ DoShouldRejectOnChangeQueueOverflow(true);
}
}
diff --git a/ydb/core/tx/datashard/reject_reason.h b/ydb/core/tx/datashard/reject_reason.h
new file mode 100644
index 0000000000..32e6cd25cd
--- /dev/null
+++ b/ydb/core/tx/datashard/reject_reason.h
@@ -0,0 +1,58 @@
+#pragma once
+#include "defs.h"
+
+namespace NKikimr::NDataShard {
+
+ // Note: must be synchronized with ERejectReasons below
+ enum class ERejectReason : int {
+ WrongState = 0,
+ Dropping = 1,
+ OverloadByLag = 2,
+ OverloadByProbability = 3,
+ OverloadByTxInFly = 4,
+ YellowChannels = 5,
+ DiskSpace = 6,
+ ChangesQueueOverflow = 7,
+ };
+
+ // Note: must be synchronized with ERejectReason above
+ enum class ERejectReasons : ui32 {
+ None = 0,
+ WrongState = 1 << 0,
+ Dropping = 1 << 1,
+ OverloadByLag = 1 << 2,
+ OverloadByProbability = 1 << 3,
+ OverloadByTxInFly = 1 << 4,
+ YellowChannels = 1 << 5,
+ DiskSpace = 1 << 6,
+ ChangesQueueOverflow = 1 << 7,
+ };
+
+ // Note: must be synchronized with both enums above
+ static constexpr int RejectReasonCount = 8;
+
+ inline ERejectReasons operator|(ERejectReasons a, ERejectReasons b) { return ERejectReasons(ui32(a) | ui32(b)); }
+ inline ERejectReasons operator&(ERejectReasons a, ERejectReasons b) { return ERejectReasons(ui32(a) & ui32(b)); }
+ inline ERejectReasons operator-(ERejectReasons a, ERejectReasons b) { return ERejectReasons(ui32(a) & ~ui32(b)); }
+ inline ERejectReasons& operator|=(ERejectReasons& a, ERejectReasons b) { return a = (a | b); }
+ inline ERejectReasons& operator&=(ERejectReasons& a, ERejectReasons b) { return a = (a & b); }
+ inline ERejectReasons& operator-=(ERejectReasons& a, ERejectReasons b) { return a = (a - b); }
+
+ inline int RejectReasonIndex(ERejectReason reason) {
+ return int(reason);
+ }
+
+ inline ERejectReasons MakeRejectReasons(ERejectReason reason) {
+ return ERejectReasons(1 << int(reason));
+ }
+
+ template<class TCallback>
+ inline void EnumerateRejectReasons(ERejectReasons reasons, TCallback&& callback) {
+ for (int i = 0; i < RejectReasonCount; ++i) {
+ if ((reasons & ERejectReasons(1 << i)) != ERejectReasons::None) {
+ callback(ERejectReason(i));
+ }
+ }
+ }
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 2d063ad542..9c191545b6 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -98,6 +98,7 @@ SRCS(
datashard_failpoints.h
datashard_dep_tracker.cpp
datashard_dep_tracker.h
+ datashard_overload.cpp
datashard_pipeline.cpp
datashard_pipeline.h
datashard_s3_downloads.cpp
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 9898343636..09c3189206 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -103,13 +103,19 @@ private:
static constexpr TDuration DEFAULT_TIMEOUT = TDuration::Seconds(5*60);
+ struct TShardUploadRetryState {
+ // Contains basic request settings like table ids and columns
+ NKikimrTxDataShard::TEvUploadRowsRequest Headers;
+ TVector<std::pair<TString, TString>> Rows;
+ ui64 LastOverloadSeqNo = 0;
+ ui64 SentOverloadSeqNo = 0;
+ };
+
TActorId SchemeCache;
TActorId LeaderPipeCache;
TDuration Timeout;
TInstant StartTime;
TActorId TimeoutTimerActorId;
- bool WaitingResolveReply;
- bool Finished;
TAutoPtr<NSchemeCache::TSchemeCacheRequest> ResolvePartitionsResult;
std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult;
@@ -119,6 +125,7 @@ private:
TVector<NScheme::TTypeInfo> ValueColumnTypes;
NSchemeCache::TSchemeCacheNavigate::EKind TableKind = NSchemeCache::TSchemeCacheNavigate::KindUnknown;
THashSet<TTabletId> ShardRepliesLeft;
+ THashMap<TTabletId, TShardUploadRetryState> ShardUploadRetryStates;
Ydb::StatusIds::StatusCode Status;
TString ErrorMessage;
std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>();
@@ -171,20 +178,23 @@ public:
, SchemeCache(MakeSchemeCacheID())
, LeaderPipeCache(MakePipePeNodeCacheID(false))
, Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT)
- , WaitingResolveReply(false)
- , Finished(false)
, Status(Ydb::StatusIds::SUCCESS)
, DiskQuotaExceeded(diskQuotaExceeded)
{}
void Bootstrap(const NActors::TActorContext& ctx) {
StartTime = TAppData::TimeProvider->Now();
+ OnBeforeStart(ctx);
ResolveTable(GetTable(), ctx);
}
void Die(const NActors::TActorContext& ctx) override {
- Y_VERIFY(Finished);
- Y_VERIFY(!WaitingResolveReply);
+ for (auto& pr : ShardUploadRetryStates) {
+ if (pr.second.SentOverloadSeqNo) {
+ auto* msg = new TEvDataShard::TEvOverloadUnsubscribe(pr.second.SentOverloadSeqNo);
+ ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false));
+ }
+ }
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
if (TimeoutTimerActorId) {
ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill());
@@ -244,6 +254,14 @@ protected:
}
private:
+ virtual void OnBeforeStart(const TActorContext&) {
+ // nothing by default
+ }
+
+ virtual void OnBeforePoison(const TActorContext&) {
+ // nothing by default
+ }
+
virtual TString GetDatabase() = 0;
virtual const TString& GetTable() = 0;
virtual const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const = 0;
@@ -269,10 +287,17 @@ private:
}
private:
+ void Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
+ OnBeforePoison(ctx);
+ Die(ctx);
+ }
+
+private:
STFUNC(StateWaitResolveTable) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvents::TEvPoison, Handle);
default:
break;
@@ -507,7 +532,6 @@ private:
new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
TBase::Become(&TThis::StateWaitResolveTable);
- WaitingResolveReply = true;
}
void HandleTimeout(const TActorContext& ctx) {
@@ -518,11 +542,6 @@ private:
}
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- WaitingResolveReply = false;
- if (Finished) {
- return Die(ctx);
- }
-
const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request;
Y_VERIFY(request.ResultSet.size() == 1);
@@ -657,6 +676,7 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(NLongTxService::TEvLongTxService::TEvBeginTxResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvents::TEvPoison, Handle);
}
}
@@ -773,6 +793,7 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(TEvents::TEvCompleted, HandleWriteBatchResult);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvents::TEvPoison, Handle);
}
}
@@ -783,7 +804,6 @@ private:
for (const auto& issue: *Issues) {
RaiseIssue(issue);
}
- Finished = true;
return ReplyWithResult(status, ctx);
}
@@ -800,14 +820,13 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(NLongTxService::TEvLongTxService::TEvCommitTxResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvents::TEvPoison, Handle);
}
}
void Handle(NLongTxService::TEvLongTxService::TEvCommitTxResult::TPtr& ev, const NActors::TActorContext& ctx) {
const auto* msg = ev->Get();
- Finished = true;
-
if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) {
// We are done with the transaction, forget it
LongTxId = NLongTxService::TLongTxId();
@@ -875,13 +894,13 @@ private:
ctx.Send(SchemeCache, resolveReq.Release());
TBase::Become(&TThis::StateWaitResolveShards);
- WaitingResolveReply = true;
}
STFUNC(StateWaitResolveShards) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvents::TEvPoison, Handle);
default:
break;
@@ -889,11 +908,6 @@ private:
}
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev, const TActorContext &ctx) {
- WaitingResolveReply = false;
- if (Finished) {
- return Die(ctx);
- }
-
TEvTxProxySchemeCache::TEvResolveKeySetResult *msg = ev->Get();
ResolvePartitionsResult = msg->Request;
@@ -922,12 +936,32 @@ private:
MakeShardRequests(ctx);
}
+ void RetryShardRequest(ui64 shardId, TShardUploadRetryState* state, const TActorContext& ctx) {
+ Y_VERIFY(ShardRepliesLeft.contains(shardId));
+
+ auto ev = std::make_unique<TEvDataShard::TEvUploadRowsRequest>();
+ ev->Record = state->Headers;
+ for (const auto& pr : state->Rows) {
+ auto* row = ev->Record.AddRows();
+ row->SetKeyColumns(pr.first);
+ row->SetValueColumns(pr.second);
+ }
+
+ // Mark our request as supporting overload subscriptions
+ ui64 seqNo = ++state->LastOverloadSeqNo;
+ ev->Record.SetOverloadSubscribe(seqNo);
+ state->SentOverloadSeqNo = seqNo;
+
+ ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
+ }
+
void MakeShardRequests(const NActors::TActorContext& ctx) {
const auto* keyRange = GetKeyRange();
Y_VERIFY(!keyRange->GetPartitions().empty());
// Group rows by shard id
+ TVector<TShardUploadRetryState*> uploadRetryStates(keyRange->GetPartitions().size());
TVector<std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>> shardRequests(keyRange->GetPartitions().size());
for (const auto& keyValue : GetRows()) {
// Find partition for the key
@@ -942,6 +976,12 @@ private:
size_t shardIdx = it - keyRange->GetPartitions().begin();
+ auto* retryState = uploadRetryStates[shardIdx];
+ if (!retryState) {
+ TTabletId shardId = it->ShardId;
+ retryState = uploadRetryStates[shardIdx] = &ShardUploadRetryStates[shardId];
+ }
+
TEvDataShard::TEvUploadRowsRequest* ev = shardRequests[shardIdx].get();
if (!ev) {
shardRequests[shardIdx].reset(new TEvDataShard::TEvUploadRowsRequest());
@@ -958,23 +998,37 @@ private:
if (WriteToTableShadow) {
ev->Record.SetWriteToTableShadow(true);
}
+ // Copy protobuf settings without rows
+ retryState->Headers = ev->Record;
}
+ TString keyColumns = keyValue.first.GetBuffer();
+ TString valueColumns = keyValue.second;
+
+ // We expect to keep a reference to existing key and value data here
+ uploadRetryStates[shardIdx]->Rows.emplace_back(keyColumns, valueColumns);
+
auto* row = ev->Record.AddRows();
- row->SetKeyColumns(keyValue.first.GetBuffer());
- row->SetValueColumns(keyValue.second);
+ row->SetKeyColumns(std::move(keyColumns));
+ row->SetValueColumns(std::move(valueColumns));
}
// Send requests to the shards
for (size_t idx = 0; idx < shardRequests.size(); ++idx) {
- if (!shardRequests[idx])
+ auto& ev = shardRequests[idx];
+ if (!ev)
continue;
TTabletId shardId = keyRange->GetPartitions()[idx].ShardId;
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Sending request to shards " << shardId);
- ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(shardRequests[idx].release(), shardId, true), IEventHandle::FlagTrackDelivery);
+ // Mark our request as supporting overload subscriptions
+ ui64 seqNo = ++uploadRetryStates[idx]->LastOverloadSeqNo;
+ ev->Record.SetOverloadSubscribe(seqNo);
+ uploadRetryStates[idx]->SentOverloadSeqNo = seqNo;
+
+ ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
auto res = ShardRepliesLeft.insert(shardId);
if (!res.second) {
@@ -1009,9 +1063,11 @@ private:
STFUNC(StateWaitResults) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvDataShard::TEvUploadRowsResponse, Handle);
+ HFunc(TEvDataShard::TEvOverloadReady, Handle);
HFunc(TEvents::TEvUndelivered, Handle);
HFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ HFunc(TEvents::TEvPoison, Handle);
default:
break;
@@ -1021,8 +1077,7 @@ private:
void Handle(TEvDataShard::TEvUploadRowsResponse::TPtr& ev, const NActors::TActorContext& ctx) {
const auto& shardResponse = ev->Get()->Record;
- // Notify the cache that we are done with the pipe
- ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardResponse.GetTabletID()));
+ ui64 shardId = shardResponse.GetTabletID();
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Upload rows: got "
<< NKikimrTxDataShard::TError::EKind_Name((NKikimrTxDataShard::TError::EKind)shardResponse.GetStatus())
@@ -1052,14 +1107,41 @@ private:
break;
};
+ if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) {
+ if (!shardResponse.HasOverloadSubscribed()) {
+ // Shard doesn't support overload subscriptions for this request
+ state->SentOverloadSeqNo = 0;
+ } else if (shardResponse.GetOverloadSubscribed() == state->SentOverloadSeqNo) {
+ // Wait until shard notifies us it is possible to write again
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Upload rows: subscribed to overload change at shard " << shardId);
+ return;
+ }
+ }
+
SetError(status, shardResponse.GetErrorDescription());
}
- ShardRepliesLeft.erase(shardResponse.GetTabletID());
+ // Notify the cache that we are done with the pipe
+ ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId));
+
+ ShardRepliesLeft.erase(shardId);
+ ShardUploadRetryStates.erase(shardId);
ReplyIfDone(ctx);
}
+ void Handle(TEvDataShard::TEvOverloadReady::TPtr& ev, const TActorContext& ctx) {
+ auto& record = ev->Get()->Record;
+ ui64 shardId = record.GetTabletID();
+ ui64 seqNo = record.GetSeqNo();
+
+ if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) {
+ if (state->SentOverloadSeqNo && state->SentOverloadSeqNo == seqNo && ShardRepliesLeft.contains(shardId)) {
+ RetryShardRequest(shardId, state, ctx);
+ }
+ }
+ }
+
void SetError(::Ydb::StatusIds::StatusCode status, const TString& message) {
if (Status != ::Ydb::StatusIds::SUCCESS) {
return;
@@ -1075,8 +1157,6 @@ private:
return;
}
- Finished = true;
-
if (!ErrorMessage.empty()) {
RaiseIssue(NYql::TIssue(ErrorMessage));
}
@@ -1105,9 +1185,7 @@ private:
RollbackLongTx(ctx);
}
- if (!WaitingResolveReply) {
- Die(ctx);
- }
+ Die(ctx);
}
};
diff --git a/ydb/services/ydb/ydb_bulk_upsert_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_ut.cpp
index 9b457a94d6..973c5c0a0a 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_ut.cpp
@@ -1080,6 +1080,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsert) {
bool gotOverload = false;
bool gotSuccess = false;
TString blob(100, 'a');
+ TMonotonic start = TMonotonic::Now();
for (ui64 count = 0; (!gotOverload || !gotSuccess) && count < 100000; count++) {
TValueBuilder rows;
@@ -1115,9 +1116,13 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsert) {
gotOverload = true;
}
}
+ auto elapsed = TMonotonic::Now() - start;
+ if (elapsed >= TDuration::Seconds(5)) {
+ break;
+ }
}
- UNIT_ASSERT(gotOverload);
UNIT_ASSERT(gotSuccess);
+ UNIT_ASSERT(!gotOverload);
}
void CreateTestTable(NYdb::NTable::TTableClient& client) {