diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-03-01 09:03:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-01 09:03:37 +0300 |
commit | c30bbc485e8bbd901b58fd3c521c09560f402050 (patch) | |
tree | 071a54116a0af673bda99da1433d85f93c6b0e16 | |
parent | 627c051f40a04bc8fd93efb3fb710b74c03fae72 (diff) | |
download | ydb-c30bbc485e8bbd901b58fd3c521c09560f402050.tar.gz |
EvWrite OverloadSubscribe (#2341)
-rw-r--r-- | ydb/core/tx/datashard/check_write_unit.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__op_rows.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__write.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 15 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_write.cpp | 15 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_write_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execution_unit.cpp | 2 |
10 files changed, 52 insertions, 19 deletions
diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp index c90be4d4e0b..b811f6322d1 100644 --- a/ydb/core/tx/datashard/check_write_unit.cpp +++ b/ydb/core/tx/datashard/check_write_unit.cpp @@ -64,9 +64,11 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err); op->Abort(EExecutionUnitKind::FinishProposeWrite); + DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::YellowChannels, writeOp->GetWriteResult()->Record); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; @@ -88,9 +90,11 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE); - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err); op->Abort(EExecutionUnitKind::FinishProposeWrite); + DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::YellowChannels, writeOp->GetWriteResult()->Record); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 6857aacc8e1..752b7473d76 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2742,6 +2742,11 @@ bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite: LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription); + if (status == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) { + std::optional<ui64> overloadSubscribe = ev->Get()->Record.HasOverloadSubscribe() ? ev->Get()->Record.GetOverloadSubscribe() : std::optional<ui64>{}; + SetOverloadSubscribed(overloadSubscribe, ev->Recipient, ev->Sender, rejectReasons, result->Record); + } + ctx.Send(ev->Sender, result.release()); IncCounter(COUNTER_WRITE_OVERLOADED); IncCounter(COUNTER_WRITE_COMPLETE); diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index 0372e649947..a43925e821a 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -172,20 +172,8 @@ static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, response->Record.SetTabletID(self->TabletID()); response->Record.SetErrorDescription(rejectDescription); - if (ev->Get()->Record.HasOverloadSubscribe() && self->HasPipeServer(ev->Recipient)) { - ui64 seqNo = ev->Get()->Record.GetOverloadSubscribe(); - auto allowed = ( - ERejectReasons::OverloadByProbability | - ERejectReasons::YellowChannels | - ERejectReasons::ChangesQueueOverflow); - if ((rejectReasons & allowed) != ERejectReasons::None && - (rejectReasons - allowed) == ERejectReasons::None) - { - if (self->AddOverloadSubscriber(ev->Recipient, ev->Sender, seqNo, rejectReasons)) { - response->Record.SetOverloadSubscribed(seqNo); - } - } - } + std::optional<ui64> overloadSubscribe = ev->Get()->Record.HasOverloadSubscribe() ? ev->Get()->Record.GetOverloadSubscribe() : std::optional<ui64>{}; + self->SetOverloadSubscribed(overloadSubscribe, ev->Recipient, ev->Sender, rejectReasons, response->Record); ctx.Send(ev->Sender, std::move(response)); } diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp index d1659138f4a..c2dc558df16 100644 --- a/ydb/core/tx/datashard/datashard__write.cpp +++ b/ydb/core/tx/datashard/datashard__write.cpp @@ -289,6 +289,9 @@ NKikimrDataEvents::TEvWriteResult::EStatus NEvWrite::TConvertor::ConvertErrCode( return NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST; case NKikimrTxDataShard::TError_EKind_SCHEME_CHANGED: return NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED; + case NKikimrTxDataShard::TError_EKind_OUT_OF_SPACE: + case NKikimrTxDataShard::TError_EKind_DISK_SPACE_EXHAUSTED: + return NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED; default: return NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR; } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 7627c1eb899..323c4706c6f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1711,6 +1711,21 @@ public: void NotifyOverloadSubscribers(ERejectReason reason); void NotifyAllOverloadSubscribers(); + template <typename TResponseRecord> + void SetOverloadSubscribed(const std::optional<ui64>& overloadSubscribe, const TActorId& recipient, const TActorId& sender, const ERejectReasons rejectReasons, TResponseRecord& responseRecord) { + if (overloadSubscribe && HasPipeServer(recipient)) { + ui64 seqNo = overloadSubscribe.value(); + auto allowed = (ERejectReasons::OverloadByProbability | ERejectReasons::YellowChannels | ERejectReasons::ChangesQueueOverflow); + if ((rejectReasons & allowed) != ERejectReasons::None && + (rejectReasons - allowed) == ERejectReasons::None) + { + if (AddOverloadSubscriber(recipient, sender, seqNo, rejectReasons)) { + responseRecord.SetOverloadSubscribed(seqNo); + } + } + } + } + bool HasSharedBlobs() const; void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx); void CheckStateChange(const TActorContext& ctx); diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp index cc2030a9723..24c0f6812ae 100644 --- a/ydb/core/tx/datashard/datashard_ut_write.cpp +++ b/ydb/core/tx/datashard/datashard_ut_write.cpp @@ -322,7 +322,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { } - Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) { + Y_UNIT_TEST(RejectOnChangeQueueOverflow) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root").SetUseRealThreads(false).SetChangesQueueItemsLimit(1); @@ -362,7 +362,18 @@ Y_UNIT_TEST_SUITE(DataShardWrite) { Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED); } - } // Y_UNIT_TEST + Cout << "========= Send immediate write + OverloadSubscribe, expecting overloaded =========\n"; + { + ui64 secNo = 55; + + auto request = MakeWriteRequest(++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, tableId, opts.Columns_, rowCount); + request->Record.SetOverloadSubscribe(secNo); + + auto writeResult = Write(runtime, sender, shard, std::move(request), NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOverloadSubscribed(), secNo); + } + + } // Y_UNIT_TEST } // Y_UNIT_TEST_SUITE } // namespace NKikimr
\ No newline at end of file diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index 363831ebf98..53be45c4846 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -55,6 +55,8 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant LockNodeId = record.GetLockNodeId(); } + OverloadSubscribe = record.HasOverloadSubscribe() ? record.GetOverloadSubscribe() : std::optional<ui64>{}; + NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString()); @@ -307,6 +309,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, ui64 tabletId) TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self) : TWriteOperation(op, self->TabletID()) { + Recipient = ev->Recipient; SetTarget(ev->Sender); SetCookie(ev->Cookie); diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index 6e68900beb5..a61850f6c6d 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -120,6 +120,7 @@ private: YDB_READONLY_DEF(std::vector<ui32>, ColumnIds); YDB_READONLY_DEF(TSerializedCellMatrix, Matrix); YDB_READONLY_DEF(TInstant, ReceivedAt); + YDB_READONLY_DEF(std::optional<ui64>, OverloadSubscribe); YDB_READONLY_DEF(ui64, TxSize); @@ -274,6 +275,7 @@ private: const ui64 TabletId; + YDB_READONLY_DEF(TActorId, Recipient); YDB_READONLY_DEF(ui64, ArtifactFlags); YDB_ACCESSOR_DEF(ui64, TxCacheUsage); YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize); diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp index 85a30bbecb1..8925a241759 100644 --- a/ydb/core/tx/datashard/execute_write_unit.cpp +++ b/ydb/core/tx/datashard/execute_write_unit.cpp @@ -79,7 +79,7 @@ public: const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId); const ui64 localTableId = DataShard.GetLocalTableId(fullTableId); if (localTableId == 0) { - writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Unknown table id " << tableId); return; } const ui64 shadowTableId = DataShard.GetShadowTableId(fullTableId); diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index a55fc790ac4..bbe48c388aa 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -248,6 +248,8 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext& if (writeOp) { writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err); + + DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::ChangesQueueOverflow, writeOp->GetWriteResult()->Record); } else { BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED) ->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err); |