diff options
author | ssmike <[email protected]> | 2022-11-17 22:32:43 +0300 |
---|---|---|
committer | ssmike <[email protected]> | 2022-11-17 22:32:43 +0300 |
commit | 1cc2f4df3f6a62eea4ca4813b6fcdae0a9af74e6 (patch) | |
tree | 36ab568aabc2cefb29f1ecea55777a2a2ddc1362 | |
parent | 6b0ef1e419310324ad779d3adf521b92a6c78ad7 (diff) |
DQ source for scans
26 files changed, 1724 insertions, 119 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 7f76bfd8b84..8cbaaecba95 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -127,16 +127,15 @@ NNodes::TCoNameValueTupleList TKqpPhyTxSettings::BuildNode(TExprContext& ctx, TP namespace { -template <typename TKqlReadOperation> -TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) { +TKqpReadTableSettings ParseInternal(const TCoNameValueTupleList& node) { TKqpReadTableSettings settings; - for (const auto& tuple : node.Settings()) { + for (const auto& tuple : node) { TStringBuf name = tuple.Name().Value(); if (name == TKqpReadTableSettings::SkipNullKeysSettingName) { - YQL_ENSURE(tuple.Value().template Maybe<TCoAtomList>()); - for (const auto& key : tuple.Value().template Cast<TCoAtomList>()) { + YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>()); + for (const auto& key : tuple.Value().Cast<TCoAtomList>()) { settings.SkipNullKeys.emplace_back(TString(key.Value())); } } else if (name == TKqpReadTableSettings::ItemsLimitSettingName) { @@ -158,12 +157,16 @@ TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) { } // anonymous namespace end -TKqpReadTableSettings TKqpReadTableSettings::Parse(const TKqlReadTableBase& node) { +TKqpReadTableSettings TKqpReadTableSettings::Parse(const NNodes::TCoNameValueTupleList& node) { return ParseInternal(node); } +TKqpReadTableSettings TKqpReadTableSettings::Parse(const TKqlReadTableBase& node) { + return TKqpReadTableSettings::Parse(node.Settings()); +} + TKqpReadTableSettings TKqpReadTableSettings::Parse(const TKqlReadTableRangesBase& node) { - return ParseInternal(node); + return TKqpReadTableSettings::Parse(node.Settings()); } NNodes::TCoNameValueTupleList TKqpReadTableSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const { @@ -297,9 +300,13 @@ TCoNameValueTupleList TKqpReadTableExplainPrompt::BuildNode(TExprContext& ctx, T } TKqpReadTableExplainPrompt TKqpReadTableExplainPrompt::Parse(const NNodes::TKqlReadTableRangesBase& node) { + return TKqpReadTableExplainPrompt::Parse(node.ExplainPrompt()); +} + +TKqpReadTableExplainPrompt TKqpReadTableExplainPrompt::Parse(const NNodes::TCoNameValueTupleList& node) { TKqpReadTableExplainPrompt prompt; - for (const auto& tuple : node.ExplainPrompt()) { + for (const auto& tuple : node) { TStringBuf name = tuple.Name().Value(); if (name == TKqpReadTableExplainPrompt::UsedKeyColumnsName) { diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 4a8352aa688..735e93360ab 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -39,6 +39,8 @@ struct TKqpPhyTxSettings { NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; }; +constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource"; + struct TKqpReadTableSettings { static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys"; static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit"; @@ -57,6 +59,7 @@ struct TKqpReadTableSettings { static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableBase& node); static TKqpReadTableSettings Parse(const NNodes::TKqlReadTableRangesBase& node); + static TKqpReadTableSettings Parse(const NNodes::TCoNameValueTupleList& node); NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; }; @@ -88,6 +91,7 @@ struct TKqpReadTableExplainPrompt { NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; static TKqpReadTableExplainPrompt Parse(const NNodes::TKqlReadTableRangesBase& node); + static TKqpReadTableExplainPrompt Parse(const NNodes::TCoNameValueTupleList& node); }; TString KqpExprToPrettyString(const TExprNode& expr, TExprContext& ctx); diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp index ede7a19f34c..e12ceca2c77 100644 --- a/ydb/core/kqp/compile/kqp_compile.cpp +++ b/ydb/core/kqp/compile/kqp_compile.cpp @@ -414,11 +414,19 @@ private: for (ui32 inputIndex = 0; inputIndex < stage.Inputs().Size(); ++inputIndex) { const auto& input = stage.Inputs().Item(inputIndex); - YQL_ENSURE(input.Maybe<TDqConnection>()); - auto connection = input.Cast<TDqConnection>(); - auto& protoInput = *stageProto.AddInputs(); - FillConnection(connection, stagesMap, protoInput, ctx); + if (input.Maybe<TDqSource>()) { + auto* protoSource = stageProto.AddSources(); + FillSource(input.Cast<TDqSource>(), protoSource, true); + protoSource->SetInputIndex(inputIndex); + } else { + YQL_ENSURE(input.Maybe<TDqConnection>()); + auto connection = input.Cast<TDqConnection>(); + + auto& protoInput = *stageProto.AddInputs(); + FillConnection(connection, stagesMap, protoInput, ctx); + protoInput.SetInputIndex(inputIndex); + } } bool hasSort = false; @@ -631,6 +639,54 @@ private: } } + void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns) { + if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>()) { + NKqpProto::TKqpReadRangesSource& readProto = *protoSource->MutableReadRangesSource(); + FillTable(settings.Table().Cast(), *readProto.MutableTable()); + + auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata; + YQL_ENSURE(tableMeta); + + FillColumns(settings.Columns().Cast(), *tableMeta, readProto, allowSystemColumns); + auto readSettings = TKqpReadTableSettings::Parse(settings.Settings().Cast()); + + readProto.SetReverse(readSettings.Reverse); + readProto.SetSorted(readSettings.Sorted); + for (auto&& key : readSettings.SkipNullKeys) { + readProto.AddSkipNullKeys(key); + } + + auto ranges = settings.RangesExpr().template Maybe<TCoParameter>(); + if (ranges.IsValid()) { + auto& rangesParam = *readProto.MutableRanges(); + rangesParam.SetParamName(ranges.Cast().Name().StringValue()); + } else { + YQL_ENSURE( + TCoVoid::Match(settings.RangesExpr().Raw()), + "Read ranges should be parameter or void, got: " << settings.RangesExpr().Cast().Ptr()->Content() + ); + } + + if (readSettings.ItemsLimit) { + TExprBase expr(readSettings.ItemsLimit); + if (expr.template Maybe<TCoUint64>()) { + auto* literal = readProto.MutableItemsLimit()->MutableLiteralValue(); + + literal->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); + literal->MutableType()->MutableData()->SetScheme(NScheme::NTypeIds::Uint64); + + literal->MutableValue()->SetUint64(FromString<ui64>(expr.Cast<TCoUint64>().Literal().Value())); + } else if (expr.template Maybe<TCoParameter>()) { + readProto.MutableItemsLimit()->MutableParamValue()->SetParamName(expr.template Cast<TCoParameter>().Name().StringValue()); + } else { + YQL_ENSURE(false, "Unexpected ItemsLimit callable " << expr.Ref().Content()); + } + } + } else { + YQL_ENSURE(false, "unsupported source type"); + } + } + void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap, NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx) { diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 9fe5852f386..f9eab53745d 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -3,6 +3,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/kqp/runtime/kqp_compute.h> #include <ydb/core/kqp/runtime/kqp_read_table.h> +#include <ydb/core/kqp/runtime/kqp_read_actor.h> #include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> namespace NKikimr { @@ -48,6 +49,7 @@ namespace NKqp { NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory() { auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); RegisterStreamLookupActorFactory(*factory); + RegisterKqpReadActor(*factory); return factory; } diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index a8c8c0579ef..8f0936d6410 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -185,6 +185,19 @@ protected: } } + TMaybe<size_t> FindReadRangesSource(const NKqpProto::TKqpPhyStage& stage) { + TMaybe<size_t> res; + for (size_t i = 0; i < stage.SourcesSize(); ++i) { + auto& source = stage.GetSources(i); + if (source.HasReadRangesSource()) { + YQL_ENSURE(!res); + res = i; + + } + } + return res; + } + protected: void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; @@ -406,6 +419,11 @@ protected: void FillInputDesc(NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input) { switch (input.Type()) { + case NYql::NDq::TTaskInputType::Source: + inputDesc.MutableSource()->SetType(input.SourceType); + inputDesc.MutableSource()->SetWatermarksMode(input.WatermarksMode); + inputDesc.MutableSource()->MutableSettings()->CopyFrom(*input.SourceSettings); + break; case NYql::NDq::TTaskInputType::UnionAll: { inputDesc.MutableUnionAll(); break; diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp index 98a77963b63..a065cffaeed 100644 --- a/ydb/core/kqp/executer/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp @@ -587,6 +587,57 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, return shardInfoMap; } +THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, + const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) +{ + const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); + YQL_ENSURE(table); + + const auto& keyColumnTypes = table->KeyColumnTypes; + TVector<TSerializedPointOrRange> ranges; + + if (source.HasRanges()) { + ranges = FillRangesFromParameter( + keyColumnTypes, source.GetRanges(), stageInfo, holderFactory, typeEnv + ); + } else if (source.HasKeyRange()) { + //TODO: support KeyRange + Y_ENSURE(false); + } else { + ranges = BuildFullRange(keyColumnTypes); + } + + THashMap<ui64, TShardInfo> shardInfoMap; + + // KeyReadRanges must be sorted & non-intersecting, they came in such condition from predicate extraction. + for (auto& range: ranges) { + TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range) + ? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true) + : TTableRange(std::get<TSerializedTableRange>(range).ToTableRange()); + + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), + keyColumnTypes); + + for (TPartitionWithRange& partitionWithRange : readPartitions) { + auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId]; + + if (!shardInfo.KeyReadRanges) { + shardInfo.KeyReadRanges.ConstructInPlace(); + } + + if (partitionWithRange.FullRange) { + shardInfo.KeyReadRanges->MakeFullRange(std::move(*partitionWithRange.FullRange)); + continue; + } + + shardInfo.KeyReadRanges->Add(std::move(partitionWithRange.PointOrRange)); + } + } + + return shardInfoMap; +} + THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, @@ -908,9 +959,6 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(TKqpTableKeys& tableKeys, } } - -namespace { - void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes, @@ -961,8 +1009,6 @@ void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu } } -} - TPhysicalShardReadSettings ExtractReadSettings(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { diff --git a/ydb/core/kqp/executer/kqp_partition_helper.h b/ydb/core/kqp/executer/kqp_partition_helper.h index 7b5eea477bd..f33016a9d47 100644 --- a/ydb/core/kqp/executer/kqp_partition_helper.h +++ b/ydb/core/kqp/executer/kqp_partition_helper.h @@ -61,6 +61,16 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); + +THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, + const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); + +void ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, + ui64& itemsLimit, TString& itemsLimitParamName, NYql::NDqProto::TData& itemsLimitBytes, + NKikimr::NMiniKQL::TType*& itemsLimitType); + // Returns the list of ColumnShards that can store rows from the specified range // NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range // partitioned and multiple ColumnShards store data from the same key range diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 396fd840e9c..6180a0e6549 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -480,6 +480,101 @@ private: } } + void BuildScanTasksFromSource(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, + const NMiniKQL::TTypeEnvironment& typeEnv) + { + THashMap<ui64, std::vector<ui64>> nodeTasks; + THashMap<ui64, ui64> assignedShardsCount; + + auto& stage = GetStage(stageInfo); + + auto sourceIndex = FindReadRangesSource(stage); + YQL_ENSURE(sourceIndex); + YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); + + auto& source = stage.GetSources(*sourceIndex).GetReadRangesSource(); + + const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable())); + const auto& keyTypes = table.KeyColumnTypes; + + YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap); + + auto columns = BuildKqpColumns(source, table); + THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, source, stageInfo, holderFactory, typeEnv); + + bool reverse = false; + ui64 itemsLimit = 0; + bool sorted = true; + + TString itemsLimitParamName; + NDqProto::TData itemsLimitBytes; + NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; + + YQL_ENSURE(!source.GetReverse(), "reverse not supported yet"); + + for (auto& [shardId, shardInfo] : partitions) { + YQL_ENSURE(!shardInfo.KeyWriteRanges); + + auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted, false); + + for (auto& [name, value] : shardInfo.Params) { + auto ret = task.Meta.Params.emplace(name, std::move(value)); + YQL_ENSURE(ret.second); + auto typeIterator = shardInfo.ParamTypes.find(name); + YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end()); + auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second); + YQL_ENSURE(retType.second); + } + + NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; + FillTableMeta(stageInfo, settings.MutableTable()); + for (auto& key : source.GetSkipNullKeys()) { + settings.AddSkipNullKeys(key); + } + + for (auto& keyColumn : keyTypes) { + settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId())); + } + + for (auto& column : columns) { + auto* protoColumn = settings.AddColumns(); + protoColumn->SetId(column.Id); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(column.Type); + protoColumn->SetType(columnType.TypeId); + if (columnType.TypeInfo) { + *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; + } + protoColumn->SetName(column.Name); + } + + if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) { + settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); + } else { + settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); + } + + settings.MutableSnapshot()->SetStep(Request.Snapshot.Step); + settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId); + + shardInfo.KeyReadRanges->SerializeTo(&settings); + settings.SetReverse(reverse); + + settings.SetShardIdHint(shardId); + + ExtractItemsLimit(stageInfo, source.GetItemsLimit(), holderFactory, + typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); + settings.SetItemsLimit(itemsLimit); + + const auto& stageSource = stage.GetSources(*sourceIndex); + auto& input = task.Inputs[stageSource.GetInputIndex()]; + auto& taskSourceSettings = input.SourceSettings; + input.ConnectionInfo = NYql::NDq::TSourceInput{}; + taskSourceSettings.ConstructInPlace(); + taskSourceSettings->PackFrom(settings); + input.SourceType = NYql::KqpReadRangesSourceName; + } + } + void BuildScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { @@ -662,7 +757,9 @@ private: Y_VERIFY_DEBUG(!stage.GetIsEffectsStage()); - if (stageInfo.Meta.ShardOperations.empty()) { + if (FindReadRangesSource(stage)) { + BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv); + } else if (stageInfo.Meta.ShardOperations.empty()) { BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { BuildSysViewScanTasks(stageInfo, holderFactory, typeEnv); @@ -753,7 +850,13 @@ private: protoTaskMeta.AddSkipNullKeys(skipNullKey); } - YQL_ENSURE(task.Meta.Reads); + // Task with source + if (!task.Meta.Reads) { + scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); + nScanTasks++; + continue; + } + YQL_ENSURE(!task.Meta.Writes); if (!task.Meta.Reads->empty()) { diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp index d4b9d715aaf..45e322f2d08 100644 --- a/ydb/core/kqp/executer/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp @@ -238,6 +238,17 @@ private: } } + for (auto& source : stage.GetSources()) { + if (source.HasReadRangesSource()) { + auto& table = TableKeys.GetOrAddTable( + MakeTableId(source.GetReadRangesSource().GetTable()), + source.GetReadRangesSource().GetTable().GetPath()); + for (auto& column : source.GetReadRangesSource().GetColumns()) { + table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); + } + } + } + for (const auto& input : stage.GetInputs()) { if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { const auto& streamLookup = input.GetStreamLookup(); diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp index cdab73ccfec..6c258a03e31 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp @@ -44,8 +44,18 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew TStageInfoMeta meta(tx); + for (auto& source : stage.GetSources()) { + if (source.HasReadRangesSource()) { + YQL_ENSURE(source.GetInputIndex() == 0); + YQL_ENSURE(stage.SourcesSize() == 1); + meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable()); + meta.TablePath = source.GetReadRangesSource().GetTable().GetPath(); + meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read); + } + } + bool stageAdded = tasksGraph.AddStageInfo( - TStageInfo(stageId, stage.InputsSize(), stage.GetOutputsCount(), std::move(meta))); + TStageInfo(stageId, stage.InputsSize() + stage.SourcesSize(), stage.GetOutputsCount(), std::move(meta))); YQL_ENSURE(stageAdded); auto& stageInfo = tasksGraph.GetStageInfo(stageId); @@ -267,8 +277,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl << (spilling ? " with spilling" : " without spilling")); }; - for (ui32 inputIdx = 0; inputIdx < stage.InputsSize(); ++inputIdx) { - const auto& input = stage.GetInputs(inputIdx); + for (const auto& input : stage.GetInputs()) { + ui32 inputIdx = input.GetInputIndex(); const auto& inputStageInfo = tasksGraph.GetStageInfo(TStageId(stageInfo.Id.TxId, input.GetStageIndex())); const auto& outputIdx = input.GetOutputIndex(); @@ -318,22 +328,6 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl } } -TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table) { - TVector<TTaskMeta::TColumn> columns; - columns.reserve(op.GetColumns().size()); - - for (const auto& column : op.GetColumns()) { - TTaskMeta::TColumn c; - c.Id = column.GetId(); - c.Type = table.Columns.at(column.GetName()).Type; - c.Name = column.GetName(); - - columns.emplace_back(std::move(c)); - } - - return columns; -} - bool IsCrossShardChannel(TKqpTasksGraph& tasksGraph, const TChannel& channel) { YQL_ENSURE(channel.SrcTask); @@ -618,6 +612,41 @@ void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpTransaction_TScanTaskM } } +void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto) const { + if (IsFullRange()) { + auto& protoRange = *proto->MutableRanges()->AddKeyRanges(); + FullRange->Serialize(protoRange); + } else { + bool usePoints = true; + for (auto& range : Ranges) { + if (std::holds_alternative<TSerializedCellVec>(range)) { + usePoints = false; + } + } + auto* protoRanges = proto->MutableRanges(); + for (auto& range : Ranges) { + if (std::holds_alternative<TSerializedCellVec>(range)) { + if (usePoints) { + const auto& x = std::get<TSerializedCellVec>(range); + protoRanges->AddKeyPoints(x.GetBuffer()); + } else { + const auto& x = std::get<TSerializedCellVec>(range); + auto& keyRange = *protoRanges->AddKeyRanges(); + keyRange.SetFrom(x.GetBuffer()); + keyRange.SetTo(x.GetBuffer()); + keyRange.SetFromInclusive(true); + keyRange.SetToInclusive(true); + } + } else { + auto& x = std::get<TSerializedTableRange>(range); + Y_VERIFY_DEBUG(!x.Point); + auto& keyRange = *protoRanges->AddKeyRanges(); + x.Serialize(keyRange); + } + } + } +} + std::pair<const TSerializedCellVec*, bool> TShardKeyRanges::GetRightBorder() const { if (FullRange) { return !FullRange->Point ? std::make_pair(&FullRange->To, true) : std::make_pair(&FullRange->From, true); diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h index 6dbc9bac458..fbe3ba7df4b 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer/kqp_tasks_graph.h @@ -12,6 +12,7 @@ namespace NKikimrTxDataShard { class TKqpTransaction_TDataTaskMeta_TKeyRange; class TKqpTransaction_TScanTaskMeta_TReadOpMeta; +class TKqpReadRangesSourceSettings; } namespace NKikimr { @@ -91,6 +92,7 @@ struct TShardKeyRanges { TString ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const; void SerializeTo(NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange* proto) const; void SerializeTo(NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta_TReadOpMeta* proto) const; + void SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto) const; std::pair<const TSerializedCellVec*, bool> GetRightBorder() const; }; @@ -171,7 +173,23 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const NKqpProto::TKqpPhyTx& tx, ui64 txIdx); void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, ui64 txId, bool enableSpilling); -TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table); + +template<typename Proto> +TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, const TKqpTableKeys::TTable& table) { + TVector<TTaskMeta::TColumn> columns; + columns.reserve(op.GetColumns().size()); + + for (const auto& column : op.GetColumns()) { + TTaskMeta::TColumn c; + c.Id = column.GetId(); + c.Type = table.Columns.at(column.GetName()).Type; + c.Name = column.GetName(); + + columns.emplace_back(std::move(c)); + } + + return columns; +} struct TKqpTaskOutputType { enum : ui32 { diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 051a73d2f56..8b323d48b71 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -88,6 +88,18 @@ ] }, { + "Name": "TKqpReadRangesSourceSettings", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KqpRowsSourceSettings"}, + "Children": [ + {"Index": 0, "Name": "Table", "Type": "TKqpTable"}, + {"Index": 1, "Name": "Columns", "Type": "TCoAtomList"}, + {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 3, "Name": "RangesExpr", "Type": "TExprBase"}, + {"Index": 4, "Name": "ExplainPrompt", "Type": "TCoNameValueTupleList", "Optional": true} + ] + }, + { "Name": "TKqlReadTableRanges", "Base": "TKqlReadTableRangesBase", "Match": {"Type": "Callable", "Name": "KqlReadTableRanges"} diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index ad91854efbf..e32e895883f 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -235,6 +235,44 @@ TStatus AnnotateReadTable(const TExprNode::TPtr& node, TExprContext& ctx, const return TStatus::Ok; } +TStatus AnnotateKqpSourceSettings(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, + const TKikimrTablesData& tablesData, bool withSystemColumns) +{ + auto table = ResolveTable(node->Child(TKqpReadRangesSourceSettings::idx_Table), ctx, cluster, tablesData); + if (!table.second) { + return TStatus::Error; + } + + const auto& columns = node->ChildPtr(TKqpReadRangesSourceSettings::idx_Columns); + if (!EnsureTupleOfAtoms(*columns, ctx)) { + return TStatus::Error; + } + + auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, TCoAtomList(columns), withSystemColumns); + if (!rowType) { + return TStatus::Error; + } + + auto ranges = node->Child(TKqpReadRangesSourceSettings::idx_RangesExpr); + if (!TCoVoid::Match(ranges) && + !TCoArgument::Match(ranges) && + !TCoParameter::Match(ranges) && + !TCoRangeFinalize::Match(ranges) && + !TDqPhyPrecompute::Match(ranges) && + !TKqpTxResultBinding::Match(ranges)) + { + ctx.AddError(TIssue( + ctx.GetPosition(ranges->Pos()), + TStringBuilder() + << "Expected Void, Parameter, Argument or RangeFinalize in ranges, but got: " + << ranges->Content() + )); + return TStatus::Error; + } + + node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType)); + return TStatus::Ok; +} TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData, bool withSystemColumns) @@ -1309,6 +1347,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateKqpEnsure(input, ctx); } + if (TKqpReadRangesSourceSettings::Match(input.Get())) { + return AnnotateKqpSourceSettings(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); + } + return dqTransformer->Transform(input, output, ctx); }); } diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 56e3d118501..b40b1e7b5be 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -283,22 +283,7 @@ private: TVector<TCoArgument> newArgs; TNodeOnNodeOwnedMap argsMap; - for (ui32 i = 0; i < stage.Inputs().Size(); ++i) { - const auto& input = stage.Inputs().Item(i); - const auto& inputArg = stage.Program().Args().Arg(i); - - auto maybeBinding = input.Maybe<TKqpTxResultBinding>(); - - if (!maybeBinding.IsValid()) { - auto newArg = ctx.NewArgument(inputArg.Pos(), inputArg.Name()); - newInputs.push_back(input); - newArgs.emplace_back(TCoArgument(newArg)); - argsMap.emplace(inputArg.Raw(), std::move(newArg)); - continue; - } - - auto binding = maybeBinding.Cast(); - + auto makeParameterBinding = [&ctx, &bindingsMap] (TKqpTxResultBinding binding, TPositionHandle pos) { TString paramName = TStringBuilder() << ParamNamePrefix << "tx_result_binding_" << binding.TxIndex().Value() << "_" << binding.ResultIndex().Value(); @@ -308,9 +293,9 @@ private: type = type->Cast<TTypeExprType>()->GetType(); YQL_ENSURE(type); - TExprBase parameter = Build<TCoParameter>(ctx, input.Pos()) + TExprBase parameter = Build<TCoParameter>(ctx, pos) .Name().Build(paramName) - .Type(ExpandType(input.Pos(), *type, ctx)) + .Type(ExpandType(pos, *type, ctx)) .Done(); // TODO: (Iterator|ToStream (Parameter ...)) -> (ToFlow (Parameter ...)) @@ -320,7 +305,7 @@ private: // .Done(); // } - auto paramBinding = Build<TKqpParamBinding>(ctx, input.Pos()) + auto paramBinding = Build<TKqpParamBinding>(ctx, pos) .Name().Build(paramName) .Binding(binding) .Done(); @@ -332,7 +317,37 @@ private: << ", first: " << KqpExprToPrettyString(inserted.first->second.Binding().Ref(), ctx) << ", second: " << KqpExprToPrettyString(binding, ctx)); } - argsMap.emplace(inputArg.Raw(), parameter.Ptr()); + return parameter; + }; + + TNodeOnNodeOwnedMap sourceReplaceMap; + for (ui32 i = 0; i < stage.Inputs().Size(); ++i) { + const auto& input = stage.Inputs().Item(i); + const auto& inputArg = stage.Program().Args().Arg(i); + + if (auto source = input.Maybe<TDqSource>()) { + VisitExpr(input.Ptr(), + [&](const TExprNode::TPtr& node) { + TExprBase expr(node); + YQL_ENSURE(!expr.Maybe<TDqConnection>().IsValid()); + if (auto binding = expr.Maybe<TKqpTxResultBinding>()) { + sourceReplaceMap.emplace(node.Get(), makeParameterBinding(binding.Cast(), node->Pos()).Ptr()); + } + return true; + }); + } + + auto maybeBinding = input.Maybe<TKqpTxResultBinding>(); + + if (!maybeBinding.IsValid()) { + auto newArg = ctx.NewArgument(inputArg.Pos(), inputArg.Name()); + newInputs.push_back(input); + newArgs.emplace_back(TCoArgument(newArg)); + argsMap.emplace(inputArg.Raw(), std::move(newArg)); + continue; + } + + argsMap.emplace(inputArg.Raw(), makeParameterBinding(maybeBinding.Cast(), input.Pos()).Ptr()); } auto inputs = Build<TExprList>(ctx, stage.Pos()) @@ -340,7 +355,7 @@ private: .Done(); return Build<TDqPhyStage>(ctx, stage.Pos()) - .Inputs(ctx.ReplaceNodes(inputs.Ptr(), stagesMap)) + .Inputs(ctx.ReplaceNodes(ctx.ReplaceNodes(inputs.Ptr(), stagesMap), sourceReplaceMap)) .Program() .Args(newArgs) .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap)) @@ -388,6 +403,29 @@ private: bool IsPrecompute; }; +TVector<TDqPhyPrecompute> PrecomputeInputs(const TDqStage& stage) { + TVector<TDqPhyPrecompute> result; + for (const auto& input : stage.Inputs()) { + if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) { + result.push_back(maybePrecompute.Cast()); + } else if (auto maybeSource = input.Maybe<TDqSource>()) { + VisitExpr(maybeSource.Cast().Ptr(), + [&] (const TExprNode::TPtr& ptr) { + TExprBase node(ptr); + if (auto maybePrecompute = node.Maybe<TDqPhyPrecompute>()) { + result.push_back(maybePrecompute.Cast()); + return false; + } + if (auto maybeConnection = node.Maybe<TDqConnection>()) { + YQL_ENSURE(false, "unexpected connection in source"); + } + return true; + }); + } + } + return result; +} + class TKqpBuildTxsTransformer : public TSyncTransformerBase { public: TKqpBuildTxsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, @@ -556,27 +594,30 @@ private: } auto stage = maybeStage.Cast(); + auto precomputeInputs = PrecomputeInputs(stage); + for (auto& precompute : precomputeInputs) { + auto precomputeStage = precompute.Connection().Output().Stage(); + precomputes.emplace(precomputeStage.Raw(), precomputeStage.Ptr()); + dependencies.emplace(stage.Raw(), stage.Ptr()); + } for (const auto& input : stage.Inputs()) { - const TExprNode* inputStage; - - if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) { - auto precomputeStage = maybePrecompute.Cast().Connection().Output().Stage(); - precomputes.emplace(precomputeStage.Raw(), precomputeStage.Ptr()); - dependencies.emplace(stage.Raw(), stage.Ptr()); - inputStage = precomputeStage.Raw(); + if (input.Maybe<TDqPhyPrecompute>()) { + continue; } else if (auto maybeConnection = input.Maybe<TDqConnection>()) { - inputStage = maybeConnection.Cast().Output().Stage().Raw(); + const TExprNode* inputStage = maybeConnection.Cast().Output().Stage().Raw(); + if (dependencies.contains(inputStage)) { + dependencies.emplace(stage.Raw(), stage.Ptr()); + } + } else if (auto maybeSource = input.Maybe<TDqSource>()) { + // handled in PrecomputeInputs + continue; } else if (input.Maybe<TKqpTxResultBinding>()) { // ok continue; } else { YQL_ENSURE(false, "Unexpected stage input: " << input.Ref().Content()); } - - if (dependencies.contains(inputStage)) { - dependencies.emplace(stage.Raw(), stage.Ptr()); - } } return true; @@ -618,15 +659,9 @@ private: for (auto& [_, stagePtr] : dependantStagesMap) { TDqStage stage(stagePtr); + auto precomputes = PrecomputeInputs(stage); - for (const auto& input : stage.Inputs()) { - auto maybePrecompute = input.Maybe<TDqPhyPrecompute>(); - - if (!maybePrecompute.IsValid()) { - continue; - } - - auto precompute = maybePrecompute.Cast(); + for (const auto& precompute : precomputes) { auto precomputeConnection = precompute.Connection(); auto precomputeStage = precomputeConnection.Output().Stage(); diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 4bdf5662401..46ff7a38fce 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -535,6 +535,251 @@ private: } } + void Visit(const TKqpReadRangesSourceSettings& sourceSettings, TQueryPlanNode& planNode) { + if (sourceSettings.RangesExpr().Maybe<TKqlKeyRange>()) { + auto table = TString(sourceSettings.Table().Path()); + auto range = sourceSettings.RangesExpr().Cast<TKqlKeyRange>(); + + TOperator op; + TTableRead readInfo; + + auto describeBoundary = [this](const TExprBase& key) { + if (auto param = key.Maybe<TCoParameter>()) { + return param.Cast().Name().StringValue(); + } + + if (auto param = key.Maybe<TCoNth>().Tuple().Maybe<TCoParameter>()) { + if (auto maybeResultBinding = ContainResultBinding(param.Cast().Name().StringValue())) { + auto [txId, resId] = *maybeResultBinding; + if (auto result = GetResult(txId, resId)) { + auto index = FromString<ui32>(key.Cast<TCoNth>().Index()); + Y_ENSURE(index < result->Size()); + return (*result)[index].GetDataText(); + } + } + } + + if (auto literal = key.Maybe<TCoDataCtor>()) { + return literal.Cast().Literal().StringValue(); + } + + return TString("n/a"); + }; + + /* Collect info about scan range */ + struct TKeyPartRange { + TString From; + TString To; + TString ColumnName; + }; + auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table); + op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table; + planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]); + TVector<TKeyPartRange> scanRangeDescr(tableData.Metadata->KeyColumnNames.size()); + + auto maybeFromKey = range.From().Maybe<TKqlKeyTuple>(); + auto maybeToKey = range.To().Maybe<TKqlKeyTuple>(); + if (maybeFromKey && maybeToKey) { + auto fromKey = maybeFromKey.Cast(); + auto toKey = maybeToKey.Cast(); + + for (ui32 i = 0; i < fromKey.ArgCount(); ++i) { + scanRangeDescr[i].From = describeBoundary(fromKey.Arg(i)); + } + for (ui32 i = 0; i < toKey.ArgCount(); ++i) { + scanRangeDescr[i].To = describeBoundary(toKey.Arg(i)); + } + for (ui32 i = 0; i < scanRangeDescr.size(); ++i) { + scanRangeDescr[i].ColumnName = tableData.Metadata->KeyColumnNames[i]; + } + + TString leftParen = range.From().Maybe<TKqlKeyInc>().IsValid() ? "[" : "("; + TString rightParen = range.To().Maybe<TKqlKeyInc>().IsValid() ? "]" : ")"; + bool hasRangeScans = false; + auto& ranges = op.Properties["ReadRange"]; + for (const auto& keyPartRange : scanRangeDescr) { + TStringBuilder rangeDescr; + + if (keyPartRange.From == keyPartRange.To) { + if (keyPartRange.From.Empty()) { + rangeDescr << keyPartRange.ColumnName << " (-∞, +∞)"; + readInfo.ScanBy.push_back(rangeDescr); + } else { + rangeDescr << keyPartRange.ColumnName + << " (" << keyPartRange.From << ")"; + readInfo.LookupBy.push_back(rangeDescr); + } + } else { + rangeDescr << keyPartRange.ColumnName << " " + << (keyPartRange.From.Empty() ? "(" : leftParen) + << (keyPartRange.From.Empty() ? "-∞" : keyPartRange.From) << ", " + << (keyPartRange.To.Empty() ? "+∞" : keyPartRange.To) + << (keyPartRange.To.Empty() ? ")" : rightParen); + readInfo.ScanBy.push_back(rangeDescr); + hasRangeScans = true; + } + + ranges.AppendValue(rangeDescr); + } + + // Scan which fixes only few first members of compound primary key were called "Lookup" + // by older explain version. We continue to do so. + if (readInfo.LookupBy.size() > 0) { + readInfo.Type = EPlanTableReadType::Lookup; + } else { + readInfo.Type = hasRangeScans ? EPlanTableReadType::Scan : EPlanTableReadType::FullScan; + } + } + + auto& columns = op.Properties["ReadColumns"]; + for (auto const& col : sourceSettings.Columns()) { + readInfo.Columns.emplace_back(TString(col.Value())); + columns.AppendValue(col.Value()); + } + + auto settings = NYql::TKqpReadTableSettings::Parse(sourceSettings.Settings()); + if (settings.ItemsLimit && !readInfo.Limit) { + auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false); + if (auto maybeResultBinding = ContainResultBinding(limit)) { + const auto [txId, resId] = *maybeResultBinding; + if (auto result = GetResult(txId, resId)) { + limit = result->GetDataText(); + } + } + + readInfo.Limit = limit; + op.Properties["ReadLimit"] = limit; + } + if (settings.Reverse) { + readInfo.Reverse = true; + op.Properties["Reverse"] = true; + } + + SerializerCtx.Tables[table].Reads.push_back(readInfo); + + ui32 operatorId; + if (readInfo.Type == EPlanTableReadType::Scan) { + op.Properties["Name"] = "TableRangeScan"; + operatorId = AddOperator(planNode, "TableRangeScan", std::move(op)); + } else if (readInfo.Type == EPlanTableReadType::FullScan) { + op.Properties["Name"] = "TableFullScan"; + operatorId = AddOperator(planNode, "TableFullScan", std::move(op)); + } else if (readInfo.Type == EPlanTableReadType::Lookup) { + op.Properties["Name"] = "TablePointLookup"; + operatorId = AddOperator(planNode, "TablePointLookup", std::move(op)); + } else { + op.Properties["Name"] = "TableScan"; + operatorId = AddOperator(planNode, "TableScan", std::move(op)); + } + } else { + const auto table = TString(sourceSettings.Table().Path()); + const auto explainPrompt = TKqpReadTableExplainPrompt::Parse(sourceSettings.ExplainPrompt().Cast()); + + TTableRead readInfo; + TOperator op; + + auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table); + op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table; + planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]); + + auto rangesDesc = PrettyExprStr(sourceSettings.RangesExpr()); + if (rangesDesc == "Void" || explainPrompt.UsedKeyColumns.empty()) { + readInfo.Type = EPlanTableReadType::FullScan; + + auto& ranges = op.Properties["ReadRanges"]; + for (const auto& col : tableData.Metadata->KeyColumnNames) { + TStringBuilder rangeDesc; + rangeDesc << col << " (-∞, +∞)"; + readInfo.ScanBy.push_back(rangeDesc); + ranges.AppendValue(rangeDesc); + } + } else if (auto maybeResultBinding = ContainResultBinding(rangesDesc)) { + readInfo.Type = EPlanTableReadType::Scan; + + auto [txId, resId] = *maybeResultBinding; + if (auto result = GetResult(txId, resId)) { + auto ranges = (*result)[0]; + const auto& keyColumns = tableData.Metadata->KeyColumnNames; + for (size_t rangeId = 0; rangeId < ranges.Size(); ++rangeId) { + Y_ENSURE(ranges[rangeId].HaveValue() && ranges[rangeId].Size() == 2); + auto from = ranges[rangeId][0]; + auto to = ranges[rangeId][1]; + + for (size_t colId = 0; colId < keyColumns.size(); ++colId) { + if (!from[colId].HaveValue() && !to[colId].HaveValue()) { + continue; + } + + TStringBuilder rangeDesc; + rangeDesc << keyColumns[colId] << " " + << (from[keyColumns.size()].GetDataText() == "1" ? "[" : "(") + << (from[colId].HaveValue() ? from[colId].GetDataText() : "-∞") << ", " + << (to[colId].HaveValue() ? to[colId].GetDataText() : "+∞") + << (to[keyColumns.size()].GetDataText() == "1" ? "]" : ")"); + + readInfo.ScanBy.push_back(rangeDesc); + op.Properties["ReadRanges"].AppendValue(rangeDesc); + } + } + } else { + op.Properties["ReadRanges"] = rangesDesc; + } + } else { + Y_ENSURE(false, rangesDesc); + } + + if (!explainPrompt.UsedKeyColumns.empty()) { + auto& usedColumns = op.Properties["ReadRangesKeys"]; + for (const auto& col : explainPrompt.UsedKeyColumns) { + usedColumns.AppendValue(col); + } + } + + if (explainPrompt.ExpectedMaxRanges) { + op.Properties["ReadRangesExpectedSize"] = explainPrompt.ExpectedMaxRanges; + } + + auto& columns = op.Properties["ReadColumns"]; + for (const auto& col : sourceSettings.Columns()) { + readInfo.Columns.emplace_back(TString(col.Value())); + columns.AppendValue(col.Value()); + } + + auto settings = NYql::TKqpReadTableSettings::Parse(sourceSettings.Settings()); + if (settings.ItemsLimit && !readInfo.Limit) { + auto limit = GetExprStr(TExprBase(settings.ItemsLimit), false); + if (auto maybeResultBinding = ContainResultBinding(limit)) { + const auto [txId, resId] = *maybeResultBinding; + if (auto result = GetResult(txId, resId)) { + limit = result->GetDataText(); + } + } + + readInfo.Limit = limit; + op.Properties["ReadLimit"] = limit; + } + if (settings.Reverse) { + readInfo.Reverse = true; + op.Properties["Reverse"] = true; + } + + if (tableData.Metadata->Kind == EKikimrTableKind::Olap) { + op.Properties["PredicatePushdown"] = SerializerCtx.Config.Get()->PushOlapProcess(); + } + + ui32 operatorId; + if (readInfo.Type == EPlanTableReadType::FullScan) { + op.Properties["Name"] = "TableFullScan"; + operatorId = AddOperator(planNode, "TableFullScan", std::move(op)); + } else { + op.Properties["Name"] = "TableRangesScan"; + operatorId = AddOperator(planNode, "TableRangesScan", std::move(op)); + } + + SerializerCtx.Tables[table].Reads.push_back(std::move(readInfo)); + } + } + void Visit(const TExprBase& expr, TQueryPlanNode& planNode) { if (expr.Maybe<TDqPhyStage>()) { auto stageGuid = NDq::TDqStageSettings::Parse(expr.Cast<TDqPhyStage>()).Id; @@ -575,12 +820,18 @@ private: } for (const auto& input : expr.Cast<TDqStageBase>().Inputs()) { - auto inputCn = input.Cast<TDqConnection>(); + if (auto source = input.Maybe<TDqSource>()) { + auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>(); + YQL_ENSURE(settings.IsValid(), "only readranges sources are supported"); + Visit(settings.Cast(), stagePlanNode); + } else { + auto inputCn = input.Cast<TDqConnection>(); - auto& inputPlanNode = AddPlanNode(stagePlanNode); - FillConnectionPlanNode(inputCn, inputPlanNode); + auto& inputPlanNode = AddPlanNode(stagePlanNode); + FillConnectionPlanNode(inputCn, inputPlanNode); - Visit(inputCn.Output().Stage(), inputPlanNode); + Visit(inputCn.Output().Stage(), inputPlanNode); + } } } else { Visit(expr.Ptr(), planNode); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index a0e28b2ab21..da6a009c9f0 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -189,11 +189,13 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, auto ranges = read.Ranges(); auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path()); + bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery(); bool fullScan = TCoVoid::Match(ranges.Raw()); TVector<TExprBase> input; TMaybeNode<TExprBase> argument; TVector<TCoArgument> programArgs; + TMaybeNode<TExprBase> rangesExpr; if (!fullScan) { TMaybe<TDqStage> rangesStage; @@ -268,28 +270,59 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, .Build() .Done(); - argument = Build<TCoArgument>(ctx, read.Pos()) - .Name("_kqp_pc_ranges_arg_0") - .Done(); + rangesExpr = precompute; + if (!useSource) { + argument = Build<TCoArgument>(ctx, read.Pos()) + .Name("_kqp_pc_ranges_arg_0") + .Done(); - input.push_back(precompute); - programArgs.push_back(argument.Cast<TCoArgument>()); + input.push_back(precompute); + programArgs.push_back(argument.Cast<TCoArgument>()); + } } else { argument = read.Ranges(); } TMaybeNode<TExprBase> phyRead; + TMaybeNode<TExprBase> sourceArg; + if (useSource) { + YQL_ENSURE(rangesExpr.IsValid()); + + input.push_back( + Build<TDqSource>(ctx, read.Pos()) + .Settings<TKqpReadRangesSourceSettings>() + .Table(read.Table()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .RangesExpr(rangesExpr.Cast()) + .ExplainPrompt(read.ExplainPrompt()) + .Build() + .DataSource<TCoDataSource>() + .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build() + .Build() + .Done()); + sourceArg = Build<TCoArgument>(ctx, read.Pos()) + .Name("_kqp_pc_source_arg_0") + .Done(); + programArgs.push_back(sourceArg.Cast<TCoArgument>()); + } + switch (tableDesc.Metadata->Kind) { case EKikimrTableKind::Datashard: case EKikimrTableKind::SysView: - phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos()) - .Table(read.Table()) - .Ranges(argument.Cast()) - .Columns(read.Columns()) - .Settings(read.Settings()) - .ExplainPrompt(read.ExplainPrompt()) - .Done(); + if (useSource) { + YQL_ENSURE(sourceArg.IsValid()); + phyRead = sourceArg.Cast(); + } else { + phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos()) + .Table(read.Table()) + .Ranges(argument.Cast()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .Done(); + } break; case EKikimrTableKind::Olap: diff --git a/ydb/core/kqp/runtime/CMakeLists.txt b/ydb/core/kqp/runtime/CMakeLists.txt index 2908169720e..3f0643e6e81 100644 --- a/ydb/core/kqp/runtime/CMakeLists.txt +++ b/ydb/core/kqp/runtime/CMakeLists.txt @@ -39,6 +39,7 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_program_builder.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_read_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp new file mode 100644 index 00000000000..472c6f25c54 --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -0,0 +1,837 @@ +#include "kqp_read_actor.h" + +#include <ydb/core/kqp/runtime/kqp_scan_data.h> +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/engine/minikql/minikql_engine_host.h> +#include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/protos/tx_datashard.pb.h> +#include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/tx/datashard/range_ops.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h> + +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/actorsystem.h> + +#include <util/generic/intrlist.h> + +namespace { + +static constexpr ui64 EVREAD_MAX_ROWS = 32767; +static constexpr ui64 EVREAD_MAX_BYTES = 200_MB; + +static constexpr ui64 MAX_SHARD_RETRIES = 5; +//static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20; +static constexpr ui64 MAX_SHARD_RESOLVES = 3; + +bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog::EComponent component) { + auto* settings = actorSystem->LoggerSettings(); + return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component); +} + +} + +namespace NKikimr { +namespace NKqp { + +using namespace NYql; +using namespace NYql::NDq; +using namespace NKikimr; +using namespace NKikimr::NDataShard; + + +class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq::IDqComputeActorAsyncInput { + using TBase = TActorBootstrapped<TKqpReadActor>; +public: + struct TShardState : public TIntrusiveListItem<TShardState> { + TSmallVec<TSerializedTableRange> Ranges; + TSmallVec<TSerializedCellVec> Points; + + TOwnedCellVec LastKey; + ui32 FirstUnprocessedRequest = 0; + TMaybe<ui32> ReadId; + ui64 TabletId; + + size_t ResolveAttempt = 0; + size_t RetryAttempt = 0; + + TShardState(ui64 tabletId) + : TabletId(tabletId) + { + } + + TTableRange GetBounds() { + if (Ranges.empty()) { + YQL_ENSURE(!Points.empty()); + return TTableRange( + Points.front().GetCells(), true, + Points.back().GetCells(), true); + } else { + return TTableRange( + Ranges.front().From.GetCells(), Ranges.front().FromInclusive, + Ranges.back().To.GetCells(), Ranges.back().ToInclusive); + } + } + + static void MakePrefixRange(TSerializedTableRange& range, size_t keyColumns) { + if (keyColumns == 0) { + return; + } + bool fromInclusive = range.FromInclusive; + TConstArrayRef<TCell> from = range.From.GetCells(); + + bool toInclusive = range.ToInclusive; + TConstArrayRef<TCell> to = range.To.GetCells(); + + bool noop = true; + // Recognize and remove padding made here https://a.yandex-team.ru/arcadia/ydb/core/kqp/executer/kqp_partition_helper.cpp?rev=r10109549#L284 + + // Absent cells mean infinity. So in prefix notation `From` should be exclusive. + // For example x >= (Key1, Key2, +infinity) is equivalent to x > (Key1, Key2) where x is arbitrary tuple + if (range.From.GetCells().size() < keyColumns) { + fromInclusive = false; + noop = range.FromInclusive; + } else if (range.FromInclusive) { + // Nulls are minimum values so we should remove null padding. + // x >= (Key1, Key2, null) is equivalent to x >= (Key1, Key2) + ssize_t i = range.From.GetCells().size(); + while (i > 0 && range.From.GetCells()[i - 1].IsNull()) { + --i; + noop = false; + } + from = range.From.GetCells().subspan(0, i); + } + + // Absent cells mean infinity. So in prefix notation `To` should be inclusive. + // For example x < (Key1, Key2, +infinity) is equivalent to x <= (Key1, Key2) where x is arbitrary tuple + if (range.To.GetCells().size() < keyColumns) { + toInclusive = true; + // Nulls are minimum values so we should remove null padding. + // For example x < (Key1, Key2, null) is equivalent to x < (Key1, Key2) + ssize_t i = range.To.GetCells().size(); + while (i > 0 && range.To.GetCells()[i - 1].IsNull()) { + --i; + noop = false; + } + to = range.To.GetCells().subspan(0, i); + } + + if (!noop) { + return; + } + + range = TSerializedTableRange(from, fromInclusive, to, toInclusive); + } + + void FillUnprocessedRanges( + TVector<TSerializedTableRange>& result, + TConstArrayRef<NScheme::TTypeInfo> keyTypes) const + { + // Form new vector. Skip ranges already read. + bool lastKeyEmpty = LastKey.DataSize() == 0; + + if (!lastKeyEmpty) { + YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key"); + } + + auto rangeIt = Ranges.begin() + FirstUnprocessedRequest; + + if (!lastKeyEmpty) { + // It is range, where read was interrupted. Restart operation from last read key. + result.emplace_back(std::move(TSerializedTableRange( + TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive + ))); + ++rangeIt; + } + + // And push all others + result.insert(result.end(), rangeIt, Ranges.end()); + for (auto& range : result) { + MakePrefixRange(range, keyTypes.size()); + } + } + + void FillUnprocessedPoints(TVector<TSerializedCellVec>& result) const { + result.insert(result.begin(), Points.begin() + FirstUnprocessedRequest, Points.end()); + } + + void FillEvRead(TEvDataShard::TEvRead& ev, TConstArrayRef<NScheme::TTypeInfo> keyTypes) { + if (Ranges.empty()) { + FillUnprocessedPoints(ev.Keys); + } else { + FillUnprocessedRanges(ev.Ranges, keyTypes); + } + } + + TString ToString(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const { + TStringBuilder sb; + sb << "TShardState{ TabletId: " << TabletId << ", Last Key " << PrintLastKey(keyTypes) + << ", Ranges: ["; + for (size_t i = 0; i < Ranges.size(); ++i) { + sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry); + if (i + 1 != Ranges.size()) { + sb << ", "; + } + } + sb << "], " + << ", RetryAttempt: " << RetryAttempt << ", ResolveAttempt: " << ResolveAttempt << " }"; + return sb; + } + + TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const { + if (LastKey.empty()) { + return "<none>"; + } + return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry); + } + }; + + using TShardQueue = TIntrusiveListWithAutoDelete<TShardState, TDelete>; + + struct TReadState { + TShardState* Shard = nullptr; + bool Finished = false; + ui64 LastSeqNo; + TMaybe<TString> SerializedContinuationToken; + + void RegisterMessage(const TEvDataShard::TEvReadResult& result) { + LastSeqNo = result.Record.GetSeqNo(); + Finished = result.Record.GetFinished(); + } + + bool IsLastMessage(const TEvDataShard::TEvReadResult& result) { + return result.Record.GetFinished() || (Finished && result.Record.GetSeqNo() == LastSeqNo); + } + + operator bool () { + return Shard; + } + + void Reset() { + Shard = nullptr; + } + }; + +public: + TKqpReadActor(NKikimrTxDataShard::TKqpReadRangesSourceSettings settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments args) + : Settings(std::move(settings)) + , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << args.TxId << ", task: " << args.TaskId << ". ") + , ComputeActorId(args.ComputeActorId) + , InputIndex(args.InputIndex) + , TypeEnv(args.TypeEnv) + , HolderFactory(args.HolderFactory) + { + TableId = TTableId( + Settings.GetTable().GetTableId().GetOwnerId(), + Settings.GetTable().GetTableId().GetTableId(), + Settings.GetTable().GetSysViewInfo(), + Settings.GetTable().GetTableId().GetSchemaVersion() + ); + + KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size()); + for (auto typeId : Settings.GetKeyColumnTypes()) { + KeyColumnTypes.push_back(NScheme::TTypeInfo((NScheme::TTypeId)typeId)); + } + } + + STFUNC(ReadyState) { + Y_UNUSED(ctx); + try { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDataShard::TEvReadResult, HandleRead); + hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolve); + hFunc(TEvPipeCache::TEvDeliveryProblem, HandleError); + IgnoreFunc(TEvInterconnect::TEvNodeConnected); + IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); + } + } catch (const yexception& e) { + RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR); + } + } + + void Bootstrap() { + //TODO: resolve if hint is not set + THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint()); + PendingShards.PushBack(stateHolder.Get()); + auto& state = *stateHolder.Release(); + + if (Settings.HasFullRange()) { + state.Ranges.push_back(TSerializedTableRange(Settings.GetFullRange())); + } else { + YQL_ENSURE(Settings.HasRanges()); + if (Settings.GetRanges().KeyRangesSize() > 0) { + YQL_ENSURE(Settings.GetRanges().KeyPointsSize() == 0); + for (const auto& range : Settings.GetRanges().GetKeyRanges()) { + state.Ranges.push_back(TSerializedTableRange(range)); + } + } else { + for (const auto& point : Settings.GetRanges().GetKeyPoints()) { + state.Points.push_back(TSerializedCellVec(point)); + } + } + } + + StartTableScan(); + Become(&TKqpReadActor::ReadyState); + } + + bool StartTableScan() { + const ui32 maxAllowedInFlight = MaxInFlight; + bool isFirst = true; + while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) { + if (isFirst) { + CA_LOG_D("BEFORE: " << PendingShards.Size() << "." << RunningReads()); + isFirst = false; + } + auto state = THolder<TShardState>(PendingShards.PopFront()); + InFlightShards.PushFront(state.Get()); + StartRead(state.Release()); + } + if (!isFirst) { + CA_LOG_D("AFTER: " << PendingShards.Size() << "." << RunningReads()); + } + + CA_LOG_D("Scheduled table scans, in flight: " << RunningReads() << " shards. " + << "pending shards to read: " << PendingShards.Size() << ", "); + + return RunningReads() > 0 || !PendingShards.Empty(); + } + + void ResolveShard(TShardState* state) { + if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) { + RuntimeError(TStringBuilder() << "Table '" << Settings.GetTable().GetTablePath() << "' resolve limit exceeded", + NDqProto::StatusIds::UNAVAILABLE); + return; + } + + state->ResolveAttempt++; + + auto range = state->GetBounds(); + TVector<TKeyDesc::TColumnOp> columns; + columns.reserve(Settings.GetColumns().size()); + for (const auto& column : Settings.GetColumns()) { + TKeyDesc::TColumnOp op; + op.Column = column.GetId(); + op.Operation = TKeyDesc::EColumnOperation::Read; + op.ExpectedType = NScheme::TTypeInfo((NScheme::TTypeId)column.GetType()); + columns.emplace_back(std::move(op)); + } + + auto keyDesc = MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read, + KeyColumnTypes, columns); + + CA_LOG_D("Sending TEvResolveKeySet update for table '" << Settings.GetTable().GetTablePath() << "'" + << ", range: " << DebugPrintRange(KeyColumnTypes, range, *AppData()->TypeRegistry) + << ", attempt #" << state->ResolveAttempt); + + auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); + request->ResultSet.emplace_back(std::move(keyDesc)); + + request->ResultSet.front().UserData = ResolveShardId; + ResolveShards[ResolveShardId] = state; + ResolveShardId += 1; + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); + } + + void HandleResolve(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { + CA_LOG_D("Received TEvResolveKeySetResult update for table '" << Settings.GetTable().GetTablePath() << "'"); + + auto* request = ev->Get()->Request.Get(); + if (request->ErrorCount > 0) { + CA_LOG_E("Resolve request failed for table '" << Settings.GetTable().GetTablePath() << "', ErrorCount# " << request->ErrorCount); + + auto statusCode = NDqProto::StatusIds::UNAVAILABLE; + auto issueCode = TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE; + TString error; + + for (const auto& x : request->ResultSet) { + if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) { + // invalidate table + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); + + switch (x.Status) { + case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist: + statusCode = NDqProto::StatusIds::SCHEME_ERROR; + issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH; + error = TStringBuilder() << "Table '" << Settings.GetTable().GetTablePath() << "' not exists."; + break; + case NSchemeCache::TSchemeCacheRequest::EStatus::TypeCheckError: + statusCode = NDqProto::StatusIds::SCHEME_ERROR; + issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH; + error = TStringBuilder() << "Table '" << Settings.GetTable().GetTablePath() << "' scheme changed."; + break; + case NSchemeCache::TSchemeCacheRequest::EStatus::LookupError: + statusCode = NDqProto::StatusIds::UNAVAILABLE; + issueCode = TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE; + error = TStringBuilder() << "Failed to resolve table '" << Settings.GetTable().GetTablePath() << "'."; + break; + default: + statusCode = NDqProto::StatusIds::SCHEME_ERROR; + issueCode = TIssuesIds::KIKIMR_SCHEME_MISMATCH; + error = TStringBuilder() << "Unresolved table '" << Settings.GetTable().GetTablePath() << "'. Status: " << x.Status; + break; + } + } + } + + return RuntimeError(error, statusCode); + } + + auto keyDesc = std::move(request->ResultSet[0].KeyDescription); + THolder<TShardState> state; + if (auto ptr = ResolveShards[request->ResultSet[0].UserData]) { + state = THolder<TShardState>(ptr); + ResolveShards.erase(request->ResultSet[0].UserData); + } else { + return; + } + + if (keyDesc->GetPartitions().empty()) { + TString error = TStringBuilder() << "No partitions to read from '" << Settings.GetTable().GetTablePath() << "'"; + CA_LOG_E(error); + return RuntimeError(error, NDqProto::StatusIds::SCHEME_ERROR); + } + + const auto& tr = *AppData()->TypeRegistry; + + TVector<THolder<TShardState>> newShards; + newShards.reserve(keyDesc->GetPartitions().size()); + + for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) { + const auto& partition = keyDesc->GetPartitions()[idx]; + + TTableRange partitionRange{ + idx == 0 ? state->Ranges.front().From.GetCells() : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(), + idx == 0 ? state->Ranges.front().FromInclusive : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive, + keyDesc->GetPartitions()[idx].Range->EndKeyPrefix.GetCells(), + keyDesc->GetPartitions()[idx].Range->IsInclusive + }; + + CA_LOG_D("Processing resolved ShardId# " << partition.ShardId + << ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr) + << ", i: " << i << ", state ranges: " << state->Ranges.size()); + + auto newShard = MakeHolder<TShardState>(partition.ShardId); + + for (ui64 j = i; j < state->Ranges.size(); ++j) { + CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr) + << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)); + + auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange()); + + if (!intersection.IsEmptyRange(KeyColumnTypes)) { + CA_LOG_D("Add range to new shardId: " << partition.ShardId + << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr)); + + newShard->Ranges.emplace_back(TSerializedTableRange(intersection)); + } else { + CA_LOG_D("empty intersection"); + if (j > i) { + i = j - 1; + } + break; + } + } + + if (!newShard->Ranges.empty()) { + newShards.push_back(std::move(newShard)); + } + } + + YQL_ENSURE(!newShards.empty()); + for (int i = newShards.ysize() - 1; i >= 0; --i) { + PendingShards.PushFront(newShards[i].Release()); + } + + if (!state->LastKey.empty()) { + PendingShards.Front()->LastKey = std::move(state->LastKey); + } + + if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE) + && PendingShards.Size() + RunningReads() > 0) + { + TStringBuilder sb; + if (!PendingShards.Empty()) { + sb << "Pending shards States: "; + for (auto& st : PendingShards) { + sb << st.ToString(KeyColumnTypes) << "; "; + } + } + + if (!InFlightShards.Empty()) { + sb << "In Flight shards States: "; + for (auto& st : InFlightShards) { + sb << st.ToString(KeyColumnTypes) << "; "; + } + } + CA_LOG_D(sb); + } + StartTableScan(); + } + + void RetryRead(ui64 id) { + if (!Reads[id]) { + return; + } + + auto state = Reads[id].Shard; + Reads[id].Finished = true; + + state->RetryAttempt += 1; + if (state->RetryAttempt >= MAX_SHARD_RETRIES) { + return ResolveShard(state); + } + CA_LOG_D("Retrying read #" << id); + + auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); + cancel->Record.SetReadId(id); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + + if (Reads[id].SerializedContinuationToken) { + NKikimrTxDataShard::TReadContinuationToken token; + Y_VERIFY(token.ParseFromString(*(Reads[id].SerializedContinuationToken)), "Failed to parse continuation token"); + state->FirstUnprocessedRequest = token.GetFirstUnprocessedQuery(); + + if (token.GetLastProcessedKey()) { + TSerializedCellVec vec(token.GetLastProcessedKey()); + state->LastKey = TOwnedCellVec(vec.GetCells()); + } + } + + StartRead(state); + } + + void StartRead(TShardState* state) { + THolder<TEvDataShard::TEvRead> ev(new TEvDataShard::TEvRead()); + auto& record = ev->Record; + + state->FillEvRead(*ev, KeyColumnTypes); + for (const auto& column : Settings.GetColumns()) { + record.AddColumns(column.GetId()); + } + + { + record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId()); + record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep()); + } + + //if (RuntimeSettings.Timeout) { + // ev->Record.SetTimeoutMs(RuntimeSettings.Timeout.Get()->MilliSeconds()); + //} + //ev->Record.SetStatsMode(RuntimeSettings.StatsMode); + //ev->Record.SetTxId(std::get<ui64>(TxId)); + + auto id = ReadId++; + Reads.resize(ReadId); + Reads[id].Shard = state; + state->ReadId = id; + + record.SetReadId(id); + + record.MutableTableId()->SetOwnerId(Settings.GetTable().GetTableId().GetOwnerId()); + record.MutableTableId()->SetTableId(Settings.GetTable().GetTableId().GetTableId()); + record.MutableTableId()->SetSchemaVersion(Settings.GetTable().GetSchemaVersion()); + + record.SetReverse(Settings.GetReverse()); + if (Settings.GetItemsLimit()) { + record.SetMaxRows(Settings.GetItemsLimit()); + } else { + record.SetMaxRows(EVREAD_MAX_ROWS); + } + record.SetMaxBytes(EVREAD_MAX_BYTES); + + record.SetResultFormat(Settings.GetDataFormat()); + + CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath() + << ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry) + << ", readId = " << id); + + ReadIdByTabletId[state->TabletId].push_back(id); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), + IEventHandle::FlagTrackDelivery); + } + + void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) { + const auto& record = ev->Get()->Record; + auto id = record.GetReadId(); + Y_VERIFY(id < ReadId); + if (!Reads[id] || Reads[id].Finished) { + // dropped read + return; + } + + Reads[id].SerializedContinuationToken = record.GetContinuationToken(); + if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + for (auto& issue : record.GetStatus().GetIssues()) { + CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); + } + return RetryRead(id); + } + + Reads[id].RegisterMessage(*ev->Get()); + + YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC); + + Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); + CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed"); + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + } + + void HandleError(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + auto& msg = *ev->Get(); + + for (auto& read : ReadIdByTabletId[msg.TabletId]) { + CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered); + RetryRead(read); + } + } + + size_t RunningReads() const { + return Reads.size() - ResetReads; + } + + ui64 GetInputIndex() const override { + return InputIndex; + } + + NMiniKQL::TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row) { + NMiniKQL::TBytesStatistics rowStats{0, 0}; + for (size_t i = 0; i < Settings.ColumnsSize(); ++i) { + if (IsSystemColumn(Settings.GetColumns(i).GetId())) { + rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); + } else { + rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[i], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType()))); + } + } + if (Settings.ColumnsSize() == 0) { + rowStats.AddStatistics({sizeof(ui64), sizeof(ui64)}); + } + return rowStats; + } + + TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() { + return TypeEnv.BindAllocator(); + } + + NMiniKQL::TBytesStatistics PackArrow( + THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result, + ui64 shardId, + NKikimr::NMiniKQL::TUnboxedValueVector& batch) + { + NMiniKQL::TBytesStatistics stats; + bool hasResultColumns = false; + if (Settings.ColumnsSize() == 0) { + batch.resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer()); + } else { + TVector<NUdf::TUnboxedValue*> editAccessors(result->Get()->GetRowsCount()); + batch.reserve(result->Get()->GetRowsCount()); + + for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { + batch.emplace_back(HolderFactory.CreateDirectArrayHolder( + Settings.columns_size(), + editAccessors[rowIndex]) + ); + } + + for (size_t columnIndex = 0; columnIndex < Settings.ColumnsSize(); ++columnIndex) { + auto tag = Settings.GetColumns(columnIndex).GetId(); + auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(columnIndex).GetType()); + if (IsSystemColumn(tag)) { + for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { + NMiniKQL::FillSystemColumn(editAccessors[rowIndex][columnIndex], shardId, tag, type); + stats.AllocatedBytes += sizeof(NUdf::TUnboxedValue); + } + } else { + hasResultColumns = true; + stats.AddStatistics( + NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->ArrowBatch, columnIndex, type) + ); + } + } + } + + if (!hasResultColumns) { + auto rowsCnt = result->Get()->GetRowsCount(); + stats.AddStatistics({sizeof(ui64) * rowsCnt, sizeof(ui64) * rowsCnt}); + } + return stats; + } + + NMiniKQL::TBytesStatistics PackCells( + THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result, + ui64 shardId, + NKikimr::NMiniKQL::TUnboxedValueVector& batch) + { + NMiniKQL::TBytesStatistics stats; + batch.reserve(batch.size()); + for (size_t rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) { + const auto& row = result->Get()->GetCells(rowIndex); + NUdf::TUnboxedValue* rowItems = nullptr; + batch.emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems)); + for (size_t i = 0; i < Settings.ColumnsSize(); ++i) { + auto tag = Settings.GetColumns(i).GetId(); + auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(i).GetType()); + if (IsSystemColumn(tag)) { + NMiniKQL::FillSystemColumn(rowItems[i], shardId, tag, type); + } else { + rowItems[i] = NMiniKQL::GetCellValue(row[i], type); + } + } + stats.AddStatistics(GetRowSize(rowItems)); + } + return stats; + } + + i64 GetAsyncInputData( + NKikimr::NMiniKQL::TUnboxedValueVector& resultVector, + TMaybe<TInstant>&, + bool& finished, + i64 freeSpace) override + { + ui64 bytes = 0; + while (!Results.empty()) { + auto& [shardId, result, batch, processedRows] = Results.front(); + auto& msg = *result->Get(); + if (!batch.Defined()) { + batch.ConstructInPlace(); + switch (msg.Record.GetResultFormat()) { + case NKikimrTxDataShard::EScanDataFormat::ARROW: + PackArrow(result, shardId, *batch); + break; + case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: + case NKikimrTxDataShard::EScanDataFormat::CELLVEC: + PackCells(result, shardId, *batch); + } + } + + auto id = result->Get()->Record.GetReadId(); + if (!Reads[id]) { + Results.pop(); + continue; + } + auto* state = Reads[id].Shard; + + for (; processedRows < batch->size(); ++processedRows) { + NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[processedRows].GetElements()); + if (static_cast<ui64>(freeSpace) < bytes + rowSize.AllocatedBytes) { + break; + } + resultVector.push_back(std::move((*batch)[processedRows])); + RowCount += 1; + bytes += rowSize.AllocatedBytes; + } + CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows"); + + if (batch->size() == processedRows) { + auto& record = msg.Record; + if (Reads[id].IsLastMessage(msg)) { + Reads[id].Reset(); + ResetReads++; + } else if (!Reads[id].Finished) { + THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); + request->Record.SetReadId(record.GetReadId()); + request->Record.SetSeqNo(record.GetSeqNo()); + request->Record.SetMaxRows(EVREAD_MAX_ROWS); + request->Record.SetMaxBytes(EVREAD_MAX_BYTES); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), + IEventHandle::FlagTrackDelivery); + } + + StartTableScan(); + if (PendingShards.Size() > 0) { + return bytes; + } + + Results.pop(); + CA_LOG_D("dropping batch"); + + if (RunningReads() == 0 || (Settings.HasItemsLimit() && RowCount >= Settings.GetItemsLimit())) { + finished = true; + } + } else { + break; + } + } + + return bytes; + } + + void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) override {} + void CommitState(const NYql::NDqProto::TCheckpoint&) override {} + void LoadState(const NYql::NDqProto::TSourceState&) override {} + + void PassAway() override { + { + auto guard = BindAllocator(); + Results.clear(); + } + TBase::PassAway(); + } + + void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) { + NYql::TIssue issue(message); + for (const auto& i : subIssues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); + } + + NYql::TIssues issues; + issues.AddIssue(std::move(issue)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode)); + } + +private: + NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings; + + TVector<NScheme::TTypeInfo> KeyColumnTypes; + + size_t RowCount = 0; + ui64 ResetReads = 0; + ui64 ReadId = 0; + TVector<TReadState> Reads; + THashMap<ui64, TVector<ui32>> ReadIdByTabletId; + + THashMap<ui64, TShardState*> ResolveShards; + ui64 ResolveShardId; + + TShardQueue InFlightShards; + TShardQueue PendingShards; + + struct TResult { + ui64 ShardId; + THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; + TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch; + size_t ProcessedRows = 0; + + TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult) + : ShardId(shardId) + , ReadResult(std::move(readResult)) + { + } + }; + TQueue<TResult> Results; + + ui32 MaxInFlight = 1024; + const TString LogPrefix; + TTableId TableId; + + TActorId ComputeActorId; + ui64 InputIndex; + const NMiniKQL::TTypeEnvironment& TypeEnv; + const NMiniKQL::THolderFactory& HolderFactory; +}; + + +void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory) { + factory.RegisterSource<NKikimrTxDataShard::TKqpReadRangesSourceSettings>( + TString(NYql::KqpReadRangesSourceName), + [] (NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSourceArguments args) { + auto* actor = new TKqpReadActor(std::move(settings), args); + return std::make_pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>(actor, actor); + }); +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h new file mode 100644 index 00000000000..28f3e17873c --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_read_actor.h @@ -0,0 +1,11 @@ +#pragma once + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> + +namespace NKikimr { +namespace NKqp { + +void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory); + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index 2c39d685db5..ead01cb124d 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -10,19 +10,6 @@ namespace NKikimr { namespace NMiniKQL { -namespace { - -struct TBytesStatistics { - ui64 AllocatedBytes = 0; - ui64 DataBytes = 0; - - void AddStatistics(const TBytesStatistics& other) { - AllocatedBytes += other.AllocatedBytes; - DataBytes += other.DataBytes; - } - -}; - TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) { namespace NTypeIds = NScheme::NTypeIds; if (!value) { @@ -86,6 +73,18 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme:: } } +void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo) { + YQL_ENSURE(tag == TKeyDesc::EColumnIdDataShard, "Unknown system column tag: " << tag); + + if (shardId) { + rowItem = NUdf::TUnboxedValuePod(*shardId); + } else { + rowItem = NUdf::TUnboxedValue(); + } +} + +namespace { + TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row, const TSmallVec<TKqpComputeContextBase::TColumn>& columns, const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns) { @@ -101,13 +100,7 @@ TBytesStatistics GetRowSize(const NUdf::TUnboxedValue* row, const TSmallVec<TKqp void FillSystemColumns(NUdf::TUnboxedValue* rowItems, TMaybe<ui64> shardId, const TSmallVec<TKqpComputeContextBase::TColumn>& systemColumns) { for (ui32 i = 0; i < systemColumns.size(); ++i) { - YQL_ENSURE(systemColumns[i].Tag == TKeyDesc::EColumnIdDataShard, "Unknown system column tag: " << systemColumns[i].Tag); - - if (shardId) { - rowItems[i] = NUdf::TUnboxedValuePod(*shardId); - } else { - rowItems[i] = NUdf::TUnboxedValue(); - } + FillSystemColumn(rowItems[i], shardId, systemColumns[i].Tag, systemColumns[i].Type); } } @@ -143,6 +136,8 @@ NUdf::TUnboxedValue MakeUnboxedValueFromDecimal128Array(arrow::Array* column, ui return NUdf::TUnboxedValuePod(val); } +} // namespace + TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType) { @@ -250,7 +245,7 @@ TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& return columnStats; } -} // namespace + std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type) { auto sizes = GetUnboxedValueSize(value, type); diff --git a/ydb/core/kqp/runtime/kqp_scan_data.h b/ydb/core/kqp/runtime/kqp_scan_data.h index e365ffa0933..6391be44f58 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.h +++ b/ydb/core/kqp/runtime/kqp_scan_data.h @@ -22,6 +22,23 @@ namespace NKikimrTxDataShard { namespace NKikimr { namespace NMiniKQL { +struct TBytesStatistics { + ui64 AllocatedBytes = 0; + ui64 DataBytes = 0; + + void AddStatistics(const TBytesStatistics& other) { + AllocatedBytes += other.AllocatedBytes; + DataBytes += other.DataBytes; + } + +}; + +TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type); +TBytesStatistics WriteColumnValuesFromArrow(const TVector<NUdf::TUnboxedValue*>& editAccessors, + const arrow::RecordBatch& batch, i64 columnIndex, NScheme::TTypeInfo columnType); + +void FillSystemColumn(NUdf::TUnboxedValue& rowItem, TMaybe<ui64> shardId, NTable::TTag tag, NScheme::TTypeInfo type); + std::pair<ui64, ui64> GetUnboxedValueSizeForTests(const NUdf::TUnboxedValue& value, NScheme::TTypeInfo type); class IKqpTableReader : public TSimpleRefCount<IKqpTableReader> { diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index 305941e4322..217aa363695 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1960,6 +1960,26 @@ Y_UNIT_TEST_SUITE(KqpScan) { CompareYson("[[%false]]", StreamResultToYson(result)); } + Y_UNIT_TEST(DqSource) { + TKikimrSettings settings; + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + flags.SetEnableKqpScanQuerySourceRead(true); + settings.SetFeatureFlags(flags); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + CreateSampleTables(kikimr); + + { + auto result = db.StreamExecuteScanQuery(R"( + SELECT Key, Data FROM `/Root/EightShard` WHERE Key = 101 or (Key >= 202 and Key < 200+4) ORDER BY Key; + )").GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]];[[202u];[1]];[[203u];[3]]])", StreamResultToYson(result)); + } + } + Y_UNIT_TEST(StreamLookup) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index a3be7cd884d..f77e99ed4c6 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -745,6 +745,7 @@ message TFeatureFlags { optional bool EnableKqpDataQueryStreamLookup = 75 [default = false]; optional bool EnableBorrowedSplitCompaction = 76 [default = true]; optional bool EnableChangefeedInitialScan = 77 [default = false]; + optional bool EnableKqpScanQuerySourceRead = 78 [default = false]; } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index f97d2240ff1..54e6603fd28 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -223,6 +223,7 @@ message TKqpPhyCnStreamLookup { message TKqpPhyConnection { uint32 StageIndex = 1; uint32 OutputIndex = 2; + uint32 InputIndex = 13; oneof Type { TKqpPhyCnUnionAll UnionAll = 3; @@ -238,6 +239,29 @@ message TKqpPhyConnection { }; } +message TKqpReadRangesSource { + TKqpPhyTable Table = 1; + repeated TKqpPhyColumn Columns = 2; + TKqpPhyValue ItemsLimit = 3; + bool Reverse = 4; + bool Sorted = 5; + + oneof RangesExpr { + TKqpPhyParamValue Ranges = 6; + TKqpPhyKeyRange KeyRange = 7; + } + + repeated string SkipNullKeys = 8; +} + +message TKqpSource { + uint32 InputIndex = 1; + + oneof Type { + TKqpReadRangesSource ReadRangesSource = 3; + } +}; + message TKqpPhyStage { NYql.NDqProto.TProgram Program = 1; repeated string ProgramParameters = 2; @@ -247,6 +271,7 @@ message TKqpPhyStage { repeated TKqpPhyTableOperation TableOps = 6; bool IsEffectsStage = 7; string StageGuid = 8; + repeated TKqpSource Sources = 9; } message TKqpPhyResult { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 77650329128..e4adad9f190 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -238,6 +238,24 @@ message TKqpTransaction { optional NKikimrKqp.TKqpSnapshot Snapshot = 6; } +message TKqpReadRangesSourceSettings { + optional TKqpTransaction.TTableMeta Table = 1; + + oneof RangeType { + NKikimrTx.TKeyRange FullRange = 2; + TKqpTransaction.TTableKeyRange Ranges = 3; + } + + repeated TKqpTransaction.TColumnMeta Columns = 4; + optional uint64 ItemsLimit = 5; + optional bool Reverse = 6; + repeated string SkipNullKeys = 7; + repeated uint32 KeyColumnTypes = 8; + optional EScanDataFormat DataFormat = 9; + optional NKikimrProto.TRowVersion Snapshot = 10; + optional uint64 ShardIdHint = 11; +} + message TKqpTaskInfo { optional uint64 TaskId = 1; optional NActorsProto.TActorId ComputeActor = 2; diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index 8cb87b01558..9907f4e0bf0 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -97,10 +97,13 @@ struct TMergeTaskInput { {} }; +struct TSourceInput {}; + // Enum values must match to ConnectionInfo variant alternatives enum class TTaskInputType { UnionAll, - Merge + Merge, + Source }; struct TTransform { @@ -114,7 +117,7 @@ struct TTransform { template <class TInputMeta> struct TTaskInput { - std::variant<std::monostate, TMergeTaskInput> ConnectionInfo; + std::variant<std::monostate, TMergeTaskInput, TSourceInput> ConnectionInfo; TChannelList Channels; TMaybe<::google::protobuf::Any> SourceSettings; TString SourceType; |