diff options
author | alexnick <alexnick@ydb.tech> | 2022-09-09 15:04:41 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-09-09 15:04:41 +0300 |
commit | 14b402db2f9000b53d033cee4458400d63d813ca (patch) | |
tree | 3732865d80ed87570105afafffb7243cb3b17cb7 | |
parent | 7cd934b41bf8f1487cbb4eb783a1e7e7e0a8cbdc (diff) | |
download | ydb-14b402db2f9000b53d033cee4458400d63d813ca.tar.gz |
support of cdc streams and topics inside directories in YDS
-rw-r--r-- | contrib/restricted/boost/ublas/README.md | 51 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 16 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 63 |
3 files changed, 109 insertions, 21 deletions
diff --git a/contrib/restricted/boost/ublas/README.md b/contrib/restricted/boost/ublas/README.md index a0387df253..66e2b271f2 100644 --- a/contrib/restricted/boost/ublas/README.md +++ b/contrib/restricted/boost/ublas/README.md @@ -1,15 +1,46 @@ -ublas +Boost.uBLAS Linear Algebra Library ===== +Boost.uBLAS is part of the [Boost C++ Libraries](http://github.com/boostorg). It is directed towards scientific computing on the level of basic linear algebra constructions with matrices and vectors and their corresponding abstract operations. -The Boost.uBLAS Linear Algebra Library v1.0 -- To follow development and test experimental features, you can clone the Github project uBLAS/ublas -at https://github.com/uBLAS/ublas -- A development wiki is available at https://github.com/uBLAS/ublas/wiki -- A mailing-list is available at http://lists.boost.org/ublas/ -- For any other questions, you can contact David at david.bellot@gmail.com +## Documentation +uBLAS is documented at [boost.org](https://www.boost.org/doc/libs/1_69_0/libs/numeric/ublas/doc/index.html). +The development has a [wiki page](https://github.com/uBLAS/ublas/wiki). +The tensor extension has a separate [wiki page](https://github.com/BoostGSoC18/tensor/wiki). -- version numbers have never been used for this library until 02 March 2014. - So we start at v1.0 on that day. +## License +Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). -last update: 1 April 2014 +## Properties +* Header-only +* Tensor extension requires C++17 compatible compiler, compiles with + * gcc 7.3.0 + * clang 6.0 + * msvc 14.1 +* Unit-tests require Boost.Test + +## Build Status + +Branch | Travis | Appveyor | codecov.io | Docs | +:-------------: | ------ | -------- | ---------- | ---- | +[`master`](https://github.com/boostorg/ublas/tree/master) | [![Build Status](https://travis-ci.org/boostorg/ublas.svg?branch=master)](https://travis-ci.org/boostorg/ublas) | [![Build status](https://ci.appveyor.com/api/projects/status/ctu3wnfowa627ful/branch/master?svg=true)](https://ci.appveyor.com/project/stefanseefeld/ublas/branch/master) | [![codecov](https://codecov.io/gh/boostorg/ublas/branch/master/graph/badge.svg)](https://codecov.io/gh/boostorg/ublas/branch/master) | [![Documentation](https://img.shields.io/badge/docs-develop-brightgreen.svg)](http://www.boost.org/doc/libs/release/libs/numeric) +[`develop`](https://github.com/boostorg/ublas/tree/develop) | [![Build Status](https://travis-ci.org/boostorg/ublas.svg?branch=develop)](https://travis-ci.org/boostorg/ublas) | [![Build status](https://ci.appveyor.com/api/projects/status/ctu3wnfowa627ful/branch/develop?svg=true)](https://ci.appveyor.com/project/stefanseefeld/ublas/branch/develop) | [![codecov](https://codecov.io/gh/boostorg/ublas/branch/develop/graph/badge.svg)](https://codecov.io/gh/boostorg/ublas/branch/develop) | [![Documentation](https://img.shields.io/badge/docs-develop-brightgreen.svg)](http://www.boost.org/doc/libs/release/libs/numeric) + + +## Directories + +| Name | Purpose | +| ----------- | ------------------------------ | +| `doc` | documentation | +| `examples` | example files | +| `include` | headers | +| `test` | unit tests | +| `benchmarks`| timing and benchmarking | + +## More information + +* Ask questions in [stackoverflow](http://stackoverflow.com/questions/ask?tags=c%2B%2B,boost,boost-ublas) with `boost-ublas` or `ublas` tags. +* Report [bugs](https://github.com/boostorg/ublas/issues) and be sure to mention Boost version, platform and compiler you're using. A small compilable code sample to reproduce the problem is always good as well. +* Submit your patches as pull requests against **develop** branch. Note that by submitting patches you agree to license your modifications under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). +* Developer discussions about the library are held on the [Boost developers mailing list](https://lists.boost.org/mailman/listinfo.cgi/ublas). Be sure to read the [discussion policy](http://www.boost.org/community/policy.html) before posting and add the `[ublas]` tag at the beginning of the subject line +* For any other questions, you can contact David, Stefan or Cem: david.bellot-AT-gmail-DOT-com, cem.bassoy-AT-gmail-DOT-com stefan-AT-seefeld-DOT-name diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 1a6fa1b645..f4948a465f 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -149,11 +149,6 @@ namespace NKikimr::NDataStreams::V1 { } } - if (workingDir != proposal.Record.GetDatabaseName() && !proposal.Record.GetDatabaseName().empty()) { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, - "streams can be created only at database root", ctx); - } - auto pqDescr = modifyScheme.MutableCreatePersQueueGroup(); if (GetProtoRequest()->retention_case() == Ydb::DataStreams::V1::CreateStreamRequest::RetentionCase::kRetentionStorageMegabytes) { @@ -851,7 +846,14 @@ namespace NKikimr::NDataStreams::V1 { void TListStreamsActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); for (const auto& entry : navigate->ResultSet) { - if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindPath + + + if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindTable) { + for (const auto& stream : entry.CdcStreams) { + TString childFullPath = JoinPath({JoinPath(entry.Path), stream.GetName()}); + Topics.push_back(childFullPath); + } + } else if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindPath || entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindSubdomain) { Y_ENSURE(entry.ListNodeEntry, "ListNodeEntry is zero"); @@ -859,6 +861,7 @@ namespace NKikimr::NDataStreams::V1 { TString childFullPath = JoinPath({JoinPath(entry.Path), child.Name}); switch (child.Kind) { case NSchemeCache::TSchemeCacheNavigate::EKind::KindPath: + case NSchemeCache::TSchemeCacheNavigate::EKind::KindTable: if (GetProtoRequest()->recurse()) { SendNavigateRequest(ctx, childFullPath); } @@ -866,6 +869,7 @@ namespace NKikimr::NDataStreams::V1 { case NSchemeCache::TSchemeCacheNavigate::EKind::KindTopic: Topics.push_back(childFullPath); break; + default: break; // ignore all other types diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 31653ed3bb..b63fc79748 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -189,7 +189,45 @@ Y_UNIT_TEST_SUITE(DataStreams) { Y_UNIT_TEST(TestControlPlaneAndMeteringData) { TInsecureDatastreamsTestServer testServer; const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME; + const TString streamName2 = TStringBuilder() << "tdir/stream_" << Y_UNIT_TEST_NAME; + const TString streamName3 = TStringBuilder() << "tdir/table/feed_" << Y_UNIT_TEST_NAME; + const TString tableName = "tdir/table"; + const TString feedName = TStringBuilder() << "feed_" << Y_UNIT_TEST_NAME; + + { + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + auto result = pqClient.CreateTopic(streamName2).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + NYdb::NTable::TTableClient tableClient(*testServer.Driver); + tableClient.RetryOperationSync([&](TSession session) + { + NYdb::NTable::TTableBuilder builder; + builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::String).SetPrimaryKeyColumn("key"); + + auto result = session.CreateTable("/Root/" + tableName, builder.Build()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + auto result2 = session.AlterTable("/Root/" + tableName, NYdb::NTable::TAlterTableSettings() + .AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName, + NYdb::NTable::EChangefeedMode::Updates, + NYdb::NTable::EChangefeedFormat::Json)) + ).ExtractValueSync(); + Cerr << result2.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL(result2.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result2.GetStatus(), EStatus::SUCCESS); + return result2; + } + ); + } + // Trying to delete stream that doesn't exist yet + { auto result = testServer.DataStreamsClient->DeleteStream("testfolder/" + streamName).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); @@ -242,11 +280,6 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description_summary().open_shard_count(), 3); } - { - auto result = testServer.DataStreamsClient->CreateStream("testfolder/" + streamName).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); - } { auto result = testServer.DataStreamsClient->ListStreams().ExtractValueSync(); @@ -271,6 +304,18 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + // list all streams, include cdc and recursive + { + auto result = testServer.DataStreamsClient->ListStreams(NYdb::NDataStreams::V1::TListStreamsSettings().Recurse(true)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + Cerr << result.GetResult() << "\n"; + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names().size(), 3); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names(0), streamName); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names(1), streamName2); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names(2), streamName3); + } + // now when stream is created delete should work fine { auto result = testServer.DataStreamsClient->DeleteStream(streamName).ExtractValueSync(); @@ -284,6 +329,14 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR); } + + { + auto result = testServer.DataStreamsClient->CreateStream("testfolder/" + streamName).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + } Y_UNIT_TEST(TestReservedResourcesMetering) { |