aboutsummaryrefslogtreecommitdiffstats
path: root/kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-04-14 13:10:53 +0300
committerkomels <komels@yandex-team.ru>2022-04-14 13:10:53 +0300
commit21c9b0e6b039e9765eb414c406c2b86e8cea6850 (patch)
treef40ebc18ff8958dfbd189954ad024043ca983ea5 /kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp
parent9a4effa852abe489707139c2b260dccc6f4f9aa9 (diff)
downloadydb-21c9b0e6b039e9765eb414c406c2b86e8cea6850.tar.gz
Final part on compatibility layer: LOGBROKER-7215
ref:777c67aadbf705d19034a09a792b2df61ba53697
Diffstat (limited to 'kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp')
-rw-r--r--kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp221
1 files changed, 221 insertions, 0 deletions
diff --git a/kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp b/kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp
new file mode 100644
index 0000000000..36ba3fa8f6
--- /dev/null
+++ b/kikimr/yndx/grpc_services/persqueue/grpc_pq_write.cpp
@@ -0,0 +1,221 @@
+#include "grpc_pq_write.h"
+#include "grpc_pq_actor.h"
+#include "grpc_pq_session.h"
+#include "ydb/core/client/server/grpc_proxy_status.h"
+
+#include <ydb/core/base/appdata.h>
+#include <util/generic/queue.h>
+
+using namespace NActors;
+using namespace NKikimrClient;
+
+using grpc::Status;
+
+namespace NKikimr {
+namespace NGRpcProxy {
+
+using namespace NPersQueue;
+
+///////////////////////////////////////////////////////////////////////////////
+
+
+void TPQWriteServiceImpl::TSession::OnCreated() { // Start waiting for new session.
+ Proxy->WaitWriteSession();
+ if (Proxy->TooMuchSessions()) {
+ ReplyWithError("proxy overloaded", NPersQueue::NErrorCode::OVERLOAD);
+ return;
+ }
+ TMaybe<TString> localCluster = Proxy->AvailableLocalCluster();
+ if (NeedDiscoverClusters) {
+ if (!localCluster.Defined()) {
+ ReplyWithError("initializing", NPersQueue::NErrorCode::INITIALIZING);
+ return;
+ } else if (localCluster->empty()) {
+ ReplyWithError("cluster disabled", NPersQueue::NErrorCode::CLUSTER_DISABLED);
+ return;
+ } else {
+ CreateActor(*localCluster);
+ }
+ } else {
+ CreateActor(TString());
+ }
+ ReadyForNextRead();
+}
+
+void TPQWriteServiceImpl::TSession::OnRead(const TWriteRequest& request) {
+
+ switch (request.GetRequestCase()) {
+ case TWriteRequest::kInit: {
+ SendEvent(new TEvPQProxy::TEvWriteInit(request, GetPeerName(), GetDatabase()));
+ break;
+ }
+ case TWriteRequest::kDataBatch:
+ case TWriteRequest::kData: {
+ SendEvent(new TEvPQProxy::TEvWrite(request));
+ break;
+ }
+ default: {
+ ReplyWithError("unsupported request", NPersQueue::NErrorCode::BAD_REQUEST);
+ }
+ }
+}
+
+void TPQWriteServiceImpl::TSession::OnDone() {
+ SendEvent(new TEvPQProxy::TEvDone());
+}
+
+TPQWriteServiceImpl::TSession::TSession(std::shared_ptr<TPQWriteServiceImpl> proxy,
+ grpc::ServerCompletionQueue* cq, ui64 cookie, const TActorId& schemeCache,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters, bool needDiscoverClusters)
+ : ISession(cq)
+ , Proxy(proxy)
+ , Cookie(cookie)
+ , SchemeCache(schemeCache)
+ , Counters(counters)
+ , NeedDiscoverClusters(needDiscoverClusters)
+{
+}
+
+void TPQWriteServiceImpl::TSession::Start() {
+ if (!Proxy->IsShuttingDown()) {
+ Proxy->RequestSession(&Context, &Stream, CQ, CQ, new TRequestCreated(this));
+ }
+}
+
+ui64 TPQWriteServiceImpl::TSession::GetCookie() const {
+ return Cookie;
+}
+
+void TPQWriteServiceImpl::TSession::DestroyStream(const TString& reason, const NPersQueue::NErrorCode::EErrorCode errorCode) {
+ // Send poison pill to the actor(if it is alive)
+ SendEvent(new TEvPQProxy::TEvDieCommand("write-session " + ToString<ui64>(Cookie) + ": " + reason, errorCode));
+ // Remove reference to session from "cookie -> session" map.
+ Proxy->ReleaseSession(this);
+}
+
+bool TPQWriteServiceImpl::TSession::IsShuttingDown() const {
+ return Proxy->IsShuttingDown();
+}
+
+void TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) {
+
+ auto classifier = Proxy->GetClassifier();
+ ActorId = Proxy->ActorSystem->Register(
+ new TWriteSessionActor(this, Cookie, SchemeCache, Counters, localCluster,
+ classifier ? classifier->ClassifyAddress(GetPeerName())
+ : "unknown"), TMailboxType::Simple, 0
+ );
+}
+
+void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
+ Proxy->ActorSystem->Send(ActorId, ev);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+
+TPQWriteServiceImpl::TPQWriteServiceImpl(grpc::ServerCompletionQueue* cq,
+ NActors::TActorSystem* as, const TActorId& schemeCache,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions)
+ : CQ(cq)
+ , ActorSystem(as)
+ , SchemeCache(schemeCache)
+ , Counters(counters)
+ , MaxSessions(maxSessions)
+ , NeedDiscoverClusters(false)
+{
+}
+
+void TPQWriteServiceImpl::InitClustersUpdater()
+{
+ TAppData* appData = ActorSystem->AppData<TAppData>();
+ NeedDiscoverClusters = !appData->PQConfig.GetTopicsAreFirstClassCitizen();
+ if (NeedDiscoverClusters) {
+ ActorSystem->Register(new TClustersUpdater(this));
+ }
+}
+
+
+ui64 TPQWriteServiceImpl::NextCookie() {
+ return AtomicIncrement(LastCookie);
+}
+
+
+void TPQWriteServiceImpl::ReleaseSession(TSessionRef session) {
+ with_lock (Lock) {
+ bool erased = Sessions.erase(session->GetCookie());
+ if (erased) {
+ ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
+ }
+ }
+}
+
+
+void TPQWriteServiceImpl::SetupIncomingRequests() {
+ WaitWriteSession();
+}
+
+
+void TPQWriteServiceImpl::WaitWriteSession() {
+
+ const ui64 cookie = NextCookie();
+
+ ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,1,0));
+
+ TSessionRef session(new TSession(shared_from_this(), CQ, cookie, SchemeCache, Counters, NeedDiscoverClusters));
+ {
+ with_lock (Lock) {
+ Sessions[cookie] = session;
+ }
+ }
+
+ session->Start();
+}
+
+
+bool TPQWriteServiceImpl::TooMuchSessions() {
+ with_lock (Lock) {
+ return Sessions.size() >= MaxSessions;
+ }
+}
+
+
+TMaybe<TString> TPQWriteServiceImpl::AvailableLocalCluster() {
+ with_lock (Lock) {
+ return AvailableLocalClusterName;
+ }
+}
+
+
+void TPQWriteServiceImpl::NetClassifierUpdated(NAddressClassifier::TLabeledAddressClassifier::TConstPtr classifier) {
+ auto g(Guard(Lock));
+ if (!DatacenterClassifier) {
+ for (auto it = Sessions.begin(); it != Sessions.end();) {
+ auto jt = it++;
+ jt->second->DestroyStream("datacenter classifier initialized, restart session please", NPersQueue::NErrorCode::INITIALIZING);
+ }
+ }
+
+ DatacenterClassifier = classifier;
+}
+
+
+
+void TPQWriteServiceImpl::CheckClusterChange(const TString &localCluster, const bool enabled) {
+ with_lock (Lock) {
+ AvailableLocalClusterName = enabled ? localCluster : TString();
+
+ if (!enabled) {
+ for (auto it = Sessions.begin(); it != Sessions.end();) {
+ auto jt = it++;
+ jt->second->DestroyStream("cluster disabled", NPersQueue::NErrorCode::CLUSTER_DISABLED);
+ }
+ }
+ }
+}
+
+
+///////////////////////////////////////////////////////////////////////////////
+
+}
+}