diff options
author | aneporada <aneporada@ydb.tech> | 2023-06-16 21:11:16 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-06-16 21:11:16 +0300 |
commit | 625dc01ee453c507fdd73e1547df9e1281e1b015 (patch) | |
tree | 934cb080a639699b0080fd30819aac84c757ddbb | |
parent | 71d1df300ccd983492ce117bc7ccd2959fc69c57 (diff) | |
download | ydb-625dc01ee453c507fdd73e1547df9e1281e1b015.tar.gz |
Switch IDqChannelStorage::Put() to TRope
-rw-r--r-- | ydb/core/kqp/runtime/kqp_channel_storage.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_spilling.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_spilling_file.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp | 31 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_channel_storage.h | 7 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_channel.cpp | 16 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/ut/ut_helper.h | 14 |
7 files changed, 54 insertions, 30 deletions
diff --git a/ydb/core/kqp/runtime/kqp_channel_storage.cpp b/ydb/core/kqp/runtime/kqp_channel_storage.cpp index 8175ff8178..54b08d37b3 100644 --- a/ydb/core/kqp/runtime/kqp_channel_storage.cpp +++ b/ydb/core/kqp/runtime/kqp_channel_storage.cpp @@ -138,7 +138,7 @@ public: return WritingBlobs.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize > MAX_INFLIGHT_BLOBS_SIZE; } - void Put(ui64 blobId, TBuffer&& blob) { + void Put(ui64 blobId, TRope&& blob) { FailOnError(); // TODO: timeout @@ -223,7 +223,7 @@ public: return SelfActor->IsFull(); } - void Put(ui64 blobId, TBuffer&& blob) override { + void Put(ui64 blobId, TRope&& blob) override { SelfActor->Put(blobId, std::move(blob)); } diff --git a/ydb/core/kqp/runtime/kqp_spilling.h b/ydb/core/kqp/runtime/kqp_spilling.h index d489fc4718..8cc9636946 100644 --- a/ydb/core/kqp/runtime/kqp_spilling.h +++ b/ydb/core/kqp/runtime/kqp_spilling.h @@ -3,6 +3,8 @@ #include <ydb/core/kqp/common/kqp_event_ids.h> #include <ydb/core/protos/config.pb.h> +#include <library/cpp/actors/util/rope.h> + #include <util/generic/buffer.h> namespace NKikimr::NKqp { @@ -10,10 +12,10 @@ namespace NKikimr::NKqp { struct TEvKqpSpilling { struct TEvWrite : public TEventLocal<TEvWrite, TKqpSpillingEvents::EvWrite> { ui64 BlobId; - TBuffer Blob; + TRope Blob; TMaybe<TDuration> Timeout; - TEvWrite(ui64 blobId, TBuffer&& blob, TMaybe<TDuration> timeout = {}) + TEvWrite(ui64 blobId, TRope&& blob, TMaybe<TDuration> timeout = {}) : BlobId(blobId), Blob(std::move(blob)), Timeout(timeout) {} }; diff --git a/ydb/core/kqp/runtime/kqp_spilling_file.cpp b/ydb/core/kqp/runtime/kqp_spilling_file.cpp index 45b7577d41..c4fe14a3a4 100644 --- a/ydb/core/kqp/runtime/kqp_spilling_file.cpp +++ b/ydb/core/kqp/runtime/kqp_spilling_file.cpp @@ -785,7 +785,7 @@ private: TString FileName; bool CreateFile = false; ui64 BlobId = 0; - TBuffer Blob; + TRope Blob; TInstant Ts = TInstant::Now(); void Process(void*) override { @@ -806,7 +806,9 @@ private: } else { file = TFile::ForAppend(FileName); } - file.Write(Blob.Data(), Blob.size()); + for (auto it = Blob.Begin(); it.Valid(); ++it) { + file.Write(it.ContiguousData(), it.ContiguousSize()); + } } catch (const yexception& e) { A_LOG_E("[Write async] file: " << FileName << ", io error: " << e.what()); resp->Error = e.what(); diff --git a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp b/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp index c497576736..2e7e0fe6ac 100644 --- a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp +++ b/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp @@ -30,6 +30,17 @@ TBuffer CreateBlob(ui32 size, char symbol) { return blob; } +TRope CreateRope(ui32 size, char symbol, ui32 chunkSize = 7) { + TRope result; + while (size) { + size_t count = std::min(size, chunkSize); + TString str(count, symbol); + result.Insert(result.End(), TRope{str}); + size -= count; + } + return result; +} + void AssertEquals(const TBuffer& lhs, const TBuffer& rhs) { TStringBuf l{lhs.data(), lhs.size()}; TStringBuf r{rhs.data(), rhs.size()}; @@ -100,7 +111,7 @@ Y_UNIT_TEST(Simple) { // put blob 1 { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(10, 'a')); + auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1)); @@ -109,7 +120,7 @@ Y_UNIT_TEST(Simple) { // put blob 2 { - auto ev = new TEvKqpSpilling::TEvWrite(2, CreateBlob(11, 'z')); + auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(11, 'z')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1)); @@ -175,7 +186,7 @@ Y_UNIT_TEST(Write_TotalSizeLimitExceeded) { WaitBootstrap(runtime); { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(51, 'a')); + auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); @@ -183,7 +194,7 @@ Y_UNIT_TEST(Write_TotalSizeLimitExceeded) { } { - auto ev = new TEvKqpSpilling::TEvWrite(2, CreateBlob(50, 'b')); + auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester); @@ -203,7 +214,7 @@ Y_UNIT_TEST(Write_FileSizeLimitExceeded) { WaitBootstrap(runtime); { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(51, 'a')); + auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); @@ -211,7 +222,7 @@ Y_UNIT_TEST(Write_FileSizeLimitExceeded) { } { - auto ev = new TEvKqpSpilling::TEvWrite(2, CreateBlob(50, 'b')); + auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester); @@ -234,7 +245,7 @@ Y_UNIT_TEST(MultipleFileParts) { for (ui32 i = 0; i < 5; ++i) { // Cerr << "---- store blob #" << i << Endl; - auto ev = new TEvKqpSpilling::TEvWrite(i, CreateBlob(20, 'a' + i)); + auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); @@ -277,7 +288,7 @@ Y_UNIT_TEST(SingleFilePart) { for (ui32 i = 0; i < 5; ++i) { // Cerr << "---- store blob #" << i << Endl; - auto ev = new TEvKqpSpilling::TEvWrite(i, CreateBlob(20, 'a' + i)); + auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); @@ -317,7 +328,7 @@ Y_UNIT_TEST(ReadError) { WaitBootstrap(runtime); { - auto ev = new TEvKqpSpilling::TEvWrite(0, CreateBlob(20, 'a')); + auto ev = new TEvKqpSpilling::TEvWrite(0, CreateRope(20, 'a')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); @@ -396,7 +407,7 @@ Y_UNIT_TEST(StartError) { // put blob 1 { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(10, 'a')); + auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a')); runtime.Send(new IEventHandle(spillingActor, tester, ev)); auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester, TDuration::Seconds(1)); diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h index f132d39dd0..5793fdf5c4 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_storage.h +++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h @@ -1,5 +1,7 @@ #pragma once +#include <library/cpp/actors/util/rope.h> + #include <util/generic/buffer.h> #include <util/generic/yexception.h> @@ -23,12 +25,13 @@ public: // methods Put/Get can throw `TDqChannelStorageException` - // TODO: support IZeroCopyInput - virtual void Put(ui64 blobId, TBuffer&& blob) = 0; + // Data should be owned by `blob` argument since the Put() call is actually asynchronous + virtual void Put(ui64 blobId, TRope&& blob) = 0; // TODO: there is no way for client to delete blob. // It is better to replace Get() with Pull() which will delete blob after read // (current clients read each blob exactly once) + // Get() will return false if data is not ready yet. Client should repeat Get() in this case virtual bool Get(ui64 blobId, TBuffer& data) = 0; }; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index a2306247f1..9b55fd89f3 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -43,6 +43,8 @@ private: using namespace NKikimr; +using NKikimr::NMiniKQL::TPagedBuffer; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// template<bool FastPack> @@ -138,14 +140,10 @@ public: size_t bufSize = head.Buffer->Size(); YQL_ENSURE(PackedDataSize >= bufSize); - TBuffer blob; - blob.Reserve(bufSize + sizeof(head.RowCount)); - blob.Append((const char*)&head.RowCount, sizeof(head.RowCount)); - head.Buffer->ForEachPage([&blob](const char *data, size_t len) { - blob.Append(data, len); - }); - - YQL_ENSURE(blob.Size() == bufSize + sizeof(head.RowCount)); + TRope blob = TPagedBuffer::AsRope(head.Buffer); + TString header((const char*)&head.RowCount, sizeof(head.RowCount)); + blob.Insert(blob.Begin(), TRope{header}); + YQL_ENSURE(blob.size() == bufSize + sizeof(head.RowCount)); Storage->Put(NextStoredId++, std::move(blob)); PackedDataSize -= bufSize; @@ -365,7 +363,7 @@ private: TLogFunc LogFunc; struct TSerializedBatch { - NKikimr::NMiniKQL::TPagedBuffer::TPtr Buffer; + TPagedBuffer::TPtr Buffer; ui64 RowCount = 0; }; std::deque<TSerializedBatch> Data; diff --git a/ydb/library/yql/dq/runtime/ut/ut_helper.h b/ydb/library/yql/dq/runtime/ut/ut_helper.h index 003c419e7f..ceb848008a 100644 --- a/ydb/library/yql/dq/runtime/ut/ut_helper.h +++ b/ydb/library/yql/dq/runtime/ut/ut_helper.h @@ -20,7 +20,7 @@ public: return Capacity <= UsedSpace; } - void Put(ui64 blobId, TBuffer&& blob) override { + void Put(ui64 blobId, TRope&& blob) override { if (UsedSpace + blob.size() > Capacity) { ythrow yexception() << "Space limit exceeded"; } @@ -40,7 +40,15 @@ public: return false; } - data = std::move(Blobs[blobId]); + auto& blob = Blobs[blobId]; + data.Clear(); + data.Reserve(blob.size()); + for (auto it = blob.Begin(); it.Valid(); ++it) { + data.Append(it.ContiguousData(), it.ContiguousSize()); + } + + Y_VERIFY(data.size() == blob.size()); + Blobs.erase(blobId); UsedSpace -= data.size(); @@ -54,7 +62,7 @@ public: private: const ui64 Capacity; - THashMap<ui64, TBuffer> Blobs; + THashMap<ui64, TRope> Blobs; ui64 UsedSpace = 0; ui32 GetBlankRequests = 0; }; |