diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-21 18:21:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-21 18:21:02 +0300 |
commit | 03e6b001442731630882038574da6a630b3235f0 (patch) | |
tree | 91c10fdf96f1b2c52763d1bfbba4161f90667d43 | |
parent | de20aeef3c8918a4d36aee222d7911eaf5456037 (diff) | |
download | ydb-03e6b001442731630882038574da6a630b3235f0.tar.gz |
Use heap for DqMerge connection
improve performance for merge algorithm m*n -> m*log(n)
its useful for huge inputs count, for example, in case grouping for non-primary key.
-rw-r--r-- | ydb/core/kqp/ut/kqp_sort_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_input_producer.cpp | 143 |
2 files changed, 97 insertions, 54 deletions
diff --git a/ydb/core/kqp/ut/kqp_sort_ut.cpp b/ydb/core/kqp/ut/kqp_sort_ut.cpp index b5cf5cb7e2..35c1d7eb42 100644 --- a/ydb/core/kqp/ut/kqp_sort_ut.cpp +++ b/ydb/core/kqp/ut/kqp_sort_ut.cpp @@ -1205,11 +1205,11 @@ Y_UNIT_TEST_SUITE(KqpSort) { auto session = db.CreateSession().GetValueSync().GetSession(); auto result = session.ExecuteDataQuery(Q1_(R"( - $data = SELECT * FROM EightShard WHERE Text = "Value1" ORDER BY Data LIMIT 7; + $data = SELECT * FROM EightShard WHERE Text = "Value1" ORDER BY Data, Key LIMIT 7; - SELECT * FROM $data ORDER BY Data LIMIT 3 OFFSET 0; - SELECT * FROM $data ORDER BY Data LIMIT 3 OFFSET 3; - SELECT * FROM $data ORDER BY Data LIMIT 3 OFFSET 6; + SELECT * FROM $data ORDER BY Data, Key LIMIT 3 OFFSET 0; + SELECT * FROM $data ORDER BY Data, Key LIMIT 3 OFFSET 3; + SELECT * FROM $data ORDER BY Data, Key LIMIT 3 OFFSET 6; )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index 594e2495e3..71f64e30a5 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -65,11 +65,12 @@ public: : TComputationValue<TDqInputMergeStreamValue>(memInfo) , Inputs(std::move(inputs)) , SortCols(std::move(sortCols)) - , InputsSize(Inputs.size()) { - CurrentBuffers.resize(InputsSize); - CurrentItemIndexes.resize(InputsSize, 0); - Finished.resize(InputsSize, false); + CurrentBuffers.resize(Inputs.size()); + CurrentItemIndexes.reserve(Inputs.size()); + for (ui32 idx = 0; idx < Inputs.size(); ++idx) { + CurrentItemIndexes.emplace_back(TUnboxedValuesIterator(*this, Inputs[idx], CurrentBuffers[idx])); + } for (auto& sortCol : SortCols) { const auto typeId = sortCol.GetTypeId(); TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId); @@ -81,6 +82,50 @@ public: } private: + class TUnboxedValuesIterator { + private: + TUnboxedValueVector* Data = nullptr; + IDqInput::TPtr Input; + const TDqInputMergeStreamValue* Comparator = nullptr; + ui32 CurrentIndex = 0; + public: + NUdf::EFetchStatus FindBuffer() { + Data->clear(); + CurrentIndex = 0; + if (Input->Pop(*Data)) { + return NUdf::EFetchStatus::Ok; + } + + return Input->IsFinished() ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield; + } + + TUnboxedValuesIterator(const TDqInputMergeStreamValue& comparator, IDqInput::TPtr input, TUnboxedValueVector& data) + : Data(&data) + , Input(input) + , Comparator(&comparator) + { + + } + + bool IsYield() const { + return CurrentIndex == Data->size(); + } + + bool operator<(const TUnboxedValuesIterator& item) const { + return Comparator->CompareSortCols(GetValue(), item.GetValue()) > 0; + } + void operator++() { + ++CurrentIndex; + Y_VERIFY(CurrentIndex <= Data->size()); + } + NKikimr::NUdf::TUnboxedValue& GetValue() { + return (*Data)[CurrentIndex]; + } + const NKikimr::NUdf::TUnboxedValue& GetValue() const { + return (*Data)[CurrentIndex]; + } + }; + NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final { auto status = CheckBuffers(); switch (status) { @@ -96,77 +141,75 @@ private: } NKikimr::NUdf::TUnboxedValue FindResult() { - NKikimr::NUdf::TUnboxedValue* res = nullptr; - size_t chosenIndex = 0; - for (size_t i = 0; i < InputsSize; ++i) { - if (CurrentItemIndexes[i] >= CurrentBuffers[i].size()) { - continue; - } - NKikimr::NUdf::TUnboxedValue &val = CurrentBuffers[i][CurrentItemIndexes[i]]; - if (!res || CompareSortCols(*res, val) > 0) { - res = &val; - chosenIndex = i; - } + YQL_ENSURE(CurrentItemIndexes.size()); + std::pop_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end()); + auto& current = CurrentItemIndexes.back(); + Y_VERIFY(!current.IsYield()); + NKikimr::NUdf::TUnboxedValue res = current.GetValue(); + ++current; + if (!current.IsYield()) { + std::push_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end()); } - YQL_ENSURE(chosenIndex < InputsSize); - YQL_ENSURE(res); - ++CurrentItemIndexes[chosenIndex]; - return *res; + return res; } - int CompareSortCols(NKikimr::NUdf::TUnboxedValue& lhs, NKikimr::NUdf::TUnboxedValue& rhs) { + int CompareSortCols(const NKikimr::NUdf::TUnboxedValue& lhs, const NKikimr::NUdf::TUnboxedValue& rhs) const { int compRes = 0; for (auto sortCol = SortCols.begin(); sortCol != SortCols.end() && compRes == 0; ++sortCol) { auto lhsColValue = lhs.GetElement(sortCol->Index); auto rhsColValue = rhs.GetElement(sortCol->Index); - compRes = NKikimr::NMiniKQL::CompareValues(SortColTypes[sortCol->Index], + auto it = SortColTypes.find(sortCol->Index); + Y_VERIFY(it != SortColTypes.end()); + compRes = NKikimr::NMiniKQL::CompareValues(it->second, sortCol->Ascending, /* isOptional */ true, lhsColValue, rhsColValue); } return compRes; } NUdf::EFetchStatus CheckBuffers() { - for (size_t i = 0; i < InputsSize; ++i) { - if (CurrentItemIndexes[i] >= CurrentBuffers[i].size()) { - auto status = FindBuffer(i); - if (status == NUdf::EFetchStatus::Yield) { - return status; + if (InitializationIndex >= CurrentItemIndexes.size()) { + if (CurrentItemIndexes.size() && CurrentItemIndexes.back().IsYield()) { + auto status = CurrentItemIndexes.back().FindBuffer(); + switch (status) { + case NUdf::EFetchStatus::Yield: + return status; + case NUdf::EFetchStatus::Finish: + CurrentItemIndexes.pop_back(); + break; + case NUdf::EFetchStatus::Ok: + std::push_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end()); + break; } - Finished[i] = (status == NUdf::EFetchStatus::Finish); } + } else { + while (InitializationIndex < CurrentItemIndexes.size()) { + auto status = CurrentItemIndexes[InitializationIndex].FindBuffer(); + switch (status) { + case NUdf::EFetchStatus::Yield: + return status; + case NUdf::EFetchStatus::Finish: + std::swap(CurrentItemIndexes[InitializationIndex], CurrentItemIndexes.back()); + CurrentItemIndexes.pop_back(); + break; + case NUdf::EFetchStatus::Ok: + ++InitializationIndex; + break; + } + } + std::make_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end()); } - if (IsAllFinished()) { + if (CurrentItemIndexes.empty()) { return NUdf::EFetchStatus::Finish; } return NUdf::EFetchStatus::Ok; } - NUdf::EFetchStatus FindBuffer(size_t index) { - CurrentBuffers[index].clear(); - - if (Inputs[index]->Pop(CurrentBuffers[index])) { - CurrentItemIndexes[index] = 0; - return NUdf::EFetchStatus::Ok; - } - - return Inputs[index]->IsFinished() ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield; - } - - bool IsAllFinished() { - bool allFinished = true; - for (bool finished : Finished) { - allFinished &= finished; - } - return allFinished; - } - private: TVector<IDqInput::TPtr> Inputs; TVector<TSortColumnInfo> SortCols; - size_t InputsSize; TVector<TUnboxedValueVector> CurrentBuffers; - TVector<ui64> CurrentItemIndexes; - TVector<bool> Finished; + TVector<TUnboxedValuesIterator> CurrentItemIndexes; + ui32 InitializationIndex = 0; TMap<ui32, EDataSlot> SortColTypes; }; |