aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-11-01 20:43:32 +0300
committersnaury <snaury@ydb.tech>2022-11-01 20:43:32 +0300
commit142474395b5618d8074e23c11321eeb8b575bade (patch)
tree3b95970cd32149aab72f26d6d0c2727484e8b35a
parent9292fd6cefee14107e345c52c309a4b8f270d0d3 (diff)
downloadydb-142474395b5618d8074e23c11321eeb8b575bade.tar.gz
Include schema in empty read table result sets
-rw-r--r--ydb/core/grpc_services/rpc_read_table.cpp21
-rw-r--r--ydb/core/tx/tx_proxy/read_table.h1
-rw-r--r--ydb/core/tx/tx_proxy/read_table_impl.cpp65
-rw-r--r--ydb/services/ydb/ydb_table_ut.cpp11
-rw-r--r--ydb/tests/functional/api/test_read_table.py3
5 files changed, 80 insertions, 21 deletions
diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp
index aacc9065e3..010b5e7cf3 100644
--- a/ydb/core/grpc_services/rpc_read_table.cpp
+++ b/ydb/core/grpc_services/rpc_read_table.cpp
@@ -42,19 +42,6 @@ static ui64 CalcRuConsumption(const TString& data) {
return mb * 128; // 128 ru for 1 MiB
}
-static void NullSerializeReadTableResponse(Ydb::StatusIds::StatusCode status, ui64 planStep, ui64 txId, TString* output) {
- Ydb::Impl::ReadTableResponse readTableResponse;
- readTableResponse.set_status(status);
-
- if (planStep && txId) {
- auto* snapshot = readTableResponse.mutable_snapshot();
- snapshot->set_plan_step(planStep);
- snapshot->set_tx_id(txId);
- }
-
- Y_PROTOBUF_SUPPRESS_NODISCARD readTableResponse.SerializeToString(output);
-}
-
static void NullSerializeReadTableResponse(const TString& input, Ydb::StatusIds::StatusCode status, ui64 planStep, ui64 txId, TString* output) {
Ydb::Impl::ReadTableResponse readTableResponse;
readTableResponse.set_status(status);
@@ -516,6 +503,7 @@ private:
settings.Owner = SelfId();
settings.TablePath = req->path();
settings.Ordered = req->ordered();
+ settings.RequireResultSet = true;
if (req->row_limit()) {
settings.MaxRows = req->row_limit();
}
@@ -576,11 +564,6 @@ private:
TString out;
NullSerializeReadTableResponse(message, status, &out);
Request_->SendSerializedResult(std::move(out), status);
- } else if (!SentSerializedResult && PlanStep && TxId) {
- // Send an empty result with the snapshot
- TString out;
- NullSerializeReadTableResponse(status, PlanStep, TxId, &out);
- Request_->SendSerializedResult(std::move(out), status);
}
Request_->FinishStream();
LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API,
@@ -747,7 +730,6 @@ private:
}
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
- SentSerializedResult = true;
LeftInGRpcAdaptorQueue_++;
if (LeftInGRpcAdaptorQueue_ > QuotaLimit_) {
@@ -792,7 +774,6 @@ private:
};
TDeque<TBuffEntry> SendBuffer_;
bool HasPendingSuccess = false;
- bool SentSerializedResult = false;
};
void DoReadTableRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) {
diff --git a/ydb/core/tx/tx_proxy/read_table.h b/ydb/core/tx/tx_proxy/read_table.h
index a48d1d29a4..0c7e30017b 100644
--- a/ydb/core/tx/tx_proxy/read_table.h
+++ b/ydb/core/tx/tx_proxy/read_table.h
@@ -26,6 +26,7 @@ namespace NTxProxy {
TString UserToken;
EReadTableFormat DataFormat = EReadTableFormat::YdbResultSet;
bool Ordered = false;
+ bool RequireResultSet = false;
};
IActor* CreateReadTableSnapshotWorker(const TReadTableSettings& settings);
diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp
index 7cb9a0e0a6..8356fda135 100644
--- a/ydb/core/tx/tx_proxy/read_table_impl.cpp
+++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp
@@ -1697,6 +1697,7 @@ private:
TxProxyMon->ReportStatusStreamData->Inc();
ctx.Send(Settings.Owner, x.Release(), 0, Settings.Cookie);
+ SentResultSet = true;
if (state.QuotaReserved > 0) {
Y_VERIFY(Quota.Reserved > 0 && Quota.Allocated > 0);
@@ -1714,6 +1715,65 @@ private:
ProcessQuotaRequests(ctx);
}
+ void SendEmptyResponseData(const TActorContext& ctx) {
+ TString data;
+ ui32 apiVersion = 0;
+
+ switch (Settings.DataFormat) {
+ case EReadTableFormat::OldResultSet:
+ // we don't support empty result sets
+ return;
+
+ case EReadTableFormat::YdbResultSet: {
+ Ydb::ResultSet res;
+ for (auto& col : Columns) {
+ auto* meta = res.add_columns();
+ meta->set_name(col.Name);
+
+ if (col.PType.GetTypeId() == NScheme::NTypeIds::Pg) {
+ auto pgType = meta->mutable_type()->mutable_optional_type()->mutable_item()
+ ->mutable_pg_type();
+ pgType->set_oid(NPg::PgTypeIdFromTypeDesc(col.PType.GetTypeDesc()));
+ } else {
+ auto id = static_cast<NYql::NProto::TypeIds>(col.PType.GetTypeId());
+ if (id == NYql::NProto::Decimal) {
+ auto decimalType = meta->mutable_type()->mutable_optional_type()->mutable_item()
+ ->mutable_decimal_type();
+ //TODO: Pass decimal params here
+ decimalType->set_precision(22);
+ decimalType->set_scale(9);
+ } else {
+ meta->mutable_type()->mutable_optional_type()->mutable_item()
+ ->set_type_id(static_cast<Ydb::Type::PrimitiveTypeId>(id));
+ }
+ }
+ }
+ bool ok = res.SerializeToString(&data);
+ Y_VERIFY(ok, "Unexpected failure to serialize Ydb::ResultSet");
+ apiVersion = NKikimrTxUserProxy::TReadTableTransaction::YDB_V1;
+ break;
+ }
+ }
+
+ auto x = MakeHolder<TEvTxUserProxy::TEvProposeTransactionStatus>(TEvTxUserProxy::TResultStatus::ExecResponseData);
+ x->Record.SetStatusCode(NKikimrIssues::TStatusIds::TRANSIENT);
+
+ if (PlanStep) {
+ x->Record.SetStep(PlanStep);
+ }
+ x->Record.SetTxId(TxId);
+
+ x->Record.SetSerializedReadTableResponse(data);
+ x->Record.SetReadTableResponseVersion(apiVersion);
+
+ // N.B. we use shard id 0 for virtualized quota management
+ x->Record.SetDataShardTabletId(0);
+
+ TxProxyMon->ReportStatusStreamData->Inc();
+ ctx.Send(Settings.Owner, x.Release(), 0, Settings.Cookie);
+ SentResultSet = true;
+ }
+
void ProcessStreamComplete(TShardState& state, TEvDataShard::TEvProposeTransactionResult::TPtr&, const TActorContext& ctx) {
TxProxyMon->TxResultComplete->Inc();
@@ -1741,6 +1801,9 @@ private:
if (ShardList.empty()) {
// There are no shards left to stream
+ if (!SentResultSet && Settings.RequireResultSet) {
+ SendEmptyResponseData(ctx);
+ }
return ReplyAndDie(TEvTxUserProxy::TResultStatus::ExecComplete, NKikimrIssues::TStatusIds::SUCCESS, ctx);
}
} else {
@@ -2875,6 +2938,8 @@ private:
bool ResolveShardsScheduled = false;
+ bool SentResultSet = false;
+
ui64 RemainingRows = Max<ui64>();
TQuotaState Quota;
diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp
index d1b81d41b4..16faebbed7 100644
--- a/ydb/services/ydb/ydb_table_ut.cpp
+++ b/ydb/services/ydb/ydb_table_ut.cpp
@@ -2165,6 +2165,17 @@ R"___(<main>: Error: Transaction not found: , code: 2015
UNIT_ASSERT_VALUES_EQUAL(bool(streamPart.GetSnapshot()), true);
UNIT_ASSERT_GT(streamPart.GetSnapshot()->GetStep(), 0u);
UNIT_ASSERT_GT(streamPart.GetSnapshot()->GetTxId(), 0u);
+
+ TResultSetParser parser(streamPart.GetPart());
+ UNIT_ASSERT_VALUES_EQUAL(parser.ColumnsCount(), 2u);
+ UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 0u);
+ UNIT_ASSERT_VALUES_EQUAL(parser.ColumnIndex("Key"), 0);
+ UNIT_ASSERT_VALUES_EQUAL(parser.ColumnIndex("Value"), 1);
+ UNIT_ASSERT(!parser.TryNextRow());
+
+ TReadTableResultPart lastPart = it.ReadNext().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(lastPart.IsSuccess(), false);
+ UNIT_ASSERT_VALUES_EQUAL(lastPart.EOS(), true);
}
Y_UNIT_TEST(RetryOperation) {
diff --git a/ydb/tests/functional/api/test_read_table.py b/ydb/tests/functional/api/test_read_table.py
index 35c4877b78..8b001e175e 100644
--- a/ydb/tests/functional/api/test_read_table.py
+++ b/ydb/tests/functional/api/test_read_table.py
@@ -259,11 +259,12 @@ class TestReadTableSuccessStories(AbstractReadTableTest):
chunks = 0
for chunk in stream:
+ assert chunk.columns
assert not chunk.rows
assert chunk.snapshot is not None
chunks += 1
- assert chunks > 0
+ assert chunks == 1
class TestReadTableTruncatedResults(AbstractReadTableTest):