aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-09-09 15:04:41 +0300
committeralexnick <alexnick@ydb.tech>2022-09-09 15:04:41 +0300
commit14b402db2f9000b53d033cee4458400d63d813ca (patch)
tree3732865d80ed87570105afafffb7243cb3b17cb7
parent7cd934b41bf8f1487cbb4eb783a1e7e7e0a8cbdc (diff)
downloadydb-14b402db2f9000b53d033cee4458400d63d813ca.tar.gz
support of cdc streams and topics inside directories in YDS
-rw-r--r--contrib/restricted/boost/ublas/README.md51
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp16
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp63
3 files changed, 109 insertions, 21 deletions
diff --git a/contrib/restricted/boost/ublas/README.md b/contrib/restricted/boost/ublas/README.md
index a0387df253..66e2b271f2 100644
--- a/contrib/restricted/boost/ublas/README.md
+++ b/contrib/restricted/boost/ublas/README.md
@@ -1,15 +1,46 @@
-ublas
+Boost.uBLAS Linear Algebra Library
=====
+Boost.uBLAS is part of the [Boost C++ Libraries](http://github.com/boostorg). It is directed towards scientific computing on the level of basic linear algebra constructions with matrices and vectors and their corresponding abstract operations.
-The Boost.uBLAS Linear Algebra Library v1.0
-- To follow development and test experimental features, you can clone the Github project uBLAS/ublas
-at https://github.com/uBLAS/ublas
-- A development wiki is available at https://github.com/uBLAS/ublas/wiki
-- A mailing-list is available at http://lists.boost.org/ublas/
-- For any other questions, you can contact David at david.bellot@gmail.com
+## Documentation
+uBLAS is documented at [boost.org](https://www.boost.org/doc/libs/1_69_0/libs/numeric/ublas/doc/index.html).
+The development has a [wiki page](https://github.com/uBLAS/ublas/wiki).
+The tensor extension has a separate [wiki page](https://github.com/BoostGSoC18/tensor/wiki).
-- version numbers have never been used for this library until 02 March 2014.
- So we start at v1.0 on that day.
+## License
+Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt).
-last update: 1 April 2014
+## Properties
+* Header-only
+* Tensor extension requires C++17 compatible compiler, compiles with
+ * gcc 7.3.0
+ * clang 6.0
+ * msvc 14.1
+* Unit-tests require Boost.Test
+
+## Build Status
+
+Branch | Travis | Appveyor | codecov.io | Docs |
+:-------------: | ------ | -------- | ---------- | ---- |
+[`master`](https://github.com/boostorg/ublas/tree/master) | [![Build Status](https://travis-ci.org/boostorg/ublas.svg?branch=master)](https://travis-ci.org/boostorg/ublas) | [![Build status](https://ci.appveyor.com/api/projects/status/ctu3wnfowa627ful/branch/master?svg=true)](https://ci.appveyor.com/project/stefanseefeld/ublas/branch/master) | [![codecov](https://codecov.io/gh/boostorg/ublas/branch/master/graph/badge.svg)](https://codecov.io/gh/boostorg/ublas/branch/master) | [![Documentation](https://img.shields.io/badge/docs-develop-brightgreen.svg)](http://www.boost.org/doc/libs/release/libs/numeric)
+[`develop`](https://github.com/boostorg/ublas/tree/develop) | [![Build Status](https://travis-ci.org/boostorg/ublas.svg?branch=develop)](https://travis-ci.org/boostorg/ublas) | [![Build status](https://ci.appveyor.com/api/projects/status/ctu3wnfowa627ful/branch/develop?svg=true)](https://ci.appveyor.com/project/stefanseefeld/ublas/branch/develop) | [![codecov](https://codecov.io/gh/boostorg/ublas/branch/develop/graph/badge.svg)](https://codecov.io/gh/boostorg/ublas/branch/develop) | [![Documentation](https://img.shields.io/badge/docs-develop-brightgreen.svg)](http://www.boost.org/doc/libs/release/libs/numeric)
+
+
+## Directories
+
+| Name | Purpose |
+| ----------- | ------------------------------ |
+| `doc` | documentation |
+| `examples` | example files |
+| `include` | headers |
+| `test` | unit tests |
+| `benchmarks`| timing and benchmarking |
+
+## More information
+
+* Ask questions in [stackoverflow](http://stackoverflow.com/questions/ask?tags=c%2B%2B,boost,boost-ublas) with `boost-ublas` or `ublas` tags.
+* Report [bugs](https://github.com/boostorg/ublas/issues) and be sure to mention Boost version, platform and compiler you're using. A small compilable code sample to reproduce the problem is always good as well.
+* Submit your patches as pull requests against **develop** branch. Note that by submitting patches you agree to license your modifications under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt).
+* Developer discussions about the library are held on the [Boost developers mailing list](https://lists.boost.org/mailman/listinfo.cgi/ublas). Be sure to read the [discussion policy](http://www.boost.org/community/policy.html) before posting and add the `[ublas]` tag at the beginning of the subject line
+* For any other questions, you can contact David, Stefan or Cem: david.bellot-AT-gmail-DOT-com, cem.bassoy-AT-gmail-DOT-com stefan-AT-seefeld-DOT-name
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 1a6fa1b645..f4948a465f 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -149,11 +149,6 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- if (workingDir != proposal.Record.GetDatabaseName() && !proposal.Record.GetDatabaseName().empty()) {
- return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
- "streams can be created only at database root", ctx);
- }
-
auto pqDescr = modifyScheme.MutableCreatePersQueueGroup();
if (GetProtoRequest()->retention_case() ==
Ydb::DataStreams::V1::CreateStreamRequest::RetentionCase::kRetentionStorageMegabytes) {
@@ -851,7 +846,14 @@ namespace NKikimr::NDataStreams::V1 {
void TListStreamsActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
for (const auto& entry : navigate->ResultSet) {
- if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindPath
+
+
+ if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindTable) {
+ for (const auto& stream : entry.CdcStreams) {
+ TString childFullPath = JoinPath({JoinPath(entry.Path), stream.GetName()});
+ Topics.push_back(childFullPath);
+ }
+ } else if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindPath
|| entry.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindSubdomain)
{
Y_ENSURE(entry.ListNodeEntry, "ListNodeEntry is zero");
@@ -859,6 +861,7 @@ namespace NKikimr::NDataStreams::V1 {
TString childFullPath = JoinPath({JoinPath(entry.Path), child.Name});
switch (child.Kind) {
case NSchemeCache::TSchemeCacheNavigate::EKind::KindPath:
+ case NSchemeCache::TSchemeCacheNavigate::EKind::KindTable:
if (GetProtoRequest()->recurse()) {
SendNavigateRequest(ctx, childFullPath);
}
@@ -866,6 +869,7 @@ namespace NKikimr::NDataStreams::V1 {
case NSchemeCache::TSchemeCacheNavigate::EKind::KindTopic:
Topics.push_back(childFullPath);
break;
+
default:
break;
// ignore all other types
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index 31653ed3bb..b63fc79748 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -189,7 +189,45 @@ Y_UNIT_TEST_SUITE(DataStreams) {
Y_UNIT_TEST(TestControlPlaneAndMeteringData) {
TInsecureDatastreamsTestServer testServer;
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
+ const TString streamName2 = TStringBuilder() << "tdir/stream_" << Y_UNIT_TEST_NAME;
+ const TString streamName3 = TStringBuilder() << "tdir/table/feed_" << Y_UNIT_TEST_NAME;
+ const TString tableName = "tdir/table";
+ const TString feedName = TStringBuilder() << "feed_" << Y_UNIT_TEST_NAME;
+
+ {
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ auto result = pqClient.CreateTopic(streamName2).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ {
+ NYdb::NTable::TTableClient tableClient(*testServer.Driver);
+ tableClient.RetryOperationSync([&](TSession session)
+ {
+ NYdb::NTable::TTableBuilder builder;
+ builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::String).SetPrimaryKeyColumn("key");
+
+ auto result = session.CreateTable("/Root/" + tableName, builder.Build()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ Cerr << result.GetIssues().ToString() << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ auto result2 = session.AlterTable("/Root/" + tableName, NYdb::NTable::TAlterTableSettings()
+ .AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName,
+ NYdb::NTable::EChangefeedMode::Updates,
+ NYdb::NTable::EChangefeedFormat::Json))
+ ).ExtractValueSync();
+ Cerr << result2.GetIssues().ToString() << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(result2.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result2.GetStatus(), EStatus::SUCCESS);
+ return result2;
+ }
+ );
+ }
+
// Trying to delete stream that doesn't exist yet
+
{
auto result = testServer.DataStreamsClient->DeleteStream("testfolder/" + streamName).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
@@ -242,11 +280,6 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description_summary().open_shard_count(), 3);
}
- {
- auto result = testServer.DataStreamsClient->CreateStream("testfolder/" + streamName).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
- }
{
auto result = testServer.DataStreamsClient->ListStreams().ExtractValueSync();
@@ -271,6 +304,18 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
+ // list all streams, include cdc and recursive
+ {
+ auto result = testServer.DataStreamsClient->ListStreams(NYdb::NDataStreams::V1::TListStreamsSettings().Recurse(true)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ Cerr << result.GetResult() << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names().size(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names(0), streamName);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names(1), streamName2);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_names(2), streamName3);
+ }
+
// now when stream is created delete should work fine
{
auto result = testServer.DataStreamsClient->DeleteStream(streamName).ExtractValueSync();
@@ -284,6 +329,14 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
}
+
+ {
+ auto result = testServer.DataStreamsClient->CreateStream("testfolder/" + streamName).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+
}
Y_UNIT_TEST(TestReservedResourcesMetering) {