aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-11-26 13:40:30 +0300
committerGitHub <noreply@github.com>2024-11-26 13:40:30 +0300
commit6ed5294048f109fb90c718d0dc20e497fd4609f5 (patch)
tree6838ddb6f590239ac5c4a144487970621f6efb47
parent7c597c1e1c97f94099f4171c5219b584eee231bc (diff)
downloadydb-6ed5294048f109fb90c718d0dc20e497fd4609f5.tar.gz
Simple reader (#11894)
-rw-r--r--ydb/core/formats/arrow/common/container.cpp19
-rw-r--r--ydb/core/formats/arrow/common/container.h25
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_state.h1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp41
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h3
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/kqp.proto14
-rw-r--r--ydb/core/protos/tx_datashard.proto2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp5
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h8
-rw-r--r--ydb/core/tx/columnshard/counters/scan.h9
-rw-r--r--ydb/core/tx/columnshard/engines/portions/data_accessor.cpp103
-rw-r--r--ydb/core/tx/columnshard/engines/portions/data_accessor.h17
-rw-r--r--ydb/core/tx/columnshard/engines/predicate/filter.h160
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/constructor.cpp27
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/constructor.h28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/read_context.h5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h11
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.cpp19
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common/description.h14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common/result.h19
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h11
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp47
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.h28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp124
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h172
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.h28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.cpp79
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.h214
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp22
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h32
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp309
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h85
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.h236
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp403
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h520
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp59
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h103
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp58
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h78
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp133
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h76
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp245
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h436
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/ya.make11
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/abstract/metadata.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/granules/granules.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/optimizer/optimizer.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp25
-rw-r--r--ydb/core/tx/columnshard/engines/reader/ya.make1
-rw-r--r--ydb/core/tx/columnshard/export/actor/export_actor.h1
-rw-r--r--ydb/core/tx/columnshard/operations/events.h8
-rw-r--r--ydb/core/tx/columnshard/operations/write.cpp10
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp78
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h8
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_backup.cpp3
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp101
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp7
-rw-r--r--ydb/core/tx/conveyor/service/service.cpp4
-rw-r--r--ydb/core/tx/conveyor/usage/service.h6
-rw-r--r--ydb/core/tx/data_events/common/modification_type.h27
-rw-r--r--ydb/core/tx/data_events/events.h4
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp12
80 files changed, 4229 insertions, 252 deletions
diff --git a/ydb/core/formats/arrow/common/container.cpp b/ydb/core/formats/arrow/common/container.cpp
index 7b159f2eef..9100a9fa56 100644
--- a/ydb/core/formats/arrow/common/container.cpp
+++ b/ydb/core/formats/arrow/common/container.cpp
@@ -148,27 +148,32 @@ std::shared_ptr<NKikimr::NArrow::TGeneralContainer> TGeneralContainer::BuildEmpt
return std::make_shared<TGeneralContainer>(Schema, std::move(columns));
}
-std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
+std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTableConstructionContext& context) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
for (i32 i = 0; i < Schema->num_fields(); ++i) {
- if (columnNames && !columnNames->contains(Schema->field(i)->name())) {
+ if (context.GetColumnNames() && !context.GetColumnNames()->contains(Schema->field(i)->name())) {
continue;
}
- columns.emplace_back(Columns[i]->GetChunkedArray());
+ if (context.GetRecordsCount() || context.GetStartIndex()) {
+ columns.emplace_back(Columns[i]->Slice(context.GetStartIndex().value_or(0),
+ context.GetRecordsCount().value_or(GetRecordsCount() - context.GetStartIndex().value_or(0))));
+ } else {
+ columns.emplace_back(Columns[i]->GetChunkedArray());
+ }
fields.emplace_back(Schema->field(i));
}
if (fields.empty()) {
return nullptr;
}
AFL_VERIFY(RecordsCount);
- return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
+ return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, context.GetRecordsCount().value_or(*RecordsCount));
}
-std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
- auto result = BuildTableOptional(columnNames);
+std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
+ auto result = BuildTableOptional(context);
AFL_VERIFY(result);
- AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
+ AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
return result;
}
diff --git a/ydb/core/formats/arrow/common/container.h b/ydb/core/formats/arrow/common/container.h
index dacd5d62c0..23f3279e8d 100644
--- a/ydb/core/formats/arrow/common/container.h
+++ b/ydb/core/formats/arrow/common/container.h
@@ -62,8 +62,29 @@ public:
return Columns[idx];
}
- std::shared_ptr<arrow::Table> BuildTableVerified(const std::optional<std::set<std::string>>& columnNames = {}) const;
- std::shared_ptr<arrow::Table> BuildTableOptional(const std::optional<std::set<std::string>>& columnNames = {}) const;
+ class TTableConstructionContext {
+ private:
+ YDB_ACCESSOR_DEF(std::optional<std::set<std::string>>, ColumnNames);
+ YDB_ACCESSOR_DEF(std::optional<ui32>, StartIndex);
+ YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);
+
+ public:
+ TTableConstructionContext() = default;
+ TTableConstructionContext(std::set<std::string>&& columnNames)
+ : ColumnNames(std::move(columnNames)) {
+ }
+
+ TTableConstructionContext(const std::set<std::string>& columnNames)
+ : ColumnNames(columnNames) {
+ }
+
+ void SetColumnNames(const std::vector<TString>& names) {
+ ColumnNames = std::set<std::string>(names.begin(), names.end());
+ }
+ };
+
+ std::shared_ptr<arrow::Table> BuildTableVerified(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
+ std::shared_ptr<arrow::Table> BuildTableOptional(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
std::shared_ptr<TGeneralContainer> BuildEmptySame() const;
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index a9dd127a64..3142aca264 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -53,6 +53,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
std::vector<std::vector<ui32>> SplittedBatches;
TOwnedCellVec LastKey;
+ NKikimrKqp::TEvKqpScanCursor LastCursorProto;
TDuration CpuTime;
TDuration WaitTime;
ui32 PageFaults = 0; // number of page faults occurred when filling in this message
@@ -120,6 +121,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
ev->Finished = pbEv->Record.GetFinished();
ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
+ ev->LastCursorProto = pbEv->Record.GetLastCursor();
if (pbEv->Record.HasAvailablePacks()) {
ev->AvailablePacks = pbEv->Record.GetAvailablePacks();
}
@@ -153,6 +155,7 @@ private:
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetPageFault(PageFault);
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
+ *Remote->Record.MutableLastCursor() = LastCursorProto;
if (AvailablePacks) {
Remote->Record.SetAvailablePacks(*AvailablePacks);
}
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h
index 7431164173..b4b71f3b72 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_state.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h
@@ -38,6 +38,7 @@ struct TShardState: public TCommonRetriesState {
bool SubscribedOnTablet = false;
TActorId ActorId;
TOwnedCellVec LastKey;
+ std::optional<NKikimrKqp::TEvKqpScanCursor> LastCursorProto;
std::optional<ui32> AvailablePacks;
TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index 7b2c7556b2..7b3477d15c 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -14,7 +14,8 @@ namespace NKikimr::NKqp::NScanPrivate {
class IExternalObjectsProvider {
public:
- virtual std::unique_ptr<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const = 0;
+ virtual std::unique_ptr<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges,
+ const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const = 0;
virtual const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const = 0;
};
@@ -61,7 +62,7 @@ public:
const auto& keyColumnTypes = externalObjectsProvider.GetKeyColumnTypes();
auto ranges = state.GetScanRanges(keyColumnTypes);
- auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges);
+ auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges, state.LastCursorProto);
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("tablet_id", TabletId)("generation", Generation)
("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index e07a597f96..c17289b35f 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -37,8 +37,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
, ShardsScanningPolicy(shardsScanningPolicy)
, Counters(counters)
, InFlightShards(ScanId, *this)
- , InFlightComputes(ComputeActorIds)
-{
+ , InFlightComputes(ComputeActorIds) {
Y_UNUSED(traceId);
AFL_ENSURE(!Meta.GetReads().empty());
AFL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
@@ -47,7 +46,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
for (size_t i = 0; i < Meta.KeyColumnTypesSize(); i++) {
NScheme::TTypeId typeId = Meta.GetKeyColumnTypes().at(i);
NScheme::TTypeInfo typeInfo = NScheme::NTypeIds::IsParametrizedType(typeId) ?
- NScheme::TypeInfoFromProto(typeId,Meta.GetKeyColumnTypeInfos().at(i)) :
+ NScheme::TypeInfoFromProto(typeId, Meta.GetKeyColumnTypeInfos().at(i)) :
NScheme::TTypeInfo(typeId);
KeyColumnTypes.push_back(typeInfo);
}
@@ -127,19 +126,19 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
("ScanId", ev->Get()->ScanId)
("Finished", ev->Get()->Finished)
("Lock", [&]() {
- TStringBuilder builder;
- for (const auto& lock : ev->Get()->LocksInfo.Locks) {
- builder << lock.ShortDebugString();
- }
- return builder;
- }())
+ TStringBuilder builder;
+ for (const auto& lock : ev->Get()->LocksInfo.Locks) {
+ builder << lock.ShortDebugString();
+ }
+ return builder;
+ }())
("BrokenLocks", [&]() {
- TStringBuilder builder;
- for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
- builder << lock.ShortDebugString();
- }
- return builder;
- }());
+ TStringBuilder builder;
+ for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
+ builder << lock.ShortDebugString();
+ }
+ return builder;
+ }());
TInstant startTime = TActivationContext::Now();
if (ev->Get()->Finished) {
@@ -347,11 +346,12 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
if (!state.LastKey.empty()) {
PendingShards.front().LastKey = std::move(state.LastKey);
- while(!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
+ while (!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
CA_LOG_D("Nothing to read " << PendingShards.front().ToString(KeyColumnTypes));
auto readShard = std::move(PendingShards.front());
PendingShards.pop_front();
PendingShards.front().LastKey = std::move(readShard.LastKey);
+ PendingShards.front().LastCursorProto = std::move(readShard.LastCursorProto);
}
AFL_ENSURE(!PendingShards.empty());
@@ -409,7 +409,8 @@ bool TKqpScanFetcherActor::SendScanFinished() {
return true;
}
-std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const {
+std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(const ui32 scanId, const ui32 gen,
+ const TSmallVec<TSerializedTableRange>& ranges, const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const {
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(ScanDataMeta.TableId.PathId.LocalPathId);
for (auto& column : ScanDataMeta.GetColumns()) {
@@ -423,6 +424,9 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
}
}
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
+ if (cursor) {
+ *ev->Record.MutableScanCursor() = *cursor;
+ }
auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(ranges.size());
@@ -489,10 +493,11 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
AFL_ENSURE(state->ActorId == ev->Sender)("expected", state->ActorId)("got", ev->Sender);
state->LastKey = std::move(msg.LastKey);
+ state->LastCursorProto = std::move(msg.LastCursorProto);
const ui64 rowsCount = msg.GetRowsCount();
AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
- AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action","got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
("in flight scans", InFlightShards.GetScansCount())
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
index 64d7041d72..d513939b4e 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
@@ -108,7 +108,8 @@ private:
bool SendScanFinished();
- virtual std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const override;
+ virtual std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen,
+ const TSmallVec<TSerializedTableRange>& ranges, const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const override;
virtual const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const override {
return KeyColumnTypes;
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index b8cf586a95..acb3fd2911 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1700,6 +1700,7 @@ message TColumnShardConfig {
optional bool ColumnChunksV0Usage = 25 [default = true];
optional bool ColumnChunksV1Usage = 26 [default = true];
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
+ optional string ReaderClassName = 28;
}
message TSchemeShardConfig {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index b487df72e0..2351a539ba 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -642,6 +642,19 @@ message TEvScanError {
optional uint64 TabletId = 4;
}
+message TEvKqpScanCursor {
+ message TColumnShardScanPlain {
+ }
+ message TColumnShardScanSimple {
+ optional uint64 SourceId = 1;
+ optional uint32 StartRecordIndex = 2;
+ }
+ oneof Implementation {
+ TColumnShardScanPlain ColumnShardPlain = 10;
+ TColumnShardScanSimple ColumnShardSimple = 11;
+ }
+}
+
message TEvRemoteScanData {
optional uint32 ScanId = 1;
optional uint64 CpuTimeUs = 2;
@@ -665,6 +678,7 @@ message TEvRemoteScanData {
optional bool RequestedBytesLimitReached = 11 [default = false];
optional uint32 AvailablePacks = 12;
+ optional TEvKqpScanCursor LastCursor = 13;
}
message TEvRemoteScanDataAck {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index bf13ccf9ca..4a6b7de388 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1700,6 +1700,8 @@ message TEvKqpScan {
optional TComputeShardingPolicy ComputeShardingPolicy = 23;
optional uint64 LockTxId = 24;
optional uint32 LockNodeId = 25;
+ optional string CSScanPolicy = 26;
+ optional NKikimrKqp.TEvKqpScanCursor ScanCursor = 27;
}
message TEvCompactTable {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 8359c9ce06..39e53b35c8 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -94,7 +94,9 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
AFL_VERIFY(ev->Get()->GetWriteStatus() == NKikimrProto::OK);
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
std::vector<TFailedWrite> fails = ev->Get()->DetachFails();
+ const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenPacks) {
+ Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
for (auto&& i : fails) {
@@ -554,12 +556,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}
+ Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnWriteEvent();
+
auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
ctx.Send(source, result.release(), 0, cookie);
+ return;
}
auto overloadStatus = CheckOverloaded(pathId);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 9b7f928fdc..711bf5f0ca 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -59,6 +59,9 @@ class TTxInternalScan;
namespace NPlain {
class TIndexScannerConstructor;
}
+namespace NSimple {
+class TIndexScannerConstructor;
+}
} // namespace NReader
namespace NDataSharing {
@@ -109,7 +112,7 @@ class TSharingSessionsInitializer;
class TInFlightReadsInitializer;
class TSpecialValuesInitializer;
class TTablesManagerInitializer;
-}
+} // namespace NLoading
extern bool gAllowLogBatchingDefaultValue;
@@ -198,6 +201,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NOlap::NReader::TTxScan;
friend class NOlap::NReader::TTxInternalScan;
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
+ friend class NOlap::NReader::NSimple::TIndexScannerConstructor;
class TStoragesManager;
friend class TTxController;
@@ -246,7 +250,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& ctx);
-
+
void Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index d9a02e1fef..428d74badd 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -295,6 +295,8 @@ private:
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
+ std::shared_ptr<TAtomicCounter> ResultsForSourceCount;
+
public:
TScanAggregations Aggregations;
@@ -302,6 +304,10 @@ public:
return TCounterGuard(FetchAccessorsCount);
}
+ TCounterGuard GetResultsForSourceGuard() const {
+ return TCounterGuard(ResultsForSourceCount);
+ }
+
TCounterGuard GetMergeTasksGuard() const {
return TCounterGuard(MergeTasksCount);
}
@@ -320,7 +326,7 @@ public:
bool InWaiting() const {
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
- FetchAccessorsCount->Val();
+ FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
}
void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
@@ -335,6 +341,7 @@ public:
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
+ , ResultsForSourceCount(std::make_shared<TAtomicCounter>())
, Aggregations(TBase::BuildAggregations())
{
diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
index 5369ee82c5..29ada345be 100644
--- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
@@ -350,6 +350,109 @@ std::vector<const TColumnRecord*> TPortionDataAccessor::GetColumnChunksPointers(
return result;
}
+std::vector<TPortionDataAccessor::TReadPage> TPortionDataAccessor::BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const {
+ class TEntityDelimiter {
+ private:
+ YDB_READONLY(ui32, IndexStart, 0);
+ YDB_READONLY(ui32, EntityId, 0);
+ YDB_READONLY(ui32, ChunkIdx, 0);
+ YDB_READONLY(ui64, MemoryStartChunk, 0);
+ YDB_READONLY(ui64, MemoryFinishChunk, 0);
+
+ public:
+ TEntityDelimiter(const ui32 indexStart, const ui32 entityId, const ui32 chunkIdx, const ui64 memStartChunk, const ui64 memFinishChunk)
+ : IndexStart(indexStart)
+ , EntityId(entityId)
+ , ChunkIdx(chunkIdx)
+ , MemoryStartChunk(memStartChunk)
+ , MemoryFinishChunk(memFinishChunk) {
+ }
+
+ bool operator<(const TEntityDelimiter& item) const {
+ return std::tie(IndexStart, EntityId, ChunkIdx) < std::tie(item.IndexStart, item.EntityId, item.ChunkIdx);
+ }
+ };
+
+ class TGlobalDelimiter {
+ private:
+ YDB_READONLY(ui32, IndexStart, 0);
+ YDB_ACCESSOR(ui64, UsedMemory, 0);
+ YDB_ACCESSOR(ui64, WholeChunksMemory, 0);
+
+ public:
+ TGlobalDelimiter(const ui32 indexStart)
+ : IndexStart(indexStart) {
+ }
+ };
+
+ std::vector<TEntityDelimiter> delimiters;
+
+ ui32 lastAppliedId = 0;
+ ui32 currentRecordIdx = 0;
+ bool needOne = false;
+ const TColumnRecord* lastRecord = nullptr;
+ for (auto&& i : GetRecordsVerified()) {
+ if (lastAppliedId != i.GetEntityId()) {
+ if (delimiters.size()) {
+ AFL_VERIFY(delimiters.back().GetIndexStart() == PortionInfo->GetRecordsCount());
+ }
+ needOne = entityIds.contains(i.GetEntityId());
+ currentRecordIdx = 0;
+ lastAppliedId = i.GetEntityId();
+ lastRecord = nullptr;
+ }
+ if (!needOne) {
+ continue;
+ }
+ delimiters.emplace_back(
+ currentRecordIdx, i.GetEntityId(), i.GetChunkIdx(), i.GetMeta().GetRawBytes(), lastRecord ? lastRecord->GetMeta().GetRawBytes() : 0);
+ currentRecordIdx += i.GetMeta().GetRecordsCount();
+ if (currentRecordIdx == PortionInfo->GetRecordsCount()) {
+ delimiters.emplace_back(currentRecordIdx, i.GetEntityId(), i.GetChunkIdx() + 1, 0, i.GetMeta().GetRawBytes());
+ }
+ lastRecord = &i;
+ }
+ if (delimiters.empty()) {
+ return { TPortionDataAccessor::TReadPage(0, PortionInfo->GetRecordsCount(), 0) };
+ }
+ std::sort(delimiters.begin(), delimiters.end());
+ std::vector<TGlobalDelimiter> sumDelimiters;
+ for (auto&& i : delimiters) {
+ if (sumDelimiters.empty()) {
+ sumDelimiters.emplace_back(i.GetIndexStart());
+ } else if (sumDelimiters.back().GetIndexStart() != i.GetIndexStart()) {
+ AFL_VERIFY(sumDelimiters.back().GetIndexStart() < i.GetIndexStart());
+ TGlobalDelimiter backDelimiter(i.GetIndexStart());
+ backDelimiter.MutableWholeChunksMemory() = sumDelimiters.back().GetWholeChunksMemory();
+ backDelimiter.MutableUsedMemory() = sumDelimiters.back().GetUsedMemory();
+ sumDelimiters.emplace_back(std::move(backDelimiter));
+ }
+ sumDelimiters.back().MutableWholeChunksMemory() += i.GetMemoryFinishChunk();
+ sumDelimiters.back().MutableUsedMemory() += i.GetMemoryStartChunk();
+ }
+ std::vector<ui32> recordIdx = { 0 };
+ std::vector<ui64> packMemorySize;
+ const TGlobalDelimiter* lastBorder = &sumDelimiters.front();
+ for (auto&& i : sumDelimiters) {
+ const i64 sumMemory = (i64)i.GetUsedMemory() - (i64)lastBorder->GetWholeChunksMemory();
+ AFL_VERIFY(sumMemory > 0);
+ if (((ui64)sumMemory >= memoryLimit || i.GetIndexStart() == PortionInfo->GetRecordsCount()) && i.GetIndexStart()) {
+ AFL_VERIFY(lastBorder->GetIndexStart() < i.GetIndexStart());
+ recordIdx.emplace_back(i.GetIndexStart());
+ packMemorySize.emplace_back(sumMemory);
+ lastBorder = &i;
+ }
+ }
+ AFL_VERIFY(recordIdx.front() == 0);
+ AFL_VERIFY(recordIdx.back() == PortionInfo->GetRecordsCount())("real", JoinSeq(",", recordIdx))("expected", PortionInfo->GetRecordsCount());
+ AFL_VERIFY(recordIdx.size() == packMemorySize.size() + 1);
+ std::vector<TReadPage> pages;
+ for (ui32 i = 0; i < packMemorySize.size(); ++i) {
+ pages.emplace_back(recordIdx[i], recordIdx[i + 1] - recordIdx[i], packMemorySize[i]);
+ }
+ return pages;
+}
+
std::vector<TPortionDataAccessor::TPage> TPortionDataAccessor::BuildPages() const {
std::vector<TPage> pages;
struct TPart {
diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.h b/ydb/core/tx/columnshard/engines/portions/data_accessor.h
index 46fbb43bc9..9e8f608385 100644
--- a/ydb/core/tx/columnshard/engines/portions/data_accessor.h
+++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.h
@@ -436,6 +436,23 @@ public:
std::vector<TPage> BuildPages() const;
ui64 GetMinMemoryForReadColumns(const std::optional<std::set<ui32>>& columnIds) const;
+
+ class TReadPage {
+ private:
+ YDB_READONLY(ui32, IndexStart, 0);
+ YDB_READONLY(ui32, RecordsCount, 0);
+ YDB_READONLY(ui64, MemoryUsage, 0);
+
+ public:
+ TReadPage(const ui32 indexStart, const ui32 recordsCount, const ui64 memoryUsage)
+ : IndexStart(indexStart)
+ , RecordsCount(recordsCount)
+ , MemoryUsage(memoryUsage) {
+ AFL_VERIFY(RecordsCount);
+ }
+ };
+
+ std::vector<TReadPage> BuildReadPages(const ui64 memoryLimit, const std::set<ui32>& entityIds) const;
};
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/predicate/filter.h b/ydb/core/tx/columnshard/engines/predicate/filter.h
index bbc70b5ff5..af9b339728 100644
--- a/ydb/core/tx/columnshard/engines/predicate/filter.h
+++ b/ydb/core/tx/columnshard/engines/predicate/filter.h
@@ -1,5 +1,9 @@
#pragma once
#include "range.h"
+
+#include <ydb/core/protos/tx_datashard.pb.h>
+#include <ydb/core/protos/kqp.pb.h>
+
#include <deque>
namespace NKikimr::NOlap {
@@ -88,4 +92,158 @@ public:
}
};
-}
+class ICursorEntity {
+private:
+ virtual ui64 DoGetEntityId() const = 0;
+ virtual ui64 DoGetEntityRecordsCount() const = 0;
+
+public:
+ ui64 GetEntityId() const {
+ return DoGetEntityId();
+ }
+ ui64 GetEntityRecordsCount() const {
+ return DoGetEntityRecordsCount();
+ }
+};
+
+class IScanCursor {
+private:
+ virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const = 0;
+ virtual bool DoCheckEntityIsBorder(const std::shared_ptr<ICursorEntity>& entity, bool& usage) const = 0;
+ virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const = 0;
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) = 0;
+ virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const = 0;
+
+public:
+ virtual bool IsInitialized() const = 0;
+
+ virtual ~IScanCursor() = default;
+
+ const std::shared_ptr<arrow::RecordBatch>& GetPKCursor() const {
+ return DoGetPKCursor();
+ }
+
+ bool CheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const {
+ AFL_VERIFY(IsInitialized());
+ return DoCheckSourceIntervalUsage(sourceId, indexStart, recordsCount);
+ }
+
+ bool CheckEntityIsBorder(const std::shared_ptr<ICursorEntity>& entity, bool& usage) const {
+ AFL_VERIFY(IsInitialized());
+ return DoCheckEntityIsBorder(entity, usage);
+ }
+
+ TConclusionStatus DeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) {
+ return DoDeserializeFromProto(proto);
+ }
+
+ NKikimrKqp::TEvKqpScanCursor SerializeToProto() const {
+ NKikimrKqp::TEvKqpScanCursor result;
+ DoSerializeToProto(result);
+ return result;
+ }
+};
+
+class TSimpleScanCursor: public IScanCursor {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, PrimaryKey);
+ YDB_READONLY(ui64, SourceId, 0);
+ YDB_READONLY(ui32, RecordIndex, 0);
+
+ virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const override {
+ proto.MutableColumnShardSimple()->SetSourceId(SourceId);
+ proto.MutableColumnShardSimple()->SetStartRecordIndex(RecordIndex);
+ }
+
+ virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const override {
+ AFL_VERIFY(!!PrimaryKey);
+ return PrimaryKey;
+ }
+
+ virtual bool IsInitialized() const override {
+ return !!SourceId;
+ }
+
+ virtual bool DoCheckEntityIsBorder(const std::shared_ptr<ICursorEntity>& entity, bool& usage) const override {
+ if (SourceId != entity->GetEntityId()) {
+ return false;
+ }
+ AFL_VERIFY(RecordIndex <= entity->GetEntityRecordsCount());
+ usage = RecordIndex < entity->GetEntityRecordsCount();
+ return true;
+ }
+
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) override {
+ if (!proto.HasColumnShardSimple()) {
+ return TConclusionStatus::Success();
+ }
+ if (!proto.GetColumnShardSimple().HasSourceId()) {
+ return TConclusionStatus::Fail("incorrect source id for cursor initialization");
+ }
+ SourceId = proto.GetColumnShardSimple().GetSourceId();
+ if (!proto.GetColumnShardSimple().HasStartRecordIndex()) {
+ return TConclusionStatus::Fail("incorrect record index for cursor initialization");
+ }
+ RecordIndex = proto.GetColumnShardSimple().GetStartRecordIndex();
+ return TConclusionStatus::Success();
+ }
+
+ virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const override {
+ AFL_VERIFY(sourceId == SourceId);
+ if (indexStart >= RecordIndex) {
+ return true;
+ }
+ AFL_VERIFY(indexStart + recordsCount <= RecordIndex);
+ return false;
+ }
+
+public:
+ TSimpleScanCursor() = default;
+
+ TSimpleScanCursor(const std::shared_ptr<arrow::RecordBatch>& pk, const ui64 portionId, const ui32 recordIndex)
+ : PrimaryKey(pk)
+ , SourceId(portionId)
+ , RecordIndex(recordIndex) {
+ }
+};
+
+class TPlainScanCursor: public IScanCursor {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, PrimaryKey);
+
+ virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const override {
+ *proto.MutableColumnShardPlain() = {};
+ }
+
+ virtual bool IsInitialized() const override {
+ return !!PrimaryKey;
+ }
+
+ virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const override {
+ AFL_VERIFY(!!PrimaryKey);
+ return PrimaryKey;
+ }
+
+ virtual TConclusionStatus DoDeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& /*proto*/) override {
+ return TConclusionStatus::Success();
+ }
+
+ virtual bool DoCheckEntityIsBorder(const std::shared_ptr<ICursorEntity>& /*entity*/, bool& usage) const override {
+ usage = true;
+ return true;
+ }
+
+ virtual bool DoCheckSourceIntervalUsage(const ui64 /*sourceId*/, const ui32 /*indexStart*/, const ui32 /*recordsCount*/) const override {
+ return true;
+ }
+
+public:
+ TPlainScanCursor() = default;
+
+ TPlainScanCursor(const std::shared_ptr<arrow::RecordBatch>& pk)
+ : PrimaryKey(pk) {
+ AFL_VERIFY(PrimaryKey);
+ }
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/abstract/constructor.cpp
index 95a756f2f4..884bfa01bf 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/constructor.cpp
@@ -1,11 +1,13 @@
#include "constructor.h"
+
+#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h>
#include <ydb/core/tx/program/program.h>
namespace NKikimr::NOlap::NReader {
-NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedIndex* vIndex,
- const NKikimrSchemeOp::EOlapProgramType programType, const TString& serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) const {
+NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedIndex* vIndex, const NKikimrSchemeOp::EOlapProgramType programType,
+ const TString& serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) const {
AFL_VERIFY(!read.ColumnIds.size() || !read.ColumnNames.size());
std::vector<TString> names;
std::set<TString> namesChecker;
@@ -47,7 +49,8 @@ NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedInd
}
const auto getDiffColumnsMessage = [&]() {
- return TStringBuilder() << "ssa program has different columns with kqp request: kqp_columns=" << JoinSeq(",", namesChecker) << " vs program_columns=" << JoinSeq(",", programColumns);
+ return TStringBuilder() << "ssa program has different columns with kqp request: kqp_columns=" << JoinSeq(",", namesChecker)
+ << " vs program_columns=" << JoinSeq(",", programColumns);
};
if (namesChecker.size() != programColumns.size()) {
@@ -66,7 +69,8 @@ NKikimr::TConclusionStatus IScannerConstructor::ParseProgram(const TVersionedInd
}
}
-NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> IScannerConstructor::BuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
+NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> IScannerConstructor::BuildReadMetadata(
+ const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
TConclusion<std::shared_ptr<TReadMetadataBase>> result = DoBuildReadMetadata(self, read);
if (result.IsFail()) {
return result;
@@ -78,4 +82,17 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> IScannerConstructor::Bu
}
}
-} \ No newline at end of file
+NKikimr::TConclusion<std::shared_ptr<NKikimr::NOlap::IScanCursor>> IScannerConstructor::BuildCursorFromProto(
+ const NKikimrKqp::TEvKqpScanCursor& proto) const {
+ auto result = DoBuildCursor();
+ if (!result) {
+ return result;
+ }
+ auto status = result->DeserializeFromProto(proto);
+ if (status.IsFail()) {
+ return status;
+ }
+ return result;
+}
+
+} // namespace NKikimr::NOlap::NReader
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/constructor.h b/ydb/core/tx/columnshard/engines/reader/abstract/constructor.h
index 1eb95f2b22..21fbe1f0ac 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/constructor.h
@@ -8,6 +8,22 @@
namespace NKikimr::NOlap::NReader {
+class TScannerConstructorContext {
+private:
+ YDB_READONLY(TSnapshot, Snapshot, TSnapshot::Zero());
+ YDB_READONLY(ui32, ItemsLimit, 0);
+ YDB_READONLY(bool, Reverse, false);
+
+public:
+ TScannerConstructorContext(const TSnapshot& snapshot, const ui32 itemsLimit, const bool reverse)
+ : Snapshot(snapshot)
+ , ItemsLimit(itemsLimit)
+ , Reverse(reverse)
+ {
+
+ }
+};
+
class IScannerConstructor {
protected:
const TSnapshot Snapshot;
@@ -17,17 +33,21 @@ protected:
const TString& serializedProgram, TReadDescription& read, const IColumnResolver& columnResolver) const;
private:
virtual TConclusion<std::shared_ptr<TReadMetadataBase>> DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const = 0;
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor() const = 0;
+
public:
+ using TFactory = NObjectFactory::TParametrizedObjectFactory<IScannerConstructor, TString, TScannerConstructorContext>;
virtual ~IScannerConstructor() = default;
- IScannerConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse)
- : Snapshot(snapshot)
- , ItemsLimit(itemsLimit)
- , IsReverse(reverse)
+ IScannerConstructor(const TScannerConstructorContext& context)
+ : Snapshot(context.GetSnapshot())
+ , ItemsLimit(context.GetItemsLimit())
+ , IsReverse(context.GetReverse())
{
}
+ TConclusion<std::shared_ptr<IScanCursor>> BuildCursorFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) const;
virtual TConclusionStatus ParseProgram(const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const = 0;
virtual std::vector<TNameTypeInfo> GetPrimaryKeyScheme(const NColumnShard::TColumnShard* self) const = 0;
TConclusion<std::shared_ptr<TReadMetadataBase>> BuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const;
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
index e885d4461d..e55b80f307 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
@@ -53,7 +53,6 @@ private:
const TActorId ReadCoordinatorActorId;
const TComputeShardingPolicy ComputeShardingPolicy;
TAtomic AbortFlag = 0;
-
public:
template <class T>
std::shared_ptr<const T> GetReadMetadataPtrVerifiedAs() const {
@@ -62,6 +61,10 @@ public:
return result;
}
+ const std::shared_ptr<IScanCursor>& GetScanCursor() const {
+ return ReadMetadata->GetScanCursor();
+ }
+
void AbortWithError(const TString& errorMessage) {
if (AtomicCas(&AbortFlag, 1, 0)) {
NActors::TActivationContext::Send(
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
index f144bf05f9..5d1a684e02 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
@@ -45,6 +45,7 @@ private:
std::shared_ptr<TVersionedIndex> IndexVersionsPointer;
TSnapshot RequestSnapshot;
std::optional<TGranuleShardingInfo> RequestShardingInfo;
+ std::shared_ptr<IScanCursor> ScanCursor;
virtual void DoOnReadFinished(NColumnShard::TColumnShard& /*owner*/) const {
}
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& /*owner*/) const {
@@ -68,6 +69,10 @@ public:
return TxId;
}
+ const std::shared_ptr<IScanCursor>& GetScanCursor() const {
+ return ScanCursor;
+ }
+
std::optional<ui64> GetLockId() const {
return LockId;
}
@@ -135,12 +140,14 @@ public:
}
TReadMetadataBase(const std::shared_ptr<TVersionedIndex> index, const ESorting sorting, const TProgramContainer& ssaProgram,
- const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
+ const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor)
: Sorting(sorting)
, Program(ssaProgram)
, IndexVersionsPointer(index)
, RequestSnapshot(requestSnapshot)
- , ResultIndexSchema(schema) {
+ , ScanCursor(scanCursor)
+ , ResultIndexSchema(schema)
+ {
}
virtual ~TReadMetadataBase() = default;
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
index 8e1f0a643c..6e2af3e7f7 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
@@ -255,15 +255,16 @@ bool TColumnShardScan::ProduceResults() noexcept {
"batch_columns", JoinSeq(",", batch->schema()->field_names()));
}
if (CurrentLastReadKey) {
- NArrow::NMerger::TSortableBatchPosition pNew(
- result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted());
- NArrow::NMerger::TSortableBatchPosition pOld(
- CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted());
- AFL_VERIFY(pOld < pNew)("old", pOld.DebugJson().GetStringRobust())("new", pNew.DebugJson().GetStringRobust());
+ NArrow::NMerger::TSortableBatchPosition pNew(result.GetScanCursor()->GetPKCursor(), 0,
+ result.GetScanCursor()->GetPKCursor()->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted());
+ NArrow::NMerger::TSortableBatchPosition pOld(CurrentLastReadKey->GetPKCursor(), 0,
+ CurrentLastReadKey->GetPKCursor()->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted());
+ AFL_VERIFY(!(pNew < pOld))("old", pOld.DebugJson().GetStringRobust())("new", pNew.DebugJson().GetStringRobust());
}
- CurrentLastReadKey = result.GetLastReadKey();
+ CurrentLastReadKey = result.GetScanCursor();
- Result->LastKey = ConvertLastKey(result.GetLastReadKey());
+ Result->LastKey = ConvertLastKey(result.GetScanCursor()->GetPKCursor());
+ Result->LastCursorProto = result.GetScanCursor()->SerializeToProto();
SendResult(false, false);
ScanIterator->OnSentDataFromInterval(result.GetNotFinishedIntervalIdx());
ACFL_DEBUG("stage", "finished")("iterator", ScanIterator->DebugString());
@@ -303,8 +304,8 @@ void TColumnShardScan::ContinueProcessing() {
}
}
}
- AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)(
- "gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString());
+// AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)(
+// "gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString());
}
void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.h b/ydb/core/tx/columnshard/engines/reader/actor/actor.h
index 454f0d0d79..6ed07d077a 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.h
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.h
@@ -136,7 +136,7 @@ private:
TChunksLimiter ChunksLimiter;
THolder<NKqp::TEvKqpCompute::TEvScanData> Result;
- std::shared_ptr<arrow::RecordBatch> CurrentLastReadKey;
+ std::shared_ptr<IScanCursor> CurrentLastReadKey;
bool Finished = false;
std::optional<TMonotonic> LastResultInstant;
diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h
index c180dcc8d0..58872a627b 100644
--- a/ydb/core/tx/columnshard/engines/reader/common/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/common/description.h
@@ -11,6 +11,8 @@ struct TReadDescription {
private:
TSnapshot Snapshot;
TProgramContainer Program;
+ std::shared_ptr<IScanCursor> ScanCursor;
+
public:
// Table
ui64 TxId = 0;
@@ -27,7 +29,17 @@ public:
// List of columns
std::vector<ui32> ColumnIds;
std::vector<TString> ColumnNames;
-
+
+ const std::shared_ptr<IScanCursor>& GetScanCursor() const {
+ AFL_VERIFY(ScanCursor);
+ return ScanCursor;
+ }
+
+ void SetScanCursor(const std::shared_ptr<IScanCursor>& cursor) {
+ AFL_VERIFY(!ScanCursor);
+ ScanCursor = cursor;
+ }
+
TReadDescription(const TSnapshot& snapshot, const bool isReverse)
: Snapshot(snapshot)
, PKRangesFilter(std::make_shared<NOlap::TPKRangesFilter>(isReverse)) {
diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h
index e3028b01b5..6173d3147e 100644
--- a/ydb/core/tx/columnshard/engines/reader/common/result.h
+++ b/ydb/core/tx/columnshard/engines/reader/common/result.h
@@ -18,7 +18,7 @@ private:
// This 1-row batch contains the last key that was read while producing the ResultBatch.
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
- std::shared_ptr<arrow::RecordBatch> LastReadKey;
+ std::shared_ptr<IScanCursor> ScanCursor;
YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx);
public:
@@ -50,26 +50,25 @@ public:
return ResultBatch;
}
- const std::shared_ptr<arrow::RecordBatch>& GetLastReadKey() const {
- return LastReadKey;
+ const std::shared_ptr<IScanCursor>& GetScanCursor() const {
+ return ScanCursor;
}
explicit TPartialReadResult(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& resourcesGuard,
std::shared_ptr<NGroupedMemoryManager::TGroupGuard>&& gGuard, const NArrow::TShardedRecordBatch& batch,
- std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
+ const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
: ResourcesGuard(std::move(resourcesGuard))
, GroupGuard(std::move(gGuard))
, ResultBatch(batch)
- , LastReadKey(lastKey)
+ , ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
- Y_ABORT_UNLESS(LastReadKey);
- Y_ABORT_UNLESS(LastReadKey->num_rows() == 1);
+ Y_ABORT_UNLESS(ScanCursor);
}
- explicit TPartialReadResult(
- const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey, const std::optional<ui32> notFinishedIntervalIdx)
- : TPartialReadResult(nullptr, nullptr, batch, lastKey, notFinishedIntervalIdx) {
+ explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
+ const std::optional<ui32> notFinishedIntervalIdx)
+ : TPartialReadResult(nullptr, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp
index 78926d99dc..e343b4674d 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp
@@ -3,6 +3,7 @@
#include "resolver.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
namespace NKikimr::NOlap::NReader::NPlain {
@@ -35,7 +36,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
TDataStorageAccessor dataAccessor(insertTable, index);
AFL_VERIFY(read.PathId);
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
- IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram());
+ IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
auto initResult = readMetadata->Init(self, read, dataAccessor);
if (!initResult) {
@@ -44,4 +45,8 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo
return static_pointer_cast<TReadMetadataBase>(readMetadata);
}
+std::shared_ptr<IScanCursor> TIndexScannerConstructor::DoBuildCursor() const {
+ return std::make_shared<TPlainScanCursor>();
+}
+
} // namespace NKikimr::NOlap::NReader::NPlain
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h
index bb576fdbdc..3a534cd0d9 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h
@@ -4,14 +4,23 @@
namespace NKikimr::NOlap::NReader::NPlain {
class TIndexScannerConstructor: public IScannerConstructor {
+public:
+ static TString GetClassNameStatic() {
+ return "PLAIN";
+ }
private:
using TBase = IScannerConstructor;
+ static const inline TFactory::TRegistrator<TIndexScannerConstructor> Registrator =
+ TFactory::TRegistrator<TIndexScannerConstructor>(GetClassNameStatic());
+
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor() const override;
+
protected:
virtual TConclusion<std::shared_ptr<TReadMetadataBase>> DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const override;
public:
- using TBase::TBase;
virtual TConclusionStatus ParseProgram(const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const override;
virtual std::vector<TNameTypeInfo> GetPrimaryKeyScheme(const NColumnShard::TColumnShard* self) const override;
+ using TBase::TBase;
};
} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
index 317ee0f03d..34ef6496fd 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
@@ -115,11 +115,11 @@ public:
std::vector<TCommittedBlob> CommittedBlobs;
std::shared_ptr<TReadStats> ReadStats;
- TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting, const TProgramContainer& ssaProgram)
- : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot)
+ TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
+ const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
+ : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
, PathId(pathId)
- , ReadStats(std::make_shared<TReadStats>())
- {
+ , ReadStats(std::make_shared<TReadStats>()) {
}
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make
index 1ab8264148..883f2b6b8e 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/ya.make
@@ -1,7 +1,7 @@
LIBRARY()
SRCS(
- constructor.cpp
+ GLOBAL constructor.cpp
resolver.cpp
read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
index 59f55446cd..6298efcdd1 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp
@@ -27,7 +27,8 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
} else {
gGuard = itInterval->second->GetGroupGuard();
}
- AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(std::move(allocationGuard), std::move(gGuard), *newBatch, lastPK, callbackIdxSubscriver)).second);
+ AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(std::move(allocationGuard), std::move(gGuard), *newBatch,
+ std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
} else {
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp
new file mode 100644
index 0000000000..4a3946192f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp
@@ -0,0 +1,47 @@
+#include "constructor.h"
+#include "read_metadata.h"
+#include "resolver.h"
+
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+NKikimr::TConclusionStatus TIndexScannerConstructor::ParseProgram(
+ const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const {
+ AFL_VERIFY(vIndex);
+ auto& indexInfo = vIndex->GetSchemaVerified(Snapshot)->GetIndexInfo();
+ TIndexColumnResolver columnResolver(indexInfo);
+ return TBase::ParseProgram(vIndex, proto.GetOlapProgramType(), proto.GetOlapProgram(), read, columnResolver);
+}
+
+std::vector<TNameTypeInfo> TIndexScannerConstructor::GetPrimaryKeyScheme(const NColumnShard::TColumnShard* self) const {
+ auto& indexInfo = self->TablesManager.GetIndexInfo(Snapshot);
+ return indexInfo.GetPrimaryKeyColumns();
+}
+
+NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructor::DoBuildReadMetadata(
+ const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
+ auto& insertTable = self->InsertTable;
+ auto& index = self->TablesManager.GetPrimaryIndex();
+ if (!insertTable || !index) {
+ return std::shared_ptr<TReadMetadataBase>();
+ }
+
+ if (read.GetSnapshot().GetPlanInstant() < self->GetMinReadSnapshot().GetPlanInstant()) {
+ return TConclusionStatus::Fail(TStringBuilder() << "Snapshot too old: " << read.GetSnapshot() << ". CS min read snapshot: "
+ << self->GetMinReadSnapshot() << ". now: " << TInstant::Now());
+ }
+
+ TDataStorageAccessor dataAccessor(insertTable, index);
+ AFL_VERIFY(read.PathId);
+ auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
+ IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), read.GetScanCursor());
+
+ auto initResult = readMetadata->Init(self, read, dataAccessor);
+ if (!initResult) {
+ return initResult;
+ }
+ return static_pointer_cast<TReadMetadataBase>(readMetadata);
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.h
new file mode 100644
index 0000000000..76596f8dd9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.h
@@ -0,0 +1,28 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/reader/abstract/constructor.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TIndexScannerConstructor: public IScannerConstructor {
+public:
+ static TString GetClassNameStatic() {
+ return "SIMPLE";
+ }
+
+private:
+ using TBase = IScannerConstructor;
+ static const inline TFactory::TRegistrator<TIndexScannerConstructor> Registrator =
+ TFactory::TRegistrator<TIndexScannerConstructor>(GetClassNameStatic());
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor() const override {
+ return std::make_shared<TSimpleScanCursor>();
+ }
+
+protected:
+ virtual TConclusion<std::shared_ptr<TReadMetadataBase>> DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const override;
+public:
+ using TBase::TBase;
+ virtual TConclusionStatus ParseProgram(const TVersionedIndex* vIndex, const NKikimrTxDataShard::TEvKqpScan& proto, TReadDescription& read) const override;
+ virtual std::vector<TNameTypeInfo> GetPrimaryKeyScheme(const NColumnShard::TColumnShard* self) const override;
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp
new file mode 100644
index 0000000000..d57492b742
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.cpp
@@ -0,0 +1,124 @@
+#include "read_metadata.h"
+
+#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h>
+#include <ydb/core/tx/columnshard/transactions/locks/read_finished.h>
+#include <ydb/core/tx/columnshard/transactions/locks/read_start.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+std::unique_ptr<TScanIteratorBase> TReadMetadata::StartScan(const std::shared_ptr<TReadContext>& readContext) const {
+ return std::make_unique<TColumnShardScanIterator>(readContext, readContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>());
+}
+
+TConclusionStatus TReadMetadata::Init(
+ const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) {
+ SetPKRangesFilter(readDescription.PKRangesFilter);
+ InitShardingInfo(readDescription.PathId);
+ TxId = readDescription.TxId;
+ LockId = readDescription.LockId;
+ if (LockId) {
+ owner->GetOperationsManager().RegisterLock(*LockId, owner->Generation());
+ LockSharingInfo = owner->GetOperationsManager().GetLockVerified(*LockId).GetSharingInfo();
+ }
+
+ SelectInfo = dataAccessor.Select(readDescription, !!LockId);
+ if (LockId) {
+ for (auto&& i : SelectInfo->PortionsOrderedPK) {
+ if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) {
+ if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) {
+ } else {
+ auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i->GetInsertWriteIdVerified());
+ AddWriteIdToCheck(i->GetInsertWriteIdVerified(), op->GetLockId());
+ }
+ }
+ }
+ }
+
+ StatsMode = readDescription.StatsMode;
+ return TConclusionStatus::Success();
+}
+
+std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ std::set<ui32> result;
+ for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
+ auto id = indexInfo.GetColumnIdOptional(i);
+ if (id) {
+ result.emplace(*id);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
+ }
+ }
+ return result;
+}
+
+std::set<ui32> TReadMetadata::GetPKColumnIds() const {
+ std::set<ui32> result;
+ auto& indexInfo = ResultIndexSchema->GetIndexInfo();
+ for (auto&& i : indexInfo.GetPrimaryKeyColumns()) {
+ Y_ABORT_UNLESS(result.emplace(indexInfo.GetColumnIdVerified(i.first)).second);
+ }
+ return result;
+}
+
+std::shared_ptr<IDataReader> TReadMetadata::BuildReader(const std::shared_ptr<TReadContext>& context) const {
+ return std::make_shared<TPlainReadData>(context);
+}
+
+NArrow::NMerger::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const {
+ return NArrow::NMerger::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0, GetReplaceKey()->field_names(), {}, IsDescSorted());
+}
+
+void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const {
+ if (!GetLockId()) {
+ return;
+ }
+ const ui64 lock = *GetLockId();
+ if (GetBrokenWithCommitted()) {
+ owner.GetOperationsManager().GetLockVerified(lock).SetBroken();
+ } else {
+ NOlap::NTxInteractions::TTxConflicts conflicts;
+ for (auto&& i : GetConflictableLockIds()) {
+ conflicts.Add(i, lock);
+ }
+ auto writer = std::make_shared<NOlap::NTxInteractions::TEvReadFinishedWriter>(PathId, conflicts);
+ owner.GetOperationsManager().AddEventForLock(owner, lock, writer);
+ }
+}
+
+void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const {
+ if (!LockId) {
+ return;
+ }
+ auto evWriter = std::make_shared<NOlap::NTxInteractions::TEvReadStartWriter>(
+ PathId, GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds());
+ owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter);
+}
+
+void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const {
+ if (LockSharingInfo) {
+ NKikimrDataEvents::TLock lockInfo;
+ lockInfo.SetLockId(LockSharingInfo->GetLockId());
+ lockInfo.SetGeneration(LockSharingInfo->GetGeneration());
+ lockInfo.SetDataShard(tabletId);
+ lockInfo.SetCounter(LockSharingInfo->GetCounter());
+ lockInfo.SetPathId(PathId);
+ lockInfo.SetHasWrites(LockSharingInfo->HasWrites());
+ if (LockSharingInfo->IsBroken()) {
+ scanData.LocksInfo.BrokenLocks.emplace_back(std::move(lockInfo));
+ } else {
+ scanData.LocksInfo.Locks.emplace_back(std::move(lockInfo));
+ }
+ }
+}
+
+bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const {
+ AFL_VERIFY(LockSharingInfo);
+ auto it = ConflictedWriteIds.find(writeId);
+ AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size());
+ return it->second.GetLockId() == LockSharingInfo->GetLockId();
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h
new file mode 100644
index 0000000000..f894284dfd
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h
@@ -0,0 +1,172 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
+#include <ydb/library/formats/arrow/replace_key.h>
+#include <ydb/core/tx/columnshard/engines/reader/common/stats.h>
+#include <ydb/core/formats/arrow/reader/position.h>
+
+namespace NKikimr::NColumnShard {
+class TLockSharingInfo;
+}
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+// Holds all metadata that is needed to perform read/scan
+struct TReadMetadata : public TReadMetadataBase {
+ using TBase = TReadMetadataBase;
+
+private:
+ const ui64 PathId;
+ std::shared_ptr<TAtomicCounter> BrokenWithCommitted = std::make_shared<TAtomicCounter>();
+ std::shared_ptr<NColumnShard::TLockSharingInfo> LockSharingInfo;
+
+ class TWriteIdInfo {
+ private:
+ const ui64 LockId;
+ std::shared_ptr<TAtomicCounter> Conflicts;
+
+ public:
+ TWriteIdInfo(const ui64 lockId, const std::shared_ptr<TAtomicCounter>& counter)
+ : LockId(lockId)
+ , Conflicts(counter) {
+ }
+
+ ui64 GetLockId() const {
+ return LockId;
+ }
+
+ void MarkAsConflictable() const {
+ Conflicts->Inc();
+ }
+
+ bool IsConflictable() const {
+ return Conflicts->Val();
+ }
+ };
+
+ THashMap<ui64, std::shared_ptr<TAtomicCounter>> LockConflictCounters;
+ THashMap<TInsertWriteId, TWriteIdInfo> ConflictedWriteIds;
+
+ virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override;
+ virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override;
+ virtual void DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const override;
+
+public:
+ using TConstPtr = std::shared_ptr<const TReadMetadata>;
+
+ bool GetBrokenWithCommitted() const {
+ return BrokenWithCommitted->Val();
+ }
+ THashSet<ui64> GetConflictableLockIds() const {
+ THashSet<ui64> result;
+ for (auto&& i : ConflictedWriteIds) {
+ if (i.second.IsConflictable()) {
+ result.emplace(i.second.GetLockId());
+ }
+ }
+ return result;
+ }
+
+ bool IsLockConflictable(const ui64 lockId) const {
+ auto it = LockConflictCounters.find(lockId);
+ AFL_VERIFY(it != LockConflictCounters.end());
+ return it->second->Val();
+ }
+
+ bool IsWriteConflictable(const TInsertWriteId writeId) const {
+ auto it = ConflictedWriteIds.find(writeId);
+ AFL_VERIFY(it != ConflictedWriteIds.end());
+ return it->second.IsConflictable();
+ }
+
+ void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) {
+ auto it = LockConflictCounters.find(lockId);
+ if (it == LockConflictCounters.end()) {
+ it = LockConflictCounters.emplace(lockId, std::make_shared<TAtomicCounter>()).first;
+ }
+ AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second);
+ }
+
+ [[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const;
+
+ void SetConflictedWriteId(const TInsertWriteId writeId) const {
+ auto it = ConflictedWriteIds.find(writeId);
+ AFL_VERIFY(it != ConflictedWriteIds.end());
+ it->second.MarkAsConflictable();
+ }
+
+ void SetBrokenWithCommitted() const {
+ BrokenWithCommitted->Inc();
+ }
+
+ NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const;
+ std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const;
+
+ bool HasProcessingColumnIds() const {
+ return GetProgram().HasProcessingColumnIds();
+ }
+
+ ui64 GetPathId() const {
+ return PathId;
+ }
+
+ std::shared_ptr<TSelectInfo> SelectInfo;
+ NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
+ std::shared_ptr<TReadStats> ReadStats;
+
+ TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
+ const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
+ : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
+ , PathId(pathId)
+ , ReadStats(std::make_shared<TReadStats>())
+ {
+ }
+
+ virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
+ return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();
+ }
+
+ TConclusionStatus Init(const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor);
+
+ std::vector<std::string> GetColumnsOrder() const {
+ auto schema = GetResultSchema();
+ std::vector<std::string> result;
+ for (auto&& i : schema->GetSchema()->fields()) {
+ result.emplace_back(i->name());
+ }
+ return result;
+ }
+
+ std::set<ui32> GetEarlyFilterColumnIds() const;
+ std::set<ui32> GetPKColumnIds() const;
+
+ bool Empty() const {
+ Y_ABORT_UNLESS(SelectInfo);
+ return SelectInfo->PortionsOrderedPK.empty();
+ }
+
+ size_t NumIndexedBlobs() const {
+ Y_ABORT_UNLESS(SelectInfo);
+ return SelectInfo->Stats().Blobs;
+ }
+
+ std::unique_ptr<TScanIteratorBase> StartScan(const std::shared_ptr<TReadContext>& readContext) const override;
+
+ void Dump(IOutputStream& out) const override {
+ out << " index blobs: " << NumIndexedBlobs()
+ // << " with program steps: " << (Program ? Program->Steps.size() : 0)
+ << " at snapshot: " << GetRequestSnapshot().DebugString();
+ TBase::Dump(out);
+ if (SelectInfo) {
+ out << ", ";
+ SelectInfo->DebugStream(out);
+ }
+ }
+
+ friend IOutputStream& operator << (IOutputStream& out, const TReadMetadata& meta) {
+ meta.Dump(out);
+ return out;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.cpp
new file mode 100644
index 0000000000..5f04522502
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.cpp
@@ -0,0 +1,5 @@
+#include "resolver.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.h
new file mode 100644
index 0000000000..6267658734
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/resolver.h
@@ -0,0 +1,28 @@
+#pragma once
+#include <ydb/core/tx/program/program.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TIndexColumnResolver: public IColumnResolver {
+ const NOlap::TIndexInfo& IndexInfo;
+
+public:
+ explicit TIndexColumnResolver(const NOlap::TIndexInfo& indexInfo)
+ : IndexInfo(indexInfo) {
+ }
+
+ virtual std::optional<ui32> GetColumnIdOptional(const TString& name) const override {
+ return IndexInfo.GetColumnIdOptional(name);
+ }
+
+ TString GetColumnName(ui32 id, bool required) const override {
+ return IndexInfo.GetColumnName(id, required);
+ }
+
+ NSsa::TColumnInfo GetDefaultColumn() const override {
+ return NSsa::TColumnInfo::Original((ui32)NOlap::TIndexInfo::ESpecialColumn::PLAN_STEP, NOlap::TIndexInfo::SPEC_COL_PLAN_STEP);
+ }
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make
new file mode 100644
index 0000000000..883f2b6b8e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ GLOBAL constructor.cpp
+ resolver.cpp
+ read_metadata.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/reader/abstract
+ ydb/core/kqp/compute_actor
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.cpp
new file mode 100644
index 0000000000..d053b9affd
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.cpp
@@ -0,0 +1,79 @@
+#include "columns_set.h"
+#include <util/string/join.h>
+#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+TString TColumnsSet::DebugString() const {
+ return TStringBuilder() << "("
+ << "column_ids=" << JoinSeq(",", ColumnIds) << ";"
+ << "column_names=" << JoinSeq(",", ColumnNames) << ";"
+ << ");";
+}
+
+TColumnsSet TColumnsSet::operator-(const TColumnsSet& external) const {
+ if (external.IsEmpty() || IsEmpty()) {
+ return *this;
+ }
+ TColumnsSet result = *this;
+ for (auto&& i : external.ColumnIds) {
+ result.ColumnIds.erase(i);
+ }
+ arrow::FieldVector fields;
+ for (auto&& i : Schema->fields()) {
+ if (!external.Schema->GetFieldByName(i->name())) {
+ fields.emplace_back(i);
+ }
+ }
+ result.Schema = std::make_shared<arrow::Schema>(fields);
+ result.Rebuild();
+ return result;
+}
+
+TColumnsSet TColumnsSet::operator+(const TColumnsSet& external) const {
+ if (external.IsEmpty()) {
+ return *this;
+ }
+ if (IsEmpty()) {
+ return external;
+ }
+ TColumnsSet result = *this;
+ result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
+ auto fields = result.Schema->fields();
+ for (auto&& i : external.Schema->fields()) {
+ if (!result.Schema->GetFieldByName(i->name())) {
+ fields.emplace_back(i);
+ }
+ }
+ result.Schema = std::make_shared<arrow::Schema>(fields);
+ result.Rebuild();
+ return result;
+}
+
+bool TColumnsSet::ColumnsOnly(const std::vector<std::string>& fieldNames) const {
+ if (fieldNames.size() != GetColumnsCount()) {
+ return false;
+ }
+ std::set<std::string> fieldNamesSet;
+ for (auto&& i : fieldNames) {
+ if (!fieldNamesSet.emplace(i).second) {
+ return false;
+ }
+ if (!ColumnNames.contains(TString(i.data(), i.size()))) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void TColumnsSet::Rebuild() {
+ ColumnNamesVector.clear();
+ ColumnNames.clear();
+ for (auto&& i : Schema->field_names()) {
+ ColumnNamesVector.emplace_back(i);
+ ColumnNames.emplace(i);
+ }
+ FilteredSchema = std::make_shared<TFilteredSnapshotSchema>(FullReadSchema, ColumnIds);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.h
new file mode 100644
index 0000000000..dca3e42df6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.h
@@ -0,0 +1,214 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+#include <util/string/join.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+enum class EMemType {
+ Blob,
+ Raw,
+ RawSequential
+};
+
+enum class EStageFeaturesIndexes {
+ Accessors = 0,
+ Filter = 1,
+ Fetching = 2,
+ Merge = 3
+};
+
+class TIndexesSet {
+private:
+ YDB_READONLY_DEF(std::vector<ui32>, IndexIds);
+ YDB_READONLY_DEF(std::set<ui32>, IndexIdsSet);
+
+public:
+ TIndexesSet(const std::set<ui32>& indexIds)
+ : IndexIds(indexIds.begin(), indexIds.end())
+ , IndexIdsSet(indexIds) {
+ AFL_VERIFY(IndexIds.size() == IndexIdsSet.size())("indexes", JoinSeq(",", IndexIds));
+ }
+
+ TIndexesSet(const ui32& indexId)
+ : IndexIds({ indexId })
+ , IndexIdsSet({ indexId }) {
+ }
+
+ ui32 GetIndexesCount() const {
+ return IndexIds.size();
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << JoinSeq(",", IndexIds);
+ }
+};
+
+class TColumnsSetIds {
+protected:
+ std::set<ui32> ColumnIds;
+
+public:
+ const std::set<ui32>& GetColumnIds() const {
+ return ColumnIds;
+ }
+
+ TString DebugString() const {
+ return JoinSeq(",", ColumnIds);
+ }
+
+ TColumnsSetIds(const std::set<ui32>& ids)
+ : ColumnIds(ids) {
+ }
+ TColumnsSetIds() = default;
+ TColumnsSetIds(std::set<ui32>&& ids)
+ : ColumnIds(std::move(ids)) {
+ }
+
+ TColumnsSetIds(const std::vector<ui32>& ids)
+ : ColumnIds(ids.begin(), ids.end()) {
+ }
+
+ TColumnsSetIds operator+(const TColumnsSetIds& external) const {
+ TColumnsSetIds result = *this;
+ result.ColumnIds.insert(external.ColumnIds.begin(), external.ColumnIds.end());
+ return result;
+ }
+
+ TColumnsSetIds operator-(const TColumnsSetIds& external) const {
+ TColumnsSetIds result = *this;
+ for (auto&& i : external.ColumnIds) {
+ result.ColumnIds.erase(i);
+ }
+ return result;
+ }
+ bool IsEmpty() const {
+ return ColumnIds.empty();
+ }
+
+ bool operator!() const {
+ return IsEmpty();
+ }
+ ui32 GetColumnsCount() const {
+ return ColumnIds.size();
+ }
+
+ bool Contains(const std::shared_ptr<TColumnsSetIds>& columnsSet) const {
+ if (!columnsSet) {
+ return true;
+ }
+ return Contains(*columnsSet);
+ }
+
+ bool IsEqual(const std::shared_ptr<TColumnsSetIds>& columnsSet) const {
+ if (!columnsSet) {
+ return false;
+ }
+ return IsEqual(*columnsSet);
+ }
+
+ bool Contains(const TColumnsSetIds& columnsSet) const {
+ for (auto&& i : columnsSet.ColumnIds) {
+ if (!ColumnIds.contains(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool Cross(const TColumnsSetIds& columnsSet) const {
+ for (auto&& i : columnsSet.ColumnIds) {
+ if (ColumnIds.contains(i)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ std::set<ui32> Intersect(const TColumnsSetIds& columnsSet) const {
+ std::set<ui32> result;
+ for (auto&& i : columnsSet.ColumnIds) {
+ if (ColumnIds.contains(i)) {
+ result.emplace(i);
+ }
+ }
+ return result;
+ }
+
+ bool IsEqual(const TColumnsSetIds& columnsSet) const {
+ if (columnsSet.GetColumnIds().size() != ColumnIds.size()) {
+ return false;
+ }
+ auto itA = ColumnIds.begin();
+ auto itB = columnsSet.ColumnIds.begin();
+ while (itA != ColumnIds.end()) {
+ if (*itA != *itB) {
+ return false;
+ }
+ ++itA;
+ ++itB;
+ }
+ return true;
+ }
+};
+
+class TColumnsSet: public TColumnsSetIds {
+private:
+ using TBase = TColumnsSetIds;
+ YDB_READONLY_DEF(std::set<TString>, ColumnNames);
+ std::vector<TString> ColumnNamesVector;
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Schema>, Schema);
+ ISnapshotSchema::TPtr FullReadSchema;
+ YDB_READONLY_DEF(ISnapshotSchema::TPtr, FilteredSchema);
+
+ void Rebuild();
+
+public:
+ TColumnsSet() = default;
+ const std::vector<TString>& GetColumnNamesVector() const {
+ return ColumnNamesVector;
+ }
+
+ bool ColumnsOnly(const std::vector<std::string>& fieldNames) const;
+
+ std::shared_ptr<TColumnsSet> BuildSamePtr(const std::set<ui32>& columnIds) const {
+ return std::make_shared<TColumnsSet>(columnIds, FullReadSchema);
+ }
+
+ TColumnsSet(const std::set<ui32>& columnIds, const ISnapshotSchema::TPtr& fullReadSchema)
+ : TBase(columnIds)
+ , FullReadSchema(fullReadSchema) {
+ AFL_VERIFY(!!FullReadSchema);
+ Schema = FullReadSchema->GetIndexInfo().GetColumnsSchema(ColumnIds);
+ Rebuild();
+ }
+
+ TColumnsSet(const std::vector<ui32>& columnIds, const ISnapshotSchema::TPtr& fullReadSchema)
+ : TBase(columnIds)
+ , FullReadSchema(fullReadSchema) {
+ AFL_VERIFY(!!FullReadSchema);
+ Schema = FullReadSchema->GetIndexInfo().GetColumnsSchema(ColumnIds);
+ Rebuild();
+ }
+
+ const ISnapshotSchema& GetFilteredSchemaVerified() const {
+ AFL_VERIFY(FilteredSchema);
+ return *FilteredSchema;
+ }
+
+ const std::shared_ptr<ISnapshotSchema>& GetFilteredSchemaPtrVerified() const {
+ AFL_VERIFY(FilteredSchema);
+ return FilteredSchema;
+ }
+
+ TString DebugString() const;
+
+ TColumnsSet operator+(const TColumnsSet& external) const;
+
+ TColumnsSet operator-(const TColumnsSet& external) const;
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp
new file mode 100644
index 0000000000..bd1f1c5a1a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.cpp
@@ -0,0 +1,22 @@
+#include "constructor.h"
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
+ Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData()));
+ AFL_VERIFY(Step.Next());
+ auto task = std::make_shared<TStepAction>(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+}
+
+bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())
+ ("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId);
+ NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
+ std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
+ return false;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h
new file mode 100644
index 0000000000..237923ed58
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h
@@ -0,0 +1,32 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
+#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
+#include <ydb/core/tx/columnshard/blobs_reader/task.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include "source.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter<TBlobsFetcherTask> {
+private:
+ using TBase = NBlobOperations::NRead::ITask;
+ const std::shared_ptr<IDataSource> Source;
+ TFetchingScriptCursor Step;
+ const std::shared_ptr<TSpecialReadContext> Context;
+
+ virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
+ virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;
+public:
+ TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<IDataSource>& sourcePtr,
+ const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId)
+ : TBase(readActions, taskCustomer, externalTaskId)
+ , Source(sourcePtr)
+ , Step(step)
+ , Context(context)
+ {
+
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
new file mode 100644
index 0000000000..caa7f03305
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
@@ -0,0 +1,309 @@
+#include "context.h"
+#include "source.h"
+
+#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+std::unique_ptr<NArrow::NMerger::TMergePartialStream> TSpecialReadContext::BuildMerger() const {
+ return std::make_unique<NArrow::NMerger::TMergePartialStream>(
+ ReadMetadata->GetReplaceKey(), ProgramInputColumns->GetSchema(), CommonContext->IsReverse(), IIndexInfo::GetSnapshotColumnNames());
+}
+
+std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
+ if (!source->GetStageData().HasPortionAccessor()) {
+ if (!AskAccumulatorsScript) {
+ AskAccumulatorsScript = std::make_shared<TFetchingScript>(*this);
+ AskAccumulatorsScript->AddStep<TAllocateMemoryStep>(source->PredictAccessorsSize(), EStageFeaturesIndexes::Accessors);
+ AskAccumulatorsScript->AddStep<TPortionAccessorFetchingStep>();
+ AskAccumulatorsScript->AddStep<TDetectInMem>(*FFColumns);
+ }
+ return AskAccumulatorsScript;
+ }
+ const bool partialUsageByPK = [&]() {
+ switch (source->GetUsageClass()) {
+ case TPKRangeFilter::EUsageClass::PartialUsage:
+ return true;
+ case TPKRangeFilter::EUsageClass::DontUsage:
+ return true;
+ case TPKRangeFilter::EUsageClass::FullUsage:
+ return false;
+ }
+ }();
+ const bool useIndexes = (IndexChecker ? source->HasIndexes(IndexChecker->GetIndexIds()) : false);
+ const bool needSnapshots = ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax();
+ const bool hasDeletions = source->GetHasDeletions();
+ bool needShardingFilter = false;
+ if (!!ReadMetadata->GetRequestShardingInfo()) {
+ auto ver = source->GetShardingVersionOptional();
+ if (!ver || *ver < ReadMetadata->GetRequestShardingInfo()->GetSnapshotVersion()) {
+ needShardingFilter = true;
+ }
+ }
+ {
+ auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
+ [hasDeletions ? 1 : 0];
+ if (!result) {
+ TGuard<TMutex> wg(Mutex);
+ result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
+ [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
+ if (!result) {
+ result = BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions);
+ CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
+ [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0] = result;
+ }
+ }
+ AFL_VERIFY(result);
+ if (*result) {
+ return *result;
+ } else {
+ std::shared_ptr<TFetchingScript> result = std::make_shared<TFetchingScript>(*this);
+ result->SetBranchName("FAKE");
+ result->AddStep(std::make_shared<TBuildFakeSpec>(source->GetRecordsCount()));
+ return result;
+ }
+ }
+}
+
+class TColumnsAccumulator {
+private:
+ TColumnsSetIds FetchingReadyColumns;
+ TColumnsSetIds AssemblerReadyColumns;
+ ISnapshotSchema::TPtr FullSchema;
+ std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;
+
+public:
+ TColumnsAccumulator(const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema)
+ : FullSchema(fullSchema)
+ , GuaranteeNotOptional(guaranteeNotOptional) {
+ }
+
+ TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const {
+ return columns - FetchingReadyColumns;
+ }
+
+ bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
+ auto actualColumns = GetNotFetchedAlready(columns);
+ FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns;
+ if (!actualColumns.IsEmpty()) {
+ script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob);
+ script.AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
+ return true;
+ }
+ return false;
+ }
+ bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage,
+ const bool sequential) {
+ auto actualColumns = columns - AssemblerReadyColumns;
+ AssemblerReadyColumns = AssemblerReadyColumns + columns;
+ if (!actualColumns.IsEmpty()) {
+ auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
+ if (sequential) {
+ const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
+ if (notSequentialColumnIds.size()) {
+ script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
+ std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
+ script.AddStep<TAssemblerStep>(cross, purposeId);
+ *actualSet = *actualSet - *cross;
+ }
+ if (!actualSet->IsEmpty()) {
+ script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
+ script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
+ }
+ } else {
+ script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
+ script.AddStep<TAssemblerStep>(actualSet, purposeId);
+ }
+ return true;
+ }
+ return false;
+ }
+};
+
+std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots,
+ const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const {
+ std::shared_ptr<TFetchingScript> result = std::make_shared<TFetchingScript>(*this);
+ const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount();
+
+ TColumnsAccumulator acc(MergeColumns, ReadMetadata->GetResultSchema());
+ if (!!IndexChecker && useIndexes) {
+ result->AddStep(std::make_shared<TIndexBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds())));
+ result->AddStep(std::make_shared<TApplyIndexStep>(IndexChecker));
+ }
+ if (needFilterSharding && !ShardingColumns->IsEmpty()) {
+ const TColumnsSetIds columnsFetch = *ShardingColumns;
+ acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
+ acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false);
+ result->AddStep(std::make_shared<TShardingFilter>());
+ }
+ {
+ result->SetBranchName("exclusive");
+ TColumnsSet columnsFetch = *EFColumns;
+ if (needFilterDeletion) {
+ columnsFetch = columnsFetch + *DeletionColumns;
+ }
+ if (needSnapshots || FFColumns->Cross(*SpecColumns)) {
+ columnsFetch = columnsFetch + *SpecColumns;
+ }
+ if (partialUsageByPredicate) {
+ columnsFetch = columnsFetch + *PredicateColumns;
+ }
+
+ if (columnsFetch.GetColumnsCount()) {
+ acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter);
+ }
+
+ if (needFilterDeletion) {
+ acc.AddAssembleStep(*result, *DeletionColumns, "SPEC_DELETION", EStageFeaturesIndexes::Filter, false);
+ result->AddStep(std::make_shared<TDeletionFilter>());
+ }
+ if (partialUsageByPredicate) {
+ acc.AddAssembleStep(*result, *PredicateColumns, "PREDICATE", EStageFeaturesIndexes::Filter, false);
+ result->AddStep(std::make_shared<TPredicateFilter>());
+ }
+ if (needSnapshots || FFColumns->Cross(*SpecColumns)) {
+ acc.AddAssembleStep(*result, *SpecColumns, "SPEC", EStageFeaturesIndexes::Filter, false);
+ result->AddStep(std::make_shared<TSnapshotFilter>());
+ }
+ for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
+ if (i->GetFilterOriginalColumnIds().empty()) {
+ break;
+ }
+ TColumnsSet stepColumnIds(i->GetFilterOriginalColumnIds(), ReadMetadata->GetResultSchema());
+ acc.AddAssembleStep(*result, stepColumnIds, "EF", EStageFeaturesIndexes::Filter, false);
+ result->AddStep(std::make_shared<TFilterProgramStep>(i));
+ if (!i->IsFilterOnly()) {
+ break;
+ }
+ }
+ if (GetReadMetadata()->Limit) {
+ result->AddStep(std::make_shared<TFilterCutLimit>(GetReadMetadata()->Limit, GetReadMetadata()->IsDescSorted()));
+ }
+ acc.AddFetchingStep(*result, *FFColumns, EStageFeaturesIndexes::Fetching);
+ acc.AddAssembleStep(*result, *FFColumns, "LAST", EStageFeaturesIndexes::Fetching, false);
+ }
+ result->AddStep<TPrepareResultStep>();
+ return result;
+}
+
+TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
+ : CommonContext(commonContext) {
+
+ ReadMetadata = dynamic_pointer_cast<const TReadMetadata>(CommonContext->GetReadMetadata());
+ Y_ABORT_UNLESS(ReadMetadata);
+ Y_ABORT_UNLESS(ReadMetadata->SelectInfo);
+
+ double kffAccessors = 0.01;
+ double kffFilter = 0.45;
+ double kffFetching = 0.45;
+ double kffMerge = 0.10;
+ TString stagePrefix;
+ if (ReadMetadata->GetEarlyFilterColumnIds().size()) {
+ stagePrefix = "EF";
+ kffFilter = 0.7;
+ kffFetching = 0.15;
+ kffMerge = 0.14;
+ kffAccessors = 0.01;
+ } else {
+ stagePrefix = "FO";
+ kffFilter = 0.1;
+ kffFetching = 0.75;
+ kffMerge = 0.14;
+ kffAccessors = 0.01;
+ }
+
+ std::vector<std::shared_ptr<NGroupedMemoryManager::TStageFeatures>> stages = {
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
+ stagePrefix + "::ACCESSORS", kffAccessors * TGlobalLimits::ScanMemoryLimit),
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
+ stagePrefix + "::FILTER", kffFilter * TGlobalLimits::ScanMemoryLimit),
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(
+ stagePrefix + "::FETCHING", kffFetching * TGlobalLimits::ScanMemoryLimit),
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit)
+ };
+ ProcessMemoryGuard =
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
+ ProcessScopeGuard =
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
+
+ auto readSchema = ReadMetadata->GetResultSchema();
+ SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
+ IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
+ {
+ auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
+ if (predicateColumns.size()) {
+ PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, readSchema);
+ } else {
+ PredicateColumns = std::make_shared<TColumnsSet>();
+ }
+ }
+ {
+ std::set<ui32> columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
+ DeletionColumns = std::make_shared<TColumnsSet>(columnIds, ReadMetadata->GetResultSchema());
+ }
+
+ if (!!ReadMetadata->GetRequestShardingInfo()) {
+ auto shardingColumnIds =
+ ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
+ ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, ReadMetadata->GetResultSchema());
+ } else {
+ ShardingColumns = std::make_shared<TColumnsSet>();
+ }
+ {
+ auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
+ if (efColumns.size()) {
+ EFColumns = std::make_shared<TColumnsSet>(efColumns, readSchema);
+ } else {
+ EFColumns = std::make_shared<TColumnsSet>();
+ }
+ }
+ if (ReadMetadata->HasProcessingColumnIds()) {
+ FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), readSchema);
+ if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) {
+ FFColumns = std::make_shared<TColumnsSet>(*EFColumns + *SpecColumns);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString());
+ } else {
+ AFL_VERIFY(!FFColumns->Contains(*SpecColumns))("info", FFColumns->DebugString());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_first", FFColumns->DebugString());
+ }
+ } else {
+ FFColumns = EFColumns;
+ }
+ if (FFColumns->IsEmpty()) {
+ ProgramInputColumns = SpecColumns;
+ } else {
+ ProgramInputColumns = FFColumns;
+ }
+
+ PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), readSchema);
+ MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);
+
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
+}
+
+TString TSpecialReadContext::DebugString() const {
+ TStringBuilder sb;
+ sb << "ef=" << EFColumns->DebugString() << ";"
+ << "sharding=" << ShardingColumns->DebugString() << ";"
+ << "pk=" << PKColumns->DebugString() << ";"
+ << "ff=" << FFColumns->DebugString() << ";"
+ << "program_input=" << ProgramInputColumns->DebugString() << ";";
+ return sb;
+}
+
+TString TSpecialReadContext::ProfileDebugString() const {
+ TStringBuilder sb;
+ const auto GetBit = [](const ui32 val, const ui32 pos) -> ui32 {
+ return (val & (1 << pos)) ? 1 : 0;
+ };
+
+ for (ui32 i = 0; i < (1 << 5); ++i) {
+ auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)];
+ if (script && *script) {
+ sb << (*script)->DebugString() << ";";
+ }
+ }
+ return sb;
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
new file mode 100644
index 0000000000..f64d0923d6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
@@ -0,0 +1,85 @@
+#pragma once
+#include "columns_set.h"
+#include "fetching.h"
+#include <ydb/core/tx/columnshard/common/limits.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/formats/arrow/reader/merger.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class IDataSource;
+
+class TSpecialReadContext {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<TReadContext>, CommonContext);
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TProcessGuard>, ProcessMemoryGuard);
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TScopeGuard>, ProcessScopeGuard);
+
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ShardingColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, DeletionColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PredicateColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns);
+
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, MergeStageMemory);
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FilterStageMemory);
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FetchingStageMemory);
+
+ TAtomic AbortFlag = 0;
+ NIndexes::TIndexCheckerContainer IndexChecker;
+ TReadMetadata::TConstPtr ReadMetadata;
+ std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();
+ std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt,
+ const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
+ TMutex Mutex;
+ std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>
+ CacheFetchingScripts;
+ std::shared_ptr<TFetchingScript> AskAccumulatorsScript;
+
+public:
+ const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit();
+ const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit();
+ const ui64 ReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize;
+
+ ui64 GetProcessMemoryControlId() const {
+ AFL_VERIFY(ProcessMemoryGuard);
+ return ProcessMemoryGuard->GetProcessId();
+ }
+ ui64 GetRequestedMemoryBytes() const {
+ return MergeStageMemory->GetFullMemory() + FilterStageMemory->GetFullMemory() + FetchingStageMemory->GetFullMemory();
+ }
+
+ const TReadMetadata::TConstPtr& GetReadMetadata() const {
+ return ReadMetadata;
+ }
+
+ bool IsAborted() const {
+ return AtomicGet(AbortFlag);
+ }
+
+ void Abort() {
+ AtomicSet(AbortFlag, 1);
+ }
+
+ ~TSpecialReadContext() {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("profile", ProfileDebugString());
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("fetching", DebugString());
+ }
+
+ std::unique_ptr<NArrow::NMerger::TMergePartialStream> BuildMerger() const;
+
+ TString DebugString() const;
+ TString ProfileDebugString() const;
+
+ TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext);
+
+ std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.cpp
new file mode 100644
index 0000000000..bf38c466b7
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.cpp
@@ -0,0 +1,21 @@
+#include "fetched_data.h"
+
+#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
+#include <ydb/library/formats/arrow/common/validation.h>
+#include <ydb/library/formats/arrow/simple_arrays_cache.h>
+
+namespace NKikimr::NOlap {
+
+void TFetchedData::SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields, const ISnapshotSchema& schema) {
+ for (auto&& i : fields) {
+ if (Table->GetSchema()->GetFieldByName(i->name())) {
+ continue;
+ }
+ Table
+ ->AddField(i, std::make_shared<NArrow::NAccessor::TTrivialArray>(NArrow::TThreadSimpleArraysCache::Get(
+ i->type(), schema.GetExternalDefaultValueVerified(i->name()), Table->num_rows())))
+ .Validate();
+ }
+}
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.h
new file mode 100644
index 0000000000..001f245533
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetched_data.h
@@ -0,0 +1,236 @@
+#pragma once
+#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/formats/arrow/common/container.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/blobs_reader/task.h>
+#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/actors/core/log.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_base.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h>
+
+namespace NKikimr::NOlap {
+
+class TFetchedData {
+protected:
+ using TBlobs = THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>;
+ YDB_ACCESSOR_DEF(TBlobs, Blobs);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TGeneralContainer>, Table);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ YDB_READONLY(bool, UseFilter, false);
+
+ std::optional<TPortionDataAccessor> PortionAccessor;
+ bool DataAdded = false;
+
+public:
+ TFetchedData(const bool useFilter)
+ : UseFilter(useFilter) {
+ }
+
+ void SetUseFilter(const bool value) {
+ if (UseFilter == value) {
+ return;
+ }
+ AFL_VERIFY(!DataAdded);
+ }
+
+ bool HasPortionAccessor() const {
+ return !!PortionAccessor;
+ }
+
+ void SetPortionAccessor(TPortionDataAccessor&& accessor) {
+ AFL_VERIFY(!PortionAccessor);
+ PortionAccessor = std::move(accessor);
+ }
+
+ const TPortionDataAccessor& GetPortionAccessor() const {
+ AFL_VERIFY(!!PortionAccessor);
+ return *PortionAccessor;
+ }
+
+ ui32 GetFilteredCount(const ui32 recordsCount, const ui32 defLimit) const {
+ if (!Filter) {
+ return std::min(defLimit, recordsCount);
+ }
+ return Filter->GetFilteredCount().value_or(recordsCount);
+ }
+
+ void SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields, const ISnapshotSchema& schema);
+
+ std::shared_ptr<NArrow::TColumnFilter> GetAppliedFilter() const {
+ return UseFilter ? Filter : nullptr;
+ }
+
+ std::shared_ptr<NArrow::TColumnFilter> GetNotAppliedFilter() const {
+ return UseFilter ? nullptr : Filter;
+ }
+
+ TString ExtractBlob(const TChunkAddress& address) {
+ auto it = Blobs.find(address);
+ AFL_VERIFY(it != Blobs.end());
+ AFL_VERIFY(it->second.IsBlob());
+ auto result = it->second.GetData();
+ Blobs.erase(it);
+ return result;
+ }
+
+ void AddBlobs(THashMap<TChunkAddress, TString>&& blobData) {
+ for (auto&& i : blobData) {
+ AFL_VERIFY(Blobs.emplace(i.first, std::move(i.second)).second);
+ }
+ }
+
+ void AddDefaults(THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>&& blobs) {
+ for (auto&& i : blobs) {
+ AFL_VERIFY(Blobs.emplace(i.first, std::move(i.second)).second);
+ }
+ }
+
+ bool IsEmpty() const {
+ return (Filter && Filter->IsTotalDenyFilter()) || (Table && !Table->num_rows());
+ }
+
+ void Clear() {
+ Filter = std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildDenyFilter());
+ Table = nullptr;
+ }
+
+ void AddFilter(const std::shared_ptr<NArrow::TColumnFilter>& filter) {
+ DataAdded = true;
+ if (!filter) {
+ return;
+ }
+ return AddFilter(*filter);
+ }
+
+ void CutFilter(const ui32 recordsCount, const ui32 limit, const bool reverse) {
+ auto filter = std::make_shared<NArrow::TColumnFilter>(NArrow::TColumnFilter::BuildAllowFilter());
+ ui32 recordsCountImpl = Filter ? Filter->GetFilteredCount().value_or(recordsCount) : recordsCount;
+ if (recordsCountImpl < limit) {
+ return;
+ }
+ if (reverse) {
+ filter->Add(false, recordsCountImpl - limit);
+ filter->Add(true, limit);
+ } else {
+ filter->Add(true, limit);
+ filter->Add(false, recordsCountImpl - limit);
+ }
+ if (Filter) {
+ if (UseFilter) {
+ AddFilter(*filter);
+ } else {
+ AddFilter(Filter->CombineSequentialAnd(*filter));
+ }
+ } else {
+ AddFilter(*filter);
+ }
+ }
+
+ void AddFilter(const NArrow::TColumnFilter& filter) {
+ if (UseFilter && Table) {
+ AFL_VERIFY(filter.Apply(Table));
+ }
+ if (!Filter) {
+ Filter = std::make_shared<NArrow::TColumnFilter>(filter);
+ } else if (UseFilter) {
+ *Filter = Filter->CombineSequentialAnd(filter);
+ } else {
+ *Filter = Filter->And(filter);
+ }
+ }
+
+ void AddBatch(const std::shared_ptr<NArrow::TGeneralContainer>& table) {
+ DataAdded = true;
+ AFL_VERIFY(table);
+ if (UseFilter) {
+ AddBatch(table->BuildTableVerified());
+ } else {
+ if (!Table) {
+ Table = table;
+ } else {
+ auto mergeResult = Table->MergeColumnsStrictly(*table);
+ AFL_VERIFY(mergeResult.IsSuccess())("error", mergeResult.GetErrorMessage());
+ }
+ }
+ }
+
+ void AddBatch(const std::shared_ptr<arrow::Table>& table) {
+ DataAdded = true;
+ auto tableLocal = table;
+ if (Filter && UseFilter) {
+ AFL_VERIFY(Filter->Apply(tableLocal));
+ }
+ if (!Table) {
+ Table = std::make_shared<NArrow::TGeneralContainer>(tableLocal);
+ } else {
+ auto mergeResult = Table->MergeColumnsStrictly(NArrow::TGeneralContainer(tableLocal));
+ AFL_VERIFY(mergeResult.IsSuccess())("error", mergeResult.GetErrorMessage());
+ }
+ }
+};
+
+class TFetchedResult {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TGeneralContainer>, Batch);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedFilter);
+ std::optional<std::deque<TPortionDataAccessor::TReadPage>> PagesToResult;
+ std::optional<std::shared_ptr<arrow::Table>> ChunkToReply;
+
+public:
+ TFetchedResult(std::unique_ptr<TFetchedData>&& data)
+ : Batch(data->GetTable())
+ , NotAppliedFilter(data->GetNotAppliedFilter()) {
+ }
+
+ TPortionDataAccessor::TReadPage ExtractPageForResult() {
+ AFL_VERIFY(PagesToResult);
+ AFL_VERIFY(PagesToResult->size());
+ auto result = PagesToResult->front();
+ PagesToResult->pop_front();
+ return result;
+ }
+
+ const std::deque<TPortionDataAccessor::TReadPage>& GetPagesToResultVerified() const {
+ AFL_VERIFY(PagesToResult);
+ return *PagesToResult;
+ }
+
+ void SetPages(std::vector<TPortionDataAccessor::TReadPage>&& pages) {
+ AFL_VERIFY(!PagesToResult);
+ PagesToResult = std::deque<TPortionDataAccessor::TReadPage>(pages.begin(), pages.end());
+ }
+
+ void SetResultChunk(std::shared_ptr<arrow::Table>&& table, const ui32 indexStart, const ui32 recordsCount) {
+ auto page = ExtractPageForResult();
+ AFL_VERIFY(page.GetIndexStart() == indexStart)("real", page.GetIndexStart())("expected", indexStart);
+ AFL_VERIFY(page.GetRecordsCount() == recordsCount)("real", page.GetRecordsCount())("expected", recordsCount);
+ AFL_VERIFY(!ChunkToReply);
+ ChunkToReply = std::move(table);
+ }
+
+ bool IsFinished() const {
+ return GetPagesToResultVerified().empty();
+ }
+
+ bool HasResultChunk() const {
+ return !!ChunkToReply;
+ }
+
+ std::shared_ptr<arrow::Table> ExtractResultChunk() {
+ AFL_VERIFY(!!ChunkToReply);
+ auto result = std::move(*ChunkToReply);
+ ChunkToReply.reset();
+ return result;
+ }
+
+ bool IsEmpty() const {
+ return !Batch || Batch->num_rows() == 0 || (NotAppliedFilter && NotAppliedFilter->IsTotalDenyFilter());
+ }
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
new file mode 100644
index 0000000000..97b410654d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
@@ -0,0 +1,403 @@
+#include "fetching.h"
+#include "plain_read_data.h"
+#include "source.h"
+
+#include <ydb/core/tx/columnshard/engines/filter.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
+
+#include <ydb/library/formats/arrow/simple_arrays_cache.h>
+
+#include <yql/essentials/minikql/mkql_terminator.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+bool TStepAction::DoApply(IDataReader& owner) const {
+ if (FinishedFlag) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply");
+ auto* plainReader = static_cast<TPlainReadData*>(&owner);
+ plainReader->MutableScanner().OnSourceReady(Source, nullptr, 0, Source->GetRecordsCount(), *plainReader);
+ }
+ return true;
+}
+
+TConclusionStatus TStepAction::DoExecuteImpl() {
+ if (Source->GetContext()->IsAborted()) {
+ return TConclusionStatus::Success();
+ }
+ auto executeResult = Cursor.Execute(Source);
+ if (!executeResult) {
+ return executeResult;
+ }
+ if (*executeResult) {
+ FinishedFlag = true;
+ }
+ return TConclusionStatus::Success();
+}
+
+TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId)
+ : TBase(ownerActorId)
+ , Source(source)
+ , Cursor(std::move(cursor))
+ , CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) {
+}
+
+TConclusion<bool> TColumnBlobsFetchingStep::DoExecuteInplace(
+ const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
+ return !source->StartFetchingColumns(source, step, Columns);
+}
+
+ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
+ return source->GetColumnBlobBytes(Columns.GetColumnIds());
+}
+
+TConclusion<bool> TIndexBlobsFetchingStep::DoExecuteInplace(
+ const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
+ return !source->StartFetchingIndexes(source, step, Indexes);
+}
+
+TConclusion<bool> TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ source->AssembleColumns(Columns);
+ return true;
+}
+
+ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
+ return source->GetColumnRawBytes(Columns->GetColumnIds());
+}
+
+TConclusion<bool> TOptionalAssemblerStep::DoExecuteInplace(
+ const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ source->AssembleColumns(Columns, !source->IsSourceInMemory());
+ return true;
+}
+
+ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
+ return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential);
+}
+
+TConclusion<bool> TFilterProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ AFL_VERIFY(source);
+ AFL_VERIFY(Step);
+ auto filter = Step->BuildFilter(source->GetStageData().GetTable());
+ if (!filter.ok()) {
+ return TConclusionStatus::Fail(filter.status().message());
+ }
+ source->MutableStageData().AddFilter(*filter);
+ return true;
+}
+
+TConclusion<bool> TPredicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ auto filter =
+ source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(source->GetStageData().GetTable()->BuildTableVerified());
+ source->MutableStageData().AddFilter(filter);
+ return true;
+}
+
+TConclusion<bool> TSnapshotFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ auto filter = MakeSnapshotFilter(
+ source->GetStageData().GetTable()->BuildTableVerified(), source->GetContext()->GetReadMetadata()->GetRequestSnapshot());
+ if (filter.GetFilteredCount().value_or(source->GetRecordsCount()) != source->GetRecordsCount()) {
+ if (source->AddTxConflict()) {
+ return true;
+ }
+ }
+ source->MutableStageData().AddFilter(filter);
+ return true;
+}
+
+TConclusion<bool> TDeletionFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ auto filterTable = source->GetStageData().GetTable()->BuildTableOptional(std::set<std::string>({ TIndexInfo::SPEC_COL_DELETE_FLAG }));
+ if (!filterTable) {
+ return true;
+ }
+ AFL_VERIFY(filterTable->column(0)->type()->id() == arrow::boolean()->id());
+ NArrow::TColumnFilter filter = NArrow::TColumnFilter::BuildAllowFilter();
+ for (auto&& i : filterTable->column(0)->chunks()) {
+ auto filterFlags = static_pointer_cast<arrow::BooleanArray>(i);
+ for (ui32 i = 0; i < filterFlags->length(); ++i) {
+ filter.Add(!filterFlags->GetView(i));
+ }
+ }
+ source->MutableStageData().AddFilter(filter);
+ return true;
+}
+
+TConclusion<bool> TShardingFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ NYDBTest::TControllers::GetColumnShardController()->OnSelectShardingFilter();
+ const auto& shardingInfo = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo();
+ auto filter = shardingInfo->GetFilter(source->GetStageData().GetTable()->BuildTableVerified());
+ source->MutableStageData().AddFilter(filter);
+ return true;
+}
+
+TConclusion<bool> TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ for (auto&& f : IIndexInfo::ArrowSchemaSnapshot()->fields()) {
+ columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), NArrow::DefaultScalar(f->type()), Count));
+ }
+ source->MutableStageData().AddBatch(
+ std::make_shared<NArrow::TGeneralContainer>(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns)));
+ return true;
+}
+
+TConclusion<bool> TApplyIndexStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ source->ApplyIndex(IndexChecker);
+ return true;
+}
+
+TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSource>& source) {
+ AFL_VERIFY(source);
+ NMiniKQL::TThrowingBindTerminator bind;
+ Script->OnExecute();
+ while (!Script->IsFinished(CurrentStepIdx)) {
+ if (source->GetStageData().IsEmpty()) {
+ source->OnEmptyStageData();
+ break;
+ }
+ auto step = Script->GetStep(CurrentStepIdx);
+ TMemoryProfileGuard mGuard("SCAN_PROFILE::FETCHING::" + step->GetName() + "::" + Script->GetBranchName(),
+ IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx);
+ AFL_VERIFY(!CurrentStartInstant);
+ CurrentStartInstant = TMonotonic::Now();
+ AFL_VERIFY(!CurrentStartDataSize);
+ CurrentStartDataSize = step->GetProcessingDataSize(source);
+ const TConclusion<bool> resultStep = step->ExecuteInplace(source, *this);
+ if (!resultStep) {
+ return resultStep;
+ }
+ if (!*resultStep) {
+ return false;
+ }
+ FlushDuration();
+ ++CurrentStepIdx;
+ }
+ return true;
+}
+
+bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
+ const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*allocation*/) {
+ auto data = Source.lock();
+ if (!data || data->GetContext()->IsAborted()) {
+ guard->Release();
+ return false;
+ }
+ data->RegisterAllocationGuard(std::move(guard));
+ Step.Next();
+ auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ return true;
+}
+
+TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
+ const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step)
+ : TBase(mem)
+ , Source(source)
+ , Step(step)
+ , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) {
+}
+
+void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
+ auto sourcePtr = Source.lock();
+ if (sourcePtr) {
+ sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
+ "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
+ }
+}
+
+TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
+ ui64 size = PredefinedSize.value_or(0);
+ for (auto&& i : Packs) {
+ ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
+ if (source->GetStageData().GetUseFilter() && source->GetContext()->GetReadMetadata()->Limit && i.GetMemType() != EMemType::Blob) {
+ const ui32 filtered =
+ source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->Limit);
+ if (filtered < source->GetRecordsCount()) {
+ sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount();
+ }
+ }
+ size += sizeLocal;
+ }
+
+ auto allocation = std::make_shared<TFetchingStepAllocation>(source, size, step);
+ NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(),
+ source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex);
+ return false;
+}
+
+ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const {
+ return 0;
+}
+
+TString TFetchingScript::DebugString() const {
+ TStringBuilder sb;
+ TStringBuilder sbBranch;
+ for (auto&& i : Steps) {
+ if (i->GetSumDuration() > TDuration::MilliSeconds(10)) {
+ sbBranch << "{" << i->DebugString() << "};";
+ }
+ }
+ if (!sbBranch) {
+ return "";
+ }
+ sb << "{branch:" << BranchName << ";limit:" << Limit << ";";
+ if (FinishInstant && StartInstant) {
+ sb << "duration:" << *FinishInstant - *StartInstant << ";";
+ }
+
+ sb << "steps_10Ms:[" << sbBranch << "]}";
+ return sb;
+}
+
+TFetchingScript::TFetchingScript(const TSpecialReadContext& context)
+ : Limit(context.GetReadMetadata()->Limit) {
+}
+
+void TFetchingScript::Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
+ if (Steps.size() == 0) {
+ AddStep<TAllocateMemoryStep>(entityIds, mType, stage);
+ } else {
+ std::optional<ui32> addIndex;
+ for (i32 i = Steps.size() - 1; i >= 0; --i) {
+ if (auto allocation = std::dynamic_pointer_cast<TAllocateMemoryStep>(Steps[i])) {
+ if (allocation->GetStage() == stage) {
+ allocation->AddAllocation(entityIds, mType);
+ return;
+ } else {
+ addIndex = i + 1;
+ }
+ break;
+ } else if (std::dynamic_pointer_cast<TAssemblerStep>(Steps[i])) {
+ continue;
+ } else if (std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps[i])) {
+ continue;
+ } else {
+ addIndex = i + 1;
+ break;
+ }
+ }
+ AFL_VERIFY(addIndex);
+ InsertStep<TAllocateMemoryStep>(*addIndex, entityIds, mType, stage);
+ }
+}
+
+NKikimr::TConclusion<bool> TFilterCutLimit::DoExecuteInplace(
+ const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ source->MutableStageData().CutFilter(source->GetRecordsCount(), Limit, Reverse);
+ return true;
+}
+
+TConclusion<bool> TPortionAccessorFetchingStep::DoExecuteInplace(
+ const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
+ return !source->StartFetchingAccessor(source, step);
+}
+
+TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ if (Columns.GetColumnsCount()) {
+ source->SetSourceInMemory(
+ source->GetColumnRawBytes(Columns.GetColumnIds()) < NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion());
+ } else {
+ source->SetSourceInMemory(true);
+ }
+ AFL_VERIFY(source->GetStageData().HasPortionAccessor());
+ auto plan = source->GetContext()->GetColumnsFetchingPlan(source);
+ source->InitFetchingPlan(plan);
+ TFetchingScriptCursor cursor(plan, 0);
+ auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ return false;
+}
+
+namespace {
+class TApplySourceResult: public IDataTasksProcessor::ITask {
+private:
+ using TBase = IDataTasksProcessor::ITask;
+ YDB_READONLY_DEF(std::shared_ptr<arrow::Table>, Result);
+ YDB_READONLY_DEF(std::shared_ptr<IDataSource>, Source);
+ YDB_READONLY(ui32, StartIndex, 0);
+ YDB_READONLY(ui32, OriginalRecordsCount, 0);
+ NColumnShard::TCounterGuard Guard;
+ TFetchingScriptCursor Step;
+
+public:
+ TString GetTaskClassIdentifier() const override {
+ return "TApplySourceResult";
+ }
+
+ TApplySourceResult(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& result, const ui32 startIndex,
+ const ui32 originalRecordsCount, const TFetchingScriptCursor& step)
+ : TBase(NActors::TActorId())
+ , Result(result)
+ , Source(source)
+ , StartIndex(startIndex)
+ , OriginalRecordsCount(originalRecordsCount)
+ , Guard(source->GetContext()->GetCommonContext()->GetCounters().GetResultsForSourceGuard())
+ , Step(step)
+ {
+ }
+
+ virtual TConclusionStatus DoExecuteImpl() override {
+ AFL_VERIFY(false)("event", "not applicable");
+ return TConclusionStatus::Success();
+ }
+ virtual bool DoApply(IDataReader& indexedDataRead) const override {
+ auto* plainReader = static_cast<TPlainReadData*>(&indexedDataRead);
+ auto resultCopy = Result;
+ Source->SetCursor(Step);
+ plainReader->MutableScanner().OnSourceReady(Source, std::move(resultCopy), StartIndex, OriginalRecordsCount, *plainReader);
+ return true;
+ }
+};
+
+} // namespace
+
+TConclusion<bool> TBuildResultStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
+ auto context = source->GetContext();
+ NArrow::TGeneralContainer::TTableConstructionContext contextTableConstruct;
+ contextTableConstruct.SetColumnNames(context->GetProgramInputColumns()->GetColumnNamesVector());
+ if (!source->IsSourceInMemory()) {
+ contextTableConstruct.SetStartIndex(StartIndex).SetRecordsCount(RecordsCount);
+ } else {
+ AFL_VERIFY(StartIndex == 0);
+ AFL_VERIFY(RecordsCount == source->GetRecordsCount())("records_count", RecordsCount)("source", source->GetRecordsCount());
+ }
+ std::shared_ptr<arrow::Table> resultBatch;
+ if (!source->GetStageResult().IsEmpty()) {
+ resultBatch = source->GetStageResult().GetBatch()->BuildTableVerified(contextTableConstruct);
+ AFL_VERIFY((ui32)resultBatch->num_columns() == context->GetProgramInputColumns()->GetColumnNamesVector().size());
+ if (auto filter = source->GetStageResult().GetNotAppliedFilter()) {
+ filter->Apply(resultBatch, StartIndex, RecordsCount);
+ }
+ if (resultBatch && resultBatch->num_rows()) {
+ NArrow::TStatusValidator::Validate(context->GetReadMetadata()->GetProgram().ApplyProgram(resultBatch));
+ }
+ }
+ NActors::TActivationContext::AsActorContext().Send(context->GetCommonContext()->GetScanActorId(),
+ new NColumnShard::TEvPrivate::TEvTaskProcessedResult(
+ std::make_shared<TApplySourceResult>(source, std::move(resultBatch), StartIndex, RecordsCount, step)));
+ return false;
+}
+
+TConclusion<bool> TPrepareResultStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
+ source->Finalize(NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion());
+ std::shared_ptr<TFetchingScript> plan = std::make_shared<TFetchingScript>(*source->GetContext());
+ if (source->IsSourceInMemory()) {
+ AFL_VERIFY(source->GetStageResult().GetPagesToResultVerified().size() == 1);
+ }
+ for (auto&& i : source->GetStageResult().GetPagesToResultVerified()) {
+ if (source->GetIsStartedByCursor() && !source->GetContext()->GetCommonContext()->GetScanCursor()->CheckSourceIntervalUsage(
+ source->GetSourceId(), i.GetIndexStart(), i.GetRecordsCount())) {
+ continue;
+ }
+ plan->AddStep<TBuildResultStep>(i.GetIndexStart(), i.GetRecordsCount());
+ }
+ AFL_VERIFY(!plan->IsFinished(0));
+ source->InitFetchingPlan(plan);
+
+ TFetchingScriptCursor cursor(plan, 0);
+ auto task = std::make_shared<TStepAction>(source, std::move(cursor), source->GetContext()->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ return false;
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
new file mode 100644
index 0000000000..08cf3e8878
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
@@ -0,0 +1,520 @@
+#pragma once
+#include "columns_set.h"
+
+#include <ydb/core/tx/columnshard/counters/scan.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
+#include <ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+class IDataSource;
+class TFetchingScriptCursor;
+class TSpecialReadContext;
+class IFetchingStep {
+private:
+ YDB_READONLY_DEF(TString, Name);
+ YDB_READONLY(TDuration, SumDuration, TDuration::Zero());
+ YDB_READONLY(ui64, SumSize, 0);
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const = 0;
+ virtual TString DoDebugString() const {
+ return "";
+ }
+
+public:
+ void AddDuration(const TDuration d) {
+ SumDuration += d;
+ }
+ void AddDataSize(const ui64 size) {
+ SumSize += size;
+ }
+
+ virtual ~IFetchingStep() = default;
+
+ [[nodiscard]] TConclusion<bool> ExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
+ return DoExecuteInplace(source, step);
+ }
+
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const {
+ return 0;
+ }
+
+ IFetchingStep(const TString& name)
+ : Name(name) {
+ }
+
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb << "name=" << Name << ";duration=" << SumDuration << ";"
+ << "size=" << 1e-9 * SumSize << ";details={" << DoDebugString() << "};";
+ return sb;
+ }
+};
+
+class TFetchingScript {
+private:
+ YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
+ std::vector<std::shared_ptr<IFetchingStep>> Steps;
+ std::optional<TMonotonic> StartInstant;
+ std::optional<TMonotonic> FinishInstant;
+ const ui32 Limit;
+
+public:
+ TFetchingScript(const TSpecialReadContext& context);
+
+ void Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
+
+ void AddStepDataSize(const ui32 index, const ui64 size) {
+ GetStep(index)->AddDataSize(size);
+ }
+
+ void AddStepDuration(const ui32 index, const TDuration d) {
+ FinishInstant = TMonotonic::Now();
+ GetStep(index)->AddDuration(d);
+ }
+
+ void OnExecute() {
+ if (!StartInstant) {
+ StartInstant = TMonotonic::Now();
+ }
+ }
+
+ TString DebugString() const;
+
+ const std::shared_ptr<IFetchingStep>& GetStep(const ui32 index) const {
+ AFL_VERIFY(index < Steps.size());
+ return Steps[index];
+ }
+
+ template <class T, typename... Args>
+ std::shared_ptr<T> AddStep(Args... args) {
+ auto result = std::make_shared<T>(args...);
+ Steps.emplace_back(result);
+ return result;
+ }
+
+ template <class T, typename... Args>
+ std::shared_ptr<T> InsertStep(const ui32 index, Args... args) {
+ AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
+ auto result = std::make_shared<T>(args...);
+ Steps.insert(Steps.begin() + index, result);
+ return result;
+ }
+
+ void AddStep(const std::shared_ptr<IFetchingStep>& step) {
+ AFL_VERIFY(step);
+ Steps.emplace_back(step);
+ }
+
+ bool IsFinished(const ui32 currentStepIdx) const {
+ AFL_VERIFY(currentStepIdx <= Steps.size());
+ return currentStepIdx == Steps.size();
+ }
+
+ ui32 Execute(const ui32 startStepIdx, const std::shared_ptr<IDataSource>& source) const;
+};
+
+class TFetchingScriptCursor {
+private:
+ std::optional<TMonotonic> CurrentStartInstant;
+ std::optional<ui64> CurrentStartDataSize;
+ ui32 CurrentStepIdx = 0;
+ std::shared_ptr<TFetchingScript> Script;
+ void FlushDuration() {
+ AFL_VERIFY(CurrentStartInstant);
+ AFL_VERIFY(CurrentStartDataSize);
+ Script->AddStepDuration(CurrentStepIdx, TMonotonic::Now() - *CurrentStartInstant);
+ Script->AddStepDataSize(CurrentStepIdx, *CurrentStartDataSize);
+ CurrentStartInstant.reset();
+ CurrentStartDataSize.reset();
+ }
+
+public:
+ TFetchingScriptCursor(const std::shared_ptr<TFetchingScript>& script, const ui32 index)
+ : CurrentStepIdx(index)
+ , Script(script) {
+ AFL_VERIFY(!Script->IsFinished(CurrentStepIdx));
+ }
+
+ const TString& GetName() const {
+ return Script->GetStep(CurrentStepIdx)->GetName();
+ }
+
+ TString DebugString() const {
+ return Script->GetStep(CurrentStepIdx)->DebugString();
+ }
+
+ bool Next() {
+ FlushDuration();
+ return !Script->IsFinished(++CurrentStepIdx);
+ }
+
+ TConclusion<bool> Execute(const std::shared_ptr<IDataSource>& source);
+};
+
+class TStepAction: public IDataTasksProcessor::ITask {
+private:
+ using TBase = IDataTasksProcessor::ITask;
+ std::shared_ptr<IDataSource> Source;
+ TFetchingScriptCursor Cursor;
+ bool FinishedFlag = false;
+ NColumnShard::TCounterGuard CountersGuard;
+
+protected:
+ virtual bool DoApply(IDataReader& owner) const override;
+ virtual TConclusionStatus DoExecuteImpl() override;
+
+public:
+ virtual TString GetTaskClassIdentifier() const override {
+ return "STEP_ACTION";
+ }
+
+ TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId);
+};
+
+class TBuildFakeSpec: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ const ui32 Count = 0;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+
+public:
+ TBuildFakeSpec(const ui32 count)
+ : TBase("FAKE_SPEC")
+ , Count(count) {
+ AFL_VERIFY(Count);
+ }
+};
+
+class TApplyIndexStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ const NIndexes::TIndexCheckerContainer IndexChecker;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+
+public:
+ TApplyIndexStep(const NIndexes::TIndexCheckerContainer& indexChecker)
+ : TBase("APPLY_INDEX")
+ , IndexChecker(indexChecker) {
+ }
+};
+
+class TAllocateMemoryStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ class TColumnsPack {
+ private:
+ YDB_READONLY_DEF(TColumnsSetIds, Columns);
+ YDB_READONLY(EMemType, MemType, EMemType::Blob);
+
+ public:
+ TColumnsPack(const TColumnsSetIds& columns, const EMemType memType)
+ : Columns(columns)
+ , MemType(memType) {
+ }
+ };
+ std::vector<TColumnsPack> Packs;
+ THashMap<ui32, THashSet<EMemType>> Control;
+ const EStageFeaturesIndexes StageIndex;
+ const std::optional<ui64> PredefinedSize;
+
+protected:
+ class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
+ private:
+ using TBase = NGroupedMemoryManager::IAllocation;
+ std::weak_ptr<IDataSource> Source;
+ TFetchingScriptCursor Step;
+ NColumnShard::TCounterGuard TasksGuard;
+ virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
+ const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
+ virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
+
+ public:
+ TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
+ };
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << "stage=" << StageIndex << ";";
+ }
+
+public:
+ void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) {
+ if (!ids.GetColumnsCount()) {
+ return;
+ }
+ for (auto&& i : ids.GetColumnIds()) {
+ AFL_VERIFY(Control[i].emplace(memType).second);
+ }
+ Packs.emplace_back(ids, memType);
+ }
+ EStageFeaturesIndexes GetStage() const {
+ return StageIndex;
+ }
+
+ TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex)
+ : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
+ , StageIndex(stageIndex)
+ , PredefinedSize(memSize) {
+ }
+
+ TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex)
+ : TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
+ , StageIndex(stageIndex) {
+ AddAllocation(columns, memType);
+ }
+};
+
+class TDetectInMemStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ const TColumnsSetIds Columns;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
+ }
+
+public:
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
+ TDetectInMemStep(const TColumnsSetIds& columns)
+ : TBase("FETCHING_COLUMNS")
+ , Columns(columns) {
+ AFL_VERIFY(Columns.GetColumnsCount());
+ }
+};
+
+class TPrepareResultStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder();
+ }
+
+public:
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const override {
+ return 0;
+ }
+ TPrepareResultStep()
+ : TBase("PREPARE_RESULT") {
+ }
+};
+
+class TBuildResultStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ const ui32 StartIndex;
+ const ui32 RecordsCount;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder();
+ }
+
+public:
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const override {
+ return 0;
+ }
+ TBuildResultStep(const ui32 startIndex, const ui32 recordsCount)
+ : TBase("BUILD_RESULT")
+ , StartIndex(startIndex)
+ , RecordsCount(recordsCount)
+ {
+ }
+};
+
+class TColumnBlobsFetchingStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ TColumnsSetIds Columns;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
+ }
+
+public:
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
+ TColumnBlobsFetchingStep(const TColumnsSetIds& columns)
+ : TBase("FETCHING_COLUMNS")
+ , Columns(columns) {
+ AFL_VERIFY(Columns.GetColumnsCount());
+ }
+};
+
+class TPortionAccessorFetchingStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder();
+ }
+
+public:
+ TPortionAccessorFetchingStep()
+ : TBase("FETCHING_ACCESSOR") {
+ }
+};
+
+class TIndexBlobsFetchingStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ std::shared_ptr<TIndexesSet> Indexes;
+
+protected:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << "indexes=" << Indexes->DebugString() << ";";
+ }
+
+public:
+ TIndexBlobsFetchingStep(const std::shared_ptr<TIndexesSet>& indexes)
+ : TBase("FETCHING_INDEXES")
+ , Indexes(indexes) {
+ AFL_VERIFY(Indexes);
+ AFL_VERIFY(Indexes->GetIndexesCount());
+ }
+};
+
+class TAssemblerStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
+ }
+
+public:
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
+ : TBase("ASSEMBLER" + (specName ? "::" + specName : ""))
+ , Columns(columns) {
+ AFL_VERIFY(Columns);
+ AFL_VERIFY(Columns->GetColumnsCount());
+ }
+};
+
+class TOptionalAssemblerStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
+ virtual TString DoDebugString() const override {
+ return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
+ }
+
+public:
+ virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
+
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TOptionalAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
+ : TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : ""))
+ , Columns(columns) {
+ AFL_VERIFY(Columns);
+ AFL_VERIFY(Columns->GetColumnsCount());
+ }
+};
+
+class TFilterProgramStep: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ std::shared_ptr<NSsa::TProgramStep> Step;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TFilterProgramStep(const std::shared_ptr<NSsa::TProgramStep>& step)
+ : TBase("PROGRAM")
+ , Step(step) {
+ }
+};
+
+class TFilterCutLimit: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ const ui32 Limit;
+ const bool Reverse;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TFilterCutLimit(const ui32 limit, const bool reverse)
+ : TBase("LIMIT")
+ , Limit(limit)
+ , Reverse(reverse) {
+ }
+};
+
+class TPredicateFilter: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TPredicateFilter()
+ : TBase("PREDICATE") {
+ }
+};
+
+class TSnapshotFilter: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TSnapshotFilter()
+ : TBase("SNAPSHOT") {
+ }
+};
+
+class TDetectInMem: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+ TColumnsSetIds Columns;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TDetectInMem(const TColumnsSetIds& columns)
+ : TBase("DETECT_IN_MEM")
+ , Columns(columns) {
+ }
+};
+
+class TDeletionFilter: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TDeletionFilter()
+ : TBase("DELETION") {
+ }
+};
+
+class TShardingFilter: public IFetchingStep {
+private:
+ using TBase = IFetchingStep;
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TShardingFilter()
+ : TBase("SHARDING") {
+ }
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp
new file mode 100644
index 0000000000..39c548800d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.cpp
@@ -0,0 +1,59 @@
+#include "iterator.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<TReadContext>& context, const TReadMetadata::TConstPtr& readMetadata)
+ : Context(context)
+ , ReadMetadata(readMetadata)
+ , ReadyResults(context->GetCounters()) {
+ IndexedData = readMetadata->BuildReader(Context);
+ Y_ABORT_UNLESS(Context->GetReadMetadata()->IsSorted());
+}
+
+TConclusion<std::shared_ptr<TPartialReadResult>> TColumnShardScanIterator::GetBatch() {
+ FillReadyResults();
+ return ReadyResults.pop_front();
+}
+
+void TColumnShardScanIterator::PrepareResults() {
+ FillReadyResults();
+}
+
+TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() {
+ return IndexedData->ReadNextInterval();
+}
+
+void TColumnShardScanIterator::DoOnSentDataFromInterval(const ui32 intervalIdx) const {
+ return IndexedData->OnSentDataFromInterval(intervalIdx);
+}
+
+void TColumnShardScanIterator::FillReadyResults() {
+ auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch);
+ const i64 limitLeft = Context->GetReadMetadata()->Limit == 0 ? INT64_MAX : Context->GetReadMetadata()->Limit;
+ for (size_t i = 0; i < ready.size(); ++i) {
+ auto& batch = ReadyResults.emplace_back(std::move(ready[i]));
+ AFL_VERIFY(batch->GetResultBatch().num_rows() <= limitLeft);
+ ItemsRead += batch->GetResultBatch().num_rows();
+ }
+}
+
+TColumnShardScanIterator::~TColumnShardScanIterator() {
+ if (!IndexedData->IsFinished()) {
+ IndexedData->Abort("iterator destructor");
+ }
+ ReadMetadata->ReadStats->PrintToLog();
+}
+
+void TColumnShardScanIterator::Apply(const std::shared_ptr<IApplyAction>& task) {
+ if (!IndexedData->IsFinished()) {
+ Y_ABORT_UNLESS(task->Apply(*IndexedData));
+ }
+}
+
+const TReadStats& TColumnShardScanIterator::GetStats() const {
+ return *ReadMetadata->ReadStats;
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h
new file mode 100644
index 0000000000..46d34944f2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/iterator.h
@@ -0,0 +1,103 @@
+#pragma once
+#include <ydb/core/tx/columnshard/counters/scan.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+struct TReadMetadata;
+
+class TReadyResults {
+private:
+ const NColumnShard::TConcreteScanCounters Counters;
+ std::deque<std::shared_ptr<TPartialReadResult>> Data;
+ i64 RecordsCount = 0;
+public:
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb
+ << "count:" << Data.size() << ";"
+ << "records_count:" << RecordsCount << ";"
+ ;
+ if (Data.size()) {
+ sb << "schema=" << Data.front()->GetResultBatch().schema()->ToString() << ";";
+ }
+ return sb;
+ }
+ TReadyResults(const NColumnShard::TConcreteScanCounters& counters)
+ : Counters(counters)
+ {
+
+ }
+ const std::shared_ptr<TPartialReadResult>& emplace_back(std::shared_ptr<TPartialReadResult>&& v) {
+ AFL_VERIFY(!!v);
+ RecordsCount += v->GetResultBatch().num_rows();
+ Data.emplace_back(std::move(v));
+ return Data.back();
+ }
+ std::shared_ptr<TPartialReadResult> pop_front() {
+ if (Data.empty()) {
+ return {};
+ }
+ auto result = std::move(Data.front());
+ AFL_VERIFY(RecordsCount >= result->GetResultBatch().num_rows());
+ RecordsCount -= result->GetResultBatch().num_rows();
+ Data.pop_front();
+ return result;
+ }
+ bool empty() const {
+ return Data.empty();
+ }
+ size_t size() const {
+ return Data.size();
+ }
+};
+
+class TColumnShardScanIterator: public TScanIteratorBase {
+private:
+ std::shared_ptr<TReadContext> Context;
+ std::shared_ptr<const TReadMetadata> ReadMetadata;
+ TReadyResults ReadyResults;
+ std::shared_ptr<IDataReader> IndexedData;
+ ui64 ItemsRead = 0;
+ const i64 MaxRowsInBatch = 5000;
+ virtual void DoOnSentDataFromInterval(const ui32 intervalIdx) const override;
+
+public:
+ TColumnShardScanIterator(const std::shared_ptr<TReadContext>& context, const std::shared_ptr<const TReadMetadata>& readMetadata);
+ ~TColumnShardScanIterator();
+
+ virtual TConclusionStatus Start() override {
+ AFL_VERIFY(IndexedData);
+ return IndexedData->Start();
+ }
+
+ virtual std::optional<ui32> GetAvailableResultsCount() const override {
+ return ReadyResults.size();
+ }
+
+ virtual const TReadStats& GetStats() const override;
+
+ virtual TString DebugString(const bool verbose) const override {
+ return TStringBuilder()
+ << "ready_results:(" << ReadyResults.DebugString() << ");"
+ << "indexed_data:(" << IndexedData->DebugString(verbose) << ")"
+ ;
+ }
+
+ virtual void Apply(const std::shared_ptr<IApplyAction>& task) override;
+
+ bool Finished() const override {
+ return IndexedData->IsFinished() && ReadyResults.empty();
+ }
+
+ virtual TConclusion<std::shared_ptr<TPartialReadResult>> GetBatch() override;
+ virtual void PrepareResults() override;
+
+ virtual TConclusion<bool> ReadNextInterval() override;
+
+private:
+ void FillReadyResults();
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
new file mode 100644
index 0000000000..b98e3d7dba
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
@@ -0,0 +1,58 @@
+#include "plain_read_data.h"
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
+ : TBase(context)
+ , SpecialReadContext(std::make_shared<TSpecialReadContext>(context))
+{
+ ui32 sourceIdx = 0;
+ std::deque<std::shared_ptr<IDataSource>> sources;
+ const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
+ ui64 compactedPortionsBytes = 0;
+ ui64 insertedPortionsBytes = 0;
+ for (auto&& i : portions) {
+ if (i->GetMeta().GetProduced() == NPortion::EProduced::COMPACTED || i->GetMeta().GetProduced() == NPortion::EProduced::SPLIT_COMPACTED) {
+ compactedPortionsBytes += i->GetTotalBlobBytes();
+ } else {
+ insertedPortionsBytes += i->GetTotalBlobBytes();
+ }
+ sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, i, SpecialReadContext));
+ }
+ Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);
+
+ auto& stats = GetReadMetadata()->ReadStats;
+ stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
+ stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
+ stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
+ stats->InsertedPortionsBytes = insertedPortionsBytes;
+ stats->CompactedPortionsBytes = compactedPortionsBytes;
+
+}
+
+std::vector<std::shared_ptr<TPartialReadResult>> TPlainReadData::DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) {
+ auto result = std::move(PartialResults);
+ PartialResults.clear();
+// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
+ ui32 count = 0;
+ for (auto&& r: result) {
+ count += r->GetRecordsCount();
+ }
+ AFL_VERIFY(count == ReadyResultsCount);
+ ReadyResultsCount = 0;
+
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
+ return result;
+}
+
+TConclusion<bool> TPlainReadData::DoReadNextInterval() {
+ return Scanner->BuildNextInterval();
+}
+
+void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result) {
+// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
+ ReadyResultsCount += result->GetRecordsCount();
+ PartialResults.emplace_back(result);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
new file mode 100644
index 0000000000..5e64761ac5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
@@ -0,0 +1,78 @@
+#pragma once
+#include "columns_set.h"
+#include "source.h"
+#include "scanner.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
+#include <ydb/core/tx/columnshard/engines/reader/common/queue.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitoringObjectsCounter<TPlainReadData> {
+private:
+ using TBase = IDataReader;
+ std::shared_ptr<TScanHead> Scanner;
+ std::shared_ptr<TSpecialReadContext> SpecialReadContext;
+ std::vector<std::shared_ptr<TPartialReadResult>> PartialResults;
+ ui32 ReadyResultsCount = 0;
+protected:
+ virtual TConclusionStatus DoStart() override {
+ return Scanner->Start();
+ }
+
+ virtual TString DoDebugString(const bool verbose) const override {
+ TStringBuilder sb;
+ sb << SpecialReadContext->DebugString() << ";";
+ if (verbose) {
+ sb << "intervals_schema=" << Scanner->DebugString();
+ }
+ return sb;
+ }
+
+ virtual std::vector<std::shared_ptr<TPartialReadResult>> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
+ virtual TConclusion<bool> DoReadNextInterval() override;
+
+ virtual void DoAbort() override {
+ SpecialReadContext->Abort();
+ Scanner->Abort();
+ PartialResults.clear();
+ Y_ABORT_UNLESS(IsFinished());
+ }
+ virtual bool DoIsFinished() const override {
+ return (Scanner->IsFinished() && PartialResults.empty());
+ }
+public:
+ const TReadMetadata::TConstPtr& GetReadMetadata() const {
+ return SpecialReadContext->GetReadMetadata();
+ }
+
+ const std::shared_ptr<TSpecialReadContext>& GetSpecialReadContext() const {
+ return SpecialReadContext;
+ }
+
+ const TScanHead& GetScanner() const {
+ return *Scanner;
+ }
+
+ TScanHead& MutableScanner() {
+ return *Scanner;
+ }
+ virtual void OnSentDataFromInterval(const ui32 sourceIdx) const override {
+ if (SpecialReadContext->IsAborted()) {
+ return;
+ }
+ Scanner->ContinueSource(sourceIdx);
+ }
+
+ void OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result);
+
+ TPlainReadData(const std::shared_ptr<TReadContext>& context);
+ ~TPlainReadData() {
+ if (!SpecialReadContext->IsAborted()) {
+ Abort("unexpected on destructor");
+ }
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
new file mode 100644
index 0000000000..00a0ae70a1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp
@@ -0,0 +1,133 @@
+#include "plain_read_data.h"
+#include "scanner.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
+
+#include <ydb/library/actors/core/log.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& table, const ui32 startIndex,
+ const ui32 recordsCount, TPlainReadData& reader) {
+ source->MutableStageResult().SetResultChunk(std::move(table), startIndex, recordsCount);
+ if ((!table || !table->num_rows()) && Context->GetCommonContext()->GetReadMetadata()->Limit && InFlightLimit < MaxInFlight) {
+ InFlightLimit = 2 * InFlightLimit;
+ }
+ while (FetchingSources.size()) {
+ auto frontSource = *FetchingSources.begin();
+ if (!frontSource->HasStageResult()) {
+ break;
+ }
+ if (!frontSource->GetStageResult().HasResultChunk()) {
+ break;
+ }
+ auto table = frontSource->MutableStageResult().ExtractResultChunk();
+ const bool isFinished = frontSource->GetStageResult().IsFinished();
+ std::optional<ui32> sourceIdxToContinue;
+ if (!isFinished) {
+ sourceIdxToContinue = frontSource->GetSourceIdx();
+ }
+ if (table && table->num_rows()) {
+ auto cursor =
+ std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
+ reader.OnIntervalResult(std::make_shared<TPartialReadResult>(nullptr, nullptr, table, cursor, sourceIdxToContinue));
+ } else if (sourceIdxToContinue) {
+ ContinueSource(*sourceIdxToContinue);
+ break;
+ }
+ if (!isFinished) {
+ break;
+ }
+ AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
+ if (Context->GetCommonContext()->GetReadMetadata()->Limit) {
+ FinishedSources.emplace(*FetchingSources.begin());
+ }
+ FetchingSources.erase(FetchingSources.begin());
+ while (FetchingSources.size() && FinishedSources.size()) {
+ auto finishedSource = *FinishedSources.begin();
+ auto fetchingSource = *FetchingSources.begin();
+ if (finishedSource->GetFinish() < fetchingSource->GetStart()) {
+ FetchedCount += finishedSource->GetRecordsCount();
+ }
+ FinishedSources.erase(FinishedSources.begin());
+ if (FetchedCount > Context->GetCommonContext()->GetReadMetadata()->Limit) {
+ Context->Abort();
+ Abort();
+ }
+ }
+ }
+}
+
+TConclusionStatus TScanHead::Start() {
+ for (auto&& i : SortedSources) {
+ i->InitFetchingPlan(Context->GetColumnsFetchingPlan(i));
+ }
+ return TConclusionStatus::Success();
+}
+
+TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context)
+ : Context(context) {
+ if (HasAppData()) {
+ if (AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) {
+ MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest();
+ }
+ }
+ if (Context->GetReadMetadata()->Limit) {
+ InFlightLimit = 1;
+ } else {
+ InFlightLimit = MaxInFlight;
+ }
+ bool started = !context->GetCommonContext()->GetScanCursor()->IsInitialized();
+ for (auto&& i : sources) {
+ if (!started) {
+ bool usage = false;
+ if (!context->GetCommonContext()->GetScanCursor()->CheckEntityIsBorder(i, usage)) {
+ continue;
+ }
+ started = true;
+ if (!usage) {
+ continue;
+ }
+ i->SetIsStartedByCursor();
+ }
+ SortedSources.emplace(i);
+ }
+}
+
+TConclusion<bool> TScanHead::BuildNextInterval() {
+ if (Context->IsAborted()) {
+ return false;
+ }
+ bool changed = false;
+ while (SortedSources.size() && FetchingSources.size() < InFlightLimit) {
+ (*SortedSources.begin())->StartProcessing(*SortedSources.begin());
+ FetchingSources.emplace(*SortedSources.begin());
+ FetchingSourcesByIdx.emplace((*SortedSources.begin())->GetSourceIdx(), *SortedSources.begin());
+ SortedSources.erase(SortedSources.begin());
+ changed = true;
+ }
+ return changed;
+}
+
+const TReadContext& TScanHead::GetContext() const {
+ return *Context->GetCommonContext();
+}
+
+bool TScanHead::IsReverse() const {
+ return GetContext().GetReadMetadata()->IsDescSorted();
+}
+
+void TScanHead::Abort() {
+ AFL_VERIFY(Context->IsAborted());
+ for (auto&& i : FetchingSources) {
+ i->Abort();
+ }
+ for (auto&& i : SortedSources) {
+ i->Abort();
+ }
+ FetchingSources.clear();
+ SortedSources.clear();
+ Y_ABORT_UNLESS(IsFinished());
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
new file mode 100644
index 0000000000..bc94b997b4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.h
@@ -0,0 +1,76 @@
+#pragma once
+#include "source.h"
+#include <ydb/core/formats/arrow/reader/position.h>
+#include <ydb/core/tx/columnshard/common/limits.h>
+#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TPlainReadData;
+
+class TDataSourceEndpoint {
+private:
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<IDataSource>>, StartSources);
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<IDataSource>>, FinishSources);
+public:
+ void AddStart(std::shared_ptr<IDataSource> source) {
+ StartSources.emplace_back(source);
+ }
+ void AddFinish(std::shared_ptr<IDataSource> source) {
+ FinishSources.emplace_back(source);
+ }
+};
+
+class TScanHead {
+private:
+ std::shared_ptr<TSpecialReadContext> Context;
+ THashMap<ui64, std::shared_ptr<IDataSource>> FetchingSourcesByIdx;
+ std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareStartForScanSequence> SortedSources;
+ std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareStartForScanSequence> FetchingSources;
+ std::set<std::shared_ptr<IDataSource>, IDataSource::TCompareFinishForScanSequence> FinishedSources;
+ ui64 FetchedCount = 0;
+ ui64 InFlightLimit = 1;
+ ui64 MaxInFlight = 256;
+public:
+
+ void ContinueSource(const ui32 sourceIdx) const {
+ auto it = FetchingSourcesByIdx.find(sourceIdx);
+ AFL_VERIFY(it != FetchingSourcesByIdx.end())("source_idx", sourceIdx)("count", FetchingSourcesByIdx.size());
+ it->second->ContinueCursor(it->second);
+ }
+
+ bool IsReverse() const;
+ void Abort();
+
+ bool IsFinished() const {
+ return FetchingSources.empty() && SortedSources.empty();
+ }
+
+ const TReadContext& GetContext() const;
+
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb << "S:";
+ for (auto&& i : SortedSources) {
+ sb << i->GetSourceId() << ";";
+ }
+ sb << "F:";
+ for (auto&& i : FetchingSources) {
+ sb << i->GetSourceId() << ";";
+ }
+ return sb;
+ }
+
+ void OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& table, const ui32 startIndex,
+ const ui32 recordsCount, TPlainReadData& reader);
+
+ TConclusionStatus Start();
+
+ TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context);
+
+ [[nodiscard]] TConclusion<bool> BuildNextInterval();
+
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
new file mode 100644
index 0000000000..9c578661f0
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
@@ -0,0 +1,245 @@
+#include "constructor.h"
+#include "fetched_data.h"
+#include "plain_read_data.h"
+#include "source.h"
+
+#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
+#include <ydb/core/tx/columnshard/blobs_reader/events.h>
+#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
+
+#include <ydb/library/formats/arrow/simple_arrays_cache.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetching) {
+ AFL_VERIFY(fetching);
+// AFL_VERIFY(!FetchingPlan);
+ FetchingPlan = fetching;
+}
+
+void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) {
+ AFL_VERIFY(!ProcessingStarted);
+ AFL_VERIFY(FetchingPlan);
+ AFL_VERIFY(!Context->IsAborted());
+ ProcessingStarted = true;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx);
+ NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
+ TFetchingScriptCursor cursor(FetchingPlan, 0);
+ auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), Context->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+}
+
+void IDataSource::ContinueCursor(const std::shared_ptr<IDataSource>& sourcePtr) {
+ AFL_VERIFY(!!ScriptCursor);
+ if (ScriptCursor->Next()) {
+ auto task = std::make_shared<TStepAction>(sourcePtr, std::move(*ScriptCursor), Context->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ ScriptCursor.reset();
+ }
+}
+
+void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
+ THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>& defaultBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
+ const NArrow::TColumnFilter& cFilter = filter ? *filter : NArrow::TColumnFilter::BuildAllowFilter();
+ ui32 fetchedChunks = 0;
+ ui32 nullChunks = 0;
+ for (auto&& i : columnIds) {
+ auto columnChunks = GetStageData().GetPortionAccessor().GetColumnChunksPointers(i);
+ if (columnChunks.empty()) {
+ continue;
+ }
+ auto itFilter = cFilter.GetIterator(false, Portion->GetRecordsCount());
+ bool itFinished = false;
+ for (auto&& c : columnChunks) {
+ AFL_VERIFY(!itFinished);
+ if (!itFilter.IsBatchForSkip(c->GetMeta().GetRecordsCount())) {
+ auto reading = blobsAction.GetReading(Portion->GetColumnStorageId(c->GetColumnId(), Schema->GetIndexInfo()));
+ reading->SetIsBackgroundProcess(false);
+ reading->AddRange(Portion->RestoreBlobRange(c->BlobRange));
+ ++fetchedChunks;
+ } else {
+ defaultBlocks.emplace(c->GetAddress(), TPortionDataAccessor::TAssembleBlobInfo(c->GetMeta().GetRecordsCount(),
+ Schema->GetExternalDefaultValueVerified(c->GetColumnId())));
+ ++nullChunks;
+ }
+ itFinished = !itFilter.Next(c->GetMeta().GetRecordsCount());
+ }
+ AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Portion->GetRecordsCount());
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "chunks_stats")("fetch", fetchedChunks)("null", nullChunks)(
+ "reading_actions", blobsAction.GetStorageIds())("columns", columnIds.size());
+}
+
+bool TPortionDataSource::DoStartFetchingColumns(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName());
+ AFL_VERIFY(columns.GetColumnsCount());
+ AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter());
+ auto& columnIds = columns.GetColumnIds();
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
+
+ TBlobsAction action(GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
+ {
+ THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo> nullBlocks;
+ NeedFetchColumns(columnIds, action, nullBlocks, StageData->GetAppliedFilter());
+ StageData->AddDefaults(std::move(nullBlocks));
+ }
+
+ auto readActions = action.GetReadingActions();
+ if (!readActions.size()) {
+ return false;
+ }
+
+ auto constructor = std::make_shared<TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
+ NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
+ return true;
+}
+
+bool TPortionDataSource::DoStartFetchingIndexes(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TIndexesSet>& indexes) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName());
+ AFL_VERIFY(indexes->GetIndexesCount());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
+
+ TBlobsAction action(GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
+ {
+ std::set<ui32> indexIds;
+ for (auto&& i : GetStageData().GetPortionAccessor().GetIndexesVerified()) {
+ if (!indexes->GetIndexIdsSet().contains(i.GetIndexId())) {
+ continue;
+ }
+ indexIds.emplace(i.GetIndexId());
+ if (auto bRange = i.GetBlobRangeOptional()) {
+ auto readAction = action.GetReading(Portion->GetIndexStorageId(i.GetIndexId(), Schema->GetIndexInfo()));
+ readAction->SetIsBackgroundProcess(false);
+ readAction->AddRange(Portion->RestoreBlobRange(*bRange));
+ }
+ }
+ if (indexes->GetIndexIdsSet().size() != indexIds.size()) {
+ return false;
+ }
+ }
+ auto readingActions = action.GetReadingActions();
+ if (!readingActions.size()) {
+ NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed({});
+ return false;
+ }
+
+ auto constructor = std::make_shared<TBlobsFetcherTask>(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
+ NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
+ return true;
+}
+
+void TPortionDataSource::DoAbort() {
+}
+
+void TPortionDataSource::DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) {
+ THashMap<ui32, std::vector<TString>> indexBlobs;
+ std::set<ui32> indexIds = indexChecker->GetIndexIds();
+ // NActors::TLogContextGuard gLog = NActors::TLogContextBuilder::Build()("records_count", GetRecordsCount())("portion_id", Portion->GetAddress().DebugString());
+ std::vector<TPortionDataAccessor::TPage> pages = GetStageData().GetPortionAccessor().BuildPages();
+ NArrow::TColumnFilter constructor = NArrow::TColumnFilter::BuildAllowFilter();
+ for (auto&& p : pages) {
+ for (auto&& i : p.GetIndexes()) {
+ if (!indexIds.contains(i->GetIndexId())) {
+ continue;
+ }
+ if (i->HasBlobData()) {
+ indexBlobs[i->GetIndexId()].emplace_back(i->GetBlobDataVerified());
+ } else {
+ indexBlobs[i->GetIndexId()].emplace_back(StageData->ExtractBlob(i->GetAddress()));
+ }
+ }
+ for (auto&& i : indexIds) {
+ if (!indexBlobs.contains(i)) {
+ return;
+ }
+ }
+ if (indexChecker->Check(indexBlobs)) {
+ NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true);
+ constructor.Add(true, p.GetRecordsCount());
+ } else {
+ NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false);
+ constructor.Add(false, p.GetRecordsCount());
+ }
+ }
+ AFL_VERIFY(constructor.Size() == Portion->GetRecordsCount());
+ if (constructor.IsTotalDenyFilter()) {
+ StageData->AddFilter(NArrow::TColumnFilter::BuildDenyFilter());
+ } else if (constructor.IsTotalAllowFilter()) {
+ return;
+ } else {
+ StageData->AddFilter(constructor);
+ }
+}
+
+void TPortionDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns, const bool sequential) {
+ auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*Portion);
+
+ std::optional<TSnapshot> ss;
+ if (Portion->HasInsertWriteId()) {
+ if (Portion->HasCommitSnapshot()) {
+ ss = Portion->GetCommitSnapshotVerified();
+ } else if (GetContext()->GetReadMetadata()->IsMyUncommitted(Portion->GetInsertWriteIdVerified())) {
+ ss = GetContext()->GetReadMetadata()->GetRequestSnapshot();
+ }
+ }
+
+ auto batch = GetStageData()
+ .GetPortionAccessor()
+ .PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs(), ss)
+ .AssembleToGeneralContainer(sequential ? columns->GetColumnIds() : std::set<ui32>())
+ .DetachResult();
+
+ MutableStageData().AddBatch(batch);
+}
+
+namespace {
+class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber {
+private:
+ TFetchingScriptCursor Step;
+ std::shared_ptr<IDataSource> Source;
+ virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
+ AFL_VERIFY(!result.HasErrors());
+ AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
+ Source->MutableStageData().SetPortionAccessor(std::move(result.ExtractPortionsVector().front()));
+ AFL_VERIFY(Step.Next());
+ auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId());
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
+ }
+
+public:
+ TPortionAccessorFetchingSubscriber(const TFetchingScriptCursor& step, const std::shared_ptr<IDataSource>& source)
+ : Step(step)
+ , Source(source) {
+ }
+};
+
+} // namespace
+
+bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
+ AFL_VERIFY(!StageData->HasPortionAccessor());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
+
+ std::shared_ptr<TDataAccessorsRequest> request = std::make_shared<TDataAccessorsRequest>();
+ request->AddPortion(Portion);
+ request->RegisterSubscriber(std::make_shared<TPortionAccessorFetchingSubscriber>(step, sourcePtr));
+ GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData(request);
+ return true;
+}
+
+TPortionDataSource::TPortionDataSource(
+ const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context)
+ : TBase(portion->GetPortionId(), sourceIdx, context, portion->IndexKeyStart(), portion->IndexKeyEnd(),
+ portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(),
+ portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount())
+ , Portion(portion)
+ , Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion))
+ , SourceGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
+ GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId())) {
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
new file mode 100644
index 0000000000..91c4533cb7
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
@@ -0,0 +1,436 @@
+#pragma once
+#include "columns_set.h"
+#include "context.h"
+#include "fetched_data.h"
+
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/reader/position.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract/action.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/predicate/range.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h>
+#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
+#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
+
+#include <util/string/join.h>
+
+namespace NKikimr::NOlap {
+class IDataReader;
+}
+
+namespace NKikimr::NOlap::NReader::NSimple {
+
+class TFetchingInterval;
+class TPlainReadData;
+class IFetchTaskConstructor;
+class IFetchingStep;
+
+class TPortionPage {
+private:
+ YDB_READONLY(ui32, StartIndex, 0);
+ YDB_READONLY(ui32, RecordsCount, 0);
+ YDB_READONLY(ui64, MemoryBytes, 0);
+ YDB_ACCESSOR_DEF(std::shared_ptr<arrow::Table>, Result);
+
+public:
+ TPortionPage(const ui32 startIndex, const ui32 recordsCount, const ui64 memoryBytes)
+ : StartIndex(startIndex)
+ , RecordsCount(recordsCount)
+ , MemoryBytes(memoryBytes)
+ {
+
+ }
+};
+
+class IDataSource: public ICursorEntity {
+private:
+ YDB_READONLY(ui32, SourceId, 0);
+ YDB_READONLY(ui32, SourceIdx, 0);
+ YDB_READONLY_DEF(NArrow::NMerger::TSortableBatchPosition, Start);
+ YDB_READONLY_DEF(NArrow::NMerger::TSortableBatchPosition, Finish);
+ NArrow::TReplaceKey StartReplaceKey;
+ NArrow::TReplaceKey FinishReplaceKey;
+ YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context);
+ YDB_READONLY(TSnapshot, RecordSnapshotMin, TSnapshot::Zero());
+ YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero());
+ YDB_READONLY(ui32, RecordsCount, 0);
+ YDB_READONLY_DEF(std::optional<ui64>, ShardingVersionOptional);
+ YDB_READONLY(bool, HasDeletions, false);
+ virtual NJson::TJsonValue DoDebugJson() const = 0;
+ std::shared_ptr<TFetchingScript> FetchingPlan;
+ std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> ResourceGuards;
+ YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage);
+ bool ProcessingStarted = false;
+ bool IsStartedByCursor = false;
+
+ virtual ui64 DoGetEntityId() const override {
+ return SourceId;
+ }
+
+ virtual ui64 DoGetEntityRecordsCount() const override {
+ return RecordsCount;
+ }
+
+ std::optional<TFetchingScriptCursor> ScriptCursor;
+
+protected:
+ std::optional<bool> IsSourceInMemoryFlag;
+
+ std::unique_ptr<TFetchedData> StageData;
+ std::unique_ptr<TFetchedResult> StageResult;
+
+ virtual bool DoStartFetchingColumns(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) = 0;
+ virtual bool DoStartFetchingIndexes(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TIndexesSet>& indexes) = 0;
+ virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns, const bool sequential) = 0;
+ virtual void DoAbort() = 0;
+ virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
+ virtual NJson::TJsonValue DoDebugJsonForMemory() const {
+ return NJson::JSON_MAP;
+ }
+ virtual bool DoAddTxConflict() = 0;
+ virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) = 0;
+
+public:
+ virtual ui64 GetMemoryGroupId() const = 0;
+ bool GetIsStartedByCursor() const {
+ return IsStartedByCursor;
+ }
+
+ void SetIsStartedByCursor() {
+ IsStartedByCursor = true;
+ }
+
+ void SetCursor(const TFetchingScriptCursor& scriptCursor) {
+ AFL_VERIFY(!ScriptCursor);
+ ScriptCursor = scriptCursor;
+ }
+
+ void ContinueCursor(const std::shared_ptr<IDataSource>& sourcePtr);
+
+ class TCompareStartForScanSequence {
+ public:
+ bool operator()(const std::shared_ptr<IDataSource>& l, const std::shared_ptr<IDataSource>& r) const {
+ const std::partial_ordering compareResult = l->GetStart().Compare(r->GetStart());
+ if (compareResult == std::partial_ordering::equivalent) {
+ return l->GetSourceId() < r->GetSourceId();
+ } else {
+ return compareResult == std::partial_ordering::less;
+ }
+ };
+ };
+
+ class TCompareFinishForScanSequence {
+ public:
+ bool operator()(const std::shared_ptr<IDataSource>& l, const std::shared_ptr<IDataSource>& r) const {
+ const std::partial_ordering compareResult = l->GetFinish().Compare(r->GetFinish());
+ if (compareResult == std::partial_ordering::equivalent) {
+ return l->GetSourceId() < r->GetSourceId();
+ } else {
+ return compareResult == std::partial_ordering::less;
+ }
+ };
+ };
+
+ virtual std::shared_ptr<arrow::RecordBatch> GetStartPKRecordBatch() const = 0;
+
+ void StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr);
+ virtual ui64 PredictAccessorsSize() const = 0;
+
+ bool StartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
+ return DoStartFetchingAccessor(sourcePtr, step);
+ }
+
+ bool AddTxConflict() {
+ if (!Context->GetCommonContext()->HasLock()) {
+ return false;
+ }
+ if (DoAddTxConflict()) {
+ StageData->Clear();
+ return true;
+ }
+ return false;
+ }
+
+ ui64 GetResourceGuardsMemory() const {
+ ui64 result = 0;
+ for (auto&& i : ResourceGuards) {
+ result += i->GetMemory();
+ }
+ return result;
+ }
+ void RegisterAllocationGuard(const std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>& guard) {
+ ResourceGuards.emplace_back(guard);
+ }
+ bool IsSourceInMemory() const {
+ AFL_VERIFY(IsSourceInMemoryFlag);
+ return *IsSourceInMemoryFlag;
+ }
+ void SetSourceInMemory(const bool value) {
+ AFL_VERIFY(!IsSourceInMemoryFlag);
+ IsSourceInMemoryFlag = value;
+ AFL_VERIFY(StageData);
+ if (!value) {
+ StageData->SetUseFilter(value);
+ }
+ }
+ virtual THashMap<TChunkAddress, TString> DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobsOriginal) const = 0;
+
+ virtual ui64 GetPathId() const = 0;
+ virtual bool HasIndexes(const std::set<ui32>& indexIds) const = 0;
+
+ const NArrow::TReplaceKey& GetStartReplaceKey() const {
+ return StartReplaceKey;
+ }
+ const NArrow::TReplaceKey& GetFinishReplaceKey() const {
+ return FinishReplaceKey;
+ }
+
+ bool HasStageResult() const {
+ return !!StageResult;
+ }
+
+ const TFetchedResult& GetStageResult() const {
+ AFL_VERIFY(!!StageResult);
+ return *StageResult;
+ }
+
+ TFetchedResult& MutableStageResult() {
+ AFL_VERIFY(!!StageResult);
+ return *StageResult;
+ }
+
+ void Finalize(const std::optional<ui64> memoryLimit) {
+ AFL_VERIFY(!StageResult);
+ AFL_VERIFY(StageData);
+ TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
+
+ const auto accessor = StageData->GetPortionAccessor();
+ StageResult = std::make_unique<TFetchedResult>(std::move(StageData));
+ if (memoryLimit) {
+ StageResult->SetPages(accessor.BuildReadPages(*memoryLimit, GetContext()->GetProgramInputColumns()->GetColumnIds()));
+ } else {
+ StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) });
+ }
+ }
+
+ void ApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) {
+ return DoApplyIndex(indexMeta);
+ }
+
+ void AssembleColumns(const std::shared_ptr<TColumnsSet>& columns, const bool sequential = false) {
+ if (columns->IsEmpty()) {
+ return;
+ }
+ DoAssembleColumns(columns, sequential);
+ }
+
+ bool StartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) {
+ return DoStartFetchingColumns(sourcePtr, step, columns);
+ }
+
+ bool StartFetchingIndexes(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TIndexesSet>& indexes) {
+ AFL_VERIFY(indexes);
+ return DoStartFetchingIndexes(sourcePtr, step, indexes);
+ }
+ void InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetching);
+
+ std::shared_ptr<arrow::RecordBatch> GetLastPK() const {
+ return Finish.BuildSortingCursor().ExtractSortingPosition(Finish.GetSortFields());
+ }
+
+ virtual ui64 GetColumnsVolume(const std::set<ui32>& columnIds, const EMemType type) const = 0;
+
+ virtual ui64 GetColumnRawBytes(const std::set<ui32>& columnIds) const = 0;
+ virtual ui64 GetIndexRawBytes(const std::set<ui32>& indexIds) const = 0;
+ virtual ui64 GetColumnBlobBytes(const std::set<ui32>& columnsIds) const = 0;
+
+ void Abort() {
+ DoAbort();
+ }
+
+ NJson::TJsonValue DebugJsonForMemory() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("details", DoDebugJsonForMemory());
+ result.InsertValue("count", RecordsCount);
+ return result;
+ }
+
+ NJson::TJsonValue DebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("source_idx", SourceIdx);
+ result.InsertValue("start", Start.DebugJson());
+ result.InsertValue("finish", Finish.DebugJson());
+ result.InsertValue("specific", DoDebugJson());
+ return result;
+ }
+
+ bool OnIntervalFinished(const ui32 intervalIdx);
+
+ void OnEmptyStageData() {
+ if (!ResourceGuards.size()) {
+ return;
+ }
+ ResourceGuards.back()->Update(0);
+ Finalize(std::nullopt);
+ }
+
+ const TFetchedData& GetStageData() const {
+ AFL_VERIFY(StageData);
+ return *StageData;
+ }
+
+ TFetchedData& MutableStageData() {
+ AFL_VERIFY(StageData);
+ return *StageData;
+ }
+
+ IDataSource(const ui32 sourceId, const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context,
+ const NArrow::TReplaceKey& start,
+ const NArrow::TReplaceKey& finish, const TSnapshot& recordSnapshotMin, const TSnapshot& recordSnapshotMax, const ui32 recordsCount,
+ const std::optional<ui64> shardingVersion, const bool hasDeletions)
+ : SourceId(sourceId)
+ , SourceIdx(sourceIdx)
+ , Start(context->GetReadMetadata()->BuildSortedPosition(start))
+ , Finish(context->GetReadMetadata()->BuildSortedPosition(finish))
+ , StartReplaceKey(start)
+ , FinishReplaceKey(finish)
+ , Context(context)
+ , RecordSnapshotMin(recordSnapshotMin)
+ , RecordSnapshotMax(recordSnapshotMax)
+ , RecordsCount(recordsCount)
+ , ShardingVersionOptional(shardingVersion)
+ , HasDeletions(hasDeletions) {
+ StageData = std::make_unique<TFetchedData>(true);
+ UsageClass = Context->GetReadMetadata()->GetPKRangesFilter().IsPortionInPartialUsage(GetStartReplaceKey(), GetFinishReplaceKey());
+ AFL_VERIFY(UsageClass != TPKRangeFilter::EUsageClass::DontUsage);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson());
+ if (Start.IsReverseSort()) {
+ std::swap(Start, Finish);
+ }
+ Y_ABORT_UNLESS(Start.Compare(Finish) != std::partial_ordering::greater);
+ }
+
+ virtual ~IDataSource() = default;
+};
+
+class TPortionDataSource: public IDataSource {
+private:
+ using TBase = IDataSource;
+ const TPortionInfo::TConstPtr Portion;
+ std::shared_ptr<ISnapshotSchema> Schema;
+ const std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;
+
+ void NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
+ THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>& nullBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter);
+
+ virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) override;
+ virtual bool DoStartFetchingColumns(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) override;
+ virtual bool DoStartFetchingIndexes(
+ const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TIndexesSet>& indexes) override;
+ virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns, const bool sequential) override;
+ virtual NJson::TJsonValue DoDebugJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "portion");
+ result.InsertValue("info", Portion->DebugString());
+ result.InsertValue("commit", Portion->GetCommitSnapshotOptional().value_or(TSnapshot::Zero()).DebugString());
+ result.InsertValue("insert", (ui64)Portion->GetInsertWriteIdOptional().value_or(TInsertWriteId(0)));
+ return result;
+ }
+
+ virtual NJson::TJsonValue DoDebugJsonForMemory() const override {
+ NJson::TJsonValue result = TBase::DoDebugJsonForMemory();
+ if (GetStageData().HasPortionAccessor()) {
+ auto columns = GetStageData().GetPortionAccessor().GetColumnIds();
+ // result.InsertValue("sequential_columns", JoinSeq(",", SequentialEntityIds));
+ result.InsertValue("in_mem", GetStageData().GetPortionAccessor().GetColumnRawBytes(columns, false));
+ result.InsertValue("columns_in_mem", JoinSeq(",", columns));
+ }
+ result.InsertValue("portion_id", Portion->GetPortionId());
+ result.InsertValue("raw", Portion->GetTotalRawBytes());
+ result.InsertValue("blob", Portion->GetTotalBlobBytes());
+ result.InsertValue("read_memory", GetColumnRawBytes(GetStageData().GetPortionAccessor().GetColumnIds()));
+ return result;
+ }
+ virtual void DoAbort() override;
+ virtual ui64 GetPathId() const override {
+ return Portion->GetPathId();
+ }
+
+ virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;
+
+public:
+ virtual ui64 GetMemoryGroupId() const override {
+ return SourceGroupGuard->GetGroupId();
+ }
+
+ virtual ui64 PredictAccessorsSize() const override {
+ return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord);
+ }
+
+ virtual std::shared_ptr<arrow::RecordBatch> GetStartPKRecordBatch() const override {
+ return Portion->GetMeta().GetFirstLastPK().GetBatch()->Slice(0, 1);
+ }
+
+ virtual bool DoAddTxConflict() override {
+ if (Portion->HasCommitSnapshot() || !Portion->HasInsertWriteId()) {
+ GetContext()->GetReadMetadata()->SetBrokenWithCommitted();
+ return true;
+ } else if (!GetContext()->GetReadMetadata()->IsMyUncommitted(Portion->GetInsertWriteIdVerified())) {
+ GetContext()->GetReadMetadata()->SetConflictedWriteId(Portion->GetInsertWriteIdVerified());
+ return true;
+ }
+ return false;
+ }
+
+ virtual bool HasIndexes(const std::set<ui32>& indexIds) const override {
+ return Schema->GetIndexInfo().HasIndexes(indexIds);
+ }
+
+ virtual THashMap<TChunkAddress, TString> DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobsOriginal) const override {
+ return GetStageData().GetPortionAccessor().DecodeBlobAddresses(std::move(blobsOriginal), Schema->GetIndexInfo());
+ }
+
+ virtual ui64 GetColumnsVolume(const std::set<ui32>& columnIds, const EMemType type) const override {
+ AFL_VERIFY(columnIds.size());
+ switch (type) {
+ case EMemType::Raw:
+ return GetStageData().GetPortionAccessor().GetColumnRawBytes(columnIds, false);
+ case EMemType::Blob:
+ return GetStageData().GetPortionAccessor().GetColumnBlobBytes(columnIds, false);
+ case EMemType::RawSequential:
+ return GetStageData().GetPortionAccessor().GetMinMemoryForReadColumns(columnIds);
+ }
+ }
+
+ virtual ui64 GetColumnRawBytes(const std::set<ui32>& columnsIds) const override {
+ AFL_VERIFY(columnsIds.size());
+ return GetStageData().GetPortionAccessor().GetColumnRawBytes(columnsIds, false);
+ }
+
+ virtual ui64 GetColumnBlobBytes(const std::set<ui32>& columnsIds) const override {
+ return GetStageData().GetPortionAccessor().GetColumnBlobBytes(columnsIds, false);
+ }
+
+ virtual ui64 GetIndexRawBytes(const std::set<ui32>& indexIds) const override {
+ return Portion->GetTotalRawBytes();
+ return GetStageData().GetPortionAccessor().GetIndexRawBytes(indexIds, false);
+ }
+
+ const TPortionInfo& GetPortionInfo() const {
+ return *Portion;
+ }
+
+ const TPortionInfo::TConstPtr& GetPortionInfoPtr() const {
+ return Portion;
+ }
+
+ TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
new file mode 100644
index 0000000000..e6fb9d623a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
@@ -0,0 +1,24 @@
+LIBRARY()
+
+SRCS(
+ scanner.cpp
+ constructor.cpp
+ source.cpp
+ fetched_data.cpp
+ plain_read_data.cpp
+ columns_set.cpp
+ context.cpp
+ fetching.cpp
+ iterator.cpp
+)
+
+PEERDIR(
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/blobs_action
+ ydb/core/tx/conveyor/usage
+ ydb/core/tx/limiter/grouped_memory/usage
+)
+
+GENERATE_ENUM_SERIALIZATION(columns_set.h)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/ya.make
new file mode 100644
index 0000000000..6926bde358
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/reader/simple_reader/constructor
+ ydb/core/tx/columnshard/engines/reader/simple_reader/iterator
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h
index 72ffbdda77..a44b43beec 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h
@@ -68,7 +68,7 @@ public:
continue;
}
auto table = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({resultBatch}));
- return std::make_shared<TPartialReadResult>(table, lastKey, std::nullopt);
+ return std::make_shared<TPartialReadResult>(table, std::make_shared<TPlainScanCursor>(lastKey), std::nullopt);
}
return std::shared_ptr<TPartialReadResult>();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/metadata.h b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/metadata.h
index c5068be3c8..7a9ee6bd36 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/metadata.h
@@ -18,7 +18,7 @@ public:
explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting,
const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
- : TBase(info, sorting, ssaProgram, schema, requestSnapshot)
+ : TBase(info, sorting, ssaProgram, schema, requestSnapshot, nullptr)
, TabletId(tabletId) {
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h
index 72c5281455..dc277d4981 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/policy.h
@@ -10,7 +10,7 @@ namespace NKikimr::NOlap::NReader::NSysView::NAbstract {
class ISysViewPolicy {
private:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const = 0;
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const = 0;
virtual std::shared_ptr<IMetadataFiller> DoCreateMetadataFiller() const = 0;
public:
virtual ~ISysViewPolicy() = default;
@@ -24,8 +24,8 @@ public:
AFL_VERIFY(!!result);
return result;
}
- std::unique_ptr<IScannerConstructor> CreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const {
- auto result = DoCreateConstructor(snapshot, itemsLimit, reverse);
+ std::unique_ptr<IScannerConstructor> CreateConstructor(const TScannerConstructorContext& request) const {
+ auto result = DoCreateConstructor(request);
AFL_VERIFY(!!result);
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h
index b932e86533..1808888ff6 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h
@@ -153,9 +153,8 @@ public:
class TStoreSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(
- const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<NAbstract::TMetadataFromStore>();
@@ -168,9 +167,8 @@ public:
class TTableSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(
- const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<NAbstract::TMetadataFromTable>();
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h b/ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h
index 02b3220a95..64ef291fc8 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h
@@ -13,6 +13,10 @@ class TStatScannerConstructor: public IScannerConstructor {
private:
using TBase = IScannerConstructor;
+ virtual std::shared_ptr<IScanCursor> DoBuildCursor() const override {
+ return nullptr;
+ }
+
virtual std::shared_ptr<NAbstract::TReadStatsMetadata> BuildMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const = 0;
virtual TConclusion<std::shared_ptr<TReadMetadataBase>> DoBuildReadMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const override {
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/granules/granules.h b/ydb/core/tx/columnshard/engines/reader/sys_view/granules/granules.h
index 8effbf9b66..ec0ed83714 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/granules/granules.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/granules/granules.h
@@ -41,8 +41,8 @@ public:
class TStoreSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<NAbstract::TMetadataFromStore>();
@@ -54,8 +54,8 @@ public:
class TTableSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<NAbstract::TMetadataFromTable>();
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/optimizer/optimizer.h b/ydb/core/tx/columnshard/engines/reader/sys_view/optimizer/optimizer.h
index c442c46242..6c69bece0f 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/optimizer/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/optimizer/optimizer.h
@@ -63,8 +63,8 @@ public:
class TStoreSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<TMetadataFromStore>();
@@ -76,8 +76,8 @@ public:
class TTableSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<TMetadataFromTable>();
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h b/ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h
index 82cf42beff..9f5fd67fb8 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h
@@ -38,8 +38,8 @@ public:
class TStoreSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<NAbstract::TMetadataFromStore>();
@@ -51,8 +51,8 @@ public:
class TTableSysViewPolicy: public NAbstract::ISysViewPolicy {
protected:
- virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TSnapshot& snapshot, const ui64 itemsLimit, const bool reverse) const override {
- return std::make_unique<TConstructor>(snapshot, itemsLimit, reverse);
+ virtual std::unique_ptr<IScannerConstructor> DoCreateConstructor(const TScannerConstructorContext& request) const override {
+ return std::make_unique<TConstructor>(request);
}
virtual std::shared_ptr<NAbstract::IMetadataFiller> DoCreateMetadataFiller() const override {
return std::make_shared<NAbstract::TMetadataFromTable>();
diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
index 17b7c8497f..eab1651e5f 100644
--- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
@@ -38,13 +38,13 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
const NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build()("tablet", Self->TabletID())("snapshot", snapshot.DebugString());
TReadMetadataPtr readMetadataRange;
+ TScannerConstructorContext context(snapshot, 0, request.GetReverse());
{
TReadDescription read(snapshot, request.GetReverse());
read.PathId = request.GetPathId();
read.LockId = LockId;
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
- std::unique_ptr<IScannerConstructor> scannerConstructor(
- new NPlain::TIndexScannerConstructor(snapshot, request.GetItemsLimit(), request.GetReverse()));
+ std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(context));
read.ColumnIds = request.GetColumnIds();
read.ColumnNames = request.GetColumnNames();
if (request.RangesFilter) {
diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
index d3c1aa0f47..1e68265602 100644
--- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
@@ -38,6 +38,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
if (snapshot.IsZero()) {
snapshot = Self->GetLastTxSnapshot();
}
+ TScannerConstructorContext context(snapshot, request.HasItemsLimit() ? request.GetItemsLimit() : 0, request.GetReverse());
const auto scanId = request.GetScanId();
const ui64 txId = request.GetTxId();
const ui32 scanGen = request.GetGeneration();
@@ -62,17 +63,31 @@ void TTxScan::Complete(const TActorContext& ctx) {
read.PathId = request.GetLocalPathId();
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
read.TableName = table;
- bool isIndex = false;
+ const TString defaultReader =
+ AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
std::unique_ptr<IScannerConstructor> scannerConstructor = [&]() {
- const ui64 itemsLimit = request.HasItemsLimit() ? request.GetItemsLimit() : 0;
auto sysViewPolicy = NSysView::NAbstract::ISysViewPolicy::BuildByPath(read.TableName);
- isIndex = !sysViewPolicy;
if (!sysViewPolicy) {
- return std::unique_ptr<IScannerConstructor>(new NPlain::TIndexScannerConstructor(snapshot, itemsLimit, request.GetReverse()));
+ auto constructor = NReader::IScannerConstructor::TFactory::MakeHolder(
+ request.GetCSScanPolicy() ? request.GetCSScanPolicy() : defaultReader, context);
+ if (!constructor) {
+ return std::unique_ptr<IScannerConstructor>();
+ }
+ return std::unique_ptr<IScannerConstructor>(constructor.Release());
} else {
- return sysViewPolicy->CreateConstructor(snapshot, itemsLimit, request.GetReverse());
+ return sysViewPolicy->CreateConstructor(context);
}
}();
+ if (!scannerConstructor) {
+ return SendError("cannot build scanner", AppDataVerified().ColumnShardConfig.GetReaderClassName(), ctx);
+ }
+ {
+ auto cursorConclusion = scannerConstructor->BuildCursorFromProto(request.GetScanCursor());
+ if (cursorConclusion.IsFail()) {
+ return SendError("cannot build scanner cursor", cursorConclusion.GetErrorMessage(), ctx);
+ }
+ read.SetScanCursor(cursorConclusion.DetachResult());
+ }
read.ColumnIds.assign(request.GetColumnTags().begin(), request.GetColumnTags().end());
read.StatsMode = request.GetStatsMode();
diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make
index c1a5dbd873..c742412035 100644
--- a/ydb/core/tx/columnshard/engines/reader/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/ya.make
@@ -12,6 +12,7 @@ PEERDIR(
ydb/core/tx/columnshard/resources
ydb/core/tx/program
ydb/core/tx/columnshard/engines/reader/plain_reader
+ ydb/core/tx/columnshard/engines/reader/simple_reader
ydb/core/tx/columnshard/engines/reader/sys_view
ydb/core/tx/columnshard/engines/reader/abstract
ydb/core/tx/columnshard/engines/reader/common
diff --git a/ydb/core/tx/columnshard/export/actor/export_actor.h b/ydb/core/tx/columnshard/export/actor/export_actor.h
index ce4c4d517e..9089277daf 100644
--- a/ydb/core/tx/columnshard/export/actor/export_actor.h
+++ b/ydb/core/tx/columnshard/export/actor/export_actor.h
@@ -82,6 +82,7 @@ protected:
virtual void OnBootstrap(const TActorContext& /*ctx*/) override {
auto evStart = ExportSession->GetTask().GetSelector()->BuildRequestInitiator(ExportSession->GetCursor());
evStart->Record.SetGeneration((ui64)TabletId);
+ evStart->Record.SetCSScanPolicy("PLAIN");
Send(TabletActorId, evStart.release());
Become(&TActor::StateFunc);
}
diff --git a/ydb/core/tx/columnshard/operations/events.h b/ydb/core/tx/columnshard/operations/events.h
index d5c9ff925a..affceeb82b 100644
--- a/ydb/core/tx/columnshard/operations/events.h
+++ b/ydb/core/tx/columnshard/operations/events.h
@@ -34,6 +34,14 @@ private:
YDB_READONLY_DEF(std::vector<NOlap::TInsertWriteId>, InsertWriteIds);
public:
+ ui64 GetRecordsCount() const {
+ ui64 result = 0;
+ for (auto&& i : Portions) {
+ result += i.GetPKBatch()->num_rows();
+ }
+ return result;
+ }
+
const NEvWrite::TWriteMeta& GetWriteMeta() const {
return WriteMeta;
}
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index 540900f881..934dafd0f7 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -45,7 +45,7 @@ void TWriteOperation::Start(
void TWriteOperation::CommitOnExecute(
TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const {
- Y_ABORT_UNLESS(Status == EOperationStatus::Prepared);
+ Y_ABORT_UNLESS(Status == EOperationStatus::Prepared || InsertWriteIds.empty());
TBlobGroupSelector dsGroupSelector(owner.Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
@@ -55,8 +55,10 @@ void TWriteOperation::CommitOnExecute(
auto pathExists = [&](ui64 pathId) {
return owner.TablesManager.HasTable(pathId);
};
- const auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), insertWriteIds, pathExists);
- owner.Counters.GetTabletCounters()->OnWriteCommitted(counters);
+ if (insertWriteIds.size()) {
+ const auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), insertWriteIds, pathExists);
+ owner.Counters.GetTabletCounters()->OnWriteCommitted(counters);
+ }
} else {
for (auto&& i : InsertWriteIds) {
owner.MutableIndexAs<NOlap::TColumnEngineForLogs>().MutableGranuleVerified(PathId).CommitPortionOnExecute(txc, i, snapshot);
@@ -65,7 +67,7 @@ void TWriteOperation::CommitOnExecute(
}
void TWriteOperation::CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& /*snapshot*/) const {
- Y_ABORT_UNLESS(Status == EOperationStatus::Prepared);
+ Y_ABORT_UNLESS(Status == EOperationStatus::Prepared || InsertWriteIds.empty());
if (!WritePortions) {
owner.UpdateInsertTableCounters();
} else {
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
index 38ef52ec8c..8e08a0fd33 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
@@ -4,6 +4,9 @@
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h>
#include <ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h>
+#include <ydb/core/tx/data_events/common/modification_type.h>
+#include <ydb/core/tx/data_events/payload_helper.h>
+#include <ydb/core/protos/data_events.pb.h>
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_resolver.h>
@@ -96,26 +99,29 @@ void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSna
ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>* writeIds) {
TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
+ auto event = runtime.GrabEdgeEvent<NEvents::TDataEvents::TEvWriteResult>(handle);
UNIT_ASSERT(event);
- auto& resWrite = Proto(event);
+ auto& resWrite = event->Record;
UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), shardId);
- UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0);
- if (writeIds && resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- writeIds->push_back(resWrite.GetWriteId());
+ if (writeIds && resWrite.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED) {
+ writeIds->push_back(resWrite.GetTxId());
}
return resWrite.GetStatus();
}
-bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 tableId,
- const NLongTxService::TLongTxId& longTxId, const ui64 writeId,
- const TString& data, const std::shared_ptr<arrow::Schema>& schema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType) {
+bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 tableId, const ui64 writeId,
+ const TString& data, const std::shared_ptr<arrow::Schema>& schema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType, const ui64 lockId) {
const TString dedupId = ToString(writeId);
- auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, writeId, mType);
- Y_ABORT_UNLESS(schema);
- write->SetArrowSchema(NArrow::SerializeSchema(*schema));
+ auto write = std::make_unique<NEvents::TDataEvents::TEvWrite>(writeId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+ write->SetLockId(lockId, 1);
+ auto& operation = write->AddOperation(TEnumOperator<NEvWrite::EModificationType>::SerializeToWriteProto(mType), TTableId(0, tableId, 1), {},
+ 0, NKikimrDataEvents::FORMAT_ARROW);
+ *operation.MutablePayloadSchema() = NArrow::SerializeSchema(*schema);
+ NEvWrite::TPayloadWriter<NEvents::TDataEvents::TEvWrite> writer(*write);
+ auto dataCopy = data;
+ writer.AddDataToPayload(std::move(dataCopy));
ForwardToTablet(runtime, shardId, sender, write.release());
if (writeIds) {
@@ -125,25 +131,21 @@ bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shar
}
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
- const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType) {
- NLongTxService::TLongTxId longTxId;
- UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
- return WriteDataImpl(
- runtime, sender, shardId, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds, mType);
+ const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds, const NEvWrite::EModificationType mType,
+ const ui64 lockId) {
+ return WriteDataImpl(runtime, sender, shardId, tableId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds, mType, lockId);
}
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, bool waitResult, std::vector<ui64>* writeIds,
- const NEvWrite::EModificationType mType) {
- NLongTxService::TLongTxId longTxId;
- UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
+ const NEvWrite::EModificationType mType, const ui64 lockId) {
if (writeIds) {
- return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data,
- NArrow::MakeArrowSchema(ydbSchema), writeIds, mType);
+ return WriteDataImpl(
+ runtime, sender, TTestTxConfig::TxTablet0, tableId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds, mType, lockId);
}
std::vector<ui64> ids;
- return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data,
- NArrow::MakeArrowSchema(ydbSchema), waitResult ? &ids : nullptr, mType);
+ return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, writeId, data, NArrow::MakeArrowSchema(ydbSchema),
+ waitResult ? &ids : nullptr, mType, lockId);
}
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
@@ -205,24 +207,24 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release());
}
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds) {
- NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT;
- TString txBody = TTestSchema::CommitTxBody(0, writeIds);
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) {
+ auto write = std::make_unique<NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
+ auto* lock = write->Record.MutableLocks()->AddLocks();
+ lock->SetLockId(lockId);
+ write->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
- ForwardToTablet(runtime, shardId, sender,
- new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody));
+ ForwardToTablet(runtime, shardId, sender, write.release());
TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
+ auto event = runtime.GrabEdgeEvent<NEvents::TDataEvents::TEvWriteResult>(handle);
UNIT_ASSERT(event);
- auto& res = Proto(event);
- UNIT_ASSERT_EQUAL(res.GetTxKind(), txKind);
- UNIT_ASSERT_EQUAL(res.GetTxId(), txId);
- UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED);
+ auto& res = event->Record;
+ AFL_VERIFY(res.GetTxId() == txId)("tx_id", txId)("res", res.GetTxId());
+ UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
}
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) {
- ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds);
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) {
+ ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds, lockId);
}
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) {
@@ -246,12 +248,12 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64
TAutoPtr<IEventHandle> handle;
for (ui32 i = 0; i < txIds.size(); ++i) {
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
+ auto event = runtime.GrabEdgeEvent<NEvents::TDataEvents::TEvWriteResult>(handle);
UNIT_ASSERT(event);
- auto& res = Proto(event);
+ auto& res = event->Record;
UNIT_ASSERT(txIds.contains(res.GetTxId()));
- UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
}
Wakeup(runtime, sender, shardId);
}
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
index 8a83692528..3dd60dcb8b 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
@@ -408,11 +408,11 @@ void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSna
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds,
- const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert);
+ const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert, const ui64 lockId = 1);
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, bool waitResult = true, std::vector<ui64>* writeIds = nullptr,
- const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert);
+ const NEvWrite::EModificationType mType = NEvWrite::EModificationType::Upsert, const ui64 lockId = 1);
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
ui64 tableId, const ui64 writePartId, const TString& data,
@@ -423,8 +423,8 @@ ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>
void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds,
NOlap::TSnapshot snap, ui64 scanId = 0);
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds);
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds);
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1);
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1);
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds);
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
index 6463e4a0a2..887780e4da 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
@@ -82,7 +82,6 @@ Y_UNIT_TEST_SUITE(Backup) {
PlanCommit(runtime, sender, ++planStep, txId);
}
- const ui32 start = csControllerGuard->GetInsertStartedCounter().Val();
TestWaitCondition(runtime, "insert compacted",
[&]() {
++writeId;
@@ -90,7 +89,7 @@ Y_UNIT_TEST_SUITE(Backup) {
WriteData(runtime, sender, writeId, tableId, MakeTestBlob({writeId * 100, (writeId + 1) * 100}, schema), schema, true, &writeIds);
ProposeCommit(runtime, sender, ++txId, writeIds);
PlanCommit(runtime, sender, ++planStep, txId);
- return csControllerGuard->GetInsertStartedCounter().Val() > start + 1;
+ return true;
}, TDuration::Seconds(1000));
NKikimrTxColumnShard::TBackupTxBody txBody;
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 1e6f1c5937..25e1de8038 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -40,7 +40,7 @@ using TDefaultTestsController = NKikimr::NYDBTest::NColumnShard::TController;
template <typename TKey = ui64>
bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range, bool requireUniq = false,
- const std::string& columnName = "timestamp") {
+ const std::string& columnName = "timestamp", const bool inverseCheck = false) {
static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
THashMap<TKey, ui32> keys;
@@ -81,18 +81,19 @@ bool DataHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, st
}
}
+ bool problems = false;
for (auto& [key, count] : keys) {
- if (!count) {
+ if (!count && !inverseCheck) {
Cerr << "No key: " << key << "\n";
- return false;
+ problems = true;
}
if (requireUniq && count > 1) {
Cerr << "Not unique key: " << key << " (count: " << count << ")\n";
- return false;
+ problems = true;
}
}
- return true;
+ return !problems;
}
template <typename TKey = ui64>
@@ -107,6 +108,11 @@ bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::p
return DataHas<TKey>(batches, range, requireUniq, columnName);
}
+bool DataNotHas(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range, bool requireUniq = false,
+ const std::string& columnName = "timestamp") {
+ return DataHas<ui64>(batches, range, requireUniq, columnName, true);
+}
+
template <typename TKey = ui64>
bool DataHasOnly(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::pair<ui64, ui64> range) {
static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>;
@@ -424,7 +430,7 @@ void TestWrite(const TestTableDescription& table) {
UNIT_ASSERT(bigData.size() > NColumnShard::TLimits::GetMaxBlobSize());
UNIT_ASSERT(bigData.size() < NColumnShard::TLimits::GetMaxBlobSize() + 2 * 1024 * 1024);
ok = WriteData(runtime, sender, writeId++, tableId, bigData, ydbSchema);
- UNIT_ASSERT(!ok);
+ UNIT_ASSERT(ok);
}
void TestWriteOverload(const TestTableDescription& table) {
@@ -482,11 +488,11 @@ void TestWriteOverload(const TestTableDescription& table) {
UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema, false));
}
- UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
while (capturedWrites.size()) {
resendOneCaptured();
- UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
}
UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload
@@ -524,8 +530,9 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
TSet<ui64> txIds;
for (ui32 i = 0; i <= 5; ++i) {
std::vector<ui64> writeIds;
- UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds));
- ProposeCommit(runtime, sender, ++txId, writeIds);
+ ++txId;
+ UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert, txId));
+ ProposeCommit(runtime, sender, txId, writeIds, txId);
txIds.insert(txId);
}
PlanCommit(runtime, sender, planStep, txIds);
@@ -542,69 +549,6 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
}
}
-void TestWriteReadLongTxDup() {
- TTestBasicRuntime runtime;
- TTester::Setup(runtime);
- auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
-
- TActorId sender = runtime.AllocateEdgeActor();
- CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
-
- TDispatchOptions options;
- options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
- runtime.DispatchEvents(options);
-
- //
-
- ui64 tableId = 1;
- auto ydbSchema = TTestSchema::YdbSchema();
- SetupSchema(runtime, sender, tableId);
-
- constexpr ui32 numRows = 10;
- std::pair<ui64, ui64> portion = { 10, 10 + numRows };
-
- NLongTxService::TLongTxId longTxId;
- UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
-
- ui64 txId = 0;
- ui64 planStep = 100;
- std::optional<ui64> writeId;
-
- // Only the first blob with dedup pair {longTx, dedupId} should be inserted
- // Others should return OK (write retries emulation)
- for (ui32 i = 0; i < 4; ++i) {
- auto data = MakeTestBlob({ portion.first + i, portion.second + i }, ydbSchema);
- UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
-
- auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, ydbSchema);
- UNIT_ASSERT(writeIdOpt);
- if (!i) {
- writeId = *writeIdOpt;
- }
- UNIT_ASSERT_EQUAL(*writeIdOpt, *writeId);
- }
-
- ProposeCommit(runtime, sender, ++txId, { *writeId });
- TSet<ui64> txIds = { txId };
- PlanCommit(runtime, sender, planStep, txIds);
-
- // read
- TAutoPtr<IEventHandle> handle;
- {
- TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId));
- reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema));
- auto rb = reader.ReadAll();
- UNIT_ASSERT(reader.IsCorrectlyFinished());
- UNIT_ASSERT(rb);
- UNIT_ASSERT(rb->num_rows());
- Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema)));
- UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size());
- UNIT_ASSERT(CheckOrdered(rb));
- UNIT_ASSERT(DataHas({ rb }, portion, true));
- UNIT_ASSERT(DataHasOnly({ rb }, portion));
- }
-}
-
void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") {
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
csControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
@@ -787,8 +731,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(rb->num_rows());
UNIT_ASSERT(CheckOrdered(rb));
UNIT_ASSERT(DataHas({ rb }, portion[0]));
- UNIT_ASSERT(!DataHas({ rb }, portion[1]));
- UNIT_ASSERT(!DataHas({ rb }, portion[2]));
+ UNIT_ASSERT(DataNotHas({ rb }, portion[1]));
+ UNIT_ASSERT(DataNotHas({ rb }, portion[2]));
}
// read 8, planstep 22 (full index)
@@ -805,7 +749,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(CheckOrdered(rb));
UNIT_ASSERT(DataHas({ rb }, portion[0]));
UNIT_ASSERT(DataHas({ rb }, portion[1]));
- UNIT_ASSERT(!DataHas({ rb }, portion[2]));
+ UNIT_ASSERT(DataNotHas({ rb }, portion[2]));
}
// commit 3: ins:0, cmt:1, idx:1
@@ -836,7 +780,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(DataHas({ rb }, portion[0]));
UNIT_ASSERT(DataHas({ rb }, portion[1]));
UNIT_ASSERT(DataHas({ rb }, portion[2]));
- UNIT_ASSERT(!DataHas({ rb }, portion[3]));
+ UNIT_ASSERT(DataNotHas({ rb }, portion[3]));
}
// commit 4: ins:0, cmt:2, idx:1 (with duplicates in PK)
@@ -862,7 +806,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
UNIT_ASSERT(DataHas({ rb }, portion[1]));
UNIT_ASSERT(DataHas({ rb }, portion[2]));
UNIT_ASSERT(DataHas({ rb }, portion[3]));
- UNIT_ASSERT(DataHas({ rb }, { 0, 500 }, true));
+ UNIT_ASSERT(DataHas({ rb }, { 0, 500 }, false));
const ui64 compactedBytes = reader.GetReadStat("compacted_bytes");
const ui64 insertedBytes = reader.GetReadStat("inserted_bytes");
@@ -1730,7 +1674,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_UNIT_TEST(WriteReadDuplicate) {
TestWriteReadDup();
- TestWriteReadLongTxDup();
}
Y_UNIT_TEST(WriteReadModifications) {
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 7060877880..1220870814 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -1028,6 +1028,7 @@ void TestDropWriteRace() {
ui64 tableId = 1;
ui64 planStep = 1000000000; // greater then delays
ui64 txId = 100;
+ ui32 writeId = 0;
NLongTxService::TLongTxId longTxId;
UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
@@ -1038,9 +1039,9 @@ void TestDropWriteRace() {
UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
// Write into InsertTable
- auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, testYdbSchema);
- UNIT_ASSERT(writeIdOpt);
- ProposeCommit(runtime, sender, ++txId, {*writeIdOpt});
+ ++txId;
+ AFL_VERIFY(WriteData(runtime, sender, ++writeId, tableId, data, testYdbSchema));
+ ProposeCommit(runtime, sender, txId, { writeId });
auto commitTxId = txId;
// Drop table
diff --git a/ydb/core/tx/conveyor/service/service.cpp b/ydb/core/tx/conveyor/service/service.cpp
index 68900c3e25..c34acba81c 100644
--- a/ydb/core/tx/conveyor/service/service.cpp
+++ b/ydb/core/tx/conveyor/service/service.cpp
@@ -13,7 +13,7 @@ TDistributor::TDistributor(const TConfig& config, const TString& conveyorName, T
void TDistributor::Bootstrap() {
const ui32 workersCount = Config.GetWorkersCountForConveyor(NKqp::TStagePredictor::GetUsableThreads());
- AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("name", ConveyorName)("action", "conveyor_registered")("config", Config.DebugString());
+ AFL_NOTICE(NKikimrServices::TX_CONVEYOR)("name", ConveyorName)("action", "conveyor_registered")("config", Config.DebugString())("actor_id", SelfId());
for (ui32 i = 0; i < workersCount; ++i) {
const double usage = Config.GetWorkerCPUUsage(i);
Workers.emplace_back(Register(new TWorker(ConveyorName, usage, SelfId())));
@@ -46,10 +46,10 @@ void TDistributor::HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev) {
}
void TDistributor::HandleMain(TEvExecution::TEvNewTask::TPtr& ev) {
- AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "add_task")("sender", ev->Sender);
Counters.IncomingRate->Inc();
const TString taskClass = ev->Get()->GetTask()->GetTaskClassIdentifier();
+ AFL_DEBUG(NKikimrServices::TX_CONVEYOR)("action", "add_task")("sender", ev->Sender)("task", taskClass);
auto itSignal = Signals.find(taskClass);
if (itSignal == Signals.end()) {
itSignal = Signals.emplace(taskClass, std::make_shared<TTaskSignals>("Conveyor/" + ConveyorName, taskClass)).first;
diff --git a/ydb/core/tx/conveyor/usage/service.h b/ydb/core/tx/conveyor/usage/service.h
index 6ba3c3320f..22928ae3e6 100644
--- a/ydb/core/tx/conveyor/usage/service.h
+++ b/ydb/core/tx/conveyor/usage/service.h
@@ -41,9 +41,9 @@ public:
context.Register(new TAsyncTaskExecutor(task));
}
static bool SendTaskToExecute(const std::shared_ptr<ITask>& task) {
- auto& context = NActors::TActorContext::AsActorContext();
- const NActors::TActorId& selfId = context.SelfID;
- if (TSelf::IsEnabled()) {
+ if (TSelf::IsEnabled() && NActors::TlsActivationContext) {
+ auto& context = NActors::TActorContext::AsActorContext();
+ const NActors::TActorId& selfId = context.SelfID;
context.Send(MakeServiceId(selfId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task));
return true;
} else {
diff --git a/ydb/core/tx/data_events/common/modification_type.h b/ydb/core/tx/data_events/common/modification_type.h
index cf9f8d90e2..f93eeda183 100644
--- a/ydb/core/tx/data_events/common/modification_type.h
+++ b/ydb/core/tx/data_events/common/modification_type.h
@@ -49,18 +49,18 @@ public:
}
}
- static TProto SerializeToProto(const NEvWrite::EModificationType value) {
+ static NKikimrDataEvents::TEvWrite::TOperation::EOperationType SerializeToWriteProto(const NEvWrite::EModificationType value) {
switch (value) {
case NEvWrite::EModificationType::Upsert:
- return NKikimrTxColumnShard::TEvWrite::OPERATION_UPSERT;
+ return NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT;
case NEvWrite::EModificationType::Insert:
- return NKikimrTxColumnShard::TEvWrite::OPERATION_INSERT;
+ return NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT;
case NEvWrite::EModificationType::Delete:
- return NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE;
+ return NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE;
case NEvWrite::EModificationType::Replace:
- return NKikimrTxColumnShard::TEvWrite::OPERATION_REPLACE;
+ return NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE;
case NEvWrite::EModificationType::Update:
- return NKikimrTxColumnShard::TEvWrite::OPERATION_UPDATE;
+ return NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE;
}
}
@@ -81,6 +81,21 @@ public:
}
}
+ static TProto SerializeToProto(const NEvWrite::EModificationType value) {
+ switch (value) {
+ case NEvWrite::EModificationType::Upsert:
+ return NKikimrTxColumnShard::TEvWrite::OPERATION_UPSERT;
+ case NEvWrite::EModificationType::Insert:
+ return NKikimrTxColumnShard::TEvWrite::OPERATION_INSERT;
+ case NEvWrite::EModificationType::Delete:
+ return NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE;
+ case NEvWrite::EModificationType::Replace:
+ return NKikimrTxColumnShard::TEvWrite::OPERATION_REPLACE;
+ case NEvWrite::EModificationType::Update:
+ return NKikimrTxColumnShard::TEvWrite::OPERATION_UPDATE;
+ }
+ }
+
static NEvWrite::EModificationType DeserializeFromProto(const NKikimrTxColumnShard::TEvWrite::EModificationType value) {
switch (value) {
case NKikimrTxColumnShard::TEvWrite::OPERATION_UPSERT:
diff --git a/ydb/core/tx/data_events/events.h b/ydb/core/tx/data_events/events.h
index bd4f06284e..f4b190eaff 100644
--- a/ydb/core/tx/data_events/events.h
+++ b/ydb/core/tx/data_events/events.h
@@ -62,7 +62,8 @@ struct TDataEvents {
return *this;
}
- void AddOperation(NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const std::vector<ui32>& columnIds,
+ NKikimrDataEvents::TEvWrite::TOperation& AddOperation(NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType,
+ const TTableId& tableId, const std::vector<ui32>& columnIds,
ui64 payloadIndex, NKikimrDataEvents::EDataFormat payloadFormat) {
Y_ABORT_UNLESS(operationType != NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UNSPECIFIED);
Y_ABORT_UNLESS(payloadFormat != NKikimrDataEvents::FORMAT_UNSPECIFIED);
@@ -75,6 +76,7 @@ struct TDataEvents {
operation->MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
operation->MutableTableId()->SetSchemaVersion(tableId.SchemaVersion);
operation->MutableColumnIds()->Assign(columnIds.begin(), columnIds.end());
+ return *operation;
}
ui64 GetTxId() const {
diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
index 7836ca0008..f0f5fb0832 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
@@ -715,8 +715,9 @@ Y_UNIT_TEST_SUITE(TOlap) {
TSet<ui64> txIds;
for (ui32 i = 0; i < 10; ++i) {
std::vector<ui64> writeIds;
- NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert);
- NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds);
+ ++txId;
+ NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, txId);
+ NTxUT::ProposeCommit(runtime, sender, shardId, txId, writeIds, txId);
txIds.insert(txId);
}
@@ -727,9 +728,10 @@ Y_UNIT_TEST_SUITE(TOlap) {
// trigger periodic stats at shard (after timeout)
std::vector<ui64> writeIds;
- NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert);
- NTxUT::ProposeCommit(runtime, sender, shardId, ++txId, writeIds);
- NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, {txId});
+ ++txId;
+ NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, txId);
+ NTxUT::ProposeCommit(runtime, sender, shardId, txId, writeIds, txId);
+ NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, { txId });
}
csController->WaitIndexation(TDuration::Seconds(5));
{