diff options
author | makostrov <makostrov@yandex-team.com> | 2023-07-13 12:10:15 +0300 |
---|---|---|
committer | makostrov <makostrov@yandex-team.com> | 2023-07-13 12:10:15 +0300 |
commit | cf717a072810d7e971dce2b303847a0199629c55 (patch) | |
tree | c9eea8314dc0425668125b7595452db5a2aabb73 | |
parent | 589e557e0cd851dce2875618f069c3e25e9dcc3b (diff) | |
download | ydb-cf717a072810d7e971dce2b303847a0199629c55.tar.gz |
Remove deprecated parameter
KIKIMR-17602
Remove support of old param in file index_events_processor.cpp
Remove support of TParams in 2 functions
23 files changed, 242 insertions, 129 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 53bc326c32..dcb9138e89 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -229,10 +229,17 @@ private: req->Record.MutableRequest()->SetQuery(VersionQuery); } else { req->Record.MutableRequest()->SetQuery(TopicsQuery); - NClient::TParameters params; - params["$Path"] = LastTopicKey.Path; - params["$Cluster"] = LastTopicKey.Cluster; - req->Record.MutableRequest()->MutableParameters()->Swap(¶ms); + + NYdb::TParams params = NYdb::TParamsBuilder() + .AddParam("$Path") + .Utf8(LastTopicKey.Path) + .Build() + .AddParam("$Cluster") + .Utf8(LastTopicKey.Cluster) + .Build() + .Build(); + + req->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); } Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), req.Release(), 0, Generation->Val()); } diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index 023814a97d..4c003fe7f2 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -7,6 +7,7 @@ #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <util/generic/string.h> #include <util/generic/vector.h> diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 1a0ecc14c4..49bd2cbac2 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -115,10 +115,6 @@ public: return RequestCtx ? YqlText : Record.GetRequest().GetQuery(); } - const ::NKikimrMiniKQL::TParams& GetParameters() const { - return Record.GetRequest().GetParameters(); - } - const ::Ydb::Table::TransactionControl& GetTxControl() const { return RequestCtx ? *TxControl : Record.GetRequest().GetTxControl(); } @@ -229,7 +225,6 @@ public: return ParametersSize; } - ParametersSize += Record.GetRequest().GetParameters().ByteSizeLong(); for (const auto& [name, param] : GetYdbParameters()) { ParametersSize += name.size(); ParametersSize += param.ByteSizeLong(); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index c9d51a9ce6..2cd2f11652 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -23,6 +23,7 @@ #include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/library/aclib/aclib.h> #include <ydb/public/lib/base/msgbus_status.h> +#include <ydb/public/sdk/cpp/client/ydb_params/params.h> #include <ydb/public/api/protos/ydb_topic.pb.h> #include <ydb/services/metadata/abstract/kqp_common.h> #include <ydb/services/persqueue_v1/rpc_calls.h> @@ -1874,7 +1875,7 @@ public: ev->Record.MutableRequest()->SetKeepSession(false); ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); - FillParameters(params, *ev->Record.MutableRequest()->MutableParameters()); + FillParameters(params, ev->Record.MutableRequest()->MutableYdbParameters()); return SendKqpScanQueryRequest(ev.Release(), rowsLimit, [] (TPromise<TQueryResult> promise, TResponse&& responseEv) { @@ -1906,7 +1907,7 @@ public: ev->Record.MutableRequest()->SetKeepSession(false); ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); - FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); + FillParameters(std::move(params), ev->Record.MutableRequest()->MutableYdbParameters()); auto& txControl = *ev->Record.MutableRequest()->MutableTxControl(); txControl.mutable_begin_tx()->CopyFrom(txSettings); @@ -1947,7 +1948,7 @@ public: ); // TODO: Rewrite CollectParameters at kqp_host - FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); + FillParameters(std::move(params), ev->Record.MutableRequest()->MutableYdbParameters()); return SendKqpScanQueryStreamRequest(ev.Release(), target, [](TPromise<TQueryResult> promise, TResponse&& responseEv) { @@ -2005,7 +2006,7 @@ public: ev->Record.MutableRequest()->SetKeepSession(false); ev->Record.MutableRequest()->SetCollectStats(settings.CollectStats); - FillParameters(std::move(params), *ev->Record.MutableRequest()->MutableParameters()); + FillParameters(std::move(params), ev->Record.MutableRequest()->MutableYdbParameters()); auto& txControl = *ev->Record.MutableRequest()->MutableTxControl(); txControl.mutable_begin_tx()->CopyFrom(txSettings); @@ -2287,25 +2288,13 @@ private: } } - static void FillParameters(TQueryData::TPtr params, NKikimrMiniKQL::TParams& output) { + static void FillParameters(TQueryData::TPtr params, ::google::protobuf::Map<TBasicString<char>, Ydb::TypedValue>* output) { if (!params) { return; } - if (params->GetParams().empty()) { - return; - } - - output.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct); - auto type = output.MutableType()->MutableStruct(); - auto value = output.MutableValue(); - for (auto& pair : params->GetParams()) { - auto typeMember = type->AddMember(); - typeMember->SetName(pair.first); - - typeMember->MutableType()->CopyFrom(pair.second.GetType()); - value->AddStruct()->CopyFrom(pair.second.GetValue()); - } + auto& paramsMap = params->GetParamsProtobuf(); + output->insert(paramsMap.begin(), paramsMap.end()); } static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata, diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index d1e06b6948..e2445ab0ba 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -988,30 +988,30 @@ public: }); } - IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const TExecScriptSettings& settings) override { return CheckedProcessQuery(*ExprCtx, - [this, &script, parameters = std::move(parameters), settings] (TExprContext& ctx) mutable { - return ExecuteYqlScriptInternal(script, std::move(parameters), settings, ctx); + [this, &script, parameters, settings] (TExprContext& ctx) mutable { + return ExecuteYqlScriptInternal(script, parameters, settings, ctx); }); } - TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const TExecScriptSettings& settings) override { return CheckedSyncProcessQuery( - [this, &script, parameters = std::move(parameters), settings] () mutable { - return ExecuteYqlScript(script, std::move(parameters), settings); + [this, &script, parameters, settings] () mutable { + return ExecuteYqlScript(script, parameters, settings); }); } - IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const NActors::TActorId& target, const TExecScriptSettings& settings) override { return CheckedProcessQuery(*ExprCtx, - [this, &script, parameters = std::move(parameters), target, settings](TExprContext& ctx) mutable { - return StreamExecuteYqlScriptInternal(script, std::move(parameters), target, settings, ctx); + [this, &script, parameters, target, settings](TExprContext& ctx) mutable { + return StreamExecuteYqlScriptInternal(script, parameters, target, settings, ctx); }); } @@ -1401,7 +1401,7 @@ private: SessionCtx, *ExecuteCtx); } - IAsyncQueryResultPtr ExecuteYqlScriptInternal(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr ExecuteYqlScriptInternal(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const TExecScriptSettings& settings, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::YqlScript); @@ -1417,15 +1417,13 @@ private: return nullptr; } - if (!ParseParameters(std::move(parameters), *(SessionCtx->Query().QueryData), ctx)) { - return nullptr; - } + (SessionCtx->Query().QueryData)->ParseParameters(parameters); return MakeIntrusive<TAsyncExecuteYqlResult>(scriptExpr.Get(), ctx, *YqlTransformer, Cluster, SessionCtx, *ResultProviderConfig, *PlanBuilder, sqlVersion); } - IAsyncQueryResultPtr StreamExecuteYqlScriptInternal(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr StreamExecuteYqlScriptInternal(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const NActors::TActorId& target,const TExecScriptSettings& settings, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::YqlScriptStreaming); @@ -1443,9 +1441,7 @@ private: return nullptr; } - if (!ParseParameters(std::move(parameters), *(SessionCtx->Query().QueryData), ctx)) { - return nullptr; - } + (SessionCtx->Query().QueryData)->ParseParameters(parameters); return MakeIntrusive<TAsyncExecuteYqlResult>(scriptExpr.Get(), ctx, *YqlTransformer, Cluster, SessionCtx, *ResultProviderConfig, *PlanBuilder, sqlVersion); diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index b5fc46beb9..df444a037d 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -88,12 +88,12 @@ public: virtual IAsyncQueryResultPtr ExplainYqlScript(const TKqpQueryRef& script) = 0; virtual TQueryResult SyncExplainYqlScript(const TKqpQueryRef& script) = 0; - virtual IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + virtual IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const TExecScriptSettings& settings) = 0; - virtual TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + virtual TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const TExecScriptSettings& settings) = 0; - virtual IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, + virtual IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, const NActors::TActorId& target, const TExecScriptSettings& settings) = 0; }; diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index 8a7a94adcf..206341fe2d 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/public/sdk/cpp/client/ydb_params/params.h> #include <ydb/library/yverify_stream/yverify_stream.h> namespace NKikimr::NKqp { @@ -157,6 +158,14 @@ const TQueryData::TParamMap& TQueryData::GetParams() { return Params; } +const TQueryData::TParamProtobufMap& TQueryData::GetParamsProtobuf() { + for(auto& [name, _] : UnboxedData) { + GetParameterTypedValue(name); + } + + return ParamsProtobuf; +} + NKikimr::NMiniKQL::TType* TQueryData::GetParameterType(const TString& name) { auto it = UnboxedData.find(name); if (it == UnboxedData.end()) { @@ -309,6 +318,27 @@ const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TStrin return &(it->second); } +const Ydb::TypedValue* TQueryData::GetParameterTypedValue(const TString& name) { + if (UnboxedData.find(name) == UnboxedData.end()) + return nullptr; + + auto it = ParamsProtobuf.find(name); + if (it == ParamsProtobuf.end()) { + with_lock(AllocState->Alloc) { + const auto& [type, uv] = GetParameterUnboxedValue(name); + + auto& tv = ParamsProtobuf[name]; + + ExportTypeToProto(type, *tv.mutable_type()); + ExportValueToProto(type, uv, *tv.mutable_value()); + + return &tv; + } + } + + return &(it->second); +} + const NKikimr::NMiniKQL::TTypeEnvironment& TQueryData::TypeEnv() { return AllocState->TypeEnv; } diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index 437b3036a4..aa1f8a2337 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -183,12 +183,18 @@ private: NKikimrMiniKQL::TParams >; + using TParamProtobufMap = ::google::protobuf::Map< + TString, + Ydb::TypedValue + >; + using TParamProvider = std::function< bool(std::string_view name, NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::TUnboxedValue& value) >; TParamMap Params; + TParamProtobufMap ParamsProtobuf; TUnboxedParamsMap UnboxedData; THashMap<ui32, TVector<TKqpExecuterTxResult>> TxResults; TVector<TVector<TKqpPhyTxHolder::TConstPtr>> TxHolders; @@ -205,6 +211,8 @@ public: const TParamMap& GetParams(); + const TParamProtobufMap& GetParamsProtobuf(); + const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv(); TTxAllocatorState::TPtr GetAllocState() { return AllocState; } @@ -243,6 +251,8 @@ public: TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name); TTypedUnboxedValue* GetParameterUnboxedValuePtr(const TString& name); const NKikimrMiniKQL::TParams* GetParameterMiniKqlValue(const TString& name); + const Ydb::TypedValue* GetParameterTypedValue(const TString& name); + NYql::NDqProto::TData SerializeParamValue(const TString& name); void Clear(); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 636b08723f..c795d0eeac 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -332,10 +332,6 @@ public: return RequestEv->GetTxControl(); } - const ::NKikimrMiniKQL::TParams& GetParameters() const { - return RequestEv->GetParameters(); - } - // validate the compiled query response and ensure that all table versions are not // changed since the last compilation. bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 6b4509ba2b..7e5fedd18c 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -189,26 +189,7 @@ public: Settings.Service, std::move(id)); } - bool ConvertParameters() { - auto& proto = QueryState->RequestEv->Record; - - if (!proto.GetRequest().HasParameters() && QueryState->RequestEv->GetYdbParameters().size()) { - try { - ConvertYdbParamsToMiniKQLParams(QueryState->RequestEv->GetYdbParameters(), *proto.MutableRequest()->MutableParameters()); - } catch (const std::exception& ex) { - TString message = TStringBuilder() << "Failed to parse query parameters. "<< ex.what(); - ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, Ydb::StatusIds::BAD_REQUEST, message); - return false; - } - } - - return true; - } - void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { - if (!ConvertParameters()) - return; - if (!WorkerId) { std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings, HttpGateway, ModuleResolverState, Counters)); @@ -699,7 +680,6 @@ public: } try { - QueryState->QueryData->ParseParameters(QueryState->GetParameters()); QueryState->QueryData->ParseParameters(QueryState->GetYdbParameters()); } catch(const yexception& ex) { ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 69cb3fdeb0..ef0956ca99 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -466,8 +466,7 @@ private: switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: { - auto&& params = *QueryState->RequestEv->Record.MutableRequest()->MutableParameters(); - if (!ExecuteQuery(QueryState->RequestEv->GetQuery(), std::move(params), queryType, QueryState->RequestEv->GetRequestActorId())) { + if (!ExecuteQuery(QueryState->RequestEv->GetQuery(), QueryState->RequestEv->GetYdbParameters(), queryType, QueryState->RequestEv->GetRequestActorId())) { onBadRequest(QueryState->Error); return; } @@ -565,7 +564,7 @@ private: Cleanup(ctx, true); } - bool ExecuteQuery(const TString& query, ::NKikimrMiniKQL::TParams&& parameters, + bool ExecuteQuery(const TString& query, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>& parameters, NKikimrKqp::EQueryType type, const TActorId& requestActorId) { auto statsMode = GetStatsMode(QueryState->RequestEv.get(), EKikimrStatsMode::Basic); @@ -595,7 +594,7 @@ private: execSettings.Deadlines = QueryState->QueryDeadlines; execSettings.RpcCtx = QueryState->RequestEv->GetRequestCtx(); execSettings.StatsMode = statsMode; - QueryState->AsyncQueryResult = KqpHost->ExecuteYqlScript(query, std::move(parameters), execSettings); + QueryState->AsyncQueryResult = KqpHost->ExecuteYqlScript(query, parameters, execSettings); break; } @@ -604,7 +603,7 @@ private: execSettings.Deadlines = QueryState->QueryDeadlines; execSettings.RpcCtx = QueryState->RequestEv->GetRequestCtx(); execSettings.StatsMode = statsMode; - QueryState->AsyncQueryResult = KqpHost->StreamExecuteYqlScript(query, std::move(parameters), + QueryState->AsyncQueryResult = KqpHost->StreamExecuteYqlScript(query, parameters, requestActorId, execSettings); break; } diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp index 59101a2e5f..e42a9ba7d1 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.cpp +++ b/ydb/core/persqueue/writer/source_id_encoding.cpp @@ -221,6 +221,16 @@ void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& } } +void SetHashToTParamsBuilder(NYdb::TParamsBuilder& builder, const TEncodedSourceId& encodedSrcId) { + switch (encodedSrcId.Generation) { + case ESourceIdTableGeneration::PartitionMapping: + builder.AddParam("$Hash").Uint64(encodedSrcId.KeysHash).Build(); + return; + case ESourceIdTableGeneration::SrcIdMeta2: + builder.AddParam("$Hash").Uint32(encodedSrcId.Hash).Build(); + return; + } +} } // NSourceIdEncoding } // NPQ diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h index a879521e61..74e672146c 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.h +++ b/ydb/core/persqueue/writer/source_id_encoding.h @@ -3,6 +3,7 @@ #include <util/generic/fwd.h> #include <util/generic/string.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> +#include <ydb/public/sdk/cpp/client/ydb_params/params.h> namespace NKikimr { namespace NPQ { @@ -39,6 +40,8 @@ struct TEncodedSourceId { void SetHashToTxParams(NClient::TParameters& parameters, const TEncodedSourceId& encodedSrcId); +void SetHashToTParamsBuilder(NYdb::TParamsBuilder& builder, const TEncodedSourceId& encodedSrcId); + TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId, ESourceIdTableGeneration generation); } // NSourceIdEncoding diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 19b6cbe08a..5742f70258 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -83,7 +83,7 @@ message TQueryRequest { reserved 4; // (deprecated) KqlSettings optional bool KeepSession = 5; reserved 6; // (deprecated) Cluster - optional NKikimrMiniKQL.TParams Parameters = 7; + reserved 7; // (deprecated) Parameters reserved 8; // (deprecated) SqlAutoCommit optional EQueryAction Action = 9; reserved 10; // (deprecated) Profile diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp index 0d94214b85..2ca6558479 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.cpp +++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp @@ -5,6 +5,7 @@ #include <ydb/core/ymq/queues/common/key_hashes.h> + namespace NKikimr::NSQS { constexpr TDuration LOCK_PERIOD = TDuration::Seconds(30); constexpr TDuration UPDATE_LOCK_PERIOD = TDuration::Seconds(20); @@ -99,9 +100,14 @@ namespace NKikimr::NSQS { LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] getting queues..."); State = state; - NClient::TParameters params; - params["$StartProcessTimestamp"] = StartProcessTimestamp.Seconds(); - params["$NodeId"] = SelfId().NodeId(); + NYdb::TParams params = NYdb::TParamsBuilder() + .AddParam("$StartProcessTimestamp") + .Uint64(StartProcessTimestamp.Seconds()) + .Build() + .AddParam("$NodeId") + .Uint32(SelfId().NodeId()) + .Build() + .Build(); RunYqlQuery(SelectQueuesQuery, std::move(params), true, sendAfter, Cfg().GetRoot(), ctx); } @@ -173,11 +179,19 @@ namespace NKikimr::NSQS { void TCleanupQueueDataActor::LockQueueToRemove(TDuration runAfter, const TActorContext& ctx) { State = EState::LockQueue; - NClient::TParameters params; StartProcessTimestamp = ctx.Now(); - params["$StartProcessTimestamp"] = StartProcessTimestamp.Seconds(); - params["$LockFreeTimestamp"] = (StartProcessTimestamp - LOCK_PERIOD).Seconds(); - params["$NodeId"] = SelfId().NodeId(); + + NYdb::TParams params = NYdb::TParamsBuilder() + .AddParam("$StartProcessTimestamp") + .Uint64(StartProcessTimestamp.Seconds()) + .Build() + .AddParam("$LockFreeTimestamp") + .Uint64((StartProcessTimestamp - LOCK_PERIOD).Seconds()) + .Build() + .AddParam("$NodeId") + .Uint32(SelfId().NodeId()) + .Build() + .Build(); RunYqlQuery(LockQueueQuery, std::move(params), false, runAfter, Cfg().GetRoot(), ctx); } @@ -186,13 +200,28 @@ namespace NKikimr::NSQS { LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] update queue lock..."); State = EState::UpdateLockQueue; - NClient::TParameters params; - params["$StartProcessTimestamp"] = StartProcessTimestamp.Seconds(); - params["$NodeId"] = SelfId().NodeId(); - params["$RemoveTimestamp"] = RemoveQueueTimetsamp; - params["$QueueIdNumber"] = QueueIdNumber; + NYdb::TParamsBuilder paramsBuilder; + paramsBuilder + .AddParam("$StartProcessTimestamp") + .Uint64(StartProcessTimestamp.Seconds()) + .Build() + .AddParam("$NodeId") + .Uint32(SelfId().NodeId()) + .Build() + .AddParam("$RemoveTimestamp") + .Uint64(RemoveQueueTimetsamp) + .Build() + .AddParam("$QueueIdNumber") + .Uint64(QueueIdNumber) + .Build(); + StartProcessTimestamp = ctx.Now(); - params["$Now"] = StartProcessTimestamp.Seconds(); + paramsBuilder + .AddParam("$Now") + .Uint64(StartProcessTimestamp.Seconds()) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); RunYqlQuery(UpdateLockQueueQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx); } @@ -252,9 +281,15 @@ namespace NKikimr::NSQS { void TCleanupQueueDataActor::Finish(const TActorContext& ctx) { State = EState::Finish; - NClient::TParameters params; - params["$RemoveTimestamp"] = RemoveQueueTimetsamp; - params["$QueueIdNumber"] = QueueIdNumber; + NYdb::TParams params = NYdb::TParamsBuilder() + .AddParam("$RemoveTimestamp") + .Uint64(RemoveQueueTimetsamp) + .Build() + .AddParam("$QueueIdNumber") + .Uint64(QueueIdNumber) + .Build() + .Build(); + RunYqlQuery(RemoveQueueFromListQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx); } @@ -272,12 +307,22 @@ namespace NKikimr::NSQS { return; } - NClient::TParameters params; ui32 shard = ShardsToRemove ? (ShardsToRemove - 1) : 0; - params["$QueueIdNumberAndShardHash"] = GetKeysHash(QueueIdNumber, shard); - params["$Shard"] = shard; - params["$QueueIdNumberHash"] = GetKeysHash(QueueIdNumber); - params["$QueueIdNumber"] = QueueIdNumber; + + NYdb::TParams params = NYdb::TParamsBuilder() + .AddParam("$QueueIdNumberAndShardHash") + .Uint64(GetKeysHash(QueueIdNumber, shard)) + .Build() + .AddParam("$Shard") + .Uint32(shard) + .Build() + .AddParam("$QueueIdNumberHash") + .Uint64(GetKeysHash(QueueIdNumber)) + .Build() + .AddParam("$QueueIdNumber") + .Uint64(QueueIdNumber) + .Build() + .Build(); RunYqlQuery(RemoveDataQuery, std::move(params), false, TDuration::Zero(), Cfg().GetRoot(), ctx); } diff --git a/ydb/core/ymq/actor/cleanup_queue_data.h b/ydb/core/ymq/actor/cleanup_queue_data.h index 5867fe613d..0104276f03 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.h +++ b/ydb/core/ymq/actor/cleanup_queue_data.h @@ -2,8 +2,10 @@ #include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/core/ymq/base/counters.h>
+
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/actorsystem.h>
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp index e1fd24587e..ba57429a3d 100644 --- a/ydb/core/ymq/actor/index_events_processor.cpp +++ b/ydb/core/ymq/actor/index_events_processor.cpp @@ -134,9 +134,14 @@ void TSearchEventsProcessor::RunQueuesListQuery(const TActorContext& ctx) { request->SetKeepSession(false); request->SetPreparedQuery(SelectQueuesQuery); - NClient::TParameters params; - params["$Account"] = LastQueuesKey.Account; - params["$QueueName"] = LastQueuesKey.QueueName; + NYdb::TParams params = NYdb::TParamsBuilder() + .AddParam("$Account") + .Utf8(LastQueuesKey.Account) + .Build() + .AddParam("$QueueName") + .Utf8(LastQueuesKey.QueueName) + .Build() + .Build(); RunQuery(SelectQueuesQuery, ¶ms, true, ctx); } @@ -209,16 +214,29 @@ void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse: void TSearchEventsProcessor::RunEventsCleanup(const TActorContext& ctx) { State = EState::CleanupExecute; - NClient::TParameters params; - auto param = params["$Events"]; + NYdb::TParamsBuilder paramsBuilder; + + auto& param = paramsBuilder.AddParam("$Events"); + param.BeginList(); + for (const auto&[qName, events] : QueuesEvents) { for (const auto&[_, event]: events) { - auto item = param.AddListItem(); - item["Account"] = event.CloudId; - item["QueueName"] = qName; - item["EventType"] = static_cast<ui64>(event.Type); + param.AddListItem() + .BeginStruct() + .AddMember("Account") + .Utf8(event.CloudId) + .AddMember("QueueName") + .Utf8(qName) + .AddMember("EventType") + .Uint64(static_cast<ui64>(event.Type)) + .EndStruct(); } } + param.EndList(); + param.Build(); + + auto params = paramsBuilder.Build(); + RunQuery(DeleteEventQuery, ¶ms, false, ctx); } @@ -301,7 +319,7 @@ void TSearchEventsProcessor::StopSession(const TActorContext& ctx) { } } -void TSearchEventsProcessor::RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly, +void TSearchEventsProcessor::RunQuery(const TString& query, NYdb::TParams* params, bool readonly, const TActorContext& ctx) { auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); auto* request = ev->Record.MutableRequest(); @@ -326,7 +344,7 @@ void TSearchEventsProcessor::RunQuery(const TString& query, NKikimr::NClient::TP } request->MutableTxControl()->set_commit_tx(true); if (params != nullptr) { - request->MutableParameters()->Swap(params); + request->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(*params))); } Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); diff --git a/ydb/core/ymq/actor/index_events_processor.h b/ydb/core/ymq/actor/index_events_processor.h index cc8f902d1d..29767c6c15 100644 --- a/ydb/core/ymq/actor/index_events_processor.h +++ b/ydb/core/ymq/actor/index_events_processor.h @@ -5,6 +5,7 @@ #include <ydb/core/kqp/common/kqp.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -94,7 +95,7 @@ private: void RunEventsCleanup(const TActorContext& ctx); void OnCleanupQueryComplete(const TActorContext&ctx); - void RunQuery(const TString& query, NKikimr::NClient::TParameters* params, bool readonly, + void RunQuery(const TString& query, NYdb::TParams* params, bool readonly, const TActorContext& ctx); void ProcessEventsQueue(const TActorContext& ctx); diff --git a/ydb/core/ymq/base/run_query.cpp b/ydb/core/ymq/base/run_query.cpp index 0394e19acd..fdc2fff098 100644 --- a/ydb/core/ymq/base/run_query.cpp +++ b/ydb/core/ymq/base/run_query.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NSQS { void RunYqlQuery(
const TString& query,
- std::optional<NKikimr::NClient::TParameters> params,
+ std::optional<NYdb::TParams> params,
bool readonly,
TDuration sendAfter,
const TString& database,
@@ -34,7 +34,7 @@ namespace NKikimr::NSQS { request->MutableTxControl()->set_commit_tx(true);
if (params) {
- request->MutableParameters()->Swap(¶ms.value());
+ request->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params.value())));
}
auto kqpActor = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
diff --git a/ydb/core/ymq/base/run_query.h b/ydb/core/ymq/base/run_query.h index 9d7ce8e619..65bf22ab5a 100644 --- a/ydb/core/ymq/base/run_query.h +++ b/ydb/core/ymq/base/run_query.h @@ -2,6 +2,7 @@ #include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <library/cpp/actors/core/actor.h>
@@ -10,7 +11,7 @@ namespace NKikimr::NSQS { void RunYqlQuery(
const TString& query,
- std::optional<NKikimr::NClient::TParameters> params,
+ std::optional<NYdb::TParams> params,
bool readonly,
TDuration sendAfter,
const TString& database,
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp b/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp index ddaa380e30..f51b810160 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.cpp @@ -27,6 +27,10 @@ const ::google::protobuf::Map<TString, Ydb::TypedValue>& TProtoAccessor::GetProt return params.GetProtoMap(); } +::google::protobuf::Map<TString, Ydb::TypedValue>* TProtoAccessor::GetProtoMapPtr(TParams& params) { + return params.GetProtoMapPtr(); +} + template Ydb::Export::ExportToS3Settings::Scheme TProtoAccessor::GetProto<Ydb::Export::ExportToS3Settings>(ES3Scheme value); template Ydb::Import::ImportFromS3Settings::Scheme TProtoAccessor::GetProto<Ydb::Import::ImportFromS3Settings>(ES3Scheme value); diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h index 541fc54809..e2dda78f2b 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h @@ -38,6 +38,7 @@ public: static const Ydb::Value& GetProto(const TValue& value); static const Ydb::ResultSet& GetProto(const TResultSet& resultSet); static const ::google::protobuf::Map<TString, Ydb::TypedValue>& GetProtoMap(const TParams& params); + static ::google::protobuf::Map<TString, Ydb::TypedValue>* GetProtoMapPtr(TParams& params); static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats); static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription); static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 0f6933c4d6..df28db4e2d 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -13,6 +13,7 @@ #include <ydb/core/persqueue/write_meta.h> #include <ydb/library/services/services.pb.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <ydb/services/lib/sharding/sharding.h> #include <library/cpp/actors/core/log.h> @@ -760,13 +761,23 @@ void TWriteSessionActor<UseMigrationProtocol>::SendSelectPartitionRequest(const ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - NClient::TParameters parameters; - SetHashToTxParams(parameters, EncodedSourceId); - parameters["$Topic"] = topic; - parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(topic) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); SelectSrcIdsInflight++; } @@ -950,16 +961,30 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor<UseMigrationProtocol>: // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - NClient::TParameters parameters; - SetHashToTxParams(parameters, EncodedSourceId); - //parameters["$Hash"] = hash; - parameters["$Topic"] = topic; - parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; - - parameters["$CreateTime"] = SourceIdCreateTime; - parameters["$AccessTime"] = TInstant::Now().MilliSeconds(); - parameters["$Partition"] = Partition; - ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(topic) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build() + .AddParam("$CreateTime") + .Uint64(SourceIdCreateTime) + .Build() + .AddParam("$AccessTime") + .Uint64(TInstant::Now().MilliSeconds()) + .Build() + .AddParam("$Partition") + .Uint32(Partition) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); return ev; } |