diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-11 09:31:43 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-11 09:31:43 +0300 |
commit | d2f394ea9eed33077773721fd3bac7d1688e35c8 (patch) | |
tree | 501a2a66998cc54aa6914238d7bc4a0d70b0fce1 | |
parent | c3317f5485ca2ff2a3f834d6154f74fbefdd5c08 (diff) | |
download | ydb-d2f394ea9eed33077773721fd3bac7d1688e35c8.tar.gz |
Fix crash while writing empty values to S3 objects
8 files changed, 42 insertions, 27 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 086169fb957..ec59045d641 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -145,22 +145,27 @@ public: void Bootstrap(const TActorId& parentId) { ParentId = parentId; LOG_D("TS3FileWriteActor", __func__ << " by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]"); - if (Parts->IsSealed() && 1U == Parts->Size()) { - const auto size = Parts->Volume(); + if (Parts->IsSealed() && Parts->Size() <= 1) { + Become(&TS3FileWriteActor::SinglepartWorkingStateFunc); + const size_t size = Max<size_t>(Parts->Volume(), 1); InFlight += size; SentSize += size; - Gateway->Upload(Url, MakeHeaders(RequestId), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, SentSize, std::placeholders::_1), true, RetryPolicy); + Gateway->Upload(Url, MakeHeaders(RequestId), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, size, std::placeholders::_1), true, RetryPolicy); } else { - Become(&TS3FileWriteActor::InitialStateFunc); + Become(&TS3FileWriteActor::MultipartInitialStateFunc); Gateway->Upload(Url + "?uploads", MakeHeaders(RequestId), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1), false, RetryPolicy); } } static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR"; + void Handle(TEvPrivate::TEvUploadFinished::TPtr& ev) { + InFlight -= ev->Get()->UploadSize; + } + void PassAway() override { if (InFlight || !Parts->Empty()) { - LOG_W("TS3FileWriteActor", "PassAway: but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size() << ", request id: [" << RequestId << "]"); + LOG_W("TS3FileWriteActor", "PassAway: but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size() << ", Sealed: " << Parts->IsSealed() << ", request id: [" << RequestId << "]"); } else { LOG_D("TS3FileWriteActor", "PassAway: request id: [" << RequestId << "]"); } @@ -182,11 +187,13 @@ public: void Finish() { Parts->Seal(); - if (!UploadId.empty()) - StartUploadParts(); - if (!InFlight && Parts->Empty()) - CommitUploadedParts(); + if (!UploadId.empty()) { + if (!Parts->Empty()) + StartUploadParts(); + else if (!InFlight && Parts->Empty()) + CommitUploadedParts(); + } } bool IsFinishing() const { return Parts->IsSealed(); } @@ -197,14 +204,18 @@ public: return InFlight + Parts->Volume(); } private: - STRICT_STFUNC(InitialStateFunc, + STRICT_STFUNC(MultipartInitialStateFunc, hFunc(TEvPrivate::TEvUploadStarted, Handle); ) - STRICT_STFUNC(WorkingStateFunc, + STRICT_STFUNC(MultipartWorkingStateFunc, hFunc(TEvPrivate::TEvUploadPartFinished, Handle); ) + STRICT_STFUNC(SinglepartWorkingStateFunc, + hFunc(TEvPrivate::TEvUploadFinished, Handle); + ) + static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { switch (result.index()) { case 0U: try { @@ -294,6 +305,7 @@ private: actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]"))); } } else { + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } break; @@ -310,7 +322,7 @@ private: void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) { UploadId = result->Get()->UploadId; - Become(&TS3FileWriteActor::WorkingStateFunc); + Become(&TS3FileWriteActor::MultipartWorkingStateFunc); StartUploadParts(); } @@ -600,10 +612,10 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), params.GetPath(), - params.HasExtension() ? params.GetExtension() : "", + params.GetExtension(), std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB, - params.HasCompression() ? params.GetCompression() : "", + params.GetCompression(), params.GetMultipart(), callbacks, retryPolicy); diff --git a/ydb/library/yql/providers/s3/compressors/brotli.cpp b/ydb/library/yql/providers/s3/compressors/brotli.cpp index d510d885df5..c61b97d3bf5 100644 --- a/ydb/library/yql/providers/s3/compressors/brotli.cpp +++ b/ydb/library/yql/providers/s3/compressors/brotli.cpp @@ -143,7 +143,7 @@ private: } void DoCompression() { - while (!InputQueue.Empty()) { + while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) { const auto& pop = InputQueue.Pop(); const bool done = InputQueue.IsSealed() && InputQueue.Empty(); if (pop.empty() && !done) diff --git a/ydb/library/yql/providers/s3/compressors/bzip2.cpp b/ydb/library/yql/providers/s3/compressors/bzip2.cpp index b5d934e3f40..f895a371420 100644 --- a/ydb/library/yql/providers/s3/compressors/bzip2.cpp +++ b/ydb/library/yql/providers/s3/compressors/bzip2.cpp @@ -98,7 +98,7 @@ private: } void DoCompression() { - while (!InputQueue.Empty()) { + while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) { const auto& pop = InputQueue.Pop(); const bool done = InputQueue.IsSealed() && InputQueue.Empty(); if (pop.empty() && !done) diff --git a/ydb/library/yql/providers/s3/compressors/gz.cpp b/ydb/library/yql/providers/s3/compressors/gz.cpp index b887b00aabc..d9adf1b0266 100644 --- a/ydb/library/yql/providers/s3/compressors/gz.cpp +++ b/ydb/library/yql/providers/s3/compressors/gz.cpp @@ -98,7 +98,7 @@ private: } void DoCompression() { - while (!InputQueue.Empty()) { + while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) { const auto& pop = InputQueue.Pop(); const bool done = InputQueue.IsSealed() && InputQueue.Empty(); if (pop.empty() && !done) diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp index 70bdfb87b23..be27d7702e3 100644 --- a/ydb/library/yql/providers/s3/compressors/lz4io.cpp +++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp @@ -214,12 +214,13 @@ private: } void DoCompression() { - while (true) { + while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) { const auto& pop = InputQueue.Pop(); - if (pop.empty()) + const bool done = InputQueue.IsSealed() && InputQueue.Empty(); + if (pop.empty() && !done) break; - if (IsFirstBlock && InputQueue.IsSealed() && InputQueue.Empty()) { + if (IsFirstBlock && done) { const auto cSize = LZ4F_compressFrame_usingCDict(Ctx, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size(), nullptr, &Prefs); YQL_ENSURE(!LZ4F_isError(cSize), "Compression failed: " << LZ4F_getErrorName(cSize)); TOutputQueue::Push(TString(OutputBuffer.get(), cSize)); @@ -233,11 +234,13 @@ private: IsFirstBlock = false; } - const auto outSize = LZ4F_compressUpdate(Ctx, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size(), nullptr); - YQL_ENSURE(!LZ4F_isError(outSize), "Compression failed: " << LZ4F_getErrorName(outSize)); - TOutputQueue::Push(TString(OutputBuffer.get(), outSize)); + if (!pop.empty()) { + const auto outSize = LZ4F_compressUpdate(Ctx, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size(), nullptr); + YQL_ENSURE(!LZ4F_isError(outSize), "Compression failed: " << LZ4F_getErrorName(outSize)); + TOutputQueue::Push(TString(OutputBuffer.get(), outSize)); + } - if (InputQueue.IsSealed() && InputQueue.Empty()) { + if (done) { const auto endSize = LZ4F_compressEnd(Ctx, OutputBuffer.get(), OutputBufferSize, nullptr); YQL_ENSURE(!LZ4F_isError(endSize), "End of frame error: " << LZ4F_getErrorName(endSize)); TOutputQueue::Push(TString(OutputBuffer.get(), endSize)); diff --git a/ydb/library/yql/providers/s3/compressors/xz.cpp b/ydb/library/yql/providers/s3/compressors/xz.cpp index a4ce6318aaf..589bb1bad2a 100644 --- a/ydb/library/yql/providers/s3/compressors/xz.cpp +++ b/ydb/library/yql/providers/s3/compressors/xz.cpp @@ -142,7 +142,7 @@ private: } void DoCompression() { - while (!InputQueue.Empty()) { + while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) { const auto& pop = InputQueue.Pop(); const bool done = InputQueue.IsSealed() && InputQueue.Empty(); if (pop.empty() && !done) diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp index 213b68e5300..0d7d8217ffd 100644 --- a/ydb/library/yql/providers/s3/compressors/zstd.cpp +++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp @@ -93,7 +93,7 @@ private: } void DoCompression() { - while (!InputQueue.Empty()) { + while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) { const auto& pop = InputQueue.Pop(); const bool done = InputQueue.IsSealed() && InputQueue.Empty(); if (pop.empty() && !done) diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 970d49feea1..7389c5b55b7 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -168,7 +168,7 @@ public: } const TStringBuf format = targetNode.Format(); - if (format != "raw" || format != "json_list") { // multipart + if (format != "raw" && format != "json_list") { // multipart { TExprNode::TListType pair; pair.push_back(ctx.NewAtom(targetNode.Pos(), "multipart")); |