author | ulya-sidorina <[email protected]> | 2023-03-19 20:12:11 +0300 |
---|---|---|
committer | ulya-sidorina <[email protected]> | 2023-03-19 20:12:11 +0300 |
commit | d303c1711ff5efe7d68ff92c6d57e831b4de16f1 (patch) | |
tree | 8555274794c4103128068c2b0f8427e87053adde | |
parent | 47866ecfec6a044227776f817bcd230b5ab59328 (diff) | |
add logging for stream lookup actor
feature(kqp): enable stream lookup by default
7 files changed, 197 insertions(+), 8 deletions(-)
```diff
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index f29b446e475..7afbf788d66 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -14,6 +14,7 @@
 #include <ydb/core/kqp/common/kqp_event_ids.h>
 #include <ydb/library/yql/public/issue/yql_issue_message.h>
 #include <ydb/core/kqp/runtime/kqp_scan_data.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>

 namespace NKikimr {
 namespace NKqp {
@@ -29,7 +30,8 @@ public:
         const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
         std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
         TIntrusivePtr<TKqpCounters> counters)
-        : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
+        : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
+        , InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
         , HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath())
         , TableId(MakeTableId(settings.GetTable()))
         , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
@@ -77,6 +79,8 @@ public:
     }

     void Bootstrap() {
+        CA_LOG_D("Start stream lookup actor");
+
         Counters->StreamLookupActorsCount->Inc();
         ResolveTableShards();
         Become(&TKqpStreamLookupActor::StateFunc);
@@ -215,6 +219,7 @@ private:
             && AllReadsFinished()
             && Results.empty();

+        CA_LOG_D("Returned " << totalDataSize << " bytes, finished: " << finished);
         return totalDataSize;
     }
@@ -253,6 +258,7 @@ private:
     }

     void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
+        CA_LOG_D("TEvResolveKeySetResult was received for table: " << TablePath);
         if (ev->Get()->Request->ErrorCount > 0) {
             return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: "
                 << TableId, NYql::NDqProto::StatusIds::SCHEME_ERROR);
@@ -268,6 +274,9 @@ private:
     void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
         const auto& record = ev->Get()->Record;

+        CA_LOG_D("TEvReadResult was received for table: " << TablePath <<
+            ", readId: " << record.GetReadId() << ", finished: " << record.GetFinished());
+
         auto readIt = Reads.find(record.GetReadId());
         YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << record.GetReadId());
         auto& read = readIt->second;
@@ -292,13 +301,16 @@
         switch (record.GetStatus().GetCode()) {
             case Ydb::StatusIds::SUCCESS:
                 break;
+            case Ydb::StatusIds::NOT_FOUND:
             case Ydb::StatusIds::OVERLOADED:
             case Ydb::StatusIds::INTERNAL_ERROR: {
                 ++ErrorsCount;
-                NKikimrTxDataShard::TReadContinuationToken continuationToken;
-                bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
-                YQL_ENSURE(parseResult, "Failed to parse continuation token");
-                YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
+                TMaybe<NKikimrTxDataShard::TReadContinuationToken> continuationToken;
+                if (record.HasContinuationToken()) {
+                    bool parseResult = continuationToken->ParseFromString(record.GetContinuationToken());
+                    YQL_ENSURE(parseResult, "Failed to parse continuation token");
+                }
+
                 return RetryTableRead(read, continuationToken);
             }
             default: {
@@ -316,8 +328,12 @@
             THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
             request->Record.SetReadId(record.GetReadId());
             request->Record.SetSeqNo(record.GetSeqNo());
+            request->Record.SetMaxRows(Max<ui16>());
+            request->Record.SetMaxBytes(5_MB);
             Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
                 IEventHandle::FlagTrackDelivery);
+
+            CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId);
         }

         Results.emplace_back(TResult{read.ShardId, THolder<TEventHandleFat<TEvDataShard::TEvReadResult>>(ev.Release())});
@@ -325,6 +341,8 @@
     }

     void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+        CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
+
         const auto& tabletId = ev->Get()->TabletId;
         auto shardIt = ReadsPerShard.find(tabletId);
         YQL_ENSURE(shardIt != ReadsPerShard.end());
@@ -345,6 +363,9 @@
     }

     void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) {
+        CA_LOG_D("TEvSchemeCacheRequestTimeout was received, shards for table " << TablePath
+            << " was resolved: " << !!Partitioning);
+
         if (!Partitioning) {
             RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
                 << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
@@ -402,6 +423,7 @@
             }
         }

+        CA_LOG_D("Total batch size: " << totalSize << ", size limit exceeded: " << sizeLimitExceeded);
         return totalSize;
     }
@@ -505,6 +527,8 @@
         const auto readId = GetNextReadId();
         TReadState read(readId, shardId, std::move(keys));

+        CA_LOG_D("Start reading of table: " << TablePath << ", readId: " << readId << ", shardId: " << shardId);
+
         Counters->CreatedIterators->Inc();
         THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
         auto& record = request->Record;
@@ -518,6 +542,8 @@
         }

         record.SetReadId(read.Id);
+        record.SetMaxRows(Max<ui16>());
+        record.SetMaxBytes(5_MB);
         record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);

         record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId);
@@ -547,9 +573,13 @@
         return readIt->second;
     }

-    void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) {
-        YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size());
-        for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) {
+    void RetryTableRead(TReadState& failedRead, TMaybe<NKikimrTxDataShard::TReadContinuationToken>& token) {
+        CA_LOG_D("Retry reading of table: " << TablePath << ", readId: " << failedRead.Id
+            << ", shardId: " << failedRead.ShardId);
+
+        size_t firstUnprocessedQuery = token ? token->GetFirstUnprocessedQuery() : 0;
+        YQL_ENSURE(firstUnprocessedQuery <= failedRead.Keys.size());
+        for (ui64 idx = firstUnprocessedQuery; idx < failedRead.Keys.size(); ++idx) {
             UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
         }
@@ -566,6 +596,8 @@
     }

     void ResolveTableShards() {
+        CA_LOG_D("Resolve shards for table: " << TablePath);
+
         Partitioning.reset();
         auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
@@ -621,6 +653,7 @@
     }

 private:
+    const TString LogPrefix;
     const ui64 InputIndex;
     NUdf::TUnboxedValue Input;
     const NActors::TActorId ComputeActorId;
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
new file mode 100644
index 00000000000..d17cf96558d
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
@@ -0,0 +1,151 @@
+#include "datashard_ut_common_kqp.h"
+
+namespace NKikimr {
+
+using namespace Tests;
+using namespace NDataShard::NKqpHelpers;
+
+namespace {
+    TString FillTableQuery() {
+        TStringBuilder sql;
+        sql << "UPSERT INTO `/Root/TestTable` (key, value) VALUES ";
+        for (size_t i = 0; i < 1000; ++i) {
+            sql << " (" << i << ", " << i << i << "),";
+        }
+        sql << " (10000, 10000);";
+        return sql;
+    }
+}
+
+Y_UNIT_TEST_SUITE(KqpStreamLookup) {
+    Y_UNIT_TEST(ReadTableDuringSplit) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto runtime = server->GetRuntime();
+        auto sender = runtime->AllocateEdgeActor();
+
+        InitRoot(server, sender);
+
+        // Split would fail otherwise :(
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+        CreateShardedTable(server, sender, "/Root", "TestTable", 1);
+        auto shards = GetTableShards(server, sender, "/Root/TestTable");
+
+        ExecSQL(server, sender, FillTableQuery());
+
+        bool readReceived = false;
+        auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+            if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
+                IActor* actor = runtime->FindActor(ev->Sender);
+                if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
+                    if (!readReceived) {
+                        auto senderSplit = runtime->AllocateEdgeActor();
+                        ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
+                        Cerr << "--- split started ---" << Endl;
+                        WaitTxNotification(server, senderSplit, txId);
+                        Cerr << "--- split finished ---" << Endl;
+                        shards = GetTableShards(server, sender, "/Root/TestTable");
+                        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+                        readReceived = true;
+                    }
+                }
+            }
+
+            return false;
+        };
+
+        server->GetRuntime()->SetEventFilter(captureEvents);
+
+        SendSQL(server, sender, R"(
+            $keys = SELECT key FROM `/Root/TestTable`;
+            SELECT * FROM `/Root/TestTable` WHERE key IN $keys;
+        )");
+
+        auto reply = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        auto results = reply->Get()->Record.GetRef().GetResponse().GetResults();
+        UNIT_ASSERT_VALUES_EQUAL(results.size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(results[0].GetValue().GetStruct(0).ListSize(), 1000);
+    }
+
+    Y_UNIT_TEST(ReadTableWithIndexDuringSplit) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto runtime = server->GetRuntime();
+        auto sender = runtime->AllocateEdgeActor();
+
+        InitRoot(server, sender);
+
+        // Split would fail otherwise :(
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+        CreateShardedTable(server, sender, "/Root", "TestTable",
+            TShardedTableOptions()
+                .Columns({
+                    {"key", "Uint32", true, false},
+                    {"value", "Uint32", false, false},
+                })
+                .Indexes({
+                    TShardedTableOptions::TIndex{
+                        "by_value",
+                        {"value"},
+                        {},
+                        NKikimrSchemeOp::EIndexTypeGlobal
+                    }
+                })
+        );
+
+        auto shards = GetTableShards(server, sender, "/Root/TestTable");
+
+        ExecSQL(server, sender, FillTableQuery());
+
+        bool readReceived = false;
+        auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+            if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
+                IActor* actor = runtime->FindActor(ev->Sender);
+                if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
+                    if (!readReceived) {
+                        auto senderSplit = runtime->AllocateEdgeActor();
+                        ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
+                        Cerr << "--- split started ---" << Endl;
+                        WaitTxNotification(server, senderSplit, txId);
+                        Cerr << "--- split finished ---" << Endl;
+                        shards = GetTableShards(server, sender, "/Root/TestTable");
+                        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+                        readReceived = true;
+                    }
+                }
+            }
+
+            return false;
+        };
+
+        server->GetRuntime()->SetEventFilter(captureEvents);
+
+        SendSQL(server, sender, R"(
+            SELECT * FROM `/Root/TestTable` VIEW by_value WHERE value = 500500;
+        )");
+
+        auto reply = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        auto results = reply->Get()->Record.GetRef().GetResponse().GetResults();
+        UNIT_ASSERT_VALUES_EQUAL(results.size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(results[0].GetValue().GetStruct(0).ListSize(), 1);
+    }
+
+} // Y_UNIT_TEST_SUITE(KqpStreamLookup)
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt
index 60015599ab7..61a5c8b61a5 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt
@@ -42,6 +42,7 @@ target_link_options(ydb-core-tx-datashard-ut_kqp PRIVATE
 target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
 )
 set_property(
   TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt
index d82f3b4945f..d63684cf756 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt
@@ -44,6 +44,7 @@ target_link_options(ydb-core-tx-datashard-ut_kqp PRIVATE
 target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
 )
 set_property(
   TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt
index a1e308e3a1d..733420c8cea 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt
@@ -46,6 +46,7 @@ target_link_options(ydb-core-tx-datashard-ut_kqp PRIVATE
 target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
 )
 set_property(
   TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt
index 1ed83413721..e2944594844 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt
@@ -34,6 +34,7 @@ target_link_libraries(ydb-core-tx-datashard-ut_kqp PUBLIC
 target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
 )
 set_property(
   TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/ya.make b/ydb/core/tx/datashard/ut_kqp/ya.make
index ed5caf38820..24550e5db06 100644
--- a/ydb/core/tx/datashard/ut_kqp/ya.make
+++ b/ydb/core/tx/datashard/ut_kqp/ya.make
@@ -32,6 +32,7 @@ SRCS(
     datashard_ut_common.cpp
     datashard_ut_common.h
     datashard_ut_kqp.cpp
+    datashard_ut_kqp_stream_lookup.cpp
 )

 REQUIREMENTS(ram:32)
```
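The most consequential change in the diff above is that `RetryTableRead` now takes an optional continuation token: when the datashard reply carries one (the OVERLOADED / INTERNAL_ERROR cases), only the unprocessed tail of the key list is re-queued, and when it carries none (notably the new NOT_FOUND case after a shard split), every key of the failed read is re-queued. Below is a minimal, self-contained C++ sketch of that retry pattern, using only standard-library types; `ReadState`, `ContinuationToken`, and `KeysToRetry` are hypothetical illustration names, not YDB's API.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for the actor's per-shard read state; not YDB types.
struct ReadState {
    uint64_t Id = 0;
    uint64_t ShardId = 0;
    std::vector<std::string> Keys;    // keys requested from this shard
};

struct ContinuationToken {
    size_t FirstUnprocessedQuery = 0; // index of the first key the shard did not process
};

// Retry pattern sketched from the diff: with a token, only the unprocessed tail of
// the key list is retried; without one (e.g. NOT_FOUND after a split), all keys are.
std::vector<std::string> KeysToRetry(const ReadState& failedRead,
                                     const std::optional<ContinuationToken>& token) {
    const size_t firstUnprocessed = token ? token->FirstUnprocessedQuery : 0;
    // Mirrors the YQL_ENSURE in the diff: the token must point inside the key list.
    if (firstUnprocessed > failedRead.Keys.size()) {
        throw std::runtime_error("continuation token out of range");
    }
    return {failedRead.Keys.begin() + firstUnprocessed, failedRead.Keys.end()};
}

int main() {
    ReadState read;
    read.Id = 1;
    read.ShardId = 42;
    read.Keys = {"a", "b", "c", "d"};

    ContinuationToken token;
    token.FirstUnprocessedQuery = 2;

    // Shard processed two keys before failing: only "c" and "d" are re-queued.
    std::cout << KeysToRetry(read, token).size() << "\n";        // prints 2
    // Shard returned no token (e.g. it no longer exists): all keys are re-queued.
    std::cout << KeysToRetry(read, std::nullopt).size() << "\n"; // prints 4
}
```

Treating the token as optional keeps a single retry path for both the "shard is still alive" and "shard disappeared after a split" cases, which is exactly the situation the new `ReadTableDuringSplit` and `ReadTableWithIndexDuringSplit` tests exercise.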