aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-03-01 09:03:37 +0300
committerGitHub <noreply@github.com>2024-03-01 09:03:37 +0300
commitc30bbc485e8bbd901b58fd3c521c09560f402050 (patch)
tree071a54116a0af673bda99da1433d85f93c6b0e16
parent627c051f40a04bc8fd93efb3fb710b74c03fae72 (diff)
downloadydb-c30bbc485e8bbd901b58fd3c521c09560f402050.tar.gz
EvWrite OverloadSubscribe (#2341)
-rw-r--r--ydb/core/tx/datashard/check_write_unit.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard__write.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h15
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp15
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.h2
-rw-r--r--ydb/core/tx/datashard/execute_write_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp2
10 files changed, 52 insertions, 19 deletions
diff --git a/ydb/core/tx/datashard/check_write_unit.cpp b/ydb/core/tx/datashard/check_write_unit.cpp
index c90be4d4e0b..b811f6322d1 100644
--- a/ydb/core/tx/datashard/check_write_unit.cpp
+++ b/ydb/core/tx/datashard/check_write_unit.cpp
@@ -64,9 +64,11 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);
+ DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::YellowChannels, writeOp->GetWriteResult()->Record);
+
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
return EExecutionStatus::Executed;
@@ -88,9 +90,11 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_WRITE_OUT_OF_SPACE);
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);
+ DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::YellowChannels, writeOp->GetWriteResult()->Record);
+
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
return EExecutionStatus::Executed;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 6857aacc8e1..752b7473d76 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2742,6 +2742,11 @@ bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite:
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);
+ if (status == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) {
+ std::optional<ui64> overloadSubscribe = ev->Get()->Record.HasOverloadSubscribe() ? ev->Get()->Record.GetOverloadSubscribe() : std::optional<ui64>{};
+ SetOverloadSubscribed(overloadSubscribe, ev->Recipient, ev->Sender, rejectReasons, result->Record);
+ }
+
ctx.Send(ev->Sender, result.release());
IncCounter(COUNTER_WRITE_OVERLOADED);
IncCounter(COUNTER_WRITE_COMPLETE);
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 0372e649947..a43925e821a 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -172,20 +172,8 @@ static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
response->Record.SetTabletID(self->TabletID());
response->Record.SetErrorDescription(rejectDescription);
- if (ev->Get()->Record.HasOverloadSubscribe() && self->HasPipeServer(ev->Recipient)) {
- ui64 seqNo = ev->Get()->Record.GetOverloadSubscribe();
- auto allowed = (
- ERejectReasons::OverloadByProbability |
- ERejectReasons::YellowChannels |
- ERejectReasons::ChangesQueueOverflow);
- if ((rejectReasons & allowed) != ERejectReasons::None &&
- (rejectReasons - allowed) == ERejectReasons::None)
- {
- if (self->AddOverloadSubscriber(ev->Recipient, ev->Sender, seqNo, rejectReasons)) {
- response->Record.SetOverloadSubscribed(seqNo);
- }
- }
- }
+ std::optional<ui64> overloadSubscribe = ev->Get()->Record.HasOverloadSubscribe() ? ev->Get()->Record.GetOverloadSubscribe() : std::optional<ui64>{};
+ self->SetOverloadSubscribed(overloadSubscribe, ev->Recipient, ev->Sender, rejectReasons, response->Record);
ctx.Send(ev->Sender, std::move(response));
}
diff --git a/ydb/core/tx/datashard/datashard__write.cpp b/ydb/core/tx/datashard/datashard__write.cpp
index d1659138f4a..c2dc558df16 100644
--- a/ydb/core/tx/datashard/datashard__write.cpp
+++ b/ydb/core/tx/datashard/datashard__write.cpp
@@ -289,6 +289,9 @@ NKikimrDataEvents::TEvWriteResult::EStatus NEvWrite::TConvertor::ConvertErrCode(
return NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST;
case NKikimrTxDataShard::TError_EKind_SCHEME_CHANGED:
return NKikimrDataEvents::TEvWriteResult::STATUS_SCHEME_CHANGED;
+ case NKikimrTxDataShard::TError_EKind_OUT_OF_SPACE:
+ case NKikimrTxDataShard::TError_EKind_DISK_SPACE_EXHAUSTED:
+ return NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED;
default:
return NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 7627c1eb899..323c4706c6f 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1711,6 +1711,21 @@ public:
void NotifyOverloadSubscribers(ERejectReason reason);
void NotifyAllOverloadSubscribers();
+ template <typename TResponseRecord>
+ void SetOverloadSubscribed(const std::optional<ui64>& overloadSubscribe, const TActorId& recipient, const TActorId& sender, const ERejectReasons rejectReasons, TResponseRecord& responseRecord) {
+ if (overloadSubscribe && HasPipeServer(recipient)) {
+ ui64 seqNo = overloadSubscribe.value();
+ auto allowed = (ERejectReasons::OverloadByProbability | ERejectReasons::YellowChannels | ERejectReasons::ChangesQueueOverflow);
+ if ((rejectReasons & allowed) != ERejectReasons::None &&
+ (rejectReasons - allowed) == ERejectReasons::None)
+ {
+ if (AddOverloadSubscriber(recipient, sender, seqNo, rejectReasons)) {
+ responseRecord.SetOverloadSubscribed(seqNo);
+ }
+ }
+ }
+ }
+
bool HasSharedBlobs() const;
void CheckInitiateBorrowedPartsReturn(const TActorContext& ctx);
void CheckStateChange(const TActorContext& ctx);
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index cc2030a9723..24c0f6812ae 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -322,7 +322,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
- Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflow) {
+ Y_UNIT_TEST(RejectOnChangeQueueOverflow) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root").SetUseRealThreads(false).SetChangesQueueItemsLimit(1);
@@ -362,7 +362,18 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
}
- } // Y_UNIT_TEST
+ Cout << "========= Send immediate write + OverloadSubscribe, expecting overloaded =========\n";
+ {
+ ui64 secNo = 55;
+
+ auto request = MakeWriteRequest(++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, tableId, opts.Columns_, rowCount);
+ request->Record.SetOverloadSubscribe(secNo);
+
+ auto writeResult = Write(runtime, sender, shard, std::move(request), NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOverloadSubscribed(), secNo);
+ }
+
+ } // Y_UNIT_TEST
} // Y_UNIT_TEST_SUITE
} // namespace NKikimr \ No newline at end of file
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index 363831ebf98..53be45c4846 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -55,6 +55,8 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant
LockNodeId = record.GetLockNodeId();
}
+ OverloadSubscribe = record.HasOverloadSubscribe() ? record.GetOverloadSubscribe() : std::optional<ui64>{};
+
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
LOG_T("Parsing write transaction for " << globalTxId << " at " << TabletId << ", record: " << record.ShortDebugString());
@@ -307,6 +309,7 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, ui64 tabletId)
TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr&& ev, TDataShard* self)
: TWriteOperation(op, self->TabletID())
{
+ Recipient = ev->Recipient;
SetTarget(ev->Sender);
SetCookie(ev->Cookie);
diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h
index 6e68900beb5..a61850f6c6d 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.h
+++ b/ydb/core/tx/datashard/datashard_write_operation.h
@@ -120,6 +120,7 @@ private:
YDB_READONLY_DEF(std::vector<ui32>, ColumnIds);
YDB_READONLY_DEF(TSerializedCellMatrix, Matrix);
YDB_READONLY_DEF(TInstant, ReceivedAt);
+ YDB_READONLY_DEF(std::optional<ui64>, OverloadSubscribe);
YDB_READONLY_DEF(ui64, TxSize);
@@ -274,6 +275,7 @@ private:
const ui64 TabletId;
+ YDB_READONLY_DEF(TActorId, Recipient);
YDB_READONLY_DEF(ui64, ArtifactFlags);
YDB_ACCESSOR_DEF(ui64, TxCacheUsage);
YDB_ACCESSOR_DEF(ui64, ReleasedTxDataSize);
diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp
index 85a30bbecb1..8925a241759 100644
--- a/ydb/core/tx/datashard/execute_write_unit.cpp
+++ b/ydb/core/tx/datashard/execute_write_unit.cpp
@@ -79,7 +79,7 @@ public:
const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
const ui64 localTableId = DataShard.GetLocalTableId(fullTableId);
if (localTableId == 0) {
- writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, TStringBuilder() << "Unknown table id " << tableId);
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Unknown table id " << tableId);
return;
}
const ui64 shadowTableId = DataShard.GetShadowTableId(fullTableId);
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index a55fc790ac4..bbe48c388aa 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -248,6 +248,8 @@ bool TExecutionUnit::CheckRejectDataTx(TOperation::TPtr op, const TActorContext&
if (writeOp) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
+
+ DataShard.SetOverloadSubscribed(writeOp->GetWriteTx()->GetOverloadSubscribe(), writeOp->GetRecipient(), op->GetTarget(), ERejectReasons::ChangesQueueOverflow, writeOp->GetWriteResult()->Record);
} else {
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED)
->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);