diff options
author | ssmike <ssmike@ydb.tech> | 2023-06-02 15:53:08 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-06-02 15:53:08 +0300 |
commit | a080f3314e29c6dcae3c36f3198b7aa36edb6cfd (patch) | |
tree | 269bc041ac073f708fcb83d2c23c8a5d23c6dbb1 | |
parent | 045235991e9b5259c78c98e94d8c0d4d16849a1f (diff) | |
download | ydb-a080f3314e29c6dcae3c36f3198b7aa36edb6cfd.tar.gz |
Fix sequential reads
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 25 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 20 |
4 files changed, 49 insertions, 23 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 60258929ac..3b8c667ef2 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -723,17 +723,20 @@ protected: const auto& snapshot = GetSnapshot(); - auto addPartiton = [&](TMaybe<ui64> shardId, const TShardInfo& shardInfo, TMaybe<ui64> maxInFlightShards = Nothing()) { + auto addPartiton = [&]( + ui64 taskLocation, + TMaybe<ui64> shardId, + const TShardInfo& shardInfo, + TMaybe<ui64> maxInFlightShards = Nothing()) + { YQL_ENSURE(!shardInfo.KeyWriteRanges); auto& task = TasksGraph.AddTask(stageInfo); task.Meta.ExecuterId = this->SelfId(); - if (shardId) { - if (auto ptr = ShardIdToNodeId.FindPtr(*shardId)) { - task.Meta.NodeId = *ptr; - } else { - task.Meta.ShardId = *shardId; - } + if (auto ptr = ShardIdToNodeId.FindPtr(taskLocation)) { + task.Meta.NodeId = *ptr; + } else { + task.Meta.ShardId = taskLocation; } NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; @@ -806,17 +809,17 @@ protected: }; if (source.GetSequentialInFlightShards()) { - auto shardInfo = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + auto [startShard, shardInfo] = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); if (shardInfo.KeyReadRanges) { - addPartiton({}, shardInfo, source.GetSequentialInFlightShards()); - return {}; + addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards()); + return Nothing(); } else { return 0; } } else { THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, shardInfo] : partitions) { - addPartiton(shardId, shardInfo, {}); + addPartiton(shardId, shardId, shardInfo, {}); } return partitions.size(); } diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index 07453a3d31..de5296f2db 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -588,12 +588,17 @@ TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys, return ranges; } -TShardInfo MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, +std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); + const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); + YQL_ENSURE(table); + + const auto& keyColumnTypes = table->KeyColumnTypes; auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard); + TShardInfo result; for (auto& range: ranges) { if (!result.KeyReadRanges) { @@ -603,7 +608,23 @@ TShardInfo MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, result.KeyReadRanges->Add(std::move(range)); } - return result; + ui64 shard = 0; + if (!ranges.empty()) { + auto& range = source.GetReverse() ? ranges.back() : ranges[0]; + TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range) + ? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true) + : TTableRange(std::get<TSerializedTableRange>(range).ToTableRange()); + + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), + keyColumnTypes); + + if (readPartitions) { + auto& partition = source.GetReverse() ? readPartitions.back() : readPartitions[0]; + shard = partition.PartitionInfo->ShardId; + } + } + + return {shard, result}; } diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h index ffca6633cd..3f5973d1c8 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h @@ -66,7 +66,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -TShardInfo MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, +std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b8ce4b9714..5d791c4656 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -442,6 +442,7 @@ public: } if (!Settings.HasShardIdHint()) { + InFlightShards.PushBack(&state); ResolveShard(&state); } else { StartShards(); @@ -521,7 +522,15 @@ public: CA_LOG_D("Received TEvResolveKeySetResult update for table '" << Settings.GetTable().GetTablePath() << "'"); auto* request = ev->Get()->Request.Get(); - if (request->ErrorCount > 0) { + THolder<TShardState> state; + if (!request->ResultSet.empty()) { + if (auto ptr = ResolveShards[request->ResultSet[0].UserData]) { + state = THolder<TShardState>(ptr); + ResolveShards.erase(request->ResultSet[0].UserData); + } + } + + if (request->ErrorCount > 0 || !state) { CA_LOG_E("Resolve request failed for table '" << Settings.GetTable().GetTablePath() << "', ErrorCount# " << request->ErrorCount); auto statusCode = NDqProto::StatusIds::UNAVAILABLE; @@ -557,13 +566,6 @@ public: } auto keyDesc = std::move(request->ResultSet[0].KeyDescription); - THolder<TShardState> state; - if (auto ptr = ResolveShards[request->ResultSet[0].UserData]) { - state = THolder<TShardState>(ptr); - ResolveShards.erase(request->ResultSet[0].UserData); - } else { - return; - } if (keyDesc->GetPartitions().size() == 1) { auto& partition = keyDesc->GetPartitions()[0]; @@ -1173,7 +1175,7 @@ public: } } - if (RunningReads() == 0 && PendingShards.Empty() && ScanStarted) { + if (ScanStarted && RunningReads() == 0 && PendingShards.Empty() && ResolveShards.empty()) { finished = true; } |