about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2023-06-16 21:11:16 +0300
committer aneporada <aneporada@ydb.tech> 2023-06-16 21:11:16 +0300
commit 625dc01ee453c507fdd73e1547df9e1281e1b015 (patch)
tree 934cb080a639699b0080fd30819aac84c757ddbb
parent 71d1df300ccd983492ce117bc7ccd2959fc69c57 (diff)
download ydb-625dc01ee453c507fdd73e1547df9e1281e1b015.tar.gz
Switch IDqChannelStorage::Put() to TRope
-rw-r--r-- ydb/core/kqp/runtime/kqp_channel_storage.cpp 4
-rw-r--r-- ydb/core/kqp/runtime/kqp_spilling.h 6
-rw-r--r-- ydb/core/kqp/runtime/kqp_spilling_file.cpp 6
-rw-r--r-- ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp 31
-rw-r--r-- ydb/library/yql/dq/runtime/dq_channel_storage.h 7
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_channel.cpp 16
-rw-r--r-- ydb/library/yql/dq/runtime/ut/ut_helper.h 14
7 files changed, 54 insertions, 30 deletions
diff --git a/ydb/core/kqp/runtime/kqp_channel_storage.cpp b/ydb/core/kqp/runtime/kqp_channel_storage.cpp
index 8175ff8178..54b08d37b3 100644
--- a/ydb/core/kqp/runtime/kqp_channel_storage.cpp
+++ b/ydb/core/kqp/runtime/kqp_channel_storage.cpp
@@ -138,7 +138,7 @@ public:
return WritingBlobs.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize > MAX_INFLIGHT_BLOBS_SIZE;
}
- void Put(ui64 blobId, TBuffer&& blob) {
+ void Put(ui64 blobId, TRope&& blob) {
FailOnError();
// TODO: timeout
@@ -223,7 +223,7 @@ public:
return SelfActor->IsFull();
}
- void Put(ui64 blobId, TBuffer&& blob) override {
+ void Put(ui64 blobId, TRope&& blob) override {
SelfActor->Put(blobId, std::move(blob));
}
diff --git a/ydb/core/kqp/runtime/kqp_spilling.h b/ydb/core/kqp/runtime/kqp_spilling.h
index d489fc4718..8cc9636946 100644
--- a/ydb/core/kqp/runtime/kqp_spilling.h
+++ b/ydb/core/kqp/runtime/kqp_spilling.h
@@ -3,6 +3,8 @@
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/core/protos/config.pb.h>
+#include <library/cpp/actors/util/rope.h>
+
#include <util/generic/buffer.h>
namespace NKikimr::NKqp {
@@ -10,10 +12,10 @@ namespace NKikimr::NKqp {
struct TEvKqpSpilling {
struct TEvWrite : public TEventLocal<TEvWrite, TKqpSpillingEvents::EvWrite> {
ui64 BlobId;
- TBuffer Blob;
+ TRope Blob;
TMaybe<TDuration> Timeout;
- TEvWrite(ui64 blobId, TBuffer&& blob, TMaybe<TDuration> timeout = {})
+ TEvWrite(ui64 blobId, TRope&& blob, TMaybe<TDuration> timeout = {})
: BlobId(blobId), Blob(std::move(blob)), Timeout(timeout) {}
};
diff --git a/ydb/core/kqp/runtime/kqp_spilling_file.cpp b/ydb/core/kqp/runtime/kqp_spilling_file.cpp
index 45b7577d41..c4fe14a3a4 100644
--- a/ydb/core/kqp/runtime/kqp_spilling_file.cpp
+++ b/ydb/core/kqp/runtime/kqp_spilling_file.cpp
@@ -785,7 +785,7 @@ private:
TString FileName;
bool CreateFile = false;
ui64 BlobId = 0;
- TBuffer Blob;
+ TRope Blob;
TInstant Ts = TInstant::Now();
void Process(void*) override {
@@ -806,7 +806,9 @@ private:
} else {
file = TFile::ForAppend(FileName);
}
- file.Write(Blob.Data(), Blob.size());
+ for (auto it = Blob.Begin(); it.Valid(); ++it) {
+ file.Write(it.ContiguousData(), it.ContiguousSize());
+ }
} catch (const yexception& e) {
A_LOG_E("[Write async] file: " << FileName << ", io error: " << e.what());
resp->Error = e.what();
diff --git a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp b/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
index c497576736..2e7e0fe6ac 100644
--- a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
+++ b/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
@@ -30,6 +30,17 @@ TBuffer CreateBlob(ui32 size, char symbol) {
return blob;
}
+TRope CreateRope(ui32 size, char symbol, ui32 chunkSize = 7) {
+    TRope result; // receives `size` copies of `symbol`, appended piece by piece (at most `chunkSize` bytes each)
+    while (size) {
+        const ui32 count = chunkSize ? std::min(size, chunkSize) : size; // chunkSize == 0: one piece, not an endless loop
+        TString chunk(count, symbol);
+        result.Insert(result.End(), TRope{chunk});
+        size -= count;
+    }
+    return result;
+}
+
void AssertEquals(const TBuffer& lhs, const TBuffer& rhs) {
TStringBuf l{lhs.data(), lhs.size()};
TStringBuf r{rhs.data(), rhs.size()};
@@ -100,7 +111,7 @@ Y_UNIT_TEST(Simple) {
// put blob 1
{
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(10, 'a'));
+ auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1));
@@ -109,7 +120,7 @@ Y_UNIT_TEST(Simple) {
// put blob 2
{
- auto ev = new TEvKqpSpilling::TEvWrite(2, CreateBlob(11, 'z'));
+ auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(11, 'z'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1));
@@ -175,7 +186,7 @@ Y_UNIT_TEST(Write_TotalSizeLimitExceeded) {
WaitBootstrap(runtime);
{
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(51, 'a'));
+ auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
@@ -183,7 +194,7 @@ Y_UNIT_TEST(Write_TotalSizeLimitExceeded) {
}
{
- auto ev = new TEvKqpSpilling::TEvWrite(2, CreateBlob(50, 'b'));
+ auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester);
@@ -203,7 +214,7 @@ Y_UNIT_TEST(Write_FileSizeLimitExceeded) {
WaitBootstrap(runtime);
{
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(51, 'a'));
+ auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
@@ -211,7 +222,7 @@ Y_UNIT_TEST(Write_FileSizeLimitExceeded) {
}
{
- auto ev = new TEvKqpSpilling::TEvWrite(2, CreateBlob(50, 'b'));
+ auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester);
@@ -234,7 +245,7 @@ Y_UNIT_TEST(MultipleFileParts) {
for (ui32 i = 0; i < 5; ++i) {
// Cerr << "---- store blob #" << i << Endl;
- auto ev = new TEvKqpSpilling::TEvWrite(i, CreateBlob(20, 'a' + i));
+ auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
@@ -277,7 +288,7 @@ Y_UNIT_TEST(SingleFilePart) {
for (ui32 i = 0; i < 5; ++i) {
// Cerr << "---- store blob #" << i << Endl;
- auto ev = new TEvKqpSpilling::TEvWrite(i, CreateBlob(20, 'a' + i));
+ auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
@@ -317,7 +328,7 @@ Y_UNIT_TEST(ReadError) {
WaitBootstrap(runtime);
{
- auto ev = new TEvKqpSpilling::TEvWrite(0, CreateBlob(20, 'a'));
+ auto ev = new TEvKqpSpilling::TEvWrite(0, CreateRope(20, 'a'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
@@ -396,7 +407,7 @@ Y_UNIT_TEST(StartError) {
// put blob 1
{
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateBlob(10, 'a'));
+ auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a'));
runtime.Send(new IEventHandle(spillingActor, tester, ev));
auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester, TDuration::Seconds(1));
diff --git a/ydb/library/yql/dq/runtime/dq_channel_storage.h b/ydb/library/yql/dq/runtime/dq_channel_storage.h
index f132d39dd0..5793fdf5c4 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_storage.h
+++ b/ydb/library/yql/dq/runtime/dq_channel_storage.h
@@ -1,5 +1,7 @@
#pragma once
+#include <library/cpp/actors/util/rope.h>
+
#include <util/generic/buffer.h>
#include <util/generic/yexception.h>
@@ -23,12 +25,13 @@ public:
// methods Put/Get can throw `TDqChannelStorageException`
- // TODO: support IZeroCopyInput
- virtual void Put(ui64 blobId, TBuffer&& blob) = 0;
+ // Data should be owned by `blob` argument since the Put() call is actually asynchronous
+ virtual void Put(ui64 blobId, TRope&& blob) = 0;
// TODO: there is no way for client to delete blob.
// It is better to replace Get() with Pull() which will delete blob after read
// (current clients read each blob exactly once)
+ // Get() will return false if data is not ready yet. Client should repeat Get() in this case
virtual bool Get(ui64 blobId, TBuffer& data) = 0;
};
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index a2306247f1..9b55fd89f3 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -43,6 +43,8 @@ private:
using namespace NKikimr;
+using NKikimr::NMiniKQL::TPagedBuffer;
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
template<bool FastPack>
@@ -138,14 +140,10 @@ public:
size_t bufSize = head.Buffer->Size();
YQL_ENSURE(PackedDataSize >= bufSize);
- TBuffer blob;
- blob.Reserve(bufSize + sizeof(head.RowCount));
- blob.Append((const char*)&head.RowCount, sizeof(head.RowCount));
- head.Buffer->ForEachPage([&blob](const char *data, size_t len) {
- blob.Append(data, len);
- });
-
- YQL_ENSURE(blob.Size() == bufSize + sizeof(head.RowCount));
+ TRope blob = TPagedBuffer::AsRope(head.Buffer);
+ TString header((const char*)&head.RowCount, sizeof(head.RowCount));
+ blob.Insert(blob.Begin(), TRope{header});
+ YQL_ENSURE(blob.size() == bufSize + sizeof(head.RowCount));
Storage->Put(NextStoredId++, std::move(blob));
PackedDataSize -= bufSize;
@@ -365,7 +363,7 @@ private:
TLogFunc LogFunc;
struct TSerializedBatch {
- NKikimr::NMiniKQL::TPagedBuffer::TPtr Buffer;
+ TPagedBuffer::TPtr Buffer;
ui64 RowCount = 0;
};
std::deque<TSerializedBatch> Data;
diff --git a/ydb/library/yql/dq/runtime/ut/ut_helper.h b/ydb/library/yql/dq/runtime/ut/ut_helper.h
index 003c419e7f..ceb848008a 100644
--- a/ydb/library/yql/dq/runtime/ut/ut_helper.h
+++ b/ydb/library/yql/dq/runtime/ut/ut_helper.h
@@ -20,7 +20,7 @@ public:
return Capacity <= UsedSpace;
}
- void Put(ui64 blobId, TBuffer&& blob) override {
+ void Put(ui64 blobId, TRope&& blob) override {
if (UsedSpace + blob.size() > Capacity) {
ythrow yexception() << "Space limit exceeded";
}
@@ -40,7 +40,15 @@ public:
return false;
}
- data = std::move(Blobs[blobId]);
+ auto& blob = Blobs[blobId];
+ data.Clear();
+ data.Reserve(blob.size());
+ for (auto it = blob.Begin(); it.Valid(); ++it) {
+ data.Append(it.ContiguousData(), it.ContiguousSize());
+ }
+
+ Y_VERIFY(data.size() == blob.size());
+
Blobs.erase(blobId);
UsedSpace -= data.size();
@@ -54,7 +62,7 @@ public:
private:
const ui64 Capacity;
- THashMap<ui64, TBuffer> Blobs;
+ THashMap<ui64, TRope> Blobs;
ui64 UsedSpace = 0;
ui32 GetBlankRequests = 0;
};