aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-02-06 17:39:24 +0300
committerulya-sidorina <yulia@ydb.tech>2023-02-06 17:39:24 +0300
commit594453052425011d2ba187d571370aa958eb5c0d (patch)
tree856ca8147cfc7ed9c04910beb27dfb4971805fe5
parent813befe691b16fcf51beec6e725d4e04365dfd55 (diff)
downloadydb-594453052425011d2ba187d571370aa958eb5c0d.tar.gz
delete table scheme resolving from stream lookup actor
refactor(kqp): delete table scheme resolving from stream lookup actor
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp18
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp14
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp384
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp2
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp10
-rw-r--r--ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/join/kqp_join_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp14
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp8
-rw-r--r--ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp8
-rw-r--r--ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp4
-rw-r--r--ydb/core/protos/kqp.proto5
-rw-r--r--ydb/services/ydb/ydb_table_ut.cpp2
14 files changed, 219 insertions, 258 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index de836d27ca..8850429c70 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -214,16 +214,30 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
settings.MutableTable()->CopyFrom(streamLookup.GetTable());
auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable()));
+ for (const auto& keyColumn : table.KeyColumns) {
+ auto columnIt = table.Columns.find(keyColumn);
+ YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
+
+ auto* keyColumnProto = settings.AddKeyColumns();
+ keyColumnProto->SetName(keyColumn);
+ keyColumnProto->SetId(columnIt->second.Id);
+ keyColumnProto->SetTypeId(columnIt->second.Type.GetTypeId());
+ }
+
for (const auto& keyColumn : streamLookup.GetKeyColumns()) {
auto columnIt = table.Columns.find(keyColumn);
YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
- settings.AddKeyColumns(keyColumn);
+ settings.AddLookupKeyColumns(keyColumn);
}
for (const auto& column : streamLookup.GetColumns()) {
auto columnIt = table.Columns.find(column);
YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column);
- settings.AddColumns(column);
+
+ auto* columnProto = settings.AddColumns();
+ columnProto->SetName(column);
+ columnProto->SetId(columnIt->second.Id);
+ columnProto->SetTypeId(columnIt->second.Type.GetTypeId());
}
TTransform streamLookupTransform;
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 942c6d47b9..356838f9c7 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -942,17 +942,21 @@ private:
streamLookupProto.AddKeyColumns(TString(keyColumn->GetName()));
}
- for (const auto& column : streamLookup.Columns()) {
- YQL_ENSURE(tableMeta->Columns.FindPtr(column), "Unknown column: " << TString(column));
- streamLookupProto.AddColumns(TString(column));
- }
-
const auto resultType = streamLookup.Ref().GetTypeAnn();
YQL_ENSURE(resultType, "Empty stream lookup result type");
YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type");
const auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType();
streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv));
+ YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems();
+ for (const auto column : resultColumns) {
+ const auto& systemColumns = GetSystemColumns();
+ YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) || systemColumns.find(column->GetName()) != systemColumns.end(),
+ "Unknown column: " << column->GetName());
+ streamLookupProto.AddColumns(TString(column->GetName()));
+ }
+
return;
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index b4c2f0dc3a..82fc2334fc 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -11,6 +11,8 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
namespace NKikimr {
namespace NKqp {
@@ -18,7 +20,7 @@ namespace NKqp {
namespace {
static constexpr TDuration SCHEME_CACHE_REQUEST_TIMEOUT = TDuration::Seconds(5);
-static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10);
+static constexpr ui64 MAX_SHARD_RETRIES = 10;
class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
public:
@@ -30,10 +32,37 @@ public:
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, ImmediateTx(settings.GetImmediateTx())
- , KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end())
- , Columns(settings.GetColumns().begin(), settings.GetColumns().end())
- , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
- , RetryReadTimeout(RETRY_READ_TIMEOUT) {
+ , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) {
+
+ KeyColumns.reserve(settings.GetKeyColumns().size());
+ i32 keyOrder = 0;
+ for (const auto& keyColumn : settings.GetKeyColumns()) {
+ KeyColumns.emplace(
+ keyColumn.GetName(),
+ TSysTables::TTableColumnInfo{
+ keyColumn.GetName(),
+ keyColumn.GetId(),
+ NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())},
+ keyOrder++
+ }
+ );
+ }
+
+ LookupKeyColumns.reserve(KeyColumns.size());
+ for (const auto& lookupKeyColumn : settings.GetLookupKeyColumns()) {
+ auto columnIt = KeyColumns.find(lookupKeyColumn);
+ YQL_ENSURE(columnIt != KeyColumns.end());
+ LookupKeyColumns.push_back(&columnIt->second);
+ }
+
+ Columns.reserve(settings.GetColumns().size());
+ for (const auto& column : settings.GetColumns()) {
+ Columns.emplace_back(TSysTables::TTableColumnInfo{
+ column.GetName(),
+ column.GetId(),
+ NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())}
+ });
+ }
};
virtual ~TKqpStreamLookupActor() {
@@ -44,8 +73,7 @@ public:
}
void Bootstrap() {
- ResolveTable();
-
+ ResolveTableShards();
Become(&TKqpStreamLookupActor::StateFunc);
}
@@ -73,17 +101,11 @@ private:
: Id(id)
, ShardId(shardId)
, Keys(std::move(keys))
- , State(EReadState::Initial)
- , Retried(false) {}
+ , State(EReadState::Initial) {}
- void SetFinished(const NActors::TActorContext& ctx) {
+ void SetFinished() {
Keys.clear();
State = EReadState::Finished;
-
- if (RetryDeadlineTimerId) {
- ctx.Send(RetryDeadlineTimerId, new TEvents::TEvPoisonPill());
- RetryDeadlineTimerId = {};
- }
}
bool Finished() const {
@@ -94,13 +116,16 @@ private:
const ui64 ShardId;
std::vector<TOwnedTableRange> Keys;
EReadState State;
- TActorId RetryDeadlineTimerId;
- bool Retried;
};
- enum EEvSchemeCacheRequestTag : ui64 {
- TableSchemeResolving,
- TableShardsResolving
+ struct TShardState {
+ ui64 RetryAttempts = 0;
+ std::vector<TReadState*> Reads;
+ };
+
+ struct TResult {
+ const ui64 ShardId;
+ THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
};
struct TEvPrivate {
@@ -110,39 +135,9 @@ private:
};
struct TEvSchemeCacheRequestTimeout : public TEventLocal<TEvSchemeCacheRequestTimeout, EvSchemeCacheRequestTimeout> {
- TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag tag) : Tag(tag) {}
-
- const EEvSchemeCacheRequestTag Tag;
- };
-
- struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> {
- TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {}
-
- const ui64 ReadId;
};
};
- struct TTableScheme {
- TTableScheme(const THashMap<ui32, TSysTables::TTableColumnInfo>& columns) {
- std::map<ui32, NScheme::TTypeInfo> keyColumnTypesByKeyOrder;
- for (const auto& [_, column] : columns) {
- if (column.KeyOrder >= 0) {
- keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType;
- }
-
- ColumnsByName.emplace(column.Name, std::move(column));
- }
-
- KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size());
- for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) {
- KeyColumnTypes[keyOrder] = keyColumnType;
- }
- }
-
- std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName;
- std::vector<NScheme::TTypeInfo> KeyColumnTypes;
- };
-
private:
void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {}
void LoadState(const NYql::NDqProto::TSourceState&) final {}
@@ -165,22 +160,18 @@ private:
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 totalDataSize = 0;
- if (TableScheme) {
- totalDataSize = PackResults(batch, freeSpace);
- auto status = FetchLookupKeys();
-
- if (Partitioning) {
- ProcessLookupKeys();
- }
+ totalDataSize = PackResults(batch, freeSpace);
+ auto status = FetchLookupKeys();
- finished = (status == NUdf::EFetchStatus::Finish)
- && UnprocessedKeys.empty()
- && AllReadsFinished()
- && Results.empty();
- } else {
- Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ if (Partitioning) {
+ ProcessLookupKeys();
}
+ finished = (status == NUdf::EFetchStatus::Finish)
+ && UnprocessedKeys.empty()
+ && AllReadsFinished()
+ && Results.empty();
+
return totalDataSize;
}
@@ -205,11 +196,9 @@ private:
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvDataShard::TEvReadResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(TEvPrivate::TEvSchemeCacheRequestTimeout, Handle);
- hFunc(TEvPrivate::TEvRetryReadTimeout, Handle);
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
default:
RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(),
@@ -233,20 +222,6 @@ private:
ProcessLookupKeys();
}
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- auto& resultSet = ev->Get()->Request->ResultSet;
- YQL_ENSURE(resultSet.size() == 1, "Expected one result for table: " << TableId);
- auto& result = resultSet[0];
-
- if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
- return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status),
- NYql::NDqProto::StatusIds::SCHEME_ERROR);
- }
-
- TableScheme = std::make_unique<TTableScheme>(result.Columns);
- ResolveTableShards();
- }
-
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;
@@ -271,26 +246,26 @@ private:
Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
}
- // TODO: refactor after KIKIMR-15102
- if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
- NKikimrTxDataShard::TReadContinuationToken continuationToken;
- bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
- YQL_ENSURE(parseResult, "Failed to parse continuation token");
- YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
-
- return RetryTableRead(read, continuationToken);
- }
-
- YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
- auto nrows = ev->Get()->GetRowsCount();
- for (ui64 rowId = 0; rowId < nrows; ++rowId) {
- Results.emplace_back(ev->Get()->GetCells(rowId));
+ switch (record.GetStatus().GetCode()) {
+ case Ydb::StatusIds::SUCCESS:
+ break;
+ case Ydb::StatusIds::OVERLOADED:
+ case Ydb::StatusIds::INTERNAL_ERROR: {
+ NKikimrTxDataShard::TReadContinuationToken continuationToken;
+ bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
+ YQL_ENSURE(parseResult, "Failed to parse continuation token");
+ YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
+ return RetryTableRead(read, continuationToken);
+ }
+ default: {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
+ return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
+ }
}
- Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
-
if (record.GetFinished()) {
- read.SetFinished(TlsActivationContext->AsActorContext());
+ read.SetFinished();
} else {
THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
request->Record.SetReadId(record.GetReadId());
@@ -298,6 +273,9 @@ private:
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
IEventHandle::FlagTrackDelivery);
}
+
+ Results.emplace_back(TResult{read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -305,113 +283,86 @@ private:
auto shardIt = ReadsPerShard.find(tabletId);
YQL_ENSURE(shardIt != ReadsPerShard.end());
- for (auto readId : shardIt->second) {
- auto readIt = Reads.find(readId);
- YQL_ENSURE(readIt != Reads.end());
- auto& read = readIt->second;
-
- if (read.State == EReadState::Running) {
- for (auto& key : read.Keys) {
+ for (auto* read : shardIt->second.Reads) {
+ if (read->State == EReadState::Running) {
+ for (auto& key : read->Keys) {
UnprocessedKeys.emplace_back(std::move(key));
}
- read.SetFinished(TlsActivationContext->AsActorContext());
+ read->SetFinished();
}
}
- ReadsPerShard.erase(shardIt);
ResolveTableShards();
}
- void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) {
- switch (ev->Get()->Tag) {
- case EEvSchemeCacheRequestTag::TableSchemeResolving:
- if (!TableScheme) {
- RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId
- << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
- }
- break;
- case EEvSchemeCacheRequestTag::TableShardsResolving:
- if (!Partitioning) {
- RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
- << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
- }
- break;
- default:
- RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag,
- NYql::NDqProto::StatusIds::INTERNAL_ERROR);
- }
- }
-
- void Handle(TEvPrivate::TEvRetryReadTimeout::TPtr& ev) {
- auto readIt = Reads.find(ev->Get()->ReadId);
- YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
- auto& read = readIt->second;
-
- if (read.Retried) {
- RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId,
- NYql::NDqProto::StatusIds::TIMEOUT);
+ void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) {
+ if (!Partitioning) {
+ RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
+ << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
}
}
ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) {
- YQL_ENSURE(TableScheme);
-
i64 totalSize = 0;
+ bool sizeLimitExceeded = false;
batch.clear();
- batch.reserve(Results.size());
- std::vector<NKikimr::NScheme::TTypeInfo> columnTypes;
- columnTypes.reserve(Columns.size());
- for (const auto& column : Columns) {
- auto colIt = TableScheme->ColumnsByName.find(column);
- YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
- columnTypes.push_back(colIt->second.PType);
- }
-
- for (; !Results.empty(); Results.pop_front()) {
- const auto& result = Results.front();
- YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch");
-
- NUdf::TUnboxedValue* rowItems = nullptr;
- auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+ size_t rowsCount = 0;
+ for (const auto& result : Results) {
+ rowsCount += result.ReadResult->Get()->GetRowsCount();
+ }
+ batch.reserve(rowsCount);
+
+ for (; !Results.empty() && !sizeLimitExceeded; Results.pop_front()) {
+ const auto& readResult = Results.front().ReadResult;
+ const auto shardId = Results.front().ShardId;
+
+ for (size_t rowId = 0; rowId < readResult->Get()->GetRowsCount(); ++rowId) {
+ const auto& result = readResult->Get()->GetCells(rowId);
+ YQL_ENSURE(result.size() <= Columns.size(), "Result columns mismatch");
+
+ NUdf::TUnboxedValue* rowItems = nullptr;
+ auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+
+ i64 rowSize = 0;
+ for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size() && resultColIndex < result.size(); ++colIndex) {
+ const auto& column = Columns[colIndex];
+ if (IsSystemColumn(column.Name)) {
+ NMiniKQL::FillSystemColumn(rowItems[colIndex], shardId, column.Id, column.PType);
+ rowSize += sizeof(NUdf::TUnboxedValue);
+ } else {
+ rowItems[colIndex] = NMiniKQL::GetCellValue(result[resultColIndex], column.PType);
+ rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
+ ++resultColIndex;
+ }
+ }
- i64 rowSize = 0;
- for (ui32 colId = 0; colId < Columns.size(); ++colId) {
- rowItems[colId] = NMiniKQL::GetCellValue(result[colId], columnTypes[colId]);
- rowSize += result[colId].Size();
- }
+ if (totalSize + rowSize > freeSpace) {
+ row.DeleteUnreferenced();
+ sizeLimitExceeded = true;
+ break;
+ }
- if (totalSize + rowSize > freeSpace) {
- row.DeleteUnreferenced();
- break;
+ batch.push_back(std::move(row));
+ totalSize += rowSize;
}
-
- batch.push_back(std::move(row));
- totalSize += rowSize;
}
return totalSize;
}
NUdf::EFetchStatus FetchLookupKeys() {
- YQL_ENSURE(TableScheme);
- YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size());
-
- TVector<i32> keyColumnOrder;
- keyColumnOrder.reserve(KeyPrefixColumns.size());
- for (const auto& keyColumn : KeyPrefixColumns) {
- auto it = TableScheme->ColumnsByName.find(keyColumn);
- YQL_ENSURE(it != TableScheme->ColumnsByName.end());
- keyColumnOrder.push_back(it->second.KeyOrder);
- }
+ YQL_ENSURE(LookupKeyColumns.size() <= KeyColumns.size());
NUdf::EFetchStatus status;
NUdf::TUnboxedValue key;
while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
- std::vector<TCell> keyCells(KeyPrefixColumns.size());
- for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) {
- keyCells[keyColumnOrder[colId]] = MakeCell(TableScheme->KeyColumnTypes[keyColumnOrder[colId]],
+ std::vector<TCell> keyCells(LookupKeyColumns.size());
+ for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) {
+ const auto* lookupKeyColumn = LookupKeyColumns[colId];
+ YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size()));
+ keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType,
key.GetElement(colId), TypeEnv, /* copy */ true);
}
@@ -424,15 +375,15 @@ private:
void ProcessLookupKeys() {
YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing");
- std::map<ui64, std::vector<TOwnedTableRange>> shardKeys;
+ std::unordered_map<ui64, std::vector<TOwnedTableRange>> shardKeys;
for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) {
const auto& key = UnprocessedKeys.front();
YQL_ENSURE(key.Point);
std::vector<ui64> shardIds;
- if (KeyPrefixColumns.size() < TableScheme->KeyColumnTypes.size()) {
+ if (LookupKeyColumns.size() < KeyColumns.size()) {
/* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */
- std::vector<TCell> fromCells(TableScheme->KeyColumnTypes.size());
+ std::vector<TCell> fromCells(KeyColumns.size());
fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end());
std::vector<TCell> toCells(key.From.begin(), key.From.end());
@@ -443,7 +394,7 @@ private:
}
for (auto shardId : shardIds) {
- shardKeys[shardId].emplace_back(key);
+ shardKeys[shardId].emplace_back(std::move(key));
}
}
@@ -453,15 +404,20 @@ private:
}
std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) {
- YQL_ENSURE(TableScheme);
YQL_ENSURE(Partitioning);
+ std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
+ for (const auto& [_, columnInfo] : KeyColumns) {
+ YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
+ keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
+ }
+
auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true,
[&](const auto& partition, bool) {
const int result = CompareBorders<true, false>(
partition.Range->EndKeyPrefix.GetCells(), range.From,
partition.Range->IsInclusive || partition.Range->IsPoint,
- range.InclusiveFrom || range.Point, TableScheme->KeyColumnTypes
+ range.InclusiveFrom || range.Point, keyColumnTypes
);
return (result < 0);
@@ -481,7 +437,7 @@ private:
auto cmp = CompareBorders<true, true>(
it->Range->EndKeyPrefix.GetCells(), range.To,
it->Range->IsInclusive || it->Range->IsPoint,
- range.InclusiveTo || range.Point, TableScheme->KeyColumnTypes
+ range.InclusiveTo || range.Point, keyColumnTypes
);
if (cmp >= 0) {
@@ -518,9 +474,9 @@ private:
record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
for (const auto& column : Columns) {
- auto colIt = TableScheme->ColumnsByName.find(column);
- YQL_ENSURE(colIt != TableScheme->ColumnsByName.end());
- record.AddColumns(colIt->second.Id);
+ if (!IsSystemColumn(column.Name)) {
+ record.AddColumns(column.Id);
+ }
}
for (auto& key : read.Keys) {
@@ -535,64 +491,51 @@ private:
const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
YQL_ENSURE(succeeded);
- ReadsPerShard[shardId].insert(readId);
+ ReadsPerShard[shardId].Reads.push_back(&readIt->second);
return readIt->second;
}
void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) {
YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size());
- std::vector<TOwnedTableRange> unprocessedKeys;
- unprocessedKeys.reserve(failedRead.Keys.size() - token.GetFirstUnprocessedQuery());
for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) {
- unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
+ UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
}
- auto& newRead = StartTableRead(failedRead.ShardId, std::move(unprocessedKeys));
- if (failedRead.Retried) {
- newRead.RetryDeadlineTimerId = failedRead.RetryDeadlineTimerId;
- failedRead.RetryDeadlineTimerId = {};
+ failedRead.SetFinished();
+
+ auto& shardState = ReadsPerShard[failedRead.ShardId];
+ if (shardState.RetryAttempts > MAX_SHARD_RETRIES) {
+ RuntimeError(TStringBuilder() << "Retry limit exceeded for shard: " << failedRead.ShardId,
+ NYql::NDqProto::StatusIds::ABORTED);
} else {
- failedRead.Retried = true;
- newRead.RetryDeadlineTimerId = CreateLongTimer(TlsActivationContext->AsActorContext(), RetryReadTimeout,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryReadTimeout(newRead.Id)));
+ ++shardState.RetryAttempts;
+ ResolveTableShards();
}
-
- failedRead.SetFinished(TlsActivationContext->AsActorContext());
- }
-
- void ResolveTable() {
- TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
- NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.TableId = TableId;
- entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
- entry.ShowPrivatePath = true;
- request->ResultSet.emplace_back(entry);
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
-
- SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableSchemeResolving)));
}
void ResolveTableShards() {
- YQL_ENSURE(TableScheme);
Partitioning.reset();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
- TVector<TCell> minusInf(TableScheme->KeyColumnTypes.size());
+ TVector<TCell> minusInf(KeyColumns.size());
TVector<TCell> plusInf;
TTableRange range(minusInf, true, plusInf, true, false);
+ std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
+ for (const auto& [_, columnInfo] : KeyColumns) {
+ keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
+ }
+
request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
- TableScheme->KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
+ keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableShardsResolving)));
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout()));
}
bool AllReadsFinished() const {
@@ -636,17 +579,16 @@ private:
IKqpGateway::TKqpSnapshot Snapshot;
const TMaybe<ui64> LockTxId;
const bool ImmediateTx;
- const std::vector<TString> KeyPrefixColumns;
- const std::vector<TString> Columns;
- std::unique_ptr<const TTableScheme> TableScheme;
- std::deque<TOwnedCellVec> Results;
+ std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
+ std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
+ std::vector<TSysTables::TTableColumnInfo> Columns;
+ std::deque<TResult> Results;
std::unordered_map<ui64, TReadState> Reads;
- std::unordered_map<ui64, std::set<ui64>> ReadsPerShard;
+ std::unordered_map<ui64, TShardState> ReadsPerShard;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
std::deque<TOwnedTableRange> UnprocessedKeys;
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
- const TDuration RetryReadTimeout;
TVector<NKikimrTxDataShard::TLock> Locks;
TVector<NKikimrTxDataShard::TLock> BrokenLocks;
};
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index bf02c9e80d..ea6ae04fae 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -509,7 +509,7 @@ TDataQueryResult ExecQueryAndTestResult(TSession& session, const TString& query,
const TString& expectedYson)
{
NYdb::NTable::TExecDataQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
TDataQueryResult result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, settings)
.ExtractValueSync();
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index f839f7a75f..d89306dcbd 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
)"));
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(
query1,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -561,7 +561,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
)");
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(
query1,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -596,7 +596,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
)");
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(
query2,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
)");
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(
query2,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
@@ -676,7 +676,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
)");
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(
query2,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index da4fcf5149..6b06041bd0 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -90,7 +90,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) {
PrepareTables(session);
TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
diff --git a/ydb/core/kqp/ut/join/kqp_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_ut.cpp
index a7d2ace1f9..e30d820629 100644
--- a/ydb/core/kqp/ut/join/kqp_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_join_ut.cpp
@@ -186,7 +186,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
CreateSampleTables(session);
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q_(R"(
PRAGMA DisableSimpleColumns;
@@ -225,7 +225,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
CreateSampleTables(session);
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q_(R"(
PRAGMA DisableSimpleColumns;
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 4cbd2e4eca..1b17b84249 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
.Build();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync();
@@ -1109,7 +1109,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto session = db.CreateSession().GetValueSync().GetSession();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Key = 101 OR Key = 301
@@ -1142,7 +1142,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
.Build();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(R"(
DECLARE $key AS Uint64;
@@ -1347,7 +1347,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto session = db.CreateSession().GetValueSync().GetSession();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(R"(
@@ -1742,7 +1742,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
.Build();
auto settings = TExecDataQuerySettings()
- .CollectQueryStats(ECollectQueryStatsMode::Basic);
+ .CollectQueryStats(ECollectQueryStatsMode::Profile);
auto it = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params), settings).GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
@@ -2622,7 +2622,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto session = db.CreateSession().GetValueSync().GetSession();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto params = TParamsBuilder()
.AddParam("$group").Uint32(1).Build()
@@ -3020,7 +3020,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto session = db.CreateSession().GetValueSync().GetSession();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
diff --git a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
index 7df7000676..558dab8172 100644
--- a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
@@ -641,7 +641,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
{
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT(result.IsSuccess());
@@ -844,7 +844,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
CreateSampleTables(session);
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q_(R"(
SELECT Value FROM `/Root/TestDate`
WHERE Key = Date("2019-07-01")
@@ -1034,7 +1034,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
)");
NYdb::NTable::TExecDataQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
@@ -1065,7 +1065,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
CreateSampleTables(session);
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q_(R"(
SELECT * FROM `/Root/EightShard`
WHERE Key = 101 OR Key = 302 OR Key = 403 OR Key = 705
diff --git a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
index f9fce43ff6..ae1d952db7 100644
--- a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
@@ -994,7 +994,7 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
.Build();
NYdb::NTable::TExecDataQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q1_(R"(
DECLARE $keys AS List<Uint64>;
diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
index 816bcec8a2..6631bee275 100644
--- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
+++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
@@ -295,7 +295,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
.Build();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q1_(R"(
DECLARE $key AS Uint64;
@@ -331,7 +331,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
.Build();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto result = session.ExecuteDataQuery(Q1_(R"(
DECLARE $key AS Uint64;
@@ -477,7 +477,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
auto session = db.CreateSession().GetValueSync().GetSession();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto params = db.GetParamsBuilder()
.AddParam("$key").Int32(3).Build()
@@ -503,7 +503,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
auto session = db.CreateSession().GetValueSync().GetSession();
NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
auto params = db.GetParamsBuilder()
.AddParam("$key").Int32(3).Build()
diff --git a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
index 8cd9b54a68..75e7b526f6 100644
--- a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp
@@ -36,11 +36,11 @@ Y_UNIT_TEST_SUITE(KqpPragma) {
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(0)));
- result = session.ExecuteDataQuery(R"(
+ /*result = session.ExecuteDataQuery(R"(
SELECT COUNT(_yql_partition_id) FROM `/Root/KeyValue` WHERE Key = 1;
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR);
- UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN));
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN));*/
}
Y_UNIT_TEST(OrderedColumns) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index ba19624a1a..06485cb653 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -586,9 +586,10 @@ message TEvKillScanTablet {
message TKqpStreamLookupSettings {
optional NKqpProto.TKqpPhyTableId Table = 1;
- repeated string KeyColumns = 2;
- repeated string Columns = 3;
+ repeated TKqpColumnMetadataProto KeyColumns = 2;
+ repeated TKqpColumnMetadataProto Columns = 3;
optional TKqpSnapshot Snapshot = 4;
optional uint64 LockTxId = 5;
optional bool ImmediateTx = 6;
+ repeated string LookupKeyColumns = 7;
}
diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp
index adc2e278ed..2aa8cf08b6 100644
--- a/ydb/services/ydb/ydb_table_ut.cpp
+++ b/ydb/services/ydb/ydb_table_ut.cpp
@@ -2938,7 +2938,7 @@ R"___(<main>: Error: Transaction not found: , code: 2015
for (bool returnStats : {false, true}) {
NYdb::NTable::TExecDataQuerySettings execSettings;
if (returnStats) {
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
}
{
auto query = "UPSERT INTO `/Root/Foo` (Key, Value) VALUES (0, 'aa');";