aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-11-07 12:22:03 +0300
committerilnaz <ilnaz@ydb.tech>2022-11-07 12:22:03 +0300
commit53f587219b1281a60091e6dba560ff3d7e0a1310 (patch)
treede2e76f6404320bc2663ae870bb8e71345a92b6f
parenteb936463c91a94f63f99d26dda2063fdd4ccda11 (diff)
downloadydb-53f587219b1281a60091e6dba560ff3d7e0a1310.tar.gz
Do not decompress the whole portion at once (if possible)
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp125
-rw-r--r--ydb/core/tx/schemeshard/ut_restore.cpp25
2 files changed, 98 insertions, 52 deletions
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index f3ba06180c4..ba4f23c09d0 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -54,7 +54,7 @@ using namespace Aws;
class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
class IReadController {
public:
- enum EFeed {
+ enum EDataStatus {
READY_DATA = 0,
NOT_ENOUGH_DATA = 1,
ERROR = 2,
@@ -62,11 +62,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
public:
virtual ~IReadController() = default;
- // Returns status or error
- virtual EFeed Feed(TString&& portion, TString& error) = 0;
- // Returns view to internal buffer with ready data after successful Feed()
- virtual TStringBuf GetReadyData() const = 0;
- // Clear internal buffer & makes it ready for another Feed()
+ virtual void Feed(TString&& portion) = 0;
+ // Returns data (view to internal buffer) or error
+ virtual EDataStatus TryGetData(TStringBuf& data, TString& error) = 0;
+ // Clear internal buffer & makes it ready for another Feed() and TryGetData()
virtual void Confirm() = 0;
virtual ui64 PendingBytes() const = 0;
virtual ui64 ReadyBytes() const = 0;
@@ -78,6 +77,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
: RangeSize(rangeSize)
, BufferSizeLimit(bufferSizeLimit)
{
+ // able to contain at least one range
+ Buffer.Reserve(RangeSize);
}
std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const {
@@ -90,8 +91,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}
protected:
- bool CheckBufferSize(size_t size, TString& reason) const {
- if ((size + RangeSize) >= BufferSizeLimit) {
+ bool CanIncreaseBuffer(size_t size, size_t delta, TString& reason) const {
+ if ((size + delta) >= BufferSizeLimit) {
reason = "reached buffer size limit";
return false;
}
@@ -99,6 +100,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return true;
}
+ bool CanRequestNextRange(size_t size, TString& reason) const {
+ return CanIncreaseBuffer(size, RangeSize, reason);
+ }
+
TStringBuf AsStringBuf(size_t size) const {
return TStringBuf(Buffer.Data(), size);
}
@@ -116,14 +121,16 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
public:
using TReadController::TReadController;
- EFeed Feed(TString&& portion, TString& error) override {
+ void Feed(TString&& portion) override {
+ Buffer.Append(portion.data(), portion.size());
+ }
+
+ EDataStatus TryGetData(TStringBuf& data, TString& error) override {
Y_VERIFY(Pos == 0);
- Buffer.Append(portion.data(), portion.size());
const ui64 pos = AsStringBuf(Buffer.Size()).rfind('\n');
-
if (TString::npos == pos) {
- if (!CheckBufferSize(Buffer.Size(), error)) {
+ if (!CanRequestNextRange(Buffer.Size(), error)) {
return ERROR;
} else {
return NOT_ENOUGH_DATA;
@@ -131,11 +138,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}
Pos = pos + 1 /* \n */;
- return READY_DATA;
- }
+ data = AsStringBuf(Pos);
- TStringBuf GetReadyData() const override {
- return AsStringBuf(Pos);
+ return READY_DATA;
}
void Confirm() override {
@@ -167,13 +172,21 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
, Context(ZSTD_createDCtx())
{
Reset();
+ // able to contain at least one block
+ // take effect if RangeSize < BlockSize
+ Buffer.Reserve(AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX));
}
- EFeed Feed(TString&& portion, TString& error) override {
+ void Feed(TString&& portion) override {
+ Y_VERIFY(Portion.Empty());
+ Portion.Assign(portion.data(), portion.size());
+ }
+
+ EDataStatus TryGetData(TStringBuf& data, TString& error) override {
Y_VERIFY(ReadyInputBytes == 0 && ReadyOutputPos == 0);
- auto input = ZSTD_inBuffer{portion.data(), portion.size(), 0};
- while (input.pos < input.size) {
+ auto input = ZSTD_inBuffer{Portion.Data(), Portion.Size(), 0};
+ while (!ReadyOutputPos) {
PendingInputBytes -= input.pos; // dec before decompress
auto output = ZSTD_outBuffer{Buffer.Data(), Buffer.Capacity(), Buffer.Size()};
@@ -184,18 +197,11 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return ERROR;
}
- if (output.pos == output.size) {
- if (!CheckBufferSize(Buffer.Capacity(), error)) {
- return ERROR;
- }
-
- Buffer.Reserve(Buffer.Capacity() + AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX));
- }
-
PendingInputBytes += input.pos; // inc after decompress
Buffer.Proceed(output.pos);
if (res == 0) {
+ // end of frame
if (AsStringBuf(Buffer.Size()).back() != '\n') {
error = "cannot find new line symbol";
return ERROR;
@@ -205,35 +211,48 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
ReadyOutputPos = Buffer.Size();
Reset();
} else {
+ // try to find complete row
const ui64 pos = AsStringBuf(Buffer.Size()).rfind('\n');
if (TString::npos != pos) {
ReadyOutputPos = pos + 1 /* \n */;
}
}
+
+ if (input.pos >= input.size) {
+ // end of input
+ break;
+ }
+
+ if (!ReadyOutputPos && output.pos == output.size) {
+ const auto blockSize = AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX);
+ if (CanIncreaseBuffer(Buffer.Size(), blockSize, error)) {
+ Buffer.Reserve(Buffer.Size() + blockSize);
+ } else {
+ return ERROR;
+ }
+ }
}
+ Portion.ChopHead(input.pos);
+
if (!ReadyOutputPos) {
- if (!CheckBufferSize(Buffer.Size(), error)) {
+ if (!CanRequestNextRange(Buffer.Size(), error)) {
return ERROR;
} else {
return NOT_ENOUGH_DATA;
}
}
+ data = AsStringBuf(ReadyOutputPos);
return READY_DATA;
}
- TStringBuf GetReadyData() const override {
- return AsStringBuf(ReadyOutputPos);
- }
-
void Confirm() override {
Buffer.ChopHead(ReadyOutputPos);
+ ReadyOutputPos = 0;
PendingInputBytes -= ReadyInputBytes;
ReadyInputBytes = 0;
-
- ReadyOutputPos = 0;
}
ui64 PendingBytes() const override {
@@ -246,6 +265,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
private:
THolder<::ZSTD_DCtx, DestroyZCtx> Context;
+ TBuffer Portion;
ui64 PendingInputBytes = 0;
ui64 ReadyInputBytes = 0;
ui64 ReadyOutputPos = 0;
@@ -428,8 +448,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return Finish();
}
- GetObject(Settings.GetDataKey(DataFormat, CompressionCodec),
- Reader->NextRange(ContentLength, ProcessedBytes));
+ Process();
}
void Handle(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
@@ -452,8 +471,15 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ", content-length# " << ContentLength
<< ", body-size# " << msg.Body.size());
+ Reader->Feed(std::move(msg.Body));
+ Process();
+ }
+
+ void Process() {
+ TStringBuf data;
TString error;
- switch (Reader->Feed(std::move(msg.Body), error)) {
+
+ switch (Reader->TryGetData(data, error)) {
case TReadController::READY_DATA:
break;
@@ -471,23 +497,18 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ": " << error);
}
- if (auto data = Reader->GetReadyData()) {
- ProcessedBytes += Reader->ReadyBytes();
- RequestBuilder.New(TableInfo, Scheme);
+ RequestBuilder.New(TableInfo, Scheme);
+ TMemoryPool pool(256);
+ while (ProcessData(data, pool));
- TMemoryPool pool(256);
- while (ProcessData(data, pool));
-
- if (Reader->ReadyBytes()) { // has progress
- WrittenRows += std::exchange(PendingRows, 0);
- WrittenBytes += std::exchange(PendingBytes, 0);
- }
-
- UploadRows();
- Reader->Confirm();
- } else {
- Y_FAIL("unreachable");
+ if (const auto processed = Reader->ReadyBytes()) { // has progress
+ ProcessedBytes += processed;
+ WrittenBytes += std::exchange(PendingBytes, 0);
+ WrittenRows += std::exchange(PendingRows, 0);
}
+
+ Reader->Confirm();
+ UploadRows();
}
bool ProcessData(TStringBuf& data, TMemoryPool& pool) {
diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp
index b71bae36885..e454425f541 100644
--- a/ydb/core/tx/schemeshard/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore.cpp
@@ -516,6 +516,31 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
NKqp::CompareYson(data.YsonStr, content);
}
+ Y_UNIT_TEST(ShouldNotDecompressEntirePortionAtOnce) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ runtime.GetAppData().ZstdBlockSizeForTest = 113; // one row
+
+ ui32 uploadRowsCount = 0;
+ runtime.SetObserverFunc([&uploadRowsCount](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ uploadRowsCount += ui32(ev->GetTypeRewrite() == TEvDataShard::EvUnsafeUploadRowsResponse);
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ const auto data = GenerateZstdTestData(TString(100, 'a'), 2); // 2 rows, 1 row = 113b
+ // ensure that one decompressed row is bigger than entire compressed file
+ UNIT_ASSERT(data.Data.size() < *runtime.GetAppData().ZstdBlockSizeForTest);
+
+ Restore(runtime, env, R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", {data}, data.Data.size());
+
+ UNIT_ASSERT_VALUES_EQUAL(uploadRowsCount, 2);
+ }
+
Y_UNIT_TEST_WITH_COMPRESSION(ShouldExpandBuffer) {
TTestBasicRuntime runtime;