diff options
author | khlebnikov <khlebnikov@yandex-team.ru> | 2022-02-10 16:50:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:08 +0300 |
commit | 6cffcf9a14a1dd07278bd534c7cca706ec2827b3 (patch) | |
tree | 48eb57e1d9fd00d624ca68bb3418c3c041d1b096 | |
parent | 1977f1c7bcb225f59f789f5f8735e03eb0c87e1c (diff) | |
download | ydb-6cffcf9a14a1dd07278bd534c7cca706ec2827b3.tar.gz |
Restoring authorship annotation for <khlebnikov@yandex-team.ru>. Commit 1 of 2.
-rwxr-xr-x | build/scripts/fetch_from.py | 8 | ||||
-rw-r--r-- | build/scripts/fetch_from_mds.py | 20 | ||||
-rwxr-xr-x | build/scripts/fetch_from_sandbox.py | 16 | ||||
-rw-r--r-- | build/scripts/fetch_resource.py | 16 | ||||
-rw-r--r-- | build/ya.conf.json | 156 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/cont_poller.cpp | 2 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/impl.cpp | 50 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/impl.h | 4 | ||||
-rw-r--r-- | library/cpp/coroutine/engine/network.cpp | 2 | ||||
-rw-r--r-- | library/python/ya.make | 2 | ||||
-rw-r--r-- | util/generic/fwd.h | 8 | ||||
-rw-r--r-- | util/generic/intrlist.h | 46 | ||||
-rw-r--r-- | util/generic/intrlist_ut.cpp | 60 | ||||
-rw-r--r-- | util/network/poller.cpp | 12 | ||||
-rw-r--r-- | util/network/poller.h | 6 | ||||
-rw-r--r-- | util/network/poller_ut.cpp | 218 | ||||
-rw-r--r-- | util/network/pollerimpl.h | 98 | ||||
-rw-r--r-- | util/string/benchmark/cast/main.cpp | 26 | ||||
-rw-r--r-- | util/system/file.cpp | 694 | ||||
-rw-r--r-- | util/system/file.h | 64 | ||||
-rw-r--r-- | util/system/file_ut.cpp | 60 | ||||
-rw-r--r-- | util/system/thread.i | 8 | ||||
-rw-r--r-- | util/thread/pool.cpp | 10 |
23 files changed, 793 insertions, 793 deletions
diff --git a/build/scripts/fetch_from.py b/build/scripts/fetch_from.py index db4fea50bf..a64b23daae 100755 --- a/build/scripts/fetch_from.py +++ b/build/scripts/fetch_from.py @@ -20,11 +20,11 @@ def make_user_agent(): return 'fetch_from: {host}'.format(host=socket.gethostname()) -def add_common_arguments(parser): +def add_common_arguments(parser): parser.add_argument('--copy-to') # used by jbuild in fetch_resource parser.add_argument('--rename-to') # used by test_node in inject_mds_resource_to_graph - parser.add_argument('--copy-to-dir') - parser.add_argument('--untar-to') + parser.add_argument('--copy-to-dir') + parser.add_argument('--untar-to') parser.add_argument('--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output') parser.add_argument('--executable', action='store_true', help='make outputs executable') parser.add_argument('--log-path') @@ -330,7 +330,7 @@ def process(fetched_file, file_name, args, remove=True): if args.copy_to_dir: hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name)) - if args.untar_to: + if args.untar_to: ensure_dir(args.untar_to) # Extract only requested files try: diff --git a/build/scripts/fetch_from_mds.py b/build/scripts/fetch_from_mds.py index 5e4e656394..18b680f0bb 100644 --- a/build/scripts/fetch_from_mds.py +++ b/build/scripts/fetch_from_mds.py @@ -1,7 +1,7 @@ import os import sys import logging -import argparse +import argparse import fetch_from @@ -9,10 +9,10 @@ MDS_PREFIX = "https://storage.yandex-team.ru/get-devtools/" def parse_args(): - parser = argparse.ArgumentParser() - fetch_from.add_common_arguments(parser) + parser = argparse.ArgumentParser() + fetch_from.add_common_arguments(parser) - parser.add_argument('--key', required=True) + parser.add_argument('--key', required=True) return parser.parse_args() @@ -29,21 +29,21 @@ def fetch(key): return fetched_file, file_name -def main(args): - fetched_file, resource_file_name = fetch(args.key) +def main(args): + fetched_file, resource_file_name = fetch(args.key) - fetch_from.process(fetched_file, resource_file_name, args) + fetch_from.process(fetched_file, resource_file_name, args) if __name__ == '__main__': - args = parse_args() + args = parse_args() fetch_from.setup_logging(args, os.path.basename(__file__)) try: - main(args) + main(args) except Exception as e: logging.exception(e) - print >>sys.stderr, open(args.abs_log_path).read() + print >>sys.stderr, open(args.abs_log_path).read() sys.stderr.flush() import error diff --git a/build/scripts/fetch_from_sandbox.py b/build/scripts/fetch_from_sandbox.py index a99542e174..070b491ec3 100755 --- a/build/scripts/fetch_from_sandbox.py +++ b/build/scripts/fetch_from_sandbox.py @@ -1,7 +1,7 @@ import itertools import json import logging -import argparse +import argparse import os import random import subprocess @@ -19,10 +19,10 @@ TEMPORARY_ERROR_CODES = (429, 500, 503, 504) def parse_args(): - parser = argparse.ArgumentParser() - fetch_from.add_common_arguments(parser) - parser.add_argument('--resource-id', type=int, required=True) - parser.add_argument('--custom-fetcher') + parser = argparse.ArgumentParser() + fetch_from.add_common_arguments(parser) + parser.add_argument('--resource-id', type=int, required=True) + parser.add_argument('--custom-fetcher') parser.add_argument('--resource-file') return parser.parse_args() @@ -240,7 +240,7 @@ def _get_resource_info_from_file(resource_file): return None -def main(args): +def main(args): custom_fetcher = os.environ.get('YA_CUSTOM_FETCHER') resource_info = _get_resource_info_from_file(args.resource_file) @@ -255,11 +255,11 @@ def main(args): if __name__ == '__main__': - args = parse_args() + args = parse_args() fetch_from.setup_logging(args, os.path.basename(__file__)) try: - main(args) + main(args) except Exception as e: logging.exception(e) print >>sys.stderr, open(args.abs_log_path).read() diff --git a/build/scripts/fetch_resource.py b/build/scripts/fetch_resource.py index d5af311e5d..e22d28d83a 100644 --- a/build/scripts/fetch_resource.py +++ b/build/scripts/fetch_resource.py @@ -1,13 +1,13 @@ import urllib2 -import argparse +import argparse import xmlrpclib def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument('-r', '--resource-id', type=int, required=True) - parser.add_argument('-o', '--output', required=True) - return parser.parse_args() + parser = argparse.ArgumentParser() + parser.add_argument('-r', '--resource-id', type=int, required=True) + parser.add_argument('-o', '--output', required=True) + return parser.parse_args() def fetch(url, retries=4, timeout=5): @@ -37,7 +37,7 @@ def fetch_resource(id_): if __name__ == '__main__': - args = parse_args() + args = parse_args() - with open(args.output, 'wb') as f: - f.write(fetch_resource(int(args.resource_id))) + with open(args.output, 'wb') as f: + f.write(fetch_resource(int(args.resource_id))) diff --git a/build/ya.conf.json b/build/ya.conf.json index 5f7cc875d6..3d7a512ba1 100644 --- a/build/ya.conf.json +++ b/build/ya.conf.json @@ -368,7 +368,7 @@ }, "fio": { "description": "flexible I/O tester" - }, + }, "amduprof-cli": { "description": "AMDuProfCLI is a command-line tool for AMD uProf Profiler" }, @@ -378,12 +378,12 @@ "foremost": { "description": "Foremost is a Linux program to recover files based on their headers" }, - "stress-ng": { - "description": "stress load tester" - }, - "atop": { - "description": "Advanced System & Process Monitor" + "stress-ng": { + "description": "stress load tester" }, + "atop": { + "description": "Advanced System & Process Monitor" + }, "bpftool": { "description": "tool for inspection and simple manipulation of eBPF programs and maps" }, @@ -4900,8 +4900,8 @@ } ] }, - "qemu": { - "tools": { + "qemu": { + "tools": { "qemu": { "bottle": "qemu", "executable": "qemu" @@ -4918,16 +4918,16 @@ "bottle": "qemu", "executable": "qemu-nbd" } - }, - "platforms": [ + }, + "platforms": [ { "host": { "os": "LINUX" }, "default": true } - ] - }, + ] + }, "qyp": { "tools": { "qyp": { @@ -5024,22 +5024,22 @@ } ] }, - "atop": { - "tools": { - "atop": { - "bottle": "atop", - "executable": "atop" - } - }, - "platforms": [ - { - "host": { - "os": "LINUX" - }, - "default": true - } - ] - }, + "atop": { + "tools": { + "atop": { + "bottle": "atop", + "executable": "atop" + } + }, + "platforms": [ + { + "host": { + "os": "LINUX" + }, + "default": true + } + ] + }, "bpftool": { "tools": { "bpftool": { @@ -5088,22 +5088,22 @@ } ] }, - "stress-ng": { - "tools": { - "stress-ng": { - "bottle": "stress-ng", - "executable": "stress-ng" - } - }, - "platforms": [ - { - "host": { - "os": "LINUX" - }, - "default": true - } - ] - }, + "stress-ng": { + "tools": { + "stress-ng": { + "bottle": "stress-ng", + "executable": "stress-ng" + } + }, + "platforms": [ + { + "host": { + "os": "LINUX" + }, + "default": true + } + ] + }, "iperf": { "tools": { "iperf": { @@ -7418,14 +7418,14 @@ ] } }, - "qemu": { - "formula": { + "qemu": { + "formula": { "sandbox_id": [ 504011268 ], - "match": "Build executable" - }, - "executable": { + "match": "Build executable" + }, + "executable": { "qemu": [ "qemu", "bin", @@ -7446,8 +7446,8 @@ "bin", "qemu-nbd" ] - } - }, + } + }, "qyp": { "formula": { "sandbox_id": [ @@ -7522,20 +7522,20 @@ ] } }, - "atop": { - "formula": { - "sandbox_id": [ - 576898586 - ], - "match": "infra/kernel/tools/atop/build/atop-static.tar.gz" - }, - "executable": { - "atop": [ - "atop", - "atop" - ] - } - }, + "atop": { + "formula": { + "sandbox_id": [ + 576898586 + ], + "match": "infra/kernel/tools/atop/build/atop-static.tar.gz" + }, + "executable": { + "atop": [ + "atop", + "atop" + ] + } + }, "bpftool": { "formula": { "sandbox_id": [ @@ -7578,20 +7578,20 @@ ] } }, - "stress-ng": { - "formula": { - "sandbox_id": [ + "stress-ng": { + "formula": { + "sandbox_id": [ 755257561 - ], - "match": "infra/kernel/tools/stress-ng/build/stress-ng-static.tar.gz" - }, - "executable": { - "stress-ng": [ - "stress-ng", - "stress-ng" - ] - } - }, + ], + "match": "infra/kernel/tools/stress-ng/build/stress-ng-static.tar.gz" + }, + "executable": { + "stress-ng": [ + "stress-ng", + "stress-ng" + ] + } + }, "iperf": { "formula": { "sandbox_id": [ diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp index 76593d4e9b..d0fea23e97 100644 --- a/library/cpp/coroutine/engine/cont_poller.cpp +++ b/library/cpp/coroutine/engine/cont_poller.cpp @@ -12,7 +12,7 @@ namespace NCoro { } cont->Executor()->ScheduleIoWait(event); - cont->Switch(); + cont->Switch(); if (cont->Cancelled()) { return ECANCELED; diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp index 7ae6f74051..a75923e606 100644 --- a/library/cpp/coroutine/engine/impl.cpp +++ b/library/cpp/coroutine/engine/impl.cpp @@ -51,7 +51,7 @@ bool TCont::Join(TCont* c, TInstant deadLine) noexcept { c->Cancel(); do { - Switch(); + Switch(); } while (!ev.Empty()); } @@ -68,10 +68,10 @@ int TCont::SleepD(TInstant deadline) noexcept { return ExecuteEvent(&event); } -void TCont::Switch() noexcept { - Executor()->RunScheduler(); -} - +void TCont::Switch() noexcept { + Executor()->RunScheduler(); +} + void TCont::Yield() noexcept { if (SleepD(TInstant::Zero())) { ReScheduleAndSwitch(); @@ -80,7 +80,7 @@ void TCont::Yield() noexcept { void TCont::ReScheduleAndSwitch() noexcept { ReSchedule(); - Switch(); + Switch(); } void TCont::Terminate() { @@ -300,8 +300,8 @@ void TContExecutor::RunScheduler() noexcept { try { TContExecutor* const prev = ThisThreadExecutor(); ThisThreadExecutor() = this; - TCont* caller = Current_; - TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_; + TCont* caller = Current_; + TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_; Y_DEFER { ThisThreadExecutor() = prev; }; @@ -309,17 +309,17 @@ void TContExecutor::RunScheduler() noexcept { while (true) { if (ScheduleCallback_ && Current_) { ScheduleCallback_->OnUnschedule(*this); - } - - WaitForIO(); - DeleteScheduled(); + } + + WaitForIO(); + DeleteScheduled(); Ready_.Append(ReadyNext_); if (Ready_.Empty()) { - Current_ = nullptr; - if (caller) { - context->SwitchTo(&SchedContext_); - } + Current_ = nullptr; + if (caller) { + context->SwitchTo(&SchedContext_); + } break; } @@ -328,21 +328,21 @@ void TContExecutor::RunScheduler() noexcept { if (ScheduleCallback_) { ScheduleCallback_->OnSchedule(*this, *cont); } - - Current_ = cont; - cont->Scheduled_ = false; - if (cont == caller) { - break; + + Current_ = cont; + cont->Scheduled_ = false; + if (cont == caller) { + break; } - context->SwitchTo(cont->Trampoline_.Context()); + context->SwitchTo(cont->Trampoline_.Context()); if (Paused_) { Paused_ = false; Current_ = nullptr; break; } - if (caller) { - break; - } + if (caller) { + break; + } } } catch (...) { TBackTrace::FromCurrentException().PrintTo(Cerr); diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h index 283a96ecf1..b07f8aafbf 100644 --- a/library/cpp/coroutine/engine/impl.h +++ b/library/cpp/coroutine/engine/impl.h @@ -99,8 +99,8 @@ public: void ReSchedule() noexcept; - void Switch() noexcept; - + void Switch() noexcept; + void SwitchTo(TExceptionSafeContext* ctx) { Trampoline_.SwitchTo(ctx); } diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp index 85b647d210..389c1ea298 100644 --- a/library/cpp/coroutine/engine/network.cpp +++ b/library/cpp/coroutine/engine/network.cpp @@ -51,7 +51,7 @@ namespace NCoro { for (auto i : xrange(nfds)) { cont->Executor()->ScheduleIoWait(events.Data() + i); } - cont->Switch(); + cont->Switch(); if (cont->Cancelled()) { return ECANCELED; diff --git a/library/python/ya.make b/library/python/ya.make index 2e1eb6e0e1..da8461683c 100644 --- a/library/python/ya.make +++ b/library/python/ya.make @@ -172,7 +172,7 @@ RECURSE( solomon spack spyt - ssh_client + ssh_client ssh_sign startrek_python_client startrek_python_client/tests_int diff --git a/util/generic/fwd.h b/util/generic/fwd.h index 5cc2da40e5..caa854f39f 100644 --- a/util/generic/fwd.h +++ b/util/generic/fwd.h @@ -35,14 +35,14 @@ template <class T> struct THash; //intrusive containers -struct TIntrusiveListDefaultTag; -template <class T, class Tag = TIntrusiveListDefaultTag> +struct TIntrusiveListDefaultTag; +template <class T, class Tag = TIntrusiveListDefaultTag> class TIntrusiveList; -template <class T, class D, class Tag = TIntrusiveListDefaultTag> +template <class T, class D, class Tag = TIntrusiveListDefaultTag> class TIntrusiveListWithAutoDelete; -template <class T, class Tag = TIntrusiveListDefaultTag> +template <class T, class Tag = TIntrusiveListDefaultTag> class TIntrusiveSList; template <class T, class C> diff --git a/util/generic/intrlist.h b/util/generic/intrlist.h index b5d3f2051b..53c0838d36 100644 --- a/util/generic/intrlist.h +++ b/util/generic/intrlist.h @@ -6,14 +6,14 @@ #include <iterator> struct TIntrusiveListDefaultTag {}; - + /* * two-way linked list */ -template <class T, class Tag = TIntrusiveListDefaultTag> +template <class T, class Tag = TIntrusiveListDefaultTag> class TIntrusiveListItem { private: - using TListItem = TIntrusiveListItem<T, Tag>; + using TListItem = TIntrusiveListItem<T, Tag>; public: inline TIntrusiveListItem() noexcept @@ -118,10 +118,10 @@ private: TListItem* Prev_; }; -template <class T, class Tag> +template <class T, class Tag> class TIntrusiveList { private: - using TListItem = TIntrusiveListItem<T, Tag>; + using TListItem = TIntrusiveListItem<T, Tag>; template <class TListItem, class TNode> class TIteratorBase { @@ -365,10 +365,10 @@ public: return std::distance(Begin(), End()); } - inline void Remove(TListItem* item) noexcept { - item->Unlink(); - } - + inline void Remove(TListItem* item) noexcept { + item->Unlink(); + } + inline void Clear() noexcept { End_.Unlink(); } @@ -584,14 +584,14 @@ private: TListItem End_; }; -template <class T, class D, class Tag> -class TIntrusiveListWithAutoDelete: public TIntrusiveList<T, Tag> { +template <class T, class D, class Tag> +class TIntrusiveListWithAutoDelete: public TIntrusiveList<T, Tag> { public: - using TIterator = typename TIntrusiveList<T, Tag>::TIterator; - using TConstIterator = typename TIntrusiveList<T, Tag>::TConstIterator; + using TIterator = typename TIntrusiveList<T, Tag>::TIterator; + using TConstIterator = typename TIntrusiveList<T, Tag>::TConstIterator; - using TReverseIterator = typename TIntrusiveList<T, Tag>::TReverseIterator; - using TConstReverseIterator = typename TIntrusiveList<T, Tag>::TConstReverseIterator; + using TReverseIterator = typename TIntrusiveList<T, Tag>::TReverseIterator; + using TConstReverseIterator = typename TIntrusiveList<T, Tag>::TConstReverseIterator; using iterator = TIterator; using const_iterator = TConstIterator; @@ -603,7 +603,7 @@ public: inline TIntrusiveListWithAutoDelete() noexcept = default; inline TIntrusiveListWithAutoDelete(TIntrusiveListWithAutoDelete&& right) noexcept - : TIntrusiveList<T, Tag>(std::move(right)) + : TIntrusiveList<T, Tag>(std::move(right)) { } @@ -612,7 +612,7 @@ public: } TIntrusiveListWithAutoDelete& operator=(TIntrusiveListWithAutoDelete&& rhs) noexcept { - TIntrusiveList<T, Tag>::operator=(std::move(rhs)); + TIntrusiveList<T, Tag>::operator=(std::move(rhs)); return *this; } @@ -624,22 +624,22 @@ public: } inline static void Cut(TIterator begin, TIterator end) noexcept { - TIntrusiveListWithAutoDelete<T, D, Tag> temp; + TIntrusiveListWithAutoDelete<T, D, Tag> temp; Cut(begin, end, temp.End()); } inline static void Cut(TIterator begin, TIterator end, TIterator pasteBefore) noexcept { - TIntrusiveList<T, Tag>::Cut(begin, end, pasteBefore); + TIntrusiveList<T, Tag>::Cut(begin, end, pasteBefore); } }; /* * one-way linked list */ -template <class T, class Tag = TIntrusiveListDefaultTag> +template <class T, class Tag = TIntrusiveListDefaultTag> class TIntrusiveSListItem { private: - using TListItem = TIntrusiveSListItem<T, Tag>; + using TListItem = TIntrusiveSListItem<T, Tag>; public: inline TIntrusiveSListItem() noexcept @@ -678,10 +678,10 @@ private: TListItem* Next_; }; -template <class T, class Tag> +template <class T, class Tag> class TIntrusiveSList { private: - using TListItem = TIntrusiveSListItem<T, Tag>; + using TListItem = TIntrusiveSListItem<T, Tag>; public: template <class TListItem, class TNode> diff --git a/util/generic/intrlist_ut.cpp b/util/generic/intrlist_ut.cpp index eff7cdf2ee..27baadd582 100644 --- a/util/generic/intrlist_ut.cpp +++ b/util/generic/intrlist_ut.cpp @@ -20,7 +20,7 @@ class TListTest: public TTestBase { UNIT_TEST(TestListWithAutoDeleteMoveCtor); UNIT_TEST(TestListWithAutoDeleteMoveOpEq); UNIT_TEST(TestListWithAutoDeleteClear); - UNIT_TEST(TestSecondTag); + UNIT_TEST(TestSecondTag); UNIT_TEST_SUITE_END(); private: @@ -38,7 +38,7 @@ private: void TestListWithAutoDeleteMoveCtor(); void TestListWithAutoDeleteMoveOpEq(); void TestListWithAutoDeleteClear(); - void TestSecondTag(); + void TestSecondTag(); }; UNIT_TEST_SUITE_REGISTRATION(TListTest); @@ -475,38 +475,38 @@ void TListTest::TestListWithAutoDeleteClear() { UNIT_ASSERT_EQUAL(counter, 0); } - + struct TSecondTag {}; - -class TDoubleNode + +class TDoubleNode : public TInt, public TIntrusiveListItem<TDoubleNode, TSecondTag> { -public: +public: TDoubleNode(int value) noexcept : TInt(value) { } -}; - -void TListTest::TestSecondTag() { - TDoubleNode zero(0), one(1); - TIntrusiveList<TInt> first; - TIntrusiveList<TDoubleNode, TSecondTag> second; - - first.PushFront(&zero); - first.PushFront(&one); - second.PushBack(&zero); - second.PushBack(&one); - - UNIT_ASSERT_EQUAL(*first.Front(), 1); - UNIT_ASSERT_EQUAL(*++first.Begin(), 0); - UNIT_ASSERT_EQUAL(*first.Back(), 0); - - UNIT_ASSERT_EQUAL(*second.Front(), 0); - UNIT_ASSERT_EQUAL(*++second.Begin(), 1); - UNIT_ASSERT_EQUAL(*second.Back(), 1); - - second.Remove(&zero); - UNIT_ASSERT_EQUAL(*second.Front(), 1); - UNIT_ASSERT_EQUAL(*first.Back(), 0); -} +}; + +void TListTest::TestSecondTag() { + TDoubleNode zero(0), one(1); + TIntrusiveList<TInt> first; + TIntrusiveList<TDoubleNode, TSecondTag> second; + + first.PushFront(&zero); + first.PushFront(&one); + second.PushBack(&zero); + second.PushBack(&one); + + UNIT_ASSERT_EQUAL(*first.Front(), 1); + UNIT_ASSERT_EQUAL(*++first.Begin(), 0); + UNIT_ASSERT_EQUAL(*first.Back(), 0); + + UNIT_ASSERT_EQUAL(*second.Front(), 0); + UNIT_ASSERT_EQUAL(*++second.Begin(), 1); + UNIT_ASSERT_EQUAL(*second.Back(), 1); + + second.Remove(&zero); + UNIT_ASSERT_EQUAL(*second.Front(), 1); + UNIT_ASSERT_EQUAL(*first.Back(), 0); +} diff --git a/util/network/poller.cpp b/util/network/poller.cpp index 7954d0e8b5..470fc1d1c2 100644 --- a/util/network/poller.cpp +++ b/util/network/poller.cpp @@ -69,14 +69,14 @@ void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT); } -void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) { +void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED); -} - -void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) { +} + +void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_MODIFY | CONT_POLL_EDGE_TRIGGERED | (empty ? CONT_POLL_BACKLOG_EMPTY : 0)); -} - +} + void TSocketPoller::Unwait(SOCKET sock) { Impl_->Remove(sock); } diff --git a/util/network/poller.h b/util/network/poller.h index 8dccd73140..5a16010676 100644 --- a/util/network/poller.h +++ b/util/network/poller.h @@ -19,9 +19,9 @@ public: void WaitWriteOneShot(SOCKET sock, void* cookie); void WaitReadWriteOneShot(SOCKET sock, void* cookie); - void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie); - void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true); - + void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie); + void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true); + void Unwait(SOCKET sock); size_t WaitD(void** events, size_t len, const TInstant& deadLine); diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp index 6df0dda8ec..9a2370aff8 100644 --- a/util/network/poller_ut.cpp +++ b/util/network/poller_ut.cpp @@ -1,5 +1,5 @@ #include <library/cpp/testing/unittest/registar.h> -#include <util/system/error.h> +#include <util/system/error.h> #include "pair.h" #include "poller.h" @@ -100,115 +100,115 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) { poller.WaitRead(s1, nullptr); poller.WaitWrite(s1, nullptr); } - - Y_UNIT_TEST(TestSimpleEdgeTriggered) { - SOCKET sockets[2]; - UNIT_ASSERT(SocketPair(sockets) == 0); - - TSocketHolder s1(sockets[0]); - TSocketHolder s2(sockets[1]); - - SetNonBlock(sockets[1]); - - TSocketPoller poller; - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - for (ui32 i = 0; i < 3; ++i) { - poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17); - - // notify about writeble - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - char buf[2]; - - buf[0] = i + 10; - buf[1] = i + 20; - - // send one byte - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // restart without reading - poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false); - - // after restart read and write might generate separate events - { + + Y_UNIT_TEST(TestSimpleEdgeTriggered) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + SetNonBlock(sockets[1]); + + TSocketPoller poller; + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + for (ui32 i = 0; i < 3; ++i) { + poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17); + + // notify about writeble + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + char buf[2]; + + buf[0] = i + 10; + buf[1] = i + 20; + + // send one byte + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // restart without reading + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false); + + // after restart read and write might generate separate events + { void* events[3]; - size_t count = poller.WaitT(events, 3, TDuration::Zero()); - UNIT_ASSERT_GE(count, 1); - UNIT_ASSERT_LE(count, 2); - UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17); - } - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // second two more bytes - UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0)); - - // here poller could notify or not because we haven't seen end - Y_UNUSED(poller.WaitT(TDuration::Zero())); - - // recv one, leave two - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - - // nothing new - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // recv the rest - UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]); - - // still nothing new - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // hit end - ClearLastSystemError(); - UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError()); - - // restart after end (noop for epoll) - poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); - - // send and recv byte - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // recv and see end - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // the same but send before restart - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - // restart after end (noop for epoll) - poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); - - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - poller.Unwait(sockets[1]); - } - } + size_t count = poller.WaitT(events, 3, TDuration::Zero()); + UNIT_ASSERT_GE(count, 1); + UNIT_ASSERT_LE(count, 2); + UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17); + } + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // second two more bytes + UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0)); + + // here poller could notify or not because we haven't seen end + Y_UNUSED(poller.WaitT(TDuration::Zero())); + + // recv one, leave two + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + // nothing new + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // recv the rest + UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]); + + // still nothing new + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // hit end + ClearLastSystemError(); + UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError()); + + // restart after end (noop for epoll) + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); + + // send and recv byte + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // recv and see end + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // the same but send before restart + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + // restart after end (noop for epoll) + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + poller.Unwait(sockets[1]); + } + } #if defined(HAVE_EPOLL_POLLER) Y_UNIT_TEST(TestRdhup) { diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h index e8c7e40fba..2c43c09826 100644 --- a/util/network/pollerimpl.h +++ b/util/network/pollerimpl.h @@ -107,24 +107,24 @@ public: inline void SetImpl(void* data, int fd, int what) { TEvent e[2]; - int flags = EV_ADD; - - if (what & CONT_POLL_EDGE_TRIGGERED) { - if (what & CONT_POLL_BACKLOG_EMPTY) { - // When backlog is empty, edge-triggered does not need restart. - return; - } - flags |= EV_CLEAR; - } - - if (what & CONT_POLL_ONE_SHOT) { - flags |= EV_ONESHOT; - } - + int flags = EV_ADD; + + if (what & CONT_POLL_EDGE_TRIGGERED) { + if (what & CONT_POLL_BACKLOG_EMPTY) { + // When backlog is empty, edge-triggered does not need restart. + return; + } + flags |= EV_CLEAR; + } + + if (what & CONT_POLL_ONE_SHOT) { + flags |= EV_ONESHOT; + } + Zero(e); - EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data); - EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data); + EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data); + EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data); if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) { ythrow TSystemError() << "kevent add failed"; @@ -225,33 +225,33 @@ public: Zero(e); - if (what & CONT_POLL_EDGE_TRIGGERED) { - if (what & CONT_POLL_BACKLOG_EMPTY) { - // When backlog is empty, edge-triggered does not need restart. - return; - } - e.events |= EPOLLET; - } - - if (what & CONT_POLL_ONE_SHOT) { - e.events |= EPOLLONESHOT; - } - - if (what & CONT_POLL_READ) { - e.events |= EPOLLIN; - } - - if (what & CONT_POLL_WRITE) { - e.events |= EPOLLOUT; - } - + if (what & CONT_POLL_EDGE_TRIGGERED) { + if (what & CONT_POLL_BACKLOG_EMPTY) { + // When backlog is empty, edge-triggered does not need restart. + return; + } + e.events |= EPOLLET; + } + + if (what & CONT_POLL_ONE_SHOT) { + e.events |= EPOLLONESHOT; + } + + if (what & CONT_POLL_READ) { + e.events |= EPOLLIN; + } + + if (what & CONT_POLL_WRITE) { + e.events |= EPOLLOUT; + } + if (what & CONT_POLL_RDHUP) { e.events |= EPOLLRDHUP; } e.data.ptr = data; - if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) { + if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) { if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) { ythrow TSystemError() << "epoll add failed"; } @@ -345,10 +345,10 @@ struct TSelectPollerNoTemplate { Filter_ = s; } - inline void Clear(int c) noexcept { - Filter_ &= ~c; - } - + inline void Clear(int c) noexcept { + Filter_ &= ~c; + } + inline int Filter() const noexcept { return Filter_; } @@ -523,9 +523,9 @@ public: TEvent* eventsStart = events; - for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) { + for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) { const SOCKET fd = it->first; - THandle& handle = it->second; + THandle& handle = it->second; if (FD_ISSET(fd, errFds)) { (events++)->Error(handle.Data(), EIO); @@ -553,12 +553,12 @@ public: *keysToDeleteEnd = fd; ++keysToDeleteEnd; } - - if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { - // Emulate edge-triggered for level-triggered select(). - // User must restart waiting this event when needed. - handle.Clear(what); - } + + if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { + // Emulate edge-triggered for level-triggered select(). + // User must restart waiting this event when needed. + handle.Clear(what); + } } } } diff --git a/util/string/benchmark/cast/main.cpp b/util/string/benchmark/cast/main.cpp index f604712ab6..986ef7ad38 100644 --- a/util/string/benchmark/cast/main.cpp +++ b/util/string/benchmark/cast/main.cpp @@ -3,36 +3,36 @@ #include <util/string/cast.h> #include <util/generic/xrange.h> -char str1[] = "1"; -char str12[] = "12"; -char str1234[] = "1234"; -char str12345678[] = "12345678"; - +char str1[] = "1"; +char str12[] = "12"; +char str1234[] = "1234"; +char str12345678[] = "12345678"; + Y_CPU_BENCHMARK(Parse_1, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str1, 1)); + Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str1, 1)); } } Y_CPU_BENCHMARK(Parse_12, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str12, 2)); + Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str12, 2)); } } Y_CPU_BENCHMARK(Parse_1234, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str1234, 4)); + Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str1234, 4)); } } Y_CPU_BENCHMARK(Parse_12345678, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str12345678, 8)); + Y_DO_NOT_OPTIMIZE_AWAY(FromString<ui32>(str12345678, 8)); } } @@ -40,27 +40,27 @@ Y_CPU_BENCHMARK(Parse_12345678, iface) { Y_CPU_BENCHMARK(Atoi_1, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(atoi(str1)); + Y_DO_NOT_OPTIMIZE_AWAY(atoi(str1)); } } Y_CPU_BENCHMARK(Atoi_12, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(atoi(str12)); + Y_DO_NOT_OPTIMIZE_AWAY(atoi(str12)); } } Y_CPU_BENCHMARK(Atoi_1234, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(atoi(str1234)); + Y_DO_NOT_OPTIMIZE_AWAY(atoi(str1234)); } } Y_CPU_BENCHMARK(Atoi_12345678, iface) { for (const auto i : xrange(iface.Iterations())) { Y_UNUSED(i); - Y_DO_NOT_OPTIMIZE_AWAY(atoi(str12345678)); + Y_DO_NOT_OPTIMIZE_AWAY(atoi(str12345678)); } } diff --git a/util/system/file.cpp b/util/system/file.cpp index 4a261d020c..c0be875970 100644 --- a/util/system/file.cpp +++ b/util/system/file.cpp @@ -2,11 +2,11 @@ #include "flock.h" #include "fstat.h" #include "sysstat.h" -#include "align.h" +#include "align.h" #include "info.h" -#include <array> - +#include <array> + #include <util/string/util.h> #include <util/string/cast.h> #include <util/string/builder.h> @@ -67,12 +67,12 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { ui32 fcMode = 0; EOpenMode createMode = oMode & MaskCreation; Y_VERIFY(!IsStupidFlagCombination(oMode), "oMode %d makes no sense", static_cast<int>(oMode)); - if (!(oMode & MaskRW)) { + if (!(oMode & MaskRW)) { oMode |= RdWr; - } - if (!(oMode & AMask)) { + } + if (!(oMode & AMask)) { oMode |= ARW; - } + } #ifdef _win_ @@ -98,13 +98,13 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { } ui32 faMode = 0; - if (oMode & RdOnly) { + if (oMode & RdOnly) { faMode |= GENERIC_READ; - } - if (oMode & WrOnly) { - // WrOnly or RdWr + } + if (oMode & WrOnly) { + // WrOnly or RdWr faMode |= GENERIC_WRITE; - } + } if (oMode & ::ForAppend) { faMode |= GENERIC_WRITE; faMode |= FILE_APPEND_DATA; @@ -116,29 +116,29 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { ui32 shMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; ui32 attrMode = FILE_ATTRIBUTE_NORMAL; - if ((createMode == OpenExisting || createMode == OpenAlways) && ((oMode & AMask) == (oMode & AR))) { + if ((createMode == OpenExisting || createMode == OpenAlways) && ((oMode & AMask) == (oMode & AR))) { attrMode |= FILE_ATTRIBUTE_READONLY; - } - if (oMode & Seq) { + } + if (oMode & Seq) { attrMode |= FILE_FLAG_SEQUENTIAL_SCAN; - } - if (oMode & Temp) { - // we use TTempFile instead of FILE_FLAG_DELETE_ON_CLOSE - attrMode |= FILE_ATTRIBUTE_TEMPORARY; - } - if (oMode & Transient) { + } + if (oMode & Temp) { + // we use TTempFile instead of FILE_FLAG_DELETE_ON_CLOSE + attrMode |= FILE_ATTRIBUTE_TEMPORARY; + } + if (oMode & Transient) { attrMode |= FILE_FLAG_DELETE_ON_CLOSE; - } - if ((oMode & (Direct | DirectAligned)) && (oMode & WrOnly)) { - // WrOnly or RdWr + } + if ((oMode & (Direct | DirectAligned)) && (oMode & WrOnly)) { + // WrOnly or RdWr attrMode |= /*FILE_FLAG_NO_BUFFERING |*/ FILE_FLAG_WRITE_THROUGH; - } + } Fd_ = NFsPrivate::CreateFileWithUtf8Name(fName, faMode, shMode, fcMode, attrMode, inheritHandle); - if ((oMode & ::ForAppend) && (Fd_ != INVALID_FHANDLE)) { + if ((oMode & ::ForAppend) && (Fd_ != INVALID_FHANDLE)) { ::SetFilePointer(Fd_, 0, 0, FILE_END); - } + } #elif defined(_unix_) @@ -163,13 +163,13 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { break; } - if ((oMode & RdOnly) && (oMode & WrOnly)) { + if ((oMode & RdOnly) && (oMode & WrOnly)) { fcMode |= O_RDWR; - } else if (oMode & RdOnly) { + } else if (oMode & RdOnly) { fcMode |= O_RDONLY; - } else if (oMode & WrOnly) { + } else if (oMode & WrOnly) { fcMode |= O_WRONLY; - } + } if (oMode & ::ForAppend) { fcMode |= O_APPEND; @@ -194,9 +194,9 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { #elif defined(_linux_) if (oMode & DirectAligned) { /* - * O_DIRECT in Linux requires aligning request size and buffer address - * to size of hardware sector (see hw_sector_size or ioctl BLKSSZGET). - * Usually 512 bytes, but modern hardware works better with 4096 bytes. + * O_DIRECT in Linux requires aligning request size and buffer address + * to size of hardware sector (see hw_sector_size or ioctl BLKSSZGET). + * Usually 512 bytes, but modern hardware works better with 4096 bytes. */ fcMode |= O_DIRECT; } @@ -210,33 +210,33 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { #endif ui32 permMode = 0; - if (oMode & AXOther) { + if (oMode & AXOther) { permMode |= S_IXOTH; - } - if (oMode & AWOther) { + } + if (oMode & AWOther) { permMode |= S_IWOTH; - } - if (oMode & AROther) { + } + if (oMode & AROther) { permMode |= S_IROTH; - } - if (oMode & AXGroup) { + } + if (oMode & AXGroup) { permMode |= S_IXGRP; - } - if (oMode & AWGroup) { + } + if (oMode & AWGroup) { permMode |= S_IWGRP; - } - if (oMode & ARGroup) { + } + if (oMode & ARGroup) { permMode |= S_IRGRP; - } - if (oMode & AXUser) { + } + if (oMode & AXUser) { permMode |= S_IXUSR; - } - if (oMode & AWUser) { + } + if (oMode & AWUser) { permMode |= S_IWUSR; - } - if (oMode & ARUser) { + } + if (oMode & ARUser) { permMode |= S_IRUSR; - } + } do { Fd_ = ::open(fName.data(), fcMode, permMode); @@ -245,21 +245,21 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { #if HAVE_POSIX_FADVISE if (Fd_ >= 0) { if (oMode & NoReuse) { - ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_NOREUSE); + ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_NOREUSE); } if (oMode & Seq) { - ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_SEQUENTIAL); - } - - if (oMode & NoReadAhead) { - ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_RANDOM); + ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_SEQUENTIAL); } + + if (oMode & NoReadAhead) { + ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_RANDOM); + } } #endif //temp file - if (Fd_ >= 0 && (oMode & Transient)) { + if (Fd_ >= 0 && (oMode & Transient)) { unlink(fName.data()); } #else @@ -270,17 +270,17 @@ TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { bool TFileHandle::Close() noexcept { bool isOk = true; #ifdef _win_ - if (Fd_ != INVALID_FHANDLE) { + if (Fd_ != INVALID_FHANDLE) { isOk = (::CloseHandle(Fd_) != 0); - } + } if (!isOk) { Y_VERIFY(GetLastError() != ERROR_INVALID_HANDLE, "must not quietly close invalid handle"); } #elif defined(_unix_) - if (Fd_ != INVALID_FHANDLE) { + if (Fd_ != INVALID_FHANDLE) { isOk = (::close(Fd_) == 0 || errno == EINTR); - } + } if (!isOk) { // Do not quietly close bad descriptor, // because often it means double close @@ -295,17 +295,17 @@ bool TFileHandle::Close() noexcept { } static inline i64 DoSeek(FHANDLE h, i64 offset, SeekDir origin) noexcept { - if (h == INVALID_FHANDLE) { + if (h == INVALID_FHANDLE) { return -1L; - } + } #if defined(_win_) static ui32 dir[] = {FILE_BEGIN, FILE_CURRENT, FILE_END}; LARGE_INTEGER pos; pos.QuadPart = offset; pos.LowPart = ::SetFilePointer(h, pos.LowPart, &pos.HighPart, dir[origin]); - if (pos.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) { + if (pos.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) { pos.QuadPart = -1; - } + } return pos.QuadPart; #elif defined(_unix_) static int dir[] = {SEEK_SET, SEEK_CUR, SEEK_END}; @@ -329,32 +329,32 @@ i64 TFileHandle::Seek(i64 offset, SeekDir origin) noexcept { i64 TFileHandle::GetLength() const noexcept { // XXX: returns error code, but does not set errno - if (!IsOpen()) { + if (!IsOpen()) { return -1L; - } + } return GetFileLength(Fd_); } bool TFileHandle::Resize(i64 length) noexcept { - if (!IsOpen()) { + if (!IsOpen()) { return false; - } + } i64 currentLength = GetLength(); - if (length == currentLength) { + if (length == currentLength) { return true; - } + } #if defined(_win_) i64 currentPosition = GetPosition(); - if (currentPosition == -1L) { + if (currentPosition == -1L) { return false; - } + } Seek(length, sSet); - if (!::SetEndOfFile(Fd_)) { + if (!::SetEndOfFile(Fd_)) { return false; - } - if (currentPosition < length) { + } + if (currentPosition < length) { Seek(currentPosition, sSet); - } + } return true; #elif defined(_unix_) return (0 == ftruncate(Fd_, (off_t)length)); @@ -364,17 +364,17 @@ bool TFileHandle::Resize(i64 length) noexcept { } bool TFileHandle::Reserve(i64 length) noexcept { - // FIXME this should reserve disk space with fallocate - if (!IsOpen()) { + // FIXME this should reserve disk space with fallocate + if (!IsOpen()) { return false; - } + } i64 currentLength = GetLength(); - if (length <= currentLength) { + if (length <= currentLength) { return true; - } - if (!Resize(length)) { + } + if (!Resize(length)) { return false; - } + } #if defined(_win_) if (!::SetFileValidData(Fd_, length)) { Resize(currentLength); @@ -413,29 +413,29 @@ bool TFileHandle::ShrinkToFit() noexcept { } bool TFileHandle::Flush() noexcept { - if (!IsOpen()) { + if (!IsOpen()) { return false; - } + } #if defined(_win_) bool ok = ::FlushFileBuffers(Fd_) != 0; - /* - * FlushFileBuffers fails if hFile is a handle to the console output. - * That is because the console output is not buffered. - * The function returns FALSE, and GetLastError returns ERROR_INVALID_HANDLE. - */ - return ok || GetLastError() == ERROR_INVALID_HANDLE; + /* + * FlushFileBuffers fails if hFile is a handle to the console output. + * That is because the console output is not buffered. + * The function returns FALSE, and GetLastError returns ERROR_INVALID_HANDLE. + */ + return ok || GetLastError() == ERROR_INVALID_HANDLE; #elif defined(_unix_) int ret = ::fsync(Fd_); - /* - * Ignore EROFS, EINVAL - fd is bound to a special file - * (PIPE, FIFO, or socket) which does not support synchronization. - * Fail in case of EIO, ENOSPC, EDQUOT - data might be lost. - */ + /* + * Ignore EROFS, EINVAL - fd is bound to a special file + * (PIPE, FIFO, or socket) which does not support synchronization. + * Fail in case of EIO, ENOSPC, EDQUOT - data might be lost. + */ return ret == 0 || errno == EROFS || errno == EINVAL #if defined(_darwin_) - // ENOTSUP fd does not refer to a vnode - || errno == ENOTSUP + // ENOTSUP fd does not refer to a vnode + || errno == ENOTSUP #endif ; #else @@ -451,7 +451,7 @@ bool TFileHandle::FlushData() noexcept { int ret = ::fdatasync(Fd_); - // Same loginc in error handling as for fsync above. + // Same loginc in error handling as for fsync above. return ret == 0 || errno == EROFS || errno == EINVAL; #else return Flush(); @@ -459,21 +459,21 @@ bool TFileHandle::FlushData() noexcept { } i32 TFileHandle::Read(void* buffer, ui32 byteCount) noexcept { - // FIXME size and return must be 64-bit - if (!IsOpen()) { + // FIXME size and return must be 64-bit + if (!IsOpen()) { return -1; - } + } #if defined(_win_) DWORD bytesRead = 0; - if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, nullptr)) { + if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, nullptr)) { return bytesRead; - } + } return -1; #elif defined(_unix_) i32 ret; - do { - ret = ::read(Fd_, buffer, byteCount); - } while (ret == -1 && errno == EINTR); + do { + ret = ::read(Fd_, buffer, byteCount); + } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform @@ -481,20 +481,20 @@ i32 TFileHandle::Read(void* buffer, ui32 byteCount) noexcept { } i32 TFileHandle::Write(const void* buffer, ui32 byteCount) noexcept { - if (!IsOpen()) { + if (!IsOpen()) { return -1; - } + } #if defined(_win_) DWORD bytesWritten = 0; - if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, nullptr)) { + if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, nullptr)) { return bytesWritten; - } + } return -1; #elif defined(_unix_) i32 ret; - do { - ret = ::write(Fd_, buffer, byteCount); - } while (ret == -1 && errno == EINTR); + do { + ret = ::write(Fd_, buffer, byteCount); + } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform @@ -508,18 +508,18 @@ i32 TFileHandle::Pread(void* buffer, ui32 byteCount, i64 offset) const noexcept DWORD bytesRead = 0; io.Offset = (ui32)offset; io.OffsetHigh = (ui32)(offset >> 32); - if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, &io)) { + if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, &io)) { return bytesRead; - } - if (::GetLastError() == ERROR_HANDLE_EOF) { + } + if (::GetLastError() == ERROR_HANDLE_EOF) { return 0; - } + } return -1; #elif defined(_unix_) i32 ret; - do { - ret = ::pread(Fd_, buffer, byteCount, offset); - } while (ret == -1 && errno == EINTR); + do { + ret = ::pread(Fd_, buffer, byteCount, offset); + } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform @@ -533,15 +533,15 @@ i32 TFileHandle::Pwrite(const void* buffer, ui32 byteCount, i64 offset) const no DWORD bytesWritten = 0; io.Offset = (ui32)offset; io.OffsetHigh = (ui32)(offset >> 32); - if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, &io)) { + if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, &io)) { return bytesWritten; - } + } return -1; #elif defined(_unix_) i32 ret; - do { - ret = ::pwrite(Fd_, buffer, byteCount, offset); - } while (ret == -1 && errno == EINTR); + do { + ret = ::pwrite(Fd_, buffer, byteCount, offset); + } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform @@ -549,14 +549,14 @@ i32 TFileHandle::Pwrite(const void* buffer, ui32 byteCount, i64 offset) const no } FHANDLE TFileHandle::Duplicate() const noexcept { - if (!IsOpen()) { + if (!IsOpen()) { return INVALID_FHANDLE; - } + } #if defined(_win_) FHANDLE dupHandle; - if (!::DuplicateHandle(GetCurrentProcess(), Fd_, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { + if (!::DuplicateHandle(GetCurrentProcess(), Fd_, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { return INVALID_FHANDLE; - } + } return dupHandle; #elif defined(_unix_) return ::dup(Fd_); @@ -643,138 +643,138 @@ void TFileHandle::ResetDirect() { #endif } -i64 TFileHandle::CountCache(i64 offset, i64 length) const noexcept { -#ifdef _linux_ - const i64 pageSize = NSystemInfo::GetPageSize(); +i64 TFileHandle::CountCache(i64 offset, i64 length) const noexcept { +#ifdef _linux_ + const i64 pageSize = NSystemInfo::GetPageSize(); constexpr size_t vecSize = 512; // Fetch up to 2MiB at once - const i64 batchSize = vecSize * pageSize; - std::array<ui8, vecSize> vec; - void* ptr = nullptr; - i64 res = 0; - - if (!IsOpen()) { - return -1; - } - - if (!length) { - length = GetLength(); - length -= Min(length, offset); - } - - if (!length) { - return 0; - } - - const i64 begin = AlignDown(offset, pageSize); - const i64 end = AlignUp(offset + length, pageSize); - const i64 size = end - begin; - - /* - * Since fincode is not implemented yet use mmap and mincore. - * This is not so effective and scalable for frequent usage. - */ - ptr = ::mmap( - (caddr_t) nullptr, - size, - PROT_READ, - MAP_SHARED | MAP_NORESERVE, - Fd_, - begin); - if (MAP_FAILED == ptr) { - return -1; - } - - for (i64 base = begin; base < end; base += batchSize) { - const size_t batch = Min(vecSize, size_t((end - base) / pageSize)); - void* batchPtr = static_cast<caddr_t>(ptr) + (base - begin); - - if (::mincore(batchPtr, batch * pageSize, vec.data())) { - res = -1; - break; - } - - for (size_t i = 0; i < batch; i++) { - // count uptodate complete pages in cache - if (vec[i] & 1) { - res += pageSize; - } - } - - if (base == begin && (vec[0] & 1)) { - // cut head of first page - res -= offset - begin; - } - - if ((end - base) <= batchSize && (vec[batch - 1] & 1)) { - // cut tail of last page - res -= size - (offset - begin) - length; - } - } - - ::munmap(ptr, size); - - return res; -#else - Y_UNUSED(offset); - Y_UNUSED(length); - return -1; -#endif -} - -void TFileHandle::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { -#ifdef _linux_ + const i64 batchSize = vecSize * pageSize; + std::array<ui8, vecSize> vec; + void* ptr = nullptr; + i64 res = 0; + + if (!IsOpen()) { + return -1; + } + + if (!length) { + length = GetLength(); + length -= Min(length, offset); + } + + if (!length) { + return 0; + } + + const i64 begin = AlignDown(offset, pageSize); + const i64 end = AlignUp(offset + length, pageSize); + const i64 size = end - begin; + + /* + * Since fincode is not implemented yet use mmap and mincore. + * This is not so effective and scalable for frequent usage. + */ + ptr = ::mmap( + (caddr_t) nullptr, + size, + PROT_READ, + MAP_SHARED | MAP_NORESERVE, + Fd_, + begin); + if (MAP_FAILED == ptr) { + return -1; + } + + for (i64 base = begin; base < end; base += batchSize) { + const size_t batch = Min(vecSize, size_t((end - base) / pageSize)); + void* batchPtr = static_cast<caddr_t>(ptr) + (base - begin); + + if (::mincore(batchPtr, batch * pageSize, vec.data())) { + res = -1; + break; + } + + for (size_t i = 0; i < batch; i++) { + // count uptodate complete pages in cache + if (vec[i] & 1) { + res += pageSize; + } + } + + if (base == begin && (vec[0] & 1)) { + // cut head of first page + res -= offset - begin; + } + + if ((end - base) <= batchSize && (vec[batch - 1] & 1)) { + // cut tail of last page + res -= size - (offset - begin) - length; + } + } + + ::munmap(ptr, size); + + return res; +#else + Y_UNUSED(offset); + Y_UNUSED(length); + return -1; +#endif +} + +void TFileHandle::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { +#ifdef _linux_ #if HAVE_POSIX_FADVISE - // POSIX_FADV_WILLNEED starts reading upto read_ahead_kb in background - ::posix_fadvise(Fd_, offset, length, POSIX_FADV_WILLNEED); + // POSIX_FADV_WILLNEED starts reading upto read_ahead_kb in background + ::posix_fadvise(Fd_, offset, length, POSIX_FADV_WILLNEED); #endif - - if (wait) { - TFileHandle devnull("/dev/null", OpenExisting | WrOnly | CloseOnExec); - off_t end = length ? (offset + length) : GetLength(); - off_t pos = offset; - ssize_t ret; - - do { - ret = ::sendfile((FHANDLE)devnull, Fd_, &pos, end - pos); - } while (pos < end && (ret > 0 || errno == EINTR)); - } -#else - Y_UNUSED(offset); - Y_UNUSED(length); - Y_UNUSED(wait); -#endif -} - -void TFileHandle::EvictCache(i64 offset, i64 length) const noexcept { -#if HAVE_POSIX_FADVISE - /* - * This tries to evicts only unmaped, clean, complete pages. - */ - ::posix_fadvise(Fd_, offset, length, POSIX_FADV_DONTNEED); -#else - Y_UNUSED(offset); - Y_UNUSED(length); -#endif -} - -bool TFileHandle::FlushCache(i64 offset, i64 length, bool wait) noexcept { -#if HAVE_SYNC_FILE_RANGE - int flags = SYNC_FILE_RANGE_WRITE; - if (wait) { - flags |= SYNC_FILE_RANGE_WAIT_AFTER; - } - int ret = ::sync_file_range(Fd_, offset, length, flags); - return ret == 0 || errno == EROFS; -#else - Y_UNUSED(offset); - Y_UNUSED(length); - if (wait) { - return FlushData(); - } - return true; -#endif -} - + + if (wait) { + TFileHandle devnull("/dev/null", OpenExisting | WrOnly | CloseOnExec); + off_t end = length ? (offset + length) : GetLength(); + off_t pos = offset; + ssize_t ret; + + do { + ret = ::sendfile((FHANDLE)devnull, Fd_, &pos, end - pos); + } while (pos < end && (ret > 0 || errno == EINTR)); + } +#else + Y_UNUSED(offset); + Y_UNUSED(length); + Y_UNUSED(wait); +#endif +} + +void TFileHandle::EvictCache(i64 offset, i64 length) const noexcept { +#if HAVE_POSIX_FADVISE + /* + * This tries to evicts only unmaped, clean, complete pages. + */ + ::posix_fadvise(Fd_, offset, length, POSIX_FADV_DONTNEED); +#else + Y_UNUSED(offset); + Y_UNUSED(length); +#endif +} + +bool TFileHandle::FlushCache(i64 offset, i64 length, bool wait) noexcept { +#if HAVE_SYNC_FILE_RANGE + int flags = SYNC_FILE_RANGE_WRITE; + if (wait) { + flags |= SYNC_FILE_RANGE_WAIT_AFTER; + } + int ret = ::sync_file_range(Fd_, offset, length, flags); + return ret == 0 || errno == EROFS; +#else + Y_UNUSED(offset); + Y_UNUSED(length); + if (wait) { + return FlushData(); + } + return true; +#endif +} + TString DecodeOpenMode(ui32 mode0) { ui32 mode = mode0; @@ -790,25 +790,25 @@ TString DecodeOpenMode(ui32 mode0) { } F(RdWr) - F(RdOnly) - F(WrOnly) - - F(CreateAlways) + F(RdOnly) + F(WrOnly) + + F(CreateAlways) F(CreateNew) - F(OpenAlways) - F(TruncExisting) - F(ForAppend) - F(Transient) - F(CloseOnExec) - - F(Temp) - F(Sync) - F(Direct) - F(DirectAligned) - F(Seq) - F(NoReuse) - F(NoReadAhead) - + F(OpenAlways) + F(TruncExisting) + F(ForAppend) + F(Transient) + F(CloseOnExec) + + F(Temp) + F(Sync) + F(Direct) + F(DirectAligned) + F(Seq) + F(NoReuse) + F(NoReadAhead) + F(AX) F(AR) F(AW) @@ -861,9 +861,9 @@ public: inline ~TImpl() = default; inline void Close() { - if (!Handle_.Close()) { + if (!Handle_.Close()) { ythrow TFileError() << "can't close " << FileName_.Quote(); - } + } } const TString& GetName() const noexcept { @@ -880,22 +880,22 @@ public: i64 Seek(i64 offset, SeekDir origin) { i64 pos = Handle_.Seek(offset, origin); - if (pos == -1L) { + if (pos == -1L) { ythrow TFileError() << "can't seek " << offset << " bytes in " << FileName_.Quote(); - } + } return pos; } void Resize(i64 length) { - if (!Handle_.Resize(length)) { + if (!Handle_.Resize(length)) { ythrow TFileError() << "can't resize " << FileName_.Quote() << " to size " << length; - } + } } void Reserve(i64 length) { - if (!Handle_.Reserve(length)) { + if (!Handle_.Reserve(length)) { ythrow TFileError() << "can't reserve " << length << " for file " << FileName_.Quote(); - } + } } void FallocateNoResize(i64 length) { @@ -911,22 +911,22 @@ public: } void Flush() { - if (!Handle_.Flush()) { + if (!Handle_.Flush()) { ythrow TFileError() << "can't flush " << FileName_.Quote(); - } + } } void FlushData() { - if (!Handle_.FlushData()) { + if (!Handle_.FlushData()) { ythrow TFileError() << "can't flush data " << FileName_.Quote(); - } + } } TFile Duplicate() const { TFileHandle dupH(Handle_.Duplicate()); - if (!dupH.IsOpen()) { + if (!dupH.IsOpen()) { ythrow TFileError() << "can't duplicate the handle of " << FileName_.Quote(); - } + } TFile res(dupH); dupH.Release(); return res; @@ -958,10 +958,10 @@ public: while (numBytes) { const size_t reallyRead = ReadOrFail(buf, numBytes); - if (reallyRead == 0) { - // file exhausted + if (reallyRead == 0) { + // file exhausted break; - } + } buf += reallyRead; numBytes -= reallyRead; @@ -971,9 +971,9 @@ public: } void Load(void* buf, size_t len) { - if (Read(buf, len) != len) { + if (Read(buf, len) != len) { ythrow TFileError() << "can't read " << len << " bytes from " << FileName_.Quote(); - } + } } // Maximum amount of bytes to be written via single system call. @@ -988,9 +988,9 @@ public: const i32 toWrite = (i32)Min(MaxWritePortion, numBytes); const i32 reallyWritten = Handle_.Write(buf, toWrite); - if (reallyWritten < 0) { + if (reallyWritten < 0) { ythrow TFileError() << "can't write " << toWrite << " bytes to " << FileName_.Quote(); - } + } buf += reallyWritten; numBytes -= reallyWritten; @@ -1004,14 +1004,14 @@ public: const i32 toRead = (i32)Min(MaxReadPortion, numBytes); const i32 reallyRead = RawPread(buf, toRead, offset); - if (reallyRead < 0) { + if (reallyRead < 0) { ythrow TFileError() << "can not read data from " << FileName_.Quote(); - } + } - if (reallyRead == 0) { - // file exausted + if (reallyRead == 0) { + // file exausted break; - } + } buf += reallyRead; offset += reallyRead; @@ -1026,9 +1026,9 @@ public: } void Pload(void* buf, size_t len, i64 offset) const { - if (Pread(buf, len, offset) != len) { + if (Pread(buf, len, offset) != len) { ythrow TFileError() << "can't read " << len << " bytes at offset " << offset << " from " << FileName_.Quote(); - } + } } void Pwrite(const void* buffer, size_t numBytes, i64 offset) const { @@ -1038,9 +1038,9 @@ public: const i32 toWrite = (i32)Min(MaxWritePortion, numBytes); const i32 reallyWritten = Handle_.Pwrite(buf, toWrite, offset); - if (reallyWritten < 0) { + if (reallyWritten < 0) { ythrow TFileError() << "can't write " << toWrite << " bytes to " << FileName_.Quote(); - } + } buf += reallyWritten; offset += reallyWritten; @@ -1064,24 +1064,24 @@ public: Handle_.ResetDirect(); } - i64 CountCache(i64 offset, i64 length) const noexcept { - return Handle_.CountCache(offset, length); - } - - void PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { - Handle_.PrefetchCache(offset, length, wait); - } - - void EvictCache(i64 offset, i64 length) const noexcept { - Handle_.EvictCache(offset, length); - } - - void FlushCache(i64 offset, i64 length, bool wait) { - if (!Handle_.FlushCache(offset, length, wait)) { - ythrow TFileError() << "can't flush data " << FileName_.Quote(); - } - } - + i64 CountCache(i64 offset, i64 length) const noexcept { + return Handle_.CountCache(offset, length); + } + + void PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { + Handle_.PrefetchCache(offset, length, wait); + } + + void EvictCache(i64 offset, i64 length) const noexcept { + Handle_.EvictCache(offset, length); + } + + void FlushCache(i64 offset, i64 length, bool wait) { + if (!Handle_.FlushCache(offset, length, wait)) { + ythrow TFileError() << "can't flush data " << FileName_.Quote(); + } + } + private: TFileHandle Handle_; TString FileName_; @@ -1215,22 +1215,22 @@ void TFile::ResetDirect() { Impl_->ResetDirect(); } -i64 TFile::CountCache(i64 offset, i64 length) const noexcept { - return Impl_->CountCache(offset, length); -} - -void TFile::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { - Impl_->PrefetchCache(offset, length, wait); -} - -void TFile::EvictCache(i64 offset, i64 length) const noexcept { - Impl_->EvictCache(offset, length); -} - -void TFile::FlushCache(i64 offset, i64 length, bool wait) { - Impl_->FlushCache(offset, length, wait); -} - +i64 TFile::CountCache(i64 offset, i64 length) const noexcept { + return Impl_->CountCache(offset, length); +} + +void TFile::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { + Impl_->PrefetchCache(offset, length, wait); +} + +void TFile::EvictCache(i64 offset, i64 length) const noexcept { + Impl_->EvictCache(offset, length); +} + +void TFile::FlushCache(i64 offset, i64 length, bool wait) { + Impl_->FlushCache(offset, length, wait); +} + void TFile::LinkTo(const TFile& f) const { if (!Impl_->GetHandle().LinkTo(f.Impl_->GetHandle())) { ythrow TFileError() << "can not link fd(" << GetName() << " -> " << f.GetName() << ")"; @@ -1271,9 +1271,9 @@ TFile Duplicate(int fd) { FHANDLE handle = reinterpret_cast<FHANDLE>(::_get_osfhandle(fd)); FHANDLE dupHandle; - if (!::DuplicateHandle(GetCurrentProcess(), handle, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { + if (!::DuplicateHandle(GetCurrentProcess(), handle, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { ythrow TFileError() << "can not duplicate file descriptor " << LastSystemError() << Endl; - } + } return TFile(dupHandle); #elif defined(_unix_) @@ -1292,7 +1292,7 @@ bool PosixDisableReadAhead(FHANDLE fileHandle, void* addr) noexcept { ret = madvise(addr, 0, MADV_RANDOM); // according to klamm@ posix_fadvise does not work under linux, madvise does work #else Y_UNUSED(addr); - ret = ::posix_fadvise(fileHandle, 0, 0, POSIX_FADV_RANDOM); + ret = ::posix_fadvise(fileHandle, 0, 0, POSIX_FADV_RANDOM); #endif #else Y_UNUSED(fileHandle); diff --git a/util/system/file.h b/util/system/file.h index 9502e159b6..ed8ef044db 100644 --- a/util/system/file.h +++ b/util/system/file.h @@ -22,16 +22,16 @@ enum EOpenModeFlag { RdWr = 24, // open for reading and writing MaskRW = 24, - Seq = 0x20, // file access is primarily sequential (POSIX_FADV_SEQUENTIAL) - Direct = 0x40, // file is being opened with no system caching (Does not work as intended! See implementation) - Temp = 0x80, // avoid writing data back to disk if sufficient cache memory is available (no op for linux) - ForAppend = 0x100, // write appends data to the end of file (O_APPEND) - Transient = 0x200, // actually, temporary file - 'delete on close' for windows, unlink after creation for unix - NoReuse = 0x400, // no second access expected (POSIX_FADV_NOREUSE) - CloseOnExec = 0x800, // set close-on-exec right at open (O_CLOEXEC) - DirectAligned = 0x1000, // file is actually being opened with no system caching (may require buffer alignment) (O_DIRECT) - Sync = 0x2000, // no write call will return before the data is transferred to the disk (O_SYNC) - NoReadAhead = 0x4000, // no sequential access expected, opposite for Seq (POSIX_FADV_RANDOM) + Seq = 0x20, // file access is primarily sequential (POSIX_FADV_SEQUENTIAL) + Direct = 0x40, // file is being opened with no system caching (Does not work as intended! See implementation) + Temp = 0x80, // avoid writing data back to disk if sufficient cache memory is available (no op for linux) + ForAppend = 0x100, // write appends data to the end of file (O_APPEND) + Transient = 0x200, // actually, temporary file - 'delete on close' for windows, unlink after creation for unix + NoReuse = 0x400, // no second access expected (POSIX_FADV_NOREUSE) + CloseOnExec = 0x800, // set close-on-exec right at open (O_CLOEXEC) + DirectAligned = 0x1000, // file is actually being opened with no system caching (may require buffer alignment) (O_DIRECT) + Sync = 0x2000, // no write call will return before the data is transferred to the disk (O_SYNC) + NoReadAhead = 0x4000, // no sequential access expected, opposite for Seq (POSIX_FADV_RANDOM) AXOther = 0x00010000, AWOther = 0x00020000, @@ -129,17 +129,17 @@ public: bool SetDirect(); void ResetDirect(); - /* Manual file cache management, length = 0 means "as much as possible" */ - - //measure amount of cached data in bytes, returns -1 if failed - i64 CountCache(i64 offset = 0, i64 length = 0) const noexcept; - //read data into cache and optionally wait for completion - void PrefetchCache(i64 offset = 0, i64 length = 0, bool wait = true) const noexcept; - //remove clean and unused data from cache - void EvictCache(i64 offset = 0, i64 length = 0) const noexcept; - //flush unwritten data in this range and optionally wait for completion - bool FlushCache(i64 offset = 0, i64 length = 0, bool wait = true) noexcept; - + /* Manual file cache management, length = 0 means "as much as possible" */ + + //measure amount of cached data in bytes, returns -1 if failed + i64 CountCache(i64 offset = 0, i64 length = 0) const noexcept; + //read data into cache and optionally wait for completion + void PrefetchCache(i64 offset = 0, i64 length = 0, bool wait = true) const noexcept; + //remove clean and unused data from cache + void EvictCache(i64 offset = 0, i64 length = 0) const noexcept; + //flush unwritten data in this range and optionally wait for completion + bool FlushCache(i64 offset = 0, i64 length = 0, bool wait = true) noexcept; + private: FHANDLE Fd_ = INVALID_FHANDLE; }; @@ -200,17 +200,17 @@ public: void SetDirect(); void ResetDirect(); - /* Manual file cache management, length = 0 means "as much as possible" */ - - //measure amount of cached data in bytes, returns -1 if failed - i64 CountCache(i64 offset = 0, i64 length = 0) const noexcept; - //read data into cache and optionally wait for completion - void PrefetchCache(i64 offset = 0, i64 length = 0, bool wait = true) const noexcept; - //remove clean and unused data from cache, incomplete pages could stay - void EvictCache(i64 offset = 0, i64 length = 0) const noexcept; - //flush unwritten data in this range and optionally wait for completion - void FlushCache(i64 offset = 0, i64 length = 0, bool wait = true); - + /* Manual file cache management, length = 0 means "as much as possible" */ + + //measure amount of cached data in bytes, returns -1 if failed + i64 CountCache(i64 offset = 0, i64 length = 0) const noexcept; + //read data into cache and optionally wait for completion + void PrefetchCache(i64 offset = 0, i64 length = 0, bool wait = true) const noexcept; + //remove clean and unused data from cache, incomplete pages could stay + void EvictCache(i64 offset = 0, i64 length = 0) const noexcept; + //flush unwritten data in this range and optionally wait for completion + void FlushCache(i64 offset = 0, i64 length = 0, bool wait = true); + static TFile Temporary(const TString& prefix); static TFile ForAppend(const TString& path); diff --git a/util/system/file_ut.cpp b/util/system/file_ut.cpp index 941e6a50f3..99c882bbbb 100644 --- a/util/system/file_ut.cpp +++ b/util/system/file_ut.cpp @@ -23,7 +23,7 @@ class TFileTest: public TTestBase { UNIT_TEST(TestRead); UNIT_TEST(TestRawPread); UNIT_TEST(TestPread); - UNIT_TEST(TestCache); + UNIT_TEST(TestCache); UNIT_TEST_SUITE_END(); public: @@ -37,7 +37,7 @@ public: void TestRead(); void TestRawPread(); void TestPread(); - void TestCache(); + void TestCache(); inline void TestLinkTo() { TTempFile tmp1("tmp1"); @@ -327,90 +327,90 @@ void TFileTest::TestPread() { } } -#ifdef _linux_ +#ifdef _linux_ #include <sys/statfs.h> -#endif - -#ifndef TMPFS_MAGIC +#endif + +#ifndef TMPFS_MAGIC #define TMPFS_MAGIC 0x01021994 -#endif - +#endif + void TFileTest::TestCache(){ -#ifdef _linux_ +#ifdef _linux_ {// create file in /tmp, current dir could be tmpfs which does not support fadvise TFile file(MakeTempName("/tmp"), OpenAlways | Transient | RdWr | NoReadAhead); - + struct statfs fs; if (!fstatfs(file.GetHandle(), &fs) && fs.f_type == TMPFS_MAGIC) { return; } - + UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 0); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(0, 0), 0); - + file.Resize(7); file.PrefetchCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 7); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(3, 2), 2); - + file.FlushCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 7); - + file.EvictCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 0); - + file.PrefetchCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 7); - + file.Resize(12345); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 4096); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(4096, 0), 0); - + file.PrefetchCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 12345); - + file.FlushCache(); file.EvictCache(); UNIT_ASSERT_LE(file.CountCache(), 0); - + file.Resize(33333333); file.PrefetchCache(11111111, 11111111); UNIT_ASSERT_GE(file.CountCache(), 11111111); - + UNIT_ASSERT_LE(file.CountCache(0, 11111111), 1111111); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(11111111, 11111111), 11111111); UNIT_ASSERT_LE(file.CountCache(22222222, 11111111), 1111111); - + file.FlushCache(11111111, 11111111); UNIT_ASSERT_GE(file.CountCache(), 11111111); - + // first and last incomplete pages could stay in cache file.EvictCache(11111111, 11111111); UNIT_ASSERT_LT(file.CountCache(11111111, 11111111), 4096 * 2); - + file.EvictCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), 0); } -#else +#else {TFile file(MakeTempName(), OpenAlways | Transient | RdWr); - + file.Resize(12345); - + UNIT_ASSERT_VALUES_EQUAL(file.CountCache(), -1); file.PrefetchCache(); file.FlushCache(); file.EvictCache(); UNIT_ASSERT_VALUES_EQUAL(file.CountCache(0, 12345), -1); } -#endif -} - +#endif +} + Y_UNIT_TEST_SUITE(TTestDecodeOpenMode) { Y_UNIT_TEST(It) { UNIT_ASSERT_VALUES_EQUAL("0", DecodeOpenMode(0)); UNIT_ASSERT_VALUES_EQUAL("RdOnly", DecodeOpenMode(RdOnly)); UNIT_ASSERT_VALUES_EQUAL("RdWr", DecodeOpenMode(RdWr)); UNIT_ASSERT_VALUES_EQUAL("WrOnly|ForAppend", DecodeOpenMode(WrOnly | ForAppend)); - UNIT_ASSERT_VALUES_EQUAL("RdWr|CreateAlways|CreateNew|ForAppend|Transient|CloseOnExec|Temp|Sync|Direct|DirectAligned|Seq|NoReuse|NoReadAhead|AX|AR|AW|AWOther|0xF8888000", DecodeOpenMode(0xFFFFFFFF)); + UNIT_ASSERT_VALUES_EQUAL("RdWr|CreateAlways|CreateNew|ForAppend|Transient|CloseOnExec|Temp|Sync|Direct|DirectAligned|Seq|NoReuse|NoReadAhead|AX|AR|AW|AWOther|0xF8888000", DecodeOpenMode(0xFFFFFFFF)); } } diff --git a/util/system/thread.i b/util/system/thread.i index 8cba505473..072b1251e8 100644 --- a/util/system/thread.i +++ b/util/system/thread.i @@ -39,10 +39,10 @@ static inline T ThreadIdHashFunction(T t) noexcept { /* * we must permute threadid bits, because some strange platforms(such Linux) * have strange threadid numeric properties - * - * Because they are alligned pointers to pthread_t rather that tid. - * Currently there is no way to get tid without syscall (slightly slower) - * (pthread_getthreadid_np is not implemeted in glibc/musl for some reason). + * + * Because they are alligned pointers to pthread_t rather that tid. + * Currently there is no way to get tid without syscall (slightly slower) + * (pthread_getthreadid_np is not implemeted in glibc/musl for some reason). */ return IntHash(t); } diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 05fad02e9b..b17cfa8b39 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -124,8 +124,8 @@ public: Queue.Push(obj); } - QueuePushCond.Signal(); - + QueuePushCond.Signal(); + return true; } @@ -193,7 +193,7 @@ private: with_lock (StopMutex) { while (ThreadCountReal) { with_lock (QueueMutex) { - QueuePushCond.Signal(); + QueuePushCond.Signal(); } StopCond.Wait(StopMutex); @@ -213,7 +213,7 @@ private: with_lock (QueueMutex) { while (Queue.Empty() && !AtomicGet(ShouldTerminate)) { - QueuePushCond.Wait(QueueMutex); + QueuePushCond.Wait(QueueMutex); } if (AtomicGet(ShouldTerminate) && Queue.Empty()) { @@ -259,7 +259,7 @@ private: TThreadNamer Namer; mutable TMutex QueueMutex; mutable TMutex StopMutex; - TCondVar QueuePushCond; + TCondVar QueuePushCond; TCondVar QueuePopCond; TCondVar StopCond; TJobQueue Queue; |