diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/debug_receiver | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/debug_receiver')
6 files changed, 136 insertions, 0 deletions
diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver.cpp b/library/cpp/messagebus/debug_receiver/debug_receiver.cpp new file mode 100644 index 0000000000..23b02d1003 --- /dev/null +++ b/library/cpp/messagebus/debug_receiver/debug_receiver.cpp @@ -0,0 +1,42 @@ +#include "debug_receiver_handler.h" +#include "debug_receiver_proto.h" + +#include <library/cpp/messagebus/ybus.h> + +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/lwtrace/all.h> + +using namespace NBus; + +int main(int argc, char** argv) { + NLWTrace::StartLwtraceFromEnv(); + + TBusQueueConfig queueConfig; + TBusServerSessionConfig sessionConfig; + + NLastGetopt::TOpts opts; + + queueConfig.ConfigureLastGetopt(opts); + sessionConfig.ConfigureLastGetopt(opts); + + opts.AddLongOption("port").Required().RequiredArgument("PORT").StoreResult(&sessionConfig.ListenPort); + + opts.SetFreeArgsMax(0); + + NLastGetopt::TOptsParseResult r(&opts, argc, argv); + + TBusMessageQueuePtr q(CreateMessageQueue(queueConfig)); + + TDebugReceiverProtocol proto; + TDebugReceiverHandler handler; + + TBusServerSessionPtr serverSession = TBusServerSession::Create(&proto, &handler, sessionConfig, q); + // TODO: race is here + handler.ServerSession = serverSession.Get(); + + for (;;) { + Sleep(TDuration::Hours(17)); + } + + return 0; +} diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp new file mode 100644 index 0000000000..05f99e94ca --- /dev/null +++ b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.cpp @@ -0,0 +1,20 @@ +#include "debug_receiver_handler.h" + +#include "debug_receiver_proto.h" + +#include <util/generic/cast.h> +#include <util/string/printf.h> + +void TDebugReceiverHandler::OnError(TAutoPtr<NBus::TBusMessage>, NBus::EMessageStatus status) { + Cerr << "error " << status << "\n"; +} + +void TDebugReceiverHandler::OnMessage(NBus::TOnMessageContext& message) { + TDebugReceiverMessage* typedMessage = VerifyDynamicCast<TDebugReceiverMessage*>(message.GetMessage()); + Cerr << "type=" << typedMessage->GetHeader()->Type + << " size=" << typedMessage->GetHeader()->Size + << " flags=" << Sprintf("0x%04x", (int)typedMessage->GetHeader()->FlagsInternal) + << "\n"; + + message.ForgetRequest(); +} diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_handler.h b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.h new file mode 100644 index 0000000000..0aed6b9984 --- /dev/null +++ b/library/cpp/messagebus/debug_receiver/debug_receiver_handler.h @@ -0,0 +1,10 @@ +#pragma once + +#include <library/cpp/messagebus/ybus.h> + +struct TDebugReceiverHandler: public NBus::IBusServerHandler { + NBus::TBusServerSession* ServerSession; + + void OnError(TAutoPtr<NBus::TBusMessage> pMessage, NBus::EMessageStatus status) override; + void OnMessage(NBus::TOnMessageContext& message) override; +}; diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp new file mode 100644 index 0000000000..0c74f9ecc3 --- /dev/null +++ b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.cpp @@ -0,0 +1,20 @@ +#include "debug_receiver_proto.h" + +using namespace NBus; + +TDebugReceiverProtocol::TDebugReceiverProtocol() + : TBusProtocol("debug receiver", 0) +{ +} + +void TDebugReceiverProtocol::Serialize(const NBus::TBusMessage*, TBuffer&) { + Y_FAIL("it is receiver only"); +} + +TAutoPtr<NBus::TBusMessage> TDebugReceiverProtocol::Deserialize(ui16, TArrayRef<const char> payload) { + THolder<TDebugReceiverMessage> r(new TDebugReceiverMessage(ECreateUninitialized())); + + r->Payload.Append(payload.data(), payload.size()); + + return r.Release(); +} diff --git a/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h new file mode 100644 index 0000000000..d34710dcf7 --- /dev/null +++ b/library/cpp/messagebus/debug_receiver/debug_receiver_proto.h @@ -0,0 +1,27 @@ +#pragma once + +#include <library/cpp/messagebus/ybus.h> + +struct TDebugReceiverMessage: public NBus::TBusMessage { + /// constructor to create messages on sending end + TDebugReceiverMessage(ui16 type) + : NBus::TBusMessage(type) + { + } + + /// constructor with serialzed data to examine the header + TDebugReceiverMessage(NBus::ECreateUninitialized) + : NBus::TBusMessage(NBus::ECreateUninitialized()) + { + } + + TBuffer Payload; +}; + +struct TDebugReceiverProtocol: public NBus::TBusProtocol { + TDebugReceiverProtocol(); + + void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override; + + TAutoPtr<NBus::TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; +}; diff --git a/library/cpp/messagebus/debug_receiver/ya.make b/library/cpp/messagebus/debug_receiver/ya.make new file mode 100644 index 0000000000..f1b14d35bb --- /dev/null +++ b/library/cpp/messagebus/debug_receiver/ya.make @@ -0,0 +1,17 @@ +PROGRAM(messagebus_debug_receiver) + +OWNER(g:messagebus) + +SRCS( + debug_receiver.cpp + debug_receiver_proto.cpp + debug_receiver_handler.cpp +) + +PEERDIR( + library/cpp/getopt + library/cpp/lwtrace + library/cpp/messagebus +) + +END() |