diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-22 16:30:42 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-22 16:30:42 +0300 |
commit | f86980eca696fad8264c575b94b8e0e2aa26be29 (patch) | |
tree | 2ff5eae59e1e286cfab05a30311cc58167fcd222 | |
parent | cfaf3952685b3a7dc43dceafe49f2449391fc54d (diff) | |
download | ydb-f86980eca696fad8264c575b94b8e0e2aa26be29.tar.gz |
dictionary compression test
-rw-r--r-- | ydb/core/formats/arrow/dictionary/conversion.cpp | 6 | ||||
-rw-r--r-- | ydb/core/formats/arrow/simple_builder/array.cpp | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/simple_builder/array.h | 31 | ||||
-rw-r--r-- | ydb/core/formats/arrow/simple_builder/batch.cpp | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/simple_builder/batch.h | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/simple_builder/filler.cpp | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/simple_builder/filler.h | 55 | ||||
-rw-r--r-- | ydb/core/formats/arrow/ut/ut_dictionary.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 555 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.cpp | 48 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.h | 35 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 2 |
14 files changed, 575 insertions, 204 deletions
diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp index 8c96ea23ae0..78c57c8d23f 100644 --- a/ydb/core/formats/arrow/dictionary/conversion.cpp +++ b/ydb/core/formats/arrow/dictionary/conversion.cpp @@ -30,9 +30,9 @@ std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& da constexpr bool indicesIntegral = std::is_integral<typename TWrapIndices::T::c_type>::value; if constexpr (indicesIntegral && hasCType) { using TIndices = typename arrow::TypeTraits<typename TWrapIndices::T>::ArrayType; - using TDictionaryAccessor = TDictionaryArrayAccessor<TDictionaryValue, TIndices>; + using TDictionaryAccessor = NConstruction::TDictionaryArrayAccessor<TDictionaryValue, TIndices>; auto& columnIndices = static_cast<const TIndices&>(*data.indices()); - result = TSimpleArrayConstructor<TDictionaryAccessor>("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length()); + result = NConstruction::TSimpleArrayConstructor("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length()); return true; } } @@ -80,7 +80,7 @@ std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr< SwitchType(data->type_id(), [&](const auto& type) { using TWrap = std::decay_t<decltype(type)>; if constexpr (arrow::has_string_view<typename TWrap::T>::value && arrow::TypeTraits<typename TWrap::T>::is_parameter_free) { - auto resultArray = TDictionaryArrayConstructor<TLinearArrayAccessor<typename TWrap::T>>("absent", *data).BuildArray(data->length()); + auto resultArray = NConstruction::TDictionaryArrayConstructor<NConstruction::TLinearArrayAccessor<typename TWrap::T>>("absent", *data).BuildArray(data->length()); Y_VERIFY(resultArray->type()->id() == arrow::Type::DICTIONARY); result = static_pointer_cast<arrow::DictionaryArray>(resultArray); } else { diff --git a/ydb/core/formats/arrow/simple_builder/array.cpp b/ydb/core/formats/arrow/simple_builder/array.cpp index 4c4ffca6c1d..009e3e8702d 100644 --- a/ydb/core/formats/arrow/simple_builder/array.cpp +++ b/ydb/core/formats/arrow/simple_builder/array.cpp @@ -1,5 +1,5 @@ #include "array.h" -namespace NKikimr::NArrow { +namespace NKikimr::NArrow::NConstruction { } diff --git a/ydb/core/formats/arrow/simple_builder/array.h b/ydb/core/formats/arrow/simple_builder/array.h index 7f823787590..87a81a6c37c 100644 --- a/ydb/core/formats/arrow/simple_builder/array.h +++ b/ydb/core/formats/arrow/simple_builder/array.h @@ -5,7 +5,7 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_dict.h> #include <util/generic/string.h> -namespace NKikimr::NArrow { +namespace NKikimr::NArrow::NConstruction { class IArrayBuilder { private: @@ -49,6 +49,35 @@ public: }; template <class TFiller> +class TBinaryArrayConstructor: public IArrayBuilder { +private: + using TBase = IArrayBuilder; + using TBuilder = typename arrow::TypeTraits<typename TFiller::TValue>::BuilderType; + const TFiller Filler; +protected: + virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const override { + TBuilder fBuilder = TBuilder(); + std::vector<const char*> values; + values.reserve(recordsCount); + for (ui32 i = 0; i < recordsCount; ++i) { + values.emplace_back(Filler.GetValueView(i)); + } + auto addStatus = fBuilder.AppendValues(values.data(), recordsCount); + if (!addStatus.ok()) { + const std::string errorMessage = addStatus.ToString(); + Y_VERIFY(false, "%s", errorMessage.data()); + } + return *fBuilder.Finish(); + } +public: + TBinaryArrayConstructor(const TString& fieldName, const TFiller& filler = TFiller()) + : TBase(fieldName) + , Filler(filler) { + + } +}; + +template <class TFiller> class TDictionaryArrayConstructor: public IArrayBuilder { private: using TBase = IArrayBuilder; diff --git a/ydb/core/formats/arrow/simple_builder/batch.cpp b/ydb/core/formats/arrow/simple_builder/batch.cpp index 4a60043a087..c1f27c58b7c 100644 --- a/ydb/core/formats/arrow/simple_builder/batch.cpp +++ b/ydb/core/formats/arrow/simple_builder/batch.cpp @@ -1,6 +1,6 @@ #include "batch.h" -namespace NKikimr::NArrow { +namespace NKikimr::NArrow::NConstruction { std::shared_ptr<arrow::RecordBatch> TRecordBatchConstructor::BuildBatch(const ui32 numRows) const { std::vector<std::shared_ptr<arrow::Array>> columns; diff --git a/ydb/core/formats/arrow/simple_builder/batch.h b/ydb/core/formats/arrow/simple_builder/batch.h index 4dfb3ea0e3f..f910a0c9b14 100644 --- a/ydb/core/formats/arrow/simple_builder/batch.h +++ b/ydb/core/formats/arrow/simple_builder/batch.h @@ -2,7 +2,7 @@ #include "array.h" #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> -namespace NKikimr::NArrow { +namespace NKikimr::NArrow::NConstruction { class TRecordBatchConstructor { private: const std::vector<IArrayBuilder::TPtr> Builders; diff --git a/ydb/core/formats/arrow/simple_builder/filler.cpp b/ydb/core/formats/arrow/simple_builder/filler.cpp index c4ce2456a57..f6168701ddb 100644 --- a/ydb/core/formats/arrow/simple_builder/filler.cpp +++ b/ydb/core/formats/arrow/simple_builder/filler.cpp @@ -1,7 +1,7 @@ #include "filler.h" #include <library/cpp/testing/unittest/registar.h> -namespace NKikimr::NArrow { +namespace NKikimr::NArrow::NConstruction { TStringPoolFiller::TStringPoolFiller(const ui32 poolSize, const ui32 strLen) { for (ui32 i = 0; i < poolSize; ++i) { diff --git a/ydb/core/formats/arrow/simple_builder/filler.h b/ydb/core/formats/arrow/simple_builder/filler.h index 520192af2fa..c2eab029144 100644 --- a/ydb/core/formats/arrow/simple_builder/filler.h +++ b/ydb/core/formats/arrow/simple_builder/filler.h @@ -5,14 +5,23 @@ #include <util/system/types.h> #include <util/generic/string.h> -namespace NKikimr::NArrow { +namespace NKikimr::NArrow::NConstruction { template <class TArrowInt> class TIntSeqFiller { public: using TValue = TArrowInt; - typename TArrowInt::c_type GetValue(const ui32 idx) const { - return idx; +private: + using CType = typename TArrowInt::c_type; + const CType Delta; +public: + CType GetValue(const CType idx) const { + return Delta + idx; + } + TIntSeqFiller(const CType delta = 0) + : Delta(delta) + { + } }; @@ -42,6 +51,22 @@ public: } }; +template <class TValueExt> +class TBinaryArrayAccessor { +private: + using TArray = typename arrow::TypeTraits<TValueExt>::ArrayType; + const TArray& Data; +public: + using TValue = TValueExt; + const char* GetValueView(const ui32 idx) const { + return Data.GetView(idx).data(); + } + + TBinaryArrayAccessor(const arrow::Array& data) + : Data(static_cast<const TArray&>(data)) { + } +}; + template <class TDictionaryValue, class TIndices> class TDictionaryArrayAccessor { private: @@ -56,8 +81,32 @@ public: TDictionaryArrayAccessor(const TDictionary& dictionary, const TIndices& indices) : Dictionary(dictionary) + , Indices(indices) { + } +}; + +template <class TDictionaryValue, class TIndices> +class TBinaryDictionaryArrayAccessor { +private: + using TDictionary = typename arrow::TypeTraits<TDictionaryValue>::ArrayType; + const TDictionary& Dictionary; + const TIndices& Indices; + std::vector<TString> DictionaryStrings; +public: + using TValue = TDictionaryValue; + const char* GetValueView(const ui32 idx) const { + return DictionaryStrings[Indices.Value(idx)].data(); + } + + TBinaryDictionaryArrayAccessor(const TDictionary& dictionary, const TIndices& indices) + : Dictionary(dictionary) , Indices(indices) { + DictionaryStrings.reserve(Dictionary.length()); + for (i64 idx = 0; idx < Dictionary.length(); ++idx) { + auto sView = Dictionary.Value(idx); + DictionaryStrings.emplace_back(TString(sView.data(), sView.size())); + } } }; diff --git a/ydb/core/formats/arrow/ut/ut_dictionary.cpp b/ydb/core/formats/arrow/ut/ut_dictionary.cpp index f90efa8c709..77e15a8019f 100644 --- a/ydb/core/formats/arrow/ut/ut_dictionary.cpp +++ b/ydb/core/formats/arrow/ut/ut_dictionary.cpp @@ -11,15 +11,15 @@ Y_UNIT_TEST_SUITE(Dictionary) { using namespace NKikimr::NArrow; - ui64 Test(IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) { - std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize); - const TString data = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch); - auto deserializedBatch = *NKikimr::NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); + ui64 Test(NConstruction::IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) { + std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize); + const TString data = NSerialization::TFullDataSerializer(options).Serialize(batch); + auto deserializedBatch = *NSerialization::TFullDataDeserializer().Deserialize(data); Y_VERIFY(!!deserializedBatch); auto originalBatchTransformed = DictionaryToArray(batch); auto roundBatchTransformed = DictionaryToArray(deserializedBatch); - const TString roundUnpacked = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed); - const TString roundTransformed = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed); + const TString roundUnpacked = NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed); + const TString roundTransformed = NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed); Y_VERIFY(roundBatchTransformed->num_rows() == originalBatchTransformed->num_rows()); Y_VERIFY(roundUnpacked == roundTransformed); return data.size(); @@ -40,11 +40,13 @@ Y_UNIT_TEST_SUITE(Dictionary) { ui64 bytesDict; ui64 bytesRaw; { - IArrayBuilder::TPtr column = std::make_shared<TDictionaryArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen)); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TDictionaryArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(pSize, strLen)); bytesDict = Test(column, options, bSize); } { - IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen)); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(pSize, strLen)); bytesRaw = Test(column, options, bSize); } Cerr << "--------" << bytesDict << " / " << bytesRaw << " = " << 1.0 * bytesDict / bytesRaw << Endl; @@ -69,10 +71,12 @@ Y_UNIT_TEST_SUITE(Dictionary) { ui64 bytesFull; ui64 bytesPayload; { - IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen)); - std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize); - const TString dataFull = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch); - const TString dataPayload = NKikimr::NArrow::NSerialization::TBatchPayloadSerializer(options).Serialize(batch); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(pSize, strLen) + ); + std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize); + const TString dataFull = NSerialization::TFullDataSerializer(options).Serialize(batch); + const TString dataPayload = NSerialization::TBatchPayloadSerializer(options).Serialize(batch); bytesFull = dataFull.size(); bytesPayload = dataPayload.size(); } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 4789a2ba675..d2891880efd 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -8,6 +8,9 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> +#include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/formats/arrow/simple_builder/array.h> +#include <ydb/core/formats/arrow/simple_builder/batch.h> #include <ydb/core/formats/arrow/ssa_runtime_version.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/core/tx/datashard/datashard.h> @@ -23,6 +26,7 @@ #include <util/system/sanitizers.h> #include <fmt/format.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h> namespace NKikimr { namespace NKqp { @@ -43,7 +47,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { //runtime->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG); // runtime->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG); // runtime->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG); - runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); + // runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); runtime->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); runtime->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG); runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG); @@ -59,39 +63,319 @@ Y_UNIT_TEST_SUITE(KqpOlap) { EnableDebugLogging(kikimr.GetTestServer().GetRuntime()); } - class TLocalHelper: public Tests::NCS::THelper { + void PrintValue(IOutputStream& out, const NYdb::TValue& v) { + NYdb::TValueParser value(v); + + while (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { + if (value.IsNull()) { + out << "<NULL>"; + return; + } else { + value.OpenOptional(); + } + } + + switch (value.GetPrimitiveType()) { + case NYdb::EPrimitiveType::Uint32: + { + out << value.GetUint32(); + break; + } + case NYdb::EPrimitiveType::Uint64: + { + out << value.GetUint64(); + break; + } + case NYdb::EPrimitiveType::Int64: + { + out << value.GetInt64(); + break; + } + case NYdb::EPrimitiveType::Utf8: + { + out << value.GetUtf8(); + break; + } + case NYdb::EPrimitiveType::Timestamp: + { + out << value.GetTimestamp(); + break; + } + default: + { + UNIT_ASSERT_C(false, "PrintValue not iplemented for this type"); + } + } + } + + void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields) { + for (const auto& f : fields) { + out << f.first << ": "; + PrintValue(out, f.second); + out << " "; + } + } + + void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows) { + for (const auto& r : rows) { + PrintRow(out, r); + out << "\n"; + } + } + + TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo = nullptr) { + TVector<THashMap<TString, NYdb::TValue>> rows; + if (statInfo) { + *statInfo = NJson::JSON_NULL; + } + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(), + "Unexpected empty scan query response."); + + if (streamPart.HasQueryStats()) { + auto plan = streamPart.GetQueryStats().GetPlan(); + if (plan && statInfo) { + UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo)); + } + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + NYdb::TResultSetParser rsParser(resultSet); + while (rsParser.TryNextRow()) { + THashMap<TString, NYdb::TValue> row; + for (size_t ci = 0; ci < resultSet.ColumnsCount(); ++ci) { + row.emplace(resultSet.GetColumnsMeta()[ci].Name, rsParser.GetValue(ci)); + } + rows.emplace_back(std::move(row)); + } + } + } + return rows; + } + + TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableClient& tableClient, const TString& query) { + Cerr << "====================================\n" + << "QUERY:\n" << query + << "\n\nRESULT:\n"; + + TStreamExecScanQuerySettings scanSettings; + auto it = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync(); + auto rows = CollectRows(it); + + PrintRows(Cerr, rows); + Cerr << "\n"; + + return rows; + } + + ui64 GetUint32(const NYdb::TValue& v) { + NYdb::TValueParser value(v); + if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { + return *value.GetOptionalUint32(); + } else { + return value.GetUint32(); + } + } + + ui64 GetUint64(const NYdb::TValue& v) { + NYdb::TValueParser value(v); + if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { + return *value.GetOptionalUint64(); + } else { + return value.GetUint64(); + } + } + + TInstant GetTimestamp(const NYdb::TValue& v) { + NYdb::TValueParser value(v); + if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { + return *value.GetOptionalTimestamp(); + } else { + return value.GetTimestamp(); + } + } + + class TTypedLocalHelper: public Tests::NCS::THelper { private: using TBase = Tests::NCS::THelper; + const TString TypeName; + TKikimrRunner& KikimrRunner; + const TString TablePath; + const TString TableName; + const TString StoreName; + protected: + virtual TString GetTestTableSchema() const override { + TString result; + if (TypeName) { + result = R"(Columns { Name: "field" Type: ")" + TypeName + "\"}"; + } + result += R"( + Columns { Name: "pk_int" Type: "Int64" } + KeyColumnNames: "pk_int" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + )"; + return result; + } + virtual std::vector<TString> GetShardingColumns() const override { + return { "pk_int" }; + } public: - void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore", - ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, - TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { - TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); - CreateTestOlapStore(sender, Sprintf(R"( - Name: "%s" - ColumnShardCount: %d - SchemaPresets { - Name: "default" - Schema { - %s - } + TTypedLocalHelper(const TString& typeName, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable", const TString& storeName = "olapStore") + : TBase(kikimrRunner.GetTestServer()) + , TypeName(typeName) + , KikimrRunner(kikimrRunner) + , TablePath("/Root/" + storeName + "/" + tableName) + , TableName(tableName) + , StoreName(storeName) + { + SetShardingMethod("HASH_FUNCTION_MODULO_N"); + } + + void PrintCount() { + const TString selectQuery = "SELECT COUNT(*), MAX(pk_int), MIN(pk_int) FROM `" + TablePath + "`"; + + auto tableClient = KikimrRunner.GetTableClient(); + auto rows = ExecuteScanQuery(tableClient, selectQuery); + for (auto&& r : rows) { + for (auto&& c : r) { + Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl; } - )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data())); + } + } - TString shardingColumns = "[\"timestamp\", \"uid\"]"; - if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { - shardingColumns = "[\"uid\"]"; + class TDistribution { + private: + YDB_READONLY(ui32, Count, 0); + YDB_READONLY(ui32, MinCount, 0); + YDB_READONLY(ui32, MaxCount, 0); + YDB_READONLY(ui32, GroupsCount, 0); + public: + TDistribution(const ui32 count, const ui32 minCount, const ui32 maxCount, const ui32 groupsCount) + : Count(count) + , MinCount(minCount) + , MaxCount(maxCount) + , GroupsCount(groupsCount) + { + + } + + TString DebugString() const { + return TStringBuilder() + << "count=" << Count << ";" + << "min_count=" << MinCount << ";" + << "max_count=" << MaxCount << ";" + << "groups_count=" << GroupsCount << ";"; } + }; + + TDistribution GetDistribution() { + const TString selectQuery = "SELECT COUNT(*) as c, field FROM `" + TablePath + "` GROUP BY field ORDER BY field"; - TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( - Name: "%s" - ColumnShardCount: %d - Sharding { - HashSharding { - Function: %s - Columns: %s + auto tableClient = KikimrRunner.GetTableClient(); + auto rows = ExecuteScanQuery(tableClient, selectQuery); + ui32 count = 0; + std::optional<ui32> minCount; + std::optional<ui32> maxCount; + std::set<TString> groups; + for (auto&& r : rows) { + for (auto&& c : r) { + if (c.first == "c") { + const ui64 v = GetUint64(c.second); + count += v; + if (!minCount || *minCount > v) { + minCount = v; + } + if (!maxCount || *maxCount < v) { + maxCount = v; + } + } else if (c.first == "field") { + Y_VERIFY(groups.emplace(c.second.GetProto().DebugString()).second); + } + Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl; } - })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } + Y_VERIFY(maxCount); + Y_VERIFY(minCount); + return TDistribution(count, *minCount, *maxCount, groups.size()); + } + + void GetVolumes(ui64& rawBytes, ui64& bytes, const bool verbose = false) { + const TString selectQuery = "SELECT * FROM `" + TablePath + "/.sys/primary_index_stats`"; + + auto tableClient = KikimrRunner.GetTableClient(); + + std::optional<ui64> rawBytesPred; + std::optional<ui64> bytesPred; + while (true) { + auto rows = ExecuteScanQuery(tableClient, selectQuery); + rawBytes = 0; + bytes = 0; + for (auto&& r : rows) { + if (verbose) { + Cerr << "-------" << Endl; + } + for (auto&& c : r) { + if (c.first == "RawBytes") { + rawBytes += GetUint64(c.second); + } + if (c.first == "Bytes") { + bytes += GetUint64(c.second); + } + if (verbose) { + Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl; + } + } + } + if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) { + break; + } else { + rawBytesPred = rawBytes; + bytesPred = bytes; + Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl; + Sleep(TDuration::Seconds(5)); + } + } + Cerr << bytes << "/" << rawBytes << Endl; + } + + template <class TFiller> + void FillTable(const TFiller& fillPolicy, const ui32 pkKff = 0, const ui32 numRows = 800000) const { + std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders; + builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>>("pk_int", numRows * pkKff)); + builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<TFiller>>("field", fillPolicy)); + NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders); + std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows); + TBase::SendDataViaActorSystem(TablePath, batch); + } + + void FillPKOnly(const ui32 pkKff = 0, const ui32 numRows = 800000) const { + std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders; + builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>>("pk_int", numRows * pkKff)); + NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders); + std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows); + TBase::SendDataViaActorSystem(TablePath, batch); + } + + void CreateTestOlapTable(ui32 storeShardsCount = 4, ui32 tableShardsCount = 3) { + CreateOlapTableWithStore(TableName, StoreName, storeShardsCount, tableShardsCount); + } + }; + + class TLocalHelper: public Tests::NCS::THelper { + private: + using TBase = Tests::NCS::THelper; + public: + + void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore", + ui32 storeShardsCount = 4, ui32 tableShardsCount = 3) { + CreateOlapTableWithStore(tableName, storeName, storeShardsCount, tableShardsCount); } using TBase::TBase; @@ -274,135 +558,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); } - TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo = nullptr) { - TVector<THashMap<TString, NYdb::TValue>> rows; - if (statInfo) { - *statInfo = NJson::JSON_NULL; - } - for (;;) { - auto streamPart = it.ReadNext().GetValueSync(); - if (!streamPart.IsSuccess()) { - UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); - break; - } - - UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(), - "Unexpected empty scan query response."); - - if (streamPart.HasQueryStats()) { - auto plan = streamPart.GetQueryStats().GetPlan(); - if (plan && statInfo) { - UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo)); - } - } - - if (streamPart.HasResultSet()) { - auto resultSet = streamPart.ExtractResultSet(); - NYdb::TResultSetParser rsParser(resultSet); - while (rsParser.TryNextRow()) { - THashMap<TString, NYdb::TValue> row; - for (size_t ci = 0; ci < resultSet.ColumnsCount(); ++ci) { - row.emplace(resultSet.GetColumnsMeta()[ci].Name, rsParser.GetValue(ci)); - } - rows.emplace_back(std::move(row)); - } - } - } - return rows; - } - - void PrintValue(IOutputStream& out, const NYdb::TValue& v) { - NYdb::TValueParser value(v); - - while (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { - if (value.IsNull()) { - out << "<NULL>"; - return; - } else { - value.OpenOptional(); - } - } - - switch (value.GetPrimitiveType()) { - case NYdb::EPrimitiveType::Uint32: { - out << value.GetUint32(); - break; - } - case NYdb::EPrimitiveType::Uint64: { - out << value.GetUint64(); - break; - } - case NYdb::EPrimitiveType::Utf8: { - out << value.GetUtf8(); - break; - } - case NYdb::EPrimitiveType::Timestamp: { - out << value.GetTimestamp(); - break; - } - default: { - UNIT_ASSERT_C(false, "PrintValue not iplemented for this type"); - } - } - } - - void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields) { - for (const auto& f : fields) { - out << f.first << ": "; - PrintValue(out, f.second); - out << " "; - } - } - - void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows) { - for (const auto& r : rows) { - PrintRow(out, r); - out << "\n"; - } - } - - TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableClient& tableClient, const TString& query) { - Cerr << "====================================\n" - << "QUERY:\n" << query - << "\n\nRESULT:\n"; - - TStreamExecScanQuerySettings scanSettings; - auto it = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync(); - auto rows = CollectRows(it); - - PrintRows(Cerr, rows); - Cerr << "\n"; - - return rows; - } - - ui64 GetUint32(const NYdb::TValue& v) { - NYdb::TValueParser value(v); - if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { - return *value.GetOptionalUint32(); - } else { - return value.GetUint32(); - } - } - - ui64 GetUint64(const NYdb::TValue& v) { - NYdb::TValueParser value(v); - if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { - return *value.GetOptionalUint64(); - } else { - return value.GetUint64(); - } - } - - TInstant GetTimestamp(const NYdb::TValue& v) { - NYdb::TValueParser value(v); - if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) { - return *value.GetOptionalTimestamp(); - } else { - return value.GetTimestamp(); - } - } - void CreateTableOfAllTypes(TKikimrRunner& kikimr) { auto& legacyClient = kikimr.GetTestClient(); @@ -1218,7 +1373,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TLocalHelper(kikimr).CreateTestOlapTable(); WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2000); - // EnableDebugLogging(kikimr); + //EnableDebugLogging(kikimr); auto tableClient = kikimr.GetTableClient(); auto selectQuery = TString(R"( @@ -1928,7 +2083,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { .SetEnableOlapSchemaOperations(true); TKikimrRunner kikimr(settings); - EnableDebugLogging(kikimr); + //EnableDebugLogging(kikimr); TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); @@ -3141,6 +3296,92 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST(StatsSysViewEnumStringBytes) { + ui64 rawBytesPK1; + ui64 bytesPK1; + { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + EnableDebugLogging(kikimr); + TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore12"); + helper.CreateTestOlapTable(); + helper.FillPKOnly(0, 800000); + helper.GetVolumes(rawBytesPK1, bytesPK1, false); + } + + ui64 rawBytesUnpack1PK = 0; + ui64 bytesUnpack1PK = 0; + ui64 rawBytesPackAndUnpack2PK; + ui64 bytesPackAndUnpack2PK; + const ui32 rowsCount = 800000; + const ui32 groupsCount = 512; + { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + EnableDebugLogging(kikimr); + TTypedLocalHelper helper("Utf8", kikimr); + helper.CreateTestOlapTable(); + NArrow::NConstruction::TStringPoolFiller sPool(groupsCount, 52); + helper.FillTable(sPool, 0, rowsCount); + helper.PrintCount(); + { + auto d = helper.GetDistribution(); + Y_VERIFY(d.GetCount() == rowsCount); + Y_VERIFY(d.GetGroupsCount() == groupsCount); + Y_VERIFY(d.GetMaxCount() - d.GetMinCount() <= 1); + } + helper.GetVolumes(rawBytesUnpack1PK, bytesUnpack1PK, false); + Sleep(TDuration::Seconds(5)); + auto tableClient = kikimr.GetTableClient(); + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, LOW_CARDINALITY=`true`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field1, LOW_CARDINALITY=`true`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, LOW_CARDINALITY1=`true`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString()); + } + Sleep(TDuration::Seconds(5)); + helper.FillTable(sPool, 1, 800000); + Sleep(TDuration::Seconds(5)); + { + helper.GetVolumes(rawBytesPackAndUnpack2PK, bytesPackAndUnpack2PK, false); + helper.PrintCount(); + { + auto d = helper.GetDistribution(); + Cerr << d.DebugString() << Endl; + Y_VERIFY(d.GetCount() == 2 * rowsCount); + Y_VERIFY(d.GetGroupsCount() == groupsCount); + Y_VERIFY(d.GetMaxCount() - d.GetMinCount() <= 2); + } + } + } + const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1; + const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1; + const ui64 rawBytesPack = rawBytesPackAndUnpack2PK - rawBytesUnpack1PK - rawBytesPK1; + const ui64 bytesPack = bytesPackAndUnpack2PK - bytesUnpack1PK - bytesPK1; + TStringBuilder result; + result << "unpacked data: " << rawBytesUnpack << " / " << bytesUnpack << Endl; + result << "packed data: " << rawBytesPack << " / " << bytesPack << Endl; + result << "frq_diff: " << 1.0 * bytesPack / bytesUnpack << Endl; + result << "frq_compression: " << 1.0 * bytesPack / rawBytesPack << Endl; + result << "pk_size : " << rawBytesPK1 << " / " << bytesPK1 << Endl; + Cerr << result << Endl; + Y_VERIFY(bytesPack / bytesUnpack < 0.1); + } + Y_UNIT_TEST(SelectLimit1ManyShards) { TPortManager tp; ui16 mbusport = tp.GetPort(2134); diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index ae234a1859c..baebf4d7af4 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -26,7 +26,10 @@ void THelperSchemaless::CreateTestOlapStore(TActorId sender, TString scheme) { Server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release())); auto ev = Server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + Cerr << ev->Get()->Record.DebugString() << Endl; + auto status = ev->Get()->Record.GetStatus(); ui64 txId = ev->Get()->Record.GetTxId(); + UNIT_ASSERT(status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError); WaitForSchemeOperation(sender, txId); } @@ -50,11 +53,12 @@ void THelperSchemaless::CreateTestOlapTable(TActorId sender, TString storeOrDirN auto ev = Server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); ui64 txId = ev->Get()->Record.GetTxId(); auto status = ev->Get()->Record.GetStatus(); + Cerr << ev->Get()->Record.DebugString() << Endl; UNIT_ASSERT(status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError); WaitForSchemeOperation(sender, txId); } -void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) { +void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) const { auto* runtime = Server.GetRuntime(); UNIT_ASSERT(batch); @@ -93,14 +97,14 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_pt runtime->DispatchEvents(options); } -void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { +void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const { auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); SendDataViaActorSystem(testTable, batch); } // -std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { +std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() const { std::vector<std::shared_ptr<arrow::Field>> fields; fields.emplace_back(arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO))); fields.emplace_back(arrow::field("resource_id", arrow::utf8())); @@ -113,7 +117,7 @@ std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { return std::make_shared<arrow::Schema>(std::move(fields)); } -std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { +std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const { std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()); @@ -181,9 +185,35 @@ TString THelper::GetTestTableSchema() const { return sb; } +void THelper::CreateOlapTableWithStore(TString tableName /*= "olapTable"*/, TString storeName /*= "olapStore"*/, ui32 storeShardsCount /*= 4*/, ui32 tableShardsCount /*= 3*/) { + TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); + CreateTestOlapStore(sender, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + %s + } + } + )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data())); + + const TString shardingColumns = "[\"" + JoinSeq("\",\"", GetShardingColumns()) + "\"]"; + + TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + Sharding { + HashSharding { + Function: %s + Columns: %s + } + })", tableName.c_str(), tableShardsCount, ShardingMethod.data(), shardingColumns.c_str())); +} + // Clickbench table -std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() { +std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() const { return std::make_shared<arrow::Schema>( std::vector<std::shared_ptr<arrow::Field>> { arrow::field("WatchID", arrow::int64()), @@ -294,7 +324,7 @@ std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() { }); } -std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) { +std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) const { std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); UNIT_ASSERT(schema); UNIT_ASSERT(schema->num_fields()); @@ -351,7 +381,7 @@ std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 // Table with NULLs -std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() { +std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() const { return std::make_shared<arrow::Schema>( std::vector<std::shared_ptr<arrow::Field>>{ arrow::field("id", arrow::int32()), @@ -363,11 +393,11 @@ std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() { }); } -std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() { +std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() const { return TestArrowBatch(0, 0, 10); } -std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount) { +std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount) const { rowCount = 10; std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index f2022674160..0ed78d368f9 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -14,21 +14,31 @@ public: using TBase::TBase; void CreateTestOlapStore(TActorId sender, TString scheme); void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme); - void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); - void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch); + void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const; + void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) const; - virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) = 0; + virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const = 0; }; class THelper: public THelperSchemaless { private: using TBase = THelperSchemaless; - std::shared_ptr<arrow::Schema> GetArrowSchema(); + std::shared_ptr<arrow::Schema> GetArrowSchema() const; YDB_FLAG_ACCESSOR(WithJsonDocument, false); + TString ShardingMethod = "HASH_FUNCTION_CLOUD_LOGS"; +protected: + void CreateOlapTableWithStore(TString tableName = "olapTable", TString storeName = "olapStore", + ui32 storeShardsCount = 4, ui32 tableShardsCount = 3); public: using TBase::TBase; + THelper& SetShardingMethod(const TString& value) { + Y_VERIFY(value == "HASH_FUNCTION_CLOUD_LOGS" || value == "HASH_FUNCTION_MODULO_N"); + ShardingMethod = value; + return *this; + } + static constexpr const char * PROTO_SCHEMA = R"( Columns { Name: "timestamp" Type: "Timestamp" NotNull: true } #Columns { Name: "resource_type" Type: "Utf8" } @@ -44,16 +54,19 @@ public: Engine: COLUMN_ENGINE_REPLACING_TIMESERIES )"; - TString GetTestTableSchema() const; + virtual std::vector<TString> GetShardingColumns() const { + return {"timestamp", "uid"}; + } + virtual TString GetTestTableSchema() const; - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) override; + virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const override; }; class TCickBenchHelper: public THelperSchemaless { private: using TBase = THelperSchemaless; - std::shared_ptr<arrow::Schema> GetArrowSchema(); + std::shared_ptr<arrow::Schema> GetArrowSchema() const; public: using TBase::TBase; @@ -167,14 +180,14 @@ public: KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"] )"; - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) override; + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) const override; }; class TTableWithNullsHelper: public THelperSchemaless { private: using TBase = THelperSchemaless; - std::shared_ptr<arrow::Schema> GetArrowSchema(); + std::shared_ptr<arrow::Schema> GetArrowSchema() const; public: using TBase::TBase; @@ -188,8 +201,8 @@ public: KeyColumnNames: "id" )"; - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10) override; - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(); + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10) const override; + std::shared_ptr<arrow::RecordBatch> TestArrowBatch() const; }; } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 970730d3e04..40c3539ec55 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -335,7 +335,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID()); Y_VERIFY(!blobs.empty()); - ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo("dummy", 0), ctx.SelfID, + ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo::BuildDefault(), ctx.SelfID, BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } } else { diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index a565d80ba6d..2e9e316b223 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -145,6 +145,8 @@ public: struct TIndexInfo : public NTable::TScheme::TTableSchema { private: THashMap<ui32, TColumnFeatures> ColumnFeatures; + TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false); + bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); public: static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step"; static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id"; @@ -177,10 +179,13 @@ public: } return true; } - TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false); - bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); public: + static TIndexInfo BuildDefault() { + TIndexInfo result("dummy", 0, false); + return result; + } + static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { TIndexInfo result("", 0, schema.GetCompositeMarks()); if (!result.DeserializeFromProto(schema)) { diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 0803566198b..155a8292ea4 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -172,7 +172,7 @@ static const std::vector<std::pair<TString, TTypeInfo>> testKey = { TIndexInfo TestTableInfo(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = testColumns, const std::vector<std::pair<TString, TTypeInfo>>& key = testKey) { - TIndexInfo indexInfo("", 0); + TIndexInfo indexInfo = TIndexInfo::BuildDefault(); for (ui32 i = 0; i < ydbSchema.size(); ++i) { ui32 id = i + 1; |