summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <[email protected]>2022-11-17 22:32:43 +0300
committerssmike <[email protected]>2022-11-17 22:32:43 +0300
commit1cc2f4df3f6a62eea4ca4813b6fcdae0a9af74e6 (patch)
tree36ab568aabc2cefb29f1ecea55777a2a2ddc1362
parent6b0ef1e419310324ad779d3adf521b92a6c78ad7 (diff)
DQ source for scans
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp23
-rw-r--r--ydb/core/kqp/common/kqp_yql.h4
-rw-r--r--ydb/core/kqp/compile/kqp_compile.cpp64
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp2
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h18
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.cpp56
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.h10
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp107
-rw-r--r--ydb/core/kqp/executer/kqp_table_resolver.cpp11
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.cpp67
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.h20
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json12
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp42
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp117
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp259
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp57
-rw-r--r--ydb/core/kqp/runtime/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp837
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.h11
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.cpp37
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_data.h17
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp20
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/kqp_physical.proto25
-rw-r--r--ydb/core/protos/tx_datashard.proto18
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h7
26 files changed, 1724 insertions, 119 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index 7f76bfd8b84..8cbaaecba95 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -127,16 +127,15 @@ NNodes::TCoNameValueTupleList TKqpPhyTxSettings::BuildNode(TExprContext& ctx, TP
namespace {
-template <typename TKqlReadOperation>
-TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) {
+TKqpReadTableSettings ParseInternal(const TCoNameValueTupleList& node) {
TKqpReadTableSettings settings;
- for (const auto& tuple : node.Settings()) {
+ for (const auto& tuple : node) {
TStringBuf name = tuple.Name().Value();
if (name == TKqpReadTableSettings::SkipNullKeysSettingName) {
- YQL_ENSURE(tuple.Value().template Maybe<TCoAtomList>());
- for (const auto& key : tuple.Value().template Cast<TCoAtomList>()) {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
+ for (const auto& key : tuple.Value().Cast<TCoAtomList>()) {
settings.SkipNullKeys.emplace_back(TString(key.Value()));
}
} else if (name == TKqpReadTableSettings::ItemsLimitSettingName) {
@@ -158,12 +157,16 @@ TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) {
} // anonymous namespace end
-TKqpReadTableSettings TKqpReadTableSettings::Parse(const TKqlReadTableBase& node) {
+TKqpReadTableSettings TKqpReadTableSettings::Parse(const NNodes::TCoNameValueTupleList& node) {
return ParseInternal(node);
}
+TKqpReadTableSettings TKqpReadTableSettings::Parse(const TKqlReadTableBase& node) {
+ return TKqpReadTableSettings::Parse(node.Settings());
+}
+
TKqpReadTableSettings TKqpReadTableSettings::Parse(const TKqlReadTableRangesBase& node) {
- return ParseInternal(node);
+ return TKqpReadTableSettings::Parse(node.Settings());
}
NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const {
@@ -297,9 +300,13 @@ TCoNameValueTupleList TKqpReadTableExplainPrompt::BuildNode(TExprContext& ctx, T
}
TKqpReadTableExplainPrompt TKqpReadTableExplainPrompt::Parse(const NNodes::TKqlReadTableRangesBase& node) {
+ return TKqpReadTableExplainPrompt::Parse(node.ExplainPrompt());
+}
+
+TKqpReadTableExplainPrompt TKqpReadTableExplainPrompt::Parse(const NNodes::TCoNameValueTupleList& node) {
TKqpReadTableExplainPrompt prompt;
- for (const auto& tuple : node.ExplainPrompt()) {
+ for (const auto& tuple : node) {
TStringBuf name = tuple.Name().Value();
if (name == TKqpReadTableExplainPrompt::UsedKeyColumnsName) {
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 4a8352aa688..735e93360ab 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -39,6 +39,8 @@ struct TKqpPhyTxSettings {
NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
};
+constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource";
+
struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit";
@@ -57,6 +59,7 @@ struct TKqpReadTableSettings {
static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableBase& node);
static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableRangesBase& node);
+ static TKqpReadTableSettings Parse(const NNodes::TCoNameValueTupleList& node);
NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
};
@@ -88,6 +91,7 @@ struct TKqpReadTableExplainPrompt {
NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
static TKqpReadTableExplainPrompt Parse(const NNodes::TKqlReadTableRangesBase& node);
+ static TKqpReadTableExplainPrompt Parse(const NNodes::TCoNameValueTupleList& node);
};
TString KqpExprToPrettyString(const TExprNode& expr, TExprContext& ctx);
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index ede7a19f34c..e12ceca2c77 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -414,11 +414,19 @@ private:
for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) {
const auto& input = stage.Inputs().Item(inputIndex);
- YQL_ENSURE(input.Maybe<TDqConnection>());
- auto connection = input.Cast<TDqConnection>();
- auto& protoInput = *stageProto.AddInputs();
- FillConnection(connection, stagesMap, protoInput, ctx);
+ if (input.Maybe<TDqSource>()) {
+ auto* protoSource = stageProto.AddSources();
+ FillSource(input.Cast<TDqSource>(), protoSource, true);
+ protoSource->SetInputIndex(inputIndex);
+ } else {
+ YQL_ENSURE(input.Maybe<TDqConnection>());
+ auto connection = input.Cast<TDqConnection>();
+
+ auto& protoInput = *stageProto.AddInputs();
+ FillConnection(connection, stagesMap, protoInput, ctx);
+ protoInput.SetInputIndex(inputIndex);
+ }
}
bool hasSort = false;
@@ -631,6 +639,54 @@ private:
}
}
+ void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns) {
+ if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>()) {
+ NKqpProto::TKqpReadRangesSource& readProto = *protoSource->MutableReadRangesSource();
+ FillTable(settings.Table().Cast(), *readProto.MutableTable());
+
+ auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
+ YQL_ENSURE(tableMeta);
+
+ FillColumns(settings.Columns().Cast(), *tableMeta, readProto, allowSystemColumns);
+ auto readSettings = TKqpReadTableSettings::Parse(settings.Settings().Cast());
+
+ readProto.SetReverse(readSettings.Reverse);
+ readProto.SetSorted(readSettings.Sorted);
+ for (auto&& key : readSettings.SkipNullKeys) {
+ readProto.AddSkipNullKeys(key);
+ }
+
+ auto ranges = settings.RangesExpr().template Maybe<TCoParameter>();
+ if (ranges.IsValid()) {
+ auto& rangesParam = *readProto.MutableRanges();
+ rangesParam.SetParamName(ranges.Cast().Name().StringValue());
+ } else {
+ YQL_ENSURE(
+ TCoVoid::Match(settings.RangesExpr().Raw()),
+ "Read ranges should be parameter or void, got: " << settings.RangesExpr().Cast().Ptr()->Content()
+ );
+ }
+
+ if (readSettings.ItemsLimit) {
+ TExprBase expr(readSettings.ItemsLimit);
+ if (expr.template Maybe<TCoUint64>()) {
+ auto* literal = readProto.MutableItemsLimit()->MutableLiteralValue();
+
+ literal->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
+ literal->MutableType()->MutableData()->SetScheme(NScheme::NTypeIds::Uint64);
+
+ literal->MutableValue()->SetUint64(FromString<ui64>(expr.Cast<TCoUint64>().Literal().Value()));
+ } else if (expr.template Maybe<TCoParameter>()) {
+ readProto.MutableItemsLimit()->MutableParamValue()->SetParamName(expr.template Cast<TCoParameter>().Name().StringValue());
+ } else {
+ YQL_ENSURE(false, "Unexpected ItemsLimit callable " << expr.Ref().Content());
+ }
+ }
+ } else {
+ YQL_ENSURE(false, "unsupported source type");
+ }
+ }
+
void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap,
NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx)
{
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 9fe5852f386..f9eab53745d 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/runtime/kqp_compute.h>
#include <ydb/core/kqp/runtime/kqp_read_table.h>
+#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
namespace NKikimr {
@@ -48,6 +49,7 @@ namespace NKqp {
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory() {
auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
RegisterStreamLookupActorFactory(*factory);
+ RegisterKqpReadActor(*factory);
return factory;
}
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index a8c8c0579ef..8f0936d6410 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -185,6 +185,19 @@ protected:
}
}
+ TMaybe<size_t> FindReadRangesSource(const NKqpProto::TKqpPhyStage& stage) {
+ TMaybe<size_t> res;
+ for (size_t i = 0; i < stage.SourcesSize(); ++i) {
+ auto& source = stage.GetSources(i);
+ if (source.HasReadRangesSource()) {
+ YQL_ENSURE(!res);
+ res = i;
+
+ }
+ }
+ return res;
+ }
+
protected:
void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
@@ -406,6 +419,11 @@ protected:
void FillInputDesc(NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input) {
switch (input.Type()) {
+ case NYql::NDq::TTaskInputType::Source:
+ inputDesc.MutableSource()->SetType(input.SourceType);
+ inputDesc.MutableSource()->SetWatermarksMode(input.WatermarksMode);
+ inputDesc.MutableSource()->MutableSettings()->CopyFrom(*input.SourceSettings);
+ break;
case NYql::NDq::TTaskInputType::UnionAll: {
inputDesc.MutableUnionAll();
break;
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp
index 98a77963b63..a065cffaeed 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp
@@ -587,6 +587,57 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+{
+ const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
+ YQL_ENSURE(table);
+
+ const auto& keyColumnTypes = table->KeyColumnTypes;
+ TVector<TSerializedPointOrRange> ranges;
+
+ if (source.HasRanges()) {
+ ranges = FillRangesFromParameter(
+ keyColumnTypes, source.GetRanges(), stageInfo, holderFactory, typeEnv
+ );
+ } else if (source.HasKeyRange()) {
+ //TODO: support KeyRange
+ Y_ENSURE(false);
+ } else {
+ ranges = BuildFullRange(keyColumnTypes);
+ }
+
+ THashMap<ui64, TShardInfo> shardInfoMap;
+
+ // KeyReadRanges must be sorted & non-intersecting, they came in such condition from predicate extraction.
+ for (auto& range: ranges) {
+ TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range)
+ ? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true)
+ : TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());
+
+ auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
+ keyColumnTypes);
+
+ for (TPartitionWithRange& partitionWithRange : readPartitions) {
+ auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId];
+
+ if (!shardInfo.KeyReadRanges) {
+ shardInfo.KeyReadRanges.ConstructInPlace();
+ }
+
+ if (partitionWithRange.FullRange) {
+ shardInfo.KeyReadRanges->MakeFullRange(std::move(*partitionWithRange.FullRange));
+ continue;
+ }
+
+ shardInfo.KeyReadRanges->Add(std::move(partitionWithRange.PointOrRange));
+ }
+ }
+
+ return shardInfoMap;
+}
+
THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
@@ -908,9 +959,6 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(TKqpTableKeys& tableKeys,
}
}
-
-namespace {
-
void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes,
@@ -961,8 +1009,6 @@ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu
}
}
-}
-
TPhysicalShardReadSettings ExtractReadSettings(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.h b/ydb/core/kqp/executer/kqp_partition_helper.h
index 7b5eea477bd..f33016a9d47 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer/kqp_partition_helper.h
@@ -61,6 +61,16 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
+
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
+
+void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
+ ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes,
+ NKikimr::NMiniKQL::TType*& itemsLimitType);
+
// Returns the list of ColumnShards that can store rows from the specified range
// NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range
// partitioned and multiple ColumnShards store data from the same key range
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 396fd840e9c..6180a0e6549 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -480,6 +480,101 @@ private:
}
}
+ void BuildScanTasksFromSource(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
+ const NMiniKQL::TTypeEnvironment& typeEnv)
+ {
+ THashMap<ui64, std::vector<ui64>> nodeTasks;
+ THashMap<ui64, ui64> assignedShardsCount;
+
+ auto& stage = GetStage(stageInfo);
+
+ auto sourceIndex = FindReadRangesSource(stage);
+ YQL_ENSURE(sourceIndex);
+ YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
+
+ auto& source = stage.GetSources(*sourceIndex).GetReadRangesSource();
+
+ const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable()));
+ const auto& keyTypes = table.KeyColumnTypes;
+
+ YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap);
+
+ auto columns = BuildKqpColumns(source, table);
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, source, stageInfo, holderFactory, typeEnv);
+
+ bool reverse = false;
+ ui64 itemsLimit = 0;
+ bool sorted = true;
+
+ TString itemsLimitParamName;
+ NDqProto::TData itemsLimitBytes;
+ NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
+
+ YQL_ENSURE(!source.GetReverse(), "reverse not supported yet");
+
+ for (auto& [shardId, shardInfo] : partitions) {
+ YQL_ENSURE(!shardInfo.KeyWriteRanges);
+
+ auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted, false);
+
+ for (auto& [name, value] : shardInfo.Params) {
+ auto ret = task.Meta.Params.emplace(name, std::move(value));
+ YQL_ENSURE(ret.second);
+ auto typeIterator = shardInfo.ParamTypes.find(name);
+ YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end());
+ auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second);
+ YQL_ENSURE(retType.second);
+ }
+
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
+ FillTableMeta(stageInfo, settings.MutableTable());
+ for (auto& key : source.GetSkipNullKeys()) {
+ settings.AddSkipNullKeys(key);
+ }
+
+ for (auto& keyColumn : keyTypes) {
+ settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId()));
+ }
+
+ for (auto& column : columns) {
+ auto* protoColumn = settings.AddColumns();
+ protoColumn->SetId(column.Id);
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(column.Type);
+ protoColumn->SetType(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
+ }
+ protoColumn->SetName(column.Name);
+ }
+
+ if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) {
+ settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
+ } else {
+ settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ }
+
+ settings.MutableSnapshot()->SetStep(Request.Snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId);
+
+ shardInfo.KeyReadRanges->SerializeTo(&settings);
+ settings.SetReverse(reverse);
+
+ settings.SetShardIdHint(shardId);
+
+ ExtractItemsLimit(stageInfo, source.GetItemsLimit(), holderFactory,
+ typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
+ settings.SetItemsLimit(itemsLimit);
+
+ const auto& stageSource = stage.GetSources(*sourceIndex);
+ auto& input = task.Inputs[stageSource.GetInputIndex()];
+ auto& taskSourceSettings = input.SourceSettings;
+ input.ConnectionInfo = NYql::NDq::TSourceInput{};
+ taskSourceSettings.ConstructInPlace();
+ taskSourceSettings->PackFrom(settings);
+ input.SourceType = NYql::KqpReadRangesSourceName;
+ }
+ }
+
void BuildScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
const NMiniKQL::TTypeEnvironment& typeEnv)
{
@@ -662,7 +757,9 @@ private:
Y_VERIFY_DEBUG(!stage.GetIsEffectsStage());
- if (stageInfo.Meta.ShardOperations.empty()) {
+ if (FindReadRangesSource(stage)) {
+ BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv);
+ } else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo, holderFactory, typeEnv);
@@ -753,7 +850,13 @@ private:
protoTaskMeta.AddSkipNullKeys(skipNullKey);
}
- YQL_ENSURE(task.Meta.Reads);
+ // Task with source
+ if (!task.Meta.Reads) {
+ scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc));
+ nScanTasks++;
+ continue;
+ }
+
YQL_ENSURE(!task.Meta.Writes);
if (!task.Meta.Reads->empty()) {
diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp
index d4b9d715aaf..45e322f2d08 100644
--- a/ydb/core/kqp/executer/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp
@@ -238,6 +238,17 @@ private:
}
}
+ for (auto& source : stage.GetSources()) {
+ if (source.HasReadRangesSource()) {
+ auto& table = TableKeys.GetOrAddTable(
+ MakeTableId(source.GetReadRangesSource().GetTable()),
+ source.GetReadRangesSource().GetTable().GetPath());
+ for (auto& column : source.GetReadRangesSource().GetColumns()) {
+ table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
+ }
+ }
+ }
+
for (const auto& input : stage.GetInputs()) {
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
const auto& streamLookup = input.GetStreamLookup();
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
index cdab73ccfec..6c258a03e31 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
@@ -44,8 +44,18 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
TStageInfoMeta meta(tx);
+ for (auto& source : stage.GetSources()) {
+ if (source.HasReadRangesSource()) {
+ YQL_ENSURE(source.GetInputIndex() == 0);
+ YQL_ENSURE(stage.SourcesSize() == 1);
+ meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable());
+ meta.TablePath = source.GetReadRangesSource().GetTable().GetPath();
+ meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read);
+ }
+ }
+
bool stageAdded = tasksGraph.AddStageInfo(
- TStageInfo(stageId, stage.InputsSize(), stage.GetOutputsCount(), std::move(meta)));
+ TStageInfo(stageId, stage.InputsSize() + stage.SourcesSize(), stage.GetOutputsCount(), std::move(meta)));
YQL_ENSURE(stageAdded);
auto& stageInfo = tasksGraph.GetStageInfo(stageId);
@@ -267,8 +277,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl
<< (spilling ? " with spilling" : " without spilling"));
};
- for (ui32 inputIdx = 0; inputIdx < stage.InputsSize(); ++inputIdx) {
- const auto& input = stage.GetInputs(inputIdx);
+ for (const auto& input : stage.GetInputs()) {
+ ui32 inputIdx = input.GetInputIndex();
const auto& inputStageInfo = tasksGraph.GetStageInfo(TStageId(stageInfo.Id.TxId, input.GetStageIndex()));
const auto& outputIdx = input.GetOutputIndex();
@@ -318,22 +328,6 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl
}
}
-TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table) {
- TVector<TTaskMeta::TColumn> columns;
- columns.reserve(op.GetColumns().size());
-
- for (const auto& column : op.GetColumns()) {
- TTaskMeta::TColumn c;
- c.Id = column.GetId();
- c.Type = table.Columns.at(column.GetName()).Type;
- c.Name = column.GetName();
-
- columns.emplace_back(std::move(c));
- }
-
- return columns;
-}
-
bool IsCrossShardChannel(TKqpTasksGraph& tasksGraph, const TChannel& channel) {
YQL_ENSURE(channel.SrcTask);
@@ -618,6 +612,41 @@ void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpTransaction_TScanTaskM
}
}
+void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto) const {
+ if (IsFullRange()) {
+ auto& protoRange = *proto->MutableRanges()->AddKeyRanges();
+ FullRange->Serialize(protoRange);
+ } else {
+ bool usePoints = true;
+ for (auto& range : Ranges) {
+ if (std::holds_alternative<TSerializedCellVec>(range)) {
+ usePoints = false;
+ }
+ }
+ auto* protoRanges = proto->MutableRanges();
+ for (auto& range : Ranges) {
+ if (std::holds_alternative<TSerializedCellVec>(range)) {
+ if (usePoints) {
+ const auto& x = std::get<TSerializedCellVec>(range);
+ protoRanges->AddKeyPoints(x.GetBuffer());
+ } else {
+ const auto& x = std::get<TSerializedCellVec>(range);
+ auto& keyRange = *protoRanges->AddKeyRanges();
+ keyRange.SetFrom(x.GetBuffer());
+ keyRange.SetTo(x.GetBuffer());
+ keyRange.SetFromInclusive(true);
+ keyRange.SetToInclusive(true);
+ }
+ } else {
+ auto& x = std::get<TSerializedTableRange>(range);
+ Y_VERIFY_DEBUG(!x.Point);
+ auto& keyRange = *protoRanges->AddKeyRanges();
+ x.Serialize(keyRange);
+ }
+ }
+ }
+}
+
std::pair<const TSerializedCellVec*, bool> TShardKeyRanges::GetRightBorder() const {
if (FullRange) {
return !FullRange->Point ? std::make_pair(&FullRange->To, true) : std::make_pair(&FullRange->From, true);
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h
index 6dbc9bac458..fbe3ba7df4b 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.h
@@ -12,6 +12,7 @@
namespace NKikimrTxDataShard {
class TKqpTransaction_TDataTaskMeta_TKeyRange;
class TKqpTransaction_TScanTaskMeta_TReadOpMeta;
+class TKqpReadRangesSourceSettings;
}
namespace NKikimr {
@@ -91,6 +92,7 @@ struct TShardKeyRanges {
TString ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const;
void SerializeTo(NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange* proto) const;
void SerializeTo(NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta_TReadOpMeta* proto) const;
+ void SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto) const;
std::pair<const TSerializedCellVec*, bool> GetRightBorder() const;
};
@@ -171,7 +173,23 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const NKqpProto::TKqpPhyTx& tx, ui64 txIdx);
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
ui64 txId, bool enableSpilling);
-TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table);
+
+template<typename Proto>
+TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, const TKqpTableKeys::TTable& table) {
+ TVector<TTaskMeta::TColumn> columns;
+ columns.reserve(op.GetColumns().size());
+
+ for (const auto& column : op.GetColumns()) {
+ TTaskMeta::TColumn c;
+ c.Id = column.GetId();
+ c.Type = table.Columns.at(column.GetName()).Type;
+ c.Name = column.GetName();
+
+ columns.emplace_back(std::move(c));
+ }
+
+ return columns;
+}
struct TKqpTaskOutputType {
enum : ui32 {
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 051a73d2f56..8b323d48b71 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -88,6 +88,18 @@
]
},
{
+ "Name": "TKqpReadRangesSourceSettings",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KqpRowsSourceSettings"},
+ "Children": [
+ {"Index": 0, "Name": "Table", "Type": "TKqpTable"},
+ {"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
+ {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
+ {"Index": 3, "Name": "RangesExpr", "Type": "TExprBase"},
+ {"Index": 4, "Name": "ExplainPrompt", "Type": "TCoNameValueTupleList", "Optional": true}
+ ]
+ },
+ {
"Name": "TKqlReadTableRanges",
"Base": "TKqlReadTableRangesBase",
"Match": {"Type": "Callable", "Name": "KqlReadTableRanges"}
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index ad91854efbf..e32e895883f 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -235,6 +235,44 @@ TStatus AnnotateReadTable(const TExprNode::TPtr& node, TExprContext& ctx, const
return TStatus::Ok;
}
+TStatus AnnotateKqpSourceSettings(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
+ const TKikimrTablesData& tablesData, bool withSystemColumns)
+{
+ auto table = ResolveTable(node->Child(TKqpReadRangesSourceSettings::idx_Table), ctx, cluster, tablesData);
+ if (!table.second) {
+ return TStatus::Error;
+ }
+
+ const auto& columns = node->ChildPtr(TKqpReadRangesSourceSettings::idx_Columns);
+ if (!EnsureTupleOfAtoms(*columns, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, TCoAtomList(columns), withSystemColumns);
+ if (!rowType) {
+ return TStatus::Error;
+ }
+
+ auto ranges = node->Child(TKqpReadRangesSourceSettings::idx_RangesExpr);
+ if (!TCoVoid::Match(ranges) &&
+ !TCoArgument::Match(ranges) &&
+ !TCoParameter::Match(ranges) &&
+ !TCoRangeFinalize::Match(ranges) &&
+ !TDqPhyPrecompute::Match(ranges) &&
+ !TKqpTxResultBinding::Match(ranges))
+ {
+ ctx.AddError(TIssue(
+ ctx.GetPosition(ranges->Pos()),
+ TStringBuilder()
+ << "Expected Void, Parameter, Argument or RangeFinalize in ranges, but got: "
+ << ranges->Content()
+ ));
+ return TStatus::Error;
+ }
+
+ node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
+ return TStatus::Ok;
+}
TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
@@ -1309,6 +1347,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateKqpEnsure(input, ctx);
}
+ if (TKqpReadRangesSourceSettings::Match(input.Get())) {
+ return AnnotateKqpSourceSettings(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
+ }
+
return dqTransformer->Transform(input, output, ctx);
});
}
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 56e3d118501..b40b1e7b5be 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -283,22 +283,7 @@ private:
TVector<TCoArgument> newArgs;
TNodeOnNodeOwnedMap argsMap;
- for (ui32 i = 0; i < stage.Inputs().Size(); ++i) {
- const auto& input = stage.Inputs().Item(i);
- const auto& inputArg = stage.Program().Args().Arg(i);
-
- auto maybeBinding = input.Maybe<TKqpTxResultBinding>();
-
- if (!maybeBinding.IsValid()) {
- auto newArg = ctx.NewArgument(inputArg.Pos(), inputArg.Name());
- newInputs.push_back(input);
- newArgs.emplace_back(TCoArgument(newArg));
- argsMap.emplace(inputArg.Raw(), std::move(newArg));
- continue;
- }
-
- auto binding = maybeBinding.Cast();
-
+ auto makeParameterBinding = [&ctx, &bindingsMap] (TKqpTxResultBinding binding, TPositionHandle pos) {
TString paramName = TStringBuilder() << ParamNamePrefix
<< "tx_result_binding_" << binding.TxIndex().Value() << "_" << binding.ResultIndex().Value();
@@ -308,9 +293,9 @@ private:
type = type->Cast<TTypeExprType>()->GetType();
YQL_ENSURE(type);
- TExprBase parameter = Build<TCoParameter>(ctx, input.Pos())
+ TExprBase parameter = Build<TCoParameter>(ctx, pos)
.Name().Build(paramName)
- .Type(ExpandType(input.Pos(), *type, ctx))
+ .Type(ExpandType(pos, *type, ctx))
.Done();
// TODO: (Iterator|ToStream (Parameter ...)) -> (ToFlow (Parameter ...))
@@ -320,7 +305,7 @@ private:
// .Done();
// }
- auto paramBinding = Build<TKqpParamBinding>(ctx, input.Pos())
+ auto paramBinding = Build<TKqpParamBinding>(ctx, pos)
.Name().Build(paramName)
.Binding(binding)
.Done();
@@ -332,7 +317,37 @@ private:
<< ", first: " << KqpExprToPrettyString(inserted.first->second.Binding().Ref(), ctx)
<< ", second: " << KqpExprToPrettyString(binding, ctx));
}
- argsMap.emplace(inputArg.Raw(), parameter.Ptr());
+ return parameter;
+ };
+
+ TNodeOnNodeOwnedMap sourceReplaceMap;
+ for (ui32 i = 0; i < stage.Inputs().Size(); ++i) {
+ const auto& input = stage.Inputs().Item(i);
+ const auto& inputArg = stage.Program().Args().Arg(i);
+
+ if (auto source = input.Maybe<TDqSource>()) {
+ VisitExpr(input.Ptr(),
+ [&](const TExprNode::TPtr& node) {
+ TExprBase expr(node);
+ YQL_ENSURE(!expr.Maybe<TDqConnection>().IsValid());
+ if (auto binding = expr.Maybe<TKqpTxResultBinding>()) {
+ sourceReplaceMap.emplace(node.Get(), makeParameterBinding(binding.Cast(), node->Pos()).Ptr());
+ }
+ return true;
+ });
+ }
+
+ auto maybeBinding = input.Maybe<TKqpTxResultBinding>();
+
+ if (!maybeBinding.IsValid()) {
+ auto newArg = ctx.NewArgument(inputArg.Pos(), inputArg.Name());
+ newInputs.push_back(input);
+ newArgs.emplace_back(TCoArgument(newArg));
+ argsMap.emplace(inputArg.Raw(), std::move(newArg));
+ continue;
+ }
+
+ argsMap.emplace(inputArg.Raw(), makeParameterBinding(maybeBinding.Cast(), input.Pos()).Ptr());
}
auto inputs = Build<TExprList>(ctx, stage.Pos())
@@ -340,7 +355,7 @@ private:
.Done();
return Build<TDqPhyStage>(ctx, stage.Pos())
- .Inputs(ctx.ReplaceNodes(inputs.Ptr(), stagesMap))
+ .Inputs(ctx.ReplaceNodes(ctx.ReplaceNodes(inputs.Ptr(), stagesMap), sourceReplaceMap))
.Program()
.Args(newArgs)
.Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap))
@@ -388,6 +403,29 @@ private:
bool IsPrecompute;
};
+TVector<TDqPhyPrecompute> PrecomputeInputs(const TDqStage& stage) {
+ TVector<TDqPhyPrecompute> result;
+ for (const auto& input : stage.Inputs()) {
+ if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) {
+ result.push_back(maybePrecompute.Cast());
+ } else if (auto maybeSource = input.Maybe<TDqSource>()) {
+ VisitExpr(maybeSource.Cast().Ptr(),
+ [&] (const TExprNode::TPtr& ptr) {
+ TExprBase node(ptr);
+ if (auto maybePrecompute = node.Maybe<TDqPhyPrecompute>()) {
+ result.push_back(maybePrecompute.Cast());
+ return false;
+ }
+ if (auto maybeConnection = node.Maybe<TDqConnection>()) {
+ YQL_ENSURE(false, "unexpected connection in source");
+ }
+ return true;
+ });
+ }
+ }
+ return result;
+}
+
class TKqpBuildTxsTransformer : public TSyncTransformerBase {
public:
TKqpBuildTxsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
@@ -556,27 +594,30 @@ private:
}
auto stage = maybeStage.Cast();
+ auto precomputeInputs = PrecomputeInputs(stage);
+ for (auto& precompute : precomputeInputs) {
+ auto precomputeStage = precompute.Connection().Output().Stage();
+ precomputes.emplace(precomputeStage.Raw(), precomputeStage.Ptr());
+ dependencies.emplace(stage.Raw(), stage.Ptr());
+ }
for (const auto& input : stage.Inputs()) {
- const TExprNode* inputStage;
-
- if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) {
- auto precomputeStage = maybePrecompute.Cast().Connection().Output().Stage();
- precomputes.emplace(precomputeStage.Raw(), precomputeStage.Ptr());
- dependencies.emplace(stage.Raw(), stage.Ptr());
- inputStage = precomputeStage.Raw();
+ if (input.Maybe<TDqPhyPrecompute>()) {
+ continue;
} else if (auto maybeConnection = input.Maybe<TDqConnection>()) {
- inputStage = maybeConnection.Cast().Output().Stage().Raw();
+ const TExprNode* inputStage = maybeConnection.Cast().Output().Stage().Raw();
+ if (dependencies.contains(inputStage)) {
+ dependencies.emplace(stage.Raw(), stage.Ptr());
+ }
+ } else if (auto maybeSource = input.Maybe<TDqSource>()) {
+ // handled in PrecomputeInputs
+ continue;
} else if (input.Maybe<TKqpTxResultBinding>()) {
// ok
continue;
} else {
YQL_ENSURE(false, "Unexpected stage input: " << input.Ref().Content());
}
-
- if (dependencies.contains(inputStage)) {
- dependencies.emplace(stage.Raw(), stage.Ptr());
- }
}
return true;
@@ -618,15 +659,9 @@ private:
for (auto& [_, stagePtr] : dependantStagesMap) {
TDqStage stage(stagePtr);
+ auto precomputes = PrecomputeInputs(stage);
- for (const auto& input : stage.Inputs()) {
- auto maybePrecompute = input.Maybe<TDqPhyPrecompute>();
-
- if (!maybePrecompute.IsValid()) {
- continue;
- }
-
- auto precompute = maybePrecompute.Cast();
+ for (const auto& precompute : precomputes) {
auto precomputeConnection = precompute.Connection();
auto precomputeStage = precomputeConnection.Output().Stage();
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 4bdf5662401..46ff7a38fce 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -535,6 +535,251 @@ private:
}
}
+ void Visit(const TKqpReadRangesSourceSettings& sourceSettings, TQueryPlanNode& planNode) {
+ if (sourceSettings.RangesExpr().Maybe<TKqlKeyRange>()) {
+ auto table = TString(sourceSettings.Table().Path());
+ auto range = sourceSettings.RangesExpr().Cast<TKqlKeyRange>();
+
+ TOperator op;
+ TTableRead readInfo;
+
+ auto describeBoundary = [this](const TExprBase& key) {
+ if (auto param = key.Maybe<TCoParameter>()) {
+ return param.Cast().Name().StringValue();
+ }
+
+ if (auto param = key.Maybe<TCoNth>().Tuple().Maybe<TCoParameter>()) {
+ if (auto maybeResultBinding = ContainResultBinding(param.Cast().Name().StringValue())) {
+ auto [txId, resId] = *maybeResultBinding;
+ if (auto result = GetResult(txId, resId)) {
+ auto index = FromString<ui32>(key.Cast<TCoNth>().Index());
+ Y_ENSURE(index < result->Size());
+ return (*result)[index].GetDataText();
+ }
+ }
+ }
+
+ if (auto literal = key.Maybe<TCoDataCtor>()) {
+ return literal.Cast().Literal().StringValue();
+ }
+
+ return TString("n/a");
+ };
+
+ /* Collect info about scan range */
+ struct TKeyPartRange {
+ TString From;
+ TString To;
+ TString ColumnName;
+ };
+ auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table);
+ op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
+ planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]);
+ TVector<TKeyPartRange> scanRangeDescr(tableData.Metadata->KeyColumnNames.size());
+
+ auto maybeFromKey = range.From().Maybe<TKqlKeyTuple>();
+ auto maybeToKey = range.To().Maybe<TKqlKeyTuple>();
+ if (maybeFromKey && maybeToKey) {
+ auto fromKey = maybeFromKey.Cast();
+ auto toKey = maybeToKey.Cast();
+
+ for (ui32 i = 0; i < fromKey.ArgCount(); ++i) {
+ scanRangeDescr[i].From = describeBoundary(fromKey.Arg(i));
+ }
+ for (ui32 i = 0; i < toKey.ArgCount(); ++i) {
+ scanRangeDescr[i].To = describeBoundary(toKey.Arg(i));
+ }
+ for (ui32 i = 0; i < scanRangeDescr.size(); ++i) {
+ scanRangeDescr[i].ColumnName = tableData.Metadata->KeyColumnNames[i];
+ }
+
+ TString leftParen = range.From().Maybe<TKqlKeyInc>().IsValid() ? "[" : "(";
+ TString rightParen = range.To().Maybe<TKqlKeyInc>().IsValid() ? "]" : ")";
+ bool hasRangeScans = false;
+ auto& ranges = op.Properties["ReadRange"];
+ for (const auto& keyPartRange : scanRangeDescr) {
+ TStringBuilder rangeDescr;
+
+ if (keyPartRange.From == keyPartRange.To) {
+ if (keyPartRange.From.Empty()) {
+ rangeDescr << keyPartRange.ColumnName << " (-∞, +∞)";
+ readInfo.ScanBy.push_back(rangeDescr);
+ } else {
+ rangeDescr << keyPartRange.ColumnName
+ << " (" << keyPartRange.From << ")";
+ readInfo.LookupBy.push_back(rangeDescr);
+ }
+ } else {
+ rangeDescr << keyPartRange.ColumnName << " "
+ << (keyPartRange.From.Empty() ? "(" : leftParen)
+ << (keyPartRange.From.Empty() ? "-∞" : keyPartRange.From) << ", "
+ << (keyPartRange.To.Empty() ? "+∞" : keyPartRange.To)
+ << (keyPartRange.To.Empty() ? ")" : rightParen);
+ readInfo.ScanBy.push_back(rangeDescr);
+ hasRangeScans = true;
+ }
+
+ ranges.AppendValue(rangeDescr);
+ }
+
+ // Scan which fixes only few first members of compound primary key were called "Lookup"
+ // by older explain version. We continue to do so.
+ if (readInfo.LookupBy.size() > 0) {
+ readInfo.Type = EPlanTableReadType::Lookup;
+ } else {
+ readInfo.Type = hasRangeScans ? EPlanTableReadType::Scan : EPlanTableReadType::FullScan;
+ }
+ }
+
+ auto& columns = op.Properties["ReadColumns"];
+ for (auto const& col : sourceSettings.Columns()) {
+ readInfo.Columns.emplace_back(TString(col.Value()));
+ columns.AppendValue(col.Value());
+ }
+
+ auto settings = NYql::TKqpReadTableSettings::Parse(sourceSettings.Settings());
+ if (settings.ItemsLimit && !readInfo.Limit) {
+ auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false);
+ if (auto maybeResultBinding = ContainResultBinding(limit)) {
+ const auto [txId, resId] = *maybeResultBinding;
+ if (auto result = GetResult(txId, resId)) {
+ limit = result->GetDataText();
+ }
+ }
+
+ readInfo.Limit = limit;
+ op.Properties["ReadLimit"] = limit;
+ }
+ if (settings.Reverse) {
+ readInfo.Reverse = true;
+ op.Properties["Reverse"] = true;
+ }
+
+ SerializerCtx.Tables[table].Reads.push_back(readInfo);
+
+ ui32 operatorId;
+ if (readInfo.Type == EPlanTableReadType::Scan) {
+ op.Properties["Name"] = "TableRangeScan";
+ operatorId = AddOperator(planNode, "TableRangeScan", std::move(op));
+ } else if (readInfo.Type == EPlanTableReadType::FullScan) {
+ op.Properties["Name"] = "TableFullScan";
+ operatorId = AddOperator(planNode, "TableFullScan", std::move(op));
+ } else if (readInfo.Type == EPlanTableReadType::Lookup) {
+ op.Properties["Name"] = "TablePointLookup";
+ operatorId = AddOperator(planNode, "TablePointLookup", std::move(op));
+ } else {
+ op.Properties["Name"] = "TableScan";
+ operatorId = AddOperator(planNode, "TableScan", std::move(op));
+ }
+ } else {
+ const auto table = TString(sourceSettings.Table().Path());
+ const auto explainPrompt = TKqpReadTableExplainPrompt::Parse(sourceSettings.ExplainPrompt().Cast());
+
+ TTableRead readInfo;
+ TOperator op;
+
+ auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table);
+ op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
+ planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]);
+
+ auto rangesDesc = PrettyExprStr(sourceSettings.RangesExpr());
+ if (rangesDesc == "Void" || explainPrompt.UsedKeyColumns.empty()) {
+ readInfo.Type = EPlanTableReadType::FullScan;
+
+ auto& ranges = op.Properties["ReadRanges"];
+ for (const auto& col : tableData.Metadata->KeyColumnNames) {
+ TStringBuilder rangeDesc;
+ rangeDesc << col << " (-∞, +∞)";
+ readInfo.ScanBy.push_back(rangeDesc);
+ ranges.AppendValue(rangeDesc);
+ }
+ } else if (auto maybeResultBinding = ContainResultBinding(rangesDesc)) {
+ readInfo.Type = EPlanTableReadType::Scan;
+
+ auto [txId, resId] = *maybeResultBinding;
+ if (auto result = GetResult(txId, resId)) {
+ auto ranges = (*result)[0];
+ const auto& keyColumns = tableData.Metadata->KeyColumnNames;
+ for (size_t rangeId = 0; rangeId < ranges.Size(); ++rangeId) {
+ Y_ENSURE(ranges[rangeId].HaveValue() && ranges[rangeId].Size() == 2);
+ auto from = ranges[rangeId][0];
+ auto to = ranges[rangeId][1];
+
+ for (size_t colId = 0; colId < keyColumns.size(); ++colId) {
+ if (!from[colId].HaveValue() && !to[colId].HaveValue()) {
+ continue;
+ }
+
+ TStringBuilder rangeDesc;
+ rangeDesc << keyColumns[colId] << " "
+ << (from[keyColumns.size()].GetDataText() == "1" ? "[" : "(")
+ << (from[colId].HaveValue() ? from[colId].GetDataText() : "-∞") << ", "
+ << (to[colId].HaveValue() ? to[colId].GetDataText() : "+∞")
+ << (to[keyColumns.size()].GetDataText() == "1" ? "]" : ")");
+
+ readInfo.ScanBy.push_back(rangeDesc);
+ op.Properties["ReadRanges"].AppendValue(rangeDesc);
+ }
+ }
+ } else {
+ op.Properties["ReadRanges"] = rangesDesc;
+ }
+ } else {
+ Y_ENSURE(false, rangesDesc);
+ }
+
+ if (!explainPrompt.UsedKeyColumns.empty()) {
+ auto& usedColumns = op.Properties["ReadRangesKeys"];
+ for (const auto& col : explainPrompt.UsedKeyColumns) {
+ usedColumns.AppendValue(col);
+ }
+ }
+
+ if (explainPrompt.ExpectedMaxRanges) {
+ op.Properties["ReadRangesExpectedSize"] = explainPrompt.ExpectedMaxRanges;
+ }
+
+ auto& columns = op.Properties["ReadColumns"];
+ for (const auto& col : sourceSettings.Columns()) {
+ readInfo.Columns.emplace_back(TString(col.Value()));
+ columns.AppendValue(col.Value());
+ }
+
+ auto settings = NYql::TKqpReadTableSettings::Parse(sourceSettings.Settings());
+ if (settings.ItemsLimit && !readInfo.Limit) {
+ auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false);
+ if (auto maybeResultBinding = ContainResultBinding(limit)) {
+ const auto [txId, resId] = *maybeResultBinding;
+ if (auto result = GetResult(txId, resId)) {
+ limit = result->GetDataText();
+ }
+ }
+
+ readInfo.Limit = limit;
+ op.Properties["ReadLimit"] = limit;
+ }
+ if (settings.Reverse) {
+ readInfo.Reverse = true;
+ op.Properties["Reverse"] = true;
+ }
+
+ if (tableData.Metadata->Kind == EKikimrTableKind::Olap) {
+ op.Properties["PredicatePushdown"] = SerializerCtx.Config.Get()->PushOlapProcess();
+ }
+
+ ui32 operatorId;
+ if (readInfo.Type == EPlanTableReadType::FullScan) {
+ op.Properties["Name"] = "TableFullScan";
+ operatorId = AddOperator(planNode, "TableFullScan", std::move(op));
+ } else {
+ op.Properties["Name"] = "TableRangesScan";
+ operatorId = AddOperator(planNode, "TableRangesScan", std::move(op));
+ }
+
+ SerializerCtx.Tables[table].Reads.push_back(std::move(readInfo));
+ }
+ }
+
void Visit(const TExprBase& expr, TQueryPlanNode& planNode) {
if (expr.Maybe<TDqPhyStage>()) {
auto stageGuid = NDq::TDqStageSettings::Parse(expr.Cast<TDqPhyStage>()).Id;
@@ -575,12 +820,18 @@ private:
}
for (const auto& input : expr.Cast<TDqStageBase>().Inputs()) {
- auto inputCn = input.Cast<TDqConnection>();
+ if (auto source = input.Maybe<TDqSource>()) {
+ auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>();
+ YQL_ENSURE(settings.IsValid(), "only readranges sources are supported");
+ Visit(settings.Cast(), stagePlanNode);
+ } else {
+ auto inputCn = input.Cast<TDqConnection>();
- auto& inputPlanNode = AddPlanNode(stagePlanNode);
- FillConnectionPlanNode(inputCn, inputPlanNode);
+ auto& inputPlanNode = AddPlanNode(stagePlanNode);
+ FillConnectionPlanNode(inputCn, inputPlanNode);
- Visit(inputCn.Output().Stage(), inputPlanNode);
+ Visit(inputCn.Output().Stage(), inputPlanNode);
+ }
}
} else {
Visit(expr.Ptr(), planNode);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index a0e28b2ab21..da6a009c9f0 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -189,11 +189,13 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
auto ranges = read.Ranges();
auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path());
+ bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery();
bool fullScan = TCoVoid::Match(ranges.Raw());
TVector<TExprBase> input;
TMaybeNode<TExprBase> argument;
TVector<TCoArgument> programArgs;
+ TMaybeNode<TExprBase> rangesExpr;
if (!fullScan) {
TMaybe<TDqStage> rangesStage;
@@ -268,28 +270,59 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
.Build()
.Done();
- argument = Build<TCoArgument>(ctx, read.Pos())
- .Name("_kqp_pc_ranges_arg_0")
- .Done();
+ rangesExpr = precompute;
+ if (!useSource) {
+ argument = Build<TCoArgument>(ctx, read.Pos())
+ .Name("_kqp_pc_ranges_arg_0")
+ .Done();
- input.push_back(precompute);
- programArgs.push_back(argument.Cast<TCoArgument>());
+ input.push_back(precompute);
+ programArgs.push_back(argument.Cast<TCoArgument>());
+ }
} else {
argument = read.Ranges();
}
TMaybeNode<TExprBase> phyRead;
+ TMaybeNode<TExprBase> sourceArg;
+ if (useSource) {
+ YQL_ENSURE(rangesExpr.IsValid());
+
+ input.push_back(
+ Build<TDqSource>(ctx, read.Pos())
+ .Settings<TKqpReadRangesSourceSettings>()
+ .Table(read.Table())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .RangesExpr(rangesExpr.Cast())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Build()
+ .DataSource<TCoDataSource>()
+ .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
+ .Build()
+ .Done());
+ sourceArg = Build<TCoArgument>(ctx, read.Pos())
+ .Name("_kqp_pc_source_arg_0")
+ .Done();
+ programArgs.push_back(sourceArg.Cast<TCoArgument>());
+ }
+
switch (tableDesc.Metadata->Kind) {
case EKikimrTableKind::Datashard:
case EKikimrTableKind::SysView:
- phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos())
- .Table(read.Table())
- .Ranges(argument.Cast())
- .Columns(read.Columns())
- .Settings(read.Settings())
- .ExplainPrompt(read.ExplainPrompt())
- .Done();
+ if (useSource) {
+ YQL_ENSURE(sourceArg.IsValid());
+ phyRead = sourceArg.Cast();
+ } else {
+ phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos())
+ .Table(read.Table())
+ .Ranges(argument.Cast())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Done();
+ }
break;
case EKikimrTableKind::Olap:
diff --git a/ydb/core/kqp/runtime/CMakeLists.txt b/ydb/core/kqp/runtime/CMakeLists.txt
index 2908169720e..3f0643e6e81 100644
--- a/ydb/core/kqp/runtime/CMakeLists.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.txt
@@ -39,6 +39,7 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_program_builder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_read_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
new file mode 100644
index 00000000000..472c6f25c54
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -0,0 +1,837 @@
+#include "kqp_read_actor.h"
+
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/engine/minikql/minikql_engine_host.h>
+#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/protos/tx_datashard.pb.h>
+#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/tx/datashard/range_ops.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
+
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/actorsystem.h>
+
+#include <util/generic/intrlist.h>
+
+namespace {
+
+static constexpr ui64 EVREAD_MAX_ROWS = 32767;
+static constexpr ui64 EVREAD_MAX_BYTES = 200_MB;
+
+static constexpr ui64 MAX_SHARD_RETRIES = 5;
+//static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20;
+static constexpr ui64 MAX_SHARD_RESOLVES = 3;
+
+bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog::EComponent component) {
+ auto* settings = actorSystem->LoggerSettings();
+ return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component);
+}
+
+}
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYql;
+using namespace NYql::NDq;
+using namespace NKikimr;
+using namespace NKikimr::NDataShard;
+
+
+class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq::IDqComputeActorAsyncInput {
+ using TBase = TActorBootstrapped<TKqpReadActor>;
+public:
+ struct TShardState : public TIntrusiveListItem<TShardState> {
+ TSmallVec<TSerializedTableRange> Ranges;
+ TSmallVec<TSerializedCellVec> Points;
+
+ TOwnedCellVec LastKey;
+ ui32 FirstUnprocessedRequest = 0;
+ TMaybe<ui32> ReadId;
+ ui64 TabletId;
+
+ size_t ResolveAttempt = 0;
+ size_t RetryAttempt = 0;
+
+ TShardState(ui64 tabletId)
+ : TabletId(tabletId)
+ {
+ }
+
+ TTableRange GetBounds() {
+ if (Ranges.empty()) {
+ YQL_ENSURE(!Points.empty());
+ return TTableRange(
+ Points.front().GetCells(), true,
+ Points.back().GetCells(), true);
+ } else {
+ return TTableRange(
+ Ranges.front().From.GetCells(), Ranges.front().FromInclusive,
+ Ranges.back().To.GetCells(), Ranges.back().ToInclusive);
+ }
+ }
+
+ static void MakePrefixRange(TSerializedTableRange& range, size_t keyColumns) {
+ if (keyColumns == 0) {
+ return;
+ }
+ bool fromInclusive = range.FromInclusive;
+ TConstArrayRef<TCell> from = range.From.GetCells();
+
+ bool toInclusive = range.ToInclusive;
+ TConstArrayRef<TCell> to = range.To.GetCells();
+
+ bool noop = true;
+ // Recognize and remove padding made here https://a.yandex-team.ru/arcadia/ydb/core/kqp/executer/kqp_partition_helper.cpp?rev=r10109549#L284
+
+ // Absent cells mean infinity. So in prefix notation `From` should be exclusive.
+ // For example x >= (Key1, Key2, +infinity) is equivalent to x > (Key1, Key2) where x is arbitrary tuple
+ if (range.From.GetCells().size() < keyColumns) {
+ fromInclusive = false;
+ noop = range.FromInclusive;
+ } else if (range.FromInclusive) {
+ // Nulls are minimum values so we should remove null padding.
+ // x >= (Key1, Key2, null) is equivalent to x >= (Key1, Key2)
+ ssize_t i = range.From.GetCells().size();
+ while (i > 0 && range.From.GetCells()[i - 1].IsNull()) {
+ --i;
+ noop = false;
+ }
+ from = range.From.GetCells().subspan(0, i);
+ }
+
+ // Absent cells mean infinity. So in prefix notation `To` should be inclusive.
+ // For example x < (Key1, Key2, +infinity) is equivalent to x <= (Key1, Key2) where x is arbitrary tuple
+ if (range.To.GetCells().size() < keyColumns) {
+ toInclusive = true;
+ // Nulls are minimum values so we should remove null padding.
+ // For example x < (Key1, Key2, null) is equivalent to x < (Key1, Key2)
+ ssize_t i = range.To.GetCells().size();
+ while (i > 0 && range.To.GetCells()[i - 1].IsNull()) {
+ --i;
+ noop = false;
+ }
+ to = range.To.GetCells().subspan(0, i);
+ }
+
+ if (!noop) {
+ return;
+ }
+
+ range = TSerializedTableRange(from, fromInclusive, to, toInclusive);
+ }
+
+ void FillUnprocessedRanges(
+ TVector<TSerializedTableRange>& result,
+ TConstArrayRef<NScheme::TTypeInfo> keyTypes) const
+ {
+ // Form new vector. Skip ranges already read.
+ bool lastKeyEmpty = LastKey.DataSize() == 0;
+
+ if (!lastKeyEmpty) {
+ YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key");
+ }
+
+ auto rangeIt = Ranges.begin() + FirstUnprocessedRequest;
+
+ if (!lastKeyEmpty) {
+ // It is range, where read was interrupted. Restart operation from last read key.
+ result.emplace_back(std::move(TSerializedTableRange(
+ TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
+ )));
+ ++rangeIt;
+ }
+
+ // And push all others
+ result.insert(result.end(), rangeIt, Ranges.end());
+ for (auto& range : result) {
+ MakePrefixRange(range, keyTypes.size());
+ }
+ }
+
+ void FillUnprocessedPoints(TVector<TSerializedCellVec>& result) const {
+ result.insert(result.begin(), Points.begin() + FirstUnprocessedRequest, Points.end());
+ }
+
+ void FillEvRead(TEvDataShard::TEvRead& ev, TConstArrayRef<NScheme::TTypeInfo> keyTypes) {
+ if (Ranges.empty()) {
+ FillUnprocessedPoints(ev.Keys);
+ } else {
+ FillUnprocessedRanges(ev.Ranges, keyTypes);
+ }
+ }
+
+ TString ToString(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const {
+ TStringBuilder sb;
+ sb << "TShardState{ TabletId: " << TabletId << ", Last Key " << PrintLastKey(keyTypes)
+ << ", Ranges: [";
+ for (size_t i = 0; i < Ranges.size(); ++i) {
+ sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry);
+ if (i + 1 != Ranges.size()) {
+ sb << ", ";
+ }
+ }
+ sb << "], "
+ << ", RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }";
+ return sb;
+ }
+
+ TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const {
+ if (LastKey.empty()) {
+ return "<none>";
+ }
+ return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry);
+ }
+ };
+
+ using TShardQueue = TIntrusiveListWithAutoDelete<TShardState, TDelete>;
+
+ struct TReadState {
+ TShardState* Shard = nullptr;
+ bool Finished = false;
+ ui64 LastSeqNo;
+ TMaybe<TString> SerializedContinuationToken;
+
+ void RegisterMessage(const TEvDataShard::TEvReadResult& result) {
+ LastSeqNo = result.Record.GetSeqNo();
+ Finished = result.Record.GetFinished();
+ }
+
+ bool IsLastMessage(const TEvDataShard::TEvReadResult& result) {
+ return result.Record.GetFinished() || (Finished && result.Record.GetSeqNo() == LastSeqNo);
+ }
+
+ operator bool () {
+ return Shard;
+ }
+
+ void Reset() {
+ Shard = nullptr;
+ }
+ };
+
+public:
+ TKqpReadActor(NKikimrTxDataShard::TKqpReadRangesSourceSettings settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments args)
+ : Settings(std::move(settings))
+ , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << args.TxId << ", task: " << args.TaskId << ". ")
+ , ComputeActorId(args.ComputeActorId)
+ , InputIndex(args.InputIndex)
+ , TypeEnv(args.TypeEnv)
+ , HolderFactory(args.HolderFactory)
+ {
+ TableId = TTableId(
+ Settings.GetTable().GetTableId().GetOwnerId(),
+ Settings.GetTable().GetTableId().GetTableId(),
+ Settings.GetTable().GetSysViewInfo(),
+ Settings.GetTable().GetTableId().GetSchemaVersion()
+ );
+
+ KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size());
+ for (auto typeId : Settings.GetKeyColumnTypes()) {
+ KeyColumnTypes.push_back(NScheme::TTypeInfo((NScheme::TTypeId)typeId));
+ }
+ }
+
+ STFUNC(ReadyState) {
+ Y_UNUSED(ctx);
+ try {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvDataShard::TEvReadResult, HandleRead);
+ hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolve);
+ hFunc(TEvPipeCache::TEvDeliveryProblem, HandleError);
+ IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+ IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
+ }
+ } catch (const yexception& e) {
+ RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR);
+ }
+ }
+
+ void Bootstrap() {
+ //TODO: resolve if hint is not set
+ THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint());
+ PendingShards.PushBack(stateHolder.Get());
+ auto& state = *stateHolder.Release();
+
+ if (Settings.HasFullRange()) {
+ state.Ranges.push_back(TSerializedTableRange(Settings.GetFullRange()));
+ } else {
+ YQL_ENSURE(Settings.HasRanges());
+ if (Settings.GetRanges().KeyRangesSize() > 0) {
+ YQL_ENSURE(Settings.GetRanges().KeyPointsSize() == 0);
+ for (const auto& range : Settings.GetRanges().GetKeyRanges()) {
+ state.Ranges.push_back(TSerializedTableRange(range));
+ }
+ } else {
+ for (const auto& point : Settings.GetRanges().GetKeyPoints()) {
+ state.Points.push_back(TSerializedCellVec(point));
+ }
+ }
+ }
+
+ StartTableScan();
+ Become(&TKqpReadActor::ReadyState);
+ }
+
+ bool StartTableScan() {
+ const ui32 maxAllowedInFlight = MaxInFlight;
+ bool isFirst = true;
+ while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) {
+ if (isFirst) {
+ CA_LOG_D("BEFORE: " << PendingShards.Size() << "." << RunningReads());
+ isFirst = false;
+ }
+ auto state = THolder<TShardState>(PendingShards.PopFront());
+ InFlightShards.PushFront(state.Get());
+ StartRead(state.Release());
+ }
+ if (!isFirst) {
+ CA_LOG_D("AFTER: " << PendingShards.Size() << "." << RunningReads());
+ }
+
+ CA_LOG_D("Scheduled table scans, in flight: " << RunningReads() << " shards. "
+ << "pending shards to read: " << PendingShards.Size() << ", ");
+
+ return RunningReads() > 0 || !PendingShards.Empty();
+ }
+
+ void ResolveShard(TShardState* state) {
+ if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) {
+ RuntimeError(TStringBuilder() << "Table '" << Settings.GetTable().GetTablePath() << "' resolve limit exceeded",
+ NDqProto::StatusIds::UNAVAILABLE);
+ return;
+ }
+
+ state->ResolveAttempt++;
+
+ auto range = state->GetBounds();
+ TVector<TKeyDesc::TColumnOp> columns;
+ columns.reserve(Settings.GetColumns().size());
+ for (const auto& column : Settings.GetColumns()) {
+ TKeyDesc::TColumnOp op;
+ op.Column = column.GetId();
+ op.Operation = TKeyDesc::EColumnOperation::Read;
+ op.ExpectedType = NScheme::TTypeInfo((NScheme::TTypeId)column.GetType());
+ columns.emplace_back(std::move(op));
+ }
+
+ auto keyDesc = MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
+ KeyColumnTypes, columns);
+
+ CA_LOG_D("Sending TEvResolveKeySet update for table '" << Settings.GetTable().GetTablePath() << "'"
+ << ", range: " << DebugPrintRange(KeyColumnTypes, range, *AppData()->TypeRegistry)
+ << ", attempt #" << state->ResolveAttempt);
+
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
+ request->ResultSet.emplace_back(std::move(keyDesc));
+
+ request->ResultSet.front().UserData = ResolveShardId;
+ ResolveShards[ResolveShardId] = state;
+ ResolveShardId += 1;
+
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
+ }
+
+ void HandleResolve(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
+ CA_LOG_D("Received TEvResolveKeySetResult update for table '" << Settings.GetTable().GetTablePath() << "'");
+
+ auto* request = ev->Get()->Request.Get();
+ if (request->ErrorCount > 0) {
+ CA_LOG_E("Resolve request failed for table '" << Settings.GetTable().GetTablePath() << "', ErrorCount# " << request->ErrorCount);
+
+ auto statusCode = NDqProto::StatusIds::UNAVAILABLE;
+ auto issueCode = TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
+ TString error;
+
+ for (const auto& x : request->ResultSet) {
+ if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
+ // invalidate table
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
+
+ switch (x.Status) {
+ case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
+ statusCode = NDqProto::StatusIds::SCHEME_ERROR;
+ issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH;
+ error = TStringBuilder() << "Table '" << Settings.GetTable().GetTablePath() << "' not exists.";
+ break;
+ case NSchemeCache::TSchemeCacheRequest::EStatus::TypeCheckError:
+ statusCode = NDqProto::StatusIds::SCHEME_ERROR;
+ issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH;
+ error = TStringBuilder() << "Table '" << Settings.GetTable().GetTablePath() << "' scheme changed.";
+ break;
+ case NSchemeCache::TSchemeCacheRequest::EStatus::LookupError:
+ statusCode = NDqProto::StatusIds::UNAVAILABLE;
+ issueCode = TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
+ error = TStringBuilder() << "Failed to resolve table '" << Settings.GetTable().GetTablePath() << "'.";
+ break;
+ default:
+ statusCode = NDqProto::StatusIds::SCHEME_ERROR;
+ issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH;
+ error = TStringBuilder() << "Unresolved table '" << Settings.GetTable().GetTablePath() << "'. Status: " << x.Status;
+ break;
+ }
+ }
+ }
+
+ return RuntimeError(error, statusCode);
+ }
+
+ auto keyDesc = std::move(request->ResultSet[0].KeyDescription);
+ THolder<TShardState> state;
+ if (auto ptr = ResolveShards[request->ResultSet[0].UserData]) {
+ state = THolder<TShardState>(ptr);
+ ResolveShards.erase(request->ResultSet[0].UserData);
+ } else {
+ return;
+ }
+
+ if (keyDesc->GetPartitions().empty()) {
+ TString error = TStringBuilder() << "No partitions to read from '" << Settings.GetTable().GetTablePath() << "'";
+ CA_LOG_E(error);
+ return RuntimeError(error, NDqProto::StatusIds::SCHEME_ERROR);
+ }
+
+ const auto& tr = *AppData()->TypeRegistry;
+
+ TVector<THolder<TShardState>> newShards;
+ newShards.reserve(keyDesc->GetPartitions().size());
+
+ for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) {
+ const auto& partition = keyDesc->GetPartitions()[idx];
+
+ TTableRange partitionRange{
+ idx == 0 ? state->Ranges.front().From.GetCells() : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(),
+ idx == 0 ? state->Ranges.front().FromInclusive : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive,
+ keyDesc->GetPartitions()[idx].Range->EndKeyPrefix.GetCells(),
+ keyDesc->GetPartitions()[idx].Range->IsInclusive
+ };
+
+ CA_LOG_D("Processing resolved ShardId# " << partition.ShardId
+ << ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
+ << ", i: " << i << ", state ranges: " << state->Ranges.size());
+
+ auto newShard = MakeHolder<TShardState>(partition.ShardId);
+
+ for (ui64 j = i; j < state->Ranges.size(); ++j) {
+ CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
+ << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
+
+ auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange());
+
+ if (!intersection.IsEmptyRange(KeyColumnTypes)) {
+ CA_LOG_D("Add range to new shardId: " << partition.ShardId
+ << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
+
+ newShard->Ranges.emplace_back(TSerializedTableRange(intersection));
+ } else {
+ CA_LOG_D("empty intersection");
+ if (j > i) {
+ i = j - 1;
+ }
+ break;
+ }
+ }
+
+ if (!newShard->Ranges.empty()) {
+ newShards.push_back(std::move(newShard));
+ }
+ }
+
+ YQL_ENSURE(!newShards.empty());
+ for (int i = newShards.ysize() - 1; i >= 0; --i) {
+ PendingShards.PushFront(newShards[i].Release());
+ }
+
+ if (!state->LastKey.empty()) {
+ PendingShards.Front()->LastKey = std::move(state->LastKey);
+ }
+
+ if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)
+ && PendingShards.Size() + RunningReads() > 0)
+ {
+ TStringBuilder sb;
+ if (!PendingShards.Empty()) {
+ sb << "Pending shards States: ";
+ for (auto& st : PendingShards) {
+ sb << st.ToString(KeyColumnTypes) << "; ";
+ }
+ }
+
+ if (!InFlightShards.Empty()) {
+ sb << "In Flight shards States: ";
+ for (auto& st : InFlightShards) {
+ sb << st.ToString(KeyColumnTypes) << "; ";
+ }
+ }
+ CA_LOG_D(sb);
+ }
+ StartTableScan();
+ }
+
+ void RetryRead(ui64 id) {
+ if (!Reads[id]) {
+ return;
+ }
+
+ auto state = Reads[id].Shard;
+ Reads[id].Finished = true;
+
+ state->RetryAttempt += 1;
+ if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
+ return ResolveShard(state);
+ }
+ CA_LOG_D("Retrying read #" << id);
+
+ auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
+ cancel->Record.SetReadId(id);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+
+ if (Reads[id].SerializedContinuationToken) {
+ NKikimrTxDataShard::TReadContinuationToken token;
+ Y_VERIFY(token.ParseFromString(*(Reads[id].SerializedContinuationToken)), "Failed to parse continuation token");
+ state->FirstUnprocessedRequest = token.GetFirstUnprocessedQuery();
+
+ if (token.GetLastProcessedKey()) {
+ TSerializedCellVec vec(token.GetLastProcessedKey());
+ state->LastKey = TOwnedCellVec(vec.GetCells());
+ }
+ }
+
+ StartRead(state);
+ }
+
+ void StartRead(TShardState* state) {
+ THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead());
+ auto& record = ev->Record;
+
+ state->FillEvRead(*ev, KeyColumnTypes);
+ for (const auto& column : Settings.GetColumns()) {
+ record.AddColumns(column.GetId());
+ }
+
+ {
+ record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId());
+ record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep());
+ }
+
+ //if (RuntimeSettings.Timeout) {
+ // ev->Record.SetTimeoutMs(RuntimeSettings.Timeout.Get()->MilliSeconds());
+ //}
+ //ev->Record.SetStatsMode(RuntimeSettings.StatsMode);
+ //ev->Record.SetTxId(std::get<ui64>(TxId));
+
+ auto id = ReadId++;
+ Reads.resize(ReadId);
+ Reads[id].Shard = state;
+ state->ReadId = id;
+
+ record.SetReadId(id);
+
+ record.MutableTableId()->SetOwnerId(Settings.GetTable().GetTableId().GetOwnerId());
+ record.MutableTableId()->SetTableId(Settings.GetTable().GetTableId().GetTableId());
+ record.MutableTableId()->SetSchemaVersion(Settings.GetTable().GetSchemaVersion());
+
+ record.SetReverse(Settings.GetReverse());
+ if (Settings.GetItemsLimit()) {
+ record.SetMaxRows(Settings.GetItemsLimit());
+ } else {
+ record.SetMaxRows(EVREAD_MAX_ROWS);
+ }
+ record.SetMaxBytes(EVREAD_MAX_BYTES);
+
+ record.SetResultFormat(Settings.GetDataFormat());
+
+ CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath()
+ << ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry)
+ << ", readId = " << id);
+
+ ReadIdByTabletId[state->TabletId].push_back(id);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
+ IEventHandle::FlagTrackDelivery);
+ }
+
+ void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) {
+ const auto& record = ev->Get()->Record;
+ auto id = record.GetReadId();
+ Y_VERIFY(id < ReadId);
+ if (!Reads[id] || Reads[id].Finished) {
+ // dropped read
+ return;
+ }
+
+ Reads[id].SerializedContinuationToken = record.GetContinuationToken();
+ if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
+ for (auto& issue : record.GetStatus().GetIssues()) {
+ CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
+ }
+ return RetryRead(id);
+ }
+
+ Reads[id].RegisterMessage(*ev->Get());
+
+ YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+
+ Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
+ CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed");
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ }
+
+ void HandleError(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+ auto& msg = *ev->Get();
+
+ for (auto& read : ReadIdByTabletId[msg.TabletId]) {
+ CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
+ RetryRead(read);
+ }
+ }
+
+ size_t RunningReads() const {
+ return Reads.size() - ResetReads;
+ }
+
+ ui64 GetInputIndex() const override {
+ return InputIndex;
+ }
+
+ NMiniKQL::TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row) {
+ NMiniKQL::TBytesStatistics rowStats{0, 0};
+ for (size_t i = 0; i < Settings.ColumnsSize(); ++i) {
+ if (IsSystemColumn(Settings.GetColumns(i).GetId())) {
+ rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue);
+ } else {
+ rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[i], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType())));
+ }
+ }
+ if (Settings.ColumnsSize() == 0) {
+ rowStats.AddStatistics({sizeof(ui64), sizeof(ui64)});
+ }
+ return rowStats;
+ }
+
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() {
+ return TypeEnv.BindAllocator();
+ }
+
+ NMiniKQL::TBytesStatistics PackArrow(
+ THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result,
+ ui64 shardId,
+ NKikimr::NMiniKQL::TUnboxedValueVector& batch)
+ {
+ NMiniKQL::TBytesStatistics stats;
+ bool hasResultColumns = false;
+ if (Settings.ColumnsSize() == 0) {
+ batch.resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer());
+ } else {
+ TVector<NUdf::TUnboxedValue*> editAccessors(result->Get()->GetRowsCount());
+ batch.reserve(result->Get()->GetRowsCount());
+
+ for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+ batch.emplace_back(HolderFactory.CreateDirectArrayHolder(
+ Settings.columns_size(),
+ editAccessors[rowIndex])
+ );
+ }
+
+ for (size_t columnIndex = 0; columnIndex < Settings.ColumnsSize(); ++columnIndex) {
+ auto tag = Settings.GetColumns(columnIndex).GetId();
+ auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(columnIndex).GetType());
+ if (IsSystemColumn(tag)) {
+ for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+ NMiniKQL::FillSystemColumn(editAccessors[rowIndex][columnIndex], shardId, tag, type);
+ stats.AllocatedBytes += sizeof(NUdf::TUnboxedValue);
+ }
+ } else {
+ hasResultColumns = true;
+ stats.AddStatistics(
+ NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->ArrowBatch, columnIndex, type)
+ );
+ }
+ }
+ }
+
+ if (!hasResultColumns) {
+ auto rowsCnt = result->Get()->GetRowsCount();
+ stats.AddStatistics({sizeof(ui64) * rowsCnt, sizeof(ui64) * rowsCnt});
+ }
+ return stats;
+ }
+
+ NMiniKQL::TBytesStatistics PackCells(
+ THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result,
+ ui64 shardId,
+ NKikimr::NMiniKQL::TUnboxedValueVector& batch)
+ {
+ NMiniKQL::TBytesStatistics stats;
+ batch.reserve(batch.size());
+ for (size_t rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+ const auto& row = result->Get()->GetCells(rowIndex);
+ NUdf::TUnboxedValue* rowItems = nullptr;
+ batch.emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems));
+ for (size_t i = 0; i < Settings.ColumnsSize(); ++i) {
+ auto tag = Settings.GetColumns(i).GetId();
+ auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType());
+ if (IsSystemColumn(tag)) {
+ NMiniKQL::FillSystemColumn(rowItems[i], shardId, tag, type);
+ } else {
+ rowItems[i] = NMiniKQL::GetCellValue(row[i], type);
+ }
+ }
+ stats.AddStatistics(GetRowSize(rowItems));
+ }
+ return stats;
+ }
+
+ i64 GetAsyncInputData(
+ NKikimr::NMiniKQL::TUnboxedValueVector& resultVector,
+ TMaybe<TInstant>&,
+ bool& finished,
+ i64 freeSpace) override
+ {
+ ui64 bytes = 0;
+ while (!Results.empty()) {
+ auto& [shardId, result, batch, processedRows] = Results.front();
+ auto& msg = *result->Get();
+ if (!batch.Defined()) {
+ batch.ConstructInPlace();
+ switch (msg.Record.GetResultFormat()) {
+ case NKikimrTxDataShard::EScanDataFormat::ARROW:
+ PackArrow(result, shardId, *batch);
+ break;
+ case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
+ case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
+ PackCells(result, shardId, *batch);
+ }
+ }
+
+ auto id = result->Get()->Record.GetReadId();
+ if (!Reads[id]) {
+ Results.pop();
+ continue;
+ }
+ auto* state = Reads[id].Shard;
+
+ for (; processedRows < batch->size(); ++processedRows) {
+ NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[processedRows].GetElements());
+ if (static_cast<ui64>(freeSpace) < bytes + rowSize.AllocatedBytes) {
+ break;
+ }
+ resultVector.push_back(std::move((*batch)[processedRows]));
+ RowCount += 1;
+ bytes += rowSize.AllocatedBytes;
+ }
+ CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows");
+
+ if (batch->size() == processedRows) {
+ auto& record = msg.Record;
+ if (Reads[id].IsLastMessage(msg)) {
+ Reads[id].Reset();
+ ResetReads++;
+ } else if (!Reads[id].Finished) {
+ THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
+ request->Record.SetReadId(record.GetReadId());
+ request->Record.SetSeqNo(record.GetSeqNo());
+ request->Record.SetMaxRows(EVREAD_MAX_ROWS);
+ request->Record.SetMaxBytes(EVREAD_MAX_BYTES);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
+ IEventHandle::FlagTrackDelivery);
+ }
+
+ StartTableScan();
+ if (PendingShards.Size() > 0) {
+ return bytes;
+ }
+
+ Results.pop();
+ CA_LOG_D("dropping batch");
+
+ if (RunningReads() == 0 || (Settings.HasItemsLimit() && RowCount >= Settings.GetItemsLimit())) {
+ finished = true;
+ }
+ } else {
+ break;
+ }
+ }
+
+ return bytes;
+ }
+
+ void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) override {}
+ void CommitState(const NYql::NDqProto::TCheckpoint&) override {}
+ void LoadState(const NYql::NDqProto::TSourceState&) override {}
+
+ void PassAway() override {
+ {
+ auto guard = BindAllocator();
+ Results.clear();
+ }
+ TBase::PassAway();
+ }
+
+ void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
+ NYql::TIssue issue(message);
+ for (const auto& i : subIssues) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
+ }
+
+ NYql::TIssues issues;
+ issues.AddIssue(std::move(issue));
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode));
+ }
+
+private:
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings;
+
+ TVector<NScheme::TTypeInfo> KeyColumnTypes;
+
+ size_t RowCount = 0;
+ ui64 ResetReads = 0;
+ ui64 ReadId = 0;
+ TVector<TReadState> Reads;
+ THashMap<ui64, TVector<ui32>> ReadIdByTabletId;
+
+ THashMap<ui64, TShardState*> ResolveShards;
+ ui64 ResolveShardId;
+
+ TShardQueue InFlightShards;
+ TShardQueue PendingShards;
+
+ struct TResult {
+ ui64 ShardId;
+ THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
+ TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch;
+ size_t ProcessedRows = 0;
+
+ TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult)
+ : ShardId(shardId)
+ , ReadResult(std::move(readResult))
+ {
+ }
+ };
+ TQueue<TResult> Results;
+
+ ui32 MaxInFlight = 1024;
+ const TString LogPrefix;
+ TTableId TableId;
+
+ TActorId ComputeActorId;
+ ui64 InputIndex;
+ const NMiniKQL::TTypeEnvironment& TypeEnv;
+ const NMiniKQL::THolderFactory& HolderFactory;
+};
+
+
+void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory) {
+ factory.RegisterSource<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(
+ TString(NYql::KqpReadRangesSourceName),
+ [] (NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments args) {
+ auto* actor = new TKqpReadActor(std::move(settings), args);
+ return std::make_pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>(actor, actor);
+ });
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h
new file mode 100644
index 00000000000..28f3e17873c
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_read_actor.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory);
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp
index 2c39d685db5..ead01cb124d 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.cpp
+++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp
@@ -10,19 +10,6 @@
namespace NKikimr {
namespace NMiniKQL {
-namespace {
-
-struct TBytesStatistics {
- ui64 AllocatedBytes = 0;
- ui64 DataBytes = 0;
-
- void AddStatistics(const TBytesStatistics& other) {
- AllocatedBytes += other.AllocatedBytes;
- DataBytes += other.DataBytes;
- }
-
-};
-
TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) {
namespace NTypeIds = NScheme::NTypeIds;
if (!value) {
@@ -86,6 +73,18 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::
}
}
+void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo) {
+ YQL_ENSURE(tag == TKeyDesc::EColumnIdDataShard, "Unknown system column tag: " << tag);
+
+ if (shardId) {
+ rowItem = NUdf::TUnboxedValuePod(*shardId);
+ } else {
+ rowItem = NUdf::TUnboxedValue();
+ }
+}
+
+namespace {
+
TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row, const TSmallVec<TKqpComputeContextBase::TColumn>& columns,
const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns)
{
@@ -101,13 +100,7 @@ TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row, const TSmallVec<TKqp
void FillSystemColumns(NUdf::TUnboxedValue* rowItems, TMaybe<ui64> shardId, const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns) {
for (ui32 i = 0; i < systemColumns.size(); ++i) {
- YQL_ENSURE(systemColumns[i].Tag == TKeyDesc::EColumnIdDataShard, "Unknown system column tag: " << systemColumns[i].Tag);
-
- if (shardId) {
- rowItems[i] = NUdf::TUnboxedValuePod(*shardId);
- } else {
- rowItems[i] = NUdf::TUnboxedValue();
- }
+ FillSystemColumn(rowItems[i], shardId, systemColumns[i].Tag, systemColumns[i].Type);
}
}
@@ -143,6 +136,8 @@ NUdf::TUnboxedValue MakeUnboxedValueFromDecimal128Array(arrow::Array* column, ui
return NUdf::TUnboxedValuePod(val);
}
+} // namespace
+
TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType)
{
@@ -250,7 +245,7 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>&
return columnStats;
}
-} // namespace
+
std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) {
auto sizes = GetUnboxedValueSize(value, type);
diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h
index e365ffa0933..6391be44f58 100644
--- a/ydb/core/kqp/runtime/kqp_scan_data.h
+++ b/ydb/core/kqp/runtime/kqp_scan_data.h
@@ -22,6 +22,23 @@ namespace NKikimrTxDataShard {
namespace NKikimr {
namespace NMiniKQL {
+struct TBytesStatistics {
+ ui64 AllocatedBytes = 0;
+ ui64 DataBytes = 0;
+
+ void AddStatistics(const TBytesStatistics& other) {
+ AllocatedBytes += other.AllocatedBytes;
+ DataBytes += other.DataBytes;
+ }
+
+};
+
+TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type);
+TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors,
+ const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType);
+
+void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type);
+
std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type);
class IKqpTableReader : public TSimpleRefCount<IKqpTableReader> {
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index 305941e4322..217aa363695 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1960,6 +1960,26 @@ Y_UNIT_TEST_SUITE(KqpScan) {
CompareYson("[[%false]]", StreamResultToYson(result));
}
+ Y_UNIT_TEST(DqSource) {
+ TKikimrSettings settings;
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ flags.SetEnableKqpScanQuerySourceRead(true);
+ settings.SetFeatureFlags(flags);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ CreateSampleTables(kikimr);
+
+ {
+ auto result = db.StreamExecuteScanQuery(R"(
+ SELECT Key, Data FROM `/Root/EightShard` WHERE Key = 101 or (Key >= 202 and Key < 200+4) ORDER BY Key;
+ )").GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]];[[202u];[1]];[[203u];[3]]])", StreamResultToYson(result));
+ }
+ }
+
Y_UNIT_TEST(StreamLookup) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index a3be7cd884d..f77e99ed4c6 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -745,6 +745,7 @@ message TFeatureFlags {
optional bool EnableKqpDataQueryStreamLookup = 75 [default = false];
optional bool EnableBorrowedSplitCompaction = 76 [default = true];
optional bool EnableChangefeedInitialScan = 77 [default = false];
+ optional bool EnableKqpScanQuerySourceRead = 78 [default = false];
}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index f97d2240ff1..54e6603fd28 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -223,6 +223,7 @@ message TKqpPhyCnStreamLookup {
message TKqpPhyConnection {
uint32 StageIndex = 1;
uint32 OutputIndex = 2;
+ uint32 InputIndex = 13;
oneof Type {
TKqpPhyCnUnionAll UnionAll = 3;
@@ -238,6 +239,29 @@ message TKqpPhyConnection {
};
}
+message TKqpReadRangesSource {
+ TKqpPhyTable Table = 1;
+ repeated TKqpPhyColumn Columns = 2;
+ TKqpPhyValue ItemsLimit = 3;
+ bool Reverse = 4;
+ bool Sorted = 5;
+
+ oneof RangesExpr {
+ TKqpPhyParamValue Ranges = 6;
+ TKqpPhyKeyRange KeyRange = 7;
+ }
+
+ repeated string SkipNullKeys = 8;
+}
+
+message TKqpSource {
+ uint32 InputIndex = 1;
+
+ oneof Type {
+ TKqpReadRangesSource ReadRangesSource = 3;
+ }
+};
+
message TKqpPhyStage {
NYql.NDqProto.TProgram Program = 1;
repeated string ProgramParameters = 2;
@@ -247,6 +271,7 @@ message TKqpPhyStage {
repeated TKqpPhyTableOperation TableOps = 6;
bool IsEffectsStage = 7;
string StageGuid = 8;
+ repeated TKqpSource Sources = 9;
}
message TKqpPhyResult {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 77650329128..e4adad9f190 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -238,6 +238,24 @@ message TKqpTransaction {
optional NKikimrKqp.TKqpSnapshot Snapshot = 6;
}
+message TKqpReadRangesSourceSettings {
+ optional TKqpTransaction.TTableMeta Table = 1;
+
+ oneof RangeType {
+ NKikimrTx.TKeyRange FullRange = 2;
+ TKqpTransaction.TTableKeyRange Ranges = 3;
+ }
+
+ repeated TKqpTransaction.TColumnMeta Columns = 4;
+ optional uint64 ItemsLimit = 5;
+ optional bool Reverse = 6;
+ repeated string SkipNullKeys = 7;
+ repeated uint32 KeyColumnTypes = 8;
+ optional EScanDataFormat DataFormat = 9;
+ optional NKikimrProto.TRowVersion Snapshot = 10;
+ optional uint64 ShardIdHint = 11;
+}
+
message TKqpTaskInfo {
optional uint64 TaskId = 1;
optional NActorsProto.TActorId ComputeActor = 2;
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index 8cb87b01558..9907f4e0bf0 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -97,10 +97,13 @@ struct TMergeTaskInput {
{}
};
+struct TSourceInput {};
+
// Enum values must match to ConnectionInfo variant alternatives
enum class TTaskInputType {
UnionAll,
- Merge
+ Merge,
+ Source
};
struct TTransform {
@@ -114,7 +117,7 @@ struct TTransform {
template <class TInputMeta>
struct TTaskInput {
- std::variant<std::monostate, TMergeTaskInput> ConnectionInfo;
+ std::variant<std::monostate, TMergeTaskInput, TSourceInput> ConnectionInfo;
TChannelList Channels;
TMaybe<::google::protobuf::Any> SourceSettings;
TString SourceType;