aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@ydb.tech>2023-12-13 15:08:39 +0300
committerivanmorozov <ivanmorozov@ydb.tech>2023-12-13 16:03:39 +0300
commitb0943903218f95edba19ac132ce7b358dd1e297b (patch)
treea0547970e27a8a39f6eed15814c7ee0f48fa3aa9
parent7800ffcc112d88e155f1bdadd0f4d37afa2b112b (diff)
downloadydb-b0943903218f95edba19ac132ce7b358dd1e297b.tar.gz
KIKIMR-20179: remove deprecated reader from tests
-rw-r--r--.mapping.json6
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h20
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h8
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp33
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h9
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp30
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt21
-rw-r--r--ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/core/tx/columnshard/common/tests/CMakeLists.txt19
-rw-r--r--ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/common/tests/shard_reader.cpp4
-rw-r--r--ydb/core/tx/columnshard/common/tests/shard_reader.h271
-rw-r--r--ydb/core/tx/columnshard/common/tests/ya.make14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/description.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/context.h9
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h9
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp1030
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ya.make1
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp232
-rw-r--r--ydb/core/tx/program/program.cpp25
-rw-r--r--ydb/core/tx/program/program.h1
35 files changed, 904 insertions, 942 deletions
diff --git a/.mapping.json b/.mapping.json
index a4a91ab6b9..d7267ee15c 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -6300,6 +6300,12 @@
"ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/columnshard/common/CMakeLists.txt":"",
"ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt":"",
+ "ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/common/tests/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/columnshard/counters/CMakeLists.darwin-arm64.txt":"",
"ydb/core/tx/columnshard/counters/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/columnshard/counters/CMakeLists.linux-aarch64.txt":"",
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index f2efd23f21..5d57f94fa4 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -14,6 +14,12 @@ struct TEvKqpCompute {
struct TEvRemoteScanData : public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData,
TKqpComputeEvents::EvRemoteScanData> {};
+ class IShardScanStats {
+ public:
+ virtual ~IShardScanStats() = default;
+ virtual THashMap<TString, ui64> GetMetrics() const = 0;
+ };
+
/*
* Scan communications.
*
@@ -46,6 +52,19 @@ struct TEvKqpCompute {
bool Finished = false;
bool PageFault = false; // page fault was the reason for sending this message
mutable THolder<TEvRemoteScanData> Remote;
+ std::shared_ptr<IShardScanStats> StatsOnFinished;
+
+ template <class T>
+ const T& GetStatsAs() const {
+ Y_ABORT_UNLESS(!!StatsOnFinished);
+ return VerifyDynamicCast<const T&>(*StatsOnFinished);
+ }
+
+ template <class T>
+ bool CheckStatsIs() const {
+ auto p = dynamic_cast<const T*>(StatsOnFinished.get());
+ return p;
+ }
ui32 GetRowsCount() const {
if (ArrowBatch) {
@@ -225,6 +244,7 @@ struct TEvKqpCompute {
struct TEvKillScanTablet : public NActors::TEventPB<TEvKillScanTablet, NKikimrKqp::TEvKillScanTablet,
TKqpComputeEvents::EvKillScanTablet> {};
+
};
} // namespace NKikimr::NKqp
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index dd6a385869..2d3d7e96e0 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -15,6 +15,10 @@ public:
: IndexInfo(indexInfo)
{}
+ virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const override {
+ return IndexInfo.GetColumnIdOptional(name);
+ }
+
TString GetColumnName(ui32 id, bool required) const override {
return IndexInfo.GetColumnName(id, required);
}
@@ -91,6 +95,10 @@ public:
return ReadyResults.size();
}
+ virtual const NOlap::TReadStats& GetStats() const override {
+ return *ReadMetadata->ReadStats;
+ }
+
virtual TString DebugString(const bool verbose) const override {
return TStringBuilder()
<< "ready_results:(" << ReadyResults.DebugString() << ");"
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 9845a5eed8..d3e7138b5a 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -97,6 +97,7 @@ public:
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
, ComputeShardingPolicy(computeShardingPolicy)
{
+ AFL_VERIFY(ReadMetadataRanges.size() == 1);
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
}
@@ -396,7 +397,25 @@ private:
return singleRowWriter.Row;
}
- bool SendResult(bool pageFault, bool lastBatch){
+ class TScanStatsOwner: public NKqp::TEvKqpCompute::IShardScanStats {
+ private:
+ YDB_READONLY_DEF(NOlap::TReadStats, Stats);
+ public:
+ TScanStatsOwner(const NOlap::TReadStats& stats)
+ : Stats(stats) {
+
+ }
+
+ virtual THashMap<TString, ui64> GetMetrics() const override {
+ THashMap<TString, ui64> result;
+ result["compacted_bytes"] = Stats.CompactedPortionsBytes;
+ result["inserted_bytes"] = Stats.InsertedPortionsBytes;
+ result["committed_bytes"] = Stats.CommittedPortionsBytes;
+ return result;
+ }
+ };
+
+ bool SendResult(bool pageFault, bool lastBatch) {
if (Finished) {
return true;
}
@@ -429,6 +448,7 @@ private:
<< " bytes: " << Bytes << "/" << BytesSum << " rows: " << Rows << "/" << RowsSum << " page faults: " << Result->PageFaults
<< " finished: " << Result->Finished << " pageFault: " << Result->PageFault
<< " stats:" << Stats->ToJson() << ";iterator:" << (ScanIterator ? ScanIterator->DebugString(false) : "NO");
+ Result->StatsOnFinished = std::make_shared<TScanStatsOwner>(ScanIterator->GetStats());
} else {
Y_ABORT_UNLESS(ChunksLimiter.Take(Bytes));
Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
@@ -696,6 +716,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
bool isIndexStats = read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE) ||
read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE);
read.ColumnIds.assign(record.GetColumnTags().begin(), record.GetColumnTags().end());
+ read.StatsMode = record.GetStatsMode();
const NOlap::TIndexInfo* indexInfo = nullptr;
if (!isIndexStats) {
@@ -745,6 +766,7 @@ bool TTxScan::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
ReadMetadataRanges.emplace_back(newRange);
}
Y_ABORT_UNLESS(ReadMetadataRanges.size() == 1);
+
return true;
}
@@ -773,8 +795,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
const ui32 scanGen = request.GetGeneration();
TString table = request.GetTablePath();
auto dataFormat = request.GetDataFormat();
- TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs());
-
+ const TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs());
if (scanGen > 1) {
Self->IncCounter(COUNTER_SCAN_RESTARTED);
}
@@ -783,8 +804,6 @@ void TTxScan::Complete(const TActorContext& ctx) {
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request;
}
- std::vector<NOlap::TReadMetadata::TConstPtr> rMetadataRanges;
-
if (ReadMetadataRanges.empty()) {
LOG_S_DEBUG("TTxScan failed "
<< " txId: " << txId
@@ -860,6 +879,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext
Execute(new TTxScan(this, ev), ctx);
}
+const NKikimr::NOlap::TReadStats& TScanIteratorBase::GetStats() const {
+ return Default<NOlap::TReadStats>();
+}
+
}
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index 7576cb3247..5c20f2a5bd 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -9,6 +9,7 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
namespace NKikimr::NOlap {
+struct TReadStats;
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult {
private:
@@ -94,6 +95,9 @@ public:
virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) {
}
+
+ virtual const NOlap::TReadStats& GetStats() const;
+
virtual std::optional<ui32> GetAvailableResultsCount() const {
return {};
}
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index 829e57c1dc..0211b47595 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -28,6 +28,15 @@ public:
return it->second.Name;
}
+ std::optional<ui32> GetColumnIdOptional(const TString& name) const override {
+ auto it = PrimaryIndexStatsSchema.ColumnNames.find(name);
+ if (it == PrimaryIndexStatsSchema.ColumnNames.end()) {
+ return {};
+ } else {
+ return it->second;
+ }
+ }
+
const NTable::TScheme::TTableSchema& GetSchema() const override {
return PrimaryIndexStatsSchema;
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 9ad4f4b42f..8df44d7df9 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -1,6 +1,7 @@
#include "columnshard_ut_common.h"
#include "columnshard__stats_scan.h"
+#include "common/tests/shard_reader.h"
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_resolver.h>
@@ -449,26 +450,15 @@ namespace NKikimr::NColumnShard {
}
std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) {
- using namespace NTxUT;
- TActorId sender = runtime.AllocateEdgeActor();
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId));
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- while(true) {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
- auto b = event->GetArrowBatch();
- if (b) {
- batches.push_back(b);
- }
- if (!event->HasMore()) {
- break;
- }
+ std::vector<TString> fields;
+ for (auto&& f : schema) {
+ fields.emplace_back(f.first);
}
- auto res = NArrow::CombineBatches(batches);
- return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
+
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, snapshot);
+ reader.SetReplyColumns(fields);
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ return rb ? rb : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
}
}
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt
index a059465374..fb413280b2 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-arm64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(tests)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
index a059465374..fb413280b2 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(tests)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
index 1ef9531173..6d584af68a 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(tests)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
index 1ef9531173..6d584af68a 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(tests)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
index a059465374..fb413280b2 100644
--- a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(tests)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt
new file mode 100644
index 0000000000..55055e3cc4
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-arm64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-common-tests)
+target_link_libraries(columnshard-common-tests PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ core-formats-arrow
+ core-kqp-compute_actor
+)
+target_sources(columnshard-common-tests PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
+)
diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..55055e3cc4
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-common-tests)
+target_link_libraries(columnshard-common-tests PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ core-formats-arrow
+ core-kqp-compute_actor
+)
+target_sources(columnshard-common-tests PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
+)
diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..a6c6e844ce
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-common-tests)
+target_link_libraries(columnshard-common-tests PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ core-formats-arrow
+ core-kqp-compute_actor
+)
+target_sources(columnshard-common-tests PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
+)
diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..a6c6e844ce
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-common-tests)
+target_link_libraries(columnshard-common-tests PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ core-formats-arrow
+ core-kqp-compute_actor
+)
+target_sources(columnshard-common-tests PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
+)
diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.txt
new file mode 100644
index 0000000000..d863ebd180
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
+ include(CMakeLists.darwin-arm64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..55055e3cc4
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-common-tests)
+target_link_libraries(columnshard-common-tests PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ core-formats-arrow
+ core-kqp-compute_actor
+)
+target_sources(columnshard-common-tests PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
+)
diff --git a/ydb/core/tx/columnshard/common/tests/shard_reader.cpp b/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
new file mode 100644
index 0000000000..2789a63e38
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/shard_reader.cpp
@@ -0,0 +1,4 @@
+#include "shard_reader.h"
+
+namespace NKikimr::NOlap::NTests {
+}
diff --git a/ydb/core/tx/columnshard/common/tests/shard_reader.h b/ydb/core/tx/columnshard/common/tests/shard_reader.h
new file mode 100644
index 0000000000..777c93d8f1
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/shard_reader.h
@@ -0,0 +1,271 @@
+#pragma once
+#include <ydb/core/testlib/basics/runtime.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <optional>
+
+namespace NKikimr::NOlap::NTests {
+
+class TShardReader {
+private:
+ TTestBasicRuntime& Runtime;
+ const ui64 TabletId;
+ const ui64 PathId;
+ const NOlap::TSnapshot Snapshot;
+ std::optional<NActors::TActorId> ScanActorId;
+ std::optional<int> Finished;
+ THashMap<TString, ui64> ResultStats;
+ std::optional<NKikimrSSA::TProgram> ProgramProto;
+ std::optional<TString> SerializedProgram;
+ YDB_ACCESSOR(bool, Reverse, false);
+ YDB_ACCESSOR(ui32, Limit, 0);
+ std::vector<TString> ReplyColumns;
+ std::vector<TSerializedTableRange> Ranges;
+
+ std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const {
+ auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
+ ev->Record.SetLocalPathId(PathId);
+ ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
+ ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());
+
+ ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
+ ev->Record.SetTxId(Snapshot.GetTxId());
+
+ ev->Record.SetReverse(Reverse);
+ ev->Record.SetItemsLimit(Limit);
+
+ ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);
+
+ auto protoRanges = ev->Record.MutableRanges();
+ protoRanges->Reserve(Ranges.size());
+ for (auto& range : Ranges) {
+ auto newRange = protoRanges->Add();
+ range.Serialize(*newRange);
+ }
+
+ if (ProgramProto) {
+ NKikimrSSA::TOlapProgram olapProgram;
+ {
+ TString programBytes;
+ TStringOutput stream(programBytes);
+ ProgramProto->SerializeToArcadiaStream(&stream);
+ olapProgram.SetProgram(programBytes);
+ }
+ {
+ TString programBytes;
+ TStringOutput stream(programBytes);
+ olapProgram.SerializeToArcadiaStream(&stream);
+ ev->Record.SetOlapProgram(programBytes);
+ }
+ ev->Record.SetOlapProgramType(
+ NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
+ );
+ } else if (SerializedProgram) {
+ ev->Record.SetOlapProgram(*SerializedProgram);
+ ev->Record.SetOlapProgramType(
+ NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
+ );
+ }
+
+ return ev;
+ }
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches;
+ YDB_READONLY(ui32, IterationsCount, 0);
+public:
+ ui64 GetReadStat(const TString& paramName) const {
+ AFL_VERIFY(IsCorrectlyFinished());
+ auto it = ResultStats.find(paramName);
+ AFL_VERIFY(it != ResultStats.end());
+ return it->second;
+ }
+
+ ui64 GetReadBytes() const {
+ return GetReadStat("committed_bytes") + GetReadStat("inserted_bytes") + GetReadStat("compacted_bytes");
+ }
+
+ void AddRange(const TSerializedTableRange& r) {
+ Ranges.emplace_back(r);
+ }
+
+ ui32 GetRecordsCount() const {
+ AFL_VERIFY(IsFinished());
+ auto r = GetResult();
+ return r ? r->num_rows() : 0;
+ }
+
+ TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns) {
+ AFL_VERIFY(!SerializedProgram);
+ if (!ProgramProto) {
+ ProgramProto = NKikimrSSA::TProgram();
+ }
+ for (auto&& command : *ProgramProto->MutableCommand()) {
+ if (command.HasProjection()) {
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumns) {
+ proj.AddColumns()->SetName(i);
+ }
+ *command.MutableProjection() = proj;
+ return *this;
+ }
+ }
+ {
+ auto* command = ProgramProto->AddCommand();
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumns) {
+ proj.AddColumns()->SetName(i);
+ }
+ *command->MutableProjection() = proj;
+ }
+ return *this;
+ }
+
+ TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
+ AFL_VERIFY(!SerializedProgram);
+ if (!ProgramProto) {
+ ProgramProto = NKikimrSSA::TProgram();
+ }
+ for (auto&& command : *ProgramProto->MutableCommand()) {
+ if (command.HasProjection()) {
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumnIds) {
+ proj.AddColumns()->SetId(i);
+ }
+ *command.MutableProjection() = proj;
+ return *this;
+ }
+ }
+ {
+ auto* command = ProgramProto->AddCommand();
+ NKikimrSSA::TProgram::TProjection proj;
+ for (auto&& i : replyColumnIds) {
+ proj.AddColumns()->SetId(i);
+ }
+ *command->MutableProjection() = proj;
+ }
+ return *this;
+ }
+
+ TShardReader& SetProgram(const NKikimrSSA::TProgram& p) {
+ AFL_VERIFY(!ProgramProto);
+ AFL_VERIFY(!SerializedProgram);
+ ProgramProto = p;
+ return *this;
+ }
+
+ TShardReader& SetProgram(const TString& serializedProgram) {
+ AFL_VERIFY(!ProgramProto);
+ AFL_VERIFY(!SerializedProgram);
+ SerializedProgram = serializedProgram;
+ return *this;
+ }
+
+ TShardReader(TTestBasicRuntime& runtime, const ui64 tabletId, const ui64 pathId, const NOlap::TSnapshot& snapshot)
+ : Runtime(runtime)
+ , TabletId(tabletId)
+ , PathId(pathId)
+ , Snapshot(snapshot) {
+
+ }
+
+ bool IsFinished() const {
+ return !!Finished;
+ }
+
+ bool IsCorrectlyFinished() const {
+ return IsFinished() && *Finished == 1;
+ }
+
+ bool IsError() const {
+ return IsFinished() && *Finished == -1;
+ }
+
+ bool InitializeScanner() {
+ AFL_VERIFY(!ScanActorId);
+ const TActorId sender = Runtime.AllocateEdgeActor();
+ ForwardToTablet(Runtime, TabletId, sender, BuildStartEvent().release());
+ TAutoPtr<IEventHandle> handle;
+ auto event = Runtime.GrabEdgeEvents<NKqp::TEvKqpCompute::TEvScanInitActor, NKqp::TEvKqpCompute::TEvScanError>(handle);
+ if (auto* evSuccess = std::get<0>(event)) {
+ AFL_VERIFY(evSuccess);
+ auto& msg = evSuccess->Record;
+ ScanActorId = ActorIdFromProto(msg.GetScanActorId());
+ return true;
+ } else if (auto* evError = std::get<1>(event)) {
+ Finished = -1;
+ } else {
+ AFL_VERIFY(false);
+ }
+ return false;
+ }
+
+ void Ack() {
+ AFL_VERIFY(!Finished);
+ AFL_VERIFY(ScanActorId);
+ Runtime.Send(*ScanActorId, *ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(8 * 1024 * 1024, 0, 1));
+ ++IterationsCount;
+ }
+
+ bool Receive() {
+ AFL_VERIFY(!Finished);
+ TAutoPtr<IEventHandle> handle;
+ auto event = Runtime.GrabEdgeEvents<NKqp::TEvKqpCompute::TEvScanData, NKqp::TEvKqpCompute::TEvScanError>(handle);
+ if (auto* evData = std::get<0>(event)) {
+ auto b = evData->ArrowBatch;
+ if (b) {
+ NArrow::TStatusValidator::Validate(b->ValidateFull());
+ ResultBatches.push_back(b);
+ } else {
+ AFL_VERIFY(evData->Finished);
+ }
+ if (evData->Finished) {
+ AFL_VERIFY(evData->StatsOnFinished);
+ ResultStats = evData->StatsOnFinished->GetMetrics();
+ Finished = 1;
+ }
+ } else if (auto* evError = std::get<1>(event)) {
+ Finished = -1;
+ } else {
+ AFL_VERIFY(false);
+ }
+ return !Finished;
+ }
+
+ std::shared_ptr<arrow::RecordBatch> ReadAll() {
+ if (InitializeScanner()) {
+ Ack();
+ return ContinueReadAll();
+ }
+ return GetResult();
+ }
+
+ std::shared_ptr<arrow::RecordBatch> ContinueReadAll() {
+ while (Receive()) {
+ Ack();
+ }
+ return GetResult();
+ }
+
+ std::shared_ptr<arrow::RecordBatch> GetResult() const {
+ AFL_VERIFY(!!Finished);
+ if (IsError()) {
+ return nullptr;
+ }
+ if (ResultBatches.empty()) {
+ return nullptr;
+ } else {
+ auto result = NArrow::CombineBatches(ResultBatches);
+ NArrow::TStatusValidator::Validate(result->ValidateFull());
+ return result;
+ }
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/common/tests/ya.make b/ydb/core/tx/columnshard/common/tests/ya.make
new file mode 100644
index 0000000000..98401ad0e7
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/tests/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ shard_reader.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ contrib/libs/apache/arrow
+ ydb/core/formats/arrow
+ ydb/core/kqp/compute_actor
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h
index 75b6aa4f32..f05943bffa 100644
--- a/ydb/core/tx/columnshard/engines/reader/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/description.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/core/tx/program/program.h>
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
-
+#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
namespace NKikimr::NOlap {
// Describes read/scan request
@@ -18,6 +18,7 @@ public:
// There's complex logic in NKikimr::TTableRange comparison that could be emulated only with separated compare
// operations with potentially different columns. We have to remove columns to support -Inf (Null) and +Inf.
NOlap::TPKRangesFilter PKRangesFilter;
+ NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
// List of columns
std::vector<ui32> ColumnIds;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
index 02193c459a..2c6f213143 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/context.h
@@ -52,7 +52,14 @@ public:
auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
- EFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
+ {
+ auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
+ if (efColumns.size()) {
+ EFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetEarlyFilterColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
+ } else {
+ EFColumns = SpecColumns;
+ }
+ }
*EFColumns = *EFColumns + *SpecColumns;
if (ReadMetadata->HasProcessingColumnIds()) {
FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), ReadMetadata->GetIndexInfo(), readSchema);
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index f5474a7497..83d0ded5f5 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -42,6 +42,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription, ResultIndexSchema->GetIndexInfo().GetReplaceKey());
SelectInfo = dataAccessor.Select(readDescription);
+ StatsMode = readDescription.StatsMode;
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index a09a017ce2..01ca8575e4 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -42,13 +42,17 @@ struct TReadStats {
ui32 SelectedRows = 0;
- TReadStats(ui32 indexNo)
+ TReadStats(ui32 indexNo = 0)
: BeginTimestamp(TInstant::Now())
, SelectedIndex(indexNo)
{}
void PrintToLog();
+ ui64 GetReadBytes() const {
+ return CompactedPortionsBytes + InsertedPortionsBytes + CompactedPortionsBytes;
+ }
+
TDuration Duration() {
return TInstant::Now() - BeginTimestamp;
}
@@ -99,7 +103,7 @@ public:
}
virtual ~TReadMetadataBase() = default;
- ui64 Limit{0}; // TODO
+ ui64 Limit = 0;
virtual void Dump(IOutputStream& out) const {
out << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
@@ -149,6 +153,7 @@ public:
return result;
}
std::shared_ptr<TSelectInfo> SelectInfo;
+ NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
std::vector<TCommittedBlob> CommittedBlobs;
std::shared_ptr<TReadStats> ReadStats;
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt
index cac2dc7c55..737bdbf11b 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-arm64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC
core-testlib-default
columnshard-hooks-abstract
columnshard-hooks-testing
+ columnshard-common-tests
ydb-services-metadata
ydb-core-tx
public-lib-yson_value
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt
index aec0c14a8a..edb570ec79 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC
core-testlib-default
columnshard-hooks-abstract
columnshard-hooks-testing
+ columnshard-common-tests
ydb-services-metadata
ydb-core-tx
public-lib-yson_value
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt
index 8524515b05..3d90143420 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC
core-testlib-default
columnshard-hooks-abstract
columnshard-hooks-testing
+ columnshard-common-tests
ydb-services-metadata
ydb-core-tx
public-lib-yson_value
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt
index fb7f19246f..b9e15ace09 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC
core-testlib-default
columnshard-hooks-abstract
columnshard-hooks-testing
+ columnshard-common-tests
ydb-services-metadata
ydb-core-tx
public-lib-yson_value
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt
index 2ef0565f74..68c91949bc 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC
core-testlib-default
columnshard-hooks-abstract
columnshard-hooks-testing
+ columnshard-common-tests
ydb-services-metadata
ydb-core-tx
public-lib-yson_value
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 97e883aa3c..55fa1e342e 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -11,6 +11,7 @@
#include <ydb/core/tx/columnshard/operations/write_data.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/core/tx/columnshard/common/tests/shard_reader.h>
#include <ydb/library/actors/protos/unittests.pb.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
@@ -41,7 +42,7 @@ public:
};
template <typename TKey = ui64>
-bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range,
+bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range,
bool requireUniq = false, const std::string& columnName = "timestamp") {
static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
@@ -54,14 +55,8 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p
}
}
- auto schema = NArrow::DeserializeSchema(srtSchema);
- //Cerr << "Got schema: " << schema->ToString() << "\n";
-
- for (auto& blob : blobs) {
- auto batch = NArrow::DeserializeBatch(blob, schema);
+ for (auto& batch : batches) {
UNIT_ASSERT(batch);
- //Cerr << "Got batch: " << batch->ToString() << "\n";
-
std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
UNIT_ASSERT(array);
@@ -104,7 +99,20 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p
}
template <typename TKey = ui64>
-bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) {
+bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range,
+ bool requireUniq = false, const std::string& columnName = "timestamp") {
+
+ auto schema = NArrow::DeserializeSchema(srtSchema);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ for (auto& blob : blobs) {
+ batches.emplace_back(NArrow::DeserializeBatch(blob, schema));
+ }
+
+ return DataHas<TKey>(batches, range, requireUniq, columnName);
+}
+
+template <typename TKey = ui64>
+bool DataHasOnly(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range) {
static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
THashSet<TKey> keys;
@@ -116,9 +124,7 @@ bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, st
}
}
- auto schema = NArrow::DeserializeSchema(srtSchema);
- for (auto& blob : blobs) {
- auto batch = NArrow::DeserializeBatch(blob, schema);
+ for (auto& batch : batches) {
UNIT_ASSERT(batch);
std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
@@ -154,6 +160,17 @@ bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, st
return true;
}
+template <typename TKey = ui64>
+bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) {
+ auto schema = NArrow::DeserializeSchema(srtSchema);
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ for (auto& blob : blobs) {
+ batches.emplace_back(NArrow::DeserializeBatch(blob, schema));
+ }
+
+ return DataHasOnly(batches, range);
+}
+
template <typename TArrowType>
bool CheckTypedIntValues(const std::shared_ptr<arrow::Array>& array, const std::vector<int64_t>& expected) {
using TArray = typename arrow::TypeTraits<TArrowType>::ArrayType;
@@ -238,9 +255,7 @@ bool CheckIntValues(const std::shared_ptr<arrow::Array>& array, const std::vecto
return true;
}
-bool CheckOrdered(const TString& blob, const TString& srtSchema) {
- auto schema = NArrow::DeserializeSchema(srtSchema);
- auto batch = NArrow::DeserializeBatch(blob, schema);
+bool CheckOrdered(const std::shared_ptr<arrow::RecordBatch>& batch) {
UNIT_ASSERT(batch);
std::shared_ptr<arrow::Array> array = batch->GetColumnByName("timestamp");
@@ -300,10 +315,12 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) {
return true;
}
-bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& colNames, size_t rowsCount) {
+bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& colNames, std::optional<size_t> rowsCount) {
UNIT_ASSERT(batch);
UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_columns(), colNames.size());
- UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_rows(), rowsCount);
+ if (rowsCount) {
+ UNIT_ASSERT_VALUES_EQUAL((ui64)batch->num_rows(), *rowsCount);
+ }
UNIT_ASSERT(batch->ValidateFull().ok());
for (size_t i = 0; i < colNames.size(); ++i) {
@@ -318,46 +335,6 @@ bool CheckColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::v
return true;
}
-bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& meta, const std::vector<TString>& colNames,
- size_t rowsCount = 100) {
- auto schema = NArrow::DeserializeSchema(meta.GetSchema());
- auto batch = NArrow::DeserializeBatch(blob, schema);
-
- return CheckColumns(batch, colNames, rowsCount);
-}
-
-std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema,
- NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) {
- std::vector<TString> readData;
- TAutoPtr<IEventHandle> handle;
- bool finished = false;
- for (ui32 i = 0; i < expected; ++i) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT(resRead.GetData().size() > 0);
- //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- //Cerr << "GOT BATCH " << resRead.GetBatch() << " data size " << resRead.GetData().size() << "\n";
- if (resRead.GetFinished()) {
- expected = resRead.GetBatch() + 1;
- meta = resRead.GetMeta();
- finished = true;
- }
- readData.push_back(resRead.GetData());
-
- if (schema.empty()) {
- schema = resRead.GetMeta().GetSchema();
- }
- UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema));
- }
- UNIT_ASSERT(finished);
- return readData;
-}
-
void TestWrite(const TestTableDescription& table) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -533,7 +510,6 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
//
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
@@ -559,13 +535,12 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
// read
if (planStep != initPlanStep) {
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, planStep-1, Max<ui64>(), tableId));
-
- TString schema;
- NKikimrTxColumnShard::TMetadata meta;
- std::vector<TString> readData = ReadManyResults(runtime, schema, meta);
- UNIT_ASSERT(DataHas(readData, schema, portion, true));
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
+ reader.SetReplyColumns({"timestamp"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion, true));
}
}
}
@@ -619,24 +594,17 @@ void TestWriteReadLongTxDup() {
// read
TAutoPtr<IEventHandle> handle;
{
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, 0, planStep, txId, tableId));
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT(resRead.GetData().size() > 0);
-
- auto data = resRead.GetData();
- auto meta = resRead.GetMeta();
- UNIT_ASSERT(CheckColumns(data, meta, TTestSchema::ExtractNames(ydbSchema), numRows));
- UNIT_ASSERT(DataHas(std::vector<TString>{data}, meta.GetSchema(), portion, true));
- UNIT_ASSERT(DataHasOnly(std::vector<TString>{data}, meta.GetSchema(), portion));
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(rb->num_rows());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion, true));
+ UNIT_ASSERT(DataHasOnly({rb}, portion));
}
}
@@ -681,7 +649,6 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
//
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
@@ -710,18 +677,12 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
TAutoPtr<IEventHandle> handle;
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 1);
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId));
- auto event2 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event2);
-
- auto& resRead = Proto(event2);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead.GetData(), "");
+
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 0));
+ reader.SetReplyColumns({"resource_type"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT_EQUAL(rb, nullptr);
}
// commit 1: ins:0, cmt:1, idx:0
@@ -733,86 +694,57 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
// read 2 (committed, old snapshot)
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 2);
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId));
- auto event5 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event5);
- auto& resRead2 = Proto(event5);
- UNIT_ASSERT_EQUAL(resRead2.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead2.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead2.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead2.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead2.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead2.GetData(), "");
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 0));
+ reader.SetReplyColumns({"resource_type"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT_EQUAL(rb, nullptr);
}
// read 3 (committed)
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 3);
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId));
- auto event6 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event6);
-
- auto& resRead3 = Proto(event6);
- UNIT_ASSERT_EQUAL(resRead3.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead3.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead3.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead3.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead3.GetFinished(), true);
- //UNIT_ASSERT_EQUAL(resRead3.GetData(), data);
- UNIT_ASSERT(resRead3.GetData().size() > 0);
- UNIT_ASSERT(CheckColumns(resRead3.GetData(), resRead3.GetMeta(), TTestSchema::ExtractNames(ydbSchema)));
- {
- std::vector<TString> readData;
- readData.push_back(resRead3.GetData());
- auto& schema = resRead3.GetMeta().GetSchema();
- UNIT_ASSERT(DataHas(readData, schema, portion[0]));
- UNIT_ASSERT(CheckOrdered(resRead3.GetData(), schema));
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
}
// read 4 (column by id)
{
NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 4);
- auto read_col1 = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId);
- Proto(read_col1.get()).AddColumnIds(1);
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read_col1.release());
- auto event7 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event7);
-
- auto& resRead4 = Proto(event7);
- UNIT_ASSERT_EQUAL(resRead4.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead4.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead4.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead4.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead4.GetFinished(), true);
- UNIT_ASSERT(CheckColumns(resRead4.GetData(), resRead4.GetMeta(), {"timestamp"}));
- {
- auto& schema = resRead4.GetMeta().GetSchema();
- UNIT_ASSERT(CheckOrdered(resRead4.GetData(), schema));
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetReplyColumnIds({1});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ NArrow::ExtractColumnsValidate(rb, {"timestamp"});
+ UNIT_ASSERT((ui32)rb->num_columns() == 1);
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
}
// read 5 (2 columns by name)
- auto read_col2 = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId);
- Proto(read_col2.get()).AddColumnNames("timestamp");
- Proto(read_col2.get()).AddColumnNames("message");
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read_col2.release());
- auto event8 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event8);
-
- auto& resRead5 = Proto(event8);
- UNIT_ASSERT_EQUAL(resRead5.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead5.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead5.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead5.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead5.GetFinished(), true);
- UNIT_ASSERT(CheckColumns(resRead5.GetData(), resRead5.GetMeta(), {"timestamp", "message"}));
{
- auto& schema = resRead5.GetMeta().GetSchema();
- UNIT_ASSERT(CheckOrdered(resRead5.GetData(), schema));
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 5);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetReplyColumns({"timestamp", "message"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ NArrow::ExtractColumnsValidate(rb, {"timestamp", "message"});
+ UNIT_ASSERT((ui32)rb->num_columns() == 2);
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
}
// write 2 (big portion of data): ins:1, cmt:1, idx:0
@@ -839,68 +771,47 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[2], ydbSchema), ydbSchema, intWriteIds));
// read 6, planstep 0
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 0, 0, tableId));
- auto event9 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event9);
-
- auto& resRead6 = Proto(event9);
- UNIT_ASSERT_EQUAL(resRead6.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead6.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead6.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead6.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead6.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead6.GetData(), "");
+ {
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 6);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 0));
+ reader.SetReplyColumns({"timestamp", "message"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(!rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ }
// read 7, planstep 21 (part of index)
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 21, txId, tableId));
- auto event10 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event10);
-
- auto& resRead7 = Proto(event10);
- UNIT_ASSERT_EQUAL(resRead7.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead7.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead7.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead7.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead7.GetFinished(), true);
- UNIT_ASSERT(resRead7.GetData().size() > 0);
-
{
- std::vector<TString> readData;
- readData.push_back(resRead7.GetData());
- auto& schema = resRead7.GetMeta().GetSchema();
- UNIT_ASSERT(DataHas(readData, schema, portion[0])); // checks no checks REPLACE (indexed vs indexed)
- UNIT_ASSERT(!DataHas(readData, schema, portion[1])); // checks snapshot filter in indexed data
- UNIT_ASSERT(!DataHas(readData, schema, portion[2]));
- UNIT_ASSERT(CheckOrdered(resRead7.GetData(), schema));
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 7);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(21, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(!DataHas({rb}, portion[1]));
+ UNIT_ASSERT(!DataHas({rb}, portion[2]));
}
// read 8, planstep 22 (full index)
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 22, txId, tableId));
- auto event11 = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event11);
-
- auto& resRead8 = Proto(event11);
- UNIT_ASSERT_EQUAL(resRead8.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead8.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead8.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead8.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead8.GetFinished(), true);
- UNIT_ASSERT(resRead8.GetData().size() > 0);
-
{
- std::vector<TString> readData;
- readData.push_back(resRead8.GetData());
- auto& schema = resRead8.GetMeta().GetSchema();
- UNIT_ASSERT(DataHas(readData, schema, portion[0], true)); // checks REPLACE (indexed vs indexed)
- UNIT_ASSERT(DataHas(readData, schema, portion[1]));
- UNIT_ASSERT(!DataHas(readData, schema, portion[2]));
- UNIT_ASSERT(CheckOrdered(resRead8.GetData(), schema));
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 8);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(22, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(DataHas({rb}, portion[1]));
+ UNIT_ASSERT(!DataHas({rb}, portion[2]));
}
// commit 3: ins:0, cmt:1, idx:1
@@ -917,18 +828,22 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(write(runtime, sender, writeId, tableId, MakeTestBlob(portion[3], ydbSchema), ydbSchema, intWriteIds));
// read 9 (committed, indexed)
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 23, txId, tableId));
-
- TString schema;
- NKikimrTxColumnShard::TMetadata meta;
- std::vector<TString> readData = ReadManyResults(runtime, schema, meta);
-
- UNIT_ASSERT(DataHas(readData, schema, portion[0]));
- UNIT_ASSERT(DataHas(readData, schema, portion[1]));
- UNIT_ASSERT(DataHas(readData, schema, portion[2]));
- UNIT_ASSERT(!DataHas(readData, schema, portion[3]));
+ {
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 9);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(23, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(DataHas({rb}, portion[1]));
+ UNIT_ASSERT(DataHas({rb}, portion[2]));
+ UNIT_ASSERT(!DataHas({rb}, portion[3]));
+ }
// commit 4: ins:0, cmt:2, idx:1 (with duplicates in PK)
@@ -938,160 +853,85 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
planCommit(runtime, sender, planStep, txId);
// read 10
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, metaShard, 24, txId, tableId));
- readData.clear();
- schema.clear();
- ui32 expected = 1000;
- for (ui32 i = 0; i < expected; ++i) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT(resRead.GetData().size() > 0);
- //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- //UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- bool lastBach = resRead.GetFinished();
- if (lastBach) {
- expected = resRead.GetBatch() + 1;
- }
- readData.push_back(resRead.GetData());
-
- auto meta = resRead.GetMeta();
- if (schema.empty()) {
- schema = meta.GetSchema();
- }
- UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema));
-
- if (lastBach) {
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
-
- if (ydbSchema == TTestSchema::YdbSchema()) {
- Cerr << codec << "/" << readStats.GetCompactedPortionsBytes() << "/" << readStats.GetInsertedPortionsBytes() << "/" << readStats.GetCommittedPortionsBytes() << Endl;
- if (readStats.GetInsertedPortionsBytes()) {
- UNIT_ASSERT_GE(readStats.GetInsertedPortionsBytes() / 100000, 40);
- UNIT_ASSERT_LE(readStats.GetInsertedPortionsBytes() / 100000, 50);
- }
- if (readStats.GetCommittedPortionsBytes()) {
- UNIT_ASSERT_GE(readStats.GetCommittedPortionsBytes() / 100000, 65);
- UNIT_ASSERT_LE(readStats.GetCommittedPortionsBytes() / 100000, 78);
- }
- if (readStats.GetCompactedPortionsBytes()) {
- if (codec == "" || codec == "lz4") {
- UNIT_ASSERT_GE(readStats.GetCompactedPortionsBytes() / 100000, 40);
- UNIT_ASSERT_LE(readStats.GetCompactedPortionsBytes() / 100000, 50);
- } else if (codec == "none") {
- UNIT_ASSERT_GE(readStats.GetCompactedPortionsBytes() / 100000, 65);
- UNIT_ASSERT_LE(readStats.GetCompactedPortionsBytes() / 100000, 78);
- } else if (codec == "zstd") {
- UNIT_ASSERT_GE(readStats.GetCompactedPortionsBytes() / 100000, 20);
- UNIT_ASSERT_LE(readStats.GetCompactedPortionsBytes() / 100000, 30);
- } else {
- UNIT_ASSERT(false);
- }
- }
+ {
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 10);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, portion[0]));
+ UNIT_ASSERT(DataHas({rb}, portion[1]));
+ UNIT_ASSERT(DataHas({rb}, portion[2]));
+ UNIT_ASSERT(DataHas({rb}, portion[3]));
+ UNIT_ASSERT(DataHas({rb}, {0, 500}, true));
+
+ const ui64 compactedBytes = reader.GetReadStat("compacted_bytes");
+ const ui64 insertedBytes = reader.GetReadStat("inserted_bytes");
+ const ui64 committedBytes = reader.GetReadStat("committed_bytes");
+ Cerr << codec << "/" << compactedBytes << "/" << insertedBytes << "/" << committedBytes << Endl;
+ if (insertedBytes) {
+ UNIT_ASSERT_GE(insertedBytes / 100000, 40);
+ UNIT_ASSERT_LE(insertedBytes / 100000, 50);
+ }
+ if (committedBytes) {
+ UNIT_ASSERT_GE(committedBytes / 100000, 65);
+ UNIT_ASSERT_LE(committedBytes / 100000, 78);
+ }
+ if (compactedBytes) {
+ if (codec == "" || codec == "lz4") {
+ UNIT_ASSERT_GE(compactedBytes / 100000, 40);
+ UNIT_ASSERT_LE(compactedBytes / 100000, 50);
+ } else if (codec == "none") {
+ UNIT_ASSERT_GE(compactedBytes / 100000, 65);
+ UNIT_ASSERT_LE(compactedBytes / 100000, 78);
+ } else if (codec == "zstd") {
+ UNIT_ASSERT_GE(compactedBytes / 100000, 20);
+ UNIT_ASSERT_LE(compactedBytes / 100000, 30);
+ } else {
+ UNIT_ASSERT(false);
}
}
}
- UNIT_ASSERT(DataHas(readData, schema, portion[0]));
- UNIT_ASSERT(DataHas(readData, schema, portion[1]));
- UNIT_ASSERT(DataHas(readData, schema, portion[2]));
- UNIT_ASSERT(DataHas(readData, schema, portion[3]));
- UNIT_ASSERT(DataHas(readData, schema, {0, 500}, true)); // checks REPLACE (committed vs indexed)
- // read 11 (range predicate: closed interval)
- {
- TSerializedTableRange range = MakeTestRange({10, 42}, true, true, testYdbPk);
- NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk);
-
- auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId);
- auto* greater = Proto(evRead.get()).MutableGreaterPredicate();
- auto* less = Proto(evRead.get()).MutableLessPredicate();
- for (auto& name : prGreater.ColumnNames()) {
- greater->AddColumnNames(name);
- }
- for (auto& name : prLess.ColumnNames()) {
- less->AddColumnNames(name);
- }
- greater->SetRow(NArrow::SerializeBatchNoCompression(prGreater.Batch));
- less->SetRow(NArrow::SerializeBatchNoCompression(prLess.Batch));
- greater->SetInclusive(prGreater.IsInclusive());
- less->SetInclusive(prLess.IsInclusive());
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evRead.release());
- }
- readData.clear();
- schema.clear();
+ // read 11 (range predicate: closed interval)
{
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT(resRead.GetData().size() > 0);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- readData.push_back(resRead.GetData());
- if (schema.empty()) {
- schema = resRead.GetMeta().GetSchema();
- }
- UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema));
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ reader.AddRange(MakeTestRange({10, 42}, true, true, testYdbPk));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, {10, 42 + 1}));
+ UNIT_ASSERT(DataHasOnly({rb}, {10, 42 + 1}));
}
- UNIT_ASSERT(DataHas(readData, schema, {10, 42 + 1}));
- UNIT_ASSERT(DataHasOnly(readData, schema, {10, 42 + 1}));
// read 12 (range predicate: open interval)
{
- TSerializedTableRange range = MakeTestRange({10, 42}, false, false, testYdbPk);
- NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, testYdbPk);
-
- auto evRead = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, 24, txId, tableId);
- auto* greater = Proto(evRead.get()).MutableGreaterPredicate();
- auto* less = Proto(evRead.get()).MutableLessPredicate();
- for (auto& name : prGreater.ColumnNames()) {
- greater->AddColumnNames(name);
- }
- for (auto& name : prLess.ColumnNames()) {
- less->AddColumnNames(name);
- }
-
- greater->SetRow(NArrow::SerializeBatchNoCompression(prGreater.Batch));
- less->SetRow(NArrow::SerializeBatchNoCompression(prLess.Batch));
- greater->SetInclusive(prGreater.IsInclusive());
- less->SetInclusive(prLess.IsInclusive());
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evRead.release());
- }
- readData.clear();
- schema.clear();
- {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT(resRead.GetData().size() > 0);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- readData.push_back(resRead.GetData());
- if (schema.empty()) {
- schema = resRead.GetMeta().GetSchema();
- }
- UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema));
+ NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId));
+ reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
+ reader.AddRange(MakeTestRange({10, 42}, false, false, testYdbPk));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema));
+ UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
+ UNIT_ASSERT(rb->num_rows());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(DataHas({rb}, {10 + 1, 41 + 1}));
+ UNIT_ASSERT(DataHasOnly({rb}, {10 + 1, 41 + 1}));
}
- UNIT_ASSERT(DataHas(readData, schema, {11, 41 + 1}));
- UNIT_ASSERT(DataHasOnly(readData, schema, {11, 41 + 1}));
}
void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table) {
@@ -1132,7 +972,6 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
//
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 100;
@@ -1193,37 +1032,19 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table
--txId;
for (ui32 i = 0; i < 2; ++i) {
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId);
- Proto(read.get()).AddColumnNames("timestamp");
- Proto(read.get()).AddColumnNames("message");
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
-
- TString schema;
- NKikimrTxColumnShard::TMetadata meta;
- std::vector<TString> readData = ReadManyResults(runtime, schema, meta);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetReplyColumns({"timestamp", "message"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
if (ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8)) {
- UNIT_ASSERT(DataHas<std::string>(readData, schema, triggerPortion, true));
- UNIT_ASSERT(DataHas<std::string>(readData, schema, smallWrites, true));
+ UNIT_ASSERT(DataHas<std::string>({rb}, triggerPortion, true));
+ UNIT_ASSERT(DataHas<std::string>({rb}, smallWrites, true));
} else {
- UNIT_ASSERT(DataHas(readData, schema, triggerPortion, true));
- UNIT_ASSERT(DataHas(readData, schema, smallWrites, true));
+ UNIT_ASSERT(DataHas({rb}, triggerPortion, true));
+ UNIT_ASSERT(DataHas({rb}, smallWrites, true));
}
-
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
- Cerr << readStats.DebugString() << Endl;
- UNIT_ASSERT(readStats.GetBeginTimestamp() > 0);
- UNIT_ASSERT(readStats.GetDurationUsec() > 0);
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
-// UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
- UNIT_ASSERT(readStats.GetIndexBatches() > 0);
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0);
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 2); // planStep, txId + 4 PK columns + "message"
- UNIT_ASSERT(readStats.GetIndexPortions() > 0); // got compaction
- UNIT_ASSERT(readStats.GetIndexPortions() <= 5); // got compaction
-
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
}
@@ -1411,7 +1232,6 @@ void TestReadWithProgram(const TestTableDescription& table = {})
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 100;
@@ -1454,70 +1274,27 @@ void TestReadWithProgram(const TestTableDescription& table = {})
UNIT_ASSERT(program.SerializeToString(&programs.back()));
}
- for (auto& programText : programs) {
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
- auto& readProto = Proto(readEvent);
-
- readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM);
- readProto.SetOlapProgram(programText);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
-
- TAutoPtr<IEventHandle> handle;
- auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(result);
-
- auto& resRead = Proto(result);
-
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead.GetData(), "");
- }
-
ui32 i = 0;
for (auto& programText : programs) {
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
- auto& readProto = Proto(readEvent);
-
- readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
- readProto.SetOlapProgram(programText);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
-
- TAutoPtr<IEventHandle> handle;
- auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(result);
-
- auto& resRead = Proto(result);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetProgram(programText);
+ auto rb = reader.ReadAll();
if (i < numWrong) {
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead.GetData(), "");
+ UNIT_ASSERT(reader.IsError());
+ UNIT_ASSERT(reader.IsFinished());
} else {
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
-
- auto& meta = resRead.GetMeta();
- auto& schema = meta.GetSchema();
-
- std::vector<TString> readData;
- readData.push_back(resRead.GetData());
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
switch (i) {
case 1:
- UNIT_ASSERT(resRead.GetData().size() > 0);
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"}));
- UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true));
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(rb->num_rows());
+ NArrow::ExtractColumnsValidate(rb, {"level", "timestamp"});
+ UNIT_ASSERT(rb->num_columns() == 2);
+ UNIT_ASSERT(DataHas({rb}, {0, 100}, true));
break;
case 2:
- UNIT_ASSERT(resRead.GetData().size() == 0);
+ UNIT_ASSERT(!rb || !rb->num_rows());
break;
default:
break;
@@ -1539,7 +1316,6 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 100;
@@ -1568,56 +1344,29 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) {
ui32 i = 0;
for (auto& ssa : ssas) {
- TString programText;
- {
- TString serialized;
- UNIT_ASSERT(ssa.SerializeToString(&serialized));
- NKikimrSSA::TOlapProgram program;
- program.SetProgram(serialized);
- UNIT_ASSERT(program.SerializeToString(&programText));
- }
-
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
- auto& readProto = Proto(readEvent);
-
- readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
- readProto.SetOlapProgram(programText);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
-
- TAutoPtr<IEventHandle> handle;
- auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(result);
-
- auto& resRead = Proto(result);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- {
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT(resRead.GetData().size() > 0);
-
- auto& meta = resRead.GetMeta();
- //auto& schema = meta.GetSchema();
- TString readData = resRead.GetData();
-
- switch (i) {
- case 0:
- case 1:
- UNIT_ASSERT(CheckColumns(readData, meta, {"message"}, 19));
- break;
- case 2:
- case 3:
- UNIT_ASSERT(CheckColumns(readData, meta, {"message"}, 11));
- break;
- case 4:
- case 5:
- UNIT_ASSERT(CheckColumns(readData, meta, {"message"}, 10));
- break;
- default:
- break;
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetProgram(ssa);
+ auto rb = reader.ReadAll();
+
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(rb);
+ UNIT_ASSERT(rb->num_rows());
+
+ switch (i) {
+ case 0:
+ case 1:
+ UNIT_ASSERT(CheckColumns(rb, {"message"}, 19));
+ break;
+ case 2:
+ case 3:
+ UNIT_ASSERT(CheckColumns(rb, {"message"}, 11));
+ break;
+ case 4:
+ case 5:
+ UNIT_ASSERT(CheckColumns(rb, {"message"}, 10));
+ break;
+ default:
+ break;
}
++i;
}
@@ -1634,7 +1383,6 @@ void TestSomePrograms(const TestTableDescription& table) {
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 100;
@@ -1667,31 +1415,10 @@ void TestSomePrograms(const TestTableDescription& table) {
// TODO: add programs with bugs here
for (auto& ssaText : programs) {
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
- auto& readProto = Proto(readEvent);
-
- TString programText;
- NKikimrSSA::TOlapProgram program;
- program.SetProgram(ssaText);
- UNIT_ASSERT(program.SerializeToString(&programText));
-
- readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM);
- readProto.SetOlapProgram(programText);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
-
- TAutoPtr<IEventHandle> handle;
- auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(result);
-
- auto& resRead = Proto(result);
-
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- //UNIT_ASSERT_EQUAL(resRead.GetData(), "");
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetProgram(ssaText);
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsError());
}
}
@@ -1717,7 +1444,6 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 100;
@@ -1799,37 +1525,10 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
for (auto& programText : programs) {
Cerr << "-- select program: " << prog << " is filtered: " << (int)isFiltered.count(prog) << "\n";
- auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
- auto& readProto = Proto(readEvent);
-
- readProto.SetOlapProgramType(::NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
- readProto.SetOlapProgram(programText);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, readEvent);
-
- TAutoPtr<IEventHandle> handle;
- auto result = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(result);
-
- auto& resRead = Proto(result);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
-
- std::shared_ptr<arrow::RecordBatch> batch;
- {
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT(resRead.GetData().size() > 0);
-
- auto& meta = resRead.GetMeta();
- auto& schema = meta.GetSchema();
- auto& data = resRead.GetData();
-
- batch = NArrow::DeserializeBatch(data, NArrow::DeserializeSchema(schema));
- UNIT_ASSERT(batch);
- UNIT_ASSERT(batch->ValidateFull().ok());
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetProgram(programText);
+ auto batch = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
if (checkResult.contains(prog)) {
if (isFiltered.contains(prog)) {
@@ -2287,7 +1986,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TSerializedTableRange MakeRange(const std::vector<std::pair<TString, TTypeInfo>>& pk) const {
std::vector<TString> mem;
- auto cellsFrom = From ? From->GetCellVec(pk, mem, true) : std::vector<TCell>();
+ auto cellsFrom = From ? From->GetCellVec(pk, mem, false) : std::vector<TCell>();
auto cellsTo = To ? To->GetCellVec(pk, mem) : std::vector<TCell>();
return TSerializedTableRange(TConstArrayRef<TCell>(cellsFrom), (From ? From->GetInclude() : false),
TConstArrayRef<TCell>(cellsTo), (To ? To->GetInclude(): false));
@@ -2300,98 +1999,22 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
const TString TestCaseName;
void Execute() {
- const ui64 metaShard = TTestTxConfig::TxTablet1;
const ui64 tableId = 1;
- const TActorId sender = Owner.Runtime.AllocateEdgeActor();
std::set<TString> useFields = {"timestamp", "message"};
{ // read with predicate (FROM)
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, Owner.PlanStep, Owner.TxId, tableId);
- Proto(read.get()).AddColumnNames("timestamp");
- Proto(read.get()).AddColumnNames("message");
-
- const TSerializedTableRange range = MakeRange(Owner.YdbPk);
-
- NOlap::TPredicate prGreater, prLess;
- std::tie(prGreater, prLess) = RangePredicates(range, Owner.YdbPk);
- if (From) {
- auto* greater = Proto(read.get()).MutableGreaterPredicate();
- for (auto& name : prGreater.ColumnNames()) {
- greater->AddColumnNames(name);
- useFields.emplace(name);
- }
- greater->SetRow(NArrow::SerializeBatchNoCompression(prGreater.Batch));
- greater->SetInclusive(From->GetInclude());
- };
- if (To) {
- auto* less = Proto(read.get()).MutableLessPredicate();
- for (auto& name : prLess.ColumnNames()) {
- less->AddColumnNames(name);
- useFields.emplace(name);
- }
- less->SetRow(NArrow::SerializeBatchNoCompression(prLess.Batch));
- less->SetInclusive(To->GetInclude());
- }
-
- ForwardToTablet(Owner.Runtime, TTestTxConfig::TxTablet0, sender, read.release());
- }
-
- ui32 numRows = 0;
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- ui32 expected = 100;
- for (ui32 i = 0; i < expected; ++i) {
- TAutoPtr<IEventHandle> handle;
- auto event = Owner.Runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("0_type_id", Owner.YdbPk[0].second.GetTypeId())("batch", resRead.GetBatch())("data_size", resRead.GetData().size());
-
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- if (ExpectedCount && !*ExpectedCount) {
- UNIT_ASSERT(!resRead.GetBatch());
- UNIT_ASSERT(resRead.GetFinished());
- UNIT_ASSERT(!resRead.GetData().size());
- } else {
- UNIT_ASSERT(resRead.HasMeta());
-
- auto& meta = resRead.GetMeta();
- auto schema = NArrow::DeserializeSchema(meta.GetSchema());
- UNIT_ASSERT(schema);
- auto batch = NArrow::DeserializeBatch(resRead.GetData(), schema);
- UNIT_ASSERT(batch);
-
- numRows += batch->num_rows();
- batches.emplace_back(batch);
-
- if (resRead.GetFinished()) {
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
-
- UNIT_ASSERT(readStats.GetBeginTimestamp());
- UNIT_ASSERT(readStats.GetDurationUsec());
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
- UNIT_ASSERT(readStats.GetIndexBatches());
- //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), useFields.size()); // planStep, txId + 4 PK columns + "message"
- //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
- //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization?
+ NOlap::NTests::TShardReader reader(Owner.Runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(Owner.PlanStep, Owner.TxId));
+ reader.SetReplyColumns({"timestamp", "message"});
+ reader.AddRange(MakeRange(Owner.YdbPk));
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ if (ExpectedCount) {
+ if (*ExpectedCount) {
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(CheckColumns(rb, {"timestamp", "message"}, ExpectedCount));
+ } else {
+ UNIT_ASSERT(!rb || !rb->num_rows());
}
}
- if (resRead.GetFinished()) {
- expected = resRead.GetBatch();
- }
- }
- UNIT_ASSERT(expected < 100);
-
- if (ExpectedCount) {
- if (numRows != *ExpectedCount) {
- for (auto& batch : batches) {
- Cerr << batch->ToString() << "\n";
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(numRows, *ExpectedCount);
}
}
@@ -2432,7 +2055,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- const ui64 metaShard = TTestTxConfig::TxTablet1;
const ui64 tableId = 1;
ui64 planStep = 100;
ui64 txId = 100;
@@ -2473,43 +2095,21 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
for (ui32 i = 0; i < 2; ++i) {
{
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId);
- Proto(read.get()).AddColumnNames("timestamp");
- Proto(read.get()).AddColumnNames("message");
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- }
-
- {
- TString schema;
- NKikimrTxColumnShard::TMetadata meta;
- std::vector<TString> readData = ReadManyResults(runtime, schema, meta, 20000);
- {
- schema = meta.GetSchema();
-
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
-
- UNIT_ASSERT(readStats.GetBeginTimestamp() > 0);
- UNIT_ASSERT(readStats.GetDurationUsec() > 0);
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
- UNIT_ASSERT(readStats.GetIndexBatches() > 0);
- //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 2); // planStep, txId + 4 PK columns + "message"
-// UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions
- //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x);
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
+ reader.SetReplyColumns({"timestamp", "message"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckOrdered(rb));
if (testBlobOptions.SameValueColumns.contains("timestamp")) {
UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
- UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "message"));
+ UNIT_ASSERT(DataHas<std::string>({rb}, {0, fullNumRows}, true, "message"));
} else {
UNIT_ASSERT(isStrPk0
- ? DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "timestamp")
- : DataHas(readData, schema, { 0, fullNumRows}, true, "timestamp"));
+ ? DataHas<std::string>({rb}, {0, fullNumRows}, true, "timestamp")
+ : DataHas({rb}, {0, fullNumRows}, true, "timestamp"));
}
}
-
std::vector<ui32> val0 = { 0 };
std::vector<ui32> val1 = { 1 };
std::vector<ui32> val9990 = { 99990 };
@@ -2681,7 +2281,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 1000000;
@@ -2705,30 +2304,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TDuration staleness = TDuration::Minutes(6);
- // Try to read snapshot that is too old
- {
- {
- auto request = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep - staleness.MilliSeconds(), Max<ui64>(), tableId);
- request->Record.AddColumnNames("timestamp");
- request->Record.AddColumnNames("message");
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, request.release());
- }
-
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& response = event->Record;
- UNIT_ASSERT_VALUES_EQUAL(response.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_VALUES_EQUAL(response.GetTxInitiator(), metaShard);
- UNIT_ASSERT_VALUES_EQUAL(response.GetStatus(), (ui32)NKikimrTxColumnShard::EResultStatus::ERROR);
- }
-
// Try to scan snapshot that is too old
{
{
auto request = std::make_unique<TEvColumnShard::TEvScan>();
- request->Record.SetTxId(1000);
+ request->Record.SetTxId(Max<ui64>());
request->Record.SetScanId(1);
request->Record.SetLocalPathId(tableId);
request->Record.SetTablePath("test_olap_table");
@@ -2747,6 +2327,15 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT_VALUES_EQUAL(response.IssuesSize(), 1);
UNIT_ASSERT_STRING_CONTAINS(response.GetIssues(0).message(), "Snapshot too old: {640000:max}");
}
+
+ // Try to read snapshot that is too old
+ {
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - staleness.MilliSeconds(), Max<ui64>()));
+ reader.SetReplyColumns({"timestamp", "message"});
+ reader.ReadAll();
+ UNIT_ASSERT(reader.IsError());
+ }
+
}
void TestCompactionGC() {
@@ -2763,7 +2352,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
runtime.DispatchEvents(options);
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
@@ -2923,32 +2511,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
// Send a request that reads the latest version
// This request is expected to read at least 1 committed blob and several index portions
// These committed blob and portions must not be deleted by the BlobManager until the read request finishes
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, txId, tableId);
- Proto(read.get()).AddColumnNames("timestamp");
- Proto(read.get()).AddColumnNames("message");
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
-
- ui32 expected = 0;
- ui32 num = 0;
- while (!expected || num < expected) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
-
- if (resRead.GetFinished()) {
- expected = resRead.GetBatch() + 1;
- UNIT_ASSERT(resRead.HasMeta());
- }
- UNIT_ASSERT(resRead.GetData().size() > 0);
-
- ++num;
- UNIT_ASSERT(num < 10);
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
+ reader.SetReplyColumns({"timestamp", "message"});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckOrdered(rb));
+ UNIT_ASSERT(reader.GetIterationsCount() < 10);
// We captured EvReadFinished event and dropped is so the columnshard still thinks that
// read request is in progress and keeps the portions
diff --git a/ydb/core/tx/columnshard/ut_rw/ya.make b/ydb/core/tx/columnshard/ut_rw/ya.make
index 63c9dbed1d..5932bccf75 100644
--- a/ydb/core/tx/columnshard/ut_rw/ya.make
+++ b/ydb/core/tx/columnshard/ut_rw/ya.make
@@ -21,6 +21,7 @@ PEERDIR(
ydb/core/testlib/default
ydb/core/tx/columnshard/hooks/abstract
ydb/core/tx/columnshard/hooks/testing
+ ydb/core/tx/columnshard/common/tests
ydb/services/metadata
ydb/core/tx
ydb/public/lib/yson_value
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 9640b19c70..dfc554c4ad 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/columnshard/common/tests/shard_reader.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
@@ -144,25 +145,12 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s
return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS);
}
-std::shared_ptr<arrow::Array> DeserializeColumn(const TString& blob, const TString& strSchema,
- const std::string& columnName)
-{
- auto schema = NArrow::DeserializeSchema(strSchema);
- auto batch = NArrow::DeserializeBatch(blob, schema);
- UNIT_ASSERT(batch);
-
- //Cerr << "Got data batch (" << batch->num_rows() << " rows): " << batch->ToString() << "\n";
-
- std::shared_ptr<arrow::Array> array = batch->GetColumnByName(columnName);
- UNIT_ASSERT(array);
- return array;
-}
-
-bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize,
+bool CheckSame(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 expectedSize,
const std::string& columnName, i64 seconds) {
- auto tsCol = DeserializeColumn(blob, strSchema, columnName);
+ UNIT_ASSERT(batch);
+ UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), expectedSize);
+ auto tsCol = batch->GetColumnByName(columnName);
UNIT_ASSERT(tsCol);
- UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize);
std::shared_ptr<arrow::Scalar> expected;
switch (tsCol->type_id()) {
@@ -239,23 +227,6 @@ enum class EExpectedResult {
ERROR
};
-TString GetReadResult(NKikimrTxColumnShard::TEvReadResult& resRead, EExpectedResult expected = EExpectedResult::OK_FINISHED)
-{
- Cerr << "Got batchNo: " << resRead.GetBatch() << "\n";
-
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1);
- if (expected == EExpectedResult::ERROR) {
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::ERROR);
- } else {
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- }
- if (expected == EExpectedResult::OK_FINISHED) {
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- }
- return resRead.GetData();
-}
-
static constexpr ui32 PORTION_ROWS = 80 * 1000;
// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
@@ -292,7 +263,6 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
//
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 1000000000; // greater then delays
@@ -349,20 +319,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{
--planStep;
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(spec.TtlColumn);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- TString data = GetReadResult(resRead);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT(data.size() > 0);
-
- auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(data, schema, PORTION_ROWS, spec.TtlColumn, ts[1]));
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>()));
+ reader.SetReplyColumns({spec.TtlColumn});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[1]));
}
// Alter TTL
@@ -389,17 +350,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{
--planStep;
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(spec.TtlColumn);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- TString data = GetReadResult(resRead);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_VALUES_EQUAL(data.size(), 0);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>()));
+ reader.SetReplyColumns({spec.TtlColumn});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(!rb || !rb->num_rows());
}
// Disable TTL
@@ -426,23 +381,11 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{
--planStep;
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(spec.TtlColumn);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT(resRead.GetData().size() > 0);
-
- auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[0]));
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>()));
+ reader.SetReplyColumns({spec.TtlColumn});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[0]));
}
}
@@ -563,27 +506,6 @@ public:
if (ev->GetTypeRewrite() == TEvTablet::EvBoot) {
Counters->BlockForgets = false;
return false;
- } else if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) {
- if (msg->Status == NKikimrProto::OK) {
- ss << "EXPORT(done " << ++Counters->ExportCounters.Success << "): ";
- } else {
- ss << "EXPORT(attempt " << ++Counters->ExportCounters.Attempt << "): "
- << NKikimrProto::EReplyStatus_Name(msg->Status);
- }
- } else if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvForget>(ev)) {
- if (Counters->BlockForgets) {
- ss << "FORGET(ignore " << NKikimrProto::EReplyStatus_Name(msg->Status) << "): ";
- ss << " " << ev->Sender << "->" << ev->Recipient;
- Cerr << ss << Endl;
- return true;
- }
-
- if (msg->Status == NKikimrProto::OK) {
- ss << "FORGET(done " << ++Counters->ForgetCounters.Success << "): ";
- } else {
- ss << "FORGET(attempt " << ++Counters->ForgetCounters.Attempt << "): "
- << NKikimrProto::EReplyStatus_Name(msg->Status);
- }
} else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectRequest>(ev)) {
ss << "S3_REQ(put " << ++Counters->ExportCounters.Request << "):";
} else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) {
@@ -616,8 +538,8 @@ public:
} else {
return false;
}
- } else if (auto* msg = TryGetPrivateEvent<TEvColumnShard::TEvReadResult>(ev)) {
- ss << "Got TEvReadResult " << NKikimrTxColumnShard::EResultStatus_Name(Proto(msg).GetStatus()) << Endl;
+ } else if (auto* msg = TryGetPrivateEvent<NKqp::TEvKqpCompute::TEvScanData>(ev)) {
+ ss << "Got TEvKqpCompute::TEvScanData" << Endl;
} else {
return false;
}
@@ -677,7 +599,6 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
//
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 1000000000; // greater then delays
@@ -760,38 +681,28 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
}
// Read crossed with eviction (start)
- if (!misconfig) {
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep - 1, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(specs[i].TtlColumn);
-
- counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- counter.WaitReadsCaptured(runtime);
- }
-
- // Eviction
-
- TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn);
+ {
+ std::unique_ptr<NOlap::NTests::TShardReader> reader;
+ if (!misconfig) {
+ reader = std::make_unique<NOlap::NTests::TShardReader>(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
+ counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here
+ counter.WaitReadsCaptured(runtime);
+ reader->InitializeScanner();
+ reader->Ack();
+ }
- Cerr << "-- " << (hasColdEviction ? "COLD" : "HOT")
- << " TIERING(" << i << ") num tiers: " << specs[i].Tiers.size() << Endl;
+ // Eviction
- // Read crossed with eviction (finish)
- if (!misconfig) {
- counter.ResendCapturedReads(runtime);
- ui32 numBatches = 0;
- THashSet<ui32> batchNumbers;
- while (!numBatches || numBatches < batchNumbers.size()) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, specs[i].TtlColumn);
- auto& resRead = Proto(event);
- TString data = GetReadResult(resRead, EExpectedResult::OK);
+ Cerr << "-- " << (hasColdEviction ? "COLD" : "HOT")
+ << " TIERING(" << i << ") num tiers: " << specs[i].Tiers.size() << Endl;
- batchNumbers.insert(resRead.GetBatch());
- if (resRead.GetFinished()) {
- numBatches = resRead.GetBatch() + 1;
- }
+ // Read crossed with eviction (finish)
+ if (!misconfig) {
+ counter.ResendCapturedReads(runtime);
+ reader->ContinueReadAll();
+ UNIT_ASSERT(reader->IsCorrectlyFinished());
}
}
while (csControllerGuard->GetTTLFinishedCounter() != csControllerGuard->GetTTLStartedCounter()) {
@@ -812,42 +723,18 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
// Read data after eviction
TString columnToRead = specs[i].TtlColumn;
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep-1, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(columnToRead);
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
-
- specRowsBytes.emplace_back(0, 0);
- ui32 numBatches = 0;
- ui32 numExpected = (expectedReadResult == EExpectedResult::ERROR) ? 1 : 100;
- for (; numBatches < numExpected; ++numBatches) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- TString data = GetReadResult(resRead, expectedReadResult);
- if (expectedReadResult == EExpectedResult::ERROR) {
- break;
- }
- if (!data.size()) {
- UNIT_ASSERT(resRead.GetFinished());
- break;
- }
-
- auto& meta = resRead.GetMeta();
- auto& schema = meta.GetSchema();
- auto ttlColumn = DeserializeColumn(resRead.GetData(), schema, columnToRead);
- UNIT_ASSERT(ttlColumn);
-
- specRowsBytes.back().first += ttlColumn->length();
- if (resRead.GetFinished()) {
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
- const ui64 numBytes = readStats.GetCompactedPortionsBytes() + readStats.GetInsertedPortionsBytes() + readStats.GetCommittedPortionsBytes();
- specRowsBytes.back().second += numBytes;
- numExpected = resRead.GetBatch() + 1;
- }
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max<ui64>()));
+ reader.SetReplyColumns({columnToRead});
+ auto rb = reader.ReadAll();
+ if (expectedReadResult == EExpectedResult::ERROR) {
+ UNIT_ASSERT(reader.IsError());
+ specRowsBytes.emplace_back(0, 0);
+ } else {
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ specRowsBytes.emplace_back(reader.GetRecordsCount(), reader.GetReadBytes());
}
- UNIT_ASSERT(numBatches < 100);
+
+ UNIT_ASSERT(reader.GetIterationsCount() < 100);
if (reboots) {
Cerr << "REBOOT(" << i << ")" << Endl;
@@ -1103,7 +990,6 @@ void TestDrop(bool reboots) {
//
- ui64 metaShard = TTestTxConfig::TxTablet1;
ui64 writeId = 0;
ui64 tableId = 1;
ui64 planStep = 1000000000; // greater then delays
@@ -1151,17 +1037,11 @@ void TestDrop(bool reboots) {
TAutoPtr<IEventHandle> handle;
{
--planStep;
- auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- TString data = GetReadResult(resRead);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(data.size(), 0);
+ NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, Max<ui64>()));
+ reader.SetReplyColumns({TTestSchema::DefaultTtlColumn});
+ auto rb = reader.ReadAll();
+ UNIT_ASSERT(reader.IsCorrectlyFinished());
+ UNIT_ASSERT(!rb || !rb->num_rows());
}
}
diff --git a/ydb/core/tx/program/program.cpp b/ydb/core/tx/program/program.cpp
index b815cc39b9..8537990e37 100644
--- a/ydb/core/tx/program/program.cpp
+++ b/ydb/core/tx/program/program.cpp
@@ -33,13 +33,26 @@ public:
}
private:
NSsa::TColumnInfo GetColumnInfo(const NKikimrSSA::TProgram::TColumn& column) const {
- const ui32 columnId = column.GetId();
- const TString name = ColumnResolver.GetColumnName(columnId, false);
- if (name.Empty()) {
- return NSsa::TColumnInfo::Generated(columnId, GenerateName(column));
+ if (column.HasId() && column.GetId()) {
+ const ui32 columnId = column.GetId();
+ const TString name = ColumnResolver.GetColumnName(columnId, false);
+ if (name.Empty()) {
+ return NSsa::TColumnInfo::Generated(columnId, GenerateName(column));
+ } else {
+ Sources.emplace(columnId, NSsa::TColumnInfo::Original(columnId, name));
+ return NSsa::TColumnInfo::Original(columnId, name);
+ }
+ } else if (column.HasName() && !!column.GetName()) {
+ const TString name = column.GetName();
+ const std::optional<ui32> columnId = ColumnResolver.GetColumnIdOptional(name);
+ if (columnId) {
+ Sources.emplace(*columnId, NSsa::TColumnInfo::Original(*columnId, name));
+ return NSsa::TColumnInfo::Original(*columnId, name);
+ } else {
+ return NSsa::TColumnInfo::Generated(0, GenerateName(column));
+ }
} else {
- Sources.emplace(columnId, NSsa::TColumnInfo::Original(columnId, name));
- return NSsa::TColumnInfo::Original(columnId, name);
+ return NSsa::TColumnInfo::Generated(0, GenerateName(column));
}
}
diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h
index bb3e67deb2..67cac1845a 100644
--- a/ydb/core/tx/program/program.h
+++ b/ydb/core/tx/program/program.h
@@ -14,6 +14,7 @@ class IColumnResolver {
public:
virtual ~IColumnResolver() = default;
virtual TString GetColumnName(ui32 id, bool required = true) const = 0;
+ virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const = 0;
virtual const NTable::TScheme::TTableSchema& GetSchema() const = 0;
virtual NSsa::TColumnInfo GetDefaultColumn() const = 0;
};