aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-03 19:10:27 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-03 19:10:27 +0300
commit76c944bc91c97bc0dec94800cfc19fc1d9f5c734 (patch)
treea2a6e260c2598cebc2b43524c5ab3e0e3fd6f8e9
parent7b101672fe85f87bdadd7b110b1aba1f9aa24414 (diff)
downloadydb-76c944bc91c97bc0dec94800cfc19fc1d9f5c734.tar.gz
control start merge for granule
remove useless hashmap remove useless checkers
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp36
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h9
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.h29
7 files changed, 100 insertions, 61 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 1cf45874a5..9ed9ebc93d 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -223,7 +223,7 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
if (NotIndexed.size()) {
auto mergedBatch = MergeNotIndexed(std::move(NotIndexed)); // merged has no dups
if (mergedBatch) {
- // Init split by granules structs
+ // Init split by granules structures
Y_VERIFY(ReadMetadata->SelectInfo);
TColumnEngineForLogs::TMarksGranules marksGranules(*ReadMetadata->SelectInfo);
@@ -233,16 +233,20 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
Y_VERIFY(!marksGranules.Empty());
auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo());
- GranulesContext->AddNotIndexedBatches(outNotIndexed);
+ GranulesContext->DrainNotIndexedBatches(&outNotIndexed);
Y_VERIFY(outNotIndexed.size() <= 1);
if (outNotIndexed.size() == 1) {
auto it = outNotIndexed.find(0);
Y_VERIFY(it != outNotIndexed.end());
NotIndexedOutscopeBatch = it->second;
}
+ } else {
+ GranulesContext->DrainNotIndexedBatches(nullptr);
}
NotIndexed.clear();
ReadyNotIndexed = 0;
+ } else {
+ GranulesContext->DrainNotIndexedBatches(nullptr);
}
// Extract ready to out granules: ready granules that are not blocked by other (not ready) granules
@@ -312,17 +316,11 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch
Y_VERIFY(ReadMetadata->IsSorted());
Y_VERIFY(IndexInfo().GetSortingKey());
- { // remove empty batches
- size_t dst = 0;
- for (size_t src = 0; src < batches.size(); ++src) {
- if (batches[src] && batches[src]->num_rows()) {
- if (dst != src) {
- batches[dst] = batches[src];
- }
- ++dst;
- }
- }
- batches.resize(dst);
+ {
+ const auto pred = [](const std::shared_ptr<arrow::RecordBatch>& b) {
+ return !b || !b->num_rows();
+ };
+ batches.erase(std::remove_if(batches.begin(), batches.end(), pred), batches.end());
}
if (batches.empty()) {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index ea1c4ee2f1..a8f6be4a73 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -55,22 +55,20 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get
return Owner.GetTasksProcessor();
}
-void TGranulesFillingContext::AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches) {
- std::shared_ptr<arrow::RecordBatch> externalBatch;
- for (auto it = batches.begin(); it != batches.end(); ++it) {
- if (!it->first) {
- externalBatch = it->second;
- continue;
+void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) {
+ for (auto&& [_, g] : Granules) {
+ if (!batches) {
+ g.AddNotIndexedBatch(nullptr);
+ } else {
+ auto it = batches->find(g.GetGranuleId());
+ if (it == batches->end()) {
+ g.AddNotIndexedBatch(nullptr);
+ } else {
+ g.AddNotIndexedBatch(it->second);
+ }
+ batches->erase(it);
}
- auto itGranule = Granules.find(it->first);
- Y_VERIFY(itGranule != Granules.end());
- itGranule->second.AddNotIndexedBatch(it->second);
}
- THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> resultLocal;
- if (externalBatch) {
- resultLocal.emplace(0, externalBatch);
- }
- std::swap(batches, resultLocal);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index cab84fc290..96c9cb40e8 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -34,7 +34,7 @@ public:
NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
- void AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches);
+ void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
TBatch& GetBatchInfo(const ui32 batchNo);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
@@ -85,6 +85,10 @@ public:
Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
}
+ void Wakeup(TGranule& granule) {
+ SortingPolicy->Wakeup(granule, *this);
+ }
+
void PrepareForStart() {
SortingPolicy->Fill(*this);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index 3de3003cbe..bd633044fc 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -12,6 +12,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
return;
}
}
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", batchInfo.GetBatchNo())("count", WaitBatches.size());
Y_VERIFY(!ReadyFlag);
Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
if (batch && batch->num_rows()) {
@@ -31,10 +32,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
}
}
Owner->OnBatchReady(batchInfo, batch);
- if (WaitBatches.empty()) {
- ReadyFlag = true;
- Owner->OnGranuleReady(*this);
- }
+ CheckReady();
}
NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) {
@@ -95,18 +93,30 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata:
}
void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) {
- if (!batch || !batch->num_rows()) {
+ Y_VERIFY(!NotIndexedBatchReadyFlag || !batch);
+ if (!NotIndexedBatchReadyFlag) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size());
+ } else {
return;
}
- AFL_ERROR(NKikimrServices::KQP_COMPUTE)("event", "add_not_indexed_batch");
- Y_VERIFY(NonSortableBatches.empty());
- Y_VERIFY(SortableBatches.empty());
- Y_VERIFY(!NotIndexedBatch);
- NotIndexedBatch = batch;
- if (Owner->GetReadMetadata()->Program) {
- NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program));
+ NotIndexedBatchReadyFlag = true;
+ if (batch && batch->num_rows()) {
+ Y_VERIFY(!NotIndexedBatch);
+ NotIndexedBatch = batch;
+ if (Owner->GetReadMetadata()->Program) {
+ NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program));
+ }
+ DuplicationsAvailableFlag = true;
+ }
+ CheckReady();
+ Owner->Wakeup(*this);
+}
+
+void TGranule::CheckReady() {
+ if (WaitBatches.empty() && NotIndexedBatchReadyFlag) {
+ ReadyFlag = true;
+ Owner->OnGranuleReady(*this);
}
- DuplicationsAvailableFlag = true;
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index cc6763a364..49543f6948 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -14,10 +14,13 @@ class TGranulesFillingContext;
class TGranule {
private:
YDB_READONLY(ui64, GranuleId, 0);
- std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches;
- std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches;
+
+ YDB_READONLY_FLAG(NotIndexedBatchReady, false);
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, NotIndexedBatch);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotIndexedBatchFutureFilter);
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches;
YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
YDB_READONLY_FLAG(Ready, false);
std::deque<TBatch> Batches;
@@ -25,6 +28,8 @@ private:
std::set<ui32> GranuleBatchNumbers;
TGranulesFillingContext* Owner = nullptr;
YDB_READONLY_DEF(THashSet<const void*>, BatchesToDedup);
+
+ void CheckReady();
public:
TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
: GranuleId(granuleId)
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
index 0c82375122..3875f9b635 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -35,24 +35,27 @@ std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexe
return result;
}
-bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
+bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) {
Y_VERIFY(ReadMetadata->Limit);
if (!CurrentItemsLimit) {
return false;
}
Y_VERIFY(GranulesOutOrderForPortions.size());
- if (granule.GetGranuleId() != GranulesOutOrderForPortions.front()->GetGranuleId()) {
+ if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) {
return false;
}
while (GranulesOutOrderForPortions.size()) {
- auto it = OrderedBatches.find(GranulesOutOrderForPortions.front()->GetGranuleId());
- auto g = GranulesOutOrderForPortions.front();
- Y_VERIFY(it != OrderedBatches.end());
- if (!it->second.GetStarted()) {
- MergeStream.AddIndependentSource(g->GetNotIndexedBatch(), g->GetNotIndexedBatchFutureFilter());
- it->second.SetStarted(true);
+ auto& g = GranulesOutOrderForPortions.front();
+ // granule have to wait NotIndexedBatch initialization, at first (StartableFlag initialization).
+ // other batches will be delivered in OrderedBatches[granuleId] order
+ if (!g.GetGranule()->IsNotIndexedBatchReady()) {
+ break;
+ }
+ if (g.Start()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size());
+ MergeStream.AddIndependentSource(g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
}
- auto& batches = it->second.MutableBatches();
+ auto& batches = g.MutableBatches();
while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) {
auto b = batches.front();
if (b->IsSortableInGranule()) {
@@ -73,13 +76,16 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule&
b->InitBatch(nullptr);
batches.pop_front();
}
- OrderedBatches.erase(it);
GranulesOutOrderForPortions.pop_front();
} else {
break;
}
}
- return false;
+ return true;
+}
+
+bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) {
+ return Wakeup(granule, context);
}
void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
@@ -87,9 +93,8 @@ void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) {
for (ui64 granule : granulesOrder) {
TGranule& g = context.GetGranuleVerified(granule);
GranulesOutOrder.emplace_back(&g);
- Y_VERIFY(OrderedBatches.emplace(granule, TGranuleScanInfo(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata))).second);
+ GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g);
}
- GranulesOutOrderForPortions = GranulesOutOrder;
}
std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) {
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
index 2f697219b6..969c9cd0e3 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h
@@ -11,6 +11,9 @@ class IOrderPolicy {
protected:
TReadMetadata::TConstPtr ReadMetadata;
virtual void DoFill(TGranulesFillingContext& context) = 0;
+ virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) {
+ return true;
+ }
virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0;
virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) {
OnBatchFilterInitialized(batchInfo, context);
@@ -50,6 +53,10 @@ public:
void Fill(TGranulesFillingContext& context) {
DoFill(context);
}
+
+ bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) {
+ return DoWakeup(granule, context);
+ }
};
class TNonSorting: public IOrderPolicy {
@@ -89,13 +96,25 @@ public:
}
};
-class TGranuleScanInfo {
+class TGranuleOrdered {
private:
- YDB_ACCESSOR(bool, Started, false);
+ bool StartedFlag = false;
YDB_ACCESSOR_DEF(std::deque<TBatch*>, Batches);
+ YDB_READONLY(const TGranule*, Granule, nullptr);
public:
- TGranuleScanInfo(std::deque<TBatch*>&& batches)
+ bool Start() {
+ if (!StartedFlag) {
+ StartedFlag = true;
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+ TGranuleOrdered(std::deque<TBatch*>&& batches, TGranule* granule)
: Batches(std::move(batches))
+ , Granule(granule)
{
}
@@ -105,11 +124,11 @@ class TPKSortingWithLimit: public IOrderPolicy {
private:
using TBase = IOrderPolicy;
std::deque<TGranule*> GranulesOutOrder;
- std::deque<TGranule*> GranulesOutOrderForPortions;
- THashMap<ui64, TGranuleScanInfo> OrderedBatches;
+ std::deque<TGranuleOrdered> GranulesOutOrderForPortions;
ui32 CurrentItemsLimit = 0;
TMergePartialStream MergeStream;
protected:
+ virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override;
virtual void DoFill(TGranulesFillingContext& context) override;
virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;