diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-31 16:36:54 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-31 16:36:54 +0300 |
commit | e7cd96e8f346577527021d248f37763808b624d7 (patch) | |
tree | 871179d0e0764c1885fb74798edb58f3f9dde68d | |
parent | 986e64263081c9086bbac0384d3d2ef455252760 (diff) | |
download | ydb-e7cd96e8f346577527021d248f37763808b624d7.tar.gz |
Change sender's viewer
-rw-r--r-- | ydb/core/cms/ui/datashard_info.js | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender.cpp | 109 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 51 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 63 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 160 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_monitoring.cpp | 116 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_monitoring.h | 45 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/index.html | 4 |
14 files changed, 539 insertions, 46 deletions
diff --git a/ydb/core/cms/ui/datashard_info.js b/ydb/core/cms/ui/datashard_info.js index f1c8bad40f..1c35b4cc7f 100644 --- a/ydb/core/cms/ui/datashard_info.js +++ b/ydb/core/cms/ui/datashard_info.js @@ -104,6 +104,7 @@ function onDataShardInfoLoaded(data) { $('#tablet-info-role').text(info.IsFollower ? 'Follower' : 'Leader'); $('#tablet-info-state').text(info.State + (info.IsActive ? ' (active)' : ' (inactive)')); $('#tablet-info-shared-blobs').text(info.HasSharedBlobs); + $('#tablet-info-change-sender').html('<a href="app?TabletID=' + TabletId + '&page=change-sender">Viewer</a>'); var activities = data.Activities; if (activities) { diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index 36883adf5d..4c48c4f6ce 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -113,6 +113,7 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl @@ -164,6 +165,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp @@ -370,6 +372,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 5a350e0524..3715d25663 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -114,6 +114,7 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl @@ -165,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp @@ -372,6 +374,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 5a350e0524..3715d25663 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -114,6 +114,7 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl @@ -165,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp @@ -372,6 +374,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index 960d938fb5..ec9dc257dd 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -114,6 +114,7 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl @@ -165,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp @@ -367,6 +369,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-html-pcdata library-cpp-json cpp-json-yson + monlib-service-pages cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index fa3193c743..48ab790c6c 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -1,5 +1,6 @@ #include "change_exchange.h" #include "change_exchange_impl.h" +#include "change_sender_monitoring.h" #include "datashard_impl.h" #include <ydb/core/protos/services.pb.h> @@ -7,12 +8,14 @@ #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/mon.h> +#include <library/cpp/monlib/service/pages/mon_page.h> +#include <library/cpp/monlib/service/pages/templates.h> #include <util/generic/hash.h> #include <util/generic/maybe.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { class TChangeSender: public TActor<TChangeSender> { using ESenderType = TEvChangeExchange::ESenderType; @@ -161,6 +164,91 @@ class TChangeSender: public TActor<TChangeSender> { } } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { + const auto& cgi = ev->Get()->Cgi(); + if (const auto& str = cgi.Get("pathId")) { + if (const auto& pathId = ParsePathId(str)) { + auto it = Senders.find(pathId); + if (it != Senders.end()) { + if (const auto& to = it->second.ActorId) { + ctx.Send(ev->Forward(to)); + } else { + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder() + << "Change sender '" << pathId << "' (" << it->second.Type << ") is not running")); + } + } else { + Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND)); + } + } else { + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Invalid pathId")); + } + + return; + } + + TStringStream html; + + HTML(html) { + Header(html, "Main change sender", DataShard.TabletId); + + SimplePanel(html, "Senders", [this](IOutputStream& html) { + HTML(html) { + TABLE_CLASS("table table-hover") { + TABLEHEAD() { + TABLER() { + TABLEH() { html << "#"; } + TABLEH() { html << "PathId"; } + TABLEH() { html << "UserTableId"; } + TABLEH() { html << "Type"; } + TABLEH() { html << "Actor"; } + } + } + TABLEBODY() { + ui32 i = 0; + for (const auto& [pathId, sender] : Senders) { + TABLER() { + TABLED() { html << ++i; } + TABLED() { PathLink(html, pathId); } + TABLED() { html << sender.UserTableId; } + TABLED() { html << sender.Type; } + TABLED() { ActorLink(html, DataShard.TabletId, pathId); } + } + } + } + } + } + }); + + CollapsedPanel(html, "Enqueued", "enqueued", [this](IOutputStream& html) { + HTML(html) { + TABLE_CLASS("table table-hover") { + TABLEHEAD() { + TABLER() { + TABLEH() { html << "#"; } + TABLEH() { html << "Order"; } + TABLEH() { html << "PathId"; } + TABLEH() { html << "BodySize"; } + } + } + TABLEBODY() { + ui32 i = 0; + for (const auto& record : Enqueued) { + TABLER() { + TABLED() { html << ++i; } + TABLED() { html << record.Order; } + TABLED() { PathLink(html, record.PathId); } + TABLED() { html << record.BodySize; } + } + } + } + } + } + }); + } + + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); + } + void PassAway() override { for (const auto& [_, sender] : Senders) { if (!sender.ActorId) { @@ -199,26 +287,26 @@ public: } } - STATEFN(StateBase) { + STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); hFunc(TEvChangeExchange::TEvAddSender, Handle); hFunc(TEvChangeExchange::TEvRemoveSender, Handle); - - cFunc(TEvents::TEvPoison::EventType, PassAway); + HFunc(NMon::TEvRemoteHttpInfo, Handle); + sFunc(TEvents::TEvPoison, PassAway); } } - STATEFN(StateInactive) { + STFUNC(StateInactive) { switch (ev->GetTypeRewrite()) { hFunc(TEvChangeExchange::TEvActivateSender, Handle); default: - return StateBase(ev, TlsActivationContext->AsActorContext()); + return StateBase(ev, ctx); } } - STATEFN(StateActive) { - return StateBase(ev, TlsActivationContext->AsActorContext()); + STFUNC(StateActive) { + return StateBase(ev, ctx); } private: @@ -234,5 +322,4 @@ IActor* CreateChangeSender(const TDataShard* self) { return new TChangeSender(self); } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 554f4a9f81..34b40d876b 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -1,24 +1,24 @@ #include "change_exchange.h" #include "change_exchange_impl.h" #include "change_sender_common_ops.h" +#include "change_sender_monitoring.h" #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/protos/services.pb.h> #include <ydb/core/tablet_flat/flat_row_eggs.h> #include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/library/yql/public/udf/udf_data_type.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> -#include <ydb/library/yql/public/udf/udf_data_type.h> - #include <util/generic/maybe.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { using namespace NTable; +using ESenderType = TEvChangeExchange::ESenderType; class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeSenderShard> { TStringBuf GetLogPrefix() const { @@ -196,6 +196,27 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS Leave(); } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) { + TStringStream html; + + HTML(html) { + Header(html, "AsyncIndex partition change sender", DataShard.TabletId); + + SimplePanel(html, "Info", [this](IOutputStream& html) { + HTML(html) { + DL_CLASS("dl-horizontal") { + TermDesc(html, "ShardId", ShardId); + TermDesc(html, "IndexTablePathId", IndexTablePathId); + TermDesc(html, "LeaderPipeCache", LeaderPipeCache); + TermDesc(html, "LastRecordOrder", LastRecordOrder); + } + } + }); + } + + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); + } + void Leave() { Send(Parent, new TEvChangeExchangePrivate::TEvGone(ShardId)); PassAway(); @@ -232,6 +253,7 @@ public: STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } } @@ -249,11 +271,12 @@ private: }; // TAsyncIndexChangeSenderShard -class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSenderMain> - , public TBaseChangeSender - , public IChangeSenderResolver - , private TSchemeCacheHelpers { - +class TAsyncIndexChangeSenderMain + : public TActorBootstrapped<TAsyncIndexChangeSenderMain> + , public TBaseChangeSender + , public IChangeSenderResolver + , private TSchemeCacheHelpers +{ TStringBuf GetLogPrefix() const { if (!LogPrefix) { LogPrefix = TStringBuilder() @@ -682,6 +705,10 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe PassAway(); } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { + RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx); + } + void PassAway() override { KillSenders(); TActorBootstrapped::PassAway(); @@ -703,7 +730,7 @@ public: ResolveUserTable(); } - STATEFN(StateBase) { + STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); hFunc(TEvChangeExchange::TEvRecords, Handle); @@ -711,6 +738,7 @@ public: hFunc(TEvChangeExchange::TEvRemoveSender, Handle); hFunc(TEvChangeExchangePrivate::TEvReady, Handle); hFunc(TEvChangeExchangePrivate::TEvGone, Handle); + HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } } @@ -731,5 +759,4 @@ IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTable return new TAsyncIndexChangeSenderMain(dataShard, userTableId, indexPathId); } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 5c79897e55..533b001e8d 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -1,23 +1,24 @@ #include "change_exchange.h" #include "change_exchange_impl.h" #include "change_sender_common_ops.h" +#include "change_sender_monitoring.h" #include "datashard_user_table.h" -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/log.h> -#include <library/cpp/json/json_writer.h> - #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/protos/grpc_pq_old.pb.h> #include <ydb/services/lib/sharding/sharding.h> -namespace NKikimr { -namespace NDataShard { +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/json/json_writer.h> + +namespace NKikimr::NDataShard { using namespace NPQ; +using ESenderType = TEvChangeExchange::ESenderType; class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> { TStringBuf GetLogPrefix() const { @@ -165,7 +166,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti const auto& result = *ev->Get(); if (!result.IsSuccess()) { - LOG_E("Error at 'Write" + LOG_E("Error at 'Write'" << ": reason# " << result.GetError().Reason); return Leave(); } @@ -203,6 +204,30 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti Ready(); } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) { + TStringStream html; + + HTML(html) { + Header(html, "CdcStream partition change sender", DataShard.TabletId); + + SimplePanel(html, "Info", [this](IOutputStream& html) { + HTML(html) { + DL_CLASS("dl-horizontal") { + TermDesc(html, "PartitionId", PartitionId); + TermDesc(html, "ShardId", ShardId); + TermDesc(html, "SourceId", SourceId); + TermDesc(html, "Writer", Writer); + TermDesc(html, "MaxSeqNo", MaxSeqNo); + TermDesc(html, "Pending", Pending.size()); + TermDesc(html, "Cookie", Cookie); + } + } + }); + } + + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); + } + void Leave() { Send(Parent, new TEvChangeExchangePrivate::TEvGone(PartitionId)); PassAway(); @@ -247,6 +272,7 @@ public: STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { sFunc(TEvPartitionWriter::TEvDisconnected, Leave); + hFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } } @@ -267,11 +293,12 @@ private: }; // TCdcChangeSenderPartition -class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> - , public TBaseChangeSender - , public IChangeSenderResolver - , private TSchemeCacheHelpers { - +class TCdcChangeSenderMain + : public TActorBootstrapped<TCdcChangeSenderMain> + , public TBaseChangeSender + , public IChangeSenderResolver + , private TSchemeCacheHelpers +{ struct TPQPartitionInfo { ui32 PartitionId; ui64 ShardId; @@ -699,6 +726,10 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> PassAway(); } + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { + RenderHtmlPage(ESenderType::CdcStream, ev, ctx); + } + void PassAway() override { KillSenders(); TActorBootstrapped::PassAway(); @@ -719,7 +750,7 @@ public: ResolveCdcStream(); } - STATEFN(StateBase) { + STFUNC(StateBase) { switch (ev->GetTypeRewrite()) { hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle); hFunc(TEvChangeExchange::TEvRecords, Handle); @@ -727,6 +758,7 @@ public: hFunc(TEvChangeExchange::TEvRemoveSender, Handle); hFunc(TEvChangeExchangePrivate::TEvReady, Handle); hFunc(TEvChangeExchangePrivate::TEvGone, Handle); + HFunc(NMon::TEvRemoteHttpInfo, Handle); sFunc(TEvents::TEvPoison, PassAway); } } @@ -745,5 +777,4 @@ IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId return new TCdcChangeSenderMain(dataShard, streamPathId); } -} // NDataShard -} // NKikimr +} diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index 026c4033f2..201fd1c0a7 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -1,9 +1,12 @@ #include "change_sender_common_ops.h" +#include "change_sender_monitoring.h" + +#include <library/cpp/monlib/service/pages/mon_page.h> +#include <library/cpp/monlib/service/pages/templates.h> #include <util/generic/size_literals.h> -namespace NKikimr { -namespace NDataShard { +namespace NKikimr::NDataShard { void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) { THashMap<ui64, TSender> senders; @@ -254,5 +257,154 @@ TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* { } -} // NDataShard -} // NKikimr +void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, + const TActorContext& ctx) +{ + const auto& cgi = ev->Get()->Cgi(); + if (const auto& str = cgi.Get("partitionId")) { + ui64 partitionId = 0; + if (TryFromString(str, partitionId)) { + auto it = Senders.find(partitionId); + if (it != Senders.end()) { + if (const auto& to = it->second.ActorId) { + ctx.Send(ev->Forward(to)); + } else { + ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder() + << "Change sender '" << PathId << ":" << partitionId << "' is not running")); + } + } else { + ActorOps->Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND)); + } + } else { + ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Invalid partitionId")); + } + + return; + } + + TStringStream html; + + HTML(html) { + Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId); + + SimplePanel(html, "Partition senders", [this](IOutputStream& html) { + HTML(html) { + TABLE_CLASS("table table-hover") { + TABLEHEAD() { + TABLER() { + TABLEH() { html << "#"; } + TABLEH() { html << "PartitionId"; } + TABLEH() { html << "Ready"; } + TABLEH() { html << "Pending"; } + TABLEH() { html << "Actor"; } + } + } + TABLEBODY() { + ui32 i = 0; + for (const auto& [partitionId, sender] : Senders) { + TABLER() { + TABLED() { html << ++i; } + TABLED() { html << partitionId; } + TABLED() { html << sender.Ready; } + TABLED() { html << sender.Pending.size(); } + TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); } + } + } + } + } + } + }); + + CollapsedPanel(html, "Enqueued", "enqueued", [this](IOutputStream& html) { + HTML(html) { + TABLE_CLASS("table table-hover") { + TABLEHEAD() { + TABLER() { + TABLEH() { html << "#"; } + TABLEH() { html << "Order"; } + TABLEH() { html << "BodySize"; } + } + } + TABLEBODY() { + ui32 i = 0; + for (const auto& record : Enqueued) { + TABLER() { + TABLED() { html << ++i; } + TABLED() { html << record.Order; } + TABLED() { html << record.BodySize; } + } + } + } + } + } + }); + + CollapsedPanel(html, "PendingBody", "pendingBody", [this](IOutputStream& html) { + HTML(html) { + TABLE_CLASS("table table-hover") { + TABLEHEAD() { + TABLER() { + TABLEH() { html << "#"; } + TABLEH() { html << "Order"; } + TABLEH() { html << "BodySize"; } + } + } + TABLEBODY() { + ui32 i = 0; + for (const auto& record : PendingBody) { + TABLER() { + TABLED() { html << ++i; } + TABLED() { html << record.Order; } + TABLED() { html << record.BodySize; } + } + } + } + } + } + }); + + CollapsedPanel(html, "PendingSent", "pendingSent", [this](IOutputStream& html) { + HTML(html) { + TABLE_CLASS("table table-hover") { + TABLEHEAD() { + TABLER() { + TABLEH() { html << "#"; } + TABLEH() { html << "Order"; } + TABLEH() { html << "Group"; } + TABLEH() { html << "Step"; } + TABLEH() { html << "TxId"; } + TABLEH() { html << "LockId"; } + TABLEH() { html << "LockOffset"; } + TABLEH() { html << "PathId"; } + TABLEH() { html << "Kind"; } + TABLEH() { html << "TableId"; } + TABLEH() { html << "SchemaVersion"; } + } + } + TABLEBODY() { + ui32 i = 0; + for (const auto& [order, record] : PendingSent) { + TABLER() { + TABLED() { html << ++i; } + TABLED() { html << order; } + TABLED() { html << record.GetGroup(); } + TABLED() { html << record.GetStep(); } + TABLED() { html << record.GetTxId(); } + TABLED() { html << record.GetLockId(); } + TABLED() { html << record.GetLockOffset(); } + TABLED() { PathLink(html, record.GetPathId()); } + TABLED() { html << record.GetKind(); } + TABLED() { PathLink(html, record.GetTableId()); } + TABLED() { html << record.GetSchemaVersion(); } + } + } + } + } + } + }); + } + + ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); +} + +} diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index efff4a722e..ce58902538 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -8,6 +8,7 @@ #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/mon.h> #include <util/generic/hash.h> #include <util/generic/map.h> @@ -105,6 +106,8 @@ protected: explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TDataShardId& dataShard, const TPathId& pathId); + void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx); + private: IActorOps* const ActorOps; IChangeSenderResolver* const Resolver; diff --git a/ydb/core/tx/datashard/change_sender_monitoring.cpp b/ydb/core/tx/datashard/change_sender_monitoring.cpp new file mode 100644 index 0000000000..d6d311b7aa --- /dev/null +++ b/ydb/core/tx/datashard/change_sender_monitoring.cpp @@ -0,0 +1,116 @@ +#include "change_sender_monitoring.h" + +#include <util/string/cast.h> +#include <util/string/split.h> + +namespace NKikimr::NDataShard { + +void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body) { + HTML(str) { + DIV_CLASS("panel panel-default") { + DIV_CLASS("panel-heading") { + H4_CLASS("panel-title") { + title(str); + } + } + body(str); + } + } +} + +void SimplePanel(IOutputStream& str, const TStringBuf title, std::function<void(IOutputStream&)> body) { + auto titleRenderer = [&title](IOutputStream& str) { + HTML(str) { + str << title; + } + }; + + auto bodyRenderer = [body = std::move(body)](IOutputStream& str) { + HTML(str) { + DIV_CLASS("panel-body") { + body(str); + } + } + }; + + Panel(str, std::move(titleRenderer), std::move(bodyRenderer)); +} + +void CollapsedPanel(IOutputStream& str, const TStringBuf title, const TStringBuf targetId, + std::function<void(IOutputStream&)> body) +{ + auto titleRenderer = [&title, &targetId](IOutputStream& str) { + HTML(str) { + str << "<a data-toggle='collapse' href='#" << targetId << "'>" + << title + << "</a>"; + } + }; + + auto bodyRenderer = [&targetId, body = std::move(body)](IOutputStream& str) { + HTML(str) { + str << "<div id='" << targetId << "' class='collapse'>"; + DIV_CLASS("panel-body") { + body(str); + } + str << "</div>"; + } + }; + + Panel(str, std::move(titleRenderer), std::move(bodyRenderer)); +} + +template <typename P, typename D> +static bool TryGetNext(TStringBuf& s, D delim, P& param) { + TMaybe<TStringBuf> buf; + GetNext(s, delim, buf); + if (!buf) { + return false; + } + + return TryFromString(*buf, param); +} + +TPathId ParsePathId(TStringBuf str) { + ui64 ownerId; + ui64 localPathId; + + if (!TryGetNext(str, ':', ownerId) || !TryGetNext(str, ':', localPathId)) { + return {}; + } + + return TPathId(ownerId, localPathId); +} + +template <typename T> +static void Link(IOutputStream& str, const TStringBuf path, const T& title) { + HTML(str) { + HREF(path) { + str << title; + } + } +} + +void PathLink(IOutputStream& str, const TPathId& pathId) { + const TString path = TStringBuilder() << "app" + << "?TabletID=" << pathId.OwnerId + << "&Page=" << "PathInfo" + << "&OwnerPathId=" << pathId.OwnerId + << "&LocalPathId=" << pathId.LocalPathId; + Link(str, path, pathId); +} + +void ActorLink(IOutputStream& str, ui64 tabletId, const TPathId& pathId, const TMaybe<ui64>& partitionId) { + auto path = TStringBuilder() << "app" + << "?TabletID=" << tabletId + << "&page=" << "change-sender" + << "&pathId=" << pathId.OwnerId << ":" << pathId.LocalPathId; + + if (partitionId) { + path << "&partitionId=" << partitionId; + } + + Link(str, path, "Viewer"); +} + +} diff --git a/ydb/core/tx/datashard/change_sender_monitoring.h b/ydb/core/tx/datashard/change_sender_monitoring.h new file mode 100644 index 0000000000..14d901e251 --- /dev/null +++ b/ydb/core/tx/datashard/change_sender_monitoring.h @@ -0,0 +1,45 @@ +#pragma once + +#include <ydb/core/base/pathid.h> + +#include <library/cpp/monlib/service/pages/templates.h> + +#include <util/string/printf.h> + +namespace NKikimr::NDataShard { + +template <typename T> +void Header(IOutputStream& str, const T& title, ui64 tabletId) { + HTML(str) { + DIV_CLASS("page-header") { + TAG(TH3) { + str << title; + SMALL() { + str << " "; + HREF(Sprintf("app?TabletID=%" PRIu64, tabletId)) { + str << tabletId; + } + } + } + } + } +} + +template <typename T> +static void TermDesc(IOutputStream& str, const TStringBuf term, const T& desc) { + HTML(str) { + DT() { str << term; } + DD() { str << desc; } + } +} + +void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body); +void SimplePanel(IOutputStream& str, const TStringBuf title, std::function<void(IOutputStream&)> body); +void CollapsedPanel(IOutputStream& str, const TStringBuf title, const TStringBuf targetId, + std::function<void(IOutputStream&)> body); + +TPathId ParsePathId(TStringBuf str); +void PathLink(IOutputStream& str, const TPathId& pathId); +void ActorLink(IOutputStream& str, ui64 tabletId, const TPathId& pathId, const TMaybe<ui64>& partitionId = {}); + +} diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index e96eaff9d9..a40d95de94 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1809,8 +1809,7 @@ bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc auto cgi = ev->Get()->Cgi(); - auto action = cgi.Get("action"); - if (action) { + if (const auto& action = cgi.Get("action")) { if (action == "cleanup-borrowed-parts") { Execute(CreateTxMonitoringCleanupBorrowedParts(this, ev), ctx); return true; @@ -1832,8 +1831,24 @@ bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc return true; } - Execute(CreateTxMonitoring(this, ev), ctx); + if (const auto& page = cgi.Get("page")) { + if (page == "main") { + // fallthrough + } else if (page == "change-sender") { + if (OutChangeSender) { + ctx.Send(ev->Forward(OutChangeSender)); + return true; + } else { + ctx.Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Change sender is not running")); + return true; + } + } else { + ctx.Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND)); + return true; + } + } + Execute(CreateTxMonitoring(this, ev), ctx); return true; } diff --git a/ydb/core/tx/datashard/index.html b/ydb/core/tx/datashard/index.html index 776f3a97a3..dd4ac646da 100644 --- a/ydb/core/tx/datashard/index.html +++ b/ydb/core/tx/datashard/index.html @@ -89,6 +89,10 @@ <td class="ds-info">HasSharedBlobs</td> <td class="ds-info" id="tablet-info-shared-blobs">Loading...</td> </tr> + <tr class="ds-info"> + <td class="ds-info">Change sender</td> + <td class="ds-info" id="tablet-info-change-sender">Loading...</td> + </tr> </tbody> </table> <table class="ds-info"> |