author    Vitalii Gridnev <gridnevvvit@gmail.com>  2022-06-28 11:06:17 +0300
committer Vitalii Gridnev <gridnevvvit@gmail.com>  2022-06-28 11:06:17 +0300
commit    d0cf09e9ddedf9721115af488fb1cacd3e2df042 (patch)
tree      ab0d3ed8d3f26c210c918f963635ac1048b16f14
parent    140202a697f7a9d60b01de36c08f63ebd33fc5a9 (diff)
download  ydb-d0cf09e9ddedf9721115af488fb1cacd3e2df042.tar.gz

add column shards test with restarts

ref: 75d33f456bc476cf478716ae2d3da236b4fd3c1b
-rw-r--r--  ydb/core/grpc_services/rpc_load_rows.cpp                70
-rw-r--r--  ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp     9
-rw-r--r--  ydb/core/kqp/ut/kqp_olap_ut.cpp                         346
-rw-r--r--  ydb/core/tx/columnshard/columnshard__scan.cpp            10
4 files changed, 357 insertions, 78 deletions
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index a4936f1c35..23c163f6f6 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -94,6 +94,7 @@ bool CheckValueData(NScheme::TTypeId type, const TCell& cell, TString& err) {
return ok;
}
+
// TODO: no mapping for DATE, DATETIME, TZ_*, YSON, JSON, UUID, JSON_DOCUMENT, DYNUMBER
bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) {
switch (type.id()) {
@@ -177,11 +178,15 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType)
using TEvBulkUpsertRequest = TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest,
Ydb::Table::BulkUpsertResponse>;
+const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) {
+ return TEvBulkUpsertRequest::GetProtoRequest(req);
+}
+
class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
- explicit TUploadRowsRPCPublic(TEvBulkUpsertRequest* request)
- : TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout()))
+ explicit TUploadRowsRPCPublic(IRequestOpCtx* request)
+ : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()))
, Request(request)
{}
@@ -298,7 +303,7 @@ private:
private:
bool ReportCostInfoEnabled() const {
- return Request->GetProtoRequest()->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
+ return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
}
TString GetDatabase() override {
@@ -306,7 +311,7 @@ private:
}
const TString& GetTable() override {
- return Request->GetProtoRequest()->table();
+ return GetProtoRequest(Request.get())->table();
}
const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override {
@@ -339,7 +344,7 @@ private:
if (!resolveResult) {
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
- << " table '" << Request->GetProtoRequest()->table()
+ << " table '" << GetProtoRequest(Request.get())->table()
<< "' has not been resolved yet";
errorMessage = explanation.Str();
@@ -353,7 +358,7 @@ private:
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
<< " with access " << NACLib::AccessRightsToString(access)
- << " to table '" << Request->GetProtoRequest()->table() << "'";
+ << " to table '" << GetProtoRequest(Request.get())->table() << "'";
errorMessage = explanation.Str();
return false;
@@ -365,7 +370,7 @@ private:
TVector<std::pair<TString, Ydb::Type>> GetRequestColumns(TString& errorMessage) const override {
Y_UNUSED(errorMessage);
- const auto& type = Request->GetProtoRequest()->Getrows().Gettype();
+ const auto& type = GetProtoRequest(Request.get())->Getrows().Gettype();
const auto& rowType = type.Getlist_type();
const auto& rowFields = rowType.Getitem().Getstruct_type().Getmembers();
@@ -395,7 +400,7 @@ private:
// For each row in values
TMemoryPool valueDataPool(256);
- const auto& rows = Request->GetProtoRequest()->Getrows().Getvalue().Getitems();
+ const auto& rows = GetProtoRequest(Request.get())->Getrows().Getvalue().Getitems();
for (const auto& r : rows) {
valueDataPool.Clear();
@@ -436,25 +441,25 @@ private:
}
private:
- std::unique_ptr<TEvBulkUpsertRequest> Request;
+ std::unique_ptr<IRequestOpCtx> Request;
TVector<std::pair<TSerializedCellVec, TString>> AllRows;
};
class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
- explicit TUploadColumnsRPCPublic(TEvBulkUpsertRequest* request)
- : TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout()))
+ explicit TUploadColumnsRPCPublic(IRequestOpCtx* request)
+ : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()))
, Request(request)
{}
private:
bool ReportCostInfoEnabled() const {
- return Request->GetProtoRequest()->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
+ return GetProtoRequest(Request.get())->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;
}
EUploadSource GetSourceType() const override {
- auto* req = Request->GetProtoRequest();
+ auto* req = GetProtoRequest(Request.get());
if (req->has_arrow_batch_settings()) {
return EUploadSource::ArrowBatch;
}
@@ -469,7 +474,7 @@ private:
}
const TString& GetTable() override {
- return Request->GetProtoRequest()->table();
+ return GetProtoRequest(Request.get())->table();
}
const TVector<std::pair<TSerializedCellVec, TString>>& GetRows() const override {
@@ -477,13 +482,13 @@ private:
}
const TString& GetSourceData() const override {
- return Request->GetProtoRequest()->data();
+ return GetProtoRequest(Request.get())->data();
}
const TString& GetSourceSchema() const override {
static const TString none;
- if (Request->GetProtoRequest()->has_arrow_batch_settings()) {
- return Request->GetProtoRequest()->arrow_batch_settings().schema();
+ if (GetProtoRequest(Request.get())->has_arrow_batch_settings()) {
+ return GetProtoRequest(Request.get())->arrow_batch_settings().schema();
}
return none;
}
@@ -514,7 +519,7 @@ private:
if (!resolveResult) {
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
- << " table '" << Request->GetProtoRequest()->table()
+ << " table '" << GetProtoRequest(Request.get())->table()
<< "' has not been resolved yet";
errorMessage = explanation.Str();
@@ -528,7 +533,7 @@ private:
TStringStream explanation;
explanation << "Access denied for " << userToken.GetUserSID()
<< " with access " << NACLib::AccessRightsToString(access)
- << " to table '" << Request->GetProtoRequest()->table() << "'";
+ << " to table '" << GetProtoRequest(Request.get())->table() << "'";
errorMessage = explanation.Str();
return false;
@@ -650,26 +655,35 @@ private:
}
private:
- std::unique_ptr<TEvBulkUpsertRequest> Request;
+ std::unique_ptr<IRequestOpCtx> Request;
TVector<std::pair<TSerializedCellVec, TString>> Rows;
const Ydb::Formats::CsvSettings& GetCsvSettings() const {
- return Request->GetProtoRequest()->csv_settings();
+ return GetProtoRequest(Request.get())->csv_settings();
}
};
void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ if (GetProtoRequest(p.get())->has_arrow_batch_settings()) {
+ TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release()));
+ } else if (GetProtoRequest(p.get())->has_csv_settings()) {
+ TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release()));
+ } else {
+ TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(p.release()));
+ }
+}
- auto* req = dynamic_cast<TEvBulkUpsertRequest*>(p.release());
- Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
- if (req->GetProtoRequest()->has_arrow_batch_settings()) {
- TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req));
- } else if (req->GetProtoRequest()->has_csv_settings()) {
- TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req));
+template<>
+IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ if (GetProtoRequest(msg)->has_arrow_batch_settings()) {
+ return new TUploadColumnsRPCPublic(msg);
+ } else if (GetProtoRequest(msg)->has_csv_settings()) {
+ return new TUploadColumnsRPCPublic(msg);
} else {
- TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(req));
+ return new TUploadRowsRPCPublic(msg);
}
}
+
} // namespace NKikimr
} // namespace NGRpcService
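
Note on the refactor above: both registration paths (DoBulkUpsertRequest and the CreateRpcActor specialization) now repeat the same branch over the request settings. A minimal sketch of how a shared factory could centralize that dispatch; all types here (TBulkUpsertProto, IRequestOpCtx, the actor classes) are simplified stand-ins for illustration, not the real YDB interfaces:

    // Stand-in for the typed proto carried by the opaque request context.
    struct TBulkUpsertProto {
        bool HasArrowBatchSettings = false;
        bool HasCsvSettings = false;
    };

    // Stand-in for NGRpcService::IRequestOpCtx.
    struct IRequestOpCtx {
        virtual ~IRequestOpCtx() = default;
        virtual const TBulkUpsertProto* Proto() const = 0;
    };

    // File-local helper mirroring GetProtoRequest(): one place that knows
    // how to recover the typed proto from the opaque context.
    inline const TBulkUpsertProto* GetProto(IRequestOpCtx* req) {
        return req->Proto();
    }

    struct IActor { virtual ~IActor() = default; };
    struct TUploadColumnsActor : IActor { explicit TUploadColumnsActor(IRequestOpCtx*) {} };
    struct TUploadRowsActor   : IActor { explicit TUploadRowsActor(IRequestOpCtx*) {} };

    // Single factory both entry points could call instead of duplicating
    // the arrow/csv/rows branch.
    inline IActor* CreateBulkUpsertActor(IRequestOpCtx* msg) {
        const auto* proto = GetProto(msg);
        if (proto->HasArrowBatchSettings || proto->HasCsvSettings) {
            return new TUploadColumnsActor(msg);  // columnar input
        }
        return new TUploadRowsActor(msg);         // row-oriented input
    }

With a factory like this, DoBulkUpsertRequest would just Register(CreateBulkUpsertActor(p.release())) and the template specialization would return the same call.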
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index c21e335639..014d92c394 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -282,7 +282,8 @@ private:
return;
CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration()
- << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation);
+ << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation
+ << ", tabletId: " << state->TabletId);
YQL_ENSURE(state->Generation == msg.GetGeneration());
@@ -367,7 +368,8 @@ private:
<< ", from: " << ev->Sender << ", shards remain: " << PendingShards.size()
<< ", in flight shards " << InFlightShards.size()
<< ", LastKey " << PrintLastKey()
- << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter");
+ << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter"
+ << ", tabletId: " << state->TabletId);
if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) {
SendScanDataAck(state);
@@ -434,7 +436,8 @@ private:
CA_LOG_W("Got EvScanError scan state: " << EShardStateToString(state->State)
<< " status: " << Ydb::StatusIds_StatusCode_Name(status)
- << ", reason: " << issues.ToString());
+ << ", reason: " << issues.ToString()
+ << ", tablet id: " << state->TabletId);
YQL_ENSURE(state->Generation == msg.GetGeneration());
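
The three log statements above each gained a tabletId field by hand. One hypothetical way to keep such scan-state logging uniform is a stream operator on a small state snapshot; TShardStateInfo below is a stand-in, not the actual TShardState used by the compute actor:

    #include <cstdint>
    #include <iostream>
    #include <string>

    struct TShardStateInfo {
        std::string State;        // e.g. "Running"
        uint64_t Generation = 0;
        uint64_t TabletId = 0;
    };

    // Every log line that streams a TShardStateInfo prints the same trio
    // of fields, so no call site can forget tabletId again.
    std::ostream& operator<<(std::ostream& os, const TShardStateInfo& s) {
        return os << "state: " << s.State
                  << ", stateGen: " << s.Generation
                  << ", tabletId: " << s.TabletId;
    }

    int main() {
        TShardStateInfo s{"Running", 1, 72075186224037888ULL};
        std::cout << "Got EvScanInitActor, " << s << '\n';
    }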
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index e15400986c..dc1792acce 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -7,72 +7,194 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/writer.h>
+#include <ydb/core/kqp/executer/kqp_executer.h>
+#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/tx/datashard/datashard_ut_common_kqp.h>
+#include <ydb/core/tx/datashard/datashard_ut_common.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+
namespace NKikimr {
namespace NKqp {
+using namespace NKikimr::NDataShard::NKqpHelpers;
+using namespace NSchemeShard;
+using namespace NActors;
using namespace NYdb;
using namespace NYdb::NTable;
using namespace NYdb::NScheme;
+using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest,
+ Ydb::Table::BulkUpsertResponse>;
+
+void InitRoot(Tests::TServer::TPtr server,
+ TActorId sender)
+{
+ if (server->GetSettings().StoragePoolTypes.empty()) {
+ return;
+ }
+
+ auto &runtime = *server->GetRuntime();
+ auto &settings = server->GetSettings();
+
+ auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain);
+ const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain);
+
+ auto evTx = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(1, tid);
+ auto transaction = evTx->Record.AddTransaction();
+ transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain);
+ transaction->SetWorkingDir("/");
+ auto op = transaction->MutableSubDomain();
+ op->SetName(domain.Name);
+
+ for (const auto& [kind, pool] : settings.StoragePoolTypes) {
+ auto* p = op->AddStoragePools();
+ p->SetKind(kind);
+ p->SetName(pool.GetName());
+ }
+
+ runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries());
+
+ {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted);
+ }
+
+ auto evSubscribe = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(1);
+ runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries());
+
+ {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1);
+ }
+}
+
Y_UNIT_TEST_SUITE(KqpOlap) {
+ void EnableDebugLogging(NActors::TTestActorRuntime* runtime) {
+ //runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
+ // runtime->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
+ // runtime->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
+ // runtime->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
+ //runtime->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+ runtime->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
+ //runtime->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG);
+ //runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
+ //runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
+ //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
+ }
+
void EnableDebugLogging(TKikimrRunner& kikimr) {
- // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
- // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NActors::NLog::PRI_DEBUG);
- // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NActors::NLog::PRI_DEBUG);
- // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_DEBUG);
- // kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_GATEWAY, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::LONG_TX_SERVICE, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_OLAPSHARD, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
- kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
+ EnableDebugLogging(kikimr.GetTestServer().GetRuntime());
}
- void CreateTestOlapTable(TKikimrRunner& kikimr, TString tableName = "olapTable") {
- auto& legacyClient = kikimr.GetTestClient();
+ void WaitForSchemeOperation(Tests::TServer& server, TActorId sender, ui64 txId) {
+ auto &runtime = *server.GetRuntime();
+ auto &settings = server.GetSettings();
+ auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
+ request->Record.SetTxId(txId);
+ auto tid = Tests::ChangeStateStorage(Tests::SchemeRoot, settings.Domain);
+ runtime.SendToPipe(tid, sender, request.Release(), 0, GetPipeConfigWithRetries());
+ runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvNotifyTxCompletionResult>(sender);
+ }
- legacyClient.CreateOlapStore("/Root", R"(
- Name: "olapStore"
- ColumnShardCount: 4
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- #Columns { Name: "resource_type" Type: "Utf8" }
- Columns { Name: "resource_id" Type: "Utf8" }
- Columns { Name: "uid" Type: "Utf8" }
- Columns { Name: "level" Type: "Int32" }
- Columns { Name: "message" Type: "Utf8" }
- #Columns { Name: "json_payload" Type: "Json" }
- #Columns { Name: "ingested_at" Type: "Timestamp" }
- #Columns { Name: "saved_at" Type: "Timestamp" }
- #Columns { Name: "request_id" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )");
- legacyClient.CreateOlapTable("/Root/olapStore", Sprintf(R"(
+ void CreateTestOlapStore(Tests::TServer& server, TActorId sender, TString scheme) {
+ NKikimrSchemeOp::TColumnStoreDescription store;
+ UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &store));
+
+ auto request = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
+ auto* op = request->Record.MutableTransaction()->MutableModifyScheme();
+ op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore);
+ op->SetWorkingDir("/Root");
+ op->MutableCreateColumnStore()->CopyFrom(store);
+
+ server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release()));
+ auto ev = server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ ui64 txId = ev->Get()->Record.GetTxId();
+ WaitForSchemeOperation(server, sender, txId);
+ }
+
+ void CreateTestOlapTable(Tests::TServer& server, TActorId sender, TString storeName, TString scheme) {
+ NKikimrSchemeOp::TColumnTableDescription table;
+ UNIT_ASSERT(::google::protobuf::TextFormat::ParseFromString(scheme, &table));
+ auto request = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
+ auto* op = request->Record.MutableTransaction()->MutableModifyScheme();
+ op->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable);
+ op->SetWorkingDir("/Root/" + storeName);
+ op->MutableCreateColumnTable()->CopyFrom(table);
+
+ server.GetRuntime()->Send(new IEventHandle(MakeTxProxyID(), sender, request.release()));
+ auto ev = server.GetRuntime()->GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ ui64 txId = ev->Get()->Record.GetTxId();
+ WaitForSchemeOperation(server, sender, txId);
+ }
+
+ void CreateTestOlapTable(Tests::TServer& server, TString tableName = "olapTable", TString storeName = "olapStore",
+ ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
+ TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
+ TActorId sender = server.GetRuntime()->AllocateEdgeActor();
+ CreateTestOlapStore(server, sender, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ #Columns { Name: "resource_type" Type: "Utf8" }
+ Columns { Name: "resource_id" Type: "Utf8" }
+ Columns { Name: "uid" Type: "Utf8" }
+ Columns { Name: "level" Type: "Int32" }
+ Columns { Name: "message" Type: "Utf8" }
+ #Columns { Name: "json_payload" Type: "Json" }
+ #Columns { Name: "ingested_at" Type: "Timestamp" }
+ #Columns { Name: "saved_at" Type: "Timestamp" }
+ #Columns { Name: "request_id" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ }
+ )", storeName.c_str(), storeShardsCount));
+
+ TString shardingColumns = "[\"timestamp\", \"uid\"]";
+ if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
+ shardingColumns = "[\"uid\"]";
+ }
+
+ CreateTestOlapTable(server, sender, storeName, Sprintf(R"(
Name: "%s"
- ColumnShardCount: 3
+ ColumnShardCount: %d
Sharding {
HashSharding {
- Function: HASH_FUNCTION_CLOUD_LOGS
- Columns: ["timestamp", "uid"]
+ Function: %s
+ Columns: %s
}
- })", tableName.c_str()));
+ })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
- legacyClient.Ls("/Root");
- legacyClient.Ls("/Root/olapStore");
- legacyClient.Ls("/Root/olapStore/" + tableName);
+
+ void CreateTestOlapTable(TKikimrRunner& kikimr, TString tableName = "olapTable", TString storeName = "olapStore",
+ ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
+ TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
+
+ CreateTestOlapTable(kikimr.GetTestServer(), tableName, storeName, storeShardsCount, tableShardsCount,
+ shardingFunction);
}
- std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
- auto schema = std::make_shared<arrow::Schema>(
+ std::shared_ptr<arrow::Schema> GetArrowSchema() {
+ return std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>>{
arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
arrow::field("resource_id", arrow::utf8()),
@@ -80,6 +202,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
arrow::field("level", arrow::int32()),
arrow::field("message", arrow::utf8())
});
+ }
+
+ std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
arrow::TimestampBuilder b1(arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO), arrow::default_memory_pool());
arrow::StringBuilder b2;
@@ -146,6 +272,34 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
}
+ void SendDataViaActorSystem(NActors::TTestActorRuntime* runtime, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
+ std::shared_ptr<arrow::Schema> schema = GetArrowSchema();
+ TString serializedSchema = NArrow::SerializeSchema(*schema);
+ Y_VERIFY(serializedSchema);
+
+ auto batch = TestBlob(pathIdBegin, tsBegin, rowCount);
+ Y_VERIFY(batch);
+
+ Ydb::Table::BulkUpsertRequest request;
+ request.mutable_arrow_batch_settings()->set_schema(serializedSchema);
+ request.set_data(batch);
+ request.set_table(testTable);
+
+ size_t responses = 0;
+ auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0));
+ future.Subscribe([&](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable {
+ ++responses;
+ UNIT_ASSERT_VALUES_EQUAL(f.GetValueSync().operation().status(), Ydb::StatusIds::SUCCESS);
+ });
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return responses >= 1;
+ };
+
+ runtime->DispatchEvents(options);
+ }
+
TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it) {
TVector<THashMap<TString, NYdb::TValue>> rows;
@@ -1275,6 +1429,104 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST_TWIN(ManyColumnShardsWithRestarts, UseSessionActor) {
+        // Remove this early return when the scan bug is fixed.
+        // TODO: KIKIMR-15200
+        return;
+
+ TPortManager tp;
+ ui16 mbusport = tp.GetPort(2134);
+ auto settings = Tests::TServerSettings(mbusport)
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetNodeCount(2);
+
+ Tests::TServer::TPtr server = new Tests::TServer(settings);
+
+ server->GetRuntime()->GetAppData().FeatureFlags.SetEnableOlapSchemaOperationsForTest(true);
+
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+ EnableDebugLogging(runtime);
+
+ CreateTestOlapTable(*server, "largeOlapTable", "largeOlapStore", 100, 100);
+ ui32 insertRows = 0;
+ for(ui64 i = 0; i < 100; ++i) {
+ SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i*1000000, 2000);
+ insertRows += 2000;
+ }
+
+ ui64 result = 0;
+ THashSet<TActorId> columnShardScans;
+ ui64 rebootedScanCount = 0;
+ std::set<ui64> tabletIds;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: {
+
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ for (auto& [shardId, nodeId]: msg->ShardNodes) {
+ Cerr << "-- nodeId: " << nodeId << Endl;
+ nodeId = runtime->GetNodeId(0);
+ tabletIds.insert(shardId);
+ }
+ break;
+ }
+
+ case NKqp::TKqpExecuterEvents::EvStreamData: {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+
+ Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl);
+ Cerr.Flush();
+
+ Y_ASSERT(record.GetResultSet().rows().size() == 1);
+ Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
+ result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(100);
+ runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ case NKqp::TKqpComputeEvents::EvScanData: {
+ auto it = columnShardScans.find(ev->Sender);
+ if (it != columnShardScans.end()) {
+ ++rebootedScanCount;
+ if (rebootedScanCount == 1) {
+ ui64 tabletIdToKill = *tabletIds.begin();
+ NKikimr::RebootTablet(*runtime, tabletIdToKill, sender);
+ Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << ": hijack event, kill tablet " << tabletIdToKill << Endl);
+ Cerr.Flush();
+ }
+ } else {
+ columnShardScans.insert(ev->Sender);
+ runtime->EnableScheduleForActor(ev->Sender);
+ Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << Endl);
+ Cerr.Flush();
+ }
+
+ break;
+ }
+
+ default:
+ break;
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ runtime->SetObserverFunc(captureEvents);
+ auto streamSender = runtime->AllocateEdgeActor();
+ SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT COUNT(*) FROM `/Root/largeOlapStore/largeOlapTable`;", false));
+ auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
+ UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
+ }
+
Y_UNIT_TEST_TWIN(StatsSysViewColumns, UseSessionActor) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
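
The restart test above combines two tricks: rerouting all shards to one node via EvShardsResolveStatus, and rebooting a tablet the second time a known scan actor emits EvScanData. A distilled, self-contained sketch of that observer logic (the event types and runtime hook here are stand-ins; the real signatures appear in the diff):

    #include <cstdint>
    #include <iostream>
    #include <set>

    enum class EEventAction { PROCESS, DROP };            // stand-in
    struct TEvent { uint64_t Type; uint64_t SenderId; };  // stand-in
    constexpr uint64_t EvScanData = 42;                   // stand-in id

    struct TScanRestartObserver {
        std::set<uint64_t> SeenScans;   // scan actors observed so far
        std::set<uint64_t> TabletIds;   // shards resolved for the query
        bool Rebooted = false;

        EEventAction operator()(const TEvent& ev) {
            if (ev.Type == EvScanData
                    && !SeenScans.insert(ev.SenderId).second  // repeat sender
                    && !Rebooted && !TabletIds.empty()) {
                // Second EvScanData from the same scan: kill one shard once;
                // retries must still drive COUNT(*) to the full row total.
                Rebooted = true;
                std::cout << "reboot tablet " << *TabletIds.begin() << '\n';
            }
            return EEventAction::PROCESS;  // never swallow the event itself
        }
    };

The final assertion (result == insertRows) is what makes the reboot meaningful: the scan has to recover after the kill and still count every inserted row.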
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 943360b370..ae87df537d 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -170,7 +170,14 @@ private:
bool ProduceResults() {
Y_VERIFY(!Finished);
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Start producing result"
+ << ", at: " << ScanActorId
+ << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath);
+
if (ScanIterator->Finished()) {
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Producing result: scan iterator is finished"
+ << ", at: " << ScanActorId
+ << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath);
return false;
}
@@ -221,6 +228,9 @@ private:
void ContinueProcessingStep() {
if (!ScanIterator) {
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan iterator is not initialized"
+ << ", at: " << ScanActorId
+ << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath);
return;
}
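
The new LOG_DEBUG_S lines in this file all repeat the ", at: … txId: … scanId: … gen: … table: …" tail. A small sketch of one way to render that scan identity once; the member names mirror the fields used above, but the helper itself is hypothetical:

    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    struct TScanLogContext {
        uint64_t TxId = 0, ScanId = 0, ScanGen = 0;
        std::string TablePath;

        // Build the shared identity suffix once per call site.
        std::string Suffix() const {
            std::ostringstream os;
            os << ", txId: " << TxId << ", scanId: " << ScanId
               << ", gen: " << ScanGen << ", table: " << TablePath;
            return os.str();
        }
    };

    int main() {
        TScanLogContext ctx{1, 7, 2, "/Root/olapStore/olapTable"};
        // e.g. LOG_DEBUG_S(ctx_, TX_COLUMNSHARD_SCAN,
        //                  "Start producing result" << ctx.Suffix());
        std::cout << "Start producing result" << ctx.Suffix() << '\n';
    }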