aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-04-14 16:15:35 +0300
committerchertus <azuikov@ydb.tech>2023-04-14 16:15:35 +0300
commit0c9d88d453c408129ebe7e3f3085f588d9f63a93 (patch)
tree95c2a5ef097dfaaf1c7466379efc390ba8b79790
parentf186fba2006170397fc55829216039b6e86808cb (diff)
downloadydb-0c9d88d453c408129ebe7e3f3085f588d9f63a93.tar.gz
implement LowerBound for composite key
-rw-r--r--ydb/core/formats/arrow_helpers.cpp51
-rw-r--r--ydb/core/formats/arrow_helpers.h4
-rw-r--r--ydb/core/formats/replace_key.h8
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp24
4 files changed, 39 insertions, 48 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 4f65458fd4c..8b6c31eda66 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -720,49 +720,14 @@ TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema) {
return out;
}
-i64 LowerBound(const std::shared_ptr<arrow::Array>& array, const arrow::Scalar& border, i64 offset) {
- i64 pos = 0;
- SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using T = typename TWrap::T;
- using TArray = typename arrow::TypeTraits<T>::ArrayType;
- using TScalar = typename arrow::TypeTraits<T>::ScalarType;
-
- auto& column = static_cast<const TArray&>(*array);
-
- if constexpr (arrow::is_number_type<T>() || arrow::is_timestamp_type<T>()) {
- const auto* start = column.raw_values() + offset;
- const auto* end = column.raw_values() + column.length();
- pos = offset;
- pos += std::lower_bound(start, end, static_cast<const TScalar&>(border).value) - start;
- } else if constexpr (arrow::has_string_view<T>()) {
- arrow::util::string_view value(*static_cast<const TScalar&>(border).value);
-
- // TODO: binary search
- for (pos = offset; pos < column.length(); ++pos) {
- if (!(column.GetView(pos) < value)) {
- return true;
- }
- }
- } else {
- Y_VERIFY(false); // not implemented
- }
-
- return true;
- });
-
- return pos;
-}
-
-// TODO: implement
-i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset) {
- Y_VERIFY(batch->num_columns() == 1);
- Y_VERIFY(key.Size() == 1);
-
- auto res = key.Column(0).GetScalar(key.GetPosition());
- Y_VERIFY_OK(res.status());
- Y_VERIFY(*res);
- return LowerBound(batch->column(0), *(*res), offset);
+size_t LowerBound(const std::vector<TRawReplaceKey>& batchKeys, const TReplaceKey& key, size_t offset) {
+ Y_VERIFY(offset <= batchKeys.size());
+ if (offset == batchKeys.size()) {
+ return offset;
+ }
+ auto start = batchKeys.begin() + offset;
+ auto it = std::lower_bound(start, batchKeys.end(), key.ToRaw());
+ return it - batchKeys.begin();
}
std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size) {
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 23361c5ef78..3845c42584c 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -14,6 +14,7 @@ using TArrayVec = std::vector<std::shared_ptr<arrow::Array>>;
template<typename T>
class TReplaceKeyTemplate;
using TReplaceKey = TReplaceKeyTemplate<std::shared_ptr<TArrayVec>>;
+using TRawReplaceKey = TReplaceKeyTemplate<const TArrayVec*>;
// Arrow inrernally keeps references to Buffer objects with the data
// This helper class implements arrow::Buffer over TString that owns
@@ -109,8 +110,7 @@ std::shared_ptr<arrow::BooleanArray> MakeFilter(const std::vector<bool>& bits);
std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2);
std::vector<bool> CombineFilters(std::vector<bool>&& f1, std::vector<bool>&& f2, size_t& count);
TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
-i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0);
-i64 LowerBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TReplaceKey& key, i64 offset = 0);
+size_t LowerBound(const std::vector<TRawReplaceKey>& batchKeys, const TReplaceKey& key, size_t offset = 0);
bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
enum class ECompareType {
LESS = 1,
diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h
index 195cc4e5f75..5c5a6804e73 100644
--- a/ydb/core/formats/replace_key.h
+++ b/ydb/core/formats/replace_key.h
@@ -105,6 +105,14 @@ public:
return *(*Columns)[i];
}
+ TReplaceKeyTemplate<const TArrayVec*> ToRaw() const {
+ if constexpr (IsOwning) {
+ return TReplaceKeyTemplate<const TArrayVec*>(Columns.get(), Position);
+ } else {
+ return *this;
+ }
+ }
+
template<typename T = TArrayVecPtr> requires IsOwning
static TReplaceKeyTemplate<TArrayVecPtr> FromBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& key, int row) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 668e74a0385..30c636336b4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -343,13 +343,22 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
const auto effKey = GetEffectiveKey(batch, indexInfo);
Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+ std::vector<NArrow::TRawReplaceKey> keys;
+ {
+ const auto& columns = effKey->columns();
+ keys.reserve(effKey->num_rows());
+ for (i64 i = 0; i < effKey->num_rows(); ++i) {
+ keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
+ }
+ }
+
i64 offset = 0;
- for (size_t i = 0; i < granules.size(); ++i) {
+ for (size_t i = 0; i < granules.size() && offset < effKey->num_rows(); ++i) {
const i64 end = (i + 1 == granules.size())
// Just take the number of elements in the key column for the last granule.
? effKey->num_rows()
// Locate position of the next granule in the key.
- : NArrow::LowerBound(effKey, granules[i + 1].first.Border, offset);
+ : NArrow::LowerBound(keys, granules[i + 1].first.Border, offset);
if (const i64 size = end - offset) {
Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
@@ -1789,9 +1798,18 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
const auto effKey = GetEffectiveKey(batch, indexInfo);
Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+ std::vector<NArrow::TRawReplaceKey> keys;
+ {
+ const auto& columns = effKey->columns();
+ keys.reserve(effKey->num_rows());
+ for (i64 i = 0; i < effKey->num_rows(); ++i) {
+ keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
+ }
+ }
+
batchOffsets.push_back(0);
for (const auto& border : borders) {
- int offset = NArrow::LowerBound(effKey, border, batchOffsets.back());
+ int offset = NArrow::LowerBound(keys, border, batchOffsets.back());
Y_VERIFY(offset >= batchOffsets.back());
batchOffsets.push_back(offset);
}