diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-01 18:39:23 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-01 18:39:23 +0300 |
commit | 46348e368ed43863ad93eb203f05b920103b60d1 (patch) | |
tree | a2de182de03cd78989a1961a4725a57b5637108d | |
parent | 53a0ced5335b33e79843e7ef3566348e257a2449 (diff) | |
download | ydb-46348e368ed43863ad93eb203f05b920103b60d1.tar.gz |
corrections before memory guards usage
25 files changed, 189 insertions, 57 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index cb5fd9b6df9..c73a20a2332 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -28,7 +28,7 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { } NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() { - return IndexedData.ExtractNextBlob(); + return IndexedData.ExtractNextBlob(ReadyResults.size()); } void TColumnShardScanIterator::FillReadyResults() { diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 9e295cc3d79..51b35b25aab 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -81,7 +81,6 @@ private: TReadyResults ReadyResults; NOlap::TReadMetadata::TConstPtr ReadMetadata; NOlap::TIndexedReadData IndexedData; - std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted; ui64 ItemsRead = 0; const i64 MaxRowsInBatch = 5000; public: @@ -94,6 +93,7 @@ public: virtual TString DebugString() const override { return TStringBuilder() + << "ready_results:(" << ReadyResults.DebugString() << ");" << "indexed_data:(" << IndexedData.DebugString() << ")" ; } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index c726d849ca5..791e4217323 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -200,7 +200,9 @@ private: ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount); ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString()); - + if (ScanIterator && !!ScanIterator->GetAvailableResultsCount() && !*ScanIterator->GetAvailableResultsCount()) { + ScanCountersPool.OnEmptyAck(); + } ContinueProcessing(); } @@ -347,11 +349,11 @@ private: } } ScanCountersPool.Hanging->Add(1); - Y_VERIFY_DEBUG(false); // The loop has finished without any progress! LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " is hanging" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + Y_VERIFY_DEBUG(false); } void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) noexcept { diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt index 90f970a4a6f..a143f2cfccc 100644 --- a/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,7 @@ target_link_libraries(columnshard-counters-common PUBLIC contrib-libs-cxxsupp yutil cpp-monlib-dynamic_counters + ydb-core-protos ydb-core-base ) target_sources(columnshard-counters-common PRIVATE @@ -19,4 +20,5 @@ target_sources(columnshard-counters-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt index ba3f0145091..18d467dd5ab 100644 --- a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ target_link_libraries(columnshard-counters-common PUBLIC contrib-libs-cxxsupp yutil cpp-monlib-dynamic_counters + ydb-core-protos ydb-core-base ) target_sources(columnshard-counters-common PRIVATE @@ -20,4 +21,5 @@ target_sources(columnshard-counters-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt index ba3f0145091..18d467dd5ab 100644 --- a/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(columnshard-counters-common PUBLIC contrib-libs-cxxsupp yutil cpp-monlib-dynamic_counters + ydb-core-protos ydb-core-base ) target_sources(columnshard-counters-common PRIVATE @@ -20,4 +21,5 @@ target_sources(columnshard-counters-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt index 90f970a4a6f..a143f2cfccc 100644 --- a/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/counters/common/CMakeLists.windows-x86_64.txt @@ -12,6 +12,7 @@ target_link_libraries(columnshard-counters-common PUBLIC contrib-libs-cxxsupp yutil cpp-monlib-dynamic_counters + ydb-core-protos ydb-core-base ) target_sources(columnshard-counters-common PRIVATE @@ -19,4 +20,5 @@ target_sources(columnshard-counters-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/client.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/owner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/private.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters/common/object_counter.cpp ) diff --git a/ydb/core/tx/columnshard/counters/common/object_counter.cpp b/ydb/core/tx/columnshard/counters/common/object_counter.cpp new file mode 100644 index 00000000000..a0dda125cb2 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/common/object_counter.cpp @@ -0,0 +1,4 @@ +#include "object_counter.h" + +namespace NKikimr::NColumnShard { +} diff --git a/ydb/core/tx/columnshard/counters/common/object_counter.h b/ydb/core/tx/columnshard/counters/common/object_counter.h index 2cef57b7a11..f5f7758d76c 100644 --- a/ydb/core/tx/columnshard/counters/common/object_counter.h +++ b/ydb/core/tx/columnshard/counters/common/object_counter.h @@ -5,7 +5,7 @@ namespace NKikimr::NColumnShard { -template <class TObject, bool UseSignals, bool UseLogs> +template <class TObject> class TMonitoringObjectsCounterImpl: public TCommonCountersOwner { private: using TBase = TCommonCountersOwner; @@ -33,14 +33,18 @@ public: } }; -template <class TObject, bool UseSignals, bool UseLogs> +template <class TObject, bool UseSignals = true, bool UseLogs = false> class TMonitoringObjectsCounter { private: static inline TAtomicCounter Counter = 0; public: + static inline TAtomicCounter GetCounter() { + return Counter.Val(); + } + TMonitoringObjectsCounter() { if (UseSignals) { - Singleton<TMonitoringObjectsCounterImpl<TObject, UseSignals, UseLogs>>()->Inc(); + Singleton<TMonitoringObjectsCounterImpl<TObject>>()->Inc(); } Counter.Inc(); if (UseLogs) { @@ -49,7 +53,7 @@ public: } ~TMonitoringObjectsCounter() { if (UseSignals) { - Singleton<TMonitoringObjectsCounterImpl<TObject, UseSignals, UseLogs>>()->Dec(); + Singleton<TMonitoringObjectsCounterImpl<TObject>>()->Dec(); } Counter.Dec(); if (UseLogs) { diff --git a/ydb/core/tx/columnshard/counters/common/ya.make b/ydb/core/tx/columnshard/counters/common/ya.make index 51dd3ceabfb..804699ec52b 100644 --- a/ydb/core/tx/columnshard/counters/common/ya.make +++ b/ydb/core/tx/columnshard/counters/common/ya.make @@ -5,10 +5,12 @@ SRCS( client.cpp owner.cpp private.cpp + object_counter.cpp ) PEERDIR( library/cpp/monlib/dynamic_counters + ydb/core/protos ydb/core/base ) diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index 68fd73c8674..763008acb09 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -12,6 +12,7 @@ TScanCounters::TScanCounters(const TString& module) , PriorityFetchCount(TBase::GetDeriviative("PriorityFetch/Count")) , GeneralFetchBytes(TBase::GetDeriviative("GeneralFetch/Bytes")) , GeneralFetchCount(TBase::GetDeriviative("GeneralFetch/Count")) + , NoResultsAckRequest(TBase::GetDeriviative("NoResultsAckRequest")) , PortionBytes(TBase::GetDeriviative("PortionBytes")) , FilterBytes(TBase::GetDeriviative("FilterBytes")) , PostFilterBytes(TBase::GetDeriviative("PostFilterBytes")) diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index f905b001d1f..bad3bf50b37 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -97,6 +97,8 @@ private: NMonitoring::TDynamicCounters::TCounterPtr PriorityFetchCount; NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchBytes; NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchCount; + + NMonitoring::TDynamicCounters::TCounterPtr NoResultsAckRequest; public: NMonitoring::TDynamicCounters::TCounterPtr PortionBytes; NMonitoring::TDynamicCounters::TCounterPtr FilterBytes; @@ -130,6 +132,10 @@ public: TScanCounters(const TString& module = "Scan"); + void OnEmptyAck() const { + NoResultsAckRequest->Add(1); + } + void OnPriorityFetch(const ui64 size) const { PriorityFetchBytes->Add(size); PriorityFetchCount->Add(1); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index cdd06250f34..298530843cb 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -104,7 +104,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { - Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); + Y_VERIFY(IndexedBlobSubscriber.emplace(range, batch.GetBatchAddress()).second); if (batch.GetFetchedInfo().GetFilter()) { Context.GetCounters().PostFilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; @@ -130,6 +130,9 @@ void TIndexedReadData::RegisterZeroGranula() { } void TIndexedReadData::InitRead() { + Y_VERIFY(!GranulesContext); + GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); + RegisterZeroGranula(); auto& indexInfo = ReadMetadata->GetIndexInfo(); @@ -138,8 +141,6 @@ void TIndexedReadData::InitRead() { SortReplaceDescription = indexInfo.SortReplaceDescription(); - Y_VERIFY(!GranulesContext); - GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); ui64 portionsBytes = 0; std::set<ui64> granulesReady; ui64 prevGranule = 0; @@ -174,7 +175,11 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da { auto it = IndexedBlobSubscriber.find(blobRange); Y_VERIFY(it != IndexedBlobSubscriber.end()); - portionBatch = it->second; + portionBatch = GranulesContext->GetBatchInfo(it->second); + if (!portionBatch) { + ACFL_INFO("event", "batch for finished granule")("address", it->second.ToString()); + return; + } IndexedBlobSubscriber.erase(it); } if (!portionBatch->AddIndexedReady(blobRange, data)) { @@ -450,30 +455,40 @@ void TIndexedReadData::Abort() { FetchBlobsQueue.Stop(); PriorityBlobsQueue.Stop(); GranulesContext->Abort(); + IndexedBlobSubscriber.clear(); + WaitCommitted.clear(); } -NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() { +NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob(const bool hasReadyResults) { Y_VERIFY(GranulesContext); - { - auto* f = PriorityBlobsQueue.front(); - if (f) { - GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange()); - Context.GetCounters().OnPriorityFetch(f->GetRange().Size); - return PriorityBlobsQueue.pop_front(); + while (auto* f = PriorityBlobsQueue.front()) { + if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) { + ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId()); + PriorityBlobsQueue.pop_front(); + continue; } - } - auto* f = FetchBlobsQueue.front(); - if (!f) { - return TBlobRange(); + GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange()); + Context.GetCounters().OnPriorityFetch(f->GetRange().Size); + return PriorityBlobsQueue.pop_front(); } - if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { - Context.GetCounters().OnGeneralFetch(f->GetRange().Size); - return FetchBlobsQueue.pop_front(); - } else { - Context.GetCounters().OnProcessingOverloaded(); - return TBlobRange(); + + while (auto* f = FetchBlobsQueue.front()) { + if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) { + ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId()); + FetchBlobsQueue.pop_front(); + continue; + } + + if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange(), hasReadyResults)) { + Context.GetCounters().OnGeneralFetch(f->GetRange().Size); + return FetchBlobsQueue.pop_front(); + } else { + Context.GetCounters().OnProcessingOverloaded(); + return TBlobRange(); + } } + return TBlobRange(); } void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) { @@ -491,4 +506,16 @@ void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::sh WaitCommitted.erase(it); } +void TIndexedReadData::AddData(const TBlobRange& blobRange, const TString& data) { + if (GranulesContext->IsFinished()) { + ACFL_DEBUG("event", "AddData on GranulesContextFinished"); + return; + } + if (IsIndexedBlob(blobRange)) { + AddIndexed(blobRange, data); + } else { + AddNotIndexed(blobRange, data); + } +} + } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 3638ebd0d7f..8d058979351 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -29,7 +29,7 @@ private: bool OnePhaseReadMode = false; std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; + THashMap<TBlobRange, NIndexedReader::TBatchAddress> IndexedBlobSubscriber; std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; @@ -72,13 +72,7 @@ public: /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK) std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); - void AddData(const TBlobRange& blobRange, const TString& data) { - if (IsIndexedBlob(blobRange)) { - AddIndexed(blobRange, data); - } else { - AddNotIndexed(blobRange, data); - } - } + void AddData(const TBlobRange& blobRange, const TString& data); NOlap::TReadMetadata::TConstPtr GetReadMetadata() const { return ReadMetadata; @@ -99,7 +93,7 @@ public: return FetchBlobsQueue.size() || PriorityBlobsQueue.size(); } - TBlobRange ExtractNextBlob(); + TBlobRange ExtractNextBlob(const bool hasReadyResults); private: std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index d79e67dc519..d1fe6384411 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -248,8 +248,9 @@ struct TPortionInfo { sum += numRows * TIndexInfo::GetSpecialColumnByteWidth(i); } else { auto it = Meta.ColumnMeta.find(i); - Y_VERIFY(it != Meta.ColumnMeta.end()); - sum += it->second.RawBytes; + if (it != Meta.ColumnMeta.end()) { + sum += it->second.RawBytes; + } } } return sum; diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 40969f6e91b..a380ce82453 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -69,7 +69,7 @@ public: } }; -class TBatch { +class TBatch: TNonCopyable { private: const TBatchAddress BatchAddress; YDB_READONLY(ui64, Portion, 0); diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 7aeed88a99d..93eb1debc38 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -10,6 +10,7 @@ TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMe , InternalReading(internalReading) , Processing(owner.GetCounters()) , Result(owner.GetCounters()) + , GranulesLiveContext(std::make_shared<TGranulesLiveControl>()) , Owner(owner) , Counters(owner.GetCounters()) { @@ -64,9 +65,9 @@ void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ Processing.DrainNotIndexedBatches(batches); } -bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range) { +bool TGranulesFillingContext::TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range, const bool hasReadyResults) { Y_VERIFY_DEBUG(!Result.IsReady(granuleId)); - if (InternalReading || Processing.IsInProgress(granuleId)) { + if (InternalReading || Processing.IsInProgress(granuleId) || (!hasReadyResults && !GranulesLiveContext->GetCount())) { Processing.StartBlobProcessing(granuleId, range); return true; } else if (CheckBufferAvailable()) { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 64f5f60fed6..f9f7a45d78d 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -3,6 +3,7 @@ #include "granule.h" #include "processing_context.h" #include "order_control/abstract.h" +#include <ydb/core/tx/columnshard/counters/common/object_counter.h> #include <util/generic/hash.h> namespace NKikimr::NOlap { @@ -11,7 +12,7 @@ class TIndexedReadData; namespace NKikimr::NOlap::NIndexedReader { -class TGranulesFillingContext: TNonCopyable { +class TGranulesFillingContext: TNonCopyable, public NColumnShard::TMonitoringObjectsCounter<TGranulesFillingContext, true, false> { private: YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames); TReadMetadata::TConstPtr ReadMetadata; @@ -19,6 +20,7 @@ private: const bool InternalReading = false; TProcessingController Processing; TResultController Result; + std::shared_ptr<TGranulesLiveControl> GranulesLiveContext; TIndexedReadData& Owner; std::set<ui32> EarlyFilterColumns; std::set<ui32> PostFilterColumns; @@ -33,12 +35,20 @@ private: static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule; bool CheckBufferAvailable() const; public: + std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const { + return GranulesLiveContext; + } + bool IsGranuleActualForProcessing(const ui64 granuleId) const { + return Processing.IsGranuleActualForProcessing(granuleId); + } bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range); - bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range); + bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range, const bool hasReadyResults); TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading); TString DebugString() const { return TStringBuilder() + << "processing:(" << Processing.DebugString() << ");" + << "result:(" << Result.DebugString() << ");" << "sorting_policy:(" << SortingPolicy->DebugString() << ");" ; } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 8b7657f4e09..f641b543580 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -35,7 +35,7 @@ NKikimr::NOlap::NIndexedReader::TBatch& TGranule::RegisterBatchForFetching(const Y_VERIFY(!ReadyFlag); ui32 batchGranuleIdx = Batches.size(); WaitBatches.emplace(batchGranuleIdx); - Batches.emplace_back(TBatch(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo)); + Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo); Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second); Owner->OnNewBatch(Batches.back()); return Batches.back(); @@ -98,7 +98,7 @@ void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { Y_VERIFY(!ReadyFlag); Y_VERIFY(!NotIndexedBatchReadyFlag || !batch); if (!NotIndexedBatchReadyFlag) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size()); + ACFL_TRACE("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size()); } else { return; } @@ -124,6 +124,7 @@ void TGranule::CheckReady() { } void TGranule::OnBlobReady(const TBlobRange& range) noexcept { + Y_VERIFY(InConstruction); if (Owner->GetSortingPolicy()->CanInterrupt() && ReadyFlag) { return; } @@ -132,4 +133,23 @@ void TGranule::OnBlobReady(const TBlobRange& range) noexcept { Owner->OnBlobReady(GranuleId, range); } +TGranule::~TGranule() { + if (InConstruction) { + LiveController->Dec(); + } +} + +TGranule::TGranule(const ui64 granuleId, TGranulesFillingContext& owner) + : GranuleId(granuleId) + , LiveController(owner.GetGranulesLiveContext()) + , Owner(&owner) +{ + +} + +void TGranule::StartConstruction() { + InConstruction = true; + LiveController->Inc(); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 8fe3ca8adf5..5a233427220 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -10,6 +10,21 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranulesFillingContext; +class TGranulesLiveControl { +private: + TAtomicCounter GranulesCounter = 0; +public: + i64 GetCount() const { + return GranulesCounter.Val(); + } + + void Inc() { + GranulesCounter.Inc(); + } + void Dec() { + Y_VERIFY(GranulesCounter.Dec() >= 0); + } +}; class TGranule { public: @@ -18,6 +33,7 @@ private: ui64 GranuleId = 0; bool NotIndexedBatchReadyFlag = false; + bool InConstruction = false; std::shared_ptr<arrow::RecordBatch> NotIndexedBatch; std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter; @@ -27,15 +43,14 @@ private: std::deque<TBatch> Batches; std::set<ui32> WaitBatches; std::set<ui32> GranuleBatchNumbers; + std::shared_ptr<TGranulesLiveControl> LiveController; TGranulesFillingContext* Owner = nullptr; THashSet<const void*> BatchesToDedup; ui64 BlobsDataSize = 0; void CheckReady(); public: - TGranule(const ui64 granuleId, TGranulesFillingContext& owner) - : GranuleId(granuleId) - , Owner(&owner) { - } + TGranule(const ui64 granuleId, TGranulesFillingContext& owner); + ~TGranule(); ui64 GetBlobsDataSize() const noexcept { return BlobsDataSize; @@ -131,6 +146,7 @@ public: std::deque<TBatchForMerge> SortBatchesByPK(const bool reverse, TReadMetadata::TConstPtr readMetadata); const std::set<ui32>& GetEarlyFilterColumns() const; + void StartConstruction(); void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); void OnBlobReady(const TBlobRange& range) noexcept; bool OnFilterReady(TBatch& batchInfo); diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h index f9ff56172c0..509d4e1d82f 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.h @@ -11,6 +11,13 @@ protected: i64 BlobsSize = 0; const NColumnShard::TConcreteScanCounters Counters; public: + TString DebugString() const { + return TStringBuilder() + << "to_out:" << GranulesToOut.size() << ";" + << "ready:" << ReadyGranulesAccumulator.size() << ";" + ; + } + TResultController(const NColumnShard::TConcreteScanCounters& counters) : Counters(counters) { diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp index f4eb1186188..76012f89ff1 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp @@ -47,13 +47,13 @@ TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId) return result; } -TGranule::TPtr TProcessingController::GetGranuleVerified(const ui64 granuleId) { +TGranule::TPtr TProcessingController::GetGranuleVerified(const ui64 granuleId) const { auto it = GranulesWaiting.find(granuleId); Y_VERIFY(it != GranulesWaiting.end()); return it->second; } -TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) { +TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) const { auto itGranule = GranulesWaiting.find(granuleId); if (itGranule == GranulesWaiting.end()) { return nullptr; @@ -63,6 +63,7 @@ TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) { TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) { Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second); + ++OriginalGranulesCount; return g; } @@ -70,6 +71,7 @@ void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlo Counters.Aggregations->AddGranuleProcessingBytes(range.Size); if (GranulesInProcessing.emplace(granuleId).second) { if (granuleId) { + GetGranuleVerified(granuleId)->StartConstruction(); Y_VERIFY(GranulesWaiting.contains(granuleId)); Counters.Aggregations->AddGranuleProcessing(); } @@ -81,12 +83,23 @@ void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlo } void TProcessingController::Abort() { + NotIndexedBatchesInitialized = true; GranulesWaiting.clear(); GranulesInProcessing.clear(); Counters.Aggregations->RemoveGranuleProcessingInfo(BlobsSize); BlobsSize = 0; } +TString TProcessingController::DebugString() const { + return TStringBuilder() + << "waiting:" << GranulesWaiting.size() << ";" + << "in_progress:" << GranulesInProcessing.size() << ";" + << "original_waiting:" << OriginalGranulesCount << ";" + << "common_granules_data:" << CommonGranuleData << ";" + << "common_initialized:" << NotIndexedBatchesInitialized << ";" + ; +} + NKikimr::NOlap::NIndexedReader::TBatch& TProcessingController::GetBatchInfoVerified(const TBatchAddress& address) { NIndexedReader::TBatch* bInfo = GetBatchInfo(address); Y_VERIFY(bInfo); diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h index de40c312a27..587eee7b553 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.h +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h @@ -7,21 +7,36 @@ namespace NKikimr::NOlap::NIndexedReader { class TProcessingController { private: THashMap<ui64, TGranule::TPtr> GranulesWaiting; + ui32 OriginalGranulesCount = 0; + ui64 CommonGranuleData = 0; std::set<ui64> GranulesInProcessing; i64 BlobsSize = 0; bool NotIndexedBatchesInitialized = false; const NColumnShard::TConcreteScanCounters Counters; public: + TString DebugString() const; + bool IsGranuleActualForProcessing(const ui64 granuleId) const { + return GranulesWaiting.contains(granuleId) || (granuleId == 0 && !NotIndexedBatchesInitialized); + } + TProcessingController(const NColumnShard::TConcreteScanCounters& counters) : Counters(counters) { } + ~TProcessingController() { + Abort(); + } + void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address); + const std::set<ui64>& GetProcessingGranules() const { + return GranulesInProcessing; + } + ui32 GetProcessingGranulesCount() const { return GranulesInProcessing.size(); } @@ -44,13 +59,13 @@ public: TGranule::TPtr ExtractReadyVerified(const ui64 granuleId); - TGranule::TPtr GetGranuleVerified(const ui64 granuleId); + TGranule::TPtr GetGranuleVerified(const ui64 granuleId) const; - bool IsFinished() const { return GranulesWaiting.empty(); } + bool IsFinished() const { return GranulesWaiting.empty() && NotIndexedBatchesInitialized; } TGranule::TPtr InsertGranule(TGranule::TPtr g); - TGranule::TPtr GetGranule(const ui64 granuleId); + TGranule::TPtr GetGranule(const ui64 granuleId) const; }; diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 677a3bdf3b9..8d595d778c3 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -222,6 +222,7 @@ public: bool NeedCompaction(const TCompactionLimits& limits) const { if (InCompaction() || Empty()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_skipped_by_state")("granule_id", GetGranuleId())("granule_size", Size()); return false; } return NeedSplitCompaction(limits) || NeedInternalCompaction(limits); diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 93e1049a1e8..fb5d78cacab 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -160,7 +160,7 @@ public: ctx.Send(SelfId(), new TEvents::TEvPoisonPill()); } else { while (IndexedData.HasMoreBlobs()) { - const auto blobRange = IndexedData.ExtractNextBlob(); + const auto blobRange = IndexedData.ExtractNextBlob(false); SendReadRequest(ctx, blobRange); } BuildResult(ctx); |