aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2024-06-18 11:08:28 +0300
committerGitHub <noreply@github.com>2024-06-18 11:08:28 +0300
commit82687fa128f0ac3fd261815b98d88cbbc426e2ab (patch)
tree5bc517a9fc804b5a4366bd8d7c5676430e72a077
parentd23ebbe747a462f1bb29afaf810ff41cb1d4cd96 (diff)
downloadydb-82687fa128f0ac3fd261815b98d88cbbc426e2ab.tar.gz
YQ-2068 use HashMap for lookup request and result (#5594)
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h33
-rw-r--r--ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp35
-rw-r--r--ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp60
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp59
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h1
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp1
-rw-r--r--ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp52
-rw-r--r--ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp46
-rw-r--r--ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h1
-rw-r--r--ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp1
10 files changed, 140 insertions, 149 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index bed2fe2dd0..85a60599bd 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -202,40 +202,50 @@ struct IDqComputeActorAsyncOutput {
};
struct IDqAsyncLookupSource {
+ using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
+ using TUnboxedValueMap = THashMap<
+ NUdf::TUnboxedValue,
+ NUdf::TUnboxedValue,
+ NKikimr::NMiniKQL::TValueHasher,
+ NKikimr::NMiniKQL::TValueEqual,
+ NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
+ >;
struct TEvLookupRequest: NActors::TEventLocal<TEvLookupRequest, TDqComputeEvents::EvLookupRequest> {
- TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TUnboxedValueVector&& keys)
+ TEvLookupRequest(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& request)
: Alloc(alloc)
- , Keys(std::move(keys))
+ , Request(std::move(request))
{
}
~TEvLookupRequest() {
auto guard = Guard(*Alloc);
- Keys = NKikimr::NMiniKQL::TUnboxedValueVector{};
+ TKeyTypeHelper empty;
+ Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}
-
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
- NKikimr::NMiniKQL::TUnboxedValueVector Keys;
+ TUnboxedValueMap Request;
};
+
struct TEvLookupResult: NActors::TEventLocal<TEvLookupResult, TDqComputeEvents::EvLookupResult> {
- TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, NKikimr::NMiniKQL::TKeyPayloadPairVector&& data)
+ TEvLookupResult(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TUnboxedValueMap&& result)
: Alloc(alloc)
- , Data(std::move(data))
+ , Result(std::move(result))
{
}
~TEvLookupResult() {
auto guard = Guard(*Alloc.get());
- Data = NKikimr::NMiniKQL::TKeyPayloadPairVector{};
+ TKeyTypeHelper empty;
+ Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
- NKikimr::NMiniKQL::TKeyPayloadPairVector Data;
+ TUnboxedValueMap Result;
};
virtual size_t GetMaxSupportedKeysInRequest() const = 0;
//Initiate lookup for requested keys
//Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys
- //Upon completion, results are sent in a TEvLookupResult to the preconfigured actor
- virtual void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) = 0;
+ //Upon completion, results are sent in TEvLookupResult event to the preconfigured actor
+ virtual void AsyncLookup(TUnboxedValueMap&& request) = 0;
protected:
~IDqAsyncLookupSource() {}
};
@@ -266,6 +276,7 @@ public:
struct TLookupSourceArguments {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
NActors::TActorId ParentId;
google::protobuf::Any LookupSource; //provider specific data source
const NKikimr::NMiniKQL::TStructType* KeyType;
diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
index 19d4b770eb..38576688ca 100644
--- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
+++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
@@ -51,7 +51,7 @@ public:
, LookupJoinColumns(std::move(lookupJoinColumns))
, InputRowType(inputRowType)
, LookupKeyType(lookupKeyType)
- , KeyTypeHelper(lookupKeyType)
+ , KeyTypeHelper(std::make_shared<IDqAsyncLookupSource::TKeyTypeHelper>(lookupKeyType))
, LookupPayloadType(lookupPayloadType)
, OutputRowType(outputRowType)
, OutputRowColumnOrder(outputRowColumnOrder)
@@ -67,6 +67,7 @@ public:
Become(&TInputTransformStreamLookupBase::StateFunc);
NDq::IDqAsyncIoFactory::TLookupSourceArguments lookupSourceArgs {
.Alloc = Alloc,
+ .KeyTypeHelper = KeyTypeHelper,
.ParentId = SelfId(),
.LookupSource = Settings.GetRightSource().GetLookupSource(),
.KeyType = LookupKeyType,
@@ -90,22 +91,7 @@ private: //events
void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
auto guard = BindAllocator();
- THashMap<
- NUdf::TUnboxedValue,
- NUdf::TUnboxedValue,
- NKikimr::NMiniKQL::TValueHasher,
- NKikimr::NMiniKQL::TValueEqual,
- NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
- > map(
- ev->Get()->Data.size(),
- KeyTypeHelper.GetValueHash(),
- KeyTypeHelper.GetValueEqual()
- );
- for (auto& r: ev->Get()->Data) {
- Y_ABORT_UNLESS(r.first.IsBoxed());
- Y_ABORT_UNLESS(!r.second || r.second.IsBoxed());
- map.emplace(std::move(r));
- }
+ const auto lookupResult = std::move(ev->Get()->Result);
while (!AwaitingQueue.empty()) {
const auto wideInputRow = AwaitingQueue.Head();
NUdf::TUnboxedValue* keyItems;
@@ -113,7 +99,7 @@ private: //events
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
keyItems[i] = wideInputRow[InputJoinColumns[i]];
}
- auto lookupPayload = map.FindPtr(lookupKey);
+ auto lookupPayload = lookupResult.FindPtr(lookupKey);
NUdf::TUnboxedValue* outputRowItems;
NUdf::TUnboxedValue outputRow = HolderFactory.CreateDirectArrayHolder(OutputRowColumnOrder.size(), outputRowItems);
@@ -163,7 +149,7 @@ private: //IDqComputeActorAsyncInput
auto guard = BindAllocator();
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
InputFlow.Clear();
- KeyTypeHelper = TKeyTypeHelper{};
+ KeyTypeHelper.reset();
NMiniKQL::TUnboxedValueBatch{}.swap(AwaitingQueue);
NMiniKQL::TUnboxedValueBatch{}.swap(ReadyQueue);
}
@@ -180,21 +166,21 @@ private: //IDqComputeActorAsyncInput
NUdf::TUnboxedValue* inputRowItems;
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
- NKikimr::NMiniKQL::TUnboxedValueVector keysForLookup;
+ IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
while (
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok) &&
(keysForLookup.size() < maxKeysInRequest)
) {
NUdf::TUnboxedValue* keyItems;
- auto key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
+ NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
keyItems[i] = inputRowItems[InputJoinColumns[i]];
}
- keysForLookup.push_back(key);
+ keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{});
AwaitingQueue.PushRow(inputRowItems, InputRowType->GetElementsCount());
}
if (!keysForLookup.empty()) {
- LookupSource.first->AsyncLookup(keysForLookup);
+ LookupSource.first->AsyncLookup(std::move(keysForLookup));
WaitingForLookupResults = true;
}
}
@@ -241,8 +227,7 @@ protected:
const TVector<size_t> LookupJoinColumns;
const NMiniKQL::TMultiType* const InputRowType;
const NMiniKQL::TStructType* const LookupKeyType; //key column types in LookupTable
- using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
- TKeyTypeHelper KeyTypeHelper;
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
const NMiniKQL::TStructType* const LookupPayloadType; //other column types in LookupTable
const NMiniKQL::TMultiType* const OutputRowType;
const TOutputRowColumnOrder OutputRowColumnOrder;
diff --git a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp
index 722fd64fcd..0f294ca056 100644
--- a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp
+++ b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp
@@ -21,24 +21,30 @@ using namespace NActors;
Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
+ NYql::NUdf::TUnboxedValue CreateStructValue(NKikimr::NMiniKQL::THolderFactory& holderFactory, std::initializer_list<ui64> members) {
+ NYql::NUdf::TUnboxedValue* items;
+ NYql::NUdf::TUnboxedValue result = holderFactory.CreateDirectArrayHolder(members.size(), items);
+ for (size_t i = 0; i != members.size(); ++i) {
+ items[i] = NYql::NUdf::TUnboxedValuePod{*(members.begin() + i)};
+ }
+ return result;
+ }
+
//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
public:
TCallLookupActor(
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::NDq::IDqAsyncLookupSource* lookupSource,
- NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
+ NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
: Alloc(alloc)
, LookupSource(lookupSource)
- , KeysToLookUp(std::move(keysToLookUp))
+ , Request(std::move(request))
{
}
void Bootstrap() {
- LookupSource->AsyncLookup(std::move(KeysToLookUp));
- auto guard = Guard(*Alloc);
- KeysToLookUp.clear();
- KeysToLookUp.shrink_to_fit();
+ LookupSource->AsyncLookup(std::move(Request));
}
private:
@@ -47,7 +53,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NYql::NDq::IDqAsyncLookupSource* LookupSource;
- NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
+ NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
};
Y_UNIT_TEST(Lookup) {
@@ -94,8 +100,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
.Disjunction()
.Operand()
.Conjunction()
- .Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
- .Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
+ .Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
+ .Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
.Done()
.Done()
.Operand()
@@ -106,8 +112,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
.Done()
.Operand()
.Conjunction()
- .Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
- .Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
+ .Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
+ .Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
.Done()
.Done()
.Done()
@@ -153,12 +159,14 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NYql::NUdf::EDataSlot::String, true));
auto guard = Guard(*alloc.get());
+ auto keyTypeHelper = std::make_shared<NYql::NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
connectorMock,
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
edge,
alloc,
+ keyTypeHelper,
std::move(lookupSourceSettings),
keyTypeBuilder.Build(),
outputypeBuilder.Build(),
@@ -167,45 +175,41 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
1'000'000);
runtime.Register(actor);
- NKikimr::NMiniKQL::TUnboxedValueVector keys;
+ NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual());
for (size_t i = 0; i != 3; ++i) {
NYql::NUdf::TUnboxedValue* keyItems;
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i));
keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i));
- keys.push_back(std::move(key));
+ request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{});
}
guard.Release(); //let actors use alloc
- auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
+ auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request));
runtime.Register(callLookupActor);
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
auto guard2 = Guard(*alloc.get());
- NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);
+ auto lookupResult = std::move(ev->Get()->Result);
UNIT_ASSERT_EQUAL(3, lookupResult.size());
{
- auto& [k, v] = lookupResult[0];
- UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>());
- UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>());
- NYql::NUdf::TUnboxedValue val = v.GetElement(0);
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100}));
+ UNIT_ASSERT(v);
+ NYql::NUdf::TUnboxedValue val = v->GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
}
{
- auto& [k, v] = lookupResult[1];
- UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>());
- UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>());
- NYql::NUdf::TUnboxedValue val = v.GetElement(0);
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101}));
+ UNIT_ASSERT(v);
+ NYql::NUdf::TUnboxedValue val = v->GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
}
{
- auto& [k, v] = lookupResult[2];
- UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>());
- UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>());
- //this key was not found and reported as empty
- UNIT_ASSERT(!v);
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102}));
+ UNIT_ASSERT(v);
+ UNIT_ASSERT(!*v);
}
}
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp
index acd025ce7f..956a0b7c30 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp
@@ -65,6 +65,7 @@ namespace NYql::NDq {
TGenericTokenProvider::TPtr tokenProvider,
NActors::TActorId&& parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
const NKikimr::NMiniKQL::TStructType* payloadType,
@@ -75,6 +76,7 @@ namespace NYql::NDq {
, TokenProvider(std::move(tokenProvider))
, ParentId(std::move(parentId))
, Alloc(alloc)
+ , KeyTypeHelper(keyTypeHelper)
, LookupSource(std::move(lookupSource))
, KeyType(keyType)
, PayloadType(payloadType)
@@ -82,17 +84,18 @@ namespace NYql::NDq {
, HolderFactory(holderFactory)
, ColumnDestinations(CreateColumnDestination())
, MaxKeysInRequest(maxKeysInRequest)
- , KeyTypeHelper(keyType)
- , RequestedKeys(1000,
- KeyTypeHelper.GetValueHash(),
- KeyTypeHelper.GetValueEqual())
+ , Request(
+ 0,
+ KeyTypeHelper->GetValueHash(),
+ KeyTypeHelper->GetValueEqual())
{
}
~TGenericLookupActor() {
auto guard = Guard(*Alloc);
- KeyTypeHelper = TKeyTypeHelper{};
- RequestedKeys = TRequestedKeys(0, KeyTypeHelper.GetValueHash(), KeyTypeHelper.GetValueEqual());
+ KeyTypeHelper.reset();
+ TKeyTypeHelper empty;
+ Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual());
}
void Bootstrap() {
@@ -113,9 +116,9 @@ namespace NYql::NDq {
size_t GetMaxSupportedKeysInRequest() const override {
return MaxKeysInRequest;
}
- void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override {
+ void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override {
auto guard = Guard(*Alloc);
- CreateRequest(keys);
+ CreateRequest(std::move(request));
}
private: //events
@@ -187,15 +190,14 @@ namespace NYql::NDq {
}
private:
- void CreateRequest(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) {
- YQL_CLOG(INFO, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << keys.size() << " keys";
+ void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) {
+ YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
Y_ABORT_IF(InProgress);
- Y_ABORT_IF(keys.size() > MaxKeysInRequest);
+ Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest);
+ Request = std::move(request);
NConnector::NApi::TListSplitsRequest splitRequest;
- *splitRequest.add_selects() = CreateSelect(keys);
+ *splitRequest.add_selects() = CreateSelect();
splitRequest.Setmax_split_count(1);
- Y_ABORT_UNLESS(RequestedKeys.empty());
- RequestedKeys.insert(keys.begin(), keys.end());
Connector->ListSplits(splitRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
auto result = ExtractFromConstFuture(asyncResult);
if (result.Status.Ok()) {
@@ -256,21 +258,16 @@ namespace NYql::NDq {
for (size_t j = 0; j != columns.size(); ++j) {
(ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i];
}
- if (auto it = RequestedKeys.find(key); it != RequestedKeys.end()) { //remove duplicatas in lookup results
- LookupResult.emplace_back(std::move(key), std::move(output));
- RequestedKeys.erase(it);
+ if (auto* v = Request.FindPtr(key)) {
+ *v = std::move(output); //duplicates will be overwritten
}
}
}
void FinalizeRequest() {
- YQL_CLOG(INFO, ProviderGeneric) << "Sending lookup results with " << LookupResult.size() << " filled rows, " << RequestedKeys.size() << " empty rows";
+ YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys";
auto guard = Guard(*Alloc);
- for (auto&& k : RequestedKeys) {
- LookupResult.emplace_back(std::move(k), NUdf::TUnboxedValue{});
- }
- RequestedKeys.clear();
- auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(LookupResult));
+ auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request));
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
LookupResult = {};
ReadSplitsIterator = {};
@@ -325,7 +322,7 @@ namespace NYql::NDq {
return dsi;
}
- NConnector::NApi::TSelect CreateSelect(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) {
+ NConnector::NApi::TSelect CreateSelect() {
NConnector::NApi::TSelect select;
*select.mutable_data_source_instance() = GetDataSourceInstanceWithToken();
@@ -338,7 +335,7 @@ namespace NYql::NDq {
select.mutable_from()->Settable(LookupSource.table());
NConnector::NApi::TPredicate_TDisjunction disjunction;
- for (const auto& k : keys) {
+ for (const auto& [k, _] : Request) {
NConnector::NApi::TPredicate_TConjunction conjunction;
for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) {
NConnector::NApi::TPredicate_TComparison eq;
@@ -360,6 +357,7 @@ namespace NYql::NDq {
TGenericTokenProvider::TPtr TokenProvider;
const NActors::TActorId ParentId;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ std::shared_ptr<TKeyTypeHelper> KeyTypeHelper;
const NYql::Generic::TLookupSource LookupSource;
const NKikimr::NMiniKQL::TStructType* const KeyType;
const NKikimr::NMiniKQL::TStructType* const PayloadType;
@@ -368,14 +366,7 @@ namespace NYql::NDq {
const std::vector<std::pair<EColumnDestination, size_t>> ColumnDestinations;
const size_t MaxKeysInRequest;
std::atomic_bool InProgress;
- using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
- TKeyTypeHelper KeyTypeHelper;
- using TRequestedKeys = std::unordered_set<
- NUdf::TUnboxedValue,
- NKikimr::NMiniKQL::TValueHasher,
- NKikimr::NMiniKQL::TValueEqual,
- NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>;
- TRequestedKeys RequestedKeys;
+ IDqAsyncLookupSource::TUnboxedValueMap Request;
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; //TODO move me to TEvReadSplitsPart
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
};
@@ -385,6 +376,7 @@ namespace NYql::NDq {
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
const NKikimr::NMiniKQL::TStructType* payloadType,
@@ -399,6 +391,7 @@ namespace NYql::NDq {
std::move(tokenProvider),
std::move(parentId),
alloc,
+ keyTypeHelper,
std::move(lookupSource),
keyType,
payloadType,
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h
index cd8448e3a1..9f8c0c268f 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h
@@ -16,6 +16,7 @@ namespace NYql::NDq {
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
const NKikimr::NMiniKQL::TStructType* payloadType,
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp
index 3d154ef99b..e9b2b8bf8b 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp
@@ -23,6 +23,7 @@ namespace NYql::NDq {
credentialsFactory,
std::move(args.ParentId),
args.Alloc,
+ args.KeyTypeHelper,
std::move(lookupSource),
args.KeyType,
args.PayloadType,
diff --git a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp
index 1e9ba6b9ff..03f77cf2b2 100644
--- a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp
+++ b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp
@@ -49,18 +49,16 @@ public:
TCallLookupActor(
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NActors::TActorId& lookupActor,
- NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
+ NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request)
: Alloc(alloc)
, LookupActor(lookupActor)
- , KeysToLookUp(std::move(keysToLookUp))
+ , Request(std::move(request))
{
}
void Bootstrap() {
- auto ev = new NDq::IDqAsyncLookupSource::TEvLookupRequest(Alloc, std::move(KeysToLookUp));
+ auto ev = new NDq::IDqAsyncLookupSource::TEvLookupRequest(Alloc, std::move(Request));
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev));
- auto guard = Guard(*Alloc);
- KeysToLookUp = NKikimr::NMiniKQL::TUnboxedValueVector{};
}
private:
@@ -69,7 +67,7 @@ private:
private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
const NActors::TActorId LookupActor;
- NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
+ NDq::IDqAsyncLookupSource::TUnboxedValueMap Request;
};
Y_UNIT_TEST(Lookup) {
@@ -127,10 +125,12 @@ Y_UNIT_TEST(Lookup) {
mapping
);
auto guard = Guard(*alloc.get());
+ auto keyTypeHelper = std::make_shared<NDq::IDqAsyncLookupSource::TKeyTypeHelper>(keyTypeBuilder.Build());
auto [_, lookupActor] = NYql::NDq::CreateYtLookupActor(
ytServices,
edge,
alloc,
+ keyTypeHelper,
*functionRegistry,
std::move(source),
keyTypeBuilder.Build(),
@@ -140,43 +140,41 @@ Y_UNIT_TEST(Lookup) {
1'000'000);
runtime.Register(lookupActor);
- NKikimr::NMiniKQL::TUnboxedValueVector keys {\
- CreateStructValue(holderFactory, {"host1", "vpc1"}),
- CreateStructValue(holderFactory, {"host2", "vpc1"}),
- CreateStructValue(holderFactory, {"host2", "vpc2"}), //NOT_FOUND expected
- CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}),
- };
+ NDq::IDqAsyncLookupSource::TUnboxedValueMap request{4, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()};
+ request.emplace(CreateStructValue(holderFactory, {"host1", "vpc1"}), NUdf::TUnboxedValue{});
+ request.emplace(CreateStructValue(holderFactory, {"host2", "vpc1"}), NUdf::TUnboxedValue{});
+ request.emplace(CreateStructValue(holderFactory, {"host2", "vpc2"}), NUdf::TUnboxedValue{}); //NOT_FOUND expected
+ request.emplace(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}), NUdf::TUnboxedValue{});
guard.Release(); //let actors use alloc
- auto callLookupActor = new TCallLookupActor(alloc, lookupActor->SelfId(), std::move(keys));
+ auto callLookupActor = new TCallLookupActor(alloc, lookupActor->SelfId(), std::move(request));
runtime.Register(callLookupActor);
auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
auto guard2 = Guard(*alloc.get());
- NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);
+ auto lookupResult = std::move(ev->Get()->Result);
UNIT_ASSERT_EQUAL(4, lookupResult.size());
{
- auto& [k, v] = lookupResult[0];
- UNIT_ASSERT(CheckStructValue(k, {"host1", "vpc1"}));
- UNIT_ASSERT(CheckStructValue(v, {"host1.vpc1.net", "192.168.1.1"}));
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host1", "vpc1"}));
+ UNIT_ASSERT(v);
+ UNIT_ASSERT(CheckStructValue(*v, {"host1.vpc1.net", "192.168.1.1"}));
}
{
- auto& [k, v] = lookupResult[1];
- UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc1"}));
- UNIT_ASSERT(CheckStructValue(v, {"host2.vpc1.net", "192.168.1.2"}));
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host2", "vpc1"}));
+ UNIT_ASSERT(v);
+ UNIT_ASSERT(CheckStructValue(*v, {"host2.vpc1.net", "192.168.1.2"}));
}
{
- auto& [k, v] = lookupResult[2];
- UNIT_ASSERT(CheckStructValue(k, {"host2", "vpc2"}));
- UNIT_ASSERT(!v);
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host2", "vpc2"}));
+ UNIT_ASSERT(v);
+ UNIT_ASSERT(!*v);
}
{
- auto& [k, v] = lookupResult[3];
- UNIT_ASSERT(CheckStructValue(k, {"very very long hostname to for test 2", "vpc2"}));
- UNIT_ASSERT(CheckStructValue(v, {"very very long fqdn for test 2", "192.168.100.2"}));
+ const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}));
+ UNIT_ASSERT(v);
+ UNIT_ASSERT(CheckStructValue(*v, {"very very long fqdn for test 2", "192.168.100.2"}));
}
-
}
} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor) \ No newline at end of file
diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp
index f79dc9f7b5..35298adb62 100644
--- a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp
+++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp
@@ -61,6 +61,7 @@ public:
NFile::TYtFileServices::TPtr ytServices,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
NYql::NYt::NSource::TLookupSource&& lookupSource,
@@ -71,6 +72,7 @@ public:
: YtServices(ytServices)
, ParentId(std::move(parentId))
, Alloc(alloc)
+ , KeyTypeHelper(keyTypeHelper)
, FunctionRegistry(functionRegistry)
, LookupSource(std::move(lookupSource))
, KeyType(keyType)
@@ -78,17 +80,17 @@ public:
, HolderFactory(holderFactory)
, TypeEnv(typeEnv)
, MaxKeysInRequest(maxKeysInRequest)
- , KeyTypeHelper(keyType)
, Data(10,
- KeyTypeHelper.GetValueHash(),
- KeyTypeHelper.GetValueEqual()
+ KeyTypeHelper->GetValueHash(),
+ KeyTypeHelper->GetValueEqual()
)
{
}
~TYtLookupActor() {
auto guard = Guard(*Alloc);
- KeyTypeHelper = TKeyTypeHelper{};
- Data = TTableData(0, KeyTypeHelper.GetValueHash(), KeyTypeHelper.GetValueEqual());
+ KeyTypeHelper.reset();
+ TKeyTypeHelper empty;
+ Data = IDqAsyncLookupSource::TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()};
}
@@ -154,19 +156,18 @@ private: //IDqAsyncLookupSource
size_t GetMaxSupportedKeysInRequest() const override {
return MaxKeysInRequest;
}
- void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override {
- YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << keys.size() << " keys";
+ void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override {
+ YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys";
Y_ABORT_IF(InProgress);
- Y_ABORT_IF(keys.size() > MaxKeysInRequest);
+ Y_ABORT_IF(request.size() > MaxKeysInRequest);
InProgress = true;
auto guard = Guard(*Alloc);
- NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult;
- lookupResult.reserve(keys.size());
- for (const auto& k: keys) {
- const auto it = Data.find(k);
- lookupResult.emplace_back(k, it != Data.end() ? it->second : NUdf::TUnboxedValue{});
+ for (const auto& [k, _]: request) {
+ if (const auto* v = Data.FindPtr(k)) {
+ request[k] = *v;
+ }
}
- auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(lookupResult));
+ auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(request));
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
InProgress = false;
}
@@ -177,7 +178,7 @@ private: //events
hFunc(NActors::TEvents::TEvPoison, Handle);
)
void Handle(IDqAsyncLookupSource::TEvLookupRequest::TPtr ev) {
- AsyncLookup(ev->Get()->Keys);
+ AsyncLookup(std::move(ev->Get()->Request));
}
void Handle(NActors::TEvents::TEvPoison::TPtr) {
PassAway();
@@ -193,6 +194,7 @@ private:
NFile::TYtFileServices::TPtr YtServices;
const NActors::TActorId ParentId;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ std::shared_ptr<TKeyTypeHelper> KeyTypeHelper;
const NKikimr::NMiniKQL::IFunctionRegistry& FunctionRegistry;
NYql::NYt::NSource::TLookupSource LookupSource;
const NKikimr::NMiniKQL::TStructType* const KeyType;
@@ -201,22 +203,15 @@ private:
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const size_t MaxKeysInRequest;
std::atomic_bool InProgress;
- using TKeyTypeHelper = NKikimr::NMiniKQL::TKeyTypeContanerHelper<true, true, false>;
- TKeyTypeHelper KeyTypeHelper;
- using TTableData = std::unordered_map<
- NUdf::TUnboxedValue,
- NUdf::TUnboxedValue,
- NKikimr::NMiniKQL::TValueHasher,
- NKikimr::NMiniKQL::TValueEqual,
- NKikimr::NMiniKQL::TMKQLAllocator<std::pair<const NUdf::TUnboxedValue, NUdf::TUnboxedValue>>
- >;
- TTableData Data;
+
+ IDqAsyncLookupSource::TUnboxedValueMap Data;
};
std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateYtLookupActor(
NFile::TYtFileServices::TPtr ytServices,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
NYql::NYt::NSource::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
@@ -229,6 +224,7 @@ std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateYtLookupActo
ytServices,
parentId,
alloc,
+ keyTypeHelper,
functionRegistry,
typeEnv,
std::move(lookupSource),
diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h
index c6fb5431fe..7fc5ce2052 100644
--- a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h
+++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.h
@@ -13,6 +13,7 @@ std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateYtLookupActo
NFile::TYtFileServices::TPtr ytServices,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
NYql::NYt::NSource::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp b/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp
index 18cbaa4016..c1e28d4d92 100644
--- a/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp
+++ b/ydb/library/yql/providers/yt/actors/yql_yt_provider_factories.cpp
@@ -12,6 +12,7 @@ namespace NYql::NDq {
ytServices,
std::move(args.ParentId),
args.Alloc,
+ args.KeyTypeHelper,
functionRegistry,
std::move(lookupSource),
args.KeyType,