aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@yandex-team.ru>2022-04-07 07:27:31 +0300
committeraozeritsky <aozeritsky@yandex-team.ru>2022-04-07 07:27:31 +0300
commit02e04bdd43700600b9ebe6b1fe7cdf421285b136 (patch)
treebaa32cd881ff6d4ca944cbdd01b4c25c999b55eb
parent2d9b61d661f97eb64fc680983b2d3580cc2d302d (diff)
downloadydb-02e04bdd43700600b9ebe6b1fe7cdf421285b136.tar.gz
YQL-14608: More fixes on segf
ref:39ba20faad9e9ea52d34866846bdafcbb9fa91b9
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp17
-rw-r--r--ydb/library/yql/utils/actors/http_sender_actor.cpp10
2 files changed, 17 insertions, 10 deletions
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 826c7d0572..f5f52c8712 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -70,6 +70,7 @@ struct TMetricsToSend {
struct TMetricsInflight {
TActorId HttpSenderId;
ui64 MetricsCount = 0;
+ ui64 BodySize = 0;
};
} // namespace
@@ -223,7 +224,7 @@ private:
return;
}
- HandleSuccessSolomonResponse(*res->HttpIncomingResponse->Get());
+ HandleSuccessSolomonResponse(*res->HttpIncomingResponse->Get(), ev->Cookie);
while (TryToSendNextBatch()) {}
}
@@ -340,12 +341,13 @@ private:
const auto metricsToSend = std::get<TMetricsToSend>(variant);
const NHttp::THttpOutgoingRequestPtr httpRequest = BuildSolomonRequest(metricsToSend.Data);
+ const auto bodySize = httpRequest->Body.Size();
const TActorId httpSenderId = Register(CreateHttpSenderActor(SelfId(), HttpProxyId));
- Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest));
+ Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), /*flags=*/0, Cookie);
SINK_LOG_D("Sent " << metricsToSend.MetricsCount << " metrics with size of " << metricsToSend.Data.size() << " bytes to solomon");
*Metrics.SentMetrics += metricsToSend.MetricsCount;
- InflightBuffer.emplace(httpRequest, TMetricsInflight { httpSenderId, metricsToSend.MetricsCount });
+ InflightBuffer.emplace(Cookie++, TMetricsInflight { httpSenderId, metricsToSend.MetricsCount, bodySize });
return true;
}
@@ -361,7 +363,7 @@ private:
Y_ENSURE(false, "Bad type");
}
- void HandleSuccessSolomonResponse(const NHttp::TEvHttpProxy::TEvHttpIncomingResponse& response) {
+ void HandleSuccessSolomonResponse(const NHttp::TEvHttpProxy::TEvHttpIncomingResponse& response, ui64 cookie) {
SINK_LOG_D("Solomon response: " << response.Response->GetObfuscatedData());
NJson::TJsonParser parser;
switch (WriteParams.Shard.GetClusterType()) {
@@ -384,7 +386,7 @@ private:
}
Y_VERIFY(res.size() == 2);
- auto ptr = InflightBuffer.find(response.Request);
+ auto ptr = InflightBuffer.find(cookie);
Y_VERIFY(ptr != InflightBuffer.end());
const ui64 writtenMetricsCount = std::stoul(res[0]);
@@ -397,7 +399,7 @@ private:
SINK_LOG_W("Some metrics were not written. MetricsCount=" << ptr->second.MetricsCount << " writtenMetricsCount=" << writtenMetricsCount << " Solomon response: " << response.Response->GetObfuscatedData());
}
- FreeSpace += ptr->first->Body.Size();
+ FreeSpace += ptr->second.BodySize;
if (ShouldNotifyNewFreeSpace) {
Callbacks->ResumeExecution();
ShouldNotifyNewFreeSpace = false;
@@ -427,10 +429,11 @@ private:
bool ShouldNotifyNewFreeSpace = false;
std::optional<NDqProto::TCheckpoint> CheckpointInProgress = std::nullopt;
std::queue<std::variant<TMetricsToSend, NDqProto::TCheckpoint>> SendingBuffer;
- THashMap<NHttp::THttpOutgoingRequestPtr, TMetricsInflight> InflightBuffer;
+ THashMap<ui64, TMetricsInflight> InflightBuffer;
TMetricsEncoder UserMetricsEncoder;
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
+ ui64 Cookie = 0;
};
std::pair<NYql::NDq::IDqSinkActor*, NActors::IActor*> CreateDqSolomonWriteActor(
diff --git a/ydb/library/yql/utils/actors/http_sender_actor.cpp b/ydb/library/yql/utils/actors/http_sender_actor.cpp
index 8a39c5754b..3cb61468c8 100644
--- a/ydb/library/yql/utils/actors/http_sender_actor.cpp
+++ b/ydb/library/yql/utils/actors/http_sender_actor.cpp
@@ -37,7 +37,9 @@ private:
)
void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr& ev) {
- Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(ev->Get()->Request));
+ Request = ev->Get()->Request;
+ Cookie = ev->Cookie;
+ Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(Request->Duplicate()));
}
void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev) {
@@ -45,14 +47,14 @@ private:
const TString& error = res->GetError();
const bool isTerminal = error.empty() || MaxRetries && RetryCount >= MaxRetries;
- Send(SenderId, new TEvHttpBase::TEvSendResult(ev, RetryCount++, isTerminal));
+ Send(SenderId, new TEvHttpBase::TEvSendResult(ev, RetryCount++, isTerminal), /*flags=*/0, Cookie);
if (isTerminal) {
PassAway();
return;
}
- Schedule(GetRetryDelay(), new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(res->Request->Duplicate()));
+ Schedule(GetRetryDelay(), new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(Request->Duplicate()));
}
void Handle(TEvents::TEvPoison::TPtr&) {
@@ -72,6 +74,8 @@ private:
ui32 RetryCount = 0;
TDuration CurrentDelay = BaseRetryDelay;
+ NHttp::THttpOutgoingRequestPtr Request;
+ ui64 Cookie = 0;
};
NActors::IActor* CreateHttpSenderActor(TActorId senderId, TActorId httpProxyId) {