summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <[email protected]>2023-03-19 20:12:11 +0300
committerulya-sidorina <[email protected]>2023-03-19 20:12:11 +0300
commitd303c1711ff5efe7d68ff92c6d57e831b4de16f1 (patch)
tree8555274794c4103128068c2b0f8427e87053adde
parent47866ecfec6a044227776f817bcd230b5ab59328 (diff)
add logging for stream lookup actor
feature(kqp): enable stream lookup by default
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp49
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp151
-rw-r--r--ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/ut_kqp/ya.make1
7 files changed, 197 insertions, 8 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index f29b446e475..7afbf788d66 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -14,6 +14,7 @@
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
namespace NKikimr {
namespace NKqp {
@@ -29,7 +30,8 @@ public:
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TIntrusivePtr<TKqpCounters> counters)
- : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
+ : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
+ , InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
, HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath())
, TableId(MakeTableId(settings.GetTable()))
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
@@ -77,6 +79,8 @@ public:
}
void Bootstrap() {
+ CA_LOG_D("Start stream lookup actor");
+
Counters->StreamLookupActorsCount->Inc();
ResolveTableShards();
Become(&TKqpStreamLookupActor::StateFunc);
@@ -215,6 +219,7 @@ private:
&& AllReadsFinished()
&& Results.empty();
+ CA_LOG_D("Returned " << totalDataSize << " bytes, finished: " << finished);
return totalDataSize;
}
@@ -253,6 +258,7 @@ private:
}
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
+ CA_LOG_D("TEvResolveKeySetResult was received for table: " << TablePath);
if (ev->Get()->Request->ErrorCount > 0) {
return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId,
NYql::NDqProto::StatusIds::SCHEME_ERROR);
@@ -268,6 +274,9 @@ private:
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;
+ CA_LOG_D("TEvReadResult was received for table: " << TablePath <<
+ ", readId: " << record.GetReadId() << ", finished: " << record.GetFinished());
+
auto readIt = Reads.find(record.GetReadId());
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << record.GetReadId());
auto& read = readIt->second;
@@ -292,13 +301,16 @@ private:
switch (record.GetStatus().GetCode()) {
case Ydb::StatusIds::SUCCESS:
break;
+ case Ydb::StatusIds::NOT_FOUND:
case Ydb::StatusIds::OVERLOADED:
case Ydb::StatusIds::INTERNAL_ERROR: {
++ErrorsCount;
- NKikimrTxDataShard::TReadContinuationToken continuationToken;
- bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
- YQL_ENSURE(parseResult, "Failed to parse continuation token");
- YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
+ // The token stays empty when the shard did not send one; RetryTableRead
+ // then restarts the read from the first key.
+ TMaybe<NKikimrTxDataShard::TReadContinuationToken> continuationToken;
+ if (record.HasContinuationToken()) {
+ // A default-constructed TMaybe holds no value, so the message has to be
+ // created before it can be dereferenced and parsed into.
+ continuationToken.ConstructInPlace();
+ bool parseResult = continuationToken->ParseFromString(record.GetContinuationToken());
+ YQL_ENSURE(parseResult, "Failed to parse continuation token");
+ }
+
return RetryTableRead(read, continuationToken);
}
default: {
@@ -316,8 +328,12 @@ private:
THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
request->Record.SetReadId(record.GetReadId());
request->Record.SetSeqNo(record.GetSeqNo());
+ request->Record.SetMaxRows(Max<ui16>());
+ request->Record.SetMaxBytes(5_MB);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
IEventHandle::FlagTrackDelivery);
+
+ CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId);
}
Results.emplace_back(TResult{read.ShardId, THolder<TEventHandleFat<TEvDataShard::TEvReadResult>>(ev.Release())});
@@ -325,6 +341,8 @@ private:
}
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+ CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);
+
const auto& tabletId = ev->Get()->TabletId;
auto shardIt = ReadsPerShard.find(tabletId);
YQL_ENSURE(shardIt != ReadsPerShard.end());
@@ -345,6 +363,9 @@ private:
}
void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) {
+ CA_LOG_D("TEvSchemeCacheRequestTimeout was received, shards for table " << TablePath
+ << " was resolved: " << !!Partitioning);
+
if (!Partitioning) {
RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
<< " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
@@ -402,6 +423,7 @@ private:
}
}
+ CA_LOG_D("Total batch size: " << totalSize << ", size limit exceeded: " << sizeLimitExceeded);
return totalSize;
}
@@ -505,6 +527,8 @@ private:
const auto readId = GetNextReadId();
TReadState read(readId, shardId, std::move(keys));
+ CA_LOG_D("Start reading of table: " << TablePath << ", readId: " << readId << ", shardId: " << shardId);
+
Counters->CreatedIterators->Inc();
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
auto& record = request->Record;
@@ -518,6 +542,8 @@ private:
}
record.SetReadId(read.Id);
+ record.SetMaxRows(Max<ui16>());
+ record.SetMaxBytes(5_MB);
record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId);
@@ -547,9 +573,13 @@ private:
return readIt->second;
}
- void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) {
- YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size());
- for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) {
+ void RetryTableRead(TReadState& failedRead, TMaybe<NKikimrTxDataShard::TReadContinuationToken>& token) {
+ CA_LOG_D("Retry reading of table: " << TablePath << ", readId: " << failedRead.Id
+ << ", shardId: " << failedRead.ShardId);
+
+ size_t firstUnprocessedQuery = token ? token->GetFirstUnprocessedQuery() : 0;
+ YQL_ENSURE(firstUnprocessedQuery <= failedRead.Keys.size());
+ for (ui64 idx = firstUnprocessedQuery; idx < failedRead.Keys.size(); ++idx) {
UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
}
@@ -566,6 +596,8 @@ private:
}
void ResolveTableShards() {
+ CA_LOG_D("Resolve shards for table: " << TablePath);
+
Partitioning.reset();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
@@ -621,6 +653,7 @@ private:
}
private:
+ const TString LogPrefix;
const ui64 InputIndex;
NUdf::TUnboxedValue Input;
const NActors::TActorId ComputeActorId;
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
new file mode 100644
index 00000000000..d17cf96558d
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
@@ -0,0 +1,151 @@
+#include "datashard_ut_common_kqp.h"
+
+namespace NKikimr {
+
+using namespace Tests;
+using namespace NDataShard::NKqpHelpers;
+
+namespace {
+ // Builds one UPSERT statement for `/Root/TestTable` covering keys 0..999
+ // plus a final (10000, 10000) row.
+ TString FillTableQuery() {
+ TStringBuilder query;
+ query << "UPSERT INTO `/Root/TestTable` (key, value) VALUES ";
+ // The value literal is the key's decimal digits written twice (e.g. 5 -> 55).
+ for (size_t key = 0; key != 1000; ++key) {
+ query << " (" << key << ", ";
+ query << key << key << "),";
+ }
+ // The last row also closes the comma-separated VALUES list.
+ query << " (10000, 10000);";
+ return query;
+ }
+}
+
+Y_UNIT_TEST_SUITE(KqpStreamLookup) {
+ // Checks that a query served by the stream lookup actor still succeeds when
+ // the table is split at the moment the actor sends its first TEvRead.
+ Y_UNIT_TEST(ReadTableDuringSplit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+
+ // Split would fail otherwise :(
+ SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+ // NOTE(review): the trailing 1 is presumably the initial shard count - confirm.
+ CreateShardedTable(server, sender, "/Root", "TestTable", 1);
+ auto shards = GetTableShards(server, sender, "/Root/TestTable");
+
+ ExecSQL(server, sender, FillTableQuery());
+
+ // Ensures the split is triggered only once, on the first matching TEvRead.
+ bool readReceived = false;
+ auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
+ // Only TEvRead events whose sender is a stream lookup actor are of interest.
+ if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
+ IActor* actor = runtime->FindActor(ev->Sender);
+ if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
+
+ if (!readReceived) {
+ // Start a split of the first shard and wait for it to finish from inside
+ // the filter, i.e. before this TEvRead is processed any further.
+ // NOTE(review): 500 is presumably the split boundary key - confirm.
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
+ Cerr << "--- split started ---" << Endl;
+ WaitTxNotification(server, senderSplit, txId);
+ Cerr << "--- split finished ---" << Endl;
+ // After the split the table must consist of exactly two shards.
+ shards = GetTableShards(server, sender, "/Root/TestTable");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+ readReceived = true;
+ }
+ }
+ }
+
+ // NOTE(review): false presumably means "do not drop the event" - confirm.
+ return false;
+ };
+
+ server->GetRuntime()->SetEventFilter(captureEvents);
+
+ SendSQL(server, sender, R"(
+ $keys = SELECT key FROM `/Root/TestTable`;
+ SELECT * FROM `/Root/TestTable` WHERE key IN $keys;
+ )");
+
+ // The query must succeed and return a single result set.
+ auto reply = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto results = reply->Get()->Record.GetRef().GetResponse().GetResults();
+ UNIT_ASSERT_VALUES_EQUAL(results.size(), 1);
+ // NOTE(review): FillTableQuery() upserts 1001 rows while 1000 are expected here;
+ // presumably the result set is capped at 1000 rows - confirm.
+ UNIT_ASSERT_VALUES_EQUAL(results[0].GetValue().GetStruct(0).ListSize(), 1000);
+ }
+
+ // Checks that a query through the `by_value` index still succeeds when the
+ // table is split at the moment the stream lookup actor sends its first TEvRead.
+ Y_UNIT_TEST(ReadTableWithIndexDuringSplit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+
+ // Split would fail otherwise :(
+ SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+ // Table with Uint32 `key` and `value` columns and an index named `by_value`
+ // over `value` of type EIndexTypeGlobal.
+ // NOTE(review): the third field of each column entry is presumably the
+ // "is key column" flag - confirm.
+ CreateShardedTable(server, sender, "/Root", "TestTable",
+ TShardedTableOptions()
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ })
+ .Indexes({
+ TShardedTableOptions::TIndex{
+ "by_value",
+ {"value"},
+ {},
+ NKikimrSchemeOp::EIndexTypeGlobal
+ }
+ })
+ );
+
+ auto shards = GetTableShards(server, sender, "/Root/TestTable");
+
+ ExecSQL(server, sender, FillTableQuery());
+
+ // Ensures the split is triggered only once, on the first matching TEvRead.
+ bool readReceived = false;
+ auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
+ // Only TEvRead events whose sender is a stream lookup actor are of interest.
+ if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
+ IActor* actor = runtime->FindActor(ev->Sender);
+ if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
+
+ if (!readReceived) {
+ // Start a split of the first main-table shard and wait for it to finish
+ // from inside the filter, before this TEvRead is processed any further.
+ // NOTE(review): 500 is presumably the split boundary key - confirm.
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
+ Cerr << "--- split started ---" << Endl;
+ WaitTxNotification(server, senderSplit, txId);
+ Cerr << "--- split finished ---" << Endl;
+ // After the split the table must consist of exactly two shards.
+ shards = GetTableShards(server, sender, "/Root/TestTable");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+ readReceived = true;
+ }
+ }
+ }
+
+ // NOTE(review): false presumably means "do not drop the event" - confirm.
+ return false;
+ };
+
+ server->GetRuntime()->SetEventFilter(captureEvents);
+
+ // 500500 is the value FillTableQuery() writes for key 500 (the key's digits
+ // written twice), so exactly one row is expected to match.
+ SendSQL(server, sender, R"(
+ SELECT * FROM `/Root/TestTable` VIEW by_value WHERE value = 500500;
+ )");
+
+ // The query must succeed and return a single result set with one row.
+ auto reply = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto results = reply->Get()->Record.GetRef().GetResponse().GetResults();
+ UNIT_ASSERT_VALUES_EQUAL(results.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(results[0].GetValue().GetStruct(0).ListSize(), 1);
+ }
+
+} // Y_UNIT_TEST_SUITE(KqpStreamLookup)
+} // namespace NKikimr
+
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt
index 60015599ab7..61a5c8b61a5 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.darwin-x86_64.txt
@@ -42,6 +42,7 @@ target_link_options(ydb-core-tx-datashard-ut_kqp PRIVATE
target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt
index d82f3b4945f..d63684cf756 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-aarch64.txt
@@ -44,6 +44,7 @@ target_link_options(ydb-core-tx-datashard-ut_kqp PRIVATE
target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt
index a1e308e3a1d..733420c8cea 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.linux-x86_64.txt
@@ -46,6 +46,7 @@ target_link_options(ydb-core-tx-datashard-ut_kqp PRIVATE
target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt
index 1ed83413721..e2944594844 100644
--- a/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/ut_kqp/CMakeLists.windows-x86_64.txt
@@ -34,6 +34,7 @@ target_link_libraries(ydb-core-tx-datashard-ut_kqp PUBLIC
target_sources(ydb-core-tx-datashard-ut_kqp PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/datashard/ut_kqp/ya.make b/ydb/core/tx/datashard/ut_kqp/ya.make
index ed5caf38820..24550e5db06 100644
--- a/ydb/core/tx/datashard/ut_kqp/ya.make
+++ b/ydb/core/tx/datashard/ut_kqp/ya.make
@@ -32,6 +32,7 @@ SRCS(
datashard_ut_common.cpp
datashard_ut_common.h
datashard_ut_kqp.cpp
+ datashard_ut_kqp_stream_lookup.cpp
)
REQUIREMENTS(ram:32)