diff options
author | Semyon Danilov <senya@ydb.tech> | 2024-01-11 15:27:56 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-11 15:27:56 +0400 |
commit | 34f54c96857e8c3de2ec6bc5816509ecb0986c47 (patch) | |
tree | 864d395db06df0c5eeb2f53c3bdcaf25f1dd274f | |
parent | 57a192d4f253ebd3c1bcf84d61912f81ba3b99f0 (diff) | |
download | ydb-34f54c96857e8c3de2ec6bc5816509ecb0986c47.tar.gz |
Fix datashard read traces (#913)
Fixes #916
-rw-r--r-- | ydb/core/tablet_flat/shared_sausagecache.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_trace.cpp | 114 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_common/datashard_ut_common.h | 3 |
5 files changed, 108 insertions, 30 deletions
diff --git a/ydb/core/tablet_flat/shared_sausagecache.cpp b/ydb/core/tablet_flat/shared_sausagecache.cpp index 8ca7054cc3..5aa3d5e5b5 100644 --- a/ydb/core/tablet_flat/shared_sausagecache.cpp +++ b/ydb/core/tablet_flat/shared_sausagecache.cpp @@ -258,9 +258,10 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { }; struct TRequest : public TSimpleRefCount<TRequest> { - TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection) + TRequest(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection, NWilson::TTraceId &&traceId) : Label(pageCollection->Label()) , PageCollection(std::move(pageCollection)) + , TraceId(std::move(traceId)) { } @@ -275,6 +276,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { ui64 PendingBlocks = 0; TBlocks ReadyBlocks; TDeque<ui32> PagesToRequest; + NWilson::TTraceId TraceId; }; struct TExpectant { @@ -587,7 +589,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { } } - auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection)); + auto waitingRequest = MakeIntrusive<TRequest>(std::move(msg->Fetch->PageCollection), std::move(msg->Fetch->TraceId)); waitingRequest->Source = ev->Sender; waitingRequest->Owner = msg->Owner; @@ -674,7 +676,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { } else { AddInFlyPages(pagesToRequest.size(), pagesToRequestBytes); // fetch cookie -> requested size - auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest)); + auto *fetch = new NPageCollection::TFetch(pagesToRequestBytes, waitingRequest->PageCollection, std::move(pagesToRequest), std::move(waitingRequest->TraceId)); NBlockIO::Start(this, waitingRequest->Owner, 0, waitingRequest->Priority, fetch); } } @@ -758,7 +760,7 @@ class TSharedPageCache : public TActorBootstrapped<TSharedPageCache> { AddInFlyPages(toLoad.size(), sizeToLoad); // fetch cookie -> requested size; // event cookie -> ptr to queue - auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad)); + auto *fetch = new NPageCollection::TFetch(sizeToLoad, wa.PageCollection, std::move(toLoad), std::move(wa.TraceId)); NBlockIO::Start(this, wa.Owner, (ui64)&queue, wa.Priority, fetch); } } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 6552f276d9..8dca4a6ddc 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -2107,7 +2107,6 @@ public: const ui64 tieBreaker = Self->NextTieBreakerIndex++; Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev); - Op->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, readSpan.GetTraceId(), "ReadIterator.ReadOperation", NWilson::EFlags::AUTO_END); Op->BuildExecutionPlan(false); Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); @@ -2537,7 +2536,8 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct auto* request = ev->Get(); if (!request->ReadSpan) { - request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "DataShard.Read"); + request->ReadSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(ev->TraceId), "Datashard.Read", NWilson::EFlags::AUTO_END); + request->ReadSpan.Attribute("Shard", std::to_string(TabletID())); } const auto& record = request->Record; diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 1bd642c9e1..50a5670865 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -21,6 +21,93 @@ using namespace NDataShardReadTableTest; Y_UNIT_TEST_SUITE(TDataShardTrace) { + void ExecSQL(Tests::TServer::TPtr server, + TActorId sender, + const TString &sql, + Ydb::StatusIds::StatusCode code, + NWilson::TTraceId traceId = {}) + { + google::protobuf::Arena arena; + auto &runtime = *server->GetRuntime(); + TAutoPtr<IEventHandle> handle; + + THolder<NKqp::TEvKqp::TEvQueryRequest> request; + if (traceId) { + struct RequestCtx : NGRpcService::IRequestCtxMtSafe { + RequestCtx(NWilson::TTraceId &&traceId) : TraceId(std::move(traceId)) {} + + NWilson::TTraceId GetWilsonTraceId() const override { + return TraceId.Clone(); + } + + TMaybe<TString> GetTraceId() const override { + return Nothing(); + } + + const TMaybe<TString> GetDatabaseName() const override { + return ""; + } + + const TIntrusiveConstPtr<NACLib::TUserToken>& GetInternalToken() const override { + return Ptr; + } + + const TString& GetSerializedToken() const override { + return Token; + } + + bool IsClientLost() const override { + return false; + }; + + virtual const google::protobuf::Message* GetRequest() const override { + return nullptr; + }; + + const TMaybe<TString> GetRequestType() const override { + return "_document_api_request"; + }; + + void SetFinishAction(std::function<void()>&& cb) override { + Y_UNUSED(cb); + }; + + google::protobuf::Arena* GetArena() override { + return nullptr; + }; + + TIntrusiveConstPtr<NACLib::TUserToken> Ptr; + TString Token; + NWilson::TTraceId TraceId; + }; + + auto *txControl = google::protobuf::Arena::CreateMessage<Ydb::Table::TransactionControl>(&arena); + txControl->mutable_begin_tx()->mutable_serializable_read_write(); + txControl->set_commit_tx(true); + + auto ptr = std::make_shared<RequestCtx>(std::move(traceId)); + request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( + NKikimrKqp::QUERY_ACTION_EXECUTE, + NKikimrKqp::QUERY_TYPE_SQL_DML, + TActorId(), + ptr, + TString(), //sessionId + TString(sql), + TString(), //queryId + txControl, //tx_control + nullptr, //ydbParameters + Ydb::Table::QueryStatsCollection::STATS_COLLECTION_UNSPECIFIED, //collectStats + nullptr, // query_cache_policy + nullptr //operationParams + ); + } else { + request = MakeSQLRequest(sql, true); + } + runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr)); + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code); + } + class FakeWilsonUploader : public TActorBootstrapped<FakeWilsonUploader> { public: class Span { @@ -232,7 +319,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);", - true, Ydb::StatusIds::SUCCESS, std::move(traceId) ); @@ -292,7 +378,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);", - true, Ydb::StatusIds::SUCCESS ); @@ -300,7 +385,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);", - true, Ydb::StatusIds::SUCCESS ); @@ -326,7 +410,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "SELECT * FROM `/Root/table-1` WHERE key = 1 OR key = 3 OR key = 5 OR key = 7 OR key = 9;", - true, Ydb::StatusIds::SUCCESS, std::move(traceId) ); @@ -342,23 +425,23 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor"); UNIT_ASSERT(lookupActorSpan); - auto dsReads = lookupActorSpan->get().FindAll("DataShard.Read"); // Lookup actor sends EvRead to each shard. + auto dsReads = lookupActorSpan->get().FindAll("Datashard.Read"); // Lookup actor sends EvRead to each shard. UNIT_ASSERT_EQUAL(dsReads.size(), 2); canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " ", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) " - ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (DataShard.Read " + ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) " ", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " ", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)]) , (DataShard.Read " + "-> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) " ", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " - ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) " - ", (ReadIterator.ReadOperation)])])]) , (ComputeActor) , (RunTasks)])])"; + ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])" + "])])]) , (ComputeActor) , (RunTasks)])])"; } else { auto deSpan = trace.Root.BFSFindOne("DataExecuter"); UNIT_ASSERT(deSpan); @@ -419,7 +502,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);", - true, Ydb::StatusIds::SUCCESS ); @@ -427,7 +509,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);", - true, Ydb::StatusIds::SUCCESS ); @@ -437,7 +518,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { server, sender, "SELECT * FROM `/Root/table-1`;", - true, Ydb::StatusIds::SUCCESS, std::move(traceId) ); @@ -451,17 +531,17 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { auto readActorSpan = trace.Root.BFSFindOne("ReadActor"); UNIT_ASSERT(readActorSpan); - auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard. + auto dsReads = readActorSpan->get().FindAll("Datashard.Read"); // Read actor sends EvRead to each shard. UNIT_ASSERT_EQUAL(dsReads.size(), 2); std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , " "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , " "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " - "(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " - "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , " - "(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " + "(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " + "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])" + "]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])"; + "[(Tablet.WriteLog.LogEntry)])])])])])])])"; UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 6245c081e5..7946929529 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1796,14 +1796,11 @@ void ExecSQL(Tests::TServer::TPtr server, TActorId sender, const TString &sql, bool dml, - Ydb::StatusIds::StatusCode code, - NWilson::TTraceId traceId) + Ydb::StatusIds::StatusCode code) { auto &runtime = *server->GetRuntime(); - TAutoPtr<IEventHandle> handle; - auto request = MakeSQLRequest(sql, dml); - runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr, std::move(traceId))); + runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release(), 0, 0, nullptr)); auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender); UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code); } diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index c090313bc9..0ae7f51ebc 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -708,8 +708,7 @@ void ExecSQL(Tests::TServer::TPtr server, TActorId sender, const TString &sql, bool dml = true, - Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS, - NWilson::TTraceId traceId = {}); + Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS); NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {}); NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {}); |