author     chertus <azuikov@ydb.tech>   2023-05-04 16:35:21 +0300
committer  chertus <azuikov@ydb.tech>   2023-05-04 16:35:21 +0300
commit     5813fbc93132b092da12b713d11c19c91d551bcb (patch)
tree       a3c5c699ad1e1d4bda1a06cbba0102d265e3b593
parent     6b1bed39e996ecde5d9637b97612d7c4f36467f3 (diff)
remove YDB_* macros from core/tx/columnshard
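
The change replaces the macro-generated accessors (YDB_READONLY, YDB_READONLY_DEF, YDB_READONLY_FLAG, YDB_ACCESSOR_DEF, YDB_OPT and friends) with explicit members and handwritten getters. A rough sketch of the pattern being removed is shown below; the macro and its expansion are an illustrative approximation, not the exact definition from the YDB accessor helpers:

    #include <cstdint>
    using ui64 = std::uint64_t;   // stand-in for the util typedef used in the tree

    // Hypothetical approximation of a YDB_READONLY_DEF-style macro: one line
    // declares the private member and a generated const getter.
    #define MY_READONLY_DEF(type, name)          \
    private:                                     \
        type name;                               \
    public:                                      \
        const type& Get##name() const {          \
            return name;                         \
        }                                        \
    private:

    class TBatchBefore {
        MY_READONLY_DEF(ui64, BatchNo);          // member BatchNo + GetBatchNo()
    };

    // After this commit the same state is spelled out explicitly, e.g. in TBatch:
    class TBatchAfter {
    private:
        ui64 BatchNo = 0;
    public:
        ui64 GetBatchNo() const noexcept {
            return BatchNo;
        }
    };
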
-rw-r--r--  ydb/core/tx/columnshard/counters.h                               |  50
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp            |   6
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.h              |  12
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.cpp                 |   3
-rw-r--r--  ydb/core/tx/columnshard/engines/portion_info.h                   |  47
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.cpp                 |   2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/batch.h                   |  72
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp         |   2
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/conveyor_task.h           |  12
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filling_context.h         |  30
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp      |   8
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/granule.h                 |  46
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_controller.cpp      |  30
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/order_controller.h        |  17
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp  |  16
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/queue.h                   |   6
-rw-r--r--  ydb/core/tx/columnshard/engines/reader/read_filter_merger.h      |  12
17 files changed, 264 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/counters.h b/ydb/core/tx/columnshard/counters.h
index f862fb2ccdf..f9875de8f7f 100644
--- a/ydb/core/tx/columnshard/counters.h
+++ b/ydb/core/tx/columnshard/counters.h
@@ -4,33 +4,31 @@
namespace NKikimr::NColumnShard {
-class TScanCounters {
-private:
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterBytes);
-
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, AssembleFilterCount);
-
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyFetchedBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyUsefulBytes);
-
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterFetchedBytes);
-
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, OriginalRowsCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilteredRowsCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, SkippedBytes);
-
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterFetchedBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterUsefulBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterFetchedBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterUsefulBytes);
-public:
+struct TScanCounters {
+ NMonitoring::TDynamicCounters::TCounterPtr PortionBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr FilterBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr PostFilterBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr AssembleFilterCount;
+
+ NMonitoring::TDynamicCounters::TCounterPtr FilterOnlyCount;
+ NMonitoring::TDynamicCounters::TCounterPtr FilterOnlyFetchedBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr FilterOnlyUsefulBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr EmptyFilterCount;
+ NMonitoring::TDynamicCounters::TCounterPtr EmptyFilterFetchedBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr OriginalRowsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr FilteredRowsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr SkippedBytes;
+
+ NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesCount;
+ NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesFilterFetchedBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesFilterUsefulBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterFetchedBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes;
+
TScanCounters(const TString& module = "Scan");
};
-
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 9ed9ebc93d4..d992f9dd638 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -117,11 +117,11 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::
Y_VERIFY(IndexedBlobs.emplace(range).second);
Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second);
if (batch.GetFilter()) {
- Counters.GetPostFilterBytes()->Add(range.Size);
+ Counters.PostFilterBytes->Add(range.Size);
ReadMetadata->ReadStats->DataAdditionalBytes += range.Size;
FetchBlobsQueue.emplace_front(range);
} else {
- Counters.GetFilterBytes()->Add(range.Size);
+ Counters.FilterBytes->Add(range.Size);
ReadMetadata->ReadStats->DataFilterBytes += range.Size;
FetchBlobsQueue.emplace_back(range);
}
@@ -151,7 +151,7 @@ void TIndexedReadData::InitRead(ui32 inputBatch) {
}
GranulesContext->PrepareForStart();
- Counters.GetPortionBytes()->Add(portionsBytes);
+ Counters.PortionBytes->Add(portionsBytes);
auto& stats = ReadMetadata->ReadStats;
stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size();
stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size();
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index ffdc27fb5b1..a243682b867 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -21,8 +21,8 @@ class TIndexedReadData {
private:
std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext;
- YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
- YDB_READONLY_DEF(NColumnShard::TDataTasksProcessorContainer, TasksProcessor);
+ NColumnShard::TScanCounters Counters;
+ NColumnShard::TDataTasksProcessorContainer TasksProcessor;
TFetchBlobsQueue& FetchBlobsQueue;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
bool OnePhaseReadMode = false;
@@ -38,6 +38,14 @@ public:
TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue,
const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor);
+ const NColumnShard::TScanCounters& GetCounters() const noexcept {
+ return Counters;
+ }
+
+ const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept {
+ return TasksProcessor;
+ }
+
NIndexedReader::TGranulesFillingContext& GetGranulesContext() {
Y_VERIFY(GranulesContext);
return *GranulesContext;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index f5174fd6068..664d9bc3a54 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -233,11 +233,12 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector< std::shared_ptr<arrow::Field>> fields;
+ ui64 limit = options.RecordsCountLimit ? *options.RecordsCountLimit : Max<ui64>();
for (auto&& i : Columns) {
if (!options.IsAcceptedColumn(i.GetColumnId())) {
continue;
}
- columns.emplace_back(i.Assemble(options.GetRecordsCountLimitDef(Max<ui32>()), !options.IsForwardAssemble()));
+ columns.emplace_back(i.Assemble(limit, !options.ForwardAssemble));
fields.emplace_back(i.GetField());
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 4efd4c181d1..5db5aca2a05 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -254,8 +254,8 @@ public:
class TAssembleBlobInfo {
private:
- YDB_READONLY(ui32, NullRowsCount, 0);
- YDB_READONLY_DEF(TString, Data);
+ ui32 NullRowsCount = 0;
+ TString Data;
public:
TAssembleBlobInfo(const ui32 rowsCount)
: NullRowsCount(rowsCount) {
@@ -267,6 +267,14 @@ public:
}
+ ui32 GetNullRowsCount() const noexcept {
+ return NullRowsCount;
+ }
+
+ const TString& GetData() const noexcept {
+ return Data;
+ }
+
std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(std::shared_ptr<arrow::Schema> schema) const {
if (NullRowsCount) {
Y_VERIFY(!Data);
@@ -280,10 +288,14 @@ public:
class TPreparedColumn {
private:
- YDB_READONLY(ui32, ColumnId, 0);
+ ui32 ColumnId = 0;
std::shared_ptr<arrow::Field> Field;
std::vector<TAssembleBlobInfo> Blobs;
public:
+ ui32 GetColumnId() const noexcept {
+ return ColumnId;
+ }
+
const std::string& GetName() const {
return Field->name();
}
@@ -309,18 +321,20 @@ public:
std::shared_ptr<arrow::Schema> Schema;
public:
+ struct TAssembleOptions {
+ const bool ForwardAssemble = true;
+ std::optional<ui32> RecordsCountLimit;
+ std::optional<std::set<ui32>> IncludedColumnIds;
+ std::optional<std::set<ui32>> ExcludedColumnIds;
- std::vector<std::string> GetSchemaColumnNames() const {
- return Schema->field_names();
- }
+ TAssembleOptions() noexcept
+ : TAssembleOptions(true)
+ {}
+
+ explicit TAssembleOptions(bool forward) noexcept
+ : ForwardAssemble(forward)
+ {}
- class TAssembleOptions {
- private:
- YDB_OPT(ui32, RecordsCountLimit);
- YDB_FLAG_ACCESSOR(ForwardAssemble, true);
- YDB_OPT(std::set<ui32>, IncludedColumnIds);
- YDB_OPT(std::set<ui32>, ExcludedColumnIds);
- public:
bool IsAcceptedColumn(const ui32 columnId) const {
if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) {
return false;
@@ -332,6 +346,10 @@ public:
}
};
+ std::vector<std::string> GetSchemaColumnNames() const {
+ return Schema->field_names();
+ }
+
size_t GetColumnsCount() const {
return Columns.size();
}
@@ -340,10 +358,9 @@ public:
: Columns(std::move(columns))
, Schema(schema)
{
-
}
- std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = Default<TAssembleOptions>()) const;
+ std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
};
template <class TExternalBlobInfo>
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index dfe30b754f2..47e00395b62 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -19,7 +19,7 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI
Owner->SetDuplicationsAvailable(true);
if (portionInfo.CanHaveDups()) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion");
- DuplicationsAvailableFlag = true;
+ DuplicationsAvailable = true;
}
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index d854b4a9976..16a2fb33bb7 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -19,31 +19,79 @@ class TGranule;
class TBatch {
private:
- YDB_READONLY(ui64, BatchNo, 0);
- YDB_READONLY(ui64, Portion, 0);
- YDB_READONLY(ui64, Granule, 0);
- YDB_READONLY(ui64, WaitingBytes, 0);
- YDB_READONLY(ui64, FetchedBytes, 0);
+ ui64 BatchNo = 0;
+ ui64 Portion = 0;
+ ui64 Granule = 0;
+ ui64 WaitingBytes = 0;
+ ui64 FetchedBytes = 0;
THashSet<TBlobRange> WaitIndexed;
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, FutureFilter);
-
+ std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+ std::shared_ptr<arrow::RecordBatch> FilterBatch;
+ std::shared_ptr<NArrow::TColumnFilter> Filter;
+ std::shared_ptr<NArrow::TColumnFilter> FutureFilter;
+
ui32 OriginalRecordsCount = 0;
- YDB_READONLY_FLAG(DuplicationsAvailable, false);
+ bool DuplicationsAvailable = false;
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
TGranule* Owner;
const TPortionInfo* PortionInfo = nullptr;
- YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
+ std::optional<std::set<ui32>> CurrentColumnIds;
std::set<ui32> AskedColumnIds;
void ResetCommon(const std::set<ui32>& columnIds);
ui64 GetUsefulBytes(const ui64 bytes) const;
public:
+ bool IsDuplicationsAvailable() const noexcept {
+ return DuplicationsAvailable;
+ }
+
+ void SetDuplicationsAvailable(bool val) noexcept {
+ DuplicationsAvailable = val;
+ }
+
+ ui64 GetBatchNo() const noexcept {
+ return BatchNo;
+ }
+
+ ui64 GetPortion() const noexcept {
+ return Portion;
+ }
+
+ ui64 GetGranule() const noexcept {
+ return Granule;
+ }
+
+ ui64 GetWaitingBytes() const noexcept {
+ return WaitingBytes;
+ }
+
+ ui64 GetFetchedBytes() const noexcept {
+ return FetchedBytes;
+ }
+
+ const std::optional<std::set<ui32>>& GetCurrentColumnIds() const noexcept {
+ return CurrentColumnIds;
+ }
+
+ const std::shared_ptr<arrow::RecordBatch>& GetFilteredBatch() const noexcept {
+ return FilteredBatch;
+ }
+
+ const std::shared_ptr<arrow::RecordBatch>& GetFilterBatch() const noexcept {
+ return FilterBatch;
+ }
+
+ const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const noexcept {
+ return Filter;
+ }
+
+ const std::shared_ptr<NArrow::TColumnFilter>& GetFutureFilter() const noexcept {
+ return FutureFilter;
+ }
+
ui64 GetUsefulWaitingBytes() const {
return GetUsefulBytes(WaitingBytes);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index 7da0a05e6d3..344748263e1 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -7,7 +7,7 @@ bool IDataTasksProcessor::ITask::DoExecute() {
if (OwnerOperator && OwnerOperator->IsStopped()) {
return true;
} else {
- DataProcessedFlag = true;
+ DataProcessed = true;
return DoExecuteImpl();
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index 040a2fe730e..afe942fbe1c 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -20,7 +20,7 @@ public:
class ITask: public NConveyor::ITask {
private:
std::shared_ptr<IDataTasksProcessor> OwnerOperator;
- YDB_READONLY_FLAG(DataProcessed, false);
+ bool DataProcessed = false;
protected:
TDataTasksProcessorContainer GetTasksProcessorContainer() const;
virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const = 0;
@@ -38,6 +38,10 @@ public:
using TPtr = std::shared_ptr<ITask>;
virtual ~ITask() = default;
bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const;
+
+ bool IsDataProcessed() const noexcept {
+ return DataProcessed;
+ }
};
protected:
virtual bool DoAdd(ITask::TPtr task) = 0;
@@ -64,7 +68,7 @@ public:
class TDataTasksProcessorContainer {
private:
- YDB_READONLY_DEF(IDataTasksProcessor::TPtr, Object);
+ IDataTasksProcessor::TPtr Object;
public:
TDataTasksProcessorContainer() = default;
TDataTasksProcessorContainer(IDataTasksProcessor::TPtr object)
@@ -91,6 +95,10 @@ public:
return Object && Object->IsStopped();
}
+ IDataTasksProcessor::TPtr GetObject() const noexcept {
+ return Object;
+ }
+
void Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task);
};
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index 96c9cb40e8b..e9652e0b11b 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -13,18 +13,18 @@ namespace NKikimr::NOlap::NIndexedReader {
class TGranulesFillingContext {
private:
bool AbortedFlag = false;
- YDB_READONLY_DEF(TReadMetadata::TConstPtr, ReadMetadata);
+ TReadMetadata::TConstPtr ReadMetadata;
const bool InternalReading = false;
TIndexedReadData& Owner;
THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
std::set<ui64> ReadyGranulesAccumulator;
THashMap<ui64, NIndexedReader::TGranule> Granules;
- YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns);
- YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
+ std::set<ui32> EarlyFilterColumns;
+ std::set<ui32> PostFilterColumns;
std::set<ui32> FilterStageColumns;
std::set<ui32> UsedColumns;
- YDB_READONLY_DEF(IOrderPolicy::TPtr, SortingPolicy);
- YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
+ IOrderPolicy::TPtr SortingPolicy;
+ NColumnShard::TScanCounters Counters;
std::vector<NIndexedReader::TBatch*> Batches;
bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
@@ -32,6 +32,26 @@ private:
public:
TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount);
+ TReadMetadata::TConstPtr GetReadMetadata() const noexcept {
+ return ReadMetadata;
+ }
+
+ const std::set<ui32>& GetEarlyFilterColumns() const noexcept {
+ return EarlyFilterColumns;
+ }
+
+ const std::set<ui32>& GetPostFilterColumns() const noexcept {
+ return PostFilterColumns;
+ }
+
+ IOrderPolicy::TPtr GetSortingPolicy() const noexcept {
+ return SortingPolicy;
+ }
+
+ NColumnShard::TScanCounters GetCounters() const noexcept {
+ return Counters;
+ }
+
NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 7531c0dd176..c7f9c5f0bac 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -9,7 +9,7 @@ bool TAssembleFilter::DoExecuteImpl() {
/// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.SetIncludedColumnIds(FilterColumnIds);
+ options.IncludedColumnIds = FilterColumnIds;
auto batch = BatchConstructor.Assemble(options);
Y_VERIFY(batch);
Y_VERIFY(batch->num_rows());
@@ -36,7 +36,7 @@ bool TAssembleFilter::DoExecuteImpl() {
if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) {
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.SetExcludedColumnIds(FilterColumnIds);
+ options.ExcludedColumnIds = FilterColumnIds;
auto addBatch = BatchConstructor.Assemble(options);
Y_VERIFY(addBatch);
Y_VERIFY(Filter->Apply(addBatch));
@@ -53,8 +53,8 @@ bool TAssembleFilter::DoExecuteImpl() {
bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
TBatch& batch = owner.GetBatchInfo(BatchNo);
Y_VERIFY(OriginalCount);
- owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount);
- owner.GetCounters().GetAssembleFilterCount()->Add(1);
+ owner.GetCounters().OriginalRowsCount->Add(OriginalCount);
+ owner.GetCounters().AssembleFilterCount->Add(1);
batch.InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter);
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 49543f69486..eddaa66dc11 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -13,21 +13,21 @@ class TGranulesFillingContext;
class TGranule {
private:
- YDB_READONLY(ui64, GranuleId, 0);
+ ui64 GranuleId = 0;
- YDB_READONLY_FLAG(NotIndexedBatchReady, false);
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, NotIndexedBatch);
- YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotIndexedBatchFutureFilter);
+ bool NotIndexedBatchReadyFlag = false;
+ std::shared_ptr<arrow::RecordBatch> NotIndexedBatch;
+ std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter;
std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches;
std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches;
- YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
- YDB_READONLY_FLAG(Ready, false);
+ bool DuplicationsAvailableFlag = false;
+ bool ReadyFlag = false;
std::deque<TBatch> Batches;
std::set<ui32> WaitBatches;
std::set<ui32> GranuleBatchNumbers;
TGranulesFillingContext* Owner = nullptr;
- YDB_READONLY_DEF(THashSet<const void*>, BatchesToDedup);
+ THashSet<const void*> BatchesToDedup;
void CheckReady();
public:
@@ -36,6 +36,38 @@ public:
, Owner(&owner) {
}
+ ui64 GetGranuleId() const noexcept {
+ return GranuleId;
+ }
+
+ const THashSet<const void*>& GetBatchesToDedup() const noexcept {
+ return BatchesToDedup;
+ }
+
+ const std::shared_ptr<arrow::RecordBatch>& GetNotIndexedBatch() const noexcept {
+ return NotIndexedBatch;
+ }
+
+ const std::shared_ptr<NArrow::TColumnFilter>& GetNotIndexedBatchFutureFilter() const noexcept {
+ return NotIndexedBatchFutureFilter;
+ }
+
+ bool IsNotIndexedBatchReady() const noexcept {
+ return NotIndexedBatchReadyFlag;
+ }
+
+ bool IsDuplicationsAvailable() const noexcept {
+ return DuplicationsAvailableFlag;
+ }
+
+ void SetDuplicationsAvailable(bool val) noexcept {
+ DuplicationsAvailableFlag = val;
+ }
+
+ bool IsReady() const noexcept {
+ return ReadyFlag;
+ }
+
std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const {
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
result.reserve(SortableBatches.size() + NonSortableBatches.size() + 1);
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
index 3875f9b6355..8f51265b7bd 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -55,7 +55,7 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size());
MergeStream.AddIndependentSource(g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
}
- auto& batches = g.MutableBatches();
+ auto& batches = g.GetBatches();
while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) {
auto b = batches.front();
if (b->IsSortableInGranule()) {
@@ -72,7 +72,7 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont
if (!CurrentItemsLimit || batches.empty()) {
while (batches.size()) {
auto b = batches.front();
- context.GetCounters().GetSkippedBytes()->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
+ context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns()));
b->InitBatch(nullptr);
batches.pop_front();
}
@@ -121,22 +121,22 @@ TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) {
Y_VERIFY(!!batch.GetFilter());
if (!batch.GetFilteredRecordsCount()) {
- context.GetCounters().GetEmptyFilterCount()->Add(1);
- context.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes());
- context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+ context.GetCounters().EmptyFilterCount->Add(1);
+ context.GetCounters().EmptyFilterFetchedBytes->Add(batch.GetFetchedBytes());
+ context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
batch.InitBatch(nullptr);
} else {
- context.GetCounters().GetFilteredRowsCount()->Add(batch.GetFilterBatch()->num_rows());
+ context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows());
if (batch.AskedColumnsAlready(context.GetPostFilterColumns())) {
- context.GetCounters().GetFilterOnlyCount()->Add(1);
- context.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes());
- context.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetUsefulFetchedBytes());
- context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+ context.GetCounters().FilterOnlyCount->Add(1);
+ context.GetCounters().FilterOnlyFetchedBytes->Add(batch.GetFetchedBytes());
+ context.GetCounters().FilterOnlyUsefulBytes->Add(batch.GetUsefulFetchedBytes());
+ context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
batch.InitBatch(batch.GetFilterBatch());
} else {
- context.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
- context.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetUsefulFetchedBytes());
+ context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batch.GetFetchedBytes());
+ context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batch.GetUsefulFetchedBytes());
batch.ResetWithFilter(context.GetPostFilterColumns());
if (batch.IsFetchingReady()) {
@@ -146,9 +146,9 @@ void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingConte
}
}
- context.GetCounters().GetTwoPhasesCount()->Add(1);
- context.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
- context.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetUsefulWaitingBytes());
+ context.GetCounters().TwoPhasesCount->Add(1);
+ context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batch.GetWaitingBytes());
+ context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batch.GetUsefulWaitingBytes());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
("filtered_count", batch.GetFilterBatch()->num_rows())
("blobs_count", batch.GetWaitingBlobs().size())
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
index 969c9cd0e38..842488aeb8d 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h
@@ -99,8 +99,8 @@ public:
class TGranuleOrdered {
private:
bool StartedFlag = false;
- YDB_ACCESSOR_DEF(std::deque<TBatch*>, Batches);
- YDB_READONLY(const TGranule*, Granule, nullptr);
+ std::deque<TBatch*> Batches;
+ const TGranule* Granule = nullptr;
public:
bool Start() {
if (!StartedFlag) {
@@ -109,14 +109,25 @@ public:
} else {
return false;
}
-
+
}
TGranuleOrdered(std::deque<TBatch*>&& batches, TGranule* granule)
: Batches(std::move(batches))
, Granule(granule)
{
+ }
+
+ const std::deque<TBatch*>& GetBatches() const noexcept {
+ return Batches;
+ }
+
+ std::deque<TBatch*>& GetBatches() noexcept {
+ return Batches;
+ }
+ const TGranule* GetGranule() const noexcept {
+ return Granule;
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index 460dc1297d8..e857d671f3d 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -10,15 +10,17 @@ bool TAssembleBatch::DoExecuteImpl() {
/// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
Y_VERIFY(BatchConstructor.GetColumnsCount());
+ Y_VERIFY(Filter);
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- if (Filter->GetInactiveHeadSize() > Filter->GetInactiveTailSize()) {
- options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveHeadSize())
- .SetForwardAssemble(false);
- Filter->CutInactiveHead();
- } else {
- options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveTailSize());
+ bool forward = Filter->GetInactiveHeadSize() <= Filter->GetInactiveTailSize();
+
+ TPortionInfo::TPreparedBatchData::TAssembleOptions options(forward);
+ options.RecordsCountLimit = Filter->Size() - (forward ? Filter->GetInactiveTailSize() : Filter->GetInactiveHeadSize());
+
+ if (forward) {
Filter->CutInactiveTail();
+ } else {
+ Filter->CutInactiveHead();
}
auto addBatch = BatchConstructor.Assemble(options);
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h
index f8a3da41591..148cca85b9a 100644
--- a/ydb/core/tx/columnshard/engines/reader/queue.h
+++ b/ydb/core/tx/columnshard/engines/reader/queue.h
@@ -7,8 +7,12 @@ namespace NKikimr::NOlap {
class TFetchBlobsQueue {
private:
bool StoppedFlag = false;
- YDB_ACCESSOR_DEF(std::deque<TBlobRange>, IteratorBlobsSequential);
+ std::deque<TBlobRange> IteratorBlobsSequential;
public:
+ const std::deque<TBlobRange>& GetIteratorBlobsSequential() const noexcept {
+ return IteratorBlobsSequential;
+ }
+
bool IsStopped() const {
return StoppedFlag;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index a43345df60d..534707f8858 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -13,8 +13,8 @@ class TMergePartialStream {
private:
class TBatchIterator {
private:
- YDB_ACCESSOR(i64, Position, 0);
- YDB_OPT(ui32, PoolId);
+ i64 Position = 0;
+ std::optional<ui32> PoolId;
std::shared_ptr<arrow::RecordBatch> Batch;
std::shared_ptr<NArrow::TColumnFilter> Filter;
@@ -79,6 +79,14 @@ private:
}
}
+ bool HasPoolId() const noexcept {
+ return PoolId.has_value();
+ }
+
+ ui32 GetPoolIdUnsafe() const noexcept {
+ return *PoolId;
+ }
+
bool CheckNextBatch(const TBatchIterator& nextIterator) {
Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size());
return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0;