author     chertus <azuikov@ydb.tech>  2022-10-07 16:40:37 +0300
committer  chertus <azuikov@ydb.tech>  2022-10-07 16:40:37 +0300
commit     50b7367df2b1a0cb024fafe65f5254a81c8c895d (patch)
tree       a2a1d62e4f4c2213ed353aa55ff609f27cf5f3d9
parent     2c90eec063340e2538e15e1718821387f5d21f8a (diff)
download   ydb-50b7367df2b1a0cb024fafe65f5254a81c8c895d.tar.gz
use Scalar instead of ui64 as mark in ColumnShard engine
-rw-r--r--  ydb/core/formats/arrow_helpers.cpp                              121
-rw-r--r--  ydb/core/formats/arrow_helpers.h                                 24
-rw-r--r--  ydb/core/formats/sharding.h                                     108
-rw-r--r--  ydb/core/grpc_services/rpc_log_store.cpp                         32
-rw-r--r--  ydb/core/tx/columnshard/columnshard__costs.cpp                   11
-rw-r--r--  ydb/core/tx/columnshard/columnshard__costs.h                      2
-rw-r--r--  ydb/core/tx/columnshard/columnshard__propose_transaction.cpp    18
-rw-r--r--  ydb/core/tx/columnshard/columnshard_impl.cpp                     81
-rw-r--r--  ydb/core/tx/columnshard/columnshard_impl.h                        2
-rw-r--r--  ydb/core/tx/columnshard/columnshard_schema.h                     22
-rw-r--r--  ydb/core/tx/columnshard/columnshard_ut_common.cpp                 3
-rw-r--r--  ydb/core/tx/columnshard/columnshard_ut_common.h                  12
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.txt                    1
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine.h                   3
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp          288
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h            175
-rw-r--r--  ydb/core/tx/columnshard/engines/db_wrapper.cpp                   12
-rw-r--r--  ydb/core/tx/columnshard/engines/db_wrapper.h                     13
-rw-r--r--  ydb/core/tx/columnshard/engines/granules_table.h                 25
-rw-r--r--  ydb/core/tx/columnshard/engines/index_info.cpp                  111
-rw-r--r--  ydb/core/tx/columnshard/engines/index_info.h                      6
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp            21
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.h               3
-rw-r--r--  ydb/core/tx/columnshard/engines/scalars.cpp                     229
-rw-r--r--  ydb/core/tx/columnshard/engines/scalars.h                        14
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_insert_table.cpp               6
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_logs_engine.cpp              145
-rw-r--r--  ydb/core/tx/columnshard/ut_columnshard_read_write.cpp           401
-rw-r--r--  ydb/core/tx/columnshard/ut_columnshard_schema.cpp               115
-rw-r--r--  ydb/core/tx/tx_proxy/upload_rows_common_impl.h                   33
-rw-r--r--  ydb/services/ydb/ydb_logstore_ut.cpp                             53
-rw-r--r--  ydb/services/ydb/ydb_olapstore_ut.cpp                           132
32 files changed, 1600 insertions, 622 deletions
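
The patch replaces the raw ui64 timestamp previously used as a granule mark with a typed arrow::Scalar, so the first PK column is no longer limited to Timestamp. A minimal sketch of the idea, using only the arrow C++ API (the function names here are illustrative, not part of the patch):

#include <arrow/scalar.h>
#include <arrow/type.h>
#include <memory>
#include <string>

// A granule border used to be a bare ui64; with this patch it can be any
// supported first-PK type, carried as an arrow::Scalar.
std::shared_ptr<arrow::Scalar> TimestampBorder(int64_t us) {
    return std::make_shared<arrow::TimestampScalar>(
        us, arrow::timestamp(arrow::TimeUnit::MICRO));
}

std::shared_ptr<arrow::Scalar> StringBorder(const std::string& s) {
    return std::make_shared<arrow::StringScalar>(s);
}

Both kinds of border flow through the same TMark wrapper introduced in column_engine_logs.h further down.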
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 00feaabbf5e..0dc67b9e028 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -5,6 +5,7 @@
#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>
+#include <ydb/core/util/yverify_stream.h>
#include <util/memory/pool.h>
#include <util/system/yassert.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
@@ -356,11 +357,12 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
auto srcSchema = srcBatch->schema();
for (auto& name : columnNames) {
- fields.push_back(srcSchema->GetFieldByName(name));
- columns.push_back(srcBatch->GetColumnByName(name));
- if (!columns.back()) {
+ int pos = srcSchema->GetFieldIndex(name);
+ if (pos < 0) {
return {};
}
+ fields.push_back(srcSchema->field(pos));
+ columns.push_back(srcBatch->column(pos));
}
return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), srcBatch->num_rows(), columns);
@@ -374,6 +376,10 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
for (auto& field : dstSchema->fields()) {
columns.push_back(srcBatch->GetColumnByName(field->name()));
+ if (columns.back() && !columns.back()->type()->Equals(field->type())) {
+ columns.back() = {};
+ }
+
if (!columns.back()) {
if (addNotExisted) {
auto result = arrow::MakeArrayOfNull(field->type(), srcBatch->num_rows());
@@ -709,6 +715,40 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
return bytes;
}
+i64 LowerBound(const std::shared_ptr<arrow::Array>& array, const arrow::Scalar& border, i64 offset) {
+ i64 pos = 0;
+ SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using T = typename TWrap::T;
+ using TArray = typename arrow::TypeTraits<T>::ArrayType;
+ using TScalar = typename arrow::TypeTraits<T>::ScalarType;
+
+ auto& column = static_cast<const TArray&>(*array);
+
+ if constexpr (arrow::is_number_type<T>() || arrow::is_timestamp_type<T>()) {
+ const auto* start = column.raw_values() + offset;
+ const auto* end = column.raw_values() + column.length();
+ pos = offset;
+ pos += std::lower_bound(start, end, static_cast<const TScalar&>(border).value) - start;
+ } else if constexpr (arrow::has_string_view<T>()) {
+ arrow::util::string_view value(*static_cast<const TScalar&>(border).value);
+
+ // TODO: binary search
+ for (pos = offset; pos < column.length(); ++pos) {
+ if (!(column.GetView(pos) < value)) {
+ return true;
+ }
+ }
+ } else {
+ Y_VERIFY(false); // not implemented
+ }
+
+ return true;
+ });
+
+ return pos;
+}
+
std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size) {
auto res = arrow::MakeArrayFromScalar(arrow::UInt64Scalar(value), size);
Y_VERIFY(res.ok());
@@ -742,29 +782,65 @@ std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& arra
return {minPos, maxPos};
}
-std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position) {
+std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) {
std::shared_ptr<arrow::Scalar> out;
- SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
- using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
-
- auto& column = static_cast<const TArray&>(*array);
- if constexpr (std::is_same_v<TScalar, arrow::StringScalar> ||
- std::is_same_v<TScalar, arrow::BinaryScalar> ||
- std::is_same_v<TScalar, arrow::FixedSizeBinaryScalar> ||
- std::is_same_v<TScalar, arrow::Decimal128Scalar> ||
- std::is_same_v<TScalar, arrow::LargeStringScalar> ||
- std::is_same_v<TScalar, arrow::LargeBinaryScalar>) {
- // TODO
+ SwitchType(type->id(), [&](const auto& t) {
+ using TWrap = std::decay_t<decltype(t)>;
+ using T = typename TWrap::T;
+ using TScalar = typename arrow::TypeTraits<T>::ScalarType;
+
+ if constexpr (std::is_same_v<T, arrow::StringType> ||
+ std::is_same_v<T, arrow::BinaryType> ||
+ std::is_same_v<T, arrow::LargeStringType> ||
+ std::is_same_v<T, arrow::LargeBinaryType>) {
+ out = std::make_shared<TScalar>(arrow::Buffer::FromString(""), type);
+ } else if constexpr (std::is_same_v<T, arrow::FixedSizeBinaryType>) {
+ std::string s(static_cast<arrow::FixedSizeBinaryType&>(*type).byte_width(), '\0');
+ out = std::make_shared<TScalar>(arrow::Buffer::FromString(s), type);
+ } else if constexpr (std::is_same_v<T, arrow::HalfFloatType>) {
+ return false;
+ } else if constexpr (arrow::is_temporal_type<T>::value) {
+ using TCType = typename arrow::TypeTraits<T>::CType;
+ out = std::make_shared<TScalar>(Min<TCType>(), type);
+ } else if constexpr (arrow::has_c_type<T>::value) {
+ using TCType = typename arrow::TypeTraits<T>::CType;
+ out = std::make_shared<TScalar>(Min<TCType>());
} else {
- out = std::make_shared<TScalar>(column.GetView(position), array->type());
+ return false;
}
return true;
});
+ Y_VERIFY(out);
return out;
}
+std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position) {
+ auto res = array->GetScalar(position);
+ Y_VERIFY(res.ok());
+ return *res;
+}
+
+bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x) {
+ if (!x) {
+ return false;
+ }
+
+ return SwitchType(x->type->id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
+ using TValue = std::decay_t<decltype(static_cast<const TScalar&>(*x).value)>;
+
+ if constexpr (arrow::has_string_view<typename TWrap::T>()) {
+ const auto& xval = static_cast<const TScalar&>(*x).value;
+ return xval && xval->data();
+ }
+ if constexpr (std::is_arithmetic_v<TValue>) {
+ return true;
+ }
+ return false;
+ });
+}
+
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) {
Y_VERIFY(x);
Y_VERIFY(y);
@@ -772,16 +848,21 @@ bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<a
}
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
- Y_VERIFY(x.type->Equals(y.type));
+ Y_VERIFY_S(x.type->Equals(y.type), x.type->ToString() + " vs " + y.type->ToString());
return SwitchType(x.type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
using TValue = std::decay_t<decltype(static_cast<const TScalar&>(x).value)>;
+
if constexpr (arrow::has_string_view<typename TWrap::T>()) {
const auto& xval = static_cast<const TScalar&>(x).value;
const auto& yval = static_cast<const TScalar&>(y).value;
- return TStringBuf((char*)xval->data(), xval->size()) < TStringBuf((char*)yval->data(), yval->size());
+ Y_VERIFY(xval);
+ Y_VERIFY(yval);
+ TStringBuf xBuf(reinterpret_cast<const char*>(xval->data()), xval->size());
+ TStringBuf yBuf(reinterpret_cast<const char*>(yval->data()), yval->size());
+ return xBuf < yBuf;
}
if constexpr (std::is_arithmetic_v<TValue>) {
const auto& xval = static_cast<const TScalar&>(x).value;
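
The new LowerBound helper above binary-searches numeric and timestamp arrays and falls back to a linear scan for string-like ones. A usage sketch, assuming the helper sits in NKikimr::NArrow like the rest of this file:

#include <ydb/core/formats/arrow_helpers.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/builder.h>

void LowerBoundExample() {
    arrow::Int64Builder builder;
    Y_VERIFY(builder.AppendValues({1, 3, 3, 7, 9}).ok());
    std::shared_ptr<arrow::Array> sorted;
    Y_VERIFY(builder.Finish(&sorted).ok());

    // First position (at or after offset) whose value is not less than
    // the border scalar:
    arrow::Int64Scalar border(7);
    i64 pos = NKikimr::NArrow::LowerBound(sorted, border, /*offset=*/0);
    Y_VERIFY(pos == 3);
}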
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 9f96d2b81b7..c9ba1b464a0 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -103,6 +103,8 @@ ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
// Return size in bytes *not* including size of bitmap mask
ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column);
+i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0);
+
enum class ECompareType {
LESS = 1,
LESS_OR_EQUAL,
@@ -124,28 +126,10 @@ bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
bool desc = false);
bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& schema);
-template <typename TArr>
-std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& batch, int pos) {
- auto array = batch->column(pos);
- Y_VERIFY(array);
- //Y_VERIFY(array->type_id() == arrow::Type::TIMESTAMP); // TODO
- auto column = std::static_pointer_cast<TArr>(array);
- Y_VERIFY(column);
- return column;
-}
-
-template <typename TArr>
-std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& columnName) {
- auto array = batch->GetColumnByName(columnName);
- Y_VERIFY(array);
- //Y_VERIFY(array->type_id() == arrow::Type::TIMESTAMP); // TODO
- auto column = std::static_pointer_cast<TArr>(array);
- Y_VERIFY(column);
- return column;
-}
-
std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& column);
+std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type);
std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position);
+bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
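
Expected behavior of the new helpers, sketched from the implementation above (illustrative, not a test from the patch): MinScalar yields the minimum representable value for numeric and temporal types and an empty buffer for string-like ones; IsGoodScalar rejects empty pointers and string scalars without a backing buffer.

#include <ydb/core/formats/arrow_helpers.h>

void MinScalarExample() {
    using namespace NKikimr::NArrow;

    auto intMin = MinScalar(arrow::int64()); // Int64Scalar(Min<i64>())
    auto strMin = MinScalar(arrow::utf8());  // StringScalar("")

    Y_VERIFY(IsGoodScalar(intMin));
    Y_VERIFY(IsGoodScalar(strMin));
    Y_VERIFY(!IsGoodScalar(nullptr));
    Y_VERIFY(ScalarLess(*strMin, arrow::StringScalar("a")));
}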
diff --git a/ydb/core/formats/sharding.h b/ydb/core/formats/sharding.h
index bd6c7b16ccd..8e22fed3eb5 100644
--- a/ydb/core/formats/sharding.h
+++ b/ydb/core/formats/sharding.h
@@ -8,56 +8,102 @@
namespace NKikimr::NArrow {
-class THashSharding {
+class TShardingBase {
+protected:
+ static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) {
+ NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using T = typename TWrap::T;
+ using TArray = typename arrow::TypeTraits<T>::ArrayType;
+
+ if (!array->IsNull(row)) {
+ auto& typedArray = static_cast<const TArray&>(*array);
+ auto value = typedArray.GetView(row);
+ if constexpr (arrow::has_string_view<T>()) {
+ concat.append(value.data(), value.size());
+ } else if constexpr (arrow::has_c_type<T>()) {
+ if constexpr (arrow::is_physical_integer_type<T>()) {
+ concat.append(reinterpret_cast<const char*>(&value), sizeof(value));
+ } else {
+ // Do not use bool or floats for sharding
+ static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
+ }
+ } else {
+ static_assert(arrow::is_decimal_type<T>());
+ }
+ }
+ return true;
+ });
+ }
+};
+
+class THashSharding : public TShardingBase {
public:
- THashSharding(ui32 shardsCount)
+ THashSharding(ui32 shardsCount, ui64 seed = 0)
: ShardsCount(shardsCount)
+ , Seed(seed)
{}
std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch,
const TVector<TString>& shardingColumns) const
{
- if (shardingColumns.size() != 1) {
- return {};
- }
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(shardingColumns.size());
- auto array = batch->GetColumnByName(shardingColumns[0]);
- if (!array) {
- return {};
+ for (auto& colName : shardingColumns) {
+ auto array = batch->GetColumnByName(colName);
+ if (!array) {
+ return {};
+ }
+ columns.emplace_back(array);
}
std::vector<ui32> out(batch->num_rows());
- SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
+ if (columns.size() == 1) {
+ auto& array = columns[0];
+ SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
- auto& column = static_cast<const TArray&>(*array);
+ auto& column = static_cast<const TArray&>(*array);
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ out[row] = ShardNo(column.GetView(row));
+ }
+ return true;
+ });
+ } else {
+ std::string concat;
for (int row = 0; row < batch->num_rows(); ++row) {
- out[row] = ShardNo(column.GetView(row));
+ concat.clear();
+ for (auto& column : columns) {
+ AppendField(column, row, concat);
+ }
+
+ out[row] = ShardNo(concat);
}
- return true;
- });
+ }
return out;
}
private:
ui32 ShardsCount;
+ ui64 Seed;
template <typename T, std::enable_if_t<std::is_arithmetic<T>::value, bool> = true>
ui32 ShardNo(T value) const {
- return XXH64(&value, sizeof(value), 0) % ShardsCount;
+ return XXH64(&value, sizeof(value), Seed) % ShardsCount;
}
ui32 ShardNo(arrow::util::string_view value) const {
- return XXH64(value.data(), value.size(), 0) % ShardsCount;
+ return XXH64(value.data(), value.size(), Seed) % ShardsCount;
}
};
// KIKIMR-11529
-class TLogsSharding {
+class TLogsSharding : public TShardingBase {
public:
static constexpr ui32 DEFAULT_ACITVE_SHARDS = 10;
static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5);
@@ -138,32 +184,6 @@ private:
ui32 NumActive;
ui64 TsMin;
ui64 ChangePeriod;
-
- static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) {
- NArrow::SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using T = typename TWrap::T;
- using TArray = typename arrow::TypeTraits<T>::ArrayType;
-
- if (!array->IsNull(row)) {
- auto& typedArray = static_cast<const TArray&>(*array);
- auto value = typedArray.GetView(row);
- if constexpr (arrow::has_string_view<T>()) {
- concat.append(value.data(), value.size());
- } else if constexpr (arrow::has_c_type<T>()) {
- if constexpr (arrow::is_physical_integer_type<T>()) {
- concat.append(reinterpret_cast<const char*>(&value), sizeof(value));
- } else {
- // Do not use bool or floats for sharding
- static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
- }
- } else {
- static_assert(arrow::is_decimal_type<T>());
- }
- }
- return true;
- });
- }
};
}
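
With the new Seed member, THashSharding's shard choice is a pure function of (value, seed, shard count). A sketch of the computation it performs (XXH64 is the hash the class already uses; the include path is an assumption):

#include <contrib/libs/xxhash/xxhash.h>
#include <util/system/types.h>
#include <string>

// Mirrors THashSharding::ShardNo for a string key; a non-zero seed lets
// different tables hash the same keys to different shard layouts.
ui32 ShardNoSketch(const std::string& value, ui32 shardsCount, ui64 seed) {
    return XXH64(value.data(), value.size(), seed) % shardsCount;
}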
diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp
index 732303407bc..ea15e6a2f4b 100644
--- a/ydb/core/grpc_services/rpc_log_store.cpp
+++ b/ydb/core/grpc_services/rpc_log_store.cpp
@@ -76,6 +76,34 @@ void ConvertCompressionFromInternalToPublic(const NKikimrSchemeOp::TCompressionO
to.set_compression_level(from.GetCompressionLevel());
}
+static bool IsAllowedFirstPkField(ui32 typeId) {
+ switch (typeId) {
+ //case NYql::NProto::Bool
+ case NYql::NProto::Uint8: // Byte
+ case NYql::NProto::Int32:
+ case NYql::NProto::Uint32:
+ case NYql::NProto::Int64:
+ case NYql::NProto::Uint64:
+ //case NYql::NProto::Float:
+ //case NYql::NProto::Double:
+ case NYql::NProto::String:
+ case NYql::NProto::Utf8:
+ //case NYql::NProto::Yson:
+ //case NYql::NProto::Json:
+ //case NYql::NProto::JsonDocument:
+ case NYql::NProto::Date:
+ case NYql::NProto::Datetime:
+ case NYql::NProto::Timestamp:
+ //case NYql::NProto::Interval:
+ //case NYql::NProto::Decimal:
+ //case NYql::NProto::DyNumber:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikimrSchemeOp::TColumnTableSchema& to,
Ydb::StatusIds::StatusCode& status, TString& error)
{
@@ -103,8 +131,8 @@ bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikim
col->SetType(typeName);
key.erase(column.name());
- if (column.name() == firstKeyColumn && typeInfo.GetTypeId() != NYql::NProto::Timestamp) {
- error = "not supported first PK column type for LogStore. Only Timestamp columns are allowed for now.";
+ if (column.name() == firstKeyColumn && !IsAllowedFirstPkField(typeInfo.GetTypeId())) {
+ error = "not supported first PK column type for LogStore";
return false;
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard__costs.cpp
index 4405a9695d2..f258d669905 100644
--- a/ydb/core/tx/columnshard/columnshard__costs.cpp
+++ b/ydb/core/tx/columnshard/columnshard__costs.cpp
@@ -9,7 +9,7 @@
namespace NKikimr::NOlap::NCosts {
void TKeyRangesBuilder::AddMarkFromGranule(const TGranuleRecord& record) {
- Constructor.StartRecord(true).AddRecordValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey)));
+ Constructor.StartRecord(true).AddRecordValue(record.Mark);
Features.emplace_back(TMarkRangeFeatures());
}
@@ -32,19 +32,14 @@ void TKeyRangesBuilder::FillRangeMarks(const std::shared_ptr<NOlap::TPredicate>&
Y_VERIFY(!LeftBorderOpened);
LeftBorderOpened = true;
}
- for (auto&& i : granuleRecords) {
- AddMarkFromGranule(i);
+ for (auto&& rec : granuleRecords) {
+ AddMarkFromGranule(rec);
}
if (AddMarkFromPredicate(right)) {
Features.back().SetIntervalSkipped(true);
}
}
-ui64 TKeyRangesBuilder::ExtractKey(const TString& key) {
- Y_VERIFY(key.size() == 8);
- return *reinterpret_cast<const ui64*>(key.data());
-}
-
NKikimrKqp::TEvRemoteCostData::TCostInfo TKeyRanges::SerializeToProto() const {
NKikimrKqp::TEvRemoteCostData::TCostInfo result;
result.SetLeftBorderOpened(LeftBorderOpened);
diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard__costs.h
index 6a11f3187c5..b7cb20ebe5d 100644
--- a/ydb/core/tx/columnshard/columnshard__costs.h
+++ b/ydb/core/tx/columnshard/columnshard__costs.h
@@ -98,7 +98,7 @@ private:
const TIndexInfo& IndexInfo;
bool AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p);
void AddMarkFromGranule(const TGranuleRecord& record);
- static ui64 ExtractKey(const TString& key);
+
public:
TKeyRangesBuilder(const TIndexInfo& indexInfo);
void Reserve(const ui32 num) {
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index cb18edaf0da..06813954659 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -36,6 +36,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
auto txKind = record.GetTxKind();
//ui64 ssId = record.GetSchemeShardId();
ui64 txId = record.GetTxId();
+ auto& txBody = record.GetTxBody();
auto status = NKikimrTxColumnShard::EResultStatus::ERROR;
TString statusMessage;
@@ -45,12 +46,19 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
switch (txKind) {
case NKikimrTxColumnShard::TX_KIND_SCHEMA: {
TColumnShard::TAlterMeta meta;
- if (!meta.Body.ParseFromString(record.GetTxBody())) {
+ if (!meta.Body.ParseFromString(txBody)) {
statusMessage = TStringBuilder()
<< "Schema TxId# " << txId << " cannot be parsed";
break;
}
+ // The body may be invalid if it was generated by a newer SchemeShard
+ if (!meta.Validate()) {
+ statusMessage = TStringBuilder()
+ << "Schema TxId# " << txId << " cannot be proposed";
+ break;
+ }
+
Y_VERIFY(record.HasSchemeShardId());
if (Self->CurrentSchemeShardId == 0) {
Self->CurrentSchemeShardId = record.GetSchemeShardId();
@@ -91,7 +99,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
txInfo.TxKind = txKind;
txInfo.Source = Ev->Get()->GetSource();
txInfo.Cookie = Ev->Cookie;
- Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, record.GetTxBody(), txInfo.MaxStep, txInfo.Source, txInfo.Cookie);
+ Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie);
if (!Self->AltersInFlight.contains(txId)) {
Self->AltersInFlight.emplace(txId, std::move(meta));
@@ -113,7 +121,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
NKikimrTxColumnShard::TCommitTxBody body;
- if (!body.ParseFromString(record.GetTxBody())) {
+ if (!body.ParseFromString(txBody)) {
statusMessage = TStringBuilder()
<< "Commit TxId# " << txId << " cannot be parsed";
break;
@@ -168,7 +176,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
txInfo.MaxStep = maxStep;
txInfo.Source = Ev->Get()->GetSource();
txInfo.Cookie = Ev->Cookie;
- Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, record.GetTxBody(), txInfo.MaxStep, txInfo.Source, txInfo.Cookie);
+ Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie);
Self->CommitsInFlight.emplace(txId, std::move(meta));
@@ -185,7 +193,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
// TODO: make real tx: save and progress with tablets restart support
NKikimrTxColumnShard::TTtlTxBody ttlBody;
- if (!ttlBody.ParseFromString(record.GetTxBody())) {
+ if (!ttlBody.ParseFromString(txBody)) {
statusMessage = "TTL tx cannot be parsed";
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
break;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index b0cb8d70e1a..d31e9436ba8 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -22,8 +22,89 @@ NTabletPipe::TClientConfig GetPipeClientConfig() {
return config;
}
+bool ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
+ namespace NTypeIds = NScheme::NTypeIds;
+
+ static const THashSet<NScheme::TTypeId> supportedTypes = {
+ NTypeIds::Timestamp,
+ NTypeIds::Int8,
+ NTypeIds::Int16,
+ NTypeIds::Int32,
+ NTypeIds::Int64,
+ NTypeIds::Uint8,
+ NTypeIds::Uint16,
+ NTypeIds::Uint32,
+ NTypeIds::Uint64,
+ NTypeIds::Date,
+ NTypeIds::Datetime,
+ //NTypeIds::Interval,
+ //NTypeIds::Float,
+ //NTypeIds::Double,
+ NTypeIds::String,
+ NTypeIds::Utf8
+ };
+
+ if (!schema.HasEngine() ||
+ schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) {
+ return false;
+ }
+
+ if (!schema.KeyColumnNamesSize()) {
+ return false;
+ }
+
+ TString firstKeyColumn = schema.GetKeyColumnNames()[0];
+ THashSet<TString> keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end());
+
+ for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) {
+ TString name = column.GetName();
+ keyColumns.erase(name);
+
+ if (name == firstKeyColumn && !supportedTypes.count(column.GetTypeId())) {
+ return false;
+ }
+ }
+
+ if (!keyColumns.empty()) {
+ return false;
+ }
+ return true;
}
+bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) {
+ if (preset.HasName() && preset.GetName() != "default") {
+ return false;
+ }
+ return ValidateTableSchema(preset.GetSchema());
+}
+
+}
+
+bool TColumnShard::TAlterMeta::Validate() const {
+ switch (Body.TxBody_case()) {
+ case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
+ break;
+ case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables:
+ for (auto& table : Body.GetEnsureTables().GetTables()) {
+ if (table.HasSchemaPreset() && !ValidateTablePreset(table.GetSchemaPreset())) {
+ return false;
+ }
+ if (table.HasSchema() && !ValidateTableSchema(table.GetSchema())) {
+ return false;
+ }
+ // TODO: validate TtlSettings
+ }
+ break;
+ case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable:
+ case NKikimrTxColumnShard::TSchemaTxBody::kDropTable:
+ case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore:
+ case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET:
+ break;
+ }
+ return true;
+}
+
+
TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, nullptr)
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index f791eee3549..3f1f6c33aa7 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -271,6 +271,8 @@ private:
struct TAlterMeta {
NKikimrTxColumnShard::TSchemaTxBody Body;
THashSet<TActorId> NotifySubscribers;
+
+ bool Validate() const;
};
struct TCommitMeta {
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 62758999453..5bff08f0c33 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -494,32 +494,34 @@ struct Schema : NIceDb::Schema {
// IndexGranules activities
- static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const TGranuleRecord& row) {
- db.Table<IndexGranules>().Key(index, row.PathId, row.IndexKey).Update(
+ static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
+ const TGranuleRecord& row) {
+ db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update(
NIceDb::TUpdate<IndexGranules::Granule>(row.Granule),
NIceDb::TUpdate<IndexGranules::PlanStep>(row.CreatedAt.PlanStep),
NIceDb::TUpdate<IndexGranules::TxId>(row.CreatedAt.TxId)
);
}
- static void IndexGranules_Erase(NIceDb::TNiceDb& db, ui32 index, const TGranuleRecord& row) {
- db.Table<IndexGranules>().Key(index, row.PathId, row.IndexKey).Delete();
+ static void IndexGranules_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
+ const TGranuleRecord& row) {
+ db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Delete();
}
- static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, std::function<void(TGranuleRecord&&)> callback) {
+ static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine,
+ std::function<void(TGranuleRecord&&)> callback) {
auto rowset = db.Table<IndexGranules>().Prefix(index).Select();
if (!rowset.IsReady())
return false;
while (!rowset.EndOfSet()) {
- TGranuleRecord row;
- row.PathId = rowset.GetValue<IndexGranules::PathId>();
- row.IndexKey = rowset.GetValue<IndexGranules::IndexKey>();
- row.Granule = rowset.GetValue<IndexGranules::Granule>();
+ ui64 pathId = rowset.GetValue<IndexGranules::PathId>();
+ TString indexKey = rowset.GetValue<IndexGranules::IndexKey>();
+ ui64 granule = rowset.GetValue<IndexGranules::Granule>();
ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>();
ui64 txId = rowset.GetValue<IndexGranules::TxId>();
- row.CreatedAt = {planStep, txId};
+ TGranuleRecord row(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey));
callback(std::move(row));
if (!rowset.Next())
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 786359eb054..f409132c1c5 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -183,7 +183,8 @@ TVector<TCell> MakeTestCells(const TVector<TTypeInfo>& types, ui32 value, TVecto
mem.push_back("{ \"a\" = [ { \"b\" = 1; } ]; }");
const TString& str = mem.back();
cells.push_back(TCell(str.data(), str.size()));
- } else if (type == NTypeIds::Timestamp || type == NTypeIds::Uint64 || type == NTypeIds::Int64) {
+ } else if (type == NTypeIds::Timestamp || type == NTypeIds::Interval ||
+ type == NTypeIds::Uint64 || type == NTypeIds::Int64) {
cells.push_back(TCell::Make<ui64>(value));
} else if (type == NTypeIds::Uint32 || type == NTypeIds::Int32 || type == NTypeIds::Datetime) {
cells.push_back(TCell::Make<ui32>(value));
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 5cb92a8ae83..b218c56b47c 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -88,14 +88,13 @@ struct TTestSchema {
}
};
- static auto YdbSchema() {
+ static auto YdbSchema(const std::pair<TString, TTypeInfo>& firstKeyItem = {"timestamp", TTypeInfo(NTypeIds::Timestamp) }) {
TVector<std::pair<TString, TTypeInfo>> schema = {
// PK
- {"timestamp", TTypeInfo(NTypeIds::Timestamp) },
+ firstKeyItem,
{"resource_type", TTypeInfo(NTypeIds::Utf8) },
{"resource_id", TTypeInfo(NTypeIds::Utf8) },
{"uid", TTypeInfo(NTypeIds::Utf8) },
- //
{"level", TTypeInfo(NTypeIds::Int32) },
{"message", TTypeInfo(NTypeIds::Utf8) },
{"json_payload", TTypeInfo(NTypeIds::Json) },
@@ -179,7 +178,8 @@ struct TTestSchema {
return col;
}
- static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns,
+ static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
+ const TVector<std::pair<TString, TTypeInfo>>& pk,
const TTableSpecials& specials = {}) {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableEnsureTables()->AddTables();
@@ -199,9 +199,7 @@ struct TTestSchema {
*schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second);
}
- auto pk = columns;
- Y_VERIFY(pk.size() >= 4);
- pk.resize(4);
+ Y_VERIFY(pk.size() == 4);
for (auto& column : ExtractNames(pk)) {
schema->AddKeyColumnNames(column);
}
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.txt b/ydb/core/tx/columnshard/engines/CMakeLists.txt
index 0aee1e51a3e..5da5cd8162e 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.txt
@@ -33,4 +33,5 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 85704b2a0fe..4cfc7fa2ef6 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -372,6 +372,9 @@ public:
virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; }
virtual bool HasOverloadedGranules() const { return false; }
+ virtual TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const = 0;
+ virtual std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const = 0;
+
virtual bool Load(IDbWrapper& db, const THashSet<ui64>& pathsToDrop = {}) = 0;
virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
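
For the ui64 case, a mark presumably round-trips through the same 8 bytes the engine previously stored as IndexKey (compare the removed ExtractKey in columnshard__costs.cpp and the old TString key((const char*)&ts, sizeof(ts)) construction below). A hypothetical sketch of such a pair; the real type-generic implementation lives in the new engines/scalars.cpp, which this page does not expand:

#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
#include <util/generic/string.h>
#include <util/system/types.h>
#include <util/system/yassert.h>
#include <cstring>
#include <memory>

TString SerializeUi64Mark(const arrow::UInt64Scalar& mark) {
    ui64 value = mark.value;
    return TString(reinterpret_cast<const char*>(&value), sizeof(value));
}

std::shared_ptr<arrow::Scalar> DeserializeUi64Mark(const TString& key) {
    Y_VERIFY(key.size() == sizeof(ui64));
    ui64 value = 0;
    memcpy(&value, key.data(), sizeof(value));
    return std::make_shared<arrow::UInt64Scalar>(value);
}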
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 53e46ed3b27..5b5af6b9461 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -6,15 +6,16 @@
namespace NKikimr::NOlap {
-std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::RecordBatch>& batch) {
+std::shared_ptr<arrow::Array> GetFirstPKColumn(const TIndexInfo& indexInfo,
+ const std::shared_ptr<arrow::RecordBatch>& batch) {
TString columnName = indexInfo.GetPK()[0].first;
- std::string tsColumnName(columnName.data(), columnName.size());
- return NArrow::GetTypedColumn<arrow::TimestampArray>(batch, tsColumnName);
+ return batch->GetColumnByName(std::string(columnName.data(), columnName.size()));
}
namespace {
+using TMark = TColumnEngineForLogs::TMark;
+
arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
auto& codec = compression.Codec;
@@ -36,17 +37,18 @@ arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
return options;
}
-ui64 ExtractTimestamp(const std::shared_ptr<TPredicate>& pkPredicate, const std::shared_ptr<arrow::Schema>& key) {
+std::shared_ptr<arrow::Scalar> ExtractFirstKey(const std::shared_ptr<TPredicate>& pkPredicate,
+ const std::shared_ptr<arrow::Schema>& key) {
if (pkPredicate) {
Y_VERIFY(pkPredicate->Good());
Y_VERIFY(key->num_fields() == 1);
Y_VERIFY(key->field(0)->Equals(pkPredicate->Batch->schema()->field(0)));
- auto array = NArrow::GetTypedColumn<arrow::TimestampArray>(pkPredicate->Batch, 0);
+ auto array = pkPredicate->Batch->column(0);
Y_VERIFY(array && array->length() == 1);
- return array->Value(0);
+ return *array->GetScalar(0);
}
- return 0;
+ return {};
}
// Although source batches are ordered only by PK (sorting key), resulting pathBatches are ordered by extended key.
@@ -188,16 +190,17 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> PortionsToBatches(const TIndexI
return batches;
}
-bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits& limits, const TSnapshot& snap,
- TMap<ui64, ui64>& borders) {
+bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portions, const TCompactionLimits& limits,
+ const TSnapshot& snap, TMap<TMark, ui64>& borders) {
ui64 oldTimePlanStep = snap.PlanStep - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds();
ui32 insertedCount = 0;
ui32 insertedNew = 0;
THashSet<ui64> filtered;
THashSet<ui64> goodCompacted;
+ THashSet<ui64> nextToGood;
{
- TMap<ui64, TVector<const TPortionInfo*>> points;
+ TMap<TMark, TVector<const TPortionInfo*>> points;
for (auto& portionInfo : portions) {
if (portionInfo.IsInserted()) {
@@ -212,38 +215,46 @@ bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits
auto start = portionInfo.PkStart();
auto end = portionInfo.PkEnd();
Y_VERIFY(start && end);
- ui64 min = static_cast<const arrow::TimestampScalar&>(*start).value;
- ui64 max = static_cast<const arrow::TimestampScalar&>(*end).value;
- points[min].push_back(&portionInfo); // insert start
- points[max].push_back(nullptr); // insert end
+ points[TMark(start)].push_back(&portionInfo);
+ points[TMark(end)].push_back(nullptr);
}
- ui32 bucketCounter = 0;
+ ui32 countInBucket = 0;
+ ui64 bucketStartPortion = 0;
+ bool isGood = false;
int sum = 0;
- const TPortionInfo* lastPortion{};
for (auto& [key, vec] : points) {
for (auto& portionInfo : vec) {
if (portionInfo) {
++sum;
- lastPortion = portionInfo;
- ++bucketCounter;
+ ui64 currentPortion = portionInfo->Portion();
+ if (!bucketStartPortion) {
+ bucketStartPortion = currentPortion;
+ }
+ ++countInBucket;
+
+ ui64 prevIsGood = isGood;
+ isGood = goodCompacted.count(currentPortion);
+ if (prevIsGood && !isGood) {
+ nextToGood.insert(currentPortion);
+ }
} else {
--sum;
}
}
if (!sum) { // count(start) == count(end), start new range
- if (bucketCounter == 1) {
- Y_VERIFY(lastPortion);
+ Y_VERIFY(bucketStartPortion);
+ if (countInBucket == 1) {
// We do not want to merge big compacted portions with inserted ones if there are no intersections.
- ui64 maxBlobSize = lastPortion->BlobsSizes().second;
- if (!lastPortion->IsInserted() && maxBlobSize >= limits.GoodBlobSize) {
- filtered.insert(lastPortion->Portion());
+ if (isGood) {
+ filtered.insert(bucketStartPortion);
}
}
- bucketCounter = 0;
+ countInBucket = 0;
+ bucketStartPortion = 0;
}
}
}
@@ -261,27 +272,25 @@ bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits
// It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders.
// We could merge inserted portions altogether and slice the result with filtered borders to prevent intersections.
- borders[0] = 0;
+ borders[granuleMark] = 0;
TVector<TPortionInfo> tmp;
tmp.reserve(portions.size());
for (auto& portionInfo : portions) {
+ ui64 curPortion = portionInfo.Portion();
+
// Prevent merge of compacted portions with no intersections
- if (filtered.count(portionInfo.Portion())) {
+ if (filtered.count(curPortion)) {
auto start = portionInfo.PkStart();
Y_VERIFY(start);
- ui64 ts = static_cast<const arrow::TimestampScalar&>(*start).value;
- borders[ts] = 0;
- // No need to add its end
+ borders[TMark(start)] = 0;
} else {
- // Merge good compacted portion with intersections but prevent its unneeded growth
- if (goodCompacted.count(portionInfo.Portion())) {
- // Add "first after end" border but do not add start: allow to merge with older or intersected data.
- // Do not add start to prevent [good] [small] [good] portions pattern.
- auto end = portionInfo.PkEnd();
- Y_VERIFY(end);
- ui64 ts = static_cast<const arrow::TimestampScalar&>(*end).value + 1;
- borders[ts] = 0;
+ // nextToGood borders potentially split good compacted portions into 2 parts:
+ // the first one without intersections and the second with them
+ if (goodCompacted.count(curPortion) || nextToGood.count(curPortion)) {
+ auto start = portionInfo.PkStart();
+ Y_VERIFY(start);
+ borders[TMark(start)] = 0;
}
tmp.emplace_back(std::move(portionInfo));
@@ -290,7 +299,7 @@ bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits
tmp.swap(portions);
if (borders.size() == 1) {
- Y_VERIFY(borders.begin()->first == 0);
+ Y_VERIFY(borders.begin()->first == granuleMark);
borders.clear();
}
@@ -319,30 +328,29 @@ inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
if (tsGranules.size() == 1) {
- Y_VERIFY(tsGranules.begin()->first == 0);
+ //Y_VERIFY(tsGranules.begin()->first.IsDefault());
ui64 granule = tsGranules.begin()->second;
out.emplace(granule, batch);
} else {
- std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch);
+ auto keyColumn = GetFirstPKColumn(indexInfo, batch);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
- TVector<i64> borders;
+ TVector<TMark> borders;
borders.reserve(tsGranules.size());
for (auto& [ts, granule] : tsGranules) {
borders.push_back(ts);
}
ui32 i = 0;
- int offset = 0;
+ i64 offset = 0;
for (auto& [ts, granule] : tsGranules) {
- int end = keyColumn->length();
+ i64 end = keyColumn->length();
if (i < borders.size() - 1) {
- i64 border = borders[i + 1];
- const auto* ptr = keyColumn->raw_values();
- end = std::lower_bound(ptr + offset, ptr + keyColumn->length(), border) - ptr;
+ TMark border = borders[i + 1];
+ end = NArrow::LowerBound(keyColumn, *border.Border, offset);
}
- int size = end - offset;
+ i64 size = end - offset;
if (size) {
Y_VERIFY(size > 0);
Y_VERIFY(!out.count(granule));
@@ -359,17 +367,19 @@ inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl
}
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules,
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TMap<TMark, ui64>& markGranules,
const TIndexInfo& indexInfo)
{
- return SliceIntoGranulesImpl(batch, tsGranules, indexInfo);
+ return SliceIntoGranulesImpl(batch, markGranules, indexInfo);
}
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules,
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::vector<std::pair<TMark, ui64>>& markGranules,
const TIndexInfo& indexInfo)
{
- return SliceIntoGranulesImpl(batch, tsGranules, indexInfo);
+ return SliceIntoGranulesImpl(batch, markGranules, indexInfo);
}
TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits)
@@ -384,8 +394,12 @@ TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, con
/// * apply PK predicate before REPLACE
IndexInfo.SetAllKeys(IndexInfo.GetPK(), {0});
+ auto& indexKey = IndexInfo.GetIndexKey();
+ Y_VERIFY(indexKey->num_fields() == 1);
+ MarkType = indexKey->field(0)->type();
+
ui32 indexId = IndexInfo.GetId();
- GranulesTable = std::make_shared<TGranulesTable>(indexId);
+ GranulesTable = std::make_shared<TGranulesTable>(*this, indexId);
ColumnsTable = std::make_shared<TColumnsTable>(indexId);
CountersTable = std::make_shared<TCountersTable>(indexId);
}
@@ -564,7 +578,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, const THashSet<ui64>& pathsToDro
for (auto& emptyGranules : EmptyGranuleTracks(pathId)) {
// keep first one => merge, keep nothing => drop.
bool keepFirst = !pathsToDrop.count(pathId);
- for (auto& [ts, granule] : emptyGranules) {
+ for (auto& [mark, granule] : emptyGranules) {
if (keepFirst) {
keepFirst = false;
continue;
@@ -574,7 +588,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, const THashSet<ui64>& pathsToDro
auto spg = Granules[granule];
Y_VERIFY(spg);
GranulesTable->Erase(db, spg->Record);
- EraseGranule(pathId, granule, ts);
+ EraseGranule(pathId, granule, mark);
}
}
}
@@ -627,7 +641,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<TInsertedData>&& dataToIndex) {
Y_VERIFY(dataToIndex.size());
- auto changes = std::make_shared<TChanges>(std::move(dataToIndex), Limits);
+ auto changes = std::make_shared<TChanges>(*this, std::move(dataToIndex), Limits);
ui32 reserveGranules = 0;
changes->InitSnapshot = LastSnapshot;
@@ -669,7 +683,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
const TSnapshot& outdatedSnapshot) {
Y_VERIFY(info);
- auto changes = std::make_shared<TChanges>(std::move(info), Limits);
+ auto changes = std::make_shared<TChanges>(*this, std::move(info), Limits);
changes->InitSnapshot = LastSnapshot;
Y_VERIFY(changes->CompactionInfo->Granules.size() == 1);
@@ -697,17 +711,17 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
ui64 pathId = spg->Record.PathId;
Y_VERIFY(PathGranules.count(pathId));
- for (const auto& [ts, pathGranule] : PathGranules[pathId]) {
+ for (const auto& [mark, pathGranule] : PathGranules[pathId]) {
if (pathGranule == granule) {
- changes->SrcGranule = {pathId, granule, ts};
+ changes->SrcGranule = TChanges::TSrcGranule(pathId, granule, mark);
break;
}
}
- Y_VERIFY(changes->SrcGranule.Good());
+ Y_VERIFY(changes->SrcGranule);
if (changes->CompactionInfo->InGranule) {
TSnapshot completedSnap = (outdatedSnapshot < LastSnapshot) ? LastSnapshot : outdatedSnapshot;
- if (!InitInGranuleMerge(portions, Limits, completedSnap, changes->MergeBorders)) {
+ if (!InitInGranuleMerge(changes->SrcGranule->Mark, portions, Limits, completedSnap, changes->MergeBorders)) {
return {};
}
} else {
@@ -720,7 +734,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) {
- auto changes = std::make_shared<TChanges>(snapshot, Limits);
+ auto changes = std::make_shared<TChanges>(*this, snapshot, Limits);
// Add all portions from dropped paths
THashSet<ui64> dropPortions;
@@ -773,7 +787,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
}
TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot
- auto changes = std::make_shared<TChanges>(TColumnEngineChanges::TTL, fakeSnapshot);
+ auto changes = std::make_shared<TChanges>(*this, TColumnEngineChanges::TTL, fakeSnapshot);
ui64 evicttionSize = 0;
bool allowEviction = true;
ui64 dropBlobs = 0;
@@ -836,13 +850,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
return changes;
}
-TVector<TVector<std::pair<ui64, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const {
+TVector<TVector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const {
Y_VERIFY(PathGranules.count(pathId));
const auto& pathGranules = PathGranules.find(pathId)->second;
- TVector<TVector<std::pair<ui64, ui64>>> emptyGranules;
+ TVector<TVector<std::pair<TMark, ui64>>> emptyGranules;
ui64 emptyStart = 0;
- for (const auto& [ts, granule]: pathGranules) {
+ for (const auto& [mark, granule]: pathGranules) {
Y_VERIFY(Granules.count(granule));
auto spg = Granules.find(granule)->second;
Y_VERIFY(spg);
@@ -852,7 +866,7 @@ TVector<TVector<std::pair<ui64, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks
emptyGranules.push_back({});
emptyStart = granule;
}
- emptyGranules.back().emplace_back(ts, granule);
+ emptyGranules.back().emplace_back(mark, granule);
} else if (emptyStart) {
emptyStart = 0;
}
@@ -923,10 +937,10 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
// Set x-snapshot to switched portions
if (changes->IsCompaction()) {
- Y_VERIFY(changes->SrcGranule.Good());
+ Y_VERIFY(changes->SrcGranule);
/// @warning the granule is marked as not-in-split even if the tx is aborted later
- GranulesInSplit.erase(changes->SrcGranule.Granule);
+ GranulesInSplit.erase(changes->SrcGranule->Granule);
Y_VERIFY(changes->CompactionInfo);
for (auto& portionInfo : changes->SwitchedPortions) {
@@ -973,7 +987,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
granules[portionInfo.Granule()] = {};
}
} else if (changes->IsCompaction() && !changes->CompactionInfo->InGranule) {
- granules[changes->SrcGranule.Granule] = {};
+ granules[changes->SrcGranule->Granule] = {};
} else {
for (auto& portionInfo : changes->AppendedPortions) {
granules[portionInfo.Granule()] = {};
@@ -993,13 +1007,13 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
if (changes.CompactionInfo->InGranule) {
#if 0
if (changes.SwitchedPortions.size() <= changes.AppendedPortions.size()) {
- LOG_S_ERROR("Cannot compact granule " << changes.SrcGranule.Granule << " at tablet " << TabletId);
+ LOG_S_ERROR("Cannot compact granule " << changes.SrcGranule->Granule << " at tablet " << TabletId);
return false;
}
#endif
} else {
if (changes.NewGranules.empty()) {
- LOG_S_ERROR("Cannot split granule " << changes.SrcGranule.Granule << " at tablet " << TabletId);
+ LOG_S_ERROR("Cannot split granule " << changes.SrcGranule->Granule << " at tablet " << TabletId);
return false;
}
}
@@ -1012,9 +1026,8 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
for (auto& [granule, p] : changes.NewGranules) {
ui64 pathId = p.first;
- ui64 ts = p.second;
- TString key((const char*)&ts, sizeof(ts));
- TGranuleRecord rec{pathId, key, granule, snapshot};
+ TMark mark = p.second;
+ TGranuleRecord rec(pathId, granule, snapshot, mark.Border);
if (!SetGranule(rec, apply)) {
LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId);
@@ -1038,15 +1051,15 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
return false;
}
- ui64 granuleMinTs = ExtractKey(Granules[granule]->Record.IndexKey);
+ auto& granuleStart = Granules[granule]->Record.Mark;
if (!apply) { // granule vs portion minPK
- auto pkScalar = portionInfo.PkStart();
- Y_VERIFY(pkScalar);
- ui64 portionMinTs = ExtractKey(*pkScalar);
- if (granuleMinTs > portionMinTs) {
- LOG_S_ERROR("Cannot update invalid portion " << portionInfo << " minTs: " << portionMinTs
- << " granule minTs: " << granuleMinTs << " at tablet " << TabletId);
+ auto portionStart = portionInfo.PkStart();
+ Y_VERIFY(portionStart);
+ if (TMark(portionStart) < TMark(granuleStart)) {
+ LOG_S_ERROR("Cannot update invalid portion " << portionInfo
+ << " start: " << portionStart->ToString()
+ << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId);
return false;
}
}
@@ -1152,18 +1165,17 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
// granule vs portion minPK
- ui64 granuleMinTs = 0;
+ std::shared_ptr<arrow::Scalar> granuleStart;
if (Granules.count(granule)) {
- granuleMinTs = ExtractKey(Granules[granule]->Record.IndexKey);
+ granuleStart = Granules[granule]->Record.Mark;
} else {
- granuleMinTs = changes.NewGranules.find(granule)->second.second;
+ granuleStart = changes.NewGranules.find(granule)->second.second.Border;
}
- auto pkScalar = portionInfo.PkStart();
- Y_VERIFY(pkScalar);
- ui64 portionMinTs = ExtractKey(*pkScalar);
- if (granuleMinTs > portionMinTs) {
- LOG_S_ERROR("Cannot insert invalid portion " << portionInfo << " minTs: " << portionMinTs
- << " granule minTs: " << granuleMinTs << " at tablet " << TabletId);
+ auto portionStart = portionInfo.PkStart();
+ Y_VERIFY(portionStart);
+ if (TMark(portionStart) < TMark(granuleStart)) {
+ LOG_S_ERROR("Cannot insert invalid portion " << portionInfo << " start: " << portionStart->ToString()
+ << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId);
return false;
}
}
@@ -1201,33 +1213,33 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
- ui64 ts = ExtractKey(rec.IndexKey);
+ TMark mark(rec.Mark);
if (!apply) {
if (Granules.count(rec.Granule)) {
return false;
}
- if (PathGranules.count(rec.PathId) && PathGranules[rec.PathId].count(ts)) {
+ if (PathGranules.count(rec.PathId) && PathGranules[rec.PathId].count(mark)) {
return false;
}
return true;
}
- PathGranules[rec.PathId].emplace(ts, rec.Granule);
+ PathGranules[rec.PathId].emplace(mark, rec.Granule);
auto& spg = Granules[rec.Granule];
Y_VERIFY(!spg);
spg = std::make_shared<TGranuleMeta>(rec);
return true; // It must return true if (apply == true)
}
-void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, ui64 ts) {
+void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) {
Y_VERIFY(PathGranules.count(pathId));
Y_VERIFY(Granules.count(granule));
Granules.erase(granule);
EmptyGranules.erase(granule);
- PathGranules[pathId].erase(ts);
+ PathGranules[pathId].erase(mark);
}
bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats) {
@@ -1362,21 +1374,21 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
out->Granules.reserve(pathGranules.size());
// TODO: out.Portions.reserve()
- ui64 tsFrom = ExtractTimestamp(from, GetIndexKey());
- ui64 tsTo = ExtractTimestamp(to, GetIndexKey());
+ auto keyFrom = ExtractFirstKey(from, GetIndexKey());
+ auto keyTo = ExtractFirstKey(to, GetIndexKey());
// Apply FROM
auto it = pathGranules.begin();
- if (tsFrom) {
- it = pathGranules.upper_bound(tsFrom);
+ if (keyFrom) {
+ it = pathGranules.upper_bound(TMark(keyFrom));
--it;
}
for (; it != pathGranules.end(); ++it) {
- ui64 ts = it->first;
+ auto& mark = it->first;
ui64 granule = it->second;
// Apply TO
- if (to && ts > tsTo) {
+ if (keyTo && TMark(keyTo) < mark) {
break;
}
@@ -1596,29 +1608,31 @@ static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo,
/// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN
/// @note We use ts from PK for split, but there could be lots of PKs with the same ts.
-static TVector<std::pair<ui64, std::shared_ptr<arrow::RecordBatch>>>
-SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TChanges& changes,
- std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, i64 ts0) {
- TVector<std::pair<ui64, std::shared_ptr<arrow::RecordBatch>>> out;
+static TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>>
+SliceGranuleBatches(const TIndexInfo& indexInfo,
+ const TColumnEngineForLogs::TChanges& changes,
+ std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches,
+ const TMark& ts0) {
+ TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
// Extract unique effective keys (timestamps) and their counts
i64 numRows = 0;
- TMap<ui64, ui32> uniqKeyCount;
+ TMap<TMark, ui32> uniqKeyCount;
for (auto& batch : batches) {
numRows += batch->num_rows();
- std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch);
+ auto keyColumn = GetFirstPKColumn(indexInfo, batch);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
for (int pos = 0; pos < keyColumn->length(); ++pos) {
- ui64 ts = keyColumn->Value(pos);
+ TMark ts(*keyColumn->GetScalar(pos));
++uniqKeyCount[ts];
}
}
Y_VERIFY(uniqKeyCount.size());
- i64 minTs = uniqKeyCount.begin()->first;
- i64 maxTs = uniqKeyCount.rbegin()->first;
+ auto minTs = uniqKeyCount.begin()->first;
+ auto maxTs = uniqKeyCount.rbegin()->first;
Y_VERIFY(minTs >= ts0);
// It's an estimate of the needed count because numRows is calculated before key replaces
@@ -1638,7 +1652,7 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh
}
// Make split borders from uniq keys
- TVector<i64> borders;
+ TVector<TMark> borders;
borders.reserve(numRows / rowsInGranule);
{
ui32 sumRows = 0;
@@ -1662,15 +1676,12 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh
auto& batchOffsets = offsets[i];
batchOffsets.reserve(borders.size() + 1);
- std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch);
+ auto keyColumn = GetFirstPKColumn(indexInfo, batch);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
batchOffsets.push_back(0);
- for (i64 border : borders) {
- const auto* start = keyColumn->raw_values() + batchOffsets.back();
- const auto* end = keyColumn->raw_values() + keyColumn->length();
- const auto* borderPos = std::lower_bound(start, end, border);
- int offset = borderPos - keyColumn->raw_values();
+ for (auto& border : borders) {
+ int offset = NArrow::LowerBound(keyColumn, *border.Border, batchOffsets.back());
Y_VERIFY(offset >= batchOffsets.back());
batchOffsets.push_back(offset);
}
@@ -1702,16 +1713,17 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh
Y_VERIFY(slice->num_rows());
granuleNumRows += slice->num_rows();
#if 1 // Check correctness
- std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, slice);
+ auto keyColumn = GetFirstPKColumn(indexInfo, slice);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
- i64 startKey = granuleNo ? borders[granuleNo - 1] : minTs;
- Y_VERIFY(keyColumn->Value(0) >= startKey);
+ auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
+ Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey);
+
if (granuleNo < borders.size() - 1) {
- i64 endKey = borders[granuleNo];
- Y_VERIFY(keyColumn->Value(keyColumn->length() - 1) < endKey);
+ auto endKey = borders[granuleNo];
+ Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) < endKey);
} else {
- Y_VERIFY(keyColumn->Value(keyColumn->length() - 1) <= maxTs);
+ Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) <= maxTs);
}
#endif
Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
@@ -1727,14 +1739,14 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh
for (auto& batch : merged) {
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
- i64 startKey = ts0;
+ auto startKey = ts0;
if (granuleNo) {
startKey = borders[granuleNo - 1];
}
#if 1 // Check correctness
- std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch);
+ auto keyColumn = GetFirstPKColumn(indexInfo, batch);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
- Y_VERIFY(keyColumn->Value(0) >= startKey);
+ Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey);
#endif
out.emplace_back(startKey, batch);
}
@@ -1743,8 +1755,10 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh
return out;
}
-static ui64 TryMovePortions(TVector<TPortionInfo>& portions, TMap<ui64, ui64>& tsIds,
- TVector<std::pair<TPortionInfo, ui64>>& toMove, i64 ts0) {
+static ui64 TryMovePortions(TVector<TPortionInfo>& portions,
+ TMap<TMark, ui64>& tsIds,
+ TVector<std::pair<TPortionInfo, ui64>>& toMove,
+ const TMark& ts0) {
std::vector<const TPortionInfo*> compacted;
compacted.reserve(portions.size());
std::vector<const TPortionInfo*> inserted;
@@ -1776,9 +1790,9 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions, TMap<ui64, ui64>& t
ui64 numRows = 0;
ui32 counter = 0;
for (auto* portionInfo : compacted) {
- ui64 ts = ts0;
+ TMark ts = ts0;
if (counter) {
- ts = static_cast<const arrow::TimestampScalar&>(*portionInfo->PkStart()).value;
+ ts = TMark(portionInfo->PkStart());
}
ui32 rows = portionInfo->NumRows();
@@ -1801,12 +1815,12 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions, TMap<ui64, ui64>& t
static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
std::shared_ptr<TColumnEngineForLogs::TChanges> changes) {
- ui64 pathId = changes->SrcGranule.PathId;
- //ui64 granule = changes->SrcGranule.Granule;
- ui64 ts0 = changes->SrcGranule.Ts;
+ ui64 pathId = changes->SrcGranule->PathId;
+ //ui64 granule = changes->SrcGranule->Granule;
+ TMark ts0 = changes->SrcGranule->Mark;
TVector<TPortionInfo>& portions = changes->SwitchedPortions;
- TMap<ui64, ui64> tsIds;
+ TMap<TMark, ui64> tsIds;
ui64 movedRows = TryMovePortions(portions, tsIds, changes->PortionsToMove, ts0);
auto srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, (bool)movedRows);
Y_VERIFY(srcBatches.size() == portions.size());
@@ -1829,8 +1843,8 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
ui32 rowsInGranule = numRows / numSplitInto;
Y_VERIFY(rowsInGranule);
- TMap<ui64, ui64> newTsIds;
- ui64 lastTs = tsIds.rbegin()->first;
+ TMap<TMark, ui64> newTsIds;
+ auto lastTs = tsIds.rbegin()->first;
ui32 tmpGranule = 0;
ui32 sumRows = 0;
ui32 i = 0;
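The NArrow::LowerBound call introduced above generalizes the old raw_values()+std::lower_bound scan to arbitrary key types. The offset computation it replaces, sketched on a plain sorted vector (hypothetical helper, not the engine code):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Compute slice offsets for one sorted batch: for every border, the index of
// the first row whose key is >= the border. Consecutive offsets delimit the
// rows destined for one granule.
std::vector<int> BatchOffsets(const std::vector<int64_t>& sortedKeys,
                              const std::vector<int64_t>& borders) {
    std::vector<int> offsets{0};
    for (int64_t border : borders) {
        auto it = std::lower_bound(sortedKeys.begin() + offsets.back(),
                                   sortedKeys.end(), border); // resume from the last offset
        offsets.push_back(static_cast<int>(it - sortedKeys.begin()));
    }
    offsets.push_back(static_cast<int>(sortedKeys.size()));
    return offsets;
}

int main() {
    std::vector<int64_t> keys{1, 2, 5, 5, 9, 12};
    auto offsets = BatchOffsets(keys, {5, 10});
    // slices: [0,2) -> keys < 5, [2,5) -> 5 <= keys < 10, [5,6) -> keys >= 10
    assert((offsets == std::vector<int>{0, 2, 5, 6}));
}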
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 133a62ddf16..a09710ad213 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -1,6 +1,7 @@
#pragma once
#include "defs.h"
#include "column_engine.h"
+#include "scalars.h"
namespace NKikimr::NArrow {
struct TSortDescription;
@@ -18,39 +19,127 @@ class TCountersTable;
/// - Columns: granule -> blobs
class TColumnEngineForLogs : public IColumnEngine {
public:
+ struct TMark {
+ std::shared_ptr<arrow::Scalar> Border;
+
+ explicit TMark(const std::shared_ptr<arrow::Scalar>& s)
+ : Border(s)
+ {
+ Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border));
+ }
+
+ explicit TMark(const std::shared_ptr<arrow::DataType>& type)
+ : Border(MinScalar(type))
+ {
+ Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border));
+ }
+
+ TMark(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
+ Deserialize(key, type);
+ Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border));
+ }
+
+ TMark(const TMark& m) = default;
+ TMark& operator = (const TMark& m) = default;
+
+ bool operator == (const TMark& m) const {
+ Y_VERIFY(Border);
+ Y_VERIFY(m.Border);
+ return Border->Equals(*m.Border);
+ }
+
+ bool operator < (const TMark& m) const {
+ Y_VERIFY(Border);
+ Y_VERIFY(m.Border);
+ return NArrow::ScalarLess(*Border, *m.Border);
+ }
+
+ bool operator <= (const TMark& m) const {
+ Y_VERIFY(Border);
+ Y_VERIFY(m.Border);
+ return Border->Equals(*m.Border) || NArrow::ScalarLess(*Border, *m.Border);
+ }
+
+ bool operator > (const TMark& m) const {
+ return !(*this <= m);
+ }
+
+ bool operator >= (const TMark& m) const {
+ return !(*this < m);
+ }
+
+ ui64 Hash() const {
+ return Border->hash();
+ }
+
+ // Lets TMark be used directly as a hash map key.
+ operator size_t () const {
+ return Hash();
+ }
+
+ // TMark always holds a non-null scalar; a contextual bool conversion is a
+ // programming error, so trap it.
+ operator bool () const {
+ Y_VERIFY(false);
+ }
+
+ TString Serialize() const {
+ return SerializeKeyScalar(Border);
+ }
+
+ void Deserialize(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
+ Border = DeserializeKeyScalar(key, type);
+ }
+
+ static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) {
+ if (type->id() == arrow::Type::TIMESTAMP) {
+ // TODO: support negative timestamps in index
+ return std::make_shared<arrow::TimestampScalar>(0, type);
+ }
+ return NArrow::MinScalar(type);
+ }
+ };
+
class TChanges : public TColumnEngineChanges {
public:
struct TSrcGranule {
ui64 PathId{0};
ui64 Granule{0};
- ui64 Ts{0};
+ TMark Mark;
- bool Good() const { return PathId && Granule; }
+ TSrcGranule(ui64 pathId, ui64 granule, const TMark& mark)
+ : PathId(pathId), Granule(granule), Mark(mark)
+ {}
};
- TChanges(TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits)
+ TChanges(const TColumnEngineForLogs& engine,
+ TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::INSERT)
+ , DefaultMark(engine.GetMarkType())
{
Limits = limits;
DataToIndex = std::move(blobsToIndex);
}
- TChanges(std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits)
+ TChanges(const TColumnEngineForLogs& engine,
+ std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::COMPACTION)
+ , DefaultMark(engine.GetMarkType())
{
Limits = limits;
CompactionInfo = std::move(info);
}
- TChanges(const TSnapshot& snapshot, const TCompactionLimits& limits)
+ TChanges(const TColumnEngineForLogs& engine,
+ const TSnapshot& snapshot, const TCompactionLimits& limits)
: TColumnEngineChanges(TColumnEngineChanges::CLEANUP)
+ , DefaultMark(engine.GetMarkType())
{
Limits = limits;
InitSnapshot = snapshot;
}
- TChanges(TColumnEngineChanges::EType type, const TSnapshot& applySnapshot)
+ TChanges(const TColumnEngineForLogs& engine,
+ TColumnEngineChanges::EType type, const TSnapshot& applySnapshot)
: TColumnEngineChanges(type)
+ , DefaultMark(engine.GetMarkType())
{
ApplySnapshot = applySnapshot;
}
@@ -64,31 +153,31 @@ public:
ui64 granule = FirstGranuleId;
++FirstGranuleId;
--ReservedGranuleIds;
- ui64 ts = 0;
- NewGranules.emplace(granule, std::make_pair(pathId, ts));
- PathToGranule[pathId].emplace_back(ts, granule);
+
+ NewGranules.emplace(granule, std::make_pair(pathId, DefaultMark));
+ PathToGranule[pathId].emplace_back(DefaultMark, granule);
return true;
}
- ui64 SetTmpGranule(ui64 pathId, ui64 ts) {
- Y_VERIFY(pathId == SrcGranule.PathId);
- if (!TmpGranuleIds.count(ts)) {
- TmpGranuleIds[ts] = FirstGranuleId;
+ ui64 SetTmpGranule(ui64 pathId, const TMark& mark) {
+ Y_VERIFY(pathId == SrcGranule->PathId);
+ if (!TmpGranuleIds.count(mark)) {
+ TmpGranuleIds[mark] = FirstGranuleId;
++FirstGranuleId;
}
- return TmpGranuleIds[ts];
+ return TmpGranuleIds[mark];
}
THashMap<ui64, ui64> TmpToNewGranules(ui64 start) {
THashMap<ui64, ui64> granuleRemap;
- for (auto& [ts, counter] : TmpGranuleIds) {
+ for (auto& [mark, counter] : TmpGranuleIds) {
ui64 granule = start + counter;
- if (ts == SrcGranule.Ts) {
+ if (mark == SrcGranule->Mark) {
Y_VERIFY(!counter);
- granule = SrcGranule.Granule;
+ granule = SrcGranule->Granule;
} else {
Y_VERIFY(counter);
- NewGranules[granule] = {SrcGranule.PathId, ts};
+ NewGranules.emplace(granule, std::make_pair(SrcGranule->PathId, mark));
}
granuleRemap[counter] = granule;
}
@@ -105,11 +194,12 @@ public:
return numSplitInto;
}
- THashMap<ui64, std::vector<std::pair<ui64, ui64>>> PathToGranule; // pathId -> {timestamp, granule}
- TSrcGranule SrcGranule;
- THashMap<ui64, std::pair<ui64, ui64>> NewGranules; // granule -> {pathId, key}
- THashMap<ui64, ui32> TmpGranuleIds; // ts -> tmp granule id
- TMap<ui64, ui64> MergeBorders;
+ TMark DefaultMark;
+ THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule}
+ std::optional<TSrcGranule> SrcGranule;
+ THashMap<ui64, std::pair<ui64, TMark>> NewGranules; // granule -> {pathId, key}
+ THashMap<TMark, ui32> TmpGranuleIds; // mark -> tmp granule id
+ TMap<TMark, ui64> MergeBorders;
ui64 FirstGranuleId{0};
ui32 ReservedGranuleIds{0};
};
@@ -138,6 +228,19 @@ public:
bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); }
+ TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const override {
+ Y_VERIFY_S(scalar->type->Equals(MarkType), scalar->type->ToString() + ", expected " + MarkType->ToString());
+ return TMark(scalar).Serialize();
+ }
+
+ std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const override {
+ return TMark(key, MarkType).Border;
+ }
+
+ const std::shared_ptr<arrow::DataType>& GetMarkType() const {
+ return MarkType;
+ }
+
bool Load(IDbWrapper& db, const THashSet<ui64>& pathsToDrop = {}) override;
std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) override;
std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
@@ -172,15 +275,6 @@ public:
/// @note called from EvictionActor
static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes);
- static ui64 ExtractKey(const TString& key) {
- Y_VERIFY(key.size() == 8);
- return *reinterpret_cast<const ui64*>(key.data());
- }
-
- static ui64 ExtractKey(const arrow::Scalar& scalar) {
- return static_cast<const arrow::TimestampScalar&>(scalar).value;
- }
-
private:
struct TGranuleMeta {
TGranuleRecord Record;
@@ -206,11 +300,12 @@ private:
TIndexInfo IndexInfo;
TCompactionLimits Limits;
ui64 TabletId;
+ std::shared_ptr<arrow::DataType> MarkType;
std::shared_ptr<TGranulesTable> GranulesTable;
std::shared_ptr<TColumnsTable> ColumnsTable;
std::shared_ptr<TCountersTable> CountersTable;
THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta
- THashMap<ui64, TMap<ui64, ui64>> PathGranules; // path_id -> {timestamp, granule}
+ THashMap<ui64, TMap<TMark, ui64>> PathGranules; // path_id -> {mark, granule}
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
THashSet<ui64> GranulesInSplit;
THashSet<ui64> EmptyGranules;
@@ -243,7 +338,7 @@ private:
bool LoadCounters(IDbWrapper& db);
bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply);
- void EraseGranule(ui64 pathId, ui64 granule, ui64 ts);
+ void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark);
bool SetGranule(const TGranuleRecord& rec, bool apply);
bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
@@ -256,17 +351,19 @@ private:
TMap<TSnapshot, TVector<ui64>> GetOrderedPortions(ui64 granule, const TSnapshot& snapshot = TSnapshot::Max()) const;
void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules);
- TVector<TVector<std::pair<ui64, ui64>>> EmptyGranuleTracks(ui64 pathId) const;
+ TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(ui64 pathId) const;
};
-std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::RecordBatch>& batch);
+std::shared_ptr<arrow::Array> GetFirstPKColumn(const TIndexInfo& indexInfo,
+ const std::shared_ptr<arrow::RecordBatch>& batch);
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules,
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TMap<TColumnEngineForLogs::TMark, ui64>& tsGranules,
const TIndexInfo& indexInfo);
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules,
+SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::vector<std::pair<TColumnEngineForLogs::TMark, ui64>>& tsGranules,
const TIndexInfo& indexInfo);
}
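TMark delegates ordering to NArrow::ScalarLess, so any first-PK-column type with a defined order can serve as a granule border. A self-contained illustration against the public Arrow API (the comparator below is a two-type toy standing in for the NArrow implementation):

#include <arrow/api.h>
#include <cassert>
#include <memory>

// Toy comparator over two scalar types; NArrow::ScalarLess covers many more.
bool ScalarLessDemo(const arrow::Scalar& a, const arrow::Scalar& b) {
    assert(a.type->Equals(*b.type));
    switch (a.type->id()) {
        case arrow::Type::TIMESTAMP:
            return static_cast<const arrow::TimestampScalar&>(a).value
                 < static_cast<const arrow::TimestampScalar&>(b).value;
        case arrow::Type::STRING:
            return static_cast<const arrow::StringScalar&>(a).value->ToString()
                 < static_cast<const arrow::StringScalar&>(b).value->ToString();
        default:
            assert(false);
            return false;
    }
}

int main() {
    auto type = arrow::timestamp(arrow::TimeUnit::MICRO);
    arrow::TimestampScalar t1(100, type), t2(200, type);
    assert(ScalarLessDemo(t1, t2)); // numeric order for timestamps

    arrow::StringScalar s1("00099"), s2("00100");
    assert(ScalarLessDemo(s1, s2)); // lexicographic order for strings
}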
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 89352303602..a93981f4f85 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -42,19 +42,19 @@ bool TDbWrapper::Load(THashMap<TWriteId, TInsertedData>& inserted,
return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, inserted, committed, aborted, loadTime);
}
-void TDbWrapper::WriteGranule(ui32 index, const TGranuleRecord& row) {
+void TDbWrapper::WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) {
NIceDb::TNiceDb db(Database);
- NColumnShard::Schema::IndexGranules_Write(db, index, row);
+ NColumnShard::Schema::IndexGranules_Write(db, index, engine, row);
}
-void TDbWrapper::EraseGranule(ui32 index, const TGranuleRecord& row) {
+void TDbWrapper::EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) {
NIceDb::TNiceDb db(Database);
- NColumnShard::Schema::IndexGranules_Erase(db, index, row);
+ NColumnShard::Schema::IndexGranules_Erase(db, index, engine, row);
}
-bool TDbWrapper::LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) {
+bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) {
NIceDb::TNiceDb db(Database);
- return NColumnShard::Schema::IndexGranules_Load(db, index, callback);
+ return NColumnShard::Schema::IndexGranules_Load(db, index, engine, callback);
}
void TDbWrapper::WriteColumn(ui32 index, const TColumnRecord& row) {
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index 0b117c291b7..632201ea998 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -10,6 +10,7 @@ namespace NKikimr::NOlap {
struct TInsertedData;
struct TColumnRecord;
struct TGranuleRecord;
+class IColumnEngine;
class IDbWrapper {
public:
@@ -27,9 +28,9 @@ public:
THashMap<TWriteId, TInsertedData>& aborted,
const TInstant& loadTime) = 0;
- virtual void WriteGranule(ui32 index, const TGranuleRecord& row) = 0;
- virtual void EraseGranule(ui32 index, const TGranuleRecord& row) = 0;
- virtual bool LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) = 0;
+ virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
+ virtual void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0;
+ virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) = 0;
virtual void WriteColumn(ui32 index, const TColumnRecord& row) = 0;
virtual void EraseColumn(ui32 index, const TColumnRecord& row) = 0;
@@ -58,9 +59,9 @@ public:
THashMap<TWriteId, TInsertedData>& aborted,
const TInstant& loadTime) override;
- void WriteGranule(ui32 index, const TGranuleRecord& row) override;
- void EraseGranule(ui32 index, const TGranuleRecord& row) override;
- bool LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) override;
+ void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
+ void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override;
+ bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) override;
void WriteColumn(ui32 index, const TColumnRecord& row) override;
void EraseColumn(ui32 index, const TColumnRecord& row) override;
diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h
index 1d955fdf6c1..8940a7d3ce5 100644
--- a/ydb/core/tx/columnshard/engines/granules_table.h
+++ b/ydb/core/tx/columnshard/engines/granules_table.h
@@ -5,12 +5,21 @@ namespace NKikimr::NOlap {
struct TGranuleRecord {
ui64 PathId;
- TString IndexKey;
ui64 Granule;
TSnapshot CreatedAt;
+ std::shared_ptr<arrow::Scalar> Mark;
+
+ TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const std::shared_ptr<arrow::Scalar>& mark)
+ : PathId(pathId)
+ , Granule(granule)
+ , CreatedAt(createdAt)
+ , Mark(mark)
+ {
+ Y_VERIFY(Mark);
+ }
bool operator == (const TGranuleRecord& rec) const {
- return (PathId == rec.PathId) && (IndexKey == rec.IndexKey);
+ return (PathId == rec.PathId) && (Mark->Equals(*rec.Mark));
}
friend IOutputStream& operator << (IOutputStream& out, const TGranuleRecord& rec) {
@@ -25,23 +34,25 @@ struct TGranuleRecord {
class TGranulesTable {
public:
- TGranulesTable(ui32 indexId)
- : IndexId(indexId)
+ TGranulesTable(const IColumnEngine& engine, ui32 indexId)
+ : Engine(engine)
+ , IndexId(indexId)
{}
void Write(IDbWrapper& db, const TGranuleRecord& row) {
- db.WriteGranule(IndexId, row);
+ db.WriteGranule(IndexId, Engine, row);
}
void Erase(IDbWrapper& db, const TGranuleRecord& row) {
- db.EraseGranule(IndexId, row);
+ db.EraseGranule(IndexId, Engine, row);
}
bool Load(IDbWrapper& db, std::function<void(TGranuleRecord&&)> callback) {
- return db.LoadGranules(IndexId, callback);
+ return db.LoadGranules(IndexId, Engine, callback);
}
private:
+ const IColumnEngine& Engine;
ui32 IndexId;
};
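The IColumnEngine& threaded through IDbWrapper and TGranulesTable exists so the schema layer can call the engine's SerializeMark/DeserializeMark: the border encoding now depends on the engine's mark type instead of a fixed ui64. The shape of that dependency injection, reduced to a toy (all names invented):

#include <cstdint>
#include <iostream>
#include <string>

// Stand-in for IColumnEngine: only the mark codec matters to the storage layer.
struct IMarkCodec {
    virtual ~IMarkCodec() = default;
    virtual std::string SerializeMark(int64_t mark) const = 0;
};

struct TimestampCodec : IMarkCodec {
    std::string SerializeMark(int64_t mark) const override {
        return std::string(reinterpret_cast<const char*>(&mark), sizeof(mark));
    }
};

// Stand-in for TGranulesTable: holds a reference, never owns the codec.
class GranulesTable {
public:
    explicit GranulesTable(const IMarkCodec& codec) : Codec(codec) {}
    void Write(int64_t mark) const {
        std::cout << "key bytes: " << Codec.SerializeMark(mark).size() << "\n";
    }
private:
    const IMarkCodec& Codec;
};

int main() {
    TimestampCodec codec;
    GranulesTable table(codec);
    table.Write(42); // key bytes: 8
}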
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index e7400f255ab..45c601d7c0d 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -10,82 +10,6 @@ namespace NKikimr::NOlap {
const TString TIndexInfo::STORE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::StorePrimaryIndexStatsName;
const TString TIndexInfo::TABLE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::TablePrimaryIndexStatsName;
-void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value) {
- switch (scalar.type->id()) {
- case arrow::Type::BOOL:
- value.SetBool(static_cast<const arrow::BooleanScalar&>(scalar).value);
- break;
- case arrow::Type::UINT8:
- value.SetUint32(static_cast<const arrow::UInt8Scalar&>(scalar).value);
- break;
- case arrow::Type::UINT16:
- value.SetUint32(static_cast<const arrow::UInt16Scalar&>(scalar).value);
- break;
- case arrow::Type::UINT32:
- value.SetUint32(static_cast<const arrow::UInt32Scalar&>(scalar).value);
- break;
- case arrow::Type::UINT64:
- value.SetUint64(static_cast<const arrow::UInt64Scalar&>(scalar).value);
- break;
- case arrow::Type::INT8:
- value.SetInt32(static_cast<const arrow::Int8Scalar&>(scalar).value);
- break;
- case arrow::Type::INT16:
- value.SetInt32(static_cast<const arrow::Int16Scalar&>(scalar).value);
- break;
- case arrow::Type::INT32:
- value.SetInt32(static_cast<const arrow::Int32Scalar&>(scalar).value);
- break;
- case arrow::Type::INT64:
- value.SetInt64(static_cast<const arrow::Int64Scalar&>(scalar).value);
- break;
- case arrow::Type::FLOAT:
- value.SetFloat(static_cast<const arrow::FloatScalar&>(scalar).value);
- break;
- case arrow::Type::DOUBLE:
- value.SetDouble(static_cast<const arrow::DoubleScalar&>(scalar).value);
- break;
- case arrow::Type::TIMESTAMP:
- value.SetUint64(static_cast<const arrow::TimestampScalar&>(scalar).value);
- break;
- default:
- Y_VERIFY(false, "Some types are not supported in min-max index yet"); // TODO
- }
-}
-
-std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value,
- const std::shared_ptr<arrow::DataType>& type) {
- switch (type->id()) {
- case arrow::Type::BOOL:
- return std::make_shared<arrow::BooleanScalar>(value.GetBool());
- case arrow::Type::UINT8:
- return std::make_shared<arrow::UInt8Scalar>(value.GetUint32());
- case arrow::Type::UINT16:
- return std::make_shared<arrow::UInt16Scalar>(value.GetUint32());
- case arrow::Type::UINT32:
- return std::make_shared<arrow::UInt32Scalar>(value.GetUint32());
- case arrow::Type::UINT64:
- return std::make_shared<arrow::UInt64Scalar>(value.GetUint64());
- case arrow::Type::INT8:
- return std::make_shared<arrow::Int8Scalar>(value.GetInt32());
- case arrow::Type::INT16:
- return std::make_shared<arrow::Int16Scalar>(value.GetInt32());
- case arrow::Type::INT32:
- return std::make_shared<arrow::Int32Scalar>(value.GetInt32());
- case arrow::Type::INT64:
- return std::make_shared<arrow::Int64Scalar>(value.GetInt64());
- case arrow::Type::FLOAT:
- return std::make_shared<arrow::FloatScalar>(value.GetFloat());
- case arrow::Type::DOUBLE:
- return std::make_shared<arrow::DoubleScalar>(value.GetDouble());
- case arrow::Type::TIMESTAMP:
- return std::make_shared<arrow::TimestampScalar>(value.GetUint64(), type);
- default:
- Y_VERIFY(false, "Some types are not supported in min-max index yet"); // TODO
- }
- return {};
-}
-
TVector<TRawTypeValue> TIndexInfo::ExtractKey(const THashMap<ui32, TCell>& fields, bool allowNulls) const {
TVector<TRawTypeValue> key;
key.reserve(KeyColumns.size());
@@ -107,15 +31,16 @@ TVector<TRawTypeValue> TIndexInfo::ExtractKey(const THashMap<ui32, TCell>& field
std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& data, const TString& metadata,
TString& strError) const {
std::shared_ptr<arrow::Schema> schema = ArrowSchema();
+ std::shared_ptr<arrow::Schema> differentSchema;
if (metadata.size()) {
- schema = NArrow::DeserializeSchema(metadata);
- if (!schema) {
+ differentSchema = NArrow::DeserializeSchema(metadata);
+ if (!differentSchema) {
strError = "DeserializeSchema() failed";
return {};
}
}
- auto batch = NArrow::DeserializeBatch(data, schema);
+ auto batch = NArrow::DeserializeBatch(data, (differentSchema ? differentSchema : schema));
if (!batch) {
strError = "DeserializeBatch() failed";
return {};
@@ -124,23 +49,9 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString&
strError = "empty batch";
return {};
}
- auto status = batch->ValidateFull();
- if (!status.ok()) {
- auto tmp = status.ToString();
- strError = TString(tmp.data(), tmp.size());
- return {};
- }
-
- // Require all the columns for now. It's possible to ommit some in future.
- for (auto& field : schema->fields()) {
- if (!batch->GetColumnByName(field->name())) {
- strError = "missing column '" + field->name() + "'";
- return {};
- }
- }
// Correct schema
- if (metadata.size()) {
+ if (differentSchema) {
batch = NArrow::ExtractColumns(batch, ArrowSchema());
if (!batch) {
strError = "cannot correct schema";
@@ -148,6 +59,11 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString&
}
}
+ if (!batch->schema()->Equals(ArrowSchema())) {
+ strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'";
+ return {};
+ }
+
// Check PK is NOT NULL
for (auto& field : SortingKey->fields()) {
auto column = batch->GetColumnByName(field->name());
@@ -161,6 +77,13 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString&
}
}
+ auto status = batch->ValidateFull();
+ if (!status.ok()) {
+ auto tmp = status.ToString();
+ strError = TString(tmp.data(), tmp.size());
+ return {};
+ }
+
Y_VERIFY(SortingKey);
batch = NArrow::SortBatch(batch, SortingKey);
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortingKey));
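PrepareForInsert now validates after projecting to the table schema and keeps the PK NOT NULL check. A minimal version of that check over the public Arrow API (helper name invented):

#include <arrow/api.h>
#include <memory>
#include <string>
#include <vector>

// Reject a batch whose key columns are missing or contain nulls.
bool KeyColumnsOk(const arrow::RecordBatch& batch,
                  const std::vector<std::string>& keyNames) {
    for (const auto& name : keyNames) {
        auto column = batch.GetColumnByName(name);
        if (!column || column->null_count() > 0) {
            return false;
        }
    }
    return true;
}

int main() {
    arrow::Int64Builder builder;
    (void)builder.AppendValues({1, 2, 3});
    std::shared_ptr<arrow::Array> array;
    (void)builder.Finish(&array);
    auto batch = arrow::RecordBatch::Make(
        arrow::schema({arrow::field("timestamp", arrow::int64())}), 3, {array});
    return KeyColumnsOk(*batch, {"timestamp"}) ? 0 : 1;
}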
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 7484a05c9a9..218c8dfbbf0 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -1,7 +1,7 @@
#pragma once
#include "defs.h"
+#include "scalars.h"
#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
-#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/core/sys_view/common/schema.h>
namespace arrow {
@@ -16,10 +16,6 @@ namespace NKikimr::NArrow {
namespace NKikimr::NOlap {
-void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value);
-std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value,
- const std::shared_ptr<arrow::DataType>& type);
-
template <typename T>
static std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const T& ids) {
std::vector<std::shared_ptr<arrow::Field>> fields;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 4b98d5329cc..7f1594d9a97 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -12,6 +12,8 @@ namespace NKikimr::NOlap {
namespace {
+using TMark = TColumnEngineForLogs::TMark;
+
// Slices a batch into smaller batches and appends them to the result vector (which might be non-empty already)
void SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const int64_t maxRowsInBatch,
@@ -61,14 +63,14 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
TVector<TVector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo]
rangesSlices.reserve(batches.size());
{
- TMap<ui64, TVector<std::shared_ptr<arrow::RecordBatch>>> points;
+ TMap<TMark, TVector<std::shared_ptr<arrow::RecordBatch>>> points;
for (auto& batch : batches) {
- std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch);
+ std::shared_ptr<arrow::Array> keyColumn = GetFirstPKColumn(indexInfo, batch);
Y_VERIFY(keyColumn && keyColumn->length() > 0);
- ui64 min = keyColumn->Value(0);
- ui64 max = keyColumn->Value(keyColumn->length() - 1);
+ TMark min(*keyColumn->GetScalar(0));
+ TMark max(*keyColumn->GetScalar(keyColumn->length() - 1));
points[min].push_back(batch); // insert start
points[max].push_back({}); // insert end
@@ -146,6 +148,7 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr
Y_VERIFY(ReadMetadata->LoadSchema);
Y_VERIFY(ReadMetadata->ResultSchema);
Y_VERIFY(IndexInfo().GetSortingKey());
+ Y_VERIFY(IndexInfo().GetIndexKey() && IndexInfo().GetIndexKey()->num_fields());
SortReplaceDescription = IndexInfo().SortReplaceDescription();
@@ -203,14 +206,14 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr
// Init split by granules structs
for (auto& rec : ReadMetadata->SelectInfo->Granules) {
- Y_VERIFY(rec.IndexKey.size() == 8);
- ui64 ts = TColumnEngineForLogs::ExtractKey(rec.IndexKey); // TODO: support other engines
- TsGranules.emplace(ts, rec.Granule);
+ TsGranules.emplace(rec.Mark, rec.Granule);
}
- if (!TsGranules.count(0)) {
+
+ TMark minMark(IndexInfo().GetIndexKey()->field(0)->type());
+ if (!TsGranules.count(minMark)) {
// committed data before the first granule is placed into the fake (0,0) granule
// committed data after the last granule is placed into the last granule (or here, if there is none)
- TsGranules.emplace(0, 0);
+ TsGranules.emplace(minMark, 0);
}
auto& stats = ReadMetadata->ReadStats;
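The mandatory minimal mark inserted above guarantees every key maps to some granule. The lookup itself is the classic ordered-map pattern, sketched here with plain int64 keys (TMark in the real code):

#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>

// A row belongs to the granule whose border is the greatest mark <= its key.
uint64_t FindGranule(const std::map<int64_t, uint64_t>& tsGranules, int64_t key) {
    auto it = tsGranules.upper_bound(key); // first border strictly greater than key
    assert(it != tsGranules.begin());      // holds because a minimal mark is always present
    return std::prev(it)->second;
}

int main() {
    std::map<int64_t, uint64_t> tsGranules{{0, 0}, {1000, 7}, {2000, 8}};
    assert(FindGranule(tsGranules, 5) == 0);
    assert(FindGranule(tsGranules, 1000) == 7);
    assert(FindGranule(tsGranules, 1999) == 7);
    assert(FindGranule(tsGranules, 5000) == 8);
}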
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index ecb594caa11..a080e42c33a 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -1,6 +1,7 @@
#pragma once
#include "defs.h"
#include "column_engine.h"
+#include "column_engine_logs.h" // for TColumnEngineForLogs::TMark
#include "predicate.h"
namespace NKikimr::NColumnShard {
@@ -228,7 +229,7 @@ private:
THashMap<ui64, ui64> PortionGranule; // portion -> granule
THashMap<ui64, ui32> GranuleWaits; // granule -> num portions to wait
TDeque<ui64> GranulesOutOrder;
- TMap<ui64, ui64> TsGranules; // ts (key) -> granule
+ TMap<TColumnEngineForLogs::TMark, ui64> TsGranules; // ts (key) -> granule
THashSet<ui64> PortionsWithSelfDups;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
diff --git a/ydb/core/tx/columnshard/engines/scalars.cpp b/ydb/core/tx/columnshard/engines/scalars.cpp
new file mode 100644
index 00000000000..3b45e293649
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scalars.cpp
@@ -0,0 +1,229 @@
+#include "scalars.h"
+#include <ydb/core/util/yverify_stream.h>
+
+namespace NKikimr::NOlap {
+
+void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value) {
+ switch (scalar.type->id()) {
+ case arrow::Type::BOOL:
+ value.SetBool(static_cast<const arrow::BooleanScalar&>(scalar).value);
+ break;
+ case arrow::Type::UINT8:
+ value.SetUint32(static_cast<const arrow::UInt8Scalar&>(scalar).value);
+ break;
+ case arrow::Type::UINT16:
+ value.SetUint32(static_cast<const arrow::UInt16Scalar&>(scalar).value);
+ break;
+ case arrow::Type::UINT32:
+ value.SetUint32(static_cast<const arrow::UInt32Scalar&>(scalar).value);
+ break;
+ case arrow::Type::UINT64:
+ value.SetUint64(static_cast<const arrow::UInt64Scalar&>(scalar).value);
+ break;
+ case arrow::Type::INT8:
+ value.SetInt32(static_cast<const arrow::Int8Scalar&>(scalar).value);
+ break;
+ case arrow::Type::INT16:
+ value.SetInt32(static_cast<const arrow::Int16Scalar&>(scalar).value);
+ break;
+ case arrow::Type::INT32:
+ value.SetInt32(static_cast<const arrow::Int32Scalar&>(scalar).value);
+ break;
+ case arrow::Type::INT64:
+ value.SetInt64(static_cast<const arrow::Int64Scalar&>(scalar).value);
+ break;
+ case arrow::Type::FLOAT:
+ value.SetFloat(static_cast<const arrow::FloatScalar&>(scalar).value);
+ break;
+ case arrow::Type::DOUBLE:
+ value.SetDouble(static_cast<const arrow::DoubleScalar&>(scalar).value);
+ break;
+ case arrow::Type::DATE32:
+ value.SetInt32(static_cast<const arrow::Date32Scalar&>(scalar).value);
+ break;
+ case arrow::Type::DATE64:
+ value.SetInt64(static_cast<const arrow::Date64Scalar&>(scalar).value);
+ break;
+ case arrow::Type::TIMESTAMP:
+ // TODO: signed timestamps
+ value.SetUint64(static_cast<const arrow::TimestampScalar&>(scalar).value);
+ break;
+ case arrow::Type::TIME32:
+ value.SetInt32(static_cast<const arrow::Time32Scalar&>(scalar).value);
+ break;
+ case arrow::Type::TIME64:
+ value.SetInt64(static_cast<const arrow::Time64Scalar&>(scalar).value);
+ break;
+ case arrow::Type::INTERVAL_MONTHS:
+ value.SetInt32(static_cast<const arrow::MonthIntervalScalar&>(scalar).value);
+ break;
+ case arrow::Type::DURATION:
+ value.SetInt64(static_cast<const arrow::DurationScalar&>(scalar).value);
+ break;
+ case arrow::Type::STRING: {
+ auto& buffer = static_cast<const arrow::StringScalar&>(scalar).value;
+ value.SetText(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size()));
+ break;
+ }
+ case arrow::Type::LARGE_STRING: {
+ auto& buffer = static_cast<const arrow::LargeStringScalar&>(scalar).value;
+ value.SetText(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size()));
+ break;
+ }
+ case arrow::Type::BINARY: {
+ auto& buffer = static_cast<const arrow::BinaryScalar&>(scalar).value;
+ value.SetBytes(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size()));
+ break;
+ }
+ case arrow::Type::LARGE_BINARY: {
+ auto& buffer = static_cast<const arrow::LargeBinaryScalar&>(scalar).value;
+ value.SetBytes(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size()));
+ break;
+ }
+ case arrow::Type::FIXED_SIZE_BINARY: {
+ auto& buffer = static_cast<const arrow::FixedSizeBinaryScalar&>(scalar).value;
+ value.SetBytes(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size()));
+ break;
+ }
+ default:
+ Y_VERIFY_S(false, "Some types have no constant conversion yet: " << scalar.type->ToString());
+ }
+}
+
+std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value,
+ const std::shared_ptr<arrow::DataType>& type) {
+ switch (type->id()) {
+ case arrow::Type::BOOL:
+ return std::make_shared<arrow::BooleanScalar>(value.GetBool());
+ case arrow::Type::UINT8:
+ return std::make_shared<arrow::UInt8Scalar>(value.GetUint32());
+ case arrow::Type::UINT16:
+ return std::make_shared<arrow::UInt16Scalar>(value.GetUint32());
+ case arrow::Type::UINT32:
+ return std::make_shared<arrow::UInt32Scalar>(value.GetUint32());
+ case arrow::Type::UINT64:
+ return std::make_shared<arrow::UInt64Scalar>(value.GetUint64());
+ case arrow::Type::INT8:
+ return std::make_shared<arrow::Int8Scalar>(value.GetInt32());
+ case arrow::Type::INT16:
+ return std::make_shared<arrow::Int16Scalar>(value.GetInt32());
+ case arrow::Type::INT32:
+ return std::make_shared<arrow::Int32Scalar>(value.GetInt32());
+ case arrow::Type::INT64:
+ return std::make_shared<arrow::Int64Scalar>(value.GetInt64());
+ case arrow::Type::FLOAT:
+ return std::make_shared<arrow::FloatScalar>(value.GetFloat());
+ case arrow::Type::DOUBLE:
+ return std::make_shared<arrow::DoubleScalar>(value.GetDouble());
+ case arrow::Type::DATE32:
+ return std::make_shared<arrow::Date32Scalar>(value.GetInt32());
+ case arrow::Type::DATE64:
+ return std::make_shared<arrow::Date64Scalar>(value.GetInt64());
+ case arrow::Type::TIMESTAMP:
+ if (value.HasUint64()) {
+ return std::make_shared<arrow::TimestampScalar>(value.GetUint64(), type);
+ } else if (value.HasInt64()) {
+ return std::make_shared<arrow::TimestampScalar>(value.GetInt64(), type);
+ } else {
+ Y_VERIFY(false, "Unexpected timestamp");
+ }
+ case arrow::Type::TIME32:
+ return std::make_shared<arrow::Time32Scalar>(value.GetInt32(), type);
+ case arrow::Type::TIME64:
+ return std::make_shared<arrow::Time64Scalar>(value.GetInt64(), type);
+ case arrow::Type::INTERVAL_MONTHS:
+ return std::make_shared<arrow::MonthIntervalScalar>(value.GetInt32());
+ case arrow::Type::DURATION:
+ return std::make_shared<arrow::DurationScalar>(value.GetInt64(), type);
+ case arrow::Type::STRING:
+ return std::make_shared<arrow::StringScalar>(value.GetText());
+ case arrow::Type::LARGE_STRING:
+ return std::make_shared<arrow::LargeStringScalar>(value.GetText());
+ case arrow::Type::BINARY: {
+ std::string bytes = value.GetBytes();
+ return std::make_shared<arrow::BinaryScalar>(arrow::Buffer::FromString(bytes));
+ }
+ case arrow::Type::LARGE_BINARY: {
+ std::string bytes = value.GetBytes();
+ return std::make_shared<arrow::LargeBinaryScalar>(arrow::Buffer::FromString(bytes));
+ }
+ case arrow::Type::FIXED_SIZE_BINARY: {
+ std::string bytes = value.GetBytes();
+ return std::make_shared<arrow::FixedSizeBinaryScalar>(arrow::Buffer::FromString(bytes), type);
+ }
+ default:
+ Y_VERIFY_S(false, "Some types have no constant conversion yet: " << type->ToString());
+ }
+ return {};
+}
+
+TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key) {
+ TString out;
+ NArrow::SwitchType(key->type->id(), [&](const auto& t) {
+ using TWrap = std::decay_t<decltype(t)>;
+ using T = typename TWrap::T;
+ using TScalar = typename arrow::TypeTraits<T>::ScalarType;
+
+ if constexpr (std::is_same_v<T, arrow::StringType> ||
+ std::is_same_v<T, arrow::BinaryType> ||
+ std::is_same_v<T, arrow::LargeStringType> ||
+ std::is_same_v<T, arrow::LargeBinaryType> ||
+ std::is_same_v<T, arrow::FixedSizeBinaryType>) {
+ auto& buffer = static_cast<const TScalar&>(*key).value;
+ out = buffer->ToString();
+ } else if constexpr (std::is_same_v<T, arrow::HalfFloatType>) {
+ return false;
+ } else if constexpr (arrow::is_temporal_type<T>::value || arrow::has_c_type<T>::value) {
+ using TCType = typename arrow::TypeTraits<T>::CType;
+ static_assert(std::is_same_v<TCType, typename TScalar::ValueType>);
+
+ const TCType& value = static_cast<const TScalar&>(*key).value;
+ out = TString(reinterpret_cast<const char*>(&value), sizeof(value));
+ Y_VERIFY_S(!out.empty(), key->type->ToString());
+ } else {
+ return false;
+ }
+ return true;
+ });
+ return out;
+}
+
+std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type) {
+ std::shared_ptr<arrow::Scalar> out;
+ NArrow::SwitchType(type->id(), [&](const auto& t) {
+ using TWrap = std::decay_t<decltype(t)>;
+ using T = typename TWrap::T;
+ using TScalar = typename arrow::TypeTraits<T>::ScalarType;
+
+ if constexpr (std::is_same_v<T, arrow::StringType> ||
+ std::is_same_v<T, arrow::BinaryType> ||
+ std::is_same_v<T, arrow::LargeStringType> ||
+ std::is_same_v<T, arrow::LargeBinaryType> ||
+ std::is_same_v<T, arrow::FixedSizeBinaryType>) {
+ out = std::make_shared<TScalar>(arrow::Buffer::FromString(key), type);
+ } else if constexpr (std::is_same_v<T, arrow::HalfFloatType>) {
+ return false;
+ } else if constexpr (arrow::is_temporal_type<T>::value) {
+ using TCType = typename arrow::TypeTraits<T>::CType;
+ static_assert(std::is_same_v<TCType, typename TScalar::ValueType>);
+
+ Y_VERIFY(key.size() == sizeof(TCType));
+ TCType value = ReadUnaligned<TCType>(key.data());
+ out = std::make_shared<TScalar>(value, type);
+ } else if constexpr (arrow::has_c_type<T>::value) {
+ using TCType = typename arrow::TypeTraits<T>::CType;
+ static_assert(std::is_same_v<TCType, typename TScalar::ValueType>);
+
+ Y_VERIFY(key.size() == sizeof(TCType));
+ TCType value = ReadUnaligned<TCType>(key.data());
+ out = std::make_shared<TScalar>(value);
+ } else {
+ return false;
+ }
+ return true;
+ });
+ Y_VERIFY_S(out, type->ToString());
+ return out;
+}
+
+}
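SerializeKeyScalar stores just the raw value bytes for fixed-width types, so the column type must be supplied again on load (hence DeserializeKeyScalar's type argument). The round-trip contract, reduced to plain PODs (toy helpers, not the NOlap functions):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

template <typename T>
std::string SerializePod(T value) {
    // The serialized key is nothing but the value's bytes; no type tag.
    return std::string(reinterpret_cast<const char*>(&value), sizeof(T));
}

template <typename T>
T DeserializePod(const std::string& key) {
    assert(key.size() == sizeof(T)); // mirrors Y_VERIFY(key.size() == sizeof(TCType))
    T value;
    std::memcpy(&value, key.data(), sizeof(T)); // safe unaligned read
    return value;
}

int main() {
    int64_t ts = 1665150037000000; // some timestamp in microseconds
    assert(DeserializePod<int64_t>(SerializePod(ts)) == ts);
}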
diff --git a/ydb/core/tx/columnshard/engines/scalars.h b/ydb/core/tx/columnshard/engines/scalars.h
new file mode 100644
index 00000000000..02d6dac893a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scalars.h
@@ -0,0 +1,14 @@
+#pragma once
+#include "defs.h"
+#include <ydb/core/protos/tx_columnshard.pb.h>
+
+namespace NKikimr::NOlap {
+
+void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value);
+std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value,
+ const std::shared_ptr<arrow::DataType>& type);
+
+TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key);
+std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type);
+
+}
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index 458b2282ec6..8c3a5239e3b 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -26,9 +26,9 @@ public:
return true;
}
- void WriteGranule(ui32, const TGranuleRecord&) override {}
- void EraseGranule(ui32, const TGranuleRecord&) override {}
- bool LoadGranules(ui32, std::function<void(TGranuleRecord&&)>) override { return true; }
+ void WriteGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {}
+ void EraseGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {}
+ bool LoadGranules(ui32, const IColumnEngine&, std::function<void(TGranuleRecord&&)>) override { return true; }
void WriteColumn(ui32, const TColumnRecord&) override {}
void EraseColumn(ui32, const TColumnRecord&) override {}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 953e700a5d6..48286575d0a 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -54,7 +54,7 @@ public:
return true;
}
- void WriteGranule(ui32 index, const TGranuleRecord& row) override {
+ void WriteGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override {
auto& granules = Indices[index].Granules[row.PathId];
bool replaced = false;
@@ -70,7 +70,7 @@ public:
}
}
- void EraseGranule(ui32 index, const TGranuleRecord& row) override {
+ void EraseGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override {
auto& pathGranules = Indices[index].Granules[row.PathId];
TVector<TGranuleRecord> filtered;
@@ -83,7 +83,7 @@ public:
pathGranules.swap(filtered);
}
- bool LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) override {
+ bool LoadGranules(ui32 index, const IColumnEngine&, std::function<void(TGranuleRecord&&)> callback) override {
auto& granules = Indices[index].Granules;
for (auto& [pathId, vec] : granules) {
for (auto& rec : vec) {
@@ -171,31 +171,39 @@ static const TVector<std::pair<TString, TTypeInfo>> testKey = {
{"uid", TTypeInfo(NTypeIds::Utf8) }
};
-TIndexInfo TestTableInfo() {
+TIndexInfo TestTableInfo(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = testColumns,
+ const TVector<std::pair<TString, TTypeInfo>>& key = testKey) {
TIndexInfo indexInfo("", 0);
- for (ui32 i = 0; i < testColumns.size(); ++i) {
+ for (ui32 i = 0; i < ydbSchema.size(); ++i) {
ui32 id = i + 1;
- auto& name = testColumns[i].first;
- auto& type = testColumns[i].second;
+ auto& name = ydbSchema[i].first;
+ auto& type = ydbSchema[i].second;
indexInfo.Columns[id] = NTable::TColumn(name, id, type);
indexInfo.ColumnNames[name] = id;
}
- for (const auto& [keyName, keyType] : testKey) {
+ for (const auto& [keyName, keyType] : key) {
indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
}
- indexInfo.AddTtlColumn(testColumns[0].first);
-
+ indexInfo.AddTtlColumn("timestamp");
return indexInfo;
}
+static NOlap::TTiersInfo MakeTtl(TInstant border) {
+ return NOlap::TTiersInfo("timestamp", border);
+}
+
+template <typename TKeyDataType>
class TBuilder {
public:
+ using TTraits = typename arrow::TypeTraits<TKeyDataType>;
+ using TCType = std::conditional_t<arrow::has_c_type<TKeyDataType>::value, typename TTraits::CType, std::string>;
+
struct TRow {
- ui64 Timestamp;
+ TCType Timestamp;
std::string ResourceType;
std::string ResourceId;
std::string Uid;
@@ -211,7 +219,7 @@ public:
bool AddRow(const TRow& row) {
bool ok = true;
- ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::TimestampType>::BuilderType>(0)->Append(row.Timestamp).ok();
+ ok = ok && BatchBuilder->GetFieldAs<typename TTraits::BuilderType>(0)->Append(row.Timestamp).ok();
ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append(row.ResourceType).ok();
ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(2)->Append(row.ResourceId).ok();
ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(3)->Append(row.Uid).ok();
@@ -226,10 +234,6 @@ public:
return batch;
}
- static NOlap::TTiersInfo MakeTtl(TInstant border) {
- return NOlap::TTiersInfo(testColumns[0].first, border);
- }
-
private:
std::shared_ptr<arrow::Schema> Schema = NArrow::MakeArrowSchema(testColumns);
std::unique_ptr<arrow::RecordBatchBuilder> BatchBuilder;
@@ -240,11 +244,12 @@ TBlobRange MakeBlobRange(ui32 step, ui32 blobSize) {
return TBlobRange(TUnifiedBlobId(11111, TLogoBlobID(100500, 42, step, 3, blobSize, 0)), 0, blobSize);
}
-TString MakeTestBlob(ui64 start = 0, ui64 end = 100) {
- TBuilder builder;
- for (ui64 ts = start; ts < end; ++ts) {
+TString MakeTestBlob(i64 start = 0, i64 end = 100) {
+ TBuilder<arrow::TimestampType> builder;
+ for (i64 ts = start; ts < end; ++ts) {
TString str = ToString(ts);
- builder.AddRow({ts, str, str, str, str});
+ TString sortedStr = Sprintf("%05ld", (long)ts); // zero-pad: lexicographic == numeric order
+ builder.AddRow({ts, sortedStr, str, str, str});
}
auto batch = builder.Finish();
return NArrow::SerializeBatchNoCompression(batch);
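The Sprintf("%05ld") above makes resource_type values sort lexicographically in the same order as their numeric source, which the string-PK tests rely on. The property in isolation:

#include <cassert>
#include <cstdio>
#include <string>

std::string Pad5(long value) {
    char buf[16];
    std::snprintf(buf, sizeof(buf), "%05ld", value);
    return buf;
}

int main() {
    assert(Pad5(99) < Pad5(100));                   // "00099" < "00100": padded order is numeric
    assert(std::string("99") > std::string("100")); // unpadded strings sort the wrong way
}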
@@ -289,9 +294,9 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
return engine.ApplyChanges(db, changes, snap);
}
-bool Insert(TTestDbWrapper& db, TSnapshot snap,
+bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap,
TVector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) {
- TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits());
+ TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
engine.Load(db);
return Insert(engine, db, snap, std::move(dataToIndex), blobs, step);
@@ -324,9 +329,9 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
return engine.ApplyChanges(db, changes, snap);
}
-bool Compact(TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step,
- const TExpected& expected) {
- TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits());
+bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange,
+ TString>&& blobs, ui32& step, const TExpected& expected) {
+ TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
engine.Load(db);
return Compact(engine, db, snap, std::move(blobs), step, expected);
}
@@ -361,12 +366,26 @@ std::shared_ptr<TPredicate> MakePredicate(int64_t ts, NArrow::EOperation op) {
return p;
}
+std::shared_ptr<TPredicate> MakeStrPredicate(const std::string& key, NArrow::EOperation op) {
+ auto p = std::make_shared<TPredicate>();
+ p->Operation = op;
+
+ auto type = arrow::utf8();
+ auto res = arrow::MakeArrayFromScalar(arrow::StringScalar(key), 1);
+
+ std::vector<std::shared_ptr<arrow::Field>> fields = { std::make_shared<arrow::Field>("resource_type", type) };
+ p->Batch = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), 1, {*res});
+ return p;
+}
+
}
Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
- Y_UNIT_TEST(IndexWriteLoadRead) {
+ void WriteLoadRead(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema,
+ const TVector<std::pair<TString, TTypeInfo>>& key) {
TTestDbWrapper db;
+ TIndexInfo tableInfo = TestTableInfo(ydbSchema, key);
TVector<ui64> paths = {1, 2};
@@ -389,11 +408,11 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
THashMap<TBlobRange, TString> blobs;
blobs[blobRanges[0]] = testBlob;
blobs[blobRanges[1]] = testBlob;
- Insert(db, {1, 2}, std::move(dataToIndex), blobs, step);
+ Insert(tableInfo, db, {1, 2}, std::move(dataToIndex), blobs, step);
// load
- TColumnEngineForLogs engine(TestTableInfo(), 0);
+ TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0);
engine.Load(db);
// selects
@@ -440,8 +459,25 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
}
- Y_UNIT_TEST(IndexReadWithPredicates) {
+ Y_UNIT_TEST(IndexWriteLoadRead) {
+ WriteLoadRead(testColumns, testKey);
+ }
+
+ Y_UNIT_TEST(IndexWriteLoadReadStrPK) {
+ TVector<std::pair<TString, TTypeInfo>> key = {
+ {"resource_type", TTypeInfo(NTypeIds::Utf8) },
+ {"resource_id", TTypeInfo(NTypeIds::Utf8) },
+ {"uid", TTypeInfo(NTypeIds::Utf8) },
+ {"timestamp", TTypeInfo(NTypeIds::Timestamp) }
+ };
+
+ WriteLoadRead(testColumns, key);
+ }
+
+ void ReadWithPredicates(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema,
+ const TVector<std::pair<TString, TTypeInfo>>& key) {
TTestDbWrapper db;
+ TIndexInfo tableInfo = TestTableInfo(ydbSchema, key);
ui64 pathId = 1;
ui32 step = 1000;
@@ -462,19 +498,19 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
dataToIndex.push_back(
TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
- bool ok = Insert(db, {planStep, txId}, std::move(dataToIndex), blobs, step);
+ bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
}
// compact
planStep = 2;
- bool ok = Compact(db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4});
+ bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4});
UNIT_ASSERT(ok);
// load
- TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits());
+ TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
engine.Load(db);
// read
@@ -484,13 +520,13 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
planStep = 3;
const TIndexInfo& indexInfo = engine.GetIndexInfo();
- THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(testColumns[0].first) };
+ THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(key[0].first) };
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, {}, {});
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
}
// predicates
@@ -498,22 +534,44 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{
ui64 txId = 1;
std::shared_ptr<TPredicate> gt10k = MakePredicate(10000, NArrow::EOperation::Greater);
+ if (key[0].second == TTypeInfo(NTypeIds::Utf8)) {
+ gt10k = MakeStrPredicate("10000", NArrow::EOperation::Greater);
+ }
auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, gt10k, {});
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
}
{
ui64 txId = 1;
std::shared_ptr<TPredicate> lt10k = MakePredicate(9999, NArrow::EOperation::Less); // TODO: better border checks
+ if (key[0].second == TTypeInfo(NTypeIds::Utf8)) {
+ lt10k = MakeStrPredicate("09999", NArrow::EOperation::Less);
+ }
auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, 0, lt10k);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
}
}
+ Y_UNIT_TEST(IndexReadWithPredicates) {
+ ReadWithPredicates(testColumns, testKey);
+ }
+
+ Y_UNIT_TEST(IndexReadWithPredicatesStrPK) {
+ TVector<std::pair<TString, TTypeInfo>> key = {
+ {"resource_type", TTypeInfo(NTypeIds::Utf8) },
+ {"resource_id", TTypeInfo(NTypeIds::Utf8) },
+ {"uid", TTypeInfo(NTypeIds::Utf8) },
+ {"timestamp", TTypeInfo(NTypeIds::Timestamp) }
+ };
+
+ ReadWithPredicates(testColumns, key);
+ }
+
Y_UNIT_TEST(IndexWriteOverload) {
TTestDbWrapper db;
+ TIndexInfo tableInfo = TestTableInfo();
ui64 pathId = 1;
ui32 step = 1000;
@@ -521,7 +579,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// inserts
ui64 planStep = 1;
- TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits());
+ TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
engine.Load(db);
THashMap<TBlobRange, TString> blobs;
@@ -551,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
UNIT_ASSERT(overload);
{ // check it's overloaded after reload
- TColumnEngineForLogs tmpEngine(TestTableInfo(), 0, TestLimits());
+ TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits());
tmpEngine.Load(db);
UNIT_ASSERT(tmpEngine.GetOverloadedGranules(pathId));
}
@@ -582,7 +640,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
{ // check it's not overloaded after reload
- TColumnEngineForLogs tmpEngine(TestTableInfo(), 0, TestLimits());
+ TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits());
tmpEngine.Load(db);
UNIT_ASSERT(!tmpEngine.GetOverloadedGranules(pathId));
}
@@ -590,6 +648,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
Y_UNIT_TEST(IndexTtl) {
TTestDbWrapper db;
+ TIndexInfo tableInfo = TestTableInfo();
ui64 pathId = 1;
ui32 step = 1000;
@@ -610,19 +669,19 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
dataToIndex.push_back(
TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()});
- bool ok = Insert(db, {planStep, txId}, std::move(dataToIndex), blobs, step);
+ bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
}
// compact
planStep = 2;
- bool ok = Compact(db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4});
+ bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4});
UNIT_ASSERT(ok);
// load
- TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits());
+ TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits());
engine.Load(db);
// read
@@ -650,7 +709,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// TTL
THashMap<ui64, NOlap::TTiersInfo> pathTtls;
- pathTtls.emplace(pathId, TBuilder::MakeTtl(TInstant::MicroSeconds(10000)));
+ pathTtls.emplace(pathId, MakeTtl(TInstant::MicroSeconds(10000)));
Ttl(engine, db, pathTtls, 2);
// read + load + read
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 7dda4cdcdb8..5836e05b087 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -18,14 +18,18 @@ namespace NTypeIds = NScheme::NTypeIds;
using TTypeId = NScheme::TTypeId;
using TTypeInfo = NScheme::TTypeInfo;
-static const TVector<std::pair<TString, TTypeInfo>> testYdbSchema = TTestSchema::YdbSchema();
-static const TVector<std::pair<TString, TTypeInfo>> testYdbPkSchema = TTestSchema::YdbPkSchema();
-
+template <typename TKey = ui64>
bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range,
bool requireUniq = false) {
- THashMap<ui64, ui32> keys;
+ static constexpr bool isStrKey = std::is_same_v<TKey, std::string>;
+
+ THashMap<TKey, ui32> keys;
for (size_t i = range.first; i < range.second; ++i) {
- keys.emplace(i, 0);
+ if constexpr (isStrKey) {
+ keys.emplace(ToString(i), 0);
+ } else {
+ keys.emplace(i, 0);
+ }
}
auto schema = NArrow::DeserializeSchema(srtSchema);
@@ -35,10 +39,27 @@ bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair<
std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
UNIT_ASSERT(array);
- auto& ts = dynamic_cast<const arrow::NumericArray<arrow::TimestampType>&>(*array);
- for (int i = 0; i < ts.length(); ++i) {
- ui64 value = ts.Value(i);
+ for (int i = 0; i < array->length(); ++i) {
+ TKey value{};
+
+ NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
+
+ if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) {
+ value = static_cast<const TArray&>(*array).GetString(i);
+ return true;
+ }
+ if constexpr (!isStrKey && arrow::has_c_type<typename TWrap::T>()) {
+ auto& column = static_cast<const TArray&>(*array);
+ value = column.Value(i);
+ return true;
+ }
+ UNIT_ASSERT(false);
+ return false;
+ });
+
++keys[value];
}
}
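The NArrow::SwitchType lambda above is a runtime-to-static type dispatch: one switch on the type id, then typed access through arrow::TypeTraits. The same idea with a hand-rolled switch over the public Arrow API (demo only; SwitchType covers the full type list):

#include <arrow/api.h>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

// Read element i of an array as text: dispatch once on the dynamic type id,
// then use the statically typed array class for access.
std::string ValueAt(const arrow::Array& array, int64_t i) {
    switch (array.type_id()) {
        case arrow::Type::INT64:
            return std::to_string(static_cast<const arrow::Int64Array&>(array).Value(i));
        case arrow::Type::TIMESTAMP:
            return std::to_string(static_cast<const arrow::TimestampArray&>(array).Value(i));
        case arrow::Type::STRING:
            return static_cast<const arrow::StringArray&>(array).GetString(i);
        default:
            return "<unsupported>";
    }
}

int main() {
    arrow::StringBuilder builder;
    (void)builder.AppendValues({"a", "bb", "ccc"});
    std::shared_ptr<arrow::Array> array;
    (void)builder.Finish(&array);
    std::cout << ValueAt(*array, 2) << "\n"; // ccc
}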
@@ -57,10 +78,17 @@ bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair<
return true;
}
+template <typename TKey = ui64>
bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) {
- THashSet<ui64> keys;
+ static constexpr bool isStrKey = std::is_same_v<TKey, std::string>;
+
+ THashSet<TKey> keys;
for (size_t i = range.first; i < range.second; ++i) {
- keys.emplace(i);
+ if constexpr (isStrKey) {
+ keys.emplace(ToString(i));
+ } else {
+ keys.emplace(i);
+ }
}
auto schema = NArrow::DeserializeSchema(srtSchema);
@@ -70,10 +98,27 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p
std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
UNIT_ASSERT(array);
- auto& ts = dynamic_cast<const arrow::NumericArray<arrow::TimestampType>&>(*array);
- for (int i = 0; i < ts.length(); ++i) {
- ui64 value = ts.Value(i);
+ for (int i = 0; i < array->length(); ++i) {
+ TKey value{}; // must be TKey (not ui64) for the string-key instantiation to compile
+
+ NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
+
+ if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) {
+ value = static_cast<const TArray&>(*array).GetString(i);
+ return true;
+ }
+ if constexpr (!isStrKey && arrow::has_c_type<typename TWrap::T>()) {
+ auto& column = static_cast<const TArray&>(*array);
+ value = column.Value(i);
+ return true;
+ }
+ UNIT_ASSERT(false);
+ return false;
+ });
+
if (!keys.count(value)) {
Cerr << "Unexpected key: " << value << "\n";
return false;
@@ -86,10 +131,12 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p
template <typename TArrowType>
bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) {
+ using TArray = typename arrow::TypeTraits<TArrowType>::ArrayType;
+
UNIT_ASSERT(array);
UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size());
- auto& column = dynamic_cast<const arrow::NumericArray<TArrowType>&>(*array);
+ auto& column = dynamic_cast<const TArray&>(*array);
for (int i = 0; i < column.length(); ++i) {
auto value = column.Value(i);
@@ -98,12 +145,14 @@ bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::
return true;
}
-template <typename TArrowArrayType>
+template <typename TArrowType>
bool CheckTypedStrValues(const std::shared_ptr<arrow::Array>& array, const std::vector<std::string>& expected) {
+ using TArray = typename arrow::TypeTraits<TArrowType>::ArrayType;
+
UNIT_ASSERT(array);
UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size());
- auto& column = dynamic_cast<const TArrowArrayType&>(*array);
+ auto& column = dynamic_cast<const TArray&>(*array);
for (int i = 0; i < column.length(); ++i) {
auto value = column.GetString(i);
@@ -150,11 +199,11 @@ bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const std::vecto
return CheckTypedIntValues<arrow::DoubleType>(array, expected);
case arrow::Type::STRING:
- return CheckTypedStrValues<arrow::StringArray>(array, expectedStr);
+ return CheckTypedStrValues<arrow::StringType>(array, expectedStr);
case arrow::Type::BINARY:
- return CheckTypedStrValues<arrow::BinaryArray>(array, expectedStr);
+ return CheckTypedStrValues<arrow::BinaryType>(array, expectedStr);
case arrow::Type::FIXED_SIZE_BINARY:
- return CheckTypedStrValues<arrow::FixedSizeBinaryArray>(array, expectedStr);
+ return CheckTypedStrValues<arrow::FixedSizeBinaryType>(array, expectedStr);
default:
Cerr << "type : " << array->type()->ToString() << "\n";
@@ -171,14 +220,37 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) {
std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
UNIT_ASSERT(array);
- auto& ts = dynamic_cast<const arrow::NumericArray<arrow::TimestampType>&>(*array);
- if (!ts.length()) {
+ if (!array->length()) {
return true;
}
- ui64 prev = ts.Value(0);
- for (int i = 1; i < ts.length(); ++i) {
- ui64 value = ts.Value(i);
+ ui64 prev{};
+ for (int i = 0; i < array->length(); ++i) {
+ ui64 value{};
+
+ NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
+#if 0
+ if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) {
+ value = static_cast<const TArray&>(*array).GetView(i);
+ return true;
+ }
+#endif
+ if constexpr (/*!isStrKey && */arrow::has_c_type<typename TWrap::T>()) {
+ auto& column = static_cast<const TArray&>(*array);
+ value = column.Value(i);
+ return true;
+ }
+ UNIT_ASSERT(false);
+ return false;
+ });
+
+ if (!i) {
+ prev = value;
+ continue;
+ }
+
if (prev > value) {
Cerr << "Unordered: " << prev << " " << value << "\n";
return false;
@@ -212,9 +284,11 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
const TVector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(),
- NOlap::TSnapshot snap = {10, 10}, TString codec = "") {
+ const TVector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(),
+ TString codec = "none") {
+ NOlap::TSnapshot snap = {10, 10};
bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(pathId, schema,
+ TTestSchema::CreateTableTxBody(pathId, schema, pk,
TTestSchema::TTableSpecials().WithCodec(codec)),
snap);
UNIT_ASSERT(ok);
@@ -260,14 +334,34 @@ void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) {
ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
UNIT_ASSERT(!ok);
- // wrong type
- // TODO: better check (it probably does not work in general case)
+ // wrong first key column type (same physical layout: Int64 vs Timestamp)
+ // It fails only if we pass the source schema explicitly: the mismatch cannot be detected from serialized batch data alone.
schema = ydbSchema;
- schema[1].second = TTypeInfo(NTypeIds::Int32);
- ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
+ schema[0].second = TTypeInfo(NTypeIds::Int64);
+ ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema),
+ NArrow::MakeArrowSchema(schema));
UNIT_ASSERT(!ok);
+ // wrong type (no additional schema - fails in case of wrong layout)
+
+ for (size_t i = 0; i < ydbSchema.size(); ++i) {
+ schema = ydbSchema;
+ schema[i].second = TTypeInfo(NTypeIds::Int8);
+ ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
+ UNIT_ASSERT(!ok);
+ }
+
+ // wrong type (with additional schema)
+
+ for (size_t i = 0; i < ydbSchema.size(); ++i) {
+ schema = ydbSchema;
+ schema[i].second = TTypeInfo(NTypeIds::Int64);
+ ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema),
+ NArrow::MakeArrowSchema(schema));
+ UNIT_ASSERT(ok == (ydbSchema[i].second == TTypeInfo(NTypeIds::Int64)));
+ }
+
schema = ydbSchema;
schema[1].second = TTypeInfo(NTypeIds::Utf8);
schema[5].second = TTypeInfo(NTypeIds::Int32);
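
The Int64-vs-Timestamp case above hinges on the two types sharing a 64-bit physical layout: a serialized batch alone cannot reveal which logical type the writer intended, so the mismatch surfaces only when the source schema is passed explicitly. A hedged standalone sketch of that layout equivalence using stock Arrow APIs (not part of the patch):

    #include <arrow/api.h>
    #include <cassert>

    int main() {
        // Build an Int64 array.
        arrow::Int64Builder builder;
        assert(builder.AppendValues({1, 2, 3}).ok());
        std::shared_ptr<arrow::Array> ints;
        assert(builder.Finish(&ints).ok());

        // Rewrap the identical buffers under a Timestamp logical type:
        // the bytes do not change, only the declared type does.
        auto data = ints->data()->Copy();
        data->type = arrow::timestamp(arrow::TimeUnit::MICRO);
        auto ts = arrow::MakeArray(data);

        assert(ts->length() == ints->length());
        return 0;
    }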
@@ -276,17 +370,12 @@ void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) {
// reordered columns
+ THashMap<TString, TTypeInfo> remap(ydbSchema.begin(), ydbSchema.end());
+
schema.resize(0);
- schema.push_back({"level", TTypeInfo(NTypeIds::Int32) });
- schema.push_back({"timestamp", TTypeInfo(NTypeIds::Timestamp) });
- schema.push_back({"uid", TTypeInfo(NTypeIds::Utf8) });
- schema.push_back({"resource_id", TTypeInfo(NTypeIds::Utf8) });
- schema.push_back({"resource_type", TTypeInfo(NTypeIds::Utf8) });
- schema.push_back({"message", TTypeInfo(NTypeIds::Utf8) });
- schema.push_back({"request_id", TTypeInfo(NTypeIds::Utf8) });
- schema.push_back({"saved_at", TTypeInfo(NTypeIds::Timestamp) });
- schema.push_back({"ingested_at", TTypeInfo(NTypeIds::Timestamp) });
- schema.push_back({"json_payload", TTypeInfo(NTypeIds::Json) });
+ for (auto& [name, typeInfo] : remap) {
+ schema.push_back({name, typeInfo});
+ }
ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema));
UNIT_ASSERT(!ok);
@@ -365,6 +454,7 @@ void TestWriteReadDup() {
}
void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = TTestSchema::YdbSchema(),
+ const TVector<std::pair<TString, TTypeInfo>>& testYdbPk = TTestSchema::YdbPkSchema(),
TString codec = "") {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -408,7 +498,7 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
ui64 writeId = 0;
ui64 tableId = 1;
- SetupSchema(runtime, sender, tableId, ydbSchema, {10, 10}, codec);
+ SetupSchema(runtime, sender, tableId, ydbSchema, testYdbPk, codec);
// ----xx
// -----xx..
@@ -683,7 +773,8 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
UNIT_ASSERT(resRead.GetData().size() > 0);
//UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
//UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- if (resRead.GetFinished()) {
+ bool lastBatch = resRead.GetFinished();
+ if (lastBatch) {
expected = resRead.GetBatch() + 1;
}
readData.push_back(resRead.GetData());
@@ -694,18 +785,20 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
}
UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema));
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
-
- if (ydbSchema == TTestSchema::YdbSchema()) {
- if (codec == "" || codec == "lz4") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50);
- } else if (codec == "none") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75);
- } else if (codec == "zstd") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26);
- } else {
- UNIT_ASSERT(false);
+ if (lastBatch) {
+ UNIT_ASSERT(meta.HasReadStats());
+ auto& readStats = meta.GetReadStats();
+
+ if (ydbSchema == TTestSchema::YdbSchema()) {
+ if (codec == "" || codec == "lz4") {
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50);
+ } else if (codec == "none") {
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75);
+ } else if (codec == "zstd") {
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26);
+ } else {
+ UNIT_ASSERT(false);
+ }
}
}
}
@@ -717,9 +810,9 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
// read 11 (range predicate: closed interval)
{
- TSerializedTableRange range = MakeTestRange({10, 42}, true, true, testYdbPkSchema);
+ TSerializedTableRange range = MakeTestRange({10, 42}, true, true, testYdbPk);
NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema);
+ std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk);
auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId);
auto* greater = Proto(evRead.get()).MutableGreaterPredicate();
@@ -761,9 +854,9 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
// read 12 (range predicate: open interval)
{
- TSerializedTableRange range = MakeTestRange({10, 42}, false, false, testYdbPkSchema);
+ TSerializedTableRange range = MakeTestRange({10, 42}, false, false, testYdbPk);
NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema);
+ std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk);
auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId);
auto* greater = Proto(evRead.get()).MutableGreaterPredicate();
@@ -805,7 +898,9 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
UNIT_ASSERT(DataHasOnly(readData, schema, {11, 41 + 1}));
}
-void TestCompactionInGranuleImpl(bool reboots) {
+void TestCompactionInGranuleImpl(bool reboots,
+ const TVector<std::pair<TString, TTypeInfo>>& ydbSchema,
+ const TVector<std::pair<TString, TTypeInfo>>& ydbPk) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -848,14 +943,14 @@ void TestCompactionInGranuleImpl(bool reboots) {
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId);
+ SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk);
TAutoPtr<IEventHandle> handle;
// Write same keys: merge on compaction
static const ui32 triggerPortionSize = 75 * 1000;
std::pair<ui64, ui64> triggerPortion = {0, triggerPortionSize};
- TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema);
+ TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::MAX_BLOB_SIZE);
@@ -870,7 +965,7 @@ void TestCompactionInGranuleImpl(bool reboots) {
ids.reserve(numWrites);
for (ui32 w = 0; w < numWrites; ++w, ++writeId, pos += portionSize) {
std::pair<ui64, ui64> portion = {pos, pos + portionSize};
- TString data = MakeTestBlob(portion, testYdbSchema);
+ TString data = MakeTestBlob(portion, ydbSchema);
ids.push_back(writeId);
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, data));
@@ -923,8 +1018,13 @@ void TestCompactionInGranuleImpl(bool reboots) {
TVector<TString> readData;
readData.push_back(resRead.GetData());
- UNIT_ASSERT(DataHas(readData, schema, triggerPortion, true));
- UNIT_ASSERT(DataHas(readData, schema, smallWrites, true));
+ if (ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8)) {
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, triggerPortion, true));
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, smallWrites, true));
+ } else {
+ UNIT_ASSERT(DataHas(readData, schema, triggerPortion, true));
+ UNIT_ASSERT(DataHas(readData, schema, smallWrites, true));
+ }
UNIT_ASSERT(meta.HasReadStats());
auto& readStats = meta.GetReadStats();
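
The branch above chooses the key representation from the first PK column's type. A hypothetical helper expressing the same predicate (shown for clarity; the patch writes it inline, and TestCompactionSplitGranule below computes it as isStrPk0):

    // String-like first PK column => compare keys as std::string;
    // anything else => compare as ui64 (the template default).
    bool IsStrPk0(const TVector<std::pair<TString, TTypeInfo>>& pk) {
        return pk[0].second == TTypeInfo(NTypeIds::String)
            || pk[0].second == TTypeInfo(NTypeIds::Utf8);
    }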
@@ -1103,7 +1203,7 @@ void TestReadWithProgram(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId);
+ SetupSchema(runtime, sender, tableId, ydbSchema);
{ // write some data
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema));
@@ -1240,7 +1340,9 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema,
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId, ydbSchema);
+ auto pk = ydbSchema;
+ pk.resize(4);
+ SetupSchema(runtime, sender, tableId, ydbSchema, pk);
{ // write some data
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, testDataBlob);
@@ -1396,19 +1498,81 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
Y_UNIT_TEST(WriteReadNoCompression) {
- TestWriteRead(true, TTestSchema::YdbSchema(), "none");
+ TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "none");
}
Y_UNIT_TEST(WriteReadZSTD) {
- TestWriteRead(true, TTestSchema::YdbSchema(), "zstd");
+ TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "zstd");
}
Y_UNIT_TEST(CompactionInGranule) {
- TestCompactionInGranuleImpl(false);
+ std::vector<TTypeId> types = {
+ NTypeIds::Timestamp,
+ //NTypeIds::Int16,
+ //NTypeIds::Uint16,
+ NTypeIds::Int32,
+ NTypeIds::Uint32,
+ NTypeIds::Int64,
+ NTypeIds::Uint64,
+ //NTypeIds::Date,
+ NTypeIds::Datetime
+ //NTypeIds::Interval
+ };
+ auto schema = TTestSchema::YdbSchema();
+ auto pk = TTestSchema::YdbPkSchema();
+
+ for (auto& type : types) {
+ schema[0].second = TTypeInfo(type);
+ pk[0].second = TTypeInfo(type);
+ TestCompactionInGranuleImpl(false, schema, pk);
+ }
+ }
+#if 0
+ Y_UNIT_TEST(CompactionInGranuleFloatKey) {
+ std::vector<NScheme::TTypeId> types = {
+ NTypeIds::Float,
+ NTypeIds::Double
+ };
+ auto schema = TTestSchema::YdbSchema();
+ auto pk = TTestSchema::YdbPkSchema();
+
+ for (auto& type : types) {
+ schema[0].second = TTypeInfo(type);
+ pk[0].second = TTypeInfo(type);
+ TestCompactionInGranuleImpl(false, schema, pk);
+ }
+ }
+#endif
+ Y_UNIT_TEST(CompactionInGranuleStrKey) {
+ std::vector<NScheme::TTypeId> types = {
+ NTypeIds::String,
+ NTypeIds::Utf8
+ };
+ auto schema = TTestSchema::YdbSchema();
+ auto pk = TTestSchema::YdbPkSchema();
+
+ for (auto& type : types) {
+ schema[0].second = TTypeInfo(type);
+ pk[0].second = TTypeInfo(type);
+ TestCompactionInGranuleImpl(false, schema, pk);
+ }
}
Y_UNIT_TEST(RebootCompactionInGranule) {
- TestCompactionInGranuleImpl(true);
+ // a representative subset of key types
+ std::vector<NScheme::TTypeId> types = {
+ NTypeIds::Timestamp,
+ NTypeIds::Int32,
+ NTypeIds::String
+ };
+ auto schema = TTestSchema::YdbSchema();
+ auto pk = TTestSchema::YdbPkSchema();
+
+ for (auto& type : types) {
+ schema[0].second = TTypeInfo(type);
+ pk[0].second = TTypeInfo(type);
+ TestCompactionInGranuleImpl(true, schema, pk);
+ }
}
Y_UNIT_TEST(ReadWithProgram) {
@@ -1473,7 +1637,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
- Y_UNIT_TEST(CompactionSplitGranule) {
+ void TestCompactionSplitGranule(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema,
+ const TVector<std::pair<TString, TTypeInfo>>& ydbPk) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -1490,9 +1655,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 planStep = 100;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId);
+ SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk, "lz4");
TAutoPtr<IEventHandle> handle;
+ bool isStrPk0 = ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8);
+
// Write different keys: grow on compaction
static const ui32 triggerPortionSize = 85 * 1000;
@@ -1502,7 +1669,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) {
ui64 start = i * (triggerPortionSize - overlapSize);
std::pair<ui64, ui64> triggerPortion = {start, start + triggerPortionSize};
- TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema);
+ TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::MAX_BLOB_SIZE);
@@ -1569,7 +1736,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(num < 100);
}
- UNIT_ASSERT(DataHas(readData, schema, {0, numRows}, true));
+ if (isStrPk0) {
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, {0, numRows}, true));
+ } else {
+ UNIT_ASSERT(DataHas(readData, schema, {0, numRows}, true));
+ }
+
readData.clear();
{ // read with predicate (TO)
@@ -1577,9 +1749,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Proto(read.get()).AddColumnNames("timestamp");
Proto(read.get()).AddColumnNames("message");
- TSerializedTableRange range = MakeTestRange({0, 1000}, false, false, testYdbPkSchema);
+ TSerializedTableRange range = MakeTestRange({0, 1}, false, false, ydbPk);
NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema);
+ std::tie(prGreater, prLess) = RangePredicates(range, ydbPk);
auto* less = Proto(read.get()).MutableLessPredicate();
for (auto& name : prLess.ColumnNames()) {
@@ -1597,6 +1769,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(event);
auto& resRead = Proto(event);
+ Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " "
+ << resRead.GetBatch() << " " << resRead.GetData().size() << "\n";
+
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
@@ -1619,6 +1794,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
+ //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 1); // TODO: min-max index optimization?
}
// TODO: check data
@@ -1629,9 +1805,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Proto(read.get()).AddColumnNames("timestamp");
Proto(read.get()).AddColumnNames("message");
- TSerializedTableRange range = MakeTestRange({2000 * 1000, 1000 * 1000 * 1000}, false, false, testYdbPkSchema);
+ TSerializedTableRange range = MakeTestRange({numRows, numRows + 1000}, false, false, ydbPk);
+ if (isStrPk0) {
+ range = MakeTestRange({99990, 99999}, false, false, ydbPk);
+ }
+
NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema);
+ std::tie(prGreater, prLess) = RangePredicates(range, ydbPk);
auto* greater = Proto(read.get()).MutableGreaterPredicate();
for (auto& name : prGreater.ColumnNames()) {
@@ -1649,6 +1829,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(event);
auto& resRead = Proto(event);
+ Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " "
+ << resRead.GetBatch() << " " << resRead.GetData().size() << "\n";
+
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
@@ -1671,6 +1854,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
+ //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization?
}
// TODO: check data
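
For string-like first-PK columns the tail-range bounds above switch from {numRows, numRows + 1000} to {99990, 99999}, presumably because ToString-ed keys sort lexicographically rather than numerically, so a meaningful interval must stay within numbers of equal digit width. A one-line illustration of the pitfall:

    #include <cassert>
    #include <string>

    int main() {
        // "100000" sorts before "99999" lexicographically ('1' < '9'),
        // so a numeric interval over stringified keys must keep equal width.
        assert(std::string("100000") < std::string("99999"));
        return 0;
    }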
@@ -1706,23 +1890,66 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 numBytes = static_cast<arrow::UInt64Array&>(*bytes).Value(i);
ui64 numRawBytes = static_cast<arrow::UInt64Array&>(*rawBytes).Value(i);
+ Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " "
+ << pathId << " " << kind << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
+
if (pathId == tableId) {
if (kind == 2) {
UNIT_ASSERT_VALUES_EQUAL(numRows, (triggerPortionSize - overlapSize) * numWrites + overlapSize);
UNIT_ASSERT(numBytes > numRows);
- UNIT_ASSERT(numRawBytes > numBytes);
+ //UNIT_ASSERT(numRawBytes > numBytes);
}
} else {
UNIT_ASSERT_VALUES_EQUAL(numRows, 0);
UNIT_ASSERT_VALUES_EQUAL(numBytes, 0);
UNIT_ASSERT_VALUES_EQUAL(numRawBytes, 0);
}
-
- Cerr << pathId << " " << kind << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
}
}
}
+ Y_UNIT_TEST(CompactionSplitGranule) {
+ std::vector<TTypeId> types = {
+ NTypeIds::Timestamp,
+ //NTypeIds::Int16,
+ //NTypeIds::Uint16,
+ NTypeIds::Int32,
+ NTypeIds::Uint32,
+ NTypeIds::Int64,
+ NTypeIds::Uint64,
+ //NTypeIds::Date,
+ NTypeIds::Datetime
+ //NTypeIds::Interval
+ //NTypeIds::Float
+ //NTypeIds::Double
+ };
+
+ auto schema = TTestSchema::YdbSchema();
+ auto pk = TTestSchema::YdbPkSchema();
+
+ for (auto& type : types) {
+ schema[0].second = TTypeInfo(type);
+ pk[0].second = TTypeInfo(type);
+ TestCompactionSplitGranule(schema, pk);
+ }
+ }
+
+ Y_UNIT_TEST(CompactionSplitGranuleStrKey) {
+ std::vector<TTypeId> types = {
+ NTypeIds::String,
+ NTypeIds::Utf8
+ };
+
+ auto schema = TTestSchema::YdbSchema();
+ auto pk = TTestSchema::YdbPkSchema();
+
+ for (auto& type : types) {
+ schema[0].second = TTypeInfo(type);
+ pk[0].second = TTypeInfo(type);
+ TestCompactionSplitGranule(schema, pk);
+ }
+ }
+
Y_UNIT_TEST(ReadStale) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -1740,13 +1967,14 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 planStep = 1000000;
ui64 txId = 100;
- SetupSchema(runtime, sender, tableId);
+ auto ydbSchema = TTestSchema::YdbSchema();
+ SetupSchema(runtime, sender, tableId, ydbSchema);
TAutoPtr<IEventHandle> handle;
// Write some test data to advance the time
{
std::pair<ui64, ui64> triggerPortion = {1, 1000};
- TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema);
+ TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
@@ -1827,7 +2055,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 writeId = 0;
ui64 tableId = 1;
- SetupSchema(runtime, sender, tableId);
+ auto ydbSchema = TTestSchema::YdbSchema();
+ SetupSchema(runtime, sender, tableId, ydbSchema);
TAutoPtr<IEventHandle> handle;
bool blockReadFinished = true;
@@ -1944,7 +2173,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
static const ui32 triggerPortionSize = 75 * 1000;
std::pair<ui64, ui64> triggerPortion = {0, triggerPortionSize};
- TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema);
+ TString triggerData = MakeTestBlob(triggerPortion, ydbSchema);
UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::MAX_BLOB_SIZE);
@@ -1962,7 +2191,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Do a small write that is not indexed so that we get a committed blob in the read request
{
- TString smallData = MakeTestBlob({0, 2}, testYdbSchema);
+ TString smallData = MakeTestBlob({0, 2}, ydbSchema);
UNIT_ASSERT(smallData.size() < 100 * 1024);
UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, smallData));
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index ecf54d53e54..e8e6a8e65a2 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -12,6 +12,7 @@ using NWrappers::NTestHelpers::TS3Mock;
namespace {
static const TVector<std::pair<TString, TTypeInfo>> testYdbSchema = TTestSchema::YdbSchema();
+static const TVector<std::pair<TString, TTypeInfo>> testYdbPk = TTestSchema::YdbPkSchema();
std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBatch> batch, TString columnName, i64 seconds) {
std::string name(columnName.c_str(), columnName.size());
@@ -46,8 +47,8 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s
return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS);
}
-std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TString& blob, const TString& srtSchema,
- const std::string& columnName)
+std::shared_ptr<arrow::Array> GetFirstPKColumn(const TString& blob, const TString& srtSchema,
+ const std::string& columnName)
{
auto schema = NArrow::DeserializeSchema(srtSchema);
auto batch = NArrow::DeserializeBatch(blob, schema);
@@ -55,7 +56,7 @@ std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TString& blob, c
std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
UNIT_ASSERT(array);
- return std::static_pointer_cast<arrow::TimestampArray>(array);
+ return array;
}
bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
@@ -63,14 +64,14 @@ bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000);
- auto tsCol = GetTimestampColumn(blob, srtSchema, columnName);
+ auto tsCol = GetFirstPKColumn(blob, srtSchema, columnName);
UNIT_ASSERT(tsCol);
UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize);
for (int i = 0; i < tsCol->length(); ++i) {
- i64 value = tsCol->Value(i);
- if (value != expected.value) {
- Cerr << "Unexpected: " << value << ", expected " << expected.value << "\n";
+ auto value = *tsCol->GetScalar(i);
+ if (!value->Equals(expected)) {
+ Cerr << "Unexpected: '" << value->ToString() << "', expected " << expected.value << "\n";
return false;
}
}
@@ -98,6 +99,23 @@ std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui3
return data;
}
+bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 100) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard),
+ &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ // propose only; the proposal status is the test result
+ return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId});
+}
+
// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
// ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
@@ -135,7 +153,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
spec.SetTtl(ttlSec);
}
bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(tableId, testYdbSchema, spec),
+ TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, spec),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
@@ -398,7 +416,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
UNIT_ASSERT(specs.size() > 0);
bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(tableId, testYdbSchema, specs[0]),
+ TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
@@ -471,13 +489,15 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
if (resRead.GetData().size()) {
auto& meta = resRead.GetMeta();
auto& schema = meta.GetSchema();
- auto tsColumn = GetTimestampColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn);
- UNIT_ASSERT(tsColumn);
+ auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn);
+ UNIT_ASSERT(pkColumn);
+ UNIT_ASSERT(pkColumn->type_id() == arrow::Type::TIMESTAMP);
UNIT_ASSERT(meta.HasReadStats());
auto& readStats = meta.GetReadStats();
ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
+ auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(pkColumn);
resColumns.emplace_back(tsColumn, numBytes);
} else {
resColumns.emplace_back(nullptr, 0);
@@ -610,7 +630,7 @@ void TestDrop(bool reboots) {
ui64 planStep = 1000000000; // greater then delays
ui64 txId = 100;
- bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema),
+ bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
@@ -676,6 +696,77 @@ extern bool gAllowLogBatchingDefaultValue;
}
Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
+ Y_UNIT_TEST(CreateTable) {
+ ui64 tableId = 1;
+
+ TVector<TTypeId> intTypes = {
+ NTypeIds::Timestamp,
+ NTypeIds::Int8,
+ NTypeIds::Int16,
+ NTypeIds::Int32,
+ NTypeIds::Int64,
+ NTypeIds::Uint8,
+ NTypeIds::Uint16,
+ NTypeIds::Uint32,
+ NTypeIds::Uint64,
+ NTypeIds::Date,
+ NTypeIds::Datetime
+ };
+
+ auto schema = TTestSchema::YdbSchema({"k0", TTypeInfo(NTypeIds::Timestamp)});
+ auto pk = schema;
+ pk.resize(4);
+
+ for (auto& ydbType : intTypes) {
+ schema[0].second = TTypeInfo(ydbType);
+ pk[0].second = TTypeInfo(ydbType);
+ auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk);
+ bool ok = TestCreateTable(txBody);
+ UNIT_ASSERT(ok);
+ }
+
+ // TODO: support float types
+ TVector<TTypeId> floatTypes = {
+ NTypeIds::Float,
+ NTypeIds::Double
+ };
+
+ for (auto& ydbType : floatTypes) {
+ schema[0].second = TTypeInfo(ydbType);
+ pk[0].second = TTypeInfo(ydbType);
+ auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk);
+ bool ok = TestCreateTable(txBody);
+ UNIT_ASSERT(!ok);
+ }
+
+ TVector<TTypeId> strTypes = {
+ NTypeIds::String,
+ NTypeIds::Utf8
+ };
+
+ for (auto& ydbType : strTypes) {
+ schema[0].second = TTypeInfo(ydbType);
+ pk[0].second = TTypeInfo(ydbType);
+ auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk);
+ bool ok = TestCreateTable(txBody);
+ UNIT_ASSERT(ok);
+ }
+
+ TVector<TTypeId> xsonTypes = {
+ NTypeIds::Yson,
+ NTypeIds::Json,
+ NTypeIds::JsonDocument
+ };
+
+ for (auto& ydbType : xsonTypes) {
+ schema[0].second = TTypeInfo(ydbType);
+ pk[0].second = TTypeInfo(ydbType);
+ auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk);
+ bool ok = TestCreateTable(txBody);
+ UNIT_ASSERT(!ok);
+ }
+ }
+
Y_UNIT_TEST(ExternalTTL) {
TestTtl(false, false);
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 83c99145215..84d9937a9ad 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -635,36 +635,19 @@ private:
auto outputColumns = GetOutputColumns(ctx);
if (!outputColumns.empty()) {
- std::shared_ptr<arrow::RecordBatch> batch = Batch;
- if (!batch) {
+ if (!Batch) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, "No batch in bulk upsert data", ctx);
}
-#if 1 // TODO: it's been checked in BuildSchema, remove if possible
- for (auto& columnName : outputColumns) {
- if (!batch->GetColumnByName(columnName)) {
- // TODO: upsert with implicit NULLs instead?
- return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
- "No column '" + columnName + "' in bulk upsert data", ctx);
- }
- }
-#endif
- // reuse source serialized batch if it's present and OK
- bool reuseData = false;
- {
- if (GetSourceType() == EUploadSource::ArrowBatch) {
- reuseData = (outputColumns.size() == (size_t)batch->num_columns());
- }
- // TODO: reuse batchData with reordered columns
- for (int i = 0; i < batch->num_columns() && reuseData; ++i) {
- if (batch->column_name(i) != outputColumns[i]) {
- reuseData = false;
+ auto batch = NArrow::ExtractColumns(Batch, outputColumns);
+ if (!batch) {
+ for (auto& columnName : outputColumns) {
+ if (Batch->schema()->GetFieldIndex(columnName) < 0) {
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
+ "No column '" + columnName + "' in bulk upsert data", ctx);
}
}
- }
-
- if (!reuseData) {
- batch = NArrow::ExtractColumns(batch, outputColumns);
+ return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Cannot prepare bulk upsert data", ctx);
}
Y_VERIFY(batch);
diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp
index 221dcfc206e..5b5c3205da7 100644
--- a/ydb/services/ydb/ydb_logstore_ut.cpp
+++ b/ydb/services/ydb/ydb_logstore_ut.cpp
@@ -11,9 +11,30 @@ using namespace NYdb;
namespace {
-TVector<NYdb::TColumn> TestSchemaColumns() {
+const std::vector<EPrimitiveType> allowedTypes = {
+ //EPrimitiveType::Bool,
+ EPrimitiveType::Uint8,
+ EPrimitiveType::Int32,
+ EPrimitiveType::Uint32,
+ EPrimitiveType::Int64,
+ EPrimitiveType::Uint64,
+ //EPrimitiveType::Float, // TODO
+ //EPrimitiveType::Double,// TODO
+ EPrimitiveType::Date,
+ EPrimitiveType::Datetime,
+ EPrimitiveType::Timestamp,
+ //EPrimitiveType::Interval,
+ EPrimitiveType::String,
+ EPrimitiveType::Utf8,
+ //EPrimitiveType::Yson,
+ //EPrimitiveType::Json,
+ //EPrimitiveType::JsonDocument,
+ //EPrimitiveType::DyNumber,
+};
+
+TVector<NYdb::TColumn> TestSchemaColumns(EPrimitiveType pkField = EPrimitiveType::Timestamp) {
return {
- NYdb::TColumn("timestamp", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Timestamp)),
+ NYdb::TColumn("timestamp", NYdb::NLogStore::MakeColumnType(pkField)),
NYdb::TColumn("resource_type", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Utf8)),
NYdb::TColumn("resource_id", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Utf8)),
NYdb::TColumn("uid", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Utf8)),
@@ -59,7 +80,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
return connection;
}
- Y_UNIT_TEST(LogStore) {
+ void CreateDropStore(EPrimitiveType pkField) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
EnableDebugLogs(server);
@@ -68,7 +89,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
NYdb::NLogStore::TLogStoreClient logStoreClient(connection);
{
- NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey());
+ NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(pkField), TestSchemaKey());
THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets;
schemaPresets["default"] = logSchema;
NYdb::NLogStore::TLogStoreDescription storeDescr(4, schemaPresets);
@@ -87,7 +108,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
const auto& schema = descr.GetSchemaPresets().begin()->second;
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10);
- UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }");
+ UNIT_ASSERT(schema.GetColumns()[0].ToString().StartsWith("{ name: \"timestamp\", type:"));
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[1].ToString(), "{ name: \"resource_type\", type: Utf8? }");
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[4].ToString(), "{ name: \"level\", type: Int32? }");
UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns(),
@@ -102,6 +123,12 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
}
+ Y_UNIT_TEST(LogStore) {
+ for (auto pk0 : allowedTypes) {
+ CreateDropStore(pk0);
+ }
+ }
+
Y_UNIT_TEST(LogStoreTiers) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
@@ -188,7 +215,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
{ // wrong schema: not supported PK
- NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), {"resource_type", "resource_id"});
+ NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), {"json_payload", "resource_id"});
THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets;
schemaPresets["default"] = logSchema;
NYdb::NLogStore::TLogStoreDescription storeDescr(4, schemaPresets);
@@ -273,7 +300,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
}
- Y_UNIT_TEST(LogTable) {
+ void CreateDropTable(EPrimitiveType pkField) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
EnableDebugLogs(server);
@@ -281,7 +308,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
auto connection = ConnectToServer(server);
NYdb::NLogStore::TLogStoreClient logStoreClient(connection);
- NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey());
+ NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(pkField), TestSchemaKey());
{
THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets;
@@ -305,7 +332,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
const auto& schema = descr.GetSchema();
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10);
- UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }");
+ UNIT_ASSERT(schema.GetColumns()[0].ToString().StartsWith("{ name: \"timestamp\", type:"));
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[1].ToString(), "{ name: \"resource_type\", type: Utf8? }");
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[4].ToString(), "{ name: \"level\", type: Int32? }");
UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns(),
@@ -327,7 +354,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
const auto& schema = descr.GetSchema();
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10);
- UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }");
+ UNIT_ASSERT(schema.GetColumns()[0].ToString().StartsWith("{ name: \"timestamp\", type:"));
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[1].ToString(), "{ name: \"resource_type\", type: Utf8? }");
UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[4].ToString(), "{ name: \"level\", type: Int32? }");
UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns(),
@@ -414,6 +441,12 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
}
+ Y_UNIT_TEST(LogTable) {
+ for (auto pk0 : allowedTypes) {
+ CreateDropTable(pk0);
+ }
+ }
+
Y_UNIT_TEST(AlterLogStore) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp
index 84294c161bc..b66531f43de 100644
--- a/ydb/services/ydb/ydb_olapstore_ut.cpp
+++ b/ydb/services/ydb/ydb_olapstore_ut.cpp
@@ -7,13 +7,36 @@
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
using namespace NYdb;
namespace {
-std::vector<TString> testShardingVariants = {
+
+THashMap<EPrimitiveType, TString> allowedTypes = {
+ //EPrimitiveType::Bool,
+ {EPrimitiveType::Uint8, "Uint8"},
+ {EPrimitiveType::Int32, "Int32"},
+ {EPrimitiveType::Uint32, "Uint32"},
+ {EPrimitiveType::Int64, "Int64"},
+ {EPrimitiveType::Uint64, "Uint64"},
+ //{EPrimitiveType::Float, "Float"},
+ //{EPrimitiveType::Double, "Double"},
+ {EPrimitiveType::Date, "Date"},
+ {EPrimitiveType::Datetime, "Datetime"},
+ {EPrimitiveType::Timestamp, "Timestamp"},
+ //{EPrimitiveType::Interval, "Interval"},
+ {EPrimitiveType::String, "String"},
+ {EPrimitiveType::Utf8, "Utf8"}
+ //EPrimitiveType::Yson,
+ //EPrimitiveType::Json,
+ //EPrimitiveType::JsonDocument,
+ //EPrimitiveType::DyNumber,
+};
+
+static constexpr const char* testShardingVariants[] = {
R"(["timestamp", "uid"])",
R"(["timestamp", "resource_type", "resource_id", "uid"])"
};
@@ -47,10 +70,16 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
return connection;
}
- void CreateOlapTable(const TServerSettings& settings, const TString& tableName, ui32 numShards = 2,
- const TString shardingColumns = R"(["timestamp", "uid"])")
+ struct TTestOlapTableOptions {
+ EPrimitiveType TsType = EPrimitiveType::Timestamp;
+ ui32 NumShards = 2;
+ TString Sharding = testShardingVariants[0];
+ TString HashFunction = "HASH_FUNCTION_CLOUD_LOGS";
+ };
+
+ void CreateOlapTable(const TServerSettings& settings, const TString& tableName, TTestOlapTableOptions opts = {})
{
- const char * tableDescr = R"(
+ TString tableDescr = Sprintf(R"(
Name: "OlapStore"
ColumnShardCount: 4
SchemaPresets {
@@ -60,7 +89,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
Columns { Name: "json_payload" Type: "JsonDocument" }
Columns { Name: "resource_id" Type: "Utf8" }
Columns { Name: "uid" Type: "Utf8" }
- Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "timestamp" Type: "%s" }
Columns { Name: "resource_type" Type: "Utf8" }
Columns { Name: "level" Type: "Int32" }
Columns { Name: "ingested_at" Type: "Timestamp" }
@@ -70,7 +99,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
}
}
- )";
+ )", allowedTypes[opts.TsType].c_str());
TClient annoyingClient(settings);
NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr);
@@ -81,11 +110,11 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ColumnShardCount : %d
Sharding {
HashSharding {
- Function: HASH_FUNCTION_CLOUD_LOGS
+ Function: %s
Columns: %s
}
}
- )", tableName.c_str(), numShards, shardingColumns.c_str()));
+ )", tableName.c_str(), opts.NumShards, opts.HashFunction.c_str(), opts.Sharding.c_str()));
UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
}
@@ -112,16 +141,60 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
}
NYdb::NTable::TAsyncBulkUpsertResult SendBatch(NYdb::NTable::TTableClient& client, const TString& tableName,
- const ui64 batchSize, const ui32 baseUserId, i64& ts)
+ const ui64 batchSize, const ui32 baseUserId, std::pair<EPrimitiveType, i64>& value)
{
+ i64 ts = value.second;
+
TValueBuilder rows;
rows.BeginList();
for (ui64 i = 0; i < batchSize; ++i, ts += 1000) {
const ui32 userId = baseUserId + (i % 100);
- rows.AddListItem()
- .BeginStruct()
- .AddMember("timestamp").Timestamp(TInstant::MicroSeconds(ts))
- .AddMember("resource_type").Utf8(i%2 ? "app" : "nginx")
+ auto& row = rows.AddListItem()
+ .BeginStruct();
+ switch (value.first) {
+ case EPrimitiveType::Uint8:
+ row.AddMember("timestamp").Uint8(ts);
+ break;
+ case EPrimitiveType::Int32:
+ row.AddMember("timestamp").Int32(ts);
+ break;
+ case EPrimitiveType::Uint32:
+ row.AddMember("timestamp").Uint32(ts);
+ break;
+ case EPrimitiveType::Int64:
+ row.AddMember("timestamp").Int64(ts);
+ break;
+ case EPrimitiveType::Uint64:
+ row.AddMember("timestamp").Uint64(ts);
+ break;
+ case EPrimitiveType::Float:
+ row.AddMember("timestamp").Float(ts);
+ break;
+ case EPrimitiveType::Double:
+ row.AddMember("timestamp").Double(ts);
+ break;
+ case EPrimitiveType::Timestamp:
+ row.AddMember("timestamp").Timestamp(TInstant::MicroSeconds(ts % NYql::NUdf::MAX_TIMESTAMP));
+ break;
+ case EPrimitiveType::Date:
+ row.AddMember("timestamp").Date(TInstant::Days((ts / 1000) % NYql::NUdf::MAX_DATE));
+ break;
+ case EPrimitiveType::Datetime:
+ row.AddMember("timestamp").Datetime(TInstant::Seconds((ts / 1000) % NYql::NUdf::MAX_DATETIME));
+ break;
+ case EPrimitiveType::Interval:
+ row.AddMember("timestamp").Interval(ts);
+ break;
+ case EPrimitiveType::String:
+ row.AddMember("timestamp").String(ToString(ts));
+ break;
+ case EPrimitiveType::Utf8:
+ row.AddMember("timestamp").Utf8(ToString(ts));
+ break;
+ default:
+ UNIT_ASSERT(false);
+ }
+ row.AddMember("resource_type").Utf8(i%2 ? "app" : "nginx")
.AddMember("resource_id").Utf8("resource_" + ToString((i+13) % 7))
.AddMember("uid").Utf8(ToString(i % 23))
.AddMember("level").Int32(i % 10)
@@ -156,7 +229,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken(token));
TInstant start = TInstant::Now();
- i64 ts = startTs;
+ auto ts = std::make_pair<EPrimitiveType, i64>(EPrimitiveType::Timestamp, (i64)startTs);
const ui32 baseUserId = 1000000;
TVector<NYdb::NTable::TAsyncBulkUpsertResult> results;
@@ -208,7 +281,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
// Create OLTP and OLAP tables with the same set of columns and same PK
void CreateTestTables(const TServerSettings& settings, const TString& tableName, const TString& sharding) {
- CreateOlapTable(settings, tableName, 2, sharding);
+ TTestOlapTableOptions opts;
+ opts.Sharding = sharding;
+ CreateOlapTable(settings, tableName, opts);
CreateTable(settings, "oltp_" + tableName);
}
@@ -236,22 +311,27 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
return result;
}
- Y_UNIT_TEST(BulkUpsert) {
+ void TestBulkUpsert(EPrimitiveType pkFirstType) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
EnableDebugLogs(server);
auto connection = ConnectToServer(server);
- CreateOlapTable(*server.ServerSettings, "log1");
+ TTestOlapTableOptions opts;
+ opts.TsType = pkFirstType;
+ opts.HashFunction = "HASH_FUNCTION_MODULO_N";
+ CreateOlapTable(*server.ServerSettings, "log1", opts);
TClient annoyingClient(*server.ServerSettings);
annoyingClient.ModifyOwner("/Root/OlapStore", "log1", "alice@builtin");
{
NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken("bob@builtin"));
- i64 ts = 1000;
+
+ std::pair<EPrimitiveType, i64> ts(pkFirstType, 1000);
auto res = SendBatch(client, "/Root/OlapStore/log1", 100, 1, ts).GetValueSync();
+ Cerr << __FILE__ << ":" << __LINE__ << " Issues: " << res.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::UNAUTHORIZED);
UNIT_ASSERT_STRING_CONTAINS(res.GetIssues().ToString(),
"Access denied for bob@builtin with access UpdateRow to table '/Root/OlapStore/log1'");
@@ -262,8 +342,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
{
NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken("alice@builtin"));
- i64 ts = 1000;
+ std::pair<EPrimitiveType, i64> ts(pkFirstType, 1000);
auto res = SendBatch(client, "log1", 100, 1, ts).GetValueSync();
+ Cerr << __FILE__ << ":" << __LINE__ << " Issues: " << res.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(res.GetIssues().ToString(), "Unknown database for table 'log1'");
@@ -273,8 +354,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
{
NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken("alice@builtin"));
- i64 ts = 1000;
+ std::pair<EPrimitiveType, i64> ts(pkFirstType, 1000);
auto res = SendBatch(client, "/Root/OlapStore/log1", 100, 1, ts).GetValueSync();
+ Cerr << __FILE__ << ":" << __LINE__ << " Issues: " << res.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
TString result = RunQuery(connection, "SELECT count(*) FROM `/Root/OlapStore/log1`;");
@@ -282,6 +364,12 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
}
}
+ Y_UNIT_TEST(BulkUpsert) {
+ for (auto& [type, name] : allowedTypes) {
+ TestBulkUpsert(type);
+ }
+ }
+
void TestManyTables(const TString& sharding) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
@@ -327,7 +415,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
auto connection = ConnectToServer(server);
NYdb::NTable::TTableClient client(connection);
- CreateOlapTable(*server.ServerSettings, "log1", 2, sharding);
+ TTestOlapTableOptions opts;
+ opts.Sharding = sharding;
+ CreateOlapTable(*server.ServerSettings, "log1", opts);
const ui64 batchCount = 100;
const ui64 batchSize = 1000;