aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-21 18:21:02 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-21 18:21:02 +0300
commit03e6b001442731630882038574da6a630b3235f0 (patch)
tree91c10fdf96f1b2c52763d1bfbba4161f90667d43
parentde20aeef3c8918a4d36aee222d7911eaf5456037 (diff)
downloadydb-03e6b001442731630882038574da6a630b3235f0.tar.gz
Use heap for DqMerge connection
improve performance for merge algorithm m*n -> m*log(n) its useful for huge inputs count, for example, in case grouping for non-primary key.
-rw-r--r--ydb/core/kqp/ut/kqp_sort_ut.cpp8
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_producer.cpp143
2 files changed, 97 insertions, 54 deletions
diff --git a/ydb/core/kqp/ut/kqp_sort_ut.cpp b/ydb/core/kqp/ut/kqp_sort_ut.cpp
index b5cf5cb7e2..35c1d7eb42 100644
--- a/ydb/core/kqp/ut/kqp_sort_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_sort_ut.cpp
@@ -1205,11 +1205,11 @@ Y_UNIT_TEST_SUITE(KqpSort) {
auto session = db.CreateSession().GetValueSync().GetSession();
auto result = session.ExecuteDataQuery(Q1_(R"(
- $data = SELECT * FROM EightShard WHERE Text = "Value1" ORDER BY Data LIMIT 7;
+ $data = SELECT * FROM EightShard WHERE Text = "Value1" ORDER BY Data, Key LIMIT 7;
- SELECT * FROM $data ORDER BY Data LIMIT 3 OFFSET 0;
- SELECT * FROM $data ORDER BY Data LIMIT 3 OFFSET 3;
- SELECT * FROM $data ORDER BY Data LIMIT 3 OFFSET 6;
+ SELECT * FROM $data ORDER BY Data, Key LIMIT 3 OFFSET 0;
+ SELECT * FROM $data ORDER BY Data, Key LIMIT 3 OFFSET 3;
+ SELECT * FROM $data ORDER BY Data, Key LIMIT 3 OFFSET 6;
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index 594e2495e3..71f64e30a5 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -65,11 +65,12 @@ public:
: TComputationValue<TDqInputMergeStreamValue>(memInfo)
, Inputs(std::move(inputs))
, SortCols(std::move(sortCols))
- , InputsSize(Inputs.size())
{
- CurrentBuffers.resize(InputsSize);
- CurrentItemIndexes.resize(InputsSize, 0);
- Finished.resize(InputsSize, false);
+ CurrentBuffers.resize(Inputs.size());
+ CurrentItemIndexes.reserve(Inputs.size());
+ for (ui32 idx = 0; idx < Inputs.size(); ++idx) {
+ CurrentItemIndexes.emplace_back(TUnboxedValuesIterator(*this, Inputs[idx], CurrentBuffers[idx]));
+ }
for (auto& sortCol : SortCols) {
const auto typeId = sortCol.GetTypeId();
TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId);
@@ -81,6 +82,50 @@ public:
}
private:
+ class TUnboxedValuesIterator {
+ private:
+ TUnboxedValueVector* Data = nullptr;
+ IDqInput::TPtr Input;
+ const TDqInputMergeStreamValue* Comparator = nullptr;
+ ui32 CurrentIndex = 0;
+ public:
+ NUdf::EFetchStatus FindBuffer() {
+ Data->clear();
+ CurrentIndex = 0;
+ if (Input->Pop(*Data)) {
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ return Input->IsFinished() ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield;
+ }
+
+ TUnboxedValuesIterator(const TDqInputMergeStreamValue& comparator, IDqInput::TPtr input, TUnboxedValueVector& data)
+ : Data(&data)
+ , Input(input)
+ , Comparator(&comparator)
+ {
+
+ }
+
+ bool IsYield() const {
+ return CurrentIndex == Data->size();
+ }
+
+ bool operator<(const TUnboxedValuesIterator& item) const {
+ return Comparator->CompareSortCols(GetValue(), item.GetValue()) > 0;
+ }
+ void operator++() {
+ ++CurrentIndex;
+ Y_VERIFY(CurrentIndex <= Data->size());
+ }
+ NKikimr::NUdf::TUnboxedValue& GetValue() {
+ return (*Data)[CurrentIndex];
+ }
+ const NKikimr::NUdf::TUnboxedValue& GetValue() const {
+ return (*Data)[CurrentIndex];
+ }
+ };
+
NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final {
auto status = CheckBuffers();
switch (status) {
@@ -96,77 +141,75 @@ private:
}
NKikimr::NUdf::TUnboxedValue FindResult() {
- NKikimr::NUdf::TUnboxedValue* res = nullptr;
- size_t chosenIndex = 0;
- for (size_t i = 0; i < InputsSize; ++i) {
- if (CurrentItemIndexes[i] >= CurrentBuffers[i].size()) {
- continue;
- }
- NKikimr::NUdf::TUnboxedValue &val = CurrentBuffers[i][CurrentItemIndexes[i]];
- if (!res || CompareSortCols(*res, val) > 0) {
- res = &val;
- chosenIndex = i;
- }
+ YQL_ENSURE(CurrentItemIndexes.size());
+ std::pop_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end());
+ auto& current = CurrentItemIndexes.back();
+ Y_VERIFY(!current.IsYield());
+ NKikimr::NUdf::TUnboxedValue res = current.GetValue();
+ ++current;
+ if (!current.IsYield()) {
+ std::push_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end());
}
- YQL_ENSURE(chosenIndex < InputsSize);
- YQL_ENSURE(res);
- ++CurrentItemIndexes[chosenIndex];
- return *res;
+ return res;
}
- int CompareSortCols(NKikimr::NUdf::TUnboxedValue& lhs, NKikimr::NUdf::TUnboxedValue& rhs) {
+ int CompareSortCols(const NKikimr::NUdf::TUnboxedValue& lhs, const NKikimr::NUdf::TUnboxedValue& rhs) const {
int compRes = 0;
for (auto sortCol = SortCols.begin(); sortCol != SortCols.end() && compRes == 0; ++sortCol) {
auto lhsColValue = lhs.GetElement(sortCol->Index);
auto rhsColValue = rhs.GetElement(sortCol->Index);
- compRes = NKikimr::NMiniKQL::CompareValues(SortColTypes[sortCol->Index],
+ auto it = SortColTypes.find(sortCol->Index);
+ Y_VERIFY(it != SortColTypes.end());
+ compRes = NKikimr::NMiniKQL::CompareValues(it->second,
sortCol->Ascending, /* isOptional */ true, lhsColValue, rhsColValue);
}
return compRes;
}
NUdf::EFetchStatus CheckBuffers() {
- for (size_t i = 0; i < InputsSize; ++i) {
- if (CurrentItemIndexes[i] >= CurrentBuffers[i].size()) {
- auto status = FindBuffer(i);
- if (status == NUdf::EFetchStatus::Yield) {
- return status;
+ if (InitializationIndex >= CurrentItemIndexes.size()) {
+ if (CurrentItemIndexes.size() && CurrentItemIndexes.back().IsYield()) {
+ auto status = CurrentItemIndexes.back().FindBuffer();
+ switch (status) {
+ case NUdf::EFetchStatus::Yield:
+ return status;
+ case NUdf::EFetchStatus::Finish:
+ CurrentItemIndexes.pop_back();
+ break;
+ case NUdf::EFetchStatus::Ok:
+ std::push_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end());
+ break;
}
- Finished[i] = (status == NUdf::EFetchStatus::Finish);
}
+ } else {
+ while (InitializationIndex < CurrentItemIndexes.size()) {
+ auto status = CurrentItemIndexes[InitializationIndex].FindBuffer();
+ switch (status) {
+ case NUdf::EFetchStatus::Yield:
+ return status;
+ case NUdf::EFetchStatus::Finish:
+ std::swap(CurrentItemIndexes[InitializationIndex], CurrentItemIndexes.back());
+ CurrentItemIndexes.pop_back();
+ break;
+ case NUdf::EFetchStatus::Ok:
+ ++InitializationIndex;
+ break;
+ }
+ }
+ std::make_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end());
}
- if (IsAllFinished()) {
+ if (CurrentItemIndexes.empty()) {
return NUdf::EFetchStatus::Finish;
}
return NUdf::EFetchStatus::Ok;
}
- NUdf::EFetchStatus FindBuffer(size_t index) {
- CurrentBuffers[index].clear();
-
- if (Inputs[index]->Pop(CurrentBuffers[index])) {
- CurrentItemIndexes[index] = 0;
- return NUdf::EFetchStatus::Ok;
- }
-
- return Inputs[index]->IsFinished() ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield;
- }
-
- bool IsAllFinished() {
- bool allFinished = true;
- for (bool finished : Finished) {
- allFinished &= finished;
- }
- return allFinished;
- }
-
private:
TVector<IDqInput::TPtr> Inputs;
TVector<TSortColumnInfo> SortCols;
- size_t InputsSize;
TVector<TUnboxedValueVector> CurrentBuffers;
- TVector<ui64> CurrentItemIndexes;
- TVector<bool> Finished;
+ TVector<TUnboxedValuesIterator> CurrentItemIndexes;
+ ui32 InitializationIndex = 0;
TMap<ui32, EDataSlot> SortColTypes;
};