aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2023-01-24 16:20:03 +0300
committereivanov89 <eivanov89@ydb.tech>2023-01-24 16:20:03 +0300
commite5993e128fdc420e090421b6d1c368f39a5a48ee (patch)
tree297923d3da65f8bc54f7153ee5a0da5dd4d4697f
parentc4ba73bc1252fb6adf017574a7c9da046d1aa421 (diff)
downloadydb-e5993e128fdc420e090421b6d1c368f39a5a48ee.tar.gz
allow empty column set when read iterator reads in cellvec format
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp2
-rw-r--r--ydb/core/protos/tx_datashard.proto9
-rw-r--r--ydb/core/tx/datashard/datashard.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard.h21
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp57
-rw-r--r--ydb/core/tx/datashard/datashard_ut_read_iterator.cpp56
6 files changed, 138 insertions, 23 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b52521af4d..277eed9026 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -692,7 +692,7 @@ public:
} else {
hasResultColumns = true;
stats.AddStatistics(
- NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->ArrowBatch, columnIndex, type)
+ NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, type)
);
}
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 08820a7718..bd44157c3d 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1594,7 +1594,12 @@ message TEvRead {
// must not be set
optional TTableId TableId = 2;
- // A list of column ids that are returned in the result set
+ // A list of column ids that are returned in the result set. Empty columns can be used to Count(*) rows.
+ // * In case of CellVec and empty columns: result will
+ // contain an empty CellVec for each read row.
+ // * In case of Arrow and empty Columns: ArrowBatch is a batch with single column of type NullType
+ // and with num_rows equal rowsCount. All values are null.
+ // In all cases RowCount can be checked with GetRowsCount() call (return RowCount field).
repeated uint32 Columns = 3;
// An optional snapshot for point in time queries
@@ -1695,6 +1700,8 @@ message TEvReadResult {
repeated TLock TxLocks = 10;
repeated TLock BrokenTxLocks = 11;
+ optional uint64 RowCount = 12;
+
// Data for the possibly partial result
oneof ReadResult {
TArrowBatch ArrowBatch = 900;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 2c5bafdb99..19a2651bcf 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/base/interconnect_channels.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
+#include <ydb/core/formats/arrow_batch_builder.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
@@ -3979,4 +3980,19 @@ void TEvDataShard::TEvReadResult::FillRecord() {
}
}
+std::shared_ptr<arrow::RecordBatch> TEvDataShard::TEvReadResult::GetArrowBatch() const {
+ return const_cast<TEvDataShard::TEvReadResult*>(this)->GetArrowBatch();
+}
+
+std::shared_ptr<arrow::RecordBatch> TEvDataShard::TEvReadResult::GetArrowBatch() {
+ if (ArrowBatch)
+ return ArrowBatch;
+
+ if (Record.GetRowCount() == 0)
+ return nullptr;
+
+ ArrowBatch = NArrow::CreateNoColumnsBatch(Record.GetRowCount());
+ return ArrowBatch;
+}
+
} // NKikimr
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 1bdad5517b..b45c0a0aff 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -927,20 +927,24 @@ struct TEvDataShard {
static NActors::IEventBase* Load(TEventSerializedData* data);
+ size_t GetRowsCount() const {
+ return Record.GetRowCount();
+ }
+
private:
void FillRecord();
public:
// CellVec (TODO: add schema?)
- size_t GetRowsCount() const {
- return Rows.size() + RowsSerialized.size();
- }
-
TConstArrayRef<TCell> GetCells(size_t row) const {
+ if (Rows.empty() && RowsSerialized.empty() && Record.GetRowCount())
+ return {};
+
if (!Rows.empty()) {
return Rows[row];
}
+
return RowsSerialized[row].GetCells();
}
@@ -950,7 +954,12 @@ struct TEvDataShard {
// Arrow
- std::shared_ptr<arrow::RecordBatch> ArrowBatch;
+ void SetArrowBatch(std::shared_ptr<arrow::RecordBatch>&& batch) {
+ ArrowBatch = std::move(batch);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> GetArrowBatch();
+ std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const;
private:
// for local events
@@ -958,6 +967,8 @@ struct TEvDataShard {
// for remote events to avoid extra copying
TVector<TSerializedCellVec> RowsSerialized;
+
+ std::shared_ptr<arrow::RecordBatch> ArrowBatch;
};
struct TEvReadContinue : public TEventLocal<TEvReadContinue, TEvDataShard::EvReadContinue> {
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 05f5a7e8a2..e3c39b4bee 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -17,6 +17,32 @@ namespace {
constexpr ui64 MinRowsPerCheck = 1000;
+class TRowCountBlockBuilder : public IBlockBuilder {
+public:
+ bool Start(const TVector<std::pair<TString, NScheme::TTypeInfo>>&, ui64, ui64, TString&) override
+ {
+ return true;
+ }
+
+ void AddRow(const TDbTupleRef&, const TDbTupleRef&) override {
+ ++RowCount;
+ }
+
+ TString Finish() override {
+ return TString();
+ }
+
+ size_t Bytes() const override { return 0; }
+
+private:
+ ui64 RowCount = 0;
+
+ std::unique_ptr<IBlockBuilder> Clone() const override {
+ return nullptr;
+ }
+};
+
+
class TCellBlockBuilder : public IBlockBuilder {
public:
bool Start(
@@ -152,6 +178,11 @@ std::pair<std::unique_ptr<IBlockBuilder>, TString> CreateBlockBuilder(
std::unique_ptr<IBlockBuilder> blockBuilder;
TString error;
+ if (state.Columns.empty()) {
+ blockBuilder.reset(new TRowCountBlockBuilder());
+ return std::make_pair(std::move(blockBuilder), error);
+ }
+
auto nameTypeCols = GetNameTypeColumns(state.Columns, tableInfo);
if (nameTypeCols.empty()) {
error = "Wrong columns requested";
@@ -159,10 +190,10 @@ std::pair<std::unique_ptr<IBlockBuilder>, TString> CreateBlockBuilder(
}
switch (state.Format) {
- case NKikimrTxDataShard::EScanDataFormat::ARROW:
+ case NKikimrTxDataShard::ARROW:
blockBuilder.reset(new NArrow::TArrowBatchBuilder());
break;
- case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
+ case NKikimrTxDataShard::CELLVEC:
blockBuilder.reset(new TCellBlockBuilder());
break;
default:
@@ -493,27 +524,34 @@ public:
}
}
- if (!isKeysRequest)
+ Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead);
+ if (!isKeysRequest) {
Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, InvisibleRowSkips);
+ Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROWS, RowsRead);
+ Self->IncCounter(COUNTER_RANGE_READ_ROWS_PER_REQUEST, RowsRead);
+ }
+
+ if (RowsRead) {
+ record.SetRowCount(RowsRead);
+ }
+ // not that in case of empty columns set, here we have 0 bytes
+ // and if is false
BytesInResult = BlockBuilder.Bytes();
if (BytesInResult) {
- Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead);
Self->IncCounter(COUNTER_READ_ITERATOR_BYTES_READ, BytesInResult);
if (isKeysRequest) {
// backward compatibility
Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_ROW_BYTES, BytesInResult);
} else {
// backward compatibility
- Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROWS, RowsRead);
- Self->IncCounter(COUNTER_RANGE_READ_ROWS_PER_REQUEST, RowsRead);
Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_BYTES, BytesInResult);
}
switch (State.Format) {
case NKikimrTxDataShard::ARROW: {
auto& arrowBuilder = static_cast<NArrow::TArrowBatchBuilder&>(BlockBuilder);
- result.ArrowBatch = arrowBuilder.FlushBatch(false);
+ result.SetArrowBatch(arrowBuilder.FlushBatch(false));
break;
}
case NKikimrTxDataShard::CELLVEC: {
@@ -1930,11 +1968,6 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}
- if (record.ColumnsSize() == 0) {
- replyWithError(Ydb::StatusIds::BAD_REQUEST, "Missing Columns");
- return;
- }
-
TRowVersion readVersion = TRowVersion::Max();
bool isHeadRead = false;
if (record.HasSnapshot()) {
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 261b67f43d..0e4df92779 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -79,14 +79,14 @@ std::vector<TOwnedCellVec> GetRows(
const TVector<std::pair<TString, NScheme::TTypeInfo>>& batchSchema,
const TEvDataShard::TEvReadResult& result)
{
- UNIT_ASSERT(result.ArrowBatch);
+ UNIT_ASSERT(result.GetArrowBatch());
// TODO: use schema from ArrowBatch
TRowWriter writer;
NArrow::TArrowToYdbConverter converter(batchSchema, writer);
TString error;
- UNIT_ASSERT(converter.Process(*result.ArrowBatch, error));
+ UNIT_ASSERT(converter.Process(*result.GetArrowBatch(), error));
return std::move(writer.Rows);
}
@@ -146,7 +146,7 @@ void CheckResultArrow(
std::vector<NTable::TTag> columns = {})
{
UNIT_ASSERT(!gold.empty());
- UNIT_ASSERT(result.ArrowBatch);
+ UNIT_ASSERT(result.GetArrowBatch());
TVector<std::pair<TString, NScheme::TTypeInfo>> batchSchema;
const auto& description = userTable.GetDescription();
@@ -201,7 +201,7 @@ void CheckResult(
UNIT_ASSERT(false);
}
} else {
- UNIT_ASSERT(!result.ArrowBatch && result.GetRowsCount() == 0);
+ UNIT_ASSERT(!result.GetArrowBatch() && result.GetRowsCount() == 0);
}
}
@@ -913,6 +913,54 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
}
}
+ Y_UNIT_TEST(ShouldReadNoColumnsCellVec) {
+ // KIKIMR-16897: no columns mean we want to calc row count
+ TTestHelper helper;
+
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::CELLVEC);
+ request->Record.ClearColumns();
+ AddRangeQuery<ui32>(
+ *request,
+ {1, 1, 1},
+ true,
+ {5, 5, 5},
+ true
+ );
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ UNIT_ASSERT(readResult);
+ CheckResult(helper.Tables["table-1"].UserTable, *readResult, {
+ std::vector<ui32>(),
+ std::vector<ui32>(),
+ std::vector<ui32>(),
+ });
+ UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), 3UL);
+ }
+
+ Y_UNIT_TEST(ShouldReadNoColumnsArrow) {
+ // KIKIMR-16897: no columns mean we want to calc row count
+ TTestHelper helper;
+
+ auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW);
+ request->Record.ClearColumns();
+ AddRangeQuery<ui32>(
+ *request,
+ {1, 1, 1},
+ true,
+ {5, 5, 5},
+ true
+ );
+
+ auto readResult = helper.SendRead("table-1", request.release());
+ UNIT_ASSERT(readResult);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), 3UL);
+ UNIT_ASSERT(readResult->GetArrowBatch());
+
+ auto batch = readResult->GetArrowBatch();
+ UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 3UL);
+ }
+
Y_UNIT_TEST(ShouldReadNonExistingKey) {
TTestHelper helper;