diff options
author | chertus <azuikov@ydb.tech> | 2023-04-14 16:15:35 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-04-14 16:15:35 +0300 |
commit | 0c9d88d453c408129ebe7e3f3085f588d9f63a93 (patch) | |
tree | 95c2a5ef097dfaaf1c7466379efc390ba8b79790 | |
parent | f186fba2006170397fc55829216039b6e86808cb (diff) | |
download | ydb-0c9d88d453c408129ebe7e3f3085f588d9f63a93.tar.gz |
implement LowerBound for composite key
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 51 | ||||
-rw-r--r-- | ydb/core/formats/arrow_helpers.h | 4 | ||||
-rw-r--r-- | ydb/core/formats/replace_key.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 24 |
4 files changed, 39 insertions, 48 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 4f65458fd4c..8b6c31eda66 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -720,49 +720,14 @@ TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema) { return out; } -i64 LowerBound(const std::shared_ptr<arrow::Array>& array, const arrow::Scalar& border, i64 offset) { - i64 pos = 0; - SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using T = typename TWrap::T; - using TArray = typename arrow::TypeTraits<T>::ArrayType; - using TScalar = typename arrow::TypeTraits<T>::ScalarType; - - auto& column = static_cast<const TArray&>(*array); - - if constexpr (arrow::is_number_type<T>() || arrow::is_timestamp_type<T>()) { - const auto* start = column.raw_values() + offset; - const auto* end = column.raw_values() + column.length(); - pos = offset; - pos += std::lower_bound(start, end, static_cast<const TScalar&>(border).value) - start; - } else if constexpr (arrow::has_string_view<T>()) { - arrow::util::string_view value(*static_cast<const TScalar&>(border).value); - - // TODO: binary search - for (pos = offset; pos < column.length(); ++pos) { - if (!(column.GetView(pos) < value)) { - return true; - } - } - } else { - Y_VERIFY(false); // not implemented - } - - return true; - }); - - return pos; -} - -// TODO: implement -i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset) { - Y_VERIFY(batch->num_columns() == 1); - Y_VERIFY(key.Size() == 1); - - auto res = key.Column(0).GetScalar(key.GetPosition()); - Y_VERIFY_OK(res.status()); - Y_VERIFY(*res); - return LowerBound(batch->column(0), *(*res), offset); +size_t LowerBound(const std::vector<TRawReplaceKey>& batchKeys, const TReplaceKey& key, size_t offset) { + Y_VERIFY(offset <= batchKeys.size()); + if (offset == batchKeys.size()) { + return offset; + } + auto start = batchKeys.begin() + offset; + auto it = std::lower_bound(start, batchKeys.end(), key.ToRaw()); + return it - batchKeys.begin(); } std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size) { diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 23361c5ef78..3845c42584c 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -14,6 +14,7 @@ using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>; template<typename T> class TReplaceKeyTemplate; using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>; +using TRawReplaceKey = TReplaceKeyTemplate<const TArrayVec*>; // Arrow inrernally keeps references to Buffer objects with the data // This helper class implements arrow::Buffer over TString that owns @@ -109,8 +110,7 @@ std::shared_ptr<arrow::BooleanArray> MakeFilter(const std::vector<bool>& bits); std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2); std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count); TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema); -i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0); -i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset = 0); +size_t LowerBound(const std::vector<TRawReplaceKey>& batchKeys, const TReplaceKey& key, size_t offset = 0); bool ReserveData(arrow::ArrayBuilder& builder, const size_t size); enum class ECompareType { LESS = 1, diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h index 195cc4e5f75..5c5a6804e73 100644 --- a/ydb/core/formats/replace_key.h +++ b/ydb/core/formats/replace_key.h @@ -105,6 +105,14 @@ public: return *(*Columns)[i]; } + TReplaceKeyTemplate<const TArrayVec*> ToRaw() const { + if constexpr (IsOwning) { + return TReplaceKeyTemplate<const TArrayVec*>(Columns.get(), Position); + } else { + return *this; + } + } + template<typename T = TArrayVecPtr> requires IsOwning static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& key, int row) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 668e74a0385..30c636336b4 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -343,13 +343,22 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const auto effKey = GetEffectiveKey(batch, indexInfo); Y_VERIFY(effKey->num_columns() && effKey->num_rows()); + std::vector<NArrow::TRawReplaceKey> keys; + { + const auto& columns = effKey->columns(); + keys.reserve(effKey->num_rows()); + for (i64 i = 0; i < effKey->num_rows(); ++i) { + keys.emplace_back(NArrow::TRawReplaceKey(&columns, i)); + } + } + i64 offset = 0; - for (size_t i = 0; i < granules.size(); ++i) { + for (size_t i = 0; i < granules.size() && offset < effKey->num_rows(); ++i) { const i64 end = (i + 1 == granules.size()) // Just take the number of elements in the key column for the last granule. ? effKey->num_rows() // Locate position of the next granule in the key. - : NArrow::LowerBound(effKey, granules[i + 1].first.Border, offset); + : NArrow::LowerBound(keys, granules[i + 1].first.Border, offset); if (const i64 size = end - offset) { Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second); @@ -1789,9 +1798,18 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const auto effKey = GetEffectiveKey(batch, indexInfo); Y_VERIFY(effKey->num_columns() && effKey->num_rows()); + std::vector<NArrow::TRawReplaceKey> keys; + { + const auto& columns = effKey->columns(); + keys.reserve(effKey->num_rows()); + for (i64 i = 0; i < effKey->num_rows(); ++i) { + keys.emplace_back(NArrow::TRawReplaceKey(&columns, i)); + } + } + batchOffsets.push_back(0); for (const auto& border : borders) { - int offset = NArrow::LowerBound(effKey, border, batchOffsets.back()); + int offset = NArrow::LowerBound(keys, border, batchOffsets.back()); Y_VERIFY(offset >= batchOffsets.back()); batchOffsets.push_back(offset); } |