aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-31 16:36:54 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-31 16:36:54 +0300
commite7cd96e8f346577527021d248f37763808b624d7 (patch)
tree871179d0e0764c1885fb74798edb58f3f9dde68d
parent986e64263081c9086bbac0384d3d2ef455252760 (diff)
downloadydb-e7cd96e8f346577527021d248f37763808b624d7.tar.gz
Change sender's viewer
-rw-r--r--ydb/core/cms/ui/datashard_info.js1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/tx/datashard/change_sender.cpp109
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp51
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp63
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp160
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h3
-rw-r--r--ydb/core/tx/datashard/change_sender_monitoring.cpp116
-rw-r--r--ydb/core/tx/datashard/change_sender_monitoring.h45
-rw-r--r--ydb/core/tx/datashard/datashard.cpp21
-rw-r--r--ydb/core/tx/datashard/index.html4
14 files changed, 539 insertions, 46 deletions
diff --git a/ydb/core/cms/ui/datashard_info.js b/ydb/core/cms/ui/datashard_info.js
index f1c8bad40f..1c35b4cc7f 100644
--- a/ydb/core/cms/ui/datashard_info.js
+++ b/ydb/core/cms/ui/datashard_info.js
@@ -104,6 +104,7 @@ function onDataShardInfoLoaded(data) {
$('#tablet-info-role').text(info.IsFollower ? 'Follower' : 'Leader');
$('#tablet-info-state').text(info.State + (info.IsActive ? ' (active)' : ' (inactive)'));
$('#tablet-info-shared-blobs').text(info.HasSharedBlobs);
+ $('#tablet-info-change-sender').html('<a href="app?TabletID=' + TabletId + '&page=change-sender">Viewer</a>');
var activities = data.Activities;
if (activities) {
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index 36883adf5d..4c48c4f6ce 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -113,6 +113,7 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
@@ -164,6 +165,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
@@ -370,6 +372,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 5a350e0524..3715d25663 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -114,6 +114,7 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
@@ -165,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
@@ -372,6 +374,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 5a350e0524..3715d25663 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -114,6 +114,7 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
@@ -165,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
@@ -372,6 +374,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index 960d938fb5..ec9dc257dd 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -114,6 +114,7 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
@@ -165,6 +166,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_common_ops.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_sender_monitoring.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_commit_writes_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_distributed_erase_tx_unit.cpp
@@ -367,6 +369,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-html-pcdata
library-cpp-json
cpp-json-yson
+ monlib-service-pages
cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp
index fa3193c743..48ab790c6c 100644
--- a/ydb/core/tx/datashard/change_sender.cpp
+++ b/ydb/core/tx/datashard/change_sender.cpp
@@ -1,5 +1,6 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
+#include "change_sender_monitoring.h"
#include "datashard_impl.h"
#include <ydb/core/protos/services.pb.h>
@@ -7,12 +8,14 @@
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/mon.h>
+#include <library/cpp/monlib/service/pages/mon_page.h>
+#include <library/cpp/monlib/service/pages/templates.h>
#include <util/generic/hash.h>
#include <util/generic/maybe.h>
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
class TChangeSender: public TActor<TChangeSender> {
using ESenderType = TEvChangeExchange::ESenderType;
@@ -161,6 +164,91 @@ class TChangeSender: public TActor<TChangeSender> {
}
}
+ void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ const auto& cgi = ev->Get()->Cgi();
+ if (const auto& str = cgi.Get("pathId")) {
+ if (const auto& pathId = ParsePathId(str)) {
+ auto it = Senders.find(pathId);
+ if (it != Senders.end()) {
+ if (const auto& to = it->second.ActorId) {
+ ctx.Send(ev->Forward(to));
+ } else {
+ Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder()
+ << "Change sender '" << pathId << "' (" << it->second.Type << ") is not running"));
+ }
+ } else {
+ Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
+ }
+ } else {
+ Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Invalid pathId"));
+ }
+
+ return;
+ }
+
+ TStringStream html;
+
+ HTML(html) {
+ Header(html, "Main change sender", DataShard.TabletId);
+
+ SimplePanel(html, "Senders", [this](IOutputStream& html) {
+ HTML(html) {
+ TABLE_CLASS("table table-hover") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { html << "#"; }
+ TABLEH() { html << "PathId"; }
+ TABLEH() { html << "UserTableId"; }
+ TABLEH() { html << "Type"; }
+ TABLEH() { html << "Actor"; }
+ }
+ }
+ TABLEBODY() {
+ ui32 i = 0;
+ for (const auto& [pathId, sender] : Senders) {
+ TABLER() {
+ TABLED() { html << ++i; }
+ TABLED() { PathLink(html, pathId); }
+ TABLED() { html << sender.UserTableId; }
+ TABLED() { html << sender.Type; }
+ TABLED() { ActorLink(html, DataShard.TabletId, pathId); }
+ }
+ }
+ }
+ }
+ }
+ });
+
+ CollapsedPanel(html, "Enqueued", "enqueued", [this](IOutputStream& html) {
+ HTML(html) {
+ TABLE_CLASS("table table-hover") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { html << "#"; }
+ TABLEH() { html << "Order"; }
+ TABLEH() { html << "PathId"; }
+ TABLEH() { html << "BodySize"; }
+ }
+ }
+ TABLEBODY() {
+ ui32 i = 0;
+ for (const auto& record : Enqueued) {
+ TABLER() {
+ TABLED() { html << ++i; }
+ TABLED() { html << record.Order; }
+ TABLED() { PathLink(html, record.PathId); }
+ TABLED() { html << record.BodySize; }
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str()));
+ }
+
void PassAway() override {
for (const auto& [_, sender] : Senders) {
if (!sender.ActorId) {
@@ -199,26 +287,26 @@ public:
}
}
- STATEFN(StateBase) {
+ STFUNC(StateBase) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle);
hFunc(TEvChangeExchange::TEvAddSender, Handle);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
-
- cFunc(TEvents::TEvPoison::EventType, PassAway);
+ HFunc(NMon::TEvRemoteHttpInfo, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
}
}
- STATEFN(StateInactive) {
+ STFUNC(StateInactive) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvChangeExchange::TEvActivateSender, Handle);
default:
- return StateBase(ev, TlsActivationContext->AsActorContext());
+ return StateBase(ev, ctx);
}
}
- STATEFN(StateActive) {
- return StateBase(ev, TlsActivationContext->AsActorContext());
+ STFUNC(StateActive) {
+ return StateBase(ev, ctx);
}
private:
@@ -234,5 +322,4 @@ IActor* CreateChangeSender(const TDataShard* self) {
return new TChangeSender(self);
}
-} // NDataShard
-} // NKikimr
+}
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 554f4a9f81..34b40d876b 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -1,24 +1,24 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
#include "change_sender_common_ops.h"
+#include "change_sender_monitoring.h"
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/protos/services.pb.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
-#include <ydb/library/yql/public/udf/udf_data_type.h>
-
#include <util/generic/maybe.h>
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
using namespace NTable;
+using ESenderType = TEvChangeExchange::ESenderType;
class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeSenderShard> {
TStringBuf GetLogPrefix() const {
@@ -196,6 +196,27 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS
Leave();
}
+ void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) {
+ TStringStream html;
+
+ HTML(html) {
+ Header(html, "AsyncIndex partition change sender", DataShard.TabletId);
+
+ SimplePanel(html, "Info", [this](IOutputStream& html) {
+ HTML(html) {
+ DL_CLASS("dl-horizontal") {
+ TermDesc(html, "ShardId", ShardId);
+ TermDesc(html, "IndexTablePathId", IndexTablePathId);
+ TermDesc(html, "LeaderPipeCache", LeaderPipeCache);
+ TermDesc(html, "LastRecordOrder", LastRecordOrder);
+ }
+ }
+ });
+ }
+
+ Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str()));
+ }
+
void Leave() {
Send(Parent, new TEvChangeExchangePrivate::TEvGone(ShardId));
PassAway();
@@ -232,6 +253,7 @@ public:
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ hFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -249,11 +271,12 @@ private:
}; // TAsyncIndexChangeSenderShard
-class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
- , public TBaseChangeSender
- , public IChangeSenderResolver
- , private TSchemeCacheHelpers {
-
+class TAsyncIndexChangeSenderMain
+ : public TActorBootstrapped<TAsyncIndexChangeSenderMain>
+ , public TBaseChangeSender
+ , public IChangeSenderResolver
+ , private TSchemeCacheHelpers
+{
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
@@ -682,6 +705,10 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe
PassAway();
}
+ void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx);
+ }
+
void PassAway() override {
KillSenders();
TActorBootstrapped::PassAway();
@@ -703,7 +730,7 @@ public:
ResolveUserTable();
}
- STATEFN(StateBase) {
+ STFUNC(StateBase) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle);
hFunc(TEvChangeExchange::TEvRecords, Handle);
@@ -711,6 +738,7 @@ public:
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
hFunc(TEvChangeExchangePrivate::TEvReady, Handle);
hFunc(TEvChangeExchangePrivate::TEvGone, Handle);
+ HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -731,5 +759,4 @@ IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTable
return new TAsyncIndexChangeSenderMain(dataShard, userTableId, indexPathId);
}
-} // NDataShard
-} // NKikimr
+}
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 5c79897e55..533b001e8d 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -1,23 +1,24 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
#include "change_sender_common_ops.h"
+#include "change_sender_monitoring.h"
#include "datashard_user_table.h"
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/actors/core/log.h>
-#include <library/cpp/json/json_writer.h>
-
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/services/lib/sharding/sharding.h>
-namespace NKikimr {
-namespace NDataShard {
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/json/json_writer.h>
+
+namespace NKikimr::NDataShard {
using namespace NPQ;
+using ESenderType = TEvChangeExchange::ESenderType;
class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> {
TStringBuf GetLogPrefix() const {
@@ -165,7 +166,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
const auto& result = *ev->Get();
if (!result.IsSuccess()) {
- LOG_E("Error at 'Write"
+ LOG_E("Error at 'Write'"
<< ": reason# " << result.GetError().Reason);
return Leave();
}
@@ -203,6 +204,30 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
Ready();
}
+ void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) {
+ TStringStream html;
+
+ HTML(html) {
+ Header(html, "CdcStream partition change sender", DataShard.TabletId);
+
+ SimplePanel(html, "Info", [this](IOutputStream& html) {
+ HTML(html) {
+ DL_CLASS("dl-horizontal") {
+ TermDesc(html, "PartitionId", PartitionId);
+ TermDesc(html, "ShardId", ShardId);
+ TermDesc(html, "SourceId", SourceId);
+ TermDesc(html, "Writer", Writer);
+ TermDesc(html, "MaxSeqNo", MaxSeqNo);
+ TermDesc(html, "Pending", Pending.size());
+ TermDesc(html, "Cookie", Cookie);
+ }
+ }
+ });
+ }
+
+ Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str()));
+ }
+
void Leave() {
Send(Parent, new TEvChangeExchangePrivate::TEvGone(PartitionId));
PassAway();
@@ -247,6 +272,7 @@ public:
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvPartitionWriter::TEvDisconnected, Leave);
+ hFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -267,11 +293,12 @@ private:
}; // TCdcChangeSenderPartition
-class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
- , public TBaseChangeSender
- , public IChangeSenderResolver
- , private TSchemeCacheHelpers {
-
+class TCdcChangeSenderMain
+ : public TActorBootstrapped<TCdcChangeSenderMain>
+ , public TBaseChangeSender
+ , public IChangeSenderResolver
+ , private TSchemeCacheHelpers
+{
struct TPQPartitionInfo {
ui32 PartitionId;
ui64 ShardId;
@@ -699,6 +726,10 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
PassAway();
}
+ void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ RenderHtmlPage(ESenderType::CdcStream, ev, ctx);
+ }
+
void PassAway() override {
KillSenders();
TActorBootstrapped::PassAway();
@@ -719,7 +750,7 @@ public:
ResolveCdcStream();
}
- STATEFN(StateBase) {
+ STFUNC(StateBase) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvChangeExchange::TEvEnqueueRecords, Handle);
hFunc(TEvChangeExchange::TEvRecords, Handle);
@@ -727,6 +758,7 @@ public:
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
hFunc(TEvChangeExchangePrivate::TEvReady, Handle);
hFunc(TEvChangeExchangePrivate::TEvGone, Handle);
+ HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -745,5 +777,4 @@ IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId
return new TCdcChangeSenderMain(dataShard, streamPathId);
}
-} // NDataShard
-} // NKikimr
+}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index 026c4033f2..201fd1c0a7 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -1,9 +1,12 @@
#include "change_sender_common_ops.h"
+#include "change_sender_monitoring.h"
+
+#include <library/cpp/monlib/service/pages/mon_page.h>
+#include <library/cpp/monlib/service/pages/templates.h>
#include <util/generic/size_literals.h>
-namespace NKikimr {
-namespace NDataShard {
+namespace NKikimr::NDataShard {
void TBaseChangeSender::CreateSenders(const TVector<ui64>& partitionIds) {
THashMap<ui64, TSender> senders;
@@ -254,5 +257,154 @@ TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver*
{
}
-} // NDataShard
-} // NKikimr
+void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev,
+ const TActorContext& ctx)
+{
+ const auto& cgi = ev->Get()->Cgi();
+ if (const auto& str = cgi.Get("partitionId")) {
+ ui64 partitionId = 0;
+ if (TryFromString(str, partitionId)) {
+ auto it = Senders.find(partitionId);
+ if (it != Senders.end()) {
+ if (const auto& to = it->second.ActorId) {
+ ctx.Send(ev->Forward(to));
+ } else {
+ ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder()
+ << "Change sender '" << PathId << ":" << partitionId << "' is not running"));
+ }
+ } else {
+ ActorOps->Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
+ }
+ } else {
+ ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Invalid partitionId"));
+ }
+
+ return;
+ }
+
+ TStringStream html;
+
+ HTML(html) {
+ Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId);
+
+ SimplePanel(html, "Partition senders", [this](IOutputStream& html) {
+ HTML(html) {
+ TABLE_CLASS("table table-hover") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { html << "#"; }
+ TABLEH() { html << "PartitionId"; }
+ TABLEH() { html << "Ready"; }
+ TABLEH() { html << "Pending"; }
+ TABLEH() { html << "Actor"; }
+ }
+ }
+ TABLEBODY() {
+ ui32 i = 0;
+ for (const auto& [partitionId, sender] : Senders) {
+ TABLER() {
+ TABLED() { html << ++i; }
+ TABLED() { html << partitionId; }
+ TABLED() { html << sender.Ready; }
+ TABLED() { html << sender.Pending.size(); }
+ TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); }
+ }
+ }
+ }
+ }
+ }
+ });
+
+ CollapsedPanel(html, "Enqueued", "enqueued", [this](IOutputStream& html) {
+ HTML(html) {
+ TABLE_CLASS("table table-hover") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { html << "#"; }
+ TABLEH() { html << "Order"; }
+ TABLEH() { html << "BodySize"; }
+ }
+ }
+ TABLEBODY() {
+ ui32 i = 0;
+ for (const auto& record : Enqueued) {
+ TABLER() {
+ TABLED() { html << ++i; }
+ TABLED() { html << record.Order; }
+ TABLED() { html << record.BodySize; }
+ }
+ }
+ }
+ }
+ }
+ });
+
+ CollapsedPanel(html, "PendingBody", "pendingBody", [this](IOutputStream& html) {
+ HTML(html) {
+ TABLE_CLASS("table table-hover") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { html << "#"; }
+ TABLEH() { html << "Order"; }
+ TABLEH() { html << "BodySize"; }
+ }
+ }
+ TABLEBODY() {
+ ui32 i = 0;
+ for (const auto& record : PendingBody) {
+ TABLER() {
+ TABLED() { html << ++i; }
+ TABLED() { html << record.Order; }
+ TABLED() { html << record.BodySize; }
+ }
+ }
+ }
+ }
+ }
+ });
+
+ CollapsedPanel(html, "PendingSent", "pendingSent", [this](IOutputStream& html) {
+ HTML(html) {
+ TABLE_CLASS("table table-hover") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { html << "#"; }
+ TABLEH() { html << "Order"; }
+ TABLEH() { html << "Group"; }
+ TABLEH() { html << "Step"; }
+ TABLEH() { html << "TxId"; }
+ TABLEH() { html << "LockId"; }
+ TABLEH() { html << "LockOffset"; }
+ TABLEH() { html << "PathId"; }
+ TABLEH() { html << "Kind"; }
+ TABLEH() { html << "TableId"; }
+ TABLEH() { html << "SchemaVersion"; }
+ }
+ }
+ TABLEBODY() {
+ ui32 i = 0;
+ for (const auto& [order, record] : PendingSent) {
+ TABLER() {
+ TABLED() { html << ++i; }
+ TABLED() { html << order; }
+ TABLED() { html << record.GetGroup(); }
+ TABLED() { html << record.GetStep(); }
+ TABLED() { html << record.GetTxId(); }
+ TABLED() { html << record.GetLockId(); }
+ TABLED() { html << record.GetLockOffset(); }
+ TABLED() { PathLink(html, record.GetPathId()); }
+ TABLED() { html << record.GetKind(); }
+ TABLED() { PathLink(html, record.GetTableId()); }
+ TABLED() { html << record.GetSchemaVersion(); }
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+
+ ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str()));
+}
+
+}
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index efff4a722e..ce58902538 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -8,6 +8,7 @@
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/mon.h>
#include <util/generic/hash.h>
#include <util/generic/map.h>
@@ -105,6 +106,8 @@ protected:
explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
const TDataShardId& dataShard, const TPathId& pathId);
+ void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
+
private:
IActorOps* const ActorOps;
IChangeSenderResolver* const Resolver;
diff --git a/ydb/core/tx/datashard/change_sender_monitoring.cpp b/ydb/core/tx/datashard/change_sender_monitoring.cpp
new file mode 100644
index 0000000000..d6d311b7aa
--- /dev/null
+++ b/ydb/core/tx/datashard/change_sender_monitoring.cpp
@@ -0,0 +1,116 @@
+#include "change_sender_monitoring.h"
+
+#include <util/string/cast.h>
+#include <util/string/split.h>
+
+namespace NKikimr::NDataShard {
+
+void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body) {
+ HTML(str) {
+ DIV_CLASS("panel panel-default") {
+ DIV_CLASS("panel-heading") {
+ H4_CLASS("panel-title") {
+ title(str);
+ }
+ }
+ body(str);
+ }
+ }
+}
+
+void SimplePanel(IOutputStream& str, const TStringBuf title, std::function<void(IOutputStream&)> body) {
+ auto titleRenderer = [&title](IOutputStream& str) {
+ HTML(str) {
+ str << title;
+ }
+ };
+
+ auto bodyRenderer = [body = std::move(body)](IOutputStream& str) {
+ HTML(str) {
+ DIV_CLASS("panel-body") {
+ body(str);
+ }
+ }
+ };
+
+ Panel(str, std::move(titleRenderer), std::move(bodyRenderer));
+}
+
+void CollapsedPanel(IOutputStream& str, const TStringBuf title, const TStringBuf targetId,
+ std::function<void(IOutputStream&)> body)
+{
+ auto titleRenderer = [&title, &targetId](IOutputStream& str) {
+ HTML(str) {
+ str << "<a data-toggle='collapse' href='#" << targetId << "'>"
+ << title
+ << "</a>";
+ }
+ };
+
+ auto bodyRenderer = [&targetId, body = std::move(body)](IOutputStream& str) {
+ HTML(str) {
+ str << "<div id='" << targetId << "' class='collapse'>";
+ DIV_CLASS("panel-body") {
+ body(str);
+ }
+ str << "</div>";
+ }
+ };
+
+ Panel(str, std::move(titleRenderer), std::move(bodyRenderer));
+}
+
+template <typename P, typename D>
+static bool TryGetNext(TStringBuf& s, D delim, P& param) {
+ TMaybe<TStringBuf> buf;
+ GetNext(s, delim, buf);
+ if (!buf) {
+ return false;
+ }
+
+ return TryFromString(*buf, param);
+}
+
+TPathId ParsePathId(TStringBuf str) {
+ ui64 ownerId;
+ ui64 localPathId;
+
+ if (!TryGetNext(str, ':', ownerId) || !TryGetNext(str, ':', localPathId)) {
+ return {};
+ }
+
+ return TPathId(ownerId, localPathId);
+}
+
+template <typename T>
+static void Link(IOutputStream& str, const TStringBuf path, const T& title) {
+ HTML(str) {
+ HREF(path) {
+ str << title;
+ }
+ }
+}
+
+void PathLink(IOutputStream& str, const TPathId& pathId) {
+ const TString path = TStringBuilder() << "app"
+ << "?TabletID=" << pathId.OwnerId
+ << "&Page=" << "PathInfo"
+ << "&OwnerPathId=" << pathId.OwnerId
+ << "&LocalPathId=" << pathId.LocalPathId;
+ Link(str, path, pathId);
+}
+
+void ActorLink(IOutputStream& str, ui64 tabletId, const TPathId& pathId, const TMaybe<ui64>& partitionId) {
+ auto path = TStringBuilder() << "app"
+ << "?TabletID=" << tabletId
+ << "&page=" << "change-sender"
+ << "&pathId=" << pathId.OwnerId << ":" << pathId.LocalPathId;
+
+ if (partitionId) {
+ path << "&partitionId=" << partitionId;
+ }
+
+ Link(str, path, "Viewer");
+}
+
+}
diff --git a/ydb/core/tx/datashard/change_sender_monitoring.h b/ydb/core/tx/datashard/change_sender_monitoring.h
new file mode 100644
index 0000000000..14d901e251
--- /dev/null
+++ b/ydb/core/tx/datashard/change_sender_monitoring.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <ydb/core/base/pathid.h>
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <util/string/printf.h>
+
+namespace NKikimr::NDataShard {
+
+template <typename T>
+void Header(IOutputStream& str, const T& title, ui64 tabletId) {
+ HTML(str) {
+ DIV_CLASS("page-header") {
+ TAG(TH3) {
+ str << title;
+ SMALL() {
+ str << "&nbsp;";
+ HREF(Sprintf("app?TabletID=%" PRIu64, tabletId)) {
+ str << tabletId;
+ }
+ }
+ }
+ }
+ }
+}
+
+template <typename T>
+static void TermDesc(IOutputStream& str, const TStringBuf term, const T& desc) {
+ HTML(str) {
+ DT() { str << term; }
+ DD() { str << desc; }
+ }
+}
+
+void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body);
+void SimplePanel(IOutputStream& str, const TStringBuf title, std::function<void(IOutputStream&)> body);
+void CollapsedPanel(IOutputStream& str, const TStringBuf title, const TStringBuf targetId,
+ std::function<void(IOutputStream&)> body);
+
+TPathId ParsePathId(TStringBuf str);
+void PathLink(IOutputStream& str, const TPathId& pathId);
+void ActorLink(IOutputStream& str, ui64 tabletId, const TPathId& pathId, const TMaybe<ui64>& partitionId = {});
+
+}
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index e96eaff9d9..a40d95de94 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1809,8 +1809,7 @@ bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
auto cgi = ev->Get()->Cgi();
- auto action = cgi.Get("action");
- if (action) {
+ if (const auto& action = cgi.Get("action")) {
if (action == "cleanup-borrowed-parts") {
Execute(CreateTxMonitoringCleanupBorrowedParts(this, ev), ctx);
return true;
@@ -1832,8 +1831,24 @@ bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
return true;
}
- Execute(CreateTxMonitoring(this, ev), ctx);
+ if (const auto& page = cgi.Get("page")) {
+ if (page == "main") {
+ // fallthrough
+ } else if (page == "change-sender") {
+ if (OutChangeSender) {
+ ctx.Send(ev->Forward(OutChangeSender));
+ return true;
+ } else {
+ ctx.Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Change sender is not running"));
+ return true;
+ }
+ } else {
+ ctx.Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
+ return true;
+ }
+ }
+ Execute(CreateTxMonitoring(this, ev), ctx);
return true;
}
diff --git a/ydb/core/tx/datashard/index.html b/ydb/core/tx/datashard/index.html
index 776f3a97a3..dd4ac646da 100644
--- a/ydb/core/tx/datashard/index.html
+++ b/ydb/core/tx/datashard/index.html
@@ -89,6 +89,10 @@
<td class="ds-info">HasSharedBlobs</td>
<td class="ds-info" id="tablet-info-shared-blobs">Loading...</td>
</tr>
+ <tr class="ds-info">
+ <td class="ds-info">Change sender</td>
+ <td class="ds-info" id="tablet-info-change-sender">Loading...</td>
+ </tr>
</tbody>
</table>
<table class="ds-info">