diff options
author | snaury <snaury@ydb.tech> | 2022-11-01 20:43:32 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-11-01 20:43:32 +0300 |
commit | 142474395b5618d8074e23c11321eeb8b575bade (patch) | |
tree | 3b95970cd32149aab72f26d6d0c2727484e8b35a | |
parent | 9292fd6cefee14107e345c52c309a4b8f270d0d3 (diff) | |
download | ydb-142474395b5618d8074e23c11321eeb8b575bade.tar.gz |
Include schema in empty read table result sets
-rw-r--r-- | ydb/core/grpc_services/rpc_read_table.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/read_table.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/read_table_impl.cpp | 65 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table_ut.cpp | 11 | ||||
-rw-r--r-- | ydb/tests/functional/api/test_read_table.py | 3 |
5 files changed, 80 insertions, 21 deletions
diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index aacc9065e3..010b5e7cf3 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -42,19 +42,6 @@ static ui64 CalcRuConsumption(const TString& data) { return mb * 128; // 128 ru for 1 MiB } -static void NullSerializeReadTableResponse(Ydb::StatusIds::StatusCode status, ui64 planStep, ui64 txId, TString* output) { - Ydb::Impl::ReadTableResponse readTableResponse; - readTableResponse.set_status(status); - - if (planStep && txId) { - auto* snapshot = readTableResponse.mutable_snapshot(); - snapshot->set_plan_step(planStep); - snapshot->set_tx_id(txId); - } - - Y_PROTOBUF_SUPPRESS_NODISCARD readTableResponse.SerializeToString(output); -} - static void NullSerializeReadTableResponse(const TString& input, Ydb::StatusIds::StatusCode status, ui64 planStep, ui64 txId, TString* output) { Ydb::Impl::ReadTableResponse readTableResponse; readTableResponse.set_status(status); @@ -516,6 +503,7 @@ private: settings.Owner = SelfId(); settings.TablePath = req->path(); settings.Ordered = req->ordered(); + settings.RequireResultSet = true; if (req->row_limit()) { settings.MaxRows = req->row_limit(); } @@ -576,11 +564,6 @@ private: TString out; NullSerializeReadTableResponse(message, status, &out); Request_->SendSerializedResult(std::move(out), status); - } else if (!SentSerializedResult && PlanStep && TxId) { - // Send an empty result with the snapshot - TString out; - NullSerializeReadTableResponse(status, PlanStep, TxId, &out); - Request_->SendSerializedResult(std::move(out), status); } Request_->FinishStream(); LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, @@ -747,7 +730,6 @@ private: } Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS); - SentSerializedResult = true; LeftInGRpcAdaptorQueue_++; if (LeftInGRpcAdaptorQueue_ > QuotaLimit_) { @@ -792,7 +774,6 @@ private: }; TDeque<TBuffEntry> SendBuffer_; bool HasPendingSuccess = false; - bool SentSerializedResult = false; }; void DoReadTableRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) { diff --git a/ydb/core/tx/tx_proxy/read_table.h b/ydb/core/tx/tx_proxy/read_table.h index a48d1d29a4..0c7e30017b 100644 --- a/ydb/core/tx/tx_proxy/read_table.h +++ b/ydb/core/tx/tx_proxy/read_table.h @@ -26,6 +26,7 @@ namespace NTxProxy { TString UserToken; EReadTableFormat DataFormat = EReadTableFormat::YdbResultSet; bool Ordered = false; + bool RequireResultSet = false; }; IActor* CreateReadTableSnapshotWorker(const TReadTableSettings& settings); diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index 7cb9a0e0a6..8356fda135 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -1697,6 +1697,7 @@ private: TxProxyMon->ReportStatusStreamData->Inc(); ctx.Send(Settings.Owner, x.Release(), 0, Settings.Cookie); + SentResultSet = true; if (state.QuotaReserved > 0) { Y_VERIFY(Quota.Reserved > 0 && Quota.Allocated > 0); @@ -1714,6 +1715,65 @@ private: ProcessQuotaRequests(ctx); } + void SendEmptyResponseData(const TActorContext& ctx) { + TString data; + ui32 apiVersion = 0; + + switch (Settings.DataFormat) { + case EReadTableFormat::OldResultSet: + // we don't support empty result sets + return; + + case EReadTableFormat::YdbResultSet: { + Ydb::ResultSet res; + for (auto& col : Columns) { + auto* meta = res.add_columns(); + meta->set_name(col.Name); + + if (col.PType.GetTypeId() == NScheme::NTypeIds::Pg) { + auto pgType = meta->mutable_type()->mutable_optional_type()->mutable_item() + ->mutable_pg_type(); + pgType->set_oid(NPg::PgTypeIdFromTypeDesc(col.PType.GetTypeDesc())); + } else { + auto id = static_cast<NYql::NProto::TypeIds>(col.PType.GetTypeId()); + if (id == NYql::NProto::Decimal) { + auto decimalType = meta->mutable_type()->mutable_optional_type()->mutable_item() + ->mutable_decimal_type(); + //TODO: Pass decimal params here + decimalType->set_precision(22); + decimalType->set_scale(9); + } else { + meta->mutable_type()->mutable_optional_type()->mutable_item() + ->set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(id)); + } + } + } + bool ok = res.SerializeToString(&data); + Y_VERIFY(ok, "Unexpected failure to serialize Ydb::ResultSet"); + apiVersion = NKikimrTxUserProxy::TReadTableTransaction::YDB_V1; + break; + } + } + + auto x = MakeHolder<TEvTxUserProxy::TEvProposeTransactionStatus>(TEvTxUserProxy::TResultStatus::ExecResponseData); + x->Record.SetStatusCode(NKikimrIssues::TStatusIds::TRANSIENT); + + if (PlanStep) { + x->Record.SetStep(PlanStep); + } + x->Record.SetTxId(TxId); + + x->Record.SetSerializedReadTableResponse(data); + x->Record.SetReadTableResponseVersion(apiVersion); + + // N.B. we use shard id 0 for virtualized quota management + x->Record.SetDataShardTabletId(0); + + TxProxyMon->ReportStatusStreamData->Inc(); + ctx.Send(Settings.Owner, x.Release(), 0, Settings.Cookie); + SentResultSet = true; + } + void ProcessStreamComplete(TShardState& state, TEvDataShard::TEvProposeTransactionResult::TPtr&, const TActorContext& ctx) { TxProxyMon->TxResultComplete->Inc(); @@ -1741,6 +1801,9 @@ private: if (ShardList.empty()) { // There are no shards left to stream + if (!SentResultSet && Settings.RequireResultSet) { + SendEmptyResponseData(ctx); + } return ReplyAndDie(TEvTxUserProxy::TResultStatus::ExecComplete, NKikimrIssues::TStatusIds::SUCCESS, ctx); } } else { @@ -2875,6 +2938,8 @@ private: bool ResolveShardsScheduled = false; + bool SentResultSet = false; + ui64 RemainingRows = Max<ui64>(); TQuotaState Quota; diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index d1b81d41b4..16faebbed7 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -2165,6 +2165,17 @@ R"___(<main>: Error: Transaction not found: , code: 2015 UNIT_ASSERT_VALUES_EQUAL(bool(streamPart.GetSnapshot()), true); UNIT_ASSERT_GT(streamPart.GetSnapshot()->GetStep(), 0u); UNIT_ASSERT_GT(streamPart.GetSnapshot()->GetTxId(), 0u); + + TResultSetParser parser(streamPart.GetPart()); + UNIT_ASSERT_VALUES_EQUAL(parser.ColumnsCount(), 2u); + UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 0u); + UNIT_ASSERT_VALUES_EQUAL(parser.ColumnIndex("Key"), 0); + UNIT_ASSERT_VALUES_EQUAL(parser.ColumnIndex("Value"), 1); + UNIT_ASSERT(!parser.TryNextRow()); + + TReadTableResultPart lastPart = it.ReadNext().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(lastPart.IsSuccess(), false); + UNIT_ASSERT_VALUES_EQUAL(lastPart.EOS(), true); } Y_UNIT_TEST(RetryOperation) { diff --git a/ydb/tests/functional/api/test_read_table.py b/ydb/tests/functional/api/test_read_table.py index 35c4877b78..8b001e175e 100644 --- a/ydb/tests/functional/api/test_read_table.py +++ b/ydb/tests/functional/api/test_read_table.py @@ -259,11 +259,12 @@ class TestReadTableSuccessStories(AbstractReadTableTest): chunks = 0 for chunk in stream: + assert chunk.columns assert not chunk.rows assert chunk.snapshot is not None chunks += 1 - assert chunks > 0 + assert chunks == 1 class TestReadTableTruncatedResults(AbstractReadTableTest): |