author | chertus <azuikov@ydb.tech> | 2023-05-04 16:35:21 +0300
---|---|---
committer | chertus <azuikov@ydb.tech> | 2023-05-04 16:35:21 +0300
commit | 5813fbc93132b092da12b713d11c19c91d551bcb (patch) |
tree | a3c5c699ad1e1d4bda1a06cbba0102d265e3b593 |
parent | 6b1bed39e996ecde5d9637b97612d7c4f36467f3 (diff) |
download | ydb-5813fbc93132b092da12b713d11c19c91d551bcb.tar.gz |
remove YDB_* macro from core/tx/column_shard
17 files changed, 264 insertions, 107 deletions
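
The commit removes the YDB_* accessor macros (YDB_READONLY, YDB_READONLY_DEF, YDB_READONLY_FLAG, YDB_ACCESSOR_DEF, YDB_OPT, YDB_FLAG_ACCESSOR) from core/tx/columnshard and spells the members and accessors out by hand. A minimal before/after sketch of the pattern, inferred from the hunks below (the exact macro expansions live elsewhere in the YDB tree and may differ in detail):

// Before: the macro declares a private member and generates a Get<Name>() accessor.
class TScanCounters {
private:
    YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes);
    // call sites read the value via the generated getter:
    //   Counters.GetPortionBytes()->Add(bytes);
};

// After: a plain struct with public members, accessed directly.
struct TScanCounters {
    NMonitoring::TDynamicCounters::TCounterPtr PortionBytes;
    // call sites use the field itself:
    //   Counters.PortionBytes->Add(bytes);
};

Where a member stays private (TBatch, TGranule, TIndexedReadData, and the iterator/container helpers), the generated accessors are replaced by hand-written const getters, as in the batch.h and granule.h hunks below.
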
diff --git a/ydb/core/tx/columnshard/counters.h b/ydb/core/tx/columnshard/counters.h index f862fb2ccdf..f9875de8f7f 100644 --- a/ydb/core/tx/columnshard/counters.h +++ b/ydb/core/tx/columnshard/counters.h @@ -4,33 +4,31 @@ namespace NKikimr::NColumnShard { -class TScanCounters { -private: - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterBytes); - - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, AssembleFilterCount); - - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyFetchedBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyUsefulBytes); - - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterFetchedBytes); - - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, OriginalRowsCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilteredRowsCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, SkippedBytes); - - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterFetchedBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterUsefulBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterFetchedBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterUsefulBytes); -public: +struct TScanCounters { + NMonitoring::TDynamicCounters::TCounterPtr PortionBytes; + NMonitoring::TDynamicCounters::TCounterPtr FilterBytes; + NMonitoring::TDynamicCounters::TCounterPtr PostFilterBytes; + + NMonitoring::TDynamicCounters::TCounterPtr AssembleFilterCount; + + NMonitoring::TDynamicCounters::TCounterPtr FilterOnlyCount; + NMonitoring::TDynamicCounters::TCounterPtr FilterOnlyFetchedBytes; + NMonitoring::TDynamicCounters::TCounterPtr FilterOnlyUsefulBytes; + + NMonitoring::TDynamicCounters::TCounterPtr EmptyFilterCount; + NMonitoring::TDynamicCounters::TCounterPtr EmptyFilterFetchedBytes; + + NMonitoring::TDynamicCounters::TCounterPtr OriginalRowsCount; + NMonitoring::TDynamicCounters::TCounterPtr FilteredRowsCount; + NMonitoring::TDynamicCounters::TCounterPtr SkippedBytes; + + NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesCount; + NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesFilterFetchedBytes; + NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesFilterUsefulBytes; + NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterFetchedBytes; + NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes; + TScanCounters(const TString& module = "Scan"); }; - } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 9ed9ebc93d4..d992f9dd638 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -117,11 +117,11 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader:: Y_VERIFY(IndexedBlobs.emplace(range).second); Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); if (batch.GetFilter()) { - Counters.GetPostFilterBytes()->Add(range.Size); + 
Counters.PostFilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; FetchBlobsQueue.emplace_front(range); } else { - Counters.GetFilterBytes()->Add(range.Size); + Counters.FilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataFilterBytes += range.Size; FetchBlobsQueue.emplace_back(range); } @@ -151,7 +151,7 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { } GranulesContext->PrepareForStart(); - Counters.GetPortionBytes()->Add(portionsBytes); + Counters.PortionBytes->Add(portionsBytes); auto& stats = ReadMetadata->ReadStats; stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size(); stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size(); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index ffdc27fb5b1..a243682b867 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -21,8 +21,8 @@ class TIndexedReadData { private: std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; - YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters); - YDB_READONLY_DEF(NColumnShard::TDataTasksProcessorContainer, TasksProcessor); + NColumnShard::TScanCounters Counters; + NColumnShard::TDataTasksProcessorContainer TasksProcessor; TFetchBlobsQueue& FetchBlobsQueue; NOlap::TReadMetadata::TConstPtr ReadMetadata; bool OnePhaseReadMode = false; @@ -38,6 +38,14 @@ public: TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor); + const NColumnShard::TScanCounters& GetCounters() const noexcept { + return Counters; + } + + const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept { + return TasksProcessor; + } + NIndexedReader::TGranulesFillingContext& GetGranulesContext() { Y_VERIFY(GranulesContext); return *GranulesContext; diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index f5174fd6068..664d9bc3a54 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -233,11 +233,12 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; std::vector< std::shared_ptr<arrow::Field>> fields; + ui64 limit = options.RecordsCountLimit ? 
*options.RecordsCountLimit : Max<ui64>(); for (auto&& i : Columns) { if (!options.IsAcceptedColumn(i.GetColumnId())) { continue; } - columns.emplace_back(i.Assemble(options.GetRecordsCountLimitDef(Max<ui32>()), !options.IsForwardAssemble())); + columns.emplace_back(i.Assemble(limit, !options.ForwardAssemble)); fields.emplace_back(i.GetField()); } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index 4efd4c181d1..5db5aca2a05 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -254,8 +254,8 @@ public: class TAssembleBlobInfo { private: - YDB_READONLY(ui32, NullRowsCount, 0); - YDB_READONLY_DEF(TString, Data); + ui32 NullRowsCount = 0; + TString Data; public: TAssembleBlobInfo(const ui32 rowsCount) : NullRowsCount(rowsCount) { @@ -267,6 +267,14 @@ public: } + ui32 GetNullRowsCount() const noexcept { + return NullRowsCount; + } + + const TString& GetData() const noexcept { + return Data; + } + std::shared_ptr<arrow::RecordBatch> BuildRecordBatch(std::shared_ptr<arrow::Schema> schema) const { if (NullRowsCount) { Y_VERIFY(!Data); @@ -280,10 +288,14 @@ public: class TPreparedColumn { private: - YDB_READONLY(ui32, ColumnId, 0); + ui32 ColumnId = 0; std::shared_ptr<arrow::Field> Field; std::vector<TAssembleBlobInfo> Blobs; public: + ui32 GetColumnId() const noexcept { + return ColumnId; + } + const std::string& GetName() const { return Field->name(); } @@ -309,18 +321,20 @@ public: std::shared_ptr<arrow::Schema> Schema; public: + struct TAssembleOptions { + const bool ForwardAssemble = true; + std::optional<ui32> RecordsCountLimit; + std::optional<std::set<ui32>> IncludedColumnIds; + std::optional<std::set<ui32>> ExcludedColumnIds; - std::vector<std::string> GetSchemaColumnNames() const { - return Schema->field_names(); - } + TAssembleOptions() noexcept + : TAssembleOptions(true) + {} + + explicit TAssembleOptions(bool forward) noexcept + : ForwardAssemble(forward) + {} - class TAssembleOptions { - private: - YDB_OPT(ui32, RecordsCountLimit); - YDB_FLAG_ACCESSOR(ForwardAssemble, true); - YDB_OPT(std::set<ui32>, IncludedColumnIds); - YDB_OPT(std::set<ui32>, ExcludedColumnIds); - public: bool IsAcceptedColumn(const ui32 columnId) const { if (IncludedColumnIds && !IncludedColumnIds->contains(columnId)) { return false; @@ -332,6 +346,10 @@ public: } }; + std::vector<std::string> GetSchemaColumnNames() const { + return Schema->field_names(); + } + size_t GetColumnsCount() const { return Columns.size(); } @@ -340,10 +358,9 @@ public: : Columns(std::move(columns)) , Schema(schema) { - } - std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = Default<TAssembleOptions>()) const; + std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const; }; template <class TExternalBlobInfo> diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index dfe30b754f2..47e00395b62 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -19,7 +19,7 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI Owner->SetDuplicationsAvailable(true); if (portionInfo.CanHaveDups()) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion"); - DuplicationsAvailableFlag = true; + DuplicationsAvailable = true; } } } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h 
b/ydb/core/tx/columnshard/engines/reader/batch.h index d854b4a9976..16a2fb33bb7 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -19,31 +19,79 @@ class TGranule; class TBatch { private: - YDB_READONLY(ui64, BatchNo, 0); - YDB_READONLY(ui64, Portion, 0); - YDB_READONLY(ui64, Granule, 0); - YDB_READONLY(ui64, WaitingBytes, 0); - YDB_READONLY(ui64, FetchedBytes, 0); + ui64 BatchNo = 0; + ui64 Portion = 0; + ui64 Granule = 0; + ui64 WaitingBytes = 0; + ui64 FetchedBytes = 0; THashSet<TBlobRange> WaitIndexed; - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch); - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, FutureFilter); - + std::shared_ptr<arrow::RecordBatch> FilteredBatch; + std::shared_ptr<arrow::RecordBatch> FilterBatch; + std::shared_ptr<NArrow::TColumnFilter> Filter; + std::shared_ptr<NArrow::TColumnFilter> FutureFilter; + ui32 OriginalRecordsCount = 0; - YDB_READONLY_FLAG(DuplicationsAvailable, false); + bool DuplicationsAvailable = false; THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; TGranule* Owner; const TPortionInfo* PortionInfo = nullptr; - YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds); + std::optional<std::set<ui32>> CurrentColumnIds; std::set<ui32> AskedColumnIds; void ResetCommon(const std::set<ui32>& columnIds); ui64 GetUsefulBytes(const ui64 bytes) const; public: + bool IsDuplicationsAvailable() const noexcept { + return DuplicationsAvailable; + } + + void SetDuplicationsAvailable(bool val) noexcept { + DuplicationsAvailable = val; + } + + ui64 GetBatchNo() const noexcept { + return BatchNo; + } + + ui64 GetPortion() const noexcept { + return Portion; + } + + ui64 GetGranule() const noexcept { + return Granule; + } + + ui64 GetWaitingBytes() const noexcept { + return WaitingBytes; + } + + ui64 GetFetchedBytes() const noexcept { + return FetchedBytes; + } + + const std::optional<std::set<ui32>>& GetCurrentColumnIds() const noexcept { + return CurrentColumnIds; + } + + const std::shared_ptr<arrow::RecordBatch>& GetFilteredBatch() const noexcept { + return FilteredBatch; + } + + const std::shared_ptr<arrow::RecordBatch>& GetFilterBatch() const noexcept { + return FilterBatch; + } + + const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const noexcept { + return Filter; + } + + const std::shared_ptr<NArrow::TColumnFilter>& GetFutureFilter() const noexcept { + return FutureFilter; + } + ui64 GetUsefulWaitingBytes() const { return GetUsefulBytes(WaitingBytes); } diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp index 7da0a05e6d3..344748263e1 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp @@ -7,7 +7,7 @@ bool IDataTasksProcessor::ITask::DoExecute() { if (OwnerOperator && OwnerOperator->IsStopped()) { return true; } else { - DataProcessedFlag = true; + DataProcessed = true; return DoExecuteImpl(); } } diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h index 040a2fe730e..afe942fbe1c 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h @@ -20,7 +20,7 @@ public: class ITask: public NConveyor::ITask { private: 
std::shared_ptr<IDataTasksProcessor> OwnerOperator; - YDB_READONLY_FLAG(DataProcessed, false); + bool DataProcessed = false; protected: TDataTasksProcessorContainer GetTasksProcessorContainer() const; virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const = 0; @@ -38,6 +38,10 @@ public: using TPtr = std::shared_ptr<ITask>; virtual ~ITask() = default; bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const; + + bool IsDataProcessed() const noexcept { + return DataProcessed; + } }; protected: virtual bool DoAdd(ITask::TPtr task) = 0; @@ -64,7 +68,7 @@ public: class TDataTasksProcessorContainer { private: - YDB_READONLY_DEF(IDataTasksProcessor::TPtr, Object); + IDataTasksProcessor::TPtr Object; public: TDataTasksProcessorContainer() = default; TDataTasksProcessorContainer(IDataTasksProcessor::TPtr object) @@ -91,6 +95,10 @@ public: return Object && Object->IsStopped(); } + IDataTasksProcessor::TPtr GetObject() const noexcept { + return Object; + } + void Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task); }; diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 96c9cb40e8b..e9652e0b11b 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -13,18 +13,18 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranulesFillingContext { private: bool AbortedFlag = false; - YDB_READONLY_DEF(TReadMetadata::TConstPtr, ReadMetadata); + TReadMetadata::TConstPtr ReadMetadata; const bool InternalReading = false; TIndexedReadData& Owner; THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; THashMap<ui64, NIndexedReader::TGranule> Granules; - YDB_READONLY_DEF(std::set<ui32>, EarlyFilterColumns); - YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns); + std::set<ui32> EarlyFilterColumns; + std::set<ui32> PostFilterColumns; std::set<ui32> FilterStageColumns; std::set<ui32> UsedColumns; - YDB_READONLY_DEF(IOrderPolicy::TPtr, SortingPolicy); - YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters); + IOrderPolicy::TPtr SortingPolicy; + NColumnShard::TScanCounters Counters; std::vector<NIndexedReader::TBatch*> Batches; bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; @@ -32,6 +32,26 @@ private: public: TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount); + TReadMetadata::TConstPtr GetReadMetadata() const noexcept { + return ReadMetadata; + } + + const std::set<ui32>& GetEarlyFilterColumns() const noexcept { + return EarlyFilterColumns; + } + + const std::set<ui32>& GetPostFilterColumns() const noexcept { + return PostFilterColumns; + } + + IOrderPolicy::TPtr GetSortingPolicy() const noexcept { + return SortingPolicy; + } + + NColumnShard::TScanCounters GetCounters() const noexcept { + return Counters; + } + NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index 7531c0dd176..c7f9c5f0bac 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -9,7 +9,7 @@ bool 
TAssembleFilter::DoExecuteImpl() { /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.SetIncludedColumnIds(FilterColumnIds); + options.IncludedColumnIds = FilterColumnIds; auto batch = BatchConstructor.Assemble(options); Y_VERIFY(batch); Y_VERIFY(batch->num_rows()); @@ -36,7 +36,7 @@ bool TAssembleFilter::DoExecuteImpl() { if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) { TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.SetExcludedColumnIds(FilterColumnIds); + options.ExcludedColumnIds = FilterColumnIds; auto addBatch = BatchConstructor.Assemble(options); Y_VERIFY(addBatch); Y_VERIFY(Filter->Apply(addBatch)); @@ -53,8 +53,8 @@ bool TAssembleFilter::DoExecuteImpl() { bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const { TBatch& batch = owner.GetBatchInfo(BatchNo); Y_VERIFY(OriginalCount); - owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount); - owner.GetCounters().GetAssembleFilterCount()->Add(1); + owner.GetCounters().OriginalRowsCount->Add(OriginalCount); + owner.GetCounters().AssembleFilterCount->Add(1); batch.InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 49543f69486..eddaa66dc11 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -13,21 +13,21 @@ class TGranulesFillingContext; class TGranule { private: - YDB_READONLY(ui64, GranuleId, 0); + ui64 GranuleId = 0; - YDB_READONLY_FLAG(NotIndexedBatchReady, false); - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, NotIndexedBatch); - YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotIndexedBatchFutureFilter); + bool NotIndexedBatchReadyFlag = false; + std::shared_ptr<arrow::RecordBatch> NotIndexedBatch; + std::shared_ptr<NArrow::TColumnFilter> NotIndexedBatchFutureFilter; std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches; std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches; - YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); - YDB_READONLY_FLAG(Ready, false); + bool DuplicationsAvailableFlag = false; + bool ReadyFlag = false; std::deque<TBatch> Batches; std::set<ui32> WaitBatches; std::set<ui32> GranuleBatchNumbers; TGranulesFillingContext* Owner = nullptr; - YDB_READONLY_DEF(THashSet<const void*>, BatchesToDedup); + THashSet<const void*> BatchesToDedup; void CheckReady(); public: @@ -36,6 +36,38 @@ public: , Owner(&owner) { } + ui64 GetGranuleId() const noexcept { + return GranuleId; + } + + const THashSet<const void*>& GetBatchesToDedup() const noexcept { + return BatchesToDedup; + } + + const std::shared_ptr<arrow::RecordBatch>& GetNotIndexedBatch() const noexcept { + return NotIndexedBatch; + } + + const std::shared_ptr<NArrow::TColumnFilter>& GetNotIndexedBatchFutureFilter() const noexcept { + return NotIndexedBatchFutureFilter; + } + + bool IsNotIndexedBatchReady() const noexcept { + return NotIndexedBatchReadyFlag; + } + + bool IsDuplicationsAvailable() const noexcept { + return DuplicationsAvailableFlag; + } + + void SetDuplicationsAvailable(bool val) noexcept { + DuplicationsAvailableFlag = val; + } + + bool IsReady() const noexcept { + return ReadyFlag; + } + std::vector<std::shared_ptr<arrow::RecordBatch>> GetReadyBatches() const { 
std::vector<std::shared_ptr<arrow::RecordBatch>> result; result.reserve(SortableBatches.size() + NonSortableBatches.size() + 1); diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp index 3875f9b6355..8f51265b7bd 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp @@ -55,7 +55,7 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size()); MergeStream.AddIndependentSource(g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); } - auto& batches = g.MutableBatches(); + auto& batches = g.GetBatches(); while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) { auto b = batches.front(); if (b->IsSortableInGranule()) { @@ -72,7 +72,7 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont if (!CurrentItemsLimit || batches.empty()) { while (batches.size()) { auto b = batches.front(); - context.GetCounters().GetSkippedBytes()->Add(b->GetFetchBytes(context.GetPostFilterColumns())); + context.GetCounters().SkippedBytes->Add(b->GetFetchBytes(context.GetPostFilterColumns())); b->InitBatch(nullptr); batches.pop_front(); } @@ -121,22 +121,22 @@ TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) { Y_VERIFY(!!batch.GetFilter()); if (!batch.GetFilteredRecordsCount()) { - context.GetCounters().GetEmptyFilterCount()->Add(1); - context.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes()); - context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns())); + context.GetCounters().EmptyFilterCount->Add(1); + context.GetCounters().EmptyFilterFetchedBytes->Add(batch.GetFetchedBytes()); + context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns())); batch.InitBatch(nullptr); } else { - context.GetCounters().GetFilteredRowsCount()->Add(batch.GetFilterBatch()->num_rows()); + context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows()); if (batch.AskedColumnsAlready(context.GetPostFilterColumns())) { - context.GetCounters().GetFilterOnlyCount()->Add(1); - context.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes()); - context.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetUsefulFetchedBytes()); - context.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(context.GetPostFilterColumns())); + context.GetCounters().FilterOnlyCount->Add(1); + context.GetCounters().FilterOnlyFetchedBytes->Add(batch.GetFetchedBytes()); + context.GetCounters().FilterOnlyUsefulBytes->Add(batch.GetUsefulFetchedBytes()); + context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns())); batch.InitBatch(batch.GetFilterBatch()); } else { - context.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes()); - context.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetUsefulFetchedBytes()); + context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batch.GetFetchedBytes()); + context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batch.GetUsefulFetchedBytes()); 
batch.ResetWithFilter(context.GetPostFilterColumns()); if (batch.IsFetchingReady()) { @@ -146,9 +146,9 @@ void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingConte } } - context.GetCounters().GetTwoPhasesCount()->Add(1); - context.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes()); - context.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetUsefulWaitingBytes()); + context.GetCounters().TwoPhasesCount->Add(1); + context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batch.GetWaitingBytes()); + context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batch.GetUsefulWaitingBytes()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data") ("filtered_count", batch.GetFilterBatch()->num_rows()) ("blobs_count", batch.GetWaitingBlobs().size()) diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h index 969c9cd0e38..842488aeb8d 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.h +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h @@ -99,8 +99,8 @@ public: class TGranuleOrdered { private: bool StartedFlag = false; - YDB_ACCESSOR_DEF(std::deque<TBatch*>, Batches); - YDB_READONLY(const TGranule*, Granule, nullptr); + std::deque<TBatch*> Batches; + const TGranule* Granule = nullptr; public: bool Start() { if (!StartedFlag) { @@ -109,14 +109,25 @@ public: } else { return false; } - + } TGranuleOrdered(std::deque<TBatch*>&& batches, TGranule* granule) : Batches(std::move(batches)) , Granule(granule) { + } + + const std::deque<TBatch*>& GetBatches() const noexcept { + return Batches; + } + + std::deque<TBatch*>& GetBatches() noexcept { + return Batches; + } + const TGranule* GetGranule() const noexcept { + return Granule; } }; diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp index 460dc1297d8..e857d671f3d 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp @@ -10,15 +10,17 @@ bool TAssembleBatch::DoExecuteImpl() { /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here Y_VERIFY(BatchConstructor.GetColumnsCount()); + Y_VERIFY(Filter); - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - if (Filter->GetInactiveHeadSize() > Filter->GetInactiveTailSize()) { - options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveHeadSize()) - .SetForwardAssemble(false); - Filter->CutInactiveHead(); - } else { - options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveTailSize()); + bool forward = Filter->GetInactiveHeadSize() <= Filter->GetInactiveTailSize(); + + TPortionInfo::TPreparedBatchData::TAssembleOptions options(forward); + options.RecordsCountLimit = Filter->Size() - (forward ? 
Filter->GetInactiveTailSize() : Filter->GetInactiveHeadSize()); + + if (forward) { Filter->CutInactiveTail(); + } else { + Filter->CutInactiveHead(); } auto addBatch = BatchConstructor.Assemble(options); diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h index f8a3da41591..148cca85b9a 100644 --- a/ydb/core/tx/columnshard/engines/reader/queue.h +++ b/ydb/core/tx/columnshard/engines/reader/queue.h @@ -7,8 +7,12 @@ namespace NKikimr::NOlap { class TFetchBlobsQueue { private: bool StoppedFlag = false; - YDB_ACCESSOR_DEF(std::deque<TBlobRange>, IteratorBlobsSequential); + std::deque<TBlobRange> IteratorBlobsSequential; public: + const std::deque<TBlobRange>& GetIteratorBlobsSequential() const noexcept { + return IteratorBlobsSequential; + } + bool IsStopped() const { return StoppedFlag; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index a43345df60d..534707f8858 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -13,8 +13,8 @@ class TMergePartialStream { private: class TBatchIterator { private: - YDB_ACCESSOR(i64, Position, 0); - YDB_OPT(ui32, PoolId); + i64 Position = 0; + std::optional<ui32> PoolId; std::shared_ptr<arrow::RecordBatch> Batch; std::shared_ptr<NArrow::TColumnFilter> Filter; @@ -79,6 +79,14 @@ private: } } + bool HasPoolId() const noexcept { + return PoolId.has_value(); + } + + ui32 GetPoolIdUnsafe() const noexcept { + return *PoolId; + } + bool CheckNextBatch(const TBatchIterator& nextIterator) { Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size()); return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0; |
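
The TAssembleOptions rework above is the one place where the call pattern changes beyond a rename: the macro-generated setters (SetIncludedColumnIds, SetExcludedColumnIds, SetRecordsCountLimit, SetForwardAssemble) become plain fields on an aggregate whose assembly direction is fixed at construction. A hedged usage sketch based on the filter_assembler.cpp and postfilter_assembler.cpp hunks above; BatchConstructor, FilterColumnIds, and Filter are the surrounding members from those files, assumed here for illustration:

// Forward assembly restricted to the filter columns, as in TAssembleFilter::DoExecuteImpl().
TPortionInfo::TPreparedBatchData::TAssembleOptions options;  // forward by default
options.IncludedColumnIds = FilterColumnIds;                 // was options.SetIncludedColumnIds(...)
auto batch = BatchConstructor.Assemble(options);

// Reverse assembly with a record-count limit, as in TAssembleBatch::DoExecuteImpl()
// when the inactive head of the filter is larger than its tail.
TPortionInfo::TPreparedBatchData::TAssembleOptions reverse(/*forward=*/false);
reverse.RecordsCountLimit = Filter->Size() - Filter->GetInactiveHeadSize();
auto tail = BatchConstructor.Assemble(reverse);

Because ForwardAssemble is a const member set only through the explicit TAssembleOptions(bool) constructor, the direction can no longer be flipped after the options object is built, which is what lets postfilter_assembler.cpp compute the direction once and derive both the record limit and the CutInactiveHead/CutInactiveTail call from it.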