diff options
author | chertus <azuikov@ydb.tech> | 2022-12-30 12:17:45 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-12-30 12:17:45 +0300 |
commit | 6629b1db31555de019a6363e912da8c2324c9408 (patch) | |
tree | 0a1b12e93fcb5b7d858a5c237638f27df01e586a | |
parent | d2bfeb9534f9d2c12073a0adda43d2861ac9bbbb (diff) | |
download | ydb-6629b1db31555de019a6363e912da8c2324c9408.tar.gz |
smoke test for some ClickBench queries
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 29 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 192 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.cpp | 237 | ||||
-rw-r--r-- | ydb/core/testlib/cs_helper.h | 158 | ||||
-rw-r--r-- | ydb/core/tx/tiering/ut/ut_tiers.cpp | 15 |
5 files changed, 545 insertions, 86 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 3ca643789a0..fe034e22572 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -68,12 +68,14 @@ class TFullSplitData { private: ui32 ShardsCount = 0; THashMap<ui64, TShardInfo> ShardsInfo; + public: - TFullSplitData(const ui32 shardsCount) - : ShardsCount(shardsCount) - { + TString ErrorString; - } + TFullSplitData(const ui32 shardsCount, TString errString = {}) + : ShardsCount(shardsCount) + , ErrorString(errString) + {} const THashMap<ui64, TShardInfo>& GetShardsInfo() const { return ShardsInfo; @@ -123,6 +125,14 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, } if (rowSharding.empty()) { + result.ErrorString = "empty " + + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(hashSharding.GetFunction()) + + " sharding"; + for (auto& column : shardingColumns) { + if (batch->schema()->GetFieldIndex(column) < 0) { + result.ErrorString += ", no column '" + column + "'"; + } + } return result; } @@ -149,8 +159,13 @@ TFullSplitData SplitData(const TString& data, const NKikimrSchemeOp::TColumnTabl std::shared_ptr<arrow::Schema> schema = ExtractArrowSchema(olapSchema); std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, schema); - if (!batch || !batch->ValidateFull().ok()) { - return TFullSplitData(0); + if (!batch) { + return TFullSplitData(0, TString("cannot deserialize batch with schema ") + schema->ToString()); + } + + auto res = batch->ValidateFull(); + if (!res.ok()) { + return TFullSplitData(0, TString("deserialize batch is not valid: ") + res.ToString()); } return SplitData(batch, description); } @@ -465,7 +480,7 @@ protected: SplitData(GetDeserializedBatch(), description) : SplitData(GetSerializedData(), description); if (batches.GetShardsInfo().empty()) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Cannot deserialize or split input data"); + return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Input data sharding error: " + batches.ErrorString); } ui32 sumBytes = 0; ui32 rowsCount = 0; diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 26a560fcc4a..b4edb0db146 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -49,8 +49,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG); runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_TRACE); - //runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG); - //runtime->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); @@ -69,26 +68,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); CreateTestOlapStore(sender, Sprintf(R"( - Name: "%s" - ColumnShardCount: %d - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" NotNull: true } - #Columns { Name: "resource_type" Type: "Utf8" } - Columns { Name: "resource_id" Type: "Utf8" } - Columns { Name: "uid" Type: "Utf8" } - Columns { Name: "level" Type: "Int32" } - Columns { Name: "message" Type: "Utf8" } - #Columns { Name: "json_payload" Type: "Json" } - #Columns { Name: "ingested_at" Type: "Timestamp" } - #Columns { Name: "saved_at" Type: "Timestamp" } - #Columns { Name: "request_id" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )", storeName.c_str(), storeShardsCount)); + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + %s + } + } + )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA)); TString shardingColumns = "[\"timestamp\", \"uid\"]"; if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { @@ -113,18 +101,37 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } }; - std::shared_ptr<arrow::Schema> GetArrowSchema() { - return std::make_shared<arrow::Schema>( - std::vector<std::shared_ptr<arrow::Field>>{ - arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), - arrow::field("resource_id", arrow::utf8()), - arrow::field("uid", arrow::utf8()), - arrow::field("level", arrow::int32()), - arrow::field("message", arrow::utf8()) - }); - } + class TClickHelper : public Tests::NCS::TCickBenchHelper { + private: + using TBase = Tests::NCS::TCickBenchHelper; + public: + using TBase::TBase; + + TClickHelper(TKikimrRunner& runner) + : TBase(runner.GetTestServer()) + {} + + void CreateClickBenchTable(TString tableName = "benchTable", ui32 shardsCount = 4) { + TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); + + TBase::CreateTestOlapTable(sender, "", Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + Schema { + %s + } + Sharding { + HashSharding { + Function: HASH_FUNCTION_MODULO_N + Columns: "EventTime" + } + })", tableName.c_str(), shardsCount, PROTO_SCHEMA)); + } + }; void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead + TLocalHelper lHelper(kikimr.GetTestServer()); NYdb::NLongTx::TClient client(kikimr.GetDriver()); @@ -132,7 +139,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); auto txId = resBeginTx.GetResult().tx_id(); - TString data = lHelper.TestBlob(pathIdBegin, tsBegin, rowCount); + auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); + TString data = NArrow::SerializeBatchNoCompression(batch); NLongTx::TLongTxWriteResult resWrite = client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); @@ -1571,6 +1579,66 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TestAggregationsInternal(cases); } + void TestClickBench(const std::vector<TAggregationTestCase>& cases) { + TPortManager tp; + ui16 mbusport = tp.GetPort(2134); + auto settings = Tests::TServerSettings(mbusport) + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetNodeCount(2); + + Tests::TServer::TPtr server = new Tests::TServer(settings); + + auto runtime = server->GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + + InitRoot(server, sender); + EnableDebugLogging(runtime); + + TClickHelper(*server).CreateClickBenchTable(); + + // write data + + ui32 numIterations = 10; + const ui32 iterationPackSize = 2000; + for (ui64 i = 0; i < numIterations; ++i) { + TClickHelper(*server).SendDataViaActorSystem("/Root/benchTable", 0, 1000000 + i * 1000000, + iterationPackSize); + } + + TAggregationTestCase currentTest; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpComputeEvents::EvScanData: + { + auto* msg = ev->Get<NKqp::TEvKqpCompute::TEvScanData>(); + Y_VERIFY(currentTest.MutableLimitChecker().CheckExpectedLimitOnScanData(msg->ArrowBatch ? msg->ArrowBatch->num_rows() : 0)); + Y_VERIFY(currentTest.MutableRecordChecker().CheckExpectedOnScanData(msg->ArrowBatch ? msg->ArrowBatch->num_columns() : 0)); + break; + } + case TEvDataShard::EvKqpScan: + { + auto* msg = ev->Get<TEvDataShard::TEvKqpScan>(); + Y_VERIFY(currentTest.MutableLimitChecker().CheckExpectedLimitOnScanTask(msg->Record.GetItemsLimit())); + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime->SetObserverFunc(captureEvents); + + // selects + + for (auto&& i : cases) { + const TString queryFixed = i.GetFixedQuery(); + currentTest = i; + auto streamSender = runtime->AllocateEdgeActor(); + SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, queryFixed, false)); + auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqpCompute::TEvScanData>(streamSender, TDuration::Seconds(10)); + Y_VERIFY(currentTest.CheckFinished()); + } + } + Y_UNIT_TEST(Aggregation_ResultDistinctCountRI_GroupByL) { TAggregationTestCase testCase; testCase.SetQuery(R"( @@ -1905,6 +1973,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TestAggregations({ testCase }); } + Y_UNIT_TEST(ClickBenchSmoke) { + TAggregationTestCase q7; + q7.SetQuery(R"( + SELECT + AdvEngineID, COUNT(*) as c + FROM `/Root/benchTable` + WHERE AdvEngineID != 0 + GROUP BY AdvEngineID + ORDER BY c DESC + )") + //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]") + .AddExpectedPlanOptions("TKqpOlapAgg") + .SetExpectedReadNodeType("TableFullScan"); + + TAggregationTestCase q9; + q9.SetQuery(R"( + SELECT + RegionID, SUM(AdvEngineID), COUNT(*) AS c, avg(ResolutionWidth), COUNT(DISTINCT UserID) + FROM `/Root/benchTable` + GROUP BY RegionID + ORDER BY c DESC + LIMIT 10 + )") + //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]") + .AddExpectedPlanOptions("TKqpOlapAgg") + .SetExpectedReadNodeType("TableFullScan"); + + TAggregationTestCase q12; + q12.SetQuery(R"( + SELECT + SearchPhrase, count(*) AS c + FROM `/Root/benchTable` + WHERE SearchPhrase != '' + GROUP BY SearchPhrase + ORDER BY c DESC + LIMIT 10; + )") + //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]") + .AddExpectedPlanOptions("TKqpOlapAgg") + .SetExpectedReadNodeType("TableFullScan"); + + TAggregationTestCase q14; + q14.SetQuery(R"( + SELECT + SearchEngineID, SearchPhrase, count(*) AS c + FROM `/Root/benchTable` + WHERE SearchPhrase != '' + GROUP BY SearchEngineID, SearchPhrase + ORDER BY c DESC + LIMIT 10; + )") + //.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]") + .AddExpectedPlanOptions("TKqpOlapAgg") + .SetExpectedReadNodeType("TableFullScan"); + + TestClickBench({ q7, q9, q12, q14 }); + } + Y_UNIT_TEST(StatsSysView) { auto settings = TKikimrSettings() .SetWithSampleTables(false); diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index 98064014db6..b1daae1bc78 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -13,7 +13,7 @@ namespace NKikimr::Tests::NCS { -void THelper::CreateTestOlapStore(TActorId sender, TString scheme) { +void THelperSchemaless::CreateTestOlapStore(TActorId sender, TString scheme) { NKikimrSchemeOp::TColumnStoreDescription store; UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &store)); @@ -21,7 +21,7 @@ void THelper::CreateTestOlapStore(TActorId sender, TString scheme) { request->Record.SetExecTimeoutPeriod(Max<ui64>()); auto* op = request->Record.MutableTransaction()->MutableModifyScheme(); op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore); - op->SetWorkingDir("/Root"); + op->SetWorkingDir(ROOT_PATH); op->MutableCreateColumnStore()->CopyFrom(store); Server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release())); @@ -30,49 +30,44 @@ void THelper::CreateTestOlapStore(TActorId sender, TString scheme) { WaitForSchemeOperation(sender, txId); } -void THelper::CreateTestOlapTable(TActorId sender, TString storeName, TString scheme) { +void THelperSchemaless::CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme) { NKikimrSchemeOp::TColumnTableDescription table; UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &table)); auto request = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>(); request->Record.SetExecTimeoutPeriod(Max<ui64>()); + + TString workingDir = ROOT_PATH; + if (!storeOrDirName.empty()) { + workingDir += "/" + storeOrDirName; + } + auto* op = request->Record.MutableTransaction()->MutableModifyScheme(); op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable); - op->SetWorkingDir("/Root/" + storeName); + op->SetWorkingDir(workingDir); op->MutableCreateColumnTable()->CopyFrom(table); Server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release())); auto ev = Server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); ui64 txId = ev->Get()->Record.GetTxId(); + auto status = ev->Get()->Record.GetStatus(); + UNIT_ASSERT(status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecError); WaitForSchemeOperation(sender, txId); } -std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { - return std::make_shared<arrow::Schema>( - std::vector<std::shared_ptr<arrow::Field>>{ - arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), - arrow::field("resource_id", arrow::utf8()), - arrow::field("uid", arrow::utf8()), - arrow::field("level", arrow::int32()), - arrow::field("message", arrow::utf8()) - }); -} +void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + auto* runtime = Server.GetRuntime(); -TString THelper::TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { auto batch = TestArrowBatch(pathIdBegin, tsBegin, rowCount); - return NArrow::SerializeBatchNoCompression(batch); -} - -void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { - auto* runtime = Server.GetRuntime(); - std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); - TString serializedSchema = NArrow::SerializeSchema(*schema); - Y_VERIFY(serializedSchema); - auto batch = TestBlob(pathIdBegin, tsBegin, rowCount); - Y_VERIFY(batch); + UNIT_ASSERT(batch); + UNIT_ASSERT(batch->num_rows()); + auto data = NArrow::SerializeBatchNoCompression(batch); + UNIT_ASSERT(!data.empty()); + TString serializedSchema = NArrow::SerializeSchema(*batch->schema()); + UNIT_ASSERT(serializedSchema); Ydb::Table::BulkUpsertRequest request; request.mutable_arrow_batch_settings()->set_schema(serializedSchema); - request.set_data(batch); + request.set_data(data); request.set_table(testTable); size_t responses = 0; @@ -81,7 +76,14 @@ void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 t auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0)); future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable { ++responses; - UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS); + auto op = f.GetValueSync().operation(); + if (op.status() != Ydb::StatusIds::SUCCESS) { + for (auto& issue : op.issues()) { + Cerr << issue.message() << " "; + } + Cerr << "\n"; + } + UNIT_ASSERT_VALUES_EQUAL(op.status(), Ydb::StatusIds::SUCCESS); }); TDispatchOptions options; @@ -92,6 +94,19 @@ void THelper::SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 t runtime->DispatchEvents(options); } +// + +std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { + return std::make_shared<arrow::Schema>( + std::vector<std::shared_ptr<arrow::Field>>{ + arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), + arrow::field("resource_id", arrow::utf8()), + arrow::field("uid", arrow::utf8()), + arrow::field("level", arrow::int32()), + arrow::field("message", arrow::utf8()) + }); +} + std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); @@ -126,4 +141,172 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 }); } +// + +std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() { + return std::make_shared<arrow::Schema>( + std::vector<std::shared_ptr<arrow::Field>> { + arrow::field("WatchID", arrow::int64()), + arrow::field("JavaEnable", arrow::int16()), + arrow::field("Title", arrow::utf8()), + arrow::field("GoodEvent", arrow::int16()), + arrow::field("EventTime", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), + arrow::field("EventDate", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), // TODO: Date + arrow::field("CounterID", arrow::int32()), + arrow::field("ClientIP", arrow::int32()), + arrow::field("RegionID", arrow::int32()), + arrow::field("UserID", arrow::int64()), + arrow::field("CounterClass", arrow::int16()), + arrow::field("OS", arrow::int16()), + arrow::field("UserAgent", arrow::int16()), + arrow::field("URL", arrow::utf8()), + arrow::field("Referer", arrow::utf8()), + arrow::field("IsRefresh", arrow::int16()), + arrow::field("RefererCategoryID", arrow::int16()), + arrow::field("RefererRegionID", arrow::int32()), + arrow::field("URLCategoryID", arrow::int16()), + arrow::field("URLRegionID", arrow::int32()), + arrow::field("ResolutionWidth", arrow::int16()), + arrow::field("ResolutionHeight", arrow::int16()), + arrow::field("ResolutionDepth", arrow::int16()), + arrow::field("FlashMajor", arrow::int16()), + arrow::field("FlashMinor", arrow::int16()), + arrow::field("FlashMinor2", arrow::utf8()), + arrow::field("NetMajor", arrow::int16()), + arrow::field("NetMinor", arrow::int16()), + arrow::field("UserAgentMajor", arrow::int16()), + arrow::field("UserAgentMinor", arrow::binary()), + arrow::field("CookieEnable", arrow::int16()), + arrow::field("JavascriptEnable", arrow::int16()), + arrow::field("IsMobile", arrow::int16()), + arrow::field("MobilePhone", arrow::int16()), + arrow::field("MobilePhoneModel", arrow::utf8()), + arrow::field("Params", arrow::utf8()), + arrow::field("IPNetworkID", arrow::int32()), + arrow::field("TraficSourceID", arrow::int16()), + arrow::field("SearchEngineID", arrow::int16()), + arrow::field("SearchPhrase", arrow::utf8()), + arrow::field("AdvEngineID", arrow::int16()), + arrow::field("IsArtifical", arrow::int16()), + arrow::field("WindowClientWidth", arrow::int16()), + arrow::field("WindowClientHeight", arrow::int16()), + arrow::field("ClientTimeZone", arrow::int16()), + arrow::field("ClientEventTime", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), + arrow::field("SilverlightVersion1", arrow::int16()), + arrow::field("SilverlightVersion2", arrow::int16()), + arrow::field("SilverlightVersion3", arrow::int32()), + arrow::field("SilverlightVersion4", arrow::int16()), + arrow::field("PageCharset", arrow::utf8()), + arrow::field("CodeVersion", arrow::int32()), + arrow::field("IsLink", arrow::int16()), + arrow::field("IsDownload", arrow::int16()), + arrow::field("IsNotBounce", arrow::int16()), + arrow::field("FUniqID", arrow::int64()), + arrow::field("OriginalURL", arrow::utf8()), + arrow::field("HID", arrow::int32()), + arrow::field("IsOldCounter", arrow::int16()), + arrow::field("IsEvent", arrow::int16()), + arrow::field("IsParameter", arrow::int16()), + arrow::field("DontCountHits", arrow::int16()), + arrow::field("WithHash", arrow::int16()), + arrow::field("HitColor", arrow::binary()), + arrow::field("LocalEventTime", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), + arrow::field("Age", arrow::int16()), + arrow::field("Sex", arrow::int16()), + arrow::field("Income", arrow::int16()), + arrow::field("Interests", arrow::int16()), + arrow::field("Robotness", arrow::int16()), + arrow::field("RemoteIP", arrow::int32()), + arrow::field("WindowName", arrow::int32()), + arrow::field("OpenerName", arrow::int32()), + arrow::field("HistoryLength", arrow::int16()), + arrow::field("BrowserLanguage", arrow::utf8()), + arrow::field("BrowserCountry", arrow::utf8()), + arrow::field("SocialNetwork", arrow::utf8()), + arrow::field("SocialAction", arrow::utf8()), + arrow::field("HTTPError", arrow::int16()), + arrow::field("SendTiming", arrow::int32()), + arrow::field("DNSTiming", arrow::int32()), + arrow::field("ConnectTiming", arrow::int32()), + arrow::field("ResponseStartTiming", arrow::int32()), + arrow::field("ResponseEndTiming", arrow::int32()), + arrow::field("FetchTiming", arrow::int32()), + arrow::field("SocialSourceNetworkID", arrow::int16()), + arrow::field("SocialSourcePage", arrow::utf8()), + arrow::field("ParamPrice", arrow::int64()), + arrow::field("ParamOrderID", arrow::utf8()), + arrow::field("ParamCurrency", arrow::utf8()), + arrow::field("ParamCurrencyID", arrow::int16()), + arrow::field("OpenstatServiceName", arrow::utf8()), + arrow::field("OpenstatCampaignID", arrow::utf8()), + arrow::field("OpenstatAdID", arrow::utf8()), + arrow::field("OpenstatSourceID", arrow::utf8()), + arrow::field("UTMSource", arrow::utf8()), + arrow::field("UTMMedium", arrow::utf8()), + arrow::field("UTMCampaign", arrow::utf8()), + arrow::field("UTMContent", arrow::utf8()), + arrow::field("UTMTerm", arrow::utf8()), + arrow::field("FromTag", arrow::utf8()), + arrow::field("HasGCLID", arrow::int16()), + arrow::field("RefererHash", arrow::int64()), + arrow::field("URLHash", arrow::int64()), + arrow::field("CLID", arrow::int32()) + }); +} + +std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64 begin, size_t rowCount) { + std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); + UNIT_ASSERT(schema); + UNIT_ASSERT(schema->num_fields()); + + std::unique_ptr<arrow::RecordBatchBuilder> builders; + auto res = arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), rowCount, &builders); + UNIT_ASSERT(res.ok()); + + for (i32 col = 0; col < schema->num_fields(); ++col) { + auto& field = schema->field(col); + auto typeId = field->type()->id(); + for (size_t row = 0; row < rowCount; ++row) { + ui64 value = begin + row; + switch (typeId) { + case arrow::Type::INT16: { + UNIT_ASSERT(builders->GetFieldAs<arrow::Int16Builder>(col)->Append(value).ok()); + break; + } + case arrow::Type::INT32: { + UNIT_ASSERT(builders->GetFieldAs<arrow::Int32Builder>(col)->Append(value).ok()); + break; + } + case arrow::Type::INT64: { + UNIT_ASSERT(builders->GetFieldAs<arrow::Int64Builder>(col)->Append(value).ok()); + break; + } + case arrow::Type::TIMESTAMP: { + UNIT_ASSERT(builders->GetFieldAs<arrow::TimestampBuilder>(col)->Append(value).ok()); + break; + } + case arrow::Type::BINARY: { + auto str = ToString(value); + UNIT_ASSERT(builders->GetFieldAs<arrow::BinaryBuilder>(col)->Append(str.data(), str.size()).ok()); + break; + } + case arrow::Type::STRING: { + auto str = ToString(value); + UNIT_ASSERT(builders->GetFieldAs<arrow::StringBuilder>(col)->Append(str.data(), str.size()).ok()); + break; + } + default: + Y_FAIL("unexpected type"); + } + } + } + + std::shared_ptr<arrow::RecordBatch> batch; + UNIT_ASSERT(builders->Flush(&batch).ok()); + UNIT_ASSERT(batch); + UNIT_ASSERT(batch->num_rows()); + UNIT_ASSERT(batch->Validate().ok()); + return batch; +} + } diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 74efd67ed1c..2e8658675f6 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -5,19 +5,165 @@ namespace NKikimr::Tests::NCS { -class THelper: public NCommon::THelper { +class THelperSchemaless : public NCommon::THelper { private: using TBase = NCommon::THelper; +public: + static constexpr const char * ROOT_PATH = "/Root"; + + using TBase::TBase; + void CreateTestOlapStore(TActorId sender, TString scheme); + void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme); + void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); + + virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) = 0; +}; + +class THelper: public THelperSchemaless { +private: + using TBase = THelperSchemaless; + + std::shared_ptr<arrow::Schema> GetArrowSchema(); +public: + using TBase::TBase; + + static constexpr const char * PROTO_SCHEMA = R"( + Columns { Name: "timestamp" Type: "Timestamp" NotNull: true } + #Columns { Name: "resource_type" Type: "Utf8" } + Columns { Name: "resource_id" Type: "Utf8" } + Columns { Name: "uid" Type: "Utf8" } + Columns { Name: "level" Type: "Int32" } + Columns { Name: "message" Type: "Utf8" } + #Columns { Name: "json_payload" Type: "Json" } + #Columns { Name: "ingested_at" Type: "Timestamp" } + #Columns { Name: "saved_at" Type: "Timestamp" } + #Columns { Name: "request_id" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + )"; + + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) override; +}; + +class TCickBenchHelper: public THelperSchemaless { +private: + using TBase = THelperSchemaless; std::shared_ptr<arrow::Schema> GetArrowSchema(); - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); public: using TBase::TBase; - void CreateTestOlapStore(TActorId sender, TString scheme); - void CreateTestOlapTable(TActorId sender, TString storeName, TString scheme); - TString TestBlob(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); - void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount); + + static constexpr const char * PROTO_SCHEMA = R"( + Columns { Name: "WatchID" Type: "Int64" NotNull: true } + Columns { Name: "JavaEnable" Type: "Int16" NotNull: true } + Columns { Name: "Title" Type: "Utf8" NotNull: true } + Columns { Name: "GoodEvent" Type: "Int16" NotNull: true } + Columns { Name: "EventTime" Type: "Timestamp" NotNull: true } + Columns { Name: "EventDate" Type: "Timestamp" NotNull: true } + Columns { Name: "CounterID" Type: "Int32" NotNull: true } + Columns { Name: "ClientIP" Type: "Int32" NotNull: true } + Columns { Name: "RegionID" Type: "Int32" NotNull: true } + Columns { Name: "UserID" Type: "Int64" NotNull: true } + Columns { Name: "CounterClass" Type: "Int16" NotNull: true } + Columns { Name: "OS" Type: "Int16" NotNull: true } + Columns { Name: "UserAgent" Type: "Int16" NotNull: true } + Columns { Name: "URL" Type: "Utf8" NotNull: true } + Columns { Name: "Referer" Type: "Utf8" NotNull: true } + Columns { Name: "IsRefresh" Type: "Int16" NotNull: true } + Columns { Name: "RefererCategoryID" Type: "Int16" NotNull: true } + Columns { Name: "RefererRegionID" Type: "Int32" NotNull: true } + Columns { Name: "URLCategoryID" Type: "Int16" NotNull: true } + Columns { Name: "URLRegionID" Type: "Int32" NotNull: true } + Columns { Name: "ResolutionWidth" Type: "Int16" NotNull: true } + Columns { Name: "ResolutionHeight" Type: "Int16" NotNull: true } + Columns { Name: "ResolutionDepth" Type: "Int16" NotNull: true } + Columns { Name: "FlashMajor" Type: "Int16" NotNull: true } + Columns { Name: "FlashMinor" Type: "Int16" NotNull: true } + Columns { Name: "FlashMinor2" Type: "Utf8" NotNull: true } + Columns { Name: "NetMajor" Type: "Int16" NotNull: true } + Columns { Name: "NetMinor" Type: "Int16" NotNull: true } + Columns { Name: "UserAgentMajor" Type: "Int16" NotNull: true } + Columns { Name: "UserAgentMinor" Type: "String" NotNull: true } + Columns { Name: "CookieEnable" Type: "Int16" NotNull: true } + Columns { Name: "JavascriptEnable" Type: "Int16" NotNull: true } + Columns { Name: "IsMobile" Type: "Int16" NotNull: true } + Columns { Name: "MobilePhone" Type: "Int16" NotNull: true } + Columns { Name: "MobilePhoneModel" Type: "Utf8" NotNull: true } + Columns { Name: "Params" Type: "Utf8" NotNull: true } + Columns { Name: "IPNetworkID" Type: "Int32" NotNull: true } + Columns { Name: "TraficSourceID" Type: "Int16" NotNull: true } + Columns { Name: "SearchEngineID" Type: "Int16" NotNull: true } + Columns { Name: "SearchPhrase" Type: "Utf8" NotNull: true } + Columns { Name: "AdvEngineID" Type: "Int16" NotNull: true } + Columns { Name: "IsArtifical" Type: "Int16" NotNull: true } + Columns { Name: "WindowClientWidth" Type: "Int16" NotNull: true } + Columns { Name: "WindowClientHeight" Type: "Int16" NotNull: true } + Columns { Name: "ClientTimeZone" Type: "Int16" NotNull: true } + Columns { Name: "ClientEventTime" Type: "Timestamp" NotNull: true } + Columns { Name: "SilverlightVersion1" Type: "Int16" NotNull: true } + Columns { Name: "SilverlightVersion2" Type: "Int16" NotNull: true } + Columns { Name: "SilverlightVersion3" Type: "Int32" NotNull: true } + Columns { Name: "SilverlightVersion4" Type: "Int16" NotNull: true } + Columns { Name: "PageCharset" Type: "Utf8" NotNull: true } + Columns { Name: "CodeVersion" Type: "Int32" NotNull: true } + Columns { Name: "IsLink" Type: "Int16" NotNull: true } + Columns { Name: "IsDownload" Type: "Int16" NotNull: true } + Columns { Name: "IsNotBounce" Type: "Int16" NotNull: true } + Columns { Name: "FUniqID" Type: "Int64" NotNull: true } + Columns { Name: "OriginalURL" Type: "Utf8" NotNull: true } + Columns { Name: "HID" Type: "Int32" NotNull: true } + Columns { Name: "IsOldCounter" Type: "Int16" NotNull: true } + Columns { Name: "IsEvent" Type: "Int16" NotNull: true } + Columns { Name: "IsParameter" Type: "Int16" NotNull: true } + Columns { Name: "DontCountHits" Type: "Int16" NotNull: true } + Columns { Name: "WithHash" Type: "Int16" NotNull: true } + Columns { Name: "HitColor" Type: "String" NotNull: true } + Columns { Name: "LocalEventTime" Type: "Timestamp" NotNull: true } + Columns { Name: "Age" Type: "Int16" NotNull: true } + Columns { Name: "Sex" Type: "Int16" NotNull: true } + Columns { Name: "Income" Type: "Int16" NotNull: true } + Columns { Name: "Interests" Type: "Int16" NotNull: true } + Columns { Name: "Robotness" Type: "Int16" NotNull: true } + Columns { Name: "RemoteIP" Type: "Int32" NotNull: true } + Columns { Name: "WindowName" Type: "Int32" NotNull: true } + Columns { Name: "OpenerName" Type: "Int32" NotNull: true } + Columns { Name: "HistoryLength" Type: "Int16" NotNull: true } + Columns { Name: "BrowserLanguage" Type: "Utf8" NotNull: true } + Columns { Name: "BrowserCountry" Type: "Utf8" NotNull: true } + Columns { Name: "SocialNetwork" Type: "Utf8" NotNull: true } + Columns { Name: "SocialAction" Type: "Utf8" NotNull: true } + Columns { Name: "HTTPError" Type: "Int16" NotNull: true } + Columns { Name: "SendTiming" Type: "Int32" NotNull: true } + Columns { Name: "DNSTiming" Type: "Int32" NotNull: true } + Columns { Name: "ConnectTiming" Type: "Int32" NotNull: true } + Columns { Name: "ResponseStartTiming" Type: "Int32" NotNull: true } + Columns { Name: "ResponseEndTiming" Type: "Int32" NotNull: true } + Columns { Name: "FetchTiming" Type: "Int32" NotNull: true } + Columns { Name: "SocialSourceNetworkID" Type: "Int16" NotNull: true } + Columns { Name: "SocialSourcePage" Type: "Utf8" NotNull: true } + Columns { Name: "ParamPrice" Type: "Int64" NotNull: true } + Columns { Name: "ParamOrderID" Type: "Utf8" NotNull: true } + Columns { Name: "ParamCurrency" Type: "Utf8" NotNull: true } + Columns { Name: "ParamCurrencyID" Type: "Int16" NotNull: true } + Columns { Name: "OpenstatServiceName" Type: "Utf8" NotNull: true } + Columns { Name: "OpenstatCampaignID" Type: "Utf8" NotNull: true } + Columns { Name: "OpenstatAdID" Type: "Utf8" NotNull: true } + Columns { Name: "OpenstatSourceID" Type: "Utf8" NotNull: true } + Columns { Name: "UTMSource" Type: "Utf8" NotNull: true } + Columns { Name: "UTMMedium" Type: "Utf8" NotNull: true } + Columns { Name: "UTMCampaign" Type: "Utf8" NotNull: true } + Columns { Name: "UTMContent" Type: "Utf8" NotNull: true } + Columns { Name: "UTMTerm" Type: "Utf8" NotNull: true } + Columns { Name: "FromTag" Type: "Utf8" NotNull: true } + Columns { Name: "HasGCLID" Type: "Int16" NotNull: true } + Columns { Name: "RefererHash" Type: "Int64" NotNull: true } + Columns { Name: "URLHash" Type: "Int64" NotNull: true } + Columns { Name: "CLID" Type: "Int32" NotNull: true } + KeyColumnNames: ["EventTime", "EventDate", "CounterID", "UserID", "WatchID"] + )"; + + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64, ui64 begin, size_t rowCount) override; }; } diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 02d6fce31eb..9c3d133ba8d 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -39,21 +39,10 @@ public: SchemaPresets { Name: "default" Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - #Columns { Name: "resource_type" Type: "Utf8" } - Columns { Name: "resource_id" Type: "Utf8" } - Columns { Name: "uid" Type: "Utf8" } - Columns { Name: "level" Type: "Int32" } - Columns { Name: "message" Type: "Utf8" } - #Columns { Name: "json_payload" Type: "Json" } - #Columns { Name: "ingested_at" Type: "Timestamp" } - #Columns { Name: "saved_at" Type: "Timestamp" } - #Columns { Name: "request_id" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + %s } } - )", storeName.c_str(), storeShardsCount)); + )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA)); TString shardingColumns = "[\"timestamp\", \"uid\"]"; if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { |