about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-07-01 18:39:23 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-07-01 18:39:23 +0300
commit 46348e368ed43863ad93eb203f05b920103b60d1 (patch)
tree a2de182de03cd78989a1961a4725a57b5637108d
parent 53a0ced5335b33e79843e7ef3566348e257a2449 (diff)
download ydb-46348e368ed43863ad93eb203f05b920103b60d1.tar.gz
corrections before memory guards usage
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.h 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/counters/common/object_counter.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/counters/common/object_counter.h 12
-rw-r--r-- ydb/core/tx/columnshard/counters/common/ya.make 2
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.h 6
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 69
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 12
-rw-r--r-- ydb/core/tx/columnshard/engines/portion_info.h 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/batch.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.h 14
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.cpp 24
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/granule.h 24
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/order_control/result.h 7
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/processing_context.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/processing_context.h 21
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/granule.h 1
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp 2
25 files changed, 189 insertions, 57 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index cb5fd9b6df9..c73a20a2332 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -28,7 +28,7 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
}
NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() {
- return IndexedData.ExtractNextBlob();
+ return IndexedData.ExtractNextBlob(ReadyResults.size());
}
void TColumnShardScanIterator::FillReadyResults() {
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 9e295cc3d79..51b35b25aab 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -81,7 +81,6 @@ private:
TReadyResults ReadyResults;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
NOlap::TIndexedReadData IndexedData;
- std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted;
ui64 ItemsRead = 0;
const i64 MaxRowsInBatch = 5000;
public:
@@ -94,6 +93,7 @@ public:
virtual TString DebugString() const override {
return TStringBuilder()
+ << "ready_results:(" << ReadyResults.DebugString() << ");"
<< "indexed_data:(" << IndexedData.DebugString() << ")"
;
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index c726d849ca5..791e4217323 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -200,7 +200,9 @@ private:
ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount);
ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString());
-
+ if (ScanIterator && !!ScanIterator->GetAvailableResultsCount() && !*ScanIterator->GetAvailableResultsCount()) {
+ ScanCountersPool.OnEmptyAck();
+ }
ContinueProcessing();
}
@@ -347,11 +349,11 @@ private:
}
}
ScanCountersPool.Hanging->Add(1);
- Y_VERIFY_DEBUG(false);
// The loop has finished without any progress!
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " is hanging"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ Y_VERIFY_DEBUG(false);
}
void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) noexcept {
diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt
index 90f970a4a6f..a143f2cfccc 100644
--- a/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,7 @@ target_link_libraries(columnshard-counters-common PUBLIC
contrib-libs-cxxsupp
yutil
cpp-monlib-dynamic_counters
+ ydb-core-protos
ydb-core-base
)
target_sources(columnshard-counters-common PRIVATE
@@ -19,4 +20,5 @@ target_sources(columnshard-counters-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt
index ba3f0145091..18d467dd5ab 100644
--- a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(columnshard-counters-common PUBLIC
contrib-libs-cxxsupp
yutil
cpp-monlib-dynamic_counters
+ ydb-core-protos
ydb-core-base
)
target_sources(columnshard-counters-common PRIVATE
@@ -20,4 +21,5 @@ target_sources(columnshard-counters-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt
index ba3f0145091..18d467dd5ab 100644
--- a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ target_link_libraries(columnshard-counters-common PUBLIC
contrib-libs-cxxsupp
yutil
cpp-monlib-dynamic_counters
+ ydb-core-protos
ydb-core-base
)
target_sources(columnshard-counters-common PRIVATE
@@ -20,4 +21,5 @@ target_sources(columnshard-counters-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt
index 90f970a4a6f..a143f2cfccc 100644
--- a/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,7 @@ target_link_libraries(columnshard-counters-common PUBLIC
contrib-libs-cxxsupp
yutil
cpp-monlib-dynamic_counters
+ ydb-core-protos
ydb-core-base
)
target_sources(columnshard-counters-common PRIVATE
@@ -19,4 +20,5 @@ target_sources(columnshard-counters-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp
)
diff --git a/ydb/core/tx/columnshard/counters/common/object_counter.cpp b/ydb/core/tx/columnshard/counters/common/object_counter.cpp
new file mode 100644
index 00000000000..a0dda125cb2
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters/common/object_counter.cpp
@@ -0,0 +1,4 @@
+#include "object_counter.h"
+
+namespace NKikimr::NColumnShard {
+}
diff --git a/ydb/core/tx/columnshard/counters/common/object_counter.h b/ydb/core/tx/columnshard/counters/common/object_counter.h
index 2cef57b7a11..f5f7758d76c 100644
--- a/ydb/core/tx/columnshard/counters/common/object_counter.h
+++ b/ydb/core/tx/columnshard/counters/common/object_counter.h
@@ -5,7 +5,7 @@
namespace NKikimr::NColumnShard {
-template <class TObject, bool UseSignals, bool UseLogs>
+template <class TObject>
class TMonitoringObjectsCounterImpl: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
@@ -33,14 +33,18 @@ public:
}
};
-template <class TObject, bool UseSignals, bool UseLogs>
+template <class TObject, bool UseSignals = true, bool UseLogs = false>
class TMonitoringObjectsCounter {
private:
static inline TAtomicCounter Counter = 0;
public:
+ static inline TAtomicCounter GetCounter() { // Reports the current value of the class-wide static Counter.
+ return Counter.Val(); // NOTE(review): the declared return type is TAtomicCounter, not the type produced by Val() - confirm this is intended.
+ }
+
TMonitoringObjectsCounter() {
if (UseSignals) {
- Singleton<TMonitoringObjectsCounterImpl<TObject, UseSignals, UseLogs>>()->Inc();
+ Singleton<TMonitoringObjectsCounterImpl<TObject>>()->Inc();
}
Counter.Inc();
if (UseLogs) {
@@ -49,7 +53,7 @@ public:
}
~TMonitoringObjectsCounter() {
if (UseSignals) {
- Singleton<TMonitoringObjectsCounterImpl<TObject, UseSignals, UseLogs>>()->Dec();
+ Singleton<TMonitoringObjectsCounterImpl<TObject>>()->Dec();
}
Counter.Dec();
if (UseLogs) {
diff --git a/ydb/core/tx/columnshard/counters/common/ya.make b/ydb/core/tx/columnshard/counters/common/ya.make
index 51dd3ceabfb..804699ec52b 100644
--- a/ydb/core/tx/columnshard/counters/common/ya.make
+++ b/ydb/core/tx/columnshard/counters/common/ya.make
@@ -5,10 +5,12 @@ SRCS(
client.cpp
owner.cpp
private.cpp
+ object_counter.cpp
)
PEERDIR(
library/cpp/monlib/dynamic_counters
+ ydb/core/protos
ydb/core/base
)
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp
index 68fd73c8674..763008acb09 100644
--- a/ydb/core/tx/columnshard/counters/scan.cpp
+++ b/ydb/core/tx/columnshard/counters/scan.cpp
@@ -12,6 +12,7 @@ TScanCounters::TScanCounters(const TString& module)
, PriorityFetchCount(TBase::GetDeriviative("PriorityFetch/Count"))
, GeneralFetchBytes(TBase::GetDeriviative("GeneralFetch/Bytes"))
, GeneralFetchCount(TBase::GetDeriviative("GeneralFetch/Count"))
+ , NoResultsAckRequest(TBase::GetDeriviative("NoResultsAckRequest"))
, PortionBytes(TBase::GetDeriviative("PortionBytes"))
, FilterBytes(TBase::GetDeriviative("FilterBytes"))
, PostFilterBytes(TBase::GetDeriviative("PostFilterBytes"))
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index f905b001d1f..bad3bf50b37 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -97,6 +97,8 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr PriorityFetchCount;
NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchBytes;
NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchCount;
+
+ NMonitoring::TDynamicCounters::TCounterPtr NoResultsAckRequest;
public:
NMonitoring::TDynamicCounters::TCounterPtr PortionBytes;
NMonitoring::TDynamicCounters::TCounterPtr FilterBytes;
@@ -130,6 +132,10 @@ public:
TScanCounters(const TString& module = "Scan");
+ void OnEmptyAck() const { // Records one empty-ack occurrence in the scan counters.
+ NoResultsAckRequest->Add(1); // Adds 1 to the NoResultsAckRequest counter.
+ }
+
void OnPriorityFetch(const ui64 size) const {
PriorityFetchBytes->Add(size);
PriorityFetchCount->Add(1);
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index cdd06250f34..298530843cb 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -104,7 +104,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
- Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second);
+ Y_VERIFY(IndexedBlobSubscriber.emplace(range, batch.GetBatchAddress()).second);
if (batch.GetFetchedInfo().GetFilter()) {
Context.GetCounters().PostFilterBytes->Add(range.Size);
ReadMetadata->ReadStats->DataAdditionalBytes += range.Size;
@@ -130,6 +130,9 @@ void TIndexedReadData::RegisterZeroGranula() {
}
void TIndexedReadData::InitRead() {
+ Y_VERIFY(!GranulesContext);
+ GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode);
+
RegisterZeroGranula();
auto& indexInfo = ReadMetadata->GetIndexInfo();
@@ -138,8 +141,6 @@ void TIndexedReadData::InitRead() {
SortReplaceDescription = indexInfo.SortReplaceDescription();
- Y_VERIFY(!GranulesContext);
- GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode);
ui64 portionsBytes = 0;
std::set<ui64> granulesReady;
ui64 prevGranule = 0;
@@ -174,7 +175,11 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da
{
auto it = IndexedBlobSubscriber.find(blobRange);
Y_VERIFY(it != IndexedBlobSubscriber.end());
- portionBatch = it->second;
+ portionBatch = GranulesContext->GetBatchInfo(it->second);
+ if (!portionBatch) {
+ ACFL_INFO("event", "batch for finished granule")("address", it->second.ToString());
+ return;
+ }
IndexedBlobSubscriber.erase(it);
}
if (!portionBatch->AddIndexedReady(blobRange, data)) {
@@ -450,30 +455,40 @@ void TIndexedReadData::Abort() {
FetchBlobsQueue.Stop();
PriorityBlobsQueue.Stop();
GranulesContext->Abort();
+ IndexedBlobSubscriber.clear();
+ WaitCommitted.clear();
}
-NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() {
+NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob(const bool hasReadyResults) {
Y_VERIFY(GranulesContext);
- {
- auto* f = PriorityBlobsQueue.front();
- if (f) {
- GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange());
- Context.GetCounters().OnPriorityFetch(f->GetRange().Size);
- return PriorityBlobsQueue.pop_front();
+ while (auto* f = PriorityBlobsQueue.front()) {
+ if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) {
+ ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId());
+ PriorityBlobsQueue.pop_front();
+ continue;
}
- }
- auto* f = FetchBlobsQueue.front();
- if (!f) {
- return TBlobRange();
+ GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange());
+ Context.GetCounters().OnPriorityFetch(f->GetRange().Size);
+ return PriorityBlobsQueue.pop_front();
}
- if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
- Context.GetCounters().OnGeneralFetch(f->GetRange().Size);
- return FetchBlobsQueue.pop_front();
- } else {
- Context.GetCounters().OnProcessingOverloaded();
- return TBlobRange();
+
+ while (auto* f = FetchBlobsQueue.front()) {
+ if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) {
+ ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId());
+ FetchBlobsQueue.pop_front();
+ continue;
+ }
+
+ if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange(), hasReadyResults)) {
+ Context.GetCounters().OnGeneralFetch(f->GetRange().Size);
+ return FetchBlobsQueue.pop_front();
+ } else {
+ Context.GetCounters().OnProcessingOverloaded();
+ return TBlobRange();
+ }
}
+ return TBlobRange();
}
void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) {
@@ -491,4 +506,16 @@ void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::sh
WaitCommitted.erase(it);
}
+void TIndexedReadData::AddData(const TBlobRange& blobRange, const TString& data) { // Routes fetched blob data to the indexed or not-indexed handler.
+ if (GranulesContext->IsFinished()) { // Data arriving after the granules context has finished is dropped.
+ ACFL_DEBUG("event", "AddData on GranulesContextFinished");
+ return; // Neither AddIndexed nor AddNotIndexed is called in this case.
+ }
+ if (IsIndexedBlob(blobRange)) { // Dispatch is decided by IsIndexedBlob for this range.
+ AddIndexed(blobRange, data);
+ } else {
+ AddNotIndexed(blobRange, data);
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 3638ebd0d7f..8d058979351 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -29,7 +29,7 @@ private:
bool OnePhaseReadMode = false;
std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
- THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber;
+ THashMap<TBlobRange, NIndexedReader::TBatchAddress> IndexedBlobSubscriber;
std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
@@ -72,13 +72,7 @@ public:
/// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
- void AddData(const TBlobRange& blobRange, const TString& data) {
- if (IsIndexedBlob(blobRange)) {
- AddIndexed(blobRange, data);
- } else {
- AddNotIndexed(blobRange, data);
- }
- }
+ void AddData(const TBlobRange& blobRange, const TString& data);
NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
return ReadMetadata;
@@ -99,7 +93,7 @@ public:
return FetchBlobsQueue.size() || PriorityBlobsQueue.size();
}
- TBlobRange ExtractNextBlob();
+ TBlobRange ExtractNextBlob(const bool hasReadyResults);
private:
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index d79e67dc519..d1fe6384411 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -248,8 +248,9 @@ struct TPortionInfo {
sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i);
} else {
auto it = Meta.ColumnMeta.find(i);
- Y_VERIFY(it != Meta.ColumnMeta.end());
- sum += it->second.RawBytes;
+ if (it != Meta.ColumnMeta.end()) {
+ sum += it->second.RawBytes;
+ }
}
}
return sum;
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 40969f6e91b..a380ce82453 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -69,7 +69,7 @@ public:
}
};
-class TBatch {
+class TBatch: TNonCopyable {
private:
const TBatchAddress BatchAddress;
YDB_READONLY(ui64, Portion, 0);
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 7aeed88a99d..93eb1debc38 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -10,6 +10,7 @@ TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMe
, InternalReading(internalReading)
, Processing(owner.GetCounters())
, Result(owner.GetCounters())
+ , GranulesLiveContext(std::make_shared<TGranulesLiveControl>())
, Owner(owner)
, Counters(owner.GetCounters())
{
@@ -64,9 +65,9 @@ void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_
Processing.DrainNotIndexedBatches(batches);
}
-bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range) {
+bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range, const bool hasReadyResults) {
Y_VERIFY_DEBUG(!Result.IsReady(granuleId));
- if (InternalReading || Processing.IsInProgress(granuleId)) {
+ if (InternalReading || Processing.IsInProgress(granuleId) || (!hasReadyResults && !GranulesLiveContext->GetCount())) {
Processing.StartBlobProcessing(granuleId, range);
return true;
} else if (CheckBufferAvailable()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index 64f5f60fed6..f9f7a45d78d 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -3,6 +3,7 @@
#include "granule.h"
#include "processing_context.h"
#include "order_control/abstract.h"
+#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <util/generic/hash.h>
namespace NKikimr::NOlap {
@@ -11,7 +12,7 @@ class TIndexedReadData;
namespace NKikimr::NOlap::NIndexedReader {
-class TGranulesFillingContext: TNonCopyable {
+class TGranulesFillingContext: TNonCopyable, public NColumnShard::TMonitoringObjectsCounter<TGranulesFillingContext, true, false> {
private:
YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames);
TReadMetadata::TConstPtr ReadMetadata;
@@ -19,6 +20,7 @@ private:
const bool InternalReading = false;
TProcessingController Processing;
TResultController Result;
+ std::shared_ptr<TGranulesLiveControl> GranulesLiveContext;
TIndexedReadData& Owner;
std::set<ui32> EarlyFilterColumns;
std::set<ui32> PostFilterColumns;
@@ -33,12 +35,20 @@ private:
static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule;
bool CheckBufferAvailable() const;
public:
+ std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const { // Returns a copy of the shared_ptr held in GranulesLiveContext.
+ return GranulesLiveContext;
+ }
+ bool IsGranuleActualForProcessing(const ui64 granuleId) const { // Forwards the question to the Processing controller and returns its answer.
+ return Processing.IsGranuleActualForProcessing(granuleId);
+ }
bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range);
- bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range);
+ bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range, const bool hasReadyResults);
TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading);
TString DebugString() const {
return TStringBuilder()
+ << "processing:(" << Processing.DebugString() << ");"
+ << "result:(" << Result.DebugString() << ");"
<< "sorting_policy:(" << SortingPolicy->DebugString() << ");"
;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 8b7657f4e09..f641b543580 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -35,7 +35,7 @@ NKikimr::NOlap::NIndexedReader::TBatch& TGranule::RegisterBatchForFetching(const
Y_VERIFY(!ReadyFlag);
ui32 batchGranuleIdx = Batches.size();
WaitBatches.emplace(batchGranuleIdx);
- Batches.emplace_back(TBatch(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo));
+ Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo);
Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second);
Owner->OnNewBatch(Batches.back());
return Batches.back();
@@ -98,7 +98,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
Y_VERIFY(!ReadyFlag);
Y_VERIFY(!NotIndexedBatchReadyFlag || !batch);
if (!NotIndexedBatchReadyFlag) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size());
+ ACFL_TRACE("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size());
} else {
return;
}
@@ -124,6 +124,7 @@ void TGranule::CheckReady() {
}
void TGranule::OnBlobReady(const TBlobRange& range) noexcept {
+ Y_VERIFY(InConstruction);
if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) {
return;
}
@@ -132,4 +133,23 @@ void TGranule::OnBlobReady(const TBlobRange& range) noexcept {
Owner->OnBlobReady(GranuleId, range);
}
+TGranule::~TGranule() { // Releases this granule's slot in the live controller, if it took one.
+ if (InConstruction) { // The flag is set by StartConstruction, which is also where Inc() is called.
+ LiveController->Dec();
+ }
+}
+
+TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner) // Builds a granule bound to the given filling context.
+ : GranuleId(granuleId)
+ , LiveController(owner.GetGranulesLiveContext()) // Shares the owner's live-control object; the counter is not touched here.
+ , Owner(&owner) // Stored as a raw pointer to the owner passed by reference.
+{
+
+}
+
+void TGranule::StartConstruction() { // Marks the granule as in construction and counts it in the live controller.
+ InConstruction = true; // The destructor checks this flag before calling Dec().
+ LiveController->Inc(); // NOTE(review): nothing here guards against a second call incrementing again - confirm callers invoke it once per granule.
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 8fe3ca8adf5..5a233427220 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -10,6 +10,21 @@
namespace NKikimr::NOlap::NIndexedReader {
class TGranulesFillingContext;
+class TGranulesLiveControl { // Small wrapper around one TAtomicCounter exposing GetCount, Inc and Dec.
+private:
+ TAtomicCounter GranulesCounter = 0; // Initialised to 0.
+public:
+ i64 GetCount() const { // Returns the current counter value.
+ return GranulesCounter.Val();
+ }
+
+ void Inc() { // Increments the counter.
+ GranulesCounter.Inc();
+ }
+ void Dec() { // Decrements the counter and verifies the value returned by Dec() is not negative.
+ Y_VERIFY(GranulesCounter.Dec() >= 0); // The decrement itself happens inside the verified expression.
+ }
+};
class TGranule {
public:
@@ -18,6 +33,7 @@ private:
ui64 GranuleId = 0;
bool NotIndexedBatchReadyFlag = false;
+ bool InConstruction = false;
std::shared_ptr<arrow::RecordBatch> NotIndexedBatch;
std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter;
@@ -27,15 +43,14 @@ private:
std::deque<TBatch> Batches;
std::set<ui32> WaitBatches;
std::set<ui32> GranuleBatchNumbers;
+ std::shared_ptr<TGranulesLiveControl> LiveController;
TGranulesFillingContext* Owner = nullptr;
THashSet<const void*> BatchesToDedup;
ui64 BlobsDataSize = 0;
void CheckReady();
public:
- TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
- : GranuleId(granuleId)
- , Owner(&owner) {
- }
+ TGranule(const ui64 granuleId, TGranulesFillingContext& owner);
+ ~TGranule();
ui64 GetBlobsDataSize() const noexcept {
return BlobsDataSize;
@@ -131,6 +146,7 @@ public:
std::deque<TBatchForMerge> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata);
const std::set<ui32>& GetEarlyFilterColumns() const;
+ void StartConstruction();
void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
void OnBlobReady(const TBlobRange& range) noexcept;
bool OnFilterReady(TBatch& batchInfo);
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
index f9ff56172c0..509d4e1d82f 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
@@ -11,6 +11,13 @@ protected:
i64 BlobsSize = 0;
const NColumnShard::TConcreteScanCounters Counters;
public:
+ TString DebugString() const { // Builds a short text summary of the controller's container sizes.
+ return TStringBuilder()
+ << "to_out:" << GranulesToOut.size() << ";" // Number of entries in GranulesToOut.
+ << "ready:" << ReadyGranulesAccumulator.size() << ";" // Number of entries in ReadyGranulesAccumulator.
+ ;
+ }
+
TResultController(const NColumnShard::TConcreteScanCounters& counters)
: Counters(counters)
{
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
index f4eb1186188..76012f89ff1 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
@@ -47,13 +47,13 @@ TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId)
return result;
}
-TGranule::TPtr TProcessingController::GetGranuleVerified(const ui64 granuleId) {
+TGranule::TPtr TProcessingController::GetGranuleVerified(const ui64 granuleId) const {
auto it = GranulesWaiting.find(granuleId);
Y_VERIFY(it != GranulesWaiting.end());
return it->second;
}
-TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) {
+TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) const {
auto itGranule = GranulesWaiting.find(granuleId);
if (itGranule == GranulesWaiting.end()) {
return nullptr;
@@ -63,6 +63,7 @@ TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) {
TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) {
Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second);
+ ++OriginalGranulesCount;
return g;
}
@@ -70,6 +71,7 @@ void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlo
Counters.Aggregations->AddGranuleProcessingBytes(range.Size);
if (GranulesInProcessing.emplace(granuleId).second) {
if (granuleId) {
+ GetGranuleVerified(granuleId)->StartConstruction();
Y_VERIFY(GranulesWaiting.contains(granuleId));
Counters.Aggregations->AddGranuleProcessing();
}
@@ -81,12 +83,23 @@ void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlo
}
void TProcessingController::Abort() {
+ NotIndexedBatchesInitialized = true;
GranulesWaiting.clear();
GranulesInProcessing.clear();
Counters.Aggregations->RemoveGranuleProcessingInfo(BlobsSize);
BlobsSize = 0;
}
+TString TProcessingController::DebugString() const { // Builds a semicolon-separated key:value summary of the controller state.
+ return TStringBuilder()
+ << "waiting:" << GranulesWaiting.size() << ";" // Number of entries in GranulesWaiting.
+ << "in_progress:" << GranulesInProcessing.size() << ";" // Number of entries in GranulesInProcessing.
+ << "original_waiting:" << OriginalGranulesCount << ";" // Value of OriginalGranulesCount, which InsertGranule increments.
+ << "common_granules_data:" << CommonGranuleData << ";" // Value of the CommonGranuleData member.
+ << "common_initialized:" << NotIndexedBatchesInitialized << ";" // Value of the NotIndexedBatchesInitialized flag.
+ ;
+}
+
NKikimr::NOlap::NIndexedReader::TBatch& TProcessingController::GetBatchInfoVerified(const TBatchAddress& address) {
NIndexedReader::TBatch* bInfo = GetBatchInfo(address);
Y_VERIFY(bInfo);
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h
index de40c312a27..587eee7b553 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h
@@ -7,21 +7,36 @@ namespace NKikimr::NOlap::NIndexedReader {
class TProcessingController {
private:
THashMap<ui64, TGranule::TPtr> GranulesWaiting;
+ ui32 OriginalGranulesCount = 0;
+ ui64 CommonGranuleData = 0;
std::set<ui64> GranulesInProcessing;
i64 BlobsSize = 0;
bool NotIndexedBatchesInitialized = false;
const NColumnShard::TConcreteScanCounters Counters;
public:
+ TString DebugString() const;
+ bool IsGranuleActualForProcessing(const ui64 granuleId) const { // Tells whether blobs of this granule are still worth fetching.
+ return GranulesWaiting.contains(granuleId) || (granuleId == 0 && !NotIndexedBatchesInitialized); // True for a waiting granule, or for id 0 while NotIndexedBatchesInitialized is still false.
+ }
+
TProcessingController(const NColumnShard::TConcreteScanCounters& counters)
: Counters(counters)
{
}
+ ~TProcessingController() { // Destruction runs the same cleanup as an explicit Abort().
+ Abort();
+ }
+
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address);
+ const std::set<ui64>& GetProcessingGranules() const { // Read-only access to the GranulesInProcessing set, returned by reference.
+ return GranulesInProcessing;
+ }
+
ui32 GetProcessingGranulesCount() const {
return GranulesInProcessing.size();
}
@@ -44,13 +59,13 @@ public:
TGranule::TPtr ExtractReadyVerified(const ui64 granuleId);
- TGranule::TPtr GetGranuleVerified(const ui64 granuleId);
+ TGranule::TPtr GetGranuleVerified(const ui64 granuleId) const;
- bool IsFinished() const { return GranulesWaiting.empty(); }
+ bool IsFinished() const { return GranulesWaiting.empty() && NotIndexedBatchesInitialized; }
TGranule::TPtr InsertGranule(TGranule::TPtr g);
- TGranule::TPtr GetGranule(const ui64 granuleId);
+ TGranule::TPtr GetGranule(const ui64 granuleId) const;
};
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 677a3bdf3b9..8d595d778c3 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -222,6 +222,7 @@ public:
bool NeedCompaction(const TCompactionLimits& limits) const {
if (InCompaction() || Empty()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_skipped_by_state")("granule_id", GetGranuleId())("granule_size", Size());
return false;
}
return NeedSplitCompaction(limits) || NeedInternalCompaction(limits);
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 93e1049a1e8..fb5d78cacab 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -160,7 +160,7 @@ public:
ctx.Send(SelfId(), new TEvents::TEvPoisonPill());
} else {
while (IndexedData.HasMoreBlobs()) {
- const auto blobRange = IndexedData.ExtractNextBlob();
+ const auto blobRange = IndexedData.ExtractNextBlob(false);
SendReadRequest(ctx, blobRange);
}
BuildResult(ctx);