aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-06-06 14:52:36 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:52:36 +0300
commit6bc2df4a32091df6c934d8d146b5d62406c3265c (patch)
treed1ec5a9da39e1545a22823b28d0a27c94cc7ecc6
parent076884749756a6f2aed1985190ea4083bc3546c1 (diff)
downloadydb-6bc2df4a32091df6c934d8d146b5d62406c3265c.tar.gz
KIKIMR-11529: support 3+ columns in HASH_FUNCTION_CLOUD_LOGS (Cherry pick commit r9430350)22.2.31
REVIEW: 2526562 x-ydb-stable-ref: 15948ee7d3fc318243b2291d0933a222c1c7a90f
-rw-r--r--ydb/core/formats/sharding.h68
-rw-r--r--ydb/services/ydb/ydb_olapstore_ut.cpp140
2 files changed, 167 insertions, 41 deletions
diff --git a/ydb/core/formats/sharding.h b/ydb/core/formats/sharding.h
index 317ec60295..bd6c7b16cc 100644
--- a/ydb/core/formats/sharding.h
+++ b/ydb/core/formats/sharding.h
@@ -86,30 +86,48 @@ public:
std::vector<ui32> MakeSharding(const std::shared_ptr<arrow::RecordBatch>& batch,
const TVector<TString>& shardingColumns) const
{
- if (shardingColumns.size() != 2) {
+ if (shardingColumns.size() < 2) {
return {};
}
- auto tsFiled = batch->schema()->GetFieldByName(shardingColumns[0]);
- auto uidFiled = batch->schema()->GetFieldByName(shardingColumns[1]);
auto tsArray = batch->GetColumnByName(shardingColumns[0]);
- auto uidArray = batch->GetColumnByName(shardingColumns[1]);
- if (!tsArray || !uidArray || !tsFiled || !uidFiled ||
- (tsFiled->type()->id() != arrow::Type::TIMESTAMP) ||
- (uidFiled->type()->id() != arrow::Type::STRING)) {
+ if (!tsArray || tsArray->type_id() != arrow::Type::TIMESTAMP) {
return {};
}
+ std::vector<std::shared_ptr<arrow::Array>> extraColumns;
+ extraColumns.reserve(shardingColumns.size() - 1);
+
+ for (size_t i = 1; i < shardingColumns.size(); ++i) {
+ auto array = batch->GetColumnByName(shardingColumns[i]);
+ if (!array) {
+ return {};
+ }
+ extraColumns.emplace_back(array);
+ }
+
auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(tsArray);
- auto uidColumn = std::static_pointer_cast<arrow::StringArray>(uidArray);
std::vector<ui32> out;
out.reserve(batch->num_rows());
- for (int row = 0; row < batch->num_rows(); ++row) {
- ui32 shardNo = ShardNo(tsColumn->Value(row), uidColumn->GetView(row));
- Y_VERIFY(shardNo < ShardsCount);
+ if (extraColumns.size() == 1 && extraColumns[0]->type_id() == arrow::Type::STRING) {
+ auto column = std::static_pointer_cast<arrow::StringArray>(extraColumns[0]);
+
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ ui32 shardNo = ShardNo(tsColumn->Value(row), column->GetView(row));
+ out.emplace_back(shardNo);
+ }
+ } else {
+ std::string concat;
+ for (int row = 0; row < batch->num_rows(); ++row) {
+ concat.clear();
+ for (auto& column : extraColumns) {
+ AppendField(column, row, concat);
+ }
- out.emplace_back(shardNo);
+ ui32 shardNo = ShardNo(tsColumn->Value(row), concat);
+ out.emplace_back(shardNo);
+ }
}
return out;
@@ -120,6 +138,32 @@ private:
ui32 NumActive;
ui64 TsMin;
ui64 ChangePeriod;
+
+ static void AppendField(const std::shared_ptr<arrow::Array>& array, int row, std::string& concat) {
+ NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using T = typename TWrap::T;
+ using TArray = typename arrow::TypeTraits<T>::ArrayType;
+
+ if (!array->IsNull(row)) {
+ auto& typedArray = static_cast<const TArray&>(*array);
+ auto value = typedArray.GetView(row);
+ if constexpr (arrow::has_string_view<T>()) {
+ concat.append(value.data(), value.size());
+ } else if constexpr (arrow::has_c_type<T>()) {
+ if constexpr (arrow::is_physical_integer_type<T>()) {
+ concat.append(reinterpret_cast<const char*>(&value), sizeof(value));
+ } else {
+ // Do not use bool or floats for sharding
+ static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
+ }
+ } else {
+ static_assert(arrow::is_decimal_type<T>());
+ }
+ }
+ return true;
+ });
+ }
};
}
diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp
index 6a61447fe9..e07d9459f1 100644
--- a/ydb/services/ydb/ydb_olapstore_ut.cpp
+++ b/ydb/services/ydb/ydb_olapstore_ut.cpp
@@ -12,6 +12,13 @@
using namespace NYdb;
+namespace {
+std::vector<TString> testShardingVariants = {
+ R"(["timestamp", "uid"])",
+ R"(["timestamp", "resource_type", "resource_id", "uid"])"
+};
+}
+
Y_UNIT_TEST_SUITE(YdbOlapStore) {
NMiniKQL::IFunctionRegistry* UdfFrFactory(const NKikimr::NScheme::TTypeRegistry& typeRegistry) {
@@ -40,10 +47,11 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
return connection;
}
- void CreateOlapTable(const TServerSettings& settings, const TString& tableName, ui32 shards = 2) {
+ void CreateOlapTable(const TServerSettings& settings, const TString& tableName, ui32 numShards = 2,
+ const TString shardingColumns = R"(["timestamp", "uid"])")
+ {
const char * tableDescr = R"(
Name: "OlapStore"
- #MetaShardCount: 1
ColumnShardCount: 4
SchemaPresets {
Name: "default"
@@ -67,16 +75,18 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
TClient annoyingClient(settings);
NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr);
UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
+
status = annoyingClient.CreateOlapTable("/Root/OlapStore", Sprintf(R"(
Name: "%s"
ColumnShardCount : %d
Sharding {
HashSharding {
Function: HASH_FUNCTION_CLOUD_LOGS
- Columns: ["timestamp", "uid"]
+ Columns: %s
}
}
- )", tableName.c_str(), shards));
+ )", tableName.c_str(), numShards, shardingColumns.c_str()));
+
UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
}
@@ -197,8 +207,8 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
}
// Create OLTP and OLAP tables with the same set of columns and same PK
- void CreateTestTables(const TServerSettings& settings, const TString& tableName) {
- CreateOlapTable(settings, tableName);
+ void CreateTestTables(const TServerSettings& settings, const TString& tableName, const TString& sharding) {
+ CreateOlapTable(settings, tableName, 2, sharding);
CreateTable(settings, "oltp_" + tableName);
}
@@ -273,7 +283,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
}
}
- Y_UNIT_TEST(ManyTables) {
+ void TestManyTables(const TString& sharding) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableFeatureFlags()->SetEnableOlapSchemaOperations(true);
TKikimrWithGrpcAndRootSchema server(appConfig);
@@ -281,9 +291,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
auto connection = ConnectToServer(server);
- CreateTestTables(*server.ServerSettings, "log1");
- CreateTestTables(*server.ServerSettings, "log2");
- CreateTestTables(*server.ServerSettings, "log3");
+ CreateTestTables(*server.ServerSettings, "log1", sharding);
+ CreateTestTables(*server.ServerSettings, "log2", sharding);
+ CreateTestTables(*server.ServerSettings, "log3", sharding);
size_t rowCount = WriteTestRows(connection, "log1", 0, 1, 50);
UNIT_ASSERT_VALUES_EQUAL(rowCount, 50);
@@ -305,7 +315,13 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
CompareQueryResults(connection, "log1", "SELECT count(*) FROM <TABLE>;");
}
- Y_UNIT_TEST(DuplicateRows) {
+ Y_UNIT_TEST(ManyTables) {
+ for (auto& sharding : testShardingVariants) {
+ TestManyTables(sharding);
+ }
+ }
+
+ void TestDuplicateRows(const TString& sharding) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableFeatureFlags()->SetEnableOlapSchemaOperations(true);
TKikimrWithGrpcAndRootSchema server(appConfig);
@@ -314,7 +330,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
auto connection = ConnectToServer(server);
NYdb::NTable::TTableClient client(connection);
- CreateOlapTable(*server.ServerSettings, "log1");
+ CreateOlapTable(*server.ServerSettings, "log1", 2, sharding);
const ui64 batchCount = 100;
const ui64 batchSize = 1000;
@@ -350,14 +366,20 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
}
}
- void TestQuery(const TString& query) {
+ Y_UNIT_TEST(DuplicateRows) {
+ for (auto& sharding : testShardingVariants) {
+ TestDuplicateRows(sharding);
+ }
+ }
+
+ void TestQuery(const TString& query, const TString& sharding) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableFeatureFlags()->SetEnableOlapSchemaOperations(true);
TKikimrWithGrpcAndRootSchema server(appConfig, {}, {}, false, &UdfFrFactory);
auto connection = ConnectToServer(server);
- CreateTestTables(*server.ServerSettings, "log1");
+ CreateTestTables(*server.ServerSettings, "log1", sharding);
// EnableDebugLogs(server);
@@ -375,46 +397,62 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
}
Y_UNIT_TEST(LogLast50) {
- TestQuery(R"(
+ TString query(R"(
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`
FROM <TABLE>
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogLast50ByResource) {
- TestQuery(R"(
+ TString query(R"(
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`
FROM <TABLE>
WHERE resource_type == 'app' AND resource_id == 'resource_1'
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogGrepNonExisting) {
- TestQuery(R"(
+ TString query(R"(
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`
FROM <TABLE>
WHERE message LIKE '%non-exisiting string%'
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogGrepExisting) {
- TestQuery(R"(
+ TString query(R"(
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`
FROM <TABLE>
WHERE message LIKE '%message%'
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogNonExistingRequest) {
- TestQuery(R"(
+ TString query(R"(
$request_id = '0xfaceb00c';
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `request_id`
@@ -423,10 +461,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogExistingRequest) {
- TestQuery(R"(
+ TString query(R"(
$request_id = '1f';
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `request_id`
@@ -435,10 +477,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogNonExistingUserId) {
- TestQuery(R"(
+ TString query(R"(
$user_id = '111';
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `json_payload`
@@ -447,10 +493,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogExistingUserId) {
- TestQuery(R"(
+ TString query(R"(
$user_id = '1000042';
SELECT `timestamp`, `resource_type`, `resource_id`, `uid`, `level`, `message`, `json_payload`
@@ -459,10 +509,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogPagingBefore) {
- TestQuery(R"(
+ TString query(R"(
PRAGMA kikimr.OptEnablePredicateExtract = "true";
$ts = CAST(3000000 AS Timestamp);
@@ -477,10 +531,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogPagingBetween) {
- TestQuery(R"(
+ TString query(R"(
PRAGMA kikimr.OptEnablePredicateExtract = "true";
$ts1 = CAST(2500000 AS Timestamp);
@@ -501,10 +559,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogPagingAfter) {
- TestQuery(R"(
+ TString query(R"(
PRAGMA kikimr.OptEnablePredicateExtract = "true";
$ts = CAST(3000000 AS Timestamp);
@@ -525,19 +587,27 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
FROM $next50
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC;
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogCountByResource) {
- TestQuery(R"(
+ TString query(R"(
SELECT count(*)
FROM <TABLE>
WHERE resource_type == 'app' AND resource_id == 'resource_1'
LIMIT 50
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogWithUnionAllAscending) {
- TestQuery(R"(
+ TString query(R"(
PRAGMA AnsiInForEmptyOrNullableItemsCollections;
$until = CAST(4100000 AS Timestamp);
@@ -562,10 +632,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
$data = (SELECT * FROM $part0 UNION ALL SELECT * FROM $part1 UNION ALL SELECT * FROM $part2 UNION ALL SELECT * FROM $part3 UNION ALL SELECT * FROM $part4 UNION ALL SELECT * FROM $part5 UNION ALL SELECT * FROM $part6);
SELECT * FROM $data ORDER BY `timestamp` ASC, `resource_type` ASC, `resource_id` ASC, `uid` ASC LIMIT $limit;
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogWithUnionAllDescending) {
- TestQuery(R"(
+ TString query(R"(
PRAGMA AnsiInForEmptyOrNullableItemsCollections;
$until = CAST(4093000 AS Timestamp);
@@ -590,10 +664,14 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
$data = (SELECT * FROM $part0 UNION ALL SELECT * FROM $part1 UNION ALL SELECT * FROM $part2 UNION ALL SELECT * FROM $part3 UNION ALL SELECT * FROM $part4 UNION ALL SELECT * FROM $part5 UNION ALL SELECT * FROM $part6);
SELECT * FROM $data ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT $limit;
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
Y_UNIT_TEST(LogTsRangeDescending) {
- TestQuery(R"(
+ TString query(R"(
--PRAGMA AnsiInForEmptyOrNullableItemsCollections;
$until = CAST(4093000 AS Timestamp);
@@ -608,5 +686,9 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
`timestamp` >= $since
ORDER BY `timestamp` DESC, `resource_type` DESC, `resource_id` DESC, `uid` DESC LIMIT $limit;
)");
+
+ for (auto& sharding : testShardingVariants) {
+ TestQuery(query, sharding);
+ }
}
}