diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-02-14 10:14:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-14 10:14:35 +0300 |
commit | 188a9d421bf7ebb3211b22b2a27aa808d8f59237 (patch) | |
tree | 4f96e584e1845087e6031bbf6105e14814b96cb9 | |
parent | 698fdc293a3686db35f2d1cb9ad2f202c7c051cb (diff) | |
download | ydb-188a9d421bf7ebb3211b22b2a27aa808d8f59237.tar.gz |
fixes for TxWriteIndex volume control, policies of exceptions on searching, eviction tasks for portions usage (#1894)
35 files changed, 317 insertions, 144 deletions
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index 02d3c43d825..a05ea71da08 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -139,18 +139,20 @@ public: TKernelFunction(const TFunctionPtr kernelsFunction, arrow::compute::ExecContext* ctx) : TBase(ctx) , Function(kernelsFunction) - {} + { + AFL_VERIFY(Function); + } arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override { auto arguments = TBase::BuildArgs(batch, assign.GetArguments()); if (!arguments) { return arrow::Status::Invalid("Error parsing args."); } - try { +// try { return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx); - } catch (const std::exception& ex) { - return arrow::Status::ExecutionError(ex.what()); - } +// } catch (const std::exception& ex) { +// return arrow::Status::ExecutionError(ex.what()); +// } } }; diff --git a/ydb/core/tx/columnshard/blobs_action/tier/write.cpp b/ydb/core/tx/columnshard/blobs_action/tier/write.cpp index fa54aa75fad..9620f8d37e5 100644 --- a/ydb/core/tx/columnshard/blobs_action/tier/write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/tier/write.cpp @@ -24,7 +24,6 @@ void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, for (auto&& i : GetBlobsForWrite()) { dbBlobs.RemoveTierDraftBlobId(GetStorageId(), i.first); dbBlobs.AddTierBlobToDelete(GetStorageId(), i.first); - GCInfo->MutableBlobsToDelete().emplace_back(i.first); } } } @@ -41,4 +40,12 @@ NKikimr::NOlap::TUnifiedBlobId TWriteAction::AllocateNextBlobId(const TString& d return TUnifiedBlobId(Max<ui32>(), TLogoBlobID(TabletId, now.GetValue() >> 32, now.GetValue() & Max<ui32>(), TLogoBlobID::MaxChannel, data.size(), AtomicIncrement(Counter) % TLogoBlobID::MaxCookie, 1)); } +void TWriteAction::DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) { + if (!blobsWroteSuccessfully) { + for (auto&& i : GetBlobsForWrite()) { + GCInfo->MutableBlobsToDelete().emplace_back(i.first); + } + } +} + } diff --git a/ydb/core/tx/columnshard/blobs_action/tier/write.h b/ydb/core/tx/columnshard/blobs_action/tier/write.h index 9d65f971ce6..425cabc27d5 100644 --- a/ydb/core/tx/columnshard/blobs_action/tier/write.h +++ b/ydb/core/tx/columnshard/blobs_action/tier/write.h @@ -27,9 +27,7 @@ protected: } virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) override; - virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override { - - } + virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) override; public: virtual bool NeedDraftTransaction() const override { return true; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 4d080a3f930..1701f5ae6a5 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -111,7 +111,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) { const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta(); ctx.Send(writeMeta.GetSource(), Results[i].release()); - Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds()); + Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant()); Self->CSCounters.OnSuccessWriteResponse(); } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp index cd48f921478..429696ac995 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp @@ -85,4 +85,11 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr Y_ABORT_UNLESS(Ev && Ev->Get()->IndexChanges); } +void TTxWriteIndex::Describe(IOutputStream& out) const noexcept { + out << TypeName(*this); + if (Ev->Get()->IndexChanges) { + out << ": " << Ev->Get()->IndexChanges->DebugString(); + } +} + } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h index 09922121d15..5c20461c402 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h @@ -16,6 +16,7 @@ public: bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; } + virtual void Describe(IOutputStream& out) const noexcept override; private: diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.cpp b/ydb/core/tx/columnshard/blobs_reader/actor.cpp index 6112a82e263..0ba5d774d2a 100644 --- a/ydb/core/tx/columnshard/blobs_reader/actor.cpp +++ b/ydb/core/tx/columnshard/blobs_reader/actor.cpp @@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) bool aborted = false; if (event.Status != NKikimrProto::EReplyStatus::OK) { WaitingBlobsCount.Sub(Task->GetWaitingCount()); - if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob"))) { + if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(1024)))) { aborted = true; } } else { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index a842ac0cdf0..577d6112acf 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -1,6 +1,7 @@ #include "columnshard_impl.h" #include "blobs_action/transaction/tx_write.h" #include "blobs_action/transaction/tx_draft.h" +#include "counters/columnshard.h" #include "operations/slice_builder.h" #include "operations/write_data.h" @@ -21,9 +22,17 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const IncCounter(COUNTER_WRITE_OVERLOAD); CSCounters.OnOverloadInsertTable(writeData.GetSize()); break; - case EOverloadStatus::Shard: + case EOverloadStatus::ShardTxInFly: IncCounter(COUNTER_WRITE_OVERLOAD); - CSCounters.OnOverloadShard(writeData.GetSize()); + CSCounters.OnOverloadShardTx(writeData.GetSize()); + break; + case EOverloadStatus::ShardWritesInFly: + IncCounter(COUNTER_WRITE_OVERLOAD); + CSCounters.OnOverloadShardWrites(writeData.GetSize()); + break; + case EOverloadStatus::ShardWritesSizeInFly: + IncCounter(COUNTER_WRITE_OVERLOAD); + CSCounters.OnOverloadShardWritesSize(writeData.GetSize()); break; case EOverloadStatus::None: Y_ABORT("invalid function usage"); @@ -45,8 +54,20 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) return EOverloadStatus::InsertTable; } - if (WritesMonitor.ShardOverloaded()) { - return EOverloadStatus::Shard; + ui64 txLimit = Settings.OverloadTxInFlight; + ui64 writesLimit = Settings.OverloadWritesInFlight; + ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight; + if (txLimit && Executor()->GetStats().TxInFly > txLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "tx_in_fly")("sum", Executor()->GetStats().TxInFly)("limit", txLimit); + return EOverloadStatus::ShardTxInFly; + } + if (writesLimit && WritesMonitor.GetWritesInFlight() > writesLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_in_fly")("sum", WritesMonitor.GetWritesInFlight())("limit", writesLimit); + return EOverloadStatus::ShardWritesInFly; + } + if (writesSizeLimit && WritesMonitor.GetWritesSizeInFlight() > writesSizeLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_size_in_fly")("sum", WritesMonitor.GetWritesSizeInFlight())("limit", writesSizeLimit); + return EOverloadStatus::ShardWritesSizeInFly; } return EOverloadStatus::None; } @@ -57,7 +78,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto& putResult = ev->Get()->GetPutResult(); OnYellowChannels(putResult); NOlap::TWritingBuffer& wBuffer = ev->Get()->MutableWritesBuffer(); - auto& baseAggregations = wBuffer.GetAggregations(); + auto baseAggregations = wBuffer.GetAggregations(); + wBuffer.InitReplyReceived(TMonotonic::Now()); auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size()); @@ -70,13 +92,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(); + CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable); wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); continue; } if (putResult.GetPutStatus() != NKikimrProto::OK) { - CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); + CSCounters.OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant()); IncCounter(COUNTER_WRITE_FAIL); auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; @@ -97,16 +119,17 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails"); ctx.Send(writeMeta.GetSource(), result.release()); } - CSCounters.OnFailedWriteResponse(); + CSCounters.OnFailedWriteResponse(EWriteFailReason::PutBlob); wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); } else { const TMonotonic now = TMonotonic::Now(); - CSCounters.OnWritePutBlobsSuccess((now - writeMeta.GetWriteStartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle1PutBlobsSuccess((now - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle2PutBlobsSuccess((now - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle3PutBlobsSuccess((now - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle4PutBlobsSuccess((now - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds()); - CSCounters.OnWriteMiddle5PutBlobsSuccess((now - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds()); + CSCounters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant()); + CSCounters.OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant()); + CSCounters.OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant()); + CSCounters.OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant()); + CSCounters.OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant()); + CSCounters.OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant()); + CSCounters.OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant()); LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); @@ -139,18 +162,20 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex IncCounter(signalIndex); ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR)); - CSCounters.OnFailedWriteResponse(); return; }; if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled"); + CSCounters.OnFailedWriteResponse(EWriteFailReason::Disabled); return returnFail(COUNTER_WRITE_FAIL); } if (!TablesManager.IsReadyForWrite(tableId)) { LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") << " at tablet " << TabletID()); + + CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable); return returnFail(COUNTER_WRITE_FAIL); } @@ -159,6 +184,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (!arrowData->ParseFromProto(record)) { LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); + CSCounters.OnFailedWriteResponse(EWriteFailReason::IncorrectSchema); return returnFail(COUNTER_WRITE_FAIL); } @@ -167,7 +193,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (overloadStatus != EOverloadStatus::None) { std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); - CSCounters.OnFailedWriteResponse(); + CSCounters.OnFailedWriteResponse(EWriteFailReason::Overload); } else { if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() @@ -179,7 +205,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(); + CSCounters.OnFailedWriteResponse(EWriteFailReason::LongTxDuplication); return; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 8ec5ef62099..86ba233e04f 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -297,6 +297,9 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl failedAborts.push_back(writeId); } } + if (failedAborts.size()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size()); + } for (auto& writeId : failedAborts) { writesToAbort.erase(writeId); } @@ -813,6 +816,7 @@ void TColumnShard::SetupCleanupInsertTable() { if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) { return; } + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size()); Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext()); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 6a94e4b5167..7382ea645cb 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -211,7 +211,9 @@ class TColumnShard void OnTieringModified(); public: enum class EOverloadStatus { - Shard /* "shard" */, + ShardTxInFly /* "shard_tx" */, + ShardWritesInFly /* "shard_writes" */, + ShardWritesSizeInFly /* "shard_writes_size" */, InsertTable /* "insert_table" */, Disk /* "disk" */, None /* "none" */ @@ -326,8 +328,8 @@ private: class TWritesMonitor { private: TColumnShard& Owner; - ui64 WritesInFlight = 0; - ui64 WritesSizeInFlight = 0; + YDB_READONLY(ui64, WritesInFlight, 0); + YDB_READONLY(ui64, WritesSizeInFlight, 0); public: class TGuard: public TNonCopyable { @@ -363,17 +365,8 @@ private: return TGuard(*this); } - bool ShardOverloaded() const { - ui64 txLimit = Owner.Settings.OverloadTxInFlight; - ui64 writesLimit = Owner.Settings.OverloadWritesInFlight; - ui64 writesSizeLimit = Owner.Settings.OverloadWritesSizeInFlight; - return (txLimit && Owner.Executor()->GetStats().TxInFly > txLimit) || - (writesLimit && WritesInFlight > writesLimit) || - (writesSizeLimit && WritesSizeInFlight > writesSizeLimit); - } - TString DebugString() const { - return TStringBuilder() << "TWritesMonitor: inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)"; + return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight << "}"; } private: diff --git a/ydb/core/tx/columnshard/common/limits.cpp b/ydb/core/tx/columnshard/common/limits.cpp new file mode 100644 index 00000000000..7bd45fa70d3 --- /dev/null +++ b/ydb/core/tx/columnshard/common/limits.cpp @@ -0,0 +1,4 @@ +#include "limits.h" + +namespace NKikimr::NColumnShard { +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/common/limits.h b/ydb/core/tx/columnshard/common/limits.h new file mode 100644 index 00000000000..04a5cb55128 --- /dev/null +++ b/ydb/core/tx/columnshard/common/limits.h @@ -0,0 +1,9 @@ +#pragma once +#include <util/system/types.h> + +namespace NKikimr::NOlap { +class TGlobalLimits { +public: + static const inline ui64 TxWriteLimitBytes = 256 * 1024 * 1024; +}; +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h index 942daa24e70..389310cd528 100644 --- a/ydb/core/tx/columnshard/common/snapshot.h +++ b/ydb/core/tx/columnshard/common/snapshot.h @@ -1,7 +1,9 @@ #pragma once +#include <ydb/library/conclusion/status.h> + #include <util/stream/output.h> #include <util/string/cast.h> -#include <ydb/library/conclusion/status.h> +#include <util/datetime/base.h> namespace NKikimrColumnShardProto { class TSnapshot; @@ -20,6 +22,10 @@ public: , TxId(txId) { } + constexpr TInstant GetPlanInstant() const noexcept { + return TInstant::MilliSeconds(PlanStep); + } + constexpr ui64 GetPlanStep() const noexcept { return PlanStep; } diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index a1006f15143..a73340ea968 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + limits.h reverse_accessor.cpp scalars.cpp snapshot.cpp diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp index 989deae6b9b..e094fbbcd5a 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.cpp +++ b/ydb/core/tx/columnshard/counters/columnshard.cpp @@ -2,6 +2,8 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> +#include <ydb/library/actors/core/log.h> + namespace NKikimr::NColumnShard { TCSCounters::TCSCounters() @@ -24,8 +26,12 @@ TCSCounters::TCSCounters() OverloadInsertTableBytes = TBase::GetDeriviative("OverloadInsertTable/Bytes"); OverloadInsertTableCount = TBase::GetDeriviative("OverloadInsertTable/Count"); - OverloadShardBytes = TBase::GetDeriviative("OverloadShard/Bytes"); - OverloadShardCount = TBase::GetDeriviative("OverloadShard/Count"); + OverloadShardTxBytes = TBase::GetDeriviative("OverloadShard/Tx/Bytes"); + OverloadShardTxCount = TBase::GetDeriviative("OverloadShard/Tx/Count"); + OverloadShardWritesBytes = TBase::GetDeriviative("OverloadShard/Writes/Bytes"); + OverloadShardWritesCount = TBase::GetDeriviative("OverloadShard/Writes/Count"); + OverloadShardWritesSizeBytes = TBase::GetDeriviative("OverloadShard/WritesSize/Bytes"); + OverloadShardWritesSizeCount = TBase::GetDeriviative("OverloadShard/WritesSize/Count"); InternalCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("InternalCompaction/Bytes"); InternalCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("InternalCompaction/PortionsCount"); @@ -39,12 +45,25 @@ TCSCounters::TCSCounters() HistogramSuccessWriteMiddle3PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle3PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); HistogramSuccessWriteMiddle4PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle4PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); HistogramSuccessWriteMiddle5PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle5PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramSuccessWriteMiddle6PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle6PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); WritePutBlobsCount = TBase::GetValue("WritePutBlobs"); WriteRequests = TBase::GetValue("WriteRequests"); - FailedWriteRequests = TBase::GetDeriviative("FailedWriteRequests"); + + for (auto&& i : GetEnumAllValues<EWriteFailReason>()) { + auto sub = CreateSubGroup("reason", ::ToString(i)); + FailedWriteRequests.emplace(i, sub.GetDeriviative("FailedWriteRequests")); + } + SuccessWriteRequests = TBase::GetDeriviative("SuccessWriteRequests"); } +void TCSCounters::OnFailedWriteResponse(const EWriteFailReason reason) const { + WriteRequests->Sub(1); + auto it = FailedWriteRequests.find(reason); + AFL_VERIFY(it != FailedWriteRequests.end()); + it->second->Add(1); +} + } diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index ffe935c995c..2441ca6fef3 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -1,9 +1,21 @@ #pragma once -#include <library/cpp/monlib/dynamic_counters/counters.h> #include "common/owner.h" +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/hash_set.h> + namespace NKikimr::NColumnShard { +enum class EWriteFailReason { + Disabled /* "disabled" */, + PutBlob /* "put_blob" */, + LongTxDuplication /* "long_tx_duplication" */, + NoTable /* "no_table" */, + IncorrectSchema /* "incorrect_schema" */, + Overload /* "overload" */ +}; + class TCSCounters: public TCommonCountersOwner { private: using TBase = TCommonCountersOwner; @@ -24,8 +36,12 @@ private: NMonitoring::TDynamicCounters::TCounterPtr OverloadInsertTableBytes; NMonitoring::TDynamicCounters::TCounterPtr OverloadInsertTableCount; - NMonitoring::TDynamicCounters::TCounterPtr OverloadShardBytes; - NMonitoring::TDynamicCounters::TCounterPtr OverloadShardCount; + NMonitoring::TDynamicCounters::TCounterPtr OverloadShardTxBytes; + NMonitoring::TDynamicCounters::TCounterPtr OverloadShardTxCount; + NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesBytes; + NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesCount; + NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeBytes; + NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeCount; std::shared_ptr<TValueAggregationClient> InternalCompactionGranuleBytes; std::shared_ptr<TValueAggregationClient> InternalCompactionGranulePortionsCount; @@ -39,54 +55,56 @@ private: NMonitoring::THistogramPtr HistogramSuccessWriteMiddle3PutBlobsDurationMs; NMonitoring::THistogramPtr HistogramSuccessWriteMiddle4PutBlobsDurationMs; NMonitoring::THistogramPtr HistogramSuccessWriteMiddle5PutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramSuccessWriteMiddle6PutBlobsDurationMs; NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs; NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs; NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount; NMonitoring::TDynamicCounters::TCounterPtr WriteRequests; - NMonitoring::TDynamicCounters::TCounterPtr FailedWriteRequests; + THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests; NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests; public: void OnStartWriteRequest() const { WriteRequests->Add(1); } - void OnFailedWriteResponse() const { - WriteRequests->Sub(1); - FailedWriteRequests->Add(1); - } + void OnFailedWriteResponse(const EWriteFailReason reason) const; void OnSuccessWriteResponse() const { WriteRequests->Sub(1); SuccessWriteRequests->Add(1); } - void OnWritePutBlobsSuccess(const ui32 milliseconds) const { - HistogramSuccessWritePutBlobsDurationMs->Collect(milliseconds); + void OnWritePutBlobsSuccess(const TDuration d) const { + HistogramSuccessWritePutBlobsDurationMs->Collect(d.MilliSeconds()); WritePutBlobsCount->Sub(1); } - void OnWriteMiddle1PutBlobsSuccess(const ui32 milliseconds) const { - HistogramSuccessWriteMiddle1PutBlobsDurationMs->Collect(milliseconds); + void OnWriteMiddle1PutBlobsSuccess(const TDuration d) const { + HistogramSuccessWriteMiddle1PutBlobsDurationMs->Collect(d.MilliSeconds()); + } + + void OnWriteMiddle2PutBlobsSuccess(const TDuration d) const { + HistogramSuccessWriteMiddle2PutBlobsDurationMs->Collect(d.MilliSeconds()); } - void OnWriteMiddle2PutBlobsSuccess(const ui32 milliseconds) const { - HistogramSuccessWriteMiddle2PutBlobsDurationMs->Collect(milliseconds); + void OnWriteMiddle3PutBlobsSuccess(const TDuration d) const { + HistogramSuccessWriteMiddle3PutBlobsDurationMs->Collect(d.MilliSeconds()); } - void OnWriteMiddle3PutBlobsSuccess(const ui32 milliseconds) const { - HistogramSuccessWriteMiddle3PutBlobsDurationMs->Collect(milliseconds); + void OnWriteMiddle4PutBlobsSuccess(const TDuration d) const { + HistogramSuccessWriteMiddle4PutBlobsDurationMs->Collect(d.MilliSeconds()); } - void OnWriteMiddle4PutBlobsSuccess(const ui32 milliseconds) const { - HistogramSuccessWriteMiddle4PutBlobsDurationMs->Collect(milliseconds); + void OnWriteMiddle5PutBlobsSuccess(const TDuration d) const { + HistogramSuccessWriteMiddle5PutBlobsDurationMs->Collect(d.MilliSeconds()); } - void OnWriteMiddle5PutBlobsSuccess(const ui32 milliseconds) const { - HistogramSuccessWriteMiddle5PutBlobsDurationMs->Collect(milliseconds); + void OnWriteMiddle6PutBlobsSuccess(const TDuration d) const { + HistogramSuccessWriteMiddle6PutBlobsDurationMs->Collect(d.MilliSeconds()); } - void OnWritePutBlobsFail(const ui32 milliseconds) const { - HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds); + void OnWritePutBlobsFail(const TDuration d) const { + HistogramFailedWritePutBlobsDurationMs->Collect(d.MilliSeconds()); WritePutBlobsCount->Sub(1); } @@ -94,8 +112,8 @@ public: WritePutBlobsCount->Add(1); } - void OnWriteTxComplete(const ui32 milliseconds) const { - HistogramWriteTxCompleteDurationMs->Collect(milliseconds); + void OnWriteTxComplete(const TDuration d) const { + HistogramWriteTxCompleteDurationMs->Collect(d.MilliSeconds()); } void OnInternalCompactionInfo(const ui64 bytes, const ui32 portionsCount) const { @@ -113,9 +131,19 @@ public: OverloadInsertTableCount->Add(1); } - void OnOverloadShard(const ui64 size) const { - OverloadShardBytes->Add(size); - OverloadShardCount->Add(1); + void OnOverloadShardTx(const ui64 size) const { + OverloadShardTxBytes->Add(size); + OverloadShardTxCount->Add(1); + } + + void OnOverloadShardWrites(const ui64 size) const { + OverloadShardWritesBytes->Add(size); + OverloadShardWritesCount->Add(1); + } + + void OnOverloadShardWritesSize(const ui64 size) const { + OverloadShardWritesSizeBytes->Add(size); + OverloadShardWritesSizeCount->Add(1); } void SkipIndexationInputDueToSplitCompaction(const ui64 size) const { diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h index 1ea4bd513b7..456699c80dc 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.h +++ b/ydb/core/tx/columnshard/counters/common/owner.h @@ -50,6 +50,9 @@ public: NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const; void DeepSubGroup(const TString& id, const TString& value); void DeepSubGroup(const TString& componentName); + TCommonCountersOwner CreateSubGroup(const TString& param, const TString& value) const { + return TCommonCountersOwner(*this, param, value); + } NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const; TCommonCountersOwner(const TString& module, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr); diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make index aed1f68de28..5a0dbc7c325 100644 --- a/ydb/core/tx/columnshard/counters/ya.make +++ b/ydb/core/tx/columnshard/counters/ya.make @@ -17,4 +17,6 @@ PEERDIR( ydb/core/base ) +GENERATE_ENUM_SERIALIZATION(columnshard.h) + END() diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index cfba0a99406..9a0ad102e54 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -9,13 +9,7 @@ namespace NKikimr::NOlap { void TTTLColumnEngineChanges::DoDebugString(TStringOutput& out) const { TBase::DoDebugString(out); - if (PortionsToEvict.size()) { - out << "eviction=(count=" << PortionsToEvict.size() << ";portions=["; - for (auto& info : PortionsToEvict) { - out << info.GetPortionInfo() << ";to=" << info.GetFeatures().TargetTierName << ";"; - } - out << "];"; - } + out << "eviction=" << PortionsToEvict.size() << ";"; } void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 2ff9533af29..8b47d79ffb8 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -6,16 +6,6 @@ namespace NKikimr::NOlap { -void TChangesWithAppend::DoDebugString(TStringOutput& out) const { - if (ui32 added = AppendedPortions.size()) { - out << "portions_count:" << added << ";portions=("; - for (auto& portionInfo : AppendedPortions) { - out << portionInfo; - } - out << "); "; - } -} - void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) { for (auto& portionInfo : AppendedPortions) { switch (portionInfo.GetPortionInfo().GetMeta().Produced) { diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index a398c61b005..779a3ab8a14 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -13,7 +13,6 @@ private: protected: TSplitSettings SplitSettings; TSaverContext SaverContext; - virtual void DoDebugString(TStringOutput& out) const override; virtual void DoCompile(TFinalizationContext& context) override; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) override; virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; @@ -24,6 +23,10 @@ protected: std::vector<TPortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const; + virtual void DoDebugString(TStringOutput& out) const override { + out << "remove=" << PortionsToRemove.size() << ";append=" << AppendedPortions.size() << ";"; + } + public: const TSplitSettings& GetSplitSettings() const { return SplitSettings; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index a0bc0dbeffa..17833bc5a6e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1,6 +1,7 @@ #include "column_engine_logs.h" #include "filter.h" +#include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/formats/arrow/one_batch_input_stream.h> #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> @@ -238,7 +239,11 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st } std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept { - auto granule = GranulesStorage->GetGranuleForCompaction(Tables); + THashSet<ui64> busyGranuleIds; + for (auto&& i : busyPortions) { + busyGranuleIds.emplace(i.GetPathId()); + } + auto granule = GranulesStorage->GetGranuleForCompaction(Tables, busyGranuleIds); if (!granule) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction"); return nullptr; @@ -253,14 +258,16 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(cons } std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, - THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept { + THashSet<ui64>& pathsToDrop, ui32 /*maxRecords*/) noexcept { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size()); auto changes = std::make_shared<TCleanupColumnEngineChanges>(StoragesManager); - ui32 affectedRecords = 0; // Add all portions from dropped paths THashSet<ui64> dropPortions; THashSet<ui64> emptyPaths; + ui64 txSize = 0; + const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4; + changes->NeedRepeat = false; for (ui64 pathId : pathsToDrop) { auto itTable = Tables.find(pathId); if (itTable == Tables.end()) { @@ -269,25 +276,21 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( } for (auto& [portion, info] : itTable->second->GetPortions()) { - affectedRecords += info->NumChunks(); - changes->PortionsToDrop.push_back(*info); - dropPortions.insert(portion); - - if (affectedRecords > maxRecords) { + if (txSize + info->GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) { + txSize += info->GetTxVolume(); + } else { + changes->NeedRepeat = true; break; } + changes->PortionsToDrop.push_back(*info); + dropPortions.insert(portion); } } for (ui64 pathId : emptyPaths) { pathsToDrop.erase(pathId); } - if (affectedRecords > maxRecords) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size()); - return changes; - } - - while (CleanupPortions.size() && affectedRecords <= maxRecords) { + while (CleanupPortions.size() && !changes->NeedRepeat) { auto it = CleanupPortions.begin(); if (it->first >= snapshot) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanupStop")("snapshot", snapshot.DebugString())("current_snapshot", it->first.DebugString()); @@ -295,12 +298,16 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( } for (auto&& i : it->second) { Y_ABORT_UNLESS(i.CheckForCleanup(snapshot)); - affectedRecords += i.NumChunks(); + if (txSize + i.GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) { + txSize += i.GetTxVolume(); + } else { + changes->NeedRepeat = true; + break; + } changes->PortionsToDrop.push_back(i); } CleanupPortions.erase(it); } - changes->NeedRepeat = affectedRecords > maxRecords; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size()); if (changes->PortionsToDrop.empty()) { @@ -312,7 +319,6 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering& ttl, TTieringProcessContext& context) const { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ProcessTiering")("path_id", pathId)("ttl", ttl.GetDebugString()); - ui64 dropBlobs = 0; auto& indexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo(); Y_ABORT_UNLESS(context.Changes->Tiering.emplace(pathId, ttl).second); @@ -330,6 +336,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering auto ttlColumnNames = ttl.GetTtlColumns(); Y_ABORT_UNLESS(ttlColumnNames.size() == 1); // TODO: support different ttl columns ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin()); + const TInstant now = TInstant::Now(); for (auto& [portion, info] : itTable->second->GetPortions()) { if (info->HasRemoveSnapshot()) { continue; @@ -339,8 +346,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering continue; } - context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); - const bool tryEvictPortion = ttl.HasTiers() && context.HasMemoryForEviction(); + const bool tryEvictPortion = ttl.HasTiers() && context.HasLimitsForEviction(); if (auto max = info->MaxValue(ttlColumnId)) { bool keep = !expireTimestampOpt; @@ -356,10 +362,15 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering } } - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.HasLimitsForTtl()); if (keep && tryEvictPortion) { const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId; TString tierName = ""; + const TInstant maxChangePortionInstant = info->RecordSnapshotMax().GetPlanInstant(); + if (now - maxChangePortionInstant < TDuration::Minutes(60)) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", now - maxChangePortionInstant); + continue; + } for (auto& tierRef : ttl.GetOrderedTiers()) { auto& tierInfo = tierRef.Get(); if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { @@ -389,22 +400,22 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering if (currentTierName != tierName) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName); context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId, StoragesManager->GetOperator(tierName))); - context.AppPortionForCheckMemoryUsage(*info); + context.AppPortionForEvictionChecker(*info); SignalCounters.OnPortionToEvict(info->BlobsBytes()); } } - if (!keep && context.AllowDrop) { + if (!keep && context.HasLimitsForTtl()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString()); - dropBlobs += info->NumBlobs(); AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second); SignalCounters.OnPortionToDrop(info->BlobsBytes()); + context.AppPortionForTtlChecker(*info); } } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); SignalCounters.OnPortionNoBorder(info->BlobsBytes()); } } - if (dWaiting > TDuration::MilliSeconds(500) && (!context.HasMemoryForEviction() || !context.AllowDrop)) { + if (dWaiting > TDuration::MilliSeconds(500) && (!context.HasLimitsForEviction() || !context.HasLimitsForTtl())) { dWaiting = TDuration::MilliSeconds(500); } Y_ABORT_UNLESS(!!dWaiting); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 7cd28ac2be6..c281d9b653f 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -3,6 +3,7 @@ #include "defs.h" #include "column_engine.h" #include <ydb/core/tx/columnshard/common/scalars.h> +#include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/counters/engine_logs.h> #include <ydb/core/tx/columnshard/columnshard_ttl.h> #include "scheme/tier_info.h" @@ -87,21 +88,29 @@ private: private: const ui64 MemoryUsageLimit; ui64 MemoryUsage = 0; + ui64 TxWriteVolume = 0; std::shared_ptr<TColumnEngineChanges::IMemoryPredictor> MemoryPredictor; public: - bool AllowEviction = true; - bool AllowDrop = true; const TInstant Now; std::shared_ptr<TTTLColumnEngineChanges> Changes; std::map<ui64, TDuration> DurationsForced; const THashSet<TPortionAddress>& BusyPortions; - void AppPortionForCheckMemoryUsage(const TPortionInfo& info) { + void AppPortionForEvictionChecker(const TPortionInfo& info) { MemoryUsage = MemoryPredictor->AddPortion(info); + TxWriteVolume += info.GetTxVolume(); } - bool HasMemoryForEviction() const { - return MemoryUsage < MemoryUsageLimit; + void AppPortionForTtlChecker(const TPortionInfo& info) { + TxWriteVolume += info.GetTxVolume(); + } + + bool HasLimitsForEviction() const { + return MemoryUsage < MemoryUsageLimit && TxWriteVolume < TGlobalLimits::TxWriteLimitBytes; + } + + bool HasLimitsForTtl() const { + return TxWriteVolume < TGlobalLimits::TxWriteLimitBytes; } TTieringProcessContext(const ui64 memoryUsageLimit, std::shared_ptr<TTTLColumnEngineChanges> changes, diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index 9f6a2c30823..fde6d7135c3 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -69,7 +69,7 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio } std::optional<NKikimrTxColumnShard::TIndexPortionMeta> TPortionMeta::SerializeToProto(const ui32 columnId, const ui32 chunk) const { - if (columnId != FirstPkColumn || chunk != 0) { + if (!IsChunkWithPortionInfo(columnId, chunk)) { return {}; } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index 7acafc89a77..005021f7c45 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -26,6 +26,10 @@ public: EProduced Produced{EProduced::UNSPECIFIED}; ui32 FirstPkColumn = 0; + bool IsChunkWithPortionInfo(const ui32 columnId, const ui32 chunkIdx) const { + return columnId == FirstPkColumn && chunkIdx == 0; + } + bool DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo); std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 3deb26a0a73..23b21564ab8 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -306,6 +306,10 @@ std::vector<NKikimr::NOlap::TPortionInfo::TPage> TPortionInfo::BuildPages() cons return pages; } +ui64 TPortionInfo::GetTxVolume() const { + return 1024 + Records.size() * 256 + Indexes.size() * 256; +} + std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_ABORT_UNLESS(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 01850d31951..d5a052ae52c 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -30,6 +30,8 @@ private: YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes); public: + ui64 GetTxVolume() const; // fake-correct method for determ volume on rewrite this portion in transaction progress + class TPage { private: YDB_READONLY_DEF(std::vector<const TColumnRecord*>, Records); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp index 96923fed2a2..7f38c7dfc00 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp @@ -3,6 +3,8 @@ #include <ydb/core/tx/columnshard/engines/filter.h> #include <ydb/core/formats/arrow/simple_arrays_cache.h> +#include <ydb/library/yql/minikql/mkql_terminator.h> + namespace NKikimr::NOlap::NPlainReader { bool TStepAction::DoApply(IDataReader& /*owner*/) const { @@ -14,6 +16,7 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const { } bool TStepAction::DoExecute() { + NMiniKQL::TThrowingBindTerminator bind; while (Step) { if (Source->IsEmptyData()) { Source->Finalize(); @@ -58,6 +61,9 @@ bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source } bool TFilterProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const { + AFL_VERIFY(source); + AFL_VERIFY(Step); + AFL_VERIFY(source->GetStageData().GetTable()); auto filter = Step->BuildFilter(source->GetStageData().GetTable()); source->MutableStageData().AddFilter(filter); return true; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index b941510a4c7..9ce196f5a79 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -1,11 +1,12 @@ #pragma once #include "counters.h" -#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> -#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> +#include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/engines/changes/general_compaction.h> +#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/library/accessor/accessor.h> @@ -361,8 +362,12 @@ public: std::vector<std::shared_ptr<TPortionInfo>> result; std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges::IMemoryPredictor> predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor(); + ui64 txSizeLimit = 0; for (auto&& i : sorted) { result.emplace_back(i); + if (txSizeLimit + i->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2) { + break; + } if (predictor->AddPortion(*i) > sizeLimit && result.size() > 1) { break; } diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp index 7e6f7e81b53..c51c2bd6e16 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.cpp +++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp @@ -9,12 +9,15 @@ void TGranulesStorage::UpdateGranuleInfo(const TGranuleMeta& granule) { } } -std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) const { +std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const THashSet<ui64>& busyGranuleIds) const { const TInstant now = TInstant::Now(); std::optional<NStorageOptimizer::TOptimizationPriority> priority; std::shared_ptr<TGranuleMeta> granule; for (auto&& i : granules) { i.second->ActualizeOptimizer(now); + if (busyGranuleIds.contains(i.first)) { + continue; + } if (!priority || *priority < i.second->GetCompactionPriority()) { priority = i.second->GetCompactionPriority(); granule = i.second; diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h index 1fa7b524c88..7ddba8009f6 100644 --- a/ydb/core/tx/columnshard/engines/storage/storage.h +++ b/ydb/core/tx/columnshard/engines/storage/storage.h @@ -62,7 +62,7 @@ public: return TModificationGuard(*this); } - std::shared_ptr<TGranuleMeta> GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) const; + std::shared_ptr<TGranuleMeta> GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const THashSet<ui64>& busyGranuleIds) const; void UpdateGranuleInfo(const TGranuleMeta& granule); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index 318e4889fe0..7b858a3e698 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -35,14 +35,45 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) { void TWritingBuffer::InitReadyInstant(const TMonotonic instant) { for (auto&& aggr : Aggregations) { - aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant); + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant); } } void TWritingBuffer::InitStartSending(const TMonotonic instant) { for (auto&& aggr : Aggregations) { - aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant); + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant); + } +} + +void TWritingBuffer::InitReplyReceived(const TMonotonic instant) { + for (auto&& aggr : Aggregations) { + aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle6StartInstant(instant); + } +} + +std::vector<NKikimr::NOlap::TWritingBlob> TWritingBuffer::GroupIntoBlobs() { + std::vector<TWritingBlob> result; + TWritingBlob currentBlob; + ui64 sumSize = 0; + for (auto&& aggr : Aggregations) { + for (auto&& bInfo : aggr->MutableSplittedBlobs()) { + if (!currentBlob.AddData(bInfo)) { + result.emplace_back(std::move(currentBlob)); + currentBlob = TWritingBlob(); + AFL_VERIFY(currentBlob.AddData(bInfo)); + } + sumSize += bInfo.GetSplittedBlobs().GetSize(); + } + } + if (currentBlob.GetSize()) { + result.emplace_back(std::move(currentBlob)); + } + if (result.size()) { + if (sumSize / result.size() < 4 * 1024 * 1024 && result.size() != 1) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "error_splitting")("size", sumSize)("count", result.size()); + } } + return result; } } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 7f977a17159..392d76fd10c 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -160,24 +160,9 @@ public: void InitReadyInstant(const TMonotonic instant); void InitStartSending(const TMonotonic instant); + void InitReplyReceived(const TMonotonic instant); - std::vector<TWritingBlob> GroupIntoBlobs() { - std::vector<TWritingBlob> result; - TWritingBlob currentBlob; - for (auto&& aggr : Aggregations) { - for (auto&& bInfo : aggr->MutableSplittedBlobs()) { - if (!currentBlob.AddData(bInfo)) { - result.emplace_back(std::move(currentBlob)); - currentBlob = TWritingBlob(); - AFL_VERIFY(currentBlob.AddData(bInfo)); - } - } - } - if (currentBlob.GetSize()) { - result.emplace_back(std::move(currentBlob)); - } - return result; - } + std::vector<TWritingBlob> GroupIntoBlobs(); }; class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter<TIndexedWriteController, true> { diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h index 554423c8886..f6d030528b2 100644 --- a/ydb/core/tx/columnshard/engines/writer/write_controller.h +++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h @@ -47,6 +47,19 @@ protected: return WriteTasks.back(); } public: + TString DebugString() const { + TStringBuilder sb; + for (auto&& i : WritingActions) { + sb << i.second->GetStorageId() << ","; + } + ui64 size = 0; + for (auto&& i : WriteTasks) { + size += i.GetBlobId().BlobSize(); + } + + return TStringBuilder() << "size=" << size << ";count=" << WriteTasks.size() << ";actions=" << sb << ";"; + } + void Abort() { for (auto&& i : WritingActions) { i.second->Abort(); diff --git a/ydb/core/tx/data_events/write_data.h b/ydb/core/tx/data_events/write_data.h index 739d9b3e89c..d25c65e65c1 100644 --- a/ydb/core/tx/data_events/write_data.h +++ b/ydb/core/tx/data_events/write_data.h @@ -5,6 +5,7 @@ #include <ydb/library/accessor/accessor.h> #include <ydb/library/actors/core/monotonic.h> +#include <util/generic/guid.h> namespace NKikimr::NOlap { class IBlobsWritingAction; @@ -31,12 +32,14 @@ class TWriteMeta { YDB_ACCESSOR(ui64, WritePartId, 0); YDB_ACCESSOR_DEF(TString, DedupId); + YDB_READONLY(TString, Id, TGUID::CreateTimebased().AsUuidString()); YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle1StartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle2StartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle3StartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle4StartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle5StartInstant, TMonotonic::Now()); + YDB_ACCESSOR(TMonotonic, WriteMiddle6StartInstant, TMonotonic::Now()); public: TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source) : WriteId(writeId) |