summaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/client_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/unified_agent_client/client_impl.cpp')
-rw-r--r--library/cpp/unified_agent_client/client_impl.cpp80
1 files changed, 62 insertions, 18 deletions
diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp
index 2eb38840e08..db7f3459267 100644
--- a/library/cpp/unified_agent_client/client_impl.cpp
+++ b/library/cpp/unified_agent_client/client_impl.cpp
@@ -400,6 +400,8 @@ namespace NUnifiedAgent::NPrivate {
Started = false;
SessionId.Clear();
+ NegotiatedProtocol.Clear();
+ SessionBindingToken.clear();
TrimmedCount = 0;
NextIndex = 0;
AckSeqNo.Clear();
@@ -515,14 +517,15 @@ namespace NUnifiedAgent::NPrivate {
if (timeout == TDuration::Zero()) {
return;
}
- if (ActiveGrpcCall &&
+ if (NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0 &&
+ ActiveGrpcCall &&
!CloseStarted &&
Counters->InflightMessages.Val() > 0 &&
(Now() - TInstant::MicroSeconds(LastGrpcCallActivityUsec.load())) >= timeout)
{
- YLOG_ERR(Sprintf(
- "grpc call inactivity timeout reached [%s], cancelling active call for reconnect",
- timeout.ToString().c_str()));
+ YLOG_ERROR_T(
+ "grpc call inactivity timeout reached [{}], cancelling active call for reconnect",
+ timeout.ToString());
++Counters->GrpcCallsClosedByInactivity;
ActiveGrpcCall->BeginClose(true);
TouchGrpcCallActivity();
@@ -543,8 +546,8 @@ namespace NUnifiedAgent::NPrivate {
++Counters->ReceivedMessages;
Counters->ReceivedBytes += messageSize;
if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
- YLOG_ERR(Sprintf("message size [%zu] is greater than max grpc message size [%zu], message dropped",
- messageSize, Client->GetParameters().GrpcMaxMessageSize));
+ YLOG_ERROR_T("message size [{}] is greater than max grpc message size [{}], message dropped",
+ messageSize, Client->GetParameters().GrpcMaxMessageSize);
++Counters->DroppedMessages;
Counters->DroppedBytes += messageSize;
++Counters->ErrorsCount;
@@ -571,7 +574,7 @@ namespace NUnifiedAgent::NPrivate {
if (CloseRequested) {
g.Release();
- YLOG_ERR(Sprintf("session is closing, message dropped, [%zu] bytes", messageSize));
+ YLOG_ERROR_T("session is closing, message dropped, [{}] bytes", messageSize);
--Counters->InflightMessages;
Counters->InflightBytes -= messageSize;
++Counters->DroppedMessages;
@@ -740,9 +743,16 @@ namespace NUnifiedAgent::NPrivate {
void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
auto& initializeMessage = *target.MutableInitialize();
+ const ui32 maxAccept = Client->GetParameters().MaxAcceptProtocolVersion;
+ if (maxAccept > 0) {
+ initializeMessage.set_accept_protocol_version(maxAccept);
+ }
if (SessionId.Defined()) {
initializeMessage.SetSessionId(*SessionId);
}
+ if (NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0 && !SessionBindingToken.empty()) {
+ initializeMessage.set_session_binding_proof(SessionBindingToken);
+ }
if (Client->GetParameters().SharedSecretKey.Defined()) {
initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
}
@@ -869,8 +879,10 @@ namespace NUnifiedAgent::NPrivate {
const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
const size_t serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
- YLOG_ERR(Sprintf("single serialized message is too large [%zu] > [%zu], dropping it",
- addResult.NewSerializedRequestSize, serializedLimitToLog));
+ YLOG_ERROR_T(
+ "single serialized message is too large [{}] > [{}], dropping it",
+ addResult.NewSerializedRequestSize,
+ serializedLimitToLog);
queueItem.Skipped = true;
++Counters->DroppedMessages;
Counters->DroppedBytes += queueItem.Size;
@@ -944,9 +956,17 @@ namespace NUnifiedAgent::NPrivate {
YLOG_DEBUG(Sprintf("ack [%" PRIu64 "], [%zu] messages, [%zu] bytes", seqNo, messagesCount, bytesCount));
}
- void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
+ void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo,
+ const TFMaybe<ui32>& protocolVersion, TString bindingToken) {
TouchGrpcCallActivity();
SessionId = sessionId;
+ if (protocolVersion.Defined() && *protocolVersion > 0) {
+ NegotiatedProtocol = *protocolVersion;
+ SessionBindingToken = std::move(bindingToken);
+ } else {
+ NegotiatedProtocol.Clear();
+ SessionBindingToken.clear();
+ }
Acknowledge(lastSeqNo);
NextIndex = TrimmedCount;
++Counters->GrpcCallsInitialized;
@@ -954,20 +974,31 @@ namespace NUnifiedAgent::NPrivate {
Counters->GrpcInflightBytes -= GrpcInflightBytes;
GrpcInflightMessages = 0;
GrpcInflightBytes = 0;
- YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%" PRIu64 "]",
- sessionId.c_str(), lastSeqNo));
+ YLOG_INFO_F(
+ "grpc call initialized, session_id [{}], last_seq_no [{}], protocol_version [{}]",
+ sessionId,
+ lastSeqNo,
+ protocolVersion.Defined() ? ToString(*protocolVersion) : TString("legacy"));
}
- void TClientSession::OnGrpcCallFinished() {
+ void TClientSession::OnGrpcCallFinished(const grpc::Status& finishStatus) {
Y_ABORT_UNLESS(!Closed);
Y_ABORT_UNLESS(ActiveGrpcCall);
ActiveGrpcCall = nullptr;
+ if (!finishStatus.ok() && finishStatus.error_code() == grpc::StatusCode::ALREADY_EXISTS) {
+ SessionId.Clear();
+ NegotiatedProtocol.Clear();
+ SessionBindingToken.clear();
+ YLOG_WARNING_T(
+ "grpc session conflict (ALREADY_EXISTS), cleared session id, message [{}]",
+ TString{finishStatus.error_message()});
+ }
if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
DoClose();
} else {
const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
MakeGrpcCallTimer->Set(reconnectTime);
- YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
+ YLOG_INFO_T("grpc call delayed until [{}]", reconnectTime.ToString());
}
}
@@ -1174,7 +1205,7 @@ namespace NUnifiedAgent::NPrivate {
Session.TouchGrpcCallActivity();
ReadPending = false;
if (FinishDone) {
- Session.OnGrpcCallFinished();
+ Session.OnGrpcCallFinished(FinishStatus);
return;
}
if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
@@ -1210,8 +1241,19 @@ namespace NUnifiedAgent::NPrivate {
static_cast<int>(Response.response_case())));
return;
}
- Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
- Response.GetInitialized().GetLastSeqNo());
+ {
+ const auto& init = Response.GetInitialized();
+ TFMaybe<ui32> protocolVersion;
+ if (init.has_protocol_version()) {
+ protocolVersion = init.protocol_version();
+ }
+ TString bindingToken;
+ if (!init.session_binding_token().empty()) {
+ bindingToken.assign(init.session_binding_token().data(), init.session_binding_token().size());
+ }
+ Session.OnGrpcCallInitialized(init.GetSessionId(), init.GetLastSeqNo(), protocolVersion,
+ std::move(bindingToken));
+ }
Initialized_ = true;
if (!WritePending) {
ScheduleWrite();
@@ -1254,7 +1296,7 @@ namespace NUnifiedAgent::NPrivate {
++Session.GetCounters().ErrorsCount;
}
if (!ReadPending) {
- Session.OnGrpcCallFinished();
+ Session.OnGrpcCallFinished(finishStatus);
}
}
@@ -1314,6 +1356,7 @@ namespace NUnifiedAgent {
, EnableForkSupport(false)
, GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
, Counters(nullptr)
+ , MaxAcceptProtocolVersion(DefaultMaxAcceptProtocolVersion)
{
}
@@ -1329,6 +1372,7 @@ namespace NUnifiedAgent {
const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
const TDuration TClientParameters::DefaultGrpcCallInactivityTimeout = TDuration::Seconds(30);
+ const ui32 TClientParameters::DefaultMaxAcceptProtocolVersion = 1;
TClientPtr MakeClient(const TClientParameters& parameters) {