| author | snaury <snaury@ydb.tech> | 2023-08-24 16:26:26 +0300 |
| --- | --- | --- |
| committer | snaury <snaury@ydb.tech> | 2023-08-24 16:45:45 +0300 |
| commit | a62381eb07390ecd888e550fe69845f46ba4f72b (patch) | |
| tree | 5b4155cf1ab40815e98f94d74e0e2a29768f12fb | |
| parent | 890fb1bb552aca548086b4a920cb89f1af9eb114 (diff) | |
| download | ydb-a62381eb07390ecd888e550fe69845f46ba4f72b.tar.gz | |
Subscribe to overload in bulk upsert and retry when it goes down KIKIMR-19021
22 files changed, 630 insertions, 79 deletions
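In short: the bulk-upsert actor now keeps the rows it sent to each shard in a per-shard `TShardUploadRetryState`, tags every `TEvUploadRowsRequest` with an `OverloadSubscribe` sequence number, and, when the datashard rejects the write for a transient reason (reject probability, yellow channels, changes-queue overflow) and echoes `OverloadSubscribed`, waits for `TEvOverloadReady` from that shard and resends the same rows instead of failing the request. Below is a condensed sketch of the client-side half, assembled from the `upload_rows_common_impl.h` changes in this diff; member declarations, logging and error handling are omitted, so it is an orientation aid rather than a drop-in excerpt.

```cpp
// Condensed sketch of the retry path added in
// ydb/core/tx/tx_proxy/upload_rows_common_impl.h (see the full diff below).
void RetryShardRequest(ui64 shardId, TShardUploadRetryState* state, const TActorContext& ctx) {
    auto ev = std::make_unique<TEvDataShard::TEvUploadRowsRequest>();
    ev->Record = state->Headers;                     // table id, columns, shadow-write flag
    for (const auto& [key, value] : state->Rows) {   // rows are buffered per shard for retries
        auto* row = ev->Record.AddRows();
        row->SetKeyColumns(key);
        row->SetValueColumns(value);
    }

    // Ask the shard to remember us and send TEvOverloadReady once the
    // transient overload condition goes away.
    ui64 seqNo = ++state->LastOverloadSeqNo;
    ev->Record.SetOverloadSubscribe(seqNo);
    state->SentOverloadSeqNo = seqNo;

    ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true),
             IEventHandle::FlagTrackDelivery);
}

void Handle(TEvDataShard::TEvOverloadReady::TPtr& ev, const TActorContext& ctx) {
    ui64 shardId = ev->Get()->Record.GetTabletID();
    ui64 seqNo = ev->Get()->Record.GetSeqNo();
    if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) {
        // Retry only if this notification matches our latest subscription
        // and we are still waiting for a reply from this shard.
        if (state->SentOverloadSeqNo == seqNo && ShardRepliesLeft.contains(shardId)) {
            RetryShardRequest(shardId, state, ctx);
        }
    }
}
```

On the datashard side the subscription is accepted only for the retryable reject reasons (`OverloadByProbability`, `YellowChannels`, `ChangesQueueOverflow`); permanent conditions such as a wrong shard state are still reported to the caller immediately.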
diff --git a/ydb/core/base/tablet_pipe.h b/ydb/core/base/tablet_pipe.h index d92d4a32cc..42287e10d4 100644 --- a/ydb/core/base/tablet_pipe.h +++ b/ydb/core/base/tablet_pipe.h @@ -141,15 +141,18 @@ namespace NKikimr { }; struct TEvServerConnected : public TEventLocal<TEvServerConnected, EvServerConnected> { - TEvServerConnected(ui64 tabletId, const TActorId& clientId, const TActorId& serverId) + TEvServerConnected(ui64 tabletId, const TActorId& clientId, const TActorId& serverId, + const TActorId& interconnectSession = {}) : TabletId(tabletId) , ClientId(clientId) , ServerId(serverId) + , InterconnectSession(interconnectSession) {} const ui64 TabletId; const TActorId ClientId; const TActorId ServerId; + const TActorId InterconnectSession; }; struct TEvClientDestroyed : public TEventLocal<TEvClientDestroyed, EvClientDestroyed> { diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 6f3f442d4d..7030002aff 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -122,6 +122,17 @@ public: {} private: + void OnBeforeStart(const TActorContext& ctx) override { + Request->SetFinishAction([selfId = ctx.SelfID, as = ctx.ExecutorThread.ActorSystem]() { + as->Send(selfId, new TEvents::TEvPoison); + }); + } + + void OnBeforePoison(const TActorContext&) override { + // Client is gone, but we need to "reply" anyway? + Request->SendResult(Ydb::StatusIds::CANCELLED, {}); + } + bool ReportCostInfoEnabled() const { return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; } @@ -274,6 +285,17 @@ public: {} private: + void OnBeforeStart(const TActorContext& ctx) override { + Request->SetFinishAction([selfId = ctx.SelfID, as = ctx.ExecutorThread.ActorSystem]() { + as->Send(selfId, new TEvents::TEvPoison); + }); + } + + void OnBeforePoison(const TActorContext&) override { + // Client is gone, but we need to "reply" anyway? 
+ Request->SendResult(Ydb::StatusIds::CANCELLED, {}); + } + bool ReportCostInfoEnabled() const { return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index a176f797ac..948ea0d2e0 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -865,12 +865,14 @@ message TEvUploadRowsRequest { repeated TSerializedRowToLoad Rows = 3; optional uint64 CancelDeadlineMs = 4; // Wallclock timestamp (not duration) optional bool WriteToTableShadow = 5; + optional uint64 OverloadSubscribe = 7; } message TEvUploadRowsResponse { optional uint64 TabletID = 1; optional uint32 Status = 2; optional string ErrorDescription = 3; + optional uint64 OverloadSubscribed = 4; } message TEvReadColumnsRequest { @@ -1297,6 +1299,7 @@ message TEvEraseRowsRequest { repeated uint32 KeyColumnIds = 2; repeated bytes KeyColumns = 3; // SerilializedCellVector optional uint64 SchemaVersion = 5; + optional uint64 OverloadSubscribe = 6; oneof Condition { TExpirationCondition Expiration = 4; @@ -1328,6 +1331,7 @@ message TEvEraseRowsResponse { optional uint64 TabletID = 1; optional EStatus Status = 2; optional string ErrorDescription = 3; + optional uint64 OverloadSubscribed = 4; } message TEvConditionalEraseRowsRequest { @@ -1905,3 +1909,14 @@ message TTxVolatileDetails { // When true all preceding transactions are dependencies optional bool CommitOrdered = 8; } + +// Sent by datashard when some overload reason stopped being relevant +message TEvOverloadReady { + optional uint64 TabletID = 1; + optional uint64 SeqNo = 2; +} + +// Sent by overload subscribers to unsubscribe and free memory +message TEvOverloadUnsubscribe { + optional uint64 SeqNo = 1; +} diff --git a/ydb/core/tablet/tablet_pipe_server.cpp b/ydb/core/tablet/tablet_pipe_server.cpp index a1e829a30b..0fceaab97d 100644 --- a/ydb/core/tablet/tablet_pipe_server.cpp +++ b/ydb/core/tablet/tablet_pipe_server.cpp @@ -211,7 +211,7 @@ namespace NTabletPipe { void OnConnected(const TActorContext& ctx) { Become(&TThis::StateActive); SendToClient(ctx, new TEvTabletPipe::TEvConnectResult(NKikimrProto::OK, TabletId, ClientId, ctx.SelfID, Leader, Generation), IEventHandle::FlagTrackDelivery, ConnectCookie); - ctx.Send(RecipientId, new TEvTabletPipe::TEvServerConnected(TabletId, ClientId, ctx.SelfID)); + ctx.Send(RecipientId, new TEvTabletPipe::TEvServerConnected(TabletId, ClientId, ctx.SelfID, InterconnectSession)); Connected = true; } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 4851cde9e5..bc2c81c8e3 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -44,6 +44,8 @@ namespace NKikimr { namespace NTabletFlatExecutor { +static constexpr ui64 MaxTxInFly = 10000; + LWTRACE_USING(TABLET_FLAT_PROVIDER) struct TCompactionChangesCtx { @@ -226,7 +228,14 @@ void TExecutor::ReflectSchemeSettings() noexcept } void TExecutor::OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) { + size_t oldMoveCount = Stats->YellowMoveChannels.size(); + size_t oldStopCount = Stats->YellowStopChannels.size(); CheckYellow(std::move(yellowMoveChannels), std::move(yellowStopChannels)); + if (oldMoveCount != Stats->YellowMoveChannels.size() || + oldStopCount != Stats->YellowStopChannels.size()) + { + Owner->OnYellowChannelsChanged(); + } } void TExecutor::CheckYellow(TVector<ui32> &&yellowMoveChannels, 
TVector<ui32> &&yellowStopChannels, bool terminal) { @@ -328,12 +337,20 @@ void TExecutor::Handle(TEvTablet::TEvCheckBlobstorageStatusResult::TPtr &ev) { } } + auto prevMoveChannels = Stats->YellowMoveChannels; + auto prevStopChannels = Stats->YellowStopChannels; Stats->YellowMoveChannels.clear(); Stats->YellowStopChannels.clear(); Stats->IsAnyChannelYellowMove = false; Stats->IsAnyChannelYellowStop = false; CheckYellow(std::move(lightYellowMoveChannels), std::move(yellowStopChannels)); + + if (prevMoveChannels != Stats->YellowMoveChannels || + prevStopChannels != Stats->YellowStopChannels) + { + Owner->OnYellowChannelsChanged(); + } } void TExecutor::ActivateFollower(const TActorContext &ctx) { @@ -1221,8 +1238,11 @@ void TExecutor::AdvancePendingPartSwitches() { } } - if (PendingPartSwitches.empty()) // could be border change + // could be border change + if (PendingPartSwitches.empty()) { PlanTransactionActivation(); + MaybeRelaxRejectProbability(); + } } bool TExecutor::ApplyReadyPartSwitches() { @@ -2341,6 +2361,8 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv ResourceMetrics->CPU.Increment(bookkeepingTimeuS + execTimeuS, Time->Now()); ResourceMetrics->TryUpdate(ctx); } + + MaybeRelaxRejectProbability(); } void TExecutor::MakeLogSnapshot() { @@ -2820,6 +2842,8 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext ActiveTransaction = false; PlanTransactionActivation(); + + MaybeRelaxRejectProbability(); } void TExecutor::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) { @@ -3375,6 +3399,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) } Owner->CompactionComplete(tableId, OwnerCtx()); + MaybeRelaxRejectProbability(); ActiveTransaction = false; @@ -3535,8 +3560,10 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) { float TExecutor::GetRejectProbability() const { // Limit number of in-flight TXs // TODO: make configurable - if (Stats->TxInFly > 10000) + if (Stats->TxInFly > MaxTxInFly) { + HadRejectProbabilityByTxInFly = true; return 1.0; + } // Followers do not control compaction so let's always allow to read the data from follower if (Stats->IsFollower) @@ -3561,9 +3588,28 @@ float TExecutor::GetRejectProbability() const { const float overloadFactor = CompactionLogic->GetOverloadFactor(); const float rejectProbability = calcProbability(overloadFactor); + if (rejectProbability > 0.0f) { + HadRejectProbabilityByOverload = true; + } + return rejectProbability; } +void TExecutor::MaybeRelaxRejectProbability() { + if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= MaxTxInFly || + HadRejectProbabilityByOverload) + { + HadRejectProbabilityByTxInFly = false; + HadRejectProbabilityByOverload = false; + GetRejectProbability(); + if (!HadRejectProbabilityByTxInFly && + !HadRejectProbabilityByOverload) + { + Owner->OnRejectProbabilityRelaxed(); + } + } +} + TString TExecutor::BorrowSnapshot(ui32 table, const TTableSnapshotContext &snap, TRawVals from, TRawVals to, ui64 loaner) const { diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 8906408299..ae49cb4710 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -460,6 +460,9 @@ class TExecutor bool HadFollowerAttached = false; bool NeedFollowerSnapshot = false; + mutable bool HadRejectProbabilityByTxInFly = false; + mutable bool HadRejectProbabilityByOverload = false; + THashMap<ui32, TIntrusivePtr<TBarrier>> InFlyCompactionGcBarriers; 
TDeque<THolder<TEvTablet::TFUpdateBody>> PostponedFollowerUpdates; THashMap<ui32, TVector<TIntrusivePtr<TBarrier>>> InFlySnapCollectionBarriers; @@ -694,6 +697,7 @@ public: ui64 TabletId() const { return Owner->TabletID(); } float GetRejectProbability() const override; + void MaybeRelaxRejectProbability(); TActorId GetLauncher() const { return Launcher; } }; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.cpp b/ydb/core/tablet_flat/tablet_flat_executor.cpp index 258addbf7f..ebd5eca21d 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executor.cpp @@ -37,6 +37,14 @@ namespace NFlatExecutorSetup { return true; } + void ITablet::OnYellowChannelsChanged() { + // nothing by default + } + + void ITablet::OnRejectProbabilityRelaxed() { + // nothing by default + } + void ITablet::UpdateTabletInfo(TIntrusivePtr<TTabletStorageInfo> info, const TActorId& launcherID) { if (info) TabletInfo = info; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 9b611f793e..768857db77 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -455,6 +455,8 @@ namespace NFlatExecutorSetup { virtual void ScanComplete(NTable::EAbort status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx); virtual bool ReassignChannelsEnabled() const; + virtual void OnYellowChannelsChanged(); + virtual void OnRejectProbabilityRelaxed(); // memory usage excluding transactions and executor cache. virtual ui64 GetMemoryUsage() const { return 50 << 10; } diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index fc6df7e8de..2a8ad90687 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -233,6 +233,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 877fc708c7..0622573166 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -234,6 +234,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 877fc708c7..0622573166 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -234,6 +234,7 @@ target_sources(core-tx-datashard PRIVATE 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index 645ad6538e..4a4986e6f8 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -234,6 +234,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_distributed_erase.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_failpoints.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_dep_tracker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_overload.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_pipeline.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_downloads.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_s3_uploads.cpp diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 0f4dca03a2..dbe1709fd8 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -916,6 +916,8 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { IncCounter(COUNTER_CHANGE_RECORDS_REMOVED); SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size()); + + CheckChangesQueueNoOverflow(); } void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records) { @@ -2445,11 +2447,13 @@ void TDataShard::Handle(TEvDataShard::TEvStateChangedResult::TPtr& ev, const TAc bool TDataShard::CheckDataTxReject(const TString& opDescr, const TActorContext &ctx, NKikimrTxDataShard::TEvProposeTransactionResult::EStatus &rejectStatus, - TString &reason) + ERejectReasons &rejectReasons, + TString &rejectDescription) { bool reject = false; rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED; - TVector<TString> rejectReasons; + rejectReasons = ERejectReasons::None; + TVector<TString> rejectDescriptions; // In v0.5 reject all transactions on split Src after receiving EvSplit if (State == TShardState::SplitSrcWaitForNoTxInFlight || @@ -2457,37 +2461,43 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr, State == TShardState::SplitSrcSendingSnapshot || State == TShardState::SplitSrcWaitForPartitioningChanged) { reject = true; - rejectReasons.push_back(TStringBuilder() + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back(TStringBuilder() << "is in process of split opId " << SrcSplitOpId << " state " << DatashardStateName(State) << " (wrong shard state)"); } else if (State == TShardState::SplitDstReceivingSnapshot) { reject = true; - rejectReasons.push_back(TStringBuilder() + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back(TStringBuilder() << "is in process of split opId " << DstSplitOpId << " state " << DatashardStateName(State)); } else if (State == TShardState::PreOffline || State == TShardState::Offline) { reject = true; rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR; - rejectReasons.push_back("is in a pre/offline state assuming this is due to a finished split (wrong shard 
state)"); + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back("is in a pre/offline state assuming this is due to a finished split (wrong shard state)"); } else if (MvccSwitchState == TSwitchState::SWITCHING) { reject = true; - rejectReasons.push_back(TStringBuilder() + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back(TStringBuilder() << "is in process of mvcc state change" << " state " << DatashardStateName(State)); } if (Pipeline.HasDrop()) { reject = true; - rejectReasons.push_back("is in process of drop"); rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR; + rejectReasons |= ERejectReasons::Dropping; + rejectDescriptions.push_back("is in process of drop"); } ui64 txInfly = TxInFly(); TDuration lag = GetDataTxCompleteLag(); if (txInfly > 1 && lag > TDuration::MilliSeconds(MaxTxLagMilliseconds)) { reject = true; - rejectReasons.push_back(TStringBuilder() + rejectReasons |= ERejectReasons::OverloadByLag; + rejectDescriptions.push_back(TStringBuilder() << "lags behind, lag: " << lag << " in-flight tx count: " << txInfly); } @@ -2496,8 +2506,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr, if (!reject && rejectProbabilty > 0) { float rnd = AppData(ctx)->RandomProvider->GenRandReal2(); reject |= (rnd < rejectProbabilty); - if (reject) - rejectReasons.push_back("decided to reject due to given RejectProbability"); + if (reject) { + rejectReasons |= ERejectReasons::OverloadByProbability; + rejectDescriptions.push_back("decided to reject due to given RejectProbability"); + } } size_t totalInFly = @@ -2505,30 +2517,33 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr, + ProposeQueue.Size() + TxWaiting(); if (totalInFly > GetMaxTxInFly()) { reject = true; - rejectReasons.push_back("MaxTxInFly was exceeded"); + rejectReasons |= ERejectReasons::OverloadByTxInFly; + rejectDescriptions.push_back("MaxTxInFly was exceeded"); } if (!reject && Stopping) { reject = true; - rejectReasons.push_back("is restarting"); + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back("is restarting"); } if (!reject) { for (auto& it : TableInfos) { if (it.second->IsBackup) { reject = true; - rejectReasons.push_back("is a backup table"); rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR; + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back("is a backup table"); break; } } } if (reject) { - reason = TStringBuilder() + rejectDescription = TStringBuilder() << "Rejecting " << opDescr << " because datashard " << TabletID() << ": " - << JoinSeq("; ", rejectReasons); + << JoinSeq("; ", rejectDescriptions); } return reject; @@ -2550,8 +2565,9 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction* TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId(); NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus; - TString rejectReason; - bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReason); + ERejectReasons rejectReasons; + TString rejectDescription; + bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription); if (reject) { LWTRACK(ProposeTransactionReject, msg->Orbit); @@ -2561,8 +2577,8 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction* msg->GetTxId(), rejectStatus)); - result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason); - LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason); + 
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectDescription); + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription); ctx.Send(msg->GetSource(), result.Release()); IncCounter(COUNTER_PREPARE_OVERLOADED); @@ -3031,23 +3047,38 @@ void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64 } void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); Y_UNUSED(ctx); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server connected at " << (Executor()->GetStats().IsFollower ? "follower " : "leader ") << "tablet# " << ev->Get()->TabletId << ", clientId# " << ev->Get()->ClientId << ", serverId# " << ev->Get()->ServerId << ", sessionId# " << ev->InterconnectSession); + + auto res = PipeServers.emplace( + std::piecewise_construct, + std::forward_as_tuple(ev->Get()->ServerId), + std::forward_as_tuple()); + Y_VERIFY_DEBUG_S(res.second, + "Unexpected TEvServerConnected for " << ev->Get()->ServerId); + + res.first->second.InterconnectSession = ev->Get()->InterconnectSession; } void TDataShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); Y_UNUSED(ctx); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server disconnected at " << (Executor()->GetStats().IsFollower ? "follower " : "leader ") << "tablet# " << ev->Get()->TabletId << ", clientId# " << ev->Get()->ClientId << ", serverId# " << ev->Get()->ServerId << ", sessionId# " << ev->InterconnectSession); + + auto it = PipeServers.find(ev->Get()->ServerId); + Y_VERIFY_DEBUG_S(it != PipeServers.end(), + "Unexpected TEvServerDisconnected for " << ev->Get()->ServerId); + + DiscardOverloadSubscribers(it->second); + + PipeServers.erase(it); } void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx) { @@ -3161,6 +3192,17 @@ bool TDataShard::CheckChangesQueueOverflow() const { return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit; } +void TDataShard::CheckChangesQueueNoOverflow() { + if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) { + const auto* appData = AppData(); + const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit(); + const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit(); + if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) { + NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow); + } + } +} + void TDataShard::DoPeriodicTasks(const TActorContext &ctx) { UpdateLagCounters(ctx); UpdateTableStats(ctx); diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index ae17c3af96..695fd692b9 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -323,6 +323,9 @@ struct TEvDataShard { EvCdcStreamScanRequest, EvCdcStreamScanResponse, + EvOverloadReady, + EvOverloadUnsubscribe, + EvEnd }; @@ -880,6 +883,33 @@ struct TEvDataShard { } }; + struct TEvOverloadReady + : public TEventPB< + TEvOverloadReady, + NKikimrTxDataShard::TEvOverloadReady, + EvOverloadReady> + { + TEvOverloadReady() = default; + + explicit TEvOverloadReady(ui64 tabletId, ui64 seqNo) { + Record.SetTabletID(tabletId); + Record.SetSeqNo(seqNo); + } + }; + + struct TEvOverloadUnsubscribe + : public TEventPB< + TEvOverloadUnsubscribe, + NKikimrTxDataShard::TEvOverloadUnsubscribe, + EvOverloadUnsubscribe> + { + TEvOverloadUnsubscribe() = default; + + explicit TEvOverloadUnsubscribe(ui64 seqNo) { + 
Record.SetSeqNo(seqNo); + } + }; + // In most cases this event is local, thus users must // use Keys, Ranges and Program struct members instead of corresponding // protobuf members. In case of remote event these struct members will diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index 43857d2f98..c48239868d 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -158,45 +158,66 @@ template <typename TEvResponse> using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&); template <typename TEvResponse, typename TEvRequest> -static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, const TString& reason, +static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, + ERejectReasons rejectReasons, const TString& rejectDescription, TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx) { LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard" << ": tablet# " << self->TabletID() - << ", error# " << reason); + << ", error# " << rejectDescription); auto response = MakeHolder<TEvResponse>(); setStatusFunc(response->Record); response->Record.SetTabletID(self->TabletID()); - response->Record.SetErrorDescription(reason); + response->Record.SetErrorDescription(rejectDescription); + + if (ev->Get()->Record.HasOverloadSubscribe() && self->HasPipeServer(ev->Recipient)) { + ui64 seqNo = ev->Get()->Record.GetOverloadSubscribe(); + auto allowed = ( + ERejectReasons::OverloadByProbability | + ERejectReasons::YellowChannels | + ERejectReasons::ChangesQueueOverflow); + if ((rejectReasons & allowed) != ERejectReasons::None && + (rejectReasons - allowed) == ERejectReasons::None) + { + if (self->AddOverloadSubscriber(ev->Recipient, ev->Sender, seqNo, rejectReasons)) { + response->Record.SetOverloadSubscribed(seqNo); + } + } + } + ctx.Send(ev->Sender, std::move(response)); } template <typename TEvResponse, typename TEvRequest> static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) { NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus; - TString rejectReason; - if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReason)) { - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &WrongShardState, ctx); + ERejectReasons rejectReasons = ERejectReasons::None; + TString rejectDescription; + if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReasons, rejectDescription)) { + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx); return true; } if (self->CheckChangesQueueOverflow()) { - rejectReason = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID(); - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &Overloaded, ctx); + rejectReasons = ERejectReasons::ChangesQueueOverflow; + rejectDescription = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID(); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &Overloaded, ctx); return true; } if (isWrite) { if (self->IsAnyChannelYellowStop()) { self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); - rejectReason = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID(); - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &OutOfSpace, ctx); + rejectReasons = 
ERejectReasons::YellowChannels; + rejectDescription = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID(); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &OutOfSpace, ctx); return true; } else if (self->IsSubDomainOutOfSpace()) { self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); - rejectReason = "Cannot perform writes: database is out of disk space"; - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReason, &DiskSpaceExhausted, ctx); + rejectReasons = ERejectReasons::DiskSpace; + rejectDescription = "Cannot perform writes: database is out of disk space"; + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &DiskSpaceExhausted, ctx); return true; } } @@ -212,7 +233,7 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct } if (IsReplicated()) { return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert", - "Can't execute bulk upsert at replicated table", &ReadOnly, ctx); + ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx); } if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) { Executor()->Execute(new TTxUploadRows(this, ev), ctx); @@ -229,7 +250,7 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo } if (IsReplicated()) { return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase", - "Can't execute erase at replicated table", &ExecError, ctx); + ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx); } if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) { Executor()->Execute(new TTxEraseRows(this, ev), ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 533d59bc55..2ea68071a8 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -22,6 +22,7 @@ #include "read_iterator.h" #include "volatile_tx.h" #include "conflicts_cache.h" +#include "reject_reason.h" #include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> @@ -1216,6 +1217,7 @@ class TDataShard void HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvOverloadUnsubscribe::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvConditionalEraseRowsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvConditionalEraseRowsRegistered::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ctx); @@ -1510,7 +1512,8 @@ public: bool CheckDataTxReject(const TString& opDescr, const TActorContext &ctx, NKikimrTxDataShard::TEvProposeTransactionResult::EStatus& rejectStatus, - TString &reason); + ERejectReasons& rejectReasons, + TString& rejectDescription); bool CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction* msg, const TActorContext& ctx); TSysLocks& SysLocksTable() { return SysLocks; } @@ -1665,8 +1668,14 @@ public: void ScanComplete(NTable::EAbort status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx) override; bool ReassignChannelsEnabled() const override; + void OnYellowChannelsChanged() override; + void OnRejectProbabilityRelaxed() override; ui64 
GetMemoryUsage() const override; + bool HasPipeServer(const TActorId& pipeServerId); + bool AddOverloadSubscriber(const TActorId& pipeServerId, const TActorId& actorId, ui64 seqNo, ERejectReasons reasons); + void NotifyOverloadSubscribers(ERejectReason reason); + bool HasSharedBlobs() const; void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx); void CheckStateChange(const TActorContext& ctx); @@ -1921,6 +1930,7 @@ public: bool CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const; bool CheckChangesQueueOverflow() const; + void CheckChangesQueueNoOverflow(); void DeleteReadIterator(TReadIteratorsMap::iterator it); void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx); @@ -2302,6 +2312,31 @@ private: TTxProgressIdempotentScalarScheduleQueue<TEvPrivate::TEvCleanupTransaction> CleanupQueue; TTxProgressQueue<ui64, TNoOpDestroy, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue; + struct TPipeServerInfoOverloadSubscribersTag {}; + + struct TOverloadSubscriber { + ui64 SeqNo = 0; + ERejectReasons Reasons = ERejectReasons::None; + }; + + struct TPipeServerInfo + : public TIntrusiveListItem<TPipeServerInfo, TPipeServerInfoOverloadSubscribersTag> + { + TPipeServerInfo() = default; + + TActorId InterconnectSession; + THashMap<TActorId, TOverloadSubscriber> OverloadSubscribers; + }; + + using TPipeServers = THashMap<TActorId, TPipeServerInfo>; + using TPipeServersWithOverloadSubscribers = TIntrusiveList<TPipeServerInfo, TPipeServerInfoOverloadSubscribersTag>; + + TPipeServers PipeServers; + TPipeServersWithOverloadSubscribers PipeServersWithOverloadSubscribers; + size_t OverloadSubscribersByReason[RejectReasonCount] = { 0 }; + + void DiscardOverloadSubscribers(TPipeServerInfo& pipeServer); + class TProposeQueue : private TTxProgressIdempotentScalarQueue<TEvPrivate::TEvDelayedProposeTransaction> { public: struct TItem : public TMoveOnly { @@ -2831,6 +2866,7 @@ protected: HFunc(TEvDataShard::TEvKqpScan, Handle); HFunc(TEvDataShard::TEvUploadRowsRequest, Handle); HFunc(TEvDataShard::TEvEraseRowsRequest, Handle); + HFunc(TEvDataShard::TEvOverloadUnsubscribe, Handle); HFunc(TEvDataShard::TEvConditionalEraseRowsRequest, Handle); HFunc(TEvPrivate::TEvConditionalEraseRowsRegistered, Handle); HFunc(TEvDataShard::TEvRead, Handle); diff --git a/ydb/core/tx/datashard/datashard_overload.cpp b/ydb/core/tx/datashard/datashard_overload.cpp new file mode 100644 index 0000000000..4724a93a95 --- /dev/null +++ b/ydb/core/tx/datashard/datashard_overload.cpp @@ -0,0 +1,106 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +void TDataShard::OnYellowChannelsChanged() { + if (!IsAnyChannelYellowStop()) { + NotifyOverloadSubscribers(ERejectReason::YellowChannels); + } +} + +void TDataShard::OnRejectProbabilityRelaxed() { + NotifyOverloadSubscribers(ERejectReason::OverloadByProbability); +} + +bool TDataShard::HasPipeServer(const TActorId& pipeServerId) { + return PipeServers.contains(pipeServerId); +} + +bool TDataShard::AddOverloadSubscriber(const TActorId& pipeServerId, const TActorId& actorId, ui64 seqNo, ERejectReasons reasons) { + auto it = PipeServers.find(pipeServerId); + if (it != PipeServers.end()) { + bool wasEmpty = it->second.OverloadSubscribers.empty(); + auto& entry = it->second.OverloadSubscribers[actorId]; + if (entry.SeqNo <= seqNo) { + entry.SeqNo = seqNo; + // Increment counter for every new reason + EnumerateRejectReasons(reasons - entry.Reasons, [&](ERejectReason reason) { + 
OverloadSubscribersByReason[RejectReasonIndex(reason)]++; + }); + entry.Reasons |= reasons; + } + if (wasEmpty) { + PipeServersWithOverloadSubscribers.PushBack(&it->second); + } + return true; + } + return false; +} + +void TDataShard::NotifyOverloadSubscribers(ERejectReason reason) { + if (OverloadSubscribersByReason[RejectReasonIndex(reason)] == 0) { + // Avoid spending time when we know it is pointless + return; + } + ERejectReasons reasons = MakeRejectReasons(reason); + + TPipeServersWithOverloadSubscribers left; + while (!PipeServersWithOverloadSubscribers.Empty()) { + TPipeServerInfo* pipeServer = PipeServersWithOverloadSubscribers.PopFront(); + for (auto it = pipeServer->OverloadSubscribers.begin(); it != pipeServer->OverloadSubscribers.end();) { + auto current = it++; + const TActorId& actorId = current->first; + TOverloadSubscriber& entry = current->second; + if ((entry.Reasons & reasons) != reasons) { + // Reasons don't match + continue; + } + entry.Reasons -= reasons; + OverloadSubscribersByReason[RejectReasonIndex(reason)]--; + if (entry.Reasons == ERejectReasons::None) { + SendViaSession( + pipeServer->InterconnectSession, + actorId, + SelfId(), + new TEvDataShard::TEvOverloadReady(TabletID(), entry.SeqNo)); + pipeServer->OverloadSubscribers.erase(current); + } + } + if (!pipeServer->OverloadSubscribers.empty()) { + left.PushBack(pipeServer); + } + } + PipeServersWithOverloadSubscribers.Append(left); +} + +void TDataShard::DiscardOverloadSubscribers(TPipeServerInfo& pipeServer) { + for (auto it = pipeServer.OverloadSubscribers.begin(); it != pipeServer.OverloadSubscribers.end(); ++it) { + TOverloadSubscriber& entry = it->second; + EnumerateRejectReasons(entry.Reasons, [&](ERejectReason reason) { + OverloadSubscribersByReason[RejectReasonIndex(reason)]--; + }); + } + pipeServer.OverloadSubscribers.clear(); + PipeServersWithOverloadSubscribers.Remove(&pipeServer); +} + +void TDataShard::Handle(TEvDataShard::TEvOverloadUnsubscribe::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ctx); + auto* msg = ev->Get(); + if (auto* pipeServer = PipeServers.FindPtr(ev->Recipient)) { + auto it = pipeServer->OverloadSubscribers.find(ev->Sender); + if (it != pipeServer->OverloadSubscribers.end()) { + if (it->second.SeqNo == msg->Record.GetSeqNo()) { + EnumerateRejectReasons(it->second.Reasons, [&](ERejectReason reason) { + OverloadSubscribersByReason[RejectReasonIndex(reason)]--; + }); + pipeServer->OverloadSubscribers.erase(it); + if (pipeServer->OverloadSubscribers.empty()) { + PipeServersWithOverloadSubscribers.Remove(pipeServer); + } + } + } + } +} + +} // namespae NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp index 49e6af420d..d81812c13c 100644 --- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp @@ -744,7 +744,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::GENERIC_ERROR); } - Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) { + void DoShouldRejectOnChangeQueueOverflow(bool overloadSubscribe) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") @@ -771,16 +771,86 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { }) ); - runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) { - return 
TTestActorRuntime::EEventAction::DROP; + TVector<ui32> observedUploadStatus; + TVector<THolder<IEventHandle>> blockedEnqueueRecords; + auto prevObserverFunc = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case NDataShard::TEvChangeExchange::EvEnqueueRecords: + blockedEnqueueRecords.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + case TEvDataShard::TEvUploadRowsRequest::EventType: + if (!overloadSubscribe) { + ev->Get<TEvDataShard::TEvUploadRowsRequest>()->Record.ClearOverloadSubscribe(); + } + break; + case TEvDataShard::TEvUploadRowsResponse::EventType: + observedUploadStatus.push_back(ev->Get<TEvDataShard::TEvUploadRowsResponse>()->Record.GetStatus()); + break; } return TTestActorRuntime::EEventAction::PROCESS; }); DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::SUCCESS); - DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::OVERLOADED); + + UNIT_ASSERT(!observedUploadStatus.empty()); + UNIT_ASSERT(observedUploadStatus.back() == NKikimrTxDataShard::TError::OK); + observedUploadStatus.clear(); + + if (!overloadSubscribe) { + DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::OVERLOADED); + return; + } + + TVector<THolder<TEvTxUserProxy::TEvUploadRowsResponse>> responses; + auto responseAwaiter = runtime.Register(new TLambdaActor([&](TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvTxUserProxy::TEvUploadRowsResponse::EventType: { + auto msg = ev->Release<TEvTxUserProxy::TEvUploadRowsResponse>(); + responses.push_back(std::move(msg)); + break; + } + } + })); + + DoStartUploadTestRows(server, responseAwaiter, "/Root/table-1", Ydb::Type::UINT32); + + runtime.SimulateSleep(TDuration::Seconds(1)); + UNIT_ASSERT(!blockedEnqueueRecords.empty()); + UNIT_ASSERT(!observedUploadStatus.empty()); + UNIT_ASSERT(observedUploadStatus.back() == NKikimrTxDataShard::TError::SHARD_IS_BLOCKED); + observedUploadStatus.clear(); + UNIT_ASSERT(responses.empty()); + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : blockedEnqueueRecords) { + runtime.Send(ev.Release(), 0, true); + } + blockedEnqueueRecords.clear(); + + auto waitFor = [&](const auto& condition, const TString& description) { + if (!condition()) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + UNIT_ASSERT_C(condition(), "... 
failed to wait for " << description); + } + }; + + waitFor([&]{ return !responses.empty(); }, "upload rows response"); + + UNIT_ASSERT_VALUES_EQUAL(responses.back()->Status, Ydb::StatusIds::SUCCESS); + } + + Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) { + DoShouldRejectOnChangeQueueOverflow(false); + } + + Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflowAndRetry) { + DoShouldRejectOnChangeQueueOverflow(true); } } diff --git a/ydb/core/tx/datashard/reject_reason.h b/ydb/core/tx/datashard/reject_reason.h new file mode 100644 index 0000000000..32e6cd25cd --- /dev/null +++ b/ydb/core/tx/datashard/reject_reason.h @@ -0,0 +1,58 @@ +#pragma once +#include "defs.h" + +namespace NKikimr::NDataShard { + + // Note: must be synchronized with ERejectReasons below + enum class ERejectReason : int { + WrongState = 0, + Dropping = 1, + OverloadByLag = 2, + OverloadByProbability = 3, + OverloadByTxInFly = 4, + YellowChannels = 5, + DiskSpace = 6, + ChangesQueueOverflow = 7, + }; + + // Note: must be synchronized with ERejectReason above + enum class ERejectReasons : ui32 { + None = 0, + WrongState = 1 << 0, + Dropping = 1 << 1, + OverloadByLag = 1 << 2, + OverloadByProbability = 1 << 3, + OverloadByTxInFly = 1 << 4, + YellowChannels = 1 << 5, + DiskSpace = 1 << 6, + ChangesQueueOverflow = 1 << 7, + }; + + // Note: must be synchronized with both enums above + static constexpr int RejectReasonCount = 8; + + inline ERejectReasons operator|(ERejectReasons a, ERejectReasons b) { return ERejectReasons(ui32(a) | ui32(b)); } + inline ERejectReasons operator&(ERejectReasons a, ERejectReasons b) { return ERejectReasons(ui32(a) & ui32(b)); } + inline ERejectReasons operator-(ERejectReasons a, ERejectReasons b) { return ERejectReasons(ui32(a) & ~ui32(b)); } + inline ERejectReasons& operator|=(ERejectReasons& a, ERejectReasons b) { return a = (a | b); } + inline ERejectReasons& operator&=(ERejectReasons& a, ERejectReasons b) { return a = (a & b); } + inline ERejectReasons& operator-=(ERejectReasons& a, ERejectReasons b) { return a = (a - b); } + + inline int RejectReasonIndex(ERejectReason reason) { + return int(reason); + } + + inline ERejectReasons MakeRejectReasons(ERejectReason reason) { + return ERejectReasons(1 << int(reason)); + } + + template<class TCallback> + inline void EnumerateRejectReasons(ERejectReasons reasons, TCallback&& callback) { + for (int i = 0; i < RejectReasonCount; ++i) { + if ((reasons & ERejectReasons(1 << i)) != ERejectReasons::None) { + callback(ERejectReason(i)); + } + } + } + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 2d063ad542..9c191545b6 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -98,6 +98,7 @@ SRCS( datashard_failpoints.h datashard_dep_tracker.cpp datashard_dep_tracker.h + datashard_overload.cpp datashard_pipeline.cpp datashard_pipeline.h datashard_s3_downloads.cpp diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 9898343636..09c3189206 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -103,13 +103,19 @@ private: static constexpr TDuration DEFAULT_TIMEOUT = TDuration::Seconds(5*60); + struct TShardUploadRetryState { + // Contains basic request settings like table ids and columns + NKikimrTxDataShard::TEvUploadRowsRequest Headers; + TVector<std::pair<TString, TString>> Rows; + ui64 LastOverloadSeqNo = 0; + ui64 SentOverloadSeqNo = 0; + }; + 
TActorId SchemeCache; TActorId LeaderPipeCache; TDuration Timeout; TInstant StartTime; TActorId TimeoutTimerActorId; - bool WaitingResolveReply; - bool Finished; TAutoPtr<NSchemeCache::TSchemeCacheRequest> ResolvePartitionsResult; std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult; @@ -119,6 +125,7 @@ private: TVector<NScheme::TTypeInfo> ValueColumnTypes; NSchemeCache::TSchemeCacheNavigate::EKind TableKind = NSchemeCache::TSchemeCacheNavigate::KindUnknown; THashSet<TTabletId> ShardRepliesLeft; + THashMap<TTabletId, TShardUploadRetryState> ShardUploadRetryStates; Ydb::StatusIds::StatusCode Status; TString ErrorMessage; std::shared_ptr<NYql::TIssues> Issues = std::make_shared<NYql::TIssues>(); @@ -171,20 +178,23 @@ public: , SchemeCache(MakeSchemeCacheID()) , LeaderPipeCache(MakePipePeNodeCacheID(false)) , Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT) - , WaitingResolveReply(false) - , Finished(false) , Status(Ydb::StatusIds::SUCCESS) , DiskQuotaExceeded(diskQuotaExceeded) {} void Bootstrap(const NActors::TActorContext& ctx) { StartTime = TAppData::TimeProvider->Now(); + OnBeforeStart(ctx); ResolveTable(GetTable(), ctx); } void Die(const NActors::TActorContext& ctx) override { - Y_VERIFY(Finished); - Y_VERIFY(!WaitingResolveReply); + for (auto& pr : ShardUploadRetryStates) { + if (pr.second.SentOverloadSeqNo) { + auto* msg = new TEvDataShard::TEvOverloadUnsubscribe(pr.second.SentOverloadSeqNo); + ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false)); + } + } ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0)); if (TimeoutTimerActorId) { ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill()); @@ -244,6 +254,14 @@ protected: } private: + virtual void OnBeforeStart(const TActorContext&) { + // nothing by default + } + + virtual void OnBeforePoison(const TActorContext&) { + // nothing by default + } + virtual TString GetDatabase() = 0; virtual const TString& GetTable() = 0; virtual const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const = 0; @@ -269,10 +287,17 @@ private: } private: + void Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) { + OnBeforePoison(ctx); + Die(ctx); + } + +private: STFUNC(StateWaitResolveTable) { switch (ev->GetTypeRewrite()) { HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvents::TEvPoison, Handle); default: break; @@ -507,7 +532,6 @@ private: new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup())); TBase::Become(&TThis::StateWaitResolveTable); - WaitingResolveReply = true; } void HandleTimeout(const TActorContext& ctx) { @@ -518,11 +542,6 @@ private: } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - WaitingResolveReply = false; - if (Finished) { - return Die(ctx); - } - const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request; Y_VERIFY(request.ResultSet.size() == 1); @@ -657,6 +676,7 @@ private: switch (ev->GetTypeRewrite()) { HFunc(NLongTxService::TEvLongTxService::TEvBeginTxResult, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvents::TEvPoison, Handle); } } @@ -773,6 +793,7 @@ private: switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvCompleted, HandleWriteBatchResult); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvents::TEvPoison, Handle); } } @@ -783,7 +804,6 @@ private: for (const auto& issue: *Issues) { RaiseIssue(issue); } - Finished = true; return 
ReplyWithResult(status, ctx); } @@ -800,14 +820,13 @@ private: switch (ev->GetTypeRewrite()) { HFunc(NLongTxService::TEvLongTxService::TEvCommitTxResult, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvents::TEvPoison, Handle); } } void Handle(NLongTxService::TEvLongTxService::TEvCommitTxResult::TPtr& ev, const NActors::TActorContext& ctx) { const auto* msg = ev->Get(); - Finished = true; - if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) { // We are done with the transaction, forget it LongTxId = NLongTxService::TLongTxId(); @@ -875,13 +894,13 @@ private: ctx.Send(SchemeCache, resolveReq.Release()); TBase::Become(&TThis::StateWaitResolveShards); - WaitingResolveReply = true; } STFUNC(StateWaitResolveShards) { switch (ev->GetTypeRewrite()) { HFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvents::TEvPoison, Handle); default: break; @@ -889,11 +908,6 @@ private: } void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr &ev, const TActorContext &ctx) { - WaitingResolveReply = false; - if (Finished) { - return Die(ctx); - } - TEvTxProxySchemeCache::TEvResolveKeySetResult *msg = ev->Get(); ResolvePartitionsResult = msg->Request; @@ -922,12 +936,32 @@ private: MakeShardRequests(ctx); } + void RetryShardRequest(ui64 shardId, TShardUploadRetryState* state, const TActorContext& ctx) { + Y_VERIFY(ShardRepliesLeft.contains(shardId)); + + auto ev = std::make_unique<TEvDataShard::TEvUploadRowsRequest>(); + ev->Record = state->Headers; + for (const auto& pr : state->Rows) { + auto* row = ev->Record.AddRows(); + row->SetKeyColumns(pr.first); + row->SetValueColumns(pr.second); + } + + // Mark our request as supporting overload subscriptions + ui64 seqNo = ++state->LastOverloadSeqNo; + ev->Record.SetOverloadSubscribe(seqNo); + state->SentOverloadSeqNo = seqNo; + + ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery); + } + void MakeShardRequests(const NActors::TActorContext& ctx) { const auto* keyRange = GetKeyRange(); Y_VERIFY(!keyRange->GetPartitions().empty()); // Group rows by shard id + TVector<TShardUploadRetryState*> uploadRetryStates(keyRange->GetPartitions().size()); TVector<std::unique_ptr<TEvDataShard::TEvUploadRowsRequest>> shardRequests(keyRange->GetPartitions().size()); for (const auto& keyValue : GetRows()) { // Find partition for the key @@ -942,6 +976,12 @@ private: size_t shardIdx = it - keyRange->GetPartitions().begin(); + auto* retryState = uploadRetryStates[shardIdx]; + if (!retryState) { + TTabletId shardId = it->ShardId; + retryState = uploadRetryStates[shardIdx] = &ShardUploadRetryStates[shardId]; + } + TEvDataShard::TEvUploadRowsRequest* ev = shardRequests[shardIdx].get(); if (!ev) { shardRequests[shardIdx].reset(new TEvDataShard::TEvUploadRowsRequest()); @@ -958,23 +998,37 @@ private: if (WriteToTableShadow) { ev->Record.SetWriteToTableShadow(true); } + // Copy protobuf settings without rows + retryState->Headers = ev->Record; } + TString keyColumns = keyValue.first.GetBuffer(); + TString valueColumns = keyValue.second; + + // We expect to keep a reference to existing key and value data here + uploadRetryStates[shardIdx]->Rows.emplace_back(keyColumns, valueColumns); + auto* row = ev->Record.AddRows(); - row->SetKeyColumns(keyValue.first.GetBuffer()); - row->SetValueColumns(keyValue.second); + row->SetKeyColumns(std::move(keyColumns)); + row->SetValueColumns(std::move(valueColumns)); } // Send requests to the 
shards for (size_t idx = 0; idx < shardRequests.size(); ++idx) { - if (!shardRequests[idx]) + auto& ev = shardRequests[idx]; + if (!ev) continue; TTabletId shardId = keyRange->GetPartitions()[idx].ShardId; LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Sending request to shards " << shardId); - ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(shardRequests[idx].release(), shardId, true), IEventHandle::FlagTrackDelivery); + // Mark our request as supporting overload subscriptions + ui64 seqNo = ++uploadRetryStates[idx]->LastOverloadSeqNo; + ev->Record.SetOverloadSubscribe(seqNo); + uploadRetryStates[idx]->SentOverloadSeqNo = seqNo; + + ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery); auto res = ShardRepliesLeft.insert(shardId); if (!res.second) { @@ -1009,9 +1063,11 @@ private: STFUNC(StateWaitResults) { switch (ev->GetTypeRewrite()) { HFunc(TEvDataShard::TEvUploadRowsResponse, Handle); + HFunc(TEvDataShard::TEvOverloadReady, Handle); HFunc(TEvents::TEvUndelivered, Handle); HFunc(TEvPipeCache::TEvDeliveryProblem, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + HFunc(TEvents::TEvPoison, Handle); default: break; @@ -1021,8 +1077,7 @@ private: void Handle(TEvDataShard::TEvUploadRowsResponse::TPtr& ev, const NActors::TActorContext& ctx) { const auto& shardResponse = ev->Get()->Record; - // Notify the cache that we are done with the pipe - ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardResponse.GetTabletID())); + ui64 shardId = shardResponse.GetTabletID(); LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Upload rows: got " << NKikimrTxDataShard::TError::EKind_Name((NKikimrTxDataShard::TError::EKind)shardResponse.GetStatus()) @@ -1052,14 +1107,41 @@ private: break; }; + if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) { + if (!shardResponse.HasOverloadSubscribed()) { + // Shard doesn't support overload subscriptions for this request + state->SentOverloadSeqNo = 0; + } else if (shardResponse.GetOverloadSubscribed() == state->SentOverloadSeqNo) { + // Wait until shard notifies us it is possible to write again + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Upload rows: subscribed to overload change at shard " << shardId); + return; + } + } + SetError(status, shardResponse.GetErrorDescription()); } - ShardRepliesLeft.erase(shardResponse.GetTabletID()); + // Notify the cache that we are done with the pipe + ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId)); + + ShardRepliesLeft.erase(shardId); + ShardUploadRetryStates.erase(shardId); ReplyIfDone(ctx); } + void Handle(TEvDataShard::TEvOverloadReady::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + ui64 shardId = record.GetTabletID(); + ui64 seqNo = record.GetSeqNo(); + + if (auto* state = ShardUploadRetryStates.FindPtr(shardId)) { + if (state->SentOverloadSeqNo && state->SentOverloadSeqNo == seqNo && ShardRepliesLeft.contains(shardId)) { + RetryShardRequest(shardId, state, ctx); + } + } + } + void SetError(::Ydb::StatusIds::StatusCode status, const TString& message) { if (Status != ::Ydb::StatusIds::SUCCESS) { return; @@ -1075,8 +1157,6 @@ private: return; } - Finished = true; - if (!ErrorMessage.empty()) { RaiseIssue(NYql::TIssue(ErrorMessage)); } @@ -1105,9 +1185,7 @@ private: RollbackLongTx(ctx); } - if (!WaitingResolveReply) { - Die(ctx); - } + Die(ctx); } }; diff --git a/ydb/services/ydb/ydb_bulk_upsert_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_ut.cpp index 9b457a94d6..973c5c0a0a 100644 --- 
a/ydb/services/ydb/ydb_bulk_upsert_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_ut.cpp @@ -1080,6 +1080,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsert) { bool gotOverload = false; bool gotSuccess = false; TString blob(100, 'a'); + TMonotonic start = TMonotonic::Now(); for (ui64 count = 0; (!gotOverload || !gotSuccess) && count < 100000; count++) { TValueBuilder rows; @@ -1115,9 +1116,13 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsert) { gotOverload = true; } } + auto elapsed = TMonotonic::Now() - start; + if (elapsed >= TDuration::Seconds(5)) { + break; + } } - UNIT_ASSERT(gotOverload); UNIT_ASSERT(gotSuccess); + UNIT_ASSERT(!gotOverload); } void CreateTestTable(NYdb::NTable::TTableClient& client) { |