diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-21 17:47:19 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-21 17:47:19 +0300 |
commit | dc2d2c099383b5e10d74ffdcb8e26d784ed8bcd2 (patch) | |
tree | 517ed44af9d76bdee41b5982f12fdee1571bb569 | |
parent | af3b9b45cc3b7781373eb55e93f0db0cf5c19d1e (diff) | |
download | ydb-dc2d2c099383b5e10d74ffdcb8e26d784ed8bcd2.tar.gz |
fix validations and new signals
12 files changed, 52 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 46fc3fc0b4..56d73d1747 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -66,7 +66,7 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { } NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() { - return IndexedData.NextBlob(); + return IndexedData.ExtractNextBlob(); } void TColumnShardScanIterator::FillReadyResults() { diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 2ae0630759..073d771917 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -87,7 +87,9 @@ public: , ReadMetadataRanges(std::move(readMetadataList)) , ReadMetadataIndex(0) , Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)) - , ScanCountersPool(scanCountersPool) { + , ScanCountersPool(scanCountersPool) + , Stats(ScanCountersPool) + { KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema(); } @@ -211,7 +213,6 @@ private: "Scan " << ScanActorId << " blobs response:" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); --InFlightReads; - auto& event = *ev->Get(); const auto& blobRange = event.BlobRange; ScanCountersPool.Aggregations->RemoveFlightReadInfo(blobRange.Size); @@ -356,8 +357,7 @@ private: NextReadMetadata(); } - const size_t MIN_READY_RESULTS_IN_QUEUE = 3; - if (ScanIterator && ScanIterator->ReadyResultsCount() < MIN_READY_RESULTS_IN_QUEUE) { + if (ScanIterator) { // Make read-ahead requests for the subsequent blobs ReadNextBlob(); } @@ -609,12 +609,19 @@ private: ui64 Bytes = 0; TDuration ReadingDurationSum; TDuration ReadingDurationMax; + NMonitoring::THistogramPtr DurationsCounter; public: + TBlobStats(const NMonitoring::THistogramPtr durationsCounter) + : DurationsCounter(durationsCounter) + { + + } void Received(const NBlobCache::TBlobRange& br, const TDuration d) { ReadingDurationSum += d; ReadingDurationMax = Max(ReadingDurationMax, d); ++PartsCount; Bytes += br.Size; + DurationsCounter->Collect(d.MilliSeconds()); } TString DebugString() const { TStringBuilder sb; @@ -645,6 +652,13 @@ private: THashMap<TString, TInstant> SectionLast; public: + TScanStats(const TConcreteScanCounters& counters) + : CacheBlobs(counters.HistogramCacheBlobsDuration) + , MissBlobs(counters.HistogramMissCacheBlobsDuration) + { + + } + TString DebugString() const { const TInstant now = TInstant::Now(); TStringBuilder sb; diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index 43331d18b3..cdfee85a6f 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -30,6 +30,9 @@ TScanCounters::TScanCounters(const TString& module) , TwoPhasesFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesFilterUsefulBytes")) , TwoPhasesPostFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesPostFilterFetchedBytes")) , TwoPhasesPostFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesPostFilterUsefulBytes")) + , HistogramCacheBlobsDuration(TBase::GetHistogram("CacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2))) + , HistogramMissCacheBlobsDuration(TBase::GetHistogram("MissCacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2))) + { } diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index 60f0365cb3..a28672c841 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -116,6 +116,9 @@ public: NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterFetchedBytes; NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes; + NMonitoring::THistogramPtr HistogramCacheBlobsDuration; + NMonitoring::THistogramPtr HistogramMissCacheBlobsDuration; + TScanCounters(const TString& module = "Scan"); void OnProcessingOverloaded() { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 49d7454ce7..1c6951a0bf 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -464,4 +464,18 @@ void TIndexedReadData::Abort() { GranulesContext->Abort(); } +NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() { + Y_VERIFY(GranulesContext); + auto* f = FetchBlobsQueue.front(); + if (!f) { + return TBlobRange(); + } + if (!f->GetGranuleId() || GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { + return FetchBlobsQueue.pop_front(); + } else { + Counters.OnProcessingOverloaded(); + return TBlobRange(); + } +} + } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 44bc085053..8e26fc05e9 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -94,19 +94,7 @@ public: return FetchBlobsQueue.size(); } - TBlobRange NextBlob() { - Y_VERIFY(GranulesContext); - auto* f = FetchBlobsQueue.front(); - if (!f) { - return TBlobRange(); - } - if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { - return FetchBlobsQueue.pop_front(); - } else { - Counters.OnProcessingOverloaded(); - return TBlobRange(); - } - } + TBlobRange ExtractNextBlob(); private: std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index a93ecf5e35..b91877b993 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -87,7 +87,7 @@ std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingCont const ui32 sizeBefore = Result.GetCount(); auto result = SortingPolicy->DetachReadyGranules(Result); if (sizeBefore == Result.GetCount()) { - Y_VERIFY(InternalReading || CheckBufferAvailable()); + Y_VERIFY(InternalReading || CheckBufferAvailable() || Processing.GetProcessingGranulesCount()); } return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index b599068bc8..c8d2ea4963 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -29,7 +29,7 @@ private: bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; static constexpr ui32 GranulesCountProcessingLimit = 16; - static constexpr ui64 ExpectedBytesForGranule = 200 * 1024 * 1024; + static constexpr ui64 ExpectedBytesForGranule = 50 * 1024 * 1024; static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule; bool CheckBufferAvailable() const; public: diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp index d843ae7301..3c31ff4a9a 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp @@ -62,13 +62,15 @@ TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) { TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) { Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second); - Counters.Aggregations->AddGranuleProcessing(); return g; } void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) { Counters.Aggregations->AddGranuleProcessingBytes(range.Size); - GranulesInProcessing.emplace(granuleId); + if (GranulesInProcessing.emplace(granuleId).second) { + Y_VERIFY(GranulesWaiting.contains(granuleId)); + Counters.Aggregations->AddGranuleProcessing(); + } BlobsSize += range.Size; } diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h index 4514010abc..ba8f3d22d4 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.h +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h @@ -21,6 +21,10 @@ public: NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); + ui32 GetProcessingGranulesCount() const { + return GranulesInProcessing.size(); + } + bool IsInProgress(const ui64 granuleId) const { return GranulesInProcessing.contains(granuleId); } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 3b379bb2ad..ca48cf66d5 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -174,7 +174,7 @@ public: IndexedData.InitRead(notIndexed); while (IndexedData.HasMoreBlobs()) { - const auto blobRange = IndexedData.NextBlob(); + const auto blobRange = IndexedData.ExtractNextBlob(); WaitIndexed.insert(blobRange); IndexedBlobs.emplace(blobRange); } diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp index 2f7317b5a3..9d65cfb738 100644 --- a/ydb/core/tx/conveyor/service/service.cpp +++ b/ydb/core/tx/conveyor/service/service.cpp @@ -23,6 +23,7 @@ void TDistributor::Bootstrap() { for (ui32 i = 0; i < workersCount; ++i) { Workers.emplace_back(Register(new TWorker())); } + Counters.AvailableWorkersCount->Set(Workers.size()); Counters.WorkersCountLimit->Set(Workers.size()); Counters.WaitingQueueSizeLimit->Set(Config.GetQueueSizeLimit()); Become(&TDistributor::StateMain); |