aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-22 16:30:42 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-22 16:30:42 +0300
commitf86980eca696fad8264c575b94b8e0e2aa26be29 (patch)
tree2ff5eae59e1e286cfab05a30311cc58167fcd222
parentcfaf3952685b3a7dc43dceafe49f2449391fc54d (diff)
downloadydb-f86980eca696fad8264c575b94b8e0e2aa26be29.tar.gz
dictionary compression test
-rw-r--r--ydb/core/formats/arrow/dictionary/conversion.cpp6
-rw-r--r--ydb/core/formats/arrow/simple_builder/array.cpp2
-rw-r--r--ydb/core/formats/arrow/simple_builder/array.h31
-rw-r--r--ydb/core/formats/arrow/simple_builder/batch.cpp2
-rw-r--r--ydb/core/formats/arrow/simple_builder/batch.h2
-rw-r--r--ydb/core/formats/arrow/simple_builder/filler.cpp2
-rw-r--r--ydb/core/formats/arrow/simple_builder/filler.h55
-rw-r--r--ydb/core/formats/arrow/ut/ut_dictionary.cpp28
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp555
-rw-r--r--ydb/core/testlib/cs_helper.cpp48
-rw-r--r--ydb/core/testlib/cs_helper.h35
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h9
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp2
14 files changed, 575 insertions, 204 deletions
diff --git a/ydb/core/formats/arrow/dictionary/conversion.cpp b/ydb/core/formats/arrow/dictionary/conversion.cpp
index 8c96ea23ae0..78c57c8d23f 100644
--- a/ydb/core/formats/arrow/dictionary/conversion.cpp
+++ b/ydb/core/formats/arrow/dictionary/conversion.cpp
@@ -30,9 +30,9 @@ std::shared_ptr<arrow::Array> DictionaryToArray(const arrow::DictionaryArray& da
constexpr bool indicesIntegral = std::is_integral<typename TWrapIndices::T::c_type>::value;
if constexpr (indicesIntegral && hasCType) {
using TIndices = typename arrow::TypeTraits<typename TWrapIndices::T>::ArrayType;
- using TDictionaryAccessor = TDictionaryArrayAccessor<TDictionaryValue, TIndices>;
+ using TDictionaryAccessor = NConstruction::TDictionaryArrayAccessor<TDictionaryValue, TIndices>;
auto& columnIndices = static_cast<const TIndices&>(*data.indices());
- result = TSimpleArrayConstructor<TDictionaryAccessor>("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length());
+ result = NConstruction::TSimpleArrayConstructor("absent", TDictionaryAccessor(columnDictionary, columnIndices)).BuildArray(data.length());
return true;
}
}
@@ -80,7 +80,7 @@ std::shared_ptr<arrow::DictionaryArray> ArrayToDictionary(const std::shared_ptr<
SwitchType(data->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_string_view<typename TWrap::T>::value && arrow::TypeTraits<typename TWrap::T>::is_parameter_free) {
- auto resultArray = TDictionaryArrayConstructor<TLinearArrayAccessor<typename TWrap::T>>("absent", *data).BuildArray(data->length());
+ auto resultArray = NConstruction::TDictionaryArrayConstructor<NConstruction::TLinearArrayAccessor<typename TWrap::T>>("absent", *data).BuildArray(data->length());
Y_VERIFY(resultArray->type()->id() == arrow::Type::DICTIONARY);
result = static_pointer_cast<arrow::DictionaryArray>(resultArray);
} else {
diff --git a/ydb/core/formats/arrow/simple_builder/array.cpp b/ydb/core/formats/arrow/simple_builder/array.cpp
index 4c4ffca6c1d..009e3e8702d 100644
--- a/ydb/core/formats/arrow/simple_builder/array.cpp
+++ b/ydb/core/formats/arrow/simple_builder/array.cpp
@@ -1,5 +1,5 @@
#include "array.h"
-namespace NKikimr::NArrow {
+namespace NKikimr::NArrow::NConstruction {
}
diff --git a/ydb/core/formats/arrow/simple_builder/array.h b/ydb/core/formats/arrow/simple_builder/array.h
index 7f823787590..87a81a6c37c 100644
--- a/ydb/core/formats/arrow/simple_builder/array.h
+++ b/ydb/core/formats/arrow/simple_builder/array.h
@@ -5,7 +5,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_dict.h>
#include <util/generic/string.h>
-namespace NKikimr::NArrow {
+namespace NKikimr::NArrow::NConstruction {
class IArrayBuilder {
private:
@@ -49,6 +49,35 @@ public:
};
template <class TFiller>
+class TBinaryArrayConstructor: public IArrayBuilder {
+private:
+ using TBase = IArrayBuilder;
+ using TBuilder = typename arrow::TypeTraits<typename TFiller::TValue>::BuilderType;
+ const TFiller Filler;
+protected:
+ virtual std::shared_ptr<arrow::Array> DoBuildArray(const ui32 recordsCount) const override {
+ TBuilder fBuilder = TBuilder();
+ std::vector<const char*> values;
+ values.reserve(recordsCount);
+ for (ui32 i = 0; i < recordsCount; ++i) {
+ values.emplace_back(Filler.GetValueView(i));
+ }
+ auto addStatus = fBuilder.AppendValues(values.data(), recordsCount);
+ if (!addStatus.ok()) {
+ const std::string errorMessage = addStatus.ToString();
+ Y_VERIFY(false, "%s", errorMessage.data());
+ }
+ return *fBuilder.Finish();
+ }
+public:
+ TBinaryArrayConstructor(const TString& fieldName, const TFiller& filler = TFiller())
+ : TBase(fieldName)
+ , Filler(filler) {
+
+ }
+};
+
+template <class TFiller>
class TDictionaryArrayConstructor: public IArrayBuilder {
private:
using TBase = IArrayBuilder;
diff --git a/ydb/core/formats/arrow/simple_builder/batch.cpp b/ydb/core/formats/arrow/simple_builder/batch.cpp
index 4a60043a087..c1f27c58b7c 100644
--- a/ydb/core/formats/arrow/simple_builder/batch.cpp
+++ b/ydb/core/formats/arrow/simple_builder/batch.cpp
@@ -1,6 +1,6 @@
#include "batch.h"
-namespace NKikimr::NArrow {
+namespace NKikimr::NArrow::NConstruction {
std::shared_ptr<arrow::RecordBatch> TRecordBatchConstructor::BuildBatch(const ui32 numRows) const {
std::vector<std::shared_ptr<arrow::Array>> columns;
diff --git a/ydb/core/formats/arrow/simple_builder/batch.h b/ydb/core/formats/arrow/simple_builder/batch.h
index 4dfb3ea0e3f..f910a0c9b14 100644
--- a/ydb/core/formats/arrow/simple_builder/batch.h
+++ b/ydb/core/formats/arrow/simple_builder/batch.h
@@ -2,7 +2,7 @@
#include "array.h"
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
-namespace NKikimr::NArrow {
+namespace NKikimr::NArrow::NConstruction {
class TRecordBatchConstructor {
private:
const std::vector<IArrayBuilder::TPtr> Builders;
diff --git a/ydb/core/formats/arrow/simple_builder/filler.cpp b/ydb/core/formats/arrow/simple_builder/filler.cpp
index c4ce2456a57..f6168701ddb 100644
--- a/ydb/core/formats/arrow/simple_builder/filler.cpp
+++ b/ydb/core/formats/arrow/simple_builder/filler.cpp
@@ -1,7 +1,7 @@
#include "filler.h"
#include <library/cpp/testing/unittest/registar.h>
-namespace NKikimr::NArrow {
+namespace NKikimr::NArrow::NConstruction {
TStringPoolFiller::TStringPoolFiller(const ui32 poolSize, const ui32 strLen) {
for (ui32 i = 0; i < poolSize; ++i) {
diff --git a/ydb/core/formats/arrow/simple_builder/filler.h b/ydb/core/formats/arrow/simple_builder/filler.h
index 520192af2fa..c2eab029144 100644
--- a/ydb/core/formats/arrow/simple_builder/filler.h
+++ b/ydb/core/formats/arrow/simple_builder/filler.h
@@ -5,14 +5,23 @@
#include <util/system/types.h>
#include <util/generic/string.h>
-namespace NKikimr::NArrow {
+namespace NKikimr::NArrow::NConstruction {
template <class TArrowInt>
class TIntSeqFiller {
public:
using TValue = TArrowInt;
- typename TArrowInt::c_type GetValue(const ui32 idx) const {
- return idx;
+private:
+ using CType = typename TArrowInt::c_type;
+ const CType Delta;
+public:
+ CType GetValue(const CType idx) const {
+ return Delta + idx;
+ }
+ TIntSeqFiller(const CType delta = 0)
+ : Delta(delta)
+ {
+
}
};
@@ -42,6 +51,22 @@ public:
}
};
+template <class TValueExt>
+class TBinaryArrayAccessor {
+private:
+ using TArray = typename arrow::TypeTraits<TValueExt>::ArrayType;
+ const TArray& Data;
+public:
+ using TValue = TValueExt;
+ const char* GetValueView(const ui32 idx) const {
+ return Data.GetView(idx).data();
+ }
+
+ TBinaryArrayAccessor(const arrow::Array& data)
+ : Data(static_cast<const TArray&>(data)) {
+ }
+};
+
template <class TDictionaryValue, class TIndices>
class TDictionaryArrayAccessor {
private:
@@ -56,8 +81,32 @@ public:
TDictionaryArrayAccessor(const TDictionary& dictionary, const TIndices& indices)
: Dictionary(dictionary)
+ , Indices(indices) {
+ }
+};
+
+template <class TDictionaryValue, class TIndices>
+class TBinaryDictionaryArrayAccessor {
+private:
+ using TDictionary = typename arrow::TypeTraits<TDictionaryValue>::ArrayType;
+ const TDictionary& Dictionary;
+ const TIndices& Indices;
+ std::vector<TString> DictionaryStrings;
+public:
+ using TValue = TDictionaryValue;
+ const char* GetValueView(const ui32 idx) const {
+ return DictionaryStrings[Indices.Value(idx)].data();
+ }
+
+ TBinaryDictionaryArrayAccessor(const TDictionary& dictionary, const TIndices& indices)
+ : Dictionary(dictionary)
, Indices(indices)
{
+ DictionaryStrings.reserve(Dictionary.length());
+ for (i64 idx = 0; idx < Dictionary.length(); ++idx) {
+ auto sView = Dictionary.Value(idx);
+ DictionaryStrings.emplace_back(TString(sView.data(), sView.size()));
+ }
}
};
diff --git a/ydb/core/formats/arrow/ut/ut_dictionary.cpp b/ydb/core/formats/arrow/ut/ut_dictionary.cpp
index f90efa8c709..77e15a8019f 100644
--- a/ydb/core/formats/arrow/ut/ut_dictionary.cpp
+++ b/ydb/core/formats/arrow/ut/ut_dictionary.cpp
@@ -11,15 +11,15 @@ Y_UNIT_TEST_SUITE(Dictionary) {
using namespace NKikimr::NArrow;
- ui64 Test(IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) {
- std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize);
- const TString data = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch);
- auto deserializedBatch = *NKikimr::NArrow::NSerialization::TFullDataDeserializer().Deserialize(data);
+ ui64 Test(NConstruction::IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) {
+ std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize);
+ const TString data = NSerialization::TFullDataSerializer(options).Serialize(batch);
+ auto deserializedBatch = *NSerialization::TFullDataDeserializer().Deserialize(data);
Y_VERIFY(!!deserializedBatch);
auto originalBatchTransformed = DictionaryToArray(batch);
auto roundBatchTransformed = DictionaryToArray(deserializedBatch);
- const TString roundUnpacked = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed);
- const TString roundTransformed = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed);
+ const TString roundUnpacked = NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed);
+ const TString roundTransformed = NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed);
Y_VERIFY(roundBatchTransformed->num_rows() == originalBatchTransformed->num_rows());
Y_VERIFY(roundUnpacked == roundTransformed);
return data.size();
@@ -40,11 +40,13 @@ Y_UNIT_TEST_SUITE(Dictionary) {
ui64 bytesDict;
ui64 bytesRaw;
{
- IArrayBuilder::TPtr column = std::make_shared<TDictionaryArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen));
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TDictionaryArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(pSize, strLen));
bytesDict = Test(column, options, bSize);
}
{
- IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen));
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(pSize, strLen));
bytesRaw = Test(column, options, bSize);
}
Cerr << "--------" << bytesDict << " / " << bytesRaw << " = " << 1.0 * bytesDict / bytesRaw << Endl;
@@ -69,10 +71,12 @@ Y_UNIT_TEST_SUITE(Dictionary) {
ui64 bytesFull;
ui64 bytesPayload;
{
- IArrayBuilder::TPtr column = std::make_shared<TSimpleArrayConstructor<TStringPoolFiller>>("field", TStringPoolFiller(pSize, strLen));
- std::shared_ptr<arrow::RecordBatch> batch = TRecordBatchConstructor({ column }).BuildBatch(bSize);
- const TString dataFull = NKikimr::NArrow::NSerialization::TFullDataSerializer(options).Serialize(batch);
- const TString dataPayload = NKikimr::NArrow::NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(pSize, strLen)
+ );
+ std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize);
+ const TString dataFull = NSerialization::TFullDataSerializer(options).Serialize(batch);
+ const TString dataPayload = NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
bytesFull = dataFull.size();
bytesPayload = dataPayload.size();
}
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 4789a2ba675..d2891880efd 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -8,6 +8,9 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/simple_builder/array.h>
+#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/ssa_runtime_version.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/tx/datashard/datashard.h>
@@ -23,6 +26,7 @@
#include <util/system/sanitizers.h>
#include <fmt/format.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
namespace NKikimr {
namespace NKqp {
@@ -43,7 +47,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
//runtime->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
// runtime->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
// runtime->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG);
- runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
+ // runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
runtime->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
runtime->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG);
runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
@@ -59,39 +63,319 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
EnableDebugLogging(kikimr.GetTestServer().GetRuntime());
}
- class TLocalHelper: public Tests::NCS::THelper {
+ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
+ NYdb::TValueParser value(v);
+
+ while (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
+ if (value.IsNull()) {
+ out << "<NULL>";
+ return;
+ } else {
+ value.OpenOptional();
+ }
+ }
+
+ switch (value.GetPrimitiveType()) {
+ case NYdb::EPrimitiveType::Uint32:
+ {
+ out << value.GetUint32();
+ break;
+ }
+ case NYdb::EPrimitiveType::Uint64:
+ {
+ out << value.GetUint64();
+ break;
+ }
+ case NYdb::EPrimitiveType::Int64:
+ {
+ out << value.GetInt64();
+ break;
+ }
+ case NYdb::EPrimitiveType::Utf8:
+ {
+ out << value.GetUtf8();
+ break;
+ }
+ case NYdb::EPrimitiveType::Timestamp:
+ {
+ out << value.GetTimestamp();
+ break;
+ }
+ default:
+ {
+ UNIT_ASSERT_C(false, "PrintValue not iplemented for this type");
+ }
+ }
+ }
+
+ void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields) {
+ for (const auto& f : fields) {
+ out << f.first << ": ";
+ PrintValue(out, f.second);
+ out << " ";
+ }
+ }
+
+ void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows) {
+ for (const auto& r : rows) {
+ PrintRow(out, r);
+ out << "\n";
+ }
+ }
+
+ TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo = nullptr) {
+ TVector<THashMap<TString, NYdb::TValue>> rows;
+ if (statInfo) {
+ *statInfo = NJson::JSON_NULL;
+ }
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(),
+ "Unexpected empty scan query response.");
+
+ if (streamPart.HasQueryStats()) {
+ auto plan = streamPart.GetQueryStats().GetPlan();
+ if (plan && statInfo) {
+ UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo));
+ }
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+ NYdb::TResultSetParser rsParser(resultSet);
+ while (rsParser.TryNextRow()) {
+ THashMap<TString, NYdb::TValue> row;
+ for (size_t ci = 0; ci < resultSet.ColumnsCount(); ++ci) {
+ row.emplace(resultSet.GetColumnsMeta()[ci].Name, rsParser.GetValue(ci));
+ }
+ rows.emplace_back(std::move(row));
+ }
+ }
+ }
+ return rows;
+ }
+
+ TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableClient& tableClient, const TString& query) {
+ Cerr << "====================================\n"
+ << "QUERY:\n" << query
+ << "\n\nRESULT:\n";
+
+ TStreamExecScanQuerySettings scanSettings;
+ auto it = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
+ auto rows = CollectRows(it);
+
+ PrintRows(Cerr, rows);
+ Cerr << "\n";
+
+ return rows;
+ }
+
+ ui64 GetUint32(const NYdb::TValue& v) {
+ NYdb::TValueParser value(v);
+ if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
+ return *value.GetOptionalUint32();
+ } else {
+ return value.GetUint32();
+ }
+ }
+
+ ui64 GetUint64(const NYdb::TValue& v) {
+ NYdb::TValueParser value(v);
+ if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
+ return *value.GetOptionalUint64();
+ } else {
+ return value.GetUint64();
+ }
+ }
+
+ TInstant GetTimestamp(const NYdb::TValue& v) {
+ NYdb::TValueParser value(v);
+ if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
+ return *value.GetOptionalTimestamp();
+ } else {
+ return value.GetTimestamp();
+ }
+ }
+
+ class TTypedLocalHelper: public Tests::NCS::THelper {
private:
using TBase = Tests::NCS::THelper;
+ const TString TypeName;
+ TKikimrRunner& KikimrRunner;
+ const TString TablePath;
+ const TString TableName;
+ const TString StoreName;
+ protected:
+ virtual TString GetTestTableSchema() const override {
+ TString result;
+ if (TypeName) {
+ result = R"(Columns { Name: "field" Type: ")" + TypeName + "\"}";
+ }
+ result += R"(
+ Columns { Name: "pk_int" Type: "Int64" }
+ KeyColumnNames: "pk_int"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ )";
+ return result;
+ }
+ virtual std::vector<TString> GetShardingColumns() const override {
+ return { "pk_int" };
+ }
public:
- void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore",
- ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
- TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
- TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
- CreateTestOlapStore(sender, Sprintf(R"(
- Name: "%s"
- ColumnShardCount: %d
- SchemaPresets {
- Name: "default"
- Schema {
- %s
- }
+ TTypedLocalHelper(const TString& typeName, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable", const TString& storeName = "olapStore")
+ : TBase(kikimrRunner.GetTestServer())
+ , TypeName(typeName)
+ , KikimrRunner(kikimrRunner)
+ , TablePath("/Root/" + storeName + "/" + tableName)
+ , TableName(tableName)
+ , StoreName(storeName)
+ {
+ SetShardingMethod("HASH_FUNCTION_MODULO_N");
+ }
+
+ void PrintCount() {
+ const TString selectQuery = "SELECT COUNT(*), MAX(pk_int), MIN(pk_int) FROM `" + TablePath + "`";
+
+ auto tableClient = KikimrRunner.GetTableClient();
+ auto rows = ExecuteScanQuery(tableClient, selectQuery);
+ for (auto&& r : rows) {
+ for (auto&& c : r) {
+ Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl;
}
- )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data()));
+ }
+ }
- TString shardingColumns = "[\"timestamp\", \"uid\"]";
- if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
- shardingColumns = "[\"uid\"]";
+ class TDistribution {
+ private:
+ YDB_READONLY(ui32, Count, 0);
+ YDB_READONLY(ui32, MinCount, 0);
+ YDB_READONLY(ui32, MaxCount, 0);
+ YDB_READONLY(ui32, GroupsCount, 0);
+ public:
+ TDistribution(const ui32 count, const ui32 minCount, const ui32 maxCount, const ui32 groupsCount)
+ : Count(count)
+ , MinCount(minCount)
+ , MaxCount(maxCount)
+ , GroupsCount(groupsCount)
+ {
+
+ }
+
+ TString DebugString() const {
+ return TStringBuilder()
+ << "count=" << Count << ";"
+ << "min_count=" << MinCount << ";"
+ << "max_count=" << MaxCount << ";"
+ << "groups_count=" << GroupsCount << ";";
}
+ };
+
+ TDistribution GetDistribution() {
+ const TString selectQuery = "SELECT COUNT(*) as c, field FROM `" + TablePath + "` GROUP BY field ORDER BY field";
- TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
- Name: "%s"
- ColumnShardCount: %d
- Sharding {
- HashSharding {
- Function: %s
- Columns: %s
+ auto tableClient = KikimrRunner.GetTableClient();
+ auto rows = ExecuteScanQuery(tableClient, selectQuery);
+ ui32 count = 0;
+ std::optional<ui32> minCount;
+ std::optional<ui32> maxCount;
+ std::set<TString> groups;
+ for (auto&& r : rows) {
+ for (auto&& c : r) {
+ if (c.first == "c") {
+ const ui64 v = GetUint64(c.second);
+ count += v;
+ if (!minCount || *minCount > v) {
+ minCount = v;
+ }
+ if (!maxCount || *maxCount < v) {
+ maxCount = v;
+ }
+ } else if (c.first == "field") {
+ Y_VERIFY(groups.emplace(c.second.GetProto().DebugString()).second);
+ }
+ Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl;
}
- })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
+ Y_VERIFY(maxCount);
+ Y_VERIFY(minCount);
+ return TDistribution(count, *minCount, *maxCount, groups.size());
+ }
+
+ void GetVolumes(ui64& rawBytes, ui64& bytes, const bool verbose = false) {
+ const TString selectQuery = "SELECT * FROM `" + TablePath + "/.sys/primary_index_stats`";
+
+ auto tableClient = KikimrRunner.GetTableClient();
+
+ std::optional<ui64> rawBytesPred;
+ std::optional<ui64> bytesPred;
+ while (true) {
+ auto rows = ExecuteScanQuery(tableClient, selectQuery);
+ rawBytes = 0;
+ bytes = 0;
+ for (auto&& r : rows) {
+ if (verbose) {
+ Cerr << "-------" << Endl;
+ }
+ for (auto&& c : r) {
+ if (c.first == "RawBytes") {
+ rawBytes += GetUint64(c.second);
+ }
+ if (c.first == "Bytes") {
+ bytes += GetUint64(c.second);
+ }
+ if (verbose) {
+ Cerr << c.first << ":" << Endl << c.second.GetProto().DebugString() << Endl;
+ }
+ }
+ }
+ if (rawBytesPred && *rawBytesPred == rawBytes && bytesPred && *bytesPred == bytes) {
+ break;
+ } else {
+ rawBytesPred = rawBytes;
+ bytesPred = bytes;
+ Cerr << "Wait changes: " << bytes << "/" << rawBytes << Endl;
+ Sleep(TDuration::Seconds(5));
+ }
+ }
+ Cerr << bytes << "/" << rawBytes << Endl;
+ }
+
+ template <class TFiller>
+ void FillTable(const TFiller& fillPolicy, const ui32 pkKff = 0, const ui32 numRows = 800000) const {
+ std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders;
+ builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>>("pk_int", numRows * pkKff));
+ builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<TFiller>>("field", fillPolicy));
+ NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders);
+ std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows);
+ TBase::SendDataViaActorSystem(TablePath, batch);
+ }
+
+ void FillPKOnly(const ui32 pkKff = 0, const ui32 numRows = 800000) const {
+ std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders;
+ builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>>("pk_int", numRows * pkKff));
+ NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders);
+ std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows);
+ TBase::SendDataViaActorSystem(TablePath, batch);
+ }
+
+ void CreateTestOlapTable(ui32 storeShardsCount = 4, ui32 tableShardsCount = 3) {
+ CreateOlapTableWithStore(TableName, StoreName, storeShardsCount, tableShardsCount);
+ }
+ };
+
+ class TLocalHelper: public Tests::NCS::THelper {
+ private:
+ using TBase = Tests::NCS::THelper;
+ public:
+
+ void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore",
+ ui32 storeShardsCount = 4, ui32 tableShardsCount = 3) {
+ CreateOlapTableWithStore(tableName, storeName, storeShardsCount, tableShardsCount);
}
using TBase::TBase;
@@ -274,135 +558,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
}
- TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo = nullptr) {
- TVector<THashMap<TString, NYdb::TValue>> rows;
- if (statInfo) {
- *statInfo = NJson::JSON_NULL;
- }
- for (;;) {
- auto streamPart = it.ReadNext().GetValueSync();
- if (!streamPart.IsSuccess()) {
- UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
- break;
- }
-
- UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(),
- "Unexpected empty scan query response.");
-
- if (streamPart.HasQueryStats()) {
- auto plan = streamPart.GetQueryStats().GetPlan();
- if (plan && statInfo) {
- UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo));
- }
- }
-
- if (streamPart.HasResultSet()) {
- auto resultSet = streamPart.ExtractResultSet();
- NYdb::TResultSetParser rsParser(resultSet);
- while (rsParser.TryNextRow()) {
- THashMap<TString, NYdb::TValue> row;
- for (size_t ci = 0; ci < resultSet.ColumnsCount(); ++ci) {
- row.emplace(resultSet.GetColumnsMeta()[ci].Name, rsParser.GetValue(ci));
- }
- rows.emplace_back(std::move(row));
- }
- }
- }
- return rows;
- }
-
- void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
- NYdb::TValueParser value(v);
-
- while (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
- if (value.IsNull()) {
- out << "<NULL>";
- return;
- } else {
- value.OpenOptional();
- }
- }
-
- switch (value.GetPrimitiveType()) {
- case NYdb::EPrimitiveType::Uint32: {
- out << value.GetUint32();
- break;
- }
- case NYdb::EPrimitiveType::Uint64: {
- out << value.GetUint64();
- break;
- }
- case NYdb::EPrimitiveType::Utf8: {
- out << value.GetUtf8();
- break;
- }
- case NYdb::EPrimitiveType::Timestamp: {
- out << value.GetTimestamp();
- break;
- }
- default: {
- UNIT_ASSERT_C(false, "PrintValue not iplemented for this type");
- }
- }
- }
-
- void PrintRow(IOutputStream& out, const THashMap<TString, NYdb::TValue>& fields) {
- for (const auto& f : fields) {
- out << f.first << ": ";
- PrintValue(out, f.second);
- out << " ";
- }
- }
-
- void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>>& rows) {
- for (const auto& r : rows) {
- PrintRow(out, r);
- out << "\n";
- }
- }
-
- TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableClient& tableClient, const TString& query) {
- Cerr << "====================================\n"
- << "QUERY:\n" << query
- << "\n\nRESULT:\n";
-
- TStreamExecScanQuerySettings scanSettings;
- auto it = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
- auto rows = CollectRows(it);
-
- PrintRows(Cerr, rows);
- Cerr << "\n";
-
- return rows;
- }
-
- ui64 GetUint32(const NYdb::TValue& v) {
- NYdb::TValueParser value(v);
- if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
- return *value.GetOptionalUint32();
- } else {
- return value.GetUint32();
- }
- }
-
- ui64 GetUint64(const NYdb::TValue& v) {
- NYdb::TValueParser value(v);
- if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
- return *value.GetOptionalUint64();
- } else {
- return value.GetUint64();
- }
- }
-
- TInstant GetTimestamp(const NYdb::TValue& v) {
- NYdb::TValueParser value(v);
- if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
- return *value.GetOptionalTimestamp();
- } else {
- return value.GetTimestamp();
- }
- }
-
void CreateTableOfAllTypes(TKikimrRunner& kikimr) {
auto& legacyClient = kikimr.GetTestClient();
@@ -1218,7 +1373,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2000);
- // EnableDebugLogging(kikimr);
+ //EnableDebugLogging(kikimr);
auto tableClient = kikimr.GetTableClient();
auto selectQuery = TString(R"(
@@ -1928,7 +2083,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetEnableOlapSchemaOperations(true);
TKikimrRunner kikimr(settings);
- EnableDebugLogging(kikimr);
+ //EnableDebugLogging(kikimr);
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
@@ -3141,6 +3296,92 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(StatsSysViewEnumStringBytes) {
+ ui64 rawBytesPK1;
+ ui64 bytesPK1;
+ {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ EnableDebugLogging(kikimr);
+ TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore12");
+ helper.CreateTestOlapTable();
+ helper.FillPKOnly(0, 800000);
+ helper.GetVolumes(rawBytesPK1, bytesPK1, false);
+ }
+
+ ui64 rawBytesUnpack1PK = 0;
+ ui64 bytesUnpack1PK = 0;
+ ui64 rawBytesPackAndUnpack2PK;
+ ui64 bytesPackAndUnpack2PK;
+ const ui32 rowsCount = 800000;
+ const ui32 groupsCount = 512;
+ {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ EnableDebugLogging(kikimr);
+ TTypedLocalHelper helper("Utf8", kikimr);
+ helper.CreateTestOlapTable();
+ NArrow::NConstruction::TStringPoolFiller sPool(groupsCount, 52);
+ helper.FillTable(sPool, 0, rowsCount);
+ helper.PrintCount();
+ {
+ auto d = helper.GetDistribution();
+ Y_VERIFY(d.GetCount() == rowsCount);
+ Y_VERIFY(d.GetGroupsCount() == groupsCount);
+ Y_VERIFY(d.GetMaxCount() - d.GetMinCount() <= 1);
+ }
+ helper.GetVolumes(rawBytesUnpack1PK, bytesUnpack1PK, false);
+ Sleep(TDuration::Seconds(5));
+ auto tableClient = kikimr.GetTableClient();
+ {
+ auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, LOW_CARDINALITY=`true`);";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+ {
+ auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field1, LOW_CARDINALITY=`true`);";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString());
+ }
+ {
+ auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, LOW_CARDINALITY1=`true`);";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToString());
+ }
+ Sleep(TDuration::Seconds(5));
+ helper.FillTable(sPool, 1, 800000);
+ Sleep(TDuration::Seconds(5));
+ {
+ helper.GetVolumes(rawBytesPackAndUnpack2PK, bytesPackAndUnpack2PK, false);
+ helper.PrintCount();
+ {
+ auto d = helper.GetDistribution();
+ Cerr << d.DebugString() << Endl;
+ Y_VERIFY(d.GetCount() == 2 * rowsCount);
+ Y_VERIFY(d.GetGroupsCount() == groupsCount);
+ Y_VERIFY(d.GetMaxCount() - d.GetMinCount() <= 2);
+ }
+ }
+ }
+ const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1;
+ const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1;
+ const ui64 rawBytesPack = rawBytesPackAndUnpack2PK - rawBytesUnpack1PK - rawBytesPK1;
+ const ui64 bytesPack = bytesPackAndUnpack2PK - bytesUnpack1PK - bytesPK1;
+ TStringBuilder result;
+ result << "unpacked data: " << rawBytesUnpack << " / " << bytesUnpack << Endl;
+ result << "packed data: " << rawBytesPack << " / " << bytesPack << Endl;
+ result << "frq_diff: " << 1.0 * bytesPack / bytesUnpack << Endl;
+ result << "frq_compression: " << 1.0 * bytesPack / rawBytesPack << Endl;
+ result << "pk_size : " << rawBytesPK1 << " / " << bytesPK1 << Endl;
+ Cerr << result << Endl;
+ Y_VERIFY(bytesPack / bytesUnpack < 0.1);
+ }
+
Y_UNIT_TEST(SelectLimit1ManyShards) {
TPortManager tp;
ui16 mbusport = tp.GetPort(2134);
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp
index ae234a1859c..baebf4d7af4 100644
--- a/ydb/core/testlib/cs_helper.cpp
+++ b/ydb/core/testlib/cs_helper.cpp
@@ -26,7 +26,10 @@ void THelperSchemaless::CreateTestOlapStore(TActorId sender, TString scheme) {
Server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release()));
auto ev = Server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ Cerr << ev->Get()->Record.DebugString() << Endl;
+ auto status = ev->Get()->Record.GetStatus();
ui64 txId = ev->Get()->Record.GetTxId();
+ UNIT_ASSERT(status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError);
WaitForSchemeOperation(sender, txId);
}
@@ -50,11 +53,12 @@ void THelperSchemaless::CreateTestOlapTable(TActorId sender, TString storeOrDirN
auto ev = Server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
ui64 txId = ev->Get()->Record.GetTxId();
auto status = ev->Get()->Record.GetStatus();
+ Cerr << ev->Get()->Record.DebugString() << Endl;
UNIT_ASSERT(status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError);
WaitForSchemeOperation(sender, txId);
}
-void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) {
+void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) const {
auto* runtime = Server.GetRuntime();
UNIT_ASSERT(batch);
@@ -93,14 +97,14 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_pt
runtime->DispatchEvents(options);
}
-void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const {
auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
SendDataViaActorSystem(testTable, batch);
}
//
-std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
+std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() const {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.emplace_back(arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)));
fields.emplace_back(arrow::field("resource_id", arrow::utf8()));
@@ -113,7 +117,7 @@ std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
return std::make_shared<arrow::Schema>(std::move(fields));
}
-std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool());
@@ -181,9 +185,35 @@ TString THelper::GetTestTableSchema() const {
return sb;
}
+void THelper::CreateOlapTableWithStore(TString tableName /*= "olapTable"*/, TString storeName /*= "olapStore"*/, ui32 storeShardsCount /*= 4*/, ui32 tableShardsCount /*= 3*/) {
+ TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
+ CreateTestOlapStore(sender, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ %s
+ }
+ }
+ )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data()));
+
+ const TString shardingColumns = "[\"" + JoinSeq("\",\"", GetShardingColumns()) + "\"]";
+
+ TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ Sharding {
+ HashSharding {
+ Function: %s
+ Columns: %s
+ }
+ })", tableName.c_str(), tableShardsCount, ShardingMethod.data(), shardingColumns.c_str()));
+}
+
// Clickbench table
-std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() {
+std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() const {
return std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>> {
arrow::field("WatchID", arrow::int64()),
@@ -294,7 +324,7 @@ std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() {
});
}
-std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) {
+std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) const {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
UNIT_ASSERT(schema);
UNIT_ASSERT(schema->num_fields());
@@ -351,7 +381,7 @@ std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64
// Table with NULLs
-std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() {
+std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() const {
return std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>>{
arrow::field("id", arrow::int32()),
@@ -363,11 +393,11 @@ std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() {
});
}
-std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() {
+std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch() const {
return TestArrowBatch(0, 0, 10);
}
-std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount) {
+std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64, ui64, size_t rowCount) const {
rowCount = 10;
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h
index f2022674160..0ed78d368f9 100644
--- a/ydb/core/testlib/cs_helper.h
+++ b/ydb/core/testlib/cs_helper.h
@@ -14,21 +14,31 @@ public:
using TBase::TBase;
void CreateTestOlapStore(TActorId sender, TString scheme);
void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme);
- void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
- void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch);
+ void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const;
+ void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch) const;
- virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) = 0;
+ virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const = 0;
};
class THelper: public THelperSchemaless {
private:
using TBase = THelperSchemaless;
- std::shared_ptr<arrow::Schema> GetArrowSchema();
+ std::shared_ptr<arrow::Schema> GetArrowSchema() const;
YDB_FLAG_ACCESSOR(WithJsonDocument, false);
+ TString ShardingMethod = "HASH_FUNCTION_CLOUD_LOGS";
+protected:
+ void CreateOlapTableWithStore(TString tableName = "olapTable", TString storeName = "olapStore",
+ ui32 storeShardsCount = 4, ui32 tableShardsCount = 3);
public:
using TBase::TBase;
+ THelper& SetShardingMethod(const TString& value) {
+ Y_VERIFY(value == "HASH_FUNCTION_CLOUD_LOGS" || value == "HASH_FUNCTION_MODULO_N");
+ ShardingMethod = value;
+ return *this;
+ }
+
static constexpr const char * PROTO_SCHEMA = R"(
Columns { Name: "timestamp" Type: "Timestamp" NotNull: true }
#Columns { Name: "resource_type" Type: "Utf8" }
@@ -44,16 +54,19 @@ public:
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
)";
- TString GetTestTableSchema() const;
+ virtual std::vector<TString> GetShardingColumns() const {
+ return {"timestamp", "uid"};
+ }
+ virtual TString GetTestTableSchema() const;
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) override;
+ virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) const override;
};
class TCickBenchHelper: public THelperSchemaless {
private:
using TBase = THelperSchemaless;
- std::shared_ptr<arrow::Schema> GetArrowSchema();
+ std::shared_ptr<arrow::Schema> GetArrowSchema() const;
public:
using TBase::TBase;
@@ -167,14 +180,14 @@ public:
KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"]
)";
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) override;
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) const override;
};
class TTableWithNullsHelper: public THelperSchemaless {
private:
using TBase = THelperSchemaless;
- std::shared_ptr<arrow::Schema> GetArrowSchema();
+ std::shared_ptr<arrow::Schema> GetArrowSchema() const;
public:
using TBase::TBase;
@@ -188,8 +201,8 @@ public:
KeyColumnNames: "id"
)";
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10) override;
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch();
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64, size_t rowCount = 10) const override;
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch() const;
};
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 970730d3e04..40c3539ec55 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -335,7 +335,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID());
Y_VERIFY(!blobs.empty());
- ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo("dummy", 0), ctx.SelfID,
+ ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo::BuildDefault(), ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
} else {
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index a565d80ba6d..2e9e316b223 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -145,6 +145,8 @@ public:
struct TIndexInfo : public NTable::TScheme::TTableSchema {
private:
THashMap<ui32, TColumnFeatures> ColumnFeatures;
+ TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false);
+ bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
public:
static constexpr const char* SPEC_COL_PLAN_STEP = "_yql_plan_step";
static constexpr const char* SPEC_COL_TX_ID = "_yql_tx_id";
@@ -177,10 +179,13 @@ public:
}
return true;
}
- TIndexInfo(const TString& name, ui32 id, bool compositeIndexKey = false);
- bool DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
public:
+ static TIndexInfo BuildDefault() {
+ TIndexInfo result("dummy", 0, false);
+ return result;
+ }
+
static std::optional<TIndexInfo> BuildFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
TIndexInfo result("", 0, schema.GetCompositeMarks());
if (!result.DeserializeFromProto(schema)) {
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 0803566198b..155a8292ea4 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -172,7 +172,7 @@ static const std::vector<std::pair<TString, TTypeInfo>> testKey = {
TIndexInfo TestTableInfo(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = testColumns,
const std::vector<std::pair<TString, TTypeInfo>>& key = testKey) {
- TIndexInfo indexInfo("", 0);
+ TIndexInfo indexInfo = TIndexInfo::BuildDefault();
for (ui32 i = 0; i < ydbSchema.size(); ++i) {
ui32 id = i + 1;