aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-02-28 17:08:00 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-28 17:08:00 +0300
commite7de3116bb9e9e10fcc25dbad25e0a595e1324b4 (patch)
treea2035fb7a868a193067f96efd98f762d432f1aed
parent4531da9beb698966823ea272d1561d5f843d511c (diff)
downloadydb-e7de3116bb9e9e10fcc25dbad25e0a595e1324b4.tar.gz
Backport PQ fixes from 21-4
8a8121ea26e5821bdcc586150b2fb3be5b68a3bf 890defc764d33def715b7c96b4f27adfacc14076 REVIEW: 2318246 REVIEW: 2353681 x-ydb-stable-ref: 40f9d95ccf1d9f080c0454da20ba7a7155a4c864
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp4
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.h2
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp156
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h35
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.cpp55
-rw-r--r--ydb/core/persqueue/writer/source_id_encoding.h17
-rw-r--r--ydb/core/testlib/mock_pq_metacache.h2
-rw-r--r--ydb/core/testlib/test_pq_client.h2
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h21
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h20
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp152
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp63
12 files changed, 349 insertions, 180 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index d14fd75ebe..347471e787 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -33,7 +33,7 @@ const TString& TopicPrefix(const TActorContext& ctx) {
}
TProcessingResult ProcessMetaCacheAllTopicsResponse(TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev) {
- auto *res = ev->Get()->Result.Get();
+ auto& res = ev->Get()->Result;
const TString& path = ev->Get()->Path;
TProcessingResult result;
if (!ev->Get()->Success) {
@@ -874,7 +874,7 @@ public:
void Handle(TEvAllTopicsDescribeResponse::TPtr& ev, const TActorContext& ctx) {
--DescribeRequests;
- auto* res = ev->Get()->Result.Get();
+ auto& res = ev->Get()->Result;
auto processResult = ProcessMetaCacheAllTopicsResponse(ev);
if (processResult.IsFatal) {
ErrorReason = processResult.Reason;
diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h
index 52c67832e4..1a915080e0 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.h
+++ b/ydb/core/client/server/msgbus_server_persqueue.h
@@ -145,7 +145,7 @@ protected:
const TActorId PqMetaCache;
THashMap<TActorId, THolder<TPerTopicInfo>> Children;
size_t ChildrenAnswered = 0;
- THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse;
// Nodes info
const bool ListNodes;
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index f44ab6f480..169e2f549b 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -208,8 +208,8 @@ private:
void HandleGetTopicsResult(const TActorContext& ctx) {
auto result = AsyncQueryResult.GetValue();
if (!result.IsSuccess()) {
- LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE,
- "Got error trying to get topics: " << result.GetIssues().ToString());
+ LOG_INFO_S(ctx, NKikimrServices::PQ_METACACHE,
+ "Got error trying to get topics: " << result.GetIssues().ToString());
return Reset(ctx);
}
Y_VERIFY(result.GetResultSets().size() == 1);
@@ -231,20 +231,11 @@ private:
CurrentTopicsVersion = NewTopicsVersion;
FullTopicsCacheOutdated = true;
FullTopicsCache = nullptr;
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "reset topics cache");
while (!ListTopicsWaiters.empty()) {
auto& waiter = ListTopicsWaiters.front();
- switch (waiter.Type) {
- case EWaiterType::ListTopics:
- SendListTopicsResponse(waiter.WaiterId, ctx);
- break;
- case EWaiterType::DescribeAllTopics:
- ProcessDescribeAllTopics(waiter.WaiterId, ctx);
- break;
- default:
- Y_FAIL();
- break;
- }
- ListTopicsWaiters.pop_front();
+ ProcessDescribeAllTopics(waiter, ctx);
+ ListTopicsWaiters.pop();
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE,
"Updated topics list with : " << CurrentTopics.size() << " topics");
@@ -252,6 +243,7 @@ private:
}
}
+
void HandleQueryComplete(TEvPqNewMetaCache::TEvQueryComplete::TPtr& ev, const TActorContext& ctx) {
switch (ev->Get()->Type) {
case EQueryType::ECheckVersion:
@@ -268,26 +260,25 @@ private:
RunQuery(ev->Get()->Type, ctx);
}
- void HandleListTopics(TEvPqNewMetaCache::TEvListTopicsRequest::TPtr& ev, const TActorContext& ctx) {
- if (!LastVersionUpdate || !EverGotTopics) {
- ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::ListTopics});
- } else {
- SendListTopicsResponse(ev->Sender, ctx);
- }
+ void HandleGetVersion(TEvPqNewMetaCache::TEvGetVersionRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send version response: " << CurrentTopicsVersion);
+
+ ctx.Send(ev->Sender, new TEvPqNewMetaCache::TEvGetVersionResponse{CurrentTopicsVersion});
}
void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics with prefix: " << ev->Get()->PathPrefix);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics");
- SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), ctx);
+ SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ctx);
auto inserted = DescribeTopicsWaiters.insert(std::make_pair(
RequestId,
- TWaiter{ev->Sender, std::move(ev->Get()->Topics), EWaiterType::DescribeCustomTopics}
+ TWaiter{ev->Sender, std::move(ev->Get()->Topics)}
)).second;
Y_VERIFY(inserted);
}
void HandleDescribeAllTopics(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics");
if (ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) {
auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(ev->Get()->PathPrefix);
response->Success = false;
@@ -295,40 +286,35 @@ private:
return;
}
if (!EverGotTopics) {
- ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::DescribeAllTopics});
+ ListTopicsWaiters.push(ev->Sender);
return;
}
return ProcessDescribeAllTopics(ev->Sender, ctx);
}
- void ProcessDescribeAllTopics(const TActorId& waiterId, const TActorContext& ctx) {
+ void ProcessDescribeAllTopics(const TActorId& waiter, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Process describe all topics");
if (EverGotTopics && CurrentTopics.empty()) {
- SendDescribeAllTopicsResponse(waiterId, ctx, true);
+ SendDescribeAllTopicsResponse(waiter, ctx, true);
return;
}
if (FullTopicsCache && !FullTopicsCacheOutdated) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Respond from cache");
-
- return SendDescribeAllTopicsResponse(waiterId, ctx);
+ return SendDescribeAllTopicsResponse(waiter, ctx);
+ }
+ if (DescribeAllTopicsWaiters.empty()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request");
+ SendSchemeCacheRequest(CurrentTopics, true, true, ctx);
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter");
- SendSchemeCacheRequest(CurrentTopics, true, ctx);
- auto inserted = DescribeTopicsWaiters.insert(std::make_pair(
- RequestId,
- TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics}
- )).second;
- Y_VERIFY(inserted);
+ DescribeAllTopicsWaiters.push(waiter);
FullTopicsCacheOutdated = false;
FullTopicsCache = nullptr;
}
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- auto* result = ev->Get()->Request.Release();
- auto waiterIter = DescribeTopicsWaiters.find(result->Instant);
- Y_VERIFY(!waiterIter.IsEnd());
- auto& waiter = waiterIter->second;
-
- if (waiter.Type == EWaiterType::DescribeAllTopics) {
+ auto& result = ev->Get()->Request;
+ if (result->Instant == 0) {
for (const auto& entry : result->ResultSet) {
if (!entry.PQGroupInfo) {
continue;
@@ -338,62 +324,70 @@ private:
if (desc.HasBalancerTabletID() && desc.GetBalancerTabletID() != 0) {
continue;
}
-
FullTopicsCacheOutdated = true;
}
- FullTopicsCache.Reset(result);
+ FullTopicsCache.reset(result.Release());
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size());
- SendDescribeAllTopicsResponse(waiter.WaiterId, ctx);
+ while (!DescribeAllTopicsWaiters.empty()) {
+ SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front(), ctx);
+ DescribeAllTopicsWaiters.pop();
+ }
} else {
- Y_VERIFY(waiter.Type == EWaiterType::DescribeCustomTopics);
+ auto waiterIter = DescribeTopicsWaiters.find(result->Instant);
+ Y_VERIFY(!waiterIter.IsEnd());
+ auto& waiter = waiterIter->second;
+
Y_VERIFY(waiter.Topics.size() == result->ResultSet.size());
auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{
- std::move(waiter.Topics), result
+ std::move(waiter.Topics), result.Release()
};
ctx.Send(waiter.WaiterId, response);
+ DescribeTopicsWaiters.erase(waiterIter);
}
- DescribeTopicsWaiters.erase(waiterIter);
}
- void SendSchemeCacheRequest(const TVector<TString>& topics, bool addDefaultPathPrefix, const TActorContext& ctx) {
- auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(++RequestId);
+ void SendSchemeCacheRequest(
+ const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, const TActorContext& ctx
+ ) {
+ auto instant = isFullListingRequest ? 0 : ++RequestId;
+ auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant);
for (const auto& path : topics) {
auto split = NKikimr::SplitPath(path);
- Y_VERIFY(!split.empty());
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
if (addDefaultPathPrefix) {
entry.Path.insert(entry.Path.end(), PathPrefixParts.begin(), PathPrefixParts.end());
}
entry.Path.insert(entry.Path.end(), split.begin(), split.end());
- entry.SyncVersion = true;
+ entry.SyncVersion = !isFullListingRequest;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic;
schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
}
- ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release()));
- }
-
- void SendListTopicsResponse(const TActorId& recipient, const TActorContext& ctx) {
- auto* response = new TEvPqNewMetaCache::TEvListTopicsResponse();
- response->Topics = CurrentTopics;
- response->TopicsVersion = CurrentTopicsVersion;
- ctx.Send(recipient, response);
+ ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release()));
}
void SendDescribeAllTopicsResponse(const TActorId& recipient, const TActorContext& ctx, bool empty = false) {
- NSchemeCache::TSchemeCacheNavigate* scResponse;
if (empty) {
- scResponse = new NSchemeCache::TSchemeCacheNavigate();
+ ctx.Send(
+ recipient,
+ new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(
+ PathPrefix, std::make_shared<NSchemeCache::TSchemeCacheNavigate>()
+ )
+ );
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send empty describe all topics response");
+ return;
} else {
- scResponse = new NSchemeCache::TSchemeCacheNavigate(*FullTopicsCache);
+ ctx.Send(
+ recipient,
+ new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(
+ PathPrefix, FullTopicsCache
+ )
+ );
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << FullTopicsCache->ResultSet.size() << " topics");
+
}
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << scResponse->ResultSet.size() << " topics");
- auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(
- PathPrefix, scResponse
- );
- ctx.Send(recipient, response);
}
static NActors::TActorSystem* GetActorSystem() {
@@ -407,16 +401,16 @@ public:
}
STRICT_STFUNC(StateFunc,
- SFunc(NActors::TEvents::TEvWakeup, StartSession)
- SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted)
- SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared)
- HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete)
- HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery)
-
- HFunc(TEvPqNewMetaCache::TEvListTopicsRequest, HandleListTopics)
- HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
- HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
- HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
+ SFunc(NActors::TEvents::TEvWakeup, StartSession)
+ SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted)
+ SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared)
+ HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete)
+ HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery)
+
+ HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion)
+ HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
+ HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
+ HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
)
private:
@@ -429,11 +423,6 @@ private:
struct TWaiter {
TActorId WaiterId;
TVector<TString> Topics;
- EWaiterType Type;
-
- static TWaiter AllTopicsWaiter(const TActorId& waiterId) {
- return TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics};
- }
};
struct TTopicKey {
@@ -513,11 +502,12 @@ private:
TDuration VersionCheckInterval = TDuration::Seconds(1);
TInstant LastVersionUpdate = TInstant::Zero();
- TDeque<TWaiter> ListTopicsWaiters;
+ TQueue<TActorId> ListTopicsWaiters;
+ TQueue<TActorId> DescribeAllTopicsWaiters;
THashMap<ui64, TWaiter> DescribeTopicsWaiters;
ui64 RequestId = 1;
- THolder<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache;
bool FullTopicsCacheOutdated = false;
NActors::TActorId SchemeCacheId;
TString PathPrefix;
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index 6c12b76b04..64c4c5ed11 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -42,8 +42,8 @@ struct TEvPqNewMetaCache {
EvQueryPrepared,
EvQueryComplete,
EvRestartQuery,
- EvListTopicsRequest,
- EvListTopicsResponse,
+ EvGetVersionRequest,
+ EvGetVersionResponse,
EvDescribeTopicsRequest,
EvDescribeTopicsResponse,
EvDescribeAllTopicsRequest,
@@ -74,11 +74,14 @@ struct TEvPqNewMetaCache {
: Type(type) {}
};
- struct TEvListTopicsRequest : public TEventLocal<TEvListTopicsRequest, EvListTopicsRequest> {
+ struct TEvGetVersionRequest : public TEventLocal<TEvGetVersionRequest, EvGetVersionRequest> {
};
- struct TEvListTopicsResponse : public TEventLocal<TEvListTopicsResponse, EvListTopicsResponse> {
- TVector<TString> Topics;
+ struct TEvGetVersionResponse : public TEventLocal<TEvGetVersionResponse, EvGetVersionResponse> {
+ TEvGetVersionResponse(ui64 version)
+ : TopicsVersion(version)
+ {}
+
ui64 TopicsVersion;
};
@@ -89,23 +92,23 @@ struct TEvPqNewMetaCache {
TEvDescribeTopicsRequest() = default;
explicit TEvDescribeTopicsRequest(const TVector<TString>& topics)
- : Topics(topics)
+ : Topics(topics)
{}
TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix)
- : PathPrefix(pathPrefix)
- , Topics(topics)
+ : PathPrefix(pathPrefix)
+ , Topics(topics)
{}
};
struct TEvDescribeTopicsResponse : public TEventLocal<TEvDescribeTopicsResponse, EvDescribeTopicsResponse> {
TVector<TString> TopicsRequested;
- THolder<NSchemeCache::TSchemeCacheNavigate> Result;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result;
explicit TEvDescribeTopicsResponse(const TVector<TString>& topics,
NSchemeCache::TSchemeCacheNavigate* result)
- : TopicsRequested(topics)
- , Result(result)
+ : TopicsRequested(topics)
+ , Result(result)
{}
};
@@ -120,13 +123,13 @@ struct TEvPqNewMetaCache {
struct TEvDescribeAllTopicsResponse : public TEventLocal<TEvDescribeAllTopicsResponse, EvDescribeAllTopicsResponse> {
bool Success = true;
TString Path;
- THolder<NSchemeCache::TSchemeCacheNavigate> Result;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result;
explicit TEvDescribeAllTopicsResponse(const TString& path)
- : Path(path)
+ : Path(path)
{}
- TEvDescribeAllTopicsResponse(const TString& path, NSchemeCache::TSchemeCacheNavigate* result)
- : Path(path)
- , Result(result)
+ TEvDescribeAllTopicsResponse(const TString& path, const std::shared_ptr<NSchemeCache::TSchemeCacheNavigate>& result)
+ : Path(path)
+ , Result(result)
{}
};
};
diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp
index 13a37c2c61..b528da2099 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.cpp
+++ b/ydb/core/persqueue/writer/source_id_encoding.cpp
@@ -4,11 +4,54 @@
#include <util/generic/yexception.h>
#include <util/string/strip.h>
+#include <util/string/builder.h>
+#include <util/string/hex.h>
+#include <util/digest/murmur.h>
+#include <library/cpp/digest/md5/md5.h>
namespace NKikimr {
namespace NPQ {
+
+
+TString GetSourceIdSelectQueryFromPath(const TString& path) {
+ TStringBuilder res;
+ res << "--!syntax_v1\n"
+ "DECLARE $Hash AS Uint32; "
+ "DECLARE $Topic AS Utf8; "
+ "DECLARE $SourceId AS Utf8; "
+ "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` "
+ "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;";
+ return res;
+}
+
+TString GetSourceIdSelectQuery(const TString& root) {
+ return GetSourceIdSelectQueryFromPath(root + "/SourceIdMeta2");
+}
+
+TString GetUpdateIdSelectQueryFromPath(const TString& path) {
+ TStringBuilder res;
+ res << "--!syntax_v1\n"
+ "DECLARE $SourceId AS Utf8; "
+ "DECLARE $Topic AS Utf8; "
+ "DECLARE $Hash AS Uint32; "
+ "DECLARE $Partition AS Uint32; "
+ "DECLARE $CreateTime AS Uint64; "
+ "DECLARE $AccessTime AS Uint64;\n"
+ "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES "
+ "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);";
+
+ return res;
+}
+
+TString GetUpdateIdSelectQuery(const TString& root) {
+ return GetUpdateIdSelectQueryFromPath(root + "/SourceIdMeta2");
+}
+
+
namespace NSourceIdEncoding {
+static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c;
+
struct TTags {
static constexpr char Simple = 0;
static constexpr char Base64 = 1;
@@ -84,6 +127,18 @@ bool IsValidEncoded(const TString& sourceId) {
}
}
+
+TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId) {
+ TEncodedSourceId res;
+ TString encodedSourceId = Encode(userSourceId);
+ res.EscapedSourceId = HexEncode(encodedSourceId);
+
+ TString s = topic + encodedSourceId;
+ res.Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED);
+ return res;
+}
+
+
} // NSourceIdEncoding
} // NPQ
} // NKikimr
diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h
index 6472d8b4ac..c48a1c574e 100644
--- a/ydb/core/persqueue/writer/source_id_encoding.h
+++ b/ydb/core/persqueue/writer/source_id_encoding.h
@@ -1,9 +1,18 @@
#pragma once
#include <util/generic/fwd.h>
+#include <util/generic/string.h>
namespace NKikimr {
namespace NPQ {
+
+TString GetSourceIdSelectQuery(const TString& root);
+TString GetUpdateIdSelectQuery(const TString& root);
+
+TString GetSourceIdSelectQueryFromPath(const TString& path);
+TString GetUpdateIdSelectQueryFromPath(const TString& path);
+
+
namespace NSourceIdEncoding {
TString EncodeSimple(const TString& sourceId);
@@ -13,6 +22,14 @@ TString Decode(const TString& encodedSourceId);
bool IsValidEncoded(const TString& encodedSourceId);
+
+struct TEncodedSourceId {
+ TString EscapedSourceId;
+ ui32 Hash = 0;
+};
+
+TEncodedSourceId EncodeSrcId(const TString& topic, const TString& userSourceId);
+
} // NSourceIdEncoding
} // NPQ
} // NKikimr
diff --git a/ydb/core/testlib/mock_pq_metacache.h b/ydb/core/testlib/mock_pq_metacache.h
index 1280f87779..e132cd5f9a 100644
--- a/ydb/core/testlib/mock_pq_metacache.h
+++ b/ydb/core/testlib/mock_pq_metacache.h
@@ -69,7 +69,7 @@ public:
response->Success = success;
auto* result = new NSchemeCache::TSchemeCacheNavigate();
result->ResultSet = resultSet;
- response->Result.Reset(result);
+ response->Result.reset(result);
ctx.Send(ev->Sender, std::move(response));
};
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index 59a93052f7..841500d544 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -478,6 +478,7 @@ private:
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
+public:
TMaybe<NYdb::TResultSet> RunYqlDataQueryWithParams(TString query, const NYdb::TParams& params) {
auto tableClient = NYdb::NTable::TTableClient(*Driver);
TMaybe<NYdb::TResultSet> rs;
@@ -502,7 +503,6 @@ private:
}
-public:
TFlatMsgBusPQClient(
const Tests::TServerSettings& settings, ui16 grpc, TMaybe<TString> databaseName = Nothing()
)
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index eb5a69cc37..4f5ade9c99 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -53,6 +53,8 @@ protected:
TStringBuf topicNormalized = topic;
topicNormalized.SkipPrefix("/");
+ CHECK_SET_VALID(!topicNormalized.StartsWith("/"), "Multiple leading '/' in topic name", return *this);
+ CHECK_SET_VALID(!topicNormalized.empty(), "Empty topic name", return *this);
TStringBuf dbNormalized = database;
dbNormalized.SkipPrefix("/");
@@ -167,6 +169,25 @@ public:
}
return TString{fst};
}
+ TString GetTopicForSrcId() const {
+ if (!IsValid())
+ return {};
+ if (NoDcMode) {
+ return FullModernPath;
+ } else {
+ return FullLegacyName;
+ }
+ }
+
+ TString GetTopicForSrcIdHash() const {
+ if (!IsValid())
+ return {};
+ if (NoDcMode) {
+ return FullModernPath;
+ } else {
+ return LegacyName;
+ }
+ }
private:
bool IsLegacyTopicName(const TStringBuf& path) {
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index 65c8aca1db..55127d3f03 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -9,7 +9,7 @@
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
#include <ydb/core/client/server/msgbus_server_persqueue.h>
-
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/base/events.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -526,12 +526,16 @@ private:
void LogSession(const TActorContext& ctx);
void DiscoverPartition(const NActors::TActorContext& ctx);
+ void SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx);
+
void UpdatePartition(const NActors::TActorContext& ctx);
void RequestNextPartition(const NActors::TActorContext& ctx);
void ProceedPartition(const ui32 partition, const NActors::TActorContext& ctx);
- THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(const NActors::TActorContext& ctx);
- void InitCheckACL(const TActorContext& ctx);
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeUpdateSourceIdMetadataRequest(ui32 hash, const TString& topic,
+ const NActors::TActorContext& ctx);
+ void SendUpdateSrcIdsRequests(const TActorContext& ctx);
+ //void InitCheckACL(const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx);
@@ -581,10 +585,11 @@ private:
NPersQueue::TConverterPtr TopicConverter;
ui32 Partition;
ui32 PreferedPartition;
+ bool PartitionFound = false;
// 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1
TString SourceId; // TODO: Replace with 'MessageGroupId' everywhere
- TString EscapedSourceId;
- ui32 Hash = 0;
+ NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId;
+ ui32 CompatibleHash;
TString OwnerCookie;
TString UserAgent;
@@ -659,9 +664,12 @@ private:
TString ClientDC;
TString SelectSourceIdQuery;
TString UpdateSourceIdQuery;
+ ui32 SelectSrcIdsInflight = 0;
+ ui64 MaxSrcIdAccessTime = 0;
+
TInstant LastSourceIdUpdate;
ui64 SourceIdCreateTime;
- bool SourceIdUpdateInfly;
+ ui32 SourceIdUpdatesInflight = 0;
TVector<NPQ::TLabelsInfo> Aggr;
NKikimr::NPQ::TMultiCounter SLITotal;
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index 86f101df9b..98334996a3 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -23,6 +23,7 @@ using namespace NKikimrClient;
namespace NKikimr {
using namespace NSchemeCache;
+using namespace NPQ;
Ydb::PersQueue::V1::Codec CodecByName(const TString& codec) {
static const THashMap<TString, Ydb::PersQueue::V1::Codec> codecsByName = {
@@ -81,30 +82,8 @@ using namespace Ydb::PersQueue::V1;
static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
static const ui32 MAX_BYTES_INFLIGHT = 1 << 20; //1mb
-static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c;
static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
-static const TString SELECT_SOURCEID_QUERY1 =
- "--!syntax_v1\n"
- "DECLARE $Hash AS Uint32; "
- "DECLARE $Topic AS Utf8; "
- "DECLARE $SourceId AS Utf8; "
- "SELECT Partition, CreateTime FROM `";
-static const TString SELECT_SOURCEID_QUERY2 = "` "
- "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId; ";
-
-static const TString UPDATE_SOURCEID_QUERY1 =
- "--!syntax_v1\n"
- "DECLARE $SourceId AS Utf8; "
- "DECLARE $Topic AS Utf8; "
- "DECLARE $Hash AS Uint32; "
- "DECLARE $Partition AS Uint32; "
- "DECLARE $CreateTime AS Uint64; "
- "DECLARE $AccessTime AS Uint64; "
- "UPSERT INTO `";
-static const TString UPDATE_SOURCEID_QUERY2 = "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES "
- "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition); ";
-
//TODO: add here tracking of bytes in/out
@@ -142,7 +121,7 @@ TWriteSessionActor::TWriteSessionActor(
, ClientDC(clientDC ? *clientDC : "other")
, LastSourceIdUpdate(TInstant::Zero())
, SourceIdCreateTime(0)
- , SourceIdUpdateInfly(false)
+ , SourceIdUpdatesInflight(0)
{
Y_ASSERT(Request);
++(*GetServiceCounters(Counters, "pqproxy|writeSession")->GetCounter("SessionsCreatedTotal", true));
@@ -154,8 +133,8 @@ TWriteSessionActor::~TWriteSessionActor() = default;
void TWriteSessionActor::Bootstrap(const TActorContext& ctx) {
Y_VERIFY(Request);
- SelectSourceIdQuery = SELECT_SOURCEID_QUERY1 + AppData(ctx)->PQConfig.GetSourceIdTablePath() + SELECT_SOURCEID_QUERY2;
- UpdateSourceIdQuery = UPDATE_SOURCEID_QUERY1 + AppData(ctx)->PQConfig.GetSourceIdTablePath() + UPDATE_SOURCEID_QUERY2;
+ SelectSourceIdQuery = GetSourceIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath());
+ UpdateSourceIdQuery = GetUpdateIdSelectQueryFromPath(AppData(ctx)->PQConfig.GetSourceIdTablePath());
Request->GetStreamCtx()->Attach(ctx.SelfID);
if (!Request->GetStreamCtx()->Read()) {
@@ -316,17 +295,18 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
PeerName = event->PeerName;
SourceId = init.message_group_id();
- TString encodedSourceId;
try {
- encodedSourceId = NPQ::NSourceIdEncoding::Encode(SourceId);
+ // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name);
+ // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes
+ // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line;
+ EncodedSourceId = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcId(), SourceId);
+
+ // Good hash and proper way of calcultion;
+ CompatibleHash = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcIdHash(), SourceId).Hash;
} catch (yexception& e) {
CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
- EscapedSourceId = HexEncode(encodedSourceId);
-
- TString s = TopicConverter->GetClientsideName() + encodedSourceId;
- Hash = MurmurHash<ui32>(s.c_str(), s.size(), MURMUR_ARRAY_SEED);
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << init << " from " << PeerName);
//TODO: get user agent from headers
@@ -406,7 +386,7 @@ void TWriteSessionActor::InitCheckSchema(const TActorContext& ctx, bool needWait
}
void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) {
- auto* res = ev->Get()->Result.Get();
+ auto& res = ev->Get()->Result;
Y_VERIFY(res->ResultSet.size() == 1);
auto& entry = res->ResultSet[0];
@@ -529,7 +509,12 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {
ProceedPartition(partitionId, ctx);
return;
}
+ SendSelectPartitionRequest(CompatibleHash, TopicConverter->GetFullLegacyName(), ctx);
+ SendSelectPartitionRequest(EncodedSourceId.Hash, TopicConverter->GetFullLegacyName(), ctx);
+ State = ES_WAIT_TABLE_REQUEST_1;
+}
+void TWriteSessionActor::SendSelectPartitionRequest(ui32 hash, const TString& topic, const NActors::TActorContext& ctx) {
//read from DS
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
@@ -543,19 +528,20 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
NClient::TParameters parameters;
- parameters["$Hash"] = Hash;
- parameters["$Topic"] = TopicConverter->GetClientsideName();
- parameters["$SourceId"] = EscapedSourceId;
+
+ parameters["$Hash"] = hash;
+ parameters["$Topic"] = topic;
+ parameters["$SourceId"] = EncodedSourceId.EscapedSourceId;
+
ev->Record.MutableRequest()->MutableParameters()->Swap(&parameters);
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- State = ES_WAIT_TABLE_REQUEST_1;
+ SelectSrcIdsInflight++;
}
void TWriteSessionActor::UpdatePartition(const TActorContext& ctx) {
Y_VERIFY(State == ES_WAIT_TABLE_REQUEST_1 || State == ES_WAIT_NEXT_PARTITION);
- auto ev = MakeUpdateSourceIdMetadataRequest(ctx);
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+ SendUpdateSrcIdsRequests(ctx);
State = ES_WAIT_TABLE_REQUEST_2;
}
@@ -589,7 +575,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) {
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
- << SourceId << " escaped " << EscapedSourceId << " discover partition race, retrying");
+ << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition race, retrying");
DiscoverPartition(ctx);
return;
}
@@ -599,7 +585,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
errorReason << "internal error in kqp Marker# PQ50 : " << record;
if (State == EState::ES_INITED) {
LOG_WARN_S(ctx, NKikimrServices::PQ_WRITE_PROXY, errorReason);
- SourceIdUpdateInfly = false;
+ SourceIdUpdatesInflight--;
} else {
CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx);
}
@@ -607,53 +593,66 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
}
if (State == EState::ES_WAIT_TABLE_REQUEST_1) {
- SourceIdCreateTime = TInstant::Now().MilliSeconds();
-
- bool partitionFound = false;
+ SelectSrcIdsInflight--;
auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
if (t.ListSize() != 0) {
auto& tt = t.GetList(0).GetStruct(0);
if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
- Partition = tt.GetOptional().GetUint32();
- if (PreferedPartition < Max<ui32>() && Partition != PreferedPartition) {
- CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.",
- PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64();
+ if (accessTime > MaxSrcIdAccessTime) { // AccessTime
+ Partition = tt.GetOptional().GetUint32();
+ PartitionFound = true;
+ SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64();
+ MaxSrcIdAccessTime = accessTime;
}
- partitionFound = true;
- SourceIdCreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64();
}
}
+ if (SelectSrcIdsInflight != 0) {
+ return;
+ }
+ if (PartitionFound && PreferedPartition < Max<ui32>() && Partition != PreferedPartition) {
+ CloseSession(TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " << (Partition + 1) << ", but client provided " << (PreferedPartition + 1) << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use another MessageGroupId, specify PartitionGroupId " << (Partition + 1) << ", or do not specify PartitionGroupId at all.",
+ PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
+ if (SourceIdCreateTime == 0) {
+ SourceIdCreateTime = TInstant::Now().MilliSeconds();
+ }
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " messageGroupId "
- << SourceId << " escaped " << EscapedSourceId << " hash " << Hash << " partition " << Partition << " partitions "
- << PartitionToTablet.size() << "(" << Hash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
+ << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " hash " << CompatibleHash << " partition " << Partition << " partitions "
+ << PartitionToTablet.size() << "(" << CompatibleHash % PartitionToTablet.size() << ") create " << SourceIdCreateTime << " result " << t);
- if (!partitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) {
- Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : Hash % PartitionToTablet.size(); //choose partition default value
- partitionFound = true;
+ if (!PartitionFound && (PreferedPartition < Max<ui32>() || !AppData(ctx)->PQConfig.GetRoundRobinPartitionMapping())) {
+ Partition = PreferedPartition < Max<ui32>() ? PreferedPartition : CompatibleHash % PartitionToTablet.size(); //choose partition default value
+ PartitionFound = true;
}
- if (partitionFound) {
+ if (PartitionFound) {
UpdatePartition(ctx);
} else {
RequestNextPartition(ctx);
}
return;
} else if (State == EState::ES_WAIT_TABLE_REQUEST_2) {
- LastSourceIdUpdate = ctx.Now();
- ProceedPartition(Partition, ctx);
+ SourceIdUpdatesInflight--;
+ if (!SourceIdUpdatesInflight) {
+ LastSourceIdUpdate = ctx.Now();
+ ProceedPartition(Partition, ctx);
+ }
} else if (State == EState::ES_INITED) {
- SourceIdUpdateInfly = false;
- LastSourceIdUpdate = ctx.Now();
+ SourceIdUpdatesInflight--;
+ if (!SourceIdUpdatesInflight) {
+ LastSourceIdUpdate = ctx.Now();
+ }
} else {
Y_FAIL("Wrong state");
}
}
THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMetadataRequest(
- const NActors::TActorContext& ctx
+ ui32 hash, const TString& topic, const NActors::TActorContext& ctx
) {
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -670,9 +669,10 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
NClient::TParameters parameters;
- parameters["$Hash"] = Hash;
- parameters["$Topic"] = TopicConverter->GetClientsideName();
- parameters["$SourceId"] = EscapedSourceId;
+ parameters["$Hash"] = hash;
+ parameters["$Topic"] = topic;
+ parameters["$SourceId"] = EncodedSourceId.EscapedSourceId;
+
parameters["$CreateTime"] = SourceIdCreateTime;
parameters["$AccessTime"] = TInstant::Now().MilliSeconds();
parameters["$Partition"] = Partition;
@@ -681,12 +681,25 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
return ev;
}
+void TWriteSessionActor::SendUpdateSrcIdsRequests(const TActorContext& ctx) {
+ {
+ //full legacy name (rt3.dc--acc--topic)
+ auto ev = MakeUpdateSourceIdMetadataRequest(CompatibleHash, TopicConverter->GetFullLegacyName(), ctx);
+ SourceIdUpdatesInflight++;
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+ }
+ {
+ auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, TopicConverter->GetFullLegacyName(), ctx);
+ SourceIdUpdatesInflight++;
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+ }
+}
void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record;
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " sourceID "
- << SourceId << " escaped " << EscapedSourceId << " discover partition error - " << record);
+ << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition error - " << record);
CloseSession("Internal error on discovering partition", PersQueue::ErrorCode::ERROR, ctx);
}
@@ -1167,10 +1180,11 @@ void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) {
InitCheckSchema(ctx);
}
// ToDo[migration] - separate flag for having config tables
- if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SourceIdUpdateInfly && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD) {
- auto ev = MakeUpdateSourceIdMetadataRequest(ctx);
- SourceIdUpdateInfly = true;
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+ if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()
+ && !SourceIdUpdatesInflight
+ && ctx.Now() - LastSourceIdUpdate > SOURCEID_UPDATE_PERIOD
+ ) {
+ SendUpdateSrcIdsRequests(ctx);
}
if (ctx.Now() >= LogSessionDeadline) {
LogSession(ctx);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 2c955a1085..42ea25e7c1 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -9,7 +9,7 @@
#include <ydb/core/testlib/test_pq_client.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/core/persqueue/cluster_tracker.h>
-
+#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/tablet/tablet_counters_aggregator.h>
#include <ydb/library/aclib/aclib.h>
@@ -3568,5 +3568,66 @@ namespace {
Cerr << "partition status: " << partitionStatus->DebugString() << Endl;
}
}
+ Y_UNIT_TEST(SrcIdCompatibility) {
+ NPersQueue::TTestServer server{};
+ auto runTest = [&] (
+ const TString& topicToAdd, const TString& topicForHash, const TString& topicName,
+ const TString& srcId, ui32 partId, ui64 accessTime = 0
+ ) {
+ TStringBuilder query;
+ auto encoded = NPQ::NSourceIdEncoding::EncodeSrcId(topicForHash, srcId);
+ Cerr << "===save partition with time: " << accessTime << Endl;
+
+ if (accessTime == 0) {
+ accessTime = TInstant::Now().MilliSeconds();
+ }
+ if (!topicToAdd.empty()) { // Empty means don't add anything
+ query <<
+ "--!syntax_v1\n"
+ "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES ("
+ << encoded.Hash << ", \"" << topicToAdd << "\", \"" << encoded.EscapedSourceId << "\", "
+ << TInstant::Now().MilliSeconds() << ", " << accessTime << ", " << partId << "); ";
+ Cerr << "Run query:\n" << query << Endl;
+ auto scResult = server.AnnoyingClient->RunYqlDataQuery(query);
+ //UNIT_ASSERT(scResult.Defined());
+ }
+
+ auto driver = server.AnnoyingClient->GetDriver();
+ auto writer = CreateWriter(*driver, topicName, srcId);
+ auto ev = writer->GetEvent(true);
+ auto ct = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent >(&*ev);
+ UNIT_ASSERT(ct);
+ writer->Write(std::move(ct->ContinuationToken), "1234567890");
+ UNIT_ASSERT(ev.Defined());
+ while(true) {
+ ev = writer->GetEvent(true);
+ auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*ev);
+ if (ack) {
+ UNIT_ASSERT_VALUES_EQUAL(ack->Acks[0].Details->PartitionId, partId);
+ break;
+ }
+
+ }
+ };
+
+ TString legacyName = "rt3.dc1--account--topic100";
+ TString shortLegacyName = "account--topic100";
+ TString fullPath = "/Root/PQ/rt3.dc1--account--topic100";
+ TString topicName = "account/topic100";
+ TString srcId1 = "test-src-id-compat", srcId2 = "test-src-id-compat2";
+ server.AnnoyingClient->CreateTopic(legacyName, 100);
+
+ runTest(legacyName, shortLegacyName, topicName, srcId1, 5, 100);
+ runTest(legacyName, legacyName, topicName, srcId2, 6, 100);
+ runTest("", "", topicName, srcId1, 5, 100);
+ runTest("", "", topicName, srcId2, 6, 100);
+
+ ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds();
+ runTest(legacyName, legacyName, topicName, srcId2, 7, time);
+
+
+
+ }
+
}
}