aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-04-10 10:58:33 +0300
committerchertus <azuikov@ydb.tech>2023-04-10 10:58:33 +0300
commitb38e48c6cd4b18e00bce01b943325dc772ec941c (patch)
treedab5b1ee3c2b53b04d69058d5f4b0f79f04ccbc0
parentefb303f9483bcb4bf5c3186bcf1c31afa069bf75 (diff)
downloadydb-b38e48c6cd4b18e00bce01b943325dc772ec941c.tar.gz
rewrite TColumnEngineForLogs::TMark
-rw-r--r--ydb/core/formats/replace_key.h194
-rw-r--r--ydb/core/formats/sort_cursor.h24
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h87
4 files changed, 150 insertions, 163 deletions
diff --git a/ydb/core/formats/replace_key.h b/ydb/core/formats/replace_key.h
index 19da041b42..3f27dd1982 100644
--- a/ydb/core/formats/replace_key.h
+++ b/ydb/core/formats/replace_key.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/base/defs.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
+#include <compare>
namespace NKikimr::NArrow {
@@ -18,15 +19,13 @@ public:
return TypedHash(Column(0), Position, Column(0).type_id());
}
- // TODO: NULLs
template<typename T>
bool operator == (const TReplaceKeyTemplate<T>& key) const {
Y_VERIFY_DEBUG(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
- Y_VERIFY_DEBUG(Column(i).type_id() == key.Column(i).type_id());
-
- if (!TypedEquals(Column(i), Position, key.Column(i), key.Position)) {
+ auto cmp = CompareColumnValue(i, key, i);
+ if (std::is_neq(cmp)) {
return false;
}
}
@@ -34,44 +33,48 @@ public:
}
template<typename T>
- bool operator < (const TReplaceKeyTemplate<T>& key) const {
+ std::partial_ordering operator <=> (const TReplaceKeyTemplate<T>& key) const {
Y_VERIFY_DEBUG(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
- int cmp = CompareColumnValue(i, key, i);
- if (cmp < 0) {
- return true;
- } else if (cmp > 0) {
- return false;
+ auto cmp = CompareColumnValue(i, key, i);
+ if (std::is_neq(cmp)) {
+ return cmp;
}
}
- return false;
+ return std::partial_ordering::equivalent;
}
template<typename T>
- bool LessNotNull(const TReplaceKeyTemplate<T>& key) const {
+ std::partial_ordering CompareNotNull(const TReplaceKeyTemplate<T>& key) const {
Y_VERIFY_DEBUG(Size() == key.Size());
for (int i = 0; i < Size(); ++i) {
- int cmp = CompareColumnValue(i, key, i, true);
- if (cmp < 0) {
- return true;
- } else if (cmp > 0) {
- return false;
+ auto cmp = CompareColumnValueNotNull(i, key, i);
+ if (std::is_neq(cmp)) {
+ return cmp;
}
}
- return false;
+ return std::partial_ordering::equivalent;
}
template<typename T>
- int CompareColumnValue(int column, const TReplaceKeyTemplate<T>& key, int keyColumn, bool notNull = false) const {
+ bool LessNotNull(const TReplaceKeyTemplate<T>& key) const {
+ return CompareNotNull(key) == std::partial_ordering::less;
+ }
+
+ template<typename T>
+ std::partial_ordering CompareColumnValueNotNull(int column, const TReplaceKeyTemplate<T>& key, int keyColumn) const {
Y_VERIFY_DEBUG(Column(column).type_id() == key.Column(keyColumn).type_id());
- if (notNull) {
- return TypedCompare<true>(Column(column), Position, key.Column(keyColumn), key.Position);
- } else {
- return TypedCompare<false>(Column(column), Position, key.Column(keyColumn), key.Position);
- }
+ return TypedCompare<true>(Column(column), Position, key.Column(keyColumn), key.Position);
+ }
+
+ template<typename T>
+ std::partial_ordering CompareColumnValue(int column, const TReplaceKeyTemplate<T>& key, int keyColumn) const {
+ Y_VERIFY_DEBUG(Column(column).type_id() == key.Column(keyColumn).type_id());
+
+ return TypedCompare<false>(Column(column), Position, key.Column(keyColumn), key.Position);
}
int Size() const {
@@ -91,58 +94,52 @@ private:
int Position;
static size_t TypedHash(const arrow::Array& ar, int pos, arrow::Type::type typeId) {
- // TODO: more types
- switch (typeId) {
- case arrow::Type::TIMESTAMP:
- return THash<size_t>()((size_t)static_cast<const arrow::TimestampArray&>(ar).Value(pos));
- default:
- break;
- }
- return 0;
- }
-
- static bool TypedEquals(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
- arrow::Type::type typeId = lhs.type_id();
switch (typeId) {
case arrow::Type::NA:
case arrow::Type::BOOL:
break;
case arrow::Type::UINT8:
- return EqualValue<arrow::UInt8Array>(lhs, lpos, rhs, rpos);
+ return THash<ui8>()(static_cast<const arrow::UInt8Array&>(ar).Value(pos));
case arrow::Type::INT8:
- return EqualValue<arrow::Int8Array>(lhs, lpos, rhs, rpos);
+ return THash<i8>()(static_cast<const arrow::Int8Array&>(ar).Value(pos));
case arrow::Type::UINT16:
- return EqualValue<arrow::UInt16Array>(lhs, lpos, rhs, rpos);
+ return THash<ui16>()(static_cast<const arrow::UInt16Array&>(ar).Value(pos));
case arrow::Type::INT16:
- return EqualValue<arrow::Int16Array>(lhs, lpos, rhs, rpos);
+ return THash<i16>()(static_cast<const arrow::Int16Array&>(ar).Value(pos));
case arrow::Type::UINT32:
- return EqualValue<arrow::UInt32Array>(lhs, lpos, rhs, rpos);
+ return THash<ui32>()(static_cast<const arrow::UInt32Array&>(ar).Value(pos));
case arrow::Type::INT32:
- return EqualValue<arrow::Int32Array>(lhs, lpos, rhs, rpos);
+ return THash<i32>()(static_cast<const arrow::Int32Array&>(ar).Value(pos));
case arrow::Type::UINT64:
- return EqualValue<arrow::UInt64Array>(lhs, lpos, rhs, rpos);
+ return THash<ui64>()(static_cast<const arrow::UInt64Array&>(ar).Value(pos));
case arrow::Type::INT64:
- return EqualValue<arrow::Int64Array>(lhs, lpos, rhs, rpos);
+ return THash<i64>()(static_cast<const arrow::Int64Array&>(ar).Value(pos));
case arrow::Type::HALF_FLOAT:
break;
case arrow::Type::FLOAT:
- return EqualValue<arrow::FloatArray>(lhs, lpos, rhs, rpos);
+ return THash<float>()(static_cast<const arrow::FloatArray&>(ar).Value(pos));
case arrow::Type::DOUBLE:
- return EqualValue<arrow::DoubleArray>(lhs, lpos, rhs, rpos);
- case arrow::Type::STRING:
- return EqualView<arrow::StringArray>(lhs, lpos, rhs, rpos);
- case arrow::Type::BINARY:
- return EqualView<arrow::BinaryArray>(lhs, lpos, rhs, rpos);
+ return THash<double>()(static_cast<const arrow::DoubleArray&>(ar).Value(pos));
+ case arrow::Type::STRING: {
+ const auto& str = static_cast<const arrow::StringArray&>(ar).GetView(pos);
+ return THash<std::string_view>()(std::string_view(str.data(), str.size()));
+ }
+ case arrow::Type::BINARY: {
+ const auto& str = static_cast<const arrow::BinaryArray&>(ar).GetView(pos);
+ return THash<std::string_view>()(std::string_view(str.data(), str.size()));
+ }
case arrow::Type::FIXED_SIZE_BINARY:
- break;
case arrow::Type::DATE32:
- return EqualView<arrow::Date32Array>(lhs, lpos, rhs, rpos);
case arrow::Type::DATE64:
- return EqualView<arrow::Date64Array>(lhs, lpos, rhs, rpos);
+ break;
case arrow::Type::TIMESTAMP:
- return EqualValue<arrow::TimestampArray>(lhs, lpos, rhs, rpos);
+ return THash<i64>()(static_cast<const arrow::TimestampArray&>(ar).Value(pos));
+ case arrow::Type::TIME32:
+ return THash<i32>()(static_cast<const arrow::Time32Array&>(ar).Value(pos));
+ case arrow::Type::TIME64:
+ return THash<i64>()(static_cast<const arrow::Time64Array&>(ar).Value(pos));
case arrow::Type::DURATION:
- return EqualValue<arrow::DurationArray>(lhs, lpos, rhs, rpos);
+ return THash<i64>()(static_cast<const arrow::DurationArray&>(ar).Value(pos));
case arrow::Type::DECIMAL256:
case arrow::Type::DECIMAL:
case arrow::Type::DENSE_UNION:
@@ -159,42 +156,41 @@ private:
case arrow::Type::MAX_ID:
case arrow::Type::SPARSE_UNION:
case arrow::Type::STRUCT:
- case arrow::Type::TIME32:
- case arrow::Type::TIME64:
+ Y_FAIL("not implemented");
break;
}
- return false;
+ return 0;
}
template <bool notNull>
- static int TypedCompare(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
+ static std::partial_ordering TypedCompare(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
arrow::Type::type typeId = lhs.type_id();
switch (typeId) {
case arrow::Type::NA:
case arrow::Type::BOOL:
break;
case arrow::Type::UINT8:
- return CompareValue<arrow::UInt8Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::UInt8Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::INT8:
- return CompareValue<arrow::Int8Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::Int8Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::UINT16:
- return CompareValue<arrow::UInt16Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::UInt16Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::INT16:
- return CompareValue<arrow::Int16Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::Int16Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::UINT32:
- return CompareValue<arrow::UInt32Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::UInt32Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::INT32:
- return CompareValue<arrow::Int32Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::Int32Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::UINT64:
- return CompareValue<arrow::UInt64Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::UInt64Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::INT64:
- return CompareValue<arrow::Int64Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::Int64Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::HALF_FLOAT:
break;
case arrow::Type::FLOAT:
- return CompareValue<arrow::FloatArray, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::FloatArray, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::DOUBLE:
- return CompareValue<arrow::DoubleArray, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::DoubleArray, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::STRING:
return CompareView<arrow::StringArray, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::BINARY:
@@ -204,13 +200,13 @@ private:
case arrow::Type::DATE64:
break;
case arrow::Type::TIMESTAMP:
- return CompareValue<arrow::TimestampArray, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::TimestampArray, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::TIME32:
- return CompareValue<arrow::Time32Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::Time32Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::TIME64:
- return CompareValue<arrow::Time64Array, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::Time64Array, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::DURATION:
- return CompareValue<arrow::DurationArray, notNull>(lhs, lpos, rhs, rpos);
+ return CompareView<arrow::DurationArray, notNull>(lhs, lpos, rhs, rpos);
case arrow::Type::DECIMAL256:
case arrow::Type::DECIMAL:
case arrow::Type::DENSE_UNION:
@@ -227,40 +223,14 @@ private:
case arrow::Type::MAX_ID:
case arrow::Type::SPARSE_UNION:
case arrow::Type::STRUCT:
+ Y_FAIL("not implemented");
break;
}
- return false;
- }
-
- // TODO: NULLs
- template <typename T>
- static bool EqualValue(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
- auto& left = static_cast<const T&>(lhs);
- auto& right = static_cast<const T&>(rhs);
- return left.Value(lpos) == right.Value(rpos);
- }
-
- // TODO: NULLs
- template <typename T>
- static bool EqualView(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
- auto& left = static_cast<const T&>(lhs);
- auto& right = static_cast<const T&>(rhs);
- return left.GetView(lpos) == right.GetView(rpos);
- }
-
- template <typename T, bool notNull>
- static int CompareValue(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
- auto& left = static_cast<const T&>(lhs);
- auto& right = static_cast<const T&>(rhs);
- if constexpr (notNull) {
- return CompareValueNotNull(left.Value(lpos), right.Value(rpos));
- } else {
- return CompareValue(left.Value(lpos), right.Value(rpos), left.IsNull(lpos), right.IsNull(rpos));
- }
+ return std::partial_ordering::equivalent;
}
template <typename T, bool notNull>
- static int CompareView(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
+ static std::partial_ordering CompareView(const arrow::Array& lhs, int lpos, const arrow::Array& rhs, int rpos) {
auto& left = static_cast<const T&>(lhs);
auto& right = static_cast<const T&>(rhs);
if constexpr (notNull) {
@@ -271,34 +241,30 @@ private:
}
template <typename T>
- static int CompareValue(const T& x, const T& y, bool xIsNull, bool yIsNull) {
+ static std::partial_ordering CompareValue(const T& x, const T& y, bool xIsNull, bool yIsNull) {
+ // TODO: std::partial_ordering::unordered for both nulls?
if (xIsNull) {
- return -1;
+ return std::partial_ordering::less;
}
if (yIsNull) {
- return 1;
+ return std::partial_ordering::greater;
}
return CompareValueNotNull(x, y);
}
template <typename T>
- static int CompareValueNotNull(const T& x, const T& y) {
+ static std::partial_ordering CompareValueNotNull(const T& x, const T& y) {
if constexpr (std::is_same_v<T, arrow::util::string_view>) {
size_t minSize = (x.size() < y.size()) ? x.size() : y.size();
int cmp = memcmp(x.data(), y.data(), minSize);
if (cmp < 0) {
- return -1; // avoid INT_MIN as negative cmp. We require "-negative is positive" for result.
+ return std::partial_ordering::less;
} else if (cmp > 0) {
- return 1;
+ return std::partial_ordering::greater;
}
return CompareValueNotNull(x.size(), y.size());
} else {
- if (x < y) {
- return -1;
- } else if (x > y) {
- return 1;
- }
- return 0;
+ return x <=> y;
}
}
};
diff --git a/ydb/core/formats/sort_cursor.h b/ydb/core/formats/sort_cursor.h
index 36e249c2a2..a12e0b554c 100644
--- a/ydb/core/formats/sort_cursor.h
+++ b/ydb/core/formats/sort_cursor.h
@@ -116,12 +116,24 @@ private:
TRawReplaceKey left(Impl->sort_columns.get(), lhs_pos);
TRawReplaceKey right(rhs.Impl->sort_columns.get(), rhs_pos);
- for (size_t i = 0; i < Impl->desc->Size(); ++i) {
- int res = Impl->desc->Direction(i) * left.CompareColumnValue(i, right, i, NotNull);
- if (res > 0)
- return true;
- if (res < 0)
- return false;
+ if (NotNull) {
+ for (size_t i = 0; i < Impl->desc->Size(); ++i) {
+ auto cmp = left.CompareColumnValueNotNull(i, right, i);
+ int res = Impl->desc->Direction(i) * (std::is_eq(cmp) ? 0 : (std::is_lt(cmp) ? -1 : 1));
+ if (res > 0)
+ return true;
+ if (res < 0)
+ return false;
+ }
+ } else {
+ for (size_t i = 0; i < Impl->desc->Size(); ++i) {
+ auto cmp = left.CompareColumnValue(i, right, i);
+ int res = Impl->desc->Direction(i) * (std::is_eq(cmp) ? 0 : (std::is_lt(cmp) ? -1 : 1));
+ if (res > 0)
+ return true;
+ if (res < 0)
+ return false;
+ }
}
return Impl->order > rhs.Impl->order;
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 6c275279c2..f8f1008d5b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -351,7 +351,7 @@ SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
// Just take the number of elements in the key column for the last granule.
? keyColumn->length()
// Locate position of the next granule in the key.
- : NArrow::LowerBound(keyColumn, *granules[i + 1].first.Border, offset);
+ : NArrow::LowerBound(keyColumn, *granules[i + 1].first.ToScalar(), offset); // TODO: avoid ToScalar()
if (const i64 size = end - offset) {
Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
@@ -1108,7 +1108,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
for (auto& [granule, p] : changes.NewGranules) {
ui64 pathId = p.first;
TMark mark = p.second;
- TGranuleRecord rec(pathId, granule, snapshot, mark.Border);
+ TGranuleRecord rec(pathId, granule, snapshot, mark.ToScalar());
if (!SetGranule(rec, apply)) {
LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId);
@@ -1253,7 +1253,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
if (Granules.contains(granule)) {
granuleStart = Granules[granule]->Record.Mark;
} else {
- granuleStart = changes.NewGranules.find(granule)->second.second.Border;
+ granuleStart = changes.NewGranules.find(granule)->second.second.ToScalar();
}
auto portionStart = portionInfo.PkStart();
Y_VERIFY(portionStart);
@@ -1794,7 +1794,7 @@ SliceGranuleBatches(const TIndexInfo& indexInfo,
batchOffsets.push_back(0);
for (auto& border : borders) {
- int offset = NArrow::LowerBound(keyColumn, *border.Border, batchOffsets.back());
+ int offset = NArrow::LowerBound(keyColumn, *border.ToScalar(), batchOffsets.back());
Y_VERIFY(offset >= batchOffsets.back());
batchOffsets.push_back(offset);
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index bb36667525..04bb5e04f4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -1,5 +1,6 @@
#pragma once
#include "defs.h"
+#include <ydb/core/formats/replace_key.h>
#include "column_engine.h"
#include "scalars.h"
@@ -20,51 +21,34 @@ class TCountersTable;
class TColumnEngineForLogs : public IColumnEngine {
public:
struct TMark {
- std::shared_ptr<arrow::Scalar> Border;
+ // TODO: Grouped marks. Share columns in TReplaceKey between multiple marks.
+ NArrow::TReplaceKey Border;
explicit TMark(const std::shared_ptr<arrow::Scalar>& s)
- : Border(s)
- {
- Y_VERIFY(Border);
- Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border));
- }
+ : Border(FromScalar(s))
+ {}
explicit TMark(const std::shared_ptr<arrow::DataType>& type)
- : Border(MinScalar(type))
- {
- Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border));
- }
+ : Border(MinBorder(type))
+ {}
- TMark(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
- Deserialize(key, type);
- Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border));
- }
+ TMark(const TString& key, const std::shared_ptr<arrow::DataType>& type)
+ : Border(FromScalar(DeserializeKeyScalar(key, type)))
+ {}
TMark(const TMark& m) = default;
TMark& operator = (const TMark& m) = default;
bool operator == (const TMark& m) const {
- return Border->Equals(*m.Border);
- }
-
- bool operator < (const TMark& m) const {
- return NArrow::ScalarLess(*Border, *m.Border);
- }
-
- bool operator <= (const TMark& m) const {
- return Border->Equals(*m.Border) || NArrow::ScalarLess(*Border, *m.Border);
+ return Border == m.Border;
}
- bool operator > (const TMark& m) const {
- return !(*this <= m);
- }
-
- bool operator >= (const TMark& m) const {
- return !(*this < m);
+ std::partial_ordering operator <=> (const TMark& m) const {
+ return Border <=> m.Border;
}
ui64 Hash() const {
- return Border->hash();
+ return Border.Hash();
}
operator size_t () const {
@@ -76,11 +60,32 @@ public:
}
TString Serialize() const {
- return SerializeKeyScalar(Border);
+ return SerializeKeyScalar(ToScalar(Border));
}
void Deserialize(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
- Border = DeserializeKeyScalar(key, type);
+ Border = FromScalar(DeserializeKeyScalar(key, type));
+ }
+
+ std::shared_ptr<arrow::Scalar> ToScalar() const {
+ return ToScalar(Border);
+ }
+
+ private:
+ static NArrow::TReplaceKey FromScalar(const std::shared_ptr<arrow::Scalar>& s) {
+ Y_VERIFY_DEBUG(NArrow::IsGoodScalar(s));
+ auto res = MakeArrayFromScalar(*s, 1);
+ Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
+ return NArrow::TReplaceKey(std::make_shared<NArrow::TArrayVec>(1, *res), 0);
+ }
+
+ static std::shared_ptr<arrow::Scalar> ToScalar(const NArrow::TReplaceKey& key) {
+ Y_VERIFY_DEBUG(key.Size() == 1);
+ auto& column = key.Column(0);
+ auto res = column.GetScalar(key.GetPosition());
+ Y_VERIFY(res.status().ok(), "%s", res.status().ToString().c_str());
+ Y_VERIFY_DEBUG(NArrow::IsGoodScalar(*res));
+ return *res;
}
static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) {
@@ -90,6 +95,10 @@ public:
}
return NArrow::MinScalar(type);
}
+
+ static NArrow::TReplaceKey MinBorder(const std::shared_ptr<arrow::DataType>& type) {
+ return FromScalar(MinScalar(type));
+ }
};
class TMarksGranules {
@@ -133,7 +142,7 @@ public:
TChanges(const TColumnEngineForLogs& engine,
TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::INSERT)
- , DefaultMark(engine.GetMarkType())
+ , DefaultMark(engine.GetDefaultMark())
{
Limits = limits;
DataToIndex = std::move(blobsToIndex);
@@ -142,7 +151,7 @@ public:
TChanges(const TColumnEngineForLogs& engine,
std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::COMPACTION)
- , DefaultMark(engine.GetMarkType())
+ , DefaultMark(engine.GetDefaultMark())
{
Limits = limits;
CompactionInfo = std::move(info);
@@ -151,7 +160,7 @@ public:
TChanges(const TColumnEngineForLogs& engine,
const TSnapshot& snapshot, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::CLEANUP)
- , DefaultMark(engine.GetMarkType())
+ , DefaultMark(engine.GetDefaultMark())
{
Limits = limits;
InitSnapshot = snapshot;
@@ -160,7 +169,7 @@ public:
TChanges(const TColumnEngineForLogs& engine,
TColumnEngineChanges::EType type, const TSnapshot& applySnapshot)
: TColumnEngineChanges(type)
- , DefaultMark(engine.GetMarkType())
+ , DefaultMark(engine.GetDefaultMark())
{
ApplySnapshot = applySnapshot;
}
@@ -262,11 +271,11 @@ public:
}
std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const override {
- return TMark(key, MarkType).Border;
+ return TMark(key, MarkType).ToScalar();
}
- const std::shared_ptr<arrow::DataType>& GetMarkType() const {
- return MarkType;
+ TMark GetDefaultMark() const {
+ return TMark(MarkType);
}
bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override;