diff options
author | alexnick <[email protected]> | 2022-08-12 20:07:18 +0300 |
---|---|---|
committer | alexnick <[email protected]> | 2022-08-12 20:07:18 +0300 |
commit | cc65f4da6b407e808692063c6ea6a9a910a41166 (patch) | |
tree | ea7945941cf0f5a272779daea6c1dec76374a076 | |
parent | 9d8ceae328ea2efda6fe8d5480b434e22ad3ba95 (diff) |
reduce KQP requests count
+
-rw-r--r-- | contrib/restricted/boost/hana/CONTRIBUTING.md | 5 | ||||
-rw-r--r-- | contrib/restricted/boost/hana/README.md | 23 | ||||
-rw-r--r-- | contrib/restricted/boost/hana/RELEASE_NOTES.md | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.cpp | 33 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 4 |
6 files changed, 33 insertions, 36 deletions
diff --git a/contrib/restricted/boost/hana/CONTRIBUTING.md b/contrib/restricted/boost/hana/CONTRIBUTING.md index 403fae5c603..52d0858a6aa 100644 --- a/contrib/restricted/boost/hana/CONTRIBUTING.md +++ b/contrib/restricted/boost/hana/CONTRIBUTING.md @@ -18,7 +18,10 @@ you contribute: running the tests: ```shell - make check + mkdir build + cd build + cmake .. + cmake --build . --target check ``` 5. Commit your changes. Your commit message should start with a one line short description of the modifications, with the details and explanations diff --git a/contrib/restricted/boost/hana/README.md b/contrib/restricted/boost/hana/README.md index b4ef11bb35c..93364118b70 100644 --- a/contrib/restricted/boost/hana/README.md +++ b/contrib/restricted/boost/hana/README.md @@ -171,21 +171,13 @@ Please see [LICENSE.md](LICENSE.md). ## Releasing -This section acts as a reminder of the few simple steps required to release a -new version of the library. This is only relevant to Hana's developers. To -release a new version of the library, make sure the current version in -`include/boost/hana/version.hpp` matches the release you're about to publish. -Then, create an annotated tag with: -```sh -git tag -a --file=- v<version> <<EOM -...your message here... -EOM -``` - -Then, push the tag and create a new GitHub release pointing to that tag. -Once that is done, bump the version number in `include/boost/hana/version.hpp` -so that it matches the next _planned_ release. Finally, do not forget to update -the [Homebrew formula][] to point to the latest version. +To release a new version of Hana, use the `util/release.sh` script. The script +will merge `develop` to `master`, create a tag on `master` and then bump the +version on `develop`. The tag on `master` will be annotated with the contents +of the `RELEASE_NOTES.md` file. Once the `release.sh` script has been run, the +`master` and `develop` branches should be pushed manually, as well as the tag +that was created on `master`. Finally, create a GitHub release pointing to the +new tag on `master`. <!-- Links --> @@ -199,4 +191,3 @@ the [Homebrew formula][] to point to the latest version. [eRuby]: http://en.wikipedia.org/wiki/ERuby [Hana.docs]: http://boostorg.github.io/hana [Hana.wiki]: https://github.com/boostorg/hana/wiki -[Homebrew formula]: https://github.com/Homebrew/homebrew-core/blob/master/Formula/hana.rb diff --git a/contrib/restricted/boost/hana/RELEASE_NOTES.md b/contrib/restricted/boost/hana/RELEASE_NOTES.md new file mode 100644 index 00000000000..c10c2a0a258 --- /dev/null +++ b/contrib/restricted/boost/hana/RELEASE_NOTES.md @@ -0,0 +1,2 @@ +Release notes for Hana 1.6.2 +============================ diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 35e39983a7b..acf3841435f 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -320,13 +320,7 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) { try { - // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name); - // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes - // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line; - EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcId(), SourceId); - - // Good hash and proper way of calcultion; - CompatibleHash = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId).Hash; + EncodedSourceId = NSourceIdEncoding::EncodeSrcId(FullConverter->GetTopicForSrcIdHash(), SourceId); } catch (yexception& e) { CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx); return; @@ -465,7 +459,6 @@ void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) { ProceedPartition(partitionId, ctx); return; } - SendSelectPartitionRequest(CompatibleHash, FullConverter->GetClientsideName(), ctx); SendSelectPartitionRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx); State = ES_WAIT_TABLE_REQUEST_1; } @@ -475,11 +468,11 @@ void TWriteSessionActor::SendSelectPartitionRequest(ui32 hash, const TString& to auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetKeepSession(false); + ev->Record.MutableRequest()->SetKeepSession(true); ev->Record.MutableRequest()->SetQuery(SelectSourceIdQuery); ev->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); // fill tx settings: set commit tx flag & begin new serializable tx. - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); @@ -552,6 +545,10 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const SelectSrcIdsInflight--; auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + TxId = record.GetResponse().GetTxMeta().id(); + KqpSessionId = record.GetResponse().GetSessionId(); + Y_VERIFY(!TxId.empty()); + if (t.ListSize() != 0) { auto& tt = t.GetList(0).GetStruct(0); if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition @@ -592,6 +589,7 @@ void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const } return; } else if (State == EState::ES_WAIT_TABLE_REQUEST_2) { + SourceIdUpdatesInflight--; if (!SourceIdUpdatesInflight) { LastSourceIdUpdate = ctx.Now(); @@ -620,7 +618,12 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet ev->Record.MutableRequest()->SetKeepSession(false); // fill tx settings: set commit tx flag & begin new serializable tx. ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + if (TxId) { + ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + } else { + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + } // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); @@ -639,12 +642,6 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet void TWriteSessionActor::SendUpdateSrcIdsRequests(const TActorContext& ctx) { { - //full legacy name (rt3.dc--acc--topic) - auto ev = MakeUpdateSourceIdMetadataRequest(CompatibleHash, FullConverter->GetClientsideName(), ctx); - SourceIdUpdatesInflight++; - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - } - { auto ev = MakeUpdateSourceIdMetadataRequest(EncodedSourceId.Hash, FullConverter->GetClientsideName(), ctx); SourceIdUpdatesInflight++; ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); @@ -667,6 +664,8 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0; + TxId = ""; + if (!tabletId) { CloseSession( Sprintf("no partition %u in topic '%s', Marker# PQ4", Partition, diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 5a7312f3b08..335b9e91189 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -238,6 +238,8 @@ private: TString ClientDC; TString SelectSourceIdQuery; TString UpdateSourceIdQuery; + TString TxId; + TString KqpSessionId; ui32 SelectSrcIdsInflight = 0; ui64 MaxSrcIdAccessTime = 0; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 91683d5490b..ad1e4f4d4da 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4639,12 +4639,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->CreateTopic(legacyName, 100); runTest(legacyName, shortLegacyName, topicName, srcId1, 5, 100); - runTest(legacyName, legacyName, topicName, srcId2, 6, 100); + runTest(legacyName, shortLegacyName, topicName, srcId2, 6, 100); runTest("", "", topicName, srcId1, 5, 100); runTest("", "", topicName, srcId2, 6, 100); ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds(); - runTest(legacyName, legacyName, topicName, srcId2, 7, time); + runTest(legacyName, shortLegacyName, topicName, srcId2, 7, time); } Y_UNIT_TEST(TestReadPartitionStatus) { |