aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon Danilov <senya@ydb.tech>2024-01-11 15:27:56 +0400
committerGitHub <noreply@github.com>2024-01-11 15:27:56 +0400
commit34f54c96857e8c3de2ec6bc5816509ecb0986c47 (patch)
tree864d395db06df0c5eeb2f53c3bdcaf25f1dd274f
parent57a192d4f253ebd3c1bcf84d61912f81ba3b99f0 (diff)
downloadydb-34f54c96857e8c3de2ec6bc5816509ecb0986c47.tar.gz
Fix datashard read traces (#913)
Fixes #916
-rw-r--r--ydb/core/tablet_flat/shared_sausagecache.cpp10
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_trace.cpp114
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp7
-rw-r--r--ydb/core/tx/datashard/ut_common/datashard_ut_common.h3
5 files changed, 108 insertions, 30 deletions
diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp
index 8ca7054cc3..5aa3d5e5b5 100644
--- a/ydb/core/tablet_flat/shared_sausagecache.cpp
+++ b/ydb/core/tablet_flat/shared_sausagecache.cpp
@@ -258,9 +258,10 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
};
struct TRequest : public TSimpleRefCount<TRequest> {
- TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection)
+ TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection, NWilson::TTraceId &&traceId)
: Label(pageCollection->Label())
, PageCollection(std::move(pageCollection))
+ , TraceId(std::move(traceId))
{
}
@@ -275,6 +276,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
ui64 PendingBlocks = 0;
TBlocks ReadyBlocks;
TDeque<ui32> PagesToRequest;
+ NWilson::TTraceId TraceId;
};
struct TExpectant {
@@ -587,7 +589,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
}
}
- auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection));
+ auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection), std::move(msg->Fetch->TraceId));
waitingRequest->Source = ev->Sender;
waitingRequest->Owner = msg->Owner;
@@ -674,7 +676,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
} else {
AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes);
// fetch cookie -> requested size
- auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest));
+ auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest), std::move(waitingRequest->TraceId));
NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch);
}
}
@@ -758,7 +760,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> {
AddInFlyPages(toLoad.size(), sizeToLoad);
// fetch cookie -> requested size;
// event cookie -> ptr to queue
- auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad));
+ auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad), std::move(wa.TraceId));
NBlockIO::Start(this, wa.Owner, (ui64)&queue, wa.Priority, fetch);
}
}
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 6552f276d9..8dca4a6ddc 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -2107,7 +2107,6 @@ public:
const ui64 tieBreaker = Self->NextTieBreakerIndex++;
Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev);
- Op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, readSpan.GetTraceId(), "ReadIterator.ReadOperation", NWilson::EFlags::AUTO_END);
Op->BuildExecutionPlan(false);
Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
@@ -2537,7 +2536,8 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
auto* request = ev->Get();
if (!request->ReadSpan) {
- request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "DataShard.Read");
+ request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "Datashard.Read", NWilson::EFlags::AUTO_END);
+ request->ReadSpan.Attribute("Shard", std::to_string(TabletID()));
}
const auto& record = request->Record;
diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp
index 1bd642c9e1..50a5670865 100644
--- a/ydb/core/tx/datashard/datashard_ut_trace.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp
@@ -21,6 +21,93 @@ using namespace NDataShardReadTableTest;
Y_UNIT_TEST_SUITE(TDataShardTrace) {
+ void ExecSQL(Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &sql,
+ Ydb::StatusIds::StatusCode code,
+ NWilson::TTraceId traceId = {})
+ {
+ google::protobuf::Arena arena;
+ auto &runtime = *server->GetRuntime();
+ TAutoPtr<IEventHandle> handle;
+
+ THolder<NKqp::TEvKqp::TEvQueryRequest> request;
+ if (traceId) {
+ struct RequestCtx : NGRpcService::IRequestCtxMtSafe {
+ RequestCtx(NWilson::TTraceId &&traceId) : TraceId(std::move(traceId)) {}
+
+ NWilson::TTraceId GetWilsonTraceId() const override {
+ return TraceId.Clone();
+ }
+
+ TMaybe<TString> GetTraceId() const override {
+ return Nothing();
+ }
+
+ const TMaybe<TString> GetDatabaseName() const override {
+ return "";
+ }
+
+ const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const override {
+ return Ptr;
+ }
+
+ const TString& GetSerializedToken() const override {
+ return Token;
+ }
+
+ bool IsClientLost() const override {
+ return false;
+ };
+
+ virtual const google::protobuf::Message* GetRequest() const override {
+ return nullptr;
+ };
+
+ const TMaybe<TString> GetRequestType() const override {
+ return "_document_api_request";
+ };
+
+ void SetFinishAction(std::function<void()>&& cb) override {
+ Y_UNUSED(cb);
+ };
+
+ google::protobuf::Arena* GetArena() override {
+ return nullptr;
+ };
+
+ TIntrusiveConstPtr<NACLib::TUserToken> Ptr;
+ TString Token;
+ NWilson::TTraceId TraceId;
+ };
+
+ auto *txControl = google::protobuf::Arena::CreateMessage<Ydb::Table::TransactionControl>(&arena);
+ txControl->mutable_begin_tx()->mutable_serializable_read_write();
+ txControl->set_commit_tx(true);
+
+ auto ptr = std::make_shared<RequestCtx>(std::move(traceId));
+ request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
+ NKikimrKqp::QUERY_ACTION_EXECUTE,
+ NKikimrKqp::QUERY_TYPE_SQL_DML,
+ TActorId(),
+ ptr,
+ TString(), //sessionId
+ TString(sql),
+ TString(), //queryId
+ txControl, //tx_control
+ nullptr, //ydbParameters
+ Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED, //collectStats
+ nullptr, // query_cache_policy
+ nullptr //operationParams
+ );
+ } else {
+ request = MakeSQLRequest(sql, true);
+ }
+ runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr));
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
+ }
+
class FakeWilsonUploader : public TActorBootstrapped<FakeWilsonUploader> {
public:
class Span {
@@ -232,7 +319,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
- true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);
@@ -292,7 +378,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
- true,
Ydb::StatusIds::SUCCESS
);
@@ -300,7 +385,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
- true,
Ydb::StatusIds::SUCCESS
);
@@ -326,7 +410,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"SELECT * FROM `/Root/table-1` WHERE key = 1 OR key = 3 OR key = 5 OR key = 7 OR key = 9;",
- true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);
@@ -342,23 +425,23 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor");
UNIT_ASSERT(lookupActorSpan);
- auto dsReads = lookupActorSpan->get().FindAll("DataShard.Read"); // Lookup actor sends EvRead to each shard.
+ auto dsReads = lookupActorSpan->get().FindAll("Datashard.Read"); // Lookup actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) "
- ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (DataShard.Read "
+ ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
- "-> [(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)]) , (DataShard.Read "
+ "-> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
- ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) "
- ", (ReadIterator.ReadOperation)])])]) , (ComputeActor) , (RunTasks)])])";
+ ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
+ "])])]) , (ComputeActor) , (RunTasks)])])";
} else {
auto deSpan = trace.Root.BFSFindOne("DataExecuter");
UNIT_ASSERT(deSpan);
@@ -419,7 +502,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);",
- true,
Ydb::StatusIds::SUCCESS
);
@@ -427,7 +509,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);",
- true,
Ydb::StatusIds::SUCCESS
);
@@ -437,7 +518,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
server,
sender,
"SELECT * FROM `/Root/table-1`;",
- true,
Ydb::StatusIds::SUCCESS,
std::move(traceId)
);
@@ -451,17 +531,17 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
auto readActorSpan = trace.Root.BFSFindOne("ReadActor");
UNIT_ASSERT(readActorSpan);
- auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard.
+ auto dsReads = readActorSpan->get().FindAll("Datashard.Read"); // Read actor sends EvRead to each shard.
UNIT_ASSERT_EQUAL(dsReads.size(), 2);
std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
- "(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
- "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , "
- "(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
+ "(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
+ "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
+ "]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
- "[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])";
+ "[(Tablet.WriteLog.LogEntry)])])])])])])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
index 6245c081e5..7946929529 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
@@ -1796,14 +1796,11 @@ void ExecSQL(Tests::TServer::TPtr server,
TActorId sender,
const TString &sql,
bool dml,
- Ydb::StatusIds::StatusCode code,
- NWilson::TTraceId traceId)
+ Ydb::StatusIds::StatusCode code)
{
auto &runtime = *server->GetRuntime();
- TAutoPtr<IEventHandle> handle;
-
auto request = MakeSQLRequest(sql, dml);
- runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr, std::move(traceId)));
+ runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr));
auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
}
diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
index c090313bc9..0ae7f51ebc 100644
--- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h
@@ -708,8 +708,7 @@ void ExecSQL(Tests::TServer::TPtr server,
TActorId sender,
const TString &sql,
bool dml = true,
- Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS,
- NWilson::TTraceId traceId = {});
+ Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});