aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-04-27 14:45:30 +0300
committerspuchin <spuchin@ydb.tech>2023-04-27 14:45:30 +0300
commitf5b51eeaedd79fd606dea2d586486613cb9f49b5 (patch)
tree2c6dee1f019a5f9e22d8646d40b1767ba15cf035
parent4109a099c3cabf7603b30db9a86a5c787d72a4c2 (diff)
downloadydb-f5b51eeaedd79fd606dea2d586486613cb9f49b5.tar.gz
Support multiple resulta in QueryService. ()
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp58
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h25
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.cpp11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.h2
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp6
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp16
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.cpp8
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.h1
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h5
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp34
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/core/protos/kqp_physical.proto1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp10
14 files changed, 119 insertions, 70 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 3be6197b78b..0bc431d13ca 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -20,9 +20,14 @@ using namespace NActors;
using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQueryRequest,
Ydb::Query::ExecuteQueryResponsePart>;
-class RpcFlowControlState {
+struct TProducerState {
+ TMaybe<ui64> LastSeqNo;
+ ui64 AckedFreeSpaceBytes = 0;
+};
+
+class TRpcFlowControlState {
public:
- RpcFlowControlState(ui64 inflightLimitBytes)
+ TRpcFlowControlState(ui64 inflightLimitBytes)
: InflightLimitBytes_(inflightLimitBytes) {}
void PushResponse(ui64 responseSizeBytes) {
@@ -150,41 +155,46 @@ private:
}
ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
- if (ResumeWithSeqNo_ && freeSpaceBytes > 0) {
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
- << ", seqNo: " << *ResumeWithSeqNo_
- << ", freeSpace: " << freeSpaceBytes
- << ", executer: " << ExecuterActorId_);
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(*ResumeWithSeqNo_);
- resp->Record.SetFreeSpace(freeSpaceBytes);
+ for (auto& pair : StreamProducers_) {
+ const auto& producerId = pair.first;
+ auto& producer = pair.second;
+
+ if (freeSpaceBytes > 0 && producer.LastSeqNo && producer.AckedFreeSpaceBytes == 0) {
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
+ << ", producer: " << producerId
+ << ", seqNo: " << producer.LastSeqNo
+ << ", freeSpace: " << freeSpaceBytes);
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(*producer.LastSeqNo);
+ resp->Record.SetFreeSpace(freeSpaceBytes);
- ctx.Send(ExecuterActorId_, resp.Release());
+ ctx.Send(producerId, resp.Release());
- ResumeWithSeqNo_.Clear();
+ producer.AckedFreeSpaceBytes = freeSpaceBytes;
+ }
}
+
}
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
- if (!ExecuterActorId_) {
- ExecuterActorId_ = ev->Sender;
- }
-
Ydb::Query::ExecuteQueryResponsePart response;
response.set_status(Ydb::StatusIds::SUCCESS);
- response.set_result_set_index(0);
+ response.set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
+ FlowControl_.PushResponse(out.size());
+ auto freeSpaceBytes = FlowControl_.FreeSpaceBytes();
+
Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
- auto freeSpaceBytes = FlowControl_.FreeSpaceBytes();
- if (freeSpaceBytes == 0) {
- ResumeWithSeqNo_ = ev->Get()->Record.GetSeqNo();
- }
+ auto& producer = StreamProducers_[ev->Sender];
+ producer.LastSeqNo = ev->Get()->Record.GetSeqNo();
+ producer.AckedFreeSpaceBytes = freeSpaceBytes;
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
@@ -272,10 +282,8 @@ private:
private:
std::unique_ptr<TEvExecuteQueryRequest> Request_;
- RpcFlowControlState FlowControl_;
- TMaybe<ui64> ResumeWithSeqNo_;
-
- TActorId ExecuterActorId_;
+ TRpcFlowControlState FlowControl_;
+ TMap<TActorId, TProducerState> StreamProducers_;
};
} // namespace
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 53345ddf3b8..b0f517dab62 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -17,9 +17,14 @@ using namespace NYql;
void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPtr& tx) {
TxHolders.push_back(tx);
- TxResults.reserve(TxResults.size() + tx->GetTxResultsMeta().size());
- for (const auto& txResult : tx->GetTxResultsMeta()) {
- TxResults.emplace_back(txResult.IsStream, txResult.MkqlItemType, &txResult.ColumnOrder);
+ TxResults.reserve(TxResults.size() + tx->ResultsSize());
+
+ for (ui32 i = 0; i < tx->ResultsSize(); ++i) {
+ const auto& result = tx->GetResults(i);
+ const auto& resultMeta = tx->GetTxResultsMeta()[i];
+
+ TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder,
+ result.GetQueryResultIndex());
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index c78eb86c075..554ccd20458 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -960,24 +960,19 @@ protected:
}
IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) {
- IActor* proxy;
+ auto channelIt = ResultChannelProxies.find(channel.Id);
+ if (channelIt != ResultChannelProxies.end()) {
+ return channelIt->second;
+ }
- if (ResponseEv->TxResults[0].IsStream) {
- if (!ResultChannelProxies.empty()) {
- return ResultChannelProxies.begin()->second;
- }
+ YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
+ const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];
- proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType,
- ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), this->SelfId());
+ IActor* proxy;
+ if (txResult.IsStream) {
+ proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
+ txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats.get(), this->SelfId());
} else {
- YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
-
- auto channelIt = ResultChannelProxies.find(channel.Id);
-
- if (channelIt != ResultChannelProxies.end()) {
- return channelIt->second;
- }
-
proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(),
channel.DstInputIndex, ResponseEv.get());
}
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
index 3d83981ca7e..45e191a220d 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
@@ -120,11 +120,12 @@ private:
class TResultStreamChannelProxy : public TResultCommonChannelProxy {
public:
TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats,
+ const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats,
TActorId executer)
: TResultCommonChannelProxy(txId, channelId, stats, executer)
, ColumnOrder(columnOrder)
, ItemType(itemType)
+ , QueryResultIndex(queryResultIndex)
, Target(target) {}
private:
@@ -138,6 +139,7 @@ private:
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
streamEv->Record.SetSeqNo(computeData.GetSeqNo());
+ streamEv->Record.SetQueryResultIndex(QueryResultIndex);
streamEv->Record.MutableResultSet()->Swap(&resultSet);
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
@@ -149,6 +151,7 @@ private:
private:
const TVector<ui32>* ColumnOrder;
NKikimr::NMiniKQL::TType* ItemType;
+ ui32 QueryResultIndex = 0;
const NActors::TActorId Target;
};
@@ -184,14 +187,16 @@ private:
} // anonymous namespace end
NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, TActorId target, TQueryExecutionStats* stats, TActorId executer)
+ const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats,
+ TActorId executer)
{
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
"CreateResultStreamChannelProxy: TxId: " << txId <<
", channelId: " << channelId
);
- return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, target, stats, executer);
+ return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target,
+ stats, executer);
}
NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h
index 5cc8c54fb00..9589b1b267e 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.h
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h
@@ -26,7 +26,7 @@ struct TQueryExecutionStats;
struct TKqpExecuterTxResult;
NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, NActors::TActorId target, TQueryExecutionStats* stats,
+ const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, TQueryExecutionStats* stats,
NActors::TActorId executer);
NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats,
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index 6483d705600..02c4378e059 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -185,13 +185,9 @@ public:
YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query));
TKiDataQueryBlocks dataQueryBlocks(query);
+ YQL_ENSURE(dataQueryBlocks.ArgCount() == 1);
const auto& queryBlock = dataQueryBlocks.Arg(0);
- if (queryBlock.Results().Size() != 1) {
- ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
- "Multiple result sets not yet supported."));
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
if (queryBlock.Effects().ArgCount() > 0) {
ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
"Data modifications not yet supported."));
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 969fa631a49..edb8445e1d3 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -485,21 +485,25 @@ public:
CompileTransaction(tx, *queryProto.AddTransactions(), ctx);
}
- for (const auto& result : query.Results()) {
+ for (ui32 i = 0; i < query.Results().Size(); ++i) {
+ const auto& result = query.Results().Item(i);
+
YQL_ENSURE(result.Maybe<TKqpTxResultBinding>());
auto binding = result.Cast<TKqpTxResultBinding>();
-
auto txIndex = FromString<ui32>(binding.TxIndex().Value());
- auto resultIndex = FromString<ui32>(binding.ResultIndex());
+ auto txResultIndex = FromString<ui32>(binding.ResultIndex());
YQL_ENSURE(txIndex < queryProto.TransactionsSize());
- YQL_ENSURE(resultIndex < queryProto.GetTransactions(txIndex).ResultsSize());
- YQL_ENSURE(queryProto.GetTransactions(txIndex).GetResults(resultIndex).GetIsStream());
+ YQL_ENSURE(txResultIndex < queryProto.GetTransactions(txIndex).ResultsSize());
+ auto& txResult = *queryProto.MutableTransactions(txIndex)->MutableResults(txResultIndex);
+
+ YQL_ENSURE(txResult.GetIsStream());
+ txResult.SetQueryResultIndex(i);
auto& queryBindingProto = *queryProto.AddResultBindings();
auto& txBindingProto = *queryBindingProto.MutableTxResultBinding();
txBindingProto.SetTxIndex(txIndex);
- txBindingProto.SetResultIndex(resultIndex);
+ txBindingProto.SetResultIndex(txResultIndex);
}
return true;
diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
index 1366f93cc8f..a0ac81cea07 100644
--- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
@@ -68,10 +68,10 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar
}
}
- ui32 i = 0;
- for (const auto& txResult : Proto->GetResults()) {
- auto& result = TxResultsMeta[i++];
- result.IsStream = txResult.GetIsStream();
+ for (ui32 i = 0; i < Proto->ResultsSize(); ++i) {
+ const auto& txResult = Proto->GetResults(i);
+ auto& result = TxResultsMeta[i];
+
result.MkqlItemType = ImportTypeFromProto(txResult.GetItemType(), Alloc->TypeEnv);
if (txResult.ColumnHintsSize() > 0) {
result.ColumnOrder.reserve(txResult.GetColumnHints().size());
diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h
index e4b71443ee6..f1a01bc091a 100644
--- a/ydb/core/kqp/query_data/kqp_prepared_query.h
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.h
@@ -26,7 +26,6 @@ namespace NKikimr::NKqp {
class TPreparedQueryAllocHolder;
struct TPhyTxResultMetadata {
- bool IsStream = false;
NKikimr::NMiniKQL::TType* MkqlItemType;
TVector<ui32> ColumnOrder;
};
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index 05e4e669fd8..669d77eccbc 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -59,15 +59,18 @@ struct TKqpExecuterTxResult {
bool IsStream = true;
NKikimr::NMiniKQL::TType* MkqlItemType;
const TVector<ui32>* ColumnOrder = nullptr;
+ ui32 QueryResultIndex = 0;
NKikimr::NMiniKQL::TUnboxedValueVector Rows;
explicit TKqpExecuterTxResult(
bool isStream,
NKikimr::NMiniKQL::TType* mkqlItemType,
- const TVector<ui32>* сolumnOrder = nullptr)
+ const TVector<ui32>* сolumnOrder,
+ ui32 queryResultIndex)
: IsStream(isStream)
, MkqlItemType(mkqlItemType)
, ColumnOrder(сolumnOrder)
+ , QueryResultIndex(queryResultIndex)
{}
TTypedUnboxedValue GetUV(const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 011dfbac23a..033316a40ac 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -98,6 +98,40 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST(ExecuteQueryMultiResult) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([
+ [[1];[202u];["Value2"]];
+ [[1];[502u];["Value2"]];
+ [[1];[802u];["Value2"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([
+ [[1u];["One"];[-1]];
+ [[2u];["Two"];[0]];
+ [[3u];["Three"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ Y_UNIT_TEST(ExecuteQueryMultiScalar) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ SELECT COUNT(*) FROM EightShard;
+ SELECT COUNT(*) FROM TwoShard;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(1)));
+ }
+
Y_UNIT_TEST(ExecuteScript) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 11fa10f60c7..d0fbc4077ec 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -380,6 +380,7 @@ message TEvExecuterTxResponse {
message TEvExecuterStreamData {
optional Ydb.ResultSet ResultSet = 1;
optional uint64 SeqNo = 2;
+ optional uint32 QueryResultIndex = 3;
};
message TEvExecuterStreamDataAck {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 84f4e2cbdc7..1d1ba2fe3b5 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -319,6 +319,7 @@ message TKqpPhyResult {
NKikimrMiniKQL.TType ItemType = 2;
bool IsStream = 3;
repeated string ColumnHints = 4;
+ uint32 QueryResultIndex = 5;
}
message TKqpPhyTx {
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
index 00bba6c1201..202bb329bdd 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
@@ -124,17 +124,15 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
}
if (part.HasResultSet()) {
- // TODO: Support multi-results
- Y_ENSURE(part.GetResultSetIndex() == 0);
-
auto inRs = part.ExtractResultSet();
auto& inRsProto = TProtoAccessor::GetProto(inRs);
- if (self->ResultSets_.empty()) {
- self->ResultSets_.resize(1);
+ // TODO: Use result sets metadata
+ if (self->ResultSets_.size() <= part.GetResultSetIndex()) {
+ self->ResultSets_.resize(part.GetResultSetIndex() + 1);
}
- auto& resultSet = self->ResultSets_[0];
+ auto& resultSet = self->ResultSets_[part.GetResultSetIndex()];
if (resultSet.columns().empty()) {
resultSet.mutable_columns()->CopyFrom(inRsProto.columns());
}