aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-06-04 20:39:36 +0300
committerGitHub <noreply@github.com>2025-06-04 17:39:36 +0000
commit5cbe9deddd00545d1e5d040d60c84b21f4495fd5 (patch)
treeced520974bb55dce0534d270f2004dd4032924c6
parent0dabbb3ec4627215265adeea24eacc650e8247a3 (diff)
downloadydb-5cbe9deddd00545d1e5d040d60c84b21f4495fd5.tar.gz
Prepare StreamLookupWorker for index write (#19252)
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp23
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp400
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.h30
3 files changed, 290 insertions, 163 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 7ec2533f605..ba0f3a89e9c 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -152,8 +152,6 @@ private:
const ui64 Id;
const ui64 ShardId;
EReadState State;
- TMaybe<TOwnedCellVec> LastProcessedKey;
- ui32 FirstUnprocessedQuery = 0;
ui64 LastSeqNo = 0;
};
@@ -439,7 +437,7 @@ private:
break;
case Ydb::StatusIds::NOT_FOUND:
{
- StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey);
+ StreamLookupWorker->ResetRowsProcessing(read.Id);
CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". "
<< getIssues().ToOneLineString());
Reads.erase(read);
@@ -478,19 +476,6 @@ private:
if (record.GetFinished()) {
Reads.erase(read);
} else {
- YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token");
- NKikimrTxDataShard::TReadContinuationToken continuationToken;
- bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
- YQL_ENSURE(parseResult, "Failed to parse continuation token");
- read.FirstUnprocessedQuery = continuationToken.GetFirstUnprocessedQuery();
-
- if (continuationToken.HasLastProcessedKey()) {
- TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey());
- read.LastProcessedKey = TOwnedCellVec(lastKey.GetCells());
- } else {
- read.LastProcessedKey.Clear();
- }
-
Counters->SentIteratorAcks->Inc();
THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
request->Record.SetReadId(record.GetReadId());
@@ -562,7 +547,7 @@ private:
if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) {
if (ev->Get()->InstantStart) {
- auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId);
+ auto requests = StreamLookupWorker->RebuildRequest(read.Id, ReadId);
for (auto& request : requests) {
StartTableRead(read.ShardId, std::move(request));
}
@@ -664,14 +649,14 @@ private:
++TotalRetryAttempts;
if (Reads.CheckShardRetriesExeeded(failedRead)) {
- StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
+ StreamLookupWorker->ResetRowsProcessing(failedRead.Id);
Reads.erase(failedRead);
return ResolveTableShards();
}
auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry);
if (delay == TDuration::Zero()) {
- auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey, ReadId);
+ auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, ReadId);
for (auto& request : requests) {
StartTableRead(failedRead.ShardId, std::move(request));
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index a285c8e84fd..41c9a767660 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -76,57 +76,40 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
return rangePartition;
}
+struct TReadState {
+ std::vector<TOwnedTableRange> PendingKeys;
+
+ TMaybe<TOwnedCellVec> LastProcessedKey;
+ ui32 FirstUnprocessedQuery = 0;
+ ui64 LastSeqNo = 0;
+};
+
+void UpdateContinuationData(const NKikimrTxDataShard::TEvReadResult& record, TReadState& state) {
+ YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token");
+ NKikimrTxDataShard::TReadContinuationToken continuationToken;
+ bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
+ YQL_ENSURE(parseResult, "Failed to parse continuation token");
+ state.FirstUnprocessedQuery = continuationToken.GetFirstUnprocessedQuery();
+
+ if (continuationToken.HasLastProcessedKey()) {
+ TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey());
+ state.LastProcessedKey = TOwnedCellVec(lastKey.GetCells());
+ } else {
+ state.LastProcessedKey.Clear();
+ }
+}
} // !namespace
-TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
+TKqpStreamLookupWorker::TKqpStreamLookupWorker(TLookupSettings&& settings,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
const NYql::NDqProto::TTaskInput& inputDesc)
- : Settings(std::move(settings))
- , TypeEnv(typeEnv)
+ : TypeEnv(typeEnv)
, HolderFactory(holderFactory)
, InputDesc(inputDesc)
- , TablePath(Settings.GetTable().GetPath())
- , TableId(MakeTableId(Settings.GetTable())) {
-
- KeyColumns.reserve(Settings.GetKeyColumns().size());
- i32 keyOrder = 0;
- for (const auto& keyColumn : Settings.GetKeyColumns()) {
- NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(keyColumn.GetTypeId(), keyColumn.GetTypeInfo());
-
- KeyColumns.emplace(
- keyColumn.GetName(),
- TSysTables::TTableColumnInfo{
- keyColumn.GetName(),
- keyColumn.GetId(),
- typeInfo,
- keyColumn.GetTypeInfo().GetPgTypeMod(),
- keyOrder++
- }
- );
- }
-
- LookupKeyColumns.reserve(Settings.GetLookupKeyColumns().size());
- for (const auto& lookupKey : Settings.GetLookupKeyColumns()) {
- auto columnIt = KeyColumns.find(lookupKey);
- YQL_ENSURE(columnIt != KeyColumns.end());
- LookupKeyColumns.push_back(&columnIt->second);
- }
-
- Columns.reserve(Settings.GetColumns().size());
- for (const auto& column : Settings.GetColumns()) {
- NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(column.GetTypeId(), column.GetTypeInfo());
-
- Columns.emplace_back(TSysTables::TTableColumnInfo{
- column.GetName(),
- column.GetId(),
- typeInfo,
- column.GetTypeInfo().GetPgTypeMod()
- });
- }
-
- KeyColumnTypes.resize(KeyColumns.size());
- for (const auto& [_, columnInfo] : KeyColumns) {
+ , Settings(std::move(settings)) {
+ KeyColumnTypes.resize(Settings.KeyColumns.size());
+ for (const auto& [_, columnInfo] : Settings.KeyColumns) {
YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(KeyColumnTypes.size()));
KeyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
}
@@ -136,16 +119,16 @@ TKqpStreamLookupWorker::~TKqpStreamLookupWorker() {
}
std::string TKqpStreamLookupWorker::GetTablePath() const {
- return TablePath;
+ return Settings.TablePath;
}
TTableId TKqpStreamLookupWorker::GetTableId() const {
- return TableId;
+ return Settings.TableId;
}
class TKqpLookupRows : public TKqpStreamLookupWorker {
public:
- TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+ TKqpLookupRows(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
: TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) {
}
@@ -154,9 +137,9 @@ public:
void AddInputRow(NUdf::TUnboxedValue inputRow) final {
NMiniKQL::TStringProviderBackend backend;
- std::vector<TCell> keyCells(LookupKeyColumns.size());
- for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) {
- const auto* lookupKeyColumn = LookupKeyColumns[colId];
+ std::vector<TCell> keyCells(Settings.LookupKeyColumns.size());
+ for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) {
+ const auto* lookupKeyColumn = Settings.LookupKeyColumns[colId];
YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size()));
// when making a cell we don't really need to make a copy of data, because
// TOwnedCellVec will make its' own copy.
@@ -164,9 +147,25 @@ public:
inputRow.GetElement(colId), backend, /* copy */ false);
}
- if (keyCells.size() < KeyColumns.size()) {
+ AddInputRowImpl(std::move(keyCells));
+ }
+
+ void AddInputRow(TConstArrayRef<TCell> inputRow) final {
+ NMiniKQL::TStringProviderBackend backend;
+ std::vector<TCell> keyCells(Settings.LookupKeyColumns.size());
+ for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) {
+ const auto* lookupKeyColumn = Settings.LookupKeyColumns[colId];
+ YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size()));
+ keyCells[lookupKeyColumn->KeyOrder] = inputRow[colId];
+ }
+
+ AddInputRowImpl(std::move(keyCells));
+ }
+
+ virtual void AddInputRowImpl(std::vector<TCell> keyCells) {
+ if (keyCells.size() < Settings.KeyColumns.size()) {
// build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
- std::vector<TCell> fromCells(KeyColumns.size());
+ std::vector<TCell> fromCells(Settings.KeyColumns.size());
for (size_t i = 0; i < keyCells.size(); ++i) {
fromCells[i] = keyCells[i];
}
@@ -181,18 +180,23 @@ public:
}
}
- std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
- TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final {
+ std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) final {
- auto it = PendingKeysByReadId.find(prevReadId);
- if (it == PendingKeysByReadId.end()) {
+ auto it = ReadStateByReadId.find(prevReadId);
+ if (it == ReadStateByReadId.end()) {
return {};
}
std::vector<TOwnedTableRange> unprocessedRanges;
std::vector<TOwnedTableRange> unprocessedPoints;
- auto& ranges = it->second;
+ TReadState state = std::move(it->second);
+ auto& ranges = state.PendingKeys;
+
+ ReadStateByReadId.erase(it);
+
+ ui32 firstUnprocessedQuery = state.FirstUnprocessedQuery;
+ const auto& lastProcessedKey = state.LastProcessedKey;
if (lastProcessedKey) {
YQL_ENSURE(firstUnprocessedQuery < ranges.size());
@@ -212,8 +216,6 @@ public:
}
}
- PendingKeysByReadId.erase(it);
-
std::vector<THolder<TEvDataShard::TEvRead>> requests;
requests.reserve(unprocessedPoints.size() + unprocessedRanges.size());
@@ -221,14 +223,22 @@ public:
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
FillReadRequest(++newReadId, request, unprocessedPoints);
requests.emplace_back(std::move(request));
- PendingKeysByReadId.insert({newReadId, std::move(unprocessedPoints)});
+ ReadStateByReadId.emplace(
+ newReadId,
+ TReadState{
+ .PendingKeys = std::move(unprocessedPoints),
+ });
}
if (!unprocessedRanges.empty()) {
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
FillReadRequest(++newReadId, request, unprocessedRanges);
requests.emplace_back(std::move(request));
- PendingKeysByReadId.insert({newReadId, std::move(unprocessedRanges)});
+ ReadStateByReadId.emplace(
+ newReadId,
+ TReadState{
+ .PendingKeys = std::move(unprocessedRanges),
+ });
}
return requests;
@@ -261,14 +271,22 @@ public:
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
FillReadRequest(++readId, request, points);
readRequests.emplace_back(shardId, std::move(request));
- PendingKeysByReadId.insert({readId, std::move(points)});
+ ReadStateByReadId.emplace(
+ readId,
+ TReadState{
+ .PendingKeys = std::move(points),
+ });
}
for (auto& [shardId, ranges] : rangesPerShard) {
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
FillReadRequest(++readId, request, ranges);
readRequests.emplace_back(shardId, std::move(request));
- PendingKeysByReadId.insert({readId, std::move(ranges)});
+ ReadStateByReadId.emplace(
+ readId,
+ TReadState{
+ .PendingKeys = std::move(ranges),
+ });
}
return readRequests;
@@ -278,8 +296,12 @@ public:
const auto& record = result.ReadResult->Get()->Record;
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
- auto it = PendingKeysByReadId.find(record.GetReadId());
- YQL_ENSURE(it != PendingKeysByReadId.end());
+ auto it = ReadStateByReadId.find(record.GetReadId());
+ YQL_ENSURE(it != ReadStateByReadId.end());
+
+ if (!record.GetFinished()) {
+ UpdateContinuationData(record, it->second);
+ }
ReadResults.emplace_back(std::move(result));
}
@@ -296,15 +318,15 @@ public:
auto& result = ReadResults.front();
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
- YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch");
+ YQL_ENSURE(resultRow.size() <= Settings.Columns.size(), "Result columns mismatch");
NUdf::TUnboxedValue* rowItems = nullptr;
- auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+ auto row = HolderFactory.CreateDirectArrayHolder(Settings.Columns.size(), rowItems);
i64 rowSize = 0;
i64 storageRowSize = 0;
- for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
- const auto& column = Columns[colIndex];
+ for (size_t colIndex = 0, resultColIndex = 0; colIndex < Settings.Columns.size(); ++colIndex) {
+ const auto& column = Settings.Columns[colIndex];
if (IsSystemColumn(column.Name)) {
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
@@ -338,8 +360,41 @@ public:
if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
if (result.ReadResult->Get()->Record.GetFinished()) {
// delete finished read
- auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId());
- PendingKeysByReadId.erase(it);
+ auto it = ReadStateByReadId.find(result.ReadResult->Get()->Record.GetReadId());
+ ReadStateByReadId.erase(it);
+ }
+
+ ReadResults.pop_front();
+ }
+ }
+
+ return resultStats;
+ }
+
+ TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)> reader) final {
+ TReadResultStats resultStats;
+
+ while (!ReadResults.empty()) {
+ auto& result = ReadResults.front();
+ for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
+ const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
+ YQL_ENSURE(resultRow.size() <= Settings.Columns.size(), "Result columns mismatch");
+
+ const i64 storageRowSize = EstimateSize(resultRow);
+
+ reader(resultRow);
+
+ resultStats.ReadRowsCount += 1;
+ resultStats.ReadBytesCount += storageRowSize;
+ resultStats.ResultRowsCount += 1;
+ resultStats.ResultBytesCount += storageRowSize;
+ }
+
+ if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
+ if (result.ReadResult->Get()->Record.GetFinished()) {
+ // delete finished read
+ auto it = ReadStateByReadId.find(result.ReadResult->Get()->Record.GetReadId());
+ ReadStateByReadId.erase(it);
}
ReadResults.pop_front();
@@ -351,19 +406,23 @@ public:
bool AllRowsProcessed() final {
return UnprocessedKeys.empty()
- && PendingKeysByReadId.empty()
+ && ReadStateByReadId.empty()
&& ReadResults.empty();
}
- void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final {
- auto it = PendingKeysByReadId.find(readId);
- if (it == PendingKeysByReadId.end()) {
+ void ResetRowsProcessing(ui64 readId) final {
+ auto it = ReadStateByReadId.find(readId);
+ if (it == ReadStateByReadId.end()) {
return;
}
+ auto& keys = it->second.PendingKeys;
+
+ ui32 firstUnprocessedQuery = it->second.FirstUnprocessedQuery;
+ const auto& lastProcessedKey = it->second.LastProcessedKey;
if (lastProcessedKey) {
- YQL_ENSURE(firstUnprocessedQuery < it->second.size());
- auto unprocessedRange = it->second[firstUnprocessedQuery];
+ YQL_ENSURE(firstUnprocessedQuery < keys.size());
+ auto unprocessedRange = keys[firstUnprocessedQuery];
YQL_ENSURE(!unprocessedRange.Point);
UnprocessedKeys.emplace_back(*lastProcessedKey, false,
@@ -371,11 +430,11 @@ public:
++firstUnprocessedQuery;
}
- for (ui32 keyIdx = firstUnprocessedQuery; keyIdx < it->second.size(); ++keyIdx) {
- UnprocessedKeys.emplace_back(std::move(it->second[keyIdx]));
+ for (ui32 keyIdx = firstUnprocessedQuery; keyIdx < keys.size(); ++keyIdx) {
+ UnprocessedKeys.emplace_back(std::move(keys[keyIdx]));
}
- PendingKeysByReadId.erase(it);
+ ReadStateByReadId.erase(it);
}
private:
@@ -384,11 +443,11 @@ private:
record.SetReadId(readId);
- record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId);
- record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId);
- record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
+ record.MutableTableId()->SetOwnerId(Settings.TableId.PathId.OwnerId);
+ record.MutableTableId()->SetTableId(Settings.TableId.PathId.LocalPathId);
+ record.MutableTableId()->SetSchemaVersion(Settings.TableId.SchemaVersion);
- for (const auto& column : Columns) {
+ for (const auto& column : Settings.Columns) {
if (!IsSystemColumn(column.Name)) {
record.AddColumns(column.Id);
}
@@ -406,7 +465,7 @@ private:
for (auto& range : ranges) {
YQL_ENSURE(!range.Point);
- if (range.To.size() < KeyColumns.size()) {
+ if (range.To.size() < Settings.KeyColumns.size()) {
// absent cells mean infinity => in prefix notation `To` should be inclusive
request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true));
} else {
@@ -418,34 +477,34 @@ private:
private:
std::deque<TOwnedTableRange> UnprocessedKeys;
- std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId;
+ std::unordered_map<ui64, TReadState> ReadStateByReadId;
std::deque<TShardReadResult> ReadResults;
};
class TKqpJoinRows : public TKqpStreamLookupWorker {
public:
- TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+ TKqpJoinRows(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
: TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) {
// read columns should contain join key and result columns
- for (auto joinKey : LookupKeyColumns) {
+ for (auto joinKey : Settings.LookupKeyColumns) {
ReadColumns.emplace(joinKey->Name, *joinKey);
}
- for (auto column : Columns) {
+ for (auto column : Settings.Columns) {
ReadColumns.emplace(column.Name, column);
}
}
void AddInputRow(NUdf::TUnboxedValue inputRow) final {
auto joinKey = inputRow.GetElement(0);
- std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
+ std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size());
NMiniKQL::TStringProviderBackend backend;
if (joinKey.HasValue()) {
- for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) {
- const auto* joinKeyColumn = LookupKeyColumns[colId];
+ for (size_t colId = 0; colId < Settings.LookupKeyColumns.size(); ++colId) {
+ const auto* joinKeyColumn = Settings.LookupKeyColumns[colId];
YQL_ENSURE(joinKeyColumn->KeyOrder < static_cast<i64>(joinKeyCells.size()));
// when making a cell we don't really need to make a copy of data, because
// TOwnedCellVec will make its' own copy.
@@ -457,22 +516,31 @@ public:
UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1))));
}
+ void AddInputRow(TConstArrayRef<TCell>) final {
+ YQL_ENSURE(false);
+ }
+
bool IsOverloaded() final {
return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT;
}
- std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
- TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final {
+ std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) final {
- auto readIt = PendingKeysByReadId.find(prevReadId);
- if (readIt == PendingKeysByReadId.end()) {
+ auto readIt = ReadStateByReadId.find(prevReadId);
+ if (readIt == ReadStateByReadId.end()) {
return {};
}
std::vector<TOwnedTableRange> unprocessedRanges;
std::vector<TOwnedTableRange> unprocessedPoints;
- auto& ranges = readIt->second;
+ TReadState state = std::move(readIt->second);
+ auto& ranges = state.PendingKeys;
+
+ ReadStateByReadId.erase(readIt);
+
+ ui32 firstUnprocessedQuery = state.FirstUnprocessedQuery;
+ const auto& lastProcessedKey = state.LastProcessedKey;
if (lastProcessedKey) {
YQL_ENSURE(firstUnprocessedQuery < ranges.size());
@@ -496,8 +564,6 @@ public:
}
}
- PendingKeysByReadId.erase(readIt);
-
std::vector<THolder<TEvDataShard::TEvRead>> readRequests;
if (!unprocessedPoints.empty()) {
@@ -511,7 +577,11 @@ public:
rowIt->second.PendingReads.insert(newReadId);
}
- PendingKeysByReadId.insert({newReadId, std::move(unprocessedPoints)});
+ ReadStateByReadId.emplace(
+ newReadId,
+ TReadState{
+ .PendingKeys = std::move(unprocessedPoints),
+ });
}
if (!unprocessedRanges.empty()) {
@@ -525,7 +595,11 @@ public:
rowIt->second.PendingReads.insert(newReadId);
}
- PendingKeysByReadId.insert({newReadId, std::move(unprocessedRanges)});
+ ReadStateByReadId.emplace(
+ newReadId,
+ TReadState{
+ .PendingKeys = std::move(unprocessedRanges),
+ });
}
return readRequests;
@@ -560,7 +634,7 @@ public:
}
auto isKeyAllowed = [&](const TOwnedCellVec& cellVec) {
- auto allowNullKeysPrefixSize = Settings.HasAllowNullKeysPrefixSize() ? Settings.GetAllowNullKeysPrefixSize() : 0;
+ auto allowNullKeysPrefixSize = Settings.AllowNullKeysPrefixSize;
if (allowNullKeysPrefixSize >= cellVec.size()) {
// all lookup key components can contain null
return true;
@@ -579,9 +653,9 @@ public:
UnprocessedRows.pop_front();
if (isKeyAllowed(joinKey)) {
std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
- if (joinKey.size() < KeyColumns.size()) {
+ if (joinKey.size() < Settings.KeyColumns.size()) {
// build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
- std::vector <TCell> fromCells(KeyColumns.size());
+ std::vector <TCell> fromCells(Settings.KeyColumns.size());
fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end());
bool fromInclusive = true;
bool toInclusive = false;
@@ -620,7 +694,11 @@ public:
rowIt->second.PendingReads.insert(readId);
}
- PendingKeysByReadId.insert({readId, std::move(points)});
+ ReadStateByReadId.emplace(
+ readId,
+ TReadState{
+ .PendingKeys = std::move(points),
+ });
}
for (auto& [shardId, ranges] : rangesPerShard) {
@@ -634,7 +712,11 @@ public:
rowIt->second.PendingReads.insert(readId);
}
- PendingKeysByReadId.insert({readId, std::move(ranges)});
+ ReadStateByReadId.emplace(
+ readId,
+ TReadState{
+ .PendingKeys = std::move(ranges),
+ });
}
return requests;
@@ -644,25 +726,25 @@ public:
const auto& record = result.ReadResult->Get()->Record;
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
- auto pendingKeysIt = PendingKeysByReadId.find(record.GetReadId());
- YQL_ENSURE(pendingKeysIt != PendingKeysByReadId.end());
+ auto pendingKeysIt = ReadStateByReadId.find(record.GetReadId());
+ YQL_ENSURE(pendingKeysIt != ReadStateByReadId.end());
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
// result can contain fewer columns because of system columns
YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch");
- std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
- for (size_t joinKeyColumn = 0; joinKeyColumn < LookupKeyColumns.size(); ++joinKeyColumn) {
- auto columnIt = ReadColumns.find(LookupKeyColumns[joinKeyColumn]->Name);
+ std::vector<TCell> joinKeyCells(Settings.LookupKeyColumns.size());
+ for (size_t joinKeyColumn = 0; joinKeyColumn < Settings.LookupKeyColumns.size(); ++joinKeyColumn) {
+ auto columnIt = ReadColumns.find(Settings.LookupKeyColumns[joinKeyColumn]->Name);
YQL_ENSURE(columnIt != ReadColumns.end());
- joinKeyCells[LookupKeyColumns[joinKeyColumn]->KeyOrder] = row[std::distance(ReadColumns.begin(), columnIt)];
+ joinKeyCells[Settings.LookupKeyColumns[joinKeyColumn]->KeyOrder] = row[std::distance(ReadColumns.begin(), columnIt)];
}
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
- if (Settings.GetLookupStrategy() == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
+ if (Settings.LookupStrategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second.RightRowExist) {
// semi join should return one result row per key
continue;
}
@@ -674,7 +756,7 @@ public:
}
if (record.GetFinished()) {
- for (const auto& key : pendingKeysIt->second) {
+ for (const auto& key : pendingKeysIt->second.PendingKeys) {
auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(key));
if (leftRowIt != PendingLeftRowsByKey.end()) {
leftRowIt->second.PendingReads.erase(record.GetReadId());
@@ -691,26 +773,30 @@ public:
}
}
- PendingKeysByReadId.erase(pendingKeysIt);
+ ReadStateByReadId.erase(pendingKeysIt);
+ } else {
+ UpdateContinuationData(record, pendingKeysIt->second);
}
}
bool AllRowsProcessed() final {
return UnprocessedRows.empty()
&& UnprocessedKeys.empty()
- && PendingKeysByReadId.empty()
+ && ReadStateByReadId.empty()
&& ResultRowsBySeqNo.empty()
&& PendingLeftRowsByKey.empty();
}
- void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final {
- auto readIt = PendingKeysByReadId.find(readId);
- if (readIt == PendingKeysByReadId.end()) {
+ void ResetRowsProcessing(ui64 readId) final {
+ auto readIt = ReadStateByReadId.find(readId);
+ if (readIt == ReadStateByReadId.end()) {
return;
}
- auto& ranges = readIt->second;
+ auto& ranges = readIt->second.PendingKeys;
+ ui32 firstUnprocessedQuery = readIt->second.FirstUnprocessedQuery;
+ const auto& lastProcessedKey = readIt->second.LastProcessedKey;
if (lastProcessedKey) {
YQL_ENSURE(firstUnprocessedQuery < ranges.size());
auto unprocessedRange = ranges[firstUnprocessedQuery];
@@ -730,7 +816,7 @@ public:
UnprocessedKeys.emplace_back(std::move(range));
}
- PendingKeysByReadId.erase(readIt);
+ ReadStateByReadId.erase(readIt);
}
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
@@ -800,6 +886,10 @@ public:
return resultStats;
}
+ TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)>) final {
+ YQL_ENSURE(false);
+ }
+
~TKqpJoinRows() {
UnprocessedRows.clear();
PendingLeftRowsByKey.clear();
@@ -828,7 +918,7 @@ private:
};
bool ShoulKeepRowsOrder() const {
- return Settings.HasKeepRowsOrder() && Settings.GetKeepRowsOrder();
+ return Settings.KeepRowsOrder;
}
bool IsRowSeqNoValid(const ui64& seqNo) const {
@@ -845,9 +935,9 @@ private:
record.SetReadId(readId);
- record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId);
- record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId);
- record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
+ record.MutableTableId()->SetOwnerId(Settings.TableId.PathId.OwnerId);
+ record.MutableTableId()->SetTableId(Settings.TableId.PathId.LocalPathId);
+ record.MutableTableId()->SetSchemaVersion(Settings.TableId.SchemaVersion);
for (const auto& [name, column] : ReadColumns) {
if (!IsSystemColumn(name)) {
@@ -866,7 +956,7 @@ private:
request->Ranges.reserve(ranges.size());
for (auto& range : ranges) {
YQL_ENSURE(!range.Point);
- if (range.To.size() < KeyColumns.size()) {
+ if (range.To.size() < Settings.KeyColumns.size()) {
// Absent cells mean infinity. So in prefix notation `To` should be inclusive.
request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true));
} else {
@@ -877,11 +967,11 @@ private:
}
TConstArrayRef<TCell> ExtractKeyPrefix(const TOwnedTableRange& range) {
- if (range.From.size() == LookupKeyColumns.size()) {
+ if (range.From.size() == Settings.LookupKeyColumns.size()) {
return range.From;
}
- return range.From.subspan(0, LookupKeyColumns.size());
+ return range.From.subspan(0, Settings.LookupKeyColumns.size());
}
NMiniKQL::TStructType* GetLeftRowType() {
@@ -928,10 +1018,10 @@ private:
leftRowInfo.RightRowExist = true;
NUdf::TUnboxedValue* rightRowItems = nullptr;
- resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems);
+ resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Settings.Columns.size(), rightRowItems);
- for (size_t colIndex = 0; colIndex < Columns.size(); ++colIndex) {
- const auto& column = Columns[colIndex];
+ for (size_t colIndex = 0; colIndex < Settings.Columns.size(); ++colIndex) {
+ const auto& column = Settings.Columns[colIndex];
auto it = ReadColumns.find(column.Name);
YQL_ENSURE(it != ReadColumns.end());
@@ -963,7 +1053,7 @@ private:
std::map<std::string, TSysTables::TTableColumnInfo> ReadColumns;
std::deque<std::pair<TOwnedCellVec, NUdf::TUnboxedValue>> UnprocessedRows;
std::deque<TOwnedTableRange> UnprocessedKeys;
- std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId;
+ std::unordered_map<ui64, TReadState> ReadStateByReadId;
absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, NKikimr::TCellVectorsHash, NKikimr::TCellVectorsEquals> PendingLeftRowsByKey;
std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
ui64 InputRowSeqNo = 0;
@@ -975,12 +1065,54 @@ std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKq
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
const NYql::NDqProto::TTaskInput& inputDesc) {
+ TLookupSettings preparedSettings;
+ preparedSettings.TablePath = std::move(settings.GetTable().GetPath());
+ preparedSettings.TableId = MakeTableId(settings.GetTable());
+
+ preparedSettings.AllowNullKeysPrefixSize = settings.HasAllowNullKeysPrefixSize() ? settings.GetAllowNullKeysPrefixSize() : 0;
+ preparedSettings.KeepRowsOrder = settings.HasKeepRowsOrder() && settings.GetKeepRowsOrder();
+ preparedSettings.LookupStrategy = settings.GetLookupStrategy();
+
+ preparedSettings.KeyColumns.reserve(settings.GetKeyColumns().size());
+ i32 keyOrder = 0;
+ for (const auto& keyColumn : settings.GetKeyColumns()) {
+ NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(keyColumn.GetTypeId(), keyColumn.GetTypeInfo());
+ preparedSettings.KeyColumns.emplace(
+ keyColumn.GetName(),
+ TSysTables::TTableColumnInfo{
+ keyColumn.GetName(),
+ keyColumn.GetId(),
+ typeInfo,
+ keyColumn.GetTypeInfo().GetPgTypeMod(),
+ keyOrder++
+ }
+ );
+ }
+
+ preparedSettings.LookupKeyColumns.reserve(settings.GetLookupKeyColumns().size());
+ for (const auto& lookupKey : settings.GetLookupKeyColumns()) {
+ auto columnIt = preparedSettings.KeyColumns.find(lookupKey);
+ YQL_ENSURE(columnIt != preparedSettings.KeyColumns.end());
+ preparedSettings.LookupKeyColumns.push_back(&columnIt->second);
+ }
+
+ preparedSettings.Columns.reserve(settings.GetColumns().size());
+ for (const auto& column : settings.GetColumns()) {
+ NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto(column.GetTypeId(), column.GetTypeInfo());
+ preparedSettings.Columns.emplace_back(TSysTables::TTableColumnInfo{
+ column.GetName(),
+ column.GetId(),
+ typeInfo,
+ column.GetTypeInfo().GetPgTypeMod()
+ });
+ }
+
switch (settings.GetLookupStrategy()) {
case NKqpProto::EStreamLookupStrategy::LOOKUP:
- return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
+ return std::make_unique<TKqpLookupRows>(std::move(preparedSettings), typeEnv, holderFactory, inputDesc);
case NKqpProto::EStreamLookupStrategy::JOIN:
case NKqpProto::EStreamLookupStrategy::SEMI_JOIN:
- return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
+ return std::make_unique<TKqpJoinRows>(std::move(preparedSettings), typeEnv, holderFactory, inputDesc);
default:
return {};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index 26dfbf5462c..c4668f848a4 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -11,6 +11,19 @@
namespace NKikimr {
namespace NKqp {
+struct TLookupSettings {
+ TString TablePath;
+ TTableId TableId;
+
+ ui32 AllowNullKeysPrefixSize;
+ bool KeepRowsOrder;
+ NKqpProto::EStreamLookupStrategy LookupStrategy;
+
+ std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
+ std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
+ std::vector<TSysTables::TTableColumnInfo> Columns;
+};
+
class TKqpStreamLookupWorker {
public:
using TReadList = std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>>;
@@ -45,7 +58,7 @@ public:
};
public:
- TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+ TKqpStreamLookupWorker(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc);
virtual ~TKqpStreamLookupWorker();
@@ -58,25 +71,22 @@ public:
}
virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
- virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
- TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) = 0;
+ virtual void AddInputRow(TConstArrayRef<TCell> inputRow) = 0;
+ virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) = 0;
virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0;
virtual void AddResult(TShardReadResult result) = 0;
virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
+ virtual TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)> reader) = 0;
virtual bool AllRowsProcessed() = 0;
- virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0;
+ virtual void ResetRowsProcessing(ui64 readId) = 0;
virtual bool IsOverloaded() = 0;
protected:
- const NKikimrKqp::TKqpStreamLookupSettings Settings;
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
const NYql::NDqProto::TTaskInput& InputDesc;
- const TString TablePath;
- const TTableId TableId;
- std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
- std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
- std::vector<TSysTables::TTableColumnInfo> Columns;
+ const TLookupSettings Settings;
+
std::vector<NScheme::TTypeInfo> KeyColumnTypes;
};