aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2025-04-30 21:22:24 +0200
committerGitHub <noreply@github.com>2025-04-30 22:22:24 +0300
commit093c9314e8f9b51b6a499f0f942dbed4fdab1a25 (patch)
tree6749456ac462be1ccc992d4c3af4713654ebdc99
parentae94de2b7e365965e9ef3fe9f18f646fb88664f9 (diff)
downloadydb-093c9314e8f9b51b6a499f0f942dbed4fdab1a25.tar.gz
Return virtual timestamp for scan query (#14001)
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp7
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp23
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h6
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp51
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/public/api/protos/ydb_query.proto2
-rw-r--r--ydb/public/api/protos/ydb_table.proto2
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h11
-rw-r--r--ydb/public/sdk/cpp/src/client/table/impl/readers.cpp9
9 files changed, 97 insertions, 15 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 489e606d28c..df1ac5f69b0 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -354,6 +354,13 @@ private:
response->set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
response->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
+ if (ev->Get()->Record.HasVirtualTimestamp()) {
+ auto snap = response->mutable_snapshot_timestamp();
+ auto& ts = ev->Get()->Record.GetVirtualTimestamp();
+ snap->set_plan_step(ts.GetStep());
+ snap->set_tx_id(ts.GetTxId());
+ }
+
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response->SerializeToString(&out);
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 6d2a6713be8..6d4e71d8bd7 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -358,27 +358,40 @@ private:
ExecuterActorId_ = ev->Sender;
}
+ auto& evRecord = ev->Get()->Record;
+
Ydb::Table::ExecuteScanQueryPartialResponse response;
- response.set_status(StatusIds::SUCCESS);
- response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
+
+ {
+ response.set_status(StatusIds::SUCCESS);
+ auto result = response.mutable_result();
+ result->mutable_result_set()->Swap(evRecord.MutableResultSet());
+
+ if (evRecord.HasVirtualTimestamp()) {
+ auto snap = result->mutable_snapshot();
+ auto ts = evRecord.GetVirtualTimestamp();
+ snap->set_plan_step(ts.GetStep());
+ snap->set_tx_id(ts.GetTxId());
+ }
+ }
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
FlowControl_.PushResponse(out.size());
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
- LastSeqNo_ = ev->Get()->Record.GetSeqNo();
+ LastSeqNo_ = evRecord.GetSeqNo();
AckedFreeSpaceBytes_ = freeSpaceBytes;
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
- << ", seqNo: " << ev->Get()->Record.GetSeqNo()
+ << ", seqNo: " << evRecord.GetSeqNo()
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(evRecord.GetSeqNo(), evRecord.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);
ctx.Send(ev->Sender, resp.Release());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index c8dbcb3f163..de9f112f972 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -333,6 +333,12 @@ protected:
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex);
streamEv->Record.SetChannelId(channel.Id);
+ const auto& snap = GetSnapshot();
+ if (snap.IsValid()) {
+ auto vt = streamEv->Record.MutableVirtualTimestamp();
+ vt->SetStep(snap.Step);
+ vt->SetTxId(snap.TxId);
+ }
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
protoBuilder.BuildYdbResultSet(*streamEv->Record.MutableResultSet(), std::move(batches),
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index 9ece5942a08..fd196ac9834 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -531,17 +531,52 @@ Y_UNIT_TEST_SUITE(KqpScan) {
.Build()
.Build();
- auto it = db.StreamExecuteScanQuery(R"(
- DECLARE $key AS Uint64;
- SELECT * FROM `/Root/EightShard` WHERE Key = $key;
- )", params).GetValueSync();
+ {
+ auto it = db.StreamExecuteScanQuery(R"(
+ DECLARE $key AS Uint64;
- UNIT_ASSERT(it.IsSuccess());
+ SELECT * FROM `/Root/EightShard` WHERE Key = $key;
+ )", params).GetValueSync();
- CompareYson(R"([
- [[1];[202u];["Value2"]]
- ])", StreamResultToYson(it));
+ UNIT_ASSERT(it.IsSuccess());
+
+ CompareYson(R"([
+ [[1];[202u];["Value2"]]
+ ])", StreamResultToYson(it));
+ }
+
+ {
+ auto it = db.StreamExecuteScanQuery(R"(
+ DECLARE $key AS Uint64;
+
+ SELECT * FROM `/Root/EightShard` WHERE Key = $key;
+ )", params).GetValueSync();
+
+ UNIT_ASSERT(it.IsSuccess());
+ auto part = it.ReadNext().GetValueSync();
+ UNIT_ASSERT(part.IsSuccess());
+
+
+ UNIT_ASSERT(part.HasVirtualTimestamp());
+ UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
+ UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
+ }
+
+ {
+ auto it = db.StreamExecuteScanQuery(R"(
+ SELECT * FROM `/Root/EightShard` WHERE Key = 9876554123;
+ )", params).GetValueSync();
+
+ UNIT_ASSERT(it.IsSuccess());
+ auto part = it.ReadNext().GetValueSync();
+ UNIT_ASSERT(part.IsSuccess());
+
+
+ UNIT_ASSERT(part.HasVirtualTimestamp());
+ UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
+ UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
+ }
}
Y_UNIT_TEST(AggregateByColumn) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 3fed3c2fd4a..20a6b5a1015 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -512,6 +512,7 @@ message TEvExecuterStreamData {
optional uint32 QueryResultIndex = 3;
optional uint32 ChannelId = 4;
optional NActorsProto.TActorId ChannelActorId = 5;
+ optional TKqpSnapshot VirtualTimestamp = 6;
};
message TEvExecuterStreamDataAck {
diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto
index b065520458c..1a4806f17b4 100644
--- a/ydb/public/api/protos/ydb_query.proto
+++ b/ydb/public/api/protos/ydb_query.proto
@@ -8,6 +8,7 @@ option java_outer_classname = "QueryProtos";
import "google/protobuf/duration.proto";
import "ydb/public/api/protos/annotations/validation.proto";
+import "ydb/public/api/protos/ydb_common.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/ydb_operation.proto";
import "ydb/public/api/protos/ydb_query_stats.proto";
@@ -200,6 +201,7 @@ message ExecuteQueryResponsePart {
Ydb.TableStats.QueryStats exec_stats = 5;
TransactionMeta tx_meta = 6;
+ VirtualTimestamp snapshot_timestamp = 7;
}
message ExecuteScriptRequest {
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index 3d69ae628f0..9d29891b47e 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -1293,6 +1293,8 @@ message ExecuteScanQueryPartialResult {
// works only in mode: MODE_EXPLAIN,
// collects additional diagnostics about query compilation, including query plan and scheme
string query_full_diagnostics = 7 [deprecated = true];
+ // Optional snapshot that corresponds to the returned data
+ VirtualTimestamp snapshot = 8;
}
// Returns information about an external data source with a given path.
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h
index 78d6980d036..0f8e9a07947 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h
@@ -2063,6 +2063,8 @@ private:
uint64_t TxId_;
};
+using TVirtualTimestamp = TReadTableSnapshot;
+
template<typename TPart>
class TSimpleStreamPart : public TStreamPartStatus {
public:
@@ -2117,6 +2119,10 @@ public:
const std::string& GetDiagnostics() const { return *Diagnostics_; }
std::string&& ExtractDiagnostics() { return std::move(*Diagnostics_); }
+ bool HasVirtualTimestamp() const { return Vt_.has_value(); }
+ const TVirtualTimestamp& GetVirtualTimestamp() const { return *Vt_; }
+ TVirtualTimestamp&& ExtractVirtualTimestamp() { return std::move(*Vt_); }
+
TScanQueryPart(TStatus&& status)
: TStreamPartStatus(std::move(status))
{}
@@ -2127,17 +2133,20 @@ public:
, Diagnostics_(diagnostics)
{}
- TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats, const std::optional<std::string>& diagnostics)
+ TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats,
+ const std::optional<std::string>& diagnostics, std::optional<TVirtualTimestamp>&& vt)
: TStreamPartStatus(std::move(status))
, ResultSet_(std::move(resultSet))
, QueryStats_(queryStats)
, Diagnostics_(diagnostics)
+ , Vt_(std::move(vt))
{}
private:
std::optional<TResultSet> ResultSet_;
std::optional<TQueryStats> QueryStats_;
std::optional<std::string> Diagnostics_;
+ std::optional<TVirtualTimestamp> Vt_;
};
using TAsyncScanQueryPart = NThreading::TFuture<TScanQueryPart>;
diff --git a/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp b/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
index 64735b3fa82..4a614b3010f 100644
--- a/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
+++ b/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
@@ -89,9 +89,16 @@ TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_pt
diagnostics = self->Response_.result().query_full_diagnostics();
+ std::optional<TVirtualTimestamp> vt;
+
+ if (self->Response_.result().has_snapshot()) {
+ const auto& snap = self->Response_.result().snapshot();
+ vt = TVirtualTimestamp(snap.plan_step(), snap.tx_id());
+ }
+
if (self->Response_.result().has_result_set()) {
promise.SetValue({std::move(status),
- TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics});
+ TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics, std::move(vt)});
} else {
promise.SetValue({std::move(status), queryStats, diagnostics});
}