aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-12-30 12:17:45 +0300
committerchertus <azuikov@ydb.tech>2022-12-30 12:17:45 +0300
commit6629b1db31555de019a6363e912da8c2324c9408 (patch)
tree0a1b12e93fcb5b7d858a5c237638f27df01e586a
parentd2bfeb9534f9d2c12073a0adda43d2861ac9bbbb (diff)
downloadydb-6629b1db31555de019a6363e912da8c2324c9408.tar.gz
smoke test for some ClickBench queries
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp29
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp192
-rw-r--r--ydb/core/testlib/cs_helper.cpp237
-rw-r--r--ydb/core/testlib/cs_helper.h158
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp15
5 files changed, 545 insertions, 86 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 3ca643789a0..fe034e22572 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -68,12 +68,14 @@ class TFullSplitData {
private:
ui32 ShardsCount = 0;
THashMap<ui64, TShardInfo> ShardsInfo;
+
public:
- TFullSplitData(const ui32 shardsCount)
- : ShardsCount(shardsCount)
- {
+ TString ErrorString;
- }
+ TFullSplitData(const ui32 shardsCount, TString errString = {})
+ : ShardsCount(shardsCount)
+ , ErrorString(errString)
+ {}
const THashMap<ui64, TShardInfo>& GetShardsInfo() const {
return ShardsInfo;
@@ -123,6 +125,14 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
}
if (rowSharding.empty()) {
+ result.ErrorString = "empty "
+ + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(hashSharding.GetFunction())
+ + " sharding";
+ for (auto& column : shardingColumns) {
+ if (batch->schema()->GetFieldIndex(column) < 0) {
+ result.ErrorString += ", no column '" + column + "'";
+ }
+ }
return result;
}
@@ -149,8 +159,13 @@ TFullSplitData SplitData(const TString& data, const NKikimrSchemeOp::TColumnTabl
std::shared_ptr<arrow::Schema> schema = ExtractArrowSchema(olapSchema);
std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, schema);
- if (!batch || !batch->ValidateFull().ok()) {
- return TFullSplitData(0);
+ if (!batch) {
+ return TFullSplitData(0, TString("cannot deserialize batch with schema ") + schema->ToString());
+ }
+
+ auto res = batch->ValidateFull();
+ if (!res.ok()) {
+ return TFullSplitData(0, TString("deserialize batch is not valid: ") + res.ToString());
}
return SplitData(batch, description);
}
@@ -465,7 +480,7 @@ protected:
SplitData(GetDeserializedBatch(), description) :
SplitData(GetSerializedData(), description);
if (batches.GetShardsInfo().empty()) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Cannot deserialize or split input data");
+ return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Input data sharding error: " + batches.ErrorString);
}
ui32 sumBytes = 0;
ui32 rowsCount = 0;
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 26a560fcc4a..b4edb0db146 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -49,8 +49,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG);
runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_TRACE);
- //runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
- //runtime->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
@@ -69,26 +68,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
CreateTestOlapStore(sender, Sprintf(R"(
- Name: "%s"
- ColumnShardCount: %d
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" NotNull: true }
- #Columns { Name: "resource_type" Type: "Utf8" }
- Columns { Name: "resource_id" Type: "Utf8" }
- Columns { Name: "uid" Type: "Utf8" }
- Columns { Name: "level" Type: "Int32" }
- Columns { Name: "message" Type: "Utf8" }
- #Columns { Name: "json_payload" Type: "Json" }
- #Columns { Name: "ingested_at" Type: "Timestamp" }
- #Columns { Name: "saved_at" Type: "Timestamp" }
- #Columns { Name: "request_id" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )", storeName.c_str(), storeShardsCount));
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ %s
+ }
+ }
+ )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA));
TString shardingColumns = "[\"timestamp\", \"uid\"]";
if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
@@ -113,18 +101,37 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
};
- std::shared_ptr<arrow::Schema> GetArrowSchema() {
- return std::make_shared<arrow::Schema>(
- std::vector<std::shared_ptr<arrow::Field>>{
- arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
- arrow::field("resource_id", arrow::utf8()),
- arrow::field("uid", arrow::utf8()),
- arrow::field("level", arrow::int32()),
- arrow::field("message", arrow::utf8())
- });
- }
+ class TClickHelper : public Tests::NCS::TCickBenchHelper {
+ private:
+ using TBase = Tests::NCS::TCickBenchHelper;
+ public:
+ using TBase::TBase;
+
+ TClickHelper(TKikimrRunner& runner)
+ : TBase(runner.GetTestServer())
+ {}
+
+ void CreateClickBenchTable(TString tableName = "benchTable", ui32 shardsCount = 4) {
+ TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
+
+ TBase::CreateTestOlapTable(sender, "", Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ Schema {
+ %s
+ }
+ Sharding {
+ HashSharding {
+ Function: HASH_FUNCTION_MODULO_N
+ Columns: "EventTime"
+ }
+ })", tableName.c_str(), shardsCount, PROTO_SCHEMA));
+ }
+ };
void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead
+
TLocalHelper lHelper(kikimr.GetTestServer());
NYdb::NLongTx::TClient client(kikimr.GetDriver());
@@ -132,7 +139,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());
auto txId = resBeginTx.GetResult().tx_id();
- TString data = lHelper.TestBlob(pathIdBegin, tsBegin, rowCount);
+ auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount);
+ TString data = NArrow::SerializeBatchNoCompression(batch);
NLongTx::TLongTxWriteResult resWrite =
client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
@@ -1571,6 +1579,66 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestAggregationsInternal(cases);
}
+ void TestClickBench(const std::vector<TAggregationTestCase>& cases) {
+ TPortManager tp;
+ ui16 mbusport = tp.GetPort(2134);
+ auto settings = Tests::TServerSettings(mbusport)
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetNodeCount(2);
+
+ Tests::TServer::TPtr server = new Tests::TServer(settings);
+
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+ EnableDebugLogging(runtime);
+
+ TClickHelper(*server).CreateClickBenchTable();
+
+ // write data
+
+ ui32 numIterations = 10;
+ const ui32 iterationPackSize = 2000;
+ for (ui64 i = 0; i < numIterations; ++i) {
+ TClickHelper(*server).SendDataViaActorSystem("/Root/benchTable", 0, 1000000 + i * 1000000,
+ iterationPackSize);
+ }
+
+ TAggregationTestCase currentTest;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TKqpComputeEvents::EvScanData:
+ {
+ auto* msg = ev->Get<NKqp::TEvKqpCompute::TEvScanData>();
+ Y_VERIFY(currentTest.MutableLimitChecker().CheckExpectedLimitOnScanData(msg->ArrowBatch ? msg->ArrowBatch->num_rows() : 0));
+ Y_VERIFY(currentTest.MutableRecordChecker().CheckExpectedOnScanData(msg->ArrowBatch ? msg->ArrowBatch->num_columns() : 0));
+ break;
+ }
+ case TEvDataShard::EvKqpScan:
+ {
+ auto* msg = ev->Get<TEvDataShard::TEvKqpScan>();
+ Y_VERIFY(currentTest.MutableLimitChecker().CheckExpectedLimitOnScanTask(msg->Record.GetItemsLimit()));
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ runtime->SetObserverFunc(captureEvents);
+
+ // selects
+
+ for (auto&& i : cases) {
+ const TString queryFixed = i.GetFixedQuery();
+ currentTest = i;
+ auto streamSender = runtime->AllocateEdgeActor();
+ SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, queryFixed, false));
+ auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqpCompute::TEvScanData>(streamSender, TDuration::Seconds(10));
+ Y_VERIFY(currentTest.CheckFinished());
+ }
+ }
+
Y_UNIT_TEST(Aggregation_ResultDistinctCountRI_GroupByL) {
TAggregationTestCase testCase;
testCase.SetQuery(R"(
@@ -1905,6 +1973,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestAggregations({ testCase });
}
+ Y_UNIT_TEST(ClickBenchSmoke) {
+ TAggregationTestCase q7;
+ q7.SetQuery(R"(
+ SELECT
+ AdvEngineID, COUNT(*) as c
+ FROM `/Root/benchTable`
+ WHERE AdvEngineID != 0
+ GROUP BY AdvEngineID
+ ORDER BY c DESC
+ )")
+ //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
+ .AddExpectedPlanOptions("TKqpOlapAgg")
+ .SetExpectedReadNodeType("TableFullScan");
+
+ TAggregationTestCase q9;
+ q9.SetQuery(R"(
+ SELECT
+ RegionID, SUM(AdvEngineID), COUNT(*) AS c, avg(ResolutionWidth), COUNT(DISTINCT UserID)
+ FROM `/Root/benchTable`
+ GROUP BY RegionID
+ ORDER BY c DESC
+ LIMIT 10
+ )")
+ //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
+ .AddExpectedPlanOptions("TKqpOlapAgg")
+ .SetExpectedReadNodeType("TableFullScan");
+
+ TAggregationTestCase q12;
+ q12.SetQuery(R"(
+ SELECT
+ SearchPhrase, count(*) AS c
+ FROM `/Root/benchTable`
+ WHERE SearchPhrase != ''
+ GROUP BY SearchPhrase
+ ORDER BY c DESC
+ LIMIT 10;
+ )")
+ //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
+ .AddExpectedPlanOptions("TKqpOlapAgg")
+ .SetExpectedReadNodeType("TableFullScan");
+
+ TAggregationTestCase q14;
+ q14.SetQuery(R"(
+ SELECT
+ SearchEngineID, SearchPhrase, count(*) AS c
+ FROM `/Root/benchTable`
+ WHERE SearchPhrase != ''
+ GROUP BY SearchEngineID, SearchPhrase
+ ORDER BY c DESC
+ LIMIT 10;
+ )")
+ //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
+ .AddExpectedPlanOptions("TKqpOlapAgg")
+ .SetExpectedReadNodeType("TableFullScan");
+
+ TestClickBench({ q7, q9, q12, q14 });
+ }
+
Y_UNIT_TEST(StatsSysView) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp
index 98064014db6..b1daae1bc78 100644
--- a/ydb/core/testlib/cs_helper.cpp
+++ b/ydb/core/testlib/cs_helper.cpp
@@ -13,7 +13,7 @@
namespace NKikimr::Tests::NCS {
-void THelper::CreateTestOlapStore(TActorId sender, TString scheme) {
+void THelperSchemaless::CreateTestOlapStore(TActorId sender, TString scheme) {
NKikimrSchemeOp::TColumnStoreDescription store;
UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &store));
@@ -21,7 +21,7 @@ void THelper::CreateTestOlapStore(TActorId sender, TString scheme) {
request->Record.SetExecTimeoutPeriod(Max<ui64>());
auto* op = request->Record.MutableTransaction()->MutableModifyScheme();
op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore);
- op->SetWorkingDir("/Root");
+ op->SetWorkingDir(ROOT_PATH);
op->MutableCreateColumnStore()->CopyFrom(store);
Server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release()));
@@ -30,49 +30,44 @@ void THelper::CreateTestOlapStore(TActorId sender, TString scheme) {
WaitForSchemeOperation(sender, txId);
}
-void THelper::CreateTestOlapTable(TActorId sender, TString storeName, TString scheme) {
+void THelperSchemaless::CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme) {
NKikimrSchemeOp::TColumnTableDescription table;
UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &table));
auto request = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
request->Record.SetExecTimeoutPeriod(Max<ui64>());
+
+ TString workingDir = ROOT_PATH;
+ if (!storeOrDirName.empty()) {
+ workingDir += "/" + storeOrDirName;
+ }
+
auto* op = request->Record.MutableTransaction()->MutableModifyScheme();
op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable);
- op->SetWorkingDir("/Root/" + storeName);
+ op->SetWorkingDir(workingDir);
op->MutableCreateColumnTable()->CopyFrom(table);
Server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release()));
auto ev = Server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
ui64 txId = ev->Get()->Record.GetTxId();
+ auto status = ev->Get()->Record.GetStatus();
+ UNIT_ASSERT(status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError);
WaitForSchemeOperation(sender, txId);
}
-std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
- return std::make_shared<arrow::Schema>(
- std::vector<std::shared_ptr<arrow::Field>>{
- arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
- arrow::field("resource_id", arrow::utf8()),
- arrow::field("uid", arrow::utf8()),
- arrow::field("level", arrow::int32()),
- arrow::field("message", arrow::utf8())
- });
-}
+void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ auto* runtime = Server.GetRuntime();
-TString THelper::TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount);
- return NArrow::SerializeBatchNoCompression(batch);
-}
-
-void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
- auto* runtime = Server.GetRuntime();
- std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
- TString serializedSchema = NArrow::SerializeSchema(*schema);
- Y_VERIFY(serializedSchema);
- auto batch = TestBlob(pathIdBegin, tsBegin, rowCount);
- Y_VERIFY(batch);
+ UNIT_ASSERT(batch);
+ UNIT_ASSERT(batch->num_rows());
+ auto data = NArrow::SerializeBatchNoCompression(batch);
+ UNIT_ASSERT(!data.empty());
+ TString serializedSchema = NArrow::SerializeSchema(*batch->schema());
+ UNIT_ASSERT(serializedSchema);
Ydb::Table::BulkUpsertRequest request;
request.mutable_arrow_batch_settings()->set_schema(serializedSchema);
- request.set_data(batch);
+ request.set_data(data);
request.set_table(testTable);
size_t responses = 0;
@@ -81,7 +76,14 @@ void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 t
auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0));
future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable {
++responses;
- UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS);
+ auto op = f.GetValueSync().operation();
+ if (op.status() != Ydb::StatusIds::SUCCESS) {
+ for (auto& issue : op.issues()) {
+ Cerr << issue.message() << " ";
+ }
+ Cerr << "\n";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(op.status(), Ydb::StatusIds::SUCCESS);
});
TDispatchOptions options;
@@ -92,6 +94,19 @@ void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 t
runtime->DispatchEvents(options);
}
+//
+
+std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
+ return std::make_shared<arrow::Schema>(
+ std::vector<std::shared_ptr<arrow::Field>>{
+ arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
+ arrow::field("resource_id", arrow::utf8()),
+ arrow::field("uid", arrow::utf8()),
+ arrow::field("level", arrow::int32()),
+ arrow::field("message", arrow::utf8())
+ });
+}
+
std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
@@ -126,4 +141,172 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui
return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 });
}
+//
+
+std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() {
+ return std::make_shared<arrow::Schema>(
+ std::vector<std::shared_ptr<arrow::Field>> {
+ arrow::field("WatchID", arrow::int64()),
+ arrow::field("JavaEnable", arrow::int16()),
+ arrow::field("Title", arrow::utf8()),
+ arrow::field("GoodEvent", arrow::int16()),
+ arrow::field("EventTime", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
+ arrow::field("EventDate", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), // TODO: Date
+ arrow::field("CounterID", arrow::int32()),
+ arrow::field("ClientIP", arrow::int32()),
+ arrow::field("RegionID", arrow::int32()),
+ arrow::field("UserID", arrow::int64()),
+ arrow::field("CounterClass", arrow::int16()),
+ arrow::field("OS", arrow::int16()),
+ arrow::field("UserAgent", arrow::int16()),
+ arrow::field("URL", arrow::utf8()),
+ arrow::field("Referer", arrow::utf8()),
+ arrow::field("IsRefresh", arrow::int16()),
+ arrow::field("RefererCategoryID", arrow::int16()),
+ arrow::field("RefererRegionID", arrow::int32()),
+ arrow::field("URLCategoryID", arrow::int16()),
+ arrow::field("URLRegionID", arrow::int32()),
+ arrow::field("ResolutionWidth", arrow::int16()),
+ arrow::field("ResolutionHeight", arrow::int16()),
+ arrow::field("ResolutionDepth", arrow::int16()),
+ arrow::field("FlashMajor", arrow::int16()),
+ arrow::field("FlashMinor", arrow::int16()),
+ arrow::field("FlashMinor2", arrow::utf8()),
+ arrow::field("NetMajor", arrow::int16()),
+ arrow::field("NetMinor", arrow::int16()),
+ arrow::field("UserAgentMajor", arrow::int16()),
+ arrow::field("UserAgentMinor", arrow::binary()),
+ arrow::field("CookieEnable", arrow::int16()),
+ arrow::field("JavascriptEnable", arrow::int16()),
+ arrow::field("IsMobile", arrow::int16()),
+ arrow::field("MobilePhone", arrow::int16()),
+ arrow::field("MobilePhoneModel", arrow::utf8()),
+ arrow::field("Params", arrow::utf8()),
+ arrow::field("IPNetworkID", arrow::int32()),
+ arrow::field("TraficSourceID", arrow::int16()),
+ arrow::field("SearchEngineID", arrow::int16()),
+ arrow::field("SearchPhrase", arrow::utf8()),
+ arrow::field("AdvEngineID", arrow::int16()),
+ arrow::field("IsArtifical", arrow::int16()),
+ arrow::field("WindowClientWidth", arrow::int16()),
+ arrow::field("WindowClientHeight", arrow::int16()),
+ arrow::field("ClientTimeZone", arrow::int16()),
+ arrow::field("ClientEventTime", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
+ arrow::field("SilverlightVersion1", arrow::int16()),
+ arrow::field("SilverlightVersion2", arrow::int16()),
+ arrow::field("SilverlightVersion3", arrow::int32()),
+ arrow::field("SilverlightVersion4", arrow::int16()),
+ arrow::field("PageCharset", arrow::utf8()),
+ arrow::field("CodeVersion", arrow::int32()),
+ arrow::field("IsLink", arrow::int16()),
+ arrow::field("IsDownload", arrow::int16()),
+ arrow::field("IsNotBounce", arrow::int16()),
+ arrow::field("FUniqID", arrow::int64()),
+ arrow::field("OriginalURL", arrow::utf8()),
+ arrow::field("HID", arrow::int32()),
+ arrow::field("IsOldCounter", arrow::int16()),
+ arrow::field("IsEvent", arrow::int16()),
+ arrow::field("IsParameter", arrow::int16()),
+ arrow::field("DontCountHits", arrow::int16()),
+ arrow::field("WithHash", arrow::int16()),
+ arrow::field("HitColor", arrow::binary()),
+ arrow::field("LocalEventTime", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
+ arrow::field("Age", arrow::int16()),
+ arrow::field("Sex", arrow::int16()),
+ arrow::field("Income", arrow::int16()),
+ arrow::field("Interests", arrow::int16()),
+ arrow::field("Robotness", arrow::int16()),
+ arrow::field("RemoteIP", arrow::int32()),
+ arrow::field("WindowName", arrow::int32()),
+ arrow::field("OpenerName", arrow::int32()),
+ arrow::field("HistoryLength", arrow::int16()),
+ arrow::field("BrowserLanguage", arrow::utf8()),
+ arrow::field("BrowserCountry", arrow::utf8()),
+ arrow::field("SocialNetwork", arrow::utf8()),
+ arrow::field("SocialAction", arrow::utf8()),
+ arrow::field("HTTPError", arrow::int16()),
+ arrow::field("SendTiming", arrow::int32()),
+ arrow::field("DNSTiming", arrow::int32()),
+ arrow::field("ConnectTiming", arrow::int32()),
+ arrow::field("ResponseStartTiming", arrow::int32()),
+ arrow::field("ResponseEndTiming", arrow::int32()),
+ arrow::field("FetchTiming", arrow::int32()),
+ arrow::field("SocialSourceNetworkID", arrow::int16()),
+ arrow::field("SocialSourcePage", arrow::utf8()),
+ arrow::field("ParamPrice", arrow::int64()),
+ arrow::field("ParamOrderID", arrow::utf8()),
+ arrow::field("ParamCurrency", arrow::utf8()),
+ arrow::field("ParamCurrencyID", arrow::int16()),
+ arrow::field("OpenstatServiceName", arrow::utf8()),
+ arrow::field("OpenstatCampaignID", arrow::utf8()),
+ arrow::field("OpenstatAdID", arrow::utf8()),
+ arrow::field("OpenstatSourceID", arrow::utf8()),
+ arrow::field("UTMSource", arrow::utf8()),
+ arrow::field("UTMMedium", arrow::utf8()),
+ arrow::field("UTMCampaign", arrow::utf8()),
+ arrow::field("UTMContent", arrow::utf8()),
+ arrow::field("UTMTerm", arrow::utf8()),
+ arrow::field("FromTag", arrow::utf8()),
+ arrow::field("HasGCLID", arrow::int16()),
+ arrow::field("RefererHash", arrow::int64()),
+ arrow::field("URLHash", arrow::int64()),
+ arrow::field("CLID", arrow::int32())
+ });
+}
+
+std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) {
+ std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
+ UNIT_ASSERT(schema);
+ UNIT_ASSERT(schema->num_fields());
+
+ std::unique_ptr<arrow::RecordBatchBuilder> builders;
+ auto res = arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), rowCount, &builders);
+ UNIT_ASSERT(res.ok());
+
+ for (i32 col = 0; col < schema->num_fields(); ++col) {
+ auto& field = schema->field(col);
+ auto typeId = field->type()->id();
+ for (size_t row = 0; row < rowCount; ++row) {
+ ui64 value = begin + row;
+ switch (typeId) {
+ case arrow::Type::INT16: {
+ UNIT_ASSERT(builders->GetFieldAs<arrow::Int16Builder>(col)->Append(value).ok());
+ break;
+ }
+ case arrow::Type::INT32: {
+ UNIT_ASSERT(builders->GetFieldAs<arrow::Int32Builder>(col)->Append(value).ok());
+ break;
+ }
+ case arrow::Type::INT64: {
+ UNIT_ASSERT(builders->GetFieldAs<arrow::Int64Builder>(col)->Append(value).ok());
+ break;
+ }
+ case arrow::Type::TIMESTAMP: {
+ UNIT_ASSERT(builders->GetFieldAs<arrow::TimestampBuilder>(col)->Append(value).ok());
+ break;
+ }
+ case arrow::Type::BINARY: {
+ auto str = ToString(value);
+ UNIT_ASSERT(builders->GetFieldAs<arrow::BinaryBuilder>(col)->Append(str.data(), str.size()).ok());
+ break;
+ }
+ case arrow::Type::STRING: {
+ auto str = ToString(value);
+ UNIT_ASSERT(builders->GetFieldAs<arrow::StringBuilder>(col)->Append(str.data(), str.size()).ok());
+ break;
+ }
+ default:
+ Y_FAIL("unexpected type");
+ }
+ }
+ }
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ UNIT_ASSERT(builders->Flush(&batch).ok());
+ UNIT_ASSERT(batch);
+ UNIT_ASSERT(batch->num_rows());
+ UNIT_ASSERT(batch->Validate().ok());
+ return batch;
+}
+
}
diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h
index 74efd67ed1c..2e8658675f6 100644
--- a/ydb/core/testlib/cs_helper.h
+++ b/ydb/core/testlib/cs_helper.h
@@ -5,19 +5,165 @@
namespace NKikimr::Tests::NCS {
-class THelper: public NCommon::THelper {
+class THelperSchemaless : public NCommon::THelper {
private:
using TBase = NCommon::THelper;
+public:
+ static constexpr const char * ROOT_PATH = "/Root";
+
+ using TBase::TBase;
+ void CreateTestOlapStore(TActorId sender, TString scheme);
+ void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme);
+ void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
+
+ virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) = 0;
+};
+
+class THelper: public THelperSchemaless {
+private:
+ using TBase = THelperSchemaless;
+
+ std::shared_ptr<arrow::Schema> GetArrowSchema();
+public:
+ using TBase::TBase;
+
+ static constexpr const char * PROTO_SCHEMA = R"(
+ Columns { Name: "timestamp" Type: "Timestamp" NotNull: true }
+ #Columns { Name: "resource_type" Type: "Utf8" }
+ Columns { Name: "resource_id" Type: "Utf8" }
+ Columns { Name: "uid" Type: "Utf8" }
+ Columns { Name: "level" Type: "Int32" }
+ Columns { Name: "message" Type: "Utf8" }
+ #Columns { Name: "json_payload" Type: "Json" }
+ #Columns { Name: "ingested_at" Type: "Timestamp" }
+ #Columns { Name: "saved_at" Type: "Timestamp" }
+ #Columns { Name: "request_id" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ )";
+
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) override;
+};
+
+class TCickBenchHelper: public THelperSchemaless {
+private:
+ using TBase = THelperSchemaless;
std::shared_ptr<arrow::Schema> GetArrowSchema();
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
public:
using TBase::TBase;
- void CreateTestOlapStore(TActorId sender, TString scheme);
- void CreateTestOlapTable(TActorId sender, TString storeName, TString scheme);
- TString TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
- void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount);
+
+ static constexpr const char * PROTO_SCHEMA = R"(
+ Columns { Name: "WatchID" Type: "Int64" NotNull: true }
+ Columns { Name: "JavaEnable" Type: "Int16" NotNull: true }
+ Columns { Name: "Title" Type: "Utf8" NotNull: true }
+ Columns { Name: "GoodEvent" Type: "Int16" NotNull: true }
+ Columns { Name: "EventTime" Type: "Timestamp" NotNull: true }
+ Columns { Name: "EventDate" Type: "Timestamp" NotNull: true }
+ Columns { Name: "CounterID" Type: "Int32" NotNull: true }
+ Columns { Name: "ClientIP" Type: "Int32" NotNull: true }
+ Columns { Name: "RegionID" Type: "Int32" NotNull: true }
+ Columns { Name: "UserID" Type: "Int64" NotNull: true }
+ Columns { Name: "CounterClass" Type: "Int16" NotNull: true }
+ Columns { Name: "OS" Type: "Int16" NotNull: true }
+ Columns { Name: "UserAgent" Type: "Int16" NotNull: true }
+ Columns { Name: "URL" Type: "Utf8" NotNull: true }
+ Columns { Name: "Referer" Type: "Utf8" NotNull: true }
+ Columns { Name: "IsRefresh" Type: "Int16" NotNull: true }
+ Columns { Name: "RefererCategoryID" Type: "Int16" NotNull: true }
+ Columns { Name: "RefererRegionID" Type: "Int32" NotNull: true }
+ Columns { Name: "URLCategoryID" Type: "Int16" NotNull: true }
+ Columns { Name: "URLRegionID" Type: "Int32" NotNull: true }
+ Columns { Name: "ResolutionWidth" Type: "Int16" NotNull: true }
+ Columns { Name: "ResolutionHeight" Type: "Int16" NotNull: true }
+ Columns { Name: "ResolutionDepth" Type: "Int16" NotNull: true }
+ Columns { Name: "FlashMajor" Type: "Int16" NotNull: true }
+ Columns { Name: "FlashMinor" Type: "Int16" NotNull: true }
+ Columns { Name: "FlashMinor2" Type: "Utf8" NotNull: true }
+ Columns { Name: "NetMajor" Type: "Int16" NotNull: true }
+ Columns { Name: "NetMinor" Type: "Int16" NotNull: true }
+ Columns { Name: "UserAgentMajor" Type: "Int16" NotNull: true }
+ Columns { Name: "UserAgentMinor" Type: "String" NotNull: true }
+ Columns { Name: "CookieEnable" Type: "Int16" NotNull: true }
+ Columns { Name: "JavascriptEnable" Type: "Int16" NotNull: true }
+ Columns { Name: "IsMobile" Type: "Int16" NotNull: true }
+ Columns { Name: "MobilePhone" Type: "Int16" NotNull: true }
+ Columns { Name: "MobilePhoneModel" Type: "Utf8" NotNull: true }
+ Columns { Name: "Params" Type: "Utf8" NotNull: true }
+ Columns { Name: "IPNetworkID" Type: "Int32" NotNull: true }
+ Columns { Name: "TraficSourceID" Type: "Int16" NotNull: true }
+ Columns { Name: "SearchEngineID" Type: "Int16" NotNull: true }
+ Columns { Name: "SearchPhrase" Type: "Utf8" NotNull: true }
+ Columns { Name: "AdvEngineID" Type: "Int16" NotNull: true }
+ Columns { Name: "IsArtifical" Type: "Int16" NotNull: true }
+ Columns { Name: "WindowClientWidth" Type: "Int16" NotNull: true }
+ Columns { Name: "WindowClientHeight" Type: "Int16" NotNull: true }
+ Columns { Name: "ClientTimeZone" Type: "Int16" NotNull: true }
+ Columns { Name: "ClientEventTime" Type: "Timestamp" NotNull: true }
+ Columns { Name: "SilverlightVersion1" Type: "Int16" NotNull: true }
+ Columns { Name: "SilverlightVersion2" Type: "Int16" NotNull: true }
+ Columns { Name: "SilverlightVersion3" Type: "Int32" NotNull: true }
+ Columns { Name: "SilverlightVersion4" Type: "Int16" NotNull: true }
+ Columns { Name: "PageCharset" Type: "Utf8" NotNull: true }
+ Columns { Name: "CodeVersion" Type: "Int32" NotNull: true }
+ Columns { Name: "IsLink" Type: "Int16" NotNull: true }
+ Columns { Name: "IsDownload" Type: "Int16" NotNull: true }
+ Columns { Name: "IsNotBounce" Type: "Int16" NotNull: true }
+ Columns { Name: "FUniqID" Type: "Int64" NotNull: true }
+ Columns { Name: "OriginalURL" Type: "Utf8" NotNull: true }
+ Columns { Name: "HID" Type: "Int32" NotNull: true }
+ Columns { Name: "IsOldCounter" Type: "Int16" NotNull: true }
+ Columns { Name: "IsEvent" Type: "Int16" NotNull: true }
+ Columns { Name: "IsParameter" Type: "Int16" NotNull: true }
+ Columns { Name: "DontCountHits" Type: "Int16" NotNull: true }
+ Columns { Name: "WithHash" Type: "Int16" NotNull: true }
+ Columns { Name: "HitColor" Type: "String" NotNull: true }
+ Columns { Name: "LocalEventTime" Type: "Timestamp" NotNull: true }
+ Columns { Name: "Age" Type: "Int16" NotNull: true }
+ Columns { Name: "Sex" Type: "Int16" NotNull: true }
+ Columns { Name: "Income" Type: "Int16" NotNull: true }
+ Columns { Name: "Interests" Type: "Int16" NotNull: true }
+ Columns { Name: "Robotness" Type: "Int16" NotNull: true }
+ Columns { Name: "RemoteIP" Type: "Int32" NotNull: true }
+ Columns { Name: "WindowName" Type: "Int32" NotNull: true }
+ Columns { Name: "OpenerName" Type: "Int32" NotNull: true }
+ Columns { Name: "HistoryLength" Type: "Int16" NotNull: true }
+ Columns { Name: "BrowserLanguage" Type: "Utf8" NotNull: true }
+ Columns { Name: "BrowserCountry" Type: "Utf8" NotNull: true }
+ Columns { Name: "SocialNetwork" Type: "Utf8" NotNull: true }
+ Columns { Name: "SocialAction" Type: "Utf8" NotNull: true }
+ Columns { Name: "HTTPError" Type: "Int16" NotNull: true }
+ Columns { Name: "SendTiming" Type: "Int32" NotNull: true }
+ Columns { Name: "DNSTiming" Type: "Int32" NotNull: true }
+ Columns { Name: "ConnectTiming" Type: "Int32" NotNull: true }
+ Columns { Name: "ResponseStartTiming" Type: "Int32" NotNull: true }
+ Columns { Name: "ResponseEndTiming" Type: "Int32" NotNull: true }
+ Columns { Name: "FetchTiming" Type: "Int32" NotNull: true }
+ Columns { Name: "SocialSourceNetworkID" Type: "Int16" NotNull: true }
+ Columns { Name: "SocialSourcePage" Type: "Utf8" NotNull: true }
+ Columns { Name: "ParamPrice" Type: "Int64" NotNull: true }
+ Columns { Name: "ParamOrderID" Type: "Utf8" NotNull: true }
+ Columns { Name: "ParamCurrency" Type: "Utf8" NotNull: true }
+ Columns { Name: "ParamCurrencyID" Type: "Int16" NotNull: true }
+ Columns { Name: "OpenstatServiceName" Type: "Utf8" NotNull: true }
+ Columns { Name: "OpenstatCampaignID" Type: "Utf8" NotNull: true }
+ Columns { Name: "OpenstatAdID" Type: "Utf8" NotNull: true }
+ Columns { Name: "OpenstatSourceID" Type: "Utf8" NotNull: true }
+ Columns { Name: "UTMSource" Type: "Utf8" NotNull: true }
+ Columns { Name: "UTMMedium" Type: "Utf8" NotNull: true }
+ Columns { Name: "UTMCampaign" Type: "Utf8" NotNull: true }
+ Columns { Name: "UTMContent" Type: "Utf8" NotNull: true }
+ Columns { Name: "UTMTerm" Type: "Utf8" NotNull: true }
+ Columns { Name: "FromTag" Type: "Utf8" NotNull: true }
+ Columns { Name: "HasGCLID" Type: "Int16" NotNull: true }
+ Columns { Name: "RefererHash" Type: "Int64" NotNull: true }
+ Columns { Name: "URLHash" Type: "Int64" NotNull: true }
+ Columns { Name: "CLID" Type: "Int32" NotNull: true }
+ KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"]
+ )";
+
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) override;
};
}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 02d6fce31eb..9c3d133ba8d 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -39,21 +39,10 @@ public:
SchemaPresets {
Name: "default"
Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- #Columns { Name: "resource_type" Type: "Utf8" }
- Columns { Name: "resource_id" Type: "Utf8" }
- Columns { Name: "uid" Type: "Utf8" }
- Columns { Name: "level" Type: "Int32" }
- Columns { Name: "message" Type: "Utf8" }
- #Columns { Name: "json_payload" Type: "Json" }
- #Columns { Name: "ingested_at" Type: "Timestamp" }
- #Columns { Name: "saved_at" Type: "Timestamp" }
- #Columns { Name: "request_id" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ %s
}
}
- )", storeName.c_str(), storeShardsCount));
+ )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA));
TString shardingColumns = "[\"timestamp\", \"uid\"]";
if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {