#pragma once
/// Asynchronous Messaging Library implements framework for sending and
/// receiving messages between loosely connected processes.
#include "coreconn.h"
#include "defs.h"
#include "handler.h"
#include "handler_impl.h"
#include "local_flags.h"
#include "locator.h"
#include "message.h"
#include "message_status.h"
#include "network.h"
#include "queue_config.h"
#include "remote_connection_status.h"
#include "session.h"
#include "session_config.h"
#include "socket_addr.h"
#include <library/cpp/messagebus/actor/executor.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>
#include <library/cpp/codecs/codecs.h>
#include <util/generic/array_ref.h>
#include <util/generic/buffer.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/stream/input.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/condvar.h>
#include <util/system/type_name.h>
#include <util/system/event.h>
#include <util/system/mutex.h>
namespace NBus {
////////////////////////////////////////////////////////
/// \brief Common structure to store address information
int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept;
bool operator<(const TNetAddr& a1, const TNetAddr& a2); // compare by addresses
/////////////////////////////////////////////////////////////////////////
/// \brief Handles routing and data encoding to/from wire
/// Protocol is stateless threadsafe singleton object that
/// encapsulates relationship between a message (TBusMessage) object
/// and destination server. Protocol object is reponsible for serializing in-memory
/// message and reply into the wire, retuning name of the service and resource
/// distribution key for given protocol.
/// Protocol object should transparently handle messages and replies.
/// This is interface only class, actuall instances of the protocols
/// should be created using templates inhereted from this base class.
class TBusProtocol {
private:
TString ServiceName;
int ServicePort;
public:
TBusProtocol(TBusService name = "UNKNOWN", int port = 0)
: ServiceName(name)
, ServicePort(port)
{
}
/// returns service type for this protocol and message
TBusService GetService() const {
return ServiceName.data();
}
/// returns port number for destination session to open socket
int GetPort() const {
return ServicePort;
}
virtual ~TBusProtocol() {
}
/// \brief serialized protocol specific data into TBusData
/// \note buffer passed to the function (data) is not empty, use append functions
virtual void Serialize(const TBusMessage* mess, TBuffer& data) = 0;
/// deserialized TBusData into new instance of the message
virtual TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) = 0;
/// returns key for messages of this protocol
virtual TBusKey GetKey(const TBusMessage*) {
return YBUS_KEYMIN;
}
/// default implementation of routing policy to allow overrides
virtual EMessageStatus GetDestination(const TBusClientSession* session, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr);
/// codec for transport level compression
virtual NCodecs::TCodecPtr GetTransportCodec(void) const {
return NCodecs::ICodec::GetInstance("snappy");
}
};
class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> {
friend class TBusMessageQueue;
public:
TBusSyncSourceSession(TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> session);
~TBusSyncSourceSession();
void Shutdown();
TBusMessage* SendSyncMessage(TBusMessage* pMessage, EMessageStatus& status, const TNetAddr* addr = nullptr);
int RegisterService(const char* hostname, TBusKey start = YBUS_KEYMIN, TBusKey end = YBUS_KEYMAX, EIpVersion ipVersion = EIP_VERSION_4);
int GetInFlight();
const TBusProtocol* GetProto() const;
const TBusClientSession* GetBusClientSessionWorkaroundDoNotUse() const; // It's for TLoadBalancedProtocol::GetDestination() function that really needs TBusClientSession* unlike all other protocols. Look at review 32425 (http://rb.yandex-team.ru/arc/r/32425/) for more information.
private:
TIntrusivePtr< ::NBus::NPrivate::TBusSyncSourceSessionImpl> Session;
};
using TBusSyncClientSessionPtr = TIntrusivePtr<TBusSyncSourceSession>;
///////////////////////////////////////////////////////////////////
/// \brief Main message queue object, need one per application
class TBusMessageQueue: public TAtomicRefCount<TBusMessageQueue> {
/// allow mesage queue to be created only via factory
friend TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name);
friend class ::NBus::NPrivate::TRemoteConnection;
friend struct ::NBus::NPrivate::TBusSessionImpl;
friend class ::NBus::NPrivate::TAcceptor;
friend struct ::NBus::TBusServerSession;
private:
const TBusQueueConfig Config;
TMutex Lock;
TList<TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl>> Sessions;
TSimpleIntrusivePtr<TBusLocator> Locator;
NPrivate::TScheduler Scheduler;
::NActor::TExecutorPtr WorkQueue;
TAtomic Running;
TSystemEvent ShutdownComplete;
private:
/// constructor is protected, used NBus::CreateMessageQueue() to create a instance
TBusMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name);
public:
TString GetNameInternal() const;
~TBusMessageQueue();
void Stop();
bool IsRunning();
public:
void EnqueueWork(TArrayRef< ::NActor::IWorkItem* const> w) {
WorkQueue->EnqueueWork(w);
}
::NActor::TExecutor* GetExecutor() {
return WorkQueue.Get();
}
TString GetStatus(ui16 flags = YBUS_STATUS_CONNS) const;
// without sessions
NPrivate::TBusMessageQueueStatus GetStatusRecordInternal() const;
TString GetStatusSelf() const;
TString GetStatusSingleLine() const;
TBusLocator* GetLocator() const {
return Locator.Get();
}
TBusClientSessionPtr CreateSource(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, const TString& name = "");
TBusSyncClientSessionPtr CreateSyncSource(TBusProtocol* proto, const TBusClientSessionConfig& config, bool needReply = true, const TString& name = "");
TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TString& name = "");
TBusServerSessionPtr CreateDestination(TBusProtocol* proto, IBusServerHandler* hander, const TBusServerSessionConfig& config, const TVector<TBindResult>& bindTo, const TString& name = "");
private:
void Destroy(TBusSession* session);
void Destroy(TBusSyncClientSessionPtr session);
public:
void Schedule(NPrivate::IScheduleItemAutoPtr i);
private:
void DestroyAllSessions();
void Add(TIntrusivePtr< ::NBus::NPrivate::TBusSessionImpl> session);
void Remove(TBusSession* session);
};
/////////////////////////////////////////////////////////////////
/// Factory methods to construct message queue
TBusMessageQueuePtr CreateMessageQueue(const char* name = "");
TBusMessageQueuePtr CreateMessageQueue(NActor::TExecutorPtr executor, const char* name = "");
TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, const char* name = "");
TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, TBusLocator* locator, const char* name = "");
TBusMessageQueuePtr CreateMessageQueue(const TBusQueueConfig& config, NActor::TExecutorPtr executor, TBusLocator* locator, const char* name = "");
}