aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-06-02 15:53:08 +0300
committerssmike <ssmike@ydb.tech>2023-06-02 15:53:08 +0300
commita080f3314e29c6dcae3c36f3198b7aa36edb6cfd (patch)
tree269bc041ac073f708fcb83d2c23c8a5d23c6dbb1
parent045235991e9b5259c78c98e94d8c0d4d16849a1f (diff)
downloadydb-a080f3314e29c6dcae3c36f3198b7aa36edb6cfd.tar.gz
Fix sequential reads
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h25
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp25
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.h2
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp20
4 files changed, 49 insertions, 23 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 60258929ac..3b8c667ef2 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -723,17 +723,20 @@ protected:
const auto& snapshot = GetSnapshot();
- auto addPartiton = [&](TMaybe<ui64> shardId, const TShardInfo& shardInfo, TMaybe<ui64> maxInFlightShards = Nothing()) {
+ auto addPartiton = [&](
+ ui64 taskLocation,
+ TMaybe<ui64> shardId,
+ const TShardInfo& shardInfo,
+ TMaybe<ui64> maxInFlightShards = Nothing())
+ {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = this->SelfId();
- if (shardId) {
- if (auto ptr = ShardIdToNodeId.FindPtr(*shardId)) {
- task.Meta.NodeId = *ptr;
- } else {
- task.Meta.ShardId = *shardId;
- }
+ if (auto ptr = ShardIdToNodeId.FindPtr(taskLocation)) {
+ task.Meta.NodeId = *ptr;
+ } else {
+ task.Meta.ShardId = taskLocation;
}
NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
@@ -806,17 +809,17 @@ protected:
};
if (source.GetSequentialInFlightShards()) {
- auto shardInfo = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ auto [startShard, shardInfo] = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
if (shardInfo.KeyReadRanges) {
- addPartiton({}, shardInfo, source.GetSequentialInFlightShards());
- return {};
+ addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards());
+ return Nothing();
} else {
return 0;
}
} else {
THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, shardInfo] : partitions) {
- addPartiton(shardId, shardInfo, {});
+ addPartiton(shardId, shardId, shardInfo, {});
}
return partitions.size();
}
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index 07453a3d31..de5296f2db 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -588,12 +588,17 @@ TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys,
return ranges;
}
-TShardInfo MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
+std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
+ const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
+ YQL_ENSURE(table);
+
+ const auto& keyColumnTypes = table->KeyColumnTypes;
auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard);
+
TShardInfo result;
for (auto& range: ranges) {
if (!result.KeyReadRanges) {
@@ -603,7 +608,23 @@ TShardInfo MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
result.KeyReadRanges->Add(std::move(range));
}
- return result;
+ ui64 shard = 0;
+ if (!ranges.empty()) {
+ auto& range = source.GetReverse() ? ranges.back() : ranges[0];
+ TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range)
+ ? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true)
+ : TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());
+
+ auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
+ keyColumnTypes);
+
+ if (readPartitions) {
+ auto& partition = source.GetReverse() ? readPartitions.back() : readPartitions[0];
+ shard = partition.PartitionInfo->ShardId;
+ }
+ }
+
+ return {shard, result};
}
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
index ffca6633cd..3f5973d1c8 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
@@ -66,7 +66,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-TShardInfo MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
+std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b8ce4b9714..5d791c4656 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -442,6 +442,7 @@ public:
}
if (!Settings.HasShardIdHint()) {
+ InFlightShards.PushBack(&state);
ResolveShard(&state);
} else {
StartShards();
@@ -521,7 +522,15 @@ public:
CA_LOG_D("Received TEvResolveKeySetResult update for table '" << Settings.GetTable().GetTablePath() << "'");
auto* request = ev->Get()->Request.Get();
- if (request->ErrorCount > 0) {
+ THolder<TShardState> state;
+ if (!request->ResultSet.empty()) {
+ if (auto ptr = ResolveShards[request->ResultSet[0].UserData]) {
+ state = THolder<TShardState>(ptr);
+ ResolveShards.erase(request->ResultSet[0].UserData);
+ }
+ }
+
+ if (request->ErrorCount > 0 || !state) {
CA_LOG_E("Resolve request failed for table '" << Settings.GetTable().GetTablePath() << "', ErrorCount# " << request->ErrorCount);
auto statusCode = NDqProto::StatusIds::UNAVAILABLE;
@@ -557,13 +566,6 @@ public:
}
auto keyDesc = std::move(request->ResultSet[0].KeyDescription);
- THolder<TShardState> state;
- if (auto ptr = ResolveShards[request->ResultSet[0].UserData]) {
- state = THolder<TShardState>(ptr);
- ResolveShards.erase(request->ResultSet[0].UserData);
- } else {
- return;
- }
if (keyDesc->GetPartitions().size() == 1) {
auto& partition = keyDesc->GetPartitions()[0];
@@ -1173,7 +1175,7 @@ public:
}
}
- if (RunningReads() == 0 && PendingShards.Empty() && ScanStarted) {
+ if (ScanStarted && RunningReads() == 0 && PendingShards.Empty() && ResolveShards.empty()) {
finished = true;
}