diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-06-04 20:39:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-04 17:39:36 +0000 |
commit | 5cbe9deddd00545d1e5d040d60c84b21f4495fd5 (patch) | |
tree | ced520974bb55dce0534d270f2004dd4032924c6 | |
parent | 0dabbb3ec4627215265adeea24eacc650e8247a3 (diff) | |
download | ydb-5cbe9deddd00545d1e5d040d60c84b21f4495fd5.tar.gz |
Prepare StreamLookupWorker for index write (#19252)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 400 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 30 |
3 files changed, 290 insertions, 163 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 7ec2533f605..ba0f3a89e9c 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -152,8 +152,6 @@ private: const ui64 Id; const ui64 ShardId; EReadState State; - TMaybe<TOwnedCellVec> LastProcessedKey; - ui32 FirstUnprocessedQuery = 0; ui64 LastSeqNo = 0; }; @@ -439,7 +437,7 @@ private: break; case Ydb::StatusIds::NOT_FOUND: { - StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey); + StreamLookupWorker->ResetRowsProcessing(read.Id); CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". " << getIssues().ToOneLineString()); Reads.erase(read); @@ -478,19 +476,6 @@ private: if (record.GetFinished()) { Reads.erase(read); } else { - YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token"); - NKikimrTxDataShard::TReadContinuationToken continuationToken; - bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken()); - YQL_ENSURE(parseResult, "Failed to parse continuation token"); - read.FirstUnprocessedQuery = continuationToken.GetFirstUnprocessedQuery(); - - if (continuationToken.HasLastProcessedKey()) { - TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey()); - read.LastProcessedKey = TOwnedCellVec(lastKey.GetCells()); - } else { - read.LastProcessedKey.Clear(); - } - Counters->SentIteratorAcks->Inc(); THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); request->Record.SetReadId(record.GetReadId()); @@ -562,7 +547,7 @@ private: if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) { if (ev->Get()->InstantStart) { - auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId); + auto requests = StreamLookupWorker->RebuildRequest(read.Id, ReadId); for (auto& request : requests) { StartTableRead(read.ShardId, std::move(request)); } @@ -664,14 +649,14 @@ private: ++TotalRetryAttempts; if (Reads.CheckShardRetriesExeeded(failedRead)) { - StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey); + StreamLookupWorker->ResetRowsProcessing(failedRead.Id); Reads.erase(failedRead); return ResolveTableShards(); } auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry); if (delay == TDuration::Zero()) { - auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey, ReadId); + auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, ReadId); for (auto& request : requests) { StartTableRead(failedRead.ShardId, std::move(request)); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index a285c8e84fd..41c9a767660 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -76,57 +76,40 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt return rangePartition; } +struct TReadState { + std::vector<TOwnedTableRange> PendingKeys; + + TMaybe<TOwnedCellVec> LastProcessedKey; + ui32 FirstUnprocessedQuery = 0; + ui64 LastSeqNo = 0; +}; + +void UpdateContinuationData(const NKikimrTxDataShard::TEvReadResult& record, TReadState& state) { + YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token"); + NKikimrTxDataShard::TReadContinuationToken continuationToken; + bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken()); + YQL_ENSURE(parseResult, "Failed to parse continuation token"); + state.FirstUnprocessedQuery = continuationToken.GetFirstUnprocessedQuery(); + + if (continuationToken.HasLastProcessedKey()) { + TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey()); + state.LastProcessedKey = TOwnedCellVec(lastKey.GetCells()); + } else { + state.LastProcessedKey.Clear(); + } +} } // !namespace -TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, +TKqpStreamLookupWorker::TKqpStreamLookupWorker(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) - : Settings(std::move(settings)) - , TypeEnv(typeEnv) + : TypeEnv(typeEnv) , HolderFactory(holderFactory) , InputDesc(inputDesc) - , TablePath(Settings.GetTable().GetPath()) - , TableId(MakeTableId(Settings.GetTable())) { - - KeyColumns.reserve(Settings.GetKeyColumns().size()); - i32 keyOrder = 0; - for (const auto& keyColumn : Settings.GetKeyColumns()) { - NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(keyColumn.GetTypeId(), keyColumn.GetTypeInfo()); - - KeyColumns.emplace( - keyColumn.GetName(), - TSysTables::TTableColumnInfo{ - keyColumn.GetName(), - keyColumn.GetId(), - typeInfo, - keyColumn.GetTypeInfo().GetPgTypeMod(), - keyOrder++ - } - ); - } - - LookupKeyColumns.reserve(Settings.GetLookupKeyColumns().size()); - for (const auto& lookupKey : Settings.GetLookupKeyColumns()) { - auto columnIt = KeyColumns.find(lookupKey); - YQL_ENSURE(columnIt != KeyColumns.end()); - LookupKeyColumns.push_back(&columnIt->second); - } - - Columns.reserve(Settings.GetColumns().size()); - for (const auto& column : Settings.GetColumns()) { - NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(column.GetTypeId(), column.GetTypeInfo()); - - Columns.emplace_back(TSysTables::TTableColumnInfo{ - column.GetName(), - column.GetId(), - typeInfo, - column.GetTypeInfo().GetPgTypeMod() - }); - } - - KeyColumnTypes.resize(KeyColumns.size()); - for (const auto& [_, columnInfo] : KeyColumns) { + , Settings(std::move(settings)) { + KeyColumnTypes.resize(Settings.KeyColumns.size()); + for (const auto& [_, columnInfo] : Settings.KeyColumns) { YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(KeyColumnTypes.size())); KeyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; } @@ -136,16 +119,16 @@ TKqpStreamLookupWorker::~TKqpStreamLookupWorker() { } std::string TKqpStreamLookupWorker::GetTablePath() const { - return TablePath; + return Settings.TablePath; } TTableId TKqpStreamLookupWorker::GetTableId() const { - return TableId; + return Settings.TableId; } class TKqpLookupRows : public TKqpStreamLookupWorker { public: - TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + TKqpLookupRows(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) { } @@ -154,9 +137,9 @@ public: void AddInputRow(NUdf::TUnboxedValue inputRow) final { NMiniKQL::TStringProviderBackend backend; - std::vector<TCell> keyCells(LookupKeyColumns.size()); - for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) { - const auto* lookupKeyColumn = LookupKeyColumns[colId]; + std::vector<TCell> keyCells(Settings.LookupKeyColumns.size()); + for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) { + const auto* lookupKeyColumn = Settings.LookupKeyColumns[colId]; YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size())); // when making a cell we don't really need to make a copy of data, because // TOwnedCellVec will make its' own copy. @@ -164,9 +147,25 @@ public: inputRow.GetElement(colId), backend, /* copy */ false); } - if (keyCells.size() < KeyColumns.size()) { + AddInputRowImpl(std::move(keyCells)); + } + + void AddInputRow(TConstArrayRef<TCell> inputRow) final { + NMiniKQL::TStringProviderBackend backend; + std::vector<TCell> keyCells(Settings.LookupKeyColumns.size()); + for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) { + const auto* lookupKeyColumn = Settings.LookupKeyColumns[colId]; + YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size())); + keyCells[lookupKeyColumn->KeyOrder] = inputRow[colId]; + } + + AddInputRowImpl(std::move(keyCells)); + } + + virtual void AddInputRowImpl(std::vector<TCell> keyCells) { + if (keyCells.size() < Settings.KeyColumns.size()) { // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) - std::vector<TCell> fromCells(KeyColumns.size()); + std::vector<TCell> fromCells(Settings.KeyColumns.size()); for (size_t i = 0; i < keyCells.size(); ++i) { fromCells[i] = keyCells[i]; } @@ -181,18 +180,23 @@ public: } } - std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, - TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final { + std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) final { - auto it = PendingKeysByReadId.find(prevReadId); - if (it == PendingKeysByReadId.end()) { + auto it = ReadStateByReadId.find(prevReadId); + if (it == ReadStateByReadId.end()) { return {}; } std::vector<TOwnedTableRange> unprocessedRanges; std::vector<TOwnedTableRange> unprocessedPoints; - auto& ranges = it->second; + TReadState state = std::move(it->second); + auto& ranges = state.PendingKeys; + + ReadStateByReadId.erase(it); + + ui32 firstUnprocessedQuery = state.FirstUnprocessedQuery; + const auto& lastProcessedKey = state.LastProcessedKey; if (lastProcessedKey) { YQL_ENSURE(firstUnprocessedQuery < ranges.size()); @@ -212,8 +216,6 @@ public: } } - PendingKeysByReadId.erase(it); - std::vector<THolder<TEvDataShard::TEvRead>> requests; requests.reserve(unprocessedPoints.size() + unprocessedRanges.size()); @@ -221,14 +223,22 @@ public: THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); FillReadRequest(++newReadId, request, unprocessedPoints); requests.emplace_back(std::move(request)); - PendingKeysByReadId.insert({newReadId, std::move(unprocessedPoints)}); + ReadStateByReadId.emplace( + newReadId, + TReadState{ + .PendingKeys = std::move(unprocessedPoints), + }); } if (!unprocessedRanges.empty()) { THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); FillReadRequest(++newReadId, request, unprocessedRanges); requests.emplace_back(std::move(request)); - PendingKeysByReadId.insert({newReadId, std::move(unprocessedRanges)}); + ReadStateByReadId.emplace( + newReadId, + TReadState{ + .PendingKeys = std::move(unprocessedRanges), + }); } return requests; @@ -261,14 +271,22 @@ public: THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); FillReadRequest(++readId, request, points); readRequests.emplace_back(shardId, std::move(request)); - PendingKeysByReadId.insert({readId, std::move(points)}); + ReadStateByReadId.emplace( + readId, + TReadState{ + .PendingKeys = std::move(points), + }); } for (auto& [shardId, ranges] : rangesPerShard) { THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); FillReadRequest(++readId, request, ranges); readRequests.emplace_back(shardId, std::move(request)); - PendingKeysByReadId.insert({readId, std::move(ranges)}); + ReadStateByReadId.emplace( + readId, + TReadState{ + .PendingKeys = std::move(ranges), + }); } return readRequests; @@ -278,8 +296,12 @@ public: const auto& record = result.ReadResult->Get()->Record; YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS); - auto it = PendingKeysByReadId.find(record.GetReadId()); - YQL_ENSURE(it != PendingKeysByReadId.end()); + auto it = ReadStateByReadId.find(record.GetReadId()); + YQL_ENSURE(it != ReadStateByReadId.end()); + + if (!record.GetFinished()) { + UpdateContinuationData(record, it->second); + } ReadResults.emplace_back(std::move(result)); } @@ -296,15 +318,15 @@ public: auto& result = ReadResults.front(); for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); - YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch"); + YQL_ENSURE(resultRow.size() <= Settings.Columns.size(), "Result columns mismatch"); NUdf::TUnboxedValue* rowItems = nullptr; - auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + auto row = HolderFactory.CreateDirectArrayHolder(Settings.Columns.size(), rowItems); i64 rowSize = 0; i64 storageRowSize = 0; - for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) { - const auto& column = Columns[colIndex]; + for (size_t colIndex = 0, resultColIndex = 0; colIndex < Settings.Columns.size(); ++colIndex) { + const auto& column = Settings.Columns[colIndex]; if (IsSystemColumn(column.Name)) { NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType); rowSize += sizeof(NUdf::TUnboxedValue); @@ -338,8 +360,41 @@ public: if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { if (result.ReadResult->Get()->Record.GetFinished()) { // delete finished read - auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId()); - PendingKeysByReadId.erase(it); + auto it = ReadStateByReadId.find(result.ReadResult->Get()->Record.GetReadId()); + ReadStateByReadId.erase(it); + } + + ReadResults.pop_front(); + } + } + + return resultStats; + } + + TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)> reader) final { + TReadResultStats resultStats; + + while (!ReadResults.empty()) { + auto& result = ReadResults.front(); + for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { + const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); + YQL_ENSURE(resultRow.size() <= Settings.Columns.size(), "Result columns mismatch"); + + const i64 storageRowSize = EstimateSize(resultRow); + + reader(resultRow); + + resultStats.ReadRowsCount += 1; + resultStats.ReadBytesCount += storageRowSize; + resultStats.ResultRowsCount += 1; + resultStats.ResultBytesCount += storageRowSize; + } + + if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { + if (result.ReadResult->Get()->Record.GetFinished()) { + // delete finished read + auto it = ReadStateByReadId.find(result.ReadResult->Get()->Record.GetReadId()); + ReadStateByReadId.erase(it); } ReadResults.pop_front(); @@ -351,19 +406,23 @@ public: bool AllRowsProcessed() final { return UnprocessedKeys.empty() - && PendingKeysByReadId.empty() + && ReadStateByReadId.empty() && ReadResults.empty(); } - void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final { - auto it = PendingKeysByReadId.find(readId); - if (it == PendingKeysByReadId.end()) { + void ResetRowsProcessing(ui64 readId) final { + auto it = ReadStateByReadId.find(readId); + if (it == ReadStateByReadId.end()) { return; } + auto& keys = it->second.PendingKeys; + + ui32 firstUnprocessedQuery = it->second.FirstUnprocessedQuery; + const auto& lastProcessedKey = it->second.LastProcessedKey; if (lastProcessedKey) { - YQL_ENSURE(firstUnprocessedQuery < it->second.size()); - auto unprocessedRange = it->second[firstUnprocessedQuery]; + YQL_ENSURE(firstUnprocessedQuery < keys.size()); + auto unprocessedRange = keys[firstUnprocessedQuery]; YQL_ENSURE(!unprocessedRange.Point); UnprocessedKeys.emplace_back(*lastProcessedKey, false, @@ -371,11 +430,11 @@ public: ++firstUnprocessedQuery; } - for (ui32 keyIdx = firstUnprocessedQuery; keyIdx < it->second.size(); ++keyIdx) { - UnprocessedKeys.emplace_back(std::move(it->second[keyIdx])); + for (ui32 keyIdx = firstUnprocessedQuery; keyIdx < keys.size(); ++keyIdx) { + UnprocessedKeys.emplace_back(std::move(keys[keyIdx])); } - PendingKeysByReadId.erase(it); + ReadStateByReadId.erase(it); } private: @@ -384,11 +443,11 @@ private: record.SetReadId(readId); - record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId); - record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId); - record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); + record.MutableTableId()->SetOwnerId(Settings.TableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(Settings.TableId.PathId.LocalPathId); + record.MutableTableId()->SetSchemaVersion(Settings.TableId.SchemaVersion); - for (const auto& column : Columns) { + for (const auto& column : Settings.Columns) { if (!IsSystemColumn(column.Name)) { record.AddColumns(column.Id); } @@ -406,7 +465,7 @@ private: for (auto& range : ranges) { YQL_ENSURE(!range.Point); - if (range.To.size() < KeyColumns.size()) { + if (range.To.size() < Settings.KeyColumns.size()) { // absent cells mean infinity => in prefix notation `To` should be inclusive request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true)); } else { @@ -418,34 +477,34 @@ private: private: std::deque<TOwnedTableRange> UnprocessedKeys; - std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId; + std::unordered_map<ui64, TReadState> ReadStateByReadId; std::deque<TShardReadResult> ReadResults; }; class TKqpJoinRows : public TKqpStreamLookupWorker { public: - TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + TKqpJoinRows(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) { // read columns should contain join key and result columns - for (auto joinKey : LookupKeyColumns) { + for (auto joinKey : Settings.LookupKeyColumns) { ReadColumns.emplace(joinKey->Name, *joinKey); } - for (auto column : Columns) { + for (auto column : Settings.Columns) { ReadColumns.emplace(column.Name, column); } } void AddInputRow(NUdf::TUnboxedValue inputRow) final { auto joinKey = inputRow.GetElement(0); - std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); + std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size()); NMiniKQL::TStringProviderBackend backend; if (joinKey.HasValue()) { - for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) { - const auto* joinKeyColumn = LookupKeyColumns[colId]; + for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) { + const auto* joinKeyColumn = Settings.LookupKeyColumns[colId]; YQL_ENSURE(joinKeyColumn->KeyOrder < static_cast<i64>(joinKeyCells.size())); // when making a cell we don't really need to make a copy of data, because // TOwnedCellVec will make its' own copy. @@ -457,22 +516,31 @@ public: UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1)))); } + void AddInputRow(TConstArrayRef<TCell>) final { + YQL_ENSURE(false); + } + bool IsOverloaded() final { return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT; } - std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, - TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final { + std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) final { - auto readIt = PendingKeysByReadId.find(prevReadId); - if (readIt == PendingKeysByReadId.end()) { + auto readIt = ReadStateByReadId.find(prevReadId); + if (readIt == ReadStateByReadId.end()) { return {}; } std::vector<TOwnedTableRange> unprocessedRanges; std::vector<TOwnedTableRange> unprocessedPoints; - auto& ranges = readIt->second; + TReadState state = std::move(readIt->second); + auto& ranges = state.PendingKeys; + + ReadStateByReadId.erase(readIt); + + ui32 firstUnprocessedQuery = state.FirstUnprocessedQuery; + const auto& lastProcessedKey = state.LastProcessedKey; if (lastProcessedKey) { YQL_ENSURE(firstUnprocessedQuery < ranges.size()); @@ -496,8 +564,6 @@ public: } } - PendingKeysByReadId.erase(readIt); - std::vector<THolder<TEvDataShard::TEvRead>> readRequests; if (!unprocessedPoints.empty()) { @@ -511,7 +577,11 @@ public: rowIt->second.PendingReads.insert(newReadId); } - PendingKeysByReadId.insert({newReadId, std::move(unprocessedPoints)}); + ReadStateByReadId.emplace( + newReadId, + TReadState{ + .PendingKeys = std::move(unprocessedPoints), + }); } if (!unprocessedRanges.empty()) { @@ -525,7 +595,11 @@ public: rowIt->second.PendingReads.insert(newReadId); } - PendingKeysByReadId.insert({newReadId, std::move(unprocessedRanges)}); + ReadStateByReadId.emplace( + newReadId, + TReadState{ + .PendingKeys = std::move(unprocessedRanges), + }); } return readRequests; @@ -560,7 +634,7 @@ public: } auto isKeyAllowed = [&](const TOwnedCellVec& cellVec) { - auto allowNullKeysPrefixSize = Settings.HasAllowNullKeysPrefixSize() ? Settings.GetAllowNullKeysPrefixSize() : 0; + auto allowNullKeysPrefixSize = Settings.AllowNullKeysPrefixSize; if (allowNullKeysPrefixSize >= cellVec.size()) { // all lookup key components can contain null return true; @@ -579,9 +653,9 @@ public: UnprocessedRows.pop_front(); if (isKeyAllowed(joinKey)) { std::vector <std::pair<ui64, TOwnedTableRange>> partitions; - if (joinKey.size() < KeyColumns.size()) { + if (joinKey.size() < Settings.KeyColumns.size()) { // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) - std::vector <TCell> fromCells(KeyColumns.size()); + std::vector <TCell> fromCells(Settings.KeyColumns.size()); fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end()); bool fromInclusive = true; bool toInclusive = false; @@ -620,7 +694,11 @@ public: rowIt->second.PendingReads.insert(readId); } - PendingKeysByReadId.insert({readId, std::move(points)}); + ReadStateByReadId.emplace( + readId, + TReadState{ + .PendingKeys = std::move(points), + }); } for (auto& [shardId, ranges] : rangesPerShard) { @@ -634,7 +712,11 @@ public: rowIt->second.PendingReads.insert(readId); } - PendingKeysByReadId.insert({readId, std::move(ranges)}); + ReadStateByReadId.emplace( + readId, + TReadState{ + .PendingKeys = std::move(ranges), + }); } return requests; @@ -644,25 +726,25 @@ public: const auto& record = result.ReadResult->Get()->Record; YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS); - auto pendingKeysIt = PendingKeysByReadId.find(record.GetReadId()); - YQL_ENSURE(pendingKeysIt != PendingKeysByReadId.end()); + auto pendingKeysIt = ReadStateByReadId.find(record.GetReadId()); + YQL_ENSURE(pendingKeysIt != ReadStateByReadId.end()); for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); // result can contain fewer columns because of system columns YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch"); - std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); - for (size_t joinKeyColumn = 0; joinKeyColumn < LookupKeyColumns.size(); ++joinKeyColumn) { - auto columnIt = ReadColumns.find(LookupKeyColumns[joinKeyColumn]->Name); + std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size()); + for (size_t joinKeyColumn = 0; joinKeyColumn < Settings.LookupKeyColumns.size(); ++joinKeyColumn) { + auto columnIt = ReadColumns.find(Settings.LookupKeyColumns[joinKeyColumn]->Name); YQL_ENSURE(columnIt != ReadColumns.end()); - joinKeyCells[LookupKeyColumns[joinKeyColumn]->KeyOrder] = row[std::distance(ReadColumns.begin(), columnIt)]; + joinKeyCells[Settings.LookupKeyColumns[joinKeyColumn]->KeyOrder] = row[std::distance(ReadColumns.begin(), columnIt)]; } auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); - if (Settings.GetLookupStrategy() == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) { + if (Settings.LookupStrategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) { // semi join should return one result row per key continue; } @@ -674,7 +756,7 @@ public: } if (record.GetFinished()) { - for (const auto& key : pendingKeysIt->second) { + for (const auto& key : pendingKeysIt->second.PendingKeys) { auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(key)); if (leftRowIt != PendingLeftRowsByKey.end()) { leftRowIt->second.PendingReads.erase(record.GetReadId()); @@ -691,26 +773,30 @@ public: } } - PendingKeysByReadId.erase(pendingKeysIt); + ReadStateByReadId.erase(pendingKeysIt); + } else { + UpdateContinuationData(record, pendingKeysIt->second); } } bool AllRowsProcessed() final { return UnprocessedRows.empty() && UnprocessedKeys.empty() - && PendingKeysByReadId.empty() + && ReadStateByReadId.empty() && ResultRowsBySeqNo.empty() && PendingLeftRowsByKey.empty(); } - void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final { - auto readIt = PendingKeysByReadId.find(readId); - if (readIt == PendingKeysByReadId.end()) { + void ResetRowsProcessing(ui64 readId) final { + auto readIt = ReadStateByReadId.find(readId); + if (readIt == ReadStateByReadId.end()) { return; } - auto& ranges = readIt->second; + auto& ranges = readIt->second.PendingKeys; + ui32 firstUnprocessedQuery = readIt->second.FirstUnprocessedQuery; + const auto& lastProcessedKey = readIt->second.LastProcessedKey; if (lastProcessedKey) { YQL_ENSURE(firstUnprocessedQuery < ranges.size()); auto unprocessedRange = ranges[firstUnprocessedQuery]; @@ -730,7 +816,7 @@ public: UnprocessedKeys.emplace_back(std::move(range)); } - PendingKeysByReadId.erase(readIt); + ReadStateByReadId.erase(readIt); } TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { @@ -800,6 +886,10 @@ public: return resultStats; } + TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)>) final { + YQL_ENSURE(false); + } + ~TKqpJoinRows() { UnprocessedRows.clear(); PendingLeftRowsByKey.clear(); @@ -828,7 +918,7 @@ private: }; bool ShoulKeepRowsOrder() const { - return Settings.HasKeepRowsOrder() && Settings.GetKeepRowsOrder(); + return Settings.KeepRowsOrder; } bool IsRowSeqNoValid(const ui64& seqNo) const { @@ -845,9 +935,9 @@ private: record.SetReadId(readId); - record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId); - record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId); - record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); + record.MutableTableId()->SetOwnerId(Settings.TableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(Settings.TableId.PathId.LocalPathId); + record.MutableTableId()->SetSchemaVersion(Settings.TableId.SchemaVersion); for (const auto& [name, column] : ReadColumns) { if (!IsSystemColumn(name)) { @@ -866,7 +956,7 @@ private: request->Ranges.reserve(ranges.size()); for (auto& range : ranges) { YQL_ENSURE(!range.Point); - if (range.To.size() < KeyColumns.size()) { + if (range.To.size() < Settings.KeyColumns.size()) { // Absent cells mean infinity. So in prefix notation `To` should be inclusive. request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true)); } else { @@ -877,11 +967,11 @@ private: } TConstArrayRef<TCell> ExtractKeyPrefix(const TOwnedTableRange& range) { - if (range.From.size() == LookupKeyColumns.size()) { + if (range.From.size() == Settings.LookupKeyColumns.size()) { return range.From; } - return range.From.subspan(0, LookupKeyColumns.size()); + return range.From.subspan(0, Settings.LookupKeyColumns.size()); } NMiniKQL::TStructType* GetLeftRowType() { @@ -928,10 +1018,10 @@ private: leftRowInfo.RightRowExist = true; NUdf::TUnboxedValue* rightRowItems = nullptr; - resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems); + resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Settings.Columns.size(), rightRowItems); - for (size_t colIndex = 0; colIndex < Columns.size(); ++colIndex) { - const auto& column = Columns[colIndex]; + for (size_t colIndex = 0; colIndex < Settings.Columns.size(); ++colIndex) { + const auto& column = Settings.Columns[colIndex]; auto it = ReadColumns.find(column.Name); YQL_ENSURE(it != ReadColumns.end()); @@ -963,7 +1053,7 @@ private: std::map<std::string, TSysTables::TTableColumnInfo> ReadColumns; std::deque<std::pair<TOwnedCellVec, NUdf::TUnboxedValue>> UnprocessedRows; std::deque<TOwnedTableRange> UnprocessedKeys; - std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId; + std::unordered_map<ui64, TReadState> ReadStateByReadId; absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey; std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo; ui64 InputRowSeqNo = 0; @@ -975,12 +1065,54 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) { + TLookupSettings preparedSettings; + preparedSettings.TablePath = std::move(settings.GetTable().GetPath()); + preparedSettings.TableId = MakeTableId(settings.GetTable()); + + preparedSettings.AllowNullKeysPrefixSize = settings.HasAllowNullKeysPrefixSize() ? settings.GetAllowNullKeysPrefixSize() : 0; + preparedSettings.KeepRowsOrder = settings.HasKeepRowsOrder() && settings.GetKeepRowsOrder(); + preparedSettings.LookupStrategy = settings.GetLookupStrategy(); + + preparedSettings.KeyColumns.reserve(settings.GetKeyColumns().size()); + i32 keyOrder = 0; + for (const auto& keyColumn : settings.GetKeyColumns()) { + NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(keyColumn.GetTypeId(), keyColumn.GetTypeInfo()); + preparedSettings.KeyColumns.emplace( + keyColumn.GetName(), + TSysTables::TTableColumnInfo{ + keyColumn.GetName(), + keyColumn.GetId(), + typeInfo, + keyColumn.GetTypeInfo().GetPgTypeMod(), + keyOrder++ + } + ); + } + + preparedSettings.LookupKeyColumns.reserve(settings.GetLookupKeyColumns().size()); + for (const auto& lookupKey : settings.GetLookupKeyColumns()) { + auto columnIt = preparedSettings.KeyColumns.find(lookupKey); + YQL_ENSURE(columnIt != preparedSettings.KeyColumns.end()); + preparedSettings.LookupKeyColumns.push_back(&columnIt->second); + } + + preparedSettings.Columns.reserve(settings.GetColumns().size()); + for (const auto& column : settings.GetColumns()) { + NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(column.GetTypeId(), column.GetTypeInfo()); + preparedSettings.Columns.emplace_back(TSysTables::TTableColumnInfo{ + column.GetName(), + column.GetId(), + typeInfo, + column.GetTypeInfo().GetPgTypeMod() + }); + } + switch (settings.GetLookupStrategy()) { case NKqpProto::EStreamLookupStrategy::LOOKUP: - return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc); + return std::make_unique<TKqpLookupRows>(std::move(preparedSettings), typeEnv, holderFactory, inputDesc); case NKqpProto::EStreamLookupStrategy::JOIN: case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: - return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc); + return std::make_unique<TKqpJoinRows>(std::move(preparedSettings), typeEnv, holderFactory, inputDesc); default: return {}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 26dfbf5462c..c4668f848a4 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -11,6 +11,19 @@ namespace NKikimr { namespace NKqp { +struct TLookupSettings { + TString TablePath; + TTableId TableId; + + ui32 AllowNullKeysPrefixSize; + bool KeepRowsOrder; + NKqpProto::EStreamLookupStrategy LookupStrategy; + + std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; + std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; + std::vector<TSysTables::TTableColumnInfo> Columns; +}; + class TKqpStreamLookupWorker { public: using TReadList = std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>>; @@ -45,7 +58,7 @@ public: }; public: - TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + TKqpStreamLookupWorker(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc); virtual ~TKqpStreamLookupWorker(); @@ -58,25 +71,22 @@ public: } virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0; - virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery, - TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) = 0; + virtual void AddInputRow(TConstArrayRef<TCell> inputRow) = 0; + virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) = 0; virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0; virtual void AddResult(TShardReadResult result) = 0; virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0; + virtual TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)> reader) = 0; virtual bool AllRowsProcessed() = 0; - virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0; + virtual void ResetRowsProcessing(ui64 readId) = 0; virtual bool IsOverloaded() = 0; protected: - const NKikimrKqp::TKqpStreamLookupSettings Settings; const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; const NYql::NDqProto::TTaskInput& InputDesc; - const TString TablePath; - const TTableId TableId; - std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; - std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; - std::vector<TSysTables::TTableColumnInfo> Columns; + const TLookupSettings Settings; + std::vector<NScheme::TTypeInfo> KeyColumnTypes; }; |