aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-01-11 09:31:43 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-01-11 09:31:43 +0300
commitd2f394ea9eed33077773721fd3bac7d1688e35c8 (patch)
tree501a2a66998cc54aa6914238d7bc4a0d70b0fce1
parentc3317f5485ca2ff2a3f834d6154f74fbefdd5c08 (diff)
downloadydb-d2f394ea9eed33077773721fd3bac7d1688e35c8.tar.gz
Fix crash while writing empty values to S3 objects
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp40
-rw-r--r--ydb/library/yql/providers/s3/compressors/brotli.cpp2
-rw-r--r--ydb/library/yql/providers/s3/compressors/bzip2.cpp2
-rw-r--r--ydb/library/yql/providers/s3/compressors/gz.cpp2
-rw-r--r--ydb/library/yql/providers/s3/compressors/lz4io.cpp17
-rw-r--r--ydb/library/yql/providers/s3/compressors/xz.cpp2
-rw-r--r--ydb/library/yql/providers/s3/compressors/zstd.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp2
8 files changed, 42 insertions, 27 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 086169fb957..ec59045d641 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -145,22 +145,27 @@ public:
void Bootstrap(const TActorId& parentId) {
ParentId = parentId;
LOG_D("TS3FileWriteActor", __func__ << " by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]");
- if (Parts->IsSealed() && 1U == Parts->Size()) {
- const auto size = Parts->Volume();
+ if (Parts->IsSealed() && Parts->Size() <= 1) {
+ Become(&TS3FileWriteActor::SinglepartWorkingStateFunc);
+ const size_t size = Max<size_t>(Parts->Volume(), 1);
InFlight += size;
SentSize += size;
- Gateway->Upload(Url, MakeHeaders(RequestId), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, SentSize, std::placeholders::_1), true, RetryPolicy);
+ Gateway->Upload(Url, MakeHeaders(RequestId), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, size, std::placeholders::_1), true, RetryPolicy);
} else {
- Become(&TS3FileWriteActor::InitialStateFunc);
+ Become(&TS3FileWriteActor::MultipartInitialStateFunc);
Gateway->Upload(Url + "?uploads", MakeHeaders(RequestId), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1), false, RetryPolicy);
}
}
static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR";
+ void Handle(TEvPrivate::TEvUploadFinished::TPtr& ev) {
+ InFlight -= ev->Get()->UploadSize;
+ }
+
void PassAway() override {
if (InFlight || !Parts->Empty()) {
- LOG_W("TS3FileWriteActor", "PassAway: but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size() << ", request id: [" << RequestId << "]");
+ LOG_W("TS3FileWriteActor", "PassAway: but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size() << ", Sealed: " << Parts->IsSealed() << ", request id: [" << RequestId << "]");
} else {
LOG_D("TS3FileWriteActor", "PassAway: request id: [" << RequestId << "]");
}
@@ -182,11 +187,13 @@ public:
void Finish() {
Parts->Seal();
- if (!UploadId.empty())
- StartUploadParts();
- if (!InFlight && Parts->Empty())
- CommitUploadedParts();
+ if (!UploadId.empty()) {
+ if (!Parts->Empty())
+ StartUploadParts();
+ else if (!InFlight && Parts->Empty())
+ CommitUploadedParts();
+ }
}
bool IsFinishing() const { return Parts->IsSealed(); }
@@ -197,14 +204,18 @@ public:
return InFlight + Parts->Volume();
}
private:
- STRICT_STFUNC(InitialStateFunc,
+ STRICT_STFUNC(MultipartInitialStateFunc,
hFunc(TEvPrivate::TEvUploadStarted, Handle);
)
- STRICT_STFUNC(WorkingStateFunc,
+ STRICT_STFUNC(MultipartWorkingStateFunc,
hFunc(TEvPrivate::TEvUploadPartFinished, Handle);
)
+ STRICT_STFUNC(SinglepartWorkingStateFunc,
+ hFunc(TEvPrivate::TEvUploadFinished, Handle);
+ )
+
static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U: try {
@@ -294,6 +305,7 @@ private:
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]")));
}
} else {
+ actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
}
break;
@@ -310,7 +322,7 @@ private:
void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) {
UploadId = result->Get()->UploadId;
- Become(&TS3FileWriteActor::WorkingStateFunc);
+ Become(&TS3FileWriteActor::MultipartWorkingStateFunc);
StartUploadParts();
}
@@ -600,10 +612,10 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
credentialsProviderFactory->CreateProvider(),
randomProvider, params.GetUrl(),
params.GetPath(),
- params.HasExtension() ? params.GetExtension() : "",
+ params.GetExtension(),
std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()),
params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB,
- params.HasCompression() ? params.GetCompression() : "",
+ params.GetCompression(),
params.GetMultipart(),
callbacks,
retryPolicy);
diff --git a/ydb/library/yql/providers/s3/compressors/brotli.cpp b/ydb/library/yql/providers/s3/compressors/brotli.cpp
index d510d885df5..c61b97d3bf5 100644
--- a/ydb/library/yql/providers/s3/compressors/brotli.cpp
+++ b/ydb/library/yql/providers/s3/compressors/brotli.cpp
@@ -143,7 +143,7 @@ private:
}
void DoCompression() {
- while (!InputQueue.Empty()) {
+ while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) {
const auto& pop = InputQueue.Pop();
const bool done = InputQueue.IsSealed() && InputQueue.Empty();
if (pop.empty() && !done)
diff --git a/ydb/library/yql/providers/s3/compressors/bzip2.cpp b/ydb/library/yql/providers/s3/compressors/bzip2.cpp
index b5d934e3f40..f895a371420 100644
--- a/ydb/library/yql/providers/s3/compressors/bzip2.cpp
+++ b/ydb/library/yql/providers/s3/compressors/bzip2.cpp
@@ -98,7 +98,7 @@ private:
}
void DoCompression() {
- while (!InputQueue.Empty()) {
+ while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) {
const auto& pop = InputQueue.Pop();
const bool done = InputQueue.IsSealed() && InputQueue.Empty();
if (pop.empty() && !done)
diff --git a/ydb/library/yql/providers/s3/compressors/gz.cpp b/ydb/library/yql/providers/s3/compressors/gz.cpp
index b887b00aabc..d9adf1b0266 100644
--- a/ydb/library/yql/providers/s3/compressors/gz.cpp
+++ b/ydb/library/yql/providers/s3/compressors/gz.cpp
@@ -98,7 +98,7 @@ private:
}
void DoCompression() {
- while (!InputQueue.Empty()) {
+ while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) {
const auto& pop = InputQueue.Pop();
const bool done = InputQueue.IsSealed() && InputQueue.Empty();
if (pop.empty() && !done)
diff --git a/ydb/library/yql/providers/s3/compressors/lz4io.cpp b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
index 70bdfb87b23..be27d7702e3 100644
--- a/ydb/library/yql/providers/s3/compressors/lz4io.cpp
+++ b/ydb/library/yql/providers/s3/compressors/lz4io.cpp
@@ -214,12 +214,13 @@ private:
}
void DoCompression() {
- while (true) {
+ while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) {
const auto& pop = InputQueue.Pop();
- if (pop.empty())
+ const bool done = InputQueue.IsSealed() && InputQueue.Empty();
+ if (pop.empty() && !done)
break;
- if (IsFirstBlock && InputQueue.IsSealed() && InputQueue.Empty()) {
+ if (IsFirstBlock && done) {
const auto cSize = LZ4F_compressFrame_usingCDict(Ctx, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size(), nullptr, &Prefs);
YQL_ENSURE(!LZ4F_isError(cSize), "Compression failed: " << LZ4F_getErrorName(cSize));
TOutputQueue::Push(TString(OutputBuffer.get(), cSize));
@@ -233,11 +234,13 @@ private:
IsFirstBlock = false;
}
- const auto outSize = LZ4F_compressUpdate(Ctx, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size(), nullptr);
- YQL_ENSURE(!LZ4F_isError(outSize), "Compression failed: " << LZ4F_getErrorName(outSize));
- TOutputQueue::Push(TString(OutputBuffer.get(), outSize));
+ if (!pop.empty()) {
+ const auto outSize = LZ4F_compressUpdate(Ctx, OutputBuffer.get(), OutputBufferSize, pop.data(), pop.size(), nullptr);
+ YQL_ENSURE(!LZ4F_isError(outSize), "Compression failed: " << LZ4F_getErrorName(outSize));
+ TOutputQueue::Push(TString(OutputBuffer.get(), outSize));
+ }
- if (InputQueue.IsSealed() && InputQueue.Empty()) {
+ if (done) {
const auto endSize = LZ4F_compressEnd(Ctx, OutputBuffer.get(), OutputBufferSize, nullptr);
YQL_ENSURE(!LZ4F_isError(endSize), "End of frame error: " << LZ4F_getErrorName(endSize));
TOutputQueue::Push(TString(OutputBuffer.get(), endSize));
diff --git a/ydb/library/yql/providers/s3/compressors/xz.cpp b/ydb/library/yql/providers/s3/compressors/xz.cpp
index a4ce6318aaf..589bb1bad2a 100644
--- a/ydb/library/yql/providers/s3/compressors/xz.cpp
+++ b/ydb/library/yql/providers/s3/compressors/xz.cpp
@@ -142,7 +142,7 @@ private:
}
void DoCompression() {
- while (!InputQueue.Empty()) {
+ while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) {
const auto& pop = InputQueue.Pop();
const bool done = InputQueue.IsSealed() && InputQueue.Empty();
if (pop.empty() && !done)
diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp
index 213b68e5300..0d7d8217ffd 100644
--- a/ydb/library/yql/providers/s3/compressors/zstd.cpp
+++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp
@@ -93,7 +93,7 @@ private:
}
void DoCompression() {
- while (!InputQueue.Empty()) {
+ while (!InputQueue.Empty() || !TOutputQueue::IsSealed()) {
const auto& pop = InputQueue.Pop();
const bool done = InputQueue.IsSealed() && InputQueue.Empty();
if (pop.empty() && !done)
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 970d49feea1..7389c5b55b7 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -168,7 +168,7 @@ public:
}
const TStringBuf format = targetNode.Format();
- if (format != "raw" || format != "json_list") { // multipart
+ if (format != "raw" && format != "json_list") { // multipart
{
TExprNode::TListType pair;
pair.push_back(ctx.NewAtom(targetNode.Pos(), "multipart"));