diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-02-06 17:39:24 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-02-06 17:39:24 +0300 |
commit | 594453052425011d2ba187d571370aa958eb5c0d (patch) | |
tree | 856ca8147cfc7ed9c04910beb27dfb4971805fe5 | |
parent | 813befe691b16fcf51beec6e725d4e04365dfd55 (diff) | |
download | ydb-594453052425011d2ba187d571370aa958eb5c0d.tar.gz |
delete table scheme resolving from stream lookup actor
refactor(kqp): delete table scheme resolving from stream lookup actor
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 384 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_join_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 5 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table_ut.cpp | 2 |
14 files changed, 219 insertions, 258 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index de836d27ca..8850429c70 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -214,16 +214,30 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf settings.MutableTable()->CopyFrom(streamLookup.GetTable()); auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable())); + for (const auto& keyColumn : table.KeyColumns) { + auto columnIt = table.Columns.find(keyColumn); + YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn); + + auto* keyColumnProto = settings.AddKeyColumns(); + keyColumnProto->SetName(keyColumn); + keyColumnProto->SetId(columnIt->second.Id); + keyColumnProto->SetTypeId(columnIt->second.Type.GetTypeId()); + } + for (const auto& keyColumn : streamLookup.GetKeyColumns()) { auto columnIt = table.Columns.find(keyColumn); YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn); - settings.AddKeyColumns(keyColumn); + settings.AddLookupKeyColumns(keyColumn); } for (const auto& column : streamLookup.GetColumns()) { auto columnIt = table.Columns.find(column); YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column); - settings.AddColumns(column); + + auto* columnProto = settings.AddColumns(); + columnProto->SetName(column); + columnProto->SetId(columnIt->second.Id); + columnProto->SetTypeId(columnIt->second.Type.GetTypeId()); } TTransform streamLookupTransform; diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 942c6d47b9..356838f9c7 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -942,17 +942,21 @@ private: streamLookupProto.AddKeyColumns(TString(keyColumn->GetName())); } - for (const auto& column : streamLookup.Columns()) { - YQL_ENSURE(tableMeta->Columns.FindPtr(column), "Unknown column: " << TString(column)); - streamLookupProto.AddColumns(TString(column)); - } - const auto resultType = streamLookup.Ref().GetTypeAnn(); YQL_ENSURE(resultType, "Empty stream lookup result type"); YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type"); const auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType(); streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv)); + YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct); + const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems(); + for (const auto column : resultColumns) { + const auto& systemColumns = GetSystemColumns(); + YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) || systemColumns.find(column->GetName()) != systemColumns.end(), + "Unknown column: " << column->GetName()); + streamLookupProto.AddColumns(TString(column->GetName())); + } + return; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index b4c2f0dc3a..82fc2334fc 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -11,6 +11,8 @@ #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/kqp/common/kqp_event_ids.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/core/kqp/runtime/kqp_scan_data.h> namespace NKikimr { namespace NKqp { @@ -18,7 +20,7 @@ namespace NKqp { namespace { static constexpr TDuration SCHEME_CACHE_REQUEST_TIMEOUT = TDuration::Seconds(5); -static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10); +static constexpr ui64 MAX_SHARD_RETRIES = 10; class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput { public: @@ -30,10 +32,37 @@ public: , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , ImmediateTx(settings.GetImmediateTx()) - , KeyPrefixColumns(settings.GetKeyColumns().begin(), settings.GetKeyColumns().end()) - , Columns(settings.GetColumns().begin(), settings.GetColumns().end()) - , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) - , RetryReadTimeout(RETRY_READ_TIMEOUT) { + , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) { + + KeyColumns.reserve(settings.GetKeyColumns().size()); + i32 keyOrder = 0; + for (const auto& keyColumn : settings.GetKeyColumns()) { + KeyColumns.emplace( + keyColumn.GetName(), + TSysTables::TTableColumnInfo{ + keyColumn.GetName(), + keyColumn.GetId(), + NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())}, + keyOrder++ + } + ); + } + + LookupKeyColumns.reserve(KeyColumns.size()); + for (const auto& lookupKeyColumn : settings.GetLookupKeyColumns()) { + auto columnIt = KeyColumns.find(lookupKeyColumn); + YQL_ENSURE(columnIt != KeyColumns.end()); + LookupKeyColumns.push_back(&columnIt->second); + } + + Columns.reserve(settings.GetColumns().size()); + for (const auto& column : settings.GetColumns()) { + Columns.emplace_back(TSysTables::TTableColumnInfo{ + column.GetName(), + column.GetId(), + NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())} + }); + } }; virtual ~TKqpStreamLookupActor() { @@ -44,8 +73,7 @@ public: } void Bootstrap() { - ResolveTable(); - + ResolveTableShards(); Become(&TKqpStreamLookupActor::StateFunc); } @@ -73,17 +101,11 @@ private: : Id(id) , ShardId(shardId) , Keys(std::move(keys)) - , State(EReadState::Initial) - , Retried(false) {} + , State(EReadState::Initial) {} - void SetFinished(const NActors::TActorContext& ctx) { + void SetFinished() { Keys.clear(); State = EReadState::Finished; - - if (RetryDeadlineTimerId) { - ctx.Send(RetryDeadlineTimerId, new TEvents::TEvPoisonPill()); - RetryDeadlineTimerId = {}; - } } bool Finished() const { @@ -94,13 +116,16 @@ private: const ui64 ShardId; std::vector<TOwnedTableRange> Keys; EReadState State; - TActorId RetryDeadlineTimerId; - bool Retried; }; - enum EEvSchemeCacheRequestTag : ui64 { - TableSchemeResolving, - TableShardsResolving + struct TShardState { + ui64 RetryAttempts = 0; + std::vector<TReadState*> Reads; + }; + + struct TResult { + const ui64 ShardId; + THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; }; struct TEvPrivate { @@ -110,39 +135,9 @@ private: }; struct TEvSchemeCacheRequestTimeout : public TEventLocal<TEvSchemeCacheRequestTimeout, EvSchemeCacheRequestTimeout> { - TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag tag) : Tag(tag) {} - - const EEvSchemeCacheRequestTag Tag; - }; - - struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> { - TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {} - - const ui64 ReadId; }; }; - struct TTableScheme { - TTableScheme(const THashMap<ui32, TSysTables::TTableColumnInfo>& columns) { - std::map<ui32, NScheme::TTypeInfo> keyColumnTypesByKeyOrder; - for (const auto& [_, column] : columns) { - if (column.KeyOrder >= 0) { - keyColumnTypesByKeyOrder[column.KeyOrder] = column.PType; - } - - ColumnsByName.emplace(column.Name, std::move(column)); - } - - KeyColumnTypes.resize(keyColumnTypesByKeyOrder.size()); - for (const auto& [keyOrder, keyColumnType] : keyColumnTypesByKeyOrder) { - KeyColumnTypes[keyOrder] = keyColumnType; - } - } - - std::unordered_map<TString, TSysTables::TTableColumnInfo> ColumnsByName; - std::vector<NScheme::TTypeInfo> KeyColumnTypes; - }; - private: void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {} void LoadState(const NYql::NDqProto::TSourceState&) final {} @@ -165,22 +160,18 @@ private: i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 totalDataSize = 0; - if (TableScheme) { - totalDataSize = PackResults(batch, freeSpace); - auto status = FetchLookupKeys(); - - if (Partitioning) { - ProcessLookupKeys(); - } + totalDataSize = PackResults(batch, freeSpace); + auto status = FetchLookupKeys(); - finished = (status == NUdf::EFetchStatus::Finish) - && UnprocessedKeys.empty() - && AllReadsFinished() - && Results.empty(); - } else { - Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + if (Partitioning) { + ProcessLookupKeys(); } + finished = (status == NUdf::EFetchStatus::Finish) + && UnprocessedKeys.empty() + && AllReadsFinished() + && Results.empty(); + return totalDataSize; } @@ -205,11 +196,9 @@ private: try { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvDataShard::TEvReadResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); hFunc(TEvPrivate::TEvSchemeCacheRequestTimeout, Handle); - hFunc(TEvPrivate::TEvRetryReadTimeout, Handle); IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); default: RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(), @@ -233,20 +222,6 @@ private: ProcessLookupKeys(); } - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - auto& resultSet = ev->Get()->Request->ResultSet; - YQL_ENSURE(resultSet.size() == 1, "Expected one result for table: " << TableId); - auto& result = resultSet[0]; - - if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { - return RuntimeError(TStringBuilder() << "Failed to resolve table: " << ToString(result.Status), - NYql::NDqProto::StatusIds::SCHEME_ERROR); - } - - TableScheme = std::make_unique<TTableScheme>(result.Columns); - ResolveTableShards(); - } - void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { const auto& record = ev->Get()->Record; @@ -271,26 +246,26 @@ private: Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); } - // TODO: refactor after KIKIMR-15102 - if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { - NKikimrTxDataShard::TReadContinuationToken continuationToken; - bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken()); - YQL_ENSURE(parseResult, "Failed to parse continuation token"); - YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size()); - - return RetryTableRead(read, continuationToken); - } - - YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC); - auto nrows = ev->Get()->GetRowsCount(); - for (ui64 rowId = 0; rowId < nrows; ++rowId) { - Results.emplace_back(ev->Get()->GetCells(rowId)); + switch (record.GetStatus().GetCode()) { + case Ydb::StatusIds::SUCCESS: + break; + case Ydb::StatusIds::OVERLOADED: + case Ydb::StatusIds::INTERNAL_ERROR: { + NKikimrTxDataShard::TReadContinuationToken continuationToken; + bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken()); + YQL_ENSURE(parseResult, "Failed to parse continuation token"); + YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size()); + return RetryTableRead(read, continuationToken); + } + default: { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); + return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues); + } } - Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); - if (record.GetFinished()) { - read.SetFinished(TlsActivationContext->AsActorContext()); + read.SetFinished(); } else { THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); request->Record.SetReadId(record.GetReadId()); @@ -298,6 +273,9 @@ private: Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true), IEventHandle::FlagTrackDelivery); } + + Results.emplace_back(TResult{read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -305,113 +283,86 @@ private: auto shardIt = ReadsPerShard.find(tabletId); YQL_ENSURE(shardIt != ReadsPerShard.end()); - for (auto readId : shardIt->second) { - auto readIt = Reads.find(readId); - YQL_ENSURE(readIt != Reads.end()); - auto& read = readIt->second; - - if (read.State == EReadState::Running) { - for (auto& key : read.Keys) { + for (auto* read : shardIt->second.Reads) { + if (read->State == EReadState::Running) { + for (auto& key : read->Keys) { UnprocessedKeys.emplace_back(std::move(key)); } - read.SetFinished(TlsActivationContext->AsActorContext()); + read->SetFinished(); } } - ReadsPerShard.erase(shardIt); ResolveTableShards(); } - void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr& ev) { - switch (ev->Get()->Tag) { - case EEvSchemeCacheRequestTag::TableSchemeResolving: - if (!TableScheme) { - RuntimeError(TStringBuilder() << "Failed to resolve scheme for table: " << TableId - << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); - } - break; - case EEvSchemeCacheRequestTag::TableShardsResolving: - if (!Partitioning) { - RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId - << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); - } - break; - default: - RuntimeError(TStringBuilder() << "Unexpected tag for TEvSchemeCacheRequestTimeout: " << (ui64)ev->Get()->Tag, - NYql::NDqProto::StatusIds::INTERNAL_ERROR); - } - } - - void Handle(TEvPrivate::TEvRetryReadTimeout::TPtr& ev) { - auto readIt = Reads.find(ev->Get()->ReadId); - YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId); - auto& read = readIt->second; - - if (read.Retried) { - RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId, - NYql::NDqProto::StatusIds::TIMEOUT); + void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) { + if (!Partitioning) { + RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId + << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); } } ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueVector& batch, i64 freeSpace) { - YQL_ENSURE(TableScheme); - i64 totalSize = 0; + bool sizeLimitExceeded = false; batch.clear(); - batch.reserve(Results.size()); - std::vector<NKikimr::NScheme::TTypeInfo> columnTypes; - columnTypes.reserve(Columns.size()); - for (const auto& column : Columns) { - auto colIt = TableScheme->ColumnsByName.find(column); - YQL_ENSURE(colIt != TableScheme->ColumnsByName.end()); - columnTypes.push_back(colIt->second.PType); - } - - for (; !Results.empty(); Results.pop_front()) { - const auto& result = Results.front(); - YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch"); - - NUdf::TUnboxedValue* rowItems = nullptr; - auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + size_t rowsCount = 0; + for (const auto& result : Results) { + rowsCount += result.ReadResult->Get()->GetRowsCount(); + } + batch.reserve(rowsCount); + + for (; !Results.empty() && !sizeLimitExceeded; Results.pop_front()) { + const auto& readResult = Results.front().ReadResult; + const auto shardId = Results.front().ShardId; + + for (size_t rowId = 0; rowId < readResult->Get()->GetRowsCount(); ++rowId) { + const auto& result = readResult->Get()->GetCells(rowId); + YQL_ENSURE(result.size() <= Columns.size(), "Result columns mismatch"); + + NUdf::TUnboxedValue* rowItems = nullptr; + auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + + i64 rowSize = 0; + for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size() && resultColIndex < result.size(); ++colIndex) { + const auto& column = Columns[colIndex]; + if (IsSystemColumn(column.Name)) { + NMiniKQL::FillSystemColumn(rowItems[colIndex], shardId, column.Id, column.PType); + rowSize += sizeof(NUdf::TUnboxedValue); + } else { + rowItems[colIndex] = NMiniKQL::GetCellValue(result[resultColIndex], column.PType); + rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes; + ++resultColIndex; + } + } - i64 rowSize = 0; - for (ui32 colId = 0; colId < Columns.size(); ++colId) { - rowItems[colId] = NMiniKQL::GetCellValue(result[colId], columnTypes[colId]); - rowSize += result[colId].Size(); - } + if (totalSize + rowSize > freeSpace) { + row.DeleteUnreferenced(); + sizeLimitExceeded = true; + break; + } - if (totalSize + rowSize > freeSpace) { - row.DeleteUnreferenced(); - break; + batch.push_back(std::move(row)); + totalSize += rowSize; } - - batch.push_back(std::move(row)); - totalSize += rowSize; } return totalSize; } NUdf::EFetchStatus FetchLookupKeys() { - YQL_ENSURE(TableScheme); - YQL_ENSURE(KeyPrefixColumns.size() <= TableScheme->KeyColumnTypes.size()); - - TVector<i32> keyColumnOrder; - keyColumnOrder.reserve(KeyPrefixColumns.size()); - for (const auto& keyColumn : KeyPrefixColumns) { - auto it = TableScheme->ColumnsByName.find(keyColumn); - YQL_ENSURE(it != TableScheme->ColumnsByName.end()); - keyColumnOrder.push_back(it->second.KeyOrder); - } + YQL_ENSURE(LookupKeyColumns.size() <= KeyColumns.size()); NUdf::EFetchStatus status; NUdf::TUnboxedValue key; while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { - std::vector<TCell> keyCells(KeyPrefixColumns.size()); - for (ui32 colId = 0; colId < KeyPrefixColumns.size(); ++colId) { - keyCells[keyColumnOrder[colId]] = MakeCell(TableScheme->KeyColumnTypes[keyColumnOrder[colId]], + std::vector<TCell> keyCells(LookupKeyColumns.size()); + for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) { + const auto* lookupKeyColumn = LookupKeyColumns[colId]; + YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size())); + keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType, key.GetElement(colId), TypeEnv, /* copy */ true); } @@ -424,15 +375,15 @@ private: void ProcessLookupKeys() { YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing"); - std::map<ui64, std::vector<TOwnedTableRange>> shardKeys; + std::unordered_map<ui64, std::vector<TOwnedTableRange>> shardKeys; for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) { const auto& key = UnprocessedKeys.front(); YQL_ENSURE(key.Point); std::vector<ui64> shardIds; - if (KeyPrefixColumns.size() < TableScheme->KeyColumnTypes.size()) { + if (LookupKeyColumns.size() < KeyColumns.size()) { /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */ - std::vector<TCell> fromCells(TableScheme->KeyColumnTypes.size()); + std::vector<TCell> fromCells(KeyColumns.size()); fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end()); std::vector<TCell> toCells(key.From.begin(), key.From.end()); @@ -443,7 +394,7 @@ private: } for (auto shardId : shardIds) { - shardKeys[shardId].emplace_back(key); + shardKeys[shardId].emplace_back(std::move(key)); } } @@ -453,15 +404,20 @@ private: } std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) { - YQL_ENSURE(TableScheme); YQL_ENSURE(Partitioning); + std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size()); + for (const auto& [_, columnInfo] : KeyColumns) { + YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size())); + keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; + } + auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true, [&](const auto& partition, bool) { const int result = CompareBorders<true, false>( partition.Range->EndKeyPrefix.GetCells(), range.From, partition.Range->IsInclusive || partition.Range->IsPoint, - range.InclusiveFrom || range.Point, TableScheme->KeyColumnTypes + range.InclusiveFrom || range.Point, keyColumnTypes ); return (result < 0); @@ -481,7 +437,7 @@ private: auto cmp = CompareBorders<true, true>( it->Range->EndKeyPrefix.GetCells(), range.To, it->Range->IsInclusive || it->Range->IsPoint, - range.InclusiveTo || range.Point, TableScheme->KeyColumnTypes + range.InclusiveTo || range.Point, keyColumnTypes ); if (cmp >= 0) { @@ -518,9 +474,9 @@ private: record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); for (const auto& column : Columns) { - auto colIt = TableScheme->ColumnsByName.find(column); - YQL_ENSURE(colIt != TableScheme->ColumnsByName.end()); - record.AddColumns(colIt->second.Id); + if (!IsSystemColumn(column.Name)) { + record.AddColumns(column.Id); + } } for (auto& key : read.Keys) { @@ -535,64 +491,51 @@ private: const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)}); YQL_ENSURE(succeeded); - ReadsPerShard[shardId].insert(readId); + ReadsPerShard[shardId].Reads.push_back(&readIt->second); return readIt->second; } void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) { YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size()); - std::vector<TOwnedTableRange> unprocessedKeys; - unprocessedKeys.reserve(failedRead.Keys.size() - token.GetFirstUnprocessedQuery()); for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) { - unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx])); + UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx])); } - auto& newRead = StartTableRead(failedRead.ShardId, std::move(unprocessedKeys)); - if (failedRead.Retried) { - newRead.RetryDeadlineTimerId = failedRead.RetryDeadlineTimerId; - failedRead.RetryDeadlineTimerId = {}; + failedRead.SetFinished(); + + auto& shardState = ReadsPerShard[failedRead.ShardId]; + if (shardState.RetryAttempts > MAX_SHARD_RETRIES) { + RuntimeError(TStringBuilder() << "Retry limit exceeded for shard: " << failedRead.ShardId, + NYql::NDqProto::StatusIds::ABORTED); } else { - failedRead.Retried = true; - newRead.RetryDeadlineTimerId = CreateLongTimer(TlsActivationContext->AsActorContext(), RetryReadTimeout, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryReadTimeout(newRead.Id))); + ++shardState.RetryAttempts; + ResolveTableShards(); } - - failedRead.SetFinished(TlsActivationContext->AsActorContext()); - } - - void ResolveTable() { - TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate()); - NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.TableId = TableId; - entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; - entry.ShowPrivatePath = true; - request->ResultSet.emplace_back(entry); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request)); - - SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableSchemeResolving))); } void ResolveTableShards() { - YQL_ENSURE(TableScheme); Partitioning.reset(); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); - TVector<TCell> minusInf(TableScheme->KeyColumnTypes.size()); + TVector<TCell> minusInf(KeyColumns.size()); TVector<TCell> plusInf; TTableRange range(minusInf, true, plusInf, true, false); + std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size()); + for (const auto& [_, columnInfo] : KeyColumns) { + keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; + } + request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read, - TableScheme->KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); + keyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout(EEvSchemeCacheRequestTag::TableShardsResolving))); + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvSchemeCacheRequestTimeout())); } bool AllReadsFinished() const { @@ -636,17 +579,16 @@ private: IKqpGateway::TKqpSnapshot Snapshot; const TMaybe<ui64> LockTxId; const bool ImmediateTx; - const std::vector<TString> KeyPrefixColumns; - const std::vector<TString> Columns; - std::unique_ptr<const TTableScheme> TableScheme; - std::deque<TOwnedCellVec> Results; + std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; + std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; + std::vector<TSysTables::TTableColumnInfo> Columns; + std::deque<TResult> Results; std::unordered_map<ui64, TReadState> Reads; - std::unordered_map<ui64, std::set<ui64>> ReadsPerShard; + std::unordered_map<ui64, TShardState> ReadsPerShard; std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; std::deque<TOwnedTableRange> UnprocessedKeys; const TDuration SchemeCacheRequestTimeout; NActors::TActorId SchemeCacheRequestTimeoutTimer; - const TDuration RetryReadTimeout; TVector<NKikimrTxDataShard::TLock> Locks; TVector<NKikimrTxDataShard::TLock> BrokenLocks; }; diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index bf02c9e80d..ea6ae04fae 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -509,7 +509,7 @@ TDataQueryResult ExecQueryAndTestResult(TSession& session, const TString& query, const TString& expectedYson) { NYdb::NTable::TExecDataQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::Basic); + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); TDataQueryResult result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params, settings) .ExtractValueSync(); diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index f839f7a75f..d89306dcbd 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { )")); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery( query1, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), @@ -561,7 +561,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { )"); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery( query1, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), @@ -596,7 +596,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { )"); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery( query2, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), @@ -636,7 +636,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { )"); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery( query2, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), @@ -676,7 +676,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { )"); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery( query2, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index da4fcf5149..6b06041bd0 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -90,7 +90,7 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) { PrepareTables(session); TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); diff --git a/ydb/core/kqp/ut/join/kqp_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_ut.cpp index a7d2ace1f9..e30d820629 100644 --- a/ydb/core/kqp/ut/join/kqp_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_ut.cpp @@ -186,7 +186,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) { CreateSampleTables(session); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q_(R"( PRAGMA DisableSimpleColumns; @@ -225,7 +225,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) { CreateSampleTables(session); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q_(R"( PRAGMA DisableSimpleColumns; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 4cbd2e4eca..1b17b84249 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { .Build(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); @@ -1109,7 +1109,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto session = db.CreateSession().GetValueSync().GetSession(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(R"( SELECT * FROM `/Root/EightShard` WHERE Key = 101 OR Key = 301 @@ -1142,7 +1142,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { .Build(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(R"( DECLARE $key AS Uint64; @@ -1347,7 +1347,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto session = db.CreateSession().GetValueSync().GetSession(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(R"( @@ -1742,7 +1742,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { .Build(); auto settings = TExecDataQuerySettings() - .CollectQueryStats(ECollectQueryStatsMode::Basic); + .CollectQueryStats(ECollectQueryStatsMode::Profile); auto it = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params), settings).GetValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); @@ -2622,7 +2622,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto session = db.CreateSession().GetValueSync().GetSession(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto params = TParamsBuilder() .AddParam("$group").Uint32(1).Build() @@ -3020,7 +3020,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto session = db.CreateSession().GetValueSync().GetSession(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(R"( --!syntax_v1 diff --git a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp index 7df7000676..558dab8172 100644 --- a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp @@ -641,7 +641,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) { { NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); UNIT_ASSERT(result.IsSuccess()); @@ -844,7 +844,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) { CreateSampleTables(session); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q_(R"( SELECT Value FROM `/Root/TestDate` WHERE Key = Date("2019-07-01") @@ -1034,7 +1034,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) { )"); NYdb::NTable::TExecDataQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::Basic); + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); @@ -1065,7 +1065,7 @@ Y_UNIT_TEST_SUITE(KqpRanges) { CreateSampleTables(session); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q_(R"( SELECT * FROM `/Root/EightShard` WHERE Key = 101 OR Key = 302 OR Key = 403 OR Key = 705 diff --git a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp index f9fce43ff6..ae1d952db7 100644 --- a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp @@ -994,7 +994,7 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) { .Build(); NYdb::NTable::TExecDataQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::Basic); + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q1_(R"( DECLARE $keys AS List<Uint64>; diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index 816bcec8a2..6631bee275 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -295,7 +295,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { .Build(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q1_(R"( DECLARE $key AS Uint64; @@ -331,7 +331,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { .Build(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(Q1_(R"( DECLARE $key AS Uint64; @@ -477,7 +477,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { auto session = db.CreateSession().GetValueSync().GetSession(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto params = db.GetParamsBuilder() .AddParam("$key").Int32(3).Build() @@ -503,7 +503,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { auto session = db.CreateSession().GetValueSync().GetSession(); NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto params = db.GetParamsBuilder() .AddParam("$key").Int32(3).Build() diff --git a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp index 8cd9b54a68..75e7b526f6 100644 --- a/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp @@ -36,11 +36,11 @@ Y_UNIT_TEST_SUITE(KqpPragma) { UNIT_ASSERT(result.IsSuccess()); CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(0))); - result = session.ExecuteDataQuery(R"( + /*result = session.ExecuteDataQuery(R"( SELECT COUNT(_yql_partition_id) FROM `/Root/KeyValue` WHERE Key = 1; )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::GENERIC_ERROR); - UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN)); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::CORE_TYPE_ANN));*/ } Y_UNIT_TEST(OrderedColumns) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index ba19624a1a..06485cb653 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -586,9 +586,10 @@ message TEvKillScanTablet { message TKqpStreamLookupSettings { optional NKqpProto.TKqpPhyTableId Table = 1; - repeated string KeyColumns = 2; - repeated string Columns = 3; + repeated TKqpColumnMetadataProto KeyColumns = 2; + repeated TKqpColumnMetadataProto Columns = 3; optional TKqpSnapshot Snapshot = 4; optional uint64 LockTxId = 5; optional bool ImmediateTx = 6; + repeated string LookupKeyColumns = 7; } diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index adc2e278ed..2aa8cf08b6 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -2938,7 +2938,7 @@ R"___(<main>: Error: Transaction not found: , code: 2015 for (bool returnStats : {false, true}) { NYdb::NTable::TExecDataQuerySettings execSettings; if (returnStats) { - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); } { auto query = "UPSERT INTO `/Root/Foo` (Key, Value) VALUES (0, 'aa');"; |