aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikhail Surin <ssmike@ydb.tech>2024-08-13 13:49:17 +0300
committerGitHub <noreply@github.com>2024-08-13 13:49:17 +0300
commitcc68f9c9047467e87333e2488c9826520515d6e1 (patch)
tree2e26738feaf397fc68c6a05bdb471244a5d8059c
parente437bd0a59ec8116480f31afd63749dc11a8b8e0 (diff)
downloadydb-cc68f9c9047467e87333e2488c9826520515d6e1.tar.gz
Return seqno & row index for duplicate rows (#7660)
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp19
1 files changed, 14 insertions, 5 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 8d149e5b9d8..3434f1209be 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -54,11 +54,13 @@ public:
size_t ProcessedRows = 0;
size_t PackedRows = 0;
ui64 ReadId;
+ ui64 SeqNo;
- TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult, ui64 readId)
+ TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult, ui64 readId, ui64 seqNo)
: ShardId(shardId)
, ReadResult(std::move(readResult))
, ReadId(readId)
+ , SeqNo(seqNo)
{
}
};
@@ -983,6 +985,7 @@ public:
CA_LOG_D("Taken " << Locks.size() << " locks");
Reads[id].SerializedContinuationToken = record.GetContinuationToken();
+ ui64 seqNo = ev->Get()->Record.GetSeqNo();
Reads[id].RegisterMessage(*ev->Get());
@@ -992,7 +995,7 @@ public:
<< " finished = " << ev->Get()->Record.GetFinished());
CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken()));
- Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id});
+ Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id, seqNo});
NotifyCA();
}
@@ -1057,7 +1060,7 @@ public:
}
NMiniKQL::TBytesStatistics PackArrow(TResult& handle, i64& freeSpace) {
- auto& [shardId, result, batch, _, packed, readId] = handle;
+ auto& [shardId, result, batch, _, packed, readId, seqNo] = handle;
NMiniKQL::TBytesStatistics stats;
bool hasResultColumns = false;
if (result->Get()->GetRowsCount() == 0) {
@@ -1144,7 +1147,7 @@ public:
}
NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) {
- auto& [shardId, result, batch, processedRows, packed, readId] = handle;
+ auto& [shardId, result, batch, processedRows, packed, readId, seqNo] = handle;
NMiniKQL::TBytesStatistics stats;
batch->reserve(batch->size());
CA_LOG_D(TStringBuilder() << "enter pack cells method "
@@ -1205,13 +1208,17 @@ public:
<< " current is " << handle.ShardId
<< " previous readId is " << ptr->ReadId
<< " current is " << handle.ReadId
+ << " previous seqNo is " << ptr->SeqNo
+ << " current is " << handle.SeqNo
+ << " previous row number is " << ptr->RowIndex
+ << " current is " << rowIndex
<< " key is " << rowRepr;
CA_LOG_E(rowMessage);
Counters->RowsDuplicationsFound->Inc();
RuntimeError(rowMessage, NYql::NDqProto::StatusIds::INTERNAL_ERROR, {});
return stats;
}
- DuplicateCheckStats[result] = {.ReadId = readId , .ShardId = handle.ShardId};
+ DuplicateCheckStats[result] = {.ReadId = readId , .ShardId = handle.ShardId, .SeqNo = seqNo, .RowIndex = rowIndex };
}
stats.DataBytes += rowSize;
@@ -1547,6 +1554,8 @@ private:
struct TDuplicationStats {
ui64 ReadId;
ui64 ShardId;
+ ui64 SeqNo;
+ ui64 RowIndex;
};
THashMap<TString, TDuplicationStats> DuplicateCheckStats;
TVector<TResultColumn> DuplicateCheckExtraColumns;