diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2025-04-30 21:22:24 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-30 22:22:24 +0300 |
commit | 093c9314e8f9b51b6a499f0f942dbed4fdab1a25 (patch) | |
tree | 6749456ac462be1ccc992d4c3af4713654ebdc99 | |
parent | ae94de2b7e365965e9ef3fe9f18f646fb88664f9 (diff) | |
download | ydb-093c9314e8f9b51b6a499f0f942dbed4fdab1a25.tar.gz |
Return virtual timestamp for scan query (#14001)
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 7 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 51 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_query.proto | 2 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h | 11 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/table/impl/readers.cpp | 9 |
9 files changed, 97 insertions, 15 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 489e606d28c..df1ac5f69b0 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -354,6 +354,13 @@ private: response->set_result_set_index(ev->Get()->Record.GetQueryResultIndex()); response->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet()); + if (ev->Get()->Record.HasVirtualTimestamp()) { + auto snap = response->mutable_snapshot_timestamp(); + auto& ts = ev->Get()->Record.GetVirtualTimestamp(); + snap->set_plan_step(ts.GetStep()); + snap->set_tx_id(ts.GetTxId()); + } + TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response->SerializeToString(&out); diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 6d2a6713be8..6d4e71d8bd7 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -358,27 +358,40 @@ private: ExecuterActorId_ = ev->Sender; } + auto& evRecord = ev->Get()->Record; + Ydb::Table::ExecuteScanQueryPartialResponse response; - response.set_status(StatusIds::SUCCESS); - response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet()); + + { + response.set_status(StatusIds::SUCCESS); + auto result = response.mutable_result(); + result->mutable_result_set()->Swap(evRecord.MutableResultSet()); + + if (evRecord.HasVirtualTimestamp()) { + auto snap = result->mutable_snapshot(); + auto ts = evRecord.GetVirtualTimestamp(); + snap->set_plan_step(ts.GetStep()); + snap->set_tx_id(ts.GetTxId()); + } + } TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); FlowControl_.PushResponse(out.size()); const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); - LastSeqNo_ = ev->Get()->Record.GetSeqNo(); + LastSeqNo_ = evRecord.GetSeqNo(); AckedFreeSpaceBytes_ = freeSpaceBytes; Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS); LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack" - << ", seqNo: " << ev->Get()->Record.GetSeqNo() + << ", seqNo: " << evRecord.GetSeqNo() << ", freeSpace: " << freeSpaceBytes << ", to: " << ev->Sender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(evRecord.GetSeqNo(), evRecord.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index c8dbcb3f163..de9f112f972 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -333,6 +333,12 @@ protected: streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex); streamEv->Record.SetChannelId(channel.Id); + const auto& snap = GetSnapshot(); + if (snap.IsValid()) { + auto vt = streamEv->Record.MutableVirtualTimestamp(); + vt->SetStep(snap.Step); + vt->SetTxId(snap.TxId); + } TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry}; protoBuilder.BuildYdbResultSet(*streamEv->Record.MutableResultSet(), std::move(batches), diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 9ece5942a08..fd196ac9834 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -531,17 +531,52 @@ Y_UNIT_TEST_SUITE(KqpScan) { .Build() .Build(); - auto it = db.StreamExecuteScanQuery(R"( - DECLARE $key AS Uint64; - SELECT * FROM `/Root/EightShard` WHERE Key = $key; - )", params).GetValueSync(); + { + auto it = db.StreamExecuteScanQuery(R"( + DECLARE $key AS Uint64; - UNIT_ASSERT(it.IsSuccess()); + SELECT * FROM `/Root/EightShard` WHERE Key = $key; + )", params).GetValueSync(); - CompareYson(R"([ - [[1];[202u];["Value2"]] - ])", StreamResultToYson(it)); + UNIT_ASSERT(it.IsSuccess()); + + CompareYson(R"([ + [[1];[202u];["Value2"]] + ])", StreamResultToYson(it)); + } + + { + auto it = db.StreamExecuteScanQuery(R"( + DECLARE $key AS Uint64; + + SELECT * FROM `/Root/EightShard` WHERE Key = $key; + )", params).GetValueSync(); + + UNIT_ASSERT(it.IsSuccess()); + auto part = it.ReadNext().GetValueSync(); + UNIT_ASSERT(part.IsSuccess()); + + + UNIT_ASSERT(part.HasVirtualTimestamp()); + UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0); + UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0); + } + + { + auto it = db.StreamExecuteScanQuery(R"( + SELECT * FROM `/Root/EightShard` WHERE Key = 9876554123; + )", params).GetValueSync(); + + UNIT_ASSERT(it.IsSuccess()); + auto part = it.ReadNext().GetValueSync(); + UNIT_ASSERT(part.IsSuccess()); + + + UNIT_ASSERT(part.HasVirtualTimestamp()); + UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0); + UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0); + } } Y_UNIT_TEST(AggregateByColumn) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 3fed3c2fd4a..20a6b5a1015 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -512,6 +512,7 @@ message TEvExecuterStreamData { optional uint32 QueryResultIndex = 3; optional uint32 ChannelId = 4; optional NActorsProto.TActorId ChannelActorId = 5; + optional TKqpSnapshot VirtualTimestamp = 6; }; message TEvExecuterStreamDataAck { diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto index b065520458c..1a4806f17b4 100644 --- a/ydb/public/api/protos/ydb_query.proto +++ b/ydb/public/api/protos/ydb_query.proto @@ -8,6 +8,7 @@ option java_outer_classname = "QueryProtos"; import "google/protobuf/duration.proto"; import "ydb/public/api/protos/annotations/validation.proto"; +import "ydb/public/api/protos/ydb_common.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; import "ydb/public/api/protos/ydb_operation.proto"; import "ydb/public/api/protos/ydb_query_stats.proto"; @@ -200,6 +201,7 @@ message ExecuteQueryResponsePart { Ydb.TableStats.QueryStats exec_stats = 5; TransactionMeta tx_meta = 6; + VirtualTimestamp snapshot_timestamp = 7; } message ExecuteScriptRequest { diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 3d69ae628f0..9d29891b47e 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -1293,6 +1293,8 @@ message ExecuteScanQueryPartialResult { // works only in mode: MODE_EXPLAIN, // collects additional diagnostics about query compilation, including query plan and scheme string query_full_diagnostics = 7 [deprecated = true]; + // Optional snapshot that corresponds to the returned data + VirtualTimestamp snapshot = 8; } // Returns information about an external data source with a given path. diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h index 78d6980d036..0f8e9a07947 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h @@ -2063,6 +2063,8 @@ private: uint64_t TxId_; }; +using TVirtualTimestamp = TReadTableSnapshot; + template<typename TPart> class TSimpleStreamPart : public TStreamPartStatus { public: @@ -2117,6 +2119,10 @@ public: const std::string& GetDiagnostics() const { return *Diagnostics_; } std::string&& ExtractDiagnostics() { return std::move(*Diagnostics_); } + bool HasVirtualTimestamp() const { return Vt_.has_value(); } + const TVirtualTimestamp& GetVirtualTimestamp() const { return *Vt_; } + TVirtualTimestamp&& ExtractVirtualTimestamp() { return std::move(*Vt_); } + TScanQueryPart(TStatus&& status) : TStreamPartStatus(std::move(status)) {} @@ -2127,17 +2133,20 @@ public: , Diagnostics_(diagnostics) {} - TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats, const std::optional<std::string>& diagnostics) + TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats, + const std::optional<std::string>& diagnostics, std::optional<TVirtualTimestamp>&& vt) : TStreamPartStatus(std::move(status)) , ResultSet_(std::move(resultSet)) , QueryStats_(queryStats) , Diagnostics_(diagnostics) + , Vt_(std::move(vt)) {} private: std::optional<TResultSet> ResultSet_; std::optional<TQueryStats> QueryStats_; std::optional<std::string> Diagnostics_; + std::optional<TVirtualTimestamp> Vt_; }; using TAsyncScanQueryPart = NThreading::TFuture<TScanQueryPart>; diff --git a/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp b/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp index 64735b3fa82..4a614b3010f 100644 --- a/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp +++ b/ydb/public/sdk/cpp/src/client/table/impl/readers.cpp @@ -89,9 +89,16 @@ TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_pt diagnostics = self->Response_.result().query_full_diagnostics(); + std::optional<TVirtualTimestamp> vt; + + if (self->Response_.result().has_snapshot()) { + const auto& snap = self->Response_.result().snapshot(); + vt = TVirtualTimestamp(snap.plan_step(), snap.tx_id()); + } + if (self->Response_.result().has_result_set()) { promise.SetValue({std::move(status), - TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics}); + TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics, std::move(vt)}); } else { promise.SetValue({std::move(status), queryStats, diagnostics}); } |