diff options
author | eivanov89 <eivanov89@ydb.tech> | 2023-01-24 16:20:03 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2023-01-24 16:20:03 +0300 |
commit | e5993e128fdc420e090421b6d1c368f39a5a48ee (patch) | |
tree | 297923d3da65f8bc54f7153ee5a0da5dd4d4697f | |
parent | c4ba73bc1252fb6adf017574a7c9da046d1aa421 (diff) | |
download | ydb-e5993e128fdc420e090421b6d1c368f39a5a48ee.tar.gz |
allow empty column set when read iterator reads in cellvec format
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 57 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_read_iterator.cpp | 56 |
6 files changed, 138 insertions, 23 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b52521af4d..277eed9026 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -692,7 +692,7 @@ public: } else { hasResultColumns = true; stats.AddStatistics( - NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->ArrowBatch, columnIndex, type) + NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, type) ); } } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 08820a7718..bd44157c3d 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1594,7 +1594,12 @@ message TEvRead { // must not be set optional TTableId TableId = 2; - // A list of column ids that are returned in the result set + // A list of column ids that are returned in the result set. Empty columns can be used to Count(*) rows. + // * In case of CellVec and empty columns: result will + // contain an empty CellVec for each read row. + // * In case of Arrow and empty Columns: ArrowBatch is a batch with single column of type NullType + // and with num_rows equal rowsCount. All values are null. + // In all cases RowCount can be checked with GetRowsCount() call (return RowCount field). repeated uint32 Columns = 3; // An optional snapshot for point in time queries @@ -1695,6 +1700,8 @@ message TEvReadResult { repeated TLock TxLocks = 10; repeated TLock BrokenTxLocks = 11; + optional uint64 RowCount = 12; + // Data for the possibly partial result oneof ReadResult { TArrowBatch ArrowBatch = 900; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 2c5bafdb99..19a2651bcf 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -3,6 +3,7 @@ #include <ydb/core/base/interconnect_channels.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> +#include <ydb/core/formats/arrow_batch_builder.h> #include <ydb/core/scheme/scheme_tablecell.h> #include <ydb/core/tablet/tablet_counters_protobuf.h> #include <ydb/core/tx/long_tx_service/public/events.h> @@ -3979,4 +3980,19 @@ void TEvDataShard::TEvReadResult::FillRecord() { } } +std::shared_ptr<arrow::RecordBatch> TEvDataShard::TEvReadResult::GetArrowBatch() const { + return const_cast<TEvDataShard::TEvReadResult*>(this)->GetArrowBatch(); +} + +std::shared_ptr<arrow::RecordBatch> TEvDataShard::TEvReadResult::GetArrowBatch() { + if (ArrowBatch) + return ArrowBatch; + + if (Record.GetRowCount() == 0) + return nullptr; + + ArrowBatch = NArrow::CreateNoColumnsBatch(Record.GetRowCount()); + return ArrowBatch; +} + } // NKikimr diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 1bdad5517b..b45c0a0aff 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -927,20 +927,24 @@ struct TEvDataShard { static NActors::IEventBase* Load(TEventSerializedData* data); + size_t GetRowsCount() const { + return Record.GetRowCount(); + } + private: void FillRecord(); public: // CellVec (TODO: add schema?) - size_t GetRowsCount() const { - return Rows.size() + RowsSerialized.size(); - } - TConstArrayRef<TCell> GetCells(size_t row) const { + if (Rows.empty() && RowsSerialized.empty() && Record.GetRowCount()) + return {}; + if (!Rows.empty()) { return Rows[row]; } + return RowsSerialized[row].GetCells(); } @@ -950,7 +954,12 @@ struct TEvDataShard { // Arrow - std::shared_ptr<arrow::RecordBatch> ArrowBatch; + void SetArrowBatch(std::shared_ptr<arrow::RecordBatch>&& batch) { + ArrowBatch = std::move(batch); + } + + std::shared_ptr<arrow::RecordBatch> GetArrowBatch(); + std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const; private: // for local events @@ -958,6 +967,8 @@ struct TEvDataShard { // for remote events to avoid extra copying TVector<TSerializedCellVec> RowsSerialized; + + std::shared_ptr<arrow::RecordBatch> ArrowBatch; }; struct TEvReadContinue : public TEventLocal<TEvReadContinue, TEvDataShard::EvReadContinue> { diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 05f5a7e8a2..e3c39b4bee 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -17,6 +17,32 @@ namespace { constexpr ui64 MinRowsPerCheck = 1000; +class TRowCountBlockBuilder : public IBlockBuilder { +public: + bool Start(const TVector<std::pair<TString, NScheme::TTypeInfo>>&, ui64, ui64, TString&) override + { + return true; + } + + void AddRow(const TDbTupleRef&, const TDbTupleRef&) override { + ++RowCount; + } + + TString Finish() override { + return TString(); + } + + size_t Bytes() const override { return 0; } + +private: + ui64 RowCount = 0; + + std::unique_ptr<IBlockBuilder> Clone() const override { + return nullptr; + } +}; + + class TCellBlockBuilder : public IBlockBuilder { public: bool Start( @@ -152,6 +178,11 @@ std::pair<std::unique_ptr<IBlockBuilder>, TString> CreateBlockBuilder( std::unique_ptr<IBlockBuilder> blockBuilder; TString error; + if (state.Columns.empty()) { + blockBuilder.reset(new TRowCountBlockBuilder()); + return std::make_pair(std::move(blockBuilder), error); + } + auto nameTypeCols = GetNameTypeColumns(state.Columns, tableInfo); if (nameTypeCols.empty()) { error = "Wrong columns requested"; @@ -159,10 +190,10 @@ std::pair<std::unique_ptr<IBlockBuilder>, TString> CreateBlockBuilder( } switch (state.Format) { - case NKikimrTxDataShard::EScanDataFormat::ARROW: + case NKikimrTxDataShard::ARROW: blockBuilder.reset(new NArrow::TArrowBatchBuilder()); break; - case NKikimrTxDataShard::EScanDataFormat::CELLVEC: + case NKikimrTxDataShard::CELLVEC: blockBuilder.reset(new TCellBlockBuilder()); break; default: @@ -493,27 +524,34 @@ public: } } - if (!isKeysRequest) + Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead); + if (!isKeysRequest) { Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, InvisibleRowSkips); + Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROWS, RowsRead); + Self->IncCounter(COUNTER_RANGE_READ_ROWS_PER_REQUEST, RowsRead); + } + + if (RowsRead) { + record.SetRowCount(RowsRead); + } + // not that in case of empty columns set, here we have 0 bytes + // and if is false BytesInResult = BlockBuilder.Bytes(); if (BytesInResult) { - Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead); Self->IncCounter(COUNTER_READ_ITERATOR_BYTES_READ, BytesInResult); if (isKeysRequest) { // backward compatibility Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_ROW_BYTES, BytesInResult); } else { // backward compatibility - Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROWS, RowsRead); - Self->IncCounter(COUNTER_RANGE_READ_ROWS_PER_REQUEST, RowsRead); Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_BYTES, BytesInResult); } switch (State.Format) { case NKikimrTxDataShard::ARROW: { auto& arrowBuilder = static_cast<NArrow::TArrowBatchBuilder&>(BlockBuilder); - result.ArrowBatch = arrowBuilder.FlushBatch(false); + result.SetArrowBatch(arrowBuilder.FlushBatch(false)); break; } case NKikimrTxDataShard::CELLVEC: { @@ -1930,11 +1968,6 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } - if (record.ColumnsSize() == 0) { - replyWithError(Ydb::StatusIds::BAD_REQUEST, "Missing Columns"); - return; - } - TRowVersion readVersion = TRowVersion::Max(); bool isHeadRead = false; if (record.HasSnapshot()) { diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 261b67f43d..0e4df92779 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -79,14 +79,14 @@ std::vector<TOwnedCellVec> GetRows( const TVector<std::pair<TString, NScheme::TTypeInfo>>& batchSchema, const TEvDataShard::TEvReadResult& result) { - UNIT_ASSERT(result.ArrowBatch); + UNIT_ASSERT(result.GetArrowBatch()); // TODO: use schema from ArrowBatch TRowWriter writer; NArrow::TArrowToYdbConverter converter(batchSchema, writer); TString error; - UNIT_ASSERT(converter.Process(*result.ArrowBatch, error)); + UNIT_ASSERT(converter.Process(*result.GetArrowBatch(), error)); return std::move(writer.Rows); } @@ -146,7 +146,7 @@ void CheckResultArrow( std::vector<NTable::TTag> columns = {}) { UNIT_ASSERT(!gold.empty()); - UNIT_ASSERT(result.ArrowBatch); + UNIT_ASSERT(result.GetArrowBatch()); TVector<std::pair<TString, NScheme::TTypeInfo>> batchSchema; const auto& description = userTable.GetDescription(); @@ -201,7 +201,7 @@ void CheckResult( UNIT_ASSERT(false); } } else { - UNIT_ASSERT(!result.ArrowBatch && result.GetRowsCount() == 0); + UNIT_ASSERT(!result.GetArrowBatch() && result.GetRowsCount() == 0); } } @@ -913,6 +913,54 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { } } + Y_UNIT_TEST(ShouldReadNoColumnsCellVec) { + // KIKIMR-16897: no columns mean we want to calc row count + TTestHelper helper; + + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::CELLVEC); + request->Record.ClearColumns(); + AddRangeQuery<ui32>( + *request, + {1, 1, 1}, + true, + {5, 5, 5}, + true + ); + + auto readResult = helper.SendRead("table-1", request.release()); + UNIT_ASSERT(readResult); + CheckResult(helper.Tables["table-1"].UserTable, *readResult, { + std::vector<ui32>(), + std::vector<ui32>(), + std::vector<ui32>(), + }); + UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), 3UL); + } + + Y_UNIT_TEST(ShouldReadNoColumnsArrow) { + // KIKIMR-16897: no columns mean we want to calc row count + TTestHelper helper; + + auto request = helper.GetBaseReadRequest("table-1", 1, NKikimrTxDataShard::ARROW); + request->Record.ClearColumns(); + AddRangeQuery<ui32>( + *request, + {1, 1, 1}, + true, + {5, 5, 5}, + true + ); + + auto readResult = helper.SendRead("table-1", request.release()); + UNIT_ASSERT(readResult); + UNIT_ASSERT_VALUES_EQUAL(readResult->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(readResult->GetRowsCount(), 3UL); + UNIT_ASSERT(readResult->GetArrowBatch()); + + auto batch = readResult->GetArrowBatch(); + UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 3UL); + } + Y_UNIT_TEST(ShouldReadNonExistingKey) { TTestHelper helper; |