diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-04-07 07:27:31 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@yandex-team.ru> | 2022-04-07 07:27:31 +0300 |
commit | 02e04bdd43700600b9ebe6b1fe7cdf421285b136 (patch) | |
tree | baa32cd881ff6d4ca944cbdd01b4c25c999b55eb | |
parent | 2d9b61d661f97eb64fc680983b2d3580cc2d302d (diff) | |
download | ydb-02e04bdd43700600b9ebe6b1fe7cdf421285b136.tar.gz |
YQL-14608: More fixes on segf
ref:39ba20faad9e9ea52d34866846bdafcbb9fa91b9
-rw-r--r-- | ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp | 17 | ||||
-rw-r--r-- | ydb/library/yql/utils/actors/http_sender_actor.cpp | 10 |
2 files changed, 17 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 826c7d0572..f5f52c8712 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -70,6 +70,7 @@ struct TMetricsToSend { struct TMetricsInflight { TActorId HttpSenderId; ui64 MetricsCount = 0; + ui64 BodySize = 0; }; } // namespace @@ -223,7 +224,7 @@ private: return; } - HandleSuccessSolomonResponse(*res->HttpIncomingResponse->Get()); + HandleSuccessSolomonResponse(*res->HttpIncomingResponse->Get(), ev->Cookie); while (TryToSendNextBatch()) {} } @@ -340,12 +341,13 @@ private: const auto metricsToSend = std::get<TMetricsToSend>(variant); const NHttp::THttpOutgoingRequestPtr httpRequest = BuildSolomonRequest(metricsToSend.Data); + const auto bodySize = httpRequest->Body.Size(); const TActorId httpSenderId = Register(CreateHttpSenderActor(SelfId(), HttpProxyId)); - Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)); + Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), /*flags=*/0, Cookie); SINK_LOG_D("Sent " << metricsToSend.MetricsCount << " metrics with size of " << metricsToSend.Data.size() << " bytes to solomon"); *Metrics.SentMetrics += metricsToSend.MetricsCount; - InflightBuffer.emplace(httpRequest, TMetricsInflight { httpSenderId, metricsToSend.MetricsCount }); + InflightBuffer.emplace(Cookie++, TMetricsInflight { httpSenderId, metricsToSend.MetricsCount, bodySize }); return true; } @@ -361,7 +363,7 @@ private: Y_ENSURE(false, "Bad type"); } - void HandleSuccessSolomonResponse(const NHttp::TEvHttpProxy::TEvHttpIncomingResponse& response) { + void HandleSuccessSolomonResponse(const NHttp::TEvHttpProxy::TEvHttpIncomingResponse& response, ui64 cookie) { SINK_LOG_D("Solomon response: " << response.Response->GetObfuscatedData()); NJson::TJsonParser parser; switch (WriteParams.Shard.GetClusterType()) { @@ -384,7 +386,7 @@ private: } Y_VERIFY(res.size() == 2); - auto ptr = InflightBuffer.find(response.Request); + auto ptr = InflightBuffer.find(cookie); Y_VERIFY(ptr != InflightBuffer.end()); const ui64 writtenMetricsCount = std::stoul(res[0]); @@ -397,7 +399,7 @@ private: SINK_LOG_W("Some metrics were not written. MetricsCount=" << ptr->second.MetricsCount << " writtenMetricsCount=" << writtenMetricsCount << " Solomon response: " << response.Response->GetObfuscatedData()); } - FreeSpace += ptr->first->Body.Size(); + FreeSpace += ptr->second.BodySize; if (ShouldNotifyNewFreeSpace) { Callbacks->ResumeExecution(); ShouldNotifyNewFreeSpace = false; @@ -427,10 +429,11 @@ private: bool ShouldNotifyNewFreeSpace = false; std::optional<NDqProto::TCheckpoint> CheckpointInProgress = std::nullopt; std::queue<std::variant<TMetricsToSend, NDqProto::TCheckpoint>> SendingBuffer; - THashMap<NHttp::THttpOutgoingRequestPtr, TMetricsInflight> InflightBuffer; + THashMap<ui64, TMetricsInflight> InflightBuffer; TMetricsEncoder UserMetricsEncoder; std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider; + ui64 Cookie = 0; }; std::pair<NYql::NDq::IDqSinkActor*, NActors::IActor*> CreateDqSolomonWriteActor( diff --git a/ydb/library/yql/utils/actors/http_sender_actor.cpp b/ydb/library/yql/utils/actors/http_sender_actor.cpp index 8a39c5754b..3cb61468c8 100644 --- a/ydb/library/yql/utils/actors/http_sender_actor.cpp +++ b/ydb/library/yql/utils/actors/http_sender_actor.cpp @@ -37,7 +37,9 @@ private: ) void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& ev) { - Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(ev->Get()->Request)); + Request = ev->Get()->Request; + Cookie = ev->Cookie; + Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(Request->Duplicate())); } void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev) { @@ -45,14 +47,14 @@ private: const TString& error = res->GetError(); const bool isTerminal = error.empty() || MaxRetries && RetryCount >= MaxRetries; - Send(SenderId, new TEvHttpBase::TEvSendResult(ev, RetryCount++, isTerminal)); + Send(SenderId, new TEvHttpBase::TEvSendResult(ev, RetryCount++, isTerminal), /*flags=*/0, Cookie); if (isTerminal) { PassAway(); return; } - Schedule(GetRetryDelay(), new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(res->Request->Duplicate())); + Schedule(GetRetryDelay(), new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(Request->Duplicate())); } void Handle(TEvents::TEvPoison::TPtr&) { @@ -72,6 +74,8 @@ private: ui32 RetryCount = 0; TDuration CurrentDelay = BaseRetryDelay; + NHttp::THttpOutgoingRequestPtr Request; + ui64 Cookie = 0; }; NActors::IActor* CreateHttpSenderActor(TActorId senderId, TActorId httpProxyId) { |