diff options
author | hcpp <hcpp@ydb.tech> | 2023-03-17 13:31:14 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-03-17 13:31:14 +0300 |
commit | a2c9dca2891fe7d1e93aa8805358eaf50dce6368 (patch) | |
tree | d10ecf7cb51d35c95a422bf3a342e23b0a55fd00 | |
parent | ce08e2b5999fa79363615e13b108e10621022540 (diff) | |
download | ydb-a2c9dca2891fe7d1e93aa8805358eaf50dce6368.tar.gz |
external source plugin has been added
38 files changed, 1072 insertions, 6 deletions
diff --git a/ydb/core/CMakeLists.txt b/ydb/core/CMakeLists.txt index e4c01cb040e..b60f4b0f166 100644 --- a/ydb/core/CMakeLists.txt +++ b/ydb/core/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(discovery) add_subdirectory(driver_lib) add_subdirectory(engine) add_subdirectory(erasure) +add_subdirectory(external_sources) add_subdirectory(filestore) add_subdirectory(formats) add_subdirectory(fq) diff --git a/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt b/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..be0a88b0693 --- /dev/null +++ b/ydb/core/external_sources/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-external_sources) +target_link_libraries(ydb-core-external_sources PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-scheme + ydb-core-base + ydb-core-protos + providers-s3-path_generator + cpp-client-ydb_params + cpp-client-ydb_value +) +target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp +) diff --git a/ydb/core/external_sources/CMakeLists.linux-aarch64.txt b/ydb/core/external_sources/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..3aa55187905 --- /dev/null +++ b/ydb/core/external_sources/CMakeLists.linux-aarch64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-external_sources) +target_link_libraries(ydb-core-external_sources PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-scheme + ydb-core-base + ydb-core-protos + providers-s3-path_generator + cpp-client-ydb_params + cpp-client-ydb_value +) +target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp +) diff --git a/ydb/core/external_sources/CMakeLists.linux-x86_64.txt b/ydb/core/external_sources/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..3aa55187905 --- /dev/null +++ b/ydb/core/external_sources/CMakeLists.linux-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-external_sources) +target_link_libraries(ydb-core-external_sources PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-scheme + ydb-core-base + ydb-core-protos + providers-s3-path_generator + cpp-client-ydb_params + cpp-client-ydb_value +) +target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp +) diff --git a/ydb/core/external_sources/CMakeLists.txt b/ydb/core/external_sources/CMakeLists.txt new file mode 100644 index 00000000000..d90657116d0 --- /dev/null +++ b/ydb/core/external_sources/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/external_sources/CMakeLists.windows-x86_64.txt b/ydb/core/external_sources/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..be0a88b0693 --- /dev/null +++ b/ydb/core/external_sources/CMakeLists.windows-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-external_sources) +target_link_libraries(ydb-core-external_sources PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-scheme + ydb-core-base + ydb-core-protos + providers-s3-path_generator + cpp-client-ydb_params + cpp-client-ydb_value +) +target_sources(ydb-core-external_sources PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/external_source_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage.cpp +) diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h new file mode 100644 index 00000000000..9b5f1afe400 --- /dev/null +++ b/ydb/core/external_sources/external_source.h @@ -0,0 +1,19 @@ +#pragma once + +#include <util/generic/string.h> +#include <ydb/core/protos/external_sources.pb.h> +#include <ydb/library/yql/public/issue/yql_issue.h> + +namespace NKikimr::NExternalSource { + +struct TExternalSourceException: public yexception { +}; + +struct IExternalSource : public TThrRefBase { + using TPtr = TIntrusivePtr<IExternalSource>; + + virtual TString Pack(const NKikimrExternalSources::TSchema& schema, + const NKikimrExternalSources::TGeneral& general) const = 0; +}; + +} diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp new file mode 100644 index 00000000000..4bc78d4d9b1 --- /dev/null +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -0,0 +1,36 @@ +#include "external_source_factory.h" +#include "object_storage.h" + +#include <util/generic/map.h> + + +namespace NKikimr::NExternalSource { + +namespace { + +struct TExternalSourceFactory : public IExternalSourceFactory { + TExternalSourceFactory(const TMap<TString, IExternalSource::TPtr>& sources) + : Sources(sources) + {} + + IExternalSource::TPtr GetOrCreate(const TString& type) const override { + auto it = Sources.find(type); + if (it != Sources.end()) { + return it->second; + } + ythrow TExternalSourceException() << "External source with type " << type << " was not found"; + } + +private: + TMap<TString, IExternalSource::TPtr> Sources; +}; + +} + +IExternalSourceFactory::TPtr CreateExternalSourceFactory() { + return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{ + {"ObjectStorage", CreateObjectStorageExternalSource()} + }); +} + +} diff --git a/ydb/core/external_sources/external_source_factory.h b/ydb/core/external_sources/external_source_factory.h new file mode 100644 index 00000000000..a1df487f502 --- /dev/null +++ b/ydb/core/external_sources/external_source_factory.h @@ -0,0 +1,15 @@ +#pragma once + +#include "external_source.h" + +namespace NKikimr::NExternalSource { + +struct IExternalSourceFactory : public TThrRefBase { + using TPtr = TIntrusivePtr<IExternalSourceFactory>; + + virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0; +}; + +IExternalSourceFactory::TPtr CreateExternalSourceFactory(); + +} diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp new file mode 100644 index 00000000000..c2a8340a879 --- /dev/null +++ b/ydb/core/external_sources/object_storage.cpp @@ -0,0 +1,319 @@ +#include "external_source.h" + +#include <ydb/core/protos/external_sources.pb.h> +#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_value/value.h> + +#include <library/cpp/scheme/scheme.h> + +#include <util/string/builder.h> + +#include <array> + +namespace NKikimr::NExternalSource { + +namespace { + +struct TObjectStorageExternalSource : public IExternalSource { + virtual TString Pack(const NKikimrExternalSources::TSchema& schema, + const NKikimrExternalSources::TGeneral& general) const override { + NKikimrExternalSources::TObjectStorage objectStorage; + for (const auto& [key, value]: general.attributes()) { + if (key == "format") { + objectStorage.set_format(value); + } else if (key == "compression") { + objectStorage.set_compression(value); + } else if (key.StartsWith("projection.") || key == "storage.location.template") { + objectStorage.mutable_projection()->insert({key, value}); + } else if (key == "partitioned_by") { + auto json = NSc::TValue::FromJsonThrow(value); + for (const auto& column: json.GetArray()) { + *objectStorage.add_partitioned_by() = column; + } + } else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, key)) { + objectStorage.mutable_format_setting()->insert({key, value}); + } else { + ythrow TExternalSourceException() << "Unknown attribute " << key; + } + } + + if (auto issues = Validate(schema, objectStorage)) { + ythrow TExternalSourceException() << issues.ToString() << Endl; + } + + return objectStorage.SerializeAsString(); + } + +private: + static NYql::TIssues Validate(const NKikimrExternalSources::TSchema& schema, const NKikimrExternalSources::TObjectStorage& objectStorage) { + NYql::TIssues issues; + issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting())); + if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) { + try { + TVector<TString> partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()}; + issues.AddIssues(ValidateProjectionColumns(schema, partitionedBy)); + TString projectionStr; + if (objectStorage.projection_size()) { + NSc::TValue projection; + for (const auto& [key, value]: objectStorage.projection()) { + projection[key] = value; + } + projectionStr = projection.ToJsonPretty(); + } + issues.AddIssues(ValidateProjection(schema, projectionStr, partitionedBy)); + } catch (...) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, CurrentExceptionMessage())); + } + } + return issues; + } + + static NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) { + NYql::TIssues issues; + issues.AddIssues(ValidateDateFormatSetting(formatSetting)); + for (const auto& [key, value]: formatSetting) { + if (key == "file_pattern"sv) { + continue; + } + + if (key == "data.interval.unit"sv) { + if (!IsValidIntervalUnit(value)) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown value for data.interval.unit " + value)); + } + continue; + } + + if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) { + continue; + } + + if (key == "csv_delimiter"sv) { + if (format != "csv_with_names"sv) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names")); + } + if (value.size() != 1) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "csv_delimiter should contain only one character")); + } + continue; + } + + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key)); + } + return issues; + } + + static NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting) { + NYql::TIssues issues; + TSet<TString> conflictingKeys; + for (const auto& [key, value]: formatSetting) { + if (key == "data.datetime.format_name"sv) { + if (!IsValidDateTimeFormatName(value)) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown value for data.datetime.format_name " + value)); + } + if (conflictingKeys.contains("data.datetime.format")) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together")); + } + conflictingKeys.insert("data.datetime.format_name"); + continue; + } + + if (key == "data.datetime.format"sv) { + if (conflictingKeys.contains("data.datetime.format_name")) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together")); + } + conflictingKeys.insert("data.datetime.format"); + continue; + } + + if (key == "data.timestamp.format_name"sv) { + if (!IsValidTimestampFormatName(value)) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown value for data.timestamp.format_name " + value)); + } + if (conflictingKeys.contains("data.timestamp.format")) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together")); + } + conflictingKeys.insert("data.timestamp.format_name"); + continue; + } + + if (key == "data.timestamp.format"sv) { + if (conflictingKeys.contains("data.timestamp.format_name")) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together")); + } + conflictingKeys.insert("data.timestamp.format"); + continue; + } + } + return issues; + } + + static bool IsValidIntervalUnit(const TString& unit) { + static constexpr std::array<std::string_view, 7> IntervalUnits = { + "MICROSECONDS"sv, + "MILLISECONDS"sv, + "SECONDS"sv, + "MINUTES"sv, + "HOURS"sv, + "DAYS"sv, + "WEEKS"sv + }; + return IsIn(IntervalUnits, unit); + } + + static bool IsValidDateTimeFormatName(const TString& formatName) { + static constexpr std::array<std::string_view, 2> FormatNames = { + "POSIX"sv, + "ISO"sv + }; + return IsIn(FormatNames, formatName); + } + + static bool IsValidTimestampFormatName(const TString& formatName) { + static constexpr std::array<std::string_view, 5> FormatNames = { + "POSIX"sv, + "ISO"sv, + "UNIX_TIME_MILLISECONDS"sv, + "UNIX_TIME_SECONDS"sv, + "UNIX_TIME_MICROSECONDS"sv + }; + return IsIn(FormatNames, formatName); + } + + static NYql::TIssue MakeErrorIssue(NYql::TIssueCode id, const TString& message) { + NYql::TIssue issue; + issue.SetCode(id, NYql::TSeverityIds::S_ERROR); + issue.SetMessage(message); + return issue; + } + + static NYql::TIssues ValidateProjectionColumns(const NKikimrExternalSources::TSchema& schema, const TVector<TString>& partitionedBy) { + NYql::TIssues issues; + TMap<TString, Ydb::Type> types; + for (const auto& column: schema.column()) { + types[column.name()] = column.type(); + } + for (const auto& parititonedColumn: partitionedBy) { + auto it = types.find(parititonedColumn); + if (it == types.end()) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder{} << "Column " << parititonedColumn << " from partitioned_by does not exist in the scheme. Please add such a column to your scheme")); + continue; + } + NYdb::TType columnType{it->second}; + issues.AddIssues(ValidateCommonProjectionType(columnType, parititonedColumn)); + } + return issues; + } + + static NYql::TIssues ValidateProjectionType(const NYdb::TType& columnType, const TString& columnName, const std::vector<NYdb::TType>& availableTypes) { + return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) == availableTypes.end() + ? NYql::TIssues{MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder{} << "Column \"" << columnName << "\" from projection does not support " << columnType.ToString() << " type")} + : NYql::TIssues{}; + } + + static NYql::TIssues ValidateIntegerProjectionType(const NYdb::TType& columnType, const TString& columnName) { + static const std::vector<NYdb::TType> availableTypes { + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::String) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Int64) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Utf8) + .Build() + }; + return ValidateProjectionType(columnType, columnName, availableTypes); + } + + static NYql::TIssues ValidateEnumProjectionType(const NYdb::TType& columnType, const TString& columnName) { + static const std::vector<NYdb::TType> availableTypes { + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::String) + .Build() + }; + return ValidateProjectionType(columnType, columnName, availableTypes); + } + + static NYql::TIssues ValidateCommonProjectionType(const NYdb::TType& columnType, const TString& columnName) { + static const std::vector<NYdb::TType> availableTypes { + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::String) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Int64) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Utf8) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Int32) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint32) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint64) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Date) + .Build() + }; + return ValidateProjectionType(columnType, columnName, availableTypes); + } + + static NYql::TIssues ValidateDateProjectionType(const NYdb::TType& columnType, const TString& columnName) { + static const std::vector<NYdb::TType> availableTypes { + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::String) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Utf8) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint32) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Date) + .Build() + }; + return ValidateProjectionType(columnType, columnName, availableTypes); + } + + static NYql::TIssues ValidateProjection(const NKikimrExternalSources::TSchema& schema, const TString& projection, const TVector<TString>& partitionedBy) { + auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy); // an exception is thrown if an error occurs + TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns; + for (const auto& column: generator->GetConfig().Rules) { + projectionColumns[column.Name] = column.Type; + } + NYql::TIssues issues; + for (const auto& column: schema.column()) { + auto it = projectionColumns.find(column.name()); + if (it != projectionColumns.end()) { + switch (it->second) { + case NYql::NPathGenerator::IPathGenerator::EType::INTEGER: + issues.AddIssues(ValidateIntegerProjectionType(NYdb::TType{column.type()}, column.name())); + break; + case NYql::NPathGenerator::IPathGenerator::EType::ENUM: + issues.AddIssues(ValidateEnumProjectionType(NYdb::TType{column.type()}, column.name())); + break; + case NYql::NPathGenerator::IPathGenerator::EType::DATE: + issues.AddIssues(ValidateDateProjectionType(NYdb::TType{column.type()}, column.name())); + break; + case NYql::NPathGenerator::IPathGenerator::EType::UNDEFINED: + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder{} << "Column \"" << column.name() << "\" from projection has undefined generator type")); + break; + } + } + } + return issues; + } +}; + +} + +IExternalSource::TPtr CreateObjectStorageExternalSource() { + return MakeIntrusive<TObjectStorageExternalSource>(); +} + +} diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h new file mode 100644 index 00000000000..22bd9f6d765 --- /dev/null +++ b/ydb/core/external_sources/object_storage.h @@ -0,0 +1,9 @@ +#pragma once + +#include "external_source.h" + +namespace NKikimr::NExternalSource { + +IExternalSource::TPtr CreateObjectStorageExternalSource(); + +} diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp new file mode 100644 index 00000000000..9fe102f6293 --- /dev/null +++ b/ydb/core/external_sources/object_storage_ut.cpp @@ -0,0 +1,34 @@ +#include "object_storage.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <ydb/core/protos/external_sources.pb.h> + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(ObjectStorageTest) { + Y_UNIT_TEST(SuccessValidation) { + auto source = NExternalSource::CreateObjectStorageExternalSource(); + NKikimrExternalSources::TSchema schema; + NKikimrExternalSources::TGeneral general; + UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general)); + } + + Y_UNIT_TEST(FailedCreate) { + auto source = NExternalSource::CreateObjectStorageExternalSource(); + NKikimrExternalSources::TSchema schema; + NKikimrExternalSources::TGeneral general; + general.mutable_attributes()->insert({"a", "b"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Unknown attribute a"); + } + + Y_UNIT_TEST(FailedValidation) { + auto source = NExternalSource::CreateObjectStorageExternalSource(); + NKikimrExternalSources::TSchema schema; + NKikimrExternalSources::TGeneral general; + general.mutable_attributes()->insert({"projection.h", "b"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Partition by must always be specified"); + } +} + +} // NKikimr diff --git a/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..612eb287c48 --- /dev/null +++ b/ydb/core/external_sources/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,68 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-external_sources-ut) +target_include_directories(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources +) +target_link_libraries(ydb-core-external_sources-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-external_sources + udf-service-stub + yql-sql-pg_dummy +) +target_link_options(ydb-core-external_sources-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp +) +set_property( + TARGET + ydb-core-external_sources-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-core-external_sources-ut + TEST_TARGET + ydb-core-external_sources-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-core-external_sources-ut) diff --git a/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt b/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..effc768c6a2 --- /dev/null +++ b/ydb/core/external_sources/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,70 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-external_sources-ut) +target_include_directories(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources +) +target_link_libraries(ydb-core-external_sources-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-jemalloc + cpp-testing-unittest_main + ydb-core-external_sources + udf-service-stub + yql-sql-pg_dummy +) +target_link_options(ydb-core-external_sources-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp +) +set_property( + TARGET + ydb-core-external_sources-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-core-external_sources-ut + TEST_TARGET + ydb-core-external_sources-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-core-external_sources-ut) diff --git a/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt b/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..79f8a47944c --- /dev/null +++ b/ydb/core/external_sources/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,72 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-external_sources-ut) +target_include_directories(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources +) +target_link_libraries(ydb-core-external_sources-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-external_sources + udf-service-stub + yql-sql-pg_dummy +) +target_link_options(ydb-core-external_sources-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp +) +set_property( + TARGET + ydb-core-external_sources-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-core-external_sources-ut + TEST_TARGET + ydb-core-external_sources-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-core-external_sources-ut) diff --git a/ydb/core/external_sources/ut/CMakeLists.txt b/ydb/core/external_sources/ut/CMakeLists.txt new file mode 100644 index 00000000000..d90657116d0 --- /dev/null +++ b/ydb/core/external_sources/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt b/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..b147d30121b --- /dev/null +++ b/ydb/core/external_sources/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,60 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-external_sources-ut) +target_include_directories(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources +) +target_link_libraries(ydb-core-external_sources-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-external_sources + udf-service-stub + yql-sql-pg_dummy +) +target_sources(ydb-core-external_sources-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/external_sources/object_storage_ut.cpp +) +set_property( + TARGET + ydb-core-external_sources-ut + PROPERTY + SPLIT_FACTOR + 1 +) +add_yunittest( + NAME + ydb-core-external_sources-ut + TEST_TARGET + ydb-core-external_sources-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + LABELS + SMALL +) +set_yunittest_property( + TEST + ydb-core-external_sources-ut + PROPERTY + PROCESSORS + 1 +) +vcs_info(ydb-core-external_sources-ut) diff --git a/ydb/core/external_sources/ut/ya.make b/ydb/core/external_sources/ut/ya.make new file mode 100644 index 00000000000..66515b54f3f --- /dev/null +++ b/ydb/core/external_sources/ut/ya.make @@ -0,0 +1,12 @@ +UNITTEST_FOR(ydb/core/external_sources) + +PEERDIR( + ydb/library/yql/public/udf/service/stub + ydb/library/yql/sql/pg_dummy +) + +SRCS( + object_storage_ut.cpp +) + +END() diff --git a/ydb/core/external_sources/ya.make b/ydb/core/external_sources/ya.make new file mode 100644 index 00000000000..27cc4665082 --- /dev/null +++ b/ydb/core/external_sources/ya.make @@ -0,0 +1,21 @@ +LIBRARY() + +SRCS( + external_source_factory.cpp + object_storage.cpp +) + +PEERDIR( + library/cpp/scheme + ydb/core/base + ydb/core/protos + ydb/library/yql/providers/s3/path_generator + ydb/public/sdk/cpp/client/ydb_params + ydb/public/sdk/cpp/client/ydb_value +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 59ea3cbad02..96b8c615492 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -12,6 +12,7 @@ #include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h> #include <ydb/core/protos/console_config.pb.h> +#include <ydb/core/protos/external_sources.pb.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/grpc_services/table_settings.h> @@ -2232,6 +2233,7 @@ private: externalTableDesc.SetName(name); externalTableDesc.SetDataSourcePath(settings.DataSourcePath); externalTableDesc.SetLocation(settings.Location); + externalTableDesc.SetSourceType("General"); Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size()); for (const auto& name : settings.ColumnOrder) { @@ -2243,6 +2245,12 @@ private: columnDesc.SetType(columnIt->second.Type); columnDesc.SetNotNull(columnIt->second.NotNull); } + NKikimrExternalSources::TGeneral general; + auto& attributes = *general.mutable_attributes(); + for (const auto& [key, value]: settings.SourceTypeParameters) { + attributes.insert({key, value}); + } + externalTableDesc.SetContent(general.SerializeAsString()); } static void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescription& externaDataSourceDesc, diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 1f006b53864..030fdcdb7fa 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -3929,8 +3929,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) { CREATE EXTERNAL TABLE `)" << externalTableName << R"(` ( Key Uint64, Value String, - year Int, - month Int + year Int64 NOT NULL, + month Int64 NOT NULL ) WITH ( DATA_SOURCE=")" << externalDataSourceName << R"(", LOCATION="/folder1/*", @@ -3946,7 +3946,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { `projection.month.interval`="1", `projection.month.digits`="2", `storage.location.template`="${year}/${month}", - PARTITONED_BY = "[year, month]" + PARTITIONED_BY = "[year, month]" );)"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt index ca69a6fd6bc..2742c4d4e7c 100644 --- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1572,6 +1584,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_scheme_op.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/health.proto diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt index 10f1106fe58..f127d672c5b 100644 --- a/ydb/core/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1573,6 +1585,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_scheme_op.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/health.proto diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt index 10f1106fe58..f127d672c5b 100644 --- a/ydb/core/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1573,6 +1585,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_scheme_op.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/health.proto diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt index ca69a6fd6bc..2742c4d4e7c 100644 --- a/ydb/core/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1572,6 +1584,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_scheme_op.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/health.proto diff --git a/ydb/core/protos/external_sources.proto b/ydb/core/protos/external_sources.proto new file mode 100644 index 00000000000..6701e041637 --- /dev/null +++ b/ydb/core/protos/external_sources.proto @@ -0,0 +1,63 @@ +package NKikimrExternalSources; +option java_package = "ru.yandex.kikimr.proto"; + +import "ydb/public/api/protos/ydb_value.proto"; + +import "ydb/public/api/protos/annotations/sensitive.proto"; +import "ydb/public/api/protos/annotations/validation.proto"; + +message TSchema { + repeated Ydb.Column column = 1 [(Ydb.size).le = 100]; +} + +message TGeneral { + map<string, string> attributes = 1 [(Ydb.size).le = 100]; +} + +message TObjectStorage { + optional string format = 1 [(Ydb.length).le = 1024]; + map<string, string> format_setting = 2 [(Ydb.size).le = 100]; + optional string compression = 3 [(Ydb.length).le = 1024]; + + /* + Partition projection is used to speed up the processing of highly partitioned + storages and automate the management of partitions. In partition projection, partition values and + locations are calculated from configuration rather than read from an object storage. Depending on the + specific characteristics of the query and underlying data, partition projection can significantly + reduce query execution time if it uses partitioning constraints on partition metadata retrieval. Similar + functionality is implemented in Athena: https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html + Only enum, integer and date types are supported for path generation. When using projection, there must + be at least one element in partitioned_by. This behavior is introduced for symmetric query usage and + compatibility with Athena behavior. + + Example: + projection = { + "projection.enabled" : "true", // used to enable and disable partitioning + "projection.city.type" : "enum", // to generate the city column, the enum type will be used (enumeration of objects separated by commas) + "projection.city.values" : "Washington,Roma", // column values city Washington or Roma + "projection.code.type" : "enum", // to generate the code column, the enum type will be used (enumeration of objects separated by commas) + "projection.code.values" : "0,1", // column values code 0 or 1 + "storage.location.template" : "/${city}/${code}/${device_id}" // the template to which the generated values will be substituted + } + partitioned_by = [ "city", "device_id" ] // a subset of columns that are included in partitioning + - If storage.location.template and partitioned_by are specified together, then the rule from storage.location.template will be used. + - If only partitioned_by is specified, then the Hive Metastore format will be used for storage.location.template: "/city=${city}/device_id=${device_id}" + The list of paths that correspond to described projection and partitioned_by values are: + "/Washington/0/${device_id}", "/Washington/1/${device_id}", "/Roma/0/${device_id}", "/Roma/1/${device_id}" + */ + map<string, string> projection = 4; + + /* + By separating the data, it is possible to limit the amount of data scanned by each query, thereby improving + performance and reducing costs. Therefore, user data is partition by key (in practice, this is a partition by time). + The partitioned_by defines the keys on which to partition data. The columns described in partitioned_by + must be specified in the schema. If projection is not specified, the template will be generated according to + partitioned_by. Similar functionality is implemented in Athena: https://docs.aws.amazon.com/athena/latest/ug/partitions.html + + Example: + partitioned_by = [ "city", "code", "device_id" ] + The corresponding storage.location.template will be as follows: + "/city=${city}/code=${code}/device_id=${device_id}" + */ + repeated string partitioned_by = 5; +} diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 2f975131888..d0918753599 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -55,6 +55,7 @@ SRCS( datashard_load.proto drivemodel.proto export.proto + external_sources.proto flat_tx_scheme.proto flat_scheme_op.proto health.proto diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt index a949324968f..7cdb345fb8a 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt @@ -89,6 +89,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC core-blockstore-core ydb-core-engine core-engine-minikql + ydb-core-external_sources core-filestore-core core-kesus-tablet ydb-core-metering diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index 644a5c51b9d..a10c709ecd5 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -90,6 +90,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC core-blockstore-core ydb-core-engine core-engine-minikql + ydb-core-external_sources core-filestore-core core-kesus-tablet ydb-core-metering diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt index 644a5c51b9d..a10c709ecd5 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt @@ -90,6 +90,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC core-blockstore-core ydb-core-engine core-engine-minikql + ydb-core-external_sources core-filestore-core core-kesus-tablet ydb-core-metering diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt index b53d9a03700..b066e2c3017 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt @@ -89,6 +89,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC core-blockstore-core ydb-core-engine core-engine-minikql + ydb-core-external_sources core-filestore-core core-kesus-tablet ydb-core-metering diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp index f4cc731d9cd..e1ab61dadb0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_external_table.cpp @@ -77,7 +77,23 @@ bool IsAllowedType(ui32 typeId) { return true; } -TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, TString& errStr) { +Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) { + Ydb::Type ydbType; + if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { + auto* typeDesc = typeInfo.GetTypeDesc(); + auto* pg = ydbType.mutable_pg_type(); + pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); + } else { + auto& item = notNull + ? ydbType + : *ydbType.mutable_optional_type()->mutable_item(); + item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); + } + return ydbType; +} + +TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NKikimrSchemeOp::TExternalTableDescription& desc, const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& factory, TString& errStr) { if (!Validate(sourceType, desc, errStr)) { return nullptr; } @@ -90,12 +106,17 @@ TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NK TExternalTableInfo::TPtr externalTableInfo = new TExternalTableInfo; const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; + if (desc.GetSourceType() != "General") { + errStr = "Only general data source has been supported as request"; + return nullptr; + } + externalTableInfo->DataSourcePath = desc.GetDataSourcePath(); externalTableInfo->Location = desc.GetLocation(); externalTableInfo->AlterVersion = 1; - externalTableInfo->Content = desc.GetContent(); externalTableInfo->SourceType = sourceType; + NKikimrExternalSources::TSchema schema; uint64_t nextColumnId = 1; for (const auto& col : desc.GetColumns()) { TString colName = col.GetName(); @@ -152,6 +173,20 @@ TExternalTableInfo::TPtr CreateExternalTable(const TString& sourceType, const NK TTableInfo::TColumn& column = externalTableInfo->Columns[colId]; column = TTableInfo::TColumn(colName, colId, typeInfo, ""); // TODO: do we need typeMod here? column.NotNull = col.GetNotNull(); + + auto& schemaColumn= *schema.add_column(); + schemaColumn.set_name(colName); + *schemaColumn.mutable_type() = CreateYdbType(typeInfo, col.GetNotNull()); + } + + try { + NKikimrExternalSources::TGeneral general; + general.ParseFromStringOrThrow(desc.GetContent()); + auto source = factory->GetOrCreate(sourceType); + externalTableInfo->Content = source->Pack(schema, general); + } catch (...) { + errStr = CurrentExceptionMessage(); + return nullptr; } return externalTableInfo; @@ -348,7 +383,7 @@ public: return result; } - TExternalTableInfo::TPtr externalTableInfo = CreateExternalTable(externalDataSource->SourceType, externalTableDescription, errStr); + TExternalTableInfo::TPtr externalTableInfo = CreateExternalTable(externalDataSource->SourceType, externalTableDescription, context.SS->ExternalSourceFactory, errStr); if (!externalTableInfo) { result->SetError(NKikimrScheme::StatusSchemeError, errStr); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 3f4656cc4bc..7f3046f3e76 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -25,6 +25,7 @@ #include <ydb/core/base/tx_processing.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/cms/console/console.h> +#include <ydb/core/external_sources/external_source_factory.h> #include <ydb/core/kesus/tablet/events.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/core/protos/blockstore_config.pb.h> @@ -301,6 +302,8 @@ public: TActorId DelayedInitTenantDestination; TAutoPtr<TEvSchemeShard::TEvInitTenantSchemeShardResult> DelayedInitTenantReply; + NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory()}; + THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context); THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId, const TString& body, const TActorContext& ctx) const; diff --git a/ydb/core/tx/schemeshard/ut_external_data_source.cpp b/ydb/core/tx/schemeshard/ut_external_data_source.cpp index e2719ae0fc5..34d97d94e80 100644 --- a/ydb/core/tx/schemeshard/ut_external_data_source.cpp +++ b/ydb/core/tx/schemeshard/ut_external_data_source.cpp @@ -324,6 +324,7 @@ Y_UNIT_TEST_SUITE(TExternalDataSourceTest) { TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key" Type: "Uint64" } @@ -362,6 +363,7 @@ Y_UNIT_TEST_SUITE(TExternalDataSourceTest) { TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key" Type: "Uint64" } diff --git a/ydb/core/tx/schemeshard/ut_external_table.cpp b/ydb/core/tx/schemeshard/ut_external_table.cpp index e8a355cbdba..7c8921a2813 100644 --- a/ydb/core/tx/schemeshard/ut_external_table.cpp +++ b/ydb/core/tx/schemeshard/ut_external_table.cpp @@ -33,6 +33,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { CreateExternalDataSource(runtime, env, txId++); TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key" Type: "Uint64" } @@ -51,6 +52,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { CreateExternalDataSource(runtime, env, txId++); TestCreateExternalTable(runtime, txId++, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key" Type: "Uint64" } @@ -99,6 +101,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { auto createFn = [](TTestBasicRuntime& runtime, ui64 txId) { TestCreateExternalTable(runtime, txId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key" Type: "Uint64" } @@ -122,6 +125,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { AsyncMkDir(runtime, ++txId, "/MyRoot", "DirA"); AsyncCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "ExternalTable1" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -129,6 +133,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { )"); AsyncCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "ExternalTable2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key1" Type: "Uint32"} @@ -162,6 +167,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TString tableConfig = R"( Name: "NilNoviSubLuna" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "key" Type: "Uint64"} @@ -212,6 +218,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { AsyncMkDir(runtime, ++txId, "/MyRoot", "SubDirA"); AsyncCreateExternalTable(runtime, ++txId, "/MyRoot", R"( Name: "ExternalTable1" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -236,6 +243,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { TestMkDir(runtime, ++txId, "/MyRoot", "SubDirBBBB", {NKikimrScheme::StatusReadOnly}); TestCreateExternalTable(runtime, ++txId, "/MyRoot", R"( Name: "ExternalTable1" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -261,30 +269,35 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { CreateExternalDataSource(runtime, env, txId++); TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "Table2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "BlaBlaType"} )", {{NKikimrScheme::StatusSchemeError, "Type 'BlaBlaType' specified for column 'RowId' is not supported"}}); TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "Table2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "" Type: "Uint64"} )", {{NKikimrScheme::StatusSchemeError, "Columns cannot have an empty name"}}); TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "Table2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" TypeId: 27} )", {{NKikimrScheme::StatusSchemeError, "a"}}); TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "Table2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" } )", {{NKikimrScheme::StatusSchemeError, "Missing Type for column 'RowId'"}}); TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "Table2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64" Id: 2} @@ -292,6 +305,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTest) { )", {{NKikimrScheme::StatusSchemeError, "Duplicate column id: 2"}}); TestCreateExternalTable(runtime, ++txId, "/MyRoot/DirA", R"( Name: "Table2" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource1" Location: "/" Columns { Name: "RowId" Type: "Uint64"} diff --git a/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp b/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp index e24ca8a8629..10e5d12fafb 100644 --- a/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_external_table_reboots.cpp @@ -37,6 +37,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { AsyncMkDir(runtime, ++t.TxId, "/MyRoot", "DirExternalTable"); AsyncCreateExternalTable(runtime, ++t.TxId, "/MyRoot/DirExternalTable", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "a" Type: "Int32" NotNull: true } @@ -76,6 +77,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { CreateExternalDataSource(runtime, *t.TestEnv, t.TxId); AsyncCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "DropMe" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64" } @@ -104,6 +106,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TInactiveZone inactive(activeZone); TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot",R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "a" Type: "Int32" NotNull: true } @@ -132,6 +135,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TInactiveZone inactive(activeZone); TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot",R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "a" Type: "Int32" NotNull: true } @@ -159,6 +163,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TInactiveZone inactive(activeZone); TestCreateExternalTable(runtime, t.TxId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -177,6 +182,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -201,6 +207,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TInactiveZone inactive(activeZone); TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -214,6 +221,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -238,6 +246,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TInactiveZone inactive(activeZone); TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} @@ -250,6 +259,7 @@ Y_UNIT_TEST_SUITE(TExternalTableTestReboots) { TestCreateExternalTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "ExternalTable" + SourceType: "General" DataSourcePath: "/MyRoot/ExternalDataSource" Location: "/" Columns { Name: "RowId" Type: "Uint64"} diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 1ed5b79294c..3db90118558 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -239,6 +239,7 @@ PEERDIR( ydb/core/blockstore/core ydb/core/engine ydb/core/engine/minikql + ydb/core/external_sources ydb/core/filestore/core ydb/core/kesus/tablet ydb/core/metering diff --git a/ydb/core/ya.make b/ydb/core/ya.make index 962516ed94c..0a9590eecd7 100644 --- a/ydb/core/ya.make +++ b/ydb/core/ya.make @@ -11,6 +11,7 @@ RECURSE( discovery engine erasure + external_sources filestore fq formats |