diff options
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/unified_agent_client/client.h | 11 | ||||
| -rw-r--r-- | library/cpp/unified_agent_client/client_impl.cpp | 80 | ||||
| -rw-r--r-- | library/cpp/unified_agent_client/client_impl.h | 9 | ||||
| -rw-r--r-- | library/cpp/unified_agent_client/proto/unified_agent.proto | 17 |
4 files changed, 97 insertions, 20 deletions
diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h index 3828b1d3638..bc5372d217e 100644 --- a/library/cpp/unified_agent_client/client.h +++ b/library/cpp/unified_agent_client/client.h @@ -68,6 +68,8 @@ namespace NUnifiedAgent { // Force-cancel and recreate active grpc stream if there are inflight messages, // but no grpc callback activity for too long. This is a fail-safe for transport stalls. + // Only applies after a successful session init with negotiated protocol_version > 0; + // legacy sessions (no negotiated version) are not force-cancelled on inactivity. // // Default: 30 sec (use 0 to disable) TClientParameters& SetGrpcCallInactivityTimeout(TDuration timeout) { @@ -90,6 +92,13 @@ namespace NUnifiedAgent { return *this; } + // Max protocol version offered to the agent (HTTP Accept-style). 0 = legacy only (do not send accept_protocol_version). + // Default: DefaultMaxAcceptProtocolVersion (v1). + TClientParameters& SetMaxAcceptProtocolVersion(ui32 version) { + MaxAcceptProtocolVersion = version; + return *this; + } + // Client library sends messages to grpc in batches, this parameter // establishes upper limit on the size of single batch in bytes. // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185) @@ -129,6 +138,7 @@ namespace NUnifiedAgent { static const size_t DefaultGrpcMaxMessageSize; static const TDuration DefaultGrpcSendDelay; static const TDuration DefaultGrpcCallInactivityTimeout; + static const ui32 DefaultMaxAcceptProtocolVersion; public: TString Uri; @@ -142,6 +152,7 @@ namespace NUnifiedAgent { bool EnableForkSupport; size_t GrpcMaxMessageSize; TIntrusivePtr<TClientCounters> Counters; + ui32 MaxAcceptProtocolVersion; }; struct TSessionParameters { diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp index 2eb38840e08..db7f3459267 100644 --- a/library/cpp/unified_agent_client/client_impl.cpp +++ b/library/cpp/unified_agent_client/client_impl.cpp @@ -400,6 +400,8 @@ namespace NUnifiedAgent::NPrivate { Started = false; SessionId.Clear(); + NegotiatedProtocol.Clear(); + SessionBindingToken.clear(); TrimmedCount = 0; NextIndex = 0; AckSeqNo.Clear(); @@ -515,14 +517,15 @@ namespace NUnifiedAgent::NPrivate { if (timeout == TDuration::Zero()) { return; } - if (ActiveGrpcCall && + if (NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0 && + ActiveGrpcCall && !CloseStarted && Counters->InflightMessages.Val() > 0 && (Now() - TInstant::MicroSeconds(LastGrpcCallActivityUsec.load())) >= timeout) { - YLOG_ERR(Sprintf( - "grpc call inactivity timeout reached [%s], cancelling active call for reconnect", - timeout.ToString().c_str())); + YLOG_ERROR_T( + "grpc call inactivity timeout reached [{}], cancelling active call for reconnect", + timeout.ToString()); ++Counters->GrpcCallsClosedByInactivity; ActiveGrpcCall->BeginClose(true); TouchGrpcCallActivity(); @@ -543,8 +546,8 @@ namespace NUnifiedAgent::NPrivate { ++Counters->ReceivedMessages; Counters->ReceivedBytes += messageSize; if (messageSize > Client->GetParameters().GrpcMaxMessageSize) { - YLOG_ERR(Sprintf("message size [%zu] is greater than max grpc message size [%zu], message dropped", - messageSize, Client->GetParameters().GrpcMaxMessageSize)); + YLOG_ERROR_T("message size [{}] is greater than max grpc message size [{}], message dropped", + messageSize, Client->GetParameters().GrpcMaxMessageSize); ++Counters->DroppedMessages; Counters->DroppedBytes += messageSize; ++Counters->ErrorsCount; @@ -571,7 +574,7 @@ namespace NUnifiedAgent::NPrivate { if (CloseRequested) { g.Release(); - YLOG_ERR(Sprintf("session is closing, message dropped, [%zu] bytes", messageSize)); + YLOG_ERROR_T("session is closing, message dropped, [{}] bytes", messageSize); --Counters->InflightMessages; Counters->InflightBytes -= messageSize; ++Counters->DroppedMessages; @@ -740,9 +743,16 @@ namespace NUnifiedAgent::NPrivate { void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) { auto& initializeMessage = *target.MutableInitialize(); + const ui32 maxAccept = Client->GetParameters().MaxAcceptProtocolVersion; + if (maxAccept > 0) { + initializeMessage.set_accept_protocol_version(maxAccept); + } if (SessionId.Defined()) { initializeMessage.SetSessionId(*SessionId); } + if (NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0 && !SessionBindingToken.empty()) { + initializeMessage.set_session_binding_proof(SessionBindingToken); + } if (Client->GetParameters().SharedSecretKey.Defined()) { initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey); } @@ -869,8 +879,10 @@ namespace NUnifiedAgent::NPrivate { const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1); const size_t serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0; if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) { - YLOG_ERR(Sprintf("single serialized message is too large [%zu] > [%zu], dropping it", - addResult.NewSerializedRequestSize, serializedLimitToLog)); + YLOG_ERROR_T( + "single serialized message is too large [{}] > [{}], dropping it", + addResult.NewSerializedRequestSize, + serializedLimitToLog); queueItem.Skipped = true; ++Counters->DroppedMessages; Counters->DroppedBytes += queueItem.Size; @@ -944,9 +956,17 @@ namespace NUnifiedAgent::NPrivate { YLOG_DEBUG(Sprintf("ack [%" PRIu64 "], [%zu] messages, [%zu] bytes", seqNo, messagesCount, bytesCount)); } - void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) { + void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo, + const TFMaybe<ui32>& protocolVersion, TString bindingToken) { TouchGrpcCallActivity(); SessionId = sessionId; + if (protocolVersion.Defined() && *protocolVersion > 0) { + NegotiatedProtocol = *protocolVersion; + SessionBindingToken = std::move(bindingToken); + } else { + NegotiatedProtocol.Clear(); + SessionBindingToken.clear(); + } Acknowledge(lastSeqNo); NextIndex = TrimmedCount; ++Counters->GrpcCallsInitialized; @@ -954,20 +974,31 @@ namespace NUnifiedAgent::NPrivate { Counters->GrpcInflightBytes -= GrpcInflightBytes; GrpcInflightMessages = 0; GrpcInflightBytes = 0; - YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%" PRIu64 "]", - sessionId.c_str(), lastSeqNo)); + YLOG_INFO_F( + "grpc call initialized, session_id [{}], last_seq_no [{}], protocol_version [{}]", + sessionId, + lastSeqNo, + protocolVersion.Defined() ? ToString(*protocolVersion) : TString("legacy")); } - void TClientSession::OnGrpcCallFinished() { + void TClientSession::OnGrpcCallFinished(const grpc::Status& finishStatus) { Y_ABORT_UNLESS(!Closed); Y_ABORT_UNLESS(ActiveGrpcCall); ActiveGrpcCall = nullptr; + if (!finishStatus.ok() && finishStatus.error_code() == grpc::StatusCode::ALREADY_EXISTS) { + SessionId.Clear(); + NegotiatedProtocol.Clear(); + SessionBindingToken.clear(); + YLOG_WARNING_T( + "grpc session conflict (ALREADY_EXISTS), cleared session id, message [{}]", + TString{finishStatus.error_message()}); + } if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) { DoClose(); } else { const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay; MakeGrpcCallTimer->Set(reconnectTime); - YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str())); + YLOG_INFO_T("grpc call delayed until [{}]", reconnectTime.ToString()); } } @@ -1174,7 +1205,7 @@ namespace NUnifiedAgent::NPrivate { Session.TouchGrpcCallActivity(); ReadPending = false; if (FinishDone) { - Session.OnGrpcCallFinished(); + Session.OnGrpcCallFinished(FinishStatus); return; } if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) { @@ -1210,8 +1241,19 @@ namespace NUnifiedAgent::NPrivate { static_cast<int>(Response.response_case()))); return; } - Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(), - Response.GetInitialized().GetLastSeqNo()); + { + const auto& init = Response.GetInitialized(); + TFMaybe<ui32> protocolVersion; + if (init.has_protocol_version()) { + protocolVersion = init.protocol_version(); + } + TString bindingToken; + if (!init.session_binding_token().empty()) { + bindingToken.assign(init.session_binding_token().data(), init.session_binding_token().size()); + } + Session.OnGrpcCallInitialized(init.GetSessionId(), init.GetLastSeqNo(), protocolVersion, + std::move(bindingToken)); + } Initialized_ = true; if (!WritePending) { ScheduleWrite(); @@ -1254,7 +1296,7 @@ namespace NUnifiedAgent::NPrivate { ++Session.GetCounters().ErrorsCount; } if (!ReadPending) { - Session.OnGrpcCallFinished(); + Session.OnGrpcCallFinished(finishStatus); } } @@ -1314,6 +1356,7 @@ namespace NUnifiedAgent { , EnableForkSupport(false) , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize) , Counters(nullptr) + , MaxAcceptProtocolVersion(DefaultMaxAcceptProtocolVersion) { } @@ -1329,6 +1372,7 @@ namespace NUnifiedAgent { const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB; const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10); const TDuration TClientParameters::DefaultGrpcCallInactivityTimeout = TDuration::Seconds(30); + const ui32 TClientParameters::DefaultMaxAcceptProtocolVersion = 1; TClientPtr MakeClient(const TClientParameters& parameters) { diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h index 1f1cb052d3a..75c475876a3 100644 --- a/library/cpp/unified_agent_client/client_impl.h +++ b/library/cpp/unified_agent_client/client_impl.h @@ -9,6 +9,8 @@ #include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h> #include <library/cpp/unified_agent_client/grpc_io.h> +#include <contrib/libs/grpc/include/grpcpp/support/status.h> + #include <library/cpp/logger/global/global.h> #include <util/generic/deque.h> @@ -144,9 +146,10 @@ namespace NUnifiedAgent::NPrivate { void Acknowledge(ui64 seqNo); - void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo); + void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo, const TFMaybe<ui32>& protocolVersion, + TString bindingToken); - void OnGrpcCallFinished(); + void OnGrpcCallFinished(const grpc::Status& finishStatus); NThreading::TFuture<void> PreFork(); @@ -255,6 +258,8 @@ namespace NUnifiedAgent::NPrivate { TIntrusivePtr<TClient> Client; TFMaybe<TString> OriginalSessionId; TFMaybe<TString> SessionId; + TFMaybe<ui32> NegotiatedProtocol; + TString SessionBindingToken; TFMaybe<THashMap<TString, TString>> Meta; TScopeLogger Logger; bool CloseStarted; diff --git a/library/cpp/unified_agent_client/proto/unified_agent.proto b/library/cpp/unified_agent_client/proto/unified_agent.proto index 68efe357476..f13cf372e0c 100644 --- a/library/cpp/unified_agent_client/proto/unified_agent.proto +++ b/library/cpp/unified_agent_client/proto/unified_agent.proto @@ -24,6 +24,12 @@ message Request { repeated SessionMetaItem meta = 2; string shared_secret_key = 3; + + // Max protocol version the client supports (HTTP Accept-style). Unset or 0 = legacy only. + optional uint32 accept_protocol_version = 4; + + // Proof of session binding for reconnect; used when negotiated protocol_version > 0. + bytes session_binding_proof = 5; } message MessageMetaItem { @@ -66,6 +72,12 @@ message Response { // Application can skip all formed messages by seq_no upto last_seq_no - they are consumed by server. uint64 last_seq_no = 2; + + // Negotiated protocol version (min(accept, server_max)). Unset or 0 = legacy session. + optional uint32 protocol_version = 3; + + // Opaque token; client must send it back as session_binding_proof on reconnect (protocol v1+). + bytes session_binding_token = 4; } message Ack { @@ -99,3 +111,8 @@ service UnifiedAgentService { // // Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s // for records - only in this case UnifiedAgent can dedup. +// +// Protocol negotiation (optional fields): +// - Initialize.accept_protocol_version: max version the client supports (0 / unset = legacy only). +// - Initialized.protocol_version: negotiated min(accept, server_max); unset/0 = legacy session. +// - Initialized.session_binding_token + Initialize.session_binding_proof: v1+ session binding (opaque). |
