diff options
author | ivanmorozov <ivanmorozov@ydb.tech> | 2023-12-13 15:08:39 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@ydb.tech> | 2023-12-13 16:03:39 +0300 |
commit | b0943903218f95edba19ac132ce7b358dd1e297b (patch) | |
tree | a0547970e27a8a39f6eed15814c7ee0f48fa3aa9 | |
parent | 7800ffcc112d88e155f1bdadd0f4d37afa2b112b (diff) | |
download | ydb-b0943903218f95edba19ac132ce7b358dd1e297b.tar.gz |
KIKIMR-20179: remove deprecated reader from tests
35 files changed, 904 insertions, 942 deletions
diff --git a/.mapping.json b/.mapping.json index a4a91ab6b9..d7267ee15c 100644 --- a/.mapping.json +++ b/.mapping.json @@ -6300,6 +6300,12 @@ "ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/columnshard/common/CMakeLists.txt":"", "ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt":"", + "ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/common/tests/CMakeLists.txt":"", + "ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/columnshard/counters/CMakeLists.darwin-arm64.txt":"", "ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt":"", diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h index f2efd23f21..5d57f94fa4 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_events.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h @@ -14,6 +14,12 @@ struct TEvKqpCompute { struct TEvRemoteScanData : public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData, TKqpComputeEvents::EvRemoteScanData> {}; + class IShardScanStats { + public: + virtual ~IShardScanStats() = default; + virtual THashMap<TString, ui64> GetMetrics() const = 0; + }; + /* * Scan communications. 
* @@ -46,6 +52,19 @@ struct TEvKqpCompute { bool Finished = false; bool PageFault = false; // page fault was the reason for sending this message mutable THolder<TEvRemoteScanData> Remote; + std::shared_ptr<IShardScanStats> StatsOnFinished; + + template <class T> + const T& GetStatsAs() const { + Y_ABORT_UNLESS(!!StatsOnFinished); + return VerifyDynamicCast<const T&>(*StatsOnFinished); + } + + template <class T> + bool CheckStatsIs() const { + auto p = dynamic_cast<const T*>(StatsOnFinished.get()); + return p; + } ui32 GetRowsCount() const { if (ArrowBatch) { @@ -225,6 +244,7 @@ struct TEvKqpCompute { struct TEvKillScanTablet : public NActors::TEventPB<TEvKillScanTablet, NKikimrKqp::TEvKillScanTablet, TKqpComputeEvents::EvKillScanTablet> {}; + }; } // namespace NKikimr::NKqp diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index dd6a385869..2d3d7e96e0 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -15,6 +15,10 @@ public: : IndexInfo(indexInfo) {} + virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const override { + return IndexInfo.GetColumnIdOptional(name); + } + TString GetColumnName(ui32 id, bool required) const override { return IndexInfo.GetColumnName(id, required); } @@ -91,6 +95,10 @@ public: return ReadyResults.size(); } + virtual const NOlap::TReadStats& GetStats() const override { + return *ReadMetadata->ReadStats; + } + virtual TString DebugString(const bool verbose) const override { return TStringBuilder() << "ready_results:(" << ReadyResults.DebugString() << ");" diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 9845a5eed8..d3e7138b5a 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -97,6 +97,7 @@ public: , Stats(NTracing::TTraceClient::GetLocalClient("SHARD", 
::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/)) , ComputeShardingPolicy(computeShardingPolicy) { + AFL_VERIFY(ReadMetadataRanges.size() == 1); KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema(); } @@ -396,7 +397,25 @@ private: return singleRowWriter.Row; } - bool SendResult(bool pageFault, bool lastBatch){ + class TScanStatsOwner: public NKqp::TEvKqpCompute::IShardScanStats { + private: + YDB_READONLY_DEF(NOlap::TReadStats, Stats); + public: + TScanStatsOwner(const NOlap::TReadStats& stats) + : Stats(stats) { + + } + + virtual THashMap<TString, ui64> GetMetrics() const override { + THashMap<TString, ui64> result; + result["compacted_bytes"] = Stats.CompactedPortionsBytes; + result["inserted_bytes"] = Stats.InsertedPortionsBytes; + result["committed_bytes"] = Stats.CommittedPortionsBytes; + return result; + } + }; + + bool SendResult(bool pageFault, bool lastBatch) { if (Finished) { return true; } @@ -429,6 +448,7 @@ private: << " bytes: " << Bytes << "/" << BytesSum << " rows: " << Rows << "/" << RowsSum << " page faults: " << Result->PageFaults << " finished: " << Result->Finished << " pageFault: " << Result->PageFault << " stats:" << Stats->ToJson() << ";iterator:" << (ScanIterator ? 
ScanIterator->DebugString(false) : "NO"); + Result->StatsOnFinished = std::make_shared<TScanStatsOwner>(ScanIterator->GetStats()); } else { Y_ABORT_UNLESS(ChunksLimiter.Take(Bytes)); Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore(); @@ -696,6 +716,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) { bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) || read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE); read.ColumnIds.assign(record.GetColumnTags().begin(), record.GetColumnTags().end()); + read.StatsMode = record.GetStatsMode(); const NOlap::TIndexInfo* indexInfo = nullptr; if (!isIndexStats) { @@ -745,6 +766,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) { ReadMetadataRanges.emplace_back(newRange); } Y_ABORT_UNLESS(ReadMetadataRanges.size() == 1); + return true; } @@ -773,8 +795,7 @@ void TTxScan::Complete(const TActorContext& ctx) { const ui32 scanGen = request.GetGeneration(); TString table = request.GetTablePath(); auto dataFormat = request.GetDataFormat(); - TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs()); - + const TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs()); if (scanGen > 1) { Self->IncCounter(COUNTER_SCAN_RESTARTED); } @@ -783,8 +804,6 @@ void TTxScan::Complete(const TActorContext& ctx) { if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) { detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request; } - std::vector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges; - if (ReadMetadataRanges.empty()) { LOG_S_DEBUG("TTxScan failed " << " txId: " << txId @@ -860,6 +879,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext Execute(new TTxScan(this, ev), ctx); } +const NKikimr::NOlap::TReadStats& TScanIteratorBase::GetStats() const { + return Default<NOlap::TReadStats>(); 
+} + } namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index 7576cb3247..5c20f2a5bd 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -9,6 +9,7 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> namespace NKikimr::NOlap { +struct TReadStats; // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation class TPartialReadResult { private: @@ -94,6 +95,9 @@ public: virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) { } + + virtual const NOlap::TReadStats& GetStats() const; + virtual std::optional<ui32> GetAvailableResultsCount() const { return {}; } diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index 829e57c1dc..0211b47595 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -28,6 +28,15 @@ public: return it->second.Name; } + std::optional<ui32> GetColumnIdOptional(const TString& name) const override { + auto it = PrimaryIndexStatsSchema.ColumnNames.find(name); + if (it == PrimaryIndexStatsSchema.ColumnNames.end()) { + return {}; + } else { + return it->second; + } + } + const NTable::TScheme::TTableSchema& GetSchema() const override { return PrimaryIndexStatsSchema; } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 9ad4f4b42f..8df44d7df9 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -1,6 +1,7 @@ #include "columnshard_ut_common.h" #include "columnshard__stats_scan.h" +#include "common/tests/shard_reader.h" #include <ydb/core/base/tablet.h> #include <ydb/core/base/tablet_resolver.h> @@ -449,26 +450,15 @@ namespace NKikimr::NColumnShard { } 
std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) { - using namespace NTxUT; - TActorId sender = runtime.AllocateEdgeActor(); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId)); - - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - while(true) { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - auto b = event->GetArrowBatch(); - if (b) { - batches.push_back(b); - } - if (!event->HasMore()) { - break; - } + std::vector<TString> fields; + for (auto&& f : schema) { + fields.emplace_back(f.first); } - auto res = NArrow::CombineBatches(batches); - return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema)); + + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, snapshot); + reader.SetReplyColumns(fields); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + return rb ? rb : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema)); } } diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt index a059465374..fb413280b2 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(tests) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt index a059465374..fb413280b2 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(tests) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt index 1ef9531173..6d584af68a 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(tests) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt index 1ef9531173..6d584af68a 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(tests) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt index a059465374..fb413280b2 100644 --- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(tests) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt new file mode 100644 index 0000000000..55055e3cc4 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-common-tests) +target_link_libraries(columnshard-common-tests PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + core-formats-arrow + core-kqp-compute_actor +) +target_sources(columnshard-common-tests PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp +) diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..55055e3cc4 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-common-tests) +target_link_libraries(columnshard-common-tests PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + core-formats-arrow + core-kqp-compute_actor +) +target_sources(columnshard-common-tests PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp +) diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..a6c6e844ce --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-common-tests) +target_link_libraries(columnshard-common-tests PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + core-formats-arrow + core-kqp-compute_actor +) +target_sources(columnshard-common-tests PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp +) diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..a6c6e844ce --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). 
These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-common-tests) +target_link_libraries(columnshard-common-tests PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + core-formats-arrow + core-kqp-compute_actor +) +target_sources(columnshard-common-tests PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp +) diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.txt new file mode 100644 index 0000000000..d863ebd180 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") + include(CMakeLists.darwin-arm64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..55055e3cc4 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(columnshard-common-tests) +target_link_libraries(columnshard-common-tests PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + core-formats-arrow + core-kqp-compute_actor +) +target_sources(columnshard-common-tests PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp +) diff --git a/ydb/core/tx/columnshard/common/tests/shard_reader.cpp b/ydb/core/tx/columnshard/common/tests/shard_reader.cpp new file mode 100644 index 0000000000..2789a63e38 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/shard_reader.cpp @@ -0,0 +1,4 @@ +#include "shard_reader.h" + +namespace NKikimr::NOlap::NTests { +} diff --git a/ydb/core/tx/columnshard/common/tests/shard_reader.h b/ydb/core/tx/columnshard/common/tests/shard_reader.h new file mode 100644 index 0000000000..777c93d8f1 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/shard_reader.h @@ -0,0 +1,271 @@ +#pragma once +#include <ydb/core/testlib/basics/runtime.h> +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/kqp/compute_actor/kqp_compute_events.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <optional> + +namespace NKikimr::NOlap::NTests { + +class TShardReader { +private: + TTestBasicRuntime& Runtime; + const ui64 TabletId; + const ui64 PathId; + const NOlap::TSnapshot Snapshot; + std::optional<NActors::TActorId> ScanActorId; + std::optional<int> Finished; + THashMap<TString, ui64> ResultStats; + std::optional<NKikimrSSA::TProgram> ProgramProto; + std::optional<TString> SerializedProgram; + YDB_ACCESSOR(bool, Reverse, false); + YDB_ACCESSOR(ui32, Limit, 0); + 
std::vector<TString> ReplyColumns; + std::vector<TSerializedTableRange> Ranges; + + std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const { + auto ev = std::make_unique<TEvDataShard::TEvKqpScan>(); + ev->Record.SetLocalPathId(PathId); + ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep()); + ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId()); + + ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL); + ev->Record.SetTxId(Snapshot.GetTxId()); + + ev->Record.SetReverse(Reverse); + ev->Record.SetItemsLimit(Limit); + + ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW); + + auto protoRanges = ev->Record.MutableRanges(); + protoRanges->Reserve(Ranges.size()); + for (auto& range : Ranges) { + auto newRange = protoRanges->Add(); + range.Serialize(*newRange); + } + + if (ProgramProto) { + NKikimrSSA::TOlapProgram olapProgram; + { + TString programBytes; + TStringOutput stream(programBytes); + ProgramProto->SerializeToArcadiaStream(&stream); + olapProgram.SetProgram(programBytes); + } + { + TString programBytes; + TStringOutput stream(programBytes); + olapProgram.SerializeToArcadiaStream(&stream); + ev->Record.SetOlapProgram(programBytes); + } + ev->Record.SetOlapProgramType( + NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS + ); + } else if (SerializedProgram) { + ev->Record.SetOlapProgram(*SerializedProgram); + ev->Record.SetOlapProgramType( + NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS + ); + } + + return ev; + } + + std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches; + YDB_READONLY(ui32, IterationsCount, 0); +public: + ui64 GetReadStat(const TString& paramName) const { + AFL_VERIFY(IsCorrectlyFinished()); + auto it = ResultStats.find(paramName); + AFL_VERIFY(it != ResultStats.end()); + return it->second; + } + + ui64 GetReadBytes() const { + return GetReadStat("committed_bytes") + GetReadStat("inserted_bytes") + GetReadStat("compacted_bytes"); + } + + void 
AddRange(const TSerializedTableRange& r) { + Ranges.emplace_back(r); + } + + ui32 GetRecordsCount() const { + AFL_VERIFY(IsFinished()); + auto r = GetResult(); + return r ? r->num_rows() : 0; + } + + TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns) { + AFL_VERIFY(!SerializedProgram); + if (!ProgramProto) { + ProgramProto = NKikimrSSA::TProgram(); + } + for (auto&& command : *ProgramProto->MutableCommand()) { + if (command.HasProjection()) { + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumns) { + proj.AddColumns()->SetName(i); + } + *command.MutableProjection() = proj; + return *this; + } + } + { + auto* command = ProgramProto->AddCommand(); + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumns) { + proj.AddColumns()->SetName(i); + } + *command->MutableProjection() = proj; + } + return *this; + } + + TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) { + AFL_VERIFY(!SerializedProgram); + if (!ProgramProto) { + ProgramProto = NKikimrSSA::TProgram(); + } + for (auto&& command : *ProgramProto->MutableCommand()) { + if (command.HasProjection()) { + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumnIds) { + proj.AddColumns()->SetId(i); + } + *command.MutableProjection() = proj; + return *this; + } + } + { + auto* command = ProgramProto->AddCommand(); + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumnIds) { + proj.AddColumns()->SetId(i); + } + *command->MutableProjection() = proj; + } + return *this; + } + + TShardReader& SetProgram(const NKikimrSSA::TProgram& p) { + AFL_VERIFY(!ProgramProto); + AFL_VERIFY(!SerializedProgram); + ProgramProto = p; + return *this; + } + + TShardReader& SetProgram(const TString& serializedProgram) { + AFL_VERIFY(!ProgramProto); + AFL_VERIFY(!SerializedProgram); + SerializedProgram = serializedProgram; + return *this; + } + + TShardReader(TTestBasicRuntime& runtime, const ui64 tabletId, const ui64 pathId, const 
NOlap::TSnapshot& snapshot) + : Runtime(runtime) + , TabletId(tabletId) + , PathId(pathId) + , Snapshot(snapshot) { + + } + + bool IsFinished() const { + return !!Finished; + } + + bool IsCorrectlyFinished() const { + return IsFinished() && *Finished == 1; + } + + bool IsError() const { + return IsFinished() && *Finished == -1; + } + + bool InitializeScanner() { + AFL_VERIFY(!ScanActorId); + const TActorId sender = Runtime.AllocateEdgeActor(); + ForwardToTablet(Runtime, TabletId, sender, BuildStartEvent().release()); + TAutoPtr<IEventHandle> handle; + auto event = Runtime.GrabEdgeEvents<NKqp::TEvKqpCompute::TEvScanInitActor, NKqp::TEvKqpCompute::TEvScanError>(handle); + if (auto* evSuccess = std::get<0>(event)) { + AFL_VERIFY(evSuccess); + auto& msg = evSuccess->Record; + ScanActorId = ActorIdFromProto(msg.GetScanActorId()); + return true; + } else if (auto* evError = std::get<1>(event)) { + Finished = -1; + } else { + AFL_VERIFY(false); + } + return false; + } + + void Ack() { + AFL_VERIFY(!Finished); + AFL_VERIFY(ScanActorId); + Runtime.Send(*ScanActorId, *ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(8 * 1024 * 1024, 0, 1)); + ++IterationsCount; + } + + bool Receive() { + AFL_VERIFY(!Finished); + TAutoPtr<IEventHandle> handle; + auto event = Runtime.GrabEdgeEvents<NKqp::TEvKqpCompute::TEvScanData, NKqp::TEvKqpCompute::TEvScanError>(handle); + if (auto* evData = std::get<0>(event)) { + auto b = evData->ArrowBatch; + if (b) { + NArrow::TStatusValidator::Validate(b->ValidateFull()); + ResultBatches.push_back(b); + } else { + AFL_VERIFY(evData->Finished); + } + if (evData->Finished) { + AFL_VERIFY(evData->StatsOnFinished); + ResultStats = evData->StatsOnFinished->GetMetrics(); + Finished = 1; + } + } else if (auto* evError = std::get<1>(event)) { + Finished = -1; + } else { + AFL_VERIFY(false); + } + return !Finished; + } + + std::shared_ptr<arrow::RecordBatch> ReadAll() { + if (InitializeScanner()) { + Ack(); + return ContinueReadAll(); + } + return 
GetResult(); + } + + std::shared_ptr<arrow::RecordBatch> ContinueReadAll() { + while (Receive()) { + Ack(); + } + return GetResult(); + } + + std::shared_ptr<arrow::RecordBatch> GetResult() const { + AFL_VERIFY(!!Finished); + if (IsError()) { + return nullptr; + } + if (ResultBatches.empty()) { + return nullptr; + } else { + auto result = NArrow::CombineBatches(ResultBatches); + NArrow::TStatusValidator::Validate(result->ValidateFull()); + return result; + } + } +}; + +} diff --git a/ydb/core/tx/columnshard/common/tests/ya.make b/ydb/core/tx/columnshard/common/tests/ya.make new file mode 100644 index 0000000000..98401ad0e7 --- /dev/null +++ b/ydb/core/tx/columnshard/common/tests/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + shard_reader.cpp +) + +PEERDIR( + ydb/core/protos + contrib/libs/apache/arrow + ydb/core/formats/arrow + ydb/core/kqp/compute_actor +) + +END() diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h index 75b6aa4f32..f05943bffa 100644 --- a/ydb/core/tx/columnshard/engines/reader/description.h +++ b/ydb/core/tx/columnshard/engines/reader/description.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/core/tx/program/program.h> #include <ydb/core/tx/columnshard/engines/predicate/filter.h> - +#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h> namespace NKikimr::NOlap { // Describes read/scan request @@ -18,6 +18,7 @@ public: // There's complex logic in NKikimr::TTableRange comparison that could be emulated only with separated compare // operations with potentially different columns. We have to remove columns to support -Inf (Null) and +Inf. 
NOlap::TPKRangesFilter PKRangesFilter; + NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; // List of columns std::vector<ui32> ColumnIds; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h index 02193c459a..2c6f213143 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h @@ -52,7 +52,14 @@ public: auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot()); SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema); - EFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); + { + auto efColumns = ReadMetadata->GetEarlyFilterColumnIds(); + if (efColumns.size()) { + EFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); + } else { + EFColumns = SpecColumns; + } + } *EFColumns = *EFColumns + *SpecColumns; if (ReadMetadata->HasProcessingColumnIds()) { FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index f5474a7497..83d0ded5f5 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -42,6 +42,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription, ResultIndexSchema->GetIndexInfo().GetReplaceKey()); SelectInfo = dataAccessor.Select(readDescription); + StatsMode = readDescription.StatsMode; return true; } diff --git 
a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index a09a017ce2..01ca8575e4 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -42,13 +42,17 @@ struct TReadStats { ui32 SelectedRows = 0; - TReadStats(ui32 indexNo) + TReadStats(ui32 indexNo = 0) : BeginTimestamp(TInstant::Now()) , SelectedIndex(indexNo) {} void PrintToLog(); + ui64 GetReadBytes() const { + return CompactedPortionsBytes + InsertedPortionsBytes + CommittedPortionsBytes; /* total over the three portion byte counters, each counted once */ + } + TDuration Duration() { return TInstant::Now() - BeginTimestamp; } @@ -99,7 +103,7 @@ public: } virtual ~TReadMetadataBase() = default; - ui64 Limit{0}; // TODO + ui64 Limit = 0; virtual void Dump(IOutputStream& out) const { out << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}" @@ -149,6 +153,7 @@ public: return result; } std::shared_ptr<TSelectInfo> SelectInfo; + NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; std::vector<TCommittedBlob> CommittedBlobs; std::shared_ptr<TReadStats> ReadStats; diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt index cac2dc7c55..737bdbf11b 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC core-testlib-default columnshard-hooks-abstract columnshard-hooks-testing + columnshard-common-tests ydb-services-metadata ydb-core-tx public-lib-yson_value diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt index aec0c14a8a..edb570ec79 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt +++
b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC core-testlib-default columnshard-hooks-abstract columnshard-hooks-testing + columnshard-common-tests ydb-services-metadata ydb-core-tx public-lib-yson_value diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt index 8524515b05..3d90143420 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC core-testlib-default columnshard-hooks-abstract columnshard-hooks-testing + columnshard-common-tests ydb-services-metadata ydb-core-tx public-lib-yson_value diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt index fb7f19246f..b9e15ace09 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC core-testlib-default columnshard-hooks-abstract columnshard-hooks-testing + columnshard-common-tests ydb-services-metadata ydb-core-tx public-lib-yson_value diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt index 2ef0565f74..68c91949bc 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC core-testlib-default columnshard-hooks-abstract columnshard-hooks-testing + columnshard-common-tests ydb-services-metadata ydb-core-tx public-lib-yson_value diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp 
b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 97e883aa3c..55fa1e342e 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -11,6 +11,7 @@ #include <ydb/core/tx/columnshard/operations/write_data.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> +#include <ydb/core/tx/columnshard/common/tests/shard_reader.h> #include <ydb/library/actors/protos/unittests.pb.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> #include <ydb/core/formats/arrow/simple_builder/array.h> @@ -41,7 +42,7 @@ public: }; template <typename TKey = ui64> -bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range, +bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range, bool requireUniq = false, const std::string& columnName = "timestamp") { static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>; @@ -54,14 +55,8 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p } } - auto schema = NArrow::DeserializeSchema(srtSchema); - //Cerr << "Got schema: " << schema->ToString() << "\n"; - - for (auto& blob : blobs) { - auto batch = NArrow::DeserializeBatch(blob, schema); + for (auto& batch : batches) { UNIT_ASSERT(batch); - //Cerr << "Got batch: " << batch->ToString() << "\n"; - std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName); UNIT_ASSERT(array); @@ -104,7 +99,20 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p } template <typename TKey = ui64> -bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) { +bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range, + bool requireUniq = false, const std::string& columnName = 
"timestamp") { + + auto schema = NArrow::DeserializeSchema(srtSchema); + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + for (auto& blob : blobs) { + batches.emplace_back(NArrow::DeserializeBatch(blob, schema)); + } + + return DataHas<TKey>(batches, range, requireUniq, columnName); +} + +template <typename TKey = ui64> +bool DataHasOnly(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range) { static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>; THashSet<TKey> keys; @@ -116,9 +124,7 @@ bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, st } } - auto schema = NArrow::DeserializeSchema(srtSchema); - for (auto& blob : blobs) { - auto batch = NArrow::DeserializeBatch(blob, schema); + for (auto& batch : batches) { UNIT_ASSERT(batch); std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp"); @@ -154,6 +160,17 @@ bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, st return true; } +template <typename TKey = ui64> +bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) { + auto schema = NArrow::DeserializeSchema(srtSchema); + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + for (auto& blob : blobs) { + batches.emplace_back(NArrow::DeserializeBatch(blob, schema)); + } + + return DataHasOnly(batches, range); +} + template <typename TArrowType> bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) { using TArray = typename arrow::TypeTraits<TArrowType>::ArrayType; @@ -238,9 +255,7 @@ bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const std::vecto return true; } -bool CheckOrdered(const TString& blob, const TString& srtSchema) { - auto schema = NArrow::DeserializeSchema(srtSchema); - auto batch = NArrow::DeserializeBatch(blob, schema); +bool CheckOrdered(const std::shared_ptr<arrow::RecordBatch>& batch) { 
UNIT_ASSERT(batch); std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp"); @@ -300,10 +315,12 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) { return true; } -bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& colNames, size_t rowsCount) { +bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& colNames, std::optional<size_t> rowsCount) { UNIT_ASSERT(batch); UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_columns(), colNames.size()); - UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_rows(), rowsCount); + if (rowsCount) { + UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_rows(), *rowsCount); + } UNIT_ASSERT(batch->ValidateFull().ok()); for (size_t i = 0; i < colNames.size(); ++i) { @@ -318,46 +335,6 @@ bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::v return true; } -bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& meta, const std::vector<TString>& colNames, - size_t rowsCount = 100) { - auto schema = NArrow::DeserializeSchema(meta.GetSchema()); - auto batch = NArrow::DeserializeBatch(blob, schema); - - return CheckColumns(batch, colNames, rowsCount); -} - -std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema, - NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) { - std::vector<TString> readData; - TAutoPtr<IEventHandle> handle; - bool finished = false; - for (ui32 i = 0; i < expected; ++i) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT(resRead.GetData().size() > 0); - //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - //Cerr << "GOT BATCH " 
<< resRead.GetBatch() << " data size " << resRead.GetData().size() << "\n"; - if (resRead.GetFinished()) { - expected = resRead.GetBatch() + 1; - meta = resRead.GetMeta(); - finished = true; - } - readData.push_back(resRead.GetData()); - - if (schema.empty()) { - schema = resRead.GetMeta().GetSchema(); - } - UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); - } - UNIT_ASSERT(finished); - return readData; -} - void TestWrite(const TestTableDescription& table) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -533,7 +510,6 @@ void TestWriteReadDup(const TestTableDescription& table = {}) { // - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; @@ -559,13 +535,12 @@ void TestWriteReadDup(const TestTableDescription& table = {}) { // read if (planStep != initPlanStep) { - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, planStep-1, Max<ui64>(), tableId)); - - TString schema; - NKikimrTxColumnShard::TMetadata meta; - std::vector<TString> readData = ReadManyResults(runtime, schema, meta); - UNIT_ASSERT(DataHas(readData, schema, portion, true)); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>())); + reader.SetReplyColumns({"timestamp"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion, true)); } } } @@ -619,24 +594,17 @@ void TestWriteReadLongTxDup() { // read TAutoPtr<IEventHandle> handle; { - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, 0, planStep, txId, tableId)); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), 0); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), 
NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); - - auto data = resRead.GetData(); - auto meta = resRead.GetMeta(); - UNIT_ASSERT(CheckColumns(data, meta, TTestSchema::ExtractNames(ydbSchema), numRows)); - UNIT_ASSERT(DataHas(std::vector<TString>{data}, meta.GetSchema(), portion, true)); - UNIT_ASSERT(DataHasOnly(std::vector<TString>{data}, meta.GetSchema(), portion)); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(rb); + UNIT_ASSERT(rb->num_rows()); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion, true)); + UNIT_ASSERT(DataHasOnly({rb}, portion)); } } @@ -681,7 +649,6 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString // - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; @@ -710,18 +677,12 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString TAutoPtr<IEventHandle> handle; { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 1); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId)); - auto event2 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event2); - - auto& resRead = Proto(event2); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), 
NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead.GetData(), ""); + + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 0)); + reader.SetReplyColumns({"resource_type"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT_EQUAL(rb, nullptr); } // commit 1: ins:0, cmt:1, idx:0 @@ -733,86 +694,57 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString // read 2 (committed, old snapshot) { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 2); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId)); - auto event5 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event5); - auto& resRead2 = Proto(event5); - UNIT_ASSERT_EQUAL(resRead2.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead2.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead2.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead2.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead2.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead2.GetData(), ""); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 0)); + reader.SetReplyColumns({"resource_type"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT_EQUAL(rb, nullptr); } // read 3 (committed) { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 3); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId)); - auto event6 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - 
UNIT_ASSERT(event6); - - auto& resRead3 = Proto(event6); - UNIT_ASSERT_EQUAL(resRead3.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead3.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead3.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead3.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead3.GetFinished(), true); - //UNIT_ASSERT_EQUAL(resRead3.GetData(), data); - UNIT_ASSERT(resRead3.GetData().size() > 0); - UNIT_ASSERT(CheckColumns(resRead3.GetData(), resRead3.GetMeta(), TTestSchema::ExtractNames(ydbSchema))); - { - std::vector<TString> readData; - readData.push_back(resRead3.GetData()); - auto& schema = resRead3.GetMeta().GetSchema(); - UNIT_ASSERT(DataHas(readData, schema, portion[0])); - UNIT_ASSERT(CheckOrdered(resRead3.GetData(), schema)); - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); } // read 4 (column by id) { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 4); - auto read_col1 = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId); - Proto(read_col1.get()).AddColumnIds(1); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read_col1.release()); - auto event7 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event7); - - auto& resRead4 = Proto(event7); - UNIT_ASSERT_EQUAL(resRead4.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead4.GetTxInitiator(), 
metaShard); - UNIT_ASSERT_EQUAL(resRead4.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead4.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead4.GetFinished(), true); - UNIT_ASSERT(CheckColumns(resRead4.GetData(), resRead4.GetMeta(), {"timestamp"})); - { - auto& schema = resRead4.GetMeta().GetSchema(); - UNIT_ASSERT(CheckOrdered(resRead4.GetData(), schema)); - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetReplyColumnIds({1}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + NArrow::ExtractColumnsValidate(rb, {"timestamp"}); + UNIT_ASSERT((ui32)rb->num_columns() == 1); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); } // read 5 (2 columns by name) - auto read_col2 = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId); - Proto(read_col2.get()).AddColumnNames("timestamp"); - Proto(read_col2.get()).AddColumnNames("message"); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read_col2.release()); - auto event8 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event8); - - auto& resRead5 = Proto(event8); - UNIT_ASSERT_EQUAL(resRead5.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead5.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead5.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead5.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead5.GetFinished(), true); - UNIT_ASSERT(CheckColumns(resRead5.GetData(), resRead5.GetMeta(), {"timestamp", "message"})); { - auto& schema = resRead5.GetMeta().GetSchema(); - UNIT_ASSERT(CheckOrdered(resRead5.GetData(), schema)); + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 5); + NOlap::NTests::TShardReader reader(runtime, 
TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetReplyColumns({"timestamp", "message"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + NArrow::ExtractColumnsValidate(rb, {"timestamp", "message"}); + UNIT_ASSERT((ui32)rb->num_columns() == 2); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); } // write 2 (big portion of data): ins:1, cmt:1, idx:0 @@ -839,68 +771,47 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[2], ydbSchema), ydbSchema, intWriteIds)); // read 6, planstep 0 - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId)); - auto event9 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event9); - - auto& resRead6 = Proto(event9); - UNIT_ASSERT_EQUAL(resRead6.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead6.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead6.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead6.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead6.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead6.GetData(), ""); + { + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 6); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 0)); + reader.SetReplyColumns({"timestamp", "message"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(!rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + } // read 7, planstep 21 (part of index) - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 21, txId, tableId)); - auto event10 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - 
UNIT_ASSERT(event10); - - auto& resRead7 = Proto(event10); - UNIT_ASSERT_EQUAL(resRead7.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead7.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead7.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead7.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead7.GetFinished(), true); - UNIT_ASSERT(resRead7.GetData().size() > 0); - { - std::vector<TString> readData; - readData.push_back(resRead7.GetData()); - auto& schema = resRead7.GetMeta().GetSchema(); - UNIT_ASSERT(DataHas(readData, schema, portion[0])); // checks no checks REPLACE (indexed vs indexed) - UNIT_ASSERT(!DataHas(readData, schema, portion[1])); // checks snapshot filter in indexed data - UNIT_ASSERT(!DataHas(readData, schema, portion[2])); - UNIT_ASSERT(CheckOrdered(resRead7.GetData(), schema)); + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 7); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(21, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(!DataHas({rb}, portion[1])); + UNIT_ASSERT(!DataHas({rb}, portion[2])); } // read 8, planstep 22 (full index) - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 22, txId, tableId)); - auto event11 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event11); - - auto& resRead8 = Proto(event11); - UNIT_ASSERT_EQUAL(resRead8.GetOrigin(), TTestTxConfig::TxTablet0); - 
UNIT_ASSERT_EQUAL(resRead8.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead8.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead8.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead8.GetFinished(), true); - UNIT_ASSERT(resRead8.GetData().size() > 0); - { - std::vector<TString> readData; - readData.push_back(resRead8.GetData()); - auto& schema = resRead8.GetMeta().GetSchema(); - UNIT_ASSERT(DataHas(readData, schema, portion[0], true)); // checks REPLACE (indexed vs indexed) - UNIT_ASSERT(DataHas(readData, schema, portion[1])); - UNIT_ASSERT(!DataHas(readData, schema, portion[2])); - UNIT_ASSERT(CheckOrdered(resRead8.GetData(), schema)); + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 8); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(22, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(DataHas({rb}, portion[1])); + UNIT_ASSERT(!DataHas({rb}, portion[2])); } // commit 3: ins:0, cmt:1, idx:1 @@ -917,18 +828,22 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[3], ydbSchema), ydbSchema, intWriteIds)); // read 9 (committed, indexed) - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 23, txId, tableId)); - - TString schema; - NKikimrTxColumnShard::TMetadata meta; - std::vector<TString> readData = ReadManyResults(runtime, schema, meta); - - 
UNIT_ASSERT(DataHas(readData, schema, portion[0])); - UNIT_ASSERT(DataHas(readData, schema, portion[1])); - UNIT_ASSERT(DataHas(readData, schema, portion[2])); - UNIT_ASSERT(!DataHas(readData, schema, portion[3])); + { + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 9); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(23, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(DataHas({rb}, portion[1])); + UNIT_ASSERT(DataHas({rb}, portion[2])); + UNIT_ASSERT(!DataHas({rb}, portion[3])); + } // commit 4: ins:0, cmt:2, idx:1 (with duplicates in PK) @@ -938,160 +853,85 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString planCommit(runtime, sender, planStep, txId); // read 10 - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, metaShard, 24, txId, tableId)); - readData.clear(); - schema.clear(); - ui32 expected = 1000; - for (ui32 i = 0; i < expected; ++i) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT(resRead.GetData().size() > 0); - //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - //UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - bool lastBach = resRead.GetFinished(); - if 
(lastBach) { - expected = resRead.GetBatch() + 1; - } - readData.push_back(resRead.GetData()); - - auto meta = resRead.GetMeta(); - if (schema.empty()) { - schema = meta.GetSchema(); - } - UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); - - if (lastBach) { - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - - if (ydbSchema == TTestSchema::YdbSchema()) { - Cerr << codec << "/" << readStats.GetCompactedPortionsBytes() << "/" << readStats.GetInsertedPortionsBytes() << "/" << readStats.GetCommittedPortionsBytes() << Endl; - if (readStats.GetInsertedPortionsBytes()) { - UNIT_ASSERT_GE(readStats.GetInsertedPortionsBytes() / 100000, 40); - UNIT_ASSERT_LE(readStats.GetInsertedPortionsBytes() / 100000, 50); - } - if (readStats.GetCommittedPortionsBytes()) { - UNIT_ASSERT_GE(readStats.GetCommittedPortionsBytes() / 100000, 65); - UNIT_ASSERT_LE(readStats.GetCommittedPortionsBytes() / 100000, 78); - } - if (readStats.GetCompactedPortionsBytes()) { - if (codec == "" || codec == "lz4") { - UNIT_ASSERT_GE(readStats.GetCompactedPortionsBytes() / 100000, 40); - UNIT_ASSERT_LE(readStats.GetCompactedPortionsBytes() / 100000, 50); - } else if (codec == "none") { - UNIT_ASSERT_GE(readStats.GetCompactedPortionsBytes() / 100000, 65); - UNIT_ASSERT_LE(readStats.GetCompactedPortionsBytes() / 100000, 78); - } else if (codec == "zstd") { - UNIT_ASSERT_GE(readStats.GetCompactedPortionsBytes() / 100000, 20); - UNIT_ASSERT_LE(readStats.GetCompactedPortionsBytes() / 100000, 30); - } else { - UNIT_ASSERT(false); - } - } + { + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 10); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + NArrow::ExtractColumnsValidate(rb, 
TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(DataHas({rb}, portion[1])); + UNIT_ASSERT(DataHas({rb}, portion[2])); + UNIT_ASSERT(DataHas({rb}, portion[3])); + UNIT_ASSERT(DataHas({rb}, {0, 500}, true)); + + const ui64 compactedBytes = reader.GetReadStat("compacted_bytes"); + const ui64 insertedBytes = reader.GetReadStat("inserted_bytes"); + const ui64 committedBytes = reader.GetReadStat("committed_bytes"); + Cerr << codec << "/" << compactedBytes << "/" << insertedBytes << "/" << committedBytes << Endl; + if (insertedBytes) { + UNIT_ASSERT_GE(insertedBytes / 100000, 40); + UNIT_ASSERT_LE(insertedBytes / 100000, 50); + } + if (committedBytes) { + UNIT_ASSERT_GE(committedBytes / 100000, 65); + UNIT_ASSERT_LE(committedBytes / 100000, 78); + } + if (compactedBytes) { + if (codec == "" || codec == "lz4") { + UNIT_ASSERT_GE(compactedBytes / 100000, 40); + UNIT_ASSERT_LE(compactedBytes / 100000, 50); + } else if (codec == "none") { + UNIT_ASSERT_GE(compactedBytes / 100000, 65); + UNIT_ASSERT_LE(compactedBytes / 100000, 78); + } else if (codec == "zstd") { + UNIT_ASSERT_GE(compactedBytes / 100000, 20); + UNIT_ASSERT_LE(compactedBytes / 100000, 30); + } else { + UNIT_ASSERT(false); } } } - UNIT_ASSERT(DataHas(readData, schema, portion[0])); - UNIT_ASSERT(DataHas(readData, schema, portion[1])); - UNIT_ASSERT(DataHas(readData, schema, portion[2])); - UNIT_ASSERT(DataHas(readData, schema, portion[3])); - UNIT_ASSERT(DataHas(readData, schema, {0, 500}, true)); // checks REPLACE (committed vs indexed) - // read 11 (range predicate: closed interval) - { - TSerializedTableRange range = MakeTestRange({10, 42}, true, true, testYdbPk); - NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk); - - auto evRead = 
std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId); - auto* greater = Proto(evRead.get()).MutableGreaterPredicate(); - auto* less = Proto(evRead.get()).MutableLessPredicate(); - for (auto& name : prGreater.ColumnNames()) { - greater->AddColumnNames(name); - } - for (auto& name : prLess.ColumnNames()) { - less->AddColumnNames(name); - } - greater->SetRow(NArrow::SerializeBatchNoCompression(prGreater.Batch)); - less->SetRow(NArrow::SerializeBatchNoCompression(prLess.Batch)); - greater->SetInclusive(prGreater.IsInclusive()); - less->SetInclusive(prLess.IsInclusive()); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evRead.release()); - } - readData.clear(); - schema.clear(); + // read 11 (range predicate: closed interval) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT(resRead.GetData().size() > 0); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - readData.push_back(resRead.GetData()); - if (schema.empty()) { - schema = resRead.GetMeta().GetSchema(); - } - UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); + NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + reader.AddRange(MakeTestRange({10, 42}, true, true, testYdbPk)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == 
TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, {10, 42 + 1})); + UNIT_ASSERT(DataHasOnly({rb}, {10, 42 + 1})); } - UNIT_ASSERT(DataHas(readData, schema, {10, 42 + 1})); - UNIT_ASSERT(DataHasOnly(readData, schema, {10, 42 + 1})); // read 12 (range predicate: open interval) { - TSerializedTableRange range = MakeTestRange({10, 42}, false, false, testYdbPk); - NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk); - - auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId); - auto* greater = Proto(evRead.get()).MutableGreaterPredicate(); - auto* less = Proto(evRead.get()).MutableLessPredicate(); - for (auto& name : prGreater.ColumnNames()) { - greater->AddColumnNames(name); - } - for (auto& name : prLess.ColumnNames()) { - less->AddColumnNames(name); - } - - greater->SetRow(NArrow::SerializeBatchNoCompression(prGreater.Batch)); - less->SetRow(NArrow::SerializeBatchNoCompression(prLess.Batch)); - greater->SetInclusive(prGreater.IsInclusive()); - less->SetInclusive(prLess.IsInclusive()); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evRead.release()); - } - readData.clear(); - schema.clear(); - { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT(resRead.GetData().size() > 0); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - readData.push_back(resRead.GetData()); - if (schema.empty()) { - schema = resRead.GetMeta().GetSchema(); - } - UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); + NActors::TLogContextGuard guard = 
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId)); + reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); + reader.AddRange(MakeTestRange({10, 42}, false, false, testYdbPk)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); + UNIT_ASSERT(rb->num_rows()); + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(DataHas({rb}, {10 + 1, 41 + 1})); + UNIT_ASSERT(DataHasOnly({rb}, {10 + 1, 41 + 1})); } - UNIT_ASSERT(DataHas(readData, schema, {11, 41 + 1})); - UNIT_ASSERT(DataHasOnly(readData, schema, {11, 41 + 1})); } void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table) { @@ -1132,7 +972,6 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table // - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 100; @@ -1193,37 +1032,19 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table --txId; for (ui32 i = 0; i < 2; ++i) { - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId); - Proto(read.get()).AddColumnNames("timestamp"); - Proto(read.get()).AddColumnNames("message"); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - - TString schema; - NKikimrTxColumnShard::TMetadata meta; - std::vector<TString> readData = ReadManyResults(runtime, schema, meta); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetReplyColumns({"timestamp", "message"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(rb); + UNIT_ASSERT(reader.IsCorrectlyFinished()); if (ydbPk[0].second 
== TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8)) { - UNIT_ASSERT(DataHas<std::string>(readData, schema, triggerPortion, true)); - UNIT_ASSERT(DataHas<std::string>(readData, schema, smallWrites, true)); + UNIT_ASSERT(DataHas<std::string>({rb}, triggerPortion, true)); + UNIT_ASSERT(DataHas<std::string>({rb}, smallWrites, true)); } else { - UNIT_ASSERT(DataHas(readData, schema, triggerPortion, true)); - UNIT_ASSERT(DataHas(readData, schema, smallWrites, true)); + UNIT_ASSERT(DataHas({rb}, triggerPortion, true)); + UNIT_ASSERT(DataHas({rb}, smallWrites, true)); } - - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - Cerr << readStats.DebugString() << Endl; - UNIT_ASSERT(readStats.GetBeginTimestamp() > 0); - UNIT_ASSERT(readStats.GetDurationUsec() > 0); - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0); -// UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1); - UNIT_ASSERT(readStats.GetIndexBatches() > 0); - UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 2); // planStep, txId + 4 PK columns + "message" - UNIT_ASSERT(readStats.GetIndexPortions() > 0); // got compaction - UNIT_ASSERT(readStats.GetIndexPortions() <= 5); // got compaction - RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } } @@ -1411,7 +1232,6 @@ void TestReadWithProgram(const TestTableDescription& table = {}) options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 100; @@ -1454,70 +1274,27 @@ void TestReadWithProgram(const TestTableDescription& table = {}) UNIT_ASSERT(program.SerializeToString(&programs.back())); } - for (auto& programText : programs) { - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); - auto& readProto = Proto(readEvent); - - 
readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM); - readProto.SetOlapProgram(programText); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); - - TAutoPtr<IEventHandle> handle; - auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(result); - - auto& resRead = Proto(result); - - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead.GetData(), ""); - } - ui32 i = 0; for (auto& programText : programs) { - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); - auto& readProto = Proto(readEvent); - - readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); - readProto.SetOlapProgram(programText); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); - - TAutoPtr<IEventHandle> handle; - auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(result); - - auto& resRead = Proto(result); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetProgram(programText); + auto rb = reader.ReadAll(); if (i < numWrong) { - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead.GetData(), ""); + UNIT_ASSERT(reader.IsError()); + UNIT_ASSERT(reader.IsFinished()); } else { - UNIT_ASSERT_EQUAL(resRead.GetStatus(), 
NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - - auto& meta = resRead.GetMeta(); - auto& schema = meta.GetSchema(); - - std::vector<TString> readData; - readData.push_back(resRead.GetData()); + UNIT_ASSERT(reader.IsCorrectlyFinished()); switch (i) { case 1: - UNIT_ASSERT(resRead.GetData().size() > 0); - UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"})); - UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true)); + UNIT_ASSERT(rb); + UNIT_ASSERT(rb->num_rows()); + NArrow::ExtractColumnsValidate(rb, {"level", "timestamp"}); + UNIT_ASSERT(rb->num_columns() == 2); + UNIT_ASSERT(DataHas({rb}, {0, 100}, true)); break; case 2: - UNIT_ASSERT(resRead.GetData().size() == 0); + UNIT_ASSERT(!rb || !rb->num_rows()); break; default: break; @@ -1539,7 +1316,6 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 100; @@ -1568,56 +1344,29 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { ui32 i = 0; for (auto& ssa : ssas) { - TString programText; - { - TString serialized; - UNIT_ASSERT(ssa.SerializeToString(&serialized)); - NKikimrSSA::TOlapProgram program; - program.SetProgram(serialized); - UNIT_ASSERT(program.SerializeToString(&programText)); - } - - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); - auto& readProto = Proto(readEvent); - - readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); - readProto.SetOlapProgram(programText); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); - - TAutoPtr<IEventHandle> handle; - auto result = 
runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(result); - - auto& resRead = Proto(result); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - { - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); - - auto& meta = resRead.GetMeta(); - //auto& schema = meta.GetSchema(); - TString readData = resRead.GetData(); - - switch (i) { - case 0: - case 1: - UNIT_ASSERT(CheckColumns(readData, meta, {"message"}, 19)); - break; - case 2: - case 3: - UNIT_ASSERT(CheckColumns(readData, meta, {"message"}, 11)); - break; - case 4: - case 5: - UNIT_ASSERT(CheckColumns(readData, meta, {"message"}, 10)); - break; - default: - break; - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetProgram(ssa); + auto rb = reader.ReadAll(); + + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(rb); + UNIT_ASSERT(rb->num_rows()); + + switch (i) { + case 0: + case 1: + UNIT_ASSERT(CheckColumns(rb, {"message"}, 19)); + break; + case 2: + case 3: + UNIT_ASSERT(CheckColumns(rb, {"message"}, 11)); + break; + case 4: + case 5: + UNIT_ASSERT(CheckColumns(rb, {"message"}, 10)); + break; + default: + break; } ++i; } @@ -1634,7 +1383,6 @@ void TestSomePrograms(const TestTableDescription& table) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 100; @@ -1667,31 +1415,10 @@ void TestSomePrograms(const TestTableDescription& table) { // TODO: add programs with bugs here for (auto& ssaText : programs) { - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, 
tableId); - auto& readProto = Proto(readEvent); - - TString programText; - NKikimrSSA::TOlapProgram program; - program.SetProgram(ssaText); - UNIT_ASSERT(program.SerializeToString(&programText)); - - readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM); - readProto.SetOlapProgram(programText); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); - - TAutoPtr<IEventHandle> handle; - auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(result); - - auto& resRead = Proto(result); - - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - //UNIT_ASSERT_EQUAL(resRead.GetData(), ""); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetProgram(ssaText); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsError()); } } @@ -1717,7 +1444,6 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 100; @@ -1799,37 +1525,10 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche for (auto& programText : programs) { Cerr << "-- select program: " << prog << " is filtered: " << (int)isFiltered.count(prog) << "\n"; - auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); - auto& readProto = Proto(readEvent); - - readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); - readProto.SetOlapProgram(programText); 
- - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent); - - TAutoPtr<IEventHandle> handle; - auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(result); - - auto& resRead = Proto(result); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - - std::shared_ptr<arrow::RecordBatch> batch; - { - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); - - auto& meta = resRead.GetMeta(); - auto& schema = meta.GetSchema(); - auto& data = resRead.GetData(); - - batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema)); - UNIT_ASSERT(batch); - UNIT_ASSERT(batch->ValidateFull().ok()); - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetProgram(programText); + auto batch = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); if (checkResult.contains(prog)) { if (isFiltered.contains(prog)) { @@ -2287,7 +1986,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TSerializedTableRange MakeRange(const std::vector<std::pair<TString, TTypeInfo>>& pk) const { std::vector<TString> mem; - auto cellsFrom = From ? From->GetCellVec(pk, mem, true) : std::vector<TCell>(); + auto cellsFrom = From ? From->GetCellVec(pk, mem, false) : std::vector<TCell>(); auto cellsTo = To ? To->GetCellVec(pk, mem) : std::vector<TCell>(); return TSerializedTableRange(TConstArrayRef<TCell>(cellsFrom), (From ? From->GetInclude() : false), TConstArrayRef<TCell>(cellsTo), (To ? 
To->GetInclude(): false)); @@ -2300,98 +1999,22 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { const TString TestCaseName; void Execute() { - const ui64 metaShard = TTestTxConfig::TxTablet1; const ui64 tableId = 1; - const TActorId sender = Owner.Runtime.AllocateEdgeActor(); std::set<TString> useFields = {"timestamp", "message"}; { // read with predicate (FROM) - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, Owner.PlanStep, Owner.TxId, tableId); - Proto(read.get()).AddColumnNames("timestamp"); - Proto(read.get()).AddColumnNames("message"); - - const TSerializedTableRange range = MakeRange(Owner.YdbPk); - - NOlap::TPredicate prGreater, prLess; - std::tie(prGreater, prLess) = RangePredicates(range, Owner.YdbPk); - if (From) { - auto* greater = Proto(read.get()).MutableGreaterPredicate(); - for (auto& name : prGreater.ColumnNames()) { - greater->AddColumnNames(name); - useFields.emplace(name); - } - greater->SetRow(NArrow::SerializeBatchNoCompression(prGreater.Batch)); - greater->SetInclusive(From->GetInclude()); - }; - if (To) { - auto* less = Proto(read.get()).MutableLessPredicate(); - for (auto& name : prLess.ColumnNames()) { - less->AddColumnNames(name); - useFields.emplace(name); - } - less->SetRow(NArrow::SerializeBatchNoCompression(prLess.Batch)); - less->SetInclusive(To->GetInclude()); - } - - ForwardToTablet(Owner.Runtime, TTestTxConfig::TxTablet0, sender, read.release()); - } - - ui32 numRows = 0; - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - ui32 expected = 100; - for (ui32 i = 0; i < expected; ++i) { - TAutoPtr<IEventHandle> handle; - auto event = Owner.Runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("0_type_id", Owner.YdbPk[0].second.GetTypeId())("batch", resRead.GetBatch())("data_size", resRead.GetData().size()); - - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - 
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - if (ExpectedCount && !*ExpectedCount) { - UNIT_ASSERT(!resRead.GetBatch()); - UNIT_ASSERT(resRead.GetFinished()); - UNIT_ASSERT(!resRead.GetData().size()); - } else { - UNIT_ASSERT(resRead.HasMeta()); - - auto& meta = resRead.GetMeta(); - auto schema = NArrow::DeserializeSchema(meta.GetSchema()); - UNIT_ASSERT(schema); - auto batch = NArrow::DeserializeBatch(resRead.GetData(), schema); - UNIT_ASSERT(batch); - - numRows += batch->num_rows(); - batches.emplace_back(batch); - - if (resRead.GetFinished()) { - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - - UNIT_ASSERT(readStats.GetBeginTimestamp()); - UNIT_ASSERT(readStats.GetDurationUsec()); - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0); - UNIT_ASSERT(readStats.GetIndexBatches()); - //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), useFields.size()); // planStep, txId + 4 PK columns + "message" - //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1); - //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization? 
+ NOlap::NTests::TShardReader reader(Owner.Runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(Owner.PlanStep, Owner.TxId)); + reader.SetReplyColumns({"timestamp", "message"}); + reader.AddRange(MakeRange(Owner.YdbPk)); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + if (ExpectedCount) { + if (*ExpectedCount) { + UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(CheckColumns(rb, {"timestamp", "message"}, ExpectedCount)); + } else { + UNIT_ASSERT(!rb || !rb->num_rows()); } } - if (resRead.GetFinished()) { - expected = resRead.GetBatch(); - } - } - UNIT_ASSERT(expected < 100); - - if (ExpectedCount) { - if (numRows != *ExpectedCount) { - for (auto& batch : batches) { - Cerr << batch->ToString() << "\n"; - } - } - UNIT_ASSERT_VALUES_EQUAL(numRows, *ExpectedCount); } } @@ -2432,7 +2055,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - const ui64 metaShard = TTestTxConfig::TxTablet1; const ui64 tableId = 1; ui64 planStep = 100; ui64 txId = 100; @@ -2473,43 +2095,21 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { for (ui32 i = 0; i < 2; ++i) { { - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId); - Proto(read.get()).AddColumnNames("timestamp"); - Proto(read.get()).AddColumnNames("message"); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - } - - { - TString schema; - NKikimrTxColumnShard::TMetadata meta; - std::vector<TString> readData = ReadManyResults(runtime, schema, meta, 20000); - { - schema = meta.GetSchema(); - - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - - UNIT_ASSERT(readStats.GetBeginTimestamp() > 0); - UNIT_ASSERT(readStats.GetDurationUsec() > 0); - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0); - UNIT_ASSERT(readStats.GetIndexBatches() > 0); - 
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 2); // planStep, txId + 4 PK columns + "message" -// UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions - //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x); - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); + reader.SetReplyColumns({"timestamp", "message"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckOrdered(rb)); if (testBlobOptions.SameValueColumns.contains("timestamp")) { UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message")); - UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "message")); + UNIT_ASSERT(DataHas<std::string>({rb}, {0, fullNumRows}, true, "message")); } else { UNIT_ASSERT(isStrPk0 - ? DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "timestamp") - : DataHas(readData, schema, { 0, fullNumRows}, true, "timestamp")); + ? 
DataHas<std::string>({rb}, {0, fullNumRows}, true, "timestamp") + : DataHas({rb}, {0, fullNumRows}, true, "timestamp")); } } - std::vector<ui32> val0 = { 0 }; std::vector<ui32> val1 = { 1 }; std::vector<ui32> val9990 = { 99990 }; @@ -2681,7 +2281,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 1000000; @@ -2705,30 +2304,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TDuration staleness = TDuration::Minutes(6); - // Try to read snapshot that is too old - { - { - auto request = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep - staleness.MilliSeconds(), Max<ui64>(), tableId); - request->Record.AddColumnNames("timestamp"); - request->Record.AddColumnNames("message"); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, request.release()); - } - - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& response = event->Record; - UNIT_ASSERT_VALUES_EQUAL(response.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_VALUES_EQUAL(response.GetTxInitiator(), metaShard); - UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), (ui32)NKikimrTxColumnShard::EResultStatus::ERROR); - } - // Try to scan snapshot that is too old { { auto request = std::make_unique<TEvColumnShard::TEvScan>(); - request->Record.SetTxId(1000); + request->Record.SetTxId(Max<ui64>()); request->Record.SetScanId(1); request->Record.SetLocalPathId(tableId); request->Record.SetTablePath("test_olap_table"); @@ -2747,6 +2327,15 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT_VALUES_EQUAL(response.IssuesSize(), 1); UNIT_ASSERT_STRING_CONTAINS(response.GetIssues(0).message(), "Snapshot too old: {640000:max}"); } + + // Try to read snapshot that is too old + { + NOlap::NTests::TShardReader 
reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - staleness.MilliSeconds(), Max<ui64>())); + reader.SetReplyColumns({"timestamp", "message"}); + reader.ReadAll(); + UNIT_ASSERT(reader.IsError()); + } + } void TestCompactionGC() { @@ -2763,7 +2352,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; @@ -2923,32 +2511,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Send a request that reads the latest version // This request is expected to read at least 1 committed blob and several index portions // These committed blob and portions must not be deleted by the BlobManager until the read request finishes - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId); - Proto(read.get()).AddColumnNames("timestamp"); - Proto(read.get()).AddColumnNames("message"); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - - ui32 expected = 0; - ui32 num = 0; - while (!expected || num < expected) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - - if (resRead.GetFinished()) { - expected = resRead.GetBatch() + 1; - UNIT_ASSERT(resRead.HasMeta()); - } - UNIT_ASSERT(resRead.GetData().size() > 0); - - ++num; - UNIT_ASSERT(num < 10); - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>())); + reader.SetReplyColumns({"timestamp", "message"}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + 
UNIT_ASSERT(CheckOrdered(rb)); + UNIT_ASSERT(reader.GetIterationsCount() < 10); // We captured EvReadFinished event and dropped is so the columnshard still thinks that // read request is in progress and keeps the portions diff --git a/ydb/core/tx/columnshard/ut_rw/ya.make b/ydb/core/tx/columnshard/ut_rw/ya.make index 63c9dbed1d..5932bccf75 100644 --- a/ydb/core/tx/columnshard/ut_rw/ya.make +++ b/ydb/core/tx/columnshard/ut_rw/ya.make @@ -21,6 +21,7 @@ PEERDIR( ydb/core/testlib/default ydb/core/tx/columnshard/hooks/abstract ydb/core/tx/columnshard/hooks/testing + ydb/core/tx/columnshard/common/tests ydb/services/metadata ydb/core/tx ydb/public/lib/yson_value diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 9640b19c70..dfc554c4ad 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -5,6 +5,7 @@ #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/columnshard/common/tests/shard_reader.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> #include <ydb/core/tx/columnshard/blobs_reader/actor.h> @@ -144,25 +145,12 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS); } -std::shared_ptr<arrow::Array> DeserializeColumn(const TString& blob, const TString& strSchema, - const std::string& columnName) -{ - auto schema = NArrow::DeserializeSchema(strSchema); - auto batch = NArrow::DeserializeBatch(blob, schema); - UNIT_ASSERT(batch); - - //Cerr << "Got data batch (" << batch->num_rows() << " rows): " << batch->ToString() << "\n"; - - std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName); - UNIT_ASSERT(array); - 
return array; -} - -bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize, +bool CheckSame(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 expectedSize, const std::string& columnName, i64 seconds) { - auto tsCol = DeserializeColumn(blob, strSchema, columnName); + UNIT_ASSERT(batch); + UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), expectedSize); + auto tsCol = batch->GetColumnByName(columnName); UNIT_ASSERT(tsCol); - UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize); std::shared_ptr<arrow::Scalar> expected; switch (tsCol->type_id()) { @@ -239,23 +227,6 @@ enum class EExpectedResult { ERROR }; -TString GetReadResult(NKikimrTxColumnShard::TEvReadResult& resRead, EExpectedResult expected = EExpectedResult::OK_FINISHED) -{ - Cerr << "Got batchNo: " << resRead.GetBatch() << "\n"; - - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1); - if (expected == EExpectedResult::ERROR) { - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR); - } else { - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - } - if (expected == EExpectedResult::OK_FINISHED) { - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - } - return resRead.GetData(); -} - static constexpr ui32 PORTION_ROWS = 80 * 1000; // ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020 @@ -292,7 +263,6 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, // - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 1000000000; // greater then delays @@ -349,20 +319,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, { --planStep; - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(spec.TtlColumn); - - 
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - TString data = GetReadResult(resRead); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT(data.size() > 0); - - auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(data, schema, PORTION_ROWS, spec.TtlColumn, ts[1])); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>())); + reader.SetReplyColumns({spec.TtlColumn}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[1])); } // Alter TTL @@ -389,17 +350,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, { --planStep; - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(spec.TtlColumn); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - TString data = GetReadResult(resRead); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_VALUES_EQUAL(data.size(), 0); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>())); + reader.SetReplyColumns({spec.TtlColumn}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(!rb || !rb->num_rows()); } // Disable TTL @@ -426,23 +381,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, { --planStep; - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(spec.TtlColumn); - - 
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); - - auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[0])); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>())); + reader.SetReplyColumns({spec.TtlColumn}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[0])); } } @@ -563,27 +506,6 @@ public: if (ev->GetTypeRewrite() == TEvTablet::EvBoot) { Counters->BlockForgets = false; return false; - } else if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) { - if (msg->Status == NKikimrProto::OK) { - ss << "EXPORT(done " << ++Counters->ExportCounters.Success << "): "; - } else { - ss << "EXPORT(attempt " << ++Counters->ExportCounters.Attempt << "): " - << NKikimrProto::EReplyStatus_Name(msg->Status); - } - } else if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvForget>(ev)) { - if (Counters->BlockForgets) { - ss << "FORGET(ignore " << NKikimrProto::EReplyStatus_Name(msg->Status) << "): "; - ss << " " << ev->Sender << "->" << ev->Recipient; - Cerr << ss << Endl; - return true; - } - - if (msg->Status == NKikimrProto::OK) { - ss << "FORGET(done " << ++Counters->ForgetCounters.Success << "): "; - } else { - ss << "FORGET(attempt " << ++Counters->ForgetCounters.Attempt << "): " - << 
NKikimrProto::EReplyStatus_Name(msg->Status); - } } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectRequest>(ev)) { ss << "S3_REQ(put " << ++Counters->ExportCounters.Request << "):"; } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) { @@ -616,8 +538,8 @@ public: } else { return false; } - } else if (auto* msg = TryGetPrivateEvent<TEvColumnShard::TEvReadResult>(ev)) { - ss << "Got TEvReadResult " << NKikimrTxColumnShard::EResultStatus_Name(Proto(msg).GetStatus()) << Endl; + } else if (auto* msg = TryGetPrivateEvent<NKqp::TEvKqpCompute::TEvScanData>(ev)) { + ss << "Got TEvKqpCompute::TEvScanData" << Endl; } else { return false; } @@ -677,7 +599,6 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt // - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 1000000000; // greater then delays @@ -760,38 +681,28 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt } // Read crossed with eviction (start) - if (!misconfig) { - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep - 1, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(specs[i].TtlColumn); - - counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - counter.WaitReadsCaptured(runtime); - } - - // Eviction - - TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn); + { + std::unique_ptr<NOlap::NTests::TShardReader> reader; + if (!misconfig) { + reader = std::make_unique<NOlap::NTests::TShardReader>(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>())); + counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 
0 : 1; // TODO: we need affected by tiering blob here + counter.WaitReadsCaptured(runtime); + reader->InitializeScanner(); + reader->Ack(); + } - Cerr << "-- " << (hasColdEviction ? "COLD" : "HOT") - << " TIERING(" << i << ") num tiers: " << specs[i].Tiers.size() << Endl; + // Eviction - // Read crossed with eviction (finish) - if (!misconfig) { - counter.ResendCapturedReads(runtime); - ui32 numBatches = 0; - THashSet<ui32> batchNumbers; - while (!numBatches || numBatches < batchNumbers.size()) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn); - auto& resRead = Proto(event); - TString data = GetReadResult(resRead, EExpectedResult::OK); + Cerr << "-- " << (hasColdEviction ? "COLD" : "HOT") + << " TIERING(" << i << ") num tiers: " << specs[i].Tiers.size() << Endl; - batchNumbers.insert(resRead.GetBatch()); - if (resRead.GetFinished()) { - numBatches = resRead.GetBatch() + 1; - } + // Read crossed with eviction (finish) + if (!misconfig) { + counter.ResendCapturedReads(runtime); + reader->ContinueReadAll(); + UNIT_ASSERT(reader->IsCorrectlyFinished()); } } while (csControllerGuard->GetTTLFinishedCounter() != csControllerGuard->GetTTLStartedCounter()) { @@ -812,42 +723,18 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt // Read data after eviction TString columnToRead = specs[i].TtlColumn; - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep-1, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(columnToRead); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - - specRowsBytes.emplace_back(0, 0); - ui32 numBatches = 0; - ui32 numExpected = (expectedReadResult == EExpectedResult::ERROR) ? 
1 : 100; - for (; numBatches < numExpected; ++numBatches) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - TString data = GetReadResult(resRead, expectedReadResult); - if (expectedReadResult == EExpectedResult::ERROR) { - break; - } - if (!data.size()) { - UNIT_ASSERT(resRead.GetFinished()); - break; - } - - auto& meta = resRead.GetMeta(); - auto& schema = meta.GetSchema(); - auto ttlColumn = DeserializeColumn(resRead.GetData(), schema, columnToRead); - UNIT_ASSERT(ttlColumn); - - specRowsBytes.back().first += ttlColumn->length(); - if (resRead.GetFinished()) { - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - const ui64 numBytes = readStats.GetCompactedPortionsBytes() + readStats.GetInsertedPortionsBytes() + readStats.GetCommittedPortionsBytes(); - specRowsBytes.back().second += numBytes; - numExpected = resRead.GetBatch() + 1; - } + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>())); + reader.SetReplyColumns({columnToRead}); + auto rb = reader.ReadAll(); + if (expectedReadResult == EExpectedResult::ERROR) { + UNIT_ASSERT(reader.IsError()); + specRowsBytes.emplace_back(0, 0); + } else { + UNIT_ASSERT(reader.IsCorrectlyFinished()); + specRowsBytes.emplace_back(reader.GetRecordsCount(), reader.GetReadBytes()); } - UNIT_ASSERT(numBatches < 100); + + UNIT_ASSERT(reader.GetIterationsCount() < 100); if (reboots) { Cerr << "REBOOT(" << i << ")" << Endl; @@ -1103,7 +990,6 @@ void TestDrop(bool reboots) { // - ui64 metaShard = TTestTxConfig::TxTablet1; ui64 writeId = 0; ui64 tableId = 1; ui64 planStep = 1000000000; // greater then delays @@ -1151,17 +1037,11 @@ void TestDrop(bool reboots) { TAutoPtr<IEventHandle> handle; { --planStep; - auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - 
Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - TString data = GetReadResult(resRead); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(data.size(), 0); + NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>())); + reader.SetReplyColumns({TTestSchema::DefaultTtlColumn}); + auto rb = reader.ReadAll(); + UNIT_ASSERT(reader.IsCorrectlyFinished()); + UNIT_ASSERT(!rb || !rb->num_rows()); } } diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp index b815cc39b9..8537990e37 100644 --- a/ydb/core/tx/program/program.cpp +++ b/ydb/core/tx/program/program.cpp @@ -33,13 +33,26 @@ public: } private: NSsa::TColumnInfo GetColumnInfo(const NKikimrSSA::TProgram::TColumn& column) const { - const ui32 columnId = column.GetId(); - const TString name = ColumnResolver.GetColumnName(columnId, false); - if (name.Empty()) { - return NSsa::TColumnInfo::Generated(columnId, GenerateName(column)); + if (column.HasId() && column.GetId()) { + const ui32 columnId = column.GetId(); + const TString name = ColumnResolver.GetColumnName(columnId, false); + if (name.Empty()) { + return NSsa::TColumnInfo::Generated(columnId, GenerateName(column)); + } else { + Sources.emplace(columnId, NSsa::TColumnInfo::Original(columnId, name)); + return NSsa::TColumnInfo::Original(columnId, name); + } + } else if (column.HasName() && !!column.GetName()) { + const TString name = column.GetName(); + const std::optional<ui32> columnId = ColumnResolver.GetColumnIdOptional(name); + if (columnId) { + Sources.emplace(*columnId, NSsa::TColumnInfo::Original(*columnId, name)); + return NSsa::TColumnInfo::Original(*columnId, name); + } else { + return NSsa::TColumnInfo::Generated(0, 
GenerateName(column)); + } } else { - Sources.emplace(columnId, NSsa::TColumnInfo::Original(columnId, name)); - return NSsa::TColumnInfo::Original(columnId, name); + return NSsa::TColumnInfo::Generated(0, GenerateName(column)); } } diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h index bb3e67deb2..67cac1845a 100644 --- a/ydb/core/tx/program/program.h +++ b/ydb/core/tx/program/program.h @@ -14,6 +14,7 @@ class IColumnResolver { public: virtual ~IColumnResolver() = default; virtual TString GetColumnName(ui32 id, bool required = true) const = 0; + virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const = 0; virtual const NTable::TScheme::TTableSchema& GetSchema() const = 0; virtual NSsa::TColumnInfo GetDefaultColumn() const = 0; }; |