diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-06-06 14:52:36 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:52:36 +0300 |
commit | 6bc2df4a32091df6c934d8d146b5d62406c3265c (patch) | |
tree | d1ec5a9da39e1545a22823b28d0a27c94cc7ecc6 | |
parent | 076884749756a6f2aed1985190ea4083bc3546c1 (diff) | |
download | ydb-6bc2df4a32091df6c934d8d146b5d62406c3265c.tar.gz |
KIKIMR-11529: support 3+ columns in HASH_FUNCTION_CLOUD_LOGS (Cherry pick commit r9430350)22.2.31
REVIEW: 2526562
x-ydb-stable-ref: 15948ee7d3fc318243b2291d0933a222c1c7a90f
-rw-r--r-- | ydb/core/formats/sharding.h | 68 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_olapstore_ut.cpp | 140 |
2 files changed, 167 insertions, 41 deletions
diff --git a/ydb/core/formats/sharding.h b/ydb/core/formats/sharding.h index 317ec60295..bd6c7b16cc 100644 --- a/ydb/core/formats/sharding.h +++ b/ydb/core/formats/sharding.h @@ -86,30 +86,48 @@ public: std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch, const TVector<TString>& shardingColumns) const { - if (shardingColumns.size() != 2) { + if (shardingColumns.size() < 2) { return {}; } - auto tsFiled = batch->schema()->GetFieldByName(shardingColumns[0]); - auto uidFiled = batch->schema()->GetFieldByName(shardingColumns[1]); auto tsArray = batch->GetColumnByName(shardingColumns[0]); - auto uidArray = batch->GetColumnByName(shardingColumns[1]); - if (!tsArray || !uidArray || !tsFiled || !uidFiled || - (tsFiled->type()->id() != arrow::Type::TIMESTAMP) || - (uidFiled->type()->id() != arrow::Type::STRING)) { + if (!tsArray || tsArray->type_id() != arrow::Type::TIMESTAMP) { return {}; } + std::vector<std::shared_ptr<arrow::Array>> extraColumns; + extraColumns.reserve(shardingColumns.size() - 1); + + for (size_t i = 1; i < shardingColumns.size(); ++i) { + auto array = batch->GetColumnByName(shardingColumns[i]); + if (!array) { + return {}; + } + extraColumns.emplace_back(array); + } + auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray); - auto uidColumn = std::static_pointer_cast<arrow::StringArray>(uidArray); std::vector<ui32> out; out.reserve(batch->num_rows()); - for (int row = 0; row < batch->num_rows(); ++row) { - ui32 shardNo = ShardNo(tsColumn->Value(row), uidColumn->GetView(row)); - Y_VERIFY(shardNo < ShardsCount); + if (extraColumns.size() == 1 && extraColumns[0]->type_id() == arrow::Type::STRING) { + auto column = std::static_pointer_cast<arrow::StringArray>(extraColumns[0]); + + for (int row = 0; row < batch->num_rows(); ++row) { + ui32 shardNo = ShardNo(tsColumn->Value(row), column->GetView(row)); + out.emplace_back(shardNo); + } + } else { + std::string concat; + for (int row = 0; row < batch->num_rows(); ++row) { + concat.clear(); + for (auto& column : extraColumns) { + AppendField(column, row, concat); + } - out.emplace_back(shardNo); + ui32 shardNo = ShardNo(tsColumn->Value(row), concat); + out.emplace_back(shardNo); + } } return out; @@ -120,6 +138,32 @@ private: ui32 NumActive; ui64 TsMin; ui64 ChangePeriod; + + static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) { + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using T = typename TWrap::T; + using TArray = typename arrow::TypeTraits<T>::ArrayType; + + if (!array->IsNull(row)) { + auto& typedArray = static_cast<const TArray&>(*array); + auto value = typedArray.GetView(row); + if constexpr (arrow::has_string_view<T>()) { + concat.append(value.data(), value.size()); + } else if constexpr (arrow::has_c_type<T>()) { + if constexpr (arrow::is_physical_integer_type<T>()) { + concat.append(reinterpret_cast<const char*>(&value), sizeof(value)); + } else { + // Do not use bool or floats for sharding + static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>()); + } + } else { + static_assert(arrow::is_decimal_type<T>()); + } + } + return true; + }); + } }; } diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp index 6a61447fe9..e07d9459f1 100644 --- a/ydb/services/ydb/ydb_olapstore_ut.cpp +++ b/ydb/services/ydb/ydb_olapstore_ut.cpp @@ -12,6 +12,13 @@ using namespace NYdb; +namespace { +std::vector<TString> testShardingVariants = { + R"(["timestamp", "uid"])", + R"(["timestamp", "resource_type", "resource_id", "uid"])" +}; +} + Y_UNIT_TEST_SUITE(YdbOlapStore) { NMiniKQL::IFunctionRegistry* UdfFrFactory(const NKikimr::NScheme::TTypeRegistry& typeRegistry) { @@ -40,10 +47,11 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { return connection; } - void CreateOlapTable(const TServerSettings& settings, const TString& tableName, ui32 shards = 2) { + void CreateOlapTable(const TServerSettings& settings, const TString& tableName, ui32 numShards = 2, + const TString shardingColumns = R"(["timestamp", "uid"])") + { const char * tableDescr = R"( Name: "OlapStore" - #MetaShardCount: 1 ColumnShardCount: 4 SchemaPresets { Name: "default" @@ -67,16 +75,18 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { TClient annoyingClient(settings); NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr); UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK); + status = annoyingClient.CreateOlapTable("/Root/OlapStore", Sprintf(R"( Name: "%s" ColumnShardCount : %d Sharding { HashSharding { Function: HASH_FUNCTION_CLOUD_LOGS - Columns: ["timestamp", "uid"] + Columns: %s } } - )", tableName.c_str(), shards)); + )", tableName.c_str(), numShards, shardingColumns.c_str())); + UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK); } @@ -197,8 +207,8 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { } // Create OLTP and OLAP tables with the same set of columns and same PK - void CreateTestTables(const TServerSettings& settings, const TString& tableName) { - CreateOlapTable(settings, tableName); + void CreateTestTables(const TServerSettings& settings, const TString& tableName, const TString& sharding) { + CreateOlapTable(settings, tableName, 2, sharding); CreateTable(settings, "oltp_" + tableName); } @@ -273,7 +283,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { } } - Y_UNIT_TEST(ManyTables) { + void TestManyTables(const TString& sharding) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableFeatureFlags()->SetEnableOlapSchemaOperations(true); TKikimrWithGrpcAndRootSchema server(appConfig); @@ -281,9 +291,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { auto connection = ConnectToServer(server); - CreateTestTables(*server.ServerSettings, "log1"); - CreateTestTables(*server.ServerSettings, "log2"); - CreateTestTables(*server.ServerSettings, "log3"); + CreateTestTables(*server.ServerSettings, "log1", sharding); + CreateTestTables(*server.ServerSettings, "log2", sharding); + CreateTestTables(*server.ServerSettings, "log3", sharding); size_t rowCount = WriteTestRows(connection, "log1", 0, 1, 50); UNIT_ASSERT_VALUES_EQUAL(rowCount, 50); @@ -305,7 +315,13 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { CompareQueryResults(connection, "log1", "SELECT count(*) FROM <TABLE>;"); } - Y_UNIT_TEST(DuplicateRows) { + Y_UNIT_TEST(ManyTables) { + for (auto& sharding : testShardingVariants) { + TestManyTables(sharding); + } + } + + void TestDuplicateRows(const TString& sharding) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableFeatureFlags()->SetEnableOlapSchemaOperations(true); TKikimrWithGrpcAndRootSchema server(appConfig); @@ -314,7 +330,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { auto connection = ConnectToServer(server); NYdb::NTable::TTableClient client(connection); - CreateOlapTable(*server.ServerSettings, "log1"); + CreateOlapTable(*server.ServerSettings, "log1", 2, sharding); const ui64 batchCount = 100; const ui64 batchSize = 1000; @@ -350,14 +366,20 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { } } - void TestQuery(const TString& query) { + Y_UNIT_TEST(DuplicateRows) { + for (auto& sharding : testShardingVariants) { + TestDuplicateRows(sharding); + } + } + + void TestQuery(const TString& query, const TString& sharding) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableFeatureFlags()->SetEnableOlapSchemaOperations(true); TKikimrWithGrpcAndRootSchema server(appConfig, {}, {}, false, &UdfFrFactory); auto connection = ConnectToServer(server); - CreateTestTables(*server.ServerSettings, "log1"); + CreateTestTables(*server.ServerSettings, "log1", sharding); // EnableDebugLogs(server); @@ -375,46 +397,62 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { } Y_UNIT_TEST(LogLast50) { - TestQuery(R"( + TString query(R"( SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message` FROM <TABLE> ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogLast50ByResource) { - TestQuery(R"( + TString query(R"( SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message` FROM <TABLE> WHERE resource_type == 'app' AND resource_id == 'resource_1' ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogGrepNonExisting) { - TestQuery(R"( + TString query(R"( SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message` FROM <TABLE> WHERE message LIKE '%non-exisiting string%' ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogGrepExisting) { - TestQuery(R"( + TString query(R"( SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message` FROM <TABLE> WHERE message LIKE '%message%' ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogNonExistingRequest) { - TestQuery(R"( + TString query(R"( $request_id = '0xfaceb00c'; SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `request_id` @@ -423,10 +461,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogExistingRequest) { - TestQuery(R"( + TString query(R"( $request_id = '1f'; SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `request_id` @@ -435,10 +477,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogNonExistingUserId) { - TestQuery(R"( + TString query(R"( $user_id = '111'; SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `json_payload` @@ -447,10 +493,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogExistingUserId) { - TestQuery(R"( + TString query(R"( $user_id = '1000042'; SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `json_payload` @@ -459,10 +509,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogPagingBefore) { - TestQuery(R"( + TString query(R"( PRAGMA kikimr.OptEnablePredicateExtract = "true"; $ts = CAST(3000000 AS Timestamp); @@ -477,10 +531,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogPagingBetween) { - TestQuery(R"( + TString query(R"( PRAGMA kikimr.OptEnablePredicateExtract = "true"; $ts1 = CAST(2500000 AS Timestamp); @@ -501,10 +559,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogPagingAfter) { - TestQuery(R"( + TString query(R"( PRAGMA kikimr.OptEnablePredicateExtract = "true"; $ts = CAST(3000000 AS Timestamp); @@ -525,19 +587,27 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { FROM $next50 ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC; )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogCountByResource) { - TestQuery(R"( + TString query(R"( SELECT count(*) FROM <TABLE> WHERE resource_type == 'app' AND resource_id == 'resource_1' LIMIT 50 )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogWithUnionAllAscending) { - TestQuery(R"( + TString query(R"( PRAGMA AnsiInForEmptyOrNullableItemsCollections; $until = CAST(4100000 AS Timestamp); @@ -562,10 +632,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { $data = (SELECT * FROM $part0 UNION ALL SELECT * FROM $part1 UNION ALL SELECT * FROM $part2 UNION ALL SELECT * FROM $part3 UNION ALL SELECT * FROM $part4 UNION ALL SELECT * FROM $part5 UNION ALL SELECT * FROM $part6); SELECT * FROM $data ORDER BY `timestamp` ASC, `resource_type` ASC, `resource_id` ASC, `uid` ASC LIMIT $limit; )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogWithUnionAllDescending) { - TestQuery(R"( + TString query(R"( PRAGMA AnsiInForEmptyOrNullableItemsCollections; $until = CAST(4093000 AS Timestamp); @@ -590,10 +664,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { $data = (SELECT * FROM $part0 UNION ALL SELECT * FROM $part1 UNION ALL SELECT * FROM $part2 UNION ALL SELECT * FROM $part3 UNION ALL SELECT * FROM $part4 UNION ALL SELECT * FROM $part5 UNION ALL SELECT * FROM $part6); SELECT * FROM $data ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT $limit; )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } Y_UNIT_TEST(LogTsRangeDescending) { - TestQuery(R"( + TString query(R"( --PRAGMA AnsiInForEmptyOrNullableItemsCollections; $until = CAST(4093000 AS Timestamp); @@ -608,5 +686,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { `timestamp` >= $since ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT $limit; )"); + + for (auto& sharding : testShardingVariants) { + TestQuery(query, sharding); + } } } |