diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-27 08:52:18 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-27 08:52:18 +0300 |
commit | ba1e7fcd36aca1f47d0ca457071e5373d659dc75 (patch) | |
tree | bf6188524124e2f7f5cfe511491ee8ddeb3c8521 | |
parent | b9de8851261b66d38f22b8fe68efffb722880d58 (diff) | |
download | ydb-ba1e7fcd36aca1f47d0ca457071e5373d659dc75.tar.gz |
conveyor tasks priority
additional logging
fix fields including checker
11 files changed, 40 insertions, 20 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index d3eaa60dfa..6e562d290b 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -85,7 +85,6 @@ void TColumnShardScanIterator::FillReadyResults() { if (batch.ResultBatch->num_rows() > limitLeft) { // Trim the last batch if total row count exceeds the requested limit batch.ResultBatch = batch.ResultBatch->Slice(0, limitLeft); - ready.clear(); } limitLeft -= batch.ResultBatch->num_rows(); ItemsRead += batch.ResultBatch->num_rows(); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index ee8e9f68b2..91e6d4d010 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -122,11 +122,13 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons if (LessPredicate) { for (auto&& i : LessPredicate->ColumnNames()) { result.emplace(IndexInfo.GetColumnId(i)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } if (GreaterPredicate) { for (auto&& i : GreaterPredicate->ColumnNames()) { result.emplace(IndexInfo.GetColumnId(i)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } if (Program) { @@ -134,6 +136,7 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons auto id = IndexInfo.GetColumnIdOptional(i); if (id) { result.emplace(*id); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i); } } } @@ -144,6 +147,7 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); for (auto&& i : snapSchema->fields()) { result.emplace(IndexInfo.GetColumnId(i->name())); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name()); } } return result; @@ -154,10 +158,12 @@ std::set<ui32> TReadMetadata::GetUsedColumnIds() const { if (PlanStep) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); for (auto&& i : snapSchema->fields()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name()); result.emplace(IndexInfo.GetColumnId(i->name())); } } for (auto&& f : LoadSchema->fields()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name()); result.emplace(IndexInfo.GetColumnId(f->name())); } return result; @@ -216,7 +222,11 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) { } NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo); - currentBatch.Reset(&EarlyFilterColumns); + if (portionInfo.AllowEarlyFilter()) { + currentBatch.Reset(&EarlyFilterColumns); + } else { + currentBatch.Reset(&UsedColumns); + } Batches[batchNo] = ¤tBatch; ++batchNo; } @@ -564,6 +574,7 @@ TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, , FetchBlobsQueue(fetchBlobsQueue) , ReadMetadata(readMetadata) { + UsedColumns = ReadMetadata->GetUsedColumnIds(); PostFilterColumns = ReadMetadata->GetUsedColumnIds(); EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(true); if (internalRead || EarlyFilterColumns.empty()) { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 172608cf2e..3a4c3c430d 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -226,6 +226,7 @@ struct TPartialReadResult { class TIndexedReadData { private: std::set<ui32> EarlyFilterColumns; + std::set<ui32> UsedColumns; YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns); bool AbortedFlag = false; YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters); diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 776760b3dd..c6baead8e9 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -37,7 +37,10 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard } bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const { - if (AskedColumnIds < columnIds) { + if (!CurrentColumnIds) { + return true; + } + if (AskedColumnIds.size() < columnIds.size()) { return false; } for (auto&& i : columnIds) { @@ -68,12 +71,11 @@ void TBatch::Reset(const std::set<ui32>* columnIds) { } else { CurrentColumnIds = *columnIds; Y_VERIFY(CurrentColumnIds->size()); - } - if (CurrentColumnIds) { for (auto&& i : *CurrentColumnIds) { AskedColumnIds.emplace(i); } } + Y_VERIFY(WaitIndexed.empty()); Y_VERIFY(Data.empty()); WaitingBytes = 0; @@ -82,6 +84,7 @@ void TBatch::Reset(const std::set<ui32>* columnIds) { if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { continue; } + AskedColumnIds.emplace(rec.ColumnId); Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second); Owner->Owner->AddBlobForFetch(rec.BlobRange, *this); Y_VERIFY(rec.Portion == Portion); diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index 0354d5d4f2..5039b0405d 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -17,7 +17,6 @@ bool TAssembleFilter::DoExecuteImpl() { FilteredBatch = nullptr; return true; } -#if 1 // optimization if (ReadMetadata->Program && AllowEarlyFilter) { auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program); Filter->CombineSequential(filter); @@ -27,10 +26,8 @@ bool TAssembleFilter::DoExecuteImpl() { return true; } } -#else - Y_UNUSED(AllowEarlyFilter); -#endif - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")("original_count", OriginalCount)("filtered_count", batch->num_rows()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data") + ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter); FilteredBatch = batch; return true; diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h index 7f1e5f38e7..902276896f 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h @@ -31,7 +31,7 @@ namespace NKikimr::NOlap::NIndexedReader { , BatchNo(batch.GetBatchNo()) , AllowEarlyFilter(allowEarlyFilter) { - + TBase::SetPriority(TBase::EPriority::Normal); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp index 2e9ff7520c..45310d0d22 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp @@ -47,6 +47,7 @@ TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstruct , FilterBatch(currentBatch.GetFilterBatch()) , BatchNo(currentBatch.GetBatchNo()) { + TBase::SetPriority(TBase::EPriority::High); Y_VERIFY(currentBatch.GetFilter()); Y_VERIFY(currentBatch.GetFilterBatch()); Y_VERIFY(!currentBatch.GetFilteredBatch()); diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp index 45e030ef46..bf9f5ea647 100644 --- a/ydb/core/tx/conveyor/service/service.cpp +++ b/ydb/core/tx/conveyor/service/service.cpp @@ -37,8 +37,8 @@ void TDistributor::Bootstrap() { void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) { SolutionsRate->Inc(); if (Waiting.size()) { - Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.front())); - Waiting.pop_front(); + Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.top())); + Waiting.pop(); } else { Workers.emplace_back(ev->Sender); } @@ -60,7 +60,7 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) { Send(Workers.back(), new TEvInternal::TEvNewTask(TWorkerTask(ev->Get()->GetTask(), ev->Sender))); Workers.pop_back(); } else if (Waiting.size() < Config.GetQueueSizeLimit()) { - Waiting.emplace_back(ev->Get()->GetTask(), ev->Sender); + Waiting.emplace(ev->Get()->GetTask(), ev->Sender); } else { ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "action=overlimit;sender=" << ev->Sender << ";workers=" << Workers.size() << ";waiting=" << Waiting.size(); OverlimitRate->Inc(); diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h index 5e9d49a625..cffa1fbca8 100644 --- a/ydb/core/tx/conveyor/service/service.h +++ b/ydb/core/tx/conveyor/service/service.h @@ -4,6 +4,7 @@ #include <ydb/core/tx/conveyor/usage/events.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/monlib/dynamic_counters/counters.h> +#include <queue> namespace NKikimr::NConveyor { @@ -12,7 +13,7 @@ private: const TConfig Config; const TString ConveyorName = "common"; std::vector<TActorId> Workers; - std::deque<TWorkerTask> Waiting; + std::priority_queue<TWorkerTask> Waiting; const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize; const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSizeLimit; const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCount; diff --git a/ydb/core/tx/conveyor/service/worker.h b/ydb/core/tx/conveyor/service/worker.h index 32b0148add..280b0c43e2 100644 --- a/ydb/core/tx/conveyor/service/worker.h +++ b/ydb/core/tx/conveyor/service/worker.h @@ -19,7 +19,11 @@ public: TWorkerTask(ITask::TPtr task, const NActors::TActorId& ownerId) : Task(task) , OwnerId(ownerId) { + Y_VERIFY(task); + } + bool operator<(const TWorkerTask& wTask) const { + return Task->GetPriority() < wTask.Task->GetPriority(); } }; diff --git a/ydb/core/tx/conveyor/usage/abstract.h b/ydb/core/tx/conveyor/usage/abstract.h index fa60859083..0ac70b8583 100644 --- a/ydb/core/tx/conveyor/usage/abstract.h +++ b/ydb/core/tx/conveyor/usage/abstract.h @@ -6,8 +6,15 @@ namespace NKikimr::NConveyor { class ITask { +public: + enum EPriority: ui32 { + High = 1000, + Normal = 500, + Low = 0 + }; private: - TString ErrorMessage; + YDB_READONLY_DEF(TString, ErrorMessage); + YDB_ACCESSOR(EPriority, Priority, EPriority::Normal); protected: ITask& SetErrorMessage(const TString& message) { ErrorMessage = message; @@ -22,10 +29,6 @@ public: return !!ErrorMessage; } - const TString& GetErrorMessage() const { - return ErrorMessage; - } - bool Execute(); }; |