aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-25 08:36:56 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-25 09:17:49 +0300
commita13962045e5307dc6ab38d061e75b1f518a8ae29 (patch)
treef60b1182eb0fd833c4cc83daf023d2a6dacc4237
parent01216eb22028efa53b8d26d3c9a84be78cef40ba (diff)
downloadydb-a13962045e5307dc6ab38d061e75b1f518a8ae29.tar.gz
KIKIMR-19213: use conveyors on merge results
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp3
-rw-r--r--ydb/core/formats/arrow/reader/read_filter_merger.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h3
-rw-r--r--ydb/core/tx/columnshard/counters/scan.h45
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp113
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h36
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h22
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp45
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_context.h6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h31
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp2
27 files changed, 284 insertions, 101 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 568ea59f6a..f9951b9432 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -161,6 +161,9 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::vector<TString>& columnNames) {
+ if (!srcBatch) {
+ return srcBatch;
+ }
if (columnNames.empty()) {
return nullptr;
}
diff --git a/ydb/core/formats/arrow/reader/read_filter_merger.h b/ydb/core/formats/arrow/reader/read_filter_merger.h
index b8f97af026..1c05f52796 100644
--- a/ydb/core/formats/arrow/reader/read_filter_merger.h
+++ b/ydb/core/formats/arrow/reader/read_filter_merger.h
@@ -100,6 +100,9 @@ public:
bool IsEqual() const {
return !GreaterIfNotEqual;
}
+ bool IsLess() const {
+ return !!GreaterIfNotEqual && !*GreaterIfNotEqual;
+ }
bool IsGreater() const {
return !!GreaterIfNotEqual && *GreaterIfNotEqual;
}
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 2664631c23..fb7e93ade7 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -87,10 +87,10 @@ public:
return ReadyResults.size();
}
- virtual TString DebugString() const override {
+ virtual TString DebugString(const bool verbose) const override {
return TStringBuilder()
<< "ready_results:(" << ReadyResults.DebugString() << ");"
- << "indexed_data:(" << IndexedData->DebugString() << ")"
+ << "indexed_data:(" << IndexedData->DebugString(verbose) << ")"
;
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 40ad858f28..a7c9873356 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -281,13 +281,7 @@ private:
const i64 maxSteps = ReadMetadataRanges.size();
for (i64 step = 0; step <= maxSteps; ++step) {
ContinueProcessingStep();
-
- // Only exist the loop if either:
- // * we have finished scanning ALL the ranges
- // * or there is an in-flight blob read or ScanData message for which
- // we will get a reply and will be able to proceed further
- if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads
- || MemoryAccessor->InWaiting()) {
+ if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || MemoryAccessor->InWaiting() || ScanCountersPool.InWaiting()) {
return;
}
}
@@ -436,7 +430,7 @@ private:
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults
<< " finished: " << Result->Finished << " pageFault: " << Result->PageFault
- << " stats:" << Stats.DebugString();
+ << " stats:" << Stats.DebugString() << ";iterator:" << (ScanIterator ? ScanIterator->DebugString(false) : "NO");
} else {
Y_ABORT_UNLESS(ChunksLimiter.Take(Bytes));
Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 5ec177b6ae..f3a4fdc946 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -122,7 +122,8 @@ public:
}
virtual std::shared_ptr<NOlap::NBlobOperations::NRead::ITask> GetNextTaskToRead() { return nullptr; }
- virtual TString DebugString() const {
+ virtual TString DebugString(const bool verbose = false) const {
+ Y_UNUSED(verbose);
return "NO_DATA";
}
};
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index 437080b402..8a3d6db77d 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -162,12 +162,54 @@ public:
TScanAggregations BuildAggregations();
};
+class TCounterGuard: TNonCopyable {
+private:
+ std::shared_ptr<TAtomicCounter> Counter;
+public:
+ TCounterGuard(TCounterGuard&& guard) {
+ Counter = guard.Counter;
+ guard.Counter = nullptr;
+ }
+
+ TCounterGuard(const std::shared_ptr<TAtomicCounter>& counter)
+ : Counter(counter)
+ {
+ AFL_VERIFY(Counter);
+ Counter->Inc();
+ }
+ ~TCounterGuard() {
+ if (Counter) {
+ AFL_VERIFY(Counter->Dec() >= 0);
+ }
+ }
+
+};
+
class TConcreteScanCounters: public TScanCounters {
private:
using TBase = TScanCounters;
+ std::shared_ptr<TAtomicCounter> MergeTasksCount;
+ std::shared_ptr<TAtomicCounter> AssembleTasksCount;
+ std::shared_ptr<TAtomicCounter> ReadTasksCount;
public:
TScanAggregations Aggregations;
+ TCounterGuard GetMergeTasksGuard() const {
+ return TCounterGuard(MergeTasksCount);
+ }
+
+ TCounterGuard GetReadTasksGuard() const {
+ return TCounterGuard(ReadTasksCount);
+ }
+
+ TCounterGuard GetAssembleTasksGuard() const {
+ return TCounterGuard(AssembleTasksCount);
+ }
+
+ bool InWaiting() const {
+ return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val();
+ }
+
void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
TBase::OnBlobsWaitDuration(d);
Aggregations.OnBlobWaitingDuration(d, fullScanDuration);
@@ -175,6 +217,9 @@ public:
TConcreteScanCounters(const TScanCounters& counters)
: TBase(counters)
+ , MergeTasksCount(std::make_shared<TAtomicCounter>())
+ , AssembleTasksCount(std::make_shared<TAtomicCounter>())
+ , ReadTasksCount(std::make_shared<TAtomicCounter>())
, Aggregations(TBase::BuildAggregations())
{
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt
index 10911162af..067782793e 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.darwin-x86_64.txt
@@ -13,6 +13,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC
yutil
core-formats-arrow
tx-columnshard-blobs_action
+ tx-conveyor-usage
)
target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt
index 52f5d3db88..7907ec8619 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-aarch64.txt
@@ -14,6 +14,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC
yutil
core-formats-arrow
tx-columnshard-blobs_action
+ tx-conveyor-usage
)
target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt
index 52f5d3db88..7907ec8619 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.linux-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC
yutil
core-formats-arrow
tx-columnshard-blobs_action
+ tx-conveyor-usage
)
target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt
index 10911162af..067782793e 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/CMakeLists.windows-x86_64.txt
@@ -13,6 +13,7 @@ target_link_libraries(engines-reader-plain_reader PUBLIC
yutil
core-formats-arrow
tx-columnshard-blobs_action
+ tx-conveyor-usage
)
target_sources(engines-reader-plain_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
index c755f3263b..047dbf52ab 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
@@ -27,11 +27,12 @@ bool TAssembleFFBatch::DoApply(IDataReader& owner) const {
}
TAssembleBatch::TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor,
- const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter)
+ const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard)
: TBase(scanActorId)
, BatchConstructor(batchConstructor)
, Filter(filter)
, SourceIdx(sourceIdx)
+ , TaskGuard(std::move(taskGuard))
{
TBase::SetPriority(TBase::EPriority::High);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
index 3a8262b2ce..fc3df48012 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
@@ -3,6 +3,7 @@
#include <ydb/core/tx/columnshard/engines/reader/conveyor_task.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
+#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
namespace NKikimr::NOlap::NPlainReader {
@@ -15,6 +16,7 @@ private:
protected:
std::shared_ptr<arrow::RecordBatch> Result;
const ui32 SourceIdx;
+ const NColumnShard::TCounterGuard TaskGuard;
virtual bool DoExecute() override;
public:
virtual TString GetTaskClassIdentifier() const override {
@@ -22,7 +24,7 @@ public:
}
TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor,
- const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter);
+ const ui32 sourceIdx, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard);
};
class TAssembleFFBatch: public TAssembleBatch {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
index ab5e5372c5..69520d4bdd 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
@@ -21,13 +21,14 @@ bool TCommittedAssembler::DoApply(IDataReader& owner) const {
}
TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const ui32 sourceIdx,
- const TCommittedBlob& cBlob)
+ const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard)
: TBase(scanActorId)
, BlobData(blobData)
, ReadMetadata(readMetadata)
, SourceIdx(sourceIdx)
, SchemaVersion(cBlob.GetSchemaVersion())
, DataSnapshot(cBlob.GetSnapshot())
+ , TaskGuard(std::move(taskGuard))
{
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
index f0b00d990c..322a17efbf 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
@@ -4,6 +4,7 @@
#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
+#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
namespace NKikimr::NOlap::NPlainReader {
@@ -18,6 +19,7 @@ private:
std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
std::shared_ptr<arrow::RecordBatch> ResultBatch;
+ const NColumnShard::TCounterGuard TaskGuard;
protected:
virtual bool DoExecute() override;
virtual bool DoApply(IDataReader& owner) const override;
@@ -27,6 +29,6 @@ public:
}
TCommittedAssembler(const NActors::TActorId& scanActorId, const TString& blobData, const TReadMetadata::TConstPtr& readMetadata, const ui32 sourceIdx,
- const TCommittedBlob& cBlob);
+ const TCommittedBlob& cBlob, NColumnShard::TCounterGuard&& taskGuard);
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
index b00f3ef17b..b0d1dd2d36 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
@@ -32,12 +32,12 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse
void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFilter>(ScanActorId, BuildBatchAssembler(),
- ReadMetadata, SourceIdx, ColumnIds, UseEarlyFilter));
+ ReadMetadata, SourceIdx, ColumnIds, UseEarlyFilter, Context.GetCounters().GetAssembleTasksGuard()));
}
void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFFBatch>(ScanActorId, BuildBatchAssembler(),
- SourceIdx, AppliedFilter));
+ SourceIdx, AppliedFilter, Context.GetCounters().GetAssembleTasksGuard()));
}
void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
@@ -45,7 +45,7 @@ void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NReso
Y_ABORT_UNLESS(NullBlocks.size() == 0);
Y_ABORT_UNLESS(blobs.size() == 1);
NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TCommittedAssembler>(ScanActorId, blobs.begin()->second,
- ReadMetadata, SourceIdx, CommittedBlob));
+ ReadMetadata, SourceIdx, CommittedBlob, Context.GetCounters().GetAssembleTasksGuard()));
}
bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
index c24d463720..a9d71aa349 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
@@ -17,6 +17,7 @@ protected:
std::shared_ptr<const TReadMetadata> ReadMetadata;
TReadContext Context;
THashMap<TBlobRange, ui32> NullBlocks;
+ NColumnShard::TCounterGuard TasksGuard;
virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;
public:
IFetchTaskConstructor(IDataReader& reader, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const IDataSource& source, const TString& taskCustomer)
@@ -26,6 +27,7 @@ public:
, ReadMetadata(reader.GetReadMetadata())
, Context(reader.GetContext())
, NullBlocks(std::move(nullBlocks))
+ , TasksGuard(reader.GetCounters().GetReadTasksGuard())
{
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
index 30ece24411..ec7eb52c2b 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
@@ -21,6 +21,7 @@ namespace NKikimr::NOlap::NPlainReader {
bool AllowEarlyFilter = false;
std::set<ui32> FilterColumnIds;
const bool UseFilter = true;
+ const NColumnShard::TCounterGuard TaskGuard;
protected:
virtual bool DoApply(IDataReader& owner) const override;
virtual bool DoExecute() override;
@@ -31,13 +32,14 @@ namespace NKikimr::NOlap::NPlainReader {
}
TAssembleFilter(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
- const ui32 sourceIdx, const std::set<ui32>& filterColumnIds, const bool useFilter)
+ const ui32 sourceIdx, const std::set<ui32>& filterColumnIds, const bool useFilter, NColumnShard::TCounterGuard&& taskGuard)
: TBase(scanActorId)
, BatchConstructor(batchConstructor)
, SourceIdx(sourceIdx)
, ReadMetadata(readMetadata)
, FilterColumnIds(filterColumnIds)
, UseFilter(useFilter)
+ , TaskGuard(std::move(taskGuard))
{
TBase::SetPriority(TBase::EPriority::Normal);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index b55dbba5e3..e43fa12260 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -1,5 +1,7 @@
#include "interval.h"
#include "scanner.h"
+#include "plain_read_data.h"
+#include <ydb/core/tx/conveyor/usage/service.h>
namespace NKikimr::NOlap::NPlainReader {
@@ -7,40 +9,84 @@ bool TFetchingInterval::IsExclusiveSource() const {
return IncludeStart && Sources.size() == 1 && IncludeFinish;
}
+class TMergeTask: public NColumnShard::IDataTasksProcessor::ITask, public TMergingContext {
+private:
+ std::shared_ptr<NIndexedReader::TMergePartialStream> Merger;
+ std::shared_ptr<arrow::RecordBatch> ResultBatch;
+ NColumnShard::TConcreteScanCounters Counters;
+ const NColumnShard::TCounterGuard Guard;
+ const bool IsExclusiveSource = false;
+ const ui32 OriginalSourcesCount;
+protected:
+ virtual bool DoApply(NOlap::IDataReader& indexedDataRead) const override {
+ auto& reader = static_cast<TPlainReadData&>(indexedDataRead);
+ if (Merger->GetSourcesCount() == 1 && ResultBatch) {
+ auto batch = NArrow::ExtractColumnsValidate(ResultBatch, reader.GetScanner().GetResultFieldNames());
+ AFL_VERIFY(batch);
+ reader.MutableScanner().OnIntervalResult(batch, IntervalIdx);
+ } else {
+ reader.MutableScanner().OnIntervalResult(ResultBatch, IntervalIdx);
+ }
+ return true;
+ }
+ virtual bool DoExecute() override {
+ Merger->SkipToLowerBound(Start, IncludeStart);
+ if (Merger->GetSourcesCount() == 1) {
+ ResultBatch = Merger->SingleSourceDrain(Finish, IncludeFinish);
+ if (ResultBatch) {
+ if (IsExclusiveSource) {
+ Counters.OnNoScanInterval(ResultBatch->num_rows());
+ } else {
+ Counters.OnLogScanInterval(ResultBatch->num_rows());
+ }
+ }
+ if (IncludeFinish && OriginalSourcesCount == 1) {
+ Y_ABORT_UNLESS(Merger->IsEmpty());
+ }
+ } else {
+ Merger->DrainCurrentTo(*RBBuilder, Finish, IncludeFinish);
+ Counters.OnLinearScanInterval(RBBuilder->GetRecordsCount());
+ ResultBatch = RBBuilder->Finalize();
+ }
+ return true;
+ }
+public:
+ virtual TString GetTaskClassIdentifier() const override {
+ return "CS::MERGE_RESULT";
+ }
+
+ TMergeTask(const std::shared_ptr<NIndexedReader::TMergePartialStream>& merger,
+ const TMergingContext& context, const NColumnShard::TConcreteScanCounters& counters, const bool isExclusiveSource, const ui32 sourcesCount)
+ : TMergingContext(context)
+ , Merger(merger)
+ , Counters(counters)
+ , Guard(Counters.GetMergeTasksGuard())
+ , IsExclusiveSource(isExclusiveSource)
+ , OriginalSourcesCount(sourcesCount)
+ {
+ }
+};
+
void TFetchingInterval::ConstructResult() {
- if (!Merger || !IsSourcesReady()) {
+ if (!IsSourcesReady()) {
return;
}
+ AFL_VERIFY(!ResultConstructionInProgress);
+ ResultConstructionInProgress = true;
+ auto merger = Scanner.BuildMerger();
for (auto&& [_, i] : Sources) {
- if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) {
- if (auto rb = i->GetBatch()) {
- Merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
- }
- i->StartMerging();
+ if (auto rb = i->GetBatch()) {
+ merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
}
}
- std::shared_ptr<arrow::RecordBatch> simpleBatch;
- AFL_VERIFY(Merger->GetSourcesCount() <= Sources.size());
- if (Sources.size() == 1) {
- simpleBatch = Merger->SingleSourceDrain(Finish, IncludeFinish);
- if (simpleBatch) {
- if (IsExclusiveSource()) {
- Scanner.GetContext().GetCounters().OnNoScanInterval(simpleBatch->num_rows());
- } else {
- Scanner.GetContext().GetCounters().OnLogScanInterval(simpleBatch->num_rows());
- }
- simpleBatch = NArrow::ExtractColumnsValidate(simpleBatch, Scanner.GetResultFieldNames());
- AFL_VERIFY(simpleBatch);
- }
- if (IncludeFinish) {
- Y_ABORT_UNLESS(Merger->IsEmpty());
- }
+ AFL_VERIFY(merger->GetSourcesCount() <= Sources.size());
+ if (merger->GetSourcesCount() == 0) {
+ Scanner.OnIntervalResult(nullptr, IntervalIdx);
} else {
- Merger->DrainCurrentTo(*RBBuilder, Finish, IncludeFinish);
- Scanner.GetContext().GetCounters().OnLinearScanInterval(RBBuilder->GetRecordsCount());
- simpleBatch = RBBuilder->Finalize();
+ auto task = std::make_shared<TMergeTask>(merger, *this, Scanner.GetContext().GetCounters(), IsExclusiveSource(), Sources.size());
+ task->SetPriority(NConveyor::ITask::EPriority::High);
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
- Scanner.OnIntervalResult(simpleBatch, GetIntervalIdx());
}
void TFetchingInterval::OnSourceFetchStageReady(const ui32 /*sourceIdx*/) {
@@ -51,23 +97,12 @@ void TFetchingInterval::OnSourceFilterStageReady(const ui32 /*sourceIdx*/) {
ConstructResult();
}
-void TFetchingInterval::StartMerge(std::shared_ptr<NIndexedReader::TMergePartialStream> merger) {
- Y_ABORT_UNLESS(!Merger);
- Merger = merger;
- ConstructResult();
-}
-
TFetchingInterval::TFetchingInterval(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const ui32 intervalIdx, const std::map<ui32, std::shared_ptr<IDataSource>>& sources, TScanHead& scanner,
std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart)
- : Scanner(scanner)
- , Start(start)
- , Finish(finish)
- , IncludeFinish(includeFinish)
- , IncludeStart(includeStart)
+ : TBase(start, finish, intervalIdx, builder, includeFinish, includeStart)
+ , Scanner(scanner)
, Sources(sources)
- , IntervalIdx(intervalIdx)
- , RBBuilder(builder)
{
Y_ABORT_UNLESS(Sources.size());
for (auto&& [_, i] : Sources) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
index c67eced0ea..17977320ee 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
@@ -6,17 +6,37 @@ namespace NKikimr::NOlap::NPlainReader {
class TScanHead;
-class TFetchingInterval: TNonCopyable {
-private:
- TScanHead& Scanner;
+class TMergingContext {
+protected:
NIndexedReader::TSortableBatchPosition Start;
NIndexedReader::TSortableBatchPosition Finish;
const bool IncludeFinish = true;
const bool IncludeStart = false;
- std::map<ui32, std::shared_ptr<IDataSource>> Sources;
- YDB_READONLY(ui32, IntervalIdx, 0);
- std::shared_ptr<NIndexedReader::TMergePartialStream> Merger;
std::shared_ptr<NIndexedReader::TRecordBatchBuilder> RBBuilder;
+ ui32 IntervalIdx = 0;
+public:
+ TMergingContext(const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
+ const ui32 intervalIdx, std::shared_ptr<NIndexedReader::TRecordBatchBuilder> builder, const bool includeFinish, const bool includeStart)
+ : Start(start)
+ , Finish(finish)
+ , IncludeFinish(includeFinish)
+ , IncludeStart(includeStart)
+ , RBBuilder(builder)
+ , IntervalIdx(intervalIdx) {
+
+ }
+
+ ui32 GetIntervalIdx() const {
+ return IntervalIdx;
+ }
+};
+
+class TFetchingInterval: public TMergingContext, TNonCopyable {
+private:
+ using TBase = TMergingContext;
+ bool ResultConstructionInProgress = false;
+ TScanHead& Scanner;
+ std::map<ui32, std::shared_ptr<IDataSource>> Sources;
bool IsExclusiveSource() const;
void ConstructResult();
@@ -58,10 +78,6 @@ public:
return result;
}
- bool HasMerger() const {
- return !!Merger;
- }
-
void OnSourceFetchStageReady(const ui32 sourceIdx);
void OnSourceFilterStageReady(const ui32 sourceIdx);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
index 501c714751..193b4cac84 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.cpp
@@ -60,7 +60,6 @@ TPlainReadData::TPlainReadData(TReadMetadata::TConstPtr readMetadata, const TRea
}
std::vector<NKikimr::NOlap::TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) {
- Scanner->DrainResults();
if (ReadyResultsCount < maxRowsInBatch && !Scanner->IsFinished()) {
return {};
}
@@ -104,7 +103,7 @@ std::shared_ptr<NBlobOperations::NRead::ITask> TPlainReadData::DoExtractNextRead
return nullptr;
}
-void TPlainReadData::OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch) {
+void TPlainReadData::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch) {
if (batch && batch->num_rows()) {
TPartialReadResult result(std::make_shared<TScanMemoryLimiter::TGuard>(Context.GetMemoryAccessor()), batch);
ReadyResultsCount += result.GetRecordsCount();
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
index d2505ddf44..6332bb36ae 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
@@ -28,12 +28,16 @@ private:
TFetchBlobsQueue PriorityQueue;
bool AbortedFlag = false;
protected:
- virtual TString DoDebugString() const override {
- return TStringBuilder() <<
+ virtual TString DoDebugString(const bool verbose) const override {
+ TStringBuilder sb;
+ sb <<
"ef=" << EFColumns->DebugString() << ";" <<
"pk=" << PKColumns->DebugString() << ";" <<
- "ff=" << FFColumns->DebugString() << ";"
- ;
+ "ff=" << FFColumns->DebugString() << ";";
+ if (verbose) {
+ sb << "intervals_schema=" << Scanner->DebugString();
+ }
+ return sb;
}
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) override;
@@ -64,7 +68,15 @@ public:
}
}
- void OnIntervalResult(std::shared_ptr<arrow::RecordBatch> batch);
+ const TScanHead& GetScanner() const {
+ return *Scanner;
+ }
+
+ TScanHead& MutableScanner() {
+ return *Scanner;
+ }
+
+ void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch);
TPlainReadData(TReadMetadata::TConstPtr readMetadata, const TReadContext& context);
~TPlainReadData() {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
index f34b4fe716..6c5f60fe14 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.cpp
@@ -6,11 +6,22 @@
namespace NKikimr::NOlap::NPlainReader {
void TScanHead::OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx) {
+ AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, batch).second);
Y_ABORT_UNLESS(FetchingIntervals.size());
- Y_ABORT_UNLESS(FetchingIntervals.front().GetIntervalIdx() == intervalIdx);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "interval_result")("interval", FetchingIntervals.front().GetIntervalIdx())("count", batch ? batch->num_rows() : 0);
- FetchingIntervals.pop_front();
- Reader.OnIntervalResult(batch);
+ while (FetchingIntervals.size()) {
+ auto it = ReadyIntervals.find(FetchingIntervals.front().GetIntervalIdx());
+ if (it == ReadyIntervals.end()) {
+ break;
+ }
+ const std::shared_ptr<arrow::RecordBatch>& batch = it->second;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "interval_result")("interval", FetchingIntervals.front().GetIntervalIdx())("count", batch ? batch->num_rows() : 0);
+ FetchingIntervals.pop_front();
+ Reader.OnIntervalResult(batch);
+ ReadyIntervals.erase(it);
+ }
+ if (FetchingIntervals.empty()) {
+ AFL_VERIFY(ReadyIntervals.empty());
+ }
}
TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainReadData& reader)
@@ -22,15 +33,14 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, TPlainR
ResultFields.emplace_back(resultSchema->GetFieldByColumnIdVerified(f));
ResultFieldNames.emplace_back(ResultFields.back()->name());
}
- Merger = std::make_shared<NIndexedReader::TMergePartialStream>(reader.GetReadMetadata()->GetReplaceKey(), std::make_shared<arrow::Schema>(ResultFields), reader.GetReadMetadata()->IsDescSorted());
+ ResultSchema = std::make_shared<arrow::Schema>(ResultFields);
DrainSources();
}
bool TScanHead::BuildNextInterval() {
while (BorderPoints.size()) {
- auto position = BorderPoints.begin()->first;
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
- const bool isIncludeStart = CurrentSegments.empty();
+ bool includeStart = firstBorderPointInfo.GetStartSources().size();
for (auto&& i : firstBorderPointInfo.GetStartSources()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_source", i->GetSourceIdx());
@@ -38,9 +48,11 @@ bool TScanHead::BuildNextInterval() {
}
if (firstBorderPointInfo.GetStartSources().size() && firstBorderPointInfo.GetFinishSources().size()) {
+ includeStart = false;
FetchingIntervals.emplace_back(
BorderPoints.begin()->first, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments,
*this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), true, true);
+ IntervalStats.emplace_back(CurrentSegments.size(), true);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson());
}
@@ -56,25 +68,18 @@ bool TScanHead::BuildNextInterval() {
const bool includeFinish = BorderPoints.begin()->second.GetStartSources().empty();
FetchingIntervals.emplace_back(
*CurrentStart, BorderPoints.begin()->first, SegmentIdxCounter++, CurrentSegments,
- *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish, isIncludeStart);
+ *this, std::make_shared<NIndexedReader::TRecordBatchBuilder>(ResultFields), includeFinish, includeStart);
+ IntervalStats.emplace_back(CurrentSegments.size(), false);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "new_interval")("interval", FetchingIntervals.back().DebugJson());
return true;
+ } else {
+ IntervalStats.emplace_back(CurrentSegments.size(), false);
}
}
return false;
}
-void TScanHead::DrainResults() {
- while (FetchingIntervals.size()) {
- if (!FetchingIntervals.front().HasMerger()) {
- FetchingIntervals.front().StartMerge(Merger);
- } else {
- break;
- }
- }
-}
-
void TScanHead::DrainSources() {
while (Sources.size()) {
auto source = Sources.front();
@@ -96,4 +101,8 @@ NKikimr::NOlap::NPlainReader::TFetchingPlan TScanHead::GetColumnsFetchingPlan(co
return Reader.GetColumnsFetchingPlan(exclusiveSource);
}
+std::shared_ptr<NKikimr::NOlap::NIndexedReader::TMergePartialStream> TScanHead::BuildMerger() const {
+ return std::make_shared<NIndexedReader::TMergePartialStream>(Reader.GetReadMetadata()->GetReplaceKey(), ResultSchema, Reader.GetReadMetadata()->IsDescSorted());
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
index 74f20ec565..20579b6452 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/scanner.h
@@ -20,20 +20,34 @@ public:
}
};
+class TIntervalStat {
+private:
+ YDB_READONLY(ui32, SourcesCount, 0);
+ YDB_READONLY(bool, IsPoint, false);
+public:
+ TIntervalStat(const ui32 sourcesCount, const bool isPoint)
+ : SourcesCount(sourcesCount)
+ , IsPoint(isPoint)
+ {
+
+ }
+};
+
class TScanHead {
private:
TPlainReadData& Reader;
std::deque<std::shared_ptr<IDataSource>> Sources;
std::vector<std::shared_ptr<arrow::Field>> ResultFields;
+ std::shared_ptr<arrow::Schema> ResultSchema;
YDB_READONLY_DEF(std::vector<TString>, ResultFieldNames);
THashMap<ui32, std::shared_ptr<IDataSource>> SourceByIdx;
std::map<NIndexedReader::TSortableBatchPosition, TDataSourceEndpoint> BorderPoints;
std::map<ui32, std::shared_ptr<IDataSource>> CurrentSegments;
std::optional<NIndexedReader::TSortableBatchPosition> CurrentStart;
std::deque<TFetchingInterval> FetchingIntervals;
+ THashMap<ui32, std::shared_ptr<arrow::RecordBatch>> ReadyIntervals;
ui32 SegmentIdxCounter = 0;
- std::shared_ptr<NIndexedReader::TMergePartialStream> Merger;
-
+ std::vector<TIntervalStat> IntervalStats;
void DrainSources();
public:
@@ -67,6 +81,14 @@ public:
TReadContext& GetContext();
+ TString DebugString() const {
+ TStringBuilder sb;
+ for (auto&& i : IntervalStats) {
+ sb << (i.GetIsPoint() ? "^" : "") << i.GetSourcesCount() << ";";
+ }
+ return sb;
+ }
+
void OnIntervalResult(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 intervalIdx);
std::shared_ptr<IDataSource> GetSourceVerified(const ui32 idx) const {
auto it = SourceByIdx.find(idx);
@@ -78,7 +100,7 @@ public:
bool BuildNextInterval();
- void DrainResults();
+ std::shared_ptr<NIndexedReader::TMergePartialStream> BuildMerger() const;
};
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
index e500dac1ef..7a1e596b79 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
@@ -16,6 +16,7 @@ SRCS(
PEERDIR(
ydb/core/formats/arrow
ydb/core/tx/columnshard/blobs_action
+ ydb/core/tx/conveyor/usage
)
END()
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h
index 59492588a7..30b40b2112 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.h
@@ -55,7 +55,7 @@ protected:
TReadContext Context;
std::shared_ptr<const TReadMetadata> ReadMetadata;
virtual std::shared_ptr<NBlobOperations::NRead::ITask> DoExtractNextReadTask(const bool hasReadyResults) = 0;
- virtual TString DoDebugString() const = 0;
+ virtual TString DoDebugString(const bool verbose) const = 0;
virtual void DoAbort() = 0;
virtual bool DoIsFinished() const = 0;
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
@@ -109,12 +109,12 @@ public:
return DoIsFinished();
}
- TString DebugString() const {
+ TString DebugString(const bool verbose) const {
TStringBuilder sb;
sb << "internal:" << Context.GetIsInternalRead() << ";"
<< "has_buffer:" << (GetMemoryAccessor() ? GetMemoryAccessor()->HasBuffer() : true) << ";"
;
- sb << DoDebugString();
+ sb << DoDebugString(verbose);
return sb;
}
std::shared_ptr<NBlobOperations::NRead::ITask> ExtractNextReadTask(const bool hasReadyResults) {
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index 3c99dc6d69..c1e68c034d 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -291,6 +291,35 @@ public:
Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields());
}
+ void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include) {
+ if (SortHeap.Empty()) {
+ return;
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+ while (!SortHeap.Empty()) {
+ const auto cmpResult = SortHeap.Current().GetKeyColumns().Compare(pos);
+ if (cmpResult == std::partial_ordering::greater) {
+ break;
+ }
+ if (cmpResult == std::partial_ordering::equivalent && include) {
+ break;
+ }
+ const TSortableBatchPosition::TFoundPosition skipPos = SortHeap.MutableCurrent().SkipToLower(pos);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
+ if (skipPos.IsEqual()) {
+ if (!include && !SortHeap.MutableCurrent().Next()) {
+ SortHeap.RemoveTop();
+ } else {
+ SortHeap.UpdateTop();
+ }
+ } else if (skipPos.IsLess()) {
+ SortHeap.RemoveTop();
+ } else {
+ SortHeap.UpdateTop();
+ }
+ }
+ }
+
void SetPossibleSameVersion(const bool value) {
PossibleSameVersionFlag = value;
}
@@ -299,7 +328,7 @@ public:
return SortHeap.Size();
}
- bool GetSourcesCount() const {
+ ui32 GetSourcesCount() const {
return SortHeap.Size();
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index bcf6fceda9..0b5fae38da 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -148,7 +148,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
IndexedData = ReadMetadata->BuildReader(NOlap::TReadContext(Storages, Counters, true), ReadMetadata);
- LOG_S_DEBUG("Starting read (" << IndexedData->DebugString() << ") at tablet " << TabletId);
+ LOG_S_DEBUG("Starting read (" << IndexedData->DebugString(false) << ") at tablet " << TabletId);
bool earlyExit = false;
if (Deadline != TInstant::Max()) {