author    vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 09:58:56 +0300
committer vitalyisaev <vitalyisaev@ydb.tech>  2023-11-14 10:20:20 +0300
commit    c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
tree      cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.cpp
parent    d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Moving the folder with Connector tests from the yql folder to the ydb folder (kept in sync with github).
Diffstat (limited to 'contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.cpp')
-rw-r--r-- contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.cpp | 616
1 file changed, 616 insertions(+), 0 deletions(-)
diff --git a/contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.cpp b/contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.cpp
new file mode 100644
index 0000000000..b2e3dcd0a0
--- /dev/null
+++ b/contrib/clickhouse/src/Storages/S3Queue/StorageS3Queue.cpp
@@ -0,0 +1,616 @@
+#include "clickhouse_config.h"
+
+
+#if USE_AWS_S3
+
+# include <Databases/DatabaseReplicated.h>
+# include <IO/WriteBuffer.h>
+# include <IO/WriteHelpers.h>
+# include <Interpreters/InterpreterInsertQuery.h>
+# include <Processors/Executors/CompletedPipelineExecutor.h>
+# include <Common/ProfileEvents.h>
+# include <Common/ZooKeeper/ZooKeeper.h>
+# include <Common/isValidUTF8.h>
+# include "IO/ParallelReadBuffer.h"
+
+# include <Functions/FunctionsConversion.h>
+
+# include <IO/S3Common.h>
+
+# include <Interpreters/TreeRewriter.h>
+
+# include <Parsers/ASTFunction.h>
+# include <Parsers/ASTInsertQuery.h>
+
+# include <Storages/NamedCollectionsHelpers.h>
+# include <Storages/PartitionedSink.h>
+# include <Storages/S3Queue/S3QueueSource.h>
+# include <Storages/S3Queue/S3QueueTableMetadata.h>
+# include <Storages/S3Queue/StorageS3Queue.h>
+# include <Storages/StorageFactory.h>
+# include <Storages/StorageMaterializedView.h>
+# include <Storages/StorageS3.h>
+# include <Storages/StorageSnapshot.h>
+# include <Storages/VirtualColumnUtils.h>
+# include <Storages/prepareReadingFromFormat.h>
+# include <Common/NamedCollections/NamedCollections.h>
+
+
+# include <Formats/FormatFactory.h>
+
+# include <Processors/Formats/IInputFormat.h>
+# include <Processors/Formats/IOutputFormat.h>
+# include <Processors/Transforms/AddingDefaultsTransform.h>
+
+# include <QueryPipeline/QueryPipelineBuilder.h>
+
+# include <DataTypes/DataTypeString.h>
+
+# include <Common/parseGlobs.h>
+
+# include <filesystem>
+# include <Processors/ISource.h>
+# include <Processors/Sinks/SinkToStorage.h>
+# include <QueryPipeline/Pipe.h>
+
+namespace fs = std::filesystem;
+
+namespace ProfileEvents
+{
+extern const Event S3DeleteObjects;
+extern const Event S3ListObjects;
+}
+
+namespace DB
+{
+
+static const String PARTITION_ID_WILDCARD = "{_partition_id}";
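+/// Hard cap on how long a single scheduled streaming task may keep running
+/// before it voluntarily breaks out and reschedules itself (see threadFunc()).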
+static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+ extern const int BAD_ARGUMENTS;
+ extern const int S3_ERROR;
+ extern const int NOT_IMPLEMENTED;
+ extern const int QUERY_NOT_ALLOWED;
+ extern const int REPLICA_ALREADY_EXISTS;
+ extern const int INCOMPATIBLE_COLUMNS;
+}
+
+
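+/** Queue state (the sets of processed, failed and currently processing files) is kept
+  * in ZooKeeper, so replicas that point at the same keeper path share one logical queue.
+  */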
+StorageS3Queue::StorageS3Queue(
+ std::unique_ptr<S3QueueSettings> s3queue_settings_,
+ const StorageS3::Configuration & configuration_,
+ const StorageID & table_id_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment,
+ ContextPtr context_,
+ std::optional<FormatSettings> format_settings_,
+ ASTPtr partition_by_)
+ : IStorage(table_id_)
+ , WithContext(context_)
+ , s3queue_settings(std::move(s3queue_settings_))
+ , after_processing(s3queue_settings->after_processing)
+ , configuration{configuration_}
+ , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
+ , format_settings(format_settings_)
+ , partition_by(partition_by_)
+ , log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
+{
+ if (configuration.url.key.ends_with('/'))
+ configuration.url.key += '*';
+
+ if (!withGlobs())
+ throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
+
+ std::string zk_path_prefix = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
+ if (zk_path_prefix.empty())
+ zk_path_prefix = "/";
+
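+    /// Resolve the ZooKeeper path for the queue: either the user-provided `keeper_path`
+    /// setting taken as-is, or a default derived from the database and table UUIDs.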
+ std::string result_zk_path;
+ if (s3queue_settings->keeper_path.changed)
+ {
+ /// We do not add table uuid here on purpose.
+ result_zk_path = fs::path(zk_path_prefix) / s3queue_settings->keeper_path.value;
+ }
+ else
+ {
+ auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id_.database_name)->getUUID();
+ result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id_.uuid);
+ }
+
+ zk_path = zkutil::extractZooKeeperPath(result_zk_path, true/* check_starts_with_slash */, log);
+ LOG_INFO(log, "Using zookeeper path: {}", zk_path);
+
+ FormatFactory::instance().checkFormatName(configuration.format);
+ context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
+ StorageInMemoryMetadata storage_metadata;
+ configuration.update(context_);
+
+ if (columns_.empty())
+ {
+ auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_);
+ storage_metadata.setColumns(columns);
+ }
+ else
+ storage_metadata.setColumns(columns_);
+
+ storage_metadata.setConstraints(constraints_);
+ storage_metadata.setComment(comment);
+ setInMemoryMetadata(storage_metadata);
+
+ auto metadata_snapshot = getInMemoryMetadataPtr();
+ const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
+
+ if (!is_first_replica)
+ {
+ checkTableStructure(zk_path, metadata_snapshot);
+ }
+
+ files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
+ virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
+
+ auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
+ task = std::make_shared<TaskContext>(std::move(poll_thread));
+}
+
+
+bool StorageS3Queue::supportsSubcolumns() const
+{
+ return true;
+}
+
+bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
+{
+ return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context_, format_settings);
+}
+
+Pipe StorageS3Queue::read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr local_context,
+ QueryProcessingStage::Enum /*processed_stage*/,
+ size_t max_block_size,
+ size_t /* num_streams */)
+{
+ if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
+ throw Exception(
+ ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
+
+ if (mv_attached)
+ throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views");
+
+ auto query_configuration = updateConfigurationAndGetCopy(local_context);
+
+ std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
+
+ auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
+
+ const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
+
+ return Pipe(std::make_shared<StorageS3QueueSource>(
+ read_from_format_info,
+ configuration.format,
+ getName(),
+ local_context,
+ format_settings,
+ max_block_size,
+ query_configuration.request_settings,
+ configuration.compression_method,
+ query_configuration.client,
+ query_configuration.url.bucket,
+ query_configuration.url.version_id,
+ query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
+ iterator_wrapper,
+ files_metadata,
+ after_processing,
+ max_download_threads));
+}
+
+SinkToStoragePtr StorageS3Queue::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, bool)
+{
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
+}
+
+void StorageS3Queue::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
+{
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
+}
+
+NamesAndTypesList StorageS3Queue::getVirtuals() const
+{
+ return virtual_columns;
+}
+
+bool StorageS3Queue::supportsPartitionBy() const
+{
+ return true;
+}
+
+void StorageS3Queue::startup()
+{
+ if (task)
+ task->holder->activateAndSchedule();
+}
+
+void StorageS3Queue::shutdown()
+{
+ shutdown_called = true;
+ if (task)
+ {
+ task->stream_cancelled = true;
+ task->holder->deactivate();
+ }
+}
+
+size_t StorageS3Queue::getTableDependentCount() const
+{
+ auto table_id = getStorageID();
+    // Returns the number of views that directly depend on this table (i.e. are attached)
+ return DatabaseCatalog::instance().getDependentViews(table_id).size();
+}
+
+bool StorageS3Queue::hasDependencies(const StorageID & table_id)
+{
+ // Check if all dependencies are attached
+ auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
+ LOG_TEST(log, "Number of attached views {} for {}", view_ids.size(), table_id.getNameForLogs());
+
+ if (view_ids.empty())
+ return false;
+
+    // Check that all dependencies are ready
+ for (const auto & view_id : view_ids)
+ {
+ auto view = DatabaseCatalog::instance().tryGetTable(view_id, getContext());
+ if (!view)
+ return false;
+
+        // If it is a materialized view, check its target table
+ auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
+ if (materialized_view && !materialized_view->tryGetTargetTable())
+ return false;
+ }
+
+ return true;
+}
+
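+/** Background streaming task: while at least one view is attached, repeatedly stream
+  * data into the views; stop and reschedule on cancellation, when the views are gone,
+  * or once MAX_THREAD_WORK_DURATION_MS is exceeded.
+  */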
+void StorageS3Queue::threadFunc()
+{
+ bool reschedule = true;
+ try
+ {
+ auto table_id = getStorageID();
+
+ auto dependencies_count = getTableDependentCount();
+ if (dependencies_count)
+ {
+ auto start_time = std::chrono::steady_clock::now();
+
+ mv_attached.store(true);
+ // Keep streaming as long as there are attached views and streaming is not cancelled
+ while (!task->stream_cancelled)
+ {
+ if (!hasDependencies(table_id))
+ {
+                    /// The views were detached; there is no watch thread to wait for, so just reschedule
+ reschedule = true;
+ break;
+ }
+
+ LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
+ streamToViews();
+
+ auto ts = std::chrono::steady_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time);
+ if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
+ {
+ LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
+ reschedule = true;
+ break;
+ }
+
+ reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
+ }
+ }
+ }
+ catch (...)
+ {
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ }
+
+ mv_attached.store(false);
+
+ if (reschedule && !shutdown_called)
+ {
+ LOG_TRACE(log, "Reschedule S3 Queue thread func.");
+ /// Reschedule with backoff.
+ if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms)
+ reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
+ task->holder->scheduleAfter(reschedule_processing_interval_ms);
+ }
+}
+
+
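+/** Pushes one batch of files through an implicit INSERT into every dependent view.
+  * The pipeline reads from S3 via StorageS3QueueSource and is executed to completion
+  * synchronously on the current thread.
+  */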
+void StorageS3Queue::streamToViews()
+{
+ auto table_id = getStorageID();
+ auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
+ if (!table)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs());
+
+ auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
+
+ // Create an INSERT query for streaming data
+ auto insert = std::make_shared<ASTInsertQuery>();
+ insert->table_id = table_id;
+
+ size_t block_size = 100;
+
+ auto s3queue_context = Context::createCopy(getContext());
+ s3queue_context->makeQueryContext();
+ auto query_configuration = updateConfigurationAndGetCopy(s3queue_context);
+
+    // Only insert into dependent views and expect that input blocks contain virtual columns
+ InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
+ auto block_io = interpreter.execute();
+ auto column_names = block_io.pipeline.getHeader().getNames();
+
+ std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
+ auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(getContext()), getVirtuals());
+ const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
+
+ auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
+ read_from_format_info,
+ configuration.format,
+ getName(),
+ s3queue_context,
+ format_settings,
+ block_size,
+ query_configuration.request_settings,
+ configuration.compression_method,
+ query_configuration.client,
+ query_configuration.url.bucket,
+ query_configuration.url.version_id,
+ query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
+ iterator_wrapper,
+ files_metadata,
+ after_processing,
+ max_download_threads));
+
+
+ std::atomic_size_t rows = 0;
+ {
+ block_io.pipeline.complete(std::move(pipe));
+ block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
+ CompletedPipelineExecutor executor(block_io.pipeline);
+ executor.execute();
+ }
+}
+
+StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context)
+{
+ configuration.update(local_context);
+ return configuration;
+}
+
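+/// Lazily creates the ZooKeeper client, replacing it transparently if the session expired.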
+zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
+{
+ std::lock_guard lock{zk_mutex};
+ if (!zk_client || zk_client->expired())
+ {
+ zk_client = getContext()->getZooKeeper();
+ zk_client->sync(zk_path);
+ }
+ return zk_client;
+}
+
+
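+/** Creates the queue's ZooKeeper layout if this is the first replica:
+  *   <zk_path>/metadata   - serialized engine settings (persistent)
+  *   <zk_path>/columns    - serialized table structure (persistent)
+  *   <zk_path>/processed, <zk_path>/failed - file tracking nodes (persistent)
+  *   <zk_path>/processing - files currently being processed (ephemeral)
+  * Returns true if this call created the nodes, false if they already existed.
+  */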
+bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot)
+{
+ auto zookeeper = getZooKeeper();
+ zookeeper->createAncestors(zk_path);
+
+ for (size_t i = 0; i < zk_create_table_retries; ++i)
+ {
+ Coordination::Requests ops;
+ bool is_first_replica = true;
+ if (zookeeper->exists(zk_path + "/metadata"))
+ {
+ if (!zookeeper->exists(zk_path + "/processing"))
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
+            LOG_DEBUG(log, "Table {} is already created; will use the existing metadata to check engine settings", zk_path);
+ is_first_replica = false;
+ }
+ else
+ {
+ String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString();
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent));
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent));
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
+ ops.emplace_back(zkutil::makeCreateRequest(
+ zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
+
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
+ }
+
+ Coordination::Responses responses;
+ auto code = zookeeper->tryMulti(ops, responses);
+ if (code == Coordination::Error::ZNODEEXISTS)
+ {
+ LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path);
+ continue;
+ }
+ else if (code != Coordination::Error::ZOK)
+ {
+ zkutil::KeeperMultiException::check(code, ops, responses);
+ }
+
+ return is_first_replica;
+ }
+
+    throw Exception(
+        ErrorCodes::REPLICA_ALREADY_EXISTS,
+        "Cannot create table: another replica keeps creating it concurrently on every retry, "
+        "or the zk_path is wrong, or there is a logical error");
+}
+
+
+/** Verify that list of columns and table settings match those specified in ZK (/metadata).
+ * If not, throw an exception.
+ */
+void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
+{
+ auto zookeeper = getZooKeeper();
+
+ S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
+
+ Coordination::Stat metadata_stat;
+ String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
+ auto metadata_from_zk = S3QueueTableMetadata::parse(metadata_str);
+ old_metadata.checkEquals(metadata_from_zk);
+
+ Coordination::Stat columns_stat;
+ auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
+
+ const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
+ if (columns_from_zk != old_columns)
+ {
+ throw Exception(
+ ErrorCodes::INCOMPATIBLE_COLUMNS,
+ "Table columns structure in ZooKeeper is different from local table structure. Local columns:\n"
+ "{}\nZookeeper columns:\n{}",
+ old_columns.toString(),
+ columns_from_zk.toString());
+ }
+}
+
+
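+/** Builds a glob iterator over the bucket and, under the shared metadata lock,
+  * filters out files that were already processed, failed or are being processed
+  * (in ORDERED mode, also everything up to the max processed file), then marks
+  * the remaining files as processing.
+  */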
+std::shared_ptr<StorageS3QueueSource::IIterator>
+StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
+{
+ auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
+ *configuration.client,
+ configuration.url,
+ query,
+ virtual_columns,
+ local_context,
+ s3queue_settings->s3queue_polling_size.value,
+ configuration.request_settings);
+
+ auto zookeeper = getZooKeeper();
+ auto lock = files_metadata->acquireLock(zookeeper);
+ S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
+
+ Strings files_to_process;
+ if (s3queue_settings->mode == S3QueueMode::UNORDERED)
+ {
+ files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
+ }
+ else
+ {
+ String max_processed_file = files_metadata->getMaxProcessedFile();
+ files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
+ }
+
+ LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
+
+ files_metadata->setFilesProcessing(files_to_process);
+ return it;
+}
+
+void StorageS3Queue::drop()
+{
+ auto zookeeper = getZooKeeper();
+ if (zookeeper->exists(zk_path))
+ zookeeper->removeRecursive(zk_path);
+}
+
+void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
+{
+ factory.registerStorage(
+ name,
+ [](const StorageFactory::Arguments & args)
+ {
+ if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_s3queue)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3Queue is experimental. You can enable it with the `allow_experimental_s3queue` setting.");
+
+ auto & engine_args = args.engine_args;
+ if (engine_args.empty())
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
+ auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
+
+ // Use format settings from global server context + settings from
+ // the SETTINGS clause of the create query. Settings from current
+ // session and user are ignored.
+ std::optional<FormatSettings> format_settings;
+
+ auto s3queue_settings = std::make_unique<S3QueueSettings>();
+ if (args.storage_def->settings)
+ {
+ s3queue_settings->loadFromQuery(*args.storage_def);
+ FormatFactorySettings user_format_settings;
+
+ // Apply changed settings from global context, but ignore the
+ // unknown ones, because we only have the format settings here.
+ const auto & changes = args.getContext()->getSettingsRef().changes();
+ for (const auto & change : changes)
+ {
+ if (user_format_settings.has(change.name))
+ user_format_settings.set(change.name, change.value);
+ else
+ LOG_TRACE(&Poco::Logger::get("StorageS3"), "Remove: {}", change.name);
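+                /// Note: the setting is removed from the storage definition unconditionally,
+                /// whether or not it was recognized as a format setting above.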
+                args.storage_def->settings->changes.removeSetting(change.name);
+ }
+
+ for (const auto & change : args.storage_def->settings->changes)
+ {
+ if (user_format_settings.has(change.name))
+ user_format_settings.applyChange(change);
+ }
+ format_settings = getFormatSettings(args.getContext(), user_format_settings);
+ }
+ else
+ {
+ format_settings = getFormatSettings(args.getContext());
+ }
+
+ ASTPtr partition_by;
+ if (args.storage_def->partition_by)
+ partition_by = args.storage_def->partition_by->clone();
+
+ return std::make_shared<StorageS3Queue>(
+ std::move(s3queue_settings),
+ std::move(configuration),
+ args.table_id,
+ args.columns,
+ args.constraints,
+ args.comment,
+ args.getContext(),
+ format_settings,
+ partition_by);
+ },
+ {
+ .supports_settings = true,
+ .supports_sort_order = true, // for partition by
+ .supports_schema_inference = true,
+ .source_access_type = AccessType::S3,
+ });
+}
+
+void registerStorageS3Queue(StorageFactory & factory)
+{
+ return registerStorageS3QueueImpl("S3Queue", factory);
+}
+
+}
+
+
+#endif