aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2023-06-06 21:51:14 +0300
committer aneporada <aneporada@ydb.tech> 2023-06-06 21:51:14 +0300
commit d7618ef587053f2eb7d20f0b2d89709e69009c1b (patch)
tree 1d4e0f418fbf355be1293e0bdd9b37f0f5d63e2e
parent f18d76db543cf768f466baf6e44549a6556daec6 (diff)
download ydb-d7618ef587053f2eb7d20f0b2d89709e69009c1b.tar.gz
Support for wide streams in dq channels
-rw-r--r-- ydb/core/kqp/query_data/kqp_query_data.cpp 24
-rw-r--r-- ydb/core/kqp/query_data/kqp_query_data.h 2
-rw-r--r-- ydb/core/kqp/runtime/kqp_effects.cpp 6
-rw-r--r-- ydb/core/kqp/runtime/kqp_output_stream.cpp 6
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 8
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp 12
-rw-r--r-- ydb/core/kqp/runtime/kqp_transport.cpp 8
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp 6
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h 4
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 10
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/task_runner_actor.h 4
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp 4
-rw-r--r-- ydb/library/yql/dq/runtime/dq_async_input.cpp 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_async_input.h 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_async_output.cpp 11
-rw-r--r-- ydb/library/yql/dq/runtime/dq_async_output.h 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_columns_resolve.cpp 26
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input.h 9
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_channel.cpp 13
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_impl.h 62
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_producer.cpp 207
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_producer.h 1
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output.h 1
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_channel.cpp 40
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp 285
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_consumer.cpp 74
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_consumer.h 5
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.cpp 31
-rw-r--r-- ydb/library/yql/dq/runtime/dq_transport.cpp 45
-rw-r--r-- ydb/library/yql/dq/runtime/dq_transport.h 4
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node_holders.h 219
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp 69
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node_pack.h 5
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp 12
-rw-r--r-- ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp 2
-rw-r--r-- ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp 1
-rw-r--r-- ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h 10
-rw-r--r-- ydb/library/yql/providers/dq/actors/proto_builder.cpp 22
-rw-r--r-- ydb/library/yql/providers/dq/actors/worker_actor.cpp 4
-rw-r--r-- ydb/library/yql/providers/dq/runtime/task_command_executor.cpp 19
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp 23
-rw-r--r-- ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp 11
-rw-r--r-- ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp 3
-rw-r--r-- ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp 9
-rw-r--r-- ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp 20
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 5
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp 11
-rw-r--r-- ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp 11
-rw-r--r-- ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp 3
49 files changed, 1048 insertions, 325 deletions
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index 13eb649faff..a7673a46493 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -19,13 +19,19 @@ TTypedUnboxedValue TKqpExecuterTxResult::GetUV(
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& factory)
{
+ YQL_ENSURE(!Rows.IsWide());
if (IsStream) {
auto* listOfItemType = NKikimr::NMiniKQL::TListType::Create(MkqlItemType, typeEnv);
- NUdf::TUnboxedValue value = factory.VectorAsArray(Rows);
+
+ NUdf::TUnboxedValue* itemsPtr = nullptr;
+ auto value = factory.CreateDirectArrayHolder(Rows.RowCount(), itemsPtr);
+ Rows.ForEachRow([&](NUdf::TUnboxedValue& value) {
+ *itemsPtr++ = std::move(value);
+ });
return {listOfItemType, value};
} else {
- YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size());
- return {MkqlItemType, Rows[0]};
+ YQL_ENSURE(Rows.RowCount() == 1, "Actual buffer size: " << Rows.RowCount());
+ return {MkqlItemType, *Rows.Head()};
}
}
@@ -42,6 +48,7 @@ NKikimrMiniKQL::TResult TKqpExecuterTxResult::GetMkql() {
}
void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) {
+ YQL_ENSURE(!Rows.IsWide());
if (IsStream) {
mkqlResult->MutableType()->SetKind(NKikimrMiniKQL::List);
ExportTypeToProto(
@@ -49,16 +56,15 @@ void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) {
*mkqlResult->MutableType()->MutableList()->MutableItem(),
ColumnOrder);
- for(auto& row: Rows) {
+ Rows.ForEachRow([&](NUdf::TUnboxedValue& value) {
ExportValueToProto(
- MkqlItemType, row, *mkqlResult->MutableValue()->AddList(),
+ MkqlItemType, value, *mkqlResult->MutableValue()->AddList(),
ColumnOrder);
- }
-
+ });
} else {
- YQL_ENSURE(Rows.size() == 1, "Actual buffer size: " << Rows.size());
+ YQL_ENSURE(Rows.RowCount() == 1, "Actual buffer size: " << Rows.RowCount());
ExportTypeToProto(MkqlItemType, *mkqlResult->MutableType());
- ExportValueToProto(MkqlItemType, Rows[0], *mkqlResult->MutableValue());
+ ExportValueToProto(MkqlItemType, *Rows.Head(), *mkqlResult->MutableValue());
}
}
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index 519961e0ee1..437b3036a42 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -80,7 +80,7 @@ struct TKqpExecuterTxResult {
NKikimr::NMiniKQL::TType* MkqlItemType;
const TVector<ui32>* ColumnOrder = nullptr;
ui32 QueryResultIndex = 0;
- NKikimr::NMiniKQL::TUnboxedValueVector Rows;
+ NKikimr::NMiniKQL::TUnboxedValueBatch Rows;
explicit TKqpExecuterTxResult(
bool isStream,
diff --git a/ydb/core/kqp/runtime/kqp_effects.cpp b/ydb/core/kqp/runtime/kqp_effects.cpp
index 35ca7b017e1..36758e44459 100644
--- a/ydb/core/kqp/runtime/kqp_effects.cpp
+++ b/ydb/core/kqp/runtime/kqp_effects.cpp
@@ -23,6 +23,12 @@ public:
value.Apply(*ApplyCtx);
}
+ void WideConsume(NUdf::TUnboxedValue* values, ui32 count) final {
+ Y_UNUSED(values);
+ Y_UNUSED(count);
+ Y_FAIL("WideConsume not supported yet");
+ }
+
void Consume(NDqProto::TCheckpoint&&) final {
Y_FAIL("Shouldn't be called");
}
diff --git a/ydb/core/kqp/runtime/kqp_output_stream.cpp b/ydb/core/kqp/runtime/kqp_output_stream.cpp
index b931881f785..431db53178c 100644
--- a/ydb/core/kqp/runtime/kqp_output_stream.cpp
+++ b/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -45,6 +45,12 @@ public:
Outputs[partitionIndex]->Push(std::move(value));
}
+ void WideConsume(TUnboxedValue* values, ui32 count) final {
+ Y_UNUSED(values);
+ Y_UNUSED(count);
+ Y_FAIL("WideConsume not supported yet");
+ }
+
void Consume(NDqProto::TCheckpoint&&) final {
Y_FAIL("Shouldn't be called");
}
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 5d791c4656d..617b4588032 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -1086,7 +1086,7 @@ public:
}
i64 GetAsyncInputData(
- NKikimr::NMiniKQL::TUnboxedValueVector& resultVector,
+ NKikimr::NMiniKQL::TUnboxedValueBatch& resultBatch,
TMaybe<TInstant>&,
bool& finished,
i64 freeSpace) override
@@ -1097,6 +1097,8 @@ public:
return 0;
}
+ YQL_ENSURE(!resultBatch.IsWide(), "Wide stream is not supported");
+
CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size());
ui64 bytes = 0;
while (!Results.empty()) {
@@ -1120,7 +1122,7 @@ public:
for (; result.ProcessedRows < result.PackedRows; ++result.ProcessedRows) {
NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[result.ProcessedRows].GetElements());
- resultVector.push_back(std::move((*batch)[result.ProcessedRows]));
+ resultBatch.push_back(std::move((*batch)[result.ProcessedRows]));
ProcessedRowCount += 1;
bytes += rowSize.AllocatedBytes;
if (ProcessedRowCount == Settings.GetItemsLimit()) {
@@ -1129,7 +1131,7 @@ public:
return bytes;
}
}
- CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows; processed " << ProcessedRowCount << " rows");
+ CA_LOG_D(TStringBuilder() << "returned " << resultBatch.RowCount() << " rows; processed " << ProcessedRowCount << " rows");
size_t rowCount = result.ReadResult.Get()->Get()->GetRowsCount();
if (rowCount == result.ProcessedRows) {
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index f7106c30d8e..9d00649423b 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -203,9 +203,11 @@ private:
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 totalDataSize = 0;
+ YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported");
+
totalDataSize = PackResults(batch, freeSpace);
auto status = FetchLookupKeys();
@@ -366,17 +368,11 @@ private:
}
}
- ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) {
+ ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) {
i64 totalSize = 0;
bool sizeLimitExceeded = false;
batch.clear();
- size_t rowsCount = 0;
- for (const auto& result : Results) {
- rowsCount += result.ReadResult->Get()->GetRowsCount();
- }
- batch.reserve(rowsCount);
-
while (!Results.empty() && !sizeLimitExceeded) {
auto& result = Results.front();
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp
index 6ee6d5163e8..3feb75e88fb 100644
--- a/ydb/core/kqp/runtime/kqp_transport.cpp
+++ b/ydb/core/kqp/runtime/kqp_transport.cpp
@@ -75,11 +75,11 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
NDq::TDqDataSerializer dataSerializer(*TypeEnv, *HolderFactory, transportVersion);
for (auto& part : data) {
if (part.GetRows()) {
- TUnboxedValueVector rows;
+ TUnboxedValueBatch rows(mkqlSrcRowType);
dataSerializer.Deserialize(part, mkqlSrcRowType, rows);
- for (auto& row : rows) {
- ExportValueToProto(mkqlSrcRowType, row, *resultSet.add_rows(), columnOrder);
- }
+ rows.ForEachRow([&](const NUdf::TUnboxedValue& value) {
+ ExportValueToProto(mkqlSrcRowType, value, *resultSet.add_rows(), columnOrder);
+ });
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 5e24b9880ea..8626a73d82f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -291,7 +291,7 @@ private:
DoExecute();
}
- void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) override {
+ void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) override {
ProcessSourcesState.Inflight++;
source.PushStarted = true;
source.Finished = finished;
@@ -693,7 +693,7 @@ private:
}
void SinkSend(ui64 index,
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
TMaybe<NDqProto::TCheckpoint>&& checkpoint,
i64 size,
i64 checkpointSize,
@@ -722,7 +722,7 @@ private:
{
auto guard = BindAllocator();
- NKikimr::NMiniKQL::TUnboxedValueVector data = std::move(batch);
+ NKikimr::NMiniKQL::TUnboxedValueBatch data = std::move(batch);
sinkInfo.AsyncOutput->SendData(std::move(data), size, std::move(checkpoint), finished);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 901bf62167d..0c496218fb9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -75,7 +75,7 @@ struct IDqComputeActorAsyncInput {
// Method should be called under bound mkql allocator.
// Could throw YQL errors.
virtual i64 GetAsyncInputData(
- NKikimr::NMiniKQL::TUnboxedValueVector& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch& batch,
TMaybe<TInstant>& watermark,
bool& finished,
i64 freeSpace) = 0;
@@ -149,7 +149,7 @@ struct IDqComputeActorAsyncOutput {
// Could throw YQL errors.
// Checkpoint (if any) is supposed to be ordered after batch,
// and finished flag is supposed to be ordered after checkpoint.
- virtual void SendData(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 dataSize,
+ virtual void SendData(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 dataSize,
const TMaybe<NDqProto::TCheckpoint>& checkpoint, bool finished) = 0;
// Checkpointing.
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 1d1e406d74a..19623a925e2 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1080,7 +1080,7 @@ protected:
return TaskRunner->BindAllocator();
}
- virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) {
+ virtual void AsyncInputPush(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, TAsyncInputInfoBase& source, i64 space, bool finished) {
source.Buffer->Push(std::move(batch), space);
if (finished) {
source.Buffer->Finish();
@@ -1420,7 +1420,7 @@ private:
ui32 SendDataChunkToAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo, ui64 bytes) {
auto sink = outputInfo.Buffer;
- NKikimr::NMiniKQL::TUnboxedValueVector dataBatch;
+ NKikimr::NMiniKQL::TUnboxedValueBatch dataBatch(sink->GetOutputType());
NDqProto::TCheckpoint checkpoint;
const ui64 dataSize = !outputInfo.Finished ? sink->Pop(dataBatch, bytes) : 0;
@@ -1646,14 +1646,14 @@ protected:
const i64 freeSpace = AsyncInputFreeSpace(info);
if (freeSpace > 0) {
TMaybe<TInstant> watermark;
- NKikimr::NMiniKQL::TUnboxedValueVector batch;
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch;
Y_VERIFY(info.AsyncInput);
bool finished = false;
const i64 space = info.AsyncInput->GetAsyncInputData(batch, watermark, finished, freeSpace);
CA_LOG_T("Poll async input " << inputIndex
<< ". Buffer free space: " << freeSpace
<< ", read from async input: " << space << " bytes, "
- << batch.size() << " rows, finished: " << finished);
+ << batch.RowCount() << " rows, finished: " << finished);
if (!batch.empty()) {
// If we have read some data, we must run such reading again
@@ -1662,7 +1662,7 @@ protected:
ContinueExecute();
}
- DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.size(), watermark);
+ DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.RowCount(), watermark);
if (watermark) {
const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived(
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
index 0d45a9fde86..9ae31976c18 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor.h
@@ -16,7 +16,7 @@ struct ITaskRunnerActor {
virtual ~ICallbacks() = default;
virtual void SinkSend(
ui64 index,
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
TMaybe<NDqProto::TCheckpoint>&& checkpoint,
i64 size,
i64 checkpointSize,
@@ -29,7 +29,7 @@ struct ITaskRunnerActor {
virtual void AsyncInputPush(
ui64 cookie,
ui64 index,
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
i64 space,
bool finish) = 0;
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 45715caef0d..2638afd0fac 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -286,7 +286,7 @@ private:
void AsyncInputPush(
ui64 cookie,
ui64 index,
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
i64 space,
bool finish) override
{
@@ -385,7 +385,7 @@ private:
auto guard = TaskRunner->BindAllocator();
auto sink = TaskRunner->GetSink(ev->Get()->Index);
- NKikimr::NMiniKQL::TUnboxedValueVector batch;
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch(sink->GetOutputType());
NDqProto::TCheckpoint checkpoint;
TMaybe<NDqProto::TCheckpoint> maybeCheckpoint;
i64 size = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp
index 2b26d151ff7..25be6bf2eef 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp
@@ -17,7 +17,7 @@ public:
return InputIndex;
}
- void Push(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) override {
+ void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
Y_VERIFY(!batch.empty() || !space);
if (!batch.empty()) {
AddBatch(std::move(batch), space);
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.h b/ydb/library/yql/dq/runtime/dq_async_input.h
index a73fe4bac99..45c3ef4fc39 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.h
+++ b/ydb/library/yql/dq/runtime/dq_async_input.h
@@ -29,7 +29,7 @@ public:
virtual ui64 GetInputIndex() const = 0;
- virtual void Push(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) = 0;
+ virtual void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) = 0;
virtual void Finish() = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp
index 5c8b38054ed..e60e7ca970f 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp
@@ -67,6 +67,12 @@ public:
ReportChunkIn();
}
+ void WidePush(NUdf::TUnboxedValue* values, ui32 count) override {
+ Y_UNUSED(values);
+ Y_UNUSED(count);
+ YQL_ENSURE(false, "Wide stream is not supported");
+ }
+
void Push(NDqProto::TWatermark&& watermark) override {
const ui64 bytesSize = watermark.ByteSize();
Values.emplace_back(std::move(watermark), bytesSize);
@@ -91,7 +97,7 @@ public:
}
}
- ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) override {
+ ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) override {
batch.clear();
ui64 valuesCount = 0;
ui64 usedBytes = 0;
@@ -106,7 +112,6 @@ public:
}
// Reserve size and return data.
- batch.reserve(valuesCount);
while (valuesCount--) {
batch.emplace_back(std::move(std::get<NUdf::TUnboxedValue>(Values.front().Value)));
Values.pop_front();
@@ -114,7 +119,7 @@ public:
Y_VERIFY(EstimatedStoredBytes >= usedBytes);
EstimatedStoredBytes -= usedBytes;
- ReportChunkOut(batch.size(), usedBytes);
+ ReportChunkOut(batch.RowCount(), usedBytes);
return usedBytes;
}
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h
index 86e57b6e58c..c8e8eb8be72 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.h
+++ b/ydb/library/yql/dq/runtime/dq_async_output.h
@@ -32,7 +32,7 @@ public:
// Pop data to send. Return estimated size of returned data.
[[nodiscard]]
- virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) = 0;
+ virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) = 0;
// Pop watermark
[[nodiscard]]
virtual bool Pop(NDqProto::TWatermark& watermark) = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
index a5b1b4babcd..81b3761004d 100644
--- a/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
+++ b/ydb/library/yql/dq/runtime/dq_columns_resolve.cpp
@@ -7,21 +7,29 @@ namespace NYql::NDq {
using namespace NKikimr::NMiniKQL;
TMaybe<TColumnInfo> FindColumnInfo(const NKikimr::NMiniKQL::TType* type, TStringBuf columnName) {
- YQL_ENSURE(type->GetKind() == TType::EKind::Struct);
- const auto& structType = static_cast<const TStructType&>(*type);
-
- auto columnIndex = structType.FindMemberIndex(columnName);
- if (!columnIndex) {
- return {};
+ TType* memberType = nullptr;
+ ui32 idx;
+ if (type->GetKind() == TType::EKind::Multi) {
+ const auto& multiType = static_cast<const TMultiType&>(*type);
+ YQL_ENSURE(TryFromString(columnName, idx), "Expecting number as column name");
+ YQL_ENSURE(idx < multiType.GetElementsCount(), "Invalid column index");
+ memberType = multiType.GetElementType(idx);
+ } else {
+ YQL_ENSURE(type->GetKind() == TType::EKind::Struct);
+ const auto& structType = static_cast<const TStructType&>(*type);
+ auto columnIndex = structType.FindMemberIndex(columnName);
+ if (!columnIndex) {
+ return {};
+ }
+ memberType = structType.GetMemberType(*columnIndex);
+ idx = *columnIndex;
}
- auto memberType = structType.GetMemberType(*columnIndex);
-
if (memberType->GetKind() == TType::EKind::Optional) {
memberType = static_cast<TOptionalType&>(*memberType).GetItemType();
}
- return TColumnInfo{TString(columnName), *columnIndex, memberType};
+ return TColumnInfo{TString(columnName), idx, memberType};
}
TColumnInfo GetColumnInfo(const TType* type, TStringBuf columnName) {
diff --git a/ydb/library/yql/dq/runtime/dq_input.h b/ydb/library/yql/dq/runtime/dq_input.h
index 4067549a82f..d027d1d03a7 100644
--- a/ydb/library/yql/dq/runtime/dq_input.h
+++ b/ydb/library/yql/dq/runtime/dq_input.h
@@ -31,13 +31,20 @@ public:
virtual bool Empty() const = 0;
[[nodiscard]]
- virtual bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) = 0;
+ virtual bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) = 0;
virtual bool IsFinished() const = 0;
virtual const TDqInputStats* GetStats() const = 0;
virtual NKikimr::NMiniKQL::TType* GetInputType() const = 0;
+ inline TMaybe<ui32> GetInputWidth() const {
+ auto type = GetInputType();
+ if (type->IsMulti()) {
+ return static_cast<const NKikimr::NMiniKQL::TMultiType*>(type)->GetElementsCount();
+ }
+ return {};
+ }
// Checkpointing
// After pause IDqInput::Pop() stops return batches that were pushed before pause
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index 3431bfdbdb2..85956f9ecae 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -13,18 +13,17 @@ private:
void PushImpl(NDqProto::TData&& data) {
const i64 space = data.GetRaw().size();
- NKikimr::NMiniKQL::TUnboxedValueVector buffer;
- buffer.reserve(data.GetRows());
-
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch(InputType);
if (Y_UNLIKELY(ProfileStats)) {
auto startTime = TInstant::Now();
- DataSerializer.Deserialize(data, InputType, buffer);
+ DataSerializer.Deserialize(data, InputType, batch);
ProfileStats->DeserializationTime += (TInstant::Now() - startTime);
} else {
- DataSerializer.Deserialize(data, InputType, buffer);
+ DataSerializer.Deserialize(data, InputType, batch);
}
- AddBatch(std::move(buffer), space);
+ YQL_ENSURE(batch.RowCount() == data.GetRows());
+ AddBatch(std::move(batch), space);
}
void DeserializeAllData() {
@@ -73,7 +72,7 @@ public:
}
[[nodiscard]]
- bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override {
+ bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override {
if (Batches.empty()) {
DeserializeAllData();
}
diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h
index dca47f85dd5..c4a672cfa7e 100644
--- a/ydb/library/yql/dq/runtime/dq_input_impl.h
+++ b/ydb/library/yql/dq/runtime/dq_input_impl.h
@@ -1,5 +1,8 @@
#pragma once
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+
namespace NYql::NDq {
template <class TDerived, class IInputInterface>
@@ -7,6 +10,7 @@ class TDqInputImpl : public IInputInterface {
public:
TDqInputImpl(NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes)
: InputType(inputType)
+ , Width(inputType->IsMulti() ? static_cast<const NKikimr::NMiniKQL::TMultiType*>(inputType)->GetElementsCount() : TMaybe<ui32>{})
, MaxBufferBytes(maxBufferBytes)
{
}
@@ -24,14 +28,16 @@ public:
return Batches.empty() || (IsPaused() && GetBatchesBeforePause() == 0);
}
- void AddBatch(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) {
+ void AddBatch(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) {
+ Y_VERIFY(batch.Width() == GetWidth());
+
StoredBytes += space;
- StoredRows += batch.size();
+ StoredRows += batch.RowCount();
auto& stats = MutableBasicStats();
stats.Chunks++;
stats.Bytes += space;
- stats.RowsIn += batch.size();
+ stats.RowsIn += batch.RowCount();
if (!stats.FirstRowTs) {
stats.FirstRowTs = TInstant::Now();
}
@@ -44,7 +50,8 @@ public:
}
[[nodiscard]]
- bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override {
+ bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override {
+ Y_VERIFY(batch.Width() == GetWidth());
if (Empty()) {
return false;
}
@@ -55,12 +62,22 @@ public:
ui64 batchesCount = GetBatchesBeforePause();
Y_VERIFY(batchesCount <= Batches.size());
- batch.reserve(StoredRowsBeforePause);
-
- while (batchesCount--) {
- auto& part = Batches.front();
- std::move(part.begin(), part.end(), std::back_inserter(batch));
- Batches.pop_front();
+ if (batch.IsWide()) {
+ while (batchesCount--) {
+ auto& part = Batches.front();
+ part.ForEachRowWide([&batch](NUdf::TUnboxedValue* values, ui32 width) {
+ batch.PushRow(values, width);
+ });
+ Batches.pop_front();
+ }
+ } else {
+ while (batchesCount--) {
+ auto& part = Batches.front();
+ part.ForEachRow([&batch](NUdf::TUnboxedValue& value) {
+ batch.emplace_back(std::move(value));
+ });
+ Batches.pop_front();
+ }
}
BatchesBeforePause = PauseMask;
@@ -70,10 +87,18 @@ public:
StoredBytesBeforePause = 0;
StoredRowsBeforePause = 0;
} else {
- batch.reserve(StoredRows);
-
- for (auto&& part : Batches) {
- std::move(part.begin(), part.end(), std::back_inserter(batch));
+ if (batch.IsWide()) {
+ for (auto&& part : Batches) {
+ part.ForEachRowWide([&batch](NUdf::TUnboxedValue* values, ui32 width) {
+ batch.PushRow(values, width);
+ });
+ }
+ } else {
+ for (auto&& part : Batches) {
+ part.ForEachRow([&batch](NUdf::TUnboxedValue& value) {
+ batch.emplace_back(std::move(value));
+ });
+ }
}
StoredBytes = 0;
@@ -81,7 +106,7 @@ public:
Batches.clear();
}
- MutableBasicStats().RowsOut += batch.size();
+ MutableBasicStats().RowsOut += batch.RowCount();
return true;
}
@@ -128,10 +153,15 @@ protected:
return BatchesBeforePause & ~PauseMask;
}
+ TMaybe<ui32> GetWidth() const {
+ return Width;
+ }
+
protected:
NKikimr::NMiniKQL::TType* const InputType = nullptr;
+ const TMaybe<ui32> Width;
const ui64 MaxBufferBytes = 0;
- TList<NKikimr::NMiniKQL::TUnboxedValueVector, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>> Batches;
+ TList<NKikimr::NMiniKQL::TUnboxedValueBatch, NKikimr::NMiniKQL::TMKQLAllocator<NKikimr::NMiniKQL::TUnboxedValueBatch>> Batches;
ui64 StoredBytes = 0;
ui64 StoredRows = 0;
bool Finished = false;
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
index d86bcd90a1e..7860afeff2c 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp
@@ -12,18 +12,21 @@ using namespace NUdf;
namespace {
-class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamValue> {
+template<bool IsWide>
+class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamValue<IsWide>> {
+ using TBase = TComputationValue<TDqInputUnionStreamValue<IsWide>>;
public:
TDqInputUnionStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs, TDqMeteringStats::TInputStatsMeter stats)
- : TComputationValue<TDqInputUnionStreamValue>(memInfo)
+ : TBase(memInfo)
, Inputs(std::move(inputs))
- , CurrentItemIndex(0)
+ , Batch(Inputs.empty() ? nullptr : Inputs.front()->GetInputType())
, Stats(stats)
{}
private:
NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final {
- if (CurrentItemIndex >= CurrentBuffer.size()) {
+ MKQL_ENSURE(!IsWide, "Using Fetch() on wide input");
+ if (Batch.empty()) {
auto status = FindBuffer();
switch (status) {
case NUdf::EFetchStatus::Ok:
@@ -34,21 +37,45 @@ private:
}
}
- result = std::move(CurrentBuffer[CurrentItemIndex]);
+ result = std::move(*Batch.Head());
+ Batch.Pop();
+
if (Stats) {
Stats.Add(result);
}
- ++CurrentItemIndex;
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ NUdf::EFetchStatus WideFetch(NKikimr::NUdf::TUnboxedValue* result, ui32 width) final {
+ YQL_ENSURE(IsWide, "Using WideFetch() on narrow input");
+ if (Batch.empty()) {
+ auto status = FindBuffer();
+ switch (status) {
+ case NUdf::EFetchStatus::Ok:
+ break;
+ case NUdf::EFetchStatus::Finish:
+ case NUdf::EFetchStatus::Yield:
+ return status;
+ }
+ }
+
+ MKQL_ENSURE_S(Batch.Width() == width);
+ NUdf::TUnboxedValue* head = Batch.Head();
+ std::move(head, head + width, result);
+ Batch.Pop();
+
+ if (Stats) {
+ Stats.Add(result, width);
+ }
return NUdf::EFetchStatus::Ok;
}
NUdf::EFetchStatus FindBuffer() {
bool allFinished = true;
- CurrentBuffer.clear();
+ Batch.clear();
for (auto& input : Inputs) {
- if (input->Pop(CurrentBuffer)) {
- CurrentItemIndex = 0;
+ if (input->Pop(Batch)) {
return NUdf::EFetchStatus::Ok;
}
allFinished &= input->IsFinished();
@@ -59,46 +86,47 @@ private:
private:
TVector<IDqInput::TPtr> Inputs;
- TUnboxedValueVector CurrentBuffer;
- ui64 CurrentItemIndex;
+ TUnboxedValueBatch Batch;
TDqMeteringStats::TInputStatsMeter Stats;
};
-class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamValue> {
+template<bool IsWide>
+class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamValue<IsWide>> {
+ using TBase = TComputationValue<TDqInputMergeStreamValue<IsWide>>;
public:
TDqInputMergeStreamValue(TMemoryUsageInfo* memInfo, TVector<IDqInput::TPtr>&& inputs,
TVector<TSortColumnInfo>&& sortCols, TDqMeteringStats::TInputStatsMeter stats)
- : TComputationValue<TDqInputMergeStreamValue>(memInfo)
+ : TBase(memInfo)
, Inputs(std::move(inputs))
, SortCols(std::move(sortCols))
, Stats(stats)
- {
- CurrentBuffers.resize(Inputs.size());
- CurrentItemIndexes.reserve(Inputs.size());
- for (ui32 idx = 0; idx < Inputs.size(); ++idx) {
- CurrentItemIndexes.emplace_back(TUnboxedValuesIterator(*this, Inputs[idx], CurrentBuffers[idx]));
- }
- for (auto& sortCol : SortCols) {
- const auto typeId = sortCol.GetTypeId();
- TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId);
- YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << typeId);
- YQL_ENSURE(IsTypeSupportedInMergeCn(*maybeDataSlot), "Column '" << sortCol.Name <<
- "' has unsupported type for Merge connection: " << *maybeDataSlot);
- SortColTypes[sortCol.Index] = *maybeDataSlot;
- }
+ {
+ CurrentBuffers.resize(Inputs.size());
+ CurrentItemIndexes.reserve(Inputs.size());
+ for (ui32 idx = 0; idx < Inputs.size(); ++idx) {
+ CurrentItemIndexes.emplace_back(TUnboxedValuesIterator<IsWide>(*this, Inputs[idx], CurrentBuffers[idx]));
}
+ for (auto& sortCol : SortCols) {
+ const auto typeId = sortCol.GetTypeId();
+ TMaybe<EDataSlot> maybeDataSlot = FindDataSlot(typeId);
+ YQL_ENSURE(maybeDataSlot, "Trying to compare columns with unknown type id: " << typeId);
+ YQL_ENSURE(IsTypeSupportedInMergeCn(*maybeDataSlot), "Column '" << sortCol.Name <<
+ "' has unsupported type for Merge connection: " << *maybeDataSlot);
+ SortColTypes[sortCol.Index] = *maybeDataSlot;
+ }
+ }
private:
+ template<bool IsWideIter>
class TUnboxedValuesIterator {
private:
- TUnboxedValueVector* Data = nullptr;
+ TUnboxedValueBatch* Data = nullptr;
IDqInput::TPtr Input;
const TDqInputMergeStreamValue* Comparator = nullptr;
- ui32 CurrentIndex = 0;
+ ui32 Width_;
public:
NUdf::EFetchStatus FindBuffer() {
Data->clear();
- CurrentIndex = 0;
if (Input->Pop(*Data)) {
return NUdf::EFetchStatus::Ok;
}
@@ -106,34 +134,40 @@ private:
return Input->IsFinished() ? NUdf::EFetchStatus::Finish : NUdf::EFetchStatus::Yield;
}
- TUnboxedValuesIterator(const TDqInputMergeStreamValue& comparator, IDqInput::TPtr input, TUnboxedValueVector& data)
+ TUnboxedValuesIterator(const TDqInputMergeStreamValue<IsWideIter>& comparator, IDqInput::TPtr input, TUnboxedValueBatch& data)
: Data(&data)
, Input(input)
, Comparator(&comparator)
+ , Width_(data.IsWide() ? *data.Width() : 1u)
{
-
}
bool IsYield() const {
- return CurrentIndex == Data->size();
+ return Data->empty();
}
bool operator<(const TUnboxedValuesIterator& item) const {
return Comparator->CompareSortCols(GetValue(), item.GetValue()) > 0;
}
- void operator++() {
- ++CurrentIndex;
- Y_VERIFY(CurrentIndex <= Data->size());
+
+ ui32 Width() const {
+ return Width_;
}
- NKikimr::NUdf::TUnboxedValue& GetValue() {
- return (*Data)[CurrentIndex];
+
+ void Pop() {
+ Data->Pop();
+ }
+
+ NKikimr::NUdf::TUnboxedValue* GetValue() {
+ return Data->Head();
}
- const NKikimr::NUdf::TUnboxedValue& GetValue() const {
- return (*Data)[CurrentIndex];
+ const NKikimr::NUdf::TUnboxedValue* GetValue() const {
+ return Data->Head();
}
};
NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) final {
+ YQL_ENSURE(!IsWide, "Using Fetch() on wide input");
auto status = CheckBuffers();
switch (status) {
case NUdf::EFetchStatus::Ok:
@@ -143,31 +177,60 @@ private:
return status;
}
- result = std::move(FindResult());
+ CopyResult(&result, 1);
if (Stats) {
Stats.Add(result);
}
return NUdf::EFetchStatus::Ok;
}
- NKikimr::NUdf::TUnboxedValue FindResult() {
+ // Wide counterpart of Fetch(): writes the next row chosen by CopyResult() into
+ // result[0 .. width). Only valid for the IsWide == true instantiation.
+ // A Finish or Yield status from CheckBuffers() is returned as-is and `result` is
+ // left untouched in that case.
+ NUdf::EFetchStatus WideFetch(NKikimr::NUdf::TUnboxedValue* result, ui32 width) final {
+ YQL_ENSURE(IsWide, "Using WideFetch() on narrow input");
+ auto status = CheckBuffers();
+ switch (status) {
+ case NUdf::EFetchStatus::Ok:
+ break;
+ case NUdf::EFetchStatus::Finish:
+ case NUdf::EFetchStatus::Yield:
+ return status;
+ }
+
+ // The caller-supplied width must match the width reported by the first input.
+ YQL_ENSURE(!Inputs.empty() && *Inputs.front()->GetInputWidth() == width);
+ CopyResult(result, width);
+ if (Stats) {
+ Stats.Add(result, width);
+ }
+ return NUdf::EFetchStatus::Ok;
+ }
+
+
+ // Takes the iterator at the top of the CurrentItemIndexes heap, moves `width`
+ // values from the head of its buffer into dst[0 .. width), and drops that row
+ // from the buffer. `width` is 1 for narrow rows (see Fetch()).
+ void CopyResult(NKikimr::NUdf::TUnboxedValue* dst, ui32 width) {
YQL_ENSURE(CurrentItemIndexes.size());
+ // pop_heap moves the top element (ordering given by TUnboxedValuesIterator::operator<)
+ // to the back of the vector; it is pushed back below only if it still has rows.
std::pop_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end());
auto& current = CurrentItemIndexes.back();
Y_VERIFY(!current.IsYield());
- NKikimr::NUdf::TUnboxedValue res = current.GetValue();
- ++current;
+ YQL_ENSURE(width == current.Width());
+
+ NKikimr::NUdf::TUnboxedValue* res = current.GetValue();
+ // Move (not copy) the row out of the buffer before popping it.
+ std::move(res, res + width, dst);
+ current.Pop();
if (!current.IsYield()) {
std::push_heap(CurrentItemIndexes.begin(), CurrentItemIndexes.end());
}
- return res;
}
- int CompareSortCols(const NKikimr::NUdf::TUnboxedValue& lhs, const NKikimr::NUdf::TUnboxedValue& rhs) const {
+ int CompareSortCols(const NKikimr::NUdf::TUnboxedValue* lhs, const NKikimr::NUdf::TUnboxedValue* rhs) const {
int compRes = 0;
for (auto sortCol = SortCols.begin(); sortCol != SortCols.end() && compRes == 0; ++sortCol) {
- auto lhsColValue = lhs.GetElement(sortCol->Index);
- auto rhsColValue = rhs.GetElement(sortCol->Index);
+ NKikimr::NUdf::TUnboxedValue lhsColValue;
+ NKikimr::NUdf::TUnboxedValue rhsColValue;
+ if constexpr (IsWide) {
+ lhsColValue = *(lhs + sortCol->Index);
+ rhsColValue = *(rhs + sortCol->Index);
+ } else {
+ lhsColValue = lhs->GetElement(sortCol->Index);
+ rhsColValue = rhs->GetElement(sortCol->Index);
+ }
auto it = SortColTypes.find(sortCol->Index);
Y_VERIFY(it != SortColTypes.end());
compRes = NKikimr::NMiniKQL::CompareValues(it->second,
@@ -217,13 +280,27 @@ private:
private:
TVector<IDqInput::TPtr> Inputs;
TVector<TSortColumnInfo> SortCols;
- TVector<TUnboxedValueVector> CurrentBuffers;
- TVector<TUnboxedValuesIterator> CurrentItemIndexes;
+ TVector<TUnboxedValueBatch> CurrentBuffers;
+ TVector<TUnboxedValuesIterator<IsWide>> CurrentItemIndexes;
ui32 InitializationIndex = 0;
TMap<ui32, EDataSlot> SortColTypes;
TDqMeteringStats::TInputStatsMeter Stats;
};
+// Reports whether the inputs carry wide (multi-column) rows. The answer is taken
+// from the first input that provides a type; every following input is required to
+// have the same type. An empty input list yields false.
+bool IsWideInputs(const TVector<IDqInput::TPtr>& inputs) {
+ NKikimr::NMiniKQL::TType* type = nullptr;
+ bool wide = false;
+ for (auto& input : inputs) {
+ if (type) {
+ // Reference type already captured: just validate this input against it.
+ YQL_ENSURE(type->IsSameType(*input->GetInputType()));
+ continue;
+ }
+ type = input->GetInputType();
+ wide = input->GetInputWidth().Defined();
+ }
+ return wide;
+}
+
} // namespace
void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue& val) {
@@ -237,16 +314,42 @@ void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue&
}
}
+// Accounts one wide row: always bumps RowsConsumed; when InputType is set, also adds
+// the estimated size of the row's `width` columns to BytesConsumed (at least 8 bytes).
+void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue* row, ui32 width) {
+ Stats->RowsConsumed += 1;
+ if (!InputType) {
+ return;
+ }
+
+ YQL_ENSURE(InputType->IsMulti());
+ auto multiType = static_cast<const TMultiType*>(InputType);
+ YQL_ENSURE(width == multiType->GetElementsCount());
+
+ NYql::NDq::TDqDataSerializer::TEstimateSizeSettings settings;
+ settings.DiscardUnsupportedTypes = true;
+ settings.WithHeaders = false;
+
+ // `width` equals the element count (checked above), so it bounds the loop.
+ ui64 rowBytes = 0;
+ for (ui32 column = 0; column < width; ++column) {
+ rowBytes += TDqDataSerializer::EstimateSize(row[column], multiType->GetElementType(column), nullptr, settings);
+ }
+
+ Stats->BytesConsumed += Max<ui64>(rowBytes, 8 /* billing size for count(*) */);
+}
+
NUdf::TUnboxedValue CreateInputUnionValue(TVector<IDqInput::TPtr>&& inputs,
const NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
{
- return factory.Create<TDqInputUnionStreamValue>(std::move(inputs), stats);
+ // Wide vs. narrow is a template parameter of the stream value, so the
+ // instantiation is picked here from the inputs. IsWideInputs() takes the
+ // inputs by const reference; they are moved only afterwards.
+ if (IsWideInputs(inputs)) {
+ return factory.Create<TDqInputUnionStreamValue<true>>(std::move(inputs), stats);
+ }
+ return factory.Create<TDqInputUnionStreamValue<false>>(std::move(inputs), stats);
}
NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(TVector<IDqInput::TPtr>&& inputs,
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
{
- return factory.Create<TDqInputMergeStreamValue>(std::move(inputs), std::move(sortCols), stats);
+ // Select the wide or narrow merge-stream instantiation from the inputs;
+ // IsWideInputs() takes them by const reference before they are moved.
+ if (IsWideInputs(inputs)) {
+ return factory.Create<TDqInputMergeStreamValue<true>>(std::move(inputs), std::move(sortCols), stats);
+ }
+ return factory.Create<TDqInputMergeStreamValue<false>>(std::move(inputs), std::move(sortCols), stats);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.h b/ydb/library/yql/dq/runtime/dq_input_producer.h
index 60628f76268..d4e96b2aebb 100644
--- a/ydb/library/yql/dq/runtime/dq_input_producer.h
+++ b/ydb/library/yql/dq/runtime/dq_input_producer.h
@@ -13,6 +13,7 @@ struct TDqMeteringStats {
struct TInputStatsMeter {
void Add(const NKikimr::NUdf::TUnboxedValue&);
+ void Add(const NKikimr::NUdf::TUnboxedValue* row, ui32 width);
operator bool() { return Stats; }
TInputStats* Stats = nullptr;
diff --git a/ydb/library/yql/dq/runtime/dq_output.h b/ydb/library/yql/dq/runtime/dq_output.h
index c0251410782..23dd7dac29b 100644
--- a/ydb/library/yql/dq/runtime/dq_output.h
+++ b/ydb/library/yql/dq/runtime/dq_output.h
@@ -43,6 +43,7 @@ public:
virtual bool IsFull() const = 0;
// can throw TDqChannelStorageException
virtual void Push(NUdf::TUnboxedValue&& value) = 0;
+ virtual void WidePush(NUdf::TUnboxedValue* values, ui32 count) = 0;
virtual void Push(NDqProto::TWatermark&& watermark) = 0;
// Push checkpoint. Checkpoints may be pushed to channel even after it is finished.
virtual void Push(NDqProto::TCheckpoint&& checkpoint) = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 940e899211f..266568920cd 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -55,6 +55,7 @@ public:
, BasicStats(ChannelId)
, ProfileStats(settings.CollectProfileStats ? &BasicStats : nullptr)
, Packer(OutputType)
+ , Width(OutputType->IsMulti() ? static_cast<NMiniKQL::TMultiType*>(OutputType)->GetElementsCount() : 1u)
, Storage(settings.ChannelStorage)
, HolderFactory(holderFactory)
, TransportVersion(settings.TransportVersion)
@@ -81,7 +82,18 @@ public:
return Storage->IsFull();
}
- void Push(NUdf::TUnboxedValue&& value) override {
+ // Narrow push: accepts a single value and is rejected when the channel's
+ // OutputType is a multi (wide) type. Forwards to DoPush() as a row of width 1.
+ // `override` already implies virtual, so the redundant keyword is dropped.
+ void Push(NUdf::TUnboxedValue&& value) override {
+ YQL_ENSURE(!OutputType->IsMulti());
+ DoPush(&value, 1);
+ }
+
+ // Wide push: accepts `width` values forming one row. Requires a multi (wide)
+ // OutputType and a width equal to the channel's Width, then forwards to DoPush().
+ // `override` already implies virtual, so the redundant keyword is dropped.
+ void WidePush(NUdf::TUnboxedValue* values, ui32 width) override {
+ YQL_ENSURE(OutputType->IsMulti());
+ YQL_ENSURE(Width == width);
+ DoPush(values, width);
+ }
+
+ void DoPush(NUdf::TUnboxedValue* values, ui32 width) {
TProfileGuard guard(ProfileStats ? &ProfileStats->SerializationTime : nullptr);
if (!BasicStats.FirstRowIn) {
BasicStats.FirstRowIn = TInstant::Now();
@@ -97,8 +109,15 @@ public:
return;
}
- Packer.AddItem(value);
- value = {};
+ if (OutputType->IsMulti()) {
+ Packer.AddWideItem(values, width);
+ } else {
+ Packer.AddItem(*values);
+ }
+ for (ui32 i = 0; i < width; ++i) {
+ values[i] = {};
+ }
+
ChunkRowCount++;
BasicStats.RowsIn++;
@@ -265,7 +284,7 @@ public:
ChunkRowCount = 0;
}
- NKikimr::NMiniKQL::TUnboxedValueVector rows;
+ NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
for (;;) {
NDqProto::TData chunk;
if (!this->Pop(chunk)) {
@@ -275,11 +294,17 @@ public:
Packer.UnpackBatch(buf, HolderFactory, rows);
}
- for (auto& row : rows) {
- Packer.AddItem(row);
+ if (OutputType->IsMulti()) {
+ rows.ForEachRowWide([this](const auto* values, ui32 width) {
+ Packer.AddWideItem(values, width);
+ });
+ } else {
+ rows.ForEachRow([this](const auto& value) {
+ Packer.AddItem(value);
+ });
}
- data.SetRows(rows.size());
+ data.SetRows(rows.RowCount());
auto buffer = Packer.FinishAndPull();
data.MutableRaw()->reserve(buffer.Size());
buffer.CopyTo(*data.MutableRaw());
@@ -330,6 +355,7 @@ private:
TDqOutputChannelStats BasicStats;
TDqOutputChannelStats* ProfileStats = nullptr;
NKikimr::NMiniKQL::TValuePackerTransport<FastPack> Packer;
+ const ui32 Width;
const IDqChannelStorage::TPtr Storage;
const NMiniKQL::THolderFactory& HolderFactory;
const NDqProto::EDataTransportVersion TransportVersion;
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
index 20800da3223..1d72c08ffde 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
@@ -26,6 +26,11 @@ void Log(TStringBuf msg) {
#endif
}
+// Selects which channel flavour a TTestContext exercises: NARROW_CHANNEL uses the
+// struct OutputType, WIDE_CHANNEL uses the TMultiType built from its members.
+enum EChannelWidth {
+ NARROW_CHANNEL,
+ WIDE_CHANNEL,
+};
+
struct TTestContext {
TScopedAlloc Alloc;
TTypeEnvironment TypeEnv;
@@ -33,18 +38,22 @@ struct TTestContext {
THolderFactory HolderFactory;
TDefaultValueBuilder Vb;
NDqProto::EDataTransportVersion TransportVersion;
+ bool IsWide;
TDqDataSerializer Ds;
TStructType* OutputType = nullptr;
+ TMultiType* WideOutputType = nullptr;
- TTestContext(NDqProto::EDataTransportVersion transportVersion = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, bool bigRows = false)
+ TTestContext(EChannelWidth width = NARROW_CHANNEL, NDqProto::EDataTransportVersion transportVersion = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, bool bigRows = false)
: Alloc(__LOCATION__)
, TypeEnv(Alloc)
, MemInfo("Mem")
, HolderFactory(Alloc.Ref(), MemInfo)
, Vb(HolderFactory)
, TransportVersion(transportVersion)
+ , IsWide(width == WIDE_CHANNEL)
, Ds(TypeEnv, HolderFactory, TransportVersion)
{
+ //TMultiType::Create(ui32 elementsCount, TType *const *elements, const TTypeEnvironment &env)
if (bigRows) {
TStructMember members[3] = {
{"x", TDataType::Create(NUdf::TDataType<i32>::Id, TypeEnv)},
@@ -59,9 +68,27 @@ struct TTestContext {
};
OutputType = TStructType::Create(2, members, TypeEnv);
}
+
+ TVector<TType*> components;
+ for (ui32 i = 0; i < OutputType->GetMembersCount(); ++i) {
+ components.push_back(OutputType->GetMemberType(i));
+ }
+ WideOutputType = TMultiType::Create(components.size(), components.data(), TypeEnv);
}
- NUdf::TUnboxedValue CreateRow(ui32 value) {
+ TUnboxedValueBatch CreateRow(ui32 value) {
+ if (IsWide) {
+ TUnboxedValueBatch result(WideOutputType);
+ result.PushRow([&](ui32 idx) {
+ if (idx == 0) {
+ return NUdf::TUnboxedValuePod(value);
+ } else if (idx == 1) {
+ return NUdf::TUnboxedValuePod((ui64)(value * value));
+ }
+ return NMiniKQL::MakeString("***");
+ });
+ return result;
+ }
NUdf::TUnboxedValue* items;
auto row = Vb.NewArray(OutputType->GetMembersCount(), items);
items[0] = NUdf::TUnboxedValuePod(value);
@@ -69,10 +96,24 @@ struct TTestContext {
if (OutputType->GetMembersCount() == 3) {
items[2] = NMiniKQL::MakeString("***");
}
- return row;
+ TUnboxedValueBatch result(OutputType);
+ result.emplace_back(std::move(row));
+ return result;
}
- NUdf::TUnboxedValue CreateBigRow(ui32 value, ui32 size) {
+ TUnboxedValueBatch CreateBigRow(ui32 value, ui32 size) {
+ if (IsWide) {
+ TUnboxedValueBatch result(WideOutputType);
+ result.PushRow([&](ui32 idx) {
+ if (idx == 0) {
+ return NUdf::TUnboxedValuePod(value);
+ } else if (idx == 1) {
+ return NUdf::TUnboxedValuePod((ui64)(value * value));
+ }
+ return NMiniKQL::MakeString(std::string(size, '*'));
+ });
+ return result;
+ }
NUdf::TUnboxedValue* items;
auto row = Vb.NewArray(OutputType->GetMembersCount(), items);
items[0] = NUdf::TUnboxedValuePod(value);
@@ -80,10 +121,57 @@ struct TTestContext {
if (OutputType->GetMembersCount() == 3) {
items[2] = NMiniKQL::MakeString(std::string(size, '*'));
}
- return row;
+ TUnboxedValueBatch result(OutputType);
+ result.emplace_back(std::move(row));
+ return result;
+ }
+
+ // Type to create the channel with: the multi type for a wide context,
+ // the struct type otherwise.
+ TType* GetOutputType() const {
+ return IsWide ? static_cast<TType*>(WideOutputType) : static_cast<TType*>(OutputType);
+ }
+
+ // Number of values per row: the multi type's element count when wide, else 1.
+ ui32 Width() const {
+ return IsWide ? WideOutputType->GetElementsCount() : 1u;
+ }
};
+// Asserts that `batch` holds exactly expectedBatchSize rows and that the k-th row has
+// startIndex + k in column 0 and its square in column 1, i.e. the pattern written by
+// TTestContext::CreateRow(). Handles both wide and narrow batches, chosen by ctx.IsWide.
+void ValidateBatch(const TTestContext& ctx, const TUnboxedValueBatch& batch, ui32 startIndex, size_t expectedBatchSize) {
+ UNIT_ASSERT_VALUES_EQUAL(expectedBatchSize, batch.RowCount());
+ ui32 i = 0;
+ if (ctx.IsWide) {
+ batch.ForEachRowWide([&](const NUdf::TUnboxedValue* values, ui32 width) {
+ ui32 j = i + startIndex;
+ UNIT_ASSERT_VALUES_EQUAL(width, ctx.Width());
+ UNIT_ASSERT_VALUES_EQUAL(j, values[0].Get<i32>());
+ // j * j is evaluated in ui32, the same way CreateRow() computes value * value.
+ UNIT_ASSERT_VALUES_EQUAL(j * j, values[1].Get<ui64>());
+ ++i;
+ });
+ } else {
+ batch.ForEachRow([&](const NUdf::TUnboxedValue& value) {
+ ui32 j = i + startIndex;
+ UNIT_ASSERT_VALUES_EQUAL(j, value.GetElement(0).Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(j * j, value.GetElement(1).Get<ui64>());
+ ++i;
+ });
+ }
+ // Guards against the callback having been invoked fewer or more times than RowCount().
+ UNIT_ASSERT_VALUES_EQUAL(expectedBatchSize, i);
+}
+
+// Pushes the first row of `row` into the channel, using Push() for a narrow
+// context and WidePush() for a wide one.
+void PushRow(const TTestContext& ctx, TUnboxedValueBatch&& row, const IDqOutputChannel::TPtr& ch) {
+ auto* head = row.Head();
+ if (!ctx.IsWide) {
+ ch->Push(std::move(*head));
+ return;
+ }
+ ch->WidePush(head, *row.Width());
+}
+
void TestSingleRead(TTestContext& ctx) {
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = 1000;
@@ -91,12 +179,12 @@ void TestSingleRead(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 10; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->Chunks);
@@ -111,15 +199,10 @@ void TestSingleRead(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(10, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(10, ch->GetStats()->RowsOut);
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
-
- UNIT_ASSERT_VALUES_EQUAL(10, buffer.size());
- for (i32 i = 0; i < 10; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(i, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i].GetElement(1).Get<ui64>());
- }
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, 0, 10);
data.Clear();
UNIT_ASSERT(!ch->Pop(data));
}
@@ -131,12 +214,12 @@ void TestPartialRead(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 9; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->Chunks);
@@ -160,16 +243,9 @@ void TestPartialRead(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(readRows + data.GetRows(), ch->GetStats()->RowsOut);
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
-
- UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size());
- for (ui32 i = 0; i < data.GetRows(); ++i) {
- ui32 j = readRows + i;
- UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>());
- }
-
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, readRows, data.GetRows());
readRows += data.GetRows();
}
@@ -184,12 +260,12 @@ void TestOverflow(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 8; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(8, ch->GetStats()->RowsIn);
@@ -198,7 +274,7 @@ void TestOverflow(TTestContext& ctx) {
UNIT_ASSERT(ch->IsFull());
try {
auto row = ctx.CreateRow(100'500);
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
UNIT_FAIL("");
} catch (yexception& e) {
UNIT_ASSERT(TString(e.what()).Contains("requirement !IsFull() failed"));
@@ -212,32 +288,26 @@ void TestPopAll(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 50; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(50, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetStats()->RowsOut);
NDqProto::TData data;
- TUnboxedValueVector buffer;
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
UNIT_ASSERT(ch->PopAll(data));
UNIT_ASSERT_VALUES_EQUAL(50, data.GetRows());
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
- UNIT_ASSERT_VALUES_EQUAL(50, buffer.size());
-
- for (i32 i = 0; i < 50; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(i, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i].GetElement(1).Get<ui64>());
- }
-
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, 0, 50);
data.Clear();
UNIT_ASSERT(!ch->Pop(data));
}
@@ -249,18 +319,18 @@ void TestBigRow(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
{
auto row = ctx.CreateRow(1);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
{
for (ui32 i = 2; i < 10; ++i) {
auto row = ctx.CreateBigRow(i, 10_MB);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
}
@@ -277,14 +347,27 @@ void TestBigRow(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(2, ch->GetStats()->RowsOut);
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+
+ UNIT_ASSERT_VALUES_EQUAL(2, buffer.RowCount());
+ ui32 i = 1;
- UNIT_ASSERT_VALUES_EQUAL(2, buffer.size());
- for (ui32 i = 1; i < 3; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(i, buffer[i - 1].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[i - 1].GetElement(1).Get<ui64>());
+ if (ctx.IsWide) {
+ buffer.ForEachRowWide([&](const NUdf::TUnboxedValue* values, ui32 width) {
+ UNIT_ASSERT_VALUES_EQUAL(width, ctx.Width());
+ UNIT_ASSERT_VALUES_EQUAL(i, values[0].Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(i * i, values[1].Get<ui64>());
+ ++i;
+ });
+ } else {
+ buffer.ForEachRow([&](const NUdf::TUnboxedValue& value) {
+ UNIT_ASSERT_VALUES_EQUAL(i, value.GetElement(0).Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(i * i, value.GetElement(1).Get<ui64>());
+ ++i;
+ });
}
+ UNIT_ASSERT_VALUES_EQUAL(3, i);
}
for (ui32 i = 3; i < 10; ++i) {
@@ -296,12 +379,19 @@ void TestBigRow(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->RowsOut);
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
- UNIT_ASSERT_VALUES_EQUAL(1, buffer.size());
- UNIT_ASSERT_VALUES_EQUAL(i, buffer[0].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(i * i, buffer[0].GetElement(1).Get<ui64>());
+ UNIT_ASSERT_VALUES_EQUAL(1, buffer.RowCount());
+
+ auto head = buffer.Head();
+ if (ctx.IsWide) {
+ UNIT_ASSERT_VALUES_EQUAL(i, head[0].Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(i * i, head[1].Get<ui64>());
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(i, head->GetElement(0).Get<i32>());
+ UNIT_ASSERT_VALUES_EQUAL(i * i, head->GetElement(1).Get<ui64>());
+ }
}
NDqProto::TData data;
@@ -318,12 +408,12 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
auto storage = MakeIntrusive<TMockChannelStorage>(100'500ul);
settings.ChannelStorage = storage;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 35; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(35, ch->GetValuesCount());
@@ -338,16 +428,9 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
NDqProto::TData data;
while (ch->Pop(data)) {
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
-
- UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size());
- for (ui32 i = 0; i < data.GetRows(); ++i) {
- auto j = loadedRows + i;
- UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>());
- }
-
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, loadedRows, data.GetRows());
loadedRows += data.GetRows();
}
UNIT_ASSERT_VALUES_EQUAL(35, loadedRows);
@@ -360,23 +443,16 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
for (i32 i = 100; i < 105; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(5, ch->GetValuesCount());
NDqProto::TData data;
while (ch->Pop(data)) {
- TUnboxedValueVector buffer;
- ctx.Ds.Deserialize(data, ctx.OutputType, buffer);
-
- UNIT_ASSERT_VALUES_EQUAL(data.GetRows(), buffer.size());
- for (ui32 i = 0; i < data.GetRows(); ++i) {
- auto j = 100 + loadedRows + i;
- UNIT_ASSERT_VALUES_EQUAL(j, buffer[i].GetElement(0).Get<i32>());
- UNIT_ASSERT_VALUES_EQUAL(j * j, buffer[i].GetElement(1).Get<ui64>());
- }
-
+ TUnboxedValueBatch buffer(ctx.GetOutputType());
+ ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, loadedRows + 100, data.GetRows());
loadedRows += data.GetRows();
}
UNIT_ASSERT_VALUES_EQUAL(5, loadedRows);
@@ -394,12 +470,12 @@ void TestOverflowWithMockStorage(TTestContext& ctx) {
auto storage = MakeIntrusive<TMockChannelStorage>(500ul);
settings.ChannelStorage = storage;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.GetOutputType(), ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 42; ++i) {
auto row = ctx.CreateRow(i);
UNIT_ASSERT(!ch->IsFull());
- ch->Push(std::move(row));
+ PushRow(ctx, std::move(row), ch);
}
UNIT_ASSERT_VALUES_EQUAL(42, ch->GetStats()->RowsIn);
@@ -407,7 +483,7 @@ void TestOverflowWithMockStorage(TTestContext& ctx) {
// UNIT_ASSERT(ch->IsFull()); it can be false-negative with storage enabled
try {
- ch->Push(ctx.CreateBigRow(0, 100'500));
+ PushRow(ctx, ctx.CreateBigRow(0, 100'500), ch);
UNIT_FAIL("");
} catch (yexception &e) {
UNIT_ASSERT(TString(e.what()).Contains("Space limit exceeded"));
@@ -439,7 +515,36 @@ Y_UNIT_TEST(PopAll) {
}
Y_UNIT_TEST(BigRow) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
+ TTestContext ctx(NARROW_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
+ TestBigRow(ctx);
+}
+
+}
+
+Y_UNIT_TEST_SUITE(DqOutputWideChannelTests) {
+
+Y_UNIT_TEST(SingleRead) {
+ TTestContext ctx(WIDE_CHANNEL);
+ TestSingleRead(ctx);
+}
+
+Y_UNIT_TEST(PartialRead) {
+ TTestContext ctx(WIDE_CHANNEL);
+ TestPartialRead(ctx);
+}
+
+Y_UNIT_TEST(Overflow) {
+ TTestContext ctx(WIDE_CHANNEL);
+ TestOverflow(ctx);
+}
+
+Y_UNIT_TEST(PopAll) {
+ TTestContext ctx(WIDE_CHANNEL);
+ TestPopAll(ctx);
+}
+
+Y_UNIT_TEST(BigRow) {
+ TTestContext ctx(WIDE_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
TestBigRow(ctx);
}
@@ -453,7 +558,21 @@ Y_UNIT_TEST(Spill) {
}
Y_UNIT_TEST(Overflow) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
+ TTestContext ctx(NARROW_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
+ TestOverflowWithMockStorage(ctx);
+}
+
+}
+
+Y_UNIT_TEST_SUITE(DqOutputWideChannelWithStorageTests) {
+
+Y_UNIT_TEST(Spill) {
+ TTestContext ctx(WIDE_CHANNEL);
+ TestSpillWithMockStorage(ctx);
+}
+
+Y_UNIT_TEST(Overflow) {
+ TTestContext ctx(WIDE_CHANNEL, NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
TestOverflowWithMockStorage(ctx);
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index da9e8315c0b..a74f6787386 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -27,6 +27,12 @@ public:
return AnyOf(Consumers, [](const auto& consumer) { return consumer->IsFull(); });
}
+ void WideConsume(TUnboxedValue* values, ui32 count) override {
+ Y_UNUSED(values);
+ Y_UNUSED(count);
+ YQL_ENSURE(false, "WideConsume is not supported");
+ }
+
void Consume(TUnboxedValue&& value) override {
if (Consumers.size() == 1) {
Consumers[0]->Consume(std::move(value));
@@ -68,6 +74,10 @@ public:
Output->Push(std::move(value));
}
+ void WideConsume(TUnboxedValue* values, ui32 count) override {
+ Output->WidePush(values, count);
+ }
+
void Consume(NDqProto::TCheckpoint&& checkpoint) override {
Output->Push(std::move(checkpoint));
}
@@ -84,6 +94,7 @@ class TDqOutputHashPartitionConsumer : public IDqOutputConsumer {
private:
mutable bool IsWaitingFlag = false;
mutable TUnboxedValue WaitingValue;
+ mutable TUnboxedValueVector WideWaitingValues;
mutable IDqOutput::TPtr OutputWaiting;
protected:
void DrainWaiting() const {
@@ -102,16 +113,22 @@ protected:
}
public:
TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs,
- TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
+ TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices,
+ TMaybe<ui32> outputWidth)
: Outputs(std::move(outputs))
, KeyColumnIndices(std::move(keyColumnIndices))
, ValueHashers(KeyColumnIndices.size(), NUdf::IHash::TPtr{})
+ , OutputWidth(outputWidth)
{
MKQL_ENSURE_S(keyColumnTypes.size() == KeyColumnIndices.size());
for (auto i = 0U; i < keyColumnTypes.size(); i++) {
ValueHashers[i] = MakeHashImpl(keyColumnTypes[i]);
}
+
+ if (outputWidth.Defined()) {
+ WideWaitingValues.resize(*outputWidth);
+ }
}
bool IsFull() const override {
@@ -120,6 +137,7 @@ public:
}
void Consume(TUnboxedValue&& value) final {
+ YQL_ENSURE(!OutputWidth.Defined());
ui32 partitionIndex = GetHashPartitionIndex(value);
if (Outputs[partitionIndex]->IsFull()) {
YQL_ENSURE(!IsWaitingFlag);
@@ -131,6 +149,21 @@ public:
}
}
+ // Routes one wide row to the output selected by the hash of its key columns.
+ // If that output is full, the row is moved into WideWaitingValues and the
+ // output is remembered in OutputWaiting; only one row may wait at a time.
+ void WideConsume(TUnboxedValue* values, ui32 count) final {
+ YQL_ENSURE(OutputWidth.Defined() && count == OutputWidth);
+ ui32 partitionIndex = GetHashPartitionIndex(values);
+ const auto& target = Outputs[partitionIndex];
+ if (!target->IsFull()) {
+ target->WidePush(values, count);
+ return;
+ }
+
+ YQL_ENSURE(!IsWaitingFlag);
+ IsWaitingFlag = true;
+ OutputWaiting = target;
+ for (ui32 column = 0; column < count; ++column) {
+ WideWaitingValues[column] = std::move(values[column]);
+ }
+ }
+
void Consume(NDqProto::TCheckpoint&& checkpoint) override {
for (auto& output : Outputs) {
output->Push(NDqProto::TCheckpoint(checkpoint));
@@ -155,6 +188,17 @@ private:
return hash % Outputs.size();
}
+ // Wide-row overload: `values` points at one row, and key column k is read directly
+ // as values[KeyColumnIndices[k]]. The per-column hashes are folded together with
+ // CombineHashes and reduced modulo the number of outputs.
+ size_t GetHashPartitionIndex(const TUnboxedValue* values) {
+ ui64 hash = 0;
+
+ for (size_t keyId = 0; keyId < KeyColumnIndices.size(); keyId++) {
+ // Every key column index must fall inside the row.
+ MKQL_ENSURE_S(KeyColumnIndices[keyId] < OutputWidth);
+ hash = CombineHashes(hash, HashColumn(keyId, values[KeyColumnIndices[keyId]]));
+ }
+
+ return hash % Outputs.size();
+ }
+
ui64 HashColumn(size_t keyId, const TUnboxedValue& value) const {
if (!value.HasValue()) {
return 0;
@@ -166,24 +210,38 @@ private:
TVector<IDqOutput::TPtr> Outputs;
TVector<ui32> KeyColumnIndices;
TVector<NUdf::IHash::TPtr> ValueHashers;
+ const TMaybe<ui32> OutputWidth;
};
class TDqOutputBroadcastConsumer : public IDqOutputConsumer {
public:
- TDqOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs)
- : Outputs(std::move(outputs)) {}
+ TDqOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth)
+ : Outputs(std::move(outputs))
+ , OutputWidth(outputWidth)
+ , Tmp(outputWidth.Defined() ? *outputWidth : 0u)
+ {
+ }
bool IsFull() const override {
return AnyOf(Outputs, [](const auto& output) { return output->IsFull(); });
}
void Consume(TUnboxedValue&& value) final {
+ YQL_ENSURE(!OutputWidth.Defined());
for (auto& output : Outputs) {
TUnboxedValue copy{ value };
output->Push(std::move(copy));
}
}
+ // Sends a copy of the wide row to every output. Requires the consumer to have been
+ // created with an output width, and `count` to equal it.
+ void WideConsume(TUnboxedValue* values, ui32 count) final {
+ YQL_ENSURE(OutputWidth.Defined() && OutputWidth == count);
+ for (auto& output : Outputs) {
+ // WidePush() takes a mutable pointer (the output channel's DoPush() resets the
+ // pushed values), so each output gets a fresh copy in the Tmp scratch buffer
+ // and the caller's `values` stay intact for the next output.
+ std::copy(values, values + count, Tmp.begin());
+ output->WidePush(Tmp.data(), count);
+ }
+ }
+
void Consume(NDqProto::TCheckpoint&& checkpoint) override {
for (auto& output : Outputs) {
output->Push(NDqProto::TCheckpoint(checkpoint));
@@ -198,6 +256,8 @@ public:
private:
TVector<IDqOutput::TPtr> Outputs;
+ const TMaybe<ui32> OutputWidth;
+ TUnboxedValueVector Tmp;
};
} // namespace
@@ -212,14 +272,14 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output) {
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
- TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
+ TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth)
{
return MakeIntrusive<TDqOutputHashPartitionConsumer>(std::move(outputs), std::move(keyColumnTypes),
- std::move(keyColumnIndices));
+ std::move(keyColumnIndices), outputWidth);
}
-IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs) {
- return MakeIntrusive<TDqOutputBroadcastConsumer>(std::move(outputs));
+IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) {
+ return MakeIntrusive<TDqOutputBroadcastConsumer>(std::move(outputs), outputWidth);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index cdfcec0eee3..8bb7ea1ece7 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -26,6 +26,7 @@ public:
}
virtual bool IsFull() const = 0;
virtual void Consume(NKikimr::NUdf::TUnboxedValue&& value) = 0;
+ virtual void WideConsume(NKikimr::NUdf::TUnboxedValue* values, ui32 count) = 0;
virtual void Consume(NDqProto::TCheckpoint&& checkpoint) = 0;
virtual void Finish() = 0;
bool IsFinishing() const {
@@ -43,9 +44,9 @@ IDqOutputConsumer::TPtr CreateOutputMapConsumer(IDqOutput::TPtr output);
IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
- TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices);
+ TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices, TMaybe<ui32> outputWidth);
-IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs);
+IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 19ffc4651b1..6c1542a9998 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -163,6 +163,10 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
const NMiniKQL::TTypeEnvironment& typeEnv, TVector<IDqOutput::TPtr>&& outputs)
{
+ TMaybe<ui32> outputWidth;
+ if (type->IsMulti()) {
+ outputWidth = static_cast<const NMiniKQL::TMultiType*>(type)->GetElementsCount();
+ }
auto guard = typeEnv.BindAllocator();
switch (outputDesc.GetTypeCase()) {
case NDqProto::TTaskOutput::kSink:
@@ -186,11 +190,11 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
}
return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumnTypes),
- std::move(keyColumnIndices));
+ std::move(keyColumnIndices), outputWidth);
}
case NDqProto::TTaskOutput::kBroadcast: {
- return CreateOutputBroadcastConsumer(std::move(outputs));
+ return CreateOutputBroadcastConsumer(std::move(outputs), outputWidth);
}
case NDqProto::TTaskOutput::kRangePartition: {
@@ -665,6 +669,9 @@ public:
AllocatedHolder->Output = nullptr;
} else if (outputConsumers.size() == 1) {
AllocatedHolder->Output = std::move(outputConsumers[0]);
+ if (entry->OutputItemTypes[0]->IsMulti()) {
+ AllocatedHolder->OutputWideType = static_cast<TMultiType*>(entry->OutputItemTypes[0]);
+ }
} else {
auto guard = BindAllocator();
AllocatedHolder->Output = CreateOutputMultiConsumer(std::move(outputConsumers));
@@ -888,6 +895,12 @@ private:
return ERunStatus::PendingOutput;
}
}
+
+ TUnboxedValueVector wideBuffer;
+ const bool isWide = AllocatedHolder->OutputWideType != nullptr;
+ if (isWide) {
+ wideBuffer.resize(AllocatedHolder->OutputWideType->GetElementsCount());
+ }
while (!AllocatedHolder->Output->IsFull()) {
if (Y_UNLIKELY(CollectProfileStats)) {
auto now = TInstant::Now();
@@ -896,11 +909,20 @@ private:
}
NUdf::TUnboxedValue value;
- auto fetchStatus = AllocatedHolder->ResultStream.Fetch(value);
+ NUdf::EFetchStatus fetchStatus;
+ if (isWide) {
+ fetchStatus = AllocatedHolder->ResultStream.WideFetch(wideBuffer.data(), wideBuffer.size());
+ } else {
+ fetchStatus = AllocatedHolder->ResultStream.Fetch(value);
+ }
switch (fetchStatus) {
case NUdf::EFetchStatus::Ok: {
- AllocatedHolder->Output->Consume(std::move(value));
+ if (isWide) {
+ AllocatedHolder->Output->WideConsume(wideBuffer.data(), wideBuffer.size());
+ } else {
+ AllocatedHolder->Output->Consume(std::move(value));
+ }
break;
}
case NUdf::EFetchStatus::Finish: {
@@ -964,6 +986,7 @@ private:
THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform
IDqOutputConsumer::TPtr Output;
+ TMultiType* OutputWideType = nullptr;
NUdf::TUnboxedValue ResultStream;
};
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index bfcd936c405..8d66a49abae 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -35,6 +35,31 @@ NDqProto::TData SerializeValuePickleV1(const TType* type, const NUdf::TUnboxedVa
}
template<bool Fast>
+// Packs every row of `buffer` with the pickle transport packer (fast or regular, depending
+// on the Fast template argument) and returns the bytes wrapped into NDqProto::TData together
+// with the transport version and the row count.
+// `type` is the row type: a multi type selects the wide path, anything else the narrow one.
+NDqProto::TData SerializeBufferPickleV1(const TType* type, const TUnboxedValueBatch& buffer) {
+    using TPacker = TValuePackerTransport<Fast>;
+
+    TPacker packer(/* stable */ false, type);
+    if (type->IsMulti()) {
+        buffer.ForEachRowWide([&packer](const auto* values, ui32 width) {
+            packer.AddWideItem(values, width);
+        });
+    } else {
+        // Bind the row by reference: taking it by value would copy the value once per row.
+        buffer.ForEachRow([&packer](const auto& value) {
+            packer.AddItem(value);
+        });
+    }
+    const auto& packResult = packer.Finish();
+
+    NDqProto::TData data;
+    data.SetTransportVersion(Fast ? NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 : NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+    // Reserve the exact size up front so that CopyTo() does not have to grow the string.
+    data.MutableRaw()->reserve(packResult.Size());
+    packResult.CopyTo(*data.MutableRaw());
+    data.SetRows(buffer.RowCount());
+
+    return data;
+}
+
+template<bool Fast>
void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NUdf::TUnboxedValue& value,
const THolderFactory& holderFactory)
{
@@ -45,7 +70,7 @@ void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NU
template<bool Fast>
void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType,
- const THolderFactory& holderFactory, TUnboxedValueVector& buffer)
+ const THolderFactory& holderFactory, TUnboxedValueBatch& buffer)
{
using TPacker = TValuePackerTransport<Fast>;
TPacker packer(/* stable */ false, itemType);
@@ -71,8 +96,24 @@ NDqProto::TData TDqDataSerializer::Serialize(const NUdf::TUnboxedValue& value, c
}
}
+// Serializes a whole batch with the pickle flavour selected by TransportVersion.
+// An unspecified version is handled like DATA_TRANSPORT_UV_PICKLE_1_0; any other
+// value fails the YQL_ENSURE at the end.
+NDqProto::TData TDqDataSerializer::Serialize(const NKikimr::NMiniKQL::TUnboxedValueBatch& buffer,
+    const NKikimr::NMiniKQL::TType* itemType) const
+{
+    auto guard = TypeEnv.BindAllocator();
+
+    const auto version = TransportVersion;
+    if (version == NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0) {
+        return SerializeBufferPickleV1<true>(itemType, buffer);
+    }
+    if (version == NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED ||
+        version == NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0)
+    {
+        return SerializeBufferPickleV1<false>(itemType, buffer);
+    }
+
+    YQL_ENSURE(false, "Unsupported TransportVersion");
+}
+
void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* itemType,
- TUnboxedValueVector& buffer) const
+ TUnboxedValueBatch& buffer) const
{
auto guard = TypeEnv.BindAllocator();
switch (data.GetTransportVersion()) {
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index b22e947cb78..469383c8a13 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -7,6 +7,7 @@
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
@@ -23,6 +24,7 @@ public:
NDqProto::EDataTransportVersion GetTransportVersion() const;
NDqProto::TData Serialize(const NUdf::TUnboxedValue& value, const NKikimr::NMiniKQL::TType* itemType) const;
+ NDqProto::TData Serialize(const NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, const NKikimr::NMiniKQL::TType* itemType) const;
template <class TForwardIterator>
NDqProto::TData Serialize(TForwardIterator first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType) const {
@@ -41,7 +43,7 @@ public:
}
void Deserialize(const NDqProto::TData& data, const NKikimr::NMiniKQL::TType* itemType,
- NKikimr::NMiniKQL::TUnboxedValueVector& buffer) const;
+ NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) const;
void Deserialize(const NDqProto::TData& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const;
struct TEstimateSizeSettings {
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
index 96c8012df53..b88542f70fb 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
@@ -46,6 +46,225 @@ using TUnboxedValueDeque = std::deque<NUdf::TUnboxedValue, TMKQLAllocator<NUdf::
using TKeyPayloadPair = std::pair<NUdf::TUnboxedValue, NUdf::TUnboxedValue>;
using TKeyPayloadPairVector = std::vector<TKeyPayloadPair, TMKQLAllocator<TKeyPayloadPair>>;
+class TUnboxedValueBatch {
+    // TUnboxedValueBatch represents column values for RowCount rows
+    // If wide encoding is used and each row contains Width columns, Values consists of Width * RowCount items:
+    //   first Width elements correspond to first row,
+    //   second Width elements - to second row, etc
+    // For narrow encoding, each row is represented as a single item (a struct) - so Width is equal to 1
+    //
+    // Storage is paged: Values_ is a deque of pages, a new page is started only when the last one
+    // holds PageSize_ values, and PageSize_ is a multiple of Width_, so a row never straddles pages.
+    // RowOffset_ is measured in ROWS (not values) and counts the rows already popped from the first page.
+private:
+    using TBottomType = TUnboxedValueVector;
+    using TTopType = std::deque<TBottomType>;
+
+public:
+    using value_type = NUdf::TUnboxedValue;
+
+    // A multi rowType makes the batch wide, with one value per element of the multi type.
+    // Any other rowType (or nullptr) makes it narrow, with a single value per row.
+    explicit TUnboxedValueBatch(const TType* rowType = nullptr)
+        : Width_((rowType && rowType->IsMulti()) ? static_cast<const TMultiType*>(rowType)->GetElementsCount() : 1u)
+        , IsWide_(rowType && rowType->IsMulti())
+        , PageSize_(GetPageSize(Width_))
+    {
+    }
+
+    TUnboxedValueBatch(const TUnboxedValueBatch& other) = default;
+    TUnboxedValueBatch& operator=(const TUnboxedValueBatch& other) = default;
+
+    // Steals the pages of `other` and leaves it empty; `other` keeps its width and wideness.
+    TUnboxedValueBatch(TUnboxedValueBatch&& other)
+        : Width_(other.Width_)
+        , IsWide_(other.IsWide_)
+        , PageSize_(other.PageSize_)
+        , Values_(std::move(other.Values_))
+        , RowOffset_(other.RowOffset_)
+        , RowCount_(other.RowCount_)
+    {
+        other.clear();
+    }
+
+    // Drops all rows; width, wideness and page size are preserved.
+    inline void clear() {
+        Values_.clear();
+        RowOffset_ = RowCount_ = 0;
+    }
+
+    inline bool empty() const {
+        return RowCount_ == 0;
+    }
+
+    // Exchanges the complete state of two batches.
+    inline void swap(TUnboxedValueBatch& other) {
+        std::swap(Width_, other.Width_);
+        // IsWide_ has to travel together with Width_, otherwise swapping a wide batch
+        // with a narrow one leaves both in an inconsistent state.
+        std::swap(IsWide_, other.IsWide_);
+        std::swap(PageSize_, other.PageSize_);
+        std::swap(Values_, other.Values_);
+        std::swap(RowOffset_, other.RowOffset_);
+        std::swap(RowCount_, other.RowCount_);
+    }
+
+    // Appends one narrow row constructed in place from `args`. Not allowed for wide batches.
+    template<typename... TArgs>
+    void emplace_back(TArgs&&... args) {
+        MKQL_ENSURE(!IsWide(), "emplace_back() should not be used for wide batch");
+        // The page is full when it holds PageSize_ values - the same criterion ReserveNextRow() uses.
+        // capacity() is not a reliable marker: a copied page is not guaranteed to keep the reserved
+        // capacity, which would start a new page after a partially filled one.
+        if (Values_.empty() || Values_.back().size() == PageSize_) {
+            Values_.emplace_back();
+            Values_.back().reserve(PageSize_);
+        }
+        Values_.back().emplace_back(std::forward<TArgs>(args)...);
+        RowCount_++;
+    }
+
+    inline void push_back(const value_type& row) {
+        emplace_back(row);
+    }
+
+    inline void push_back(value_type&& row) {
+        emplace_back(std::move(row));
+    }
+
+    // Calls cb(row) for every row of a narrow batch, in order.
+    // cb returns either void or bool; a bool callback stops the iteration by returning false,
+    // and in that case ForEachRow() returns false as well (true if all rows were visited).
+    template<typename TFunc>
+    auto ForEachRow(const TFunc& cb) const {
+        MKQL_ENSURE(!IsWide(), "ForEachRowWide() should be used instead");
+        return DoForEachRow<const NUdf::TUnboxedValue, const TUnboxedValueBatch>(this,
+            [&cb](const NUdf::TUnboxedValue* values, ui32 width) {
+                Y_VERIFY_DEBUG(width == 1);
+                Y_VERIFY_DEBUG(values);
+                return cb(*values);
+            });
+    }
+
+    // Mutable counterpart of ForEachRow() const.
+    template<typename TFunc>
+    auto ForEachRow(const TFunc& cb) {
+        MKQL_ENSURE(!IsWide(), "ForEachRowWide() should be used instead");
+        return DoForEachRow<NUdf::TUnboxedValue, TUnboxedValueBatch>(this,
+            [&cb](NUdf::TUnboxedValue* values, ui32 width) {
+                Y_VERIFY_DEBUG(width == 1);
+                Y_VERIFY_DEBUG(values);
+                return cb(*values);
+            });
+    }
+
+    // Calls cb(values, width) for every row of a wide batch, in order; `values` points at the
+    // first column of the row. Return value rules are the same as for ForEachRow().
+    template<typename TFunc>
+    auto ForEachRowWide(const TFunc& cb) const {
+        MKQL_ENSURE(IsWide(), "ForEachRow() should be used instead");
+        return DoForEachRow<const NUdf::TUnboxedValue, const TUnboxedValueBatch>(this, cb);
+    }
+
+    // Mutable counterpart of ForEachRowWide() const.
+    template<typename TFunc>
+    auto ForEachRowWide(const TFunc& cb) {
+        MKQL_ENSURE(IsWide(), "ForEachRow() should be used instead");
+        return DoForEachRow<NUdf::TUnboxedValue, TUnboxedValueBatch>(this, cb);
+    }
+
+    // Number of columns for a wide batch, empty TMaybe for a narrow one.
+    inline TMaybe<ui32> Width() const {
+        return IsWide_ ? Width_ : TMaybe<ui32>{};
+    }
+
+    inline bool IsWide() const {
+        return IsWide_;
+    }
+
+    inline ui64 RowCount() const {
+        return RowCount_;
+    }
+
+    // Pointer to the first value of the first (not yet popped) row; nullptr when Width_ is zero.
+    // Fails on an empty batch.
+    const value_type* Head() const {
+        MKQL_ENSURE(RowCount_, "Head() on empty batch");
+        return Width_ ? &Values_.front()[RowOffset_ * Width_] : nullptr;
+    }
+
+    value_type* Head() {
+        MKQL_ENSURE(RowCount_, "Head() on empty batch");
+        return Width_ ? &Values_.front()[RowOffset_ * Width_] : nullptr;
+    }
+
+    // Removes `rowCount` rows from the front. Pages that become fully consumed are released;
+    // rows consumed from a page that is still in use are only skipped via RowOffset_.
+    inline void Pop(size_t rowCount = 1) {
+        MKQL_ENSURE(rowCount <= RowCount_, "Invalid arg");
+        ui64 newStartOffset = (RowOffset_ + rowCount) * Width_;
+        while (newStartOffset >= PageSize_) {
+            MKQL_ENSURE_S(!Values_.empty());
+            Values_.pop_front();
+            newStartOffset -= PageSize_;
+        }
+
+        RowOffset_ = Width_ ? newStartOffset / Width_ : 0;
+        RowCount_ -= rowCount;
+    }
+
+    // Appends a row whose i-th column is produced by producer(i).
+    template<typename TFunc>
+    void PushRow(const TFunc& producer) {
+        ReserveNextRow();
+        for (ui32 i = 0; i < Width_; ++i) {
+            Values_.back().emplace_back(producer(i));
+        }
+        ++RowCount_;
+    }
+
+    // Appends a row by moving the values out of `values`; `width` must be equal to the batch width.
+    void PushRow(NUdf::TUnboxedValue* values, ui32 width) {
+        Y_VERIFY_DEBUG(width == Width_);
+        ReserveNextRow();
+        for (ui32 i = 0; i < Width_; ++i) {
+            Values_.back().emplace_back(std::move(values[i]));
+        }
+        ++RowCount_;
+    }
+
+private:
+    static const size_t DesiredPageSize = 1024;
+    // Rounds DesiredPageSize up to the nearest multiple of `width`, so that a page always holds
+    // a whole number of rows. A zero width keeps DesiredPageSize unchanged.
+    static inline size_t GetPageSize(size_t width) {
+        if (!width) {
+            return DesiredPageSize;
+        }
+        size_t pageSize = DesiredPageSize + width - 1;
+        return pageSize - pageSize % width;
+    }
+
+    // Starts a new page when there is no page yet or the last one is full.
+    // Nothing is allocated for zero-width rows.
+    inline void ReserveNextRow() {
+        bool full = Width_ && (Values_.empty() || Values_.back().size() == PageSize_);
+        if (full) {
+            Values_.emplace_back();
+            Values_.back().reserve(PageSize_);
+        }
+    }
+
+    // Shared implementation of the ForEachRow*() family: walks RowCount_ rows page by page
+    // starting from the first not yet popped row and calls cb(values, Width_) for each of them.
+    template<typename TValue, typename TParent, typename TFunc>
+    static auto DoForEachRow(TParent* parent, const TFunc& cb) {
+        using TReturn = typename std::result_of<TFunc(TValue*, ui32)>::type;
+
+        auto currTop = parent->Values_.begin();
+
+        // RowOffset_ is measured in rows (see Head() and Pop()), so it has to be scaled by the
+        // width to get the position of the first value on the first page.
+        const size_t firstValueOffset = parent->RowOffset_ * parent->Width_;
+        Y_VERIFY_DEBUG(parent->PageSize_ > firstValueOffset);
+        Y_VERIFY_DEBUG(parent->Width_ == 0 || (parent->PageSize_ - firstValueOffset) % parent->Width_ == 0);
+
+        size_t valuesOnPage = parent->PageSize_ - firstValueOffset;
+
+        TValue* values = (parent->Width_ && parent->RowCount_) ? currTop->data() + firstValueOffset : nullptr;
+        for (size_t i = 0; i < parent->RowCount_; ++i) {
+            if constexpr (std::is_same_v<TReturn, bool>) {
+                if (!cb(values, parent->Width_)) {
+                    return false;
+                }
+            } else {
+                static_assert(std::is_same_v<TReturn, void>, "Callback should either return bool or void");
+                cb(values, parent->Width_);
+            }
+            values += parent->Width_;
+            valuesOnPage -= parent->Width_;
+            // Move to the next page only if there are rows left: after the last row there may be
+            // no next page at all, and dereferencing the end iterator is undefined behaviour.
+            if (!valuesOnPage && i + 1 < parent->RowCount_) {
+                valuesOnPage = parent->PageSize_;
+                ++currTop;
+                values = currTop->data();
+            }
+        }
+
+        if constexpr (std::is_same_v<TReturn, bool>) {
+            return true;
+        }
+    }
+
+    ui32 Width_;      // values per row; 1 for narrow batches
+    bool IsWide_;     // true when the batch was created from a multi type
+    size_t PageSize_; // values per page, a multiple of Width_
+
+    TTopType Values_;
+    ui64 RowOffset_ = 0; // rows already popped from the first page
+    ui64 RowCount_ = 0;  // rows currently stored
+};
+
inline int CompareValues(NUdf::EDataSlot type, bool asc, bool isOptional, const NUdf::TUnboxedValuePod& lhs, const NUdf::TUnboxedValuePod& rhs) {
int cmp;
if (isOptional) {
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 9c375c5d6a4..91684b1cbea 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -216,6 +216,16 @@ bool HasOptionalFields(const TType* type) {
return HasOptionalFields(taggedType->GetBaseType());
}
+ case TType::EKind::Multi: {
+ auto multiType = static_cast<const TMultiType*>(type);
+ for (ui32 index = 0; index < multiType->GetElementsCount(); ++index) {
+ if (HasOptionalFields(multiType->GetElementType(index))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
default:
THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
}
@@ -889,6 +899,7 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
template<bool Fast>
TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type)
: Type_(type)
+ , Width_(type->IsMulti() ? static_cast<const TMultiType*>(type)->GetElementsCount() : TMaybe<ui32>())
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
{
@@ -909,7 +920,7 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TStringBuf buf, const TH
}
template<bool Fast>
-void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const {
+void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
auto& s = IncrementalState_;
ui64 len;
ui32 topLength;
@@ -930,9 +941,21 @@ void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFacto
len = NDetails::GetRawData<ui64>(buf);
}
- result.reserve(len + result.size());
- for (ui64 i = 0; i < len; ++i) {
- result.emplace_back(UnpackFromContigousBuffer<Fast>(itemType, buf, topLength, holderFactory, s));
+ if (Type_->IsMulti()) {
+ auto multiType = static_cast<const TMultiType*>(Type_);
+ const ui32 width = multiType->GetElementsCount();
+ Y_VERIFY_DEBUG(result.IsWide());
+ Y_VERIFY_DEBUG(result.Width() == width);
+ for (ui64 i = 0; i < len; ++i) {
+ result.PushRow([&](ui32 j) {
+ return UnpackFromContigousBuffer<Fast>(multiType->GetElementType(j), buf, topLength, holderFactory, s);
+ });
+ }
+ } else {
+ Y_VERIFY_DEBUG(!result.IsWide());
+ for (ui64 i = 0; i < len; ++i) {
+ result.emplace_back(UnpackFromContigousBuffer<Fast>(itemType, buf, topLength, holderFactory, s));
+ }
}
MKQL_ENSURE(buf.empty(), "Bad packed data. Not fully data read");
}
@@ -953,17 +976,23 @@ const TPagedBuffer& TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValueP
}
template<bool Fast>
+// Clears Buffer_ and reserves the header space that precedes the packed items.
+void TValuePackerTransport<Fast>::StartPack() {
+    Buffer_.Clear();
+    if constexpr (!Fast) {
+        IncrementalState_.OptionalUsageMask.Reset();
+        Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE);
+    } else {
+        // reserve place for list item count
+        Buffer_.ReserveHeader(sizeof(ItemCount_));
+    }
+}
+
+template<bool Fast>
TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TUnboxedValuePod& value) {
+ Y_VERIFY_DEBUG(!Type_->IsMulti());
const TType* itemType = Type_;
if (!ItemCount_) {
- Buffer_.Clear();
- if constexpr (Fast) {
- // reserve place for list item count
- Buffer_.ReserveHeader(sizeof(ItemCount_));
- } else {
- IncrementalState_.OptionalUsageMask.Reset();
- Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE);
- }
+ StartPack();
}
PackImpl<Fast, false>(itemType, Buffer_, value, IncrementalState_);
@@ -972,6 +1001,22 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TU
}
template<bool Fast>
+// Appends one wide row: every one of the `width` values is packed with the matching element
+// type of the multi type, then the item counter is incremented. The first item of a batch
+// triggers StartPack(). Returns *this to allow chaining.
+TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 width) {
+    Y_VERIFY_DEBUG(Type_->IsMulti());
+    Y_VERIFY_DEBUG(static_cast<const TMultiType*>(Type_)->GetElementsCount() == width);
+    if (ItemCount_ == 0) {
+        StartPack();
+    }
+
+    const auto* const multiType = static_cast<const TMultiType*>(Type_);
+    ui32 column = 0;
+    while (column != width) {
+        PackImpl<Fast, false>(multiType->GetElementType(column), Buffer_, values[column], IncrementalState_);
+        ++column;
+    }
+
+    ItemCount_ += 1;
+    return *this;
+}
+
+template<bool Fast>
void TValuePackerTransport<Fast>::Clear() {
Buffer_.Clear();
ItemCount_ = 0;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index 16c7d423f18..53710d46029 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -78,6 +78,7 @@ public:
// AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
+ TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count);
size_t PackedSizeEstimate() const {
return Buffer_.Size() + Buffer_.ReservedHeaderSize();
}
@@ -89,11 +90,13 @@ public:
// reference is valid till the next call to Pack()
const TPagedBuffer& Pack(const NUdf::TUnboxedValuePod &value) const;
NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const;
- void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const;
+ void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
private:
void BuildMeta(bool addItemCount) const;
+ void StartPack();
const TType* const Type_;
+ const TMaybe<ui32> Width_;
ui64 ItemCount_ = 0;
mutable TPagedBuffer Buffer_;
mutable NDetails::TPackerState State_;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
index 1ceb8ec4047..b750210c2a7 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -548,13 +548,13 @@ protected:
UNIT_ASSERT_VALUES_EQUAL(std::string_view(uVal.AsStringRef()), str);
}
- TUnboxedValueVector items;
+ TUnboxedValueBatch items;
packer.UnpackBatch(serializedStr, HolderFactory, items);
- UNIT_ASSERT_VALUES_EQUAL(items.size(), count);
- for (auto& uVal : items) {
- UNIT_ASSERT(uVal);
- UNIT_ASSERT_VALUES_EQUAL(std::string_view(uVal.AsStringRef()), str);
- }
+ UNIT_ASSERT_VALUES_EQUAL(items.RowCount(), count);
+ items.ForEachRow([&](const NUdf::TUnboxedValue& value) {
+ UNIT_ASSERT(value);
+ UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str);
+ });
}
}
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
index 1d97b87d40b..6dd36387665 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
@@ -87,7 +87,7 @@ private:
}
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final {
if (Result) {
const auto size = Result->size();
buffer.emplace_back(NKikimr::NMiniKQL::MakeString(std::string_view(*Result)));
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
index 402e9e73115..61e4be5668a 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
@@ -109,6 +109,7 @@ TFakeCASetup::~TFakeCASetup() {
void TFakeCASetup::AsyncOutputWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint, bool finish) {
Execute([&valueProducer, checkpoint, finish](TFakeActor& actor) {
auto batch = valueProducer(actor.GetHolderFactory());
+
Y_ASSERT(actor.DqAsyncOutput);
actor.DqAsyncOutput->SendData(std::move(batch), 0, checkpoint, finish);
});
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index 90e08b96158..8bee231a43c 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -24,7 +24,7 @@ using TRuntimePtr = std::unique_ptr<NActors::TTestActorRuntime>;
using TCallback = std::function<void(TFakeActor&)>;
template<typename T>
using TReadValueParser = std::function<std::vector<T>(const NUdf::TUnboxedValue&)>;
-using TWriteValueProducer = std::function<NKikimr::NMiniKQL::TUnboxedValueVector(NKikimr::NMiniKQL::THolderFactory&)>;
+using TWriteValueProducer = std::function<NKikimr::NMiniKQL::TUnboxedValueBatch(NKikimr::NMiniKQL::THolderFactory&)>;
namespace {
struct TEvPrivate {
@@ -191,15 +191,15 @@ struct TFakeCASetup {
NThreading::TFuture<bool> nextDataFuture;
Execute([&result, &parser, freeSpace, &nextDataFutureOut, this](TFakeActor& actor) {
TMaybe<TInstant> watermark;
- NKikimr::NMiniKQL::TUnboxedValueVector buffer;
+ NKikimr::NMiniKQL::TUnboxedValueBatch buffer;
bool finished = false;
actor.DqAsyncInput->GetAsyncInputData(buffer, watermark, finished, freeSpace);
- for (const auto& uv : buffer) {
- for (const auto item : parser(uv)) {
+ buffer.ForEachRow([&](const NUdf::TUnboxedValue& value) {
+ for (const auto item : parser(value)) {
result.emplace_back(item);
}
- }
+ });
if (watermark) {
result.emplace_back(*watermark);
diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
index cafacb757c1..1019e7d7031 100644
--- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp
+++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
@@ -96,15 +96,13 @@ bool TProtoBuilder::WriteData(const NDqProto::TData& data, const std::function<b
const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion);
- TUnboxedValueVector buffer;
+ YQL_ENSURE(!ResultType->IsMulti());
+ TUnboxedValueBatch buffer(ResultType);
dataSerializer.Deserialize(data, ResultType, buffer);
- for (const auto& item : buffer) {
- if (!func(item)) {
- return false;
- }
- }
- return true;
+ return buffer.ForEachRow([&func](const auto& value) {
+ return func(value);
+ });
}
bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) {
@@ -115,13 +113,13 @@ bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::f
const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion);
+ YQL_ENSURE(!ResultType->IsMulti());
+
for (const auto& part : rows) {
- TUnboxedValueVector buffer;
+ TUnboxedValueBatch buffer(ResultType);
dataSerializer.Deserialize(part, ResultType, buffer);
- for (const auto& item : buffer) {
- if (!func(item)) {
- return false;
- }
+ if (!buffer.ForEachRow([&func](const auto& value) { return func(value); })) {
+ return false;
}
}
return true;
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index b329f3e3474..d97928a6614 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -560,7 +560,7 @@ private:
}
auto guard = source.TypeEnv->BindAllocator();
TMaybe<TInstant> watermark;
- NKikimr::NMiniKQL::TUnboxedValueVector batch;
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch;
bool finished = false;
const i64 space = source.Source->GetAsyncInputData(batch, watermark, finished, freeSpace);
const ui64 index = inputIndex;
@@ -706,7 +706,7 @@ private:
void SinkSend(
ui64 index,
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
TMaybe<NDqProto::TCheckpoint>&& checkpoint,
i64 size,
i64 checkpointSize,
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 655e31c44fb..5ce5c8ab5c6 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -369,18 +369,15 @@ public:
auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(),
NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED);
- NKikimr::NMiniKQL::TUnboxedValueVector buffer;
- buffer.reserve(request.GetData().GetRows());
+ NKikimr::NMiniKQL::TUnboxedValueBatch buffer(source->GetInputType());
if (request.GetString().empty() && request.GetChunks() == 0) {
dataSerializer.Deserialize(request.GetData(), source->GetInputType(), buffer);
} else if (!request.GetString().empty()) {
- buffer.reserve(request.GetString().size());
for (auto& row : request.GetString()) {
buffer.emplace_back(NKikimr::NMiniKQL::MakeString(row));
}
} else {
i64 chunks = request.GetChunks();
- buffer.reserve(chunks);
for (i64 i = 0; i < chunks; i++) {
NDqProto::TSourcePushChunk chunk;
chunk.Load(&input);
@@ -634,25 +631,23 @@ public:
request.Load(&input);
auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
- NKikimr::NMiniKQL::TUnboxedValueVector batch;
auto sink = Runner->GetSink(channelId);
auto* outputType = sink->GetOutputType();
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch(outputType);
+ YQL_ENSURE(!batch.IsWide());
auto bytes = sink->Pop(batch, request.GetBytes());
NDqProto::TSinkPopResponse response;
if (request.GetRaw()) {
- for (auto& raw : batch) {
- *response.AddString() = raw.AsStringRef();
- }
+ batch.ForEachRow([&response](const auto& value) {
+ *response.AddString() = value.AsStringRef();
+ });
} else {
NDq::TDqDataSerializer dataSerializer(
Runner->GetTypeEnv(),
Runner->GetHolderFactory(),
NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
-
- *response.MutableData() = dataSerializer.Serialize(
- batch.begin(), batch.end(),
- static_cast<NKikimr::NMiniKQL::TType*>(outputType));
+ *response.MutableData() = dataSerializer.Serialize(batch, outputType);
}
response.SetBytes(bytes);
response.Save(&output);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 722ce32ddbb..21294f30735 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -586,7 +586,7 @@ public:
}
[[nodiscard]]
- bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override {
+ bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override {
Y_UNUSED(batch);
ythrow yexception() << "unimplemented";
}
@@ -726,11 +726,11 @@ public:
}
}
- void Push(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64 space) override {
+ void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
auto inputType = GetInputType();
NDqProto::TSourcePushRequest data;
TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
- *data.MutableData() = dataSerializer.Serialize(batch.begin(), batch.end(), static_cast<NKikimr::NMiniKQL::TType*>(inputType));
+ *data.MutableData() = dataSerializer.Serialize(batch, inputType);
data.SetSpace(space);
NDqProto::TCommandHeader header;
@@ -740,10 +740,11 @@ public:
header.SetChannelId(InputIndex);
header.Save(&Output);
data.Save(&Output);
+
}
[[nodiscard]]
- bool Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch) override {
+ bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override {
Y_UNUSED(batch);
ythrow yexception() << "unimplemented";
}
@@ -912,6 +913,12 @@ public:
ythrow yexception() << "unimplemented";
}
+ void WidePush(NUdf::TUnboxedValue* values, ui32 count) override {
+ Y_UNUSED(values);
+ Y_UNUSED(count);
+ ythrow yexception() << "unimplemented";
+ }
+
void Push(NDqProto::TWatermark&& watermark) override {
Y_UNUSED(watermark);
ythrow yexception() << "unimplemented";
@@ -1087,7 +1094,7 @@ public:
}
}
- ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) override {
+ ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) override {
try {
NDqProto::TCommandHeader header;
header.SetVersion(5);
@@ -1201,6 +1208,12 @@ public:
Y_FAIL("Unimplemented");
}
+ void WidePush(NUdf::TUnboxedValue* values, ui32 count) override {
+ Y_UNUSED(values);
+ Y_UNUSED(count);
+ Y_FAIL("Unimplemented");
+ }
+
void Push(NDqProto::TWatermark&& watermark) override {
Y_UNUSED(watermark);
Y_FAIL("Watermarks are not supported");
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 1cac519e70b..6fb14025eba 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -247,7 +247,7 @@ private:
void AsyncInputPush(
ui64 cookie,
ui64 index,
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
i64 space,
bool finish) override
{
@@ -255,9 +255,10 @@ private:
auto selfId = SelfId();
TVector<TString> strings;
- for (auto& row : batch) {
- strings.emplace_back(row.AsStringRef());
- }
+ YQL_ENSURE(!batch.IsWide());
+ batch.ForEachRow([&strings](const auto& value) {
+ strings.emplace_back(value.AsStringRef());
+ });
Invoker->Invoke([strings=std::move(strings),taskRunner=TaskRunner, actorSystem, selfId, cookie, parentId=ParentId, space, finish, index, settings=Settings, stageId=StageId]() mutable {
try {
@@ -363,7 +364,7 @@ private:
void OnSinkPopFinished(TEvSinkPopFinished::TPtr& ev) {
auto guard = TaskRunner->BindAllocator();
- NKikimr::NMiniKQL::TUnboxedValueVector batch;
+ NKikimr::NMiniKQL::TUnboxedValueBatch batch;
for (auto& row: ev->Get()->Strings) {
batch.emplace_back(NKikimr::NMiniKQL::MakeString(row));
}
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
index d97028c5ccf..8e49d47c742 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
@@ -126,8 +126,9 @@ namespace NYql::NDq {
hFunc(TEvPrivate::TEvReadResult, Handle);
hFunc(TEvPrivate::TEvReadError, Handle);)
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished,
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished,
i64 /*freeSpace*/) final {
+ YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
if (Result) {
NUdf::TUnboxedValue value;
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index 55dbf335d1b..e415a2f75c8 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -277,7 +277,7 @@ private:
}
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);
const auto now = TInstant::Now();
@@ -392,14 +392,15 @@ private:
THashMap<NYdb::NPersQueue::TPartitionStream::TPtr, TList<std::pair<ui64, ui64>>> OffsetRanges; // [start, end)
};
- bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) {
+ bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) {
if (ReadyBuffer.empty()) {
SubscribeOnNextEvent();
return false;
}
auto& readyBatch = ReadyBuffer.front();
- buffer.swap(readyBatch.Data);
+ buffer.clear();
+ std::move(readyBatch.Data.begin(), readyBatch.Data.end(), std::back_inserter(buffer));
watermark = readyBatch.Watermark;
usedSpace = readyBatch.UsedSpace;
@@ -419,7 +420,7 @@ private:
}
SRC_LOG_T("Return ready batch."
- << " DataCount = " << buffer.size()
+ << " DataCount = " << buffer.RowCount()
<< " Watermark = " << (watermark ? ToString(*watermark) : "none")
<< " Used space = " << usedSpace);
return true;
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index 434e37e6780..c9d763b8581 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -122,12 +122,12 @@ public:
public:
void SendData(
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
i64 dataSize,
const TMaybe<NDqProto::TCheckpoint>& checkpoint,
bool finished) override
{
- SINK_LOG_T("SendData. Batch: " << batch.size()
+ SINK_LOG_T("SendData. Batch: " << batch.RowCount()
<< ". Checkpoint: " << checkpoint.Defined()
<< ". Finished: " << finished);
Y_UNUSED(dataSize);
@@ -138,17 +138,18 @@ public:
CreateSessionIfNotExists();
- for (const NUdf::TUnboxedValue& item : batch) {
- if (!item.IsBoxed()) {
+ Y_VERIFY(!batch.IsWide(), "Wide batch is not supported");
+ if (!batch.ForEachRow([&](const auto& value) {
+ if (!value.IsBoxed()) {
Fail("Struct with single field was expected");
- return;
+ return false;
}
- const NUdf::TUnboxedValue dataCol = item.GetElement(0);
+ const NUdf::TUnboxedValue dataCol = value.GetElement(0);
if (!dataCol.IsString() && !dataCol.IsEmbedded()) {
Fail(TStringBuilder() << "Non string value could not be written to YDS stream");
- return;
+ return false;
}
TString data(dataCol.AsStringRef());
@@ -160,11 +161,14 @@ public:
if (messageSize > MaxMessageSize) {
Fail(TStringBuilder() << "Max message size for YDS is " << MaxMessageSize
<< " bytes but received message with size of " << messageSize << " bytes");
- return;
+ return false;
}
FreeSpace -= messageSize;
Buffer.push(std::move(data));
+ return true;
+ })) {
+ return;
}
if (checkpoint) {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 48a24c7091f..2ba03ea7d06 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -838,10 +838,9 @@ private:
}
}
- i64 GetAsyncInputData(TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
- buffer.reserve(buffer.size() + Blocks.size());
do {
auto& content = std::get<IHTTPGateway::TContent>(Blocks.front());
const auto size = content.size();
@@ -2462,7 +2461,7 @@ private:
LOG_D("TS3StreamReadActor", "Memory usage. Ready blocks: " << Blocks.size() << ". Ready blocks total size: " << blocksTotalSize);
}
- i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final {
+ i64 GetAsyncInputData(TUnboxedValueBatch& output, TMaybe<TInstant>&, bool& finished, i64 free) final {
ReportMemoryUsage();
i64 total = 0LL;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index d953464688c..0493626a1ae 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -498,10 +498,11 @@ private:
hFunc(TEvPrivate::TEvUploadFinished, Handle);
)
- void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
+ void SendData(TUnboxedValueBatch&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
std::unordered_set<TS3FileWriteActor*> processedActors;
- for (const auto& v : data) {
- const auto& key = MakePartitionKey(v);
+ YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet");
+ data.ForEachRow([&](const auto& row) {
+ const auto& key = MakePartitionKey(row);
const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) {
auto fileWrite = std::make_unique<TS3FileWriteActor>(
@@ -516,7 +517,7 @@ private:
RegisterWithSameMailbox(fileWrite.release());
}
- const NUdf::TUnboxedValue& value = Keys.empty() ? v : *v.GetElements();
+ const NUdf::TUnboxedValue& value = Keys.empty() ? row : *row.GetElements();
TS3FileWriteActor* actor = keyIt->second.back();
if (value) {
actor->AddData(TString(value.AsStringRef()));
@@ -525,7 +526,7 @@ private:
actor->Seal();
}
processedActors.insert(actor);
- }
+ });
for (TS3FileWriteActor* actor : processedActors) {
actor->Go();
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index c6c216fb083..d59e49f748a 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -151,12 +151,13 @@ public:
public:
void SendData(
- NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ NKikimr::NMiniKQL::TUnboxedValueBatch&& batch,
i64,
const TMaybe<NDqProto::TCheckpoint>& checkpoint,
bool finished) override
{
- SINK_LOG_T("Got " << batch.size() << " items to send. Checkpoint: " << checkpoint.Defined()
+ YQL_ENSURE(!batch.IsWide(), "Wide streams are not supported");
+ SINK_LOG_T("Got " << batch.RowCount() << " items to send. Checkpoint: " << checkpoint.Defined()
<< ". Send queue: " << SendingBuffer.size() << ". Inflight: " << InflightBuffer.size()
<< ". Checkpoint in progress: " << CheckpointInProgress.has_value());
@@ -165,13 +166,13 @@ public:
}
ui64 metricsCount = 0;
- for (const auto& item : batch) {
+ batch.ForEachRow([&](const auto& value) {
if (metricsCount + WriteParams.Shard.GetScheme().GetSensors().size() > MaxMetricsPerRequest) {
PushMetricsToBuffer(metricsCount);
}
- metricsCount += UserMetricsEncoder.Append(item);
- }
+ metricsCount += UserMetricsEncoder.Append(value);
+ });
if (metricsCount != 0) {
PushMetricsToBuffer(metricsCount);
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
index 48c4bebd2f9..745d4a4a41d 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
@@ -126,10 +126,9 @@ private:
TActorBootstrapped<TYdbReadActor>::PassAway();
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
- buffer.reserve(buffer.size() + Blocks.size());
do {
const auto size = Blocks.front().size();
buffer.emplace_back(NKikimr::NMiniKQL::MakeString(Blocks.front()));