about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-06-29 20:02:40 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-06-29 20:02:40 +0300
commit f5e6392913227cb1a4299c7722690a3d64616255 (patch)
tree 78d2605d1fd0d088d580f3ad69a3c89097c80237
parent 48ac40272844f8e875067d7e785b879304bdb9ce (diff)
download ydb-f5e6392913227cb1a4299c7722690a3d64616255.tar.gz
use different queues instead of front/back
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.h 17
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 23
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filling_context.h 8
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/processing_context.cpp 16
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/processing_context.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/queue.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/queue.h 1
10 files changed, 74 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp
index ec06d0ccdf8..68fd73c8674 100644
--- a/ydb/core/tx/columnshard/counters/scan.cpp
+++ b/ydb/core/tx/columnshard/counters/scan.cpp
@@ -8,6 +8,10 @@ TScanCounters::TScanCounters(const TString& module)
: TBase(module)
, ProcessingOverload(TBase::GetDeriviative("ProcessingOverload"))
, ReadingOverload(TBase::GetDeriviative("ReadingOverload"))
+ , PriorityFetchBytes(TBase::GetDeriviative("PriorityFetch/Bytes"))
+ , PriorityFetchCount(TBase::GetDeriviative("PriorityFetch/Count"))
+ , GeneralFetchBytes(TBase::GetDeriviative("GeneralFetch/Bytes"))
+ , GeneralFetchCount(TBase::GetDeriviative("GeneralFetch/Count"))
, PortionBytes(TBase::GetDeriviative("PortionBytes"))
, FilterBytes(TBase::GetDeriviative("FilterBytes"))
, PostFilterBytes(TBase::GetDeriviative("PostFilterBytes"))
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index 1e1597e533a..f905b001d1f 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -92,6 +92,11 @@ private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr ProcessingOverload;
NMonitoring::TDynamicCounters::TCounterPtr ReadingOverload;
+
+ NMonitoring::TDynamicCounters::TCounterPtr PriorityFetchBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr PriorityFetchCount;
+ NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchCount;
public:
NMonitoring::TDynamicCounters::TCounterPtr PortionBytes;
NMonitoring::TDynamicCounters::TCounterPtr FilterBytes;
@@ -125,7 +130,17 @@ public:
TScanCounters(const TString& module = "Scan");
- void OnProcessingOverloaded() {
+ void OnPriorityFetch(const ui64 size) const {
+ PriorityFetchBytes->Add(size);
+ PriorityFetchCount->Add(1);
+ }
+
+ void OnGeneralFetch(const ui64 size) const {
+ GeneralFetchBytes->Add(size);
+ GeneralFetchCount->Add(1);
+ }
+
+ void OnProcessingOverloaded() const {
ProcessingOverload->Add(1);
}
void OnReadingOverloaded() const {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index d54f79f9b62..90758cc318a 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -109,7 +109,7 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
if (batch.GetFetchedInfo().GetFilter()) {
Counters.PostFilterBytes->Add(range.Size);
ReadMetadata->ReadStats->DataAdditionalBytes += range.Size;
- FetchBlobsQueue.emplace_front(batch.GetGranule(), range);
+ PriorityBlobsQueue.emplace_back(batch.GetGranule(), range);
} else {
Counters.FilterBytes->Add(range.Size);
ReadMetadata->ReadStats->DataFilterBytes += range.Size;
@@ -129,12 +129,18 @@ void TIndexedReadData::InitRead(ui32 inputBatch) {
Y_VERIFY(!GranulesContext);
GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode);
ui64 portionsBytes = 0;
+ std::set<ui64> granulesReady;
+ ui64 prevGranule = 0;
for (auto& portionInfo : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) {
portionsBytes += portionInfo.BlobsBytes();
Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata);
NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule);
granule->RegisterBatchForFetching(portionInfo);
+ if (prevGranule != portionInfo.Granule()) {
+ Y_VERIFY(granulesReady.emplace(portionInfo.Granule()).second);
+ prevGranule = portionInfo.Granule();
+ }
}
GranulesContext->PrepareForStart();
@@ -445,22 +451,33 @@ TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
bool TIndexedReadData::IsFinished() const {
Y_VERIFY(GranulesContext);
- return NotIndexed.empty() && FetchBlobsQueue.empty() && GranulesContext->IsFinished();
+ return NotIndexed.empty() && FetchBlobsQueue.empty() && PriorityBlobsQueue.empty() && GranulesContext->IsFinished();
}
void TIndexedReadData::Abort() {
Y_VERIFY(GranulesContext);
FetchBlobsQueue.Stop();
+ PriorityBlobsQueue.Stop();
GranulesContext->Abort();
}
NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() {
Y_VERIFY(GranulesContext);
+ {
+ auto* f = PriorityBlobsQueue.front();
+ if (f) {
+ GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange());
+ Counters.OnPriorityFetch(f->GetRange().Size);
+ return PriorityBlobsQueue.pop_front();
+ }
+ }
+
auto* f = FetchBlobsQueue.front();
if (!f) {
return TBlobRange();
}
- if (!f->GetGranuleId() || GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
+ if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
+ Counters.OnGeneralFetch(f->GetRange().Size);
return FetchBlobsQueue.pop_front();
} else {
Counters.OnProcessingOverloaded();
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 8e26fc05e97..3ea10230cf2 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -24,6 +24,7 @@ private:
NColumnShard::TConcreteScanCounters Counters;
NColumnShard::TDataTasksProcessorContainer TasksProcessor;
TFetchBlobsQueue FetchBlobsQueue;
+ TFetchBlobsQueue PriorityBlobsQueue;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
bool OnePhaseReadMode = false;
std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
@@ -87,11 +88,11 @@ public:
}
void AddBlobToFetchInFront(const ui64 granuleId, const TBlobRange& range) {
- FetchBlobsQueue.emplace_front(granuleId, range);
+ PriorityBlobsQueue.emplace_back(granuleId, range);
}
bool HasMoreBlobs() const {
- return FetchBlobsQueue.size();
+ return FetchBlobsQueue.size() || PriorityBlobsQueue.size();
}
TBlobRange ExtractNextBlob();
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index b91877b993c..7aeed88a99d 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -52,6 +52,10 @@ NIndexedReader::TBatch* TGranulesFillingContext::GetBatchInfo(const TBatchAddres
return Processing.GetBatchInfo(address);
}
+NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfoVerified(const TBatchAddress& address) {
+ return Processing.GetBatchInfoVerified(address);
+}
+
NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const {
return Owner.GetTasksProcessor();
}
@@ -78,18 +82,19 @@ bool TGranulesFillingContext::CheckBufferAvailable() const {
Result.GetBlobsSize() + Processing.GetBlobsSize() < ProcessingBytesLimit;
}
+bool TGranulesFillingContext::ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range) {
+ Y_VERIFY_DEBUG(!Result.IsReady(granuleId));
+ Processing.StartBlobProcessing(granuleId, range);
+ return true;
+}
+
void TGranulesFillingContext::OnGranuleReady(const ui64 granuleId) {
Result.AddResult(Processing.ExtractReadyVerified(granuleId));
}
std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingContext::DetachReadyInOrder() {
Y_VERIFY(SortingPolicy);
- const ui32 sizeBefore = Result.GetCount();
- auto result = SortingPolicy->DetachReadyGranules(Result);
- if (sizeBefore == Result.GetCount()) {
- Y_VERIFY(InternalReading || CheckBufferAvailable() || Processing.GetProcessingGranulesCount());
- }
- return result;
+ return SortingPolicy->DetachReadyGranules(Result);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index c8d2ea4963a..571f88f1698 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -11,7 +11,7 @@ class TIndexedReadData;
namespace NKikimr::NOlap::NIndexedReader {
-class TGranulesFillingContext {
+class TGranulesFillingContext: TNonCopyable {
private:
YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames);
TReadMetadata::TConstPtr ReadMetadata;
@@ -33,7 +33,8 @@ private:
static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule;
bool CheckBufferAvailable() const;
public:
- bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range);
+ bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range);
+ bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range);
TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading);
void OnBlobReady(const ui64 /*granuleId*/, const TBlobRange& /*range*/) noexcept {
@@ -55,7 +56,7 @@ public:
return SortingPolicy;
}
- NColumnShard::TScanCounters GetCounters() const noexcept {
+ const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept {
return Counters;
}
@@ -63,6 +64,7 @@ public:
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
+ NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
index 3c31ff4a9aa..f4eb1186188 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
@@ -8,6 +8,7 @@ void TProcessingController::DrainNotIndexedBatches(THashMap<ui64, std::shared_pt
return;
}
NotIndexedBatchesInitialized = true;
+ GranulesInProcessing.erase(0);
auto granules = GranulesWaiting;
for (auto&& [_, gPtr] : granules) {
if (!batches) {
@@ -68,8 +69,13 @@ TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) {
void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) {
Counters.Aggregations->AddGranuleProcessingBytes(range.Size);
if (GranulesInProcessing.emplace(granuleId).second) {
- Y_VERIFY(GranulesWaiting.contains(granuleId));
- Counters.Aggregations->AddGranuleProcessing();
+ if (granuleId) {
+ Y_VERIFY(GranulesWaiting.contains(granuleId));
+ Counters.Aggregations->AddGranuleProcessing();
+ }
+ }
+ if (!granuleId) {
+ Y_VERIFY(!NotIndexedBatchesInitialized);
}
BlobsSize += range.Size;
}
@@ -81,4 +87,10 @@ void TProcessingController::Abort() {
BlobsSize = 0;
}
+NKikimr::NOlap::NIndexedReader::TBatch& TProcessingController::GetBatchInfoVerified(const TBatchAddress& address) {
+ NIndexedReader::TBatch* bInfo = GetBatchInfo(address);
+ Y_VERIFY(bInfo);
+ return *bInfo;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h
index ba8f3d22d43..de40c312a27 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h
@@ -20,6 +20,7 @@ public:
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
+ NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address);
ui32 GetProcessingGranulesCount() const {
return GranulesInProcessing.size();
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp
index c578cd857dd..fc857a54291 100644
--- a/ydb/core/tx/columnshard/engines/reader/queue.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/queue.cpp
@@ -12,11 +12,6 @@ NKikimr::NOlap::TBlobRange TFetchBlobsQueue::pop_front() {
}
}
-void TFetchBlobsQueue::emplace_front(const ui64 granuleId, const TBlobRange& range) {
- Y_VERIFY(!StoppedFlag);
- IteratorBlobsSequential.emplace_front(granuleId, range);
-}
-
void TFetchBlobsQueue::emplace_back(const ui64 granuleId, const TBlobRange& range) {
Y_VERIFY(!StoppedFlag);
IteratorBlobsSequential.emplace_back(granuleId, range);
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h
index 383b0cc0937..7c5415000af 100644
--- a/ydb/core/tx/columnshard/engines/reader/queue.h
+++ b/ydb/core/tx/columnshard/engines/reader/queue.h
@@ -59,7 +59,6 @@ public:
}
TBlobRange pop_front();
- void emplace_front(const ui64 granuleId, const TBlobRange& range);
void emplace_back(const ui64 granuleId, const TBlobRange& range);
};