diff options
author | komels <komels@yandex-team.ru> | 2022-02-28 17:08:00 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-28 17:08:00 +0300 |
commit | e7de3116bb9e9e10fcc25dbad25e0a595e1324b4 (patch) | |
tree | a2035fb7a868a193067f96efd98f762d432f1aed | |
parent | 4531da9beb698966823ea272d1561d5f843d511c (diff) | |
download | ydb-e7de3116bb9e9e10fcc25dbad25e0a595e1324b4.tar.gz |
Backport PQ fixes from 21-4
8a8121ea26e5821bdcc586150b2fb3be5b68a3bf
890defc764d33def715b7c96b4f27adfacc14076
REVIEW: 2318246
REVIEW: 2353681
x-ydb-stable-ref: 40f9d95ccf1d9f080c0454da20ba7a7155a4c864
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 4 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.h | 2 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 156 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.h | 35 | ||||
-rw-r--r-- | ydb/core/persqueue/writer/source_id_encoding.cpp | 55 | ||||
-rw-r--r-- | ydb/core/persqueue/writer/source_id_encoding.h | 17 | ||||
-rw-r--r-- | ydb/core/testlib/mock_pq_metacache.h | 2 | ||||
-rw-r--r-- | ydb/core/testlib/test_pq_client.h | 2 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/topic_parser.h | 21 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_actor.h | 20 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write_actor.cpp | 152 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 63 |
12 files changed, 349 insertions, 180 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index d14fd75ebe..347471e787 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -33,7 +33,7 @@ const TString& TopicPrefix(const TActorContext& ctx) { } TProcessingResult ProcessMetaCacheAllTopicsResponse(TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev) { - auto *res = ev->Get()->Result.Get(); + auto& res = ev->Get()->Result; const TString& path = ev->Get()->Path; TProcessingResult result; if (!ev->Get()->Success) { @@ -874,7 +874,7 @@ public: void Handle(TEvAllTopicsDescribeResponse::TPtr& ev, const TActorContext& ctx) { --DescribeRequests; - auto* res = ev->Get()->Result.Get(); + auto& res = ev->Get()->Result; auto processResult = ProcessMetaCacheAllTopicsResponse(ev); if (processResult.IsFatal) { ErrorReason = processResult.Reason; diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h index 52c67832e4..1a915080e0 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.h +++ b/ydb/core/client/server/msgbus_server_persqueue.h @@ -145,7 +145,7 @@ protected: const TActorId PqMetaCache; THashMap<TActorId, THolder<TPerTopicInfo>> Children; size_t ChildrenAnswered = 0; - THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse; // Nodes info const bool ListNodes; diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index f44ab6f480..169e2f549b 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -208,8 +208,8 @@ private: void HandleGetTopicsResult(const TActorContext& ctx) { auto result = AsyncQueryResult.GetValue(); if (!result.IsSuccess()) { - LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, - "Got error trying to get topics: " << result.GetIssues().ToString()); + LOG_INFO_S(ctx, NKikimrServices::PQ_METACACHE, + "Got error trying to get topics: " << result.GetIssues().ToString()); return Reset(ctx); } Y_VERIFY(result.GetResultSets().size() == 1); @@ -231,20 +231,11 @@ private: CurrentTopicsVersion = NewTopicsVersion; FullTopicsCacheOutdated = true; FullTopicsCache = nullptr; + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "reset topics cache"); while (!ListTopicsWaiters.empty()) { auto& waiter = ListTopicsWaiters.front(); - switch (waiter.Type) { - case EWaiterType::ListTopics: - SendListTopicsResponse(waiter.WaiterId, ctx); - break; - case EWaiterType::DescribeAllTopics: - ProcessDescribeAllTopics(waiter.WaiterId, ctx); - break; - default: - Y_FAIL(); - break; - } - ListTopicsWaiters.pop_front(); + ProcessDescribeAllTopics(waiter, ctx); + ListTopicsWaiters.pop(); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics list with : " << CurrentTopics.size() << " topics"); @@ -252,6 +243,7 @@ private: } } + void HandleQueryComplete(TEvPqNewMetaCache::TEvQueryComplete::TPtr& ev, const TActorContext& ctx) { switch (ev->Get()->Type) { case EQueryType::ECheckVersion: @@ -268,26 +260,25 @@ private: RunQuery(ev->Get()->Type, ctx); } - void HandleListTopics(TEvPqNewMetaCache::TEvListTopicsRequest::TPtr& ev, const TActorContext& ctx) { - if (!LastVersionUpdate || !EverGotTopics) { - ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::ListTopics}); - } else { - SendListTopicsResponse(ev->Sender, ctx); - } + void HandleGetVersion(TEvPqNewMetaCache::TEvGetVersionRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send version response: " << CurrentTopicsVersion); + + ctx.Send(ev->Sender, new TEvPqNewMetaCache::TEvGetVersionResponse{CurrentTopicsVersion}); } void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics with prefix: " << ev->Get()->PathPrefix); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics"); - SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), ctx); + SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ctx); auto inserted = DescribeTopicsWaiters.insert(std::make_pair( RequestId, - TWaiter{ev->Sender, std::move(ev->Get()->Topics), EWaiterType::DescribeCustomTopics} + TWaiter{ev->Sender, std::move(ev->Get()->Topics)} )).second; Y_VERIFY(inserted); } void HandleDescribeAllTopics(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics"); if (ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) { auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(ev->Get()->PathPrefix); response->Success = false; @@ -295,40 +286,35 @@ private: return; } if (!EverGotTopics) { - ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::DescribeAllTopics}); + ListTopicsWaiters.push(ev->Sender); return; } return ProcessDescribeAllTopics(ev->Sender, ctx); } - void ProcessDescribeAllTopics(const TActorId& waiterId, const TActorContext& ctx) { + void ProcessDescribeAllTopics(const TActorId& waiter, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Process describe all topics"); if (EverGotTopics && CurrentTopics.empty()) { - SendDescribeAllTopicsResponse(waiterId, ctx, true); + SendDescribeAllTopicsResponse(waiter, ctx, true); return; } if (FullTopicsCache && !FullTopicsCacheOutdated) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Respond from cache"); - - return SendDescribeAllTopicsResponse(waiterId, ctx); + return SendDescribeAllTopicsResponse(waiter, ctx); + } + if (DescribeAllTopicsWaiters.empty()) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request"); + SendSchemeCacheRequest(CurrentTopics, true, true, ctx); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter"); - SendSchemeCacheRequest(CurrentTopics, true, ctx); - auto inserted = DescribeTopicsWaiters.insert(std::make_pair( - RequestId, - TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics} - )).second; - Y_VERIFY(inserted); + DescribeAllTopicsWaiters.push(waiter); FullTopicsCacheOutdated = false; FullTopicsCache = nullptr; } void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - auto* result = ev->Get()->Request.Release(); - auto waiterIter = DescribeTopicsWaiters.find(result->Instant); - Y_VERIFY(!waiterIter.IsEnd()); - auto& waiter = waiterIter->second; - - if (waiter.Type == EWaiterType::DescribeAllTopics) { + auto& result = ev->Get()->Request; + if (result->Instant == 0) { for (const auto& entry : result->ResultSet) { if (!entry.PQGroupInfo) { continue; @@ -338,62 +324,70 @@ private: if (desc.HasBalancerTabletID() && desc.GetBalancerTabletID() != 0) { continue; } - FullTopicsCacheOutdated = true; } - FullTopicsCache.Reset(result); + FullTopicsCache.reset(result.Release()); LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size()); - SendDescribeAllTopicsResponse(waiter.WaiterId, ctx); + while (!DescribeAllTopicsWaiters.empty()) { + SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front(), ctx); + DescribeAllTopicsWaiters.pop(); + } } else { - Y_VERIFY(waiter.Type == EWaiterType::DescribeCustomTopics); + auto waiterIter = DescribeTopicsWaiters.find(result->Instant); + Y_VERIFY(!waiterIter.IsEnd()); + auto& waiter = waiterIter->second; + Y_VERIFY(waiter.Topics.size() == result->ResultSet.size()); auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{ - std::move(waiter.Topics), result + std::move(waiter.Topics), result.Release() }; ctx.Send(waiter.WaiterId, response); + DescribeTopicsWaiters.erase(waiterIter); } - DescribeTopicsWaiters.erase(waiterIter); } - void SendSchemeCacheRequest(const TVector<TString>& topics, bool addDefaultPathPrefix, const TActorContext& ctx) { - auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(++RequestId); + void SendSchemeCacheRequest( + const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, const TActorContext& ctx + ) { + auto instant = isFullListingRequest ? 0 : ++RequestId; + auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant); for (const auto& path : topics) { auto split = NKikimr::SplitPath(path); - Y_VERIFY(!split.empty()); NSchemeCache::TSchemeCacheNavigate::TEntry entry; if (addDefaultPathPrefix) { entry.Path.insert(entry.Path.end(), PathPrefixParts.begin(), PathPrefixParts.end()); } entry.Path.insert(entry.Path.end(), split.begin(), split.end()); - entry.SyncVersion = true; + entry.SyncVersion = !isFullListingRequest; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; schemeCacheRequest->ResultSet.emplace_back(std::move(entry)); } - ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release())); - } - - void SendListTopicsResponse(const TActorId& recipient, const TActorContext& ctx) { - auto* response = new TEvPqNewMetaCache::TEvListTopicsResponse(); - response->Topics = CurrentTopics; - response->TopicsVersion = CurrentTopicsVersion; - ctx.Send(recipient, response); + ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release())); } void SendDescribeAllTopicsResponse(const TActorId& recipient, const TActorContext& ctx, bool empty = false) { - NSchemeCache::TSchemeCacheNavigate* scResponse; if (empty) { - scResponse = new NSchemeCache::TSchemeCacheNavigate(); + ctx.Send( + recipient, + new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse( + PathPrefix, std::make_shared<NSchemeCache::TSchemeCacheNavigate>() + ) + ); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send empty describe all topics response"); + return; } else { - scResponse = new NSchemeCache::TSchemeCacheNavigate(*FullTopicsCache); + ctx.Send( + recipient, + new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse( + PathPrefix, FullTopicsCache + ) + ); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << FullTopicsCache->ResultSet.size() << " topics"); + } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << scResponse->ResultSet.size() << " topics"); - auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse( - PathPrefix, scResponse - ); - ctx.Send(recipient, response); } static NActors::TActorSystem* GetActorSystem() { @@ -407,16 +401,16 @@ public: } STRICT_STFUNC(StateFunc, - SFunc(NActors::TEvents::TEvWakeup, StartSession) - SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted) - SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared) - HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete) - HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery) - - HFunc(TEvPqNewMetaCache::TEvListTopicsRequest, HandleListTopics) - HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) - HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) - HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) + SFunc(NActors::TEvents::TEvWakeup, StartSession) + SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted) + SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared) + HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete) + HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery) + + HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion) + HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) + HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) ) private: @@ -429,11 +423,6 @@ private: struct TWaiter { TActorId WaiterId; TVector<TString> Topics; - EWaiterType Type; - - static TWaiter AllTopicsWaiter(const TActorId& waiterId) { - return TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics}; - } }; struct TTopicKey { @@ -513,11 +502,12 @@ private: TDuration VersionCheckInterval = TDuration::Seconds(1); TInstant LastVersionUpdate = TInstant::Zero(); - TDeque<TWaiter> ListTopicsWaiters; + TQueue<TActorId> ListTopicsWaiters; + TQueue<TActorId> DescribeAllTopicsWaiters; THashMap<ui64, TWaiter> DescribeTopicsWaiters; ui64 RequestId = 1; - THolder<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache; bool FullTopicsCacheOutdated = false; NActors::TActorId SchemeCacheId; TString PathPrefix; diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index 6c12b76b04..64c4c5ed11 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -42,8 +42,8 @@ struct TEvPqNewMetaCache { EvQueryPrepared, EvQueryComplete, EvRestartQuery, - EvListTopicsRequest, - EvListTopicsResponse, + EvGetVersionRequest, + EvGetVersionResponse, EvDescribeTopicsRequest, EvDescribeTopicsResponse, EvDescribeAllTopicsRequest, @@ -74,11 +74,14 @@ struct TEvPqNewMetaCache { : Type(type) {} }; - struct TEvListTopicsRequest : public TEventLocal<TEvListTopicsRequest, EvListTopicsRequest> { + struct TEvGetVersionRequest : public TEventLocal<TEvGetVersionRequest, EvGetVersionRequest> { }; - struct TEvListTopicsResponse : public TEventLocal<TEvListTopicsResponse, EvListTopicsResponse> { - TVector<TString> Topics; + struct TEvGetVersionResponse : public TEventLocal<TEvGetVersionResponse, EvGetVersionResponse> { + TEvGetVersionResponse(ui64 version) + : TopicsVersion(version) + {} + ui64 TopicsVersion; }; @@ -89,23 +92,23 @@ struct TEvPqNewMetaCache { TEvDescribeTopicsRequest() = default; explicit TEvDescribeTopicsRequest(const TVector<TString>& topics) - : Topics(topics) + : Topics(topics) {} TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix) - : PathPrefix(pathPrefix) - , Topics(topics) + : PathPrefix(pathPrefix) + , Topics(topics) {} }; struct TEvDescribeTopicsResponse : public TEventLocal<TEvDescribeTopicsResponse, EvDescribeTopicsResponse> { TVector<TString> TopicsRequested; - THolder<NSchemeCache::TSchemeCacheNavigate> Result; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result; explicit TEvDescribeTopicsResponse(const TVector<TString>& topics, NSchemeCache::TSchemeCacheNavigate* result) - : TopicsRequested(topics) - , Result(result) + : TopicsRequested(topics) + , Result(result) {} }; @@ -120,13 +123,13 @@ struct TEvPqNewMetaCache { struct TEvDescribeAllTopicsResponse : public TEventLocal<TEvDescribeAllTopicsResponse, EvDescribeAllTopicsResponse> { bool Success = true; TString Path; - THolder<NSchemeCache::TSchemeCacheNavigate> Result; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result; explicit TEvDescribeAllTopicsResponse(const TString& path) - : Path(path) + : Path(path) {} - TEvDescribeAllTopicsResponse(const TString& path, NSchemeCache::TSchemeCacheNavigate* result) - : Path(path) - , Result(result) + TEvDescribeAllTopicsResponse(const TString& path, const std::shared_ptr<NSchemeCache::TSchemeCacheNavigate>& result) + : Path(path) + , Result(result) {} }; }; diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp index 13a37c2c61..b528da2099 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.cpp +++ b/ydb/core/persqueue/writer/source_id_encoding.cpp @@ -4,11 +4,54 @@ #include <util/generic/yexception.h> #include <util/string/strip.h> +#include <util/string/builder.h> +#include <util/string/hex.h> +#include <util/digest/murmur.h> +#include <library/cpp/digest/md5/md5.h> namespace NKikimr { namespace NPQ { + + +TString GetSourceIdSelectQueryFromPath(const TString& path) { + TStringBuilder res; + res << "--!syntax_v1\n" + "DECLARE $Hash AS Uint32; " + "DECLARE $Topic AS Utf8; " + "DECLARE $SourceId AS Utf8; " + "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` " + "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;"; + return res; +} + +TString GetSourceIdSelectQuery(const TString& root) { + return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2"); +} + +TString GetUpdateIdSelectQueryFromPath(const TString& path) { + TStringBuilder res; + res << "--!syntax_v1\n" + "DECLARE $SourceId AS Utf8; " + "DECLARE $Topic AS Utf8; " + "DECLARE $Hash AS Uint32; " + "DECLARE $Partition AS Uint32; " + "DECLARE $CreateTime AS Uint64; " + "DECLARE $AccessTime AS Uint64;\n" + "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; + + return res; +} + +TString GetUpdateIdSelectQuery(const TString& root) { + return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2"); +} + + namespace NSourceIdEncoding { +static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c; + struct TTags { static constexpr char Simple = 0; static constexpr char Base64 = 1; @@ -84,6 +127,18 @@ bool IsValidEncoded(const TString& sourceId) { } } + +TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId) { + TEncodedSourceId res; + TString encodedSourceId = Encode(userSourceId); + res.EscapedSourceId = HexEncode(encodedSourceId); + + TString s = topic + encodedSourceId; + res.Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED); + return res; +} + + } // NSourceIdEncoding } // NPQ } // NKikimr diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h index 6472d8b4ac..c48a1c574e 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.h +++ b/ydb/core/persqueue/writer/source_id_encoding.h @@ -1,9 +1,18 @@ #pragma once #include <util/generic/fwd.h> +#include <util/generic/string.h> namespace NKikimr { namespace NPQ { + +TString GetSourceIdSelectQuery(const TString& root); +TString GetUpdateIdSelectQuery(const TString& root); + +TString GetSourceIdSelectQueryFromPath(const TString& path); +TString GetUpdateIdSelectQueryFromPath(const TString& path); + + namespace NSourceIdEncoding { TString EncodeSimple(const TString& sourceId); @@ -13,6 +22,14 @@ TString Decode(const TString& encodedSourceId); bool IsValidEncoded(const TString& encodedSourceId); + +struct TEncodedSourceId { + TString EscapedSourceId; + ui32 Hash = 0; +}; + +TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId); + } // NSourceIdEncoding } // NPQ } // NKikimr diff --git a/ydb/core/testlib/mock_pq_metacache.h b/ydb/core/testlib/mock_pq_metacache.h index 1280f87779..e132cd5f9a 100644 --- a/ydb/core/testlib/mock_pq_metacache.h +++ b/ydb/core/testlib/mock_pq_metacache.h @@ -69,7 +69,7 @@ public: response->Success = success; auto* result = new NSchemeCache::TSchemeCacheNavigate(); result->ResultSet = resultSet; - response->Result.Reset(result); + response->Result.reset(result); ctx.Send(ev->Sender, std::move(response)); }; diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 59a93052f7..841500d544 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -478,6 +478,7 @@ private: UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } +public: TMaybe<NYdb::TResultSet> RunYqlDataQueryWithParams(TString query, const NYdb::TParams& params) { auto tableClient = NYdb::NTable::TTableClient(*Driver); TMaybe<NYdb::TResultSet> rs; @@ -502,7 +503,6 @@ private: } -public: TFlatMsgBusPQClient( const Tests::TServerSettings& settings, ui16 grpc, TMaybe<TString> databaseName = Nothing() ) diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index eb5a69cc37..4f5ade9c99 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -53,6 +53,8 @@ protected: TStringBuf topicNormalized = topic; topicNormalized.SkipPrefix("/"); + CHECK_SET_VALID(!topicNormalized.StartsWith("/"), "Multiple leading '/' in topic name", return *this); + CHECK_SET_VALID(!topicNormalized.empty(), "Empty topic name", return *this); TStringBuf dbNormalized = database; dbNormalized.SkipPrefix("/"); @@ -167,6 +169,25 @@ public: } return TString{fst}; } + TString GetTopicForSrcId() const { + if (!IsValid()) + return {}; + if (NoDcMode) { + return FullModernPath; + } else { + return FullLegacyName; + } + } + + TString GetTopicForSrcIdHash() const { + if (!IsValid()) + return {}; + if (NoDcMode) { + return FullModernPath; + } else { + return LegacyName; + } + } private: bool IsLegacyTopicName(const TStringBuf& path) { diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 65c8aca1db..55127d3f03 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -9,7 +9,7 @@ #include <ydb/core/client/server/msgbus_server_pq_metacache.h> #include <ydb/core/client/server/msgbus_server_persqueue.h> - +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/base/events.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -526,12 +526,16 @@ private: void LogSession(const TActorContext& ctx); void DiscoverPartition(const NActors::TActorContext& ctx); + void SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx); + void UpdatePartition(const NActors::TActorContext& ctx); void RequestNextPartition(const NActors::TActorContext& ctx); void ProceedPartition(const ui32 partition, const NActors::TActorContext& ctx); - THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(const NActors::TActorContext& ctx); - void InitCheckACL(const TActorContext& ctx); + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(ui32 hash, const TString& topic, + const NActors::TActorContext& ctx); + void SendUpdateSrcIdsRequests(const TActorContext& ctx); + //void InitCheckACL(const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx); @@ -581,10 +585,11 @@ private: NPersQueue::TConverterPtr TopicConverter; ui32 Partition; ui32 PreferedPartition; + bool PartitionFound = false; // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1 TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere - TString EscapedSourceId; - ui32 Hash = 0; + NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; + ui32 CompatibleHash; TString OwnerCookie; TString UserAgent; @@ -659,9 +664,12 @@ private: TString ClientDC; TString SelectSourceIdQuery; TString UpdateSourceIdQuery; + ui32 SelectSrcIdsInflight = 0; + ui64 MaxSrcIdAccessTime = 0; + TInstant LastSourceIdUpdate; ui64 SourceIdCreateTime; - bool SourceIdUpdateInfly; + ui32 SourceIdUpdatesInflight = 0; TVector<NPQ::TLabelsInfo> Aggr; NKikimr::NPQ::TMultiCounter SLITotal; diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 86f101df9b..98334996a3 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -23,6 +23,7 @@ using namespace NKikimrClient; namespace NKikimr { using namespace NSchemeCache; +using namespace NPQ; Ydb::PersQueue::V1::Codec CodecByName(const TString& codec) { static const THashMap<TString, Ydb::PersQueue::V1::Codec> codecsByName = { @@ -81,30 +82,8 @@ using namespace Ydb::PersQueue::V1; static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; static const ui32 MAX_BYTES_INFLIGHT = 1 << 20; //1mb -static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c; static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); -static const TString SELECT_SOURCEID_QUERY1 = - "--!syntax_v1\n" - "DECLARE $Hash AS Uint32; " - "DECLARE $Topic AS Utf8; " - "DECLARE $SourceId AS Utf8; " - "SELECT Partition, CreateTime FROM `"; -static const TString SELECT_SOURCEID_QUERY2 = "` " - "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId; "; - -static const TString UPDATE_SOURCEID_QUERY1 = - "--!syntax_v1\n" - "DECLARE $SourceId AS Utf8; " - "DECLARE $Topic AS Utf8; " - "DECLARE $Hash AS Uint32; " - "DECLARE $Partition AS Uint32; " - "DECLARE $CreateTime AS Uint64; " - "DECLARE $AccessTime AS Uint64; " - "UPSERT INTO `"; -static const TString UPDATE_SOURCEID_QUERY2 = "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES " - "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition); "; - //TODO: add here tracking of bytes in/out @@ -142,7 +121,7 @@ TWriteSessionActor::TWriteSessionActor( , ClientDC(clientDC ? *clientDC : "other") , LastSourceIdUpdate(TInstant::Zero()) , SourceIdCreateTime(0) - , SourceIdUpdateInfly(false) + , SourceIdUpdatesInflight(0) { Y_ASSERT(Request); ++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("SessionsCreatedTotal", true)); @@ -154,8 +133,8 @@ TWriteSessionActor::~TWriteSessionActor() = default; void TWriteSessionActor::Bootstrap(const TActorContext& ctx) { Y_VERIFY(Request); - SelectSourceIdQuery = SELECT_SOURCEID_QUERY1 + AppData(ctx)->PQConfig.GetSourceIdTablePath() + SELECT_SOURCEID_QUERY2; - UpdateSourceIdQuery = UPDATE_SOURCEID_QUERY1 + AppData(ctx)->PQConfig.GetSourceIdTablePath() + UPDATE_SOURCEID_QUERY2; + SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath()); + UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath()); Request->GetStreamCtx()->Attach(ctx.SelfID); if (!Request->GetStreamCtx()->Read()) { @@ -316,17 +295,18 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor PeerName = event->PeerName; SourceId = init.message_group_id(); - TString encodedSourceId; try { - encodedSourceId = NPQ::NSourceIdEncoding::Encode(SourceId); + // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name); + // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes + // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line; + EncodedSourceId = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcId(), SourceId); + + // Good hash and proper way of calcultion; + CompatibleHash = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcIdHash(), SourceId).Hash; } catch (yexception& e) { CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } - EscapedSourceId = HexEncode(encodedSourceId); - - TString s = TopicConverter->GetClientsideName() + encodedSourceId; - Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << init << " from " << PeerName); //TODO: get user agent from headers @@ -406,7 +386,7 @@ void TWriteSessionActor::InitCheckSchema(const TActorContext& ctx, bool needWait } void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) { - auto* res = ev->Get()->Result.Get(); + auto& res = ev->Get()->Result; Y_VERIFY(res->ResultSet.size() == 1); auto& entry = res->ResultSet[0]; @@ -529,7 +509,12 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) { ProceedPartition(partitionId, ctx); return; } + SendSelectPartitionRequest(CompatibleHash, TopicConverter->GetFullLegacyName(), ctx); + SendSelectPartitionRequest(EncodedSourceId.Hash, TopicConverter->GetFullLegacyName(), ctx); + State = ES_WAIT_TABLE_REQUEST_1; +} +void TWriteSessionActor::SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx) { //read from DS auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); @@ -543,19 +528,20 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) { // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); NClient::TParameters parameters; - parameters["$Hash"] = Hash; - parameters["$Topic"] = TopicConverter->GetClientsideName(); - parameters["$SourceId"] = EscapedSourceId; + + parameters["$Hash"] = hash; + parameters["$Topic"] = topic; + parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; + ev->Record.MutableRequest()->MutableParameters()->Swap(¶meters); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - State = ES_WAIT_TABLE_REQUEST_1; + SelectSrcIdsInflight++; } void TWriteSessionActor::UpdatePartition(const TActorContext& ctx) { Y_VERIFY(State == ES_WAIT_TABLE_REQUEST_1 || State == ES_WAIT_NEXT_PARTITION); - auto ev = MakeUpdateSourceIdMetadataRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + SendUpdateSrcIdsRequests(ctx); State = ES_WAIT_TABLE_REQUEST_2; } @@ -589,7 +575,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " - << SourceId << " escaped " << EscapedSourceId << " discover partition race, retrying"); + << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition race, retrying"); DiscoverPartition(ctx); return; } @@ -599,7 +585,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const errorReason << "internal error in kqp Marker# PQ50 : " << record; if (State == EState::ES_INITED) { LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason); - SourceIdUpdateInfly = false; + SourceIdUpdatesInflight--; } else { CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); } @@ -607,53 +593,66 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const } if (State == EState::ES_WAIT_TABLE_REQUEST_1) { - SourceIdCreateTime = TInstant::Now().MilliSeconds(); - - bool partitionFound = false; + SelectSrcIdsInflight--; auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); if (t.ListSize() != 0) { auto& tt = t.GetList(0).GetStruct(0); if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition - Partition = tt.GetOptional().GetUint32(); - if (PreferedPartition < Max<ui32>() && Partition != PreferedPartition) { - CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.", - PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64(); + if (accessTime > MaxSrcIdAccessTime) { // AccessTime + Partition = tt.GetOptional().GetUint32(); + PartitionFound = true; + SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); + MaxSrcIdAccessTime = accessTime; } - partitionFound = true; - SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); } } + if (SelectSrcIdsInflight != 0) { + return; + } + if (PartitionFound && PreferedPartition < Max<ui32>() && Partition != PreferedPartition) { + CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.", + PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } + if (SourceIdCreateTime == 0) { + SourceIdCreateTime = TInstant::Now().MilliSeconds(); + } LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId " - << SourceId << " escaped " << EscapedSourceId << " hash " << Hash << " partition " << Partition << " partitions " - << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); + << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " hash " << CompatibleHash << " partition " << Partition << " partitions " + << PartitionToTablet.size() << "(" << CompatibleHash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t); - if (!partitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) { - Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : Hash % PartitionToTablet.size(); //choose partition default value - partitionFound = true; + if (!PartitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) { + Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : CompatibleHash % PartitionToTablet.size(); //choose partition default value + PartitionFound = true; } - if (partitionFound) { + if (PartitionFound) { UpdatePartition(ctx); } else { RequestNextPartition(ctx); } return; } else if (State == EState::ES_WAIT_TABLE_REQUEST_2) { - LastSourceIdUpdate = ctx.Now(); - ProceedPartition(Partition, ctx); + SourceIdUpdatesInflight--; + if (!SourceIdUpdatesInflight) { + LastSourceIdUpdate = ctx.Now(); + ProceedPartition(Partition, ctx); + } } else if (State == EState::ES_INITED) { - SourceIdUpdateInfly = false; - LastSourceIdUpdate = ctx.Now(); + SourceIdUpdatesInflight--; + if (!SourceIdUpdatesInflight) { + LastSourceIdUpdate = ctx.Now(); + } } else { Y_FAIL("Wrong state"); } } THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMetadataRequest( - const NActors::TActorContext& ctx + ui32 hash, const TString& topic, const NActors::TActorContext& ctx ) { auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); @@ -670,9 +669,10 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); NClient::TParameters parameters; - parameters["$Hash"] = Hash; - parameters["$Topic"] = TopicConverter->GetClientsideName(); - parameters["$SourceId"] = EscapedSourceId; + parameters["$Hash"] = hash; + parameters["$Topic"] = topic; + parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; + parameters["$CreateTime"] = SourceIdCreateTime; parameters["$AccessTime"] = TInstant::Now().MilliSeconds(); parameters["$Partition"] = Partition; @@ -681,12 +681,25 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet return ev; } +void TWriteSessionActor::SendUpdateSrcIdsRequests(const TActorContext& ctx) { + { + //full legacy name (rt3.dc--acc--topic) + auto ev = MakeUpdateSourceIdMetadataRequest(CompatibleHash, TopicConverter->GetFullLegacyName(), ctx); + SourceIdUpdatesInflight++; + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + } + { + auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, TopicConverter->GetFullLegacyName(), ctx); + SourceIdUpdatesInflight++; + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + } +} void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) { auto& record = ev->Get()->Record; LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " sourceID " - << SourceId << " escaped " << EscapedSourceId << " discover partition error - " << record); + << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition error - " << record); CloseSession("Internal error on discovering partition", PersQueue::ErrorCode::ERROR, ctx); } @@ -1167,10 +1180,11 @@ void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) { InitCheckSchema(ctx); } // ToDo[migration] - separate flag for having config tables - if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SourceIdUpdateInfly && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD) { - auto ev = MakeUpdateSourceIdMetadataRequest(ctx); - SourceIdUpdateInfly = true; - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() + && !SourceIdUpdatesInflight + && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD + ) { + SendUpdateSrcIdsRequests(ctx); } if (ctx.Now() >= LogSessionDeadline) { LogSession(ctx); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 2c955a1085..42ea25e7c1 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -9,7 +9,7 @@ #include <ydb/core/testlib/test_pq_client.h> #include <ydb/core/protos/grpc_pq_old.pb.h> #include <ydb/core/persqueue/cluster_tracker.h> - +#include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/tablet/tablet_counters_aggregator.h> #include <ydb/library/aclib/aclib.h> @@ -3568,5 +3568,66 @@ namespace { Cerr << "partition status: " << partitionStatus->DebugString() << Endl; } } + Y_UNIT_TEST(SrcIdCompatibility) { + NPersQueue::TTestServer server{}; + auto runTest = [&] ( + const TString& topicToAdd, const TString& topicForHash, const TString& topicName, + const TString& srcId, ui32 partId, ui64 accessTime = 0 + ) { + TStringBuilder query; + auto encoded = NPQ::NSourceIdEncoding::EncodeSrcId(topicForHash, srcId); + Cerr << "===save partition with time: " << accessTime << Endl; + + if (accessTime == 0) { + accessTime = TInstant::Now().MilliSeconds(); + } + if (!topicToAdd.empty()) { // Empty means don't add anything + query << + "--!syntax_v1\n" + "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES (" + << encoded.Hash << ", \"" << topicToAdd << "\", \"" << encoded.EscapedSourceId << "\", " + << TInstant::Now().MilliSeconds() << ", " << accessTime << ", " << partId << "); "; + Cerr << "Run query:\n" << query << Endl; + auto scResult = server.AnnoyingClient->RunYqlDataQuery(query); + //UNIT_ASSERT(scResult.Defined()); + } + + auto driver = server.AnnoyingClient->GetDriver(); + auto writer = CreateWriter(*driver, topicName, srcId); + auto ev = writer->GetEvent(true); + auto ct = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent >(&*ev); + UNIT_ASSERT(ct); + writer->Write(std::move(ct->ContinuationToken), "1234567890"); + UNIT_ASSERT(ev.Defined()); + while(true) { + ev = writer->GetEvent(true); + auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*ev); + if (ack) { + UNIT_ASSERT_VALUES_EQUAL(ack->Acks[0].Details->PartitionId, partId); + break; + } + + } + }; + + TString legacyName = "rt3.dc1--account--topic100"; + TString shortLegacyName = "account--topic100"; + TString fullPath = "/Root/PQ/rt3.dc1--account--topic100"; + TString topicName = "account/topic100"; + TString srcId1 = "test-src-id-compat", srcId2 = "test-src-id-compat2"; + server.AnnoyingClient->CreateTopic(legacyName, 100); + + runTest(legacyName, shortLegacyName, topicName, srcId1, 5, 100); + runTest(legacyName, legacyName, topicName, srcId2, 6, 100); + runTest("", "", topicName, srcId1, 5, 100); + runTest("", "", topicName, srcId2, 6, 100); + + ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds(); + runTest(legacyName, legacyName, topicName, srcId2, 7, time); + + + + } + } } |