diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-28 11:06:17 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-28 11:06:17 +0300 |
commit | d0cf09e9ddedf9721115af488fb1cacd3e2df042 (patch) | |
tree | ab0d3ed8d3f26c210c918f963635ac1048b16f14 | |
parent | 140202a697f7a9d60b01de36c08f63ebd33fc5a9 (diff) | |
download | ydb-d0cf09e9ddedf9721115af488fb1cacd3e2df042.tar.gz |
add column shards test with restarts
ref:75d33f456bc476cf478716ae2d3da236b4fd3c1b
-rw-r--r-- | ydb/core/grpc_services/rpc_load_rows.cpp | 70 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 346 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 10 |
4 files changed, 357 insertions, 78 deletions
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index a4936f1c35..23c163f6f6 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -94,6 +94,7 @@ bool CheckValueData(NScheme::TTypeId type, const TCell& cell, TString& err) { return ok; } + // TODO: no mapping for DATE, DATETIME, TZ_*, YSON, JSON, UUID, JSON_DOCUMENT, DYNUMBER bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) { switch (type.id()) { @@ -177,11 +178,15 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) using TEvBulkUpsertRequest = TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>; +const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) { + return TEvBulkUpsertRequest::GetProtoRequest(req); +} + class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> { using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>; public: - explicit TUploadRowsRPCPublic(TEvBulkUpsertRequest* request) - : TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout())) + explicit TUploadRowsRPCPublic(IRequestOpCtx* request) + : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout())) , Request(request) {} @@ -298,7 +303,7 @@ private: private: bool ReportCostInfoEnabled() const { - return Request->GetProtoRequest()->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; + return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; } TString GetDatabase()override { @@ -306,7 +311,7 @@ private: } const TString& GetTable() override { - return Request->GetProtoRequest()->table(); + return GetProtoRequest(Request.get())->table(); } const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override { @@ -339,7 +344,7 @@ private: if (!resolveResult) { TStringStream explanation; explanation << "Access denied for " << userToken.GetUserSID() - << " table '" << Request->GetProtoRequest()->table() + << " table '" << GetProtoRequest(Request.get())->table() << "' has not been resolved yet"; errorMessage = explanation.Str(); @@ -353,7 +358,7 @@ private: TStringStream explanation; explanation << "Access denied for " << userToken.GetUserSID() << " with access " << NACLib::AccessRightsToString(access) - << " to table '" << Request->GetProtoRequest()->table() << "'"; + << " to table '" << GetProtoRequest(Request.get())->table() << "'"; errorMessage = explanation.Str(); return false; @@ -365,7 +370,7 @@ private: TVector<std::pair<TString, Ydb::Type>> GetRequestColumns(TString& errorMessage) const override { Y_UNUSED(errorMessage); - const auto& type = Request->GetProtoRequest()->Getrows().Gettype(); + const auto& type = GetProtoRequest(Request.get())->Getrows().Gettype(); const auto& rowType = type.Getlist_type(); const auto& rowFields = rowType.Getitem().Getstruct_type().Getmembers(); @@ -395,7 +400,7 @@ private: // For each row in values TMemoryPool valueDataPool(256); - const auto& rows = Request->GetProtoRequest()->Getrows().Getvalue().Getitems(); + const auto& rows = GetProtoRequest(Request.get())->Getrows().Getvalue().Getitems(); for (const auto& r : rows) { valueDataPool.Clear(); @@ -436,25 +441,25 @@ private: } private: - std::unique_ptr<TEvBulkUpsertRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; TVector<std::pair<TSerializedCellVec, TString>> AllRows; }; class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> { using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>; public: - explicit TUploadColumnsRPCPublic(TEvBulkUpsertRequest* request) - : TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout())) + explicit TUploadColumnsRPCPublic(IRequestOpCtx* request) + : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout())) , Request(request) {} private: bool ReportCostInfoEnabled() const { - return Request->GetProtoRequest()->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; + return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED; } EUploadSource GetSourceType() const override { - auto* req = Request->GetProtoRequest(); + auto* req = GetProtoRequest(Request.get()); if (req->has_arrow_batch_settings()) { return EUploadSource::ArrowBatch; } @@ -469,7 +474,7 @@ private: } const TString& GetTable() override { - return Request->GetProtoRequest()->table(); + return GetProtoRequest(Request.get())->table(); } const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override { @@ -477,13 +482,13 @@ private: } const TString& GetSourceData() const override { - return Request->GetProtoRequest()->data(); + return GetProtoRequest(Request.get())->data(); } const TString& GetSourceSchema() const override { static const TString none; - if (Request->GetProtoRequest()->has_arrow_batch_settings()) { - return Request->GetProtoRequest()->arrow_batch_settings().schema(); + if (GetProtoRequest(Request.get())->has_arrow_batch_settings()) { + return GetProtoRequest(Request.get())->arrow_batch_settings().schema(); } return none; } @@ -514,7 +519,7 @@ private: if (!resolveResult) { TStringStream explanation; explanation << "Access denied for " << userToken.GetUserSID() - << " table '" << Request->GetProtoRequest()->table() + << " table '" << GetProtoRequest(Request.get())->table() << "' has not been resolved yet"; errorMessage = explanation.Str(); @@ -528,7 +533,7 @@ private: TStringStream explanation; explanation << "Access denied for " << userToken.GetUserSID() << " with access " << NACLib::AccessRightsToString(access) - << " to table '" << Request->GetProtoRequest()->table() << "'"; + << " to table '" << GetProtoRequest(Request.get())->table() << "'"; errorMessage = explanation.Str(); return false; @@ -650,26 +655,35 @@ private: } private: - std::unique_ptr<TEvBulkUpsertRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; TVector<std::pair<TSerializedCellVec, TString>> Rows; const Ydb::Formats::CsvSettings& GetCsvSettings() const { - return Request->GetProtoRequest()->csv_settings(); + return GetProtoRequest(Request.get())->csv_settings(); } }; void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + if (GetProtoRequest(p.get())->has_arrow_batch_settings()) { + TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release())); + } else if (GetProtoRequest(p.get())->has_csv_settings()) { + TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release())); + } else { + TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(p.release())); + } +} - auto* req = dynamic_cast<TEvBulkUpsertRequest*>(p.release()); - Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); - if (req->GetProtoRequest()->has_arrow_batch_settings()) { - TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req)); - } else if (req->GetProtoRequest()->has_csv_settings()) { - TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req)); +template<> +IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + if (GetProtoRequest(msg)->has_arrow_batch_settings()) { + return new TUploadColumnsRPCPublic(msg); + } else if (GetProtoRequest(msg)->has_csv_settings()) { + return new TUploadColumnsRPCPublic(msg); } else { - TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(req)); + return new TUploadRowsRPCPublic(msg); } } + } // namespace NKikimr } // namespace NGRpcService diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index c21e335639..014d92c394 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -282,7 +282,8 @@ private: return; CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration() - << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation); + << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation + << ", tabletId: " << state->TabletId); YQL_ENSURE(state->Generation == msg.GetGeneration()); @@ -367,7 +368,8 @@ private: << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size() << ", in flight shards " << InFlightShards.size() << ", LastKey " << PrintLastKey() - << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter"); + << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter" + << ", tabletId: " << state->TabletId); if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) { SendScanDataAck(state); @@ -434,7 +436,8 @@ private: CA_LOG_W("Got EvScanError scan state: " << EShardStateToString(state->State) << " status: " << Ydb::StatusIds_StatusCode_Name(status) - << ", reason: " << issues.ToString()); + << ", reason: " << issues.ToString() + << ", tablet id: " << state->TabletId); YQL_ENSURE(state->Generation == msg.GetGeneration()); diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index e15400986c..dc1792acce 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -7,72 +7,194 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h> +#include <ydb/core/kqp/executer/kqp_executer.h> +#include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/tx/datashard/datashard_ut_common_kqp.h> +#include <ydb/core/tx/datashard/datashard_ut_common.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/testlib/test_client.h> +#include <ydb/core/testlib/tablet_helpers.h> + namespace NKikimr { namespace NKqp { +using namespace NKikimr::NDataShard::NKqpHelpers; +using namespace NSchemeShard; +using namespace NActors; using namespace NYdb; using namespace NYdb::NTable; using namespace NYdb::NScheme; +using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest, + Ydb::Table::BulkUpsertResponse>; + +void InitRoot(Tests::TServer::TPtr server, + TActorId sender) +{ + if (server->GetSettings().StoragePoolTypes.empty()) { + return; + } + + auto &runtime = *server->GetRuntime(); + auto &settings = server->GetSettings(); + + auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain); + const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain); + + auto evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(1, tid); + auto transaction = evTx->Record.AddTransaction(); + transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain); + transaction->SetWorkingDir("/"); + auto op = transaction->MutableSubDomain(); + op->SetName(domain.Name); + + for (const auto& [kind, pool] : settings.StoragePoolTypes) { + auto* p = op->AddStoragePools(); + p->SetKind(kind); + p->SetName(pool.GetName()); + } + + runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries()); + + { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted); + } + + auto evSubscribe = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(1); + runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries()); + + { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1); + } +} + Y_UNIT_TEST_SUITE(KqpOlap) { + void EnableDebugLogging(NActors::TTestActorRuntime* runtime) { + //runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); + // runtime->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG); + // runtime->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG); + // runtime->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG); + //runtime->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG); + //runtime->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG); + //runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG); + //runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); + //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); + } + void EnableDebugLogging(TKikimrRunner& kikimr) { - // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); - // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG); - // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG); - // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG); - // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); + EnableDebugLogging(kikimr.GetTestServer().GetRuntime()); } - void CreateTestOlapTable(TKikimrRunner& kikimr, TString tableName = "olapTable") { - auto& legacyClient = kikimr.GetTestClient(); + void WaitForSchemeOperation(Tests::TServer& server, TActorId sender, ui64 txId) { + auto &runtime = *server.GetRuntime(); + auto &settings = server.GetSettings(); + auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(); + request->Record.SetTxId(txId); + auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain); + runtime.SendToPipe(tid, sender, request.Release(), 0, GetPipeConfigWithRetries()); + runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvNotifyTxCompletionResult>(sender); + } - legacyClient.CreateOlapStore("/Root", R"( - Name: "olapStore" - ColumnShardCount: 4 - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - #Columns { Name: "resource_type" Type: "Utf8" } - Columns { Name: "resource_id" Type: "Utf8" } - Columns { Name: "uid" Type: "Utf8" } - Columns { Name: "level" Type: "Int32" } - Columns { Name: "message" Type: "Utf8" } - #Columns { Name: "json_payload" Type: "Json" } - #Columns { Name: "ingested_at" Type: "Timestamp" } - #Columns { Name: "saved_at" Type: "Timestamp" } - #Columns { Name: "request_id" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )"); - legacyClient.CreateOlapTable("/Root/olapStore", Sprintf(R"( + void CreateTestOlapStore(Tests::TServer& server, TActorId sender, TString scheme) { + NKikimrSchemeOp::TColumnStoreDescription store; + UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &store)); + + auto request = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); + auto* op = request->Record.MutableTransaction()->MutableModifyScheme(); + op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore); + op->SetWorkingDir("/Root"); + op->MutableCreateColumnStore()->CopyFrom(store); + + server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release())); + auto ev = server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + ui64 txId = ev->Get()->Record.GetTxId(); + WaitForSchemeOperation(server, sender, txId); + } + + void CreateTestOlapTable(Tests::TServer& server, TActorId sender, TString storeName, TString scheme) { + NKikimrSchemeOp::TColumnTableDescription table; + UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &table)); + auto request = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); + auto* op = request->Record.MutableTransaction()->MutableModifyScheme(); + op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable); + op->SetWorkingDir("/Root/" + storeName); + op->MutableCreateColumnTable()->CopyFrom(table); + + server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release())); + auto ev = server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + ui64 txId = ev->Get()->Record.GetTxId(); + WaitForSchemeOperation(server, sender, txId); + } + + void CreateTestOlapTable(Tests::TServer& server, TString tableName = "olapTable", TString storeName = "olapStore", + ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, + TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { + TActorId sender = server.GetRuntime()->AllocateEdgeActor(); + CreateTestOlapStore(server, sender, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + #Columns { Name: "resource_type" Type: "Utf8" } + Columns { Name: "resource_id" Type: "Utf8" } + Columns { Name: "uid" Type: "Utf8" } + Columns { Name: "level" Type: "Int32" } + Columns { Name: "message" Type: "Utf8" } + #Columns { Name: "json_payload" Type: "Json" } + #Columns { Name: "ingested_at" Type: "Timestamp" } + #Columns { Name: "saved_at" Type: "Timestamp" } + #Columns { Name: "request_id" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + } + )", storeName.c_str(), storeShardsCount)); + + TString shardingColumns = "[\"timestamp\", \"uid\"]"; + if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { + shardingColumns = "[\"uid\"]"; + } + + CreateTestOlapTable(server, sender, storeName, Sprintf(R"( Name: "%s" - ColumnShardCount: 3 + ColumnShardCount: %d Sharding { HashSharding { - Function: HASH_FUNCTION_CLOUD_LOGS - Columns: ["timestamp", "uid"] + Function: %s + Columns: %s } - })", tableName.c_str())); + })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } - legacyClient.Ls("/Root"); - legacyClient.Ls("/Root/olapStore"); - legacyClient.Ls("/Root/olapStore/" + tableName); + + void CreateTestOlapTable(TKikimrRunner& kikimr, TString tableName = "olapTable", TString storeName = "olapStore", + ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, + TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { + + CreateTestOlapTable(kikimr.GetTestServer(), tableName, storeName, storeShardsCount, tableShardsCount, + shardingFunction); } - std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { - auto schema = std::make_shared<arrow::Schema>( + std::shared_ptr<arrow::Schema> GetArrowSchema() { + return std::make_shared<arrow::Schema>( std::vector<std::shared_ptr<arrow::Field>>{ arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), arrow::field("resource_id", arrow::utf8()), @@ -80,6 +202,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) { arrow::field("level", arrow::int32()), arrow::field("message", arrow::utf8()) }); + } + + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool()); arrow::StringBuilder b2; @@ -146,6 +272,34 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); } + void SendDataViaActorSystem(NActors::TTestActorRuntime* runtime, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { + std::shared_ptr<arrow::Schema> schema = GetArrowSchema(); + TString serializedSchema = NArrow::SerializeSchema(*schema); + Y_VERIFY(serializedSchema); + + auto batch = TestBlob(pathIdBegin, tsBegin, rowCount); + Y_VERIFY(batch); + + Ydb::Table::BulkUpsertRequest request; + request.mutable_arrow_batch_settings()->set_schema(serializedSchema); + request.set_data(batch); + request.set_table(testTable); + + size_t responses = 0; + auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0)); + future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable { + ++responses; + UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS); + }); + + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return responses >= 1; + }; + + runtime->DispatchEvents(options); + } + TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it) { TVector<THashMap<TString, NYdb::TValue>> rows; @@ -1275,6 +1429,104 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST_TWIN(ManyColumnShardsWithRestarts, UseSessionActor) { + // remove this return when bug with scan is fixed. + // todo: KIKIMR-15200 + return; + + TPortManager tp; + ui16 mbusport = tp.GetPort(2134); + auto settings = Tests::TServerSettings(mbusport) + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetNodeCount(2); + + Tests::TServer::TPtr server = new Tests::TServer(settings); + + server->GetRuntime()->GetAppData().FeatureFlags.SetEnableOlapSchemaOperationsForTest(true); + + auto runtime = server->GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + + InitRoot(server, sender); + EnableDebugLogging(runtime); + + CreateTestOlapTable(*server, "largeOlapTable", "largeOlapStore", 100, 100); + ui32 insertRows = 0; + for(ui64 i = 0; i < 100; ++i) { + SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000); + insertRows += 2000; + } + + ui64 result = 0; + THashSet<TActorId> columnShardScans; + ui64 rebootedScanCount = 0; + std::set<ui64> tabletIds; + + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: { + + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + for (auto& [shardId, nodeId]: msg->ShardNodes) { + Cerr << "-- nodeId: " << nodeId << Endl; + nodeId = runtime->GetNodeId(0); + tabletIds.insert(shardId); + } + break; + } + + case NKqp::TKqpExecuterEvents::EvStreamData: { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + + Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl); + Cerr.Flush(); + + Y_ASSERT(record.GetResultSet().rows().size() == 1); + Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); + result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(100); + runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + + case NKqp::TKqpComputeEvents::EvScanData: { + auto it = columnShardScans.find(ev->Sender); + if (it != columnShardScans.end()) { + ++rebootedScanCount; + if (rebootedScanCount == 1) { + ui64 tabletIdToKill = *tabletIds.begin(); + NKikimr::RebootTablet(*runtime, tabletIdToKill, sender); + Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << ": hijack event, kill tablet " << tabletIdToKill << Endl); + Cerr.Flush(); + } + } else { + columnShardScans.insert(ev->Sender); + runtime->EnableScheduleForActor(ev->Sender); + Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << Endl); + Cerr.Flush(); + } + + break; + } + + default: + break; + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + + runtime->SetObserverFunc(captureEvents); + auto streamSender = runtime->AllocateEdgeActor(); + SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT COUNT(*) FROM `/Root/largeOlapStore/largeOlapTable`;", false)); + auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); + UNIT_ASSERT_VALUES_EQUAL(result, insertRows); + } + Y_UNIT_TEST_TWIN(StatsSysViewColumns, UseSessionActor) { auto settings = TKikimrSettings() .SetWithSampleTables(false) diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 943360b370..ae87df537d 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -170,7 +170,14 @@ private: bool ProduceResults() { Y_VERIFY(!Finished); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Start producing result" + << ", at: " << ScanActorId + << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath); + if (ScanIterator->Finished()) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Producing result: scan iterator is finished" + << ", at: " << ScanActorId + << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath); return false; } @@ -221,6 +228,9 @@ private: void ContinueProcessingStep() { if (!ScanIterator) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan iterator is not initialized" + << ", at: " << ScanActorId + << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath); return; } |