diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-11-01 09:01:58 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-11-01 09:33:40 +0300 |
commit | a932c9ba0041a678a2a52524d39510f8bca468c9 (patch) | |
tree | 5a64b8a73677f148a7bdb819622be119809f5d1f | |
parent | b20aa0ca2385dad33e357649643a944c2f678557 (diff) | |
download | ydb-a932c9ba0041a678a2a52524d39510f8bca468c9.tar.gz |
Throttle datashard log messages
-rw-r--r-- | library/cpp/actors/core/log.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/check_data_tx_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__op_rows.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__propose_tx_base.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/finish_propose_unit.cpp | 4 |
6 files changed, 41 insertions, 16 deletions
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 05e7a50107..a07540dc49 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -123,6 +123,13 @@ } \ } while (0) /**/ +#define LOG_LOG_S_THROTTLE(throttler, actorCtxOrSystem, priority, component, stream) \ + do { \ + if ((throttler).Kick()) { \ + LOG_LOG_S(actorCtxOrSystem, priority, component, stream); \ + } \ + } while (0) /**/ + #define TRACE_EVENT(component) \ const auto& currentTracer = component; \ if (ev->HasEvent()) { \ diff --git a/ydb/core/tx/datashard/check_data_tx_unit.cpp b/ydb/core/tx/datashard/check_data_tx_unit.cpp index aa5d0ed5d8..20c4c25435 100644 --- a/ydb/core/tx/datashard/check_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/check_data_tx_unit.cpp @@ -79,7 +79,7 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op, BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err); op->Abort(EExecutionUnitKind::FinishPropose); - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; } @@ -217,7 +217,7 @@ EExecutionStatus TCheckDataTxUnit::Execute(TOperation::TPtr op, BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err); op->Abort(EExecutionUnitKind::FinishPropose); - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err); + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err); return EExecutionStatus::Executed; } diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index 8cc5396fac..5cee5d62ac 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -160,9 +160,10 @@ using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&); template <typename TEvResponse, typename TEvRequest> static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, ERejectReasons rejectReasons, const TString& rejectDescription, - TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx) + TSetStatusFunc<TEvResponse> setStatusFunc, const TActorContext& ctx, + TDataShard::ELogThrottlerType logThrottlerType) { - LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard" + LOG_LOG_S_THROTTLE(self->GetLogThrottler(logThrottlerType), ctx, NActors::NLog::PRI_NOTICE, NKikimrServices::TX_DATASHARD, "Rejecting " << txDesc << " request on datashard" << ": tablet# " << self->TabletID() << ", error# " << rejectDescription); @@ -190,19 +191,19 @@ static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc, } template <typename TEvResponse, typename TEvRequest> -static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite) { +static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& ctx, const TString& txDesc, bool isWrite, TDataShard::ELogThrottlerType logThrottlerType) { NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus; ERejectReasons rejectReasons = ERejectReasons::None; TString rejectDescription; if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReasons, rejectDescription)) { - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx, logThrottlerType); return true; } if (self->CheckChangesQueueOverflow()) { rejectReasons = ERejectReasons::ChangesQueueOverflow; rejectDescription = TStringBuilder() << "Change queue overflow at tablet " << self->TabletID(); - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &Overloaded, ctx); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &Overloaded, ctx, logThrottlerType); return true; } @@ -211,13 +212,13 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); rejectReasons = ERejectReasons::YellowChannels; rejectDescription = TStringBuilder() << "Cannot perform writes: out of disk space at tablet " << self->TabletID(); - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &OutOfSpace, ctx); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &OutOfSpace, ctx, logThrottlerType); return true; } else if (self->IsSubDomainOutOfSpace()) { self->IncCounter(COUNTER_PREPARE_OUT_OF_SPACE); rejectReasons = ERejectReasons::DiskSpace; rejectDescription = "Cannot perform writes: database is out of disk space"; - Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &DiskSpaceExhausted, ctx); + Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &DiskSpaceExhausted, ctx, logThrottlerType); return true; } } @@ -233,9 +234,9 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct } if (IsReplicated()) { return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert", - ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx); + ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx, TDataShard::ELogThrottlerType::UploadRows_Reject); } - if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) { + if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true, TDataShard::ELogThrottlerType::UploadRows_Reject)) { Executor()->Execute(new TTxUploadRows(this, ev), ctx); } else { IncCounter(COUNTER_BULK_UPSERT_OVERLOADED); @@ -250,9 +251,9 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo } if (IsReplicated()) { return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase", - ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx); + ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx, TDataShard::ELogThrottlerType::EraseRows_Reject); } - if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) { + if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false, TDataShard::ELogThrottlerType::EraseRows_Reject)) { Executor()->Execute(new TTxEraseRows(this, ev), ctx); } else { IncCounter(COUNTER_ERASE_ROWS_OVERLOADED); diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp index 71021e888a..7fa10a45b4 100644 --- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp +++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp @@ -63,7 +63,7 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa << Self->TabletID() << " status: " << result->GetStatus()); TString errors = result->GetError(); if (errors.Size()) { - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, + LOG_LOG_S_THROTTLE(Self->GetLogThrottler(TDataShard::ELogThrottlerType::TxProposeTransactionBase_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, "Errors while proposing transaction txid " << TxId << " at tablet " << Self->TabletID() << " status: " << result->GetStatus() << " errors: " << errors); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index accb120c79..6d890a3d15 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2001,6 +2001,22 @@ public: */ void BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies); + enum ELogThrottlerType { + CheckDataTxUnit_Execute = 0, + TxProposeTransactionBase_Execute, + FinishProposeUnit_CompleteRequest, + FinishProposeUnit_UpdateCounters, + UploadRows_Reject, + EraseRows_Reject, + + LAST + }; + + TTrivialLogThrottler& GetLogThrottler(ELogThrottlerType type) { + Y_ABORT_UNLESS(type != ELogThrottlerType::LAST); + return LogThrottlers[type]; + }; + private: /// class TLoanReturnTracker { @@ -2795,6 +2811,7 @@ private: bool ScheduledPlanPredictedTxs = false; + std::vector<TTrivialLogThrottler> LogThrottlers = {ELogThrottlerType::LAST, TDuration::Seconds(1)}; public: auto& GetLockChangeRecords() { return LockChangeRecords; diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index 2738366568..5ecdeb81bf 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -165,7 +165,7 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op, TString errors = res->GetError(); if (errors.size()) { - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_CompleteRequest), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, "Errors while proposing transaction txid " << op->GetTxId() << " at tablet " << DataShard.TabletID() << " status: " << res->GetStatus() << " errors: " << errors); @@ -234,7 +234,7 @@ void TFinishProposeUnit::UpdateCounters(TOperation::TPtr op, if (res->IsError()) { DataShard.IncCounter(COUNTER_PREPARE_ERROR); - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, + LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::FinishProposeUnit_UpdateCounters), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, "Prepare transaction failed. txid " << op->GetTxId() << " at tablet " << DataShard.TabletID() << " errors: " << PrintErrors(res->Record)); |