summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <[email protected]>2022-08-12 20:07:18 +0300
committeralexnick <[email protected]>2022-08-12 20:07:18 +0300
commitcc65f4da6b407e808692063c6ea6a9a910a41166 (patch)
treeea7945941cf0f5a272779daea6c1dec76374a076
parent9d8ceae328ea2efda6fe8d5480b434e22ad3ba95 (diff)
reduce KQP requests count
+
-rw-r--r--contrib/restricted/boost/hana/CONTRIBUTING.md5
-rw-r--r--contrib/restricted/boost/hana/README.md23
-rw-r--r--contrib/restricted/boost/hana/RELEASE_NOTES.md2
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.cpp33
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h2
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp4
6 files changed, 33 insertions, 36 deletions
diff --git a/contrib/restricted/boost/hana/CONTRIBUTING.md b/contrib/restricted/boost/hana/CONTRIBUTING.md
index 403fae5c603..52d0858a6aa 100644
--- a/contrib/restricted/boost/hana/CONTRIBUTING.md
+++ b/contrib/restricted/boost/hana/CONTRIBUTING.md
@@ -18,7 +18,10 @@ you contribute:
running the tests:
```shell
- make check
+ mkdir build
+ cd build
+ cmake ..
+ cmake --build . --target check
```
5. Commit your changes. Your commit message should start with a one line
short description of the modifications, with the details and explanations
diff --git a/contrib/restricted/boost/hana/README.md b/contrib/restricted/boost/hana/README.md
index b4ef11bb35c..93364118b70 100644
--- a/contrib/restricted/boost/hana/README.md
+++ b/contrib/restricted/boost/hana/README.md
@@ -171,21 +171,13 @@ Please see [LICENSE.md](LICENSE.md).
## Releasing
-This section acts as a reminder of the few simple steps required to release a
-new version of the library. This is only relevant to Hana's developers. To
-release a new version of the library, make sure the current version in
-`include/boost/hana/version.hpp` matches the release you're about to publish.
-Then, create an annotated tag with:
-```sh
-git tag -a --file=- v<version> <<EOM
-...your message here...
-EOM
-```
-
-Then, push the tag and create a new GitHub release pointing to that tag.
-Once that is done, bump the version number in `include/boost/hana/version.hpp`
-so that it matches the next _planned_ release. Finally, do not forget to update
-the [Homebrew formula][] to point to the latest version.
+To release a new version of Hana, use the `util/release.sh` script. The script
+will merge `develop` to `master`, create a tag on `master` and then bump the
+version on `develop`. The tag on `master` will be annotated with the contents
+of the `RELEASE_NOTES.md` file. Once the `release.sh` script has been run, the
+`master` and `develop` branches should be pushed manually, as well as the tag
+that was created on `master`. Finally, create a GitHub release pointing to the
+new tag on `master`.
<!-- Links -->
@@ -199,4 +191,3 @@ the [Homebrew formula][] to point to the latest version.
[eRuby]: http://en.wikipedia.org/wiki/ERuby
[Hana.docs]: http://boostorg.github.io/hana
[Hana.wiki]: https://github.com/boostorg/hana/wiki
-[Homebrew formula]: https://github.com/Homebrew/homebrew-core/blob/master/Formula/hana.rb
diff --git a/contrib/restricted/boost/hana/RELEASE_NOTES.md b/contrib/restricted/boost/hana/RELEASE_NOTES.md
new file mode 100644
index 00000000000..c10c2a0a258
--- /dev/null
+++ b/contrib/restricted/boost/hana/RELEASE_NOTES.md
@@ -0,0 +1,2 @@
+Release notes for Hana 1.6.2
+============================
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp
index 35e39983a7b..acf3841435f 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp
@@ -320,13 +320,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) {
try {
- // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name);
- // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes
- // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line;
- EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcId(), SourceId);
-
- // Good hash and proper way of calcultion;
- CompatibleHash = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId).Hash;
+ EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId);
} catch (yexception& e) {
CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
@@ -465,7 +459,6 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {
ProceedPartition(partitionId, ctx);
return;
}
- SendSelectPartitionRequest(CompatibleHash, FullConverter->GetClientsideName(), ctx);
SendSelectPartitionRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx);
State = ES_WAIT_TABLE_REQUEST_1;
}
@@ -475,11 +468,11 @@ void TWriteSessionActor::SendSelectPartitionRequest(ui32 hash, const TString& to
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
- ev->Record.MutableRequest()->SetKeepSession(false);
+ ev->Record.MutableRequest()->SetKeepSession(true);
ev->Record.MutableRequest()->SetQuery(SelectSourceIdQuery);
ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
// fill tx settings: set commit tx flag & begin new serializable tx.
- ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+ ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
@@ -552,6 +545,10 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
SelectSrcIdsInflight--;
auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
+ TxId = record.GetResponse().GetTxMeta().id();
+ KqpSessionId = record.GetResponse().GetSessionId();
+ Y_VERIFY(!TxId.empty());
+
if (t.ListSize() != 0) {
auto& tt = t.GetList(0).GetStruct(0);
if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
@@ -592,6 +589,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const
}
return;
} else if (State == EState::ES_WAIT_TABLE_REQUEST_2) {
+
SourceIdUpdatesInflight--;
if (!SourceIdUpdatesInflight) {
LastSourceIdUpdate = ctx.Now();
@@ -620,7 +618,12 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
ev->Record.MutableRequest()->SetKeepSession(false);
// fill tx settings: set commit tx flag & begin new serializable tx.
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
- ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ if (TxId) {
+ ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
+ ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
+ } else {
+ ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ }
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
@@ -639,12 +642,6 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
void TWriteSessionActor::SendUpdateSrcIdsRequests(const TActorContext& ctx) {
{
- //full legacy name (rt3.dc--acc--topic)
- auto ev = MakeUpdateSourceIdMetadataRequest(CompatibleHash, FullConverter->GetClientsideName(), ctx);
- SourceIdUpdatesInflight++;
- ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
- }
- {
auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx);
SourceIdUpdatesInflight++;
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
@@ -667,6 +664,8 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont
ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0;
+ TxId = "";
+
if (!tabletId) {
CloseSession(
Sprintf("no partition %u in topic '%s', Marker# PQ4", Partition,
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 5a7312f3b08..335b9e91189 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -238,6 +238,8 @@ private:
TString ClientDC;
TString SelectSourceIdQuery;
TString UpdateSourceIdQuery;
+ TString TxId;
+ TString KqpSessionId;
ui32 SelectSrcIdsInflight = 0;
ui64 MaxSrcIdAccessTime = 0;
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 91683d5490b..ad1e4f4d4da 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4639,12 +4639,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->CreateTopic(legacyName, 100);
runTest(legacyName, shortLegacyName, topicName, srcId1, 5, 100);
- runTest(legacyName, legacyName, topicName, srcId2, 6, 100);
+ runTest(legacyName, shortLegacyName, topicName, srcId2, 6, 100);
runTest("", "", topicName, srcId1, 5, 100);
runTest("", "", topicName, srcId2, 6, 100);
ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds();
- runTest(legacyName, legacyName, topicName, srcId2, 7, time);
+ runTest(legacyName, shortLegacyName, topicName, srcId2, 7, time);
}
Y_UNIT_TEST(TestReadPartitionStatus) {