author     ivanmorozov <ivanmorozov@yandex-team.com>    2023-07-03 21:00:39 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>    2023-07-03 21:00:39 +0300
commit     098093ad8fa6d9cedceb6d17acbdc575d4ceff7b (patch)
tree       888d46c5c57ca0f8441b03a0de444d0ee609f747
parent     01d57c707c2348a54ee38e7c89b31d76df80adfe (diff)
download   ydb-098093ad8fa6d9cedceb6d17acbdc575d4ceff7b.tar.gz
memory usage control (64Gb -> 4Gb on 100 000 000 000 records scan GROUP BY)
35 files changed, 557 insertions, 151 deletions
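For orientation before the diff: the commit adds ydb/core/tx/columnshard/resources/memory.{h,cpp} and threads a TScanMemoryLimiter / IMemoryAccessor through the scan path, replacing the fixed GranulesCountProcessingLimit / ProcessingBytesLimit constants with a shared per-limiter memory budget. The sketch below is a simplified reading of that Take/Free/subscribe pattern in plain standard C++ with hypothetical names (TMemoryBudget is not a YDB type); the real TScanMemoryLimiter additionally updates monitoring counters and enforces its invariants with Y_VERIFY.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>

class TMemoryBudget {
public:
    explicit TMemoryBudget(int64_t limitBytes)
        : Limit(limitBytes)
        , Available(limitBytes) {}

    // Account for memory a consumer just took; Available may go negative,
    // which only means new consumers will have to wait.
    void Take(int64_t bytes) {
        Available.fetch_sub(bytes);
    }

    // Return memory; once the budget is positive again, wake queued waiters.
    void Free(int64_t bytes) {
        const int64_t now = Available.fetch_add(bytes) + bytes;
        assert(now <= Limit);
        if (now <= 0) {
            return;
        }
        std::deque<std::function<void()>> ready;
        {
            std::lock_guard<std::mutex> g(Lock);
            while (Available.load() > 0 && !Waiters.empty()) {
                ready.push_back(std::move(Waiters.front()));
                Waiters.pop_front();
            }
        }
        for (auto& wake : ready) {
            wake();  // e.g. send a wakeup event back to the scan actor
        }
    }

    // True while there is budget left; otherwise remember the callback and
    // return false so the caller stops scheduling new granules for now.
    bool HasBufferOrSubscribe(std::function<void()> onReady) {
        if (Available.load() > 0) {
            return true;
        }
        std::lock_guard<std::mutex> g(Lock);
        if (Available.load() > 0) {
            return true;
        }
        Waiters.push_back(std::move(onReady));
        return false;
    }

private:
    const int64_t Limit;
    std::atomic<int64_t> Available;
    std::mutex Lock;
    std::deque<std::function<void()>> Waiters;
};
```

In the diff, the scan side of this handshake is TGranulesFillingContext::TryStartProcessGranule calling HasBufferOrSubscribe before starting a new granule, and TActorBasedMemoryAccesor::DoOnBufferReady sending TEvWakeup with tag 1 so TColumnShardScan resumes via ContinueProcessing once memory is freed.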
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 9cda5cfe72..f11d99ca83 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(resources) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 8382342f0b..589bd3ab85 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(resources) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 8382342f0b..589bd3ab85 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(resources) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 9cda5cfe72..f11d99ca83 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(resources) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 51b35b25aa..640947f314 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -94,6 +94,7 @@ public: virtual TString DebugString() const override { return TStringBuilder() << "ready_results:(" << ReadyResults.DebugString() << ");" + << "has_buffer:" << IndexedData.GetMemoryAccessor()->HasBuffer() << ";" << "indexed_data:(" << IndexedData.DebugString() << ")" ; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 791e421732..25293efc35 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -67,6 +67,8 @@ public: }; class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IRowWriter { +private: + std::shared_ptr<NOlap::TActorBasedMemoryAccesor> MemoryAccessor; public: static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::KQP_OLAP_SCAN; @@ -101,7 +103,8 @@ public: Schedule(Deadline, new TEvents::TEvWakeup); Y_VERIFY(!ScanIterator); - NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool); + MemoryAccessor = std::make_shared<NOlap::TActorBasedMemoryAccesor>(SelfId(), "CSScan/Result"); + NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // propagate self actor id // TODO: 
FlagSubscribeOnSession ? @@ -143,7 +146,6 @@ private: if (!blobRange.BlobId.IsValid()) { break; } - ScanCountersPool.Aggregations->AddFlightReadInfo(blobRange.Size); ++InFlightReads; InFlightReadBytes += blobRange.Size; ranges[blobRange.BlobId].emplace_back(blobRange); @@ -344,7 +346,8 @@ private: // * we have finished scanning ALL the ranges // * or there is an in-flight blob read or ScanData message for which // we will get a reply and will be able to proceed further - if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || ScanIterator->HasWaitingTasks()) { + if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || ScanIterator->HasWaitingTasks() + || MemoryAccessor->InWaiting()) { return; } } @@ -392,12 +395,16 @@ private: Finish(); } - void HandleScan(TEvents::TEvWakeup::TPtr&) { - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, - "Scan " << ScanActorId << " guard execution timeout" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + void HandleScan(TEvents::TEvWakeup::TPtr& ev) { + if (ev->Get()->Tag) { + ContinueProcessing(); + } else { + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, + "Scan " << ScanActorId << " guard execution timeout" + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); - Finish(); + Finish(); + } } private: @@ -421,7 +428,7 @@ private: return Finish(); } - NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool); + NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // Used in TArrowToYdbConverter ResultYqlSchema.clear(); diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index eee62328ab..f705b9701a 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -2,6 +2,7 @@ #include "blob_cache.h" #include "engines/reader/conveyor_task.h" +#include "resources/memory.h" #include <ydb/core/formats/arrow/size_calcer.h> #include <ydb/core/tx/program/program.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> @@ -10,6 +11,7 @@ namespace NKikimr::NOlap { // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation class TPartialReadResult { private: + std::shared_ptr<TScanMemoryLimiter::TGuard> MemoryGuard; std::shared_ptr<arrow::RecordBatch> ResultBatch; // This 1-row batch contains the last key that was read while producing the ResultBatch. 
@@ -18,18 +20,29 @@ private: public: void Slice(const ui32 offset, const ui32 length) { + const ui64 baseSize = NArrow::GetBatchDataSize(ResultBatch); ResultBatch = ResultBatch->Slice(offset, length); + MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch)); + MemoryGuard->Free(baseSize); } void ApplyProgram(const NOlap::TProgramContainer& program) { + const ui64 baseSize = NArrow::GetBatchDataSize(ResultBatch); auto status = program.ApplyProgram(ResultBatch); if (!status.ok()) { ErrorString = status.message(); + } else { + MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch)); + MemoryGuard->Free(baseSize); } } ui64 GetSize() const { - return NArrow::GetBatchDataSize(ResultBatch); + if (MemoryGuard) { + return MemoryGuard->GetValue(); + } else { + return 0; + } } const std::shared_ptr<arrow::RecordBatch>& GetResultBatch() const { @@ -44,16 +57,27 @@ public: TPartialReadResult() = default; - explicit TPartialReadResult(std::shared_ptr<arrow::RecordBatch> batch) - : ResultBatch(batch) + explicit TPartialReadResult( + TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor, + const std::shared_ptr<NOlap::TMemoryAggregation>& memoryAggregation, + std::shared_ptr<arrow::RecordBatch> batch) + : MemoryGuard(std::make_shared<TScanMemoryLimiter::TGuard>(memoryAccessor, memoryAggregation)) + , ResultBatch(batch) { + MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch)); + MemoryGuard->Take(NArrow::GetBatchDataSize(LastReadKey)); } explicit TPartialReadResult( + TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor, + const std::shared_ptr<NOlap::TMemoryAggregation>& memoryAggregation, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey) - : ResultBatch(batch) + : MemoryGuard(std::make_shared<TScanMemoryLimiter::TGuard>(memoryAccessor, memoryAggregation)) + , ResultBatch(batch) , LastReadKey(lastKey) { + MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch)); + MemoryGuard->Take(NArrow::GetBatchDataSize(LastReadKey)); } }; } diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp index b7ae2bf23b..5e77fafc81 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp @@ -15,7 +15,7 @@ NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() { // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema); - NOlap::TPartialReadResult out(resultBatch, lastKey); + NOlap::TPartialReadResult out(nullptr, nullptr, resultBatch, lastKey); out.ApplyProgram(ReadMetadata->GetProgram()); return std::move(out); diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index 763008acb0..1a0dc609a7 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -45,8 +45,8 @@ TScanCounters::TScanCounters(const TString& module) { } -std::shared_ptr<NKikimr::NColumnShard::TScanAggregations> TScanCounters::BuildAggregations() { - return std::make_shared<TScanAggregations>(GetModuleId()); +NKikimr::NColumnShard::TScanAggregations TScanCounters::BuildAggregations() { + return TScanAggregations(GetModuleId()); } } diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index bad3bf50b3..a917eb8780 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -1,89 +1,28 @@ #pragma once #include "common/owner.h" +#include 
<ydb/core/tx/columnshard/resources/memory.h> #include <library/cpp/monlib/dynamic_counters/counters.h> namespace NKikimr::NColumnShard { -class TScanDataClassAggregation: public TCommonCountersOwner { -private: - using TBase = TCommonCountersOwner; - NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightBytes; - NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightCount; - std::shared_ptr<TValueAggregationClient> InFlightBytes; - std::shared_ptr<TValueAggregationClient> InFlightCount; -public: - TScanDataClassAggregation(const TString& moduleId, const TString& signalId) - : TBase(moduleId) - { - DeriviativeInFlightCount = TBase::GetDeriviative(signalId + "/Count"); - DeriviativeInFlightBytes = TBase::GetDeriviative(signalId + "/Bytes"); - InFlightCount = TBase::GetValueAutoAggregationsClient(signalId + "/Count"); - InFlightBytes = TBase::GetValueAutoAggregationsClient(signalId + "/Bytes"); - } - - void AddFullInfo(const ui64 size) { - DeriviativeInFlightCount->Add(1); - DeriviativeInFlightBytes->Add(size); - InFlightCount->Add(1); - InFlightBytes->Add(size); - } - - void RemoveFullInfo(const ui64 size) { - InFlightCount->Remove(1); - InFlightBytes->Remove(size); - } - - void AddCount() { - DeriviativeInFlightCount->Add(1); - InFlightCount->Add(1); - } - - void AddBytes(const ui64 size) { - DeriviativeInFlightBytes->Add(size); - InFlightBytes->Add(size); - } -}; - class TScanAggregations { private: using TBase = TCommonCountersOwner; - TScanDataClassAggregation ReadBlobs; - TScanDataClassAggregation GranulesProcessing; - TScanDataClassAggregation GranulesReady; + std::shared_ptr<NOlap::TMemoryAggregation> ReadBlobs; + std::shared_ptr<NOlap::TMemoryAggregation> GranulesProcessing; + std::shared_ptr<NOlap::TMemoryAggregation> GranulesReady; + std::shared_ptr<NOlap::TMemoryAggregation> ResultsReady; public: TScanAggregations(const TString& moduleId) - : ReadBlobs(moduleId, "InFlight/Blobs/Read") - , GranulesProcessing(moduleId, "InFlight/Granules/Processing") - , GranulesReady(moduleId, "InFlight/Granules/Ready") - { - } - - void AddFlightReadInfo(const ui64 size) { - ReadBlobs.AddFullInfo(size); + : GranulesProcessing(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Granules/Processing")) + , ResultsReady(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Results/Ready")) { } - void RemoveFlightReadInfo(const ui64 size) { - ReadBlobs.RemoveFullInfo(size); + const std::shared_ptr<NOlap::TMemoryAggregation>& GetGranulesProcessing() const { + return GranulesProcessing; } - - void AddGranuleProcessing() { - GranulesProcessing.AddCount(); - } - - void AddGranuleProcessingBytes(const ui64 size) { - GranulesProcessing.AddBytes(size); - } - - void RemoveGranuleProcessingInfo(const ui64 size) { - GranulesProcessing.RemoveFullInfo(size); - } - - void AddGranuleReady(const ui64 size) { - GranulesReady.AddFullInfo(size); - } - - void RemoveGranuleReady(const ui64 size) { - GranulesReady.RemoveFullInfo(size); + const std::shared_ptr<NOlap::TMemoryAggregation>& GetResultsReady() const { + return ResultsReady; } }; @@ -153,14 +92,14 @@ public: ReadingOverload->Add(1); } - std::shared_ptr<TScanAggregations> BuildAggregations(); + TScanAggregations BuildAggregations(); }; class TConcreteScanCounters: public TScanCounters { private: using TBase = TScanCounters; public: - std::shared_ptr<TScanAggregations> Aggregations; + TScanAggregations Aggregations; TConcreteScanCounters(const TScanCounters& counters) : TBase(counters) diff --git 
a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 298530843c..45241701a0 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -255,16 +255,16 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t // Extract ready to out granules: ready granules that are not blocked by other (not ready) granules Y_VERIFY(GranulesContext); - auto out = MakeResult(ReadyToOut(), maxRowsInBatch); + auto out = ReadyToOut(maxRowsInBatch); const bool requireResult = GranulesContext->IsFinished(); // not indexed or the last indexed read (even if it's empty) if (requireResult && out.empty()) { - out.push_back(TPartialReadResult(NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema()))); + out.push_back(TPartialReadResult(GetMemoryAccessor(), Context.GetCounters().Aggregations.GetResultsReady(), NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema()))); } return out; } /// @return batches that are not blocked by others -std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() { +std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsInBatch) { Y_VERIFY(SortReplaceDescription); Y_VERIFY(GranulesContext); @@ -306,7 +306,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: NotIndexedOutscopeBatch = nullptr; } - return out; + return MakeResult(std::move(out), maxRowsInBatch); } std::shared_ptr<arrow::RecordBatch> @@ -333,8 +333,7 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch return merged; } -// TODO: better implementation -static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) { +void TIndexedReadData::MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const { if (out.size() < 10) { return; } @@ -367,7 +366,7 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) { auto batch = NArrow::ToBatch(*res); std::vector<TPartialReadResult> merged; - merged.emplace_back(TPartialReadResult(batch, out.back().GetLastReadKey())); + merged.emplace_back(TPartialReadResult(GetMemoryAccessor(), memAggregation, batch, out.back().GetLastReadKey())); out.swap(merged); } @@ -422,12 +421,12 @@ std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::ve // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema()); - out.emplace_back(TPartialReadResult(resultBatch, lastKey)); + out.emplace_back(TPartialReadResult(GetMemoryAccessor(), Context.GetCounters().Aggregations.GetResultsReady(), resultBatch, lastKey)); } } if (ReadMetadata->GetProgram().HasProgram()) { - MergeTooSmallBatches(out); + MergeTooSmallBatches(Context.GetCounters().Aggregations.GetResultsReady(), out); for (auto& result : out) { result.ApplyProgram(ReadMetadata->GetProgram()); } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 8d05897935..c1b31db0dd 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -52,6 +52,10 @@ public: ; } + const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const { + return Context.GetMemoryAccessor(); + } + const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { return Context.GetCounters(); } @@ 
-101,7 +105,8 @@ private: std::shared_ptr<arrow::RecordBatch> MergeNotIndexed( std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const; - std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut(); + std::vector<TPartialReadResult> ReadyToOut(const i64 maxRowsInBatch); + void MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const; std::vector<TPartialReadResult> MakeResult( std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const; }; diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 528f351f28..bd3a1e2c7f 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow columnshard-engines-predicate columnshard-hooks-abstract + tx-columnshard-resources core-tx-program engines-reader-order_control columnshard-engines-scheme diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 056b297152..da51f9cbc2 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow columnshard-engines-predicate columnshard-hooks-abstract + tx-columnshard-resources core-tx-program engines-reader-order_control columnshard-engines-scheme diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 056b297152..da51f9cbc2 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow columnshard-engines-predicate columnshard-hooks-abstract + tx-columnshard-resources core-tx-program engines-reader-order_control columnshard-engines-scheme diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 528f351f28..bd3a1e2c7f 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow columnshard-engines-predicate columnshard-hooks-abstract + tx-columnshard-resources core-tx-program engines-reader-order_control columnshard-engines-scheme diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 93eb1debc3..b7bc120a42 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -1,6 +1,7 @@ #include "filling_context.h" #include "order_control/not_sorted.h" #include <ydb/core/tx/columnshard/engines/indexed_read_data.h> +#include <ydb/core/tx/columnshard/resources/memory.h> #include <util/string/join.h> namespace NKikimr::NOlap::NIndexedReader { @@ -8,7 +9,7 @@ namespace NKikimr::NOlap::NIndexedReader { 
TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading) : ReadMetadata(readMetadata) , InternalReading(internalReading) - , Processing(owner.GetCounters()) + , Processing(owner.GetMemoryAccessor(), owner.GetCounters()) , Result(owner.GetCounters()) , GranulesLiveContext(std::make_shared<TGranulesLiveControl>()) , Owner(owner) @@ -67,10 +68,11 @@ void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range, const bool hasReadyResults) { Y_VERIFY_DEBUG(!Result.IsReady(granuleId)); - if (InternalReading || Processing.IsInProgress(granuleId) || (!hasReadyResults && !GranulesLiveContext->GetCount())) { - Processing.StartBlobProcessing(granuleId, range); - return true; - } else if (CheckBufferAvailable()) { + if (InternalReading || Processing.IsInProgress(granuleId) + || (!GranulesLiveContext->GetCount() && !hasReadyResults) + || GetMemoryAccessor()->GetLimiter().HasBufferOrSubscribe(GetMemoryAccessor()) + ) + { Processing.StartBlobProcessing(granuleId, range); return true; } else { @@ -78,11 +80,6 @@ bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const } } -bool TGranulesFillingContext::CheckBufferAvailable() const { - return Result.GetCount() + Processing.GetCount() < GranulesCountProcessingLimit || - Result.GetBlobsSize() + Processing.GetBlobsSize() < ProcessingBytesLimit; -} - bool TGranulesFillingContext::ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range) { Y_VERIFY_DEBUG(!Result.IsReady(granuleId)); Processing.StartBlobProcessing(granuleId, range); @@ -98,4 +95,8 @@ std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingCont return SortingPolicy->DetachReadyGranules(Result); } +const std::shared_ptr<NKikimr::NOlap::TActorBasedMemoryAccesor>& TGranulesFillingContext::GetMemoryAccessor() const { + return Owner.GetMemoryAccessor(); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index f9f7a45d78..e76f2f6993 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -30,9 +30,6 @@ private: NColumnShard::TConcreteScanCounters Counters; bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; - static constexpr ui32 GranulesCountProcessingLimit = 16; - static constexpr ui64 ExpectedBytesForGranule = 50 * 1024 * 1024; - static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule; bool CheckBufferAvailable() const; public: std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const { @@ -43,7 +40,9 @@ public: } bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range); bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range, const bool hasReadyResults); - TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading); + TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading); + + const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const; TString DebugString() const { return TStringBuilder() diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index f641b54358..018c9bafd7 100644 
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -3,10 +3,12 @@ #include <ydb/core/tx/columnshard/engines/portion_info.h> #include <ydb/core/tx/columnshard/engines/indexed_read_data.h> #include <ydb/core/tx/columnshard/engines/filter.h> +#include <ydb/core/tx/columnshard/engines/index_info.h> namespace NKikimr::NOlap::NIndexedReader { void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { + RawDataSizeReal += NArrow::GetBatchDataSize(batch); if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { return; } @@ -31,7 +33,13 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco CheckReady(); } -NKikimr::NOlap::NIndexedReader::TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) { +TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) { + const ui64 batchSize = portionInfo.GetRawBytes(Owner->GetReadMetadata()->GetAllColumns()); + RawDataSize += batchSize; + const ui64 filtersSize = portionInfo.NumRows() * (8 + 8); + RawDataSize += filtersSize; + ACFL_DEBUG("event", "RegisterBatchForFetching")("columns_count", Owner->GetReadMetadata()->GetAllColumns().size())("batch_raw_size", batchSize)("granule_size", RawDataSize); + Y_VERIFY(!ReadyFlag); ui32 batchGranuleIdx = Batches.size(); WaitBatches.emplace(batchGranuleIdx); @@ -104,6 +112,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { } NotIndexedBatchReadyFlag = true; if (batch && batch->num_rows()) { + GranuleDataSize.Take(NArrow::GetBatchDataSize(batch)); Y_VERIFY(!NotIndexedBatch); NotIndexedBatch = batch; if (NotIndexedBatch) { @@ -119,6 +128,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { void TGranule::CheckReady() { if (WaitBatches.empty() && NotIndexedBatchReadyFlag) { ReadyFlag = true; + ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); Owner->OnGranuleReady(GranuleId); } } @@ -129,7 +139,6 @@ void TGranule::OnBlobReady(const TBlobRange& range) noexcept { return; } Y_VERIFY(!ReadyFlag); - BlobsDataSize += range.Size; Owner->OnBlobReady(GranuleId, range); } @@ -143,6 +152,7 @@ TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner) : GranuleId(granuleId) , LiveController(owner.GetGranulesLiveContext()) , Owner(&owner) + , GranuleDataSize(Owner->GetMemoryAccessor(), Owner->GetCounters().Aggregations.GetGranulesProcessing()) { } @@ -150,6 +160,7 @@ TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner) void TGranule::StartConstruction() { InConstruction = true; LiveController->Inc(); + GranuleDataSize.Take(RawDataSize); } } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 5a23342722..95754c8cb1 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -31,6 +31,8 @@ public: using TPtr = std::shared_ptr<TGranule>; private: ui64 GranuleId = 0; + ui64 RawDataSize = 0; + ui64 RawDataSizeReal = 0; bool NotIndexedBatchReadyFlag = false; bool InConstruction = false; @@ -46,16 +48,13 @@ private: std::shared_ptr<TGranulesLiveControl> LiveController; TGranulesFillingContext* Owner = nullptr; THashSet<const void*> BatchesToDedup; - ui64 BlobsDataSize = 0; + + TScanMemoryLimiter::TGuard GranuleDataSize; void CheckReady(); public: TGranule(const ui64 granuleId, TGranulesFillingContext& 
owner); ~TGranule(); - ui64 GetBlobsDataSize() const noexcept { - return BlobsDataSize; - } - ui64 GetGranuleId() const noexcept { return GranuleId; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp index 552ec7c9b0..e366ce42a1 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp @@ -6,9 +6,6 @@ TGranule::TPtr TResultController::ExtractFirst() { TGranule::TPtr result; if (GranulesToOut.size()) { result = GranulesToOut.begin()->second; - Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize()); - BlobsSize -= result->GetBlobsDataSize(); - Y_VERIFY(BlobsSize >= 0); GranulesToOut.erase(GranulesToOut.begin()); } return result; @@ -17,8 +14,6 @@ TGranule::TPtr TResultController::ExtractFirst() { void TResultController::AddResult(TGranule::TPtr granule) { Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second); Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second); - BlobsSize += granule->GetBlobsDataSize(); - Counters.Aggregations->AddGranuleReady(granule->GetBlobsDataSize()); } TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) { @@ -28,9 +23,6 @@ TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) { } TGranule::TPtr result = it->second; GranulesToOut.erase(it); - Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize()); - BlobsSize -= result->GetBlobsDataSize(); - Y_VERIFY(BlobsSize >= 0); return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h index 509d4e1d82..84a4366749 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.h @@ -8,7 +8,6 @@ class TResultController { protected: THashMap<ui64, TGranule::TPtr> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; - i64 BlobsSize = 0; const NColumnShard::TConcreteScanCounters Counters; public: TString DebugString() const { @@ -26,17 +25,12 @@ public: void Clear() { GranulesToOut.clear(); - BlobsSize = 0; } bool IsReady(const ui64 granuleId) const { return ReadyGranulesAccumulator.contains(granuleId); } - ui64 GetBlobsSize() const { - return BlobsSize; - } - ui32 GetCount() const { return GranulesToOut.size(); } diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp index 76012f89ff..2a91e4af26 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp @@ -23,6 +23,7 @@ void TProcessingController::DrainNotIndexedBatches(THashMap<ui64, std::shared_pt batches->erase(it); } } + GuardZeroGranuleData.FreeAll(); } NKikimr::NOlap::NIndexedReader::TBatch* TProcessingController::GetBatchInfo(const TBatchAddress& address) { @@ -40,10 +41,7 @@ TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId) Y_VERIFY(it != GranulesWaiting.end()); TGranule::TPtr result = it->second; GranulesInProcessing.erase(granuleId); - BlobsSize -= result->GetBlobsDataSize(); - Y_VERIFY(BlobsSize >= 0); GranulesWaiting.erase(it); - Counters.Aggregations->RemoveGranuleProcessingInfo(result->GetBlobsDataSize()); return result; } @@ -68,26 +66,22 @@ TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) { } 
void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) { - Counters.Aggregations->AddGranuleProcessingBytes(range.Size); if (GranulesInProcessing.emplace(granuleId).second) { if (granuleId) { GetGranuleVerified(granuleId)->StartConstruction(); Y_VERIFY(GranulesWaiting.contains(granuleId)); - Counters.Aggregations->AddGranuleProcessing(); } } if (!granuleId) { Y_VERIFY(!NotIndexedBatchesInitialized); + GuardZeroGranuleData.Take(range.Size); } - BlobsSize += range.Size; } void TProcessingController::Abort() { NotIndexedBatchesInitialized = true; GranulesWaiting.clear(); GranulesInProcessing.clear(); - Counters.Aggregations->RemoveGranuleProcessingInfo(BlobsSize); - BlobsSize = 0; } TString TProcessingController::DebugString() const { diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h index 587eee7b55..bddd479e08 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.h +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h @@ -1,6 +1,7 @@ #pragma once #include "granule.h" #include <ydb/core/tx/columnshard/counters/scan.h> +#include <ydb/core/tx/columnshard/resources/memory.h> namespace NKikimr::NOlap::NIndexedReader { @@ -10,17 +11,18 @@ private: ui32 OriginalGranulesCount = 0; ui64 CommonGranuleData = 0; std::set<ui64> GranulesInProcessing; - i64 BlobsSize = 0; bool NotIndexedBatchesInitialized = false; const NColumnShard::TConcreteScanCounters Counters; + TScanMemoryLimiter::TGuard GuardZeroGranuleData; public: TString DebugString() const; bool IsGranuleActualForProcessing(const ui64 granuleId) const { return GranulesWaiting.contains(granuleId) || (granuleId == 0 && !NotIndexedBatchesInitialized); } - TProcessingController(const NColumnShard::TConcreteScanCounters& counters) + TProcessingController(TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor, const NColumnShard::TConcreteScanCounters& counters) : Counters(counters) + , GuardZeroGranuleData(memoryAccessor, Counters.Aggregations.GetGranulesProcessing()) { } @@ -47,10 +49,6 @@ public: void Abort(); - ui64 GetBlobsSize() const { - return BlobsSize; - } - ui32 GetCount() const { return GranulesInProcessing.size(); } diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.cpp b/ydb/core/tx/columnshard/engines/reader/read_context.cpp index 390bc57ae8..33f87457f1 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_context.cpp @@ -4,11 +4,17 @@ namespace NKikimr::NOlap { TReadContext::TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor, - const NColumnShard::TConcreteScanCounters& counters) + const NColumnShard::TConcreteScanCounters& counters, + std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor) : Processor(processor) , Counters(counters) + , MemoryAccessor(memoryAccessor) { } +void TActorBasedMemoryAccesor::DoOnBufferReady() { + OwnerId.Send(OwnerId, new NActors::TEvents::TEvWakeup(1)); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 09ede34dc0..5ab203f798 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -1,21 +1,38 @@ #pragma once #include "conveyor_task.h" #include <ydb/core/tx/columnshard/counters/scan.h> +#include <ydb/core/tx/columnshard/resources/memory.h> #include <library/cpp/actors/core/actor.h> 
namespace NKikimr::NOlap { +class TActorBasedMemoryAccesor: public TScanMemoryLimiter::IMemoryAccessor { +private: + using TBase = TScanMemoryLimiter::IMemoryAccessor; + const NActors::TActorIdentity OwnerId; +protected: + virtual void DoOnBufferReady() override; +public: + TActorBasedMemoryAccesor(const NActors::TActorIdentity& ownerId, const TString& limiterName) + : TBase(TMemoryLimitersController::GetLimiter(limiterName)) + , OwnerId(ownerId) { + + } +}; + class TReadContext { private: YDB_ACCESSOR_DEF(NColumnShard::TDataTasksProcessorContainer, Processor); const NColumnShard::TConcreteScanCounters Counters; + YDB_READONLY_DEF(std::shared_ptr<NOlap::TActorBasedMemoryAccesor>, MemoryAccessor); public: const NColumnShard::TConcreteScanCounters& GetCounters() const { return Counters; } TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor, - const NColumnShard::TConcreteScanCounters& counters + const NColumnShard::TConcreteScanCounters& counters, + std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor ); TReadContext(const NColumnShard::TConcreteScanCounters& counters) diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make index b90f70893c..4894dbca62 100644 --- a/ydb/core/tx/columnshard/engines/reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/ya.make @@ -22,6 +22,7 @@ PEERDIR( ydb/core/formats/arrow ydb/core/tx/columnshard/engines/predicate ydb/core/tx/columnshard/hooks/abstract + ydb/core/tx/columnshard/resources ydb/core/tx/program ydb/core/tx/columnshard/engines/reader/order_control ydb/core/tx/columnshard/engines/scheme diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..9b2360f6b4 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-resources) +target_link_libraries(tx-columnshard-resources PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(tx-columnshard-resources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp +) diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..6248df7a86 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(tx-columnshard-resources) +target_link_libraries(tx-columnshard-resources PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(tx-columnshard-resources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp +) diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..6248df7a86 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-resources) +target_link_libraries(tx-columnshard-resources PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(tx-columnshard-resources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp +) diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.txt b/ydb/core/tx/columnshard/resources/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..9b2360f6b4 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(tx-columnshard-resources) +target_link_libraries(tx-columnshard-resources PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-formats-arrow +) +target_sources(tx-columnshard-resources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp +) diff --git a/ydb/core/tx/columnshard/resources/memory.cpp b/ydb/core/tx/columnshard/resources/memory.cpp new file mode 100644 index 0000000000..a6551393e6 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/memory.cpp @@ -0,0 +1,102 @@ +#include "memory.h" +#include <util/system/guard.h> + +namespace NKikimr::NOlap { + +TScanMemoryCounter::TScanMemoryCounter(const TString& limitName, const ui64 memoryLimit) + : TBase(limitName) +{ + AvailableMemory = TBase::GetValue("Available"); + MinimalMemory = TBase::GetValue("Minimal"); + DeriviativeTake = TBase::GetDeriviative("Take"); + DeriviativeFree = TBase::GetDeriviative("Free"); + AvailableMemory->Set(memoryLimit); + + DeriviativeWaitingStart = TBase::GetDeriviative("Waiting/Start"); + DeriviativeWaitingFinish = TBase::GetDeriviative("Waiting/Finish"); + CurrentInWaiting = TBase::GetValue("InWaiting"); +} + +void TScanMemoryLimiter::Free(const ui64 size) { + const i64 newVal = AvailableMemory.Add(size); + Y_VERIFY(newVal <= AvailableMemoryLimit); + Counters.Free(size); + if (newVal > 0 && newVal <= (i64)size) { + std::vector<std::shared_ptr<IMemoryAccessor>> accessors; + { + ::TGuard<TMutex> g(Mutex); + while (AvailableMemory.Val() > 0 && InWaiting.size()) { + accessors.emplace_back(InWaiting.front()); + InWaiting.pop_front(); + Counters.WaitFinish(); + } + WaitingCounter = InWaiting.size(); + } + for (auto&& i : accessors) { + i->OnBufferReady(); + } + } +} + +bool TScanMemoryLimiter::HasBufferOrSubscribe(std::shared_ptr<IMemoryAccessor> accessor) { + if (AvailableMemory.Val() > 0) { + return true; + } else if (!accessor) { + return false; + } + ::TGuard<TMutex> g(Mutex); + if (accessor->InWaiting()) { + return false; + } + if (AvailableMemory.Val() > 0) { + return true; + } + accessor->StartWaiting(); + WaitingCounter.Inc(); + InWaiting.emplace_back(accessor); + Counters.WaitStart(); + return false; +} + +void TScanMemoryLimiter::Take(const ui64 size) { + const i64 val = AvailableMemory.Sub(size); + Counters.Take(size); + AFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("current_memory", val)("min", MinMemory)("event", "take"); +// ::TGuard<TMutex> g(Mutex); +// if (MinMemory > val) { +// MinMemory = val; +// Counters.OnMinimal(val); +// } +} + +void TScanMemoryLimiter::TGuard::FreeAll() { + if (MemorySignals) { + MemorySignals->RemoveBytes(Value.Val()); + } + if (MemoryAccessor) { + MemoryAccessor->Free(Value.Val()); + } + Value = 0; +} + +void TScanMemoryLimiter::TGuard::Free(const ui64 size) { + if (MemoryAccessor) { + MemoryAccessor->Free(size); + } + Y_VERIFY(Value.Sub(size) >= 0); + if (MemorySignals) { + MemorySignals->RemoveBytes(size); + } +} + +void TScanMemoryLimiter::TGuard::Take(const ui64 size) { + if (MemoryAccessor) { + MemoryAccessor->Take(size); + } + Value.Add(size); + if (MemorySignals) { + MemorySignals->AddBytes(size); + } +} + +} diff --git a/ydb/core/tx/columnshard/resources/memory.h b/ydb/core/tx/columnshard/resources/memory.h new file mode 100644 index 0000000000..b819b4703e --- /dev/null +++ b/ydb/core/tx/columnshard/resources/memory.h @@ -0,0 +1,197 @@ +#pragma once +#include <ydb/core/tx/columnshard/counters/common/owner.h> +#include <ydb/core/protos/services.pb.h> +#include <library/cpp/actors/core/log.h> 
+#include <util/system/mutex.h> + +namespace NKikimr::NOlap { + +class TMemoryAggregation: public NColumnShard::TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeAddInFlightBytes; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeRemoveInFlightBytes; + std::shared_ptr<NColumnShard::TValueAggregationClient> InFlightBytes; +public: + TMemoryAggregation(const TString& moduleId, const TString& signalId) + : TBase(moduleId) { + DeriviativeAddInFlightBytes = TBase::GetDeriviative(signalId + "/Add/Bytes"); + DeriviativeRemoveInFlightBytes = TBase::GetDeriviative(signalId + "/Remove/Bytes"); + InFlightBytes = TBase::GetValueAutoAggregationsClient(signalId + "/Bytes"); + } + + void AddBytes(const ui64 size) { + DeriviativeAddInFlightBytes->Add(size); + InFlightBytes->Add(size); + } + + void RemoveBytes(const ui64 size) { + DeriviativeRemoveInFlightBytes->Add(size); + InFlightBytes->Remove(size); + } +}; + +class TScanMemoryCounter: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr MinimalMemory; + NMonitoring::TDynamicCounters::TCounterPtr AvailableMemory; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeTake; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeFree; + + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeWaitingStart; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeWaitingFinish; + NMonitoring::TDynamicCounters::TCounterPtr CurrentInWaiting; +public: + TScanMemoryCounter(const TString& limitName, const ui64 memoryLimit); + + void Take(const ui64 size) const { + DeriviativeTake->Add(size); + AvailableMemory->Sub(size); + } + + void OnMinimal(const ui64 size) const { + MinimalMemory->Set(size); + } + + void WaitStart() const { + DeriviativeWaitingStart->Add(1); + CurrentInWaiting->Add(1); + } + + void WaitFinish() const { + DeriviativeWaitingFinish->Add(1); + CurrentInWaiting->Sub(1); + } + + void Free(const ui64 size) const { + DeriviativeFree->Add(size); + AvailableMemory->Add(size); + } +}; + +class TScanMemoryLimiter: TNonCopyable { +public: + const TString LimiterName; + const i64 AvailableMemoryLimit; + class IMemoryAccessor; + class TGuard: TNonCopyable { + private: + TAtomicCounter Value = 0; + std::shared_ptr<IMemoryAccessor> MemoryAccessor; + std::shared_ptr<TMemoryAggregation> MemorySignals; + public: + TGuard(std::shared_ptr<IMemoryAccessor> accesor, std::shared_ptr<TMemoryAggregation> memorySignals = nullptr) + : MemoryAccessor(accesor) + , MemorySignals(memorySignals) { + } + ~TGuard() { + FreeAll(); + } + i64 GetValue() const { + return Value.Val(); + } + void FreeAll(); + void Free(const ui64 size); + void Take(const ui64 size); + }; + + class IMemoryAccessor { + private: + TAtomicCounter InWaitingFlag = 0; + std::shared_ptr<TScanMemoryLimiter> Owner; + friend class TScanMemoryLimiter; + void StartWaiting() { + Y_VERIFY(InWaitingFlag.Val() == 0); + InWaitingFlag = 1; + } + protected: + virtual void DoOnBufferReady() = 0; + public: + using TPtr = std::shared_ptr<IMemoryAccessor>; + IMemoryAccessor(std::shared_ptr<TScanMemoryLimiter> owner) + : Owner(owner) + { + + } + bool HasBuffer() { + return Owner->HasBufferOrSubscribe(nullptr); + } + + TScanMemoryLimiter& GetLimiter() const { + return *Owner; + } + + virtual ~IMemoryAccessor() = default; + void Take(const ui64 size) const { + Owner->Take(size); + } + + void Free(const ui64 size) const { + Owner->Free(size); + } 
+ + void OnBufferReady() { + Y_VERIFY(InWaitingFlag.Val() == 1); + InWaitingFlag = 0; + DoOnBufferReady(); + } + + bool InWaiting() const { + return InWaitingFlag.Val(); + } + }; + +private: + TAtomicCounter AvailableMemory = 0; + TAtomicCounter WaitingCounter = 0; + TScanMemoryCounter Counters; + std::deque<std::shared_ptr<IMemoryAccessor>> InWaiting; + + TMutex Mutex; + i64 MinMemory = 0; + + void Free(const ui64 size); + void Take(const ui64 size); +public: + TScanMemoryLimiter(const TString& limiterName, const ui64 memoryLimit) + : LimiterName(limiterName) + , AvailableMemoryLimit(memoryLimit) + , AvailableMemory(memoryLimit) + , Counters("MemoryLimiters/" + limiterName, memoryLimit) + { + + } + bool HasBufferOrSubscribe(std::shared_ptr<IMemoryAccessor> accessor); +}; + +class TMemoryLimitersController { +private: + TRWMutex Mutex; + // limiter by limit name + THashMap<TString, std::shared_ptr<TScanMemoryLimiter>> Limiters; + static const inline ui64 StandartLimit = (ui64)1 * 1024 * 1024 * 1024; + std::shared_ptr<TScanMemoryLimiter> GetLimiterImpl(const TString& name) { + TReadGuard rg(Mutex); + auto it = Limiters.find(name); + if (it == Limiters.end()) { + rg.Release(); + TWriteGuard wg(Mutex); + it = Limiters.find(name); + if (it != Limiters.end()) { + return it->second; + } else { + return Limiters.emplace(name, std::make_shared<TScanMemoryLimiter>(name, StandartLimit)).first->second; + } + } else { + return it->second; + } + } +public: + static std::shared_ptr<TScanMemoryLimiter> GetLimiter(const TString& name) { + return Singleton<TMemoryLimitersController>()->GetLimiterImpl(name); + } + +}; + +} diff --git a/ydb/core/tx/columnshard/resources/ya.make b/ydb/core/tx/columnshard/resources/ya.make new file mode 100644 index 0000000000..8b92823eb8 --- /dev/null +++ b/ydb/core/tx/columnshard/resources/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + memory.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow +) + +END() |
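Per-batch accounting in the diff above goes through TScanMemoryLimiter::TGuard: TPartialReadResult takes the RecordBatch size on construction and re-accounts it after Slice and ApplyProgram, and the granule/processing guards do the same for predicted raw sizes. Below is a hedged sketch of that RAII idea, again with hypothetical names rather than the YDB classes, assuming only a budget object with Take/Free.

```cpp
#include <cstdint>
#include <memory>

// Minimal budget interface so the sketch stands alone; in the commit this role
// is played by TScanMemoryLimiter::IMemoryAccessor.
struct IBudget {
    virtual ~IBudget() = default;
    virtual void Take(int64_t bytes) = 0;
    virtual void Free(int64_t bytes) = 0;
};

// RAII accounting for one result batch: whatever was taken through the guard
// is given back when the guard dies, so a dropped result cannot leak budget.
class TBudgetGuard {
public:
    explicit TBudgetGuard(std::shared_ptr<IBudget> budget)
        : Budget(std::move(budget)) {}

    TBudgetGuard(const TBudgetGuard&) = delete;
    TBudgetGuard& operator=(const TBudgetGuard&) = delete;

    ~TBudgetGuard() {
        FreeAll();
    }

    void Take(int64_t bytes) {
        Held += bytes;
        if (Budget) {
            Budget->Take(bytes);
        }
    }

    void Free(int64_t bytes) {
        Held -= bytes;
        if (Budget) {
            Budget->Free(bytes);
        }
    }

    // Re-account after the guarded batch changed size (Slice, ApplyProgram):
    // take the new size first, then free the old one.
    void Reaccount(int64_t oldBytes, int64_t newBytes) {
        Take(newBytes);
        Free(oldBytes);
    }

    void FreeAll() {
        if (Budget && Held != 0) {
            Budget->Free(Held);
        }
        Held = 0;
    }

    int64_t GetValue() const {
        return Held;
    }

private:
    std::shared_ptr<IBudget> Budget;
    int64_t Held = 0;
};
```

Reaccount mirrors the take-new-then-free-old order used in TPartialReadResult::Slice, which keeps the budget conservative while the new batch briefly coexists with the old accounting; GetValue corresponds to TPartialReadResult::GetSize reporting the guarded bytes instead of recomputing the batch size.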