diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2024-06-18 11:08:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-18 11:08:28 +0300 |
commit | 82687fa128f0ac3fd261815b98d88cbbc426e2ab (patch) | |
tree | 5bc517a9fc804b5a4366bd8d7c5676430e72a077 | |
parent | d23ebbe747a462f1bb29afaf810ff41cb1d4cd96 (diff) | |
download | ydb-82687fa128f0ac3fd261815b98d88cbbc426e2ab.tar.gz |
YQ-2068 use HashMap for lookup request and result (#5594)
10 files changed, 140 insertions, 149 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index bed2fe2dd0..85a60599bd 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -202,40 +202,50 @@ struct IDqComputeActorAsyncOutput { }; struct IDqAsyncLookupSource { + using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>; + using TUnboxedValueMap = THashMap< + NUdf::TUnboxedValue, + NUdf::TUnboxedValue, + NKikimr::NMiniKQL::TValueHasher, + NKikimr::NMiniKQL::TValueEqual, + NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>> + >; struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> { - TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TUnboxedValueVector&& keys) + TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request) : Alloc(alloc) - , Keys(std::move(keys)) + , Request(std::move(request)) { } ~TEvLookupRequest() { auto guard = Guard(*Alloc); - Keys = NKikimr::NMiniKQL::TUnboxedValueVector{}; + TKeyTypeHelper empty; + Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()}; } - std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; - NKikimr::NMiniKQL::TUnboxedValueVector Keys; + TUnboxedValueMap Request; }; + struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> { - TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TKeyPayloadPairVector&& data) + TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result) : Alloc(alloc) - , Data(std::move(data)) + , Result(std::move(result)) { } ~TEvLookupResult() { auto guard = Guard(*Alloc.get()); - Data = NKikimr::NMiniKQL::TKeyPayloadPairVector{}; + TKeyTypeHelper empty; + Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()}; } std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; - NKikimr::NMiniKQL::TKeyPayloadPairVector Data; + TUnboxedValueMap Result; }; virtual size_t GetMaxSupportedKeysInRequest() const = 0; //Initiate lookup for requested keys //Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys - //Upon completion, results are sent in a TEvLookupResult to the preconfigured actor - virtual void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) = 0; + //Upon completion, results are sent in TEvLookupResult event to the preconfigured actor + virtual void AsyncLookup(TUnboxedValueMap&& request) = 0; protected: ~IDqAsyncLookupSource() {} }; @@ -266,6 +276,7 @@ public: struct TLookupSourceArguments { std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper; NActors::TActorId ParentId; google::protobuf::Any LookupSource; //provider specific data source const NKikimr::NMiniKQL::TStructType* KeyType; diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 19d4b770eb..38576688ca 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -51,7 +51,7 @@ public: , LookupJoinColumns(std::move(lookupJoinColumns)) , InputRowType(inputRowType) , LookupKeyType(lookupKeyType) - , KeyTypeHelper(lookupKeyType) + , KeyTypeHelper(std::make_shared<IDqAsyncLookupSource::TKeyTypeHelper>(lookupKeyType)) , LookupPayloadType(lookupPayloadType) , OutputRowType(outputRowType) , OutputRowColumnOrder(outputRowColumnOrder) @@ -67,6 +67,7 @@ public: Become(&TInputTransformStreamLookupBase::StateFunc); NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs { .Alloc = Alloc, + .KeyTypeHelper = KeyTypeHelper, .ParentId = SelfId(), .LookupSource = Settings.GetRightSource().GetLookupSource(), .KeyType = LookupKeyType, @@ -90,22 +91,7 @@ private: //events void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) { auto guard = BindAllocator(); - THashMap< - NUdf::TUnboxedValue, - NUdf::TUnboxedValue, - NKikimr::NMiniKQL::TValueHasher, - NKikimr::NMiniKQL::TValueEqual, - NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>> - > map( - ev->Get()->Data.size(), - KeyTypeHelper.GetValueHash(), - KeyTypeHelper.GetValueEqual() - ); - for (auto& r: ev->Get()->Data) { - Y_ABORT_UNLESS(r.first.IsBoxed()); - Y_ABORT_UNLESS(!r.second || r.second.IsBoxed()); - map.emplace(std::move(r)); - } + const auto lookupResult = std::move(ev->Get()->Result); while (!AwaitingQueue.empty()) { const auto wideInputRow = AwaitingQueue.Head(); NUdf::TUnboxedValue* keyItems; @@ -113,7 +99,7 @@ private: //events for (size_t i = 0; i != InputJoinColumns.size(); ++i) { keyItems[i] = wideInputRow[InputJoinColumns[i]]; } - auto lookupPayload = map.FindPtr(lookupKey); + auto lookupPayload = lookupResult.FindPtr(lookupKey); NUdf::TUnboxedValue* outputRowItems; NUdf::TUnboxedValue outputRow = HolderFactory.CreateDirectArrayHolder(OutputRowColumnOrder.size(), outputRowItems); @@ -163,7 +149,7 @@ private: //IDqComputeActorAsyncInput auto guard = BindAllocator(); //All resources, held by this class, that have been created with mkql allocator, must be deallocated here InputFlow.Clear(); - KeyTypeHelper = TKeyTypeHelper{}; + KeyTypeHelper.reset(); NMiniKQL::TUnboxedValueBatch{}.swap(AwaitingQueue); NMiniKQL::TUnboxedValueBatch{}.swap(ReadyQueue); } @@ -180,21 +166,21 @@ private: //IDqComputeActorAsyncInput NUdf::TUnboxedValue* inputRowItems; NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest(); - NKikimr::NMiniKQL::TUnboxedValueVector keysForLookup; + IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()}; while ( ((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok) && (keysForLookup.size() < maxKeysInRequest) ) { NUdf::TUnboxedValue* keyItems; - auto key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems); + NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems); for (size_t i = 0; i != InputJoinColumns.size(); ++i) { keyItems[i] = inputRowItems[InputJoinColumns[i]]; } - keysForLookup.push_back(key); + keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{}); AwaitingQueue.PushRow(inputRowItems, InputRowType->GetElementsCount()); } if (!keysForLookup.empty()) { - LookupSource.first->AsyncLookup(keysForLookup); + LookupSource.first->AsyncLookup(std::move(keysForLookup)); WaitingForLookupResults = true; } } @@ -241,8 +227,7 @@ protected: const TVector<size_t> LookupJoinColumns; const NMiniKQL::TMultiType* const InputRowType; const NMiniKQL::TStructType* const LookupKeyType; //key column types in LookupTable - using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>; - TKeyTypeHelper KeyTypeHelper; + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper; const NMiniKQL::TStructType* const LookupPayloadType; //other column types in LookupTable const NMiniKQL::TMultiType* const OutputRowType; const TOutputRowColumnOrder OutputRowColumnOrder; diff --git a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp index 722fd64fcd..0f294ca056 100644 --- a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp @@ -21,24 +21,30 @@ using namespace NActors; Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { + NYql::NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list<ui64> members) { + NYql::NUdf::TUnboxedValue* items; + NYql::NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items); + for (size_t i = 0; i != members.size(); ++i) { + items[i] = NYql::NUdf::TUnboxedValuePod{*(members.begin() + i)}; + } + return result; + } + //Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> { public: TCallLookupActor( std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NYql::NDq::IDqAsyncLookupSource* lookupSource, - NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp) + NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request) : Alloc(alloc) , LookupSource(lookupSource) - , KeysToLookUp(std::move(keysToLookUp)) + , Request(std::move(request)) { } void Bootstrap() { - LookupSource->AsyncLookup(std::move(KeysToLookUp)); - auto guard = Guard(*Alloc); - KeysToLookUp.clear(); - KeysToLookUp.shrink_to_fit(); + LookupSource->AsyncLookup(std::move(Request)); } private: @@ -47,7 +53,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { private: std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; NYql::NDq::IDqAsyncLookupSource* LookupSource; - NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp; + NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request; }; Y_UNIT_TEST(Lookup) { @@ -94,8 +100,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { .Disjunction() .Operand() .Conjunction() - .Operand().Equal().Column("id").Value<ui64>(0).Done().Done() - .Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done() + .Operand().Equal().Column("id").Value<ui64>(2).Done().Done() + .Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done() .Done() .Done() .Operand() @@ -106,8 +112,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { .Done() .Operand() .Conjunction() - .Operand().Equal().Column("id").Value<ui64>(2).Done().Done() - .Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done() + .Operand().Equal().Column("id").Value<ui64>(0).Done().Done() + .Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done() .Done() .Done() .Done() @@ -153,12 +159,14 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true)); auto guard = Guard(*alloc.get()); + auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build()); auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor( connectorMock, std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(), edge, alloc, + keyTypeHelper, std::move(lookupSourceSettings), keyTypeBuilder.Build(), outputypeBuilder.Build(), @@ -167,45 +175,41 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { 1'000'000); runtime.Register(actor); - NKikimr::NMiniKQL::TUnboxedValueVector keys; + NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()); for (size_t i = 0; i != 3; ++i) { NYql::NUdf::TUnboxedValue* keyItems; auto key = holderFactory.CreateDirectArrayHolder(2, keyItems); keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i)); keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i)); - keys.push_back(std::move(key)); + request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{}); } guard.Release(); //let actors use alloc - auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys)); + auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request)); runtime.Register(callLookupActor); auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge); auto guard2 = Guard(*alloc.get()); - NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data); + auto lookupResult = std::move(ev->Get()->Result); UNIT_ASSERT_EQUAL(3, lookupResult.size()); { - auto& [k, v] = lookupResult[0]; - UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>()); - UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>()); - NYql::NUdf::TUnboxedValue val = v.GetElement(0); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100})); + UNIT_ASSERT(v); + NYql::NUdf::TUnboxedValue val = v->GetElement(0); UNIT_ASSERT(val.AsStringRef() == TStringBuf("a")); } { - auto& [k, v] = lookupResult[1]; - UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>()); - UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>()); - NYql::NUdf::TUnboxedValue val = v.GetElement(0); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101})); + UNIT_ASSERT(v); + NYql::NUdf::TUnboxedValue val = v->GetElement(0); UNIT_ASSERT(val.AsStringRef() == TStringBuf("b")); } { - auto& [k, v] = lookupResult[2]; - UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>()); - UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>()); - //this key was not found and reported as empty - UNIT_ASSERT(!v); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102})); + UNIT_ASSERT(v); + UNIT_ASSERT(!*v); } } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index acd025ce7f..956a0b7c30 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -65,6 +65,7 @@ namespace NYql::NDq { TGenericTokenProvider::TPtr tokenProvider, NActors::TActorId&& parentId, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, const NKikimr::NMiniKQL::TStructType* payloadType, @@ -75,6 +76,7 @@ namespace NYql::NDq { , TokenProvider(std::move(tokenProvider)) , ParentId(std::move(parentId)) , Alloc(alloc) + , KeyTypeHelper(keyTypeHelper) , LookupSource(std::move(lookupSource)) , KeyType(keyType) , PayloadType(payloadType) @@ -82,17 +84,18 @@ namespace NYql::NDq { , HolderFactory(holderFactory) , ColumnDestinations(CreateColumnDestination()) , MaxKeysInRequest(maxKeysInRequest) - , KeyTypeHelper(keyType) - , RequestedKeys(1000, - KeyTypeHelper.GetValueHash(), - KeyTypeHelper.GetValueEqual()) + , Request( + 0, + KeyTypeHelper->GetValueHash(), + KeyTypeHelper->GetValueEqual()) { } ~TGenericLookupActor() { auto guard = Guard(*Alloc); - KeyTypeHelper = TKeyTypeHelper{}; - RequestedKeys = TRequestedKeys(0, KeyTypeHelper.GetValueHash(), KeyTypeHelper.GetValueEqual()); + KeyTypeHelper.reset(); + TKeyTypeHelper empty; + Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual()); } void Bootstrap() { @@ -113,9 +116,9 @@ namespace NYql::NDq { size_t GetMaxSupportedKeysInRequest() const override { return MaxKeysInRequest; } - void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override { + void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override { auto guard = Guard(*Alloc); - CreateRequest(keys); + CreateRequest(std::move(request)); } private: //events @@ -187,15 +190,14 @@ namespace NYql::NDq { } private: - void CreateRequest(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) { - YQL_CLOG(INFO, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << keys.size() << " keys"; + void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) { + YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys"; Y_ABORT_IF(InProgress); - Y_ABORT_IF(keys.size() > MaxKeysInRequest); + Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest); + Request = std::move(request); NConnector::NApi::TListSplitsRequest splitRequest; - *splitRequest.add_selects() = CreateSelect(keys); + *splitRequest.add_selects() = CreateSelect(); splitRequest.Setmax_split_count(1); - Y_ABORT_UNLESS(RequestedKeys.empty()); - RequestedKeys.insert(keys.begin(), keys.end()); Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) { auto result = ExtractFromConstFuture(asyncResult); if (result.Status.Ok()) { @@ -256,21 +258,16 @@ namespace NYql::NDq { for (size_t j = 0; j != columns.size(); ++j) { (ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i]; } - if (auto it = RequestedKeys.find(key); it != RequestedKeys.end()) { //remove duplicatas in lookup results - LookupResult.emplace_back(std::move(key), std::move(output)); - RequestedKeys.erase(it); + if (auto* v = Request.FindPtr(key)) { + *v = std::move(output); //duplicates will be overwritten } } } void FinalizeRequest() { - YQL_CLOG(INFO, ProviderGeneric) << "Sending lookup results with " << LookupResult.size() << " filled rows, " << RequestedKeys.size() << " empty rows"; + YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys"; auto guard = Guard(*Alloc); - for (auto&& k : RequestedKeys) { - LookupResult.emplace_back(std::move(k), NUdf::TUnboxedValue{}); - } - RequestedKeys.clear(); - auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(LookupResult)); + auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request)); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); LookupResult = {}; ReadSplitsIterator = {}; @@ -325,7 +322,7 @@ namespace NYql::NDq { return dsi; } - NConnector::NApi::TSelect CreateSelect(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) { + NConnector::NApi::TSelect CreateSelect() { NConnector::NApi::TSelect select; *select.mutable_data_source_instance() = GetDataSourceInstanceWithToken(); @@ -338,7 +335,7 @@ namespace NYql::NDq { select.mutable_from()->Settable(LookupSource.table()); NConnector::NApi::TPredicate_TDisjunction disjunction; - for (const auto& k : keys) { + for (const auto& [k, _] : Request) { NConnector::NApi::TPredicate_TConjunction conjunction; for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) { NConnector::NApi::TPredicate_TComparison eq; @@ -360,6 +357,7 @@ namespace NYql::NDq { TGenericTokenProvider::TPtr TokenProvider; const NActors::TActorId ParentId; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + std::shared_ptr<TKeyTypeHelper> KeyTypeHelper; const NYql::Generic::TLookupSource LookupSource; const NKikimr::NMiniKQL::TStructType* const KeyType; const NKikimr::NMiniKQL::TStructType* const PayloadType; @@ -368,14 +366,7 @@ namespace NYql::NDq { const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations; const size_t MaxKeysInRequest; std::atomic_bool InProgress; - using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>; - TKeyTypeHelper KeyTypeHelper; - using TRequestedKeys = std::unordered_set< - NUdf::TUnboxedValue, - NKikimr::NMiniKQL::TValueHasher, - NKikimr::NMiniKQL::TValueEqual, - NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>; - TRequestedKeys RequestedKeys; + IDqAsyncLookupSource::TUnboxedValueMap Request; NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; //TODO move me to TEvReadSplitsPart NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult; }; @@ -385,6 +376,7 @@ namespace NYql::NDq { ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, const NKikimr::NMiniKQL::TStructType* payloadType, @@ -399,6 +391,7 @@ namespace NYql::NDq { std::move(tokenProvider), std::move(parentId), alloc, + keyTypeHelper, std::move(lookupSource), keyType, payloadType, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h index cd8448e3a1..9f8c0c268f 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h @@ -16,6 +16,7 @@ namespace NYql::NDq { ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, const NKikimr::NMiniKQL::TStructType* payloadType, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index 3d154ef99b..e9b2b8bf8b 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -23,6 +23,7 @@ namespace NYql::NDq { credentialsFactory, std::move(args.ParentId), args.Alloc, + args.KeyTypeHelper, std::move(lookupSource), args.KeyType, args.PayloadType, diff --git a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp index 1e9ba6b9ff..03f77cf2b2 100644 --- a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp @@ -49,18 +49,16 @@ public: TCallLookupActor( std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NActors::TActorId& lookupActor, - NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp) + NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request) : Alloc(alloc) , LookupActor(lookupActor) - , KeysToLookUp(std::move(keysToLookUp)) + , Request(std::move(request)) { } void Bootstrap() { - auto ev = new NDq::IDqAsyncLookupSource::TEvLookupRequest(Alloc, std::move(KeysToLookUp)); + auto ev = new NDq::IDqAsyncLookupSource::TEvLookupRequest(Alloc, std::move(Request)); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev)); - auto guard = Guard(*Alloc); - KeysToLookUp = NKikimr::NMiniKQL::TUnboxedValueVector{}; } private: @@ -69,7 +67,7 @@ private: private: std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; const NActors::TActorId LookupActor; - NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp; + NDq::IDqAsyncLookupSource::TUnboxedValueMap Request; }; Y_UNIT_TEST(Lookup) { @@ -127,10 +125,12 @@ Y_UNIT_TEST(Lookup) { mapping ); auto guard = Guard(*alloc.get()); + auto keyTypeHelper = std::make_shared<NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build()); auto [_, lookupActor] = NYql::NDq::CreateYtLookupActor( ytServices, edge, alloc, + keyTypeHelper, *functionRegistry, std::move(source), keyTypeBuilder.Build(), @@ -140,43 +140,41 @@ Y_UNIT_TEST(Lookup) { 1'000'000); runtime.Register(lookupActor); - NKikimr::NMiniKQL::TUnboxedValueVector keys {\ - CreateStructValue(holderFactory, {"host1", "vpc1"}), - CreateStructValue(holderFactory, {"host2", "vpc1"}), - CreateStructValue(holderFactory, {"host2", "vpc2"}), //NOT_FOUND expected - CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}), - }; + NDq::IDqAsyncLookupSource::TUnboxedValueMap request{4, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()}; + request.emplace(CreateStructValue(holderFactory, {"host1", "vpc1"}), NUdf::TUnboxedValue{}); + request.emplace(CreateStructValue(holderFactory, {"host2", "vpc1"}), NUdf::TUnboxedValue{}); + request.emplace(CreateStructValue(holderFactory, {"host2", "vpc2"}), NUdf::TUnboxedValue{}); //NOT_FOUND expected + request.emplace(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}), NUdf::TUnboxedValue{}); guard.Release(); //let actors use alloc - auto callLookupActor = new TCallLookupActor(alloc, lookupActor->SelfId(), std::move(keys)); + auto callLookupActor = new TCallLookupActor(alloc, lookupActor->SelfId(), std::move(request)); runtime.Register(callLookupActor); auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge); auto guard2 = Guard(*alloc.get()); - NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data); + auto lookupResult = std::move(ev->Get()->Result); UNIT_ASSERT_EQUAL(4, lookupResult.size()); { - auto& [k, v] = lookupResult[0]; - UNIT_ASSERT(CheckStructValue(k, {"host1", "vpc1"})); - UNIT_ASSERT(CheckStructValue(v, {"host1.vpc1.net", "192.168.1.1"})); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host1", "vpc1"})); + UNIT_ASSERT(v); + UNIT_ASSERT(CheckStructValue(*v, {"host1.vpc1.net", "192.168.1.1"})); } { - auto& [k, v] = lookupResult[1]; - UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc1"})); - UNIT_ASSERT(CheckStructValue(v, {"host2.vpc1.net", "192.168.1.2"})); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host2", "vpc1"})); + UNIT_ASSERT(v); + UNIT_ASSERT(CheckStructValue(*v, {"host2.vpc1.net", "192.168.1.2"})); } { - auto& [k, v] = lookupResult[2]; - UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc2"})); - UNIT_ASSERT(!v); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host2", "vpc2"})); + UNIT_ASSERT(v); + UNIT_ASSERT(!*v); } { - auto& [k, v] = lookupResult[3]; - UNIT_ASSERT(CheckStructValue(k, {"very very long hostname to for test 2", "vpc2"})); - UNIT_ASSERT(CheckStructValue(v, {"very very long fqdn for test 2", "192.168.100.2"})); + const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"})); + UNIT_ASSERT(v); + UNIT_ASSERT(CheckStructValue(*v, {"very very long fqdn for test 2", "192.168.100.2"})); } - } } //Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
\ No newline at end of file diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp index f79dc9f7b5..35298adb62 100644 --- a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp +++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp @@ -61,6 +61,7 @@ public: NFile::TYtFileServices::TPtr ytServices, NActors::TActorId parentId, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, NYql::NYt::NSource::TLookupSource&& lookupSource, @@ -71,6 +72,7 @@ public: : YtServices(ytServices) , ParentId(std::move(parentId)) , Alloc(alloc) + , KeyTypeHelper(keyTypeHelper) , FunctionRegistry(functionRegistry) , LookupSource(std::move(lookupSource)) , KeyType(keyType) @@ -78,17 +80,17 @@ public: , HolderFactory(holderFactory) , TypeEnv(typeEnv) , MaxKeysInRequest(maxKeysInRequest) - , KeyTypeHelper(keyType) , Data(10, - KeyTypeHelper.GetValueHash(), - KeyTypeHelper.GetValueEqual() + KeyTypeHelper->GetValueHash(), + KeyTypeHelper->GetValueEqual() ) { } ~TYtLookupActor() { auto guard = Guard(*Alloc); - KeyTypeHelper = TKeyTypeHelper{}; - Data = TTableData(0, KeyTypeHelper.GetValueHash(), KeyTypeHelper.GetValueEqual()); + KeyTypeHelper.reset(); + TKeyTypeHelper empty; + Data = IDqAsyncLookupSource::TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()}; } @@ -154,19 +156,18 @@ private: //IDqAsyncLookupSource size_t GetMaxSupportedKeysInRequest() const override { return MaxKeysInRequest; } - void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override { - YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << keys.size() << " keys"; + void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override { + YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys"; Y_ABORT_IF(InProgress); - Y_ABORT_IF(keys.size() > MaxKeysInRequest); + Y_ABORT_IF(request.size() > MaxKeysInRequest); InProgress = true; auto guard = Guard(*Alloc); - NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult; - lookupResult.reserve(keys.size()); - for (const auto& k: keys) { - const auto it = Data.find(k); - lookupResult.emplace_back(k, it != Data.end() ? it->second : NUdf::TUnboxedValue{}); + for (const auto& [k, _]: request) { + if (const auto* v = Data.FindPtr(k)) { + request[k] = *v; + } } - auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(lookupResult)); + auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(request)); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); InProgress = false; } @@ -177,7 +178,7 @@ private: //events hFunc(NActors::TEvents::TEvPoison, Handle); ) void Handle(IDqAsyncLookupSource::TEvLookupRequest::TPtr ev) { - AsyncLookup(ev->Get()->Keys); + AsyncLookup(std::move(ev->Get()->Request)); } void Handle(NActors::TEvents::TEvPoison::TPtr) { PassAway(); @@ -193,6 +194,7 @@ private: NFile::TYtFileServices::TPtr YtServices; const NActors::TActorId ParentId; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + std::shared_ptr<TKeyTypeHelper> KeyTypeHelper; const NKikimr::NMiniKQL::IFunctionRegistry& FunctionRegistry; NYql::NYt::NSource::TLookupSource LookupSource; const NKikimr::NMiniKQL::TStructType* const KeyType; @@ -201,22 +203,15 @@ private: const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const size_t MaxKeysInRequest; std::atomic_bool InProgress; - using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>; - TKeyTypeHelper KeyTypeHelper; - using TTableData = std::unordered_map< - NUdf::TUnboxedValue, - NUdf::TUnboxedValue, - NKikimr::NMiniKQL::TValueHasher, - NKikimr::NMiniKQL::TValueEqual, - NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>> - >; - TTableData Data; + + IDqAsyncLookupSource::TUnboxedValueMap Data; }; std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateYtLookupActor( NFile::TYtFileServices::TPtr ytServices, NActors::TActorId parentId, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, NYql::NYt::NSource::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, @@ -229,6 +224,7 @@ std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateYtLookupActo ytServices, parentId, alloc, + keyTypeHelper, functionRegistry, typeEnv, std::move(lookupSource), diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h index c6fb5431fe..7fc5ce2052 100644 --- a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h +++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h @@ -13,6 +13,7 @@ std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateYtLookupActo NFile::TYtFileServices::TPtr ytServices, NActors::TActorId parentId, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, NYql::NYt::NSource::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp b/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp index 18cbaa4016..c1e28d4d92 100644 --- a/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp +++ b/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp @@ -12,6 +12,7 @@ namespace NYql::NDq { ytServices, std::move(args.ParentId), args.Alloc, + args.KeyTypeHelper, functionRegistry, std::move(lookupSource), args.KeyType, |