diff options
| author | Alexander Smirnov <[email protected]> | 2025-04-25 19:17:12 +0000 |
|---|---|---|
| committer | Alexander Smirnov <[email protected]> | 2025-04-25 19:17:12 +0000 |
| commit | 38e151af5a0f4088b768a31b8857c4cd0101d9a3 (patch) | |
| tree | d111320f0f221260e2bf6d539281aa41773cf917 | |
| parent | 065dd6eaff1c9cca457433a5247645b56a2c3295 (diff) | |
| parent | 452d892ccb5ffd6b496939e339cf200e558ce9bc (diff) | |
Merge branch 'rightlib' into merge-libs-250425-1915
139 files changed, 1995 insertions, 809 deletions
diff --git a/build/conf/compilers/gnu_compiler.conf b/build/conf/compilers/gnu_compiler.conf index a1cbaaa84d5..dd6c2351b6a 100644 --- a/build/conf/compilers/gnu_compiler.conf +++ b/build/conf/compilers/gnu_compiler.conf @@ -97,7 +97,7 @@ when ($ARCH_XTENSA == "yes") { when ($OS_EMSCRIPTEN == "yes") { FSTACK=-fno-stack-protector - CFLAGS+=-D__EMSCRIPTEN__ + CFLAGS+=-D__EMSCRIPTEN__=1 CFLAGS+=-DSTANDALONE_WASM=1 } diff --git a/build/ymake.core.conf b/build/ymake.core.conf index 6f9a3110e69..945c1fb077f 100644 --- a/build/ymake.core.conf +++ b/build/ymake.core.conf @@ -5922,8 +5922,8 @@ macro LINK_EXCLUDE_LIBRARIES(Libs...) { ### ### GENERATE_IMPLIB(cuda $CUDA_TARGET_ROOT/lib64/stubs/libcuda.so SONAME libcuda.so.1) ### -macro GENERATE_IMPLIB(Lib, Path, SONAME="") { - .CMD=${tool:"contrib/tools/implib"} --target $HARDWARE_TYPE --outdir $BINDIR ${pre=--library-load-name :SONAME} $Path ${hide;output;suf=.init.c;nopath:Path} ${hide;output;suf=.tramp.S;nopath:Path} +macro GENERATE_IMPLIB(Lib, Path, SONAME="", DLOPEN_CALLBACK="_") { + .CMD=${tool:"contrib/tools/implib"} --target $HARDWARE_TYPE --dlopen-callback $DLOPEN_CALLBACK --outdir $BINDIR ${pre=--library-load-name :SONAME} $Path ${hide;output;suf=.init.c;nopath:Path} ${hide;output;suf=.tramp.S;nopath:Path} LINK_EXCLUDE_LIBRARIES($Lib) } diff --git a/contrib/libs/yajl/patch/fix_memory_leak.patch b/contrib/libs/yajl/patch/fix_memory_leak.patch new file mode 100644 index 00000000000..0ac5f0f3914 --- /dev/null +++ b/contrib/libs/yajl/patch/fix_memory_leak.patch @@ -0,0 +1,26 @@ +--- contrib/libs/yajl/yajl_tree.c (3b4a018b6b389390da3ee13f6b4ce0880cb71807) ++++ contrib/libs/yajl/yajl_tree.c (5fe0776c5f96630ddffb63e4ff0292037e12039e) +@@ -143,7 +143,7 @@ static yajl_val context_pop(context_t *ctx) + ctx->stack = stack->next; + + v = stack->value; +- ++ free (stack->key); + free (stack); + + return (v); +@@ -453,7 +453,14 @@ yajl_val yajl_tree_parse (const char *input, + (const unsigned char *) input, + strlen(input))); + } ++ while(ctx.stack != NULL) { ++ yajl_val v = context_pop(&ctx); ++ yajl_tree_free(v); ++ } + yajl_free (handle); ++ //If the requested memory is not released in time, it will cause memory leakage ++ if(ctx.root) ++ yajl_tree_free(ctx.root); + return NULL; + } + diff --git a/contrib/libs/yajl/uint64_support_r767862_review16855.patch b/contrib/libs/yajl/patch/uint64_support_r767862_review16855.patch index 62da2521b0f..62da2521b0f 100644 --- a/contrib/libs/yajl/uint64_support_r767862_review16855.patch +++ b/contrib/libs/yajl/patch/uint64_support_r767862_review16855.patch diff --git a/contrib/libs/yajl/yajl_tree.c b/contrib/libs/yajl/yajl_tree.c index c5874e660e0..6a9d9ffb1b5 100644 --- a/contrib/libs/yajl/yajl_tree.c +++ b/contrib/libs/yajl/yajl_tree.c @@ -143,7 +143,7 @@ static yajl_val context_pop(context_t *ctx) ctx->stack = stack->next; v = stack->value; - + free (stack->key); free (stack); return (v); @@ -453,7 +453,14 @@ yajl_val yajl_tree_parse (const char *input, (const unsigned char *) input, strlen(input))); } + while(ctx.stack != NULL) { + yajl_val v = context_pop(&ctx); + yajl_tree_free(v); + } yajl_free (handle); + //If the requested memory is not released in time, it will cause memory leakage + if(ctx.root) + yajl_tree_free(ctx.root); return NULL; } diff --git a/contrib/python/ydb/py3/.dist-info/METADATA b/contrib/python/ydb/py3/.dist-info/METADATA index 904414722ef..8a15b64dc4b 100644 --- a/contrib/python/ydb/py3/.dist-info/METADATA +++ b/contrib/python/ydb/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: ydb -Version: 3.20.1 +Version: 3.21.0 Summary: YDB Python SDK Home-page: http://github.com/ydb-platform/ydb-python-sdk Author: Yandex LLC diff --git a/contrib/python/ydb/py3/ya.make b/contrib/python/ydb/py3/ya.make index fbc5d148f8a..b938db3247b 100644 --- a/contrib/python/ydb/py3/ya.make +++ b/contrib/python/ydb/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(3.20.1) +VERSION(3.21.0) LICENSE(Apache-2.0) @@ -40,6 +40,7 @@ PY_SRCS( ydb/_topic_common/common.py ydb/_topic_reader/__init__.py ydb/_topic_reader/datatypes.py + ydb/_topic_reader/events.py ydb/_topic_reader/topic_reader.py ydb/_topic_reader/topic_reader_asyncio.py ydb/_topic_reader/topic_reader_sync.py diff --git a/contrib/python/ydb/py3/ydb/_apis.py b/contrib/python/ydb/py3/ydb/_apis.py index fc6f16e287c..b0fa8f3cb22 100644 --- a/contrib/python/ydb/py3/ydb/_apis.py +++ b/contrib/python/ydb/py3/ydb/_apis.py @@ -116,6 +116,7 @@ class TopicService(object): StreamRead = "StreamRead" StreamWrite = "StreamWrite" UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction" + CommitOffset = "CommitOffset" class QueryService(object): diff --git a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py index 0f8a0f03a7a..6b3594b8da4 100644 --- a/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -136,6 +136,22 @@ class UpdateTokenResponse(IFromProto): return UpdateTokenResponse() +@dataclass +class CommitOffsetRequest(IToProto): + path: str + consumer: str + partition_id: int + offset: int + + def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest: + return ydb_topic_pb2.CommitOffsetRequest( + path=self.path, + consumer=self.consumer, + partition_id=self.partition_id, + offset=self.offset, + ) + + ######################################################################################################################## # StreamWrite ######################################################################################################################## @@ -438,12 +454,13 @@ class StreamReadMessage: @dataclass class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] - consumer: str + consumer: Optional[str] auto_partitioning_support: bool def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() - res.consumer = self.consumer + if self.consumer is not None: + res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) res.auto_partitioning_support = self.auto_partitioning_support diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py index 74f06a086fc..737fa4149e2 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/datatypes.py @@ -56,6 +56,10 @@ class PublicMessage(ICommittable, ISessionAlive): def alive(self) -> bool: return not self._partition_session.closed + @property + def partition_id(self) -> int: + return self._partition_session.partition_id + @dataclass class PartitionSession: diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/events.py b/contrib/python/ydb/py3/ydb/_topic_reader/events.py new file mode 100644 index 00000000000..b229713c957 --- /dev/null +++ b/contrib/python/ydb/py3/ydb/_topic_reader/events.py @@ -0,0 +1,81 @@ +import asyncio +from dataclasses import dataclass +from typing import Awaitable, Union + +from ..issues import ClientInternalError + +__all__ = [ + "OnCommit", + "OnPartitionGetStartOffsetRequest", + "OnPartitionGetStartOffsetResponse", + "OnInitPartition", + "OnShutdownPartition", + "EventHandler", +] + + +class BaseReaderEvent: + pass + + +@dataclass +class OnCommit(BaseReaderEvent): + topic: str + offset: int + + +@dataclass +class OnPartitionGetStartOffsetRequest(BaseReaderEvent): + topic: str + partition_id: int + + +@dataclass +class OnPartitionGetStartOffsetResponse: + start_offset: int + + +class OnInitPartition(BaseReaderEvent): + pass + + +class OnShutdownPartition: + pass + + +TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None] + + +class EventHandler: + def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]: + pass + + def on_partition_get_start_offset( + self, + event: OnPartitionGetStartOffsetRequest, + ) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]: + pass + + def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]: + pass + + def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]: + pass + + async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]: + f = None + if isinstance(event, OnCommit): + f = self.on_commit + elif isinstance(event, OnPartitionGetStartOffsetRequest): + f = self.on_partition_get_start_offset + elif isinstance(event, OnInitPartition): + f = self.on_init_partition + elif isinstance(event, OnShutdownPartition): + f = self.on_shutdown_partition + else: + raise ClientInternalError("Unsupported topic reader event") + + if asyncio.iscoroutinefunction(f): + return await f(event) + + return f(event) diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py index 8bc12cc0d87..d477c9ca1bb 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader.py @@ -10,6 +10,7 @@ from typing import ( Callable, ) +from .events import EventHandler from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange @@ -20,6 +21,7 @@ class PublicTopicSelector: partitions: Optional[Union[int, List[int]]] = None read_from: Optional[datetime.datetime] = None max_lag: Optional[datetime.timedelta] = None + read_offset: Optional[int] = None def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings: partitions = self.partitions @@ -42,7 +44,7 @@ TopicSelectorTypes = Union[str, PublicTopicSelector, List[Union[str, PublicTopic @dataclass class PublicReaderSettings: - consumer: str + consumer: Optional[str] topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 auto_partitioning_support: bool = True @@ -53,13 +55,14 @@ class PublicReaderSettings: # decoder_executor, must be set for handle non raw messages decoder_executor: Optional[concurrent.futures.Executor] = None update_token_interval: Union[int, float] = 3600 + event_handler: Optional[EventHandler] = None def __post_init__(self): # check possible create init message _ = self._init_message() def _init_message(self) -> StreamReadMessage.InitRequest: - if not isinstance(self.consumer, str): + if self.consumer is not None and not isinstance(self.consumer, str): raise TypeError("Unsupported type for customer field: '%s'" % type(self.consumer)) if isinstance(self.topic, list): @@ -85,25 +88,6 @@ class PublicReaderSettings: return RetrySettings(idempotent=True) -class Events: - class OnCommit: - topic: str - offset: int - - class OnPartitionGetStartOffsetRequest: - topic: str - partition_id: int - - class OnPartitionGetStartOffsetResponse: - start_offset: int - - class OnInitPartition: - pass - - class OnShutdownPatition: - pass - - class RetryPolicy: connection_timeout_sec: float overload_timeout_sec: float diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py index 87012554ef5..34c52108e17 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_asyncio.py @@ -15,6 +15,7 @@ from .._utilities import AtomicCounter from ..aio import Driver from ..issues import Error as YdbError, _process_response from . import datatypes +from . import events from . import topic_reader from .._grpc.grpcwrapper.common_utils import ( IGrpcWrapperAsyncIO, @@ -72,6 +73,7 @@ class TopicReaderClosedError(TopicReaderError): class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool + _settings: topic_reader.PublicReaderSettings _reconnector: ReaderReconnector _parent: typing.Any # need for prevent close parent client by GC @@ -84,6 +86,7 @@ class PublicAsyncIOReader: ): self._loop = asyncio.get_running_loop() self._closed = False + self._settings = settings self._reconnector = ReaderReconnector(driver, settings, self._loop) self._parent = _parent @@ -156,6 +159,9 @@ class PublicAsyncIOReader: For the method no way check the commit result (for example if lost connection - commits will not re-send and committed messages will receive again). """ + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + try: self._reconnector.commit(batch) except PublicTopicReaderPartitionExpiredError: @@ -171,6 +177,9 @@ class PublicAsyncIOReader: before receive commit ack. Message may be acked or not (if not - it will send in other read session, to this or other reader). """ + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + waiter = self._reconnector.commit(batch) await waiter.future @@ -393,6 +402,7 @@ class ReaderStream: _update_token_interval: Union[int, float] _update_token_event: asyncio.Event _get_token_function: Callable[[], str] + _settings: topic_reader.PublicReaderSettings def __init__( self, @@ -425,6 +435,8 @@ class ReaderStream: self._get_token_function = get_token_function self._update_token_event = asyncio.Event() + self._settings = settings + @staticmethod async def create( reader_reconnector_id: int, @@ -615,7 +627,7 @@ class ReaderStream: message.server_message, StreamReadMessage.StartPartitionSessionRequest, ): - self._on_start_partition_session(message.server_message) + await self._on_start_partition_session(message.server_message) elif isinstance( message.server_message, @@ -660,7 +672,7 @@ class ReaderStream: finally: self._update_token_event.clear() - def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest): + async def _on_start_partition_session(self, message: StreamReadMessage.StartPartitionSessionRequest): try: if message.partition_session.partition_session_id in self._partition_sessions: raise TopicReaderError( @@ -676,11 +688,23 @@ class ReaderStream: reader_reconnector_id=self._reader_reconnector_id, reader_stream_id=self._id, ) + + read_offset = None + + if self._settings.event_handler is not None: + resp = await self._settings.event_handler._dispatch( + events.OnPartitionGetStartOffsetRequest( + message.partition_session.path, + message.partition_session.partition_id, + ) + ) + read_offset = None if resp is None else resp.start_offset + self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.StartPartitionSessionResponse( partition_session_id=message.partition_session.partition_session_id, - read_offset=None, + read_offset=read_offset, commit_offset=None, ) ), diff --git a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py index 31f28899271..bb2fc2a34b0 100644 --- a/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py +++ b/contrib/python/ydb/py3/ydb/_topic_reader/topic_reader_sync.py @@ -4,6 +4,7 @@ import logging import typing from typing import List, Union, Optional +from ydb import issues from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType from ydb._topic_common.common import ( _get_shared_event_loop, @@ -31,6 +32,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool + _settings: PublicReaderSettings _parent: typing.Any # need for prevent stop the client by GC def __init__( @@ -55,6 +57,8 @@ class TopicReaderSync: self._async_reader = asyncio.run_coroutine_threadsafe(create_reader(), loop).result() + self._settings = settings + self._parent = _parent def __del__(self): @@ -154,6 +158,9 @@ class TopicReaderSync: """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + self._caller.call_sync(lambda: self._async_reader.commit(mess)) def commit_with_ack( @@ -168,6 +175,9 @@ class TopicReaderSync: """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout) def async_commit_with_ack( @@ -178,6 +188,9 @@ class TopicReaderSync: """ self._check_closed() + if self._settings.consumer is None: + raise issues.Error("Commit operations are not supported for topic reader without consumer.") + return self._caller.unsafe_call_with_future(self._async_reader.commit_with_ack(mess)) def close(self, *, flush: bool = True, timeout: TimeoutType = None): diff --git a/contrib/python/ydb/py3/ydb/aio/query/pool.py b/contrib/python/ydb/py3/ydb/aio/query/pool.py index f1ca68d1cf0..b691a1b111c 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/pool.py +++ b/contrib/python/ydb/py3/ydb/aio/query/pool.py @@ -142,7 +142,7 @@ class QuerySessionPool: """Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); diff --git a/contrib/python/ydb/py3/ydb/aio/query/session.py b/contrib/python/ydb/py3/ydb/aio/query/session.py index 0561de8c391..fe857878a54 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/session.py +++ b/contrib/python/ydb/py3/ydb/aio/query/session.py @@ -117,15 +117,22 @@ class QuerySession(BaseQuerySession): exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -133,10 +140,11 @@ class QuerySession(BaseQuerySession): stream_it = await self._execute_call( query=query, + parameters=parameters, commit_tx=True, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) @@ -147,6 +155,7 @@ class QuerySession(BaseQuerySession): rpc_state=None, response_pb=resp, session_state=self._state, + session=self, settings=self._settings, ), ) diff --git a/contrib/python/ydb/py3/ydb/aio/query/transaction.py b/contrib/python/ydb/py3/ydb/aio/query/transaction.py index f0547e5f01f..c9a6e445c93 100644 --- a/contrib/python/ydb/py3/ydb/aio/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/aio/query/transaction.py @@ -29,7 +29,7 @@ class QueryTxContext(BaseQueryTxContext): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -142,21 +142,28 @@ class QueryTxContext(BaseQueryTxContext): exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: + :param exec_mode: Exec mode of the query, which is a one from the following choices: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -164,10 +171,11 @@ class QueryTxContext(BaseQueryTxContext): stream_it = await self._execute_call( query=query, + parameters=parameters, commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) diff --git a/contrib/python/ydb/py3/ydb/iam/auth.py b/contrib/python/ydb/py3/ydb/iam/auth.py index c4096c096bf..688dededfca 100644 --- a/contrib/python/ydb/py3/ydb/iam/auth.py +++ b/contrib/python/ydb/py3/ydb/iam/auth.py @@ -3,7 +3,7 @@ from ydb import credentials, tracing import grpc import time import abc -from datetime import datetime +from datetime import datetime, timezone import json import os @@ -43,8 +43,8 @@ YANDEX_CLOUD_JWT_ALGORITHM = "PS256" def get_jwt(account_id, access_key_id, private_key, jwt_expiration_timeout, algorithm, token_service_url, subject=None): assert jwt is not None, "Install pyjwt library to use jwt tokens" now = time.time() - now_utc = datetime.utcfromtimestamp(now) - exp_utc = datetime.utcfromtimestamp(now + jwt_expiration_timeout) + now_utc = datetime.fromtimestamp(now, timezone.utc) + exp_utc = datetime.fromtimestamp(now + jwt_expiration_timeout, timezone.utc) payload = { "iss": account_id, "aud": token_service_url, diff --git a/contrib/python/ydb/py3/ydb/query/__init__.py b/contrib/python/ydb/py3/ydb/query/__init__.py index 59dd799294b..76436f98353 100644 --- a/contrib/python/ydb/py3/ydb/query/__init__.py +++ b/contrib/python/ydb/py3/ydb/query/__init__.py @@ -7,6 +7,7 @@ __all__ = [ "QuerySessionPool", "QueryClientSettings", "QuerySession", + "QueryStatsMode", "QueryTxContext", ] @@ -14,6 +15,7 @@ import logging from .base import ( QueryClientSettings, + QueryStatsMode, ) from .session import QuerySession diff --git a/contrib/python/ydb/py3/ydb/query/base.py b/contrib/python/ydb/py3/ydb/query/base.py index a5ebedd95b3..52a6312e3be 100644 --- a/contrib/python/ydb/py3/ydb/query/base.py +++ b/contrib/python/ydb/py3/ydb/query/base.py @@ -25,6 +25,7 @@ from ydb._grpc.grpcwrapper.common_utils import to_thread if typing.TYPE_CHECKING: from .transaction import BaseQueryTxContext + from .session import BaseQuerySession class QuerySyntax(enum.IntEnum): @@ -41,7 +42,7 @@ class QueryExecMode(enum.IntEnum): EXECUTE = 50 -class StatsMode(enum.IntEnum): +class QueryStatsMode(enum.IntEnum): UNSPECIFIED = 0 NONE = 10 BASIC = 20 @@ -132,12 +133,13 @@ def create_execute_query_request( tx_mode: Optional[BaseQueryTxMode], syntax: Optional[QuerySyntax], exec_mode: Optional[QueryExecMode], + stats_mode: Optional[QueryStatsMode], parameters: Optional[dict], concurrent_result_sets: Optional[bool], ) -> ydb_query.ExecuteQueryRequest: syntax = QuerySyntax.YQL_V1 if not syntax else syntax exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode - stats_mode = StatsMode.NONE # TODO: choise is not supported yet + stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode tx_control = None if not tx_id and not tx_mode: @@ -189,6 +191,7 @@ def wrap_execute_query_response( response_pb: _apis.ydb_query.ExecuteQueryResponsePart, session_state: IQuerySessionState, tx: Optional["BaseQueryTxContext"] = None, + session: Optional["BaseQuerySession"] = None, commit_tx: Optional[bool] = False, settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: @@ -198,6 +201,12 @@ def wrap_execute_query_response( elif tx and response_pb.tx_meta and not tx.tx_id: tx._move_to_beginned(response_pb.tx_meta.id) + if response_pb.HasField("exec_stats"): + if tx is not None: + tx._last_query_stats = response_pb.exec_stats + if session is not None: + session._last_query_stats = response_pb.exec_stats + if response_pb.HasField("result_set"): return convert.ResultSet.from_message(response_pb.result_set, settings) diff --git a/contrib/python/ydb/py3/ydb/query/pool.py b/contrib/python/ydb/py3/ydb/query/pool.py index b25f7db855c..1cf95ac0d13 100644 --- a/contrib/python/ydb/py3/ydb/query/pool.py +++ b/contrib/python/ydb/py3/ydb/query/pool.py @@ -151,7 +151,7 @@ class QuerySessionPool: """Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); diff --git a/contrib/python/ydb/py3/ydb/query/session.py b/contrib/python/ydb/py3/ydb/query/session.py index 382c922d5e6..3cc6c13d4e9 100644 --- a/contrib/python/ydb/py3/ydb/query/session.py +++ b/contrib/python/ydb/py3/ydb/query/session.py @@ -147,6 +147,12 @@ class BaseQuerySession: .with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT) ) + self._last_query_stats = None + + @property + def last_query_stats(self): + return self._last_query_stats + def _get_client_settings( self, driver: common_utils.SupportedDriverType, @@ -189,22 +195,26 @@ class BaseQuerySession: def _execute_call( self, query: str, + parameters: dict = None, commit_tx: bool = False, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, - parameters: dict = None, + stats_mode: Optional[base.QueryStatsMode] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + self._last_query_stats = None + request = base.create_execute_query_request( query=query, - session_id=self._state.session_id, + parameters=parameters, commit_tx=commit_tx, + session_id=self._state.session_id, tx_mode=None, tx_id=None, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, ) @@ -293,7 +303,7 @@ class QuerySession(BaseQuerySession): def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext: """Creates a transaction context manager with specified transaction mode. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -321,15 +331,22 @@ class QuerySession(BaseQuerySession): exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -337,10 +354,11 @@ class QuerySession(BaseQuerySession): stream_it = self._execute_call( query=query, + parameters=parameters, commit_tx=True, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) @@ -351,6 +369,7 @@ class QuerySession(BaseQuerySession): rpc_state=None, response_pb=resp, session_state=self._state, + session=self, settings=self._settings, ), ) diff --git a/contrib/python/ydb/py3/ydb/query/transaction.py b/contrib/python/ydb/py3/ydb/query/transaction.py index ae7642dbe21..008ac7c404f 100644 --- a/contrib/python/ydb/py3/ydb/query/transaction.py +++ b/contrib/python/ydb/py3/ydb/query/transaction.py @@ -197,7 +197,7 @@ class BaseQueryTxContext(base.CallbackHandler): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -210,6 +210,7 @@ class BaseQueryTxContext(base.CallbackHandler): self.session = session self._prev_stream = None self._external_error = None + self._last_query_stats = None @property def session_id(self) -> str: @@ -229,6 +230,10 @@ class BaseQueryTxContext(base.CallbackHandler): """ return self._tx_state.tx_id + @property + def last_query_stats(self): + return self._last_query_stats + def _tx_identity(self) -> _ydb_topic.TransactionIdentity: if not self.tx_id: raise RuntimeError("Unable to get tx identity without started tx.") @@ -283,25 +288,29 @@ class BaseQueryTxContext(base.CallbackHandler): def _execute_call( self, query: str, + parameters: Optional[dict], commit_tx: Optional[bool], syntax: Optional[base.QuerySyntax], exec_mode: Optional[base.QueryExecMode], - parameters: Optional[dict], + stats_mode: Optional[base.QueryStatsMode], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: self._tx_state._check_tx_ready_to_use() self._check_external_error_set() + self._last_query_stats = None + request = base.create_execute_query_request( query=query, - session_id=self._session_state.session_id, + parameters=parameters, commit_tx=commit_tx, + session_id=self._session_state.session_id, tx_id=self._tx_state.tx_id, tx_mode=self._tx_state.tx_mode, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, ) @@ -338,7 +347,7 @@ class QueryTxContext(BaseQueryTxContext): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -451,22 +460,29 @@ class QueryTxContext(BaseQueryTxContext): exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: + :param exec_mode: Exec mode of the query, which is a one from the following choices: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; :param settings: An additional request settings BaseRequestSettings; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode:NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -477,6 +493,7 @@ class QueryTxContext(BaseQueryTxContext): commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, + stats_mode=stats_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, settings=settings, diff --git a/contrib/python/ydb/py3/ydb/topic.py b/contrib/python/ydb/py3/ydb/topic.py index a501f9d2750..ceb82efb5dc 100644 --- a/contrib/python/ydb/py3/ydb/topic.py +++ b/contrib/python/ydb/py3/ydb/topic.py @@ -16,6 +16,7 @@ __all__ = [ "TopicReader", "TopicReaderAsyncIO", "TopicReaderBatch", + "TopicReaderEvents", "TopicReaderMessage", "TopicReaderSelector", "TopicReaderSettings", @@ -42,6 +43,8 @@ from . import aio, Credentials, _apis, issues from . import driver +from ._topic_reader import events as TopicReaderEvents + from ._topic_reader.datatypes import ( PublicBatch as TopicReaderBatch, PublicMessage as TopicReaderMessage, @@ -52,7 +55,9 @@ from ._topic_reader.topic_reader import ( PublicTopicSelector as TopicReaderSelector, ) -from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader +from ._topic_reader.topic_reader_sync import ( + TopicReaderSync as TopicReader, +) from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, @@ -240,7 +245,7 @@ class TopicClientAsyncIO: def reader( self, topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], - consumer: str, + consumer: Optional[str], buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes # the func will be called from multiply threads in parallel @@ -249,6 +254,7 @@ class TopicClientAsyncIO: # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. + event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReaderAsyncIO: if not decoder_executor: @@ -257,6 +263,23 @@ class TopicClientAsyncIO: args = locals().copy() del args["self"] + if consumer == "": + raise issues.Error( + "Consumer name could not be empty! To use reader without consumer specify consumer as None." + ) + + if consumer is None: + if not isinstance(topic, TopicReaderSelector) or topic.partitions is None: + raise issues.Error( + "To use reader without consumer it is required to specify partition_ids in topic selector." + ) + + if event_handler is None: + raise issues.Error( + "To use reader without consumer it is required to specify event_handler with " + "on_partition_get_start_offset method." + ) + settings = TopicReaderSettings(**args) return TopicReaderAsyncIO(self._driver, settings, _parent=self) @@ -317,6 +340,21 @@ class TopicClientAsyncIO: return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self) + async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None: + req = _ydb_topic.CommitOffsetRequest( + path=path, + consumer=consumer, + partition_id=partition_id, + offset=offset, + ) + + await self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.CommitOffset, + _wrap_operation, + ) + def close(self): if self._closed: return @@ -484,7 +522,7 @@ class TopicClient: def reader( self, topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]], - consumer: str, + consumer: Optional[str], buffer_size_bytes: int = 50 * 1024 * 1024, # decoders: map[codec_code] func(encoded_bytes)->decoded_bytes # the func will be called from multiply threads in parallel @@ -493,13 +531,30 @@ class TopicClient: # if max_worker in the executor is 1 - then decoders will be called from the thread without parallel decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. + event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReader: if not decoder_executor: decoder_executor = self._executor args = locals().copy() del args["self"] - self._check_closed() + + if consumer == "": + raise issues.Error( + "Consumer name could not be empty! To use reader without consumer specify consumer as None." + ) + + if consumer is None: + if not isinstance(topic, TopicReaderSelector) or topic.partitions is None: + raise issues.Error( + "To use reader without consumer it is required to specify partition_ids in topic selector." + ) + + if event_handler is None: + raise issues.Error( + "To use reader without consumer it is required to specify event_handler with " + "on_partition_get_start_offset method." + ) settings = TopicReaderSettings(**args) @@ -563,6 +618,21 @@ class TopicClient: return TopicTxWriter(tx, self._driver, settings, _parent=self) + def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None: + req = _ydb_topic.CommitOffsetRequest( + path=path, + consumer=consumer, + partition_id=partition_id, + offset=offset, + ) + + self._driver( + req.to_proto(), + _apis.TopicService.Stub, + _apis.TopicService.CommitOffset, + _wrap_operation, + ) + def close(self): if self._closed: return diff --git a/contrib/python/ydb/py3/ydb/ydb_version.py b/contrib/python/ydb/py3/ydb/ydb_version.py index 4a5c580f99f..6b71007009d 100644 --- a/contrib/python/ydb/py3/ydb/ydb_version.py +++ b/contrib/python/ydb/py3/ydb/ydb_version.py @@ -1 +1 @@ -VERSION = "3.20.1" +VERSION = "3.21.0" diff --git a/contrib/restricted/boost/align/.yandex_meta/default.nix b/contrib/restricted/boost/align/.yandex_meta/default.nix index 753c2e88b87..16d3566df5e 100644 --- a/contrib/restricted/boost/align/.yandex_meta/default.nix +++ b/contrib/restricted/boost/align/.yandex_meta/default.nix @@ -1,7 +1,7 @@ self: super: with self; { boost_align = stdenv.mkDerivation rec { pname = "boost_align"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; diff --git a/contrib/restricted/boost/align/ya.make b/contrib/restricted/boost/align/ya.make index da48278a2a2..8855ab8caf2 100644 --- a/contrib/restricted/boost/align/ya.make +++ b/contrib/restricted/boost/align/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/align/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/align/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/assert diff --git a/contrib/restricted/boost/bimap/.yandex_meta/default.nix b/contrib/restricted/boost/bimap/.yandex_meta/default.nix index 29459e38785..abedd34d1d3 100644 --- a/contrib/restricted/boost/bimap/.yandex_meta/default.nix +++ b/contrib/restricted/boost/bimap/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_bimap = stdenv.mkDerivation rec { pname = "boost_bimap"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "bimap"; rev = "boost-${version}"; - hash = "sha256-gyWR4Ix7omtWG+DW2LICGplPvqTvwPAsI7toflUWlSA="; + hash = "sha256-ZZn2oSdmSy8iygrZq3+RMOkSY0zVaODcgZbAN/w80Cc="; }; }; } diff --git a/contrib/restricted/boost/bimap/include/boost/bimap/detail/map_view_iterator.hpp b/contrib/restricted/boost/bimap/include/boost/bimap/detail/map_view_iterator.hpp index 0fc3f6eab68..85395fc98be 100644 --- a/contrib/restricted/boost/bimap/include/boost/bimap/detail/map_view_iterator.hpp +++ b/contrib/restricted/boost/bimap/include/boost/bimap/detail/map_view_iterator.hpp @@ -24,7 +24,6 @@ #include <boost/core/serialization.hpp> #endif // BOOST_BIMAP_DISABLE_SERIALIZATION -#include <boost/iterator/detail/enable_if.hpp> #include <boost/iterator/iterator_adaptor.hpp> #include <boost/bimap/relation/support/pair_by.hpp> diff --git a/contrib/restricted/boost/bimap/include/boost/bimap/detail/set_view_iterator.hpp b/contrib/restricted/boost/bimap/include/boost/bimap/detail/set_view_iterator.hpp index a8b2425b9b0..b1c7496558d 100644 --- a/contrib/restricted/boost/bimap/include/boost/bimap/detail/set_view_iterator.hpp +++ b/contrib/restricted/boost/bimap/include/boost/bimap/detail/set_view_iterator.hpp @@ -24,7 +24,6 @@ #include <boost/core/serialization.hpp> #endif // BOOST_BIMAP_DISABLE_SERIALIZATION -#include <boost/iterator/detail/enable_if.hpp> #include <boost/iterator/iterator_adaptor.hpp> #include <boost/bimap/relation/support/get_pair_functor.hpp> diff --git a/contrib/restricted/boost/bimap/ya.make b/contrib/restricted/boost/bimap/ya.make index f20f7459040..be4525a79c8 100644 --- a/contrib/restricted/boost/bimap/ya.make +++ b/contrib/restricted/boost/bimap/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/bimap/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/bimap/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/concept_check diff --git a/contrib/restricted/boost/bind/.yandex_meta/default.nix b/contrib/restricted/boost/bind/.yandex_meta/default.nix index 73152bb3163..e15a6e8c72e 100644 --- a/contrib/restricted/boost/bind/.yandex_meta/default.nix +++ b/contrib/restricted/boost/bind/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_bind = stdenv.mkDerivation rec { pname = "boost_bind"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "bind"; rev = "boost-${version}"; - hash = "sha256-Eu/inCn4rlpkHd6LFu/N9CtzxwM4IoLms2RPvJgF11Q="; + hash = "sha256-xdT8F7JuinEWbY5EOey21oUTjvF/7cPsu1tD31oEVus="; }; }; } diff --git a/contrib/restricted/boost/bind/ya.make b/contrib/restricted/boost/bind/ya.make index c5a29006309..836b945d92c 100644 --- a/contrib/restricted/boost/bind/ya.make +++ b/contrib/restricted/boost/bind/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/bind/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/bind/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/config diff --git a/contrib/restricted/boost/container_hash/.yandex_meta/default.nix b/contrib/restricted/boost/container_hash/.yandex_meta/default.nix index 46ed9a42e3d..53775a1ea22 100644 --- a/contrib/restricted/boost/container_hash/.yandex_meta/default.nix +++ b/contrib/restricted/boost/container_hash/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_container_hash = stdenv.mkDerivation rec { pname = "boost_container_hash"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "container_hash"; rev = "boost-${version}"; - hash = "sha256-jfRtfQ7/IEgH3ZQv0qNu+ZYX8RFttwARb8G/jjKS6us="; + hash = "sha256-0LilNldSf2i+QeO9W1qiMQI5QCa6tCmAyLv2feKL3W4="; }; }; } diff --git a/contrib/restricted/boost/container_hash/ya.make b/contrib/restricted/boost/container_hash/ya.make index e36c0a5f8a0..536c5ea00ca 100644 --- a/contrib/restricted/boost/container_hash/ya.make +++ b/contrib/restricted/boost/container_hash/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/container_hash/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/container_hash/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/config diff --git a/contrib/restricted/boost/crc/.yandex_meta/default.nix b/contrib/restricted/boost/crc/.yandex_meta/default.nix index bb0de9726c3..86a022471d2 100644 --- a/contrib/restricted/boost/crc/.yandex_meta/default.nix +++ b/contrib/restricted/boost/crc/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_crc = stdenv.mkDerivation rec { pname = "boost_crc"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "crc"; rev = "boost-${version}"; - hash = "sha256-zPCiNBmDIcn2Aa0sYmUwXg9AB3kajuZLZuUVrolIxT4="; + hash = "sha256-lmVm65cHe43RoidOJJ2g13I3SJAf4Lmg2skcRq41zYE="; }; }; } diff --git a/contrib/restricted/boost/crc/ya.make b/contrib/restricted/boost/crc/ya.make index 9feb7e69bff..d2926948806 100644 --- a/contrib/restricted/boost/crc/ya.make +++ b/contrib/restricted/boost/crc/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/crc/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/crc/archive/boost-1.88.0.tar.gz) ADDINCL( GLOBAL contrib/restricted/boost/crc/include diff --git a/contrib/restricted/boost/describe/.yandex_meta/default.nix b/contrib/restricted/boost/describe/.yandex_meta/default.nix index f8d24e2ccb7..152aada3e71 100644 --- a/contrib/restricted/boost/describe/.yandex_meta/default.nix +++ b/contrib/restricted/boost/describe/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_describe = stdenv.mkDerivation rec { pname = "boost_describe"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "describe"; rev = "boost-${version}"; - hash = "sha256-20NRrM3S0JU8NFd4923iFvlOR/lC2nrWNMZqTEvyP+0="; + hash = "sha256-cjsYoyl4Vd8g5gSHyHQOJDUujWJOa6mPt6qAaOv9jFg="; }; }; } diff --git a/contrib/restricted/boost/describe/ya.make b/contrib/restricted/boost/describe/ya.make index 4fe135f3896..f48eb62ea46 100644 --- a/contrib/restricted/boost/describe/ya.make +++ b/contrib/restricted/boost/describe/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/describe/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/describe/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/mp11 diff --git a/contrib/restricted/boost/endian/.yandex_meta/default.nix b/contrib/restricted/boost/endian/.yandex_meta/default.nix index 9ba90acafc6..c6c27d497be 100644 --- a/contrib/restricted/boost/endian/.yandex_meta/default.nix +++ b/contrib/restricted/boost/endian/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_endian = stdenv.mkDerivation rec { pname = "boost_endian"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "endian"; rev = "boost-${version}"; - hash = "sha256-VvRTa92Ca9wdx0cArbKr2xlnbi2UQLUSRuiwVLA5kyA="; + hash = "sha256-fpG5BghLPZAKJuDqFHHaYjWrQ5bXveQMMQv/oFlhjbA="; }; }; } diff --git a/contrib/restricted/boost/endian/ya.make b/contrib/restricted/boost/endian/ya.make index a8ef1e0929e..b896990c230 100644 --- a/contrib/restricted/boost/endian/ya.make +++ b/contrib/restricted/boost/endian/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/endian/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/endian/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/config diff --git a/contrib/restricted/boost/format/.yandex_meta/default.nix b/contrib/restricted/boost/format/.yandex_meta/default.nix index 941a48907cf..1a631de769e 100644 --- a/contrib/restricted/boost/format/.yandex_meta/default.nix +++ b/contrib/restricted/boost/format/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_format = stdenv.mkDerivation rec { pname = "boost_format"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "format"; rev = "boost-${version}"; - hash = "sha256-p7Qy3XhapFa9umYjYDlIAyD0ESK3P237sTGFaKHiyFI="; + hash = "sha256-S46TJZNFw3sDw8D5Sa6zVTt5dkfvjGahLHJ5OC5dHXs="; }; }; } diff --git a/contrib/restricted/boost/format/.yandex_meta/devtools.licenses.report b/contrib/restricted/boost/format/.yandex_meta/devtools.licenses.report index 999eb9426c6..cd283c6f55f 100644 --- a/contrib/restricted/boost/format/.yandex_meta/devtools.licenses.report +++ b/contrib/restricted/boost/format/.yandex_meta/devtools.licenses.report @@ -86,26 +86,26 @@ BELONGS ya.make Files with this license: include/boost/format/exceptions.hpp [7:9] -KEEP BSL-1.0 648ee54e68cb4c96cfd2e41a7a53e0f8 +KEEP BSL-1.0 a5006bb276a0e8fcc0c080cd5a14814e BELONGS ya.make - License text: - \### License - Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). + Note: matched license text is too long. Read it in the source files. Scancode info: Original SPDX id: BSL-1.0 - Score : 60.00 + Score : 55.00 Match type : NOTICE Links : http://www.boost.org/LICENSE_1_0.txt, http://www.boost.org/users/license.html, https://spdx.org/licenses/BSL-1.0 Files with this license: - README.md [3:5] + README.md [35:35] -KEEP BSL-1.0 901941bd35f9f19e23af80f6271c10c4 +KEEP BSL-1.0 a84b62b7fab19a18e7129119ab661542 BELONGS ya.make - Note: matched license text is too long. Read it in the source files. + License text: + \### License + Distributed under the [Boost Software License, Version 1.0](https://www.boost.org/LICENSE_1_0.txt). Scancode info: Original SPDX id: BSL-1.0 - Score : 60.00 + Score : 55.00 Match type : NOTICE Links : http://www.boost.org/LICENSE_1_0.txt, http://www.boost.org/users/license.html, https://spdx.org/licenses/BSL-1.0 Files with this license: - README.md [34:34] + README.md [3:5] diff --git a/contrib/restricted/boost/format/.yandex_meta/licenses.list.txt b/contrib/restricted/boost/format/.yandex_meta/licenses.list.txt index 48a23e64221..d735fee9b1f 100644 --- a/contrib/restricted/boost/format/.yandex_meta/licenses.list.txt +++ b/contrib/restricted/boost/format/.yandex_meta/licenses.list.txt @@ -1,11 +1,11 @@ ====================BSL-1.0==================== ### License -Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). +Distributed under the [Boost Software License, Version 1.0](https://www.boost.org/LICENSE_1_0.txt). ====================BSL-1.0==================== -* [Submit Pull Requests](https://github.com/boostorg/format/pulls) against the **develop** branch. Note that by submitting patches you agree to license your modifications under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). Be sure to include tests proving your changes work properly. +* [Submit Pull Requests](https://github.com/boostorg/format/pulls) against the **develop** branch. Note that by submitting patches you agree to license your modifications under the [Boost Software License, Version 1.0](https://www.boost.org/LICENSE_1_0.txt). Be sure to include tests proving your changes work properly. ====================BSL-1.0==================== diff --git a/contrib/restricted/boost/format/README.md b/contrib/restricted/boost/format/README.md index 16945065390..4fc6496001b 100644 --- a/contrib/restricted/boost/format/README.md +++ b/contrib/restricted/boost/format/README.md @@ -1,8 +1,8 @@ -Format, part of the collection of [Boost C++ Libraries](http://github.com/boostorg), provides a type-safe mechanism for formatting arguments according to a printf-like format-string. User-defined types are supported by providing a `std::ostream operator <<` implementation for them. +Format, part of the collection of [Boost C++ Libraries](https://github.com/boostorg), provides a type-safe mechanism for formatting arguments according to a printf-like format-string. User-defined types are supported by providing a `std::ostream operator <<` implementation for them. ### License -Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). +Distributed under the [Boost Software License, Version 1.0](https://www.boost.org/LICENSE_1_0.txt). ### Properties @@ -11,10 +11,11 @@ Distributed under the [Boost Software License, Version 1.0](http://www.boost.org ### Build Status -Branch | GHA CI | Appveyor | Coverity Scan | codecov.io | Deps | Docs | Tests | -:-------------: | ------ | -------- | ------------- | ---------- | ---- | ---- | ----- | -[`master`](https://github.com/boostorg/format/tree/master) | [](https://github.com/boostorg/format/actions?query=branch:master) | [](https://ci.appveyor.com/project/jeking3/format-bhjc4/branch/master) | [](https://scan.coverity.com/projects/boostorg-format) | [](https://codecov.io/gh/boostorg/format/branch/master) | [](https://pdimov.github.io/boostdep-report/master/format.html) | [](https://www.boost.org/doc/libs/master/libs/format/) | [](http://www.boost.org/development/tests/master/developer/format.html) -[`develop`](https://github.com/boostorg/format/tree/develop) | [](https://github.com/boostorg/format/actions?query=branch:develop) | [](https://ci.appveyor.com/project/jeking3/format-bhjc4/branch/develop) | [](https://scan.coverity.com/projects/boostorg-format) | [](https://codecov.io/gh/boostorg/format/branch/develop) | [](https://pdimov.github.io/boostdep-report/develop/format.html) | [](https://www.boost.org/doc/libs/develop/libs/format/) | [](http://www.boost.org/development/tests/develop/developer/format.html) +<!-- boost-ci/tools/makebadges.sh --project format --appveyor aeg8obnkb0mrgqvd --codecov --coverity 14007 --> +| Branch | GHA CI | Appveyor | Coverity Scan | codecov.io | Deps | Docs | Tests | +| :-------------: | ------ | -------- | ------------- | ---------- | ---- | ---- | ----- | +| [`master`](https://github.com/boostorg/format/tree/master) | [](https://github.com/boostorg/format/actions?query=branch:master) | [](https://ci.appveyor.com/project/cppalliance/format/branch/master) | [](https://scan.coverity.com/projects/boostorg-format) | [](https://codecov.io/gh/boostorg/format/tree/master) | [](https://pdimov.github.io/boostdep-report/master/format.html) | [](https://www.boost.org/doc/libs/master/libs/format) | [](https://www.boost.org/development/tests/master/developer/format.html) +| [`develop`](https://github.com/boostorg/format/tree/develop) | [](https://github.com/boostorg/format/actions?query=branch:develop) | [](https://ci.appveyor.com/project/cppalliance/format/branch/develop) | [](https://scan.coverity.com/projects/boostorg-format) | [](https://codecov.io/gh/boostorg/format/tree/develop) | [](https://pdimov.github.io/boostdep-report/develop/format.html) | [](https://www.boost.org/doc/libs/develop/libs/format) | [](https://www.boost.org/development/tests/develop/developer/format.html) ### Directories @@ -29,8 +30,8 @@ Branch | GHA CI | Appveyor | Coverity Scan | codecov.io | Deps | Docs | ### More information -* [Ask questions](http://stackoverflow.com/questions/ask?tags=c%2B%2B,boost,boost-format): Be sure to read the documentation first as Boost.Format, like any other string formatting library, has its own rules. +* [Ask questions](https://stackoverflow.com/questions/ask?tags=c%2B%2B,boost,boost-format): Be sure to read the documentation first as Boost.Format, like any other string formatting library, has its own rules. * [Report bugs](https://github.com/boostorg/format/issues): Be sure to mention Boost version, platform and compiler you're using. A small compilable code sample to reproduce the problem is always good as well. -* [Submit Pull Requests](https://github.com/boostorg/format/pulls) against the **develop** branch. Note that by submitting patches you agree to license your modifications under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). Be sure to include tests proving your changes work properly. -* Discussions about the library are held on the [Boost developers mailing list](http://www.boost.org/community/groups.html#main). Be sure to read the [discussion policy](http://www.boost.org/community/policy.html) before posting and add the `[format]` tag at the beginning of the subject line. +* [Submit Pull Requests](https://github.com/boostorg/format/pulls) against the **develop** branch. Note that by submitting patches you agree to license your modifications under the [Boost Software License, Version 1.0](https://www.boost.org/LICENSE_1_0.txt). Be sure to include tests proving your changes work properly. +* Discussions about the library are held on the [Boost developers mailing list](https://www.boost.org/community/groups.html#main). Be sure to read the [discussion policy](https://www.boost.org/community/policy.html) before posting and add the `[format]` tag at the beginning of the subject line. diff --git a/contrib/restricted/boost/format/include/boost/format/alt_sstream_impl.hpp b/contrib/restricted/boost/format/include/boost/format/alt_sstream_impl.hpp index 05379dcb9e5..e72e7d00671 100644 --- a/contrib/restricted/boost/format/include/boost/format/alt_sstream_impl.hpp +++ b/contrib/restricted/boost/format/include/boost/format/alt_sstream_impl.hpp @@ -126,7 +126,7 @@ namespace boost { off += static_cast<off_type>(putend_ - pptr()); else if(way == ::std::ios_base::beg) off += static_cast<off_type>(pbase() - pptr()); - else if(way != ::std::ios_base::beg) + else return pos_type(off_type(-1)); if(pbase() <= off+pptr() && off+pptr() <= putend_) // set pptr diff --git a/contrib/restricted/boost/format/ya.make b/contrib/restricted/boost/format/ya.make index e6a379ac2d0..3db1cf7f132 100644 --- a/contrib/restricted/boost/format/ya.make +++ b/contrib/restricted/boost/format/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/format/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/format/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/assert diff --git a/contrib/restricted/boost/function/.yandex_meta/default.nix b/contrib/restricted/boost/function/.yandex_meta/default.nix index 647aea48b0b..9f9b1412de5 100644 --- a/contrib/restricted/boost/function/.yandex_meta/default.nix +++ b/contrib/restricted/boost/function/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_function = stdenv.mkDerivation rec { pname = "boost_function"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "function"; rev = "boost-${version}"; - hash = "sha256-OsSx0OF1brwuswfNA0ynh2SeVw0Wl/6QKyZ9UQIIYZI="; + hash = "sha256-ZfEd50J1Lq1W2X90QYLgkudvofmNHlwgVmIgpQWqdtE="; }; }; } diff --git a/contrib/restricted/boost/function/ya.make b/contrib/restricted/boost/function/ya.make index 8f4408c4e64..ff88c0ac978 100644 --- a/contrib/restricted/boost/function/ya.make +++ b/contrib/restricted/boost/function/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/function/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/function/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/assert diff --git a/contrib/restricted/boost/multi_index/.yandex_meta/default.nix b/contrib/restricted/boost/multi_index/.yandex_meta/default.nix index 7e75e408ba7..bacd1c20422 100644 --- a/contrib/restricted/boost/multi_index/.yandex_meta/default.nix +++ b/contrib/restricted/boost/multi_index/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_multi_index = stdenv.mkDerivation rec { pname = "boost_multi_index"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "multi_index"; rev = "boost-${version}"; - hash = "sha256-+CbwQndrqN4fgvDBSEN7naeyQ9TScEOoenDpYAI991g="; + hash = "sha256-VC2QL0YBsaYfS9DH5lMG8f9Ui236bsv1Dz+ScyqKFBU="; }; }; } diff --git a/contrib/restricted/boost/multi_index/ya.make b/contrib/restricted/boost/multi_index/ya.make index 7935b94c2b6..fb35c536d9c 100644 --- a/contrib/restricted/boost/multi_index/ya.make +++ b/contrib/restricted/boost/multi_index/ya.make @@ -9,9 +9,9 @@ LICENSE( LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/multi_index/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/multi_index/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/assert diff --git a/contrib/restricted/boost/predef/.yandex_meta/default.nix b/contrib/restricted/boost/predef/.yandex_meta/default.nix index ebc4a40b6f3..81b3768257e 100644 --- a/contrib/restricted/boost/predef/.yandex_meta/default.nix +++ b/contrib/restricted/boost/predef/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_predef = stdenv.mkDerivation rec { pname = "boost_predef"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "predef"; rev = "boost-${version}"; - hash = "sha256-CxSlsedkHXHs1LTG9aWPjS0jh2QeIXGalZDmGCt7Jf4="; + hash = "sha256-C1aMk1RkLMqSBIv8oTlfkZhlp1kzWqXoAn3Cyev9Jtc="; }; }; } diff --git a/contrib/restricted/boost/predef/ya.make b/contrib/restricted/boost/predef/ya.make index 650be04dbcc..44a9fc192ef 100644 --- a/contrib/restricted/boost/predef/ya.make +++ b/contrib/restricted/boost/predef/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/predef/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/predef/archive/boost-1.88.0.tar.gz) ADDINCL( GLOBAL contrib/restricted/boost/predef/include diff --git a/contrib/restricted/boost/system/.yandex_meta/default.nix b/contrib/restricted/boost/system/.yandex_meta/default.nix index b9d8a6d23c8..feecc1758d1 100644 --- a/contrib/restricted/boost/system/.yandex_meta/default.nix +++ b/contrib/restricted/boost/system/.yandex_meta/default.nix @@ -1,13 +1,13 @@ self: super: with self; { boost_system = stdenv.mkDerivation rec { pname = "boost_system"; - version = "1.87.0"; + version = "1.88.0"; src = fetchFromGitHub { owner = "boostorg"; repo = "system"; rev = "boost-${version}"; - hash = "sha256-u1KPJNJtAOPOfHmrdw7WQVRmuTWQg6KtLhkINr2L8JM="; + hash = "sha256-wI/Hs+kS/jPg90iE5E6ZFhQ41O34JCh0N17iRjteYyk="; }; }; } diff --git a/contrib/restricted/boost/system/include/boost/system/result.hpp b/contrib/restricted/boost/system/include/boost/system/result.hpp index 456a79299ea..539113c2e30 100644 --- a/contrib/restricted/boost/system/include/boost/system/result.hpp +++ b/contrib/restricted/boost/system/include/boost/system/result.hpp @@ -332,40 +332,28 @@ public: BOOST_CXX14_CONSTEXPR T& operator*() noexcept { - T* p = operator->(); - - BOOST_ASSERT( p != 0 ); - - return *p; + BOOST_ASSERT( has_value() ); + return *operator->(); } BOOST_CXX14_CONSTEXPR T const& operator*() const noexcept { - T const* p = operator->(); - - BOOST_ASSERT( p != 0 ); - - return *p; + BOOST_ASSERT( has_value() ); + return *operator->(); } #else BOOST_CXX14_CONSTEXPR T& operator*() & noexcept { - T* p = operator->(); - - BOOST_ASSERT( p != 0 ); - - return *p; + BOOST_ASSERT( has_value() ); + return *operator->(); } BOOST_CXX14_CONSTEXPR T const& operator*() const & noexcept { - T const* p = operator->(); - - BOOST_ASSERT( p != 0 ); - - return *p; + BOOST_ASSERT( has_value() ); + return *operator->(); } template<class U = T> @@ -838,11 +826,8 @@ public: BOOST_CXX14_CONSTEXPR U& operator*() const noexcept { - U* p = operator->(); - - BOOST_ASSERT( p != 0 ); - - return *p; + BOOST_ASSERT( has_value() ); + return *operator->(); } // error access diff --git a/contrib/restricted/boost/system/ya.make b/contrib/restricted/boost/system/ya.make index 0bc1a155a5e..4f7edb28d90 100644 --- a/contrib/restricted/boost/system/ya.make +++ b/contrib/restricted/boost/system/ya.make @@ -6,9 +6,9 @@ LICENSE(BSL-1.0) LICENSE_TEXTS(.yandex_meta/licenses.list.txt) -VERSION(1.87.0) +VERSION(1.88.0) -ORIGINAL_SOURCE(https://github.com/boostorg/system/archive/boost-1.87.0.tar.gz) +ORIGINAL_SOURCE(https://github.com/boostorg/system/archive/boost-1.88.0.tar.gz) PEERDIR( contrib/restricted/boost/assert diff --git a/library/cpp/accurate_accumulate/benchmark/metrics/ya.make b/library/cpp/accurate_accumulate/benchmark/metrics/ya.make index 51476cad403..88be9edbd60 100644 --- a/library/cpp/accurate_accumulate/benchmark/metrics/ya.make +++ b/library/cpp/accurate_accumulate/benchmark/metrics/ya.make @@ -6,8 +6,6 @@ TAG( ya:force_sandbox sb:intel_e5_2660v1 ya:fat - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) TEST_SRCS( diff --git a/library/cpp/containers/intrusive_rb_tree/fuzz/ya.make b/library/cpp/containers/intrusive_rb_tree/fuzz/ya.make index 6d2c9ac2616..99499a9cabf 100644 --- a/library/cpp/containers/intrusive_rb_tree/fuzz/ya.make +++ b/library/cpp/containers/intrusive_rb_tree/fuzz/ya.make @@ -4,8 +4,6 @@ SIZE(LARGE) TAG( ya:fat - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) PEERDIR( diff --git a/library/cpp/digest/argonish/ut_fat/ya.make b/library/cpp/digest/argonish/ut_fat/ya.make index 05969539243..6f0a77611d4 100644 --- a/library/cpp/digest/argonish/ut_fat/ya.make +++ b/library/cpp/digest/argonish/ut_fat/ya.make @@ -12,8 +12,6 @@ TAG( sb:intel_e5_2660v4 ya:fat ya:force_sandbox - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) SIZE(LARGE) diff --git a/library/cpp/json/yson/ut/ya.make b/library/cpp/json/yson/ut/ya.make index 3f89b545073..e07dfdb8a47 100644 --- a/library/cpp/json/yson/ut/ya.make +++ b/library/cpp/json/yson/ut/ya.make @@ -16,8 +16,6 @@ SIZE(LARGE) TAG( ya:fat - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) TIMEOUT(600) diff --git a/library/cpp/string_utils/base64/bench/metrics/ya.make b/library/cpp/string_utils/base64/bench/metrics/ya.make index a77c2c95052..5e759329794 100644 --- a/library/cpp/string_utils/base64/bench/metrics/ya.make +++ b/library/cpp/string_utils/base64/bench/metrics/ya.make @@ -6,8 +6,6 @@ TAG( ya:force_sandbox sb:intel_e5_2660v1 ya:fat - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) TEST_SRCS( diff --git a/library/cpp/testing/benchmark/examples/metrics/ya.make b/library/cpp/testing/benchmark/examples/metrics/ya.make index 7481bdf2159..ed4d429653d 100644 --- a/library/cpp/testing/benchmark/examples/metrics/ya.make +++ b/library/cpp/testing/benchmark/examples/metrics/ya.make @@ -6,8 +6,6 @@ TAG( ya:force_sandbox sb:intel_e5_2660v1 ya:fat - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) TEST_SRCS( diff --git a/library/cpp/testing/unittest/fat/ya.make b/library/cpp/testing/unittest/fat/ya.make index d02282d605a..464f26818f6 100644 --- a/library/cpp/testing/unittest/fat/ya.make +++ b/library/cpp/testing/unittest/fat/ya.make @@ -12,8 +12,6 @@ FORK_SUBTESTS() TAG( ya:fat ya:force_sandbox - ya:large_tests_on_multi_slots - ya:large_tests_on_ya_make_2 ) END() diff --git a/library/cpp/yt/logging/logger.h b/library/cpp/yt/logging/logger.h index 1b40b42053a..1dc241ed81a 100644 --- a/library/cpp/yt/logging/logger.h +++ b/library/cpp/yt/logging/logger.h @@ -315,7 +315,7 @@ void LogStructuredEvent( #define YT_LOG_ERROR_IF(condition, ...) if (condition) YT_LOG_ERROR(__VA_ARGS__) #define YT_LOG_ERROR_UNLESS(condition, ...) if (!(condition)) YT_LOG_ERROR(__VA_ARGS__) -#define YT_LOG_ALERT(...) YT_LOG_EVENT(Logger, ::NYT::NLogging::ELogLevel::Alert, __VA_ARGS__); +#define YT_LOG_ALERT(...) YT_LOG_EVENT(Logger, ::NYT::NLogging::ELogLevel::Alert, __VA_ARGS__) #define YT_LOG_ALERT_IF(condition, ...) if (condition) YT_LOG_ALERT(__VA_ARGS__) #define YT_LOG_ALERT_UNLESS(condition, ...) if (!(condition)) YT_LOG_ALERT(__VA_ARGS__) @@ -341,9 +341,19 @@ void LogStructuredEvent( YT_LOG_EVENT(Logger, ::NYT::NLogging::ELogLevel::Alert, __VA_ARGS__); \ THROW_ERROR_EXCEPTION( \ ::NYT::EErrorCode::Fatal, \ - "Malformed request or incorrect state detected"); -#define YT_LOG_ALERT_AND_THROW_IF(condition, ...) if (condition) YT_LOG_ALERT_AND_THROW(__VA_ARGS__) -#define YT_LOG_ALERT_AND_THROW_UNLESS(condition, ...) if (!(condition)) YT_LOG_ALERT_AND_THROW(__VA_ARGS__) + "Malformed request or incorrect state detected") + +#define YT_LOG_ALERT_AND_THROW_IF(condition, ...) \ + if (Y_UNLIKELY(condition)) { \ + YT_LOG_ALERT_AND_THROW(__VA_ARGS__); \ + } \ + static_assert(true) + +#define YT_LOG_ALERT_AND_THROW_UNLESS(condition, ...) \ + if (!Y_UNLIKELY(condition)) { \ + YT_LOG_ALERT_AND_THROW(__VA_ARGS__); \ + } \ + static_assert(true) #define YT_LOG_EVENT(logger, level, ...) \ do { \ diff --git a/library/python/pytest/plugins/collection.py b/library/python/pytest/plugins/collection.py index 7ca8efc47e9..8bc375c625d 100644 --- a/library/python/pytest/plugins/collection.py +++ b/library/python/pytest/plugins/collection.py @@ -90,7 +90,9 @@ def _is_skipped_module_level(module): # pytest.skip.Exception https://docs.pytest.org/en/stable/reference/reference.html#pytest-skip try: module.obj - except pytest.skip.Exception: + except pytest.skip.Exception as e: + if not e.allow_module_level: + raise RuntimeError("Using pytest.skip outside of a test will skip the entire module. If that's your intention, pass `allow_module_level=True`.") return True except Exception: # letting other exceptions such as ImportError slip through diff --git a/util/generic/ptr.cpp b/util/generic/ptr.cpp index b29baebc175..963dcd5160c 100644 --- a/util/generic/ptr.cpp +++ b/util/generic/ptr.cpp @@ -1,4 +1,5 @@ #include "ptr.h" +#include "yexception.h" #include <util/system/defaults.h> #include <util/memory/alloc.h> @@ -15,3 +16,7 @@ void TDelete::Destroy(void* t) noexcept { } TThrRefBase::~TThrRefBase() = default; + +[[noreturn]] void NDetail::NullDerefenceThrowImpl() { + ythrow yexception{} << "nullptr dereference"; +} diff --git a/util/generic/ptr.h b/util/generic/ptr.h index 625418ccdb0..cf28d0bfd2c 100644 --- a/util/generic/ptr.h +++ b/util/generic/ptr.h @@ -100,6 +100,10 @@ private: static void DoDestroy(void* t) noexcept; }; +namespace NDetail { + [[noreturn]] void NullDerefenceThrowImpl(); +} // namespace NDetail + template <class Base, class T> class TPointerCommon { public: @@ -116,6 +120,18 @@ public: return ptr; } + inline typename std::add_lvalue_reference<T>::type GetRef() const { + T* ptr = AsT(); + if (Y_UNLIKELY(!ptr)) { + NDetail::NullDerefenceThrowImpl(); + } + if constexpr (std::is_void<T>::value) { + return; + } else { + return *ptr; + } + } + #ifndef __cpp_impl_three_way_comparison template <class C> inline bool operator==(const C& p) const noexcept { diff --git a/util/generic/ptr_ut.cpp b/util/generic/ptr_ut.cpp index 8f72cee5219..e6a4a7837c0 100644 --- a/util/generic/ptr_ut.cpp +++ b/util/generic/ptr_ut.cpp @@ -36,6 +36,7 @@ class TPointerTest: public TTestBase { UNIT_TEST(TestRefCountedPtrsInHashSet); UNIT_TEST(TestSharedPtrDowncast); UNIT_TEST(TestStdCompatibility); + UNIT_TEST(TestGetRef); UNIT_TEST_SUITE_END(); private: @@ -91,6 +92,7 @@ private: void TestRefCountedPtrsInHashSet(); void TestSharedPtrDowncast(); void TestStdCompatibility(); + void TestGetRef(); }; UNIT_TEST_SUITE_REGISTRATION(TPointerTest); @@ -962,3 +964,30 @@ void TPointerTest::TestStdCompatibility() { UNIT_ASSERT_VALUES_EQUAL(ptr.get(), ptr.Get()); } } + +void TPointerTest::TestGetRef() { + { + TSimpleSharedPtr<int> ptr = MakeSimpleShared<int>(5); + UNIT_ASSERT_TYPES_EQUAL(decltype(ptr.GetRef()), int&); + UNIT_ASSERT_VALUES_EQUAL(ptr.GetRef(), 5); + ptr.GetRef() += 5; + UNIT_ASSERT_VALUES_EQUAL(ptr.GetRef(), 10); + } + { + THolder<const int> ptr = MakeHolder<int>(5); + UNIT_ASSERT_TYPES_EQUAL(decltype(ptr.GetRef()), const int&); + UNIT_ASSERT_VALUES_EQUAL(ptr.GetRef(), 5); + } + { + THolder<const int> ptr; + UNIT_ASSERT_EXCEPTION(ptr.GetRef(), yexception); + } + + { + THolder<void, TNoAction> ptr((void*)10); + UNIT_ASSERT_TYPES_EQUAL(decltype(ptr.GetRef()), void); + ptr.GetRef(); + ptr = {}; + UNIT_ASSERT_EXCEPTION(ptr.GetRef(), yexception); + } +} diff --git a/yql/essentials/cfg/tests/gateways-experimental.conf b/yql/essentials/cfg/tests/gateways-experimental.conf index 391199eccd2..e8adb01227c 100644 --- a/yql/essentials/cfg/tests/gateways-experimental.conf +++ b/yql/essentials/cfg/tests/gateways-experimental.conf @@ -96,7 +96,6 @@ YqlCore { Flags { Name: "OptimizerFlags" Args: [ - "EmitPruneKeys", "MemberNthOverFlatMap", "ExtractMembersSplitOnOptional", "FilterNullMembersOverJust", diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json index c051517eee9..740fae8474f 100644 --- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json +++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json @@ -1281,6 +1281,11 @@ "Match": {"Type": "Callable", "Name": "=="} }, { + "Name": "TCoCmpEqualsIgnoreCase", + "Base": "TCoCompare", + "Match": {"Type": "Callable", "Name": "EqualsIgnoreCase"} + }, + { "Name": "TCoCmpNotEqual", "Base": "TCoCompare", "Match": {"Type": "Callable", "Name": "!="} @@ -1301,16 +1306,32 @@ "Match": {"Type": "Callable", "Name": "StartsWith"} }, { + "Name": "TCoCmpStartsWithIgnoreCase", + "Base": "TCoCompare", + "Match": {"Type": "Callable", "Name": "StartsWithIgnoreCase"} + }, + { "Name": "TCoCmpEndsWith", "Base": "TCoCompare", "Match": {"Type": "Callable", "Name": "EndsWith"} }, { + "Name": "TCoCmpEndsWithIgnoreCase", + "Base": "TCoCompare", + "Match": {"Type": "Callable", "Name": "EndsWithIgnoreCase"} + }, + + { "Name": "TCoCmpStringContains", "Base": "TCoCompare", "Match": {"Type": "Callable", "Name": "StringContains"} }, { + "Name": "TCoCmpStringContainsIgnoreCase", + "Base": "TCoCompare", + "Match": {"Type": "Callable", "Name": "StringContainsIgnoreCase"} + }, + { "Name": "TCoInc", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "Inc"}, @@ -1439,7 +1460,8 @@ {"Index": 1, "Name": "FunctionName", "Type": "TCoAtom"}, {"Index": 2, "Name": "FunctionType", "Type": "TExprBase"}, {"Index": 3, "Name": "Script", "Type": "TExprBase"}, - {"Index": 4, "Name": "Options", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 4, "Name": "Options", "Type": "TCoNameValueTupleList", "Optional": true}, + {"Index": 5, "Name": "FileAlias", "Type": "TCoAtom", "Optional": true} ] }, { diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index ef78839b26d..2b64e4e0bd1 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -8620,6 +8620,69 @@ TExprNode::TPtr ExpandSqlCompare(const TExprNode::TPtr& node, TExprContext& ctx) return node; } +TExprNode::TPtr ExpandContainsIgnoreCase(const TExprNode::TPtr& node, TExprContext& ctx) { + YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content(); + const TString part{node->Child(1)->Child(0)->Content()}; + TString pattern; + if (node->Content() == "EqualsIgnoreCase") { + pattern = part; + } else if (node->Content() == "StartsWithIgnoreCase") { + pattern = part + "%"; + } else if (node->Content() == "EndsWithIgnoreCase") { + pattern = "%" + part; + } else if (node->Content() == "StringContainsIgnoreCase") { + pattern = "%" + part + "%"; + } else { + YQL_ENSURE(!"Unknown IngoreCase node"); + } + const auto pos = node->Pos(); + auto patternExpr = ctx.Builder(pos) + .Callable("Apply") + .Callable(0, "Udf") + .Atom(0, "Re2.PatternFromLike") + .Seal() + .Callable(1, node->Child(1)->Content()) + .Atom(0, pattern) + .Seal() + .Seal() + .Build(); + + + auto optionsExpr = ctx.Builder(pos) + .Callable("NamedApply") + .Callable(0, "Udf") + .Atom(0, "Re2.Options") + .Seal() + .List(1) + .Seal() + .Callable(2, "AsStruct") + .List(0) + .Atom(0, "CaseSensitive") + .Callable(1, "Bool") + .Atom(0, "false", TNodeFlags::Default) + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + + auto result = ctx.Builder(pos) + .Callable("Apply") + .Callable(0, "AssumeStrict") + .Callable(0, "Udf") + .Atom(0, "Re2.Match") + .List(1) + .Add(0, patternExpr) + .Add(1, optionsExpr) + .Seal() + .Seal() + .Seal() + .Add(1, node->Child(0)) + .Seal() + .Build(); + + return result; +} template <bool Equals> TExprNode::TPtr ExpandAggrEqual(const TExprNode::TPtr& node, TExprContext& ctx) { @@ -8944,6 +9007,10 @@ struct TPeepHoleRules { {"EmptyIterator", &DropDependsOnFromEmptyIterator}, {"Version", &ExpandVersion}, {RightName, &ExpandRightOverCons}, + {"EqualsIgnoreCase", &ExpandContainsIgnoreCase}, + {"StartsWithIgnoreCase", &ExpandContainsIgnoreCase}, + {"EndsWithIgnoreCase", &ExpandContainsIgnoreCase}, + {"StringContainsIgnoreCase", &ExpandContainsIgnoreCase}, }; const TExtPeepHoleOptimizerMap CommonStageExtRules = { diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp index a4a780a71af..e0518348d7c 100644 --- a/yql/essentials/core/type_ann/type_ann_core.cpp +++ b/yql/essentials/core/type_ann/type_ann_core.cpp @@ -3323,7 +3323,9 @@ namespace NTypeAnnImpl { return IGraphTransformer::TStatus::Error; } - if (IsNull(input->Head()) || IsNull(input->Tail())) { + const auto ignoreNulls = input->Content().ends_with("IgnoreCase"); + + if ((IsNull(input->Head()) || IsNull(input->Tail())) && !ignoreNulls) { output = MakeBoolNothing(input->Pos(), ctx.Expr); return IGraphTransformer::TStatus::Repeat; } @@ -3343,8 +3345,11 @@ namespace NTypeAnnImpl { } bool isOptional = false; const TDataExprType* dataType = nullptr; - if (!IsDataOrOptionalOfData(type, isOptional, dataType) || - !(dataType->GetSlot() == EDataSlot::String || dataType->GetSlot() == EDataSlot::Utf8)) + if ((!IsDataOrOptionalOfData(type, isOptional, dataType) || + !(dataType->GetSlot() == EDataSlot::String || dataType->GetSlot() == EDataSlot::Utf8) || + dataType->IsOptionalOrNull()) && + (!IsNull(*type) && ignoreNulls) + ) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()), TStringBuilder() << "Expected (optional) string/utf8 or corresponding Pg type, but got: " << *child->GetTypeAnn())); @@ -3352,8 +3357,7 @@ namespace NTypeAnnImpl { } hasOptionals = hasOptionals || isOptional; } - - if (hasOptionals) + if (hasOptionals && !ignoreNulls) input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool))); else input->SetTypeAnn(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool)); @@ -7837,7 +7841,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> if (!EnsureMinArgsCount(*input, 4, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - if (!EnsureMaxArgsCount(*input, 5, ctx.Expr)) { + if (!EnsureMaxArgsCount(*input, 6, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -7896,6 +7900,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> bool isCustomPython = NKikimr::NMiniKQL::IsCustomPython(scriptType); auto canonizedModuleName = isCustomPython ? moduleName : NKikimr::NMiniKQL::ScriptTypeAsStr(scriptType); bool foundModule = false; + TStringBuf fileAlias = ""_sb; // resolve script udf from external resources (files / urls) // (main usage of CustomPython) @@ -7906,7 +7911,10 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Error; } - foundModule = ctx.Types.UdfModules.find(canonizedModuleName) != ctx.Types.UdfModules.end(); + if (auto udfInfo = ctx.Types.UdfModules.FindPtr(canonizedModuleName)) { + foundModule = true; + fileAlias = udfInfo->FileAlias; + } } // fallback for preinstalled CustomPython case @@ -7926,7 +7934,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Error; } - if (input->ChildrenSize() == 5) { + if (input->ChildrenSize() > 4) { if (!EnsureTuple(*input->Child(4), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -7966,6 +7974,21 @@ template <NKikimr::NUdf::EDataSlot DataSlot> } } + if (input->ChildrenSize() > 5) { + if (!EnsureAtom(*input->Child(5), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } else if (fileAlias) { + auto children = input->ChildrenList(); + if (children.size() < 5) { + children.push_back(ctx.Expr.NewList(input->Pos(), {})); + } + children.push_back(ctx.Expr.NewAtom(input->Pos(), fileAlias)); + YQL_ENSURE(children.size() == 6); + output = ctx.Expr.ChangeChildren(*input, std::move(children)); + return IGraphTransformer::TStatus::Repeat; + } + input->SetTypeAnn(callableType); return IGraphTransformer::TStatus::Ok; } @@ -12490,6 +12513,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["GreaterOrEqual"] = &CompareWrapper<false>; Functions["=="] = &CompareWrapper<true>; Functions["Equal"] = &CompareWrapper<true>; + Functions["EqualsIgnoreCase"] = &WithWrapper; Functions["!="] = &CompareWrapper<true>; Functions["NotEqual"] = &CompareWrapper<true>; Functions["Inc"] = &IncDecWrapper<true>; @@ -12553,8 +12577,11 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["Find"] = &FindWrapper; Functions["RFind"] = &FindWrapper; Functions["StartsWith"] = &WithWrapper; + Functions["StartsWithIgnoreCase"] = &WithWrapper; Functions["EndsWith"] = &WithWrapper; + Functions["EndsWithIgnoreCase"] = &WithWrapper; Functions["StringContains"] = &WithWrapper; + Functions["StringContainsIgnoreCase"] = &WithWrapper; Functions["ByteAt"] = &ByteAtWrapper; Functions["ListIf"] = &ListIfWrapper; Functions["AsList"] = &AsListWrapper<false>; diff --git a/yql/essentials/docs/en/builtins/basic.md b/yql/essentials/docs/en/builtins/basic.md index f92a303295d..59c52a65dbc 100644 --- a/yql/essentials/docs/en/builtins/basic.md +++ b/yql/essentials/docs/en/builtins/basic.md @@ -959,16 +959,22 @@ SELECT Arguments: -1. An unsigned number that's subject to the operation. TestBit is also implemented for strings. +1. An unsigned number that's subject to the operation. `TestBit` is also implemented for strings (see the description below). 2. Number of the bit. -TestBit returns `true/false`. The other functions return a copy of their first argument with the corresponding conversion. +`TestBit` returns `true/false`. The other functions return a copy of their first argument with the corresponding conversion. + +`TestBit` works the following way for the string argument: + +1. For the second argument (the number of the bit) the corresponding byte **from the beginning of the string** is chosen. +2. Next, for the given byte the corresponding LSB is chosen. #### Examples ```yql SELECT TestBit(1u, 0), -- true + TestBit('ax', 12) -- true (second byte, fourth bit) SetBit(8u, 0); -- 9 ``` diff --git a/yql/essentials/docs/ru/builtins/basic.md b/yql/essentials/docs/ru/builtins/basic.md index 83690a3aae0..0dae688bd3e 100644 --- a/yql/essentials/docs/ru/builtins/basic.md +++ b/yql/essentials/docs/ru/builtins/basic.md @@ -1252,6 +1252,8 @@ SELECT ```yql TestBit(T, Uint8)->Bool TestBit(T?, Uint8)->Bool? +TestBit(String, Uint8)->Bool? +TestBit(String?, Uint8)->Bool? ClearBit(T, Uint8)->T ClearBit(T?, Uint8)->T? @@ -1265,16 +1267,22 @@ FlipBit(T?, Uint8)->T? Аргументы: -1. Беззнаковое число, над которым выполнять требуемую операцию. TestBit также реализован и для строк. +1. Беззнаковое число, над которым выполнять требуемую операцию. `TestBit` также реализован и для строк (см. ниже описание). 2. Номер бита. -TestBit возвращает `true/false`. Остальные функции возвращают копию своего первого аргумента с проведенным соответствующим преобразованием. +`TestBit` возвращает `true/false`. Остальные функции возвращают копию своего первого аргумента с проведенным соответствующим преобразованием. + +`TestBit` для строковых аргументов работает следующим образом: + +1. По второму аргументу (номеру бита) выбирается соответсвующий байт *с начала строки*. +2. Затем в выбранном байте выбирается соответствующий младший бит. #### Примеры ```yql SELECT TestBit(1u, 0), -- true + TestBit('ax', 12) -- true (второй байт, четвертый бит) SetBit(8u, 0); -- 9 ``` diff --git a/yql/essentials/mount/lib/yql/core.yqls b/yql/essentials/mount/lib/yql/core.yqls index 4dbb358d545..8b6e1798804 100755 --- a/yql/essentials/mount/lib/yql/core.yqls +++ b/yql/essentials/mount/lib/yql/core.yqls @@ -13,7 +13,9 @@ (let MaskBit (lambda '(value index) (ShiftLeft (Data (Apply RemoveOptionalType (TypeOf value)) '1) index))) (let TestBitInt (lambda '(value index) (Convert (BitAnd value (Apply MaskBit value index)) 'Bool))) -(let TestBitStr (lambda '(value index) (Apply TestBitInt (ByteAt value (Convert (ShiftRight (Convert index 'Uint64) (Uint8 '8)) 'Uint32)) (BitAnd (Convert index 'Uint8) (Uint8 '7))))) +# XXX: Coalesce drops the Optional type returned by ByteAt, so to +# respect the current signature, the result is wrapped with Just. +(let TestBitStr (lambda '(value index) (Just (Coalesce (Apply TestBitInt (ByteAt value (Convert (ShiftRight (Convert index 'Uint64) (Uint8 '3)) 'Uint32)) (BitAnd (Convert index 'Uint8) (Uint8 '7))) (Bool 'false))))) (let TestBit (lambda '(value index) (block '( (let x (InstanceOf (Apply RemoveOptionalType (TypeOf value)))) (return (IfType x (DataType 'String) diff --git a/yql/essentials/providers/common/proto/gateways_config.proto b/yql/essentials/providers/common/proto/gateways_config.proto index 6624cba8bae..20ee92dd04a 100644 --- a/yql/essentials/providers/common/proto/gateways_config.proto +++ b/yql/essentials/providers/common/proto/gateways_config.proto @@ -679,11 +679,6 @@ message TIcebergCatalog { message THadoop { } - // todo: remove - message THive { - optional string uri = 1; - } - // Hive Iceberg Catalog which is based on a Hive Metastore message THiveMetastore { // Location of a hive metastore @@ -693,10 +688,10 @@ message TIcebergCatalog { oneof payload { THadoop hadoop = 1; - // todo: remove - THive hive = 2; THiveMetastore hive_metastore = 3; } + + reserved 2; } // TIcebergWarehouse represents settings specific to iceberg warehouse diff --git a/yql/essentials/sql/v1/complete/name/fallback/name_service.cpp b/yql/essentials/sql/v1/complete/name/fallback/name_service.cpp deleted file mode 100644 index 0c24281da10..00000000000 --- a/yql/essentials/sql/v1/complete/name/fallback/name_service.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include "name_service.h" - -namespace NSQLComplete { - - class TDeadlinedNameService: public INameService { - public: - TDeadlinedNameService(INameService::TPtr origin, TDuration timeout) - : Origin_(std::move(origin)) - , Timeout_(std::move(timeout)) - { - } - - TFuture<TNameResponse> Lookup(TNameRequest request) override { - auto future = Origin_->Lookup(std::move(request)); - if (!future.Wait(Timeout_)) { - auto e = NThreading::TFutureException() << "Timeout " << Timeout_; - return NThreading::MakeErrorFuture<TNameResponse>(std::make_exception_ptr(e)); - } - return future; - } - - private: - INameService::TPtr Origin_; - TDuration Timeout_; - }; - - class TFallbackNameService: public INameService { - public: - TFallbackNameService(INameService::TPtr primary, INameService::TPtr standby) - : Primary_(std::move(primary)) - , Standby_(std::move(standby)) - { - } - - TFuture<TNameResponse> Lookup(TNameRequest request) override { - auto future = Primary_->Lookup(request); - future.Wait(); - if (future.HasException()) { - return Standby_->Lookup(request); - } - return future; - } - - private: - INameService::TPtr Primary_; - INameService::TPtr Standby_; - }; - - INameService::TPtr MakeDeadlinedNameService(INameService::TPtr origin, TDuration timeout) { - return INameService::TPtr(new TDeadlinedNameService(std::move(origin), std::move(timeout))); - } - - INameService::TPtr MakeFallbackNameService(INameService::TPtr primary, INameService::TPtr standby) { - return INameService::TPtr(new TFallbackNameService(std::move(primary), std::move(standby))); - } - -} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/fallback/name_service.h b/yql/essentials/sql/v1/complete/name/fallback/name_service.h deleted file mode 100644 index 7077d9ddce1..00000000000 --- a/yql/essentials/sql/v1/complete/name/fallback/name_service.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -#include <yql/essentials/sql/v1/complete/name/name_service.h> - -namespace NSQLComplete { - - INameService::TPtr MakeDeadlinedNameService( - INameService::TPtr origin, TDuration timeout); - - INameService::TPtr MakeFallbackNameService( - INameService::TPtr primary, INameService::TPtr standby); - -} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/fallback/ya.make b/yql/essentials/sql/v1/complete/name/fallback/ya.make deleted file mode 100644 index 018fa5a654b..00000000000 --- a/yql/essentials/sql/v1/complete/name/fallback/ya.make +++ /dev/null @@ -1,11 +0,0 @@ -LIBRARY() - -SRCS( - name_service.cpp -) - -PEERDIR( - yql/essentials/sql/v1/complete/name -) - -END() diff --git a/yql/essentials/sql/v1/complete/name/name_service.h b/yql/essentials/sql/v1/complete/name/name_service.h index 665068e1520..e0f0d05e1d1 100644 --- a/yql/essentials/sql/v1/complete/name/name_service.h +++ b/yql/essentials/sql/v1/complete/name/name_service.h @@ -1,83 +1,3 @@ #pragma once -#include <yql/essentials/sql/v1/complete/core/statement.h> - -#include <library/cpp/threading/future/core/future.h> - -#include <util/generic/vector.h> -#include <util/generic/string.h> - -namespace NSQLComplete { - - using NThreading::TFuture; - - struct TIndentifier { - TString Indentifier; - }; - - struct TNamespaced { - TString Namespace; - }; - - struct TKeyword { - TString Content; - }; - - struct TPragmaName: TIndentifier { - struct TConstraints: TNamespaced {}; - }; - - struct TTypeName: TIndentifier { - using TConstraints = std::monostate; - }; - - struct TFunctionName: TIndentifier { - struct TConstraints: TNamespaced {}; - }; - - struct THintName: TIndentifier { - struct TConstraints { - EStatementKind Statement; - }; - }; - - using TGenericName = std::variant< - TKeyword, - TPragmaName, - TTypeName, - TFunctionName, - THintName>; - - struct TNameRequest { - TVector<TString> Keywords; - struct { - std::optional<TPragmaName::TConstraints> Pragma; - std::optional<TTypeName::TConstraints> Type; - std::optional<TFunctionName::TConstraints> Function; - std::optional<THintName::TConstraints> Hint; - } Constraints; - TString Prefix = ""; - size_t Limit = 128; - - bool IsEmpty() const { - return Keywords.empty() && - !Constraints.Pragma && - !Constraints.Type && - !Constraints.Function && - !Constraints.Hint; - } - }; - - struct TNameResponse { - TVector<TGenericName> RankedNames; - }; - - class INameService { - public: - using TPtr = THolder<INameService>; - - virtual TFuture<TNameResponse> Lookup(TNameRequest request) = 0; - virtual ~INameService() = default; - }; - -} // namespace NSQLComplete +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> diff --git a/yql/essentials/sql/v1/complete/name/object/schema_gateway.h b/yql/essentials/sql/v1/complete/name/object/schema_gateway.h index c09883d8bb7..37fff8571e0 100644 --- a/yql/essentials/sql/v1/complete/name/object/schema_gateway.h +++ b/yql/essentials/sql/v1/complete/name/object/schema_gateway.h @@ -38,12 +38,12 @@ namespace NSQLComplete { TVector<TFolderEntry> Entries; }; - class ISchemaGateway { + class ISchemaGateway: public TThrRefBase { public: - using TPtr = THolder<ISchemaGateway>; + using TPtr = TIntrusivePtr<ISchemaGateway>; virtual ~ISchemaGateway() = default; - virtual NThreading::TFuture<TListResponse> List(const TListRequest& request) = 0; + virtual NThreading::TFuture<TListResponse> List(const TListRequest& request) const = 0; }; } // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/object/static/schema_gateway.cpp b/yql/essentials/sql/v1/complete/name/object/static/schema_gateway.cpp index 974c1ee5922..e325b908f7a 100644 --- a/yql/essentials/sql/v1/complete/name/object/static/schema_gateway.cpp +++ b/yql/essentials/sql/v1/complete/name/object/static/schema_gateway.cpp @@ -21,10 +21,14 @@ namespace NSQLComplete { } } - NThreading::TFuture<TListResponse> List(const TListRequest& request) override { + NThreading::TFuture<TListResponse> List(const TListRequest& request) const override { auto [path, prefix] = ParsePath(request.Path); - TVector<TFolderEntry> entries = Data_[path]; + TVector<TFolderEntry> entries; + if (const auto* data = Data_.FindPtr(path)) { + entries = *data; + } + EraseIf(entries, [prefix = ToLowerUTF8(prefix)](const TFolderEntry& entry) { return !entry.Name.StartsWith(prefix); }); @@ -62,7 +66,7 @@ namespace NSQLComplete { } // namespace ISchemaGateway::TPtr MakeStaticSchemaGateway(THashMap<TString, TVector<TFolderEntry>> fs) { - return MakeHolder<TSchemaGateway>(std::move(fs)); + return MakeIntrusive<TSchemaGateway>(std::move(fs)); } } // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/name_service.h b/yql/essentials/sql/v1/complete/name/service/name_service.h new file mode 100644 index 00000000000..58cb1485c3b --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/name_service.h @@ -0,0 +1,83 @@ +#pragma once + +#include <yql/essentials/sql/v1/complete/core/statement.h> + +#include <library/cpp/threading/future/core/future.h> + +#include <util/generic/vector.h> +#include <util/generic/string.h> + +namespace NSQLComplete { + + using NThreading::TFuture; + + struct TIndentifier { + TString Indentifier; + }; + + struct TNamespaced { + TString Namespace; + }; + + struct TKeyword { + TString Content; + }; + + struct TPragmaName: TIndentifier { + struct TConstraints: TNamespaced {}; + }; + + struct TTypeName: TIndentifier { + using TConstraints = std::monostate; + }; + + struct TFunctionName: TIndentifier { + struct TConstraints: TNamespaced {}; + }; + + struct THintName: TIndentifier { + struct TConstraints { + EStatementKind Statement; + }; + }; + + using TGenericName = std::variant< + TKeyword, + TPragmaName, + TTypeName, + TFunctionName, + THintName>; + + struct TNameRequest { + TVector<TString> Keywords; + struct { + std::optional<TPragmaName::TConstraints> Pragma; + std::optional<TTypeName::TConstraints> Type; + std::optional<TFunctionName::TConstraints> Function; + std::optional<THintName::TConstraints> Hint; + } Constraints; + TString Prefix = ""; + size_t Limit = 128; + + bool IsEmpty() const { + return Keywords.empty() && + !Constraints.Pragma && + !Constraints.Type && + !Constraints.Function && + !Constraints.Hint; + } + }; + + struct TNameResponse { + TVector<TGenericName> RankedNames; + }; + + class INameService: public TThrRefBase { + public: + using TPtr = TIntrusivePtr<INameService>; + + virtual TFuture<TNameResponse> Lookup(TNameRequest request) const = 0; + virtual ~INameService() = default; + }; + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/static/frequency.cpp b/yql/essentials/sql/v1/complete/name/service/static/frequency.cpp index 456a9feebcd..456a9feebcd 100644 --- a/yql/essentials/sql/v1/complete/name/static/frequency.cpp +++ b/yql/essentials/sql/v1/complete/name/service/static/frequency.cpp diff --git a/yql/essentials/sql/v1/complete/name/static/frequency.h b/yql/essentials/sql/v1/complete/name/service/static/frequency.h index 0a5dc36dfbd..0a5dc36dfbd 100644 --- a/yql/essentials/sql/v1/complete/name/static/frequency.h +++ b/yql/essentials/sql/v1/complete/name/service/static/frequency.h diff --git a/yql/essentials/sql/v1/complete/name/static/frequency_ut.cpp b/yql/essentials/sql/v1/complete/name/service/static/frequency_ut.cpp index c630f0ca987..c630f0ca987 100644 --- a/yql/essentials/sql/v1/complete/name/static/frequency_ut.cpp +++ b/yql/essentials/sql/v1/complete/name/service/static/frequency_ut.cpp diff --git a/yql/essentials/sql/v1/complete/name/static/json_name_set.cpp b/yql/essentials/sql/v1/complete/name/service/static/json_name_set.cpp index a30f62aac20..a30f62aac20 100644 --- a/yql/essentials/sql/v1/complete/name/static/json_name_set.cpp +++ b/yql/essentials/sql/v1/complete/name/service/static/json_name_set.cpp diff --git a/yql/essentials/sql/v1/complete/name/static/name_index.cpp b/yql/essentials/sql/v1/complete/name/service/static/name_index.cpp index bfbf6af7fb4..bfbf6af7fb4 100644 --- a/yql/essentials/sql/v1/complete/name/static/name_index.cpp +++ b/yql/essentials/sql/v1/complete/name/service/static/name_index.cpp diff --git a/yql/essentials/sql/v1/complete/name/static/name_index.h b/yql/essentials/sql/v1/complete/name/service/static/name_index.h index 77b50238846..77b50238846 100644 --- a/yql/essentials/sql/v1/complete/name/static/name_index.h +++ b/yql/essentials/sql/v1/complete/name/service/static/name_index.h diff --git a/yql/essentials/sql/v1/complete/name/static/name_service.cpp b/yql/essentials/sql/v1/complete/name/service/static/name_service.cpp index 201f096bd9e..7b3c3f1bd1a 100644 --- a/yql/essentials/sql/v1/complete/name/static/name_service.cpp +++ b/yql/essentials/sql/v1/complete/name/service/static/name_service.cpp @@ -87,7 +87,7 @@ namespace NSQLComplete { { } - TFuture<TNameResponse> Lookup(TNameRequest request) override { + TFuture<TNameResponse> Lookup(TNameRequest request) const override { TNameResponse response; Sort(request.Keywords, NoCaseCompare); @@ -115,9 +115,11 @@ namespace NSQLComplete { if (request.Constraints.Hint) { const auto stmt = request.Constraints.Hint->Statement; - AppendAs<THintName>( - response.RankedNames, - FilteredByPrefix(request.Prefix, Hints_[stmt])); + if (const auto* hints = Hints_.FindPtr(stmt)) { + AppendAs<THintName>( + response.RankedNames, + FilteredByPrefix(request.Prefix, *hints)); + } } Ranking_->CropToSortedPrefix(response.RankedNames, request.Limit); @@ -142,7 +144,7 @@ namespace NSQLComplete { } INameService::TPtr MakeStaticNameService(NameSet names, IRanking::TPtr ranking) { - return INameService::TPtr(new TStaticNameService(std::move(names), std::move(ranking))); + return MakeIntrusive<TStaticNameService>(std::move(names), std::move(ranking)); } } // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/static/name_service.h b/yql/essentials/sql/v1/complete/name/service/static/name_service.h new file mode 100644 index 00000000000..b0ca2b2ff81 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/static/name_service.h @@ -0,0 +1,22 @@ +#pragma once + +#include "ranking.h" + +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> + +namespace NSQLComplete { + + struct NameSet { + TVector<TString> Pragmas; + TVector<TString> Types; + TVector<TString> Functions; + THashMap<EStatementKind, TVector<TString>> Hints; + }; + + NameSet MakeDefaultNameSet(); + + INameService::TPtr MakeStaticNameService(); + + INameService::TPtr MakeStaticNameService(NameSet names, IRanking::TPtr ranking); + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/static/ranking.cpp b/yql/essentials/sql/v1/complete/name/service/static/ranking.cpp index aa08bd7a639..6ef3817defb 100644 --- a/yql/essentials/sql/v1/complete/name/static/ranking.cpp +++ b/yql/essentials/sql/v1/complete/name/service/static/ranking.cpp @@ -2,7 +2,7 @@ #include "frequency.h" -#include <yql/essentials/sql/v1/complete/name/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> #include <yql/essentials/core/sql_types/normalize_name.h> @@ -23,7 +23,7 @@ namespace NSQLComplete { { } - void CropToSortedPrefix(TVector<TGenericName>& names, size_t limit) override { + void CropToSortedPrefix(TVector<TGenericName>& names, size_t limit) const override { limit = std::min(limit, names.size()); TVector<TRow> rows; @@ -115,11 +115,11 @@ namespace NSQLComplete { }; IRanking::TPtr MakeDefaultRanking() { - return IRanking::TPtr(new TRanking(LoadFrequencyData())); + return MakeIntrusive<TRanking>(LoadFrequencyData()); } IRanking::TPtr MakeDefaultRanking(TFrequencyData frequency) { - return IRanking::TPtr(new TRanking(frequency)); + return MakeIntrusive<TRanking>(frequency); } } // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/static/ranking.h b/yql/essentials/sql/v1/complete/name/service/static/ranking.h new file mode 100644 index 00000000000..bc1f57f9a51 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/static/ranking.h @@ -0,0 +1,23 @@ +#pragma once + +#include "frequency.h" + +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> + +#include <util/generic/hash.h> + +namespace NSQLComplete { + + class IRanking: public TThrRefBase { + public: + using TPtr = TIntrusivePtr<IRanking>; + + virtual void CropToSortedPrefix(TVector<TGenericName>& names, size_t limit) const = 0; + virtual ~IRanking() = default; + }; + + IRanking::TPtr MakeDefaultRanking(); + + IRanking::TPtr MakeDefaultRanking(TFrequencyData frequency); + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/static/ut/ya.make b/yql/essentials/sql/v1/complete/name/service/static/ut/ya.make new file mode 100644 index 00000000000..a486db3cd0d --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/static/ut/ya.make @@ -0,0 +1,7 @@ +UNITTEST_FOR(yql/essentials/sql/v1/complete/name/service/static) + +SRCS( + frequency_ut.cpp +) + +END() diff --git a/yql/essentials/sql/v1/complete/name/service/static/ya.make b/yql/essentials/sql/v1/complete/name/service/static/ya.make new file mode 100644 index 00000000000..95417316688 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/static/ya.make @@ -0,0 +1,30 @@ +LIBRARY() + +SRCS( + frequency.cpp + json_name_set.cpp + name_index.cpp + name_service.cpp + ranking.cpp +) + +PEERDIR( + yql/essentials/core/sql_types + yql/essentials/sql/v1/complete/name/service + yql/essentials/sql/v1/complete/text +) + +RESOURCE( + yql/essentials/data/language/pragmas_opensource.json pragmas_opensource.json + yql/essentials/data/language/types.json types.json + yql/essentials/data/language/sql_functions.json sql_functions.json + yql/essentials/data/language/udfs_basic.json udfs_basic.json + yql/essentials/data/language/statements_opensource.json statements_opensource.json + yql/essentials/data/language/rules_corr_basic.json rules_corr_basic.json +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yql/essentials/sql/v1/complete/name/service/ya.make b/yql/essentials/sql/v1/complete/name/service/ya.make new file mode 100644 index 00000000000..7ab6f698b17 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +PEERDIR( + yql/essentials/sql/v1/complete/core +) + +END() + +RECURSE( + static +) diff --git a/yql/essentials/sql/v1/complete/name/static/name_service.h b/yql/essentials/sql/v1/complete/name/static/name_service.h index d5459faa1d7..1a77cc5ea7f 100644 --- a/yql/essentials/sql/v1/complete/name/static/name_service.h +++ b/yql/essentials/sql/v1/complete/name/static/name_service.h @@ -1,22 +1,3 @@ #pragma once -#include "ranking.h" - -#include <yql/essentials/sql/v1/complete/name/name_service.h> - -namespace NSQLComplete { - - struct NameSet { - TVector<TString> Pragmas; - TVector<TString> Types; - TVector<TString> Functions; - THashMap<EStatementKind, TVector<TString>> Hints; - }; - - NameSet MakeDefaultNameSet(); - - INameService::TPtr MakeStaticNameService(); - - INameService::TPtr MakeStaticNameService(NameSet names, IRanking::TPtr ranking); - -} // namespace NSQLComplete +#include <yql/essentials/sql/v1/complete/name/service/static/name_service.h> diff --git a/yql/essentials/sql/v1/complete/name/static/ranking.h b/yql/essentials/sql/v1/complete/name/static/ranking.h index e24607eded6..61e7d3955a5 100644 --- a/yql/essentials/sql/v1/complete/name/static/ranking.h +++ b/yql/essentials/sql/v1/complete/name/static/ranking.h @@ -1,23 +1,3 @@ #pragma once -#include "frequency.h" - -#include <yql/essentials/sql/v1/complete/name/name_service.h> - -#include <util/generic/hash.h> - -namespace NSQLComplete { - - class IRanking { - public: - using TPtr = THolder<IRanking>; - - virtual void CropToSortedPrefix(TVector<TGenericName>& names, size_t limit) = 0; - virtual ~IRanking() = default; - }; - - IRanking::TPtr MakeDefaultRanking(); - - IRanking::TPtr MakeDefaultRanking(TFrequencyData frequency); - -} // namespace NSQLComplete +#include <yql/essentials/sql/v1/complete/name/service/static/ranking.h> diff --git a/yql/essentials/sql/v1/complete/name/static/ut/ya.make b/yql/essentials/sql/v1/complete/name/static/ut/ya.make deleted file mode 100644 index 60963b761b0..00000000000 --- a/yql/essentials/sql/v1/complete/name/static/ut/ya.make +++ /dev/null @@ -1,7 +0,0 @@ -UNITTEST_FOR(yql/essentials/sql/v1/complete/name/static) - -SRCS( - frequency_ut.cpp -) - -END() diff --git a/yql/essentials/sql/v1/complete/name/static/ya.make b/yql/essentials/sql/v1/complete/name/static/ya.make index 1315d7475da..5202c31ea9e 100644 --- a/yql/essentials/sql/v1/complete/name/static/ya.make +++ b/yql/essentials/sql/v1/complete/name/static/ya.make @@ -1,30 +1,8 @@ LIBRARY() -SRCS( - frequency.cpp - json_name_set.cpp - name_index.cpp - name_service.cpp - ranking.cpp -) - PEERDIR( yql/essentials/core/sql_types - yql/essentials/sql/v1/complete/name - yql/essentials/sql/v1/complete/text -) - -RESOURCE( - yql/essentials/data/language/pragmas_opensource.json pragmas_opensource.json - yql/essentials/data/language/types.json types.json - yql/essentials/data/language/sql_functions.json sql_functions.json - yql/essentials/data/language/udfs_basic.json udfs_basic.json - yql/essentials/data/language/statements_opensource.json statements_opensource.json - yql/essentials/data/language/rules_corr_basic.json rules_corr_basic.json + yql/essentials/sql/v1/complete/name/service/static ) END() - -RECURSE_FOR_TESTS( - ut -) diff --git a/yql/essentials/sql/v1/complete/name/ya.make b/yql/essentials/sql/v1/complete/name/ya.make index 9b64891e4d6..8eb198ffa3d 100644 --- a/yql/essentials/sql/v1/complete/name/ya.make +++ b/yql/essentials/sql/v1/complete/name/ya.make @@ -2,12 +2,12 @@ LIBRARY() PEERDIR( yql/essentials/sql/v1/complete/core + yql/essentials/sql/v1/complete/name/service ) END() RECURSE( - fallback object - static + service ) diff --git a/yql/essentials/sql/v1/complete/sql_complete.cpp b/yql/essentials/sql/v1/complete/sql_complete.cpp index 0789aa4bfc0..0ec34e212db 100644 --- a/yql/essentials/sql/v1/complete/sql_complete.cpp +++ b/yql/essentials/sql/v1/complete/sql_complete.cpp @@ -1,7 +1,7 @@ #include "sql_complete.h" #include <yql/essentials/sql/v1/complete/text/word.h> -#include <yql/essentials/sql/v1/complete/name/static/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/static/name_service.h> #include <yql/essentials/sql/v1/complete/syntax/local.h> #include <yql/essentials/sql/v1/complete/syntax/format.h> @@ -22,7 +22,11 @@ namespace NSQLComplete { { } - TCompletion Complete(TCompletionInput input) { + TCompletion Complete(TCompletionInput input) override { + return CompleteAsync(std::move(input)).ExtractValueSync(); + } + + virtual NThreading::TFuture<TCompletion> CompleteAsync(TCompletionInput input) override { if ( input.CursorPosition < input.Text.length() && IsUTF8ContinuationByte(input.Text.at(input.CursorPosition)) || @@ -37,21 +41,24 @@ namespace NSQLComplete { TStringBuf prefix = input.Text.Head(input.CursorPosition); TCompletedToken completedToken = GetCompletedToken(prefix); - return { - .CompletedToken = std::move(completedToken), - .Candidates = GetCanidates(std::move(context), completedToken), - }; + return GetCandidates(std::move(context), completedToken) + .Apply([completedToken](NThreading::TFuture<TVector<TCandidate>> f) { + return TCompletion{ + .CompletedToken = std::move(completedToken), + .Candidates = f.ExtractValue(), + }; + }); } private: - TCompletedToken GetCompletedToken(TStringBuf prefix) { + TCompletedToken GetCompletedToken(TStringBuf prefix) const { return { .Content = LastWord(prefix), .SourcePosition = LastWordIndex(prefix), }; } - TVector<TCandidate> GetCanidates(TLocalSyntaxContext context, const TCompletedToken& prefix) { + NThreading::TFuture<TVector<TCandidate>> GetCandidates(TLocalSyntaxContext context, const TCompletedToken& prefix) const { TNameRequest request = { .Prefix = TString(prefix.Content), .Limit = Configuration.Limit, @@ -84,16 +91,17 @@ namespace NSQLComplete { } if (request.IsEmpty()) { - return {}; + return NThreading::MakeFuture<TVector<TCandidate>>({}); } - // User should prepare a robust INameService - TNameResponse response = Names->Lookup(std::move(request)).ExtractValueSync(); - - return Convert(std::move(response.RankedNames), std::move(context.Keywords)); + return Names->Lookup(std::move(request)) + .Apply([keywords = std::move(context.Keywords)](NThreading::TFuture<TNameResponse> f) { + TNameResponse response = f.ExtractValue(); + return Convert(std::move(response.RankedNames), std::move(keywords)); + }); } - TVector<TCandidate> Convert(TVector<TGenericName> names, TLocalSyntaxContext::TKeywords keywords) { + static TVector<TCandidate> Convert(TVector<TGenericName> names, TLocalSyntaxContext::TKeywords keywords) { TVector<TCandidate> candidates; for (auto& name : names) { candidates.emplace_back(std::visit([&](auto&& name) -> TCandidate { @@ -130,8 +138,8 @@ namespace NSQLComplete { TLexerSupplier lexer, INameService::TPtr names, ISqlCompletionEngine::TConfiguration configuration) { - return ISqlCompletionEngine::TPtr( - new TSqlCompletionEngine(lexer, std::move(names), std::move(configuration))); + return MakeHolder<TSqlCompletionEngine>( + lexer, std::move(names), std::move(configuration)); } } // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/sql_complete.h b/yql/essentials/sql/v1/complete/sql_complete.h index faea272590a..e74f3646ba9 100644 --- a/yql/essentials/sql/v1/complete/sql_complete.h +++ b/yql/essentials/sql/v1/complete/sql_complete.h @@ -1,9 +1,11 @@ #pragma once #include <yql/essentials/sql/v1/complete/core/input.h> -#include <yql/essentials/sql/v1/complete/name/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> #include <yql/essentials/sql/v1/lexer/lexer.h> +#include <library/cpp/threading/future/future.h> + #include <util/generic/string.h> #include <util/generic/vector.h> @@ -42,8 +44,9 @@ namespace NSQLComplete { size_t Limit = 256; }; - virtual TCompletion Complete(TCompletionInput input) = 0; virtual ~ISqlCompletionEngine() = default; + virtual TCompletion Complete(TCompletionInput input) = 0; // TODO(YQL-19747): migrate YDB CLI to CompleteAsync + virtual NThreading::TFuture<TCompletion> CompleteAsync(TCompletionInput input) = 0; }; using TLexerSupplier = std::function<NSQLTranslation::ILexer::TPtr(bool ansi)>; diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp index 5ad2f87e867..b614f561492 100644 --- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp +++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp @@ -1,9 +1,8 @@ #include "sql_complete.h" -#include <yql/essentials/sql/v1/complete/name/fallback/name_service.h> -#include <yql/essentials/sql/v1/complete/name/static/frequency.h> -#include <yql/essentials/sql/v1/complete/name/static/name_service.h> -#include <yql/essentials/sql/v1/complete/name/static/ranking.h> +#include <yql/essentials/sql/v1/complete/name/service/static/frequency.h> +#include <yql/essentials/sql/v1/complete/name/service/static/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/static/ranking.h> #include <yql/essentials/sql/v1/lexer/lexer.h> #include <yql/essentials/sql/v1/lexer/antlr4_pure/lexer.h> @@ -24,20 +23,12 @@ public: class TFailingNameService: public INameService { public: - TFuture<TNameResponse> Lookup(TNameRequest) override { + TFuture<TNameResponse> Lookup(TNameRequest) const override { auto e = std::make_exception_ptr(TDummyException()); return NThreading::MakeErrorFuture<TNameResponse>(e); } }; -class TSilentNameService: public INameService { -public: - TFuture<TNameResponse> Lookup(TNameRequest) override { - auto promise = NThreading::NewPromise<TNameResponse>(); - return promise.GetFuture(); - } -}; - Y_UNIT_TEST_SUITE(SqlCompleteTests) { using ECandidateKind::FunctionName; using ECandidateKind::HintName; @@ -91,7 +82,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { } TVector<TCandidate> Complete(ISqlCompletionEngine::TPtr& engine, TString sharped) { - return engine->Complete(SharpedInput(sharped)).Candidates; + return engine->CompleteAsync(SharpedInput(sharped)).GetValueSync().Candidates; } TVector<TCandidate> CompleteTop(size_t limit, ISqlCompletionEngine::TPtr& engine, TString sharped) { @@ -305,28 +296,28 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<TCandidate> expected = { {Keyword, "ANSI"}, {PragmaName, "yson.CastToString"}}; - auto completion = engine->Complete({"PRAGMA "}); + auto completion = engine->CompleteAsync({"PRAGMA "}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, ""); } { TVector<TCandidate> expected = { {PragmaName, "yson.CastToString"}}; - auto completion = engine->Complete({"PRAGMA yson"}); + auto completion = engine->CompleteAsync({"PRAGMA yson"}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, "yson"); } { TVector<TCandidate> expected = { {PragmaName, "CastToString"}}; - auto completion = engine->Complete({"PRAGMA yson."}); + auto completion = engine->CompleteAsync({"PRAGMA yson."}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, ""); } { TVector<TCandidate> expected = { {PragmaName, "CastToString"}}; - auto completion = engine->Complete({"PRAGMA yson.cast"}); + auto completion = engine->CompleteAsync({"PRAGMA yson.cast"}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, "cast"); } @@ -489,7 +480,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<TCandidate> expected = { {FunctionName, "DateTime::Split("}, }; - auto completion = engine->Complete({"SELECT Date"}); + auto completion = engine->CompleteAsync({"SELECT Date"}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, "Date"); } @@ -497,14 +488,14 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<TCandidate> expected = { {FunctionName, "Split("}, }; - auto completion = engine->Complete({"SELECT DateTime:"}); + auto completion = engine->CompleteAsync({"SELECT DateTime:"}).GetValueSync(); UNIT_ASSERT(completion.Candidates.empty()); } { TVector<TCandidate> expected = { {FunctionName, "Split("}, }; - auto completion = engine->Complete({"SELECT DateTime::"}); + auto completion = engine->CompleteAsync({"SELECT DateTime::"}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, ""); } @@ -512,7 +503,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<TCandidate> expected = { {FunctionName, "Split("}, }; - auto completion = engine->Complete({"SELECT DateTime::s"}); + auto completion = engine->CompleteAsync({"SELECT DateTime::s"}).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL(completion.Candidates, expected); UNIT_ASSERT_VALUES_EQUAL(completion.CompletedToken.Content, "s"); } @@ -606,7 +597,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { for (std::size_t size = 0; size <= queryUtf16.size(); ++size) { const TWtringBuf prefixUtf16(queryUtf16, 0, size); - auto completion = engine->Complete({TString::FromUtf16(prefixUtf16)}); + TCompletion completion = engine->CompleteAsync({TString::FromUtf16(prefixUtf16)}).GetValueSync(); Y_DO_NOT_OPTIMIZE_AWAY(completion); } } @@ -630,10 +621,11 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { wchar32 rune; while (ptr < end) { Y_ENSURE(ReadUTF8CharAndAdvance(rune, ptr, end) == RECODE_OK); - auto completion = engine->Complete({ - .Text = query, - .CursorPosition = static_cast<size_t>(std::distance(begin, ptr)), - }); + TCompletion completion = engine->CompleteAsync({ + .Text = query, + .CursorPosition = static_cast<size_t>(std::distance(begin, ptr)), + }) + .GetValueSync(); Y_DO_NOT_OPTIMIZE_AWAY(completion); } } @@ -665,15 +657,15 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { Y_UNIT_TEST(InvalidCursorPosition) { auto engine = MakeSqlCompletionEngineUT(); - UNIT_ASSERT_NO_EXCEPTION(engine->Complete({"", 0})); - UNIT_ASSERT_EXCEPTION(engine->Complete({"", 1}), yexception); + UNIT_ASSERT_NO_EXCEPTION(engine->CompleteAsync({"", 0}).GetValueSync()); + UNIT_ASSERT_EXCEPTION(engine->CompleteAsync({"", 1}).GetValueSync(), yexception); - UNIT_ASSERT_NO_EXCEPTION(engine->Complete({"s", 0})); - UNIT_ASSERT_NO_EXCEPTION(engine->Complete({"s", 1})); + UNIT_ASSERT_NO_EXCEPTION(engine->CompleteAsync({"s", 0}).GetValueSync()); + UNIT_ASSERT_NO_EXCEPTION(engine->CompleteAsync({"s", 1}).GetValueSync()); - UNIT_ASSERT_NO_EXCEPTION(engine->Complete({"ы", 0})); - UNIT_ASSERT_EXCEPTION(engine->Complete({"ы", 1}), yexception); - UNIT_ASSERT_NO_EXCEPTION(engine->Complete({"ы", 2})); + UNIT_ASSERT_NO_EXCEPTION(engine->CompleteAsync({"ы", 0}).GetValueSync()); + UNIT_ASSERT_EXCEPTION(engine->CompleteAsync({"ы", 1}).GetValueSync(), yexception); + UNIT_ASSERT_NO_EXCEPTION(engine->CompleteAsync({"ы", 2}).GetValueSync()); } Y_UNIT_TEST(DefaultNameService) { @@ -711,36 +703,13 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { } Y_UNIT_TEST(OnFailingNameService) { - auto service = MakeHolder<TFailingNameService>(); + auto service = MakeIntrusive<TFailingNameService>(); auto engine = MakeSqlCompletionEngine(MakePureLexerSupplier(), std::move(service)); UNIT_ASSERT_EXCEPTION(Complete(engine, ""), TDummyException); UNIT_ASSERT_EXCEPTION(Complete(engine, "SELECT OPTIONAL<U"), TDummyException); UNIT_ASSERT_EXCEPTION(Complete(engine, "SELECT CAST (1 AS ").size(), TDummyException); } - Y_UNIT_TEST(OnSilentNameService) { - auto silent = MakeHolder<TSilentNameService>(); - auto deadlined = MakeDeadlinedNameService(std::move(silent), TDuration::MilliSeconds(1)); - - auto engine = MakeSqlCompletionEngine(MakePureLexerSupplier(), std::move(deadlined)); - UNIT_ASSERT_EXCEPTION(Complete(engine, "SELECT OPTIONAL<U"), NThreading::TFutureException); - UNIT_ASSERT_EXCEPTION(Complete(engine, "SELECT OPTIONAL<"), NThreading::TFutureException); - } - - Y_UNIT_TEST(OnFallbackNameService) { - auto silent = MakeHolder<TSilentNameService>(); - auto primary = MakeDeadlinedNameService(std::move(silent), TDuration::MilliSeconds(1)); - - auto standby = MakeStaticNameService(MakeDefaultNameSet(), MakeDefaultRanking({})); - - auto fallback = MakeFallbackNameService(std::move(primary), std::move(standby)); - - auto engine = MakeSqlCompletionEngine(MakePureLexerSupplier(), std::move(fallback)); - UNIT_ASSERT_GE(Complete(engine, "SELECT CAST (1 AS U").size(), 6); - UNIT_ASSERT_GE(Complete(engine, "SELECT CAST (1 AS ").size(), 47); - UNIT_ASSERT_GE(Complete(engine, "SELECT ").size(), 55); - } - Y_UNIT_TEST(NameNormalization) { auto set = MakeDefaultNameSet(); auto service = MakeStaticNameService(std::move(set), MakeDefaultRanking()); diff --git a/yql/essentials/sql/v1/complete/syntax/local.cpp b/yql/essentials/sql/v1/complete/syntax/local.cpp index 5d6ca2f9760..a1e41ee50a2 100644 --- a/yql/essentials/sql/v1/complete/syntax/local.cpp +++ b/yql/essentials/sql/v1/complete/syntax/local.cpp @@ -233,7 +233,7 @@ namespace NSQLComplete { }; ILocalSyntaxAnalysis::TPtr MakeLocalSyntaxAnalysis(TLexerSupplier lexer) { - return TLocalSyntaxAnalysis::TPtr(new TLocalSyntaxAnalysis(lexer)); + return MakeHolder<TLocalSyntaxAnalysis>(lexer); } } // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/ut/ya.make b/yql/essentials/sql/v1/complete/ut/ya.make index 0a5d13dec46..4c50124cf7c 100644 --- a/yql/essentials/sql/v1/complete/ut/ya.make +++ b/yql/essentials/sql/v1/complete/ut/ya.make @@ -5,7 +5,6 @@ SRCS( ) PEERDIR( - yql/essentials/sql/v1/complete/name/fallback yql/essentials/sql/v1/lexer/antlr4_pure yql/essentials/sql/v1/lexer/antlr4_pure_ansi ) diff --git a/yql/essentials/sql/v1/complete/ya.make b/yql/essentials/sql/v1/complete/ya.make index 59c15502af5..4e2d02b7a97 100644 --- a/yql/essentials/sql/v1/complete/ya.make +++ b/yql/essentials/sql/v1/complete/ya.make @@ -7,8 +7,8 @@ SRCS( PEERDIR( yql/essentials/sql/v1/lexer yql/essentials/sql/v1/complete/antlr4 - yql/essentials/sql/v1/complete/name - yql/essentials/sql/v1/complete/name/static + yql/essentials/sql/v1/complete/name/service + yql/essentials/sql/v1/complete/name/service/static yql/essentials/sql/v1/complete/syntax yql/essentials/sql/v1/complete/text ) diff --git a/yql/essentials/sql/v1/context.cpp b/yql/essentials/sql/v1/context.cpp index ae313a171e1..9decaefd4e9 100644 --- a/yql/essentials/sql/v1/context.cpp +++ b/yql/essentials/sql/v1/context.cpp @@ -71,6 +71,7 @@ THashMap<TStringBuf, TPragmaField> CTX_PRAGMA_FIELDS = { {"DistinctOverKeys", &TContext::DistinctOverKeys}, {"GroupByExprAfterWhere", &TContext::GroupByExprAfterWhere}, {"FailOnGroupByExprOverride", &TContext::FailOnGroupByExprOverride}, + {"OptimizeSimpleILIKE", &TContext::OptimizeSimpleIlike} }; typedef TMaybe<bool> TContext::*TPragmaMaybeField; diff --git a/yql/essentials/sql/v1/context.h b/yql/essentials/sql/v1/context.h index 14314c0a314..e45a0495a2c 100644 --- a/yql/essentials/sql/v1/context.h +++ b/yql/essentials/sql/v1/context.h @@ -376,6 +376,7 @@ namespace NSQLTranslationV1 { bool GroupByExprAfterWhere = false; bool FailOnGroupByExprOverride = false; bool EmitUnionMerge = false; + bool OptimizeSimpleIlike = false; TVector<size_t> ForAllStatementsParts; TMaybe<TString> Engine; diff --git a/yql/essentials/sql/v1/node.cpp b/yql/essentials/sql/v1/node.cpp index e5c7d353795..fe5d4dff7b0 100644 --- a/yql/essentials/sql/v1/node.cpp +++ b/yql/essentials/sql/v1/node.cpp @@ -3179,7 +3179,10 @@ TNodePtr BuildBinaryOp(TContext& ctx, TPosition pos, const TString& opName, TNod return nullptr; } - static const THashSet<TStringBuf> nullSafeOps = {"IsDistinctFrom", "IsNotDistinctFrom"}; + static const THashSet<TStringBuf> nullSafeOps = { + "IsDistinctFrom", "IsNotDistinctFrom", + "EqualsIgnoreCase", "StartsWithIgnoreCase", "EndsWithIgnoreCase", "StringContainsIgnoreCase" + }; if (!nullSafeOps.contains(opName)) { const bool bothArgNull = a->IsNull() && b->IsNull(); const bool oneArgNull = a->IsNull() || b->IsNull(); diff --git a/yql/essentials/sql/v1/sql_expression.cpp b/yql/essentials/sql/v1/sql_expression.cpp index 3eb1904cc59..4b7e31fdaa3 100644 --- a/yql/essentials/sql/v1/sql_expression.cpp +++ b/yql/essentials/sql/v1/sql_expression.cpp @@ -1797,19 +1797,25 @@ TNodePtr TSqlExpression::SubExpr(const TRule_xor_subexpr& node, const TTrailingQ return nullptr; } - if (opName == "like" || mayIgnoreCase) { + if ((opName == "like") || mayIgnoreCase || Ctx.OptimizeSimpleIlike) { // TODO: expand LIKE in optimizers - we can analyze argument types there + const bool useIgnoreCaseOp = (opName == "ilike") && !mayIgnoreCase; + const auto& equalOp = useIgnoreCaseOp ? "EqualsIgnoreCase" : "=="; + const auto& startsWithOp = useIgnoreCaseOp ? "StartsWithIgnoreCase" : "StartsWith"; + const auto& endsWithOp = useIgnoreCaseOp ? "EndsWithIgnoreCase" : "EndsWith"; + const auto& containsOp = useIgnoreCaseOp ? "StringContainsIgnoreCase" : "StringContains"; + YQL_ENSURE(!components.empty()); const auto& first = components.front(); if (components.size() == 1 && first.IsSimple) { // no '%'s and '_'s in pattern YQL_ENSURE(first.Prefix == first.Suffix); - isMatch = BuildBinaryOp(Ctx, pos, "==", res, BuildLiteralRawString(pos, first.Suffix, isUtf8)); + isMatch = BuildBinaryOp(Ctx, pos, equalOp, res, BuildLiteralRawString(pos, first.Suffix, isUtf8)); } else if (!first.Prefix.empty()) { const TString& prefix = first.Prefix; TNodePtr prefixMatch; if (Ctx.EmitStartsWith) { - prefixMatch = BuildBinaryOp(Ctx, pos, "StartsWith", res, BuildLiteralRawString(pos, prefix, isUtf8)); + prefixMatch = BuildBinaryOp(Ctx, pos, startsWithOp, res, BuildLiteralRawString(pos, prefix, isUtf8)); } else { prefixMatch = BuildBinaryOp(Ctx, pos, ">=", res, BuildLiteralRawString(pos, prefix, isUtf8)); auto upperBound = isUtf8 ? NextValidUtf8(prefix) : NextLexicographicString(prefix); @@ -1834,7 +1840,7 @@ TNodePtr TSqlExpression::SubExpr(const TRule_xor_subexpr& node, const TTrailingQ TNodePtr sizePred = BuildBinaryOp(Ctx, pos, ">=", TNodePtr(new TCallNodeImpl(pos, "Size", { res })), TNodePtr(new TLiteralNumberNode<ui32>(pos, "Uint32", ToString(prefix.size() + suffix.size())))); - TNodePtr suffixMatch = BuildBinaryOp(Ctx, pos, "EndsWith", res, BuildLiteralRawString(pos, suffix, isUtf8)); + TNodePtr suffixMatch = BuildBinaryOp(Ctx, pos, endsWithOp, res, BuildLiteralRawString(pos, suffix, isUtf8)); isMatch = new TCallNodeImpl(pos, "And", { sizePred, prefixMatch, @@ -1849,14 +1855,14 @@ TNodePtr TSqlExpression::SubExpr(const TRule_xor_subexpr& node, const TTrailingQ if (components.size() == 3 && components.back().Prefix.empty()) { // '%foo%' YQL_ENSURE(!components[1].Prefix.empty()); - isMatch = BuildBinaryOp(Ctx, pos, "StringContains", res, BuildLiteralRawString(pos, components[1].Prefix, isUtf8)); + isMatch = BuildBinaryOp(Ctx, pos, containsOp, res, BuildLiteralRawString(pos, components[1].Prefix, isUtf8)); } else if (components.size() == 2) { // '%foo' - isMatch = BuildBinaryOp(Ctx, pos, "EndsWith", res, BuildLiteralRawString(pos, components[1].Prefix, isUtf8)); + isMatch = BuildBinaryOp(Ctx, pos, endsWithOp, res, BuildLiteralRawString(pos, components[1].Prefix, isUtf8)); } } else if (Ctx.AnsiLike && !components.back().Suffix.empty()) { const TString& suffix = components.back().Suffix; - TNodePtr suffixMatch = BuildBinaryOp(Ctx, pos, "EndsWith", res, BuildLiteralRawString(pos, suffix, isUtf8)); + TNodePtr suffixMatch = BuildBinaryOp(Ctx, pos, endsWithOp, res, BuildLiteralRawString(pos, suffix, isUtf8)); isMatch = BuildBinaryOp(Ctx, pos, "And", suffixMatch, isMatch); } // TODO: more StringContains/StartsWith/EndsWith cases? diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index c9ea302c8c7..a750d941917 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -3434,6 +3434,12 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success } Ctx.Engine = *literal; + } else if (normalizedPragma == "optimizesimpleilike") { + Ctx.OptimizeSimpleIlike = true; + Ctx.IncrementMonCounter("sql_pragma", "OptimizeSimpleILIKE"); + } else if (normalizedPragma == "disableoptimizesimpleilike") { + Ctx.OptimizeSimpleIlike = false; + Ctx.IncrementMonCounter("sql_pragma", "DisableOptimizeSimpleILIKE"); } else { Error() << "Unknown pragma: " << pragma; Ctx.IncrementMonCounter("sql_errors", "UnknownPragma"); diff --git a/yql/essentials/tests/s-expressions/minirun/part0/canondata/result.json b/yql/essentials/tests/s-expressions/minirun/part0/canondata/result.json index 5902893a5ac..e203f265ac5 100644 --- a/yql/essentials/tests/s-expressions/minirun/part0/canondata/result.json +++ b/yql/essentials/tests/s-expressions/minirun/part0/canondata/result.json @@ -211,16 +211,16 @@ ], "test.test[InMem-BitCore-default.txt-Debug]": [ { - "checksum": "3fab26087d7f76a6ef31ed20f9ec0573", - "size": 2021, - "uri": "https://{canondata_backend}/1871002/8daca6f2a7b63a0a94f426a2b401ad3af26d4985/resource.tar.gz#test.test_InMem-BitCore-default.txt-Debug_/opt.yql" + "checksum": "2e88c6c546707f0923c3f4a5cb6061b1", + "size": 2750, + "uri": "https://{canondata_backend}/1871182/13a5336c3cbff0a7768aef57bd68f03fe757b446/resource.tar.gz#test.test_InMem-BitCore-default.txt-Debug_/opt.yql" } ], "test.test[InMem-BitCore-default.txt-Results]": [ { - "checksum": "35cecef4cdd5e75ae9060c89f7af2f9c", - "size": 3977, - "uri": "https://{canondata_backend}/1871002/8daca6f2a7b63a0a94f426a2b401ad3af26d4985/resource.tar.gz#test.test_InMem-BitCore-default.txt-Results_/results.txt" + "checksum": "1b2d2884e64e3e9a2ea816e9e714110e", + "size": 4861, + "uri": "https://{canondata_backend}/1871182/13a5336c3cbff0a7768aef57bd68f03fe757b446/resource.tar.gz#test.test_InMem-BitCore-default.txt-Results_/results.txt" } ], "test.test[InMem-ByteAt-default.txt-Debug]": [ diff --git a/yql/essentials/tests/s-expressions/suites/InMem/BitCore.yqls b/yql/essentials/tests/s-expressions/suites/InMem/BitCore.yqls index 406f7e22703..ea70273ce70 100644 --- a/yql/essentials/tests/s-expressions/suites/InMem/BitCore.yqls +++ b/yql/essentials/tests/s-expressions/suites/InMem/BitCore.yqls @@ -9,8 +9,11 @@ )))) (let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '0)))) +(let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '2)))) +(let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '7)))) +(let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '8)))) (let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '10)))) -(let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '11)))) +(let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '15)))) (let world (Apply test world (Apply (bind core_module 'TestBit) (String '"\x05\x04") (Uint64 '23)))) (let world (Apply test world (Apply (bind core_module 'TestBit) (Just (String '"\x05\x04")) (Uint64 '0)))) (let world (Apply test world (Apply (bind core_module 'TestBit) (Just (Utf8 '"\x05\x04")) (Uint64 '0)))) diff --git a/yql/essentials/tests/sql/minirun/part1/canondata/result.json b/yql/essentials/tests/sql/minirun/part1/canondata/result.json index baec7526562..b5d225b7f0b 100644 --- a/yql/essentials/tests/sql/minirun/part1/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part1/canondata/result.json @@ -1232,9 +1232,9 @@ ], "test.test[select-core_func_test_bit-default.txt-Debug]": [ { - "checksum": "bab2b83b596cd2b7e76f7b15cdc71fcf", - "size": 548, - "uri": "https://{canondata_backend}/1936842/f457790bc6a954bcb4a6a17ab144af05c57a7754/resource.tar.gz#test.test_select-core_func_test_bit-default.txt-Debug_/opt.yql" + "checksum": "77122d9689e5286e7f1072e93e70a723", + "size": 613, + "uri": "https://{canondata_backend}/1881367/f5d048c3de61bd751328e99d5916444547ba4d0c/resource.tar.gz#test.test_select-core_func_test_bit-default.txt-Debug_/opt.yql" } ], "test.test[select-core_func_test_bit-default.txt-Results]": [ diff --git a/yql/essentials/tests/sql/minirun/part5/canondata/result.json b/yql/essentials/tests/sql/minirun/part5/canondata/result.json index ec17a6eec0b..1eadc4bc263 100644 --- a/yql/essentials/tests/sql/minirun/part5/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part5/canondata/result.json @@ -1145,6 +1145,20 @@ "uri": "https://{canondata_backend}/1942100/1466d7e49a6dc5a8df761a5ac92539095e1a14a0/resource.tar.gz#test.test_library-library_udf--Results_/results.txt" } ], + "test.test[like-ilike--Debug]": [ + { + "checksum": "e9edc47e022f94844d6755223a748bbe", + "size": 5224, + "uri": "https://{canondata_backend}/1899731/c18b136d437186ca17b897aa3cbd89e03d6ab70e/resource.tar.gz#test.test_like-ilike--Debug_/opt.yql" + } + ], + "test.test[like-ilike--Results]": [ + { + "checksum": "4f140efd3491b22880e294dce1c735c2", + "size": 9773, + "uri": "https://{canondata_backend}/1899731/c18b136d437186ca17b897aa3cbd89e03d6ab70e/resource.tar.gz#test.test_like-ilike--Results_/results.txt" + } + ], "test.test[like-like_escape-default.txt-Debug]": [ { "checksum": "9a241374a2f6995712f675e514d7e64a", diff --git a/yql/essentials/tests/sql/minirun/part6/canondata/result.json b/yql/essentials/tests/sql/minirun/part6/canondata/result.json index 60e9fee9459..74fb98a7cc1 100644 --- a/yql/essentials/tests/sql/minirun/part6/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part6/canondata/result.json @@ -875,6 +875,20 @@ "uri": "file://test.test_lambda-list_aggregate_flatmap-default.txt-Results_/extracted" } ], + "test.test[like-ilike-Ansi-Debug]": [ + { + "checksum": "2bf3a064a80a62b0d1e57b4fc19f85aa", + "size": 4198, + "uri": "https://{canondata_backend}/1871102/cf0bf303bf8ddaa5f80dc41d0b1079fd931793f8/resource.tar.gz#test.test_like-ilike-Ansi-Debug_/opt.yql" + } + ], + "test.test[like-ilike-Ansi-Results]": [ + { + "checksum": "4f140efd3491b22880e294dce1c735c2", + "size": 9773, + "uri": "https://{canondata_backend}/1871102/cf0bf303bf8ddaa5f80dc41d0b1079fd931793f8/resource.tar.gz#test.test_like-ilike-Ansi-Results_/results.txt" + } + ], "test.test[match_recognize-alerts_without_order-default.txt-Debug]": [ { "checksum": "617ad997b55cab0792da8c20f2fdeb07", diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index dd80cad3ba4..7bb69811dcd 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -4283,6 +4283,13 @@ "uri": "https://{canondata_backend}/1942173/99e88108149e222741552e7e6cddef041d6a2846/resource.tar.gz#test_sql2yql.test_library-library_yqls_/sql.yql" } ], + "test_sql2yql.test[like-ilike]": [ + { + "checksum": "82ed5458d953d02cb6b225495713cd99", + "size": 11180, + "uri": "https://{canondata_backend}/1946324/f3be5399e4ba264ff20e7048ebc764e7239129dc/resource.tar.gz#test_sql2yql.test_like-ilike_/sql.yql" + } + ], "test_sql2yql.test[like-like_escape]": [ { "checksum": "dad96e3616b966f1a15c37b4396213a1", @@ -10521,6 +10528,11 @@ "uri": "file://test_sql_format.test_library-library_yqls_/formatted.sql" } ], + "test_sql_format.test[like-ilike]": [ + { + "uri": "file://test_sql_format.test_like-ilike_/formatted.sql" + } + ], "test_sql_format.test[like-like_escape]": [ { "uri": "file://test_sql_format.test_like-like_escape_/formatted.sql" diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_like-ilike_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_like-ilike_/formatted.sql new file mode 100644 index 00000000000..96e89141855 --- /dev/null +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_like-ilike_/formatted.sql @@ -0,0 +1,51 @@ +PRAGMA OptimizeSimpleILIKE; + +SELECT + 'test' ILIKE 'test', + 'TEST' ILIKE 'test', + 'тест' ILIKE 'ТЕСТ', + 'Привет' ILIKE 'пРиВеТ', + 'prefix' ILIKE 'Pre%', + 'префикс' ILIKE 'Пре%', + 'suFfix' ILIKE '%Fix', + 'СУФФИКС' ILIKE '%Икс', + 'инфикс' ILIKE '%Фи%', + 'Комплекс' ILIKE '%О%П%С', + + --negative + 'тест' ILIKE 'Тесть', + 'Тост' ILIKE '%мост', + 'Лось' ILIKE 'Лом%' +; + +SELECT + NULL ILIKE 'test', + NULL ILIKE 'te%', + NULL ILIKE '%st', + NULL ILIKE '%es%' +; + +$table = [ + <| + str: 'TeSt', + optStr: Just('TeSt'), + nullStr: Nothing(String?) + |> +]; + +SELECT + str ILIKE 'test', + str ILIKE 'te%', + str ILIKE '%st', + str ILIKE '%es%', + optStr ILIKE 'test', + optStr ILIKE 'te%', + optStr ILIKE '%st', + optStr ILIKE '%es%', + nullStr ILIKE 'test', + nullStr ILIKE 'te%', + nullStr ILIKE '%st', + nullStr ILIKE '%es%' +FROM + AS_TABLE($table) +; diff --git a/yql/essentials/tests/sql/suites/like/ilike-Ansi.cfg b/yql/essentials/tests/sql/suites/like/ilike-Ansi.cfg new file mode 100644 index 00000000000..7dc25ec8499 --- /dev/null +++ b/yql/essentials/tests/sql/suites/like/ilike-Ansi.cfg @@ -0,0 +1 @@ +pragma AnsiLike
\ No newline at end of file diff --git a/yql/essentials/tests/sql/suites/like/ilike.cfg b/yql/essentials/tests/sql/suites/like/ilike.cfg new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/yql/essentials/tests/sql/suites/like/ilike.cfg diff --git a/yql/essentials/tests/sql/suites/like/ilike.sql b/yql/essentials/tests/sql/suites/like/ilike.sql new file mode 100644 index 00000000000..b67825e2feb --- /dev/null +++ b/yql/essentials/tests/sql/suites/like/ilike.sql @@ -0,0 +1,48 @@ +PRAGMA OptimizeSimpleILIKE; + +SELECT + 'test' ILIKE 'test', + 'TEST' ILIKE 'test', + 'тест' ILIKE 'ТЕСТ', + 'Привет' ILIKE 'пРиВеТ', + 'prefix' ILIKE 'Pre%', + 'префикс' ILIKE 'Пре%', + 'suFfix' ILIKE '%Fix', + 'СУФФИКС' ILIKE '%Икс', + 'инфикс' ILIKE '%Фи%', + 'Комплекс' ILIKE '%О%П%С', + --negative + 'тест' ILIKE 'Тесть', + 'Тост' ILIKE '%мост', + 'Лось' ILIKE 'Лом%' +; + +SELECT + Null ILIKE 'test', + Null ILIKE 'te%', + Null ILIKE '%st', + Null ILIKE '%es%' +; + +$table = [<| + str: 'TeSt', + optStr: Just("TeSt"), + nullStr: Nothing(String?) +|>]; + +SELECT + str ILIKE 'test', + str ILIKE 'te%', + str ILIKE '%st', + str ILIKE '%es%', + + optStr ILIKE 'test', + optStr ILIKE 'te%', + optStr ILIKE '%st', + optStr ILIKE '%es%', + + nullStr ILIKE 'test', + nullStr ILIKE 'te%', + nullStr ILIKE '%st', + nullStr ILIKE '%es%' +FROM AS_TABLE($table);
\ No newline at end of file diff --git a/yql/essentials/tools/yql_complete/yql_complete.cpp b/yql/essentials/tools/yql_complete/yql_complete.cpp index 18e57492282..59fd71a87c1 100644 --- a/yql/essentials/tools/yql_complete/yql_complete.cpp +++ b/yql/essentials/tools/yql_complete/yql_complete.cpp @@ -1,7 +1,7 @@ #include <yql/essentials/sql/v1/complete/sql_complete.h> -#include <yql/essentials/sql/v1/complete/name/static/frequency.h> -#include <yql/essentials/sql/v1/complete/name/static/ranking.h> -#include <yql/essentials/sql/v1/complete/name/static/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/static/frequency.h> +#include <yql/essentials/sql/v1/complete/name/service/static/ranking.h> +#include <yql/essentials/sql/v1/complete/name/service/static/name_service.h> #include <yql/essentials/sql/v1/lexer/antlr4_pure/lexer.h> #include <yql/essentials/sql/v1/lexer/antlr4_pure_ansi/lexer.h> @@ -98,7 +98,7 @@ int Run(int argc, char* argv[]) { input.CursorPosition = queryString.size(); } - auto output = engine->Complete(input); + auto output = engine->CompleteAsync(input).ExtractValueSync(); for (const auto& c : output.Candidates) { Cout << "[" << c.Kind << "] " << c.Content << "\n"; } diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp index b292f281fba..a326b111ad4 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -1293,14 +1293,14 @@ private: } if (writeRef && !options.FillSettings().Discard) { - auto cluster = GetClusterName(pull.Input()); writer.OnKeyedItem("Ref"); writer.OnBeginList(); for (auto& tableInfo: GetInputTableInfos(pull.Input())) { + TString cluster = tableInfo->Cluster; writer.OnListItem(); if (tableInfo->IsTemp) { auto outPath = Services_->GetTmpTablePath(tableInfo->Name); - session.CancelDeleteAtFinalize(TString{cluster}, outPath); + session.CancelDeleteAtFinalize(cluster, outPath); } NYql::WriteTableReference(writer, YtProviderName, cluster, tableInfo->Name, tableInfo->IsTemp, columns); } diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h b/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h index 059af11397f..f10cae2bd9a 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h +++ b/yt/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h @@ -45,6 +45,7 @@ struct TInputInfo { TInputInfo(const TString& name, const NYT::TRichYPath& path, bool temp, bool strict, const TYtTableBaseInfo& info, const NYT::TNode& spec, ui32 group = 0) : Name(name) , Path(path) + , Cluster(info.Cluster) , Temp(temp) , Dynamic(info.Meta->IsDynamic) , Strict(strict) @@ -59,6 +60,7 @@ struct TInputInfo { TString Name; NYT::TRichYPath Path; + TString Cluster; bool Temp = false; bool Dynamic = false; bool Strict = true; diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 6a65dadebb8..a24ae8253fe 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -3325,7 +3325,13 @@ private: bool ref = NCommon::HasResOrPullOption(pull.Ref(), "ref"); bool autoRef = NCommon::HasResOrPullOption(pull.Ref(), "autoref"); - auto cluster = TString{GetClusterName(pull.Input())}; + TString cluster = options.UsedCluster(); + if (cluster.empty()) { + cluster = options.Config()->DefaultCluster.Get().GetOrElse(TString()); + } + if (cluster.empty()) { + cluster = Clusters_->GetDefaultClusterName(); + } auto execCtx = MakeExecCtx(std::move(options), session, cluster, pull.Raw(), &ctx); if (auto read = pull.Input().Maybe<TCoRight>().Input().Maybe<TYtReadTable>()) { @@ -3464,12 +3470,14 @@ private: } } } else if (auto limiter = TTableLimiter(range)) { - auto entry = execCtx->GetEntry(); bool stop = false; const bool useNativeDyntableRead = execCtx->Options_.Config()->UseNativeDynamicTableRead.Get().GetOrElse(DEFAULT_USE_NATIVE_DYNAMIC_TABLE_READ); for (size_t i = 0; i < execCtx->InputTables_.size(); ++i) { TString srcTableName = execCtx->InputTables_[i].Name; NYT::TRichYPath srcTable = execCtx->InputTables_[i].Path; + srcTable.Cluster_.Clear(); + TString srcTableCluster = execCtx->InputTables_[i].Cluster; + YQL_ENSURE(srcTableCluster); const bool isDynamic = execCtx->InputTables_[i].Dynamic; if (!isDynamic || useNativeDyntableRead) { if (const auto recordsCount = execCtx->InputTables_[i].Records; recordsCount || !isDynamic) { @@ -3481,6 +3489,7 @@ private: limiter.NextDynamicTable(); } + auto entry = execCtx->GetEntryForCluster(srcTableCluster); if (isDynamic && !useNativeDyntableRead) { YQL_ENSURE(srcTable.GetRanges().Empty()); stop = NYql::SelectRows(entry->Client, srcTableName, i, specsCache, pullData, limiter); diff --git a/yt/yql/providers/yt/lib/hash/ya.make b/yt/yql/providers/yt/lib/hash/ya.make index 508131391f2..b444d8a0fc4 100644 --- a/yt/yql/providers/yt/lib/hash/ya.make +++ b/yt/yql/providers/yt/lib/hash/ya.make @@ -14,4 +14,6 @@ PEERDIR( yql/essentials/core/expr_nodes ) +YQL_LAST_ABI_VERSION() + END() diff --git a/yt/yql/providers/yt/lib/hash/yql_op_hash.cpp b/yt/yql/providers/yt/lib/hash/yql_op_hash.cpp index 0fe82bd4e20..2f6e3240b53 100644 --- a/yt/yql/providers/yt/lib/hash/yql_op_hash.cpp +++ b/yt/yql/providers/yt/lib/hash/yql_op_hash.cpp @@ -3,6 +3,7 @@ #include <yql/essentials/core/yql_type_annotation.h> #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/minikql/mkql_program_builder.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/yql_panic.h> @@ -81,10 +82,34 @@ TString TNodeHashCalculator::GetHashImpl(const TExprNode& node, TArgIndex& argIn isHashable = false; } else { - if (TCoUdf::Match(&node) && node.ChildrenSize() > TCoUdf::idx_FileAlias && !node.Child(TCoUdf::idx_FileAlias)->Content().empty()) { - // an udf from imported file, use hash of file - auto alias = node.Child(TCoUdf::idx_FileAlias)->Content(); - UpdateFileHash(builder, alias); + if (TCoUdf::Match(&node)) { + if (node.ChildrenSize() > TCoUdf::idx_FileAlias && !node.Child(TCoUdf::idx_FileAlias)->Content().empty()) { + // an udf from imported file, use hash of file + auto alias = node.Child(TCoUdf::idx_FileAlias)->Content(); + UpdateFileHash(builder, alias); + } else { + // preinstalled + TStringBuf moduleName, funcName; + YQL_ENSURE(SplitUdfName(node.Head().Content(), moduleName, funcName)); + const auto res = Types.UdfResolver->GetSystemModulePath(moduleName); + YQL_ENSURE(res, "Expected either file alias or system module"); + builder << moduleName << res->Md5; + } + } else if (TCoScriptUdf::Match(&node)) { + if (node.ChildrenSize() > TCoScriptUdf::idx_FileAlias && !node.Child(TCoScriptUdf::idx_FileAlias)->Content().empty()) { + // an udf from imported file, use hash of file + auto alias = node.Child(TCoScriptUdf::idx_FileAlias)->Content(); + UpdateFileHash(builder, alias); + } else { + auto moduleName = node.Head().Content(); + auto scriptType = NKikimr::NMiniKQL::CanonizeScriptType(NKikimr::NMiniKQL::ScriptTypeFromStr(moduleName)); + if (!NKikimr::NMiniKQL::IsCustomPython(scriptType)) { + moduleName = NKikimr::NMiniKQL::ScriptTypeAsStr(scriptType); + } + const auto res = Types.UdfResolver->GetSystemModulePath(moduleName); + YQL_ENSURE(res, "Expected either file alias or system module"); + builder << moduleName << res->Md5; + } } else if (node.Content() == "FilePath" || node.Content() == "FileContent") { auto alias = node.Child(0)->Content(); UpdateFileHash(builder, alias); diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 74905bb7b32..72865fcce3c 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -710,7 +710,7 @@ private: TString operationHash; if (const auto queryCacheMode = config->QueryCacheMode.Get().GetOrElse(EQueryCacheMode::Disable); queryCacheMode != EQueryCacheMode::Disable) { if (!hasNonDeterministicFunctions) { - operationHash = TYtNodeHashCalculator(State_, cluster, config).GetHash(*input); + operationHash = TYtNodeHashCalculator(State_, cluster, config).GetHash(*optimizedNode); } YQL_CLOG(DEBUG, ProviderYt) << "Operation hash: " << HexEncode(operationHash).Quote() << ", cache mode: " << queryCacheMode; } diff --git a/yt/yql/tests/sql/suites/multicluster/pull.sql b/yt/yql/tests/sql/suites/multicluster/pull.sql new file mode 100644 index 00000000000..d37f8d16dc4 --- /dev/null +++ b/yt/yql/tests/sql/suites/multicluster/pull.sql @@ -0,0 +1,8 @@ +/* postgres can not */ +/* yt can not */ +pragma yt.RuntimeCluster='banach'; +pragma yt.RuntimeClusterSelection='auto'; + +select * from plato.PInput +union all +select * from banach.BInput diff --git a/yt/yt/client/api/config.cpp b/yt/yt/client/api/config.cpp index 82a43d37d7a..ff1bb03903f 100644 --- a/yt/yt/client/api/config.cpp +++ b/yt/yt/client/api/config.cpp @@ -153,6 +153,8 @@ void TJournalWriterConfig::Register(TRegistrar registrar) registrar.Parameter("enable_checksums", &TThis::EnableChecksums) .Default(false); + registrar.Parameter("validate_erasure_coding", &TThis::ValidateErasureCoding) + .Default(false); registrar.Parameter("dont_close", &TThis::DontClose) .Default(false); diff --git a/yt/yt/client/api/config.h b/yt/yt/client/api/config.h index cdd6901b3a5..8666bdd7ac6 100644 --- a/yt/yt/client/api/config.h +++ b/yt/yt/client/api/config.h @@ -210,6 +210,7 @@ struct TJournalWriterConfig TDuration PrerequisiteTransactionProbePeriod; bool EnableChecksums; + bool ValidateErasureCoding; // For testing purposes only. bool DontClose; diff --git a/yt/yt/client/chunk_client/config.cpp b/yt/yt/client/chunk_client/config.cpp index 069ef37b30b..5b493671584 100644 --- a/yt/yt/client/chunk_client/config.cpp +++ b/yt/yt/client/chunk_client/config.cpp @@ -177,6 +177,8 @@ void TReplicationReaderConfig::Register(TRegistrar registrar) .Default(false); registrar.Parameter("use_read_blocks_batcher", &TThis::UseReadBlocksBatcher) .Default(false); + registrar.Parameter("block_set_subrequest_threshold", &TThis::BlockSetSubrequestThreshold) + .Default(); registrar.Postprocessor([] (TThis* config) { // Seems unreasonable to make backoff greater than half of total session timeout. diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index c88a7a2ddd2..daafe952a33 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -725,11 +725,26 @@ public: })); } - bool TryAcquire(i64 /*amount*/) override + bool TryAcquire(i64 amount) override { - YT_ABORT(); + size_t i = 0; + for (; i < Throttlers_.size(); ++i) { + if (!Throttlers_[i]->TryAcquire(amount)) { + break; + } + } + + if (i != Throttlers_.size()) { + for (size_t j = 0; j < i; ++j) { + Throttlers_[j]->Release(amount); + } + return false; + } + + return true; } + // TODO: implement TryAcquireAvailable the same way as TryAcquire. i64 TryAcquireAvailable(i64 /*amount*/) override { YT_ABORT(); diff --git a/yt/yt/core/misc/proc.cpp b/yt/yt/core/misc/proc.cpp index 4f043800fbd..20179f75d2b 100644 --- a/yt/yt/core/misc/proc.cpp +++ b/yt/yt/core/misc/proc.cpp @@ -106,6 +106,53 @@ bool IsSystemError(const TError& error) //////////////////////////////////////////////////////////////////////////////// +TFileDescriptorGuard::TFileDescriptorGuard(TFileDescriptor fd) noexcept + : FD_(fd) +{} + +TFileDescriptorGuard::~TFileDescriptorGuard() +{ + Reset(); +} + +TFileDescriptorGuard::TFileDescriptorGuard(TFileDescriptorGuard&& other) noexcept + : FD_(other.FD_) +{ + other.FD_ = -1; +} + +TFileDescriptorGuard& TFileDescriptorGuard::operator=(TFileDescriptorGuard&& other) noexcept +{ + if (this != &other) { + Reset(); + FD_ = other.FD_; + other.FD_ = -1; + } + return *this; +} + +TFileDescriptor TFileDescriptorGuard::Get() const noexcept +{ + return FD_; +} + +TFileDescriptor TFileDescriptorGuard::Release() noexcept +{ + TFileDescriptor fd = FD_; + FD_ = -1; + return fd; +} + +void TFileDescriptorGuard::Reset() noexcept +{ + if (FD_ != -1) { + YT_VERIFY(TryClose(FD_, false)); + FD_ = -1; + } +} + +//////////////////////////////////////////////////////////////////////////////// + std::optional<int> GetParentPid(int pid) { TFileInput in(Format("/proc/%v/status", pid)); diff --git a/yt/yt/core/misc/proc.h b/yt/yt/core/misc/proc.h index a76fea3c54e..7111ffbdafd 100644 --- a/yt/yt/core/misc/proc.h +++ b/yt/yt/core/misc/proc.h @@ -28,6 +28,30 @@ bool IsSystemError(const TError& error); //////////////////////////////////////////////////////////////////////////////// +class TFileDescriptorGuard +{ +public: + TFileDescriptorGuard(TFileDescriptor fd = -1) noexcept; + + ~TFileDescriptorGuard(); + + TFileDescriptorGuard(const TFileDescriptorGuard&) = delete; + TFileDescriptorGuard& operator = (const TFileDescriptorGuard&) = delete; + + TFileDescriptorGuard(TFileDescriptorGuard&& other) noexcept; + TFileDescriptorGuard& operator = (TFileDescriptorGuard&& other) noexcept; + + TFileDescriptor Get() const noexcept; + + TFileDescriptor Release() noexcept; + void Reset() noexcept; + +private: + TFileDescriptor FD_ = -1; +}; + +//////////////////////////////////////////////////////////////////////////////// + std::vector<int> ListPids(); std::vector<int> GetPidsByUid(int uid = -1); std::vector<int> GetPidsUnderParent(int targetPid); diff --git a/yt/yt/core/net/connection.cpp b/yt/yt/core/net/connection.cpp index c1d9ae15a8d..5f64072b0c9 100644 --- a/yt/yt/core/net/connection.cpp +++ b/yt/yt/core/net/connection.cpp @@ -10,6 +10,8 @@ #include <yt/yt/core/net/socket.h> +#include <library/cpp/yt/memory/non_null_ptr.h> + #include <library/cpp/yt/system/handle_eintr.h> #include <util/network/pollerimpl.h> @@ -18,6 +20,7 @@ #ifdef _linux_ #include <sys/ioctl.h> + #include <sys/signalfd.h> #endif #ifdef _win_ @@ -97,48 +100,205 @@ ssize_t WriteToFD(TFileDescriptor fd, const char* buffer, size_t length) #endif } -TErrorOr<int> CheckPipeBytesLeftToRead(const TString& pipePath) noexcept +TError MakeSystemError(TFormatString<> message) +{ + return TError(message) + << TError::FromSystem(); +} + +[[maybe_unused]] TErrorOr<int> CheckPipeBytesLeftToRead(TFileDescriptor fd) noexcept { #ifdef _linux_ int bytesLeft = 0; - auto makeSystemError = [&] (TFormatString<> message) { - return TError(message) - << TError::FromSystem() - << TErrorAttribute("pipe_path", pipePath); - }; - { - int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - - if (fd == -1) { - return makeSystemError("Failed to open file descriptor"); - } - int ret = ::ioctl(fd, FIONREAD, &bytesLeft); - if (ret == -1) { - return makeSystemError("ioctl failed"); - } - - if (!TryClose(fd, /*ignoreBadFD*/ false)) { - return makeSystemError("Failed to close file descriptor"); + return MakeSystemError("ioctl failed"); } } return bytesLeft; #else - Y_UNUSED(pipePath); + Y_UNUSED(fd); return TError("Unsupported platform"); #endif } +template <class TDerived> +class TWriteConnectionBase + : public virtual IConnectionWriter +{ +public: + TFuture<void> Write(const TSharedRef& data) override + { + return ToDerived()->GetImpl()->Write(data); + } + + TFuture<void> Close() override + { + return ToDerived()->GetImpl()->Close(); + } + + TFuture<void> WriteV(const TSharedRefArray& data) override + { + return ToDerived()->GetImpl()->WriteV(data); + } + + TFuture<void> CloseWrite() override + { + return ToDerived()->GetImpl()->CloseWrite(); + } + + TFuture<void> Abort() override + { + return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + } + + int GetHandle() const override + { + return ToDerived()->GetImpl()->GetHandle(); + } + + i64 GetWriteByteCount() const override + { + return ToDerived()->GetImpl()->GetWriteByteCount(); + } + + void SetWriteDeadline(std::optional<TInstant> deadline) override + { + ToDerived()->GetImpl()->SetWriteDeadline(deadline); + } + + TConnectionStatistics GetWriteStatistics() const override + { + return ToDerived()->GetImpl()->GetWriteStatistics(); + } + +protected: + TDerived* ToDerived() + { + return static_cast<TDerived*>(this); + } + + const TDerived* ToDerived() const + { + return static_cast<const TDerived*>(this); + } +}; + +template <class TDerived> +class TReadConnectionBase + : public virtual IConnectionReader +{ +public: + TFuture<size_t> Read(const TSharedMutableRef& data) override + { + return ToDerived()->GetImpl()->Read(data); + } + + TFuture<void> CloseRead() override + { + return ToDerived()->GetImpl()->CloseRead(); + } + + TFuture<void> Abort() override + { + return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + } + + int GetHandle() const override + { + return ToDerived()->GetImpl()->GetHandle(); + } + + i64 GetReadByteCount() const override + { + return ToDerived()->GetImpl()->GetReadByteCount(); + } + + void SetReadDeadline(std::optional<TInstant> deadline) override + { + ToDerived()->GetImpl()->SetReadDeadline(deadline); + } + + TConnectionStatistics GetReadStatistics() const override + { + return ToDerived()->GetImpl()->GetReadStatistics(); + } +protected: + TDerived* ToDerived() + { + return static_cast<TDerived*>(this); + } + + const TDerived* ToDerived() const + { + return static_cast<const TDerived*>(this); + } +}; + +template <class TDerived> +class TReadWriteConnectionBase + : public TWriteConnectionBase<TDerived> + , public TReadConnectionBase<TDerived> + , public IConnection +{ + using TWriteConnectionBase<TDerived>::ToDerived; + +public: + TConnectionId GetId() const override + { + return ToDerived()->GetImpl()->GetId(); + } + + const TNetworkAddress& GetLocalAddress() const override + { + return ToDerived()->GetImpl()->GetLocalAddress(); + } + + const TNetworkAddress& GetRemoteAddress() const override + { + return ToDerived()->GetImpl()->GetRemoteAddress(); + } + + bool IsIdle() const override + { + return ToDerived()->GetImpl()->IsIdle(); + } + + bool IsReusable() const override + { + return ToDerived()->GetImpl()->IsReusable(); + } + + bool SetNoDelay() override + { + return ToDerived()->GetImpl()->SetNoDelay(); + } + + bool SetKeepAlive() override + { + return ToDerived()->GetImpl()->SetKeepAlive(); + } + + TFuture<void> Abort() override + { + return ToDerived()->GetImpl()->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + } + + void SubscribePeerDisconnect(TCallback<void()> cb) override + { + return ToDerived()->GetImpl()->SubscribePeerDisconnect(std::move(cb)); + } +}; + } // namespace //////////////////////////////////////////////////////////////////////////////// DECLARE_REFCOUNTED_CLASS(TFDConnectionImpl) +DECLARE_REFCOUNTED_CLASS(TDeliveryFencedWriteConnectionImpl) //////////////////////////////////////////////////////////////////////////////// @@ -182,8 +342,7 @@ public: return TIOResult{.Retry = Position_ == 0, .ByteCount = bytesRead}; } - return TError("Read failed") - << TError::FromSystem(); + return MakeSystemError("Read failed"); } if (size == 0) { break; @@ -243,8 +402,7 @@ public: return TIOResult{.Retry = true, .ByteCount = 0}; } - return TError("Read failed") - << TError::FromSystem(); + return MakeSystemError("Read failed"); } Position_ += size; @@ -297,8 +455,7 @@ public: if (GetLastNetworkError() == EWOULDBLOCK) { return TIOResult{.Retry = true, .ByteCount = bytesWritten}; } - return TError("Write failed") - << TError::FromSystem(); + return MakeSystemError("Write failed"); } YT_VERIFY(size > 0); @@ -323,6 +480,12 @@ public: return ResultPromise_.ToFuture(); } +protected: + bool IsWriteComplete(const TErrorOr<TIOResult>& result) + { + return result.IsOK() && !result.Value().Retry; + } + private: const TSharedRef Buffer_; const TPromise<void> ResultPromise_ = NewPromise<void>(); @@ -332,11 +495,12 @@ private: //////////////////////////////////////////////////////////////////////////////// -class TDeliveryFencedWriteOperation +#ifdef _linux_ +class TDeliveryFencedWriteOperationOld : public TWriteOperation { public: - TDeliveryFencedWriteOperation(TSharedRef buffer, TString pipePath) + TDeliveryFencedWriteOperationOld(TSharedRef buffer, std::string pipePath) : TWriteOperation(std::move(buffer)) , PipePath_(std::move(pipePath)) { } @@ -345,12 +509,19 @@ public: { auto result = TWriteOperation::PerformIO(fd); if (IsWriteComplete(result)) { - auto bytesLeftOrError = CheckPipeBytesLeftToRead(PipePath_); + int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; + int fd = HandleEintr(::open, PipePath_.data(), flags); + if (fd == -1) { + return MakeSystemError("Failed to open file descriptor"); + } + auto bytesLeftOrError = CheckPipeBytesLeftToRead(fd); + + YT_VERIFY(TryClose(fd, /*ignoreBadFD*/ false)); if (!bytesLeftOrError.IsOK()) { YT_LOG_ERROR(bytesLeftOrError, "Delivery fenced write failed"); - return std::move(bytesLeftOrError).Wrap(); + return bytesLeftOrError; } else { YT_LOG_DEBUG("Delivery fenced write pipe check finished (BytesLeft: %v)", bytesLeftOrError.Value()); } @@ -364,14 +535,91 @@ public: } private: - const TString PipePath_; + const std::string PipePath_; +}; - bool IsWriteComplete(const TErrorOr<TIOResult>& result) +class TDeliveryFencedWriteOperation + : public TWriteOperation +{ +public: + TDeliveryFencedWriteOperation(TSharedRef buffer, TFileDescriptor writeFd, TFileDescriptor readFd) + : TWriteOperation(std::move(buffer)) + , WriteFD_(writeFd) + , ReadFD_(readFd) + { } + + TErrorOr<TIOResult> PerformIO(TFileDescriptor fd) override { - return result.IsOK() && !result.Value().Retry; + if (!std::exchange(FirstWritingAttempt_, false)) { + auto errorOrIsSignalConsumed = ConsumeSignalFdEventsAndCheckItContainsOurFD(fd); + + if (!errorOrIsSignalConsumed.IsOK()) { + return errorOrIsSignalConsumed; + } + + if (!errorOrIsSignalConsumed.Value()) { + return TIOResult{ + .Retry = true, + .ByteCount = 0, + }; + } + } + + auto result = TWriteOperation::PerformIO(WriteFD_); + if (IsWriteComplete(result)) { + auto bytesLeftOrError = CheckPipeBytesLeftToRead(ReadFD_); + + if (!bytesLeftOrError.IsOK()) { + YT_LOG_ERROR(bytesLeftOrError, "Delivery fenced write failed"); + return bytesLeftOrError; + } else { + YT_LOG_DEBUG("Delivery fenced write pipe check finished (BytesLeft: %v)", bytesLeftOrError.Value()); + } + + result.Value().Retry = (bytesLeftOrError.Value() != 0); + } else { + YT_LOG_DEBUG("Delivery fenced write to pipe step finished (Result: %v)", result); + } + + return result; + } + +private: + const TFileDescriptor WriteFD_; + const TFileDescriptor ReadFD_; + + bool FirstWritingAttempt_ = true; + + TErrorOr<bool> ConsumeSignalFdEventsAndCheckItContainsOurFD(TFileDescriptor fd) + { + bool signalForWriteFDConsumed = false; + for (; ;) { + struct signalfd_siginfo fdsi; + ssize_t s = HandleEintr(::read, fd, &fdsi, sizeof(fdsi)); + if (s <= 0) { + break; + } + if (s != sizeof(fdsi)) { + return MakeSystemError("Invalid signalfd_siginfo size"); + } + + if (fdsi.ssi_signo != static_cast<uint32_t>(DeliveryFencedWriteSignal)) { + continue; + } + + if (fdsi.ssi_fd != WriteFD_) { + continue; + } + + signalForWriteFDConsumed = true; + } + + return signalForWriteFDConsumed; } }; +#endif // _linux_ + //////////////////////////////////////////////////////////////////////////////// class TWriteVOperation @@ -407,8 +655,7 @@ public: return TIOResult{.Retry = true, .ByteCount = bytesWritten}; } - return TError("Write failed") - << TError::FromSystem(); + return MakeSystemError("Write failed"); } YT_VERIFY(size > 0); @@ -460,8 +707,7 @@ public: { int res = HandleEintr(::shutdown, fd, ShutdownRead_ ? SHUT_RD : SHUT_WR); if (res == -1) { - return TError("Shutdown failed") - << TError::FromSystem(); + return MakeSystemError("Shutdown failed"); } return TIOResult{.Retry = false, .ByteCount = 0}; } @@ -488,17 +734,33 @@ private: //////////////////////////////////////////////////////////////////////////////// +// TODO(pogorelov): Make separate clases for pipe and socket connections. class TFDConnectionImpl : public TPollableBase { + struct TIODirection; public: static TFDConnectionImplPtr Create( TFileDescriptor fd, - TString filePath, IPollerPtr poller, + std::string filePath, + // COMPAT(pogorelov) bool useDeliveryFence) { - auto impl = New<TFDConnectionImpl>(fd, std::move(filePath), std::move(poller), useDeliveryFence); + #ifndef _linux_ + THROW_ERROR_EXCEPTION_IF(useDeliveryFence, "Delivery fenced write is not supported on this platform"); + #endif // _linux_ + auto epollControl = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered; + auto readEpollControl = EPollControl::Read; + auto writeEpollControl = EPollControl::Write; + auto impl = New<TFDConnectionImpl>( + fd, + epollControl, + readEpollControl, + writeEpollControl, + std::move(poller), + std::move(filePath), + useDeliveryFence); impl->Init(); return impl; } @@ -509,8 +771,19 @@ public: const TNetworkAddress& remoteAddress, IPollerPtr poller) { - auto impl = New<TFDConnectionImpl>(fd, localAddress, remoteAddress, std::move(poller)); + auto epollControl = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered | EPollControl::ReadHup; + auto readEpollControl = EPollControl::Read; + auto writeEpollControl = EPollControl::Write; + auto impl = New<TFDConnectionImpl>( + fd, + epollControl, + readEpollControl, + writeEpollControl, + localAddress, + remoteAddress, + std::move(poller)); impl->Init(); + return impl; } @@ -521,8 +794,8 @@ public: void OnEvent(EPollControl control) override { - DoIO(&WriteDirection_, Any(control & EPollControl::Write)); - DoIO(&ReadDirection_, Any(control & EPollControl::Read)); + DoIO(GetPtr(WriteDirection_), Any(control & WriteEpollControl_)); + DoIO(GetPtr(ReadDirection_), Any(control & ReadEpollControl_)); if (Any(control & EPollControl::ReadHup)) { OnPeerDisconnected(); @@ -566,7 +839,7 @@ public: } Poller_->Unarm(FD_, this); - YT_VERIFY(TryClose(FD_, false)); + YT_VERIFY(TryClose(FD_, /*ignoreBadFD*/ false)); FD_ = -1; OnPeerDisconnected(); @@ -580,7 +853,7 @@ public: { auto read = std::make_unique<TReadOperation>(data); auto future = read->ToFuture(); - StartIO(&ReadDirection_, std::move(read)); + StartIO(GetPtr(ReadDirection_), std::move(read)); return future; } @@ -588,7 +861,7 @@ public: { auto receive = std::make_unique<TReceiveFromOperation>(buffer); auto future = receive->ToFuture(); - StartIO(&ReadDirection_, std::move(receive)); + StartIO(GetPtr(ReadDirection_), std::move(receive)); return future; } @@ -604,8 +877,7 @@ public: address.GetSockAddr(), address.GetLength()); if (res == -1) { - THROW_ERROR_EXCEPTION("Write failed") - << TError::FromSystem(); + THROW_ERROR_EXCEPTION(MakeSystemError("Write failed")); } } @@ -623,18 +895,34 @@ public: TFuture<void> Write(const TSharedRef& data) { - if (UseDeliveryFence_) { - return DoDeliveryFencedWrite(data); - } + #ifdef _linux_ + auto writeOperation = UseDeliveryFence_ + ? std::make_unique<TDeliveryFencedWriteOperationOld>(data, PipePath_) + : std::make_unique<TWriteOperation>(data); + #else // _linux_ + auto writeOperation = std::make_unique<TWriteOperation>(data); + YT_VERIFY(!UseDeliveryFence_); + #endif // _linux_ + + auto future = writeOperation->ToFuture(); + + DoWrite(std::move(writeOperation)); + + return future; + } - return DoWrite(data); + // forcefullyConsiderPending used to perform first io without waiting event on epoll. + // See TDeliveryFencedWriteConnection. + void DoWrite(std::unique_ptr<IIOOperation> operation, bool forcefullyConsiderPending = false) + { + StartIO(GetPtr(WriteDirection_), std::move(operation), forcefullyConsiderPending); } TFuture<void> WriteV(const TSharedRefArray& data) { auto writeV = std::make_unique<TWriteVOperation>(data); auto future = writeV->ToFuture(); - StartIO(&WriteDirection_, std::move(writeV)); + StartIO(GetPtr(WriteDirection_), std::move(writeV)); return future; } @@ -661,17 +949,17 @@ public: return IsIdle(); } - TFuture<void> Abort(const TError& error) + TFuture<void> Abort(TError error) { YT_LOG_DEBUG(error, "Aborting connection"); - return AbortIO(error); + return AbortIO(std::move(error)); } TFuture<void> CloseRead() { auto shutdownRead = std::make_unique<TShutdownOperation>(true); auto future = shutdownRead->ToFuture(); - StartIO(&ReadDirection_, std::move(shutdownRead)); + StartIO(GetPtr(ReadDirection_), std::move(shutdownRead)); return future; } @@ -679,7 +967,7 @@ public: { auto shutdownWrite = std::make_unique<TShutdownOperation>(false); auto future = shutdownWrite->ToFuture(); - StartIO(&WriteDirection_, std::move(shutdownWrite)); + StartIO(GetPtr(WriteDirection_), std::move(shutdownWrite)); return future; } @@ -760,35 +1048,30 @@ public: PeerDisconnectedList_.Subscribe(std::move(callback)); } -private: +protected: const TConnectionId Id_ = TConnectionId::Create(); - const TString Endpoint_; + const std::string Endpoint_; const std::string LoggingTag_; const NLogging::TLogger Logger; - const TNetworkAddress LocalAddress_; - const TNetworkAddress RemoteAddress_; TFileDescriptor FD_ = -1; - const IPollerPtr Poller_; - - // If set to true via ctor argument - // |useDeliveryFence| will use - // DeliverFencedWriteOperations - // instead of WriteOperations, - // which future is set only - // after data from pipe has been read. - const bool UseDeliveryFence_ = false; - const TString PipePath_; - + int SynchronousIOCount_ = 0; TFDConnectionImpl( TFileDescriptor fd, - TString filePath, + EPollControl FDEpollControl, + EPollControl readEpollControl, + EPollControl writeEpollControl, IPollerPtr poller, + std::string filePath, + // COMPAT(pogorelov) bool useDeliveryFence) : Endpoint_(Format("File{%v}", filePath)) , LoggingTag_(MakeLoggingTag(Id_, Endpoint_)) , Logger(NetLogger().WithRawTag(LoggingTag_)) , FD_(fd) + , FDEpollControl_(FDEpollControl) + , ReadEpollControl_(readEpollControl) + , WriteEpollControl_(writeEpollControl) , Poller_(std::move(poller)) , UseDeliveryFence_(useDeliveryFence) , PipePath_(std::move(filePath)) @@ -796,15 +1079,21 @@ private: TFDConnectionImpl( TFileDescriptor fd, + EPollControl epollControl, + EPollControl readEpollControl, + EPollControl writeEpollControl, const TNetworkAddress& localAddress, const TNetworkAddress& remoteAddress, IPollerPtr poller) : Endpoint_(Format("FD{%v<->%v}", localAddress, remoteAddress)) , LoggingTag_(MakeLoggingTag(Id_, Endpoint_)) , Logger(NetLogger().WithRawTag(LoggingTag_)) + , FD_(fd) + , FDEpollControl_(epollControl) + , ReadEpollControl_(readEpollControl) + , WriteEpollControl_(writeEpollControl) , LocalAddress_(localAddress) , RemoteAddress_(remoteAddress) - , FD_(fd) , Poller_(std::move(poller)) { } @@ -813,6 +1102,34 @@ private: YT_LOG_DEBUG("Connection destroyed"); } + void Arm(EPollControl additionalFlags = {}) + { + Poller_->Arm(FD_, this, FDEpollControl_ | additionalFlags); + } + + bool TryRegister() + { + return Poller_->TryRegister(MakeStrong(this)); + } + +private: + const EPollControl FDEpollControl_; + const EPollControl ReadEpollControl_; + const EPollControl WriteEpollControl_; + const TNetworkAddress LocalAddress_; + const TNetworkAddress RemoteAddress_; + const IPollerPtr Poller_; + + // COMPAT(pogorelov) + // If set to true via ctor argument + // |useDeliveryFence| will use + // DeliverFencedWriteOperations + // instead of WriteOperations, + // which future is set only + // after data from pipe has been read. + const bool UseDeliveryFence_ = false; + const std::string PipePath_; + DECLARE_NEW_FRIEND() class TSynchronousIOGuard @@ -905,7 +1222,6 @@ private: TIODirection ReadDirection_{EDirection::Read}; TIODirection WriteDirection_{EDirection::Write}; bool ShutdownRequested_ = false; - int SynchronousIOCount_ = 0; TError WriteError_; TError ReadError_; const TPromise<void> ShutdownPromise_ = NewPromise<void>(); @@ -918,36 +1234,21 @@ private: TDelayedExecutorCookie ReadTimeoutCookie_; TDelayedExecutorCookie WriteTimeoutCookie_; - static std::string MakeLoggingTag(TConnectionId id, const TString& endpoint) + static std::string MakeLoggingTag(TConnectionId id, const std::string& endpoint) { - return Format("ConnectionId: %v, Endpoint: %v", + return Format( + "ConnectionId: %v, Endpoint: %v", id, endpoint); } - TError AnnotateError(const TError& error) const + TError AnnotateError(TError error) const { - return error + return std::move(error) << TErrorAttribute("connection_id", Id_) << TErrorAttribute("connection_endpoint", Endpoint_); } - TFuture<void> DoWrite(const TSharedRef& data) - { - auto write = std::make_unique<TWriteOperation>(data); - auto future = write->ToFuture(); - StartIO(&WriteDirection_, std::move(write)); - return future; - } - - TFuture<void> DoDeliveryFencedWrite(const TSharedRef& data) - { - auto syncWrite = std::make_unique<TDeliveryFencedWriteOperation>(data, PipePath_); - auto future = syncWrite->ToFuture(); - StartIO(&WriteDirection_, std::move(syncWrite)); - return future; - } - void Init() { YT_LOG_DEBUG("Connection created"); @@ -955,7 +1256,7 @@ private: AbortFromReadTimeout_ = BIND(&TFDConnectionImpl::AbortFromReadTimeout, MakeWeak(this)); AbortFromWriteTimeout_ = BIND(&TFDConnectionImpl::AbortFromWriteTimeout, MakeWeak(this)); - if (!Poller_->TryRegister(this)) { + if (!TryRegister()) { ReadError_ = WriteError_ = AnnotateError(TError("Cannot register connection pollable")); return; } @@ -963,12 +1264,6 @@ private: Arm(); } - void Arm(EPollControl additionalFlags = {}) - { - auto control = EPollControl::Read | EPollControl::Write | EPollControl::EdgeTriggered | EPollControl::ReadHup; - Poller_->Arm(FD_, this, control | additionalFlags); - } - TError GetCurrentError(EDirection direction) { switch (direction) { @@ -984,7 +1279,7 @@ private: } } - void StartIO(TIODirection* direction, std::unique_ptr<IIOOperation> operation) + void StartIO(TNonNullPtr<TIODirection> direction, std::unique_ptr<IIOOperation> operation, bool forcefullyConsiderPending = false) { TError error; bool needRetry = false; @@ -1001,6 +1296,9 @@ private: YT_VERIFY(!direction->Running); direction->Operation = std::move(operation); direction->StartBusyTimer(); + + direction->Pending |= forcefullyConsiderPending; + // Start operation only if this direction already has pending // event otherwise reading from FIFO before opening by writer // will return EOF immediately. @@ -1009,7 +1307,7 @@ private: } if (!error.IsOK()) { - operation->Abort(error); + operation->Abort(std::move(error)); return; } @@ -1018,7 +1316,7 @@ private: } } - void DoIO(TIODirection* direction, bool event) + void DoIO(TNonNullPtr<TIODirection> direction, bool event) { { auto guard = Guard(Lock_); @@ -1045,7 +1343,7 @@ private: if (result.IsOK()) { direction->BytesTransferred += result.Value().ByteCount; } else { - result = AnnotateError(result); + result = AnnotateError(std::move(result)); } bool needUnregister = false; @@ -1102,7 +1400,7 @@ private: } if (!result.IsOK()) { - operation->Abort(result); + operation->Abort(std::move(result)); } else if (!result.Value().Retry) { operation->SetResult(); } else if (needRetry) { @@ -1114,12 +1412,13 @@ private: } } - TFuture<void> AbortIO(const TError& error) + TFuture<void> AbortIO(TError error) { + auto annotatedError = AnnotateError(std::move(error)); + auto guard = Guard(Lock_); // In case of read errors we have called Unarm and Unregister already. bool needUnarmAndUnregister = ReadError_.IsOK(); - auto annotatedError = AnnotateError(error); if (WriteError_.IsOK()) { WriteError_ = annotatedError; } @@ -1154,184 +1453,359 @@ private: DEFINE_REFCOUNTED_TYPE(TFDConnectionImpl) -//////////////////////////////////////////////////////////////////////////////// - -// The sole purpose of this class is to call Abort on Impl in dtor. -class TFDConnection - : public IConnection +#ifdef _linux_ +class TDeliveryFencedWriteConnectionImpl + : public TFDConnectionImpl { public: - TFDConnection( - TFileDescriptor fd, - TString pipePath, + static TDeliveryFencedWriteConnectionImplPtr Create( IPollerPtr poller, - TRefCountedPtr pipeHolder = nullptr, - bool useDeliveryFence = false) - : Impl_(TFDConnectionImpl::Create(fd, std::move(pipePath), std::move(poller), useDeliveryFence)) - , PipeHolder_(std::move(pipeHolder)) - { } - - TFDConnection( - TFileDescriptor fd, - const TNetworkAddress& localAddress, - const TNetworkAddress& remoteAddress, - IPollerPtr poller) - : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller))) - { } - - ~TFDConnection() + std::string pipePath, + std::optional<int> capacity) { - YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); - } + TFileDescriptorGuard signalFD = CreateSignalFD(); + TDeliveryFencedWriteConnectionImplPtr impl; + impl = New<TDeliveryFencedWriteConnectionImpl>(std::move(poller), std::move(pipePath), signalFD.Get(), capacity); + impl->Init(); - TConnectionId GetId() const override - { - return Impl_->GetId(); + signalFD.Release(); + return impl; } - const TNetworkAddress& GetLocalAddress() const override + TFuture<void> Write(const TSharedRef& data) { - return Impl_->GetLocalAddress(); - } + auto writeOperation = std::make_unique<TDeliveryFencedWriteOperation>(data, WriteFD_, ReadFD_); - const TNetworkAddress& GetRemoteAddress() const override - { - return Impl_->GetRemoteAddress(); - } + auto future = writeOperation->ToFuture(); - int GetHandle() const override - { - return Impl_->GetHandle(); - } + DoWrite(std::move(writeOperation), /*forcefullyConsiderPending*/ true); - TFuture<size_t> Read(const TSharedMutableRef& data) override - { - return Impl_->Read(data); + return future; } - TFuture<void> Write(const TSharedRef& data) override - { - return Impl_->Write(data); - } +private: + const std::string PipePath_; + const std::optional<int> PipeCapacity_; - TFuture<void> WriteV(const TSharedRefArray& data) override - { - return Impl_->WriteV(data); - } + TFileDescriptor WriteFD_ = -1; + TFileDescriptor ReadFD_ = -1; - TFuture<void> Close() override + TDeliveryFencedWriteConnectionImpl( + IPollerPtr poller, + std::string pipePath, + TFileDescriptor signalFD, + std::optional<int> capacity) + : TFDConnectionImpl( + signalFD, + /*FDEpollControl*/ EPollControl::Read | EPollControl::EdgeTriggered, + /*readEpollControll*/ EPollControl::None, + // Yes, we read to write :) + /*writeEpollControl*/ EPollControl::Read, + std::move(poller), + pipePath, + // NB(pogorelov): DeliveryFence is old compat logic, so we turn it off here. + /*useDeliveryFence*/ false) + , PipePath_(std::move(pipePath)) + , PipeCapacity_(capacity) { - return Impl_->Close(); + YT_LOG_DEBUG("Delivery fenced connection created"); } - bool IsIdle() const override + ~TDeliveryFencedWriteConnectionImpl() { - return Impl_->IsIdle(); + YT_LOG_DEBUG("Delivery fenced connection destroyed"); } - bool IsReusable() const override + DECLARE_NEW_FRIEND(); + + static TFileDescriptor CreateSignalFD() { - return Impl_->IsReusable(); + sigset_t mask; + if (sigemptyset(&mask) == -1) { + ThrowError("empty sig set"); + } + if (sigaddset(&mask, DeliveryFencedWriteSignal) == -1) { + ThrowError(Format("add %v RT signal to sig set", DeliveryFencedWriteSignal - SIGRTMIN)); + } + + auto fd = HandleEintr(::signalfd, -1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); + if (fd == -1) { + ThrowError("open signalfd"); + } + + return fd; } - TFuture<void> Abort() override + void Init() { - return Impl_->Abort(TError(NNet::EErrorCode::Aborted, "Connection aborted")); + TFileDescriptorGuard readFdGuard = HandleEintr(::open, PipePath_.data(), O_RDONLY | O_CLOEXEC | O_NONBLOCK); + if (readFdGuard.Get() == -1) { + ThrowError("open pipe for reading"); + } + + TFileDescriptorGuard writeFdGuard = HandleEintr(::open, PipePath_.data(), O_WRONLY | O_CLOEXEC); + if (writeFdGuard.Get() == -1) { + ThrowError("open pipe for writing"); + } + + auto flags = fcntl(writeFdGuard.Get(), F_GETFL); + if (flags == -1) { + ThrowError("get pipe writing fd flags"); + } + if (fcntl(writeFdGuard.Get(), F_SETFL, flags | O_NONBLOCK | FASYNC) == -1) { + ThrowError("set pipe writing fd flags"); + } + if (fcntl(writeFdGuard.Get(), F_SETOWN, getpid()) == -1) { + ThrowError("set pipe owner"); + } + if (fcntl(writeFdGuard.Get(), F_SETSIG, DeliveryFencedWriteSignal) == -1) { + ThrowError("set custom pipe signal"); + } + if (PipeCapacity_) { + SafeSetPipeCapacity(writeFdGuard.Get(), *PipeCapacity_); + } + + if (!TryRegister()) { + ThrowError("register connection in poller"); + } + + try { + Arm(); + } catch (const std::exception& ex) { + ThrowError("arm connection", ex); + } catch (...) { + ThrowError("arm connection"); + } + + YT_LOG_DEBUG("Delivery fenced connection initialized"); + + ReadFD_ = readFdGuard.Release(); + WriteFD_ = writeFdGuard.Release(); } - TFuture<void> CloseRead() override + void OnShutdown() final { - return Impl_->CloseRead(); + TFDConnectionImpl::OnShutdown(); + + YT_VERIFY(SynchronousIOCount_ == 0); + + YT_VERIFY(TryClose(WriteFD_, /*ignoreBadFD*/ false)); + YT_VERIFY(TryClose(ReadFD_, /*ignoreBadFD*/ false)); } - TFuture<void> CloseWrite() override + [[noreturn]] static void ThrowError(std::string_view action, TError innerError = TError()) { - return Impl_->CloseWrite(); + auto error = TError("Failed to %v for delivery fenced connection", action); + if (!innerError.IsOK()) { + innerError <<= std::move(innerError); + } else { + error <<= TError::FromSystem(); + } + THROW_ERROR(std::move(error)); } +}; - i64 GetReadByteCount() const override +DEFINE_REFCOUNTED_TYPE(TDeliveryFencedWriteConnectionImpl) + +#endif // _linux_ + +//////////////////////////////////////////////////////////////////////////////// + +// TODO(pogorelov): Make separate clases for pipe and socket connections. +// The sole purpose of this class is to call Abort on Impl in dtor. +// Since object of TFDConnection is created, you should not care about fd. +// But in case of exception you should close fd by yourself. +class TFDConnection + : public TReadWriteConnectionBase<TFDConnection> +{ +public: + TFDConnection( + TFileDescriptor fd, + IPollerPtr poller, + TRefCountedPtr pipeHolder, + std::string pipePath = "", + // COMPAT(pogorelov) + bool useDeliveryFence = false) + : Impl_(TFDConnectionImpl::Create(fd, std::move(poller), std::move(pipePath), useDeliveryFence)) + , PipeHolder_(std::move(pipeHolder)) + { } + + TFDConnection( + TFileDescriptor fd, + const TNetworkAddress& localAddress, + const TNetworkAddress& remoteAddress, + IPollerPtr poller) + : Impl_(TFDConnectionImpl::Create(fd, localAddress, remoteAddress, std::move(poller))) + { } + + TFDConnection( + TRefCountedPtr pipeHolder, + TFDConnectionImplPtr impl) + : Impl_(std::move(impl)) + , PipeHolder_(std::move(pipeHolder)) + { } + + ~TFDConnection() { - return Impl_->GetReadByteCount(); + YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); } - - i64 GetWriteByteCount() const override + const TFDConnectionImplPtr& GetImpl() const { - return Impl_->GetWriteByteCount(); + return Impl_; } - TConnectionStatistics GetReadStatistics() const override +private: + const TFDConnectionImplPtr Impl_; + const TRefCountedPtr PipeHolder_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +#ifdef _linux_ + +[[noreturn]] void ThrowUnimplemented(std::string_view method) +{ + THROW_ERROR_EXCEPTION("%v is not supported", method); +} + +// NOTE(pogorelov): Using 2 TDeliveryFencedWriteConnection concurrently is not supported. +// Desired behavior: +// Write should complete only after all data has been written to the pipe and fully read by the reader. +// Yes, this is not what pipes were originally designed for. +// However, we now have a large amount of user code relying on our API (which currently only reads from stdin). +// We want to be able to adjust the supplying data portion size without modifying the API, hence this solution. +// +// Implementation: +// To achieve this behavior, we use: +// - signalfd (for signal-based notifications) +// - FASYNC mode (for asynchronous pipe I/O) +// - FIONREAD (to check remaining data in the pipe) +// +// Note about FASYNC Behavior: +// Starting from kernel 5.15, the writer receives a signal (SIGIO) on every read operation from the pipe. +// In earlier kernel versions, this behavior has been changed several times, but it appears to be finalized in 5.15 and later: +// https://web.git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit?id=fe67f4dd8daa252eb9aa7acb61555f3cc3c1ce4c +// +// Initialization Steps: +// 1) Configure the pipe for signals: +// - Set the process as the FIFO owner (F_SETOWN). +// - Enable FASYNC on the write-side file descriptor (F_SETFL) to receive I/O event signals. +// - Use a custom real-time signal (instead of SIGIO) to not occupy SIGIO. +// - We use real-time signal as a reserve for the future +// (when RT signal is used, kernel stores the queue with signal and its metadata such as fd it triggered by). +// 2) Block the custom signal for the entire process (sigprocmask). +// 3) Create a signalfd to read signals via a file descriptor. +// 4) Arm epoll for the signal FD: +// Monitor for EPOLLIN (even though this is the writer side—we’re waiting to read signals). +// +// Write Steps: +// 1) Write data to the pipe: +// Write until all data is sent or EWOULDBLOCK is encountered. +// (Unlike common writer, we don’t immediately sleep on epoll. +// Opening the pipe for reading wakes the writer, but FASYNC does not send SIGIO in this case.) +// 2) Check pipe status with FIONREAD: +// If the pipe is empty, writing is complete. +// 3) Wait for signals via epoll. +// 4) Process signals from signalfd: +// Read all the signals and repeat from Step 1. +class TDeliveryFencedWriteConnection + : public TWriteConnectionBase<TDeliveryFencedWriteConnection> +{ +public: + TDeliveryFencedWriteConnection( + TRefCountedPtr pipeHolder, + IPollerPtr poller, + std::string pipePath, + std::optional<int> capacity) + : Impl_(TDeliveryFencedWriteConnectionImpl::Create(std::move(poller), std::move(pipePath), capacity)) + , PipeHolder_(std::move(pipeHolder)) { - return Impl_->GetReadStatistics(); + YT_VERIFY(!HasActiveConnection.exchange(true)); } - TConnectionStatistics GetWriteStatistics() const override + ~TDeliveryFencedWriteConnection() { - return Impl_->GetWriteStatistics(); + YT_VERIFY(HasActiveConnection.exchange(false)); + YT_UNUSED_FUTURE(Impl_->Abort(TError("Connection is abandoned"))); } - - void SetReadDeadline(std::optional<TInstant> deadline) override + TFuture<void> WriteV(const TSharedRefArray& /*data*/) final { - Impl_->SetReadDeadline(deadline); + ThrowUnimplemented("WriteV"); } - void SetWriteDeadline(std::optional<TInstant> deadline) override + TFuture<void> CloseWrite() final { - Impl_->SetWriteDeadline(deadline); + ThrowUnimplemented("CloseWrite");; } - bool SetNoDelay() override + void SetWriteDeadline(std::optional<TInstant> /*deadline*/) { - return Impl_->SetNoDelay(); + ThrowUnimplemented("SetWriteDeadline"); } - bool SetKeepAlive() override + TFuture<void> Write(const TSharedRef& data) final { - return Impl_->SetKeepAlive(); + return static_cast<TDeliveryFencedWriteConnectionImpl&>(*Impl_).Write(data); } - void SubscribePeerDisconnect(TCallback<void()> cb) override + const TDeliveryFencedWriteConnectionImplPtr& GetImpl() const { - return Impl_->SubscribePeerDisconnect(std::move(cb)); + return Impl_; } private: - const TFDConnectionImplPtr Impl_; + const TDeliveryFencedWriteConnectionImplPtr Impl_; const TRefCountedPtr PipeHolder_; + + static inline std::atomic<bool> HasActiveConnection = false; }; +#endif // _linux_ + //////////////////////////////////////////////////////////////////////////////// namespace { TFileDescriptor CreateWriteFDForConnection( - const TString& pipePath, + const std::string& pipePath, std::optional<int> capacity, + // COMPAT(pogorelov) bool useDeliveryFence) { #ifdef _unix_ int flags = O_WRONLY | O_CLOEXEC; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - if (fd == -1) { - THROW_ERROR_EXCEPTION("Failed to open named pipe") - << TError::FromSystem() + TFileDescriptorGuard fd = HandleEintr(::open, pipePath.c_str(), flags); + if (fd.Get() == -1) { + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to open named pipe")) << TErrorAttribute("path", pipePath); } try { if (capacity) { - SafeSetPipeCapacity(fd, *capacity); + SafeSetPipeCapacity(fd.Get(), *capacity); } if (useDeliveryFence) { - SafeEnableEmptyPipeEpollEvent(fd); + SafeEnableEmptyPipeEpollEvent(fd.Get()); } - SafeMakeNonblocking(fd); + SafeMakeNonblocking(fd.Get()); + } catch (const std::exception& ex) { + YT_LOG_WARNING( + TError(ex) << TError::FromSystem(), + "Failed to open pipe for writing (UseDeliveryFence: %v, Capacity: %v)", + useDeliveryFence, + capacity); + throw; } catch (...) { - SafeClose(fd, false); + YT_LOG_WARNING( + "Failed to open pipe for writing (MaybeRelevantError: %v, UseDeliveryFence: %v, Capacity: %v)", + TError::FromSystem(), + useDeliveryFence, + capacity); throw; } - return fd; + return fd.Release(); #else THROW_ERROR_EXCEPTION("Unsupported platform"); #endif @@ -1353,31 +1827,27 @@ std::pair<IConnectionPtr, IConnectionPtr> CreateConnectionPair(IPollerPtr poller #endif if (HandleEintr(::socketpair, AF_LOCAL, flags, 0, fds) == -1) { - THROW_ERROR_EXCEPTION("Failed to create socket pair") - << TError::FromSystem(); + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to create socket pair")); } #else if (SocketPair(fds, /*overlapped*/ false, /*cloexec*/ true) == SOCKET_ERROR) { - THROW_ERROR_EXCEPTION("Failed to create socket pair") - << TError::FromSystem(); + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to create socket pair")); } SetNonBlock(fds[0]); SetNonBlock(fds[1]); #endif + TFileDescriptorGuard fd0(fds[0]); + TFileDescriptorGuard fd1(fds[1]); - try { - auto address0 = GetSocketName(fds[0]); - auto address1 = GetSocketName(fds[1]); + auto address0 = GetSocketName(fds[0]); + auto address1 = GetSocketName(fds[1]); - auto first = New<TFDConnection>(fds[0], address0, address1, poller); - auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller)); - return std::pair(std::move(first), std::move(second)); - } catch (...) { - YT_VERIFY(TryClose(fds[0], false)); - YT_VERIFY(TryClose(fds[1], false)); - throw; - } + auto first = New<TFDConnection>(fds[0], address0, address1, poller); + fd0.Release(); + auto second = New<TFDConnection>(fds[1], address1, address0, std::move(poller)); + fd1.Release(); + return std::pair(std::move(first), std::move(second)); } IConnectionPtr CreateConnectionFromFD( @@ -1391,46 +1861,65 @@ IConnectionPtr CreateConnectionFromFD( IConnectionReaderPtr CreateInputConnectionFromFD( TFileDescriptor fd, - TString pipePath, + const std::string& /*pipePath*/, IPollerPtr poller, const TRefCountedPtr& pipeHolder) { - return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder); + return New<TFDConnection>(fd, std::move(poller), pipeHolder); } IConnectionReaderPtr CreateInputConnectionFromPath( - TString pipePath, + std::string pipePath, IPollerPtr poller, - const TRefCountedPtr& pipeHolder) + TRefCountedPtr pipeHolder) { #ifdef _unix_ int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK; - int fd = HandleEintr(::open, pipePath.c_str(), flags); - if (fd == -1) { - THROW_ERROR_EXCEPTION("Failed to open named pipe") - << TError::FromSystem() + TFileDescriptorGuard fd = HandleEintr(::open, pipePath.c_str(), flags); + if (fd.Get() == -1) { + THROW_ERROR_EXCEPTION(MakeSystemError("Failed to open named pipe")) << TErrorAttribute("path", pipePath); } - return New<TFDConnection>(fd, std::move(pipePath), std::move(poller), pipeHolder); + auto connection = New<TFDConnection>(fd.Get(), std::move(poller), std::move(pipeHolder), std::move(pipePath)); + fd.Release(); + return connection; #else THROW_ERROR_EXCEPTION("Unsupported platform"); #endif } IConnectionWriterPtr CreateOutputConnectionFromPath( - TString pipePath, + std::string pipePath, IPollerPtr poller, - const TRefCountedPtr& pipeHolder, + TRefCountedPtr pipeHolder, std::optional<int> capacity, - bool useDeliveryFence) + EDeliveryFencedMode deliveryFencedMode) { - return New<TFDConnection>( - CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence), - std::move(pipePath), + if (deliveryFencedMode == EDeliveryFencedMode::New) { + #ifdef _linux_ + return New<TDeliveryFencedWriteConnection>( + std::move(pipeHolder), + std::move(poller), + std::move(pipePath), + capacity); + #else // _linux_ + THROW_ERROR_EXCEPTION("Delivery fenced write is not supported on this platform"); + #endif // _linux_ + } + + bool useDeliveryFence = deliveryFencedMode == EDeliveryFencedMode::Old; + + TFileDescriptorGuard fd = CreateWriteFDForConnection(pipePath, capacity, useDeliveryFence); + auto connection = New<TFDConnection>( + fd.Get(), std::move(poller), - pipeHolder, + std::move(pipeHolder), + std::move(pipePath), useDeliveryFence); + fd.Release(); + + return connection; } //////////////////////////////////////////////////////////////////////////////// @@ -1475,16 +1964,18 @@ IPacketConnectionPtr CreatePacketConnection( const TNetworkAddress& at, NConcurrency::IPollerPtr poller) { - auto fd = CreateUdpSocket(); + TFileDescriptorGuard fd = CreateUdpSocket(); try { - SetReuseAddrFlag(fd); - BindSocket(fd, at); + SetReuseAddrFlag(fd.Get()); + BindSocket(fd.Get(), at); } catch (...) { - SafeClose(fd, false); + SafeClose(fd.Get(), false); throw; } - return New<TPacketConnection>(fd, at, std::move(poller)); + auto connection = New<TPacketConnection>(fd.Get(), at, std::move(poller)); + fd.Release(); + return connection; } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/connection.h b/yt/yt/core/net/connection.h index 3c9998e1d0c..4f0c3e4c238 100644 --- a/yt/yt/core/net/connection.h +++ b/yt/yt/core/net/connection.h @@ -65,8 +65,8 @@ DEFINE_REFCOUNTED_TYPE(IConnectionWriter) //////////////////////////////////////////////////////////////////////////////// struct IConnection - : public IConnectionReader - , public IConnectionWriter + : public virtual IConnectionReader + , public virtual IConnectionWriter { virtual TConnectionId GetId() const = 0; @@ -104,21 +104,21 @@ IConnectionPtr CreateConnectionFromFD( IConnectionReaderPtr CreateInputConnectionFromFD( TFileDescriptor fd, - TString pipePath, + const std::string& pipePath, NConcurrency::IPollerPtr poller, const TRefCountedPtr& pipeHolder); IConnectionReaderPtr CreateInputConnectionFromPath( - TString pipePath, + std::string pipePath, NConcurrency::IPollerPtr poller, - const TRefCountedPtr& pipeHolder); + TRefCountedPtr pipeHolder); IConnectionWriterPtr CreateOutputConnectionFromPath( - TString pipePath, + std::string pipePath, NConcurrency::IPollerPtr poller, - const TRefCountedPtr& pipeHolder, + TRefCountedPtr pipeHolder, std::optional<int> capacity = {}, - bool useDeliveryFence = false); + EDeliveryFencedMode deliveryFencedMode = EDeliveryFencedMode::None); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/net/public.h b/yt/yt/core/net/public.h index 725dc0c5170..bbca1eccfb4 100644 --- a/yt/yt/core/net/public.h +++ b/yt/yt/core/net/public.h @@ -8,6 +8,10 @@ #include <library/cpp/yt/misc/guid.h> +#ifdef _linux_ + #include <sys/signalfd.h> +#endif + namespace NYT::NNet { //////////////////////////////////////////////////////////////////////////////// @@ -18,6 +22,18 @@ class TIP6Network; using TConnectionId = TGuid; +enum class EDeliveryFencedMode +{ + None, + // COMPAT(pogorelov) + Old, + New, +}; + +#ifdef _linux_ +static inline const auto DeliveryFencedWriteSignal = SIGRTMIN; +#endif // _linux + DECLARE_REFCOUNTED_STRUCT(IConnection) DECLARE_REFCOUNTED_STRUCT(IPacketConnection) DECLARE_REFCOUNTED_STRUCT(IConnectionReader) |
