diff options
author | Mikhail Surin <ssmike@ydb.tech> | 2024-08-13 13:49:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-13 13:49:17 +0300 |
commit | cc68f9c9047467e87333e2488c9826520515d6e1 (patch) | |
tree | 2e26738feaf397fc68c6a05bdb471244a5d8059c | |
parent | e437bd0a59ec8116480f31afd63749dc11a8b8e0 (diff) | |
download | ydb-cc68f9c9047467e87333e2488c9826520515d6e1.tar.gz |
Return seqno & row index for duplicate rows (#7660)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 8d149e5b9d8..3434f1209be 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -54,11 +54,13 @@ public: size_t ProcessedRows = 0; size_t PackedRows = 0; ui64 ReadId; + ui64 SeqNo; - TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult, ui64 readId) + TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult, ui64 readId, ui64 seqNo) : ShardId(shardId) , ReadResult(std::move(readResult)) , ReadId(readId) + , SeqNo(seqNo) { } }; @@ -983,6 +985,7 @@ public: CA_LOG_D("Taken " << Locks.size() << " locks"); Reads[id].SerializedContinuationToken = record.GetContinuationToken(); + ui64 seqNo = ev->Get()->Record.GetSeqNo(); Reads[id].RegisterMessage(*ev->Get()); @@ -992,7 +995,7 @@ public: << " finished = " << ev->Get()->Record.GetFinished()); CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken())); - Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id}); + Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id, seqNo}); NotifyCA(); } @@ -1057,7 +1060,7 @@ public: } NMiniKQL::TBytesStatistics PackArrow(TResult& handle, i64& freeSpace) { - auto& [shardId, result, batch, _, packed, readId] = handle; + auto& [shardId, result, batch, _, packed, readId, seqNo] = handle; NMiniKQL::TBytesStatistics stats; bool hasResultColumns = false; if (result->Get()->GetRowsCount() == 0) { @@ -1144,7 +1147,7 @@ public: } NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) { - auto& [shardId, result, batch, processedRows, packed, readId] = handle; + auto& [shardId, result, batch, processedRows, packed, readId, seqNo] = handle; NMiniKQL::TBytesStatistics stats; batch->reserve(batch->size()); CA_LOG_D(TStringBuilder() << "enter pack cells method " @@ -1205,13 +1208,17 @@ public: << " current is " << handle.ShardId << " previous readId is " << ptr->ReadId << " current is " << handle.ReadId + << " previous seqNo is " << ptr->SeqNo + << " current is " << handle.SeqNo + << " previous row number is " << ptr->RowIndex + << " current is " << rowIndex << " key is " << rowRepr; CA_LOG_E(rowMessage); Counters->RowsDuplicationsFound->Inc(); RuntimeError(rowMessage, NYql::NDqProto::StatusIds::INTERNAL_ERROR, {}); return stats; } - DuplicateCheckStats[result] = {.ReadId = readId , .ShardId = handle.ShardId}; + DuplicateCheckStats[result] = {.ReadId = readId , .ShardId = handle.ShardId, .SeqNo = seqNo, .RowIndex = rowIndex }; } stats.DataBytes += rowSize; @@ -1547,6 +1554,8 @@ private: struct TDuplicationStats { ui64 ReadId; ui64 ShardId; + ui64 SeqNo; + ui64 RowIndex; }; THashMap<TString, TDuplicationStats> DuplicateCheckStats; TVector<TResultColumn> DuplicateCheckExtraColumns; |