aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/debug_receiver
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/debug_receiver
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/debug_receiver')
-rw-r--r--library/cpp/messagebus/debug_receiver/debug_receiver.cpp42
-rw-r--r--library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp20
-rw-r--r--library/cpp/messagebus/debug_receiver/debug_receiver_handler.h10
-rw-r--r--library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp20
-rw-r--r--library/cpp/messagebus/debug_receiver/debug_receiver_proto.h27
-rw-r--r--library/cpp/messagebus/debug_receiver/ya.make17
6 files changed, 136 insertions, 0 deletions
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver.cpp b/library/cpp/messagebus/debug_receiver/debug_receiver.cpp
new file mode 100644
index 0000000000..23b02d1003
--- /dev/null
+++ b/library/cpp/messagebus/debug_receiver/debug_receiver.cpp
@@ -0,0 +1,42 @@
+#include "debug_receiver_handler.h"
+#include "debug_receiver_proto.h"
+
+#include <library/cpp/messagebus/ybus.h>
+
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/lwtrace/all.h>
+
+using namespace NBus;
+
+int main(int argc, char** argv) {
+ NLWTrace::StartLwtraceFromEnv();
+
+ TBusQueueConfig queueConfig;
+ TBusServerSessionConfig sessionConfig;
+
+ NLastGetopt::TOpts opts;
+
+ queueConfig.ConfigureLastGetopt(opts);
+ sessionConfig.ConfigureLastGetopt(opts);
+
+ opts.AddLongOption("port").Required().RequiredArgument("PORT").StoreResult(&sessionConfig.ListenPort);
+
+ opts.SetFreeArgsMax(0);
+
+ NLastGetopt::TOptsParseResult r(&opts, argc, argv);
+
+ TBusMessageQueuePtr q(CreateMessageQueue(queueConfig));
+
+ TDebugReceiverProtocol proto;
+ TDebugReceiverHandler handler;
+
+ TBusServerSessionPtr serverSession = TBusServerSession::Create(&proto, &handler, sessionConfig, q);
+ // TODO: race is here
+ handler.ServerSession = serverSession.Get();
+
+ for (;;) {
+ Sleep(TDuration::Hours(17));
+ }
+
+ return 0;
+}
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp
new file mode 100644
index 0000000000..05f99e94ca
--- /dev/null
+++ b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp
@@ -0,0 +1,20 @@
+#include "debug_receiver_handler.h"
+
+#include "debug_receiver_proto.h"
+
+#include <util/generic/cast.h>
+#include <util/string/printf.h>
+
+void TDebugReceiverHandler::OnError(TAutoPtr<NBus::TBusMessage>, NBus::EMessageStatus status) {
+ Cerr << "error " << status << "\n";
+}
+
+void TDebugReceiverHandler::OnMessage(NBus::TOnMessageContext& message) {
+ TDebugReceiverMessage* typedMessage = VerifyDynamicCast<TDebugReceiverMessage*>(message.GetMessage());
+ Cerr << "type=" << typedMessage->GetHeader()->Type
+ << " size=" << typedMessage->GetHeader()->Size
+ << " flags=" << Sprintf("0x%04x", (int)typedMessage->GetHeader()->FlagsInternal)
+ << "\n";
+
+ message.ForgetRequest();
+}
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_handler.h b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.h
new file mode 100644
index 0000000000..0aed6b9984
--- /dev/null
+++ b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <library/cpp/messagebus/ybus.h>
+
+struct TDebugReceiverHandler: public NBus::IBusServerHandler {
+ NBus::TBusServerSession* ServerSession;
+
+ void OnError(TAutoPtr<NBus::TBusMessage> pMessage, NBus::EMessageStatus status) override;
+ void OnMessage(NBus::TOnMessageContext& message) override;
+};
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp
new file mode 100644
index 0000000000..0c74f9ecc3
--- /dev/null
+++ b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp
@@ -0,0 +1,20 @@
+#include "debug_receiver_proto.h"
+
+using namespace NBus;
+
+TDebugReceiverProtocol::TDebugReceiverProtocol()
+ : TBusProtocol("debug receiver", 0)
+{
+}
+
+void TDebugReceiverProtocol::Serialize(const NBus::TBusMessage*, TBuffer&) {
+ Y_FAIL("it is receiver only");
+}
+
+TAutoPtr<NBus::TBusMessage> TDebugReceiverProtocol::Deserialize(ui16, TArrayRef<const char> payload) {
+ THolder<TDebugReceiverMessage> r(new TDebugReceiverMessage(ECreateUninitialized()));
+
+ r->Payload.Append(payload.data(), payload.size());
+
+ return r.Release();
+}
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h
new file mode 100644
index 0000000000..d34710dcf7
--- /dev/null
+++ b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <library/cpp/messagebus/ybus.h>
+
+struct TDebugReceiverMessage: public NBus::TBusMessage {
+ /// constructor to create messages on sending end
+ TDebugReceiverMessage(ui16 type)
+ : NBus::TBusMessage(type)
+ {
+ }
+
+ /// constructor with serialzed data to examine the header
+ TDebugReceiverMessage(NBus::ECreateUninitialized)
+ : NBus::TBusMessage(NBus::ECreateUninitialized())
+ {
+ }
+
+ TBuffer Payload;
+};
+
+struct TDebugReceiverProtocol: public NBus::TBusProtocol {
+ TDebugReceiverProtocol();
+
+ void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override;
+
+ TAutoPtr<NBus::TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
+};
diff --git a/library/cpp/messagebus/debug_receiver/ya.make b/library/cpp/messagebus/debug_receiver/ya.make
new file mode 100644
index 0000000000..f1b14d35bb
--- /dev/null
+++ b/library/cpp/messagebus/debug_receiver/ya.make
@@ -0,0 +1,17 @@
+PROGRAM(messagebus_debug_receiver)
+
+OWNER(g:messagebus)
+
+SRCS(
+ debug_receiver.cpp
+ debug_receiver_proto.cpp
+ debug_receiver_handler.cpp
+)
+
+PEERDIR(
+ library/cpp/getopt
+ library/cpp/lwtrace
+ library/cpp/messagebus
+)
+
+END()