about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-07-03 21:00:39 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-07-03 21:00:39 +0300
commit 098093ad8fa6d9cedceb6d17acbdc575d4ceff7b (patch)
tree 888d46c5c57ca0f8441b03a0de444d0ee609f747
parent 01d57c707c2348a54ee38e7c89b31d76df80adfe (diff)
download ydb-098093ad8fa6d9cedceb6d17acbdc575d4ceff7b.tar.gz
memory usage control (64Gb -> 4Gb on 100 000 000 000 records scan GROUP BY)
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.h 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.cpp 25
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.h 32
-rw-r--r-- ydb/core/tx/columnshard/columnshard__stats_scan.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.h 87
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 7
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.cpp 21
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.h 7
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.cpp 15
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.h 9
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/order_control/result.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/order_control/result.h 6
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/processing_context.cpp 10
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/processing_context.h 10
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/read_context.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/read_context.h 19
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/ya.make 1
-rw-r--r-- ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt 20
-rw-r--r-- ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt 21
-rw-r--r-- ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt 21
-rw-r--r-- ydb/core/tx/columnshard/resources/CMakeLists.txt 17
-rw-r--r-- ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt 20
-rw-r--r-- ydb/core/tx/columnshard/resources/memory.cpp 102
-rw-r--r-- ydb/core/tx/columnshard/resources/memory.h 197
-rw-r--r-- ydb/core/tx/columnshard/resources/ya.make 13
35 files changed, 557 insertions, 151 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 9cda5cfe72..f11d99ca83 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(resources)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 8382342f0b..589bd3ab85 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(resources)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 8382342f0b..589bd3ab85 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(resources)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 9cda5cfe72..f11d99ca83 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(resources)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 51b35b25aa..640947f314 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -94,6 +94,7 @@ public:
virtual TString DebugString() const override {
return TStringBuilder()
<< "ready_results:(" << ReadyResults.DebugString() << ");"
+ << "has_buffer:" << IndexedData.GetMemoryAccessor()->HasBuffer() << ";"
<< "indexed_data:(" << IndexedData.DebugString() << ")"
;
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 791e421732..25293efc35 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -67,6 +67,8 @@ public:
};
class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IRowWriter {
+private:
+ std::shared_ptr<NOlap::TActorBasedMemoryAccesor> MemoryAccessor;
public:
static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_OLAP_SCAN;
@@ -101,7 +103,8 @@ public:
Schedule(Deadline, new TEvents::TEvWakeup);
Y_VERIFY(!ScanIterator);
- NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool);
+ MemoryAccessor = std::make_shared<NOlap::TActorBasedMemoryAccesor>(SelfId(), "CSScan/Result");
+ NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
@@ -143,7 +146,6 @@ private:
if (!blobRange.BlobId.IsValid()) {
break;
}
- ScanCountersPool.Aggregations->AddFlightReadInfo(blobRange.Size);
++InFlightReads;
InFlightReadBytes += blobRange.Size;
ranges[blobRange.BlobId].emplace_back(blobRange);
@@ -344,7 +346,8 @@ private:
// * we have finished scanning ALL the ranges
// * or there is an in-flight blob read or ScanData message for which
// we will get a reply and will be able to proceed further
- if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || ScanIterator->HasWaitingTasks()) {
+ if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || ScanIterator->HasWaitingTasks()
+ || MemoryAccessor->InWaiting()) {
return;
}
}
@@ -392,12 +395,16 @@ private:
Finish();
}
- void HandleScan(TEvents::TEvWakeup::TPtr&) {
- LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
- "Scan " << ScanActorId << " guard execution timeout"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ void HandleScan(TEvents::TEvWakeup::TPtr& ev) {
+ if (ev->Get()->Tag) {
+ ContinueProcessing();
+ } else {
+ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
+ "Scan " << ScanActorId << " guard execution timeout"
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
- Finish();
+ Finish();
+ }
}
private:
@@ -421,7 +428,7 @@ private:
return Finish();
}
- NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool);
+ NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
// Used in TArrowToYdbConverter
ResultYqlSchema.clear();
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index eee62328ab..f705b9701a 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -2,6 +2,7 @@
#include "blob_cache.h"
#include "engines/reader/conveyor_task.h"
+#include "resources/memory.h"
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/program/program.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
@@ -10,6 +11,7 @@ namespace NKikimr::NOlap {
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult {
private:
+ std::shared_ptr<TScanMemoryLimiter::TGuard> MemoryGuard;
std::shared_ptr<arrow::RecordBatch> ResultBatch;
// This 1-row batch contains the last key that was read while producing the ResultBatch.
@@ -18,18 +20,29 @@ private:
public:
void Slice(const ui32 offset, const ui32 length) {
+ const ui64 baseSize = NArrow::GetBatchDataSize(ResultBatch);
ResultBatch = ResultBatch->Slice(offset, length);
+ MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch));
+ MemoryGuard->Free(baseSize);
}
void ApplyProgram(const NOlap::TProgramContainer& program) {
+ const ui64 baseSize = NArrow::GetBatchDataSize(ResultBatch);
auto status = program.ApplyProgram(ResultBatch);
if (!status.ok()) {
ErrorString = status.message();
+ } else {
+ MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch));
+ MemoryGuard->Free(baseSize);
}
}
ui64 GetSize() const {
- return NArrow::GetBatchDataSize(ResultBatch);
+ if (MemoryGuard) {
+ return MemoryGuard->GetValue();
+ } else {
+ return 0;
+ }
}
const std::shared_ptr<arrow::RecordBatch>& GetResultBatch() const {
@@ -44,16 +57,27 @@ public:
TPartialReadResult() = default;
- explicit TPartialReadResult(std::shared_ptr<arrow::RecordBatch> batch)
- : ResultBatch(batch)
+ explicit TPartialReadResult(
+ TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor,
+ const std::shared_ptr<NOlap::TMemoryAggregation>& memoryAggregation,
+ std::shared_ptr<arrow::RecordBatch> batch)
+ : MemoryGuard(std::make_shared<TScanMemoryLimiter::TGuard>(memoryAccessor, memoryAggregation))
+ , ResultBatch(batch)
{
+ MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch));
+ MemoryGuard->Take(NArrow::GetBatchDataSize(LastReadKey));
}
explicit TPartialReadResult(
+ TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor,
+ const std::shared_ptr<NOlap::TMemoryAggregation>& memoryAggregation,
std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<arrow::RecordBatch> lastKey)
- : ResultBatch(batch)
+ : MemoryGuard(std::make_shared<TScanMemoryLimiter::TGuard>(memoryAccessor, memoryAggregation))
+ , ResultBatch(batch)
, LastReadKey(lastKey)
{
+ MemoryGuard->Take(NArrow::GetBatchDataSize(ResultBatch));
+ MemoryGuard->Take(NArrow::GetBatchDataSize(LastReadKey));
}
};
}
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
index b7ae2bf23b..5e77fafc81 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.cpp
@@ -15,7 +15,7 @@ NKikimr::NOlap::TPartialReadResult TStatsIterator::GetBatch() {
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
- NOlap::TPartialReadResult out(resultBatch, lastKey);
+ NOlap::TPartialReadResult out(nullptr, nullptr, resultBatch, lastKey);
out.ApplyProgram(ReadMetadata->GetProgram());
return std::move(out);
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp
index 763008acb0..1a0dc609a7 100644
--- a/ydb/core/tx/columnshard/counters/scan.cpp
+++ b/ydb/core/tx/columnshard/counters/scan.cpp
@@ -45,8 +45,8 @@ TScanCounters::TScanCounters(const TString& module)
{
}
-std::shared_ptr<NKikimr::NColumnShard::TScanAggregations> TScanCounters::BuildAggregations() {
- return std::make_shared<TScanAggregations>(GetModuleId());
+NKikimr::NColumnShard::TScanAggregations TScanCounters::BuildAggregations() {
+ return TScanAggregations(GetModuleId());
}
}
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index bad3bf50b3..a917eb8780 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -1,89 +1,28 @@
#pragma once
#include "common/owner.h"
+#include <ydb/core/tx/columnshard/resources/memory.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NKikimr::NColumnShard {
-class TScanDataClassAggregation: public TCommonCountersOwner {
-private:
- using TBase = TCommonCountersOwner;
- NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightBytes;
- NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightCount;
- std::shared_ptr<TValueAggregationClient> InFlightBytes;
- std::shared_ptr<TValueAggregationClient> InFlightCount;
-public:
- TScanDataClassAggregation(const TString& moduleId, const TString& signalId)
- : TBase(moduleId)
- {
- DeriviativeInFlightCount = TBase::GetDeriviative(signalId + "/Count");
- DeriviativeInFlightBytes = TBase::GetDeriviative(signalId + "/Bytes");
- InFlightCount = TBase::GetValueAutoAggregationsClient(signalId + "/Count");
- InFlightBytes = TBase::GetValueAutoAggregationsClient(signalId + "/Bytes");
- }
-
- void AddFullInfo(const ui64 size) {
- DeriviativeInFlightCount->Add(1);
- DeriviativeInFlightBytes->Add(size);
- InFlightCount->Add(1);
- InFlightBytes->Add(size);
- }
-
- void RemoveFullInfo(const ui64 size) {
- InFlightCount->Remove(1);
- InFlightBytes->Remove(size);
- }
-
- void AddCount() {
- DeriviativeInFlightCount->Add(1);
- InFlightCount->Add(1);
- }
-
- void AddBytes(const ui64 size) {
- DeriviativeInFlightBytes->Add(size);
- InFlightBytes->Add(size);
- }
-};
-
class TScanAggregations {
private:
using TBase = TCommonCountersOwner;
- TScanDataClassAggregation ReadBlobs;
- TScanDataClassAggregation GranulesProcessing;
- TScanDataClassAggregation GranulesReady;
+ std::shared_ptr<NOlap::TMemoryAggregation> ReadBlobs;
+ std::shared_ptr<NOlap::TMemoryAggregation> GranulesProcessing;
+ std::shared_ptr<NOlap::TMemoryAggregation> GranulesReady;
+ std::shared_ptr<NOlap::TMemoryAggregation> ResultsReady;
public:
TScanAggregations(const TString& moduleId)
- : ReadBlobs(moduleId, "InFlight/Blobs/Read")
- , GranulesProcessing(moduleId, "InFlight/Granules/Processing")
- , GranulesReady(moduleId, "InFlight/Granules/Ready")
- {
- }
-
- void AddFlightReadInfo(const ui64 size) {
- ReadBlobs.AddFullInfo(size);
+ : GranulesProcessing(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Granules/Processing"))
+ , ResultsReady(std::make_shared<NOlap::TMemoryAggregation>(moduleId, "InFlight/Results/Ready")) {
}
- void RemoveFlightReadInfo(const ui64 size) {
- ReadBlobs.RemoveFullInfo(size);
+ const std::shared_ptr<NOlap::TMemoryAggregation>& GetGranulesProcessing() const {
+ return GranulesProcessing;
}
-
- void AddGranuleProcessing() {
- GranulesProcessing.AddCount();
- }
-
- void AddGranuleProcessingBytes(const ui64 size) {
- GranulesProcessing.AddBytes(size);
- }
-
- void RemoveGranuleProcessingInfo(const ui64 size) {
- GranulesProcessing.RemoveFullInfo(size);
- }
-
- void AddGranuleReady(const ui64 size) {
- GranulesReady.AddFullInfo(size);
- }
-
- void RemoveGranuleReady(const ui64 size) {
- GranulesReady.RemoveFullInfo(size);
+ const std::shared_ptr<NOlap::TMemoryAggregation>& GetResultsReady() const {
+ return ResultsReady;
}
};
@@ -153,14 +92,14 @@ public:
ReadingOverload->Add(1);
}
- std::shared_ptr<TScanAggregations> BuildAggregations();
+ TScanAggregations BuildAggregations();
};
class TConcreteScanCounters: public TScanCounters {
private:
using TBase = TScanCounters;
public:
- std::shared_ptr<TScanAggregations> Aggregations;
+ TScanAggregations Aggregations;
TConcreteScanCounters(const TScanCounters& counters)
: TBase(counters)
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 298530843c..45241701a0 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -255,16 +255,16 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t
// Extract ready to out granules: ready granules that are not blocked by other (not ready) granules
Y_VERIFY(GranulesContext);
- auto out = MakeResult(ReadyToOut(), maxRowsInBatch);
+ auto out = ReadyToOut(maxRowsInBatch);
const bool requireResult = GranulesContext->IsFinished(); // not indexed or the last indexed read (even if it's empty)
if (requireResult && out.empty()) {
- out.push_back(TPartialReadResult(NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema())));
+ out.push_back(TPartialReadResult(GetMemoryAccessor(), Context.GetCounters().Aggregations.GetResultsReady(), NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema())));
}
return out;
}
/// @return batches that are not blocked by others
-std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::ReadyToOut() {
+std::vector<TPartialReadResult> TIndexedReadData::ReadyToOut(const i64 maxRowsInBatch) {
Y_VERIFY(SortReplaceDescription);
Y_VERIFY(GranulesContext);
@@ -306,7 +306,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
NotIndexedOutscopeBatch = nullptr;
}
- return out;
+ return MakeResult(std::move(out), maxRowsInBatch);
}
std::shared_ptr<arrow::RecordBatch>
@@ -333,8 +333,7 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch
return merged;
}
-// TODO: better implementation
-static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) {
+void TIndexedReadData::MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const {
if (out.size() < 10) {
return;
}
@@ -367,7 +366,7 @@ static void MergeTooSmallBatches(std::vector<TPartialReadResult>& out) {
auto batch = NArrow::ToBatch(*res);
std::vector<TPartialReadResult> merged;
- merged.emplace_back(TPartialReadResult(batch, out.back().GetLastReadKey()));
+ merged.emplace_back(TPartialReadResult(GetMemoryAccessor(), memAggregation, batch, out.back().GetLastReadKey()));
out.swap(merged);
}
@@ -422,12 +421,12 @@ std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::ve
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema());
- out.emplace_back(TPartialReadResult(resultBatch, lastKey));
+ out.emplace_back(TPartialReadResult(GetMemoryAccessor(), Context.GetCounters().Aggregations.GetResultsReady(), resultBatch, lastKey));
}
}
if (ReadMetadata->GetProgram().HasProgram()) {
- MergeTooSmallBatches(out);
+ MergeTooSmallBatches(Context.GetCounters().Aggregations.GetResultsReady(), out);
for (auto& result : out) {
result.ApplyProgram(ReadMetadata->GetProgram());
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 8d05897935..c1b31db0dd 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -52,6 +52,10 @@ public:
;
}
+ const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const {
+ return Context.GetMemoryAccessor();
+ }
+
const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept {
return Context.GetCounters();
}
@@ -101,7 +105,8 @@ private:
std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
- std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut();
+ std::vector<TPartialReadResult> ReadyToOut(const i64 maxRowsInBatch);
+ void MergeTooSmallBatches(const std::shared_ptr<TMemoryAggregation>& memAggregation, std::vector<TPartialReadResult>& out) const;
std::vector<TPartialReadResult> MakeResult(
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>>&& granules, int64_t maxRowsInBatch) const;
};
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 528f351f28..bd3a1e2c7f 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
columnshard-engines-predicate
columnshard-hooks-abstract
+ tx-columnshard-resources
core-tx-program
engines-reader-order_control
columnshard-engines-scheme
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 056b297152..da51f9cbc2 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
columnshard-engines-predicate
columnshard-hooks-abstract
+ tx-columnshard-resources
core-tx-program
engines-reader-order_control
columnshard-engines-scheme
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 056b297152..da51f9cbc2 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
columnshard-engines-predicate
columnshard-hooks-abstract
+ tx-columnshard-resources
core-tx-program
engines-reader-order_control
columnshard-engines-scheme
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 528f351f28..bd3a1e2c7f 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
columnshard-engines-predicate
columnshard-hooks-abstract
+ tx-columnshard-resources
core-tx-program
engines-reader-order_control
columnshard-engines-scheme
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 93eb1debc3..b7bc120a42 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -1,6 +1,7 @@
#include "filling_context.h"
#include "order_control/not_sorted.h"
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+#include <ydb/core/tx/columnshard/resources/memory.h>
#include <util/string/join.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -8,7 +9,7 @@ namespace NKikimr::NOlap::NIndexedReader {
TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading)
: ReadMetadata(readMetadata)
, InternalReading(internalReading)
- , Processing(owner.GetCounters())
+ , Processing(owner.GetMemoryAccessor(), owner.GetCounters())
, Result(owner.GetCounters())
, GranulesLiveContext(std::make_shared<TGranulesLiveControl>())
, Owner(owner)
@@ -67,10 +68,11 @@ void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_
bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range, const bool hasReadyResults) {
Y_VERIFY_DEBUG(!Result.IsReady(granuleId));
- if (InternalReading || Processing.IsInProgress(granuleId) || (!hasReadyResults && !GranulesLiveContext->GetCount())) {
- Processing.StartBlobProcessing(granuleId, range);
- return true;
- } else if (CheckBufferAvailable()) {
+ if (InternalReading || Processing.IsInProgress(granuleId)
+ || (!GranulesLiveContext->GetCount() && !hasReadyResults)
+ || GetMemoryAccessor()->GetLimiter().HasBufferOrSubscribe(GetMemoryAccessor())
+ )
+ {
Processing.StartBlobProcessing(granuleId, range);
return true;
} else {
@@ -78,11 +80,6 @@ bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const
}
}
-bool TGranulesFillingContext::CheckBufferAvailable() const {
- return Result.GetCount() + Processing.GetCount() < GranulesCountProcessingLimit ||
- Result.GetBlobsSize() + Processing.GetBlobsSize() < ProcessingBytesLimit;
-}
-
bool TGranulesFillingContext::ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range) {
Y_VERIFY_DEBUG(!Result.IsReady(granuleId));
Processing.StartBlobProcessing(granuleId, range);
@@ -98,4 +95,8 @@ std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingCont
return SortingPolicy->DetachReadyGranules(Result);
}
+const std::shared_ptr<NKikimr::NOlap::TActorBasedMemoryAccesor>& TGranulesFillingContext::GetMemoryAccessor() const {
+ return Owner.GetMemoryAccessor();
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index f9f7a45d78..e76f2f6993 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -30,9 +30,6 @@ private:
NColumnShard::TConcreteScanCounters Counters;
bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
- static constexpr ui32 GranulesCountProcessingLimit = 16;
- static constexpr ui64 ExpectedBytesForGranule = 50 * 1024 * 1024;
- static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule;
bool CheckBufferAvailable() const;
public:
std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const {
@@ -43,7 +40,9 @@ public:
}
bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range);
bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range, const bool hasReadyResults);
- TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading);
+ TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading);
+
+ const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const;
TString DebugString() const {
return TStringBuilder()
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index f641b54358..018c9bafd7 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -3,10 +3,12 @@
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
#include <ydb/core/tx/columnshard/engines/filter.h>
+#include <ydb/core/tx/columnshard/engines/index_info.h>
namespace NKikimr::NOlap::NIndexedReader {
void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+ RawDataSizeReal += NArrow::GetBatchDataSize(batch);
if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
return;
}
@@ -31,7 +33,13 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
CheckReady();
}
-NKikimr::NOlap::NIndexedReader::TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) {
+TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) {
+ const ui64 batchSize = portionInfo.GetRawBytes(Owner->GetReadMetadata()->GetAllColumns());
+ RawDataSize += batchSize;
+ const ui64 filtersSize = portionInfo.NumRows() * (8 + 8);
+ RawDataSize += filtersSize;
+ ACFL_DEBUG("event", "RegisterBatchForFetching")("columns_count", Owner->GetReadMetadata()->GetAllColumns().size())("batch_raw_size", batchSize)("granule_size", RawDataSize);
+
Y_VERIFY(!ReadyFlag);
ui32 batchGranuleIdx = Batches.size();
WaitBatches.emplace(batchGranuleIdx);
@@ -104,6 +112,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
}
NotIndexedBatchReadyFlag = true;
if (batch && batch->num_rows()) {
+ GranuleDataSize.Take(NArrow::GetBatchDataSize(batch));
Y_VERIFY(!NotIndexedBatch);
NotIndexedBatch = batch;
if (NotIndexedBatch) {
@@ -119,6 +128,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
void TGranule::CheckReady() {
if (WaitBatches.empty() && NotIndexedBatchReadyFlag) {
ReadyFlag = true;
+ ACFL_DEBUG("event", "granule_ready")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
Owner->OnGranuleReady(GranuleId);
}
}
@@ -129,7 +139,6 @@ void TGranule::OnBlobReady(const TBlobRange& range) noexcept {
return;
}
Y_VERIFY(!ReadyFlag);
- BlobsDataSize += range.Size;
Owner->OnBlobReady(GranuleId, range);
}
@@ -143,6 +152,7 @@ TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
: GranuleId(granuleId)
, LiveController(owner.GetGranulesLiveContext())
, Owner(&owner)
+ , GranuleDataSize(Owner->GetMemoryAccessor(), Owner->GetCounters().Aggregations.GetGranulesProcessing())
{
}
@@ -150,6 +160,7 @@ TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
void TGranule::StartConstruction() {
InConstruction = true;
LiveController->Inc();
+ GranuleDataSize.Take(RawDataSize);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 5a23342722..95754c8cb1 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -31,6 +31,8 @@ public:
using TPtr = std::shared_ptr<TGranule>;
private:
ui64 GranuleId = 0;
+ ui64 RawDataSize = 0;
+ ui64 RawDataSizeReal = 0;
bool NotIndexedBatchReadyFlag = false;
bool InConstruction = false;
@@ -46,16 +48,13 @@ private:
std::shared_ptr<TGranulesLiveControl> LiveController;
TGranulesFillingContext* Owner = nullptr;
THashSet<const void*> BatchesToDedup;
- ui64 BlobsDataSize = 0;
+
+ TScanMemoryLimiter::TGuard GranuleDataSize;
void CheckReady();
public:
TGranule(const ui64 granuleId, TGranulesFillingContext& owner);
~TGranule();
- ui64 GetBlobsDataSize() const noexcept {
- return BlobsDataSize;
- }
-
ui64 GetGranuleId() const noexcept {
return GranuleId;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
index 552ec7c9b0..e366ce42a1 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
@@ -6,9 +6,6 @@ TGranule::TPtr TResultController::ExtractFirst() {
TGranule::TPtr result;
if (GranulesToOut.size()) {
result = GranulesToOut.begin()->second;
- Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize());
- BlobsSize -= result->GetBlobsDataSize();
- Y_VERIFY(BlobsSize >= 0);
GranulesToOut.erase(GranulesToOut.begin());
}
return result;
@@ -17,8 +14,6 @@ TGranule::TPtr TResultController::ExtractFirst() {
void TResultController::AddResult(TGranule::TPtr granule) {
Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second);
Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second);
- BlobsSize += granule->GetBlobsDataSize();
- Counters.Aggregations->AddGranuleReady(granule->GetBlobsDataSize());
}
TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) {
@@ -28,9 +23,6 @@ TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) {
}
TGranule::TPtr result = it->second;
GranulesToOut.erase(it);
- Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize());
- BlobsSize -= result->GetBlobsDataSize();
- Y_VERIFY(BlobsSize >= 0);
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
index 509d4e1d82..84a4366749 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
@@ -8,7 +8,6 @@ class TResultController {
protected:
THashMap<ui64, TGranule::TPtr> GranulesToOut;
std::set<ui64> ReadyGranulesAccumulator;
- i64 BlobsSize = 0;
const NColumnShard::TConcreteScanCounters Counters;
public:
TString DebugString() const {
@@ -26,17 +25,12 @@ public:
void Clear() {
GranulesToOut.clear();
- BlobsSize = 0;
}
bool IsReady(const ui64 granuleId) const {
return ReadyGranulesAccumulator.contains(granuleId);
}
- ui64 GetBlobsSize() const {
- return BlobsSize;
- }
-
ui32 GetCount() const {
return GranulesToOut.size();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
index 76012f89ff..2a91e4af26 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
@@ -23,6 +23,7 @@ void TProcessingController::DrainNotIndexedBatches(THashMap<ui64, std::shared_pt
batches->erase(it);
}
}
+ GuardZeroGranuleData.FreeAll();
}
NKikimr::NOlap::NIndexedReader::TBatch* TProcessingController::GetBatchInfo(const TBatchAddress& address) {
@@ -40,10 +41,7 @@ TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId)
Y_VERIFY(it != GranulesWaiting.end());
TGranule::TPtr result = it->second;
GranulesInProcessing.erase(granuleId);
- BlobsSize -= result->GetBlobsDataSize();
- Y_VERIFY(BlobsSize >= 0);
GranulesWaiting.erase(it);
- Counters.Aggregations->RemoveGranuleProcessingInfo(result->GetBlobsDataSize());
return result;
}
@@ -68,26 +66,22 @@ TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) {
}
void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) {
- Counters.Aggregations->AddGranuleProcessingBytes(range.Size);
if (GranulesInProcessing.emplace(granuleId).second) {
if (granuleId) {
GetGranuleVerified(granuleId)->StartConstruction();
Y_VERIFY(GranulesWaiting.contains(granuleId));
- Counters.Aggregations->AddGranuleProcessing();
}
}
if (!granuleId) {
Y_VERIFY(!NotIndexedBatchesInitialized);
+ GuardZeroGranuleData.Take(range.Size);
}
- BlobsSize += range.Size;
}
void TProcessingController::Abort() {
NotIndexedBatchesInitialized = true;
GranulesWaiting.clear();
GranulesInProcessing.clear();
- Counters.Aggregations->RemoveGranuleProcessingInfo(BlobsSize);
- BlobsSize = 0;
}
TString TProcessingController::DebugString() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h
index 587eee7b55..bddd479e08 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h
@@ -1,6 +1,7 @@
#pragma once
#include "granule.h"
#include <ydb/core/tx/columnshard/counters/scan.h>
+#include <ydb/core/tx/columnshard/resources/memory.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -10,17 +11,18 @@ private:
ui32 OriginalGranulesCount = 0;
ui64 CommonGranuleData = 0;
std::set<ui64> GranulesInProcessing;
- i64 BlobsSize = 0;
bool NotIndexedBatchesInitialized = false;
const NColumnShard::TConcreteScanCounters Counters;
+ TScanMemoryLimiter::TGuard GuardZeroGranuleData;
public:
TString DebugString() const;
bool IsGranuleActualForProcessing(const ui64 granuleId) const {
return GranulesWaiting.contains(granuleId) || (granuleId == 0 && !NotIndexedBatchesInitialized);
}
- TProcessingController(const NColumnShard::TConcreteScanCounters& counters)
+ TProcessingController(TScanMemoryLimiter::IMemoryAccessor::TPtr memoryAccessor, const NColumnShard::TConcreteScanCounters& counters)
: Counters(counters)
+ , GuardZeroGranuleData(memoryAccessor, Counters.Aggregations.GetGranulesProcessing())
{
}
@@ -47,10 +49,6 @@ public:
void Abort();
- ui64 GetBlobsSize() const {
- return BlobsSize;
- }
-
ui32 GetCount() const {
return GranulesInProcessing.size();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.cpp b/ydb/core/tx/columnshard/engines/reader/read_context.cpp
index 390bc57ae8..33f87457f1 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.cpp
@@ -4,11 +4,17 @@
namespace NKikimr::NOlap {
TReadContext::TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor,
- const NColumnShard::TConcreteScanCounters& counters)
+ const NColumnShard::TConcreteScanCounters& counters,
+ std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor)
: Processor(processor)
, Counters(counters)
+ , MemoryAccessor(memoryAccessor)
{
}
+void TActorBasedMemoryAccesor::DoOnBufferReady() { // Buffer-ready hook: notifies the actor identified by OwnerId.
+ OwnerId.Send(OwnerId, new NActors::TEvents::TEvWakeup(1)); // OwnerId is both sender and recipient of the TEvWakeup(1) event.
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h
index 09ede34dc0..5ab203f798 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.h
@@ -1,21 +1,38 @@
#pragma once
#include "conveyor_task.h"
#include <ydb/core/tx/columnshard/counters/scan.h>
+#include <ydb/core/tx/columnshard/resources/memory.h>
#include <library/cpp/actors/core/actor.h>
namespace NKikimr::NOlap {
+class TActorBasedMemoryAccesor: public TScanMemoryLimiter::IMemoryAccessor { // IMemoryAccessor bound to an actor identity and to a limiter looked up by name.
+private:
+ using TBase = TScanMemoryLimiter::IMemoryAccessor;
+ const NActors::TActorIdentity OwnerId; // Identity used by DoOnBufferReady(), which is defined out of line.
+protected:
+ virtual void DoOnBufferReady() override; // Implements the pure virtual hook of IMemoryAccessor.
+public:
+ TActorBasedMemoryAccesor(const NActors::TActorIdentity& ownerId, const TString& limiterName) // limiterName selects the limiter through TMemoryLimitersController::GetLimiter().
+ : TBase(TMemoryLimitersController::GetLimiter(limiterName))
+ , OwnerId(ownerId) {
+
+ }
+};
+
class TReadContext {
private:
YDB_ACCESSOR_DEF(NColumnShard::TDataTasksProcessorContainer, Processor);
const NColumnShard::TConcreteScanCounters Counters;
+ YDB_READONLY_DEF(std::shared_ptr<NOlap::TActorBasedMemoryAccesor>, MemoryAccessor);
public:
const NColumnShard::TConcreteScanCounters& GetCounters() const {
return Counters;
}
TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor,
- const NColumnShard::TConcreteScanCounters& counters
+ const NColumnShard::TConcreteScanCounters& counters,
+ std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor
);
TReadContext(const NColumnShard::TConcreteScanCounters& counters)
diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make
index b90f70893c..4894dbca62 100644
--- a/ydb/core/tx/columnshard/engines/reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/ya.make
@@ -22,6 +22,7 @@ PEERDIR(
ydb/core/formats/arrow
ydb/core/tx/columnshard/engines/predicate
ydb/core/tx/columnshard/hooks/abstract
+ ydb/core/tx/columnshard/resources
ydb/core/tx/program
ydb/core/tx/columnshard/engines/reader/order_control
ydb/core/tx/columnshard/engines/scheme
diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..9b2360f6b4
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resources)
+target_link_libraries(tx-columnshard-resources PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(tx-columnshard-resources PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp
+)
diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..6248df7a86
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resources)
+target_link_libraries(tx-columnshard-resources PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(tx-columnshard-resources PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp
+)
diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..6248df7a86
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resources)
+target_link_libraries(tx-columnshard-resources PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(tx-columnshard-resources PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp
+)
diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.txt b/ydb/core/tx/columnshard/resources/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..9b2360f6b4
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-resources)
+target_link_libraries(tx-columnshard-resources PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(tx-columnshard-resources PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/resources/memory.cpp
+)
diff --git a/ydb/core/tx/columnshard/resources/memory.cpp b/ydb/core/tx/columnshard/resources/memory.cpp
new file mode 100644
index 0000000000..a6551393e6
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/memory.cpp
@@ -0,0 +1,102 @@
+#include "memory.h"
+#include <util/system/guard.h>
+
+namespace NKikimr::NOlap {
+
+TScanMemoryCounter::TScanMemoryCounter(const TString& limitName, const ui64 memoryLimit) // Obtains every counter of one limiter from the base owner constructed with limitName.
+ : TBase(limitName)
+{
+ AvailableMemory = TBase::GetValue("Available");
+ MinimalMemory = TBase::GetValue("Minimal");
+ DeriviativeTake = TBase::GetDeriviative("Take");
+ DeriviativeFree = TBase::GetDeriviative("Free");
+ AvailableMemory->Set(memoryLimit); // The "Available" counter starts at the full limit.
+
+ DeriviativeWaitingStart = TBase::GetDeriviative("Waiting/Start");
+ DeriviativeWaitingFinish = TBase::GetDeriviative("Waiting/Finish");
+ CurrentInWaiting = TBase::GetValue("InWaiting");
+}
+
+void TScanMemoryLimiter::Free(const ui64 size) { // Adds size back to the balance and notifies queued accessors when the balance turns positive.
+ const i64 newVal = AvailableMemory.Add(size);
+ Y_VERIFY(newVal <= AvailableMemoryLimit); // The balance must never exceed the configured limit.
+ Counters.Free(size);
+ if (newVal > 0 && newVal <= (i64)size) { // Holds only when this call moved the balance from <= 0 to > 0.
+ std::vector<std::shared_ptr<IMemoryAccessor>> accessors; // Filled under the mutex, notified after it is released.
+ {
+ ::TGuard<TMutex> g(Mutex);
+ while (AvailableMemory.Val() > 0 && InWaiting.size()) { // Dequeue in FIFO order while the balance stays positive.
+ accessors.emplace_back(InWaiting.front());
+ InWaiting.pop_front();
+ Counters.WaitFinish();
+ }
+ WaitingCounter = InWaiting.size();
+ }
+ for (auto&& i : accessors) {
+ i->OnBufferReady(); // Clears the accessor's waiting flag and runs its DoOnBufferReady() hook.
+ }
+ }
+}
+
+bool TScanMemoryLimiter::HasBufferOrSubscribe(std::shared_ptr<IMemoryAccessor> accessor) { // Returns true when the balance is positive; otherwise returns false, queueing accessor first if it is non-null and not already waiting.
+ if (AvailableMemory.Val() > 0) { // Fast path, taken without the mutex.
+ return true;
+ } else if (!accessor) { // Null accessor: pure availability query, nothing to queue.
+ return false;
+ }
+ ::TGuard<TMutex> g(Mutex);
+ if (accessor->InWaiting()) { // Already marked as waiting: do not enqueue it a second time.
+ return false;
+ }
+ if (AvailableMemory.Val() > 0) { // Re-check under the mutex before queueing.
+ return true;
+ }
+ accessor->StartWaiting();
+ WaitingCounter.Inc();
+ InWaiting.emplace_back(accessor); // Free() pops from the front and calls OnBufferReady() on each popped accessor.
+ Counters.WaitStart();
+ return false;
+}
+
+void TScanMemoryLimiter::Take(const ui64 size) { // Subtracts size from the balance; nothing here checks the limit, so the balance can go negative.
+ const i64 val = AvailableMemory.Sub(size);
+ Counters.Take(size);
+ AFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("current_memory", val)("min", MinMemory)("event", "take"); // MinMemory is only logged: the code that would update it is commented out below.
+// ::TGuard<TMutex> g(Mutex);
+// if (MinMemory > val) {
+// MinMemory = val;
+// Counters.OnMinimal(val);
+// }
+}
+
+void TScanMemoryLimiter::TGuard::FreeAll() {
+    // Releases everything this guard currently holds.
+    //
+    // The held amount is read exactly once, so the signals, the limiter and
+    // the guard itself are all adjusted by the same number of bytes. The
+    // guard is then decremented by that snapshot instead of being overwritten
+    // with zero, so bytes added by a Take() that interleaves with this call
+    // stay accounted for rather than being dropped.
+    const i64 held = Value.Val();
+    if (held == 0) {
+        // Nothing to release; this also makes the call from ~TGuard() after
+        // an explicit FreeAll() a no-op.
+        return;
+    }
+    if (MemorySignals) {
+        MemorySignals->RemoveBytes(held);
+    }
+    if (MemoryAccessor) {
+        MemoryAccessor->Free(held);
+    }
+    Value.Sub(held);
+}
+
+void TScanMemoryLimiter::TGuard::Free(const ui64 size) {
+    // Releases `size` bytes previously taken through this guard: first back
+    // to the shared limiter (when attached), then from the guard's own
+    // balance, and finally from the monitoring signals (when attached).
+    if (MemoryAccessor) {
+        MemoryAccessor->Free(size);
+    }
+    // The guard must never release more than it holds.
+    const i64 remaining = Value.Sub(size);
+    Y_VERIFY(remaining >= 0);
+    if (MemorySignals) {
+        MemorySignals->RemoveBytes(size);
+    }
+}
+
+void TScanMemoryLimiter::TGuard::Take(const ui64 size) {
+    // Charges `size` bytes to this guard: the shared limiter is debited
+    // first (when attached), then the guard's own balance grows, and the
+    // monitoring signals are updated last (when attached).
+    if (auto* limiterAccess = MemoryAccessor.get()) {
+        limiterAccess->Take(size);
+    }
+    Value.Add(size);
+    if (auto* signals = MemorySignals.get()) {
+        signals->AddBytes(size);
+    }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/resources/memory.h b/ydb/core/tx/columnshard/resources/memory.h
new file mode 100644
index 0000000000..b819b4703e
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/memory.h
@@ -0,0 +1,197 @@
+#pragma once
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+#include <ydb/core/protos/services.pb.h>
+#include <library/cpp/actors/core/log.h>
+#include <util/system/mutex.h>
+
+namespace NKikimr::NOlap {
+
+class TMemoryAggregation: public NColumnShard::TCommonCountersOwner { // Add/Remove byte signals plus an aggregated byte value for one signalId.
+private:
+ using TBase = TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeAddInFlightBytes; // Obtained as "<signalId>/Add/Bytes".
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeRemoveInFlightBytes; // Obtained as "<signalId>/Remove/Bytes".
+ std::shared_ptr<NColumnShard::TValueAggregationClient> InFlightBytes; // Obtained as "<signalId>/Bytes".
+public:
+ TMemoryAggregation(const TString& moduleId, const TString& signalId) // moduleId is forwarded to the base counters owner; signalId prefixes every signal name.
+ : TBase(moduleId) {
+ DeriviativeAddInFlightBytes = TBase::GetDeriviative(signalId + "/Add/Bytes");
+ DeriviativeRemoveInFlightBytes = TBase::GetDeriviative(signalId + "/Remove/Bytes");
+ InFlightBytes = TBase::GetValueAutoAggregationsClient(signalId + "/Bytes");
+ }
+
+ void AddBytes(const ui64 size) { // Reports size bytes as added: bumps the Add counter and the aggregated value.
+ DeriviativeAddInFlightBytes->Add(size);
+ InFlightBytes->Add(size);
+ }
+
+ void RemoveBytes(const ui64 size) { // Reports size bytes as removed: bumps the Remove counter and lowers the aggregated value.
+ DeriviativeRemoveInFlightBytes->Add(size);
+ InFlightBytes->Remove(size);
+ }
+};
+
+class TScanMemoryCounter: public NColumnShard::TCommonCountersOwner { // Monitoring counters of one memory limiter; it only reports values and makes no limiting decisions.
+private:
+ using TBase = NColumnShard::TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr MinimalMemory; // Written only by OnMinimal().
+ NMonitoring::TDynamicCounters::TCounterPtr AvailableMemory; // Set to the limit by the constructor, then moved by Take()/Free().
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeTake;
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeFree;
+
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeWaitingStart;
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeWaitingFinish;
+ NMonitoring::TDynamicCounters::TCounterPtr CurrentInWaiting; // Incremented by WaitStart(), decremented by WaitFinish().
+public:
+ TScanMemoryCounter(const TString& limitName, const ui64 memoryLimit); // Defined out of line.
+
+ void Take(const ui64 size) const { // Reports size bytes taken: bumps the Take counter and lowers the available value.
+ DeriviativeTake->Add(size);
+ AvailableMemory->Sub(size);
+ }
+
+ void OnMinimal(const ui64 size) const { // NOTE(review): takes ui64 while the limiter keeps its balance as i64 - confirm negative values are not expected here.
+ MinimalMemory->Set(size);
+ }
+
+ void WaitStart() const { // Reports one more waiter.
+ DeriviativeWaitingStart->Add(1);
+ CurrentInWaiting->Add(1);
+ }
+
+ void WaitFinish() const { // Reports one waiter fewer.
+ DeriviativeWaitingFinish->Add(1);
+ CurrentInWaiting->Sub(1);
+ }
+
+ void Free(const ui64 size) const { // Reports size bytes freed: bumps the Free counter and raises the available value.
+ DeriviativeFree->Add(size);
+ AvailableMemory->Add(size);
+ }
+};
+
+class TScanMemoryLimiter: TNonCopyable { // Named memory budget: tracks a byte balance and a FIFO queue of accessors waiting for it to become positive.
+public:
+ const TString LimiterName;
+ const i64 AvailableMemoryLimit; // Upper bound of the balance; initialised from the constructor's memoryLimit.
+ class IMemoryAccessor;
+ class TGuard: TNonCopyable { // Tracks the bytes taken through one accessor and releases what is left on destruction.
+ private:
+ TAtomicCounter Value = 0; // Bytes currently held by this guard.
+ std::shared_ptr<IMemoryAccessor> MemoryAccessor; // May be null; every use is null-checked.
+ std::shared_ptr<TMemoryAggregation> MemorySignals; // Optional monitoring signals; every use is null-checked.
+ public:
+ TGuard(std::shared_ptr<IMemoryAccessor> accesor, std::shared_ptr<TMemoryAggregation> memorySignals = nullptr)
+ : MemoryAccessor(accesor)
+ , MemorySignals(memorySignals) {
+ }
+ ~TGuard() {
+ FreeAll(); // Whatever is still held goes back when the guard dies.
+ }
+ i64 GetValue() const {
+ return Value.Val();
+ }
+ void FreeAll();
+ void Free(const ui64 size);
+ void Take(const ui64 size);
+ };
+
+ class IMemoryAccessor { // Client handle to a limiter; subclasses implement DoOnBufferReady() to receive the wake-up.
+ private:
+ TAtomicCounter InWaitingFlag = 0; // 1 between StartWaiting() and OnBufferReady(), 0 otherwise.
+ std::shared_ptr<TScanMemoryLimiter> Owner;
+ friend class TScanMemoryLimiter; // The limiter calls the private StartWaiting().
+ void StartWaiting() {
+ Y_VERIFY(InWaitingFlag.Val() == 0); // Must not already be waiting.
+ InWaitingFlag = 1;
+ }
+ protected:
+ virtual void DoOnBufferReady() = 0; // Invoked from OnBufferReady() after the waiting flag is cleared.
+ public:
+ using TPtr = std::shared_ptr<IMemoryAccessor>;
+ IMemoryAccessor(std::shared_ptr<TScanMemoryLimiter> owner)
+ : Owner(owner)
+ {
+
+ }
+ bool HasBuffer() { // Availability check only: passes nullptr, so nothing is subscribed.
+ return Owner->HasBufferOrSubscribe(nullptr);
+ }
+
+ TScanMemoryLimiter& GetLimiter() const {
+ return *Owner;
+ }
+
+ virtual ~IMemoryAccessor() = default;
+ void Take(const ui64 size) const { // Forwards to the limiter's private Take().
+ Owner->Take(size);
+ }
+
+ void Free(const ui64 size) const { // Forwards to the limiter's private Free().
+ Owner->Free(size);
+ }
+
+ void OnBufferReady() { // Called by the limiter for an accessor it dequeued.
+ Y_VERIFY(InWaitingFlag.Val() == 1); // Only a waiting accessor may be notified.
+ InWaitingFlag = 0;
+ DoOnBufferReady();
+ }
+
+ bool InWaiting() const {
+ return InWaitingFlag.Val();
+ }
+ };
+
+private:
+ TAtomicCounter AvailableMemory = 0; // Current balance; the constructor sets it to memoryLimit.
+ TAtomicCounter WaitingCounter = 0;
+ TScanMemoryCounter Counters;
+ std::deque<std::shared_ptr<IMemoryAccessor>> InWaiting; // FIFO of subscribed accessors; modified under Mutex.
+
+ TMutex Mutex;
+ i64 MinMemory = 0;
+
+ void Free(const ui64 size); // Private: reached through IMemoryAccessor::Free().
+ void Take(const ui64 size); // Private: reached through IMemoryAccessor::Take().
+public:
+ TScanMemoryLimiter(const TString& limiterName, const ui64 memoryLimit) // Counters are created under "MemoryLimiters/<limiterName>".
+ : LimiterName(limiterName)
+ , AvailableMemoryLimit(memoryLimit)
+ , AvailableMemory(memoryLimit)
+ , Counters("MemoryLimiters/" + limiterName, memoryLimit)
+ {
+
+ }
+ bool HasBufferOrSubscribe(std::shared_ptr<IMemoryAccessor> accessor);
+};
+
+class TMemoryLimitersController {
+private:
+    TRWMutex Mutex;
+    // Registry of limiters keyed by limit name.
+    THashMap<TString, std::shared_ptr<TScanMemoryLimiter>> Limiters;
+    // Budget given to every limiter created on demand: 1 GiB.
+    static const inline ui64 StandartLimit = (ui64)1 * 1024 * 1024 * 1024;
+
+    // Returns the limiter registered under `name`, creating it with
+    // StandartLimit on first request. The lookup is first done under the
+    // read lock; on a miss it is repeated under the write lock before
+    // inserting, because another thread may have registered the limiter
+    // between the two locks.
+    std::shared_ptr<TScanMemoryLimiter> GetLimiterImpl(const TString& name) {
+        {
+            TReadGuard readLock(Mutex);
+            const auto known = Limiters.find(name);
+            if (known != Limiters.end()) {
+                return known->second;
+            }
+        }
+        TWriteGuard writeLock(Mutex);
+        const auto known = Limiters.find(name);
+        if (known != Limiters.end()) {
+            return known->second;
+        }
+        return Limiters.emplace(name, std::make_shared<TScanMemoryLimiter>(name, StandartLimit)).first->second;
+    }
+public:
+    // Process-wide entry point: resolves `name` through the singleton controller.
+    static std::shared_ptr<TScanMemoryLimiter> GetLimiter(const TString& name) {
+        return Singleton<TMemoryLimitersController>()->GetLimiterImpl(name);
+    }
+
+};
+
+}
diff --git a/ydb/core/tx/columnshard/resources/ya.make b/ydb/core/tx/columnshard/resources/ya.make
new file mode 100644
index 0000000000..8b92823eb8
--- /dev/null
+++ b/ydb/core/tx/columnshard/resources/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ memory.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+)
+
+END()