diff options
author | chertus <azuikov@ydb.tech> | 2022-10-07 16:40:37 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-10-07 16:40:37 +0300 |
commit | 50b7367df2b1a0cb024fafe65f5254a81c8c895d (patch) | |
tree | a2a1d62e4f4c2213ed353aa55ff609f27cf5f3d9 | |
parent | 2c90eec063340e2538e15e1718821387f5d21f8a (diff) | |
download | ydb-50b7367df2b1a0cb024fafe65f5254a81c8c895d.tar.gz |
use Scalar instead of ui64 as mark in ColumnShard engine
32 files changed, 1600 insertions, 622 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 00feaabbf5e..0dc67b9e028 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -5,6 +5,7 @@ #include <ydb/library/binary_json/write.h> #include <ydb/library/dynumber/dynumber.h> +#include <ydb/core/util/yverify_stream.h> #include <util/memory/pool.h> #include <util/system/yassert.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h> @@ -356,11 +357,12 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: auto srcSchema = srcBatch->schema(); for (auto& name : columnNames) { - fields.push_back(srcSchema->GetFieldByName(name)); - columns.push_back(srcBatch->GetColumnByName(name)); - if (!columns.back()) { + int pos = srcSchema->GetFieldIndex(name); + if (pos < 0) { return {}; } + fields.push_back(srcSchema->field(pos)); + columns.push_back(srcBatch->column(pos)); } return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), srcBatch->num_rows(), columns); @@ -374,6 +376,10 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: for (auto& field : dstSchema->fields()) { columns.push_back(srcBatch->GetColumnByName(field->name())); + if (!columns.back()->type()->Equals(field->type())) { + columns.back() = {}; + } + if (!columns.back()) { if (addNotExisted) { auto result = arrow::MakeArrayOfNull(field->type(), srcBatch->num_rows()); @@ -709,6 +715,40 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) { return bytes; } +i64 LowerBound(const std::shared_ptr<arrow::Array>& array, const arrow::Scalar& border, i64 offset) { + i64 pos = 0; + SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using T = typename TWrap::T; + using TArray = typename arrow::TypeTraits<T>::ArrayType; + using TScalar = typename arrow::TypeTraits<T>::ScalarType; + + auto& column = static_cast<const TArray&>(*array); + + if constexpr (arrow::is_number_type<T>() || arrow::is_timestamp_type<T>()) { + const auto* start = column.raw_values() + offset; + const auto* end = column.raw_values() + column.length(); + pos = offset; + pos += std::lower_bound(start, end, static_cast<const TScalar&>(border).value) - start; + } else if constexpr (arrow::has_string_view<T>()) { + arrow::util::string_view value(*static_cast<const TScalar&>(border).value); + + // TODO: binary search + for (pos = offset; pos < column.length(); ++pos) { + if (!(column.GetView(pos) < value)) { + return true; + } + } + } else { + Y_VERIFY(false); // not implemented + } + + return true; + }); + + return pos; +} + std::shared_ptr<arrow::UInt64Array> MakeUI64Array(ui64 value, i64 size) { auto res = arrow::MakeArrayFromScalar(arrow::UInt64Scalar(value), size); Y_VERIFY(res.ok()); @@ -742,29 +782,65 @@ std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& arra return {minPos, maxPos}; } -std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position) { +std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) { std::shared_ptr<arrow::Scalar> out; - SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; - using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType; - - auto& column = static_cast<const TArray&>(*array); - if constexpr (std::is_same_v<TScalar, arrow::StringScalar> || - std::is_same_v<TScalar, arrow::BinaryScalar> || - std::is_same_v<TScalar, arrow::FixedSizeBinaryScalar> || - std::is_same_v<TScalar, arrow::Decimal128Scalar> || - std::is_same_v<TScalar, arrow::LargeStringScalar> || - std::is_same_v<TScalar, arrow::LargeBinaryScalar>) { - // TODO + SwitchType(type->id(), [&](const auto& t) { + using TWrap = std::decay_t<decltype(t)>; + using T = typename TWrap::T; + using TScalar = typename arrow::TypeTraits<T>::ScalarType; + + if constexpr (std::is_same_v<T, arrow::StringType> || + std::is_same_v<T, arrow::BinaryType> || + std::is_same_v<T, arrow::LargeStringType> || + std::is_same_v<T, arrow::LargeBinaryType>) { + out = std::make_shared<TScalar>(arrow::Buffer::FromString(""), type); + } else if constexpr (std::is_same_v<T, arrow::FixedSizeBinaryType>) { + std::string s(static_cast<arrow::FixedSizeBinaryType&>(*type).byte_width(), '\0'); + out = std::make_shared<TScalar>(arrow::Buffer::FromString(s), type); + } else if constexpr (std::is_same_v<T, arrow::HalfFloatType>) { + return false; + } else if constexpr (arrow::is_temporal_type<T>::value) { + using TCType = typename arrow::TypeTraits<T>::CType; + out = std::make_shared<TScalar>(Min<TCType>(), type); + } else if constexpr (arrow::has_c_type<T>::value) { + using TCType = typename arrow::TypeTraits<T>::CType; + out = std::make_shared<TScalar>(Min<TCType>()); } else { - out = std::make_shared<TScalar>(column.GetView(position), array->type()); + return false; } return true; }); + Y_VERIFY(out); return out; } +std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position) { + auto res = array->GetScalar(position); + Y_VERIFY(res.ok()); + return *res; +} + +bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x) { + if (!x) { + return false; + } + + return SwitchType(x->type->id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType; + using TValue = std::decay_t<decltype(static_cast<const TScalar&>(*x).value)>; + + if constexpr (arrow::has_string_view<typename TWrap::T>()) { + const auto& xval = static_cast<const TScalar&>(*x).value; + return xval && xval->data(); + } + if constexpr (std::is_arithmetic_v<TValue>) { + return true; + } + return false; + }); +} + bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) { Y_VERIFY(x); Y_VERIFY(y); @@ -772,16 +848,21 @@ bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<a } bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) { - Y_VERIFY(x.type->Equals(y.type)); + Y_VERIFY_S(x.type->Equals(y.type), x.type->ToString() + " vs " + y.type->ToString()); return SwitchType(x.type->id(), [&](const auto& type) { using TWrap = std::decay_t<decltype(type)>; using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType; using TValue = std::decay_t<decltype(static_cast<const TScalar&>(x).value)>; + if constexpr (arrow::has_string_view<typename TWrap::T>()) { const auto& xval = static_cast<const TScalar&>(x).value; const auto& yval = static_cast<const TScalar&>(y).value; - return TStringBuf((char*)xval->data(), xval->size()) < TStringBuf((char*)yval->data(), yval->size()); + Y_VERIFY(xval); + Y_VERIFY(yval); + TStringBuf xBuf(reinterpret_cast<const char*>(xval->data()), xval->size()); + TStringBuf yBuf(reinterpret_cast<const char*>(yval->data()), yval->size()); + return xBuf < yBuf; } if constexpr (std::is_arithmetic_v<TValue>) { const auto& xval = static_cast<const TScalar&>(x).value; diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index 9f96d2b81b7..c9ba1b464a0 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -103,6 +103,8 @@ ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch); // Return size in bytes *not* including size of bitmap mask ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column); +i64 LowerBound(const std::shared_ptr<arrow::Array>& column, const arrow::Scalar& value, i64 offset = 0); + enum class ECompareType { LESS = 1, LESS_OR_EQUAL, @@ -124,28 +126,10 @@ bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch, bool desc = false); bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& schema); -template <typename TArr> -std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& batch, int pos) { - auto array = batch->column(pos); - Y_VERIFY(array); - //Y_VERIFY(array->type_id() == arrow::Type::TIMESTAMP); // TODO - auto column = std::static_pointer_cast<TArr>(array); - Y_VERIFY(column); - return column; -} - -template <typename TArr> -std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& columnName) { - auto array = batch->GetColumnByName(columnName); - Y_VERIFY(array); - //Y_VERIFY(array->type_id() == arrow::Type::TIMESTAMP); // TODO - auto column = std::static_pointer_cast<TArr>(array); - Y_VERIFY(column); - return column; -} - std::pair<int, int> FindMinMaxPosition(const std::shared_ptr<arrow::Array>& column); +std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type); std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& array, int position); +bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x); bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y); bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y); diff --git a/ydb/core/formats/sharding.h b/ydb/core/formats/sharding.h index bd6c7b16ccd..8e22fed3eb5 100644 --- a/ydb/core/formats/sharding.h +++ b/ydb/core/formats/sharding.h @@ -8,56 +8,102 @@ namespace NKikimr::NArrow { -class THashSharding { +class TShardingBase { +protected: + static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) { + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using T = typename TWrap::T; + using TArray = typename arrow::TypeTraits<T>::ArrayType; + + if (!array->IsNull(row)) { + auto& typedArray = static_cast<const TArray&>(*array); + auto value = typedArray.GetView(row); + if constexpr (arrow::has_string_view<T>()) { + concat.append(value.data(), value.size()); + } else if constexpr (arrow::has_c_type<T>()) { + if constexpr (arrow::is_physical_integer_type<T>()) { + concat.append(reinterpret_cast<const char*>(&value), sizeof(value)); + } else { + // Do not use bool or floats for sharding + static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>()); + } + } else { + static_assert(arrow::is_decimal_type<T>()); + } + } + return true; + }); + } +}; + +class THashSharding : public TShardingBase { public: - THashSharding(ui32 shardsCount) + THashSharding(ui32 shardsCount, ui64 seed = 0) : ShardsCount(shardsCount) + , Seed(seed) {} std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch, const TVector<TString>& shardingColumns) const { - if (shardingColumns.size() != 1) { - return {}; - } + std::vector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(shardingColumns.size()); - auto array = batch->GetColumnByName(shardingColumns[0]); - if (!array) { - return {}; + for (auto& colName : shardingColumns) { + auto array = batch->GetColumnByName(colName); + if (!array) { + return {}; + } + columns.emplace_back(array); } std::vector<ui32> out(batch->num_rows()); - SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; + if (columns.size() == 1) { + auto& array = columns[0]; + SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; - auto& column = static_cast<const TArray&>(*array); + auto& column = static_cast<const TArray&>(*array); + for (int row = 0; row < batch->num_rows(); ++row) { + out[row] = ShardNo(column.GetView(row)); + } + return true; + }); + } else { + std::string concat; for (int row = 0; row < batch->num_rows(); ++row) { - out[row] = ShardNo(column.GetView(row)); + concat.clear(); + for (auto& column : columns) { + AppendField(column, row, concat); + } + + out[row] = ShardNo(concat); } - return true; - }); + } return out; } private: ui32 ShardsCount; + ui64 Seed; template <typename T, std::enable_if_t<std::is_arithmetic<T>::value, bool> = true> ui32 ShardNo(T value) const { - return XXH64(&value, sizeof(value), 0) % ShardsCount; + return XXH64(&value, sizeof(value), Seed) % ShardsCount; } ui32 ShardNo(arrow::util::string_view value) const { - return XXH64(value.data(), value.size(), 0) % ShardsCount; + return XXH64(value.data(), value.size(), Seed) % ShardsCount; } }; // KIKIMR-11529 -class TLogsSharding { +class TLogsSharding : public TShardingBase { public: static constexpr ui32 DEFAULT_ACITVE_SHARDS = 10; static constexpr TDuration DEFAULT_CHANGE_PERIOD = TDuration::Minutes(5); @@ -138,32 +184,6 @@ private: ui32 NumActive; ui64 TsMin; ui64 ChangePeriod; - - static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) { - NArrow::SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using T = typename TWrap::T; - using TArray = typename arrow::TypeTraits<T>::ArrayType; - - if (!array->IsNull(row)) { - auto& typedArray = static_cast<const TArray&>(*array); - auto value = typedArray.GetView(row); - if constexpr (arrow::has_string_view<T>()) { - concat.append(value.data(), value.size()); - } else if constexpr (arrow::has_c_type<T>()) { - if constexpr (arrow::is_physical_integer_type<T>()) { - concat.append(reinterpret_cast<const char*>(&value), sizeof(value)); - } else { - // Do not use bool or floats for sharding - static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>()); - } - } else { - static_assert(arrow::is_decimal_type<T>()); - } - } - return true; - }); - } }; } diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp index 732303407bc..ea15e6a2f4b 100644 --- a/ydb/core/grpc_services/rpc_log_store.cpp +++ b/ydb/core/grpc_services/rpc_log_store.cpp @@ -76,6 +76,34 @@ void ConvertCompressionFromInternalToPublic(const NKikimrSchemeOp::TCompressionO to.set_compression_level(from.GetCompressionLevel()); } +static bool IsAllowedFirstPkField(ui32 typeId) { + switch (typeId) { + //case NYql::NProto::Bool + case NYql::NProto::Uint8: // Byte + case NYql::NProto::Int32: + case NYql::NProto::Uint32: + case NYql::NProto::Int64: + case NYql::NProto::Uint64: + //case NYql::NProto::Float: + //case NYql::NProto::Double: + case NYql::NProto::String: + case NYql::NProto::Utf8: + //case NYql::NProto::Yson: + //case NYql::NProto::Json: + //case NYql::NProto::JsonDocument: + case NYql::NProto::Date: + case NYql::NProto::Datetime: + case NYql::NProto::Timestamp: + //case NYql::NProto::Interval: + //case NYql::NProto::Decimal: + //case NYql::NProto::DyNumber: + return true; + default: + break; + } + return false; +} + bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikimrSchemeOp::TColumnTableSchema& to, Ydb::StatusIds::StatusCode& status, TString& error) { @@ -103,8 +131,8 @@ bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikim col->SetType(typeName); key.erase(column.name()); - if (column.name() == firstKeyColumn && typeInfo.GetTypeId() != NYql::NProto::Timestamp) { - error = "not supported first PK column type for LogStore. Only Timestamp columns are allowed for now."; + if (column.name() == firstKeyColumn && !IsAllowedFirstPkField(typeInfo.GetTypeId())) { + error = "not supported first PK column type for LogStore"; return false; } } diff --git a/ydb/core/tx/columnshard/columnshard__costs.cpp b/ydb/core/tx/columnshard/columnshard__costs.cpp index 4405a9695d2..f258d669905 100644 --- a/ydb/core/tx/columnshard/columnshard__costs.cpp +++ b/ydb/core/tx/columnshard/columnshard__costs.cpp @@ -9,7 +9,7 @@ namespace NKikimr::NOlap::NCosts { void TKeyRangesBuilder::AddMarkFromGranule(const TGranuleRecord& record) { - Constructor.StartRecord(true).AddRecordValue(std::make_shared<arrow::UInt64Scalar>(ExtractKey(record.IndexKey))); + Constructor.StartRecord(true).AddRecordValue(record.Mark); Features.emplace_back(TMarkRangeFeatures()); } @@ -32,19 +32,14 @@ void TKeyRangesBuilder::FillRangeMarks(const std::shared_ptr<NOlap::TPredicate>& Y_VERIFY(!LeftBorderOpened); LeftBorderOpened = true; } - for (auto&& i : granuleRecords) { - AddMarkFromGranule(i); + for (auto&& rec : granuleRecords) { + AddMarkFromGranule(rec); } if (AddMarkFromPredicate(right)) { Features.back().SetIntervalSkipped(true); } } -ui64 TKeyRangesBuilder::ExtractKey(const TString& key) { - Y_VERIFY(key.size() == 8); - return *reinterpret_cast<const ui64*>(key.data()); -} - NKikimrKqp::TEvRemoteCostData::TCostInfo TKeyRanges::SerializeToProto() const { NKikimrKqp::TEvRemoteCostData::TCostInfo result; result.SetLeftBorderOpened(LeftBorderOpened); diff --git a/ydb/core/tx/columnshard/columnshard__costs.h b/ydb/core/tx/columnshard/columnshard__costs.h index 6a11f3187c5..b7cb20ebe5d 100644 --- a/ydb/core/tx/columnshard/columnshard__costs.h +++ b/ydb/core/tx/columnshard/columnshard__costs.h @@ -98,7 +98,7 @@ private: const TIndexInfo& IndexInfo; bool AddMarkFromPredicate(const std::shared_ptr<NOlap::TPredicate>& p); void AddMarkFromGranule(const TGranuleRecord& record); - static ui64 ExtractKey(const TString& key); + public: TKeyRangesBuilder(const TIndexInfo& indexInfo); void Reserve(const ui32 num) { diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index cb18edaf0da..06813954659 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -36,6 +36,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex auto txKind = record.GetTxKind(); //ui64 ssId = record.GetSchemeShardId(); ui64 txId = record.GetTxId(); + auto& txBody = record.GetTxBody(); auto status = NKikimrTxColumnShard::EResultStatus::ERROR; TString statusMessage; @@ -45,12 +46,19 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex switch (txKind) { case NKikimrTxColumnShard::TX_KIND_SCHEMA: { TColumnShard::TAlterMeta meta; - if (!meta.Body.ParseFromString(record.GetTxBody())) { + if (!meta.Body.ParseFromString(txBody)) { statusMessage = TStringBuilder() << "Schema TxId# " << txId << " cannot be parsed"; break; } + // Invalid body generated at a newer SchemeShard + if (!meta.Validate()) { + statusMessage = TStringBuilder() + << "Schema TxId# " << txId << " cannot be proposed"; + break; + } + Y_VERIFY(record.HasSchemeShardId()); if (Self->CurrentSchemeShardId == 0) { Self->CurrentSchemeShardId = record.GetSchemeShardId(); @@ -91,7 +99,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex txInfo.TxKind = txKind; txInfo.Source = Ev->Get()->GetSource(); txInfo.Cookie = Ev->Cookie; - Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, record.GetTxBody(), txInfo.MaxStep, txInfo.Source, txInfo.Cookie); + Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie); if (!Self->AltersInFlight.contains(txId)) { Self->AltersInFlight.emplace(txId, std::move(meta)); @@ -113,7 +121,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } NKikimrTxColumnShard::TCommitTxBody body; - if (!body.ParseFromString(record.GetTxBody())) { + if (!body.ParseFromString(txBody)) { statusMessage = TStringBuilder() << "Commit TxId# " << txId << " cannot be parsed"; break; @@ -168,7 +176,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex txInfo.MaxStep = maxStep; txInfo.Source = Ev->Get()->GetSource(); txInfo.Cookie = Ev->Cookie; - Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, record.GetTxBody(), txInfo.MaxStep, txInfo.Source, txInfo.Cookie); + Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie); Self->CommitsInFlight.emplace(txId, std::move(meta)); @@ -185,7 +193,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex // TODO: make real tx: save and progress with tablets restart support NKikimrTxColumnShard::TTtlTxBody ttlBody; - if (!ttlBody.ParseFromString(record.GetTxBody())) { + if (!ttlBody.ParseFromString(txBody)) { statusMessage = "TTL tx cannot be parsed"; status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; break; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index b0cb8d70e1a..d31e9436ba8 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -22,8 +22,89 @@ NTabletPipe::TClientConfig GetPipeClientConfig() { return config; } +bool ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { + namespace NTypeIds = NScheme::NTypeIds; + + static const THashSet<NScheme::TTypeId> supportedTypes = { + NTypeIds::Timestamp, + NTypeIds::Int8, + NTypeIds::Int16, + NTypeIds::Int32, + NTypeIds::Int64, + NTypeIds::Uint8, + NTypeIds::Uint16, + NTypeIds::Uint32, + NTypeIds::Uint64, + NTypeIds::Date, + NTypeIds::Datetime, + //NTypeIds::Interval, + //NTypeIds::Float, + //NTypeIds::Double, + NTypeIds::String, + NTypeIds::Utf8 + }; + + if (!schema.HasEngine() || + schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) { + return false; + } + + if (!schema.KeyColumnNamesSize()) { + return false; + } + + TString firstKeyColumn = schema.GetKeyColumnNames()[0]; + THashSet<TString> keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end()); + + for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) { + TString name = column.GetName(); + keyColumns.erase(name); + + if (name == firstKeyColumn && !supportedTypes.count(column.GetTypeId())) { + return false; + } + } + + if (!keyColumns.empty()) { + return false; + } + return true; } +bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) { + if (preset.HasName() && preset.GetName() != "default") { + return false; + } + return ValidateTableSchema(preset.GetSchema()); +} + +} + +bool TColumnShard::TAlterMeta::Validate() const { + switch (Body.TxBody_case()) { + case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: + break; + case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: + for (auto& table : Body.GetEnsureTables().GetTables()) { + if (table.HasSchemaPreset() && !ValidateTablePreset(table.GetSchemaPreset())) { + return false; + } + if (table.HasSchema() && !ValidateTableSchema(table.GetSchema())) { + return false; + } + // TODO: validate TtlSettings + } + break; + case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable: + case NKikimrTxColumnShard::TSchemaTxBody::kDropTable: + case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore: + case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET: + break; + } + return true; +} + + TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, nullptr) diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index f791eee3549..3f1f6c33aa7 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -271,6 +271,8 @@ private: struct TAlterMeta { NKikimrTxColumnShard::TSchemaTxBody Body; THashSet<TActorId> NotifySubscribers; + + bool Validate() const; }; struct TCommitMeta { diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 62758999453..5bff08f0c33 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -494,32 +494,34 @@ struct Schema : NIceDb::Schema { // IndexGranules activities - static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const TGranuleRecord& row) { - db.Table<IndexGranules>().Key(index, row.PathId, row.IndexKey).Update( + static void IndexGranules_Write(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, + const TGranuleRecord& row) { + db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Update( NIceDb::TUpdate<IndexGranules::Granule>(row.Granule), NIceDb::TUpdate<IndexGranules::PlanStep>(row.CreatedAt.PlanStep), NIceDb::TUpdate<IndexGranules::TxId>(row.CreatedAt.TxId) ); } - static void IndexGranules_Erase(NIceDb::TNiceDb& db, ui32 index, const TGranuleRecord& row) { - db.Table<IndexGranules>().Key(index, row.PathId, row.IndexKey).Delete(); + static void IndexGranules_Erase(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, + const TGranuleRecord& row) { + db.Table<IndexGranules>().Key(index, row.PathId, engine.SerializeMark(row.Mark)).Delete(); } - static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, std::function<void(TGranuleRecord&&)> callback) { + static bool IndexGranules_Load(NIceDb::TNiceDb& db, ui32 index, const NOlap::IColumnEngine& engine, + std::function<void(TGranuleRecord&&)> callback) { auto rowset = db.Table<IndexGranules>().Prefix(index).Select(); if (!rowset.IsReady()) return false; while (!rowset.EndOfSet()) { - TGranuleRecord row; - row.PathId = rowset.GetValue<IndexGranules::PathId>(); - row.IndexKey = rowset.GetValue<IndexGranules::IndexKey>(); - row.Granule = rowset.GetValue<IndexGranules::Granule>(); + ui64 pathId = rowset.GetValue<IndexGranules::PathId>(); + TString indexKey = rowset.GetValue<IndexGranules::IndexKey>(); + ui64 granule = rowset.GetValue<IndexGranules::Granule>(); ui64 planStep = rowset.GetValue<IndexGranules::PlanStep>(); ui64 txId = rowset.GetValue<IndexGranules::TxId>(); - row.CreatedAt = {planStep, txId}; + TGranuleRecord row(pathId, granule, {planStep, txId}, engine.DeserializeMark(indexKey)); callback(std::move(row)); if (!rowset.Next()) diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 786359eb054..f409132c1c5 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -183,7 +183,8 @@ TVector<TCell> MakeTestCells(const TVector<TTypeInfo>& types, ui32 value, TVecto mem.push_back("{ \"a\" = [ { \"b\" = 1; } ]; }"); const TString& str = mem.back(); cells.push_back(TCell(str.data(), str.size())); - } else if (type == NTypeIds::Timestamp || type == NTypeIds::Uint64 || type == NTypeIds::Int64) { + } else if (type == NTypeIds::Timestamp || type == NTypeIds::Interval || + type == NTypeIds::Uint64 || type == NTypeIds::Int64) { cells.push_back(TCell::Make<ui64>(value)); } else if (type == NTypeIds::Uint32 || type == NTypeIds::Int32 || type == NTypeIds::Datetime) { cells.push_back(TCell::Make<ui32>(value)); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 5cb92a8ae83..b218c56b47c 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -88,14 +88,13 @@ struct TTestSchema { } }; - static auto YdbSchema() { + static auto YdbSchema(const std::pair<TString, TTypeInfo>& firstKeyItem = {"timestamp", TTypeInfo(NTypeIds::Timestamp) }) { TVector<std::pair<TString, TTypeInfo>> schema = { // PK - {"timestamp", TTypeInfo(NTypeIds::Timestamp) }, + firstKeyItem, {"resource_type", TTypeInfo(NTypeIds::Utf8) }, {"resource_id", TTypeInfo(NTypeIds::Utf8) }, {"uid", TTypeInfo(NTypeIds::Utf8) }, - // {"level", TTypeInfo(NTypeIds::Int32) }, {"message", TTypeInfo(NTypeIds::Utf8) }, {"json_payload", TTypeInfo(NTypeIds::Json) }, @@ -179,7 +178,8 @@ struct TTestSchema { return col; } - static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, + static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, + const TVector<std::pair<TString, TTypeInfo>>& pk, const TTableSpecials& specials = {}) { NKikimrTxColumnShard::TSchemaTxBody tx; auto* table = tx.MutableEnsureTables()->AddTables(); @@ -199,9 +199,7 @@ struct TTestSchema { *schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second); } - auto pk = columns; - Y_VERIFY(pk.size() >= 4); - pk.resize(4); + Y_VERIFY(pk.size() == 4); for (auto& column : ExtractNames(pk)) { schema->AddKeyColumnNames(column); } diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.txt b/ydb/core/tx/columnshard/engines/CMakeLists.txt index 0aee1e51a3e..5da5cd8162e 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.txt @@ -33,4 +33,5 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/scalars.cpp ) diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 85704b2a0fe..4cfc7fa2ef6 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -372,6 +372,9 @@ public: virtual const THashSet<ui64>* GetOverloadedGranules(ui64 /*pathId*/) const { return nullptr; } virtual bool HasOverloadedGranules() const { return false; } + virtual TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const = 0; + virtual std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const = 0; + virtual bool Load(IDbWrapper& db, const THashSet<ui64>& pathsToDrop = {}) = 0; virtual std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 53e46ed3b27..5b5af6b9461 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -6,15 +6,16 @@ namespace NKikimr::NOlap { -std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& indexInfo, - const std::shared_ptr<arrow::RecordBatch>& batch) { +std::shared_ptr<arrow::Array> GetFirstPKColumn(const TIndexInfo& indexInfo, + const std::shared_ptr<arrow::RecordBatch>& batch) { TString columnName = indexInfo.GetPK()[0].first; - std::string tsColumnName(columnName.data(), columnName.size()); - return NArrow::GetTypedColumn<arrow::TimestampArray>(batch, tsColumnName); + return batch->GetColumnByName(std::string(columnName.data(), columnName.size())); } namespace { +using TMark = TColumnEngineForLogs::TMark; + arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) { auto& codec = compression.Codec; @@ -36,17 +37,18 @@ arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) { return options; } -ui64 ExtractTimestamp(const std::shared_ptr<TPredicate>& pkPredicate, const std::shared_ptr<arrow::Schema>& key) { +std::shared_ptr<arrow::Scalar> ExtractFirstKey(const std::shared_ptr<TPredicate>& pkPredicate, + const std::shared_ptr<arrow::Schema>& key) { if (pkPredicate) { Y_VERIFY(pkPredicate->Good()); Y_VERIFY(key->num_fields() == 1); Y_VERIFY(key->field(0)->Equals(pkPredicate->Batch->schema()->field(0))); - auto array = NArrow::GetTypedColumn<arrow::TimestampArray>(pkPredicate->Batch, 0); + auto array = pkPredicate->Batch->column(0); Y_VERIFY(array && array->length() == 1); - return array->Value(0); + return *array->GetScalar(0); } - return 0; + return {}; } // Although source batches are ordered only by PK (sorting key) resulting pathBatches are ordered by extended key. @@ -188,16 +190,17 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> PortionsToBatches(const TIndexI return batches; } -bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits& limits, const TSnapshot& snap, - TMap<ui64, ui64>& borders) { +bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portions, const TCompactionLimits& limits, + const TSnapshot& snap, TMap<TMark, ui64>& borders) { ui64 oldTimePlanStep = snap.PlanStep - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds(); ui32 insertedCount = 0; ui32 insertedNew = 0; THashSet<ui64> filtered; THashSet<ui64> goodCompacted; + THashSet<ui64> nextToGood; { - TMap<ui64, TVector<const TPortionInfo*>> points; + TMap<TMark, TVector<const TPortionInfo*>> points; for (auto& portionInfo : portions) { if (portionInfo.IsInserted()) { @@ -212,38 +215,46 @@ bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits auto start = portionInfo.PkStart(); auto end = portionInfo.PkEnd(); Y_VERIFY(start && end); - ui64 min = static_cast<const arrow::TimestampScalar&>(*start).value; - ui64 max = static_cast<const arrow::TimestampScalar&>(*end).value; - points[min].push_back(&portionInfo); // insert start - points[max].push_back(nullptr); // insert end + points[TMark(start)].push_back(&portionInfo); + points[TMark(end)].push_back(nullptr); } - ui32 bucketCounter = 0; + ui32 countInBucket = 0; + ui64 bucketStartPortion = 0; + bool isGood = false; int sum = 0; - const TPortionInfo* lastPortion{}; for (auto& [key, vec] : points) { for (auto& portionInfo : vec) { if (portionInfo) { ++sum; - lastPortion = portionInfo; - ++bucketCounter; + ui64 currentPortion = portionInfo->Portion(); + if (!bucketStartPortion) { + bucketStartPortion = currentPortion; + } + ++countInBucket; + + ui64 prevIsGood = isGood; + isGood = goodCompacted.count(currentPortion); + if (prevIsGood && !isGood) { + nextToGood.insert(currentPortion); + } } else { --sum; } } if (!sum) { // count(start) == count(end), start new range - if (bucketCounter == 1) { - Y_VERIFY(lastPortion); + Y_VERIFY(bucketStartPortion); + if (countInBucket == 1) { // We do not want to merge big compacted portions with inserted ones if there's no intersections. - ui64 maxBlobSize = lastPortion->BlobsSizes().second; - if (!lastPortion->IsInserted() && maxBlobSize >= limits.GoodBlobSize) { - filtered.insert(lastPortion->Portion()); + if (isGood) { + filtered.insert(bucketStartPortion); } } - bucketCounter = 0; + countInBucket = 0; + bucketStartPortion = 0; } } } @@ -261,27 +272,25 @@ bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits // It's a map for SliceIntoGranules(). We use fake granule ids here to slice batch with borders. // We could merge inserted portions alltogether and slice result with filtered borders to prevent intersections. - borders[0] = 0; + borders[granuleMark] = 0; TVector<TPortionInfo> tmp; tmp.reserve(portions.size()); for (auto& portionInfo : portions) { + ui64 curPortion = portionInfo.Portion(); + // Prevent merge of compacted portions with no intersections - if (filtered.count(portionInfo.Portion())) { + if (filtered.count(curPortion)) { auto start = portionInfo.PkStart(); Y_VERIFY(start); - ui64 ts = static_cast<const arrow::TimestampScalar&>(*start).value; - borders[ts] = 0; - // No need to add its end + borders[TMark(start)] = 0; } else { - // Merge good compacted portion with intersections but prevent its unneeded growth - if (goodCompacted.count(portionInfo.Portion())) { - // Add "first after end" border but do not add start: allow to merge with older or intersected data. - // Do not add start to prevent [good] [small] [good] portions pattern. - auto end = portionInfo.PkEnd(); - Y_VERIFY(end); - ui64 ts = static_cast<const arrow::TimestampScalar&>(*end).value + 1; - borders[ts] = 0; + // nextToGood borders potentially split good compacted portions into 2 parts: + // the first one without intersections and the second with them + if (goodCompacted.count(curPortion) || nextToGood.count(curPortion)) { + auto start = portionInfo.PkStart(); + Y_VERIFY(start); + borders[TMark(start)] = 0; } tmp.emplace_back(std::move(portionInfo)); @@ -290,7 +299,7 @@ bool InitInGranuleMerge(TVector<TPortionInfo>& portions, const TCompactionLimits tmp.swap(portions); if (borders.size() == 1) { - Y_VERIFY(borders.begin()->first == 0); + Y_VERIFY(borders.begin()->first == granuleMark); borders.clear(); } @@ -319,30 +328,29 @@ inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out; if (tsGranules.size() == 1) { - Y_VERIFY(tsGranules.begin()->first == 0); + //Y_VERIFY(tsGranules.begin()->first.IsDefault()); ui64 granule = tsGranules.begin()->second; out.emplace(granule, batch); } else { - std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch); + auto keyColumn = GetFirstPKColumn(indexInfo, batch); Y_VERIFY(keyColumn && keyColumn->length() > 0); - TVector<i64> borders; + TVector<TMark> borders; borders.reserve(tsGranules.size()); for (auto& [ts, granule] : tsGranules) { borders.push_back(ts); } ui32 i = 0; - int offset = 0; + i64 offset = 0; for (auto& [ts, granule] : tsGranules) { - int end = keyColumn->length(); + i64 end = keyColumn->length(); if (i < borders.size() - 1) { - i64 border = borders[i + 1]; - const auto* ptr = keyColumn->raw_values(); - end = std::lower_bound(ptr + offset, ptr + keyColumn->length(), border) - ptr; + TMark border = borders[i + 1]; + end = NArrow::LowerBound(keyColumn, *border.Border, offset); } - int size = end - offset; + i64 size = end - offset; if (size) { Y_VERIFY(size > 0); Y_VERIFY(!out.count(granule)); @@ -359,17 +367,19 @@ inline THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranulesImpl } THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules, +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, + const TMap<TMark, ui64>& markGranules, const TIndexInfo& indexInfo) { - return SliceIntoGranulesImpl(batch, tsGranules, indexInfo); + return SliceIntoGranulesImpl(batch, markGranules, indexInfo); } THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules, +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, + const std::vector<std::pair<TMark, ui64>>& markGranules, const TIndexInfo& indexInfo) { - return SliceIntoGranulesImpl(batch, tsGranules, indexInfo); + return SliceIntoGranulesImpl(batch, markGranules, indexInfo); } TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, const TCompactionLimits& limits) @@ -384,8 +394,12 @@ TColumnEngineForLogs::TColumnEngineForLogs(TIndexInfo&& info, ui64 tabletId, con /// * apply PK predicate before REPLACE IndexInfo.SetAllKeys(IndexInfo.GetPK(), {0}); + auto& indexKey = IndexInfo.GetIndexKey(); + Y_VERIFY(indexKey->num_fields() == 1); + MarkType = indexKey->field(0)->type(); + ui32 indexId = IndexInfo.GetId(); - GranulesTable = std::make_shared<TGranulesTable>(indexId); + GranulesTable = std::make_shared<TGranulesTable>(*this, indexId); ColumnsTable = std::make_shared<TColumnsTable>(indexId); CountersTable = std::make_shared<TCountersTable>(indexId); } @@ -564,7 +578,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, const THashSet<ui64>& pathsToDro for (auto& emptyGranules : EmptyGranuleTracks(pathId)) { // keep first one => megre, keep nothing => drop. bool keepFirst = !pathsToDrop.count(pathId); - for (auto& [ts, granule] : emptyGranules) { + for (auto& [mark, granule] : emptyGranules) { if (keepFirst) { keepFirst = false; continue; @@ -574,7 +588,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, const THashSet<ui64>& pathsToDro auto spg = Granules[granule]; Y_VERIFY(spg); GranulesTable->Erase(db, spg->Record); - EraseGranule(pathId, granule, ts); + EraseGranule(pathId, granule, mark); } } } @@ -627,7 +641,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<TInsertedData>&& dataToIndex) { Y_VERIFY(dataToIndex.size()); - auto changes = std::make_shared<TChanges>(std::move(dataToIndex), Limits); + auto changes = std::make_shared<TChanges>(*this, std::move(dataToIndex), Limits); ui32 reserveGranules = 0; changes->InitSnapshot = LastSnapshot; @@ -669,7 +683,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: const TSnapshot& outdatedSnapshot) { Y_VERIFY(info); - auto changes = std::make_shared<TChanges>(std::move(info), Limits); + auto changes = std::make_shared<TChanges>(*this, std::move(info), Limits); changes->InitSnapshot = LastSnapshot; Y_VERIFY(changes->CompactionInfo->Granules.size() == 1); @@ -697,17 +711,17 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: ui64 pathId = spg->Record.PathId; Y_VERIFY(PathGranules.count(pathId)); - for (const auto& [ts, pathGranule] : PathGranules[pathId]) { + for (const auto& [mark, pathGranule] : PathGranules[pathId]) { if (pathGranule == granule) { - changes->SrcGranule = {pathId, granule, ts}; + changes->SrcGranule = TChanges::TSrcGranule(pathId, granule, mark); break; } } - Y_VERIFY(changes->SrcGranule.Good()); + Y_VERIFY(changes->SrcGranule); if (changes->CompactionInfo->InGranule) { TSnapshot completedSnap = (outdatedSnapshot < LastSnapshot) ? LastSnapshot : outdatedSnapshot; - if (!InitInGranuleMerge(portions, Limits, completedSnap, changes->MergeBorders)) { + if (!InitInGranuleMerge(changes->SrcGranule->Mark, portions, Limits, completedSnap, changes->MergeBorders)) { return {}; } } else { @@ -720,7 +734,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) { - auto changes = std::make_shared<TChanges>(snapshot, Limits); + auto changes = std::make_shared<TChanges>(*this, snapshot, Limits); // Add all portions from dropped paths THashSet<ui64> dropPortions; @@ -773,7 +787,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash } TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot - auto changes = std::make_shared<TChanges>(TColumnEngineChanges::TTL, fakeSnapshot); + auto changes = std::make_shared<TChanges>(*this, TColumnEngineChanges::TTL, fakeSnapshot); ui64 evicttionSize = 0; bool allowEviction = true; ui64 dropBlobs = 0; @@ -836,13 +850,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash return changes; } -TVector<TVector<std::pair<ui64, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const { +TVector<TVector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks(ui64 pathId) const { Y_VERIFY(PathGranules.count(pathId)); const auto& pathGranules = PathGranules.find(pathId)->second; - TVector<TVector<std::pair<ui64, ui64>>> emptyGranules; + TVector<TVector<std::pair<TMark, ui64>>> emptyGranules; ui64 emptyStart = 0; - for (const auto& [ts, granule]: pathGranules) { + for (const auto& [mark, granule]: pathGranules) { Y_VERIFY(Granules.count(granule)); auto spg = Granules.find(granule)->second; Y_VERIFY(spg); @@ -852,7 +866,7 @@ TVector<TVector<std::pair<ui64, ui64>>> TColumnEngineForLogs::EmptyGranuleTracks emptyGranules.push_back({}); emptyStart = granule; } - emptyGranules.back().emplace_back(ts, granule); + emptyGranules.back().emplace_back(mark, granule); } else if (emptyStart) { emptyStart = 0; } @@ -923,10 +937,10 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE // Set x-snapshot to switched portions if (changes->IsCompaction()) { - Y_VERIFY(changes->SrcGranule.Good()); + Y_VERIFY(changes->SrcGranule); /// @warning set granule not in split even if tx would be aborted later - GranulesInSplit.erase(changes->SrcGranule.Granule); + GranulesInSplit.erase(changes->SrcGranule->Granule); Y_VERIFY(changes->CompactionInfo); for (auto& portionInfo : changes->SwitchedPortions) { @@ -973,7 +987,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE granules[portionInfo.Granule()] = {}; } } else if (changes->IsCompaction() && !changes->CompactionInfo->InGranule) { - granules[changes->SrcGranule.Granule] = {}; + granules[changes->SrcGranule->Granule] = {}; } else { for (auto& portionInfo : changes->AppendedPortions) { granules[portionInfo.Granule()] = {}; @@ -993,13 +1007,13 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, if (changes.CompactionInfo->InGranule) { #if 0 if (changes.SwitchedPortions.size() <= changes.AppendedPortions.size()) { - LOG_S_ERROR("Cannot compact granule " << changes.SrcGranule.Granule << " at tablet " << TabletId); + LOG_S_ERROR("Cannot compact granule " << changes.SrcGranule->Granule << " at tablet " << TabletId); return false; } #endif } else { if (changes.NewGranules.empty()) { - LOG_S_ERROR("Cannot split granule " << changes.SrcGranule.Granule << " at tablet " << TabletId); + LOG_S_ERROR("Cannot split granule " << changes.SrcGranule->Granule << " at tablet " << TabletId); return false; } } @@ -1012,9 +1026,8 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, for (auto& [granule, p] : changes.NewGranules) { ui64 pathId = p.first; - ui64 ts = p.second; - TString key((const char*)&ts, sizeof(ts)); - TGranuleRecord rec{pathId, key, granule, snapshot}; + TMark mark = p.second; + TGranuleRecord rec(pathId, granule, snapshot, mark.Border); if (!SetGranule(rec, apply)) { LOG_S_ERROR("Cannot insert granule " << rec << " at tablet " << TabletId); @@ -1038,15 +1051,15 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, return false; } - ui64 granuleMinTs = ExtractKey(Granules[granule]->Record.IndexKey); + auto& granuleStart = Granules[granule]->Record.Mark; if (!apply) { // granule vs portion minPK - auto pkScalar = portionInfo.PkStart(); - Y_VERIFY(pkScalar); - ui64 portionMinTs = ExtractKey(*pkScalar); - if (granuleMinTs > portionMinTs) { - LOG_S_ERROR("Cannot update invalid portion " << portionInfo << " minTs: " << portionMinTs - << " granule minTs: " << granuleMinTs << " at tablet " << TabletId); + auto portionStart = portionInfo.PkStart(); + Y_VERIFY(portionStart); + if (TMark(portionStart) < TMark(granuleStart)) { + LOG_S_ERROR("Cannot update invalid portion " << portionInfo + << " start: " << portionStart->ToString() + << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId); return false; } } @@ -1152,18 +1165,17 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } // granule vs portion minPK - ui64 granuleMinTs = 0; + std::shared_ptr<arrow::Scalar> granuleStart; if (Granules.count(granule)) { - granuleMinTs = ExtractKey(Granules[granule]->Record.IndexKey); + granuleStart = Granules[granule]->Record.Mark; } else { - granuleMinTs = changes.NewGranules.find(granule)->second.second; + granuleStart = changes.NewGranules.find(granule)->second.second.Border; } - auto pkScalar = portionInfo.PkStart(); - Y_VERIFY(pkScalar); - ui64 portionMinTs = ExtractKey(*pkScalar); - if (granuleMinTs > portionMinTs) { - LOG_S_ERROR("Cannot insert invalid portion " << portionInfo << " minTs: " << portionMinTs - << " granule minTs: " << granuleMinTs << " at tablet " << TabletId); + auto portionStart = portionInfo.PkStart(); + Y_VERIFY(portionStart); + if (TMark(portionStart) < TMark(granuleStart)) { + LOG_S_ERROR("Cannot insert invalid portion " << portionInfo << " start: " << portionStart->ToString() + << " granule start: " << granuleStart->ToString() << " at tablet " << TabletId); return false; } } @@ -1201,33 +1213,33 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) { - ui64 ts = ExtractKey(rec.IndexKey); + TMark mark(rec.Mark); if (!apply) { if (Granules.count(rec.Granule)) { return false; } - if (PathGranules.count(rec.PathId) && PathGranules[rec.PathId].count(ts)) { + if (PathGranules.count(rec.PathId) && PathGranules[rec.PathId].count(mark)) { return false; } return true; } - PathGranules[rec.PathId].emplace(ts, rec.Granule); + PathGranules[rec.PathId].emplace(mark, rec.Granule); auto& spg = Granules[rec.Granule]; Y_VERIFY(!spg); spg = std::make_shared<TGranuleMeta>(rec); return true; // It must return true if (apply == true) } -void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, ui64 ts) { +void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) { Y_VERIFY(PathGranules.count(pathId)); Y_VERIFY(Granules.count(granule)); Granules.erase(granule); EmptyGranules.erase(granule); - PathGranules[pathId].erase(ts); + PathGranules[pathId].erase(mark); } bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats) { @@ -1362,21 +1374,21 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot out->Granules.reserve(pathGranules.size()); // TODO: out.Portions.reserve() - ui64 tsFrom = ExtractTimestamp(from, GetIndexKey()); - ui64 tsTo = ExtractTimestamp(to, GetIndexKey()); + auto keyFrom = ExtractFirstKey(from, GetIndexKey()); + auto keyTo = ExtractFirstKey(to, GetIndexKey()); // Apply FROM auto it = pathGranules.begin(); - if (tsFrom) { - it = pathGranules.upper_bound(tsFrom); + if (keyFrom) { + it = pathGranules.upper_bound(TMark(keyFrom)); --it; } for (; it != pathGranules.end(); ++it) { - ui64 ts = it->first; + auto& mark = it->first; ui64 granule = it->second; // Apply TO - if (to && ts > tsTo) { + if (keyTo && TMark(keyTo) < mark) { break; } @@ -1596,29 +1608,31 @@ static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, /// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN /// @note We use ts from PK for split but there could be lots PK with the same ts. -static TVector<std::pair<ui64, std::shared_ptr<arrow::RecordBatch>>> -SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TChanges& changes, - std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, i64 ts0) { - TVector<std::pair<ui64, std::shared_ptr<arrow::RecordBatch>>> out; +static TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> +SliceGranuleBatches(const TIndexInfo& indexInfo, + const TColumnEngineForLogs::TChanges& changes, + std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches, + const TMark& ts0) { + TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out; // Extract unique effective key (timestamp) and their counts i64 numRows = 0; - TMap<ui64, ui32> uniqKeyCount; + TMap<TMark, ui32> uniqKeyCount; for (auto& batch : batches) { numRows += batch->num_rows(); - std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch); + auto keyColumn = GetFirstPKColumn(indexInfo, batch); Y_VERIFY(keyColumn && keyColumn->length() > 0); for (int pos = 0; pos < keyColumn->length(); ++pos) { - ui64 ts = keyColumn->Value(pos); + TMark ts(*keyColumn->GetScalar(pos)); ++uniqKeyCount[ts]; } } Y_VERIFY(uniqKeyCount.size()); - i64 minTs = uniqKeyCount.begin()->first; - i64 maxTs = uniqKeyCount.rbegin()->first; + auto minTs = uniqKeyCount.begin()->first; + auto maxTs = uniqKeyCount.rbegin()->first; Y_VERIFY(minTs >= ts0); // It's an estimation of needed count cause numRows calculated before key replaces @@ -1638,7 +1652,7 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh } // Make split borders from uniq keys - TVector<i64> borders; + TVector<TMark> borders; borders.reserve(numRows / rowsInGranule); { ui32 sumRows = 0; @@ -1662,15 +1676,12 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh auto& batchOffsets = offsets[i]; batchOffsets.reserve(borders.size() + 1); - std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch); + auto keyColumn = GetFirstPKColumn(indexInfo, batch); Y_VERIFY(keyColumn && keyColumn->length() > 0); batchOffsets.push_back(0); - for (i64 border : borders) { - const auto* start = keyColumn->raw_values() + batchOffsets.back(); - const auto* end = keyColumn->raw_values() + keyColumn->length(); - const auto* borderPos = std::lower_bound(start, end, border); - int offset = borderPos - keyColumn->raw_values(); + for (auto& border : borders) { + int offset = NArrow::LowerBound(keyColumn, *border.Border, batchOffsets.back()); Y_VERIFY(offset >= batchOffsets.back()); batchOffsets.push_back(offset); } @@ -1702,16 +1713,17 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh Y_VERIFY(slice->num_rows()); granuleNumRows += slice->num_rows(); #if 1 // Check correctness - std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, slice); + auto keyColumn = GetFirstPKColumn(indexInfo, slice); Y_VERIFY(keyColumn && keyColumn->length() > 0); - i64 startKey = granuleNo ? borders[granuleNo - 1] : minTs; - Y_VERIFY(keyColumn->Value(0) >= startKey); + auto startKey = granuleNo ? borders[granuleNo - 1] : minTs; + Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey); + if (granuleNo < borders.size() - 1) { - i64 endKey = borders[granuleNo]; - Y_VERIFY(keyColumn->Value(keyColumn->length() - 1) < endKey); + auto endKey = borders[granuleNo]; + Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) < endKey); } else { - Y_VERIFY(keyColumn->Value(keyColumn->length() - 1) <= maxTs); + Y_VERIFY(TMark(*keyColumn->GetScalar(keyColumn->length() - 1)) <= maxTs); } #endif Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey())); @@ -1727,14 +1739,14 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh for (auto& batch : merged) { Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey())); - i64 startKey = ts0; + auto startKey = ts0; if (granuleNo) { startKey = borders[granuleNo - 1]; } #if 1 // Check correctness - std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch); + auto keyColumn = GetFirstPKColumn(indexInfo, batch); Y_VERIFY(keyColumn && keyColumn->length() > 0); - Y_VERIFY(keyColumn->Value(0) >= startKey); + Y_VERIFY(TMark(*keyColumn->GetScalar(0)) >= startKey); #endif out.emplace_back(startKey, batch); } @@ -1743,8 +1755,10 @@ SliceGranuleBatches(const TIndexInfo& indexInfo, const TColumnEngineForLogs::TCh return out; } -static ui64 TryMovePortions(TVector<TPortionInfo>& portions, TMap<ui64, ui64>& tsIds, - TVector<std::pair<TPortionInfo, ui64>>& toMove, i64 ts0) { +static ui64 TryMovePortions(TVector<TPortionInfo>& portions, + TMap<TMark, ui64>& tsIds, + TVector<std::pair<TPortionInfo, ui64>>& toMove, + const TMark& ts0) { std::vector<const TPortionInfo*> compacted; compacted.reserve(portions.size()); std::vector<const TPortionInfo*> inserted; @@ -1776,9 +1790,9 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions, TMap<ui64, ui64>& t ui64 numRows = 0; ui32 counter = 0; for (auto* portionInfo : compacted) { - ui64 ts = ts0; + TMark ts = ts0; if (counter) { - ts = static_cast<const arrow::TimestampScalar&>(*portionInfo->PkStart()).value; + ts = TMark(portionInfo->PkStart()); } ui32 rows = portionInfo->NumRows(); @@ -1801,12 +1815,12 @@ static ui64 TryMovePortions(TVector<TPortionInfo>& portions, TMap<ui64, ui64>& t static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineForLogs::TChanges> changes) { - ui64 pathId = changes->SrcGranule.PathId; - //ui64 granule = changes->SrcGranule.Granule; - ui64 ts0 = changes->SrcGranule.Ts; + ui64 pathId = changes->SrcGranule->PathId; + //ui64 granule = changes->SrcGranule->Granule; + TMark ts0 = changes->SrcGranule->Mark; TVector<TPortionInfo>& portions = changes->SwitchedPortions; - TMap<ui64, ui64> tsIds; + TMap<TMark, ui64> tsIds; ui64 movedRows = TryMovePortions(portions, tsIds, changes->PortionsToMove, ts0); auto srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, (bool)movedRows); Y_VERIFY(srcBatches.size() == portions.size()); @@ -1829,8 +1843,8 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, ui32 rowsInGranule = numRows / numSplitInto; Y_VERIFY(rowsInGranule); - TMap<ui64, ui64> newTsIds; - ui64 lastTs = tsIds.rbegin()->first; + TMap<TMark, ui64> newTsIds; + auto lastTs = tsIds.rbegin()->first; ui32 tmpGranule = 0; ui32 sumRows = 0; ui32 i = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 133a62ddf16..a09710ad213 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -1,6 +1,7 @@ #pragma once #include "defs.h" #include "column_engine.h" +#include "scalars.h" namespace NKikimr::NArrow { struct TSortDescription; @@ -18,39 +19,127 @@ class TCountersTable; /// - Columns: granule -> blobs class TColumnEngineForLogs : public IColumnEngine { public: + struct TMark { + std::shared_ptr<arrow::Scalar> Border; + + explicit TMark(const std::shared_ptr<arrow::Scalar>& s) + : Border(s) + { + Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border)); + } + + explicit TMark(const std::shared_ptr<arrow::DataType>& type) + : Border(MinScalar(type)) + { + Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border)); + } + + TMark(const TString& key, const std::shared_ptr<arrow::DataType>& type) { + Deserialize(key, type); + Y_VERIFY_DEBUG(NArrow::IsGoodScalar(Border)); + } + + TMark(const TMark& m) = default; + TMark& operator = (const TMark& m) = default; + + bool operator == (const TMark& m) const { + Y_VERIFY(Border); + Y_VERIFY(m.Border); + return Border->Equals(*m.Border); + } + + bool operator < (const TMark& m) const { + Y_VERIFY(Border); + Y_VERIFY(m.Border); + return NArrow::ScalarLess(*Border, *m.Border); + } + + bool operator <= (const TMark& m) const { + Y_VERIFY(Border); + Y_VERIFY(m.Border); + return Border->Equals(*m.Border) || NArrow::ScalarLess(*Border, *m.Border); + } + + bool operator > (const TMark& m) const { + return !(*this <= m); + } + + bool operator >= (const TMark& m) const { + return !(*this < m); + } + + ui64 Hash() const { + return Border->hash(); + } + + operator size_t () const { + return Hash(); + } + + operator bool () const { + Y_VERIFY(false); + } + + TString Serialize() const { + return SerializeKeyScalar(Border); + } + + void Deserialize(const TString& key, const std::shared_ptr<arrow::DataType>& type) { + Border = DeserializeKeyScalar(key, type); + } + + static std::shared_ptr<arrow::Scalar> MinScalar(const std::shared_ptr<arrow::DataType>& type) { + if (type->id() == arrow::Type::TIMESTAMP) { + // TODO: support negative timestamps in index + return std::make_shared<arrow::TimestampScalar>(0, type); + } + return NArrow::MinScalar(type); + } + }; + class TChanges : public TColumnEngineChanges { public: struct TSrcGranule { ui64 PathId{0}; ui64 Granule{0}; - ui64 Ts{0}; + TMark Mark; - bool Good() const { return PathId && Granule; } + TSrcGranule(ui64 pathId, ui64 granule, const TMark& mark) + : PathId(pathId), Granule(granule), Mark(mark) + {} }; - TChanges(TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits) + TChanges(const TColumnEngineForLogs& engine, + TVector<NOlap::TInsertedData>&& blobsToIndex, const TCompactionLimits& limits) : TColumnEngineChanges(TColumnEngineChanges::INSERT) + , DefaultMark(engine.GetMarkType()) { Limits = limits; DataToIndex = std::move(blobsToIndex); } - TChanges(std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits) + TChanges(const TColumnEngineForLogs& engine, + std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits) : TColumnEngineChanges(TColumnEngineChanges::COMPACTION) + , DefaultMark(engine.GetMarkType()) { Limits = limits; CompactionInfo = std::move(info); } - TChanges(const TSnapshot& snapshot, const TCompactionLimits& limits) + TChanges(const TColumnEngineForLogs& engine, + const TSnapshot& snapshot, const TCompactionLimits& limits) : TColumnEngineChanges(TColumnEngineChanges::CLEANUP) + , DefaultMark(engine.GetMarkType()) { Limits = limits; InitSnapshot = snapshot; } - TChanges(TColumnEngineChanges::EType type, const TSnapshot& applySnapshot) + TChanges(const TColumnEngineForLogs& engine, + TColumnEngineChanges::EType type, const TSnapshot& applySnapshot) : TColumnEngineChanges(type) + , DefaultMark(engine.GetMarkType()) { ApplySnapshot = applySnapshot; } @@ -64,31 +153,31 @@ public: ui64 granule = FirstGranuleId; ++FirstGranuleId; --ReservedGranuleIds; - ui64 ts = 0; - NewGranules.emplace(granule, std::make_pair(pathId, ts)); - PathToGranule[pathId].emplace_back(ts, granule); + + NewGranules.emplace(granule, std::make_pair(pathId, DefaultMark)); + PathToGranule[pathId].emplace_back(DefaultMark, granule); return true; } - ui64 SetTmpGranule(ui64 pathId, ui64 ts) { - Y_VERIFY(pathId == SrcGranule.PathId); - if (!TmpGranuleIds.count(ts)) { - TmpGranuleIds[ts] = FirstGranuleId; + ui64 SetTmpGranule(ui64 pathId, const TMark& mark) { + Y_VERIFY(pathId == SrcGranule->PathId); + if (!TmpGranuleIds.count(mark)) { + TmpGranuleIds[mark] = FirstGranuleId; ++FirstGranuleId; } - return TmpGranuleIds[ts]; + return TmpGranuleIds[mark]; } THashMap<ui64, ui64> TmpToNewGranules(ui64 start) { THashMap<ui64, ui64> granuleRemap; - for (auto& [ts, counter] : TmpGranuleIds) { + for (auto& [mark, counter] : TmpGranuleIds) { ui64 granule = start + counter; - if (ts == SrcGranule.Ts) { + if (mark == SrcGranule->Mark) { Y_VERIFY(!counter); - granule = SrcGranule.Granule; + granule = SrcGranule->Granule; } else { Y_VERIFY(counter); - NewGranules[granule] = {SrcGranule.PathId, ts}; + NewGranules.emplace(granule, std::make_pair(SrcGranule->PathId, mark)); } granuleRemap[counter] = granule; } @@ -105,11 +194,12 @@ public: return numSplitInto; } - THashMap<ui64, std::vector<std::pair<ui64, ui64>>> PathToGranule; // pathId -> {timestamp, granule} - TSrcGranule SrcGranule; - THashMap<ui64, std::pair<ui64, ui64>> NewGranules; // granule -> {pathId, key} - THashMap<ui64, ui32> TmpGranuleIds; // ts -> tmp granule id - TMap<ui64, ui64> MergeBorders; + TMark DefaultMark; + THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule} + std::optional<TSrcGranule> SrcGranule; + THashMap<ui64, std::pair<ui64, TMark>> NewGranules; // granule -> {pathId, key} + THashMap<TMark, ui32> TmpGranuleIds; // mark -> tmp granule id + TMap<TMark, ui64> MergeBorders; ui64 FirstGranuleId{0}; ui32 ReservedGranuleIds{0}; }; @@ -138,6 +228,19 @@ public: bool HasOverloadedGranules() const override { return !PathsGranulesOverloaded.empty(); } + TString SerializeMark(const std::shared_ptr<arrow::Scalar>& scalar) const override { + Y_VERIFY_S(scalar->type->Equals(MarkType), scalar->type->ToString() + ", expected " + MarkType->ToString()); + return TMark(scalar).Serialize(); + } + + std::shared_ptr<arrow::Scalar> DeserializeMark(const TString& key) const override { + return TMark(key, MarkType).Border; + } + + const std::shared_ptr<arrow::DataType>& GetMarkType() const { + return MarkType; + } + bool Load(IDbWrapper& db, const THashSet<ui64>& pathsToDrop = {}) override; std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) override; std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, @@ -172,15 +275,6 @@ public: /// @note called from EvictionActor static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes); - static ui64 ExtractKey(const TString& key) { - Y_VERIFY(key.size() == 8); - return *reinterpret_cast<const ui64*>(key.data()); - } - - static ui64 ExtractKey(const arrow::Scalar& scalar) { - return static_cast<const arrow::TimestampScalar&>(scalar).value; - } - private: struct TGranuleMeta { TGranuleRecord Record; @@ -206,11 +300,12 @@ private: TIndexInfo IndexInfo; TCompactionLimits Limits; ui64 TabletId; + std::shared_ptr<arrow::DataType> MarkType; std::shared_ptr<TGranulesTable> GranulesTable; std::shared_ptr<TColumnsTable> ColumnsTable; std::shared_ptr<TCountersTable> CountersTable; THashMap<ui64, std::shared_ptr<TGranuleMeta>> Granules; // granule -> meta - THashMap<ui64, TMap<ui64, ui64>> PathGranules; // path_id -> {timestamp, granule} + THashMap<ui64, TMap<TMark, ui64>> PathGranules; // path_id -> {mark, granule} TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id THashSet<ui64> GranulesInSplit; THashSet<ui64> EmptyGranules; @@ -243,7 +338,7 @@ private: bool LoadCounters(IDbWrapper& db); bool ApplyChanges(IDbWrapper& db, const TChanges& changes, const TSnapshot& snapshot, bool apply); - void EraseGranule(ui64 pathId, ui64 granule, ui64 ts); + void EraseGranule(ui64 pathId, ui64 granule, const TMark& mark); bool SetGranule(const TGranuleRecord& rec, bool apply); bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); @@ -256,17 +351,19 @@ private: TMap<TSnapshot, TVector<ui64>> GetOrderedPortions(ui64 granule, const TSnapshot& snapshot = TSnapshot::Max()) const; void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules); - TVector<TVector<std::pair<ui64, ui64>>> EmptyGranuleTracks(ui64 pathId) const; + TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(ui64 pathId) const; }; -std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TIndexInfo& indexInfo, - const std::shared_ptr<arrow::RecordBatch>& batch); +std::shared_ptr<arrow::Array> GetFirstPKColumn(const TIndexInfo& indexInfo, + const std::shared_ptr<arrow::RecordBatch>& batch); THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const TMap<ui64, ui64>& tsGranules, +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, + const TMap<TColumnEngineForLogs::TMark, ui64>& tsGranules, const TIndexInfo& indexInfo); THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> -SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::pair<ui64, ui64>>& tsGranules, +SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch, + const std::vector<std::pair<TColumnEngineForLogs::TMark, ui64>>& tsGranules, const TIndexInfo& indexInfo); } diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 89352303602..a93981f4f85 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -42,19 +42,19 @@ bool TDbWrapper::Load(THashMap<TWriteId, TInsertedData>& inserted, return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, inserted, committed, aborted, loadTime); } -void TDbWrapper::WriteGranule(ui32 index, const TGranuleRecord& row) { +void TDbWrapper::WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) { NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexGranules_Write(db, index, row); + NColumnShard::Schema::IndexGranules_Write(db, index, engine, row); } -void TDbWrapper::EraseGranule(ui32 index, const TGranuleRecord& row) { +void TDbWrapper::EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) { NIceDb::TNiceDb db(Database); - NColumnShard::Schema::IndexGranules_Erase(db, index, row); + NColumnShard::Schema::IndexGranules_Erase(db, index, engine, row); } -bool TDbWrapper::LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) { +bool TDbWrapper::LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) { NIceDb::TNiceDb db(Database); - return NColumnShard::Schema::IndexGranules_Load(db, index, callback); + return NColumnShard::Schema::IndexGranules_Load(db, index, engine, callback); } void TDbWrapper::WriteColumn(ui32 index, const TColumnRecord& row) { diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 0b117c291b7..632201ea998 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -10,6 +10,7 @@ namespace NKikimr::NOlap { struct TInsertedData; struct TColumnRecord; struct TGranuleRecord; +class IColumnEngine; class IDbWrapper { public: @@ -27,9 +28,9 @@ public: THashMap<TWriteId, TInsertedData>& aborted, const TInstant& loadTime) = 0; - virtual void WriteGranule(ui32 index, const TGranuleRecord& row) = 0; - virtual void EraseGranule(ui32 index, const TGranuleRecord& row) = 0; - virtual bool LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) = 0; + virtual void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0; + virtual void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) = 0; + virtual bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) = 0; virtual void WriteColumn(ui32 index, const TColumnRecord& row) = 0; virtual void EraseColumn(ui32 index, const TColumnRecord& row) = 0; @@ -58,9 +59,9 @@ public: THashMap<TWriteId, TInsertedData>& aborted, const TInstant& loadTime) override; - void WriteGranule(ui32 index, const TGranuleRecord& row) override; - void EraseGranule(ui32 index, const TGranuleRecord& row) override; - bool LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) override; + void WriteGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; + void EraseGranule(ui32 index, const IColumnEngine& engine, const TGranuleRecord& row) override; + bool LoadGranules(ui32 index, const IColumnEngine& engine, std::function<void(TGranuleRecord&&)> callback) override; void WriteColumn(ui32 index, const TColumnRecord& row) override; void EraseColumn(ui32 index, const TColumnRecord& row) override; diff --git a/ydb/core/tx/columnshard/engines/granules_table.h b/ydb/core/tx/columnshard/engines/granules_table.h index 1d955fdf6c1..8940a7d3ce5 100644 --- a/ydb/core/tx/columnshard/engines/granules_table.h +++ b/ydb/core/tx/columnshard/engines/granules_table.h @@ -5,12 +5,21 @@ namespace NKikimr::NOlap { struct TGranuleRecord { ui64 PathId; - TString IndexKey; ui64 Granule; TSnapshot CreatedAt; + std::shared_ptr<arrow::Scalar> Mark; + + TGranuleRecord(ui64 pathId, ui64 granule, const TSnapshot& createdAt, const std::shared_ptr<arrow::Scalar>& mark) + : PathId(pathId) + , Granule(granule) + , CreatedAt(createdAt) + , Mark(mark) + { + Y_VERIFY(Mark); + } bool operator == (const TGranuleRecord& rec) const { - return (PathId == rec.PathId) && (IndexKey == rec.IndexKey); + return (PathId == rec.PathId) && (Mark->Equals(*rec.Mark)); } friend IOutputStream& operator << (IOutputStream& out, const TGranuleRecord& rec) { @@ -25,23 +34,25 @@ struct TGranuleRecord { class TGranulesTable { public: - TGranulesTable(ui32 indexId) - : IndexId(indexId) + TGranulesTable(const IColumnEngine& engine, ui32 indexId) + : Engine(engine) + , IndexId(indexId) {} void Write(IDbWrapper& db, const TGranuleRecord& row) { - db.WriteGranule(IndexId, row); + db.WriteGranule(IndexId, Engine, row); } void Erase(IDbWrapper& db, const TGranuleRecord& row) { - db.EraseGranule(IndexId, row); + db.EraseGranule(IndexId, Engine, row); } bool Load(IDbWrapper& db, std::function<void(TGranuleRecord&&)> callback) { - return db.LoadGranules(IndexId, callback); + return db.LoadGranules(IndexId, Engine, callback); } private: + const IColumnEngine& Engine; ui32 IndexId; }; diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index e7400f255ab..45c601d7c0d 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -10,82 +10,6 @@ namespace NKikimr::NOlap { const TString TIndexInfo::STORE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::StorePrimaryIndexStatsName; const TString TIndexInfo::TABLE_INDEX_STATS_TABLE = TString("/") + NSysView::SysPathName + "/" + NSysView::TablePrimaryIndexStatsName; -void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value) { - switch (scalar.type->id()) { - case arrow::Type::BOOL: - value.SetBool(static_cast<const arrow::BooleanScalar&>(scalar).value); - break; - case arrow::Type::UINT8: - value.SetUint32(static_cast<const arrow::UInt8Scalar&>(scalar).value); - break; - case arrow::Type::UINT16: - value.SetUint32(static_cast<const arrow::UInt16Scalar&>(scalar).value); - break; - case arrow::Type::UINT32: - value.SetUint32(static_cast<const arrow::UInt32Scalar&>(scalar).value); - break; - case arrow::Type::UINT64: - value.SetUint64(static_cast<const arrow::UInt64Scalar&>(scalar).value); - break; - case arrow::Type::INT8: - value.SetInt32(static_cast<const arrow::Int8Scalar&>(scalar).value); - break; - case arrow::Type::INT16: - value.SetInt32(static_cast<const arrow::Int16Scalar&>(scalar).value); - break; - case arrow::Type::INT32: - value.SetInt32(static_cast<const arrow::Int32Scalar&>(scalar).value); - break; - case arrow::Type::INT64: - value.SetInt64(static_cast<const arrow::Int64Scalar&>(scalar).value); - break; - case arrow::Type::FLOAT: - value.SetFloat(static_cast<const arrow::FloatScalar&>(scalar).value); - break; - case arrow::Type::DOUBLE: - value.SetDouble(static_cast<const arrow::DoubleScalar&>(scalar).value); - break; - case arrow::Type::TIMESTAMP: - value.SetUint64(static_cast<const arrow::TimestampScalar&>(scalar).value); - break; - default: - Y_VERIFY(false, "Some types are not supported in min-max index yet"); // TODO - } -} - -std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value, - const std::shared_ptr<arrow::DataType>& type) { - switch (type->id()) { - case arrow::Type::BOOL: - return std::make_shared<arrow::BooleanScalar>(value.GetBool()); - case arrow::Type::UINT8: - return std::make_shared<arrow::UInt8Scalar>(value.GetUint32()); - case arrow::Type::UINT16: - return std::make_shared<arrow::UInt16Scalar>(value.GetUint32()); - case arrow::Type::UINT32: - return std::make_shared<arrow::UInt32Scalar>(value.GetUint32()); - case arrow::Type::UINT64: - return std::make_shared<arrow::UInt64Scalar>(value.GetUint64()); - case arrow::Type::INT8: - return std::make_shared<arrow::Int8Scalar>(value.GetInt32()); - case arrow::Type::INT16: - return std::make_shared<arrow::Int16Scalar>(value.GetInt32()); - case arrow::Type::INT32: - return std::make_shared<arrow::Int32Scalar>(value.GetInt32()); - case arrow::Type::INT64: - return std::make_shared<arrow::Int64Scalar>(value.GetInt64()); - case arrow::Type::FLOAT: - return std::make_shared<arrow::FloatScalar>(value.GetFloat()); - case arrow::Type::DOUBLE: - return std::make_shared<arrow::DoubleScalar>(value.GetDouble()); - case arrow::Type::TIMESTAMP: - return std::make_shared<arrow::TimestampScalar>(value.GetUint64(), type); - default: - Y_VERIFY(false, "Some types are not supported in min-max index yet"); // TODO - } - return {}; -} - TVector<TRawTypeValue> TIndexInfo::ExtractKey(const THashMap<ui32, TCell>& fields, bool allowNulls) const { TVector<TRawTypeValue> key; key.reserve(KeyColumns.size()); @@ -107,15 +31,16 @@ TVector<TRawTypeValue> TIndexInfo::ExtractKey(const THashMap<ui32, TCell>& field std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& data, const TString& metadata, TString& strError) const { std::shared_ptr<arrow::Schema> schema = ArrowSchema(); + std::shared_ptr<arrow::Schema> differentSchema; if (metadata.size()) { - schema = NArrow::DeserializeSchema(metadata); - if (!schema) { + differentSchema = NArrow::DeserializeSchema(metadata); + if (!differentSchema) { strError = "DeserializeSchema() failed"; return {}; } } - auto batch = NArrow::DeserializeBatch(data, schema); + auto batch = NArrow::DeserializeBatch(data, (differentSchema ? differentSchema : schema)); if (!batch) { strError = "DeserializeBatch() failed"; return {}; @@ -124,23 +49,9 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& strError = "empty batch"; return {}; } - auto status = batch->ValidateFull(); - if (!status.ok()) { - auto tmp = status.ToString(); - strError = TString(tmp.data(), tmp.size()); - return {}; - } - - // Require all the columns for now. It's possible to ommit some in future. - for (auto& field : schema->fields()) { - if (!batch->GetColumnByName(field->name())) { - strError = "missing column '" + field->name() + "'"; - return {}; - } - } // Correct schema - if (metadata.size()) { + if (differentSchema) { batch = NArrow::ExtractColumns(batch, ArrowSchema()); if (!batch) { strError = "cannot correct schema"; @@ -148,6 +59,11 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& } } + if (!batch->schema()->Equals(ArrowSchema())) { + strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'"; + return {}; + } + // Check PK is NOT NULL for (auto& field : SortingKey->fields()) { auto column = batch->GetColumnByName(field->name()); @@ -161,6 +77,13 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& } } + auto status = batch->ValidateFull(); + if (!status.ok()) { + auto tmp = status.ToString(); + strError = TString(tmp.data(), tmp.size()); + return {}; + } + Y_VERIFY(SortingKey); batch = NArrow::SortBatch(batch, SortingKey); Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortingKey)); diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 7484a05c9a9..218c8dfbbf0 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -1,7 +1,7 @@ #pragma once #include "defs.h" +#include "scalars.h" #include <ydb/core/tablet_flat/flat_dbase_scheme.h> -#include <ydb/core/protos/tx_columnshard.pb.h> #include <ydb/core/sys_view/common/schema.h> namespace arrow { @@ -16,10 +16,6 @@ namespace NKikimr::NArrow { namespace NKikimr::NOlap { -void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value); -std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value, - const std::shared_ptr<arrow::DataType>& type); - template <typename T> static std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const T& ids) { std::vector<std::shared_ptr<arrow::Field>> fields; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 4b98d5329cc..7f1594d9a97 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -12,6 +12,8 @@ namespace NKikimr::NOlap { namespace { +using TMark = TColumnEngineForLogs::TMark; + // Slices a batch into smaller batches and appends them to result vector (which might be non-empty already) void SliceBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const int64_t maxRowsInBatch, @@ -61,14 +63,14 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v TVector<TVector<std::shared_ptr<arrow::RecordBatch>>> rangesSlices; // rangesSlices[rangeNo][sliceNo] rangesSlices.reserve(batches.size()); { - TMap<ui64, TVector<std::shared_ptr<arrow::RecordBatch>>> points; + TMap<TMark, TVector<std::shared_ptr<arrow::RecordBatch>>> points; for (auto& batch : batches) { - std::shared_ptr<arrow::TimestampArray> keyColumn = GetTimestampColumn(indexInfo, batch); + std::shared_ptr<arrow::Array> keyColumn = GetFirstPKColumn(indexInfo, batch); Y_VERIFY(keyColumn && keyColumn->length() > 0); - ui64 min = keyColumn->Value(0); - ui64 max = keyColumn->Value(keyColumn->length() - 1); + TMark min(*keyColumn->GetScalar(0)); + TMark max(*keyColumn->GetScalar(keyColumn->length() - 1)); points[min].push_back(batch); // insert start points[max].push_back({}); // insert end @@ -146,6 +148,7 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr Y_VERIFY(ReadMetadata->LoadSchema); Y_VERIFY(ReadMetadata->ResultSchema); Y_VERIFY(IndexInfo().GetSortingKey()); + Y_VERIFY(IndexInfo().GetIndexKey() && IndexInfo().GetIndexKey()->num_fields()); SortReplaceDescription = IndexInfo().SortReplaceDescription(); @@ -203,14 +206,14 @@ THashMap<TBlobRange, ui64> TIndexedReadData::InitRead(ui32 inputBatch, bool inGr // Init split by granules structs for (auto& rec : ReadMetadata->SelectInfo->Granules) { - Y_VERIFY(rec.IndexKey.size() == 8); - ui64 ts = TColumnEngineForLogs::ExtractKey(rec.IndexKey); // TODO: support other engines - TsGranules.emplace(ts, rec.Granule); + TsGranules.emplace(rec.Mark, rec.Granule); } - if (!TsGranules.count(0)) { + + TMark minMark(IndexInfo().GetIndexKey()->field(0)->type()); + if (!TsGranules.count(minMark)) { // committed data before the first granule would be placed in fake (0,0) granule // committed data after the last granule would be placed into the last granule (or here if none) - TsGranules.emplace(0, 0); + TsGranules.emplace(minMark, 0); } auto& stats = ReadMetadata->ReadStats; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index ecb594caa11..a080e42c33a 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -1,6 +1,7 @@ #pragma once #include "defs.h" #include "column_engine.h" +#include "column_engine_logs.h" // for TColumnEngineForLogs::TMark #include "predicate.h" namespace NKikimr::NColumnShard { @@ -228,7 +229,7 @@ private: THashMap<ui64, ui64> PortionGranule; // portion -> granule THashMap<ui64, ui32> GranuleWaits; // granule -> num portions to wait TDeque<ui64> GranulesOutOrder; - TMap<ui64, ui64> TsGranules; // ts (key) -> granule + TMap<TColumnEngineForLogs::TMark, ui64> TsGranules; // ts (key) -> granule THashSet<ui64> PortionsWithSelfDups; std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; diff --git a/ydb/core/tx/columnshard/engines/scalars.cpp b/ydb/core/tx/columnshard/engines/scalars.cpp new file mode 100644 index 00000000000..3b45e293649 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scalars.cpp @@ -0,0 +1,229 @@ +#include "scalars.h" +#include <ydb/core/util/yverify_stream.h> + +namespace NKikimr::NOlap { + +void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value) { + switch (scalar.type->id()) { + case arrow::Type::BOOL: + value.SetBool(static_cast<const arrow::BooleanScalar&>(scalar).value); + break; + case arrow::Type::UINT8: + value.SetUint32(static_cast<const arrow::UInt8Scalar&>(scalar).value); + break; + case arrow::Type::UINT16: + value.SetUint32(static_cast<const arrow::UInt16Scalar&>(scalar).value); + break; + case arrow::Type::UINT32: + value.SetUint32(static_cast<const arrow::UInt32Scalar&>(scalar).value); + break; + case arrow::Type::UINT64: + value.SetUint64(static_cast<const arrow::UInt64Scalar&>(scalar).value); + break; + case arrow::Type::INT8: + value.SetInt32(static_cast<const arrow::Int8Scalar&>(scalar).value); + break; + case arrow::Type::INT16: + value.SetInt32(static_cast<const arrow::Int16Scalar&>(scalar).value); + break; + case arrow::Type::INT32: + value.SetInt32(static_cast<const arrow::Int32Scalar&>(scalar).value); + break; + case arrow::Type::INT64: + value.SetInt64(static_cast<const arrow::Int64Scalar&>(scalar).value); + break; + case arrow::Type::FLOAT: + value.SetFloat(static_cast<const arrow::FloatScalar&>(scalar).value); + break; + case arrow::Type::DOUBLE: + value.SetDouble(static_cast<const arrow::DoubleScalar&>(scalar).value); + break; + case arrow::Type::DATE32: + value.SetInt32(static_cast<const arrow::Date32Scalar&>(scalar).value); + break; + case arrow::Type::DATE64: + value.SetInt64(static_cast<const arrow::Date64Scalar&>(scalar).value); + break; + case arrow::Type::TIMESTAMP: + // TODO: signed timestamps + value.SetUint64(static_cast<const arrow::TimestampScalar&>(scalar).value); + break; + case arrow::Type::TIME32: + value.SetInt32(static_cast<const arrow::Time32Scalar&>(scalar).value); + break; + case arrow::Type::TIME64: + value.SetInt64(static_cast<const arrow::Time64Scalar&>(scalar).value); + break; + case arrow::Type::INTERVAL_MONTHS: + value.SetInt32(static_cast<const arrow::MonthIntervalScalar&>(scalar).value); + break; + case arrow::Type::DURATION: + value.SetInt64(static_cast<const arrow::DurationScalar&>(scalar).value); + break; + case arrow::Type::STRING: { + auto& buffer = static_cast<const arrow::StringScalar&>(scalar).value; + value.SetText(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size())); + break; + } + case arrow::Type::LARGE_STRING: { + auto& buffer = static_cast<const arrow::LargeStringScalar&>(scalar).value; + value.SetText(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size())); + break; + } + case arrow::Type::BINARY: { + auto& buffer = static_cast<const arrow::BinaryScalar&>(scalar).value; + value.SetBytes(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size())); + break; + } + case arrow::Type::LARGE_BINARY: { + auto& buffer = static_cast<const arrow::LargeBinaryScalar&>(scalar).value; + value.SetBytes(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size())); + break; + } + case arrow::Type::FIXED_SIZE_BINARY: { + auto& buffer = static_cast<const arrow::FixedSizeBinaryScalar&>(scalar).value; + value.SetBytes(TString(reinterpret_cast<const char *>(buffer->data()), buffer->size())); + break; + } + default: + Y_VERIFY_S(false, "Some types have no constant conversion yet: " << scalar.type->ToString()); + } +} + +std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value, + const std::shared_ptr<arrow::DataType>& type) { + switch (type->id()) { + case arrow::Type::BOOL: + return std::make_shared<arrow::BooleanScalar>(value.GetBool()); + case arrow::Type::UINT8: + return std::make_shared<arrow::UInt8Scalar>(value.GetUint32()); + case arrow::Type::UINT16: + return std::make_shared<arrow::UInt16Scalar>(value.GetUint32()); + case arrow::Type::UINT32: + return std::make_shared<arrow::UInt32Scalar>(value.GetUint32()); + case arrow::Type::UINT64: + return std::make_shared<arrow::UInt64Scalar>(value.GetUint64()); + case arrow::Type::INT8: + return std::make_shared<arrow::Int8Scalar>(value.GetInt32()); + case arrow::Type::INT16: + return std::make_shared<arrow::Int16Scalar>(value.GetInt32()); + case arrow::Type::INT32: + return std::make_shared<arrow::Int32Scalar>(value.GetInt32()); + case arrow::Type::INT64: + return std::make_shared<arrow::Int64Scalar>(value.GetInt64()); + case arrow::Type::FLOAT: + return std::make_shared<arrow::FloatScalar>(value.GetFloat()); + case arrow::Type::DOUBLE: + return std::make_shared<arrow::DoubleScalar>(value.GetDouble()); + case arrow::Type::DATE32: + return std::make_shared<arrow::Date32Scalar>(value.GetInt32()); + case arrow::Type::DATE64: + return std::make_shared<arrow::Date64Scalar>(value.GetInt64()); + case arrow::Type::TIMESTAMP: + if (value.HasUint64()) { + return std::make_shared<arrow::TimestampScalar>(value.GetUint64(), type); + } else if (value.HasInt64()) { + return std::make_shared<arrow::TimestampScalar>(value.GetInt64(), type); + } else { + Y_VERIFY(false, "Unexpected timestamp"); + } + case arrow::Type::TIME32: + return std::make_shared<arrow::Time32Scalar>(value.GetInt32(), type); + case arrow::Type::TIME64: + return std::make_shared<arrow::Time64Scalar>(value.GetInt64(), type); + case arrow::Type::INTERVAL_MONTHS: + return std::make_shared<arrow::MonthIntervalScalar>(value.GetInt32()); + case arrow::Type::DURATION: + return std::make_shared<arrow::DurationScalar>(value.GetInt64(), type); + case arrow::Type::STRING: + return std::make_shared<arrow::StringScalar>(value.GetText()); + case arrow::Type::LARGE_STRING: + return std::make_shared<arrow::LargeStringScalar>(value.GetText()); + case arrow::Type::BINARY: { + std::string bytes = value.GetBytes(); + return std::make_shared<arrow::BinaryScalar>(arrow::Buffer::FromString(bytes)); + } + case arrow::Type::LARGE_BINARY: { + std::string bytes = value.GetBytes(); + return std::make_shared<arrow::LargeBinaryScalar>(arrow::Buffer::FromString(bytes)); + } + case arrow::Type::FIXED_SIZE_BINARY: { + std::string bytes = value.GetBytes(); + return std::make_shared<arrow::FixedSizeBinaryScalar>(arrow::Buffer::FromString(bytes), type); + } + default: + Y_VERIFY_S(false, "Some types have no constant conversion yet: " << type->ToString()); + } + return {}; +} + +TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key) { + TString out; + NArrow::SwitchType(key->type->id(), [&](const auto& t) { + using TWrap = std::decay_t<decltype(t)>; + using T = typename TWrap::T; + using TScalar = typename arrow::TypeTraits<T>::ScalarType; + + if constexpr (std::is_same_v<T, arrow::StringType> || + std::is_same_v<T, arrow::BinaryType> || + std::is_same_v<T, arrow::LargeStringType> || + std::is_same_v<T, arrow::LargeBinaryType> || + std::is_same_v<T, arrow::FixedSizeBinaryType>) { + auto& buffer = static_cast<const TScalar&>(*key).value; + out = buffer->ToString(); + } else if constexpr (std::is_same_v<T, arrow::HalfFloatType>) { + return false; + } else if constexpr (arrow::is_temporal_type<T>::value || arrow::has_c_type<T>::value) { + using TCType = typename arrow::TypeTraits<T>::CType; + static_assert(std::is_same_v<TCType, typename TScalar::ValueType>); + + const TCType& value = static_cast<const TScalar&>(*key).value; + out = TString(reinterpret_cast<const char*>(&value), sizeof(value)); + Y_VERIFY_S(!out.empty(), key->type->ToString()); + } else { + return false; + } + return true; + }); + return out; +} + +std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type) { + std::shared_ptr<arrow::Scalar> out; + NArrow::SwitchType(type->id(), [&](const auto& t) { + using TWrap = std::decay_t<decltype(t)>; + using T = typename TWrap::T; + using TScalar = typename arrow::TypeTraits<T>::ScalarType; + + if constexpr (std::is_same_v<T, arrow::StringType> || + std::is_same_v<T, arrow::BinaryType> || + std::is_same_v<T, arrow::LargeStringType> || + std::is_same_v<T, arrow::LargeBinaryType> || + std::is_same_v<T, arrow::FixedSizeBinaryType>) { + out = std::make_shared<TScalar>(arrow::Buffer::FromString(key), type); + } else if constexpr (std::is_same_v<T, arrow::HalfFloatType>) { + return false; + } else if constexpr (arrow::is_temporal_type<T>::value) { + using TCType = typename arrow::TypeTraits<T>::CType; + static_assert(std::is_same_v<TCType, typename TScalar::ValueType>); + + Y_VERIFY(key.size() == sizeof(TCType)); + TCType value = ReadUnaligned<TCType>(key.data()); + out = std::make_shared<TScalar>(value, type); + } else if constexpr (arrow::has_c_type<T>::value) { + using TCType = typename arrow::TypeTraits<T>::CType; + static_assert(std::is_same_v<TCType, typename TScalar::ValueType>); + + Y_VERIFY(key.size() == sizeof(TCType)); + TCType value = ReadUnaligned<TCType>(key.data()); + out = std::make_shared<TScalar>(value); + } else { + return false; + } + return true; + }); + Y_VERIFY_S(out, type->ToString()); + return out; +} + +} diff --git a/ydb/core/tx/columnshard/engines/scalars.h b/ydb/core/tx/columnshard/engines/scalars.h new file mode 100644 index 00000000000..02d6dac893a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scalars.h @@ -0,0 +1,14 @@ +#pragma once +#include "defs.h" +#include <ydb/core/protos/tx_columnshard.pb.h> + +namespace NKikimr::NOlap { + +void ScalarToConstant(const arrow::Scalar& scalar, NKikimrSSA::TProgram_TConstant& value); +std::shared_ptr<arrow::Scalar> ConstantToScalar(const NKikimrSSA::TProgram_TConstant& value, + const std::shared_ptr<arrow::DataType>& type); + +TString SerializeKeyScalar(const std::shared_ptr<arrow::Scalar>& key); +std::shared_ptr<arrow::Scalar> DeserializeKeyScalar(const TString& key, const std::shared_ptr<arrow::DataType>& type); + +} diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index 458b2282ec6..8c3a5239e3b 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -26,9 +26,9 @@ public: return true; } - void WriteGranule(ui32, const TGranuleRecord&) override {} - void EraseGranule(ui32, const TGranuleRecord&) override {} - bool LoadGranules(ui32, std::function<void(TGranuleRecord&&)>) override { return true; } + void WriteGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {} + void EraseGranule(ui32, const IColumnEngine&, const TGranuleRecord&) override {} + bool LoadGranules(ui32, const IColumnEngine&, std::function<void(TGranuleRecord&&)>) override { return true; } void WriteColumn(ui32, const TColumnRecord&) override {} void EraseColumn(ui32, const TColumnRecord&) override {} diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 953e700a5d6..48286575d0a 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -54,7 +54,7 @@ public: return true; } - void WriteGranule(ui32 index, const TGranuleRecord& row) override { + void WriteGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override { auto& granules = Indices[index].Granules[row.PathId]; bool replaced = false; @@ -70,7 +70,7 @@ public: } } - void EraseGranule(ui32 index, const TGranuleRecord& row) override { + void EraseGranule(ui32 index, const IColumnEngine&, const TGranuleRecord& row) override { auto& pathGranules = Indices[index].Granules[row.PathId]; TVector<TGranuleRecord> filtered; @@ -83,7 +83,7 @@ public: pathGranules.swap(filtered); } - bool LoadGranules(ui32 index, std::function<void(TGranuleRecord&&)> callback) override { + bool LoadGranules(ui32 index, const IColumnEngine&, std::function<void(TGranuleRecord&&)> callback) override { auto& granules = Indices[index].Granules; for (auto& [pathId, vec] : granules) { for (auto& rec : vec) { @@ -171,31 +171,39 @@ static const TVector<std::pair<TString, TTypeInfo>> testKey = { {"uid", TTypeInfo(NTypeIds::Utf8) } }; -TIndexInfo TestTableInfo() { +TIndexInfo TestTableInfo(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = testColumns, + const TVector<std::pair<TString, TTypeInfo>>& key = testKey) { TIndexInfo indexInfo("", 0); - for (ui32 i = 0; i < testColumns.size(); ++i) { + for (ui32 i = 0; i < ydbSchema.size(); ++i) { ui32 id = i + 1; - auto& name = testColumns[i].first; - auto& type = testColumns[i].second; + auto& name = ydbSchema[i].first; + auto& type = ydbSchema[i].second; indexInfo.Columns[id] = NTable::TColumn(name, id, type); indexInfo.ColumnNames[name] = id; } - for (const auto& [keyName, keyType] : testKey) { + for (const auto& [keyName, keyType] : key) { indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]); } - indexInfo.AddTtlColumn(testColumns[0].first); - + indexInfo.AddTtlColumn("timestamp"); return indexInfo; } +static NOlap::TTiersInfo MakeTtl(TInstant border) { + return NOlap::TTiersInfo("timestamp", border); +} + +template <typename TKeyDataType> class TBuilder { public: + using TTraits = typename arrow::TypeTraits<TKeyDataType>; + using TCType = std::conditional_t<arrow::has_c_type<TKeyDataType>::value, typename TTraits::CType, std::string>; + struct TRow { - ui64 Timestamp; + TCType Timestamp; std::string ResourceType; std::string ResourceId; std::string Uid; @@ -211,7 +219,7 @@ public: bool AddRow(const TRow& row) { bool ok = true; - ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::TimestampType>::BuilderType>(0)->Append(row.Timestamp).ok(); + ok = ok && BatchBuilder->GetFieldAs<typename TTraits::BuilderType>(0)->Append(row.Timestamp).ok(); ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append(row.ResourceType).ok(); ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(2)->Append(row.ResourceId).ok(); ok = ok && BatchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(3)->Append(row.Uid).ok(); @@ -226,10 +234,6 @@ public: return batch; } - static NOlap::TTiersInfo MakeTtl(TInstant border) { - return NOlap::TTiersInfo(testColumns[0].first, border); - } - private: std::shared_ptr<arrow::Schema> Schema = NArrow::MakeArrowSchema(testColumns); std::unique_ptr<arrow::RecordBatchBuilder> BatchBuilder; @@ -240,11 +244,12 @@ TBlobRange MakeBlobRange(ui32 step, ui32 blobSize) { return TBlobRange(TUnifiedBlobId(11111, TLogoBlobID(100500, 42, step, 3, blobSize, 0)), 0, blobSize); } -TString MakeTestBlob(ui64 start = 0, ui64 end = 100) { - TBuilder builder; - for (ui64 ts = start; ts < end; ++ts) { +TString MakeTestBlob(i64 start = 0, i64 end = 100) { + TBuilder<arrow::TimestampType> builder; + for (i64 ts = start; ts < end; ++ts) { TString str = ToString(ts); - builder.AddRow({ts, str, str, str, str}); + TString sortedStr = Sprintf("%05ld", (long)ts); + builder.AddRow({ts, sortedStr, str, str, str}); } auto batch = builder.Finish(); return NArrow::SerializeBatchNoCompression(batch); @@ -289,9 +294,9 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, return engine.ApplyChanges(db, changes, snap); } -bool Insert(TTestDbWrapper& db, TSnapshot snap, +bool Insert(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, TVector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { - TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits()); + TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); engine.Load(db); return Insert(engine, db, snap, std::move(dataToIndex), blobs, step); @@ -324,9 +329,9 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T return engine.ApplyChanges(db, changes, snap); } -bool Compact(TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step, - const TExpected& expected) { - TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits()); +bool Compact(const TIndexInfo& tableInfo, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, + TString>&& blobs, ui32& step, const TExpected& expected) { + TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); engine.Load(db); return Compact(engine, db, snap, std::move(blobs), step, expected); } @@ -361,12 +366,26 @@ std::shared_ptr<TPredicate> MakePredicate(int64_t ts, NArrow::EOperation op) { return p; } +std::shared_ptr<TPredicate> MakeStrPredicate(const std::string& key, NArrow::EOperation op) { + auto p = std::make_shared<TPredicate>(); + p->Operation = op; + + auto type = arrow::utf8(); + auto res = arrow::MakeArrayFromScalar(arrow::StringScalar(key), 1); + + std::vector<std::shared_ptr<arrow::Field>> fields = { std::make_shared<arrow::Field>("resource_type", type) }; + p->Batch = arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), 1, {*res}); + return p; +} + } Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { - Y_UNIT_TEST(IndexWriteLoadRead) { + void WriteLoadRead(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema, + const TVector<std::pair<TString, TTypeInfo>>& key) { TTestDbWrapper db; + TIndexInfo tableInfo = TestTableInfo(ydbSchema, key); TVector<ui64> paths = {1, 2}; @@ -389,11 +408,11 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { THashMap<TBlobRange, TString> blobs; blobs[blobRanges[0]] = testBlob; blobs[blobRanges[1]] = testBlob; - Insert(db, {1, 2}, std::move(dataToIndex), blobs, step); + Insert(tableInfo, db, {1, 2}, std::move(dataToIndex), blobs, step); // load - TColumnEngineForLogs engine(TestTableInfo(), 0); + TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0); engine.Load(db); // selects @@ -440,8 +459,25 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } } - Y_UNIT_TEST(IndexReadWithPredicates) { + Y_UNIT_TEST(IndexWriteLoadRead) { + WriteLoadRead(testColumns, testKey); + } + + Y_UNIT_TEST(IndexWriteLoadReadStrPK) { + TVector<std::pair<TString, TTypeInfo>> key = { + {"resource_type", TTypeInfo(NTypeIds::Utf8) }, + {"resource_id", TTypeInfo(NTypeIds::Utf8) }, + {"uid", TTypeInfo(NTypeIds::Utf8) }, + {"timestamp", TTypeInfo(NTypeIds::Timestamp) } + }; + + WriteLoadRead(testColumns, key); + } + + void ReadWithPredicates(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema, + const TVector<std::pair<TString, TTypeInfo>>& key) { TTestDbWrapper db; + TIndexInfo tableInfo = TestTableInfo(ydbSchema, key); ui64 pathId = 1; ui32 step = 1000; @@ -462,19 +498,19 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { dataToIndex.push_back( TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); - bool ok = Insert(db, {planStep, txId}, std::move(dataToIndex), blobs, step); + bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } // compact planStep = 2; - bool ok = Compact(db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4}); + bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4}); UNIT_ASSERT(ok); // load - TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits()); + TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); engine.Load(db); // read @@ -484,13 +520,13 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { planStep = 3; const TIndexInfo& indexInfo = engine.GetIndexInfo(); - THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(testColumns[0].first) }; + THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(key[0].first) }; { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, {}, {}); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); } // predicates @@ -498,22 +534,44 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { ui64 txId = 1; std::shared_ptr<TPredicate> gt10k = MakePredicate(10000, NArrow::EOperation::Greater); + if (key[0].second == TTypeInfo(NTypeIds::Utf8)) { + gt10k = MakeStrPredicate("10000", NArrow::EOperation::Greater); + } auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, gt10k, {}); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); } { ui64 txId = 1; std::shared_ptr<TPredicate> lt10k = MakePredicate(9999, NArrow::EOperation::Less); // TODO: better border checks + if (key[0].second == TTypeInfo(NTypeIds::Utf8)) { + lt10k = MakeStrPredicate("09999", NArrow::EOperation::Less); + } auto selectInfo = engine.Select(pathId, {planStep, txId}, oneColumnId, 0, lt10k); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); } } + Y_UNIT_TEST(IndexReadWithPredicates) { + ReadWithPredicates(testColumns, testKey); + } + + Y_UNIT_TEST(IndexReadWithPredicatesStrPK) { + TVector<std::pair<TString, TTypeInfo>> key = { + {"resource_type", TTypeInfo(NTypeIds::Utf8) }, + {"resource_id", TTypeInfo(NTypeIds::Utf8) }, + {"uid", TTypeInfo(NTypeIds::Utf8) }, + {"timestamp", TTypeInfo(NTypeIds::Timestamp) } + }; + + ReadWithPredicates(testColumns, key); + } + Y_UNIT_TEST(IndexWriteOverload) { TTestDbWrapper db; + TIndexInfo tableInfo = TestTableInfo(); ui64 pathId = 1; ui32 step = 1000; @@ -521,7 +579,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // inserts ui64 planStep = 1; - TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits()); + TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); engine.Load(db); THashMap<TBlobRange, TString> blobs; @@ -551,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { UNIT_ASSERT(overload); { // check it's overloaded after reload - TColumnEngineForLogs tmpEngine(TestTableInfo(), 0, TestLimits()); + TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits()); tmpEngine.Load(db); UNIT_ASSERT(tmpEngine.GetOverloadedGranules(pathId)); } @@ -582,7 +640,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } { // check it's not overloaded after reload - TColumnEngineForLogs tmpEngine(TestTableInfo(), 0, TestLimits()); + TColumnEngineForLogs tmpEngine(TIndexInfo(tableInfo), 0, TestLimits()); tmpEngine.Load(db); UNIT_ASSERT(!tmpEngine.GetOverloadedGranules(pathId)); } @@ -590,6 +648,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { Y_UNIT_TEST(IndexTtl) { TTestDbWrapper db; + TIndexInfo tableInfo = TestTableInfo(); ui64 pathId = 1; ui32 step = 1000; @@ -610,19 +669,19 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { dataToIndex.push_back( TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now()}); - bool ok = Insert(db, {planStep, txId}, std::move(dataToIndex), blobs, step); + bool ok = Insert(tableInfo, db, {planStep, txId}, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); } // compact planStep = 2; - bool ok = Compact(db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4}); + bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4}); UNIT_ASSERT(ok); // load - TColumnEngineForLogs engine(TestTableInfo(), 0, TestLimits()); + TColumnEngineForLogs engine(TIndexInfo(tableInfo), 0, TestLimits()); engine.Load(db); // read @@ -650,7 +709,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // TTL THashMap<ui64, NOlap::TTiersInfo> pathTtls; - pathTtls.emplace(pathId, TBuilder::MakeTtl(TInstant::MicroSeconds(10000))); + pathTtls.emplace(pathId, MakeTtl(TInstant::MicroSeconds(10000))); Ttl(engine, db, pathTtls, 2); // read + load + read diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp index 7dda4cdcdb8..5836e05b087 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp @@ -18,14 +18,18 @@ namespace NTypeIds = NScheme::NTypeIds; using TTypeId = NScheme::TTypeId; using TTypeInfo = NScheme::TTypeInfo; -static const TVector<std::pair<TString, TTypeInfo>> testYdbSchema = TTestSchema::YdbSchema(); -static const TVector<std::pair<TString, TTypeInfo>> testYdbPkSchema = TTestSchema::YdbPkSchema(); - +template <typename TKey = ui64> bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range, bool requireUniq = false) { - THashMap<ui64, ui32> keys; + static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>; + + THashMap<TKey, ui32> keys; for (size_t i = range.first; i < range.second; ++i) { - keys.emplace(i, 0); + if constexpr (isStrKey) { + keys.emplace(ToString(i), 0); + } else { + keys.emplace(i, 0); + } } auto schema = NArrow::DeserializeSchema(srtSchema); @@ -35,10 +39,27 @@ bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair< std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp"); UNIT_ASSERT(array); - auto& ts = dynamic_cast<const arrow::NumericArray<arrow::TimestampType>&>(*array); - for (int i = 0; i < ts.length(); ++i) { - ui64 value = ts.Value(i); + for (int i = 0; i < array->length(); ++i) { + TKey value{}; + + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; + + if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) { + value = static_cast<const TArray&>(*array).GetString(i); + return true; + } + if constexpr (!isStrKey && arrow::has_c_type<typename TWrap::T>()) { + auto& column = static_cast<const TArray&>(*array); + value = column.Value(i); + return true; + } + UNIT_ASSERT(false); + return false; + }); + ++keys[value]; } } @@ -57,10 +78,17 @@ bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair< return true; } +template <typename TKey = ui64> bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) { - THashSet<ui64> keys; + static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>; + + THashSet<TKey> keys; for (size_t i = range.first; i < range.second; ++i) { - keys.emplace(i); + if constexpr (isStrKey) { + keys.emplace(ToString(i)); + } else { + keys.emplace(i); + } } auto schema = NArrow::DeserializeSchema(srtSchema); @@ -70,10 +98,27 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp"); UNIT_ASSERT(array); - auto& ts = dynamic_cast<const arrow::NumericArray<arrow::TimestampType>&>(*array); - for (int i = 0; i < ts.length(); ++i) { - ui64 value = ts.Value(i); + for (int i = 0; i < array->length(); ++i) { + ui64 value{}; + + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; + + if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) { + value = static_cast<const TArray&>(*array).GetView(i); + return true; + } + if constexpr (!isStrKey && arrow::has_c_type<typename TWrap::T>()) { + auto& column = static_cast<const TArray&>(*array); + value = column.Value(i); + return true; + } + UNIT_ASSERT(false); + return false; + }); + if (!keys.count(value)) { Cerr << "Unexpected key: " << value << "\n"; return false; @@ -86,10 +131,12 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p template <typename TArrowType> bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) { + using TArray = typename arrow::TypeTraits<TArrowType>::ArrayType; + UNIT_ASSERT(array); UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size()); - auto& column = dynamic_cast<const arrow::NumericArray<TArrowType>&>(*array); + auto& column = dynamic_cast<const TArray&>(*array); for (int i = 0; i < column.length(); ++i) { auto value = column.Value(i); @@ -98,12 +145,14 @@ bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std:: return true; } -template <typename TArrowArrayType> +template <typename TArrowType> bool CheckTypedStrValues(const std::shared_ptr<arrow::Array>& array, const std::vector<std::string>& expected) { + using TArray = typename arrow::TypeTraits<TArrowType>::ArrayType; + UNIT_ASSERT(array); UNIT_ASSERT_VALUES_EQUAL(array->length(), (int)expected.size()); - auto& column = dynamic_cast<const TArrowArrayType&>(*array); + auto& column = dynamic_cast<const TArray&>(*array); for (int i = 0; i < column.length(); ++i) { auto value = column.GetString(i); @@ -150,11 +199,11 @@ bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const std::vecto return CheckTypedIntValues<arrow::DoubleType>(array, expected); case arrow::Type::STRING: - return CheckTypedStrValues<arrow::StringArray>(array, expectedStr); + return CheckTypedStrValues<arrow::StringType>(array, expectedStr); case arrow::Type::BINARY: - return CheckTypedStrValues<arrow::BinaryArray>(array, expectedStr); + return CheckTypedStrValues<arrow::BinaryType>(array, expectedStr); case arrow::Type::FIXED_SIZE_BINARY: - return CheckTypedStrValues<arrow::FixedSizeBinaryArray>(array, expectedStr); + return CheckTypedStrValues<arrow::FixedSizeBinaryType>(array, expectedStr); default: Cerr << "type : " << array->type()->ToString() << "\n"; @@ -171,14 +220,37 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) { std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp"); UNIT_ASSERT(array); - auto& ts = dynamic_cast<const arrow::NumericArray<arrow::TimestampType>&>(*array); - if (!ts.length()) { + if (!array->length()) { return true; } - ui64 prev = ts.Value(0); - for (int i = 1; i < ts.length(); ++i) { - ui64 value = ts.Value(i); + ui64 prev{}; + for (int i = 0; i < array->length(); ++i) { + ui64 value{}; + + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; +#if 0 + if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) { + value = static_cast<const TArray&>(*array).GetView(i); + return true; + } +#endif + if constexpr (/*!isStrKey && */arrow::has_c_type<typename TWrap::T>()) { + auto& column = static_cast<const TArray&>(*array); + value = column.Value(i); + return true; + } + UNIT_ASSERT(false); + return false; + }); + + if (!i) { + prev = value; + continue; + } + if (prev > value) { Cerr << "Unordered: " << prev << " " << value << "\n"; return false; @@ -212,9 +284,11 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(), - NOlap::TSnapshot snap = {10, 10}, TString codec = "") { + const TVector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(), + TString codec = "none") { + NOlap::TSnapshot snap = {10, 10}; bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateTableTxBody(pathId, schema, + TTestSchema::CreateTableTxBody(pathId, schema, pk, TTestSchema::TTableSpecials().WithCodec(codec)), snap); UNIT_ASSERT(ok); @@ -260,14 +334,34 @@ void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) { ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); UNIT_ASSERT(!ok); - // wrong type - // TODO: better check (it probably does not work in general case) + // wrong first key column type (with supported layout: Int64 vs Timestamp) + // It fails only if we specify source schema. No way to detect it from serialized batch data. schema = ydbSchema; - schema[1].second = TTypeInfo(NTypeIds::Int32); - ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); + schema[0].second = TTypeInfo(NTypeIds::Int64); + ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema), + NArrow::MakeArrowSchema(schema)); UNIT_ASSERT(!ok); + // wrong type (no additional schema - fails in case of wrong layout) + + for (size_t i = 0; i < ydbSchema.size(); ++i) { + schema = ydbSchema; + schema[i].second = TTypeInfo(NTypeIds::Int8); + ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); + UNIT_ASSERT(!ok); + } + + // wrong type (with additional schema) + + for (size_t i = 0; i < ydbSchema.size(); ++i) { + schema = ydbSchema; + schema[i].second = TTypeInfo(NTypeIds::Int64); + ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema), + NArrow::MakeArrowSchema(schema)); + UNIT_ASSERT(ok == (ydbSchema[i].second == TTypeInfo(NTypeIds::Int64))); + } + schema = ydbSchema; schema[1].second = TTypeInfo(NTypeIds::Utf8); schema[5].second = TTypeInfo(NTypeIds::Int32); @@ -276,17 +370,12 @@ void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) { // reordered columns + THashMap<TString, TTypeInfo> remap(ydbSchema.begin(), ydbSchema.end()); + schema.resize(0); - schema.push_back({"level", TTypeInfo(NTypeIds::Int32) }); - schema.push_back({"timestamp", TTypeInfo(NTypeIds::Timestamp) }); - schema.push_back({"uid", TTypeInfo(NTypeIds::Utf8) }); - schema.push_back({"resource_id", TTypeInfo(NTypeIds::Utf8) }); - schema.push_back({"resource_type", TTypeInfo(NTypeIds::Utf8) }); - schema.push_back({"message", TTypeInfo(NTypeIds::Utf8) }); - schema.push_back({"request_id", TTypeInfo(NTypeIds::Utf8) }); - schema.push_back({"saved_at", TTypeInfo(NTypeIds::Timestamp) }); - schema.push_back({"ingested_at", TTypeInfo(NTypeIds::Timestamp) }); - schema.push_back({"json_payload", TTypeInfo(NTypeIds::Json) }); + for (auto& [name, typeInfo] : remap) { + schema.push_back({name, typeInfo}); + } ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, schema)); UNIT_ASSERT(!ok); @@ -365,6 +454,7 @@ void TestWriteReadDup() { } void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = TTestSchema::YdbSchema(), + const TVector<std::pair<TString, TTypeInfo>>& testYdbPk = TTestSchema::YdbPkSchema(), TString codec = "") { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -408,7 +498,7 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y ui64 writeId = 0; ui64 tableId = 1; - SetupSchema(runtime, sender, tableId, ydbSchema, {10, 10}, codec); + SetupSchema(runtime, sender, tableId, ydbSchema, testYdbPk, codec); // ----xx // -----xx.. @@ -683,7 +773,8 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y UNIT_ASSERT(resRead.GetData().size() > 0); //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); //UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - if (resRead.GetFinished()) { + bool lastBach = resRead.GetFinished(); + if (lastBach) { expected = resRead.GetBatch() + 1; } readData.push_back(resRead.GetData()); @@ -694,18 +785,20 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y } UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - - if (ydbSchema == TTestSchema::YdbSchema()) { - if (codec == "" || codec == "lz4") { - UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50); - } else if (codec == "none") { - UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75); - } else if (codec == "zstd") { - UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26); - } else { - UNIT_ASSERT(false); + if (lastBach) { + UNIT_ASSERT(meta.HasReadStats()); + auto& readStats = meta.GetReadStats(); + + if (ydbSchema == TTestSchema::YdbSchema()) { + if (codec == "" || codec == "lz4") { + UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50); + } else if (codec == "none") { + UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75); + } else if (codec == "zstd") { + UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26); + } else { + UNIT_ASSERT(false); + } } } } @@ -717,9 +810,9 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y // read 11 (range predicate: closed interval) { - TSerializedTableRange range = MakeTestRange({10, 42}, true, true, testYdbPkSchema); + TSerializedTableRange range = MakeTestRange({10, 42}, true, true, testYdbPk); NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema); + std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk); auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId); auto* greater = Proto(evRead.get()).MutableGreaterPredicate(); @@ -761,9 +854,9 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y // read 12 (range predicate: open interval) { - TSerializedTableRange range = MakeTestRange({10, 42}, false, false, testYdbPkSchema); + TSerializedTableRange range = MakeTestRange({10, 42}, false, false, testYdbPk); NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema); + std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk); auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId); auto* greater = Proto(evRead.get()).MutableGreaterPredicate(); @@ -805,7 +898,9 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y UNIT_ASSERT(DataHasOnly(readData, schema, {11, 41 + 1})); } -void TestCompactionInGranuleImpl(bool reboots) { +void TestCompactionInGranuleImpl(bool reboots, + const TVector<std::pair<TString, TTypeInfo>>& ydbSchema, + const TVector<std::pair<TString, TTypeInfo>>& ydbPk) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -848,14 +943,14 @@ void TestCompactionInGranuleImpl(bool reboots) { ui64 planStep = 100; ui64 txId = 100; - SetupSchema(runtime, sender, tableId); + SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk); TAutoPtr<IEventHandle> handle; // Write same keys: merge on compaction static const ui32 triggerPortionSize = 75 * 1000; std::pair<ui64, ui64> triggerPortion = {0, triggerPortionSize}; - TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema); + TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::MAX_BLOB_SIZE); @@ -870,7 +965,7 @@ void TestCompactionInGranuleImpl(bool reboots) { ids.reserve(numWrites); for (ui32 w = 0; w < numWrites; ++w, ++writeId, pos += portionSize) { std::pair<ui64, ui64> portion = {pos, pos + portionSize}; - TString data = MakeTestBlob(portion, testYdbSchema); + TString data = MakeTestBlob(portion, ydbSchema); ids.push_back(writeId); UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, data)); @@ -923,8 +1018,13 @@ void TestCompactionInGranuleImpl(bool reboots) { TVector<TString> readData; readData.push_back(resRead.GetData()); - UNIT_ASSERT(DataHas(readData, schema, triggerPortion, true)); - UNIT_ASSERT(DataHas(readData, schema, smallWrites, true)); + if (ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8)) { + UNIT_ASSERT(DataHas<std::string>(readData, schema, triggerPortion, true)); + UNIT_ASSERT(DataHas<std::string>(readData, schema, smallWrites, true)); + } else { + UNIT_ASSERT(DataHas(readData, schema, triggerPortion, true)); + UNIT_ASSERT(DataHas(readData, schema, smallWrites, true)); + } UNIT_ASSERT(meta.HasReadStats()); auto& readStats = meta.GetReadStats(); @@ -1103,7 +1203,7 @@ void TestReadWithProgram(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema ui64 planStep = 100; ui64 txId = 100; - SetupSchema(runtime, sender, tableId); + SetupSchema(runtime, sender, tableId, ydbSchema); { // write some data bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema)); @@ -1240,7 +1340,9 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema, ui64 planStep = 100; ui64 txId = 100; - SetupSchema(runtime, sender, tableId, ydbSchema); + auto pk = ydbSchema; + pk.resize(4); + SetupSchema(runtime, sender, tableId, ydbSchema, pk); { // write some data bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, testDataBlob); @@ -1396,19 +1498,81 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } Y_UNIT_TEST(WriteReadNoCompression) { - TestWriteRead(true, TTestSchema::YdbSchema(), "none"); + TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "none"); } Y_UNIT_TEST(WriteReadZSTD) { - TestWriteRead(true, TTestSchema::YdbSchema(), "zstd"); + TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "zstd"); } Y_UNIT_TEST(CompactionInGranule) { - TestCompactionInGranuleImpl(false); + std::vector<TTypeId> types = { + NTypeIds::Timestamp, + //NTypeIds::Int16, + //NTypeIds::Uint16, + NTypeIds::Int32, + NTypeIds::Uint32, + NTypeIds::Int64, + NTypeIds::Uint64, + //NTypeIds::Date, + NTypeIds::Datetime + //NTypeIds::Interval + }; + auto schema = TTestSchema::YdbSchema(); + auto pk = TTestSchema::YdbPkSchema(); + + for (auto& type : types) { + schema[0].second = TTypeInfo(type); + pk[0].second = TTypeInfo(type); + TestCompactionInGranuleImpl(false, schema, pk); + } + } +#if 0 + Y_UNIT_TEST(CompactionInGranuleFloatKey) { + std::vector<NScheme::TTypeId> types = { + NTypeIds::Float, + NTypeIds::Double + }; + auto schema = TTestSchema::YdbSchema(); + auto pk = TTestSchema::YdbPkSchema(); + + for (auto& type : types) { + schema[0].second = TTypeInfo(type); + pk[0].second = TTypeInfo(type); + TestCompactionInGranuleImpl(false, schema, pk); + } + } +#endif + Y_UNIT_TEST(CompactionInGranuleStrKey) { + std::vector<NScheme::TTypeId> types = { + NTypeIds::String, + NTypeIds::Utf8 + }; + auto schema = TTestSchema::YdbSchema(); + auto pk = TTestSchema::YdbPkSchema(); + + for (auto& type : types) { + schema[0].second = TTypeInfo(type); + pk[0].second = TTypeInfo(type); + TestCompactionInGranuleImpl(false, schema, pk); + } } Y_UNIT_TEST(RebootCompactionInGranule) { - TestCompactionInGranuleImpl(true); + // some of types + std::vector<NScheme::TTypeId> types = { + NTypeIds::Timestamp, + NTypeIds::Int32, + NTypeIds::String + }; + auto schema = TTestSchema::YdbSchema(); + auto pk = TTestSchema::YdbPkSchema(); + + for (auto& type : types) { + schema[0].second = TTypeInfo(type); + pk[0].second = TTypeInfo(type); + TestCompactionInGranuleImpl(true, schema, pk); + } } Y_UNIT_TEST(ReadWithProgram) { @@ -1473,7 +1637,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } } - Y_UNIT_TEST(CompactionSplitGranule) { + void TestCompactionSplitGranule(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema, + const TVector<std::pair<TString, TTypeInfo>>& ydbPk) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -1490,9 +1655,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 planStep = 100; ui64 txId = 100; - SetupSchema(runtime, sender, tableId); + SetupSchema(runtime, sender, tableId, ydbSchema, ydbPk, "lz4"); TAutoPtr<IEventHandle> handle; + bool isStrPk0 = ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8); + // Write different keys: grow on compatcion static const ui32 triggerPortionSize = 85 * 1000; @@ -1502,7 +1669,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { for (ui32 i = 0; i < numWrites; ++i, ++writeId, ++planStep, ++txId) { ui64 start = i * (triggerPortionSize - overlapSize); std::pair<ui64, ui64> triggerPortion = {start, start + triggerPortionSize}; - TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema); + TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::MAX_BLOB_SIZE); @@ -1569,7 +1736,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT(num < 100); } - UNIT_ASSERT(DataHas(readData, schema, {0, numRows}, true)); + if (isStrPk0) { + UNIT_ASSERT(DataHas<std::string>(readData, schema, {0, numRows}, true)); + } else { + UNIT_ASSERT(DataHas(readData, schema, {0, numRows}, true)); + } + readData.clear(); { // read with predicate (TO) @@ -1577,9 +1749,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Proto(read.get()).AddColumnNames("timestamp"); Proto(read.get()).AddColumnNames("message"); - TSerializedTableRange range = MakeTestRange({0, 1000}, false, false, testYdbPkSchema); + TSerializedTableRange range = MakeTestRange({0, 1}, false, false, ydbPk); NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema); + std::tie(prGreater, prLess) = RangePredicates(range, ydbPk); auto* less = Proto(read.get()).MutableLessPredicate(); for (auto& name : prLess.ColumnNames()) { @@ -1597,6 +1769,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT(event); auto& resRead = Proto(event); + Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " " + << resRead.GetBatch() << " " << resRead.GetData().size() << "\n"; + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); @@ -1619,6 +1794,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message" UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1); + //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 1); // TODO: min-max index optimization? } // TODO: check data @@ -1629,9 +1805,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Proto(read.get()).AddColumnNames("timestamp"); Proto(read.get()).AddColumnNames("message"); - TSerializedTableRange range = MakeTestRange({2000 * 1000, 1000 * 1000 * 1000}, false, false, testYdbPkSchema); + TSerializedTableRange range = MakeTestRange({numRows, numRows + 1000}, false, false, ydbPk); + if (isStrPk0) { + range = MakeTestRange({99990, 99999}, false, false, ydbPk); + } + NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, testYdbPkSchema); + std::tie(prGreater, prLess) = RangePredicates(range, ydbPk); auto* greater = Proto(read.get()).MutableGreaterPredicate(); for (auto& name : prGreater.ColumnNames()) { @@ -1649,6 +1829,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT(event); auto& resRead = Proto(event); + Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " " + << resRead.GetBatch() << " " << resRead.GetData().size() << "\n"; + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); @@ -1671,6 +1854,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message" UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1); + //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization? } // TODO: check data @@ -1706,23 +1890,66 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 numBytes = static_cast<arrow::UInt64Array&>(*bytes).Value(i); ui64 numRawBytes = static_cast<arrow::UInt64Array&>(*rawBytes).Value(i); + Cerr << "[" << __LINE__ << "] " << ydbPk[0].second.GetTypeId() << " " + << pathId << " " << kind << " " << numRows << " " << numBytes << " " << numRawBytes << "\n"; + if (pathId == tableId) { if (kind == 2) { UNIT_ASSERT_VALUES_EQUAL(numRows, (triggerPortionSize - overlapSize) * numWrites + overlapSize); UNIT_ASSERT(numBytes > numRows); - UNIT_ASSERT(numRawBytes > numBytes); + //UNIT_ASSERT(numRawBytes > numBytes); } } else { UNIT_ASSERT_VALUES_EQUAL(numRows, 0); UNIT_ASSERT_VALUES_EQUAL(numBytes, 0); UNIT_ASSERT_VALUES_EQUAL(numRawBytes, 0); } - - Cerr << pathId << " " << kind << " " << numRows << " " << numBytes << " " << numRawBytes << "\n"; } } } + Y_UNIT_TEST(CompactionSplitGranule) { + std::vector<TTypeId> types = { + NTypeIds::Timestamp, + //NTypeIds::Int16, + //NTypeIds::Uint16, + NTypeIds::Int32, + NTypeIds::Uint32, + NTypeIds::Int64, + NTypeIds::Uint64, + //NTypeIds::Date, + NTypeIds::Datetime + //NTypeIds::Interval + //NTypeIds::Float + //NTypeIds::Double + }; + + auto schema = TTestSchema::YdbSchema(); + auto pk = TTestSchema::YdbPkSchema(); + + for (auto& type : types) { + schema[0].second = TTypeInfo(type); + pk[0].second = TTypeInfo(type); + TestCompactionSplitGranule(schema, pk); + } + } + + Y_UNIT_TEST(CompactionSplitGranuleStrKey) { + std::vector<TTypeId> types = { + NTypeIds::String, + NTypeIds::Utf8 + }; + + auto schema = TTestSchema::YdbSchema(); + auto pk = TTestSchema::YdbPkSchema(); + + for (auto& type : types) { + schema[0].second = TTypeInfo(type); + pk[0].second = TTypeInfo(type); + TestCompactionSplitGranule(schema, pk); + } + } + Y_UNIT_TEST(ReadStale) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -1740,13 +1967,14 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 planStep = 1000000; ui64 txId = 100; - SetupSchema(runtime, sender, tableId); + auto ydbSchema = TTestSchema::YdbSchema(); + SetupSchema(runtime, sender, tableId, ydbSchema); TAutoPtr<IEventHandle> handle; // Write some test data to adavnce the time { std::pair<ui64, ui64> triggerPortion = {1, 1000}; - TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema); + TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); @@ -1827,7 +2055,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 writeId = 0; ui64 tableId = 1; - SetupSchema(runtime, sender, tableId); + auto ydbSchema = TTestSchema::YdbSchema(); + SetupSchema(runtime, sender, tableId, ydbSchema); TAutoPtr<IEventHandle> handle; bool blockReadFinished = true; @@ -1944,7 +2173,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { static const ui32 triggerPortionSize = 75 * 1000; std::pair<ui64, ui64> triggerPortion = {0, triggerPortionSize}; - TString triggerData = MakeTestBlob(triggerPortion, testYdbSchema); + TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::MAX_BLOB_SIZE); @@ -1962,7 +2191,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Do a small write that is not indexed so that we will get a committed blob in read request { - TString smallData = MakeTestBlob({0, 2}, testYdbSchema); + TString smallData = MakeTestBlob({0, 2}, ydbSchema); UNIT_ASSERT(smallData.size() < 100 * 1024); UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, smallData)); diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index ecf54d53e54..e8e6a8e65a2 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -12,6 +12,7 @@ using NWrappers::NTestHelpers::TS3Mock; namespace { static const TVector<std::pair<TString, TTypeInfo>> testYdbSchema = TTestSchema::YdbSchema(); +static const TVector<std::pair<TString, TTypeInfo>> testYdbPk = TTestSchema::YdbPkSchema(); std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBatch> batch, TString columnName, i64 seconds) { std::string name(columnName.c_str(), columnName.size()); @@ -46,8 +47,8 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS); } -std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TString& blob, const TString& srtSchema, - const std::string& columnName) +std::shared_ptr<arrow::Array> GetFirstPKColumn(const TString& blob, const TString& srtSchema, + const std::string& columnName) { auto schema = NArrow::DeserializeSchema(srtSchema); auto batch = NArrow::DeserializeBatch(blob, schema); @@ -55,7 +56,7 @@ std::shared_ptr<arrow::TimestampArray> GetTimestampColumn(const TString& blob, c std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName); UNIT_ASSERT(array); - return std::static_pointer_cast<arrow::TimestampArray>(array); + return array; } bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize, @@ -63,14 +64,14 @@ bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize, auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO)); UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000); - auto tsCol = GetTimestampColumn(blob, srtSchema, columnName); + auto tsCol = GetFirstPKColumn(blob, srtSchema, columnName); UNIT_ASSERT(tsCol); UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize); for (int i = 0; i < tsCol->length(); ++i) { - i64 value = tsCol->Value(i); - if (value != expected.value) { - Cerr << "Unexpected: " << value << ", expected " << expected.value << "\n"; + auto value = *tsCol->GetScalar(i); + if (!value->Equals(expected)) { + Cerr << "Unexpected: '" << value->ToString() << "', expected " << expected.value << "\n"; return false; } } @@ -98,6 +99,23 @@ std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui3 return data; } +bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 100) { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, + CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), + &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + // + return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId}); +} + // ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020 // ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021 void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, @@ -135,7 +153,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, spec.SetTtl(ttlSec); } bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateTableTxBody(tableId, testYdbSchema, spec), + TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, spec), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); @@ -398,7 +416,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe UNIT_ASSERT(specs.size() > 0); bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateTableTxBody(tableId, testYdbSchema, specs[0]), + TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); @@ -471,13 +489,15 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe if (resRead.GetData().size()) { auto& meta = resRead.GetMeta(); auto& schema = meta.GetSchema(); - auto tsColumn = GetTimestampColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn); - UNIT_ASSERT(tsColumn); + auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn); + UNIT_ASSERT(pkColumn); + UNIT_ASSERT(pkColumn->type_id() == arrow::Type::TIMESTAMP); UNIT_ASSERT(meta.HasReadStats()); auto& readStats = meta.GetReadStats(); ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage + auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(pkColumn); resColumns.emplace_back(tsColumn, numBytes); } else { resColumns.emplace_back(nullptr, 0); @@ -610,7 +630,7 @@ void TestDrop(bool reboots) { ui64 planStep = 1000000000; // greater then delays ui64 txId = 100; - bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema), + bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); @@ -676,6 +696,77 @@ extern bool gAllowLogBatchingDefaultValue; } Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { + Y_UNIT_TEST(CreateTable) { + ui64 tableId = 1; + + TVector<TTypeId> intTypes = { + NTypeIds::Timestamp, + NTypeIds::Int8, + NTypeIds::Int16, + NTypeIds::Int32, + NTypeIds::Int64, + NTypeIds::Uint8, + NTypeIds::Uint16, + NTypeIds::Uint32, + NTypeIds::Uint64, + NTypeIds::Date, + NTypeIds::Datetime + }; + + auto schema = TTestSchema::YdbSchema({"k0", TTypeInfo(NTypeIds::Timestamp)}); + auto pk = schema; + pk.resize(4); + + for (auto& ydbType : intTypes) { + schema[0].second = TTypeInfo(ydbType); + pk[0].second = TTypeInfo(ydbType); + auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk); + bool ok = TestCreateTable(txBody); + UNIT_ASSERT(ok); + } + + // TODO: support float types + TVector<TTypeId> floatTypes = { + NTypeIds::Float, + NTypeIds::Double + }; + + for (auto& ydbType : floatTypes) { + schema[0].second = TTypeInfo(ydbType); + pk[0].second = TTypeInfo(ydbType); + auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk); + bool ok = TestCreateTable(txBody); + UNIT_ASSERT(!ok); + } + + TVector<TTypeId> strTypes = { + NTypeIds::String, + NTypeIds::Utf8 + }; + + for (auto& ydbType : strTypes) { + schema[0].second = TTypeInfo(ydbType); + pk[0].second = TTypeInfo(ydbType); + auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk); + bool ok = TestCreateTable(txBody); + UNIT_ASSERT(ok); + } + + TVector<TTypeId> xsonTypes = { + NTypeIds::Yson, + NTypeIds::Json, + NTypeIds::JsonDocument + }; + + for (auto& ydbType : xsonTypes) { + schema[0].second = TTypeInfo(ydbType); + pk[0].second = TTypeInfo(ydbType); + auto txBody = TTestSchema::CreateTableTxBody(tableId, schema, pk); + bool ok = TestCreateTable(txBody); + UNIT_ASSERT(!ok); + } + } + Y_UNIT_TEST(ExternalTTL) { TestTtl(false, false); } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 83c99145215..84d9937a9ad 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -635,36 +635,19 @@ private: auto outputColumns = GetOutputColumns(ctx); if (!outputColumns.empty()) { - std::shared_ptr<arrow::RecordBatch> batch = Batch; - if (!batch) { + if (!Batch) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, "No batch in bulk upsert data", ctx); } -#if 1 // TODO: it's been checked in BuildSchema, remove if possible - for (auto& columnName : outputColumns) { - if (!batch->GetColumnByName(columnName)) { - // TODO: upsert with implicit NULLs instead? - return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, - "No column '" + columnName + "' in bulk upsert data", ctx); - } - } -#endif - // reuse source serialized batch if it's present and OK - bool reuseData = false; - { - if (GetSourceType() == EUploadSource::ArrowBatch) { - reuseData = (outputColumns.size() == (size_t)batch->num_columns()); - } - // TODO: reuse batchData with reordered columns - for (int i = 0; i < batch->num_columns() && reuseData; ++i) { - if (batch->column_name(i) != outputColumns[i]) { - reuseData = false; + auto batch = NArrow::ExtractColumns(Batch, outputColumns); + if (!batch) { + for (auto& columnName : outputColumns) { + if (Batch->schema()->GetFieldIndex(columnName) < 0) { + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, + "No column '" + columnName + "' in bulk upsert data", ctx); } } - } - - if (!reuseData) { - batch = NArrow::ExtractColumns(batch, outputColumns); + return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Cannot prepare bulk upsert data", ctx); } Y_VERIFY(batch); diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp index 221dcfc206e..5b5c3205da7 100644 --- a/ydb/services/ydb/ydb_logstore_ut.cpp +++ b/ydb/services/ydb/ydb_logstore_ut.cpp @@ -11,9 +11,30 @@ using namespace NYdb; namespace { -TVector<NYdb::TColumn> TestSchemaColumns() { +const std::vector<EPrimitiveType> allowedTypes = { + //EPrimitiveType::Bool, + EPrimitiveType::Uint8, + EPrimitiveType::Int32, + EPrimitiveType::Uint32, + EPrimitiveType::Int64, + EPrimitiveType::Uint64, + //EPrimitiveType::Float, // TODO + //EPrimitiveType::Double,// TODO + EPrimitiveType::Date, + EPrimitiveType::Datetime, + EPrimitiveType::Timestamp, + //EPrimitiveType::Interval, + EPrimitiveType::String, + EPrimitiveType::Utf8, + //EPrimitiveType::Yson, + //EPrimitiveType::Json, + //EPrimitiveType::JsonDocument, + //EPrimitiveType::DyNumber, +}; + +TVector<NYdb::TColumn> TestSchemaColumns(EPrimitiveType pkField = EPrimitiveType::Timestamp) { return { - NYdb::TColumn("timestamp", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Timestamp)), + NYdb::TColumn("timestamp", NYdb::NLogStore::MakeColumnType(pkField)), NYdb::TColumn("resource_type", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Utf8)), NYdb::TColumn("resource_id", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Utf8)), NYdb::TColumn("uid", NYdb::NLogStore::MakeColumnType(EPrimitiveType::Utf8)), @@ -59,7 +80,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { return connection; } - Y_UNIT_TEST(LogStore) { + void CreateDropStore(EPrimitiveType pkField) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); EnableDebugLogs(server); @@ -68,7 +89,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { NYdb::NLogStore::TLogStoreClient logStoreClient(connection); { - NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey()); + NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(pkField), TestSchemaKey()); THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets; schemaPresets["default"] = logSchema; NYdb::NLogStore::TLogStoreDescription storeDescr(4, schemaPresets); @@ -87,7 +108,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { const auto& schema = descr.GetSchemaPresets().begin()->second; UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10); - UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }"); + UNIT_ASSERT(schema.GetColumns()[0].ToString().StartsWith("{ name: \"timestamp\", type:")); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[1].ToString(), "{ name: \"resource_type\", type: Utf8? }"); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[4].ToString(), "{ name: \"level\", type: Int32? }"); UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns(), @@ -102,6 +123,12 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } } + Y_UNIT_TEST(LogStore) { + for (auto pk0 : allowedTypes) { + CreateDropStore(pk0); + } + } + Y_UNIT_TEST(LogStoreTiers) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); @@ -188,7 +215,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } { // wrong schema: not supported PK - NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), {"resource_type", "resource_id"}); + NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), {"json_payload", "resource_id"}); THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets; schemaPresets["default"] = logSchema; NYdb::NLogStore::TLogStoreDescription storeDescr(4, schemaPresets); @@ -273,7 +300,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } } - Y_UNIT_TEST(LogTable) { + void CreateDropTable(EPrimitiveType pkField) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); EnableDebugLogs(server); @@ -281,7 +308,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { auto connection = ConnectToServer(server); NYdb::NLogStore::TLogStoreClient logStoreClient(connection); - NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey()); + NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(pkField), TestSchemaKey()); { THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets; @@ -305,7 +332,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); const auto& schema = descr.GetSchema(); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10); - UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }"); + UNIT_ASSERT(schema.GetColumns()[0].ToString().StartsWith("{ name: \"timestamp\", type:")); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[1].ToString(), "{ name: \"resource_type\", type: Utf8? }"); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[4].ToString(), "{ name: \"level\", type: Int32? }"); UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns(), @@ -327,7 +354,7 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); const auto& schema = descr.GetSchema(); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10); - UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[0].ToString(), "{ name: \"timestamp\", type: Timestamp? }"); + UNIT_ASSERT(schema.GetColumns()[0].ToString().StartsWith("{ name: \"timestamp\", type:")); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[1].ToString(), "{ name: \"resource_type\", type: Utf8? }"); UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns()[4].ToString(), "{ name: \"level\", type: Int32? }"); UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns(), @@ -414,6 +441,12 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } } + Y_UNIT_TEST(LogTable) { + for (auto pk0 : allowedTypes) { + CreateDropTable(pk0); + } + } + Y_UNIT_TEST(AlterLogStore) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp index 84294c161bc..b66531f43de 100644 --- a/ydb/services/ydb/ydb_olapstore_ut.cpp +++ b/ydb/services/ydb/ydb_olapstore_ut.cpp @@ -7,13 +7,36 @@ #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/public/udf/udf_types.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> using namespace NYdb; namespace { -std::vector<TString> testShardingVariants = { + +THashMap<EPrimitiveType, TString> allowedTypes = { + //EPrimitiveType::Bool, + {EPrimitiveType::Uint8, "Uint8"}, + {EPrimitiveType::Int32, "Int32"}, + {EPrimitiveType::Uint32, "Uint32"}, + {EPrimitiveType::Int64, "Int64"}, + {EPrimitiveType::Uint64, "Uint64"}, + //{EPrimitiveType::Float, "Float"}, + //{EPrimitiveType::Double, "Double"}, + {EPrimitiveType::Date, "Date"}, + {EPrimitiveType::Datetime, "Datetime"}, + {EPrimitiveType::Timestamp, "Timestamp"}, + //{EPrimitiveType::Interval, "Interval"}, + {EPrimitiveType::String, "String"}, + {EPrimitiveType::Utf8, "Utf8"} + //EPrimitiveType::Yson, + //EPrimitiveType::Json, + //EPrimitiveType::JsonDocument, + //EPrimitiveType::DyNumber, +}; + +static constexpr const char* testShardingVariants[] = { R"(["timestamp", "uid"])", R"(["timestamp", "resource_type", "resource_id", "uid"])" }; @@ -47,10 +70,16 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { return connection; } - void CreateOlapTable(const TServerSettings& settings, const TString& tableName, ui32 numShards = 2, - const TString shardingColumns = R"(["timestamp", "uid"])") + struct TTestOlapTableOptions { + EPrimitiveType TsType = EPrimitiveType::Timestamp; + ui32 NumShards = 2; + TString Sharding = testShardingVariants[0]; + TString HashFunction = "HASH_FUNCTION_CLOUD_LOGS"; + }; + + void CreateOlapTable(const TServerSettings& settings, const TString& tableName, TTestOlapTableOptions opts = {}) { - const char * tableDescr = R"( + TString tableDescr = Sprintf(R"( Name: "OlapStore" ColumnShardCount: 4 SchemaPresets { @@ -60,7 +89,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { Columns { Name: "json_payload" Type: "JsonDocument" } Columns { Name: "resource_id" Type: "Utf8" } Columns { Name: "uid" Type: "Utf8" } - Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "timestamp" Type: "%s" } Columns { Name: "resource_type" Type: "Utf8" } Columns { Name: "level" Type: "Int32" } Columns { Name: "ingested_at" Type: "Timestamp" } @@ -70,7 +99,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { Engine: COLUMN_ENGINE_REPLACING_TIMESERIES } } - )"; + )", allowedTypes[opts.TsType].c_str()); TClient annoyingClient(settings); NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr); @@ -81,11 +110,11 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ColumnShardCount : %d Sharding { HashSharding { - Function: HASH_FUNCTION_CLOUD_LOGS + Function: %s Columns: %s } } - )", tableName.c_str(), numShards, shardingColumns.c_str())); + )", tableName.c_str(), opts.NumShards, opts.HashFunction.c_str(), opts.Sharding.c_str())); UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK); } @@ -112,16 +141,60 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { } NYdb::NTable::TAsyncBulkUpsertResult SendBatch(NYdb::NTable::TTableClient& client, const TString& tableName, - const ui64 batchSize, const ui32 baseUserId, i64& ts) + const ui64 batchSize, const ui32 baseUserId, std::pair<EPrimitiveType, i64>& value) { + i64 ts = value.second; + TValueBuilder rows; rows.BeginList(); for (ui64 i = 0; i < batchSize; ++i, ts += 1000) { const ui32 userId = baseUserId + (i % 100); - rows.AddListItem() - .BeginStruct() - .AddMember("timestamp").Timestamp(TInstant::MicroSeconds(ts)) - .AddMember("resource_type").Utf8(i%2 ? "app" : "nginx") + auto& row = rows.AddListItem() + .BeginStruct(); + switch (value.first) { + case EPrimitiveType::Uint8: + row.AddMember("timestamp").Uint8(ts); + break; + case EPrimitiveType::Int32: + row.AddMember("timestamp").Int32(ts); + break; + case EPrimitiveType::Uint32: + row.AddMember("timestamp").Uint32(ts); + break; + case EPrimitiveType::Int64: + row.AddMember("timestamp").Int64(ts); + break; + case EPrimitiveType::Uint64: + row.AddMember("timestamp").Uint64(ts); + break; + case EPrimitiveType::Float: + row.AddMember("timestamp").Float(ts); + break; + case EPrimitiveType::Double: + row.AddMember("timestamp").Double(ts); + break; + case EPrimitiveType::Timestamp: + row.AddMember("timestamp").Timestamp(TInstant::MicroSeconds(ts % NYql::NUdf::MAX_TIMESTAMP)); + break; + case EPrimitiveType::Date: + row.AddMember("timestamp").Date(TInstant::Days((ts / 1000) % NYql::NUdf::MAX_DATE)); + break; + case EPrimitiveType::Datetime: + row.AddMember("timestamp").Datetime(TInstant::Seconds((ts / 1000) % NYql::NUdf::MAX_DATETIME)); + break; + case EPrimitiveType::Interval: + row.AddMember("timestamp").Interval(ts); + break; + case EPrimitiveType::String: + row.AddMember("timestamp").String(ToString(ts)); + break; + case EPrimitiveType::Utf8: + row.AddMember("timestamp").Utf8(ToString(ts)); + break; + default: + UNIT_ASSERT(false); + } + row.AddMember("resource_type").Utf8(i%2 ? "app" : "nginx") .AddMember("resource_id").Utf8("resource_" + ToString((i+13) % 7)) .AddMember("uid").Utf8(ToString(i % 23)) .AddMember("level").Int32(i % 10) @@ -156,7 +229,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken(token)); TInstant start = TInstant::Now(); - i64 ts = startTs; + auto ts = std::make_pair<EPrimitiveType, i64>(EPrimitiveType::Timestamp, (i64)startTs); const ui32 baseUserId = 1000000; TVector<NYdb::NTable::TAsyncBulkUpsertResult> results; @@ -208,7 +281,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { // Create OLTP and OLAP tables with the same set of columns and same PK void CreateTestTables(const TServerSettings& settings, const TString& tableName, const TString& sharding) { - CreateOlapTable(settings, tableName, 2, sharding); + TTestOlapTableOptions opts; + opts.Sharding = sharding; + CreateOlapTable(settings, tableName, opts); CreateTable(settings, "oltp_" + tableName); } @@ -236,22 +311,27 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { return result; } - Y_UNIT_TEST(BulkUpsert) { + void TestBulkUpsert(EPrimitiveType pkFirstType) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); EnableDebugLogs(server); auto connection = ConnectToServer(server); - CreateOlapTable(*server.ServerSettings, "log1"); + TTestOlapTableOptions opts; + opts.TsType = pkFirstType; + opts.HashFunction = "HASH_FUNCTION_MODULO_N"; + CreateOlapTable(*server.ServerSettings, "log1", opts); TClient annoyingClient(*server.ServerSettings); annoyingClient.ModifyOwner("/Root/OlapStore", "log1", "alice@builtin"); { NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken("bob@builtin")); - i64 ts = 1000; + + std::pair<EPrimitiveType, i64> ts(pkFirstType, 1000); auto res = SendBatch(client, "/Root/OlapStore/log1", 100, 1, ts).GetValueSync(); + Cerr << __FILE__ << ":" << __LINE__ << " Issues: " << res.GetIssues().ToString() << "\n"; UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::UNAUTHORIZED); UNIT_ASSERT_STRING_CONTAINS(res.GetIssues().ToString(), "Access denied for bob@builtin with access UpdateRow to table '/Root/OlapStore/log1'"); @@ -262,8 +342,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { { NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken("alice@builtin")); - i64 ts = 1000; + std::pair<EPrimitiveType, i64> ts(pkFirstType, 1000); auto res = SendBatch(client, "log1", 100, 1, ts).GetValueSync(); + Cerr << __FILE__ << ":" << __LINE__ << " Issues: " << res.GetIssues().ToString() << "\n"; UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SCHEME_ERROR); UNIT_ASSERT_STRING_CONTAINS(res.GetIssues().ToString(), "Unknown database for table 'log1'"); @@ -273,8 +354,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { { NYdb::NTable::TTableClient client(connection, NYdb::NTable::TClientSettings().AuthToken("alice@builtin")); - i64 ts = 1000; + std::pair<EPrimitiveType, i64> ts(pkFirstType, 1000); auto res = SendBatch(client, "/Root/OlapStore/log1", 100, 1, ts).GetValueSync(); + Cerr << __FILE__ << ":" << __LINE__ << " Issues: " << res.GetIssues().ToString() << "\n"; UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); TString result = RunQuery(connection, "SELECT count(*) FROM `/Root/OlapStore/log1`;"); @@ -282,6 +364,12 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { } } + Y_UNIT_TEST(BulkUpsert) { + for (auto& [type, name] : allowedTypes) { + TestBulkUpsert(type); + } + } + void TestManyTables(const TString& sharding) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); @@ -327,7 +415,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { auto connection = ConnectToServer(server); NYdb::NTable::TTableClient client(connection); - CreateOlapTable(*server.ServerSettings, "log1", 2, sharding); + TTestOlapTableOptions opts; + opts.Sharding = sharding; + CreateOlapTable(*server.ServerSettings, "log1", opts); const ui64 batchCount = 100; const ui64 batchSize = 1000; |