about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-04-27 08:52:18 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-04-27 08:52:18 +0300
commit ba1e7fcd36aca1f47d0ca457071e5373d659dc75 (patch)
tree bf6188524124e2f7f5cfe511491ee8ddeb3c8521
parent b9de8851261b66d38f22b8fe68efffb722880d58 (diff)
download ydb-ba1e7fcd36aca1f47d0ca457071e5373d659dc75.tar.gz
conveyor tasks priority
additional logging fix fields including checker
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 13
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/batch.cpp 9
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp 7
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/filter_assembler.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp 1
-rw-r--r-- ydb/core/tx/conveyor/service/service.cpp 6
-rw-r--r-- ydb/core/tx/conveyor/service/service.h 3
-rw-r--r-- ydb/core/tx/conveyor/service/worker.h 4
-rw-r--r-- ydb/core/tx/conveyor/usage/abstract.h 13
11 files changed, 40 insertions, 20 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index d3eaa60dfa..6e562d290b 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -85,7 +85,6 @@ void TColumnShardScanIterator::FillReadyResults() {
if (batch.ResultBatch->num_rows() > limitLeft) {
// Trim the last batch if total row count exceeds the requested limit
batch.ResultBatch = batch.ResultBatch->Slice(0, limitLeft);
- ready.clear();
}
limitLeft -= batch.ResultBatch->num_rows();
ItemsRead += batch.ResultBatch->num_rows();
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index ee8e9f68b2..91e6d4d010 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -122,11 +122,13 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons
if (LessPredicate) {
for (auto&& i : LessPredicate->ColumnNames()) {
result.emplace(IndexInfo.GetColumnId(i));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
if (GreaterPredicate) {
for (auto&& i : GreaterPredicate->ColumnNames()) {
result.emplace(IndexInfo.GetColumnId(i));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
if (Program) {
@@ -134,6 +136,7 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons
auto id = IndexInfo.GetColumnIdOptional(i);
if (id) {
result.emplace(*id);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
}
@@ -144,6 +147,7 @@ std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) cons
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
for (auto&& i : snapSchema->fields()) {
result.emplace(IndexInfo.GetColumnId(i->name()));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i->name());
}
}
return result;
@@ -154,10 +158,12 @@ std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
if (PlanStep) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
for (auto&& i : snapSchema->fields()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", i->name());
result.emplace(IndexInfo.GetColumnId(i->name()));
}
}
for (auto&& f : LoadSchema->fields()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("used_column", f->name());
result.emplace(IndexInfo.GetColumnId(f->name()));
}
return result;
@@ -216,7 +222,11 @@ void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
}
NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo);
- currentBatch.Reset(&EarlyFilterColumns);
+ if (portionInfo.AllowEarlyFilter()) {
+ currentBatch.Reset(&EarlyFilterColumns);
+ } else {
+ currentBatch.Reset(&UsedColumns);
+ }
Batches[batchNo] = &currentBatch;
++batchNo;
}
@@ -564,6 +574,7 @@ TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
, FetchBlobsQueue(fetchBlobsQueue)
, ReadMetadata(readMetadata)
{
+ UsedColumns = ReadMetadata->GetUsedColumnIds();
PostFilterColumns = ReadMetadata->GetUsedColumnIds();
EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(true);
if (internalRead || EarlyFilterColumns.empty()) {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 172608cf2e..3a4c3c430d 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -226,6 +226,7 @@ struct TPartialReadResult {
class TIndexedReadData {
private:
std::set<ui32> EarlyFilterColumns;
+ std::set<ui32> UsedColumns;
YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
bool AbortedFlag = false;
YDB_READONLY_DEF(NColumnShard::TScanCounters, Counters);
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 776760b3dd..c6baead8e9 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -37,7 +37,10 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard
}
bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
- if (AskedColumnIds < columnIds) {
+ if (!CurrentColumnIds) {
+ return true;
+ }
+ if (AskedColumnIds.size() < columnIds.size()) {
return false;
}
for (auto&& i : columnIds) {
@@ -68,12 +71,11 @@ void TBatch::Reset(const std::set<ui32>* columnIds) {
} else {
CurrentColumnIds = *columnIds;
Y_VERIFY(CurrentColumnIds->size());
- }
- if (CurrentColumnIds) {
for (auto&& i : *CurrentColumnIds) {
AskedColumnIds.emplace(i);
}
}
+
Y_VERIFY(WaitIndexed.empty());
Y_VERIFY(Data.empty());
WaitingBytes = 0;
@@ -82,6 +84,7 @@ void TBatch::Reset(const std::set<ui32>* columnIds) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
continue;
}
+ AskedColumnIds.emplace(rec.ColumnId);
Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
Owner->Owner->AddBlobForFetch(rec.BlobRange, *this);
Y_VERIFY(rec.Portion == Portion);
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index 0354d5d4f2..5039b0405d 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -17,7 +17,6 @@ bool TAssembleFilter::DoExecuteImpl() {
FilteredBatch = nullptr;
return true;
}
-#if 1 // optimization
if (ReadMetadata->Program && AllowEarlyFilter) {
auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program);
Filter->CombineSequential(filter);
@@ -27,10 +26,8 @@ bool TAssembleFilter::DoExecuteImpl() {
return true;
}
}
-#else
- Y_UNUSED(AllowEarlyFilter);
-#endif
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")("original_count", OriginalCount)("filtered_count", batch->num_rows());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
+ ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter);
FilteredBatch = batch;
return true;
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 7f1e5f38e7..902276896f 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -31,7 +31,7 @@ namespace NKikimr::NOlap::NIndexedReader {
, BatchNo(batch.GetBatchNo())
, AllowEarlyFilter(allowEarlyFilter)
{
-
+ TBase::SetPriority(TBase::EPriority::Normal);
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index 2e9ff7520c..45310d0d22 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -47,6 +47,7 @@ TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstruct
, FilterBatch(currentBatch.GetFilterBatch())
, BatchNo(currentBatch.GetBatchNo())
{
+ TBase::SetPriority(TBase::EPriority::High);
Y_VERIFY(currentBatch.GetFilter());
Y_VERIFY(currentBatch.GetFilterBatch());
Y_VERIFY(!currentBatch.GetFilteredBatch());
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
index 45e030ef46..bf9f5ea647 100644
--- a/ydb/core/tx/conveyor/service/service.cpp
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -37,8 +37,8 @@ void TDistributor::Bootstrap() {
void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
SolutionsRate->Inc();
if (Waiting.size()) {
- Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.front()));
- Waiting.pop_front();
+ Send(ev->Sender, new TEvInternal::TEvNewTask(Waiting.top()));
+ Waiting.pop();
} else {
Workers.emplace_back(ev->Sender);
}
@@ -60,7 +60,7 @@ void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
Send(Workers.back(), new TEvInternal::TEvNewTask(TWorkerTask(ev->Get()->GetTask(), ev->Sender)));
Workers.pop_back();
} else if (Waiting.size() < Config.GetQueueSizeLimit()) {
- Waiting.emplace_back(ev->Get()->GetTask(), ev->Sender);
+ Waiting.emplace(ev->Get()->GetTask(), ev->Sender);
} else {
ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "action=overlimit;sender=" << ev->Sender << ";workers=" << Workers.size() << ";waiting=" << Waiting.size();
OverlimitRate->Inc();
diff --git a/ydb/core/tx/conveyor/service/service.h b/ydb/core/tx/conveyor/service/service.h
index 5e9d49a625..cffa1fbca8 100644
--- a/ydb/core/tx/conveyor/service/service.h
+++ b/ydb/core/tx/conveyor/service/service.h
@@ -4,6 +4,7 @@
#include <ydb/core/tx/conveyor/usage/events.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <queue>
namespace NKikimr::NConveyor {
@@ -12,7 +13,7 @@ private:
const TConfig Config;
const TString ConveyorName = "common";
std::vector<TActorId> Workers;
- std::deque<TWorkerTask> Waiting;
+ std::priority_queue<TWorkerTask> Waiting;
const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr WaitingQueueSizeLimit;
const ::NMonitoring::TDynamicCounters::TCounterPtr WorkersCount;
diff --git a/ydb/core/tx/conveyor/service/worker.h b/ydb/core/tx/conveyor/service/worker.h
index 32b0148add..280b0c43e2 100644
--- a/ydb/core/tx/conveyor/service/worker.h
+++ b/ydb/core/tx/conveyor/service/worker.h
@@ -19,7 +19,11 @@ public:
TWorkerTask(ITask::TPtr task, const NActors::TActorId& ownerId)
: Task(task)
, OwnerId(ownerId) {
+ Y_VERIFY(task);
+ }
+ bool operator<(const TWorkerTask& wTask) const {
+ return Task->GetPriority() < wTask.Task->GetPriority();
}
};
diff --git a/ydb/core/tx/conveyor/usage/abstract.h b/ydb/core/tx/conveyor/usage/abstract.h
index fa60859083..0ac70b8583 100644
--- a/ydb/core/tx/conveyor/usage/abstract.h
+++ b/ydb/core/tx/conveyor/usage/abstract.h
@@ -6,8 +6,15 @@
namespace NKikimr::NConveyor {
class ITask {
+public:
+ enum EPriority: ui32 {
+ High = 1000,
+ Normal = 500,
+ Low = 0
+ };
private:
- TString ErrorMessage;
+ YDB_READONLY_DEF(TString, ErrorMessage);
+ YDB_ACCESSOR(EPriority, Priority, EPriority::Normal);
protected:
ITask& SetErrorMessage(const TString& message) {
ErrorMessage = message;
@@ -22,10 +29,6 @@ public:
return !!ErrorMessage;
}
- const TString& GetErrorMessage() const {
- return ErrorMessage;
- }
-
bool Execute();
};