aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-21 17:47:19 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-21 17:47:19 +0300
commitdc2d2c099383b5e10d74ffdcb8e26d784ed8bcd2 (patch)
tree517ed44af9d76bdee41b5982f12fdee1571bb569
parentaf3b9b45cc3b7781373eb55e93f0db0cf5c19d1e (diff)
downloadydb-dc2d2c099383b5e10d74ffdcb8e26d784ed8bcd2.tar.gz
fix validations and new signals
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp22
-rw-r--r--ydb/core/tx/columnshard/counters/scan.cpp3
-rw-r--r--ydb/core/tx/columnshard/counters/scan.h3
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/processing_context.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/processing_context.h4
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp2
-rw-r--r--ydb/core/tx/conveyor/service/service.cpp1
12 files changed, 52 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 46fc3fc0b4..56d73d1747 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -66,7 +66,7 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
}
NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() {
- return IndexedData.NextBlob();
+ return IndexedData.ExtractNextBlob();
}
void TColumnShardScanIterator::FillReadyResults() {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 2ae0630759..073d771917 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -87,7 +87,9 @@ public:
, ReadMetadataRanges(std::move(readMetadataList))
, ReadMetadataIndex(0)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
- , ScanCountersPool(scanCountersPool) {
+ , ScanCountersPool(scanCountersPool)
+ , Stats(ScanCountersPool)
+ {
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
}
@@ -211,7 +213,6 @@ private:
"Scan " << ScanActorId << " blobs response:"
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
--InFlightReads;
-
auto& event = *ev->Get();
const auto& blobRange = event.BlobRange;
ScanCountersPool.Aggregations->RemoveFlightReadInfo(blobRange.Size);
@@ -356,8 +357,7 @@ private:
NextReadMetadata();
}
- const size_t MIN_READY_RESULTS_IN_QUEUE = 3;
- if (ScanIterator && ScanIterator->ReadyResultsCount() < MIN_READY_RESULTS_IN_QUEUE) {
+ if (ScanIterator) {
// Make read-ahead requests for the subsequent blobs
ReadNextBlob();
}
@@ -609,12 +609,19 @@ private:
ui64 Bytes = 0;
TDuration ReadingDurationSum;
TDuration ReadingDurationMax;
+ NMonitoring::THistogramPtr DurationsCounter;
public:
+ TBlobStats(const NMonitoring::THistogramPtr durationsCounter)
+ : DurationsCounter(durationsCounter)
+ {
+
+ }
void Received(const NBlobCache::TBlobRange& br, const TDuration d) {
ReadingDurationSum += d;
ReadingDurationMax = Max(ReadingDurationMax, d);
++PartsCount;
Bytes += br.Size;
+ DurationsCounter->Collect(d.MilliSeconds());
}
TString DebugString() const {
TStringBuilder sb;
@@ -645,6 +652,13 @@ private:
THashMap<TString, TInstant> SectionLast;
public:
+ TScanStats(const TConcreteScanCounters& counters)
+ : CacheBlobs(counters.HistogramCacheBlobsDuration)
+ , MissBlobs(counters.HistogramMissCacheBlobsDuration)
+ {
+
+ }
+
TString DebugString() const {
const TInstant now = TInstant::Now();
TStringBuilder sb;
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp
index 43331d18b3..cdfee85a6f 100644
--- a/ydb/core/tx/columnshard/counters/scan.cpp
+++ b/ydb/core/tx/columnshard/counters/scan.cpp
@@ -30,6 +30,9 @@ TScanCounters::TScanCounters(const TString& module)
, TwoPhasesFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesFilterUsefulBytes"))
, TwoPhasesPostFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesPostFilterFetchedBytes"))
, TwoPhasesPostFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesPostFilterUsefulBytes"))
+ , HistogramCacheBlobsDuration(TBase::GetHistogram("CacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2)))
+ , HistogramMissCacheBlobsDuration(TBase::GetHistogram("MissCacheBlobsDurationMs", NMonitoring::ExponentialHistogram(12, 2)))
+
{
}
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index 60f0365cb3..a28672c841 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -116,6 +116,9 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterFetchedBytes;
NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes;
+ NMonitoring::THistogramPtr HistogramCacheBlobsDuration;
+ NMonitoring::THistogramPtr HistogramMissCacheBlobsDuration;
+
TScanCounters(const TString& module = "Scan");
void OnProcessingOverloaded() {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 49d7454ce7..1c6951a0bf 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -464,4 +464,18 @@ void TIndexedReadData::Abort() {
GranulesContext->Abort();
}
+NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() {
+ Y_VERIFY(GranulesContext);
+ auto* f = FetchBlobsQueue.front();
+ if (!f) {
+ return TBlobRange();
+ }
+ if (!f->GetGranuleId() || GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
+ return FetchBlobsQueue.pop_front();
+ } else {
+ Counters.OnProcessingOverloaded();
+ return TBlobRange();
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 44bc085053..8e26fc05e9 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -94,19 +94,7 @@ public:
return FetchBlobsQueue.size();
}
- TBlobRange NextBlob() {
- Y_VERIFY(GranulesContext);
- auto* f = FetchBlobsQueue.front();
- if (!f) {
- return TBlobRange();
- }
- if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
- return FetchBlobsQueue.pop_front();
- } else {
- Counters.OnProcessingOverloaded();
- return TBlobRange();
- }
- }
+ TBlobRange ExtractNextBlob();
private:
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index a93ecf5e35..b91877b993 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -87,7 +87,7 @@ std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingCont
const ui32 sizeBefore = Result.GetCount();
auto result = SortingPolicy->DetachReadyGranules(Result);
if (sizeBefore == Result.GetCount()) {
- Y_VERIFY(InternalReading || CheckBufferAvailable());
+ Y_VERIFY(InternalReading || CheckBufferAvailable() || Processing.GetProcessingGranulesCount());
}
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index b599068bc8..c8d2ea4963 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -29,7 +29,7 @@ private:
bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
static constexpr ui32 GranulesCountProcessingLimit = 16;
- static constexpr ui64 ExpectedBytesForGranule = 200 * 1024 * 1024;
+ static constexpr ui64 ExpectedBytesForGranule = 50 * 1024 * 1024;
static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule;
bool CheckBufferAvailable() const;
public:
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
index d843ae7301..3c31ff4a9a 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
@@ -62,13 +62,15 @@ TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) {
TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) {
Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second);
- Counters.Aggregations->AddGranuleProcessing();
return g;
}
void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) {
Counters.Aggregations->AddGranuleProcessingBytes(range.Size);
- GranulesInProcessing.emplace(granuleId);
+ if (GranulesInProcessing.emplace(granuleId).second) {
+ Y_VERIFY(GranulesWaiting.contains(granuleId));
+ Counters.Aggregations->AddGranuleProcessing();
+ }
BlobsSize += range.Size;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h
index 4514010abc..ba8f3d22d4 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h
@@ -21,6 +21,10 @@ public:
NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
+ ui32 GetProcessingGranulesCount() const {
+ return GranulesInProcessing.size();
+ }
+
bool IsInProgress(const ui64 granuleId) const {
return GranulesInProcessing.contains(granuleId);
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 3b379bb2ad..ca48cf66d5 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -174,7 +174,7 @@ public:
IndexedData.InitRead(notIndexed);
while (IndexedData.HasMoreBlobs()) {
- const auto blobRange = IndexedData.NextBlob();
+ const auto blobRange = IndexedData.ExtractNextBlob();
WaitIndexed.insert(blobRange);
IndexedBlobs.emplace(blobRange);
}
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
index 2f7317b5a3..9d65cfb738 100644
--- a/ydb/core/tx/conveyor/service/service.cpp
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -23,6 +23,7 @@ void TDistributor::Bootstrap() {
for (ui32 i = 0; i < workersCount; ++i) {
Workers.emplace_back(Register(new TWorker()));
}
+ Counters.AvailableWorkersCount->Set(Workers.size());
Counters.WorkersCountLimit->Set(Workers.size());
Counters.WaitingQueueSizeLimit->Set(Config.GetQueueSizeLimit());
Become(&TDistributor::StateMain);