aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakostrov <makostrov@yandex-team.com>2023-07-13 12:10:15 +0300
committermakostrov <makostrov@yandex-team.com>2023-07-13 12:10:15 +0300
commitcf717a072810d7e971dce2b303847a0199629c55 (patch)
treec9eea8314dc0425668125b7595452db5a2aabb73
parent589e557e0cd851dce2875618f069c3e25e9dcc3b (diff)
downloadydb-cf717a072810d7e971dce2b303847a0199629c55.tar.gz
Remove deprecated parameter
KIKIMR-17602 Remove support of old param in file index_events_processor.cpp Remove support of TParams in 2 functions
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp15
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h1
-rw-r--r--ydb/core/kqp/common/events/query.h5
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp27
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp30
-rw-r--r--ydb/core/kqp/host/kqp_host.h6
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp30
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h10
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp20
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp9
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.cpp10
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.h3
-rw-r--r--ydb/core/protos/kqp.proto2
-rw-r--r--ydb/core/ymq/actor/cleanup_queue_data.cpp87
-rw-r--r--ydb/core/ymq/actor/cleanup_queue_data.h2
-rw-r--r--ydb/core/ymq/actor/index_events_processor.cpp40
-rw-r--r--ydb/core/ymq/actor/index_events_processor.h3
-rw-r--r--ydb/core/ymq/base/run_query.cpp4
-rw-r--r--ydb/core/ymq/base/run_query.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_proto/accessor.h1
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp55
23 files changed, 242 insertions, 129 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index 53bc326c32..dcb9138e89 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -229,10 +229,17 @@ private:
req->Record.MutableRequest()->SetQuery(VersionQuery);
} else {
req->Record.MutableRequest()->SetQuery(TopicsQuery);
- NClient::TParameters params;
- params["$Path"] = LastTopicKey.Path;
- params["$Cluster"] = LastTopicKey.Cluster;
- req->Record.MutableRequest()->MutableParameters()->Swap(&params);
+
+ NYdb::TParams params = NYdb::TParamsBuilder()
+ .AddParam("$Path")
+ .Utf8(LastTopicKey.Path)
+ .Build()
+ .AddParam("$Cluster")
+ .Utf8(LastTopicKey.Cluster)
+ .Build()
+ .Build();
+
+ req->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
}
Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), req.Release(), 0, Generation->Val());
}
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index 023814a97d..4c003fe7f2 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -7,6 +7,7 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index 1a0ecc14c4..49bd2cbac2 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -115,10 +115,6 @@ public:
return RequestCtx ? YqlText : Record.GetRequest().GetQuery();
}
- const ::NKikimrMiniKQL::TParams& GetParameters() const {
- return Record.GetRequest().GetParameters();
- }
-
const ::Ydb::Table::TransactionControl& GetTxControl() const {
return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl();
}
@@ -229,7 +225,6 @@ public:
return ParametersSize;
}
- ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong();
for (const auto& [name, param] : GetYdbParameters()) {
ParametersSize += name.size();
ParametersSize += param.ByteSizeLong();
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index c9d51a9ce6..2cd2f11652 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -23,6 +23,7 @@
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/public/lib/base/msgbus_status.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/services/metadata/abstract/kqp_common.h>
#include <ydb/services/persqueue_v1/rpc_calls.h>
@@ -1874,7 +1875,7 @@ public:
ev->Record.MutableRequest()->SetKeepSession(false);
ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats);
- FillParameters(params, *ev->Record.MutableRequest()->MutableParameters());
+ FillParameters(params, ev->Record.MutableRequest()->MutableYdbParameters());
return SendKqpScanQueryRequest(ev.Release(), rowsLimit,
[] (TPromise<TQueryResult> promise, TResponse&& responseEv) {
@@ -1906,7 +1907,7 @@ public:
ev->Record.MutableRequest()->SetKeepSession(false);
ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats);
- FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters());
+ FillParameters(std::move(params), ev->Record.MutableRequest()->MutableYdbParameters());
auto& txControl = *ev->Record.MutableRequest()->MutableTxControl();
txControl.mutable_begin_tx()->CopyFrom(txSettings);
@@ -1947,7 +1948,7 @@ public:
);
// TODO: Rewrite CollectParameters at kqp_host
- FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters());
+ FillParameters(std::move(params), ev->Record.MutableRequest()->MutableYdbParameters());
return SendKqpScanQueryStreamRequest(ev.Release(), target,
[](TPromise<TQueryResult> promise, TResponse&& responseEv) {
@@ -2005,7 +2006,7 @@ public:
ev->Record.MutableRequest()->SetKeepSession(false);
ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats);
- FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters());
+ FillParameters(std::move(params), ev->Record.MutableRequest()->MutableYdbParameters());
auto& txControl = *ev->Record.MutableRequest()->MutableTxControl();
txControl.mutable_begin_tx()->CopyFrom(txSettings);
@@ -2287,25 +2288,13 @@ private:
}
}
- static void FillParameters(TQueryData::TPtr params, NKikimrMiniKQL::TParams& output) {
+ static void FillParameters(TQueryData::TPtr params, ::google::protobuf::Map<TBasicString<char>, Ydb::TypedValue>* output) {
if (!params) {
return;
}
- if (params->GetParams().empty()) {
- return;
- }
-
- output.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
- auto type = output.MutableType()->MutableStruct();
- auto value = output.MutableValue();
- for (auto& pair : params->GetParams()) {
- auto typeMember = type->AddMember();
- typeMember->SetName(pair.first);
-
- typeMember->MutableType()->CopyFrom(pair.second.GetType());
- value->AddStruct()->CopyFrom(pair.second.GetValue());
- }
+ auto& paramsMap = params->GetParamsProtobuf();
+ output->insert(paramsMap.begin(), paramsMap.end());
}
static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index d1e06b6948..e2445ab0ba 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -988,30 +988,30 @@ public:
});
}
- IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const TExecScriptSettings& settings) override
{
return CheckedProcessQuery(*ExprCtx,
- [this, &script, parameters = std::move(parameters), settings] (TExprContext& ctx) mutable {
- return ExecuteYqlScriptInternal(script, std::move(parameters), settings, ctx);
+ [this, &script, parameters, settings] (TExprContext& ctx) mutable {
+ return ExecuteYqlScriptInternal(script, parameters, settings, ctx);
});
}
- TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const TExecScriptSettings& settings) override
{
return CheckedSyncProcessQuery(
- [this, &script, parameters = std::move(parameters), settings] () mutable {
- return ExecuteYqlScript(script, std::move(parameters), settings);
+ [this, &script, parameters, settings] () mutable {
+ return ExecuteYqlScript(script, parameters, settings);
});
}
- IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const NActors::TActorId& target, const TExecScriptSettings& settings) override
{
return CheckedProcessQuery(*ExprCtx,
- [this, &script, parameters = std::move(parameters), target, settings](TExprContext& ctx) mutable {
- return StreamExecuteYqlScriptInternal(script, std::move(parameters), target, settings, ctx);
+ [this, &script, parameters, target, settings](TExprContext& ctx) mutable {
+ return StreamExecuteYqlScriptInternal(script, parameters, target, settings, ctx);
});
}
@@ -1401,7 +1401,7 @@ private:
SessionCtx, *ExecuteCtx);
}
- IAsyncQueryResultPtr ExecuteYqlScriptInternal(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ IAsyncQueryResultPtr ExecuteYqlScriptInternal(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const TExecScriptSettings& settings, TExprContext& ctx)
{
SetupYqlTransformer(EKikimrQueryType::YqlScript);
@@ -1417,15 +1417,13 @@ private:
return nullptr;
}
- if (!ParseParameters(std::move(parameters), *(SessionCtx->Query().QueryData), ctx)) {
- return nullptr;
- }
+ (SessionCtx->Query().QueryData)->ParseParameters(parameters);
return MakeIntrusive<TAsyncExecuteYqlResult>(scriptExpr.Get(), ctx, *YqlTransformer, Cluster, SessionCtx,
*ResultProviderConfig, *PlanBuilder, sqlVersion);
}
- IAsyncQueryResultPtr StreamExecuteYqlScriptInternal(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ IAsyncQueryResultPtr StreamExecuteYqlScriptInternal(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const NActors::TActorId& target,const TExecScriptSettings& settings, TExprContext& ctx)
{
SetupYqlTransformer(EKikimrQueryType::YqlScriptStreaming);
@@ -1443,9 +1441,7 @@ private:
return nullptr;
}
- if (!ParseParameters(std::move(parameters), *(SessionCtx->Query().QueryData), ctx)) {
- return nullptr;
- }
+ (SessionCtx->Query().QueryData)->ParseParameters(parameters);
return MakeIntrusive<TAsyncExecuteYqlResult>(scriptExpr.Get(), ctx, *YqlTransformer, Cluster, SessionCtx,
*ResultProviderConfig, *PlanBuilder, sqlVersion);
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index b5fc46beb9..df444a037d 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -88,12 +88,12 @@ public:
virtual IAsyncQueryResultPtr ExplainYqlScript(const TKqpQueryRef& script) = 0;
virtual TQueryResult SyncExplainYqlScript(const TKqpQueryRef& script) = 0;
- virtual IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ virtual IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const TExecScriptSettings& settings) = 0;
- virtual TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ virtual TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const TExecScriptSettings& settings) = 0;
- virtual IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters,
+ virtual IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
const NActors::TActorId& target, const TExecScriptSettings& settings) = 0;
};
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index 8a7a94adcf..206341fe2d 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
namespace NKikimr::NKqp {
@@ -157,6 +158,14 @@ const TQueryData::TParamMap& TQueryData::GetParams() {
return Params;
}
+const TQueryData::TParamProtobufMap& TQueryData::GetParamsProtobuf() {
+ for(auto& [name, _] : UnboxedData) {
+ GetParameterTypedValue(name);
+ }
+
+ return ParamsProtobuf;
+}
+
NKikimr::NMiniKQL::TType* TQueryData::GetParameterType(const TString& name) {
auto it = UnboxedData.find(name);
if (it == UnboxedData.end()) {
@@ -309,6 +318,27 @@ const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TStrin
return &(it->second);
}
+const Ydb::TypedValue* TQueryData::GetParameterTypedValue(const TString& name) {
+ if (UnboxedData.find(name) == UnboxedData.end())
+ return nullptr;
+
+ auto it = ParamsProtobuf.find(name);
+ if (it == ParamsProtobuf.end()) {
+ with_lock(AllocState->Alloc) {
+ const auto& [type, uv] = GetParameterUnboxedValue(name);
+
+ auto& tv = ParamsProtobuf[name];
+
+ ExportTypeToProto(type, *tv.mutable_type());
+ ExportValueToProto(type, uv, *tv.mutable_value());
+
+ return &tv;
+ }
+ }
+
+ return &(it->second);
+}
+
const NKikimr::NMiniKQL::TTypeEnvironment& TQueryData::TypeEnv() {
return AllocState->TypeEnv;
}
diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index 437b3036a4..aa1f8a2337 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -183,12 +183,18 @@ private:
NKikimrMiniKQL::TParams
>;
+ using TParamProtobufMap = ::google::protobuf::Map<
+ TString,
+ Ydb::TypedValue
+ >;
+
using TParamProvider = std::function<
bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value)
>;
TParamMap Params;
+ TParamProtobufMap ParamsProtobuf;
TUnboxedParamsMap UnboxedData;
THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults;
TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders;
@@ -205,6 +211,8 @@ public:
const TParamMap& GetParams();
+ const TParamProtobufMap& GetParamsProtobuf();
+
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv();
TTxAllocatorState::TPtr GetAllocState() { return AllocState; }
@@ -243,6 +251,8 @@ public:
TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name);
TTypedUnboxedValue* GetParameterUnboxedValuePtr(const TString& name);
const NKikimrMiniKQL::TParams* GetParameterMiniKqlValue(const TString& name);
+ const Ydb::TypedValue* GetParameterTypedValue(const TString& name);
+
NYql::NDqProto::TData SerializeParamValue(const TString& name);
void Clear();
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 636b08723f..c795d0eeac 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -332,10 +332,6 @@ public:
return RequestEv->GetTxControl();
}
- const ::NKikimrMiniKQL::TParams& GetParameters() const {
- return RequestEv->GetParameters();
- }
-
// validate the compiled query response and ensure that all table versions are not
// changed since the last compilation.
bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 6b4509ba2b..7e5fedd18c 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -189,26 +189,7 @@ public:
Settings.Service, std::move(id));
}
- bool ConvertParameters() {
- auto& proto = QueryState->RequestEv->Record;
-
- if (!proto.GetRequest().HasParameters() && QueryState->RequestEv->GetYdbParameters().size()) {
- try {
- ConvertYdbParamsToMiniKQLParams(QueryState->RequestEv->GetYdbParameters(), *proto.MutableRequest()->MutableParameters());
- } catch (const std::exception& ex) {
- TString message = TStringBuilder() << "Failed to parse query parameters. "<< ex.what();
- ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, Ydb::StatusIds::BAD_REQUEST, message);
- return false;
- }
- }
-
- return true;
- }
-
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
- if (!ConvertParameters())
- return;
-
if (!WorkerId) {
std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings,
HttpGateway, ModuleResolverState, Counters));
@@ -699,7 +680,6 @@ public:
}
try {
- QueryState->QueryData->ParseParameters(QueryState->GetParameters());
QueryState->QueryData->ParseParameters(QueryState->GetYdbParameters());
} catch(const yexception& ex) {
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 69cb3fdeb0..ef0956ca99 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -466,8 +466,7 @@ private:
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE: {
- auto&& params = *QueryState->RequestEv->Record.MutableRequest()->MutableParameters();
- if (!ExecuteQuery(QueryState->RequestEv->GetQuery(), std::move(params), queryType, QueryState->RequestEv->GetRequestActorId())) {
+ if (!ExecuteQuery(QueryState->RequestEv->GetQuery(), QueryState->RequestEv->GetYdbParameters(), queryType, QueryState->RequestEv->GetRequestActorId())) {
onBadRequest(QueryState->Error);
return;
}
@@ -565,7 +564,7 @@ private:
Cleanup(ctx, true);
}
- bool ExecuteQuery(const TString& query, ::NKikimrMiniKQL::TParams&& parameters,
+ bool ExecuteQuery(const TString& query, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters,
NKikimrKqp::EQueryType type, const TActorId& requestActorId)
{
auto statsMode = GetStatsMode(QueryState->RequestEv.get(), EKikimrStatsMode::Basic);
@@ -595,7 +594,7 @@ private:
execSettings.Deadlines = QueryState->QueryDeadlines;
execSettings.RpcCtx = QueryState->RequestEv->GetRequestCtx();
execSettings.StatsMode = statsMode;
- QueryState->AsyncQueryResult = KqpHost->ExecuteYqlScript(query, std::move(parameters), execSettings);
+ QueryState->AsyncQueryResult = KqpHost->ExecuteYqlScript(query, parameters, execSettings);
break;
}
@@ -604,7 +603,7 @@ private:
execSettings.Deadlines = QueryState->QueryDeadlines;
execSettings.RpcCtx = QueryState->RequestEv->GetRequestCtx();
execSettings.StatsMode = statsMode;
- QueryState->AsyncQueryResult = KqpHost->StreamExecuteYqlScript(query, std::move(parameters),
+ QueryState->AsyncQueryResult = KqpHost->StreamExecuteYqlScript(query, parameters,
requestActorId, execSettings);
break;
}
diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp
index 59101a2e5f..e42a9ba7d1 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.cpp
+++ b/ydb/core/persqueue/writer/source_id_encoding.cpp
@@ -221,6 +221,16 @@ void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId&
}
}
+void SetHashToTParamsBuilder(NYdb::TParamsBuilder& builder, const TEncodedSourceId& encodedSrcId) {
+ switch (encodedSrcId.Generation) {
+ case ESourceIdTableGeneration::PartitionMapping:
+ builder.AddParam("$Hash").Uint64(encodedSrcId.KeysHash).Build();
+ return;
+ case ESourceIdTableGeneration::SrcIdMeta2:
+ builder.AddParam("$Hash").Uint32(encodedSrcId.Hash).Build();
+ return;
+ }
+}
} // NSourceIdEncoding
} // NPQ
diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h
index a879521e61..74e672146c 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.h
+++ b/ydb/core/persqueue/writer/source_id_encoding.h
@@ -3,6 +3,7 @@
#include <util/generic/fwd.h>
#include <util/generic/string.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
namespace NKikimr {
namespace NPQ {
@@ -39,6 +40,8 @@ struct TEncodedSourceId {
void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& encodedSrcId);
+void SetHashToTParamsBuilder(NYdb::TParamsBuilder& builder, const TEncodedSourceId& encodedSrcId);
+
TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId, ESourceIdTableGeneration generation);
} // NSourceIdEncoding
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 19b6cbe08a..5742f70258 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -83,7 +83,7 @@ message TQueryRequest {
reserved 4; // (deprecated) KqlSettings
optional bool KeepSession = 5;
reserved 6; // (deprecated) Cluster
- optional NKikimrMiniKQL.TParams Parameters = 7;
+ reserved 7; // (deprecated) Parameters
reserved 8; // (deprecated) SqlAutoCommit
optional EQueryAction Action = 9;
reserved 10; // (deprecated) Profile
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp
index 0d94214b85..2ca6558479 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.cpp
+++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/ymq/queues/common/key_hashes.h>
+
namespace NKikimr::NSQS {
constexpr TDuration LOCK_PERIOD = TDuration::Seconds(30);
constexpr TDuration UPDATE_LOCK_PERIOD = TDuration::Seconds(20);
@@ -99,9 +100,14 @@ namespace NKikimr::NSQS {
LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] getting queues...");
State = state;
- NClient::TParameters params;
- params["$StartProcessTimestamp"] = StartProcessTimestamp.Seconds();
- params["$NodeId"] = SelfId().NodeId();
+ NYdb::TParams params = NYdb::TParamsBuilder()
+ .AddParam("$StartProcessTimestamp")
+ .Uint64(StartProcessTimestamp.Seconds())
+ .Build()
+ .AddParam("$NodeId")
+ .Uint32(SelfId().NodeId())
+ .Build()
+ .Build();
RunYqlQuery(SelectQueuesQuery, std::move(params), true, sendAfter, Cfg().GetRoot(), ctx);
}
@@ -173,11 +179,19 @@ namespace NKikimr::NSQS {
void TCleanupQueueDataActor::LockQueueToRemove(TDuration runAfter, const TActorContext& ctx) {
State = EState::LockQueue;
- NClient::TParameters params;
StartProcessTimestamp = ctx.Now();
- params["$StartProcessTimestamp"] = StartProcessTimestamp.Seconds();
- params["$LockFreeTimestamp"] = (StartProcessTimestamp - LOCK_PERIOD).Seconds();
- params["$NodeId"] = SelfId().NodeId();
+
+ NYdb::TParams params = NYdb::TParamsBuilder()
+ .AddParam("$StartProcessTimestamp")
+ .Uint64(StartProcessTimestamp.Seconds())
+ .Build()
+ .AddParam("$LockFreeTimestamp")
+ .Uint64((StartProcessTimestamp - LOCK_PERIOD).Seconds())
+ .Build()
+ .AddParam("$NodeId")
+ .Uint32(SelfId().NodeId())
+ .Build()
+ .Build();
RunYqlQuery(LockQueueQuery, std::move(params), false, runAfter, Cfg().GetRoot(), ctx);
}
@@ -186,13 +200,28 @@ namespace NKikimr::NSQS {
LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] update queue lock...");
State = EState::UpdateLockQueue;
- NClient::TParameters params;
- params["$StartProcessTimestamp"] = StartProcessTimestamp.Seconds();
- params["$NodeId"] = SelfId().NodeId();
- params["$RemoveTimestamp"] = RemoveQueueTimetsamp;
- params["$QueueIdNumber"] = QueueIdNumber;
+ NYdb::TParamsBuilder paramsBuilder;
+ paramsBuilder
+ .AddParam("$StartProcessTimestamp")
+ .Uint64(StartProcessTimestamp.Seconds())
+ .Build()
+ .AddParam("$NodeId")
+ .Uint32(SelfId().NodeId())
+ .Build()
+ .AddParam("$RemoveTimestamp")
+ .Uint64(RemoveQueueTimetsamp)
+ .Build()
+ .AddParam("$QueueIdNumber")
+ .Uint64(QueueIdNumber)
+ .Build();
+
StartProcessTimestamp = ctx.Now();
- params["$Now"] = StartProcessTimestamp.Seconds();
+ paramsBuilder
+ .AddParam("$Now")
+ .Uint64(StartProcessTimestamp.Seconds())
+ .Build();
+
+ NYdb::TParams params = paramsBuilder.Build();
RunYqlQuery(UpdateLockQueueQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx);
}
@@ -252,9 +281,15 @@ namespace NKikimr::NSQS {
void TCleanupQueueDataActor::Finish(const TActorContext& ctx) {
State = EState::Finish;
- NClient::TParameters params;
- params["$RemoveTimestamp"] = RemoveQueueTimetsamp;
- params["$QueueIdNumber"] = QueueIdNumber;
+ NYdb::TParams params = NYdb::TParamsBuilder()
+ .AddParam("$RemoveTimestamp")
+ .Uint64(RemoveQueueTimetsamp)
+ .Build()
+ .AddParam("$QueueIdNumber")
+ .Uint64(QueueIdNumber)
+ .Build()
+ .Build();
+
RunYqlQuery(RemoveQueueFromListQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx);
}
@@ -272,12 +307,22 @@ namespace NKikimr::NSQS {
return;
}
- NClient::TParameters params;
ui32 shard = ShardsToRemove ? (ShardsToRemove - 1) : 0;
- params["$QueueIdNumberAndShardHash"] = GetKeysHash(QueueIdNumber, shard);
- params["$Shard"] = shard;
- params["$QueueIdNumberHash"] = GetKeysHash(QueueIdNumber);
- params["$QueueIdNumber"] = QueueIdNumber;
+
+ NYdb::TParams params = NYdb::TParamsBuilder()
+ .AddParam("$QueueIdNumberAndShardHash")
+ .Uint64(GetKeysHash(QueueIdNumber, shard))
+ .Build()
+ .AddParam("$Shard")
+ .Uint32(shard)
+ .Build()
+ .AddParam("$QueueIdNumberHash")
+ .Uint64(GetKeysHash(QueueIdNumber))
+ .Build()
+ .AddParam("$QueueIdNumber")
+ .Uint64(QueueIdNumber)
+ .Build()
+ .Build();
RunYqlQuery(RemoveDataQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx);
}
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.h b/ydb/core/ymq/actor/cleanup_queue_data.h
index 5867fe613d..0104276f03 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.h
+++ b/ydb/core/ymq/actor/cleanup_queue_data.h
@@ -2,8 +2,10 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/core/ymq/base/counters.h>
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/actorsystem.h>
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp
index e1fd24587e..ba57429a3d 100644
--- a/ydb/core/ymq/actor/index_events_processor.cpp
+++ b/ydb/core/ymq/actor/index_events_processor.cpp
@@ -134,9 +134,14 @@ void TSearchEventsProcessor::RunQueuesListQuery(const TActorContext& ctx) {
request->SetKeepSession(false);
request->SetPreparedQuery(SelectQueuesQuery);
- NClient::TParameters params;
- params["$Account"] = LastQueuesKey.Account;
- params["$QueueName"] = LastQueuesKey.QueueName;
+ NYdb::TParams params = NYdb::TParamsBuilder()
+ .AddParam("$Account")
+ .Utf8(LastQueuesKey.Account)
+ .Build()
+ .AddParam("$QueueName")
+ .Utf8(LastQueuesKey.QueueName)
+ .Build()
+ .Build();
RunQuery(SelectQueuesQuery, &params, true, ctx);
}
@@ -209,16 +214,29 @@ void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse:
void TSearchEventsProcessor::RunEventsCleanup(const TActorContext& ctx) {
State = EState::CleanupExecute;
- NClient::TParameters params;
- auto param = params["$Events"];
+ NYdb::TParamsBuilder paramsBuilder;
+
+ auto& param = paramsBuilder.AddParam("$Events");
+ param.BeginList();
+
for (const auto&[qName, events] : QueuesEvents) {
for (const auto&[_, event]: events) {
- auto item = param.AddListItem();
- item["Account"] = event.CloudId;
- item["QueueName"] = qName;
- item["EventType"] = static_cast<ui64>(event.Type);
+ param.AddListItem()
+ .BeginStruct()
+ .AddMember("Account")
+ .Utf8(event.CloudId)
+ .AddMember("QueueName")
+ .Utf8(qName)
+ .AddMember("EventType")
+ .Uint64(static_cast<ui64>(event.Type))
+ .EndStruct();
}
}
+ param.EndList();
+ param.Build();
+
+ auto params = paramsBuilder.Build();
+
RunQuery(DeleteEventQuery, &params, false, ctx);
}
@@ -301,7 +319,7 @@ void TSearchEventsProcessor::StopSession(const TActorContext& ctx) {
}
}
-void TSearchEventsProcessor::RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly,
+void TSearchEventsProcessor::RunQuery(const TString& query, NYdb::TParams* params, bool readonly,
const TActorContext& ctx) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
auto* request = ev->Record.MutableRequest();
@@ -326,7 +344,7 @@ void TSearchEventsProcessor::RunQuery(const TString& query, NKikimr::NClient::TP
}
request->MutableTxControl()->set_commit_tx(true);
if (params != nullptr) {
- request->MutableParameters()->Swap(params);
+ request->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(*params)));
}
Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
diff --git a/ydb/core/ymq/actor/index_events_processor.h b/ydb/core/ymq/actor/index_events_processor.h
index cc8f902d1d..29767c6c15 100644
--- a/ydb/core/ymq/actor/index_events_processor.h
+++ b/ydb/core/ymq/actor/index_events_processor.h
@@ -5,6 +5,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -94,7 +95,7 @@ private:
void RunEventsCleanup(const TActorContext& ctx);
void OnCleanupQueryComplete(const TActorContext&ctx);
- void RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly,
+ void RunQuery(const TString& query, NYdb::TParams* params, bool readonly,
const TActorContext& ctx);
void ProcessEventsQueue(const TActorContext& ctx);
diff --git a/ydb/core/ymq/base/run_query.cpp b/ydb/core/ymq/base/run_query.cpp
index 0394e19acd..fdc2fff098 100644
--- a/ydb/core/ymq/base/run_query.cpp
+++ b/ydb/core/ymq/base/run_query.cpp
@@ -6,7 +6,7 @@ namespace NKikimr::NSQS {
void RunYqlQuery(
const TString& query,
- std::optional<NKikimr::NClient::TParameters> params,
+ std::optional<NYdb::TParams> params,
bool readonly,
TDuration sendAfter,
const TString& database,
@@ -34,7 +34,7 @@ namespace NKikimr::NSQS {
request->MutableTxControl()->set_commit_tx(true);
if (params) {
- request->MutableParameters()->Swap(&params.value());
+ request->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params.value())));
}
auto kqpActor = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
diff --git a/ydb/core/ymq/base/run_query.h b/ydb/core/ymq/base/run_query.h
index 9d7ce8e619..65bf22ab5a 100644
--- a/ydb/core/ymq/base/run_query.h
+++ b/ydb/core/ymq/base/run_query.h
@@ -2,6 +2,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <library/cpp/actors/core/actor.h>
@@ -10,7 +11,7 @@ namespace NKikimr::NSQS {
void RunYqlQuery(
const TString& query,
- std::optional<NKikimr::NClient::TParameters> params,
+ std::optional<NYdb::TParams> params,
bool readonly,
TDuration sendAfter,
const TString& database,
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp b/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp
index ddaa380e30..f51b810160 100644
--- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp
@@ -27,6 +27,10 @@ const ::google::protobuf::Map<TString, Ydb::TypedValue>& TProtoAccessor::GetProt
return params.GetProtoMap();
}
+::google::protobuf::Map<TString, Ydb::TypedValue>* TProtoAccessor::GetProtoMapPtr(TParams& params) {
+ return params.GetProtoMapPtr();
+}
+
template Ydb::Export::ExportToS3Settings::Scheme TProtoAccessor::GetProto<Ydb::Export::ExportToS3Settings>(ES3Scheme value);
template Ydb::Import::ImportFromS3Settings::Scheme TProtoAccessor::GetProto<Ydb::Import::ImportFromS3Settings>(ES3Scheme value);
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
index 541fc54809..e2dda78f2b 100644
--- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
+++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
@@ -38,6 +38,7 @@ public:
static const Ydb::Value& GetProto(const TValue& value);
static const Ydb::ResultSet& GetProto(const TResultSet& resultSet);
static const ::google::protobuf::Map<TString, Ydb::TypedValue>& GetProtoMap(const TParams& params);
+ static ::google::protobuf::Map<TString, Ydb::TypedValue>* GetProtoMapPtr(TParams& params);
static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats);
static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription);
static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription);
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 0f6933c4d6..df28db4e2d 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -13,6 +13,7 @@
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <library/cpp/actors/core/log.h>
@@ -760,13 +761,23 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- NClient::TParameters parameters;
- SetHashToTxParams(parameters, EncodedSourceId);
- parameters["$Topic"] = topic;
- parameters["$SourceId"] = EncodedSourceId.EscapedSourceId;
+ NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();
+
+ SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);
+
+ paramsBuilder
+ .AddParam("$Topic")
+ .Utf8(topic)
+ .Build()
+ .AddParam("$SourceId")
+ .Utf8(EncodedSourceId.EscapedSourceId)
+ .Build();
+
+ NYdb::TParams params = paramsBuilder.Build();
+
+ ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
- ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
SelectSrcIdsInflight++;
}
@@ -950,16 +961,30 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>:
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
- NClient::TParameters parameters;
- SetHashToTxParams(parameters, EncodedSourceId);
- //parameters["$Hash"] = hash;
- parameters["$Topic"] = topic;
- parameters["$SourceId"] = EncodedSourceId.EscapedSourceId;
-
- parameters["$CreateTime"] = SourceIdCreateTime;
- parameters["$AccessTime"] = TInstant::Now().MilliSeconds();
- parameters["$Partition"] = Partition;
- ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
+ NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder();
+
+ SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId);
+
+ paramsBuilder
+ .AddParam("$Topic")
+ .Utf8(topic)
+ .Build()
+ .AddParam("$SourceId")
+ .Utf8(EncodedSourceId.EscapedSourceId)
+ .Build()
+ .AddParam("$CreateTime")
+ .Uint64(SourceIdCreateTime)
+ .Build()
+ .AddParam("$AccessTime")
+ .Uint64(TInstant::Now().MilliSeconds())
+ .Build()
+ .AddParam("$Partition")
+ .Uint32(Partition)
+ .Build();
+
+ NYdb::TParams params = paramsBuilder.Build();
+
+ ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params)));
return ev;
}