aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author azevaykin <azevaykin@yandex-team.com> 2023-11-01 09:01:58 +0300
committer azevaykin <azevaykin@yandex-team.com> 2023-11-01 09:33:40 +0300
commit a932c9ba0041a678a2a52524d39510f8bca468c9 (patch)
tree 5a64b8a73677f148a7bdb819622be119809f5d1f
parent b20aa0ca2385dad33e357649643a944c2f678557 (diff)
download ydb-a932c9ba0041a678a2a52524d39510f8bca468c9.tar.gz
Throttle datashard log messages
-rw-r--r-- library/cpp/actors/core/log.h 7
-rw-r--r-- ydb/core/tx/datashard/check_data_tx_unit.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard__op_rows.cpp 23
-rw-r--r-- ydb/core/tx/datashard/datashard__propose_tx_base.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 17
-rw-r--r-- ydb/core/tx/datashard/finish_propose_unit.cpp 4
6 files changed, 41 insertions, 16 deletions
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 05e7a50107..a07540dc49 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -123,6 +123,13 @@
} \
} while (0) /**/
+#define LOG_LOG_S_THROTTLE(throttler, actorCtxOrSystem, priority, component, stream) \
+ do { /* throttled variant of LOG_LOG_S: forwards to it only when the throttler allows */ \
+ if ((throttler).Kick()) { /* Kick() is called exactly once per expansion; a false result skips the message, so `stream` is not evaluated */ \
+ LOG_LOG_S(actorCtxOrSystem, priority, component, stream); \
+ } \
+ } while (0) /**/
+
#define TRACE_EVENT(component) \
const auto& currentTracer = component; \
if (ev->HasEvent()) { \
diff --git a/ydb/core/tx/datashard/check_data_tx_unit.cpp b/ydb/core/tx/datashard/check_data_tx_unit.cpp
index aa5d0ed5d8..20c4c25435 100644
--- a/ydb/core/tx/datashard/check_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/check_data_tx_unit.cpp
@@ -79,7 +79,7 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op,
BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
op->Abort(EExecutionUnitKind::FinishPropose);
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+ LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
return EExecutionStatus::Executed;
}
@@ -217,7 +217,7 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op,
BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
op->Abort(EExecutionUnitKind::FinishPropose);
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
+ LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
return EExecutionStatus::Executed;
}
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 8cc5396fac..5cee5d62ac 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -160,9 +160,10 @@ using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&);
template <typename TEvResponse, typename TEvRequest>
static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
ERejectReasons rejectReasons, const TString& rejectDescription,
- TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx)
+ TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx,
+ TDataShard::ELogThrottlerType logThrottlerType)
{
- LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard"
+ LOG_LOG_S_THROTTLE(self->GetLogThrottler(logThrottlerType), ctx, NActors::NLog::PRI_NOTICE, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard"
<< ": tablet# " << self->TabletID()
<< ", error# " << rejectDescription);
@@ -190,19 +191,19 @@ static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
}
template <typename TEvResponse, typename TEvRequest>
-static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) {
+static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite, TDataShard::ELogThrottlerType logThrottlerType) {
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
ERejectReasons rejectReasons = ERejectReasons::None;
TString rejectDescription;
if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReasons, rejectDescription)) {
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx);
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx, logThrottlerType);
return true;
}
if (self->CheckChangesQueueOverflow()) {
rejectReasons = ERejectReasons::ChangesQueueOverflow;
rejectDescription = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID();
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &Overloaded, ctx);
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &Overloaded, ctx, logThrottlerType);
return true;
}
@@ -211,13 +212,13 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c
self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
rejectReasons = ERejectReasons::YellowChannels;
rejectDescription = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID();
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &OutOfSpace, ctx);
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &OutOfSpace, ctx, logThrottlerType);
return true;
} else if (self->IsSubDomainOutOfSpace()) {
self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
rejectReasons = ERejectReasons::DiskSpace;
rejectDescription = "Cannot perform writes: database is out of disk space";
- Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &DiskSpaceExhausted, ctx);
+ Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &DiskSpaceExhausted, ctx, logThrottlerType);
return true;
}
}
@@ -233,9 +234,9 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert",
- ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx);
+ ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx, TDataShard::ELogThrottlerType::UploadRows_Reject);
}
- if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) {
+ if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true, TDataShard::ELogThrottlerType::UploadRows_Reject)) {
Executor()->Execute(new TTxUploadRows(this, ev), ctx);
} else {
IncCounter(COUNTER_BULK_UPSERT_OVERLOADED);
@@ -250,9 +251,9 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo
}
if (IsReplicated()) {
return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase",
- ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx);
+ ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx, TDataShard::ELogThrottlerType::EraseRows_Reject);
}
- if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) {
+ if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false, TDataShard::ELogThrottlerType::EraseRows_Reject)) {
Executor()->Execute(new TTxEraseRows(this, ev), ctx);
} else {
IncCounter(COUNTER_ERASE_ROWS_OVERLOADED);
diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
index 71021e888a..7fa10a45b4 100644
--- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
+++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
@@ -63,7 +63,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
<< Self->TabletID() << " status: " << result->GetStatus());
TString errors = result->GetError();
if (errors.Size()) {
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
+ LOG_LOG_S_THROTTLE(Self->GetLogThrottler(TDataShard::ELogThrottlerType::TxProposeTransactionBase_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD,
"Errors while proposing transaction txid " << TxId
<< " at tablet " << Self->TabletID() << " status: "
<< result->GetStatus() << " errors: " << errors);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index accb120c79..6d890a3d15 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -2001,6 +2001,22 @@ public:
*/
void BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies);
+ enum ELogThrottlerType { // identifies a throttled log site; used as an index into LogThrottlers via GetLogThrottler()
+ CheckDataTxUnit_Execute = 0, // OUT_OF_SPACE error logs in TCheckDataTxUnit::Execute
+ TxProposeTransactionBase_Execute, // "Errors while proposing transaction" log in TTxProposeTransactionBase::Execute
+ FinishProposeUnit_CompleteRequest, // "Errors while proposing transaction" log in TFinishProposeUnit::CompleteRequest
+ FinishProposeUnit_UpdateCounters, // "Prepare transaction failed" log in TFinishProposeUnit::UpdateCounters
+ UploadRows_Reject, // "Rejecting bulk upsert request" logs for TEvUploadRowsRequest
+ EraseRows_Reject, // "Rejecting erase request" logs for TEvEraseRowsRequest
+
+ LAST // sentinel, must stay last: its value sizes LogThrottlers and it is rejected by GetLogThrottler()
+ };
+
+ // Returns the throttler associated with the given log site.
+ // Aborts if `type` is not a valid index into LogThrottlers: this covers the LAST
+ // sentinel as well as any other out-of-range value (e.g. produced by a cast),
+ // which previously would have indexed past the end of the vector.
+ TTrivialLogThrottler& GetLogThrottler(ELogThrottlerType type) {
+ const size_t index = static_cast<size_t>(type);
+ Y_ABORT_UNLESS(index < LogThrottlers.size());
+ return LogThrottlers[index];
+ }
+
private:
///
class TLoanReturnTracker {
@@ -2795,6 +2811,7 @@ private:
bool ScheduledPlanPredictedTxs = false;
+ // One throttler per ELogThrottlerType value (LAST entries), each built from a 1-second TDuration.
+ // The (count, value) constructor is spelled out with parentheses so the meaning does not depend on
+ // braced-init overload resolution falling through past the initializer_list constructor.
+ std::vector<TTrivialLogThrottler> LogThrottlers = std::vector<TTrivialLogThrottler>(ELogThrottlerType::LAST, TDuration::Seconds(1));
public:
auto& GetLockChangeRecords() {
return LockChangeRecords;
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index 2738366568..5ecdeb81bf 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -165,7 +165,7 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op,
TString errors = res->GetError();
if (errors.size()) {
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
+ LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_CompleteRequest), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD,
"Errors while proposing transaction txid " << op->GetTxId()
<< " at tablet " << DataShard.TabletID() << " status: "
<< res->GetStatus() << " errors: " << errors);
@@ -234,7 +234,7 @@ void TFinishProposeUnit::UpdateCounters(TOperation::TPtr op,
if (res->IsError()) {
DataShard.IncCounter(COUNTER_PREPARE_ERROR);
- LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
+ LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_UpdateCounters), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD,
"Prepare transaction failed. txid " << op->GetTxId()
<< " at tablet " << DataShard.TabletID() << " errors: "
<< PrintErrors(res->Record));