diff options
| author | Anton Samokhvalov <[email protected]> | 2022-02-10 16:45:17 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:45:17 +0300 | 
| commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
| tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/threading | |
| parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
Restoring authorship annotation for Anton Samokhvalov <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading')
37 files changed, 2895 insertions, 2895 deletions
diff --git a/library/cpp/threading/atomic/bool.h b/library/cpp/threading/atomic/bool.h index ec8f75427b4..d52544e7626 100644 --- a/library/cpp/threading/atomic/bool.h +++ b/library/cpp/threading/atomic/bool.h @@ -1,7 +1,7 @@ -#pragma once  -  +#pragma once +  #include <util/system/atomic.h> -  +  namespace NAtomic {      class TBool {      public: @@ -20,12 +20,12 @@ namespace NAtomic {              return AtomicGet(Val_);          } -        const TBool& operator=(bool val) noexcept {  +        const TBool& operator=(bool val) noexcept {              AtomicSet(Val_, val);              return *this;          } -        const TBool& operator=(const TBool& src) noexcept {  +        const TBool& operator=(const TBool& src) noexcept {              AtomicSet(Val_, AtomicGet(src.Val_));              return *this;          } @@ -33,4 +33,4 @@ namespace NAtomic {      private:          TAtomic Val_ = 0;      }; -}  +} diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index 1959b0258c6..55859601a1e 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -19,21 +19,21 @@ namespace NThreading {  // Platform helpers  #if !defined(PLATFORM_CACHE_LINE) -#define PLATFORM_CACHE_LINE 64  +#define PLATFORM_CACHE_LINE 64  #endif  #if !defined(PLATFORM_PAGE_SIZE) -#define PLATFORM_PAGE_SIZE 4 * 1024  +#define PLATFORM_PAGE_SIZE 4 * 1024  #endif -    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>  -    struct TPadded: public T {  -        char Pad[PadSize - sizeof(T) % PadSize];  +    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> +    struct TPadded: public T { +        char Pad[PadSize - sizeof(T) % PadSize]; -        TPadded() {  -            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");  -            Y_UNUSED(Pad);  -        }  +        TPadded() { +            static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); +            Y_UNUSED(Pad); +        }          template<typename... Args>          TPadded(Args&&... args) @@ -42,280 +42,280 @@ namespace NThreading {              static_assert(sizeof(*this) % PadSize == 0, "padding does not work");              Y_UNUSED(Pad);          } -    };  +    }; + +    //////////////////////////////////////////////////////////////////////////////// +    // Type helpers + +    namespace NImpl { +        template <typename T> +        struct TPodTypeHelper { +            template <typename TT> +            static void Write(T* ptr, TT&& value) { +                *ptr = value; +            } + +            static T Read(T* ptr) { +                return *ptr; +            } + +            static void Destroy(T* ptr) { +                Y_UNUSED(ptr); +            } +        }; + +        template <typename T> +        struct TNonPodTypeHelper { +            template <typename TT> +            static void Write(T* ptr, TT&& value) { +                new (ptr) T(std::forward<TT>(value)); +            } + +            static T Read(T* ptr) { +                return std::move(*ptr); +            } + +            static void Destroy(T* ptr) { +                (void)ptr; /* Make MSVC happy. */ +                ptr->~T(); +            } +        }; + +        template <typename T> +        using TTypeHelper = std::conditional_t< +            TTypeTraits<T>::IsPod, +            TPodTypeHelper<T>, +            TNonPodTypeHelper<T>>; + +    } + +    //////////////////////////////////////////////////////////////////////////////// +    // One producer/one consumer chunked queue. + +    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    class TOneOneQueue: private TNonCopyable { +        using TTypeHelper = NImpl::TTypeHelper<T>; + +        struct TChunk; + +        struct TChunkHeader { +            size_t Count = 0; +            TChunk* Next = nullptr; +        }; + +        struct TChunk: public TChunkHeader { +            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); + +            char Entries[MaxCount * sizeof(T)]; + +            TChunk() { +                Y_UNUSED(Entries); // uninitialized +            } + +            ~TChunk() { +                for (size_t i = 0; i < this->Count; ++i) { +                    TTypeHelper::Destroy(GetPtr(i)); +                } +            } + +            T* GetPtr(size_t i) { +                return (T*)Entries + i; +            } +        }; + +        struct TWriterState { +            TChunk* Chunk = nullptr; +        }; + +        struct TReaderState { +            TChunk* Chunk = nullptr; +            size_t Count = 0; +        }; + +    private: +        TPadded<TWriterState> Writer; +        TPadded<TReaderState> Reader; + +    public: +        using TItem = T; + +        TOneOneQueue() { +            Writer.Chunk = Reader.Chunk = new TChunk(); +        } + +        ~TOneOneQueue() { +            DeleteChunks(Reader.Chunk); +        } + +        template <typename TT> +        void Enqueue(TT&& value) { +            T* ptr = PrepareWrite(); +            Y_ASSERT(ptr); +            TTypeHelper::Write(ptr, std::forward<TT>(value)); +            CompleteWrite(); +        } + +        bool Dequeue(T& value) { +            if (T* ptr = PrepareRead()) { +                value = TTypeHelper::Read(ptr); +                CompleteRead(); +                return true; +            } +            return false; +        } + +        bool IsEmpty() { +            return !PrepareRead(); +        } + +    protected: +        T* PrepareWrite() { +            TChunk* chunk = Writer.Chunk; +            Y_ASSERT(chunk && !chunk->Next); + +            if (chunk->Count != TChunk::MaxCount) { +                return chunk->GetPtr(chunk->Count); +            } + +            chunk = new TChunk(); +            AtomicSet(Writer.Chunk->Next, chunk); +            Writer.Chunk = chunk; +            return chunk->GetPtr(0); +        } + +        void CompleteWrite() { +            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); +        } + +        T* PrepareRead() { +            TChunk* chunk = Reader.Chunk; +            Y_ASSERT(chunk); + +            for (;;) { +                size_t writerCount = AtomicGet(chunk->Count); +                if (Reader.Count != writerCount) { +                    return chunk->GetPtr(Reader.Count); +                } + +                if (writerCount != TChunk::MaxCount) { +                    return nullptr; +                } + +                chunk = AtomicGet(chunk->Next); +                if (!chunk) { +                    return nullptr; +                } + +                delete Reader.Chunk; +                Reader.Chunk = chunk; +                Reader.Count = 0; +            } +        } -    ////////////////////////////////////////////////////////////////////////////////  -    // Type helpers  +        void CompleteRead() { +            ++Reader.Count; +        } -    namespace NImpl {  -        template <typename T>  -        struct TPodTypeHelper {  -            template <typename TT>  -            static void Write(T* ptr, TT&& value) {  -                *ptr = value;  -            }  +    private: +        static void DeleteChunks(TChunk* chunk) { +            while (chunk) { +                TChunk* next = chunk->Next; +                delete chunk; +                chunk = next; +            } +        } +    }; -            static T Read(T* ptr) {  -                return *ptr;  -            }  - -            static void Destroy(T* ptr) {  -                Y_UNUSED(ptr);  -            }  -        };  +    //////////////////////////////////////////////////////////////////////////////// +    // Multiple producers/single consumer partitioned queue. +    // Provides FIFO guaranties for each producer. -        template <typename T>  -        struct TNonPodTypeHelper {  -            template <typename TT>  -            static void Write(T* ptr, TT&& value) {  -                new (ptr) T(std::forward<TT>(value));  -            }  - -            static T Read(T* ptr) {  -                return std::move(*ptr);  -            }  - -            static void Destroy(T* ptr) {  -                (void)ptr; /* Make MSVC happy. */  -                ptr->~T();  -            }  -        };  - -        template <typename T>  -        using TTypeHelper = std::conditional_t<  -            TTypeTraits<T>::IsPod,  -            TPodTypeHelper<T>,  -            TNonPodTypeHelper<T>>;  - -    }  - -    ////////////////////////////////////////////////////////////////////////////////  -    // One producer/one consumer chunked queue.  - -    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    class TOneOneQueue: private TNonCopyable {  -        using TTypeHelper = NImpl::TTypeHelper<T>;  - -        struct TChunk;  - -        struct TChunkHeader {  -            size_t Count = 0;  -            TChunk* Next = nullptr;  -        };  - -        struct TChunk: public TChunkHeader {  -            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);  - -            char Entries[MaxCount * sizeof(T)];  - -            TChunk() {  -                Y_UNUSED(Entries); // uninitialized  -            }  - -            ~TChunk() {  -                for (size_t i = 0; i < this->Count; ++i) {  -                    TTypeHelper::Destroy(GetPtr(i));  -                }  -            } - -            T* GetPtr(size_t i) {  -                return (T*)Entries + i;  -            }  -        };  - -        struct TWriterState {  -            TChunk* Chunk = nullptr;  -        };  - -        struct TReaderState {  -            TChunk* Chunk = nullptr;  -            size_t Count = 0;  -        };  - -    private:  -        TPadded<TWriterState> Writer;  -        TPadded<TReaderState> Reader;  - -    public:  -        using TItem = T;  - -        TOneOneQueue() {  -            Writer.Chunk = Reader.Chunk = new TChunk();  -        }  - -        ~TOneOneQueue() {  -            DeleteChunks(Reader.Chunk);  -        }  - -        template <typename TT>  -        void Enqueue(TT&& value) {  -            T* ptr = PrepareWrite();  -            Y_ASSERT(ptr);  -            TTypeHelper::Write(ptr, std::forward<TT>(value));  -            CompleteWrite();  -        }  - -        bool Dequeue(T& value) {  -            if (T* ptr = PrepareRead()) {  -                value = TTypeHelper::Read(ptr);  -                CompleteRead();  -                return true;  -            }  -            return false;  -        } - -        bool IsEmpty() {  -            return !PrepareRead();  -        }  +    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    class TManyOneQueue: private TNonCopyable { +        using TTypeHelper = NImpl::TTypeHelper<T>; -    protected:  -        T* PrepareWrite() {  -            TChunk* chunk = Writer.Chunk;  -            Y_ASSERT(chunk && !chunk->Next);  - -            if (chunk->Count != TChunk::MaxCount) {  -                return chunk->GetPtr(chunk->Count);  -            }  -  -            chunk = new TChunk();  -            AtomicSet(Writer.Chunk->Next, chunk);  -            Writer.Chunk = chunk;  -            return chunk->GetPtr(0);  -        } - -        void CompleteWrite() {  -            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);  -        }  +        struct TEntry { +            T Value; +            ui64 Tag; +        }; + +        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { +            TAtomic WriteLock = 0; -        T* PrepareRead() {  -            TChunk* chunk = Reader.Chunk;  -            Y_ASSERT(chunk);  +            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; +            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; + +            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; +            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; +        }; -            for (;;) {  -                size_t writerCount = AtomicGet(chunk->Count);  -                if (Reader.Count != writerCount) {  -                    return chunk->GetPtr(Reader.Count);  -                }  - -                if (writerCount != TChunk::MaxCount) {  -                    return nullptr;  -                }  - -                chunk = AtomicGet(chunk->Next);  -                if (!chunk) {  -                    return nullptr;  -                }  - -                delete Reader.Chunk;  -                Reader.Chunk = chunk;  -                Reader.Count = 0;  -            } -        }  - -        void CompleteRead() {  -            ++Reader.Count;  -        } - -    private:  -        static void DeleteChunks(TChunk* chunk) {  -            while (chunk) {  -                TChunk* next = chunk->Next;  -                delete chunk;  -                chunk = next;  -            }  -        } -    };  - -    ////////////////////////////////////////////////////////////////////////////////  -    // Multiple producers/single consumer partitioned queue.  -    // Provides FIFO guaranties for each producer.  - -    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    class TManyOneQueue: private TNonCopyable {  -        using TTypeHelper = NImpl::TTypeHelper<T>;  - -        struct TEntry {  -            T Value;  -            ui64 Tag;  -        };  - -        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {  -            TAtomic WriteLock = 0;  - -            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;  -            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;  - -            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;  -            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;  -        };  - -    private:  -        union {  -            TAtomic WriteTag = 0;  -            char Pad[PLATFORM_CACHE_LINE];  -        };  - -        TQueueType Queues[Concurrency];  - -    public:  -        using TItem = T;  - -        template <typename TT>  -        void Enqueue(TT&& value) {  -            ui64 tag = NextTag();  -            while (!TryEnqueue(std::forward<TT>(value), tag)) {  -                SpinLockPause();  -            }  -        } - -        bool Dequeue(T& value) {  -            size_t index = 0;  -            if (TEntry* entry = PrepareRead(index)) {  -                value = TTypeHelper::Read(&entry->Value);  -                Queues[index].CompleteRead();  -                return true;  -            }  -            return false;  -        } +    private: +        union { +            TAtomic WriteTag = 0; +            char Pad[PLATFORM_CACHE_LINE]; +        }; + +        TQueueType Queues[Concurrency]; + +    public: +        using TItem = T; + +        template <typename TT> +        void Enqueue(TT&& value) { +            ui64 tag = NextTag(); +            while (!TryEnqueue(std::forward<TT>(value), tag)) { +                SpinLockPause(); +            } +        } + +        bool Dequeue(T& value) { +            size_t index = 0; +            if (TEntry* entry = PrepareRead(index)) { +                value = TTypeHelper::Read(&entry->Value); +                Queues[index].CompleteRead(); +                return true; +            } +            return false; +        } -        bool IsEmpty() {  -            for (size_t i = 0; i < Concurrency; ++i) {  -                if (!Queues[i].IsEmpty()) {  -                    return false;  -                }  -            } -            return true;  +        bool IsEmpty() { +            for (size_t i = 0; i < Concurrency; ++i) { +                if (!Queues[i].IsEmpty()) { +                    return false; +                } +            } +            return true;          } -    private:  -        ui64 NextTag() {  -            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty  -            // return GetCycleCount();  -            return AtomicIncrement(WriteTag);  -        }  +    private: +        ui64 NextTag() { +            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty +            // return GetCycleCount(); +            return AtomicIncrement(WriteTag); +        } -        template <typename TT>  -        bool TryEnqueue(TT&& value, ui64 tag) {  -            for (size_t i = 0; i < Concurrency; ++i) {  -                TQueueType& queue = Queues[i];  -                if (AtomicTryAndTryLock(&queue.WriteLock)) {  -                    TEntry* entry = queue.PrepareWrite();  -                    Y_ASSERT(entry);  -                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value));  -                    entry->Tag = tag;  -                    queue.CompleteWrite();  -                    AtomicUnlock(&queue.WriteLock);  -                    return true;  -                }  +        template <typename TT> +        bool TryEnqueue(TT&& value, ui64 tag) { +            for (size_t i = 0; i < Concurrency; ++i) { +                TQueueType& queue = Queues[i]; +                if (AtomicTryAndTryLock(&queue.WriteLock)) { +                    TEntry* entry = queue.PrepareWrite(); +                    Y_ASSERT(entry); +                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); +                    entry->Tag = tag; +                    queue.CompleteWrite(); +                    AtomicUnlock(&queue.WriteLock); +                    return true; +                }              } -            return false;  +            return false;          } -        TEntry* PrepareRead(size_t& index) {  -            TEntry* entry = nullptr;  -            ui64 tag = Max();  +        TEntry* PrepareRead(size_t& index) { +            TEntry* entry = nullptr; +            ui64 tag = Max(); -            for (size_t i = 0; i < Concurrency; ++i) {  +            for (size_t i = 0; i < Concurrency; ++i) {                  TEntry* e = Queues[i].PrepareRead();                  if (e && e->Tag < tag) {                      index = i; @@ -323,246 +323,246 @@ namespace NThreading {                      tag = e->Tag;                  }              } -  -            if (entry) {  -                // need second pass to catch updates within already scanned range  -                size_t candidate = index;  -                for (size_t i = 0; i < candidate; ++i) {  -                    TEntry* e = Queues[i].PrepareRead();  -                    if (e && e->Tag < tag) {  -                        index = i;  -                        entry = e;  -                        tag = e->Tag;  -                    }  -                }  -            }  -  -            return entry;  + +            if (entry) { +                // need second pass to catch updates within already scanned range +                size_t candidate = index; +                for (size_t i = 0; i < candidate; ++i) { +                    TEntry* e = Queues[i].PrepareRead(); +                    if (e && e->Tag < tag) { +                        index = i; +                        entry = e; +                        tag = e->Tag; +                    } +                } +            } + +            return entry;          } -    };  +    }; -    ////////////////////////////////////////////////////////////////////////////////  -    // Concurrent many-many queue with strong FIFO guaranties.  -    // Writers will not block readers (and vice versa), but will block each other.  +    //////////////////////////////////////////////////////////////////////////////// +    // Concurrent many-many queue with strong FIFO guaranties. +    // Writers will not block readers (and vice versa), but will block each other. -    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>  -    class TManyManyQueue: private TNonCopyable {  -    private:  -        TPadded<TLock> WriteLock;  -        TPadded<TLock> ReadLock;  - -        TOneOneQueue<T, ChunkSize> Queue;  - -    public:  -        using TItem = T;  - -        template <typename TT>  -        void Enqueue(TT&& value) {  -            with_lock (WriteLock) {  -                Queue.Enqueue(std::forward<TT>(value));  -            }  -        } - -        bool Dequeue(T& value) {  -            with_lock (ReadLock) {  -                return Queue.Dequeue(value);  -            }  -        } - -        bool IsEmpty() {  -            with_lock (ReadLock) {  -                return Queue.IsEmpty();  -            }  -        } -    };  - -    ////////////////////////////////////////////////////////////////////////////////  -    // Multiple producers/single consumer partitioned queue.  -    // Because of random partitioning reordering possible - FIFO not guaranteed!  - -    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    class TRelaxedManyOneQueue: private TNonCopyable {  -        struct TQueueType: public TOneOneQueue<T, ChunkSize> {  -            TAtomic WriteLock = 0;  -        };  - -    private:  -        union {  -            size_t ReadPos = 0;  -            char Pad[PLATFORM_CACHE_LINE];  -        };  - -        TQueueType Queues[Concurrency];  - -    public:  -        using TItem = T;  - -        template <typename TT>  -        void Enqueue(TT&& value) {  -            while (!TryEnqueue(std::forward<TT>(value))) {  -                SpinLockPause();  -            }  -        } - -        bool Dequeue(T& value) {  -            for (size_t i = 0; i < Concurrency; ++i) {  -                TQueueType& queue = Queues[ReadPos++ % Concurrency];  -                if (queue.Dequeue(value)) {  -                    return true;  -                }  -            } -            return false;  -        } - -        bool IsEmpty() {  -            for (size_t i = 0; i < Concurrency; ++i) {  -                if (!Queues[i].IsEmpty()) {  -                    return false;  -                }  -            } -            return true;  -        } - -    private:  -        template <typename TT>  -        bool TryEnqueue(TT&& value) {  -            size_t writePos = GetCycleCount();  -            for (size_t i = 0; i < Concurrency; ++i) {  -                TQueueType& queue = Queues[writePos++ % Concurrency];  -                if (AtomicTryAndTryLock(&queue.WriteLock)) {  -                    queue.Enqueue(std::forward<TT>(value));  -                    AtomicUnlock(&queue.WriteLock);  -                    return true;  -                }  -            } -            return false;  +    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> +    class TManyManyQueue: private TNonCopyable { +    private: +        TPadded<TLock> WriteLock; +        TPadded<TLock> ReadLock; + +        TOneOneQueue<T, ChunkSize> Queue; + +    public: +        using TItem = T; + +        template <typename TT> +        void Enqueue(TT&& value) { +            with_lock (WriteLock) { +                Queue.Enqueue(std::forward<TT>(value)); +            }          } -    };  -    ////////////////////////////////////////////////////////////////////////////////  -    // Concurrent many-many partitioned queue.  -    // Because of random partitioning reordering possible - FIFO not guaranteed!  - -    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    class TRelaxedManyManyQueue: private TNonCopyable {  -        struct TQueueType: public TOneOneQueue<T, ChunkSize> {  -            union {  -                TAtomic WriteLock = 0;  -                char Pad1[PLATFORM_CACHE_LINE];  -            };  -            union {  -                TAtomic ReadLock = 0;  -                char Pad2[PLATFORM_CACHE_LINE];  -            };  +        bool Dequeue(T& value) { +            with_lock (ReadLock) { +                return Queue.Dequeue(value); +            } +        } + +        bool IsEmpty() { +            with_lock (ReadLock) { +                return Queue.IsEmpty(); +            } +        } +    }; + +    //////////////////////////////////////////////////////////////////////////////// +    // Multiple producers/single consumer partitioned queue. +    // Because of random partitioning reordering possible - FIFO not guaranteed! + +    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    class TRelaxedManyOneQueue: private TNonCopyable { +        struct TQueueType: public TOneOneQueue<T, ChunkSize> { +            TAtomic WriteLock = 0; +        }; + +    private: +        union { +            size_t ReadPos = 0; +            char Pad[PLATFORM_CACHE_LINE];          }; -    private:  -        TQueueType Queues[Concurrency];  - -    public:  -        using TItem = T;  - -        template <typename TT>  -        void Enqueue(TT&& value) {  -            while (!TryEnqueue(std::forward<TT>(value))) {  -                SpinLockPause();  -            }  -        } - -        bool Dequeue(T& value) {  -            size_t readPos = GetCycleCount();  -            for (size_t i = 0; i < Concurrency; ++i) {  -                TQueueType& queue = Queues[readPos++ % Concurrency];  -                if (AtomicTryAndTryLock(&queue.ReadLock)) {  -                    bool dequeued = queue.Dequeue(value);  -                    AtomicUnlock(&queue.ReadLock);  -                    if (dequeued) {  -                        return true;  -                    }  +        TQueueType Queues[Concurrency]; + +    public: +        using TItem = T; + +        template <typename TT> +        void Enqueue(TT&& value) { +            while (!TryEnqueue(std::forward<TT>(value))) { +                SpinLockPause(); +            } +        } + +        bool Dequeue(T& value) { +            for (size_t i = 0; i < Concurrency; ++i) { +                TQueueType& queue = Queues[ReadPos++ % Concurrency]; +                if (queue.Dequeue(value)) { +                    return true;                  }              } -            return false;  +            return false;          } -        bool IsEmpty() {  -            for (size_t i = 0; i < Concurrency; ++i) {  -                TQueueType& queue = Queues[i];  -                if (AtomicTryAndTryLock(&queue.ReadLock)) {  -                    bool empty = queue.IsEmpty();  -                    AtomicUnlock(&queue.ReadLock);  -                    if (!empty) {  -                        return false;  -                    }  +        bool IsEmpty() { +            for (size_t i = 0; i < Concurrency; ++i) { +                if (!Queues[i].IsEmpty()) { +                    return false;                  }              } -            return true;  +            return true;          } -    private:  -        template <typename TT>  -        bool TryEnqueue(TT&& value) {  -            size_t writePos = GetCycleCount();  -            for (size_t i = 0; i < Concurrency; ++i) {  -                TQueueType& queue = Queues[writePos++ % Concurrency];  -                if (AtomicTryAndTryLock(&queue.WriteLock)) {  -                    queue.Enqueue(std::forward<TT>(value));  -                    AtomicUnlock(&queue.WriteLock);  -                    return true;  -                }  +    private: +        template <typename TT> +        bool TryEnqueue(TT&& value) { +            size_t writePos = GetCycleCount(); +            for (size_t i = 0; i < Concurrency; ++i) { +                TQueueType& queue = Queues[writePos++ % Concurrency]; +                if (AtomicTryAndTryLock(&queue.WriteLock)) { +                    queue.Enqueue(std::forward<TT>(value)); +                    AtomicUnlock(&queue.WriteLock); +                    return true; +                }              } -            return false;  +            return false;          } -    };  +    }; + +    //////////////////////////////////////////////////////////////////////////////// +    // Concurrent many-many partitioned queue. +    // Because of random partitioning reordering possible - FIFO not guaranteed! + +    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    class TRelaxedManyManyQueue: private TNonCopyable { +        struct TQueueType: public TOneOneQueue<T, ChunkSize> { +            union { +                TAtomic WriteLock = 0; +                char Pad1[PLATFORM_CACHE_LINE]; +            }; +            union { +                TAtomic ReadLock = 0; +                char Pad2[PLATFORM_CACHE_LINE]; +            }; +        }; -    ////////////////////////////////////////////////////////////////////////////////  -    // Simple wrapper to deal with AutoPtrs  +    private: +        TQueueType Queues[Concurrency]; -    template <typename T, typename TImpl>  -    class TAutoQueueBase: private TNonCopyable {  -    private:  -        TImpl Impl;  +    public: +        using TItem = T; -    public:  -        using TItem = TAutoPtr<T>;  +        template <typename TT> +        void Enqueue(TT&& value) { +            while (!TryEnqueue(std::forward<TT>(value))) { +                SpinLockPause(); +            } +        } -        ~TAutoQueueBase() {  -            TAutoPtr<T> value;  -            while (Dequeue(value)) {  -                // do nothing  -            }  +        bool Dequeue(T& value) { +            size_t readPos = GetCycleCount(); +            for (size_t i = 0; i < Concurrency; ++i) { +                TQueueType& queue = Queues[readPos++ % Concurrency]; +                if (AtomicTryAndTryLock(&queue.ReadLock)) { +                    bool dequeued = queue.Dequeue(value); +                    AtomicUnlock(&queue.ReadLock); +                    if (dequeued) { +                        return true; +                    } +                } +            } +            return false;          } -        void Enqueue(TAutoPtr<T> value) {  -            Impl.Enqueue(value.Get());  +        bool IsEmpty() { +            for (size_t i = 0; i < Concurrency; ++i) { +                TQueueType& queue = Queues[i]; +                if (AtomicTryAndTryLock(&queue.ReadLock)) { +                    bool empty = queue.IsEmpty(); +                    AtomicUnlock(&queue.ReadLock); +                    if (!empty) { +                        return false; +                    } +                } +            } +            return true; +        } + +    private: +        template <typename TT> +        bool TryEnqueue(TT&& value) { +            size_t writePos = GetCycleCount(); +            for (size_t i = 0; i < Concurrency; ++i) { +                TQueueType& queue = Queues[writePos++ % Concurrency]; +                if (AtomicTryAndTryLock(&queue.WriteLock)) { +                    queue.Enqueue(std::forward<TT>(value)); +                    AtomicUnlock(&queue.WriteLock); +                    return true; +                } +            } +            return false; +        } +    }; + +    //////////////////////////////////////////////////////////////////////////////// +    // Simple wrapper to deal with AutoPtrs + +    template <typename T, typename TImpl> +    class TAutoQueueBase: private TNonCopyable { +    private: +        TImpl Impl; + +    public: +        using TItem = TAutoPtr<T>; + +        ~TAutoQueueBase() { +            TAutoPtr<T> value; +            while (Dequeue(value)) { +                // do nothing +            } +        } + +        void Enqueue(TAutoPtr<T> value) { +            Impl.Enqueue(value.Get());              Y_UNUSED(value.Release()); -        }  +        } -        bool Dequeue(TAutoPtr<T>& value) {  -            T* ptr = nullptr;  -            if (Impl.Dequeue(ptr)) {  -                value.Reset(ptr);  -                return true;  -            }  -            return false;  +        bool Dequeue(TAutoPtr<T>& value) { +            T* ptr = nullptr; +            if (Impl.Dequeue(ptr)) { +                value.Reset(ptr); +                return true; +            } +            return false;          } -        bool IsEmpty() {  -            return Impl.IsEmpty();  -        }  -    };  +        bool IsEmpty() { +            return Impl.IsEmpty(); +        } +    }; -    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;  +    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; -    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;  +    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; -    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>  -    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;  +    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> +    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; -    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;  +    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; -    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>  -    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;  -}  +    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> +    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; +} diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp index 406b71dd4c1..8cb36d8dd19 100644 --- a/library/cpp/threading/chunk_queue/queue_ut.cpp +++ b/library/cpp/threading/chunk_queue/queue_ut.cpp @@ -5,55 +5,55 @@  #include <util/generic/set.h>  namespace NThreading { -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      Y_UNIT_TEST_SUITE(TOneOneQueueTest){          Y_UNIT_TEST(ShouldBeEmptyAtStart){ -            TOneOneQueue<int> queue;  +            TOneOneQueue<int> queue; -    int result = 0;  -    UNIT_ASSERT(queue.IsEmpty());  -    UNIT_ASSERT(!queue.Dequeue(result));  -}  +    int result = 0; +    UNIT_ASSERT(queue.IsEmpty()); +    UNIT_ASSERT(!queue.Dequeue(result)); +}  Y_UNIT_TEST(ShouldReturnEntries) { -    TOneOneQueue<int> queue;  -    queue.Enqueue(1);  -    queue.Enqueue(2);  -    queue.Enqueue(3);  +    TOneOneQueue<int> queue; +    queue.Enqueue(1); +    queue.Enqueue(2); +    queue.Enqueue(3); -    int result = 0;  -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 1);  +    int result = 0; +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 1); -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 2);  +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 2); -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 3);  +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 3); -    UNIT_ASSERT(queue.IsEmpty());  -    UNIT_ASSERT(!queue.Dequeue(result));  -}  +    UNIT_ASSERT(queue.IsEmpty()); +    UNIT_ASSERT(!queue.Dequeue(result)); +}  Y_UNIT_TEST(ShouldStoreMultipleChunks) { -    TOneOneQueue<int, 100> queue;  -    for (int i = 0; i < 1000; ++i) {  -        queue.Enqueue(i);  +    TOneOneQueue<int, 100> queue; +    for (int i = 0; i < 1000; ++i) { +        queue.Enqueue(i);      } -    for (int i = 0; i < 1000; ++i) {  -        int result = 0;  -        UNIT_ASSERT(!queue.IsEmpty());  -        UNIT_ASSERT(queue.Dequeue(result));  -        UNIT_ASSERT_EQUAL(result, i);  +    for (int i = 0; i < 1000; ++i) { +        int result = 0; +        UNIT_ASSERT(!queue.IsEmpty()); +        UNIT_ASSERT(queue.Dequeue(result)); +        UNIT_ASSERT_EQUAL(result, i);      } -}  -}  -;  +} +} +;  //////////////////////////////////////////////////////////////////////////////// @@ -61,35 +61,35 @@ Y_UNIT_TEST_SUITE(TManyOneQueueTest){      Y_UNIT_TEST(ShouldBeEmptyAtStart){          TManyOneQueue<int> queue; -int result;  -UNIT_ASSERT(queue.IsEmpty());  -UNIT_ASSERT(!queue.Dequeue(result));  -}  +int result; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +}  Y_UNIT_TEST(ShouldReturnEntries) { -    TManyOneQueue<int> queue;  -    queue.Enqueue(1);  -    queue.Enqueue(2);  -    queue.Enqueue(3);  - -    int result = 0;  -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 1);  - -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 2);  - -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 3);  - -    UNIT_ASSERT(queue.IsEmpty());  -    UNIT_ASSERT(!queue.Dequeue(result));  -}  -}  -;  +    TManyOneQueue<int> queue; +    queue.Enqueue(1); +    queue.Enqueue(2); +    queue.Enqueue(3); + +    int result = 0; +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 1); + +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 2); + +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 3); + +    UNIT_ASSERT(queue.IsEmpty()); +    UNIT_ASSERT(!queue.Dequeue(result)); +} +} +;  //////////////////////////////////////////////////////////////////////////////// @@ -97,35 +97,35 @@ Y_UNIT_TEST_SUITE(TManyManyQueueTest){      Y_UNIT_TEST(ShouldBeEmptyAtStart){          TManyManyQueue<int> queue; -int result = 0;  -UNIT_ASSERT(queue.IsEmpty());  -UNIT_ASSERT(!queue.Dequeue(result));  -}  +int result = 0; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +}  Y_UNIT_TEST(ShouldReturnEntries) { -    TManyManyQueue<int> queue;  -    queue.Enqueue(1);  -    queue.Enqueue(2);  -    queue.Enqueue(3);  - -    int result = 0;  -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 1);  - -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 2);  - -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT_EQUAL(result, 3);  - -    UNIT_ASSERT(queue.IsEmpty());  -    UNIT_ASSERT(!queue.Dequeue(result));  -}  -}  -;  +    TManyManyQueue<int> queue; +    queue.Enqueue(1); +    queue.Enqueue(2); +    queue.Enqueue(3); + +    int result = 0; +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 1); + +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 2); + +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT_EQUAL(result, 3); + +    UNIT_ASSERT(queue.IsEmpty()); +    UNIT_ASSERT(!queue.Dequeue(result)); +} +} +;  //////////////////////////////////////////////////////////////////////////////// @@ -133,37 +133,37 @@ Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){      Y_UNIT_TEST(ShouldBeEmptyAtStart){          TRelaxedManyOneQueue<int> queue; -int result;  -UNIT_ASSERT(queue.IsEmpty());  -UNIT_ASSERT(!queue.Dequeue(result));  -}  +int result; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +}  Y_UNIT_TEST(ShouldReturnEntries) { -    TSet<int> items = {1, 2, 3};  +    TSet<int> items = {1, 2, 3}; -    TRelaxedManyOneQueue<int> queue;  -    for (int item : items) {  -        queue.Enqueue(item);  -    }  +    TRelaxedManyOneQueue<int> queue; +    for (int item : items) { +        queue.Enqueue(item); +    } -    int result = 0;  -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT(items.erase(result));  +    int result = 0; +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT(items.erase(result)); -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT(items.erase(result));  +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT(items.erase(result)); -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT(items.erase(result));  +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT(items.erase(result)); -    UNIT_ASSERT(queue.IsEmpty());  -    UNIT_ASSERT(!queue.Dequeue(result));  -}  -}  -;  +    UNIT_ASSERT(queue.IsEmpty()); +    UNIT_ASSERT(!queue.Dequeue(result)); +} +} +;  //////////////////////////////////////////////////////////////////////////////// @@ -171,35 +171,35 @@ Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){      Y_UNIT_TEST(ShouldBeEmptyAtStart){          TRelaxedManyManyQueue<int> queue; -int result = 0;  -UNIT_ASSERT(queue.IsEmpty());  -UNIT_ASSERT(!queue.Dequeue(result));  -}  +int result = 0; +UNIT_ASSERT(queue.IsEmpty()); +UNIT_ASSERT(!queue.Dequeue(result)); +}  Y_UNIT_TEST(ShouldReturnEntries) { -    TSet<int> items = {1, 2, 3};  - -    TRelaxedManyManyQueue<int> queue;  -    for (int item : items) {  -        queue.Enqueue(item);  -    }  - -    int result = 0;  -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT(items.erase(result));  - -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT(items.erase(result));  - -    UNIT_ASSERT(!queue.IsEmpty());  -    UNIT_ASSERT(queue.Dequeue(result));  -    UNIT_ASSERT(items.erase(result));  - -    UNIT_ASSERT(queue.IsEmpty());  -    UNIT_ASSERT(!queue.Dequeue(result));  -}  -}  -;  -}  +    TSet<int> items = {1, 2, 3}; + +    TRelaxedManyManyQueue<int> queue; +    for (int item : items) { +        queue.Enqueue(item); +    } + +    int result = 0; +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT(items.erase(result)); + +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT(items.erase(result)); + +    UNIT_ASSERT(!queue.IsEmpty()); +    UNIT_ASSERT(queue.Dequeue(result)); +    UNIT_ASSERT(items.erase(result)); + +    UNIT_ASSERT(queue.IsEmpty()); +    UNIT_ASSERT(!queue.Dequeue(result)); +} +} +; +} diff --git a/library/cpp/threading/future/async.h b/library/cpp/threading/future/async.h index f964d2dc887..8543fdd5c68 100644 --- a/library/cpp/threading/future/async.h +++ b/library/cpp/threading/future/async.h @@ -2,11 +2,11 @@  #include "future.h" -#include <util/generic/function.h>  +#include <util/generic/function.h>  #include <util/thread/pool.h>  namespace NThreading { -    /**  +    /**   * @brief Asynchronously executes @arg func in @arg queue returning a future for the result.   *   * @arg func should be a callable object with signature T(). @@ -17,15 +17,15 @@ namespace NThreading {   * If you want to use another queue for execution just write an overload, @see ExtensionExample   * unittest.   */ -    template <typename Func>  +    template <typename Func>      TFuture<TFutureType<TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) { -        auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>();  +        auto promise = NewPromise<TFutureType<TFunctionResult<Func>>>();          auto lambda = [promise, func = std::forward<Func>(func)]() mutable { -            NImpl::SetValue(promise, func);  +            NImpl::SetValue(promise, func);          };          queue.SafeAddFunc(std::move(lambda)); -        return promise.GetFuture();  -    }  +        return promise.GetFuture(); +    }  } diff --git a/library/cpp/threading/future/async_ut.cpp b/library/cpp/threading/future/async_ut.cpp index a452965dbcd..a3699744e48 100644 --- a/library/cpp/threading/future/async_ut.cpp +++ b/library/cpp/threading/future/async_ut.cpp @@ -6,13 +6,13 @@  #include <util/generic/vector.h>  namespace { -    struct TMySuperTaskQueue {  -    };  +    struct TMySuperTaskQueue { +    };  }  namespace NThreading { -    /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace  +    /* Here we provide an Async overload for TMySuperTaskQueue indide NThreading namespace   * so that we can call it in the way   *   * TMySuperTaskQueue queue; @@ -20,38 +20,38 @@ namespace NThreading {   *   * See also ExtensionExample unittest.   */ -    template <typename Func>  -    TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) {  -        return MakeFuture(func());  -    }  +    template <typename Func> +    TFuture<TFunctionResult<Func>> Async(Func func, TMySuperTaskQueue&) { +        return MakeFuture(func()); +    }  }  Y_UNIT_TEST_SUITE(Async) {      Y_UNIT_TEST(ExtensionExample) { -        TMySuperTaskQueue queue;  -        auto future = NThreading::Async([]() { return 5; }, queue);  -        future.Wait();  -        UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);  -    }  +        TMySuperTaskQueue queue; +        auto future = NThreading::Async([]() { return 5; }, queue); +        future.Wait(); +        UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); +    }      Y_UNIT_TEST(WorksWithIMtpQueue) {          auto queue = MakeHolder<TThreadPool>(); -        queue->Start(1);  +        queue->Start(1); -        auto future = NThreading::Async([]() { return 5; }, *queue);  -        future.Wait();  -        UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);  -    }  +        auto future = NThreading::Async([]() { return 5; }, *queue); +        future.Wait(); +        UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); +    }      Y_UNIT_TEST(ProperlyDeducesFutureType) { -        // Compileability test  +        // Compileability test          auto queue = CreateThreadPool(1); -        NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue);  -        NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue);  -        NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue);  -        NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue);  -        NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue);  -    }  +        NThreading::TFuture<void> f1 = NThreading::Async([]() {}, *queue); +        NThreading::TFuture<int> f2 = NThreading::Async([]() { return 5; }, *queue); +        NThreading::TFuture<double> f3 = NThreading::Async([]() { return 5.0; }, *queue); +        NThreading::TFuture<TVector<int>> f4 = NThreading::Async([]() { return TVector<int>(); }, *queue); +        NThreading::TFuture<int> f5 = NThreading::Async([]() { return NThreading::MakeFuture(5); }, *queue); +    }  } diff --git a/library/cpp/threading/future/core/future-inl.h b/library/cpp/threading/future/core/future-inl.h index a0e06c18912..5fd4296a93c 100644 --- a/library/cpp/threading/future/core/future-inl.h +++ b/library/cpp/threading/future/core/future-inl.h @@ -2,92 +2,92 @@  #if !defined(INCLUDE_FUTURE_INL_H)  #error "you should never include future-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H  +#endif // INCLUDE_FUTURE_INL_H  namespace NThreading { -    namespace NImpl {  -        ////////////////////////////////////////////////////////////////////////////////  +    namespace NImpl { +        //////////////////////////////////////////////////////////////////////////////// -        template <typename T>  -        using TCallback = std::function<void(const TFuture<T>&)>;  +        template <typename T> +        using TCallback = std::function<void(const TFuture<T>&)>; -        template <typename T>  -        using TCallbackList = TVector<TCallback<T>>; // TODO: small vector  +        template <typename T> +        using TCallbackList = TVector<TCallback<T>>; // TODO: small vector -        ////////////////////////////////////////////////////////////////////////////////  +        ////////////////////////////////////////////////////////////////////////////////          enum class TError {              Error          }; -        template <typename T>  -        class TFutureState: public TAtomicRefCount<TFutureState<T>> {  -            enum {  -                NotReady,  -                ExceptionSet,  -                ValueMoved, // keep the ordering of this and following values  -                ValueSet,  -                ValueRead,  -            };  - -        private:  -            mutable TAtomic State;  -            TAdaptiveLock StateLock;  - -            TCallbackList<T> Callbacks;  +        template <typename T> +        class TFutureState: public TAtomicRefCount<TFutureState<T>> { +            enum { +                NotReady, +                ExceptionSet, +                ValueMoved, // keep the ordering of this and following values +                ValueSet, +                ValueRead, +            }; + +        private: +            mutable TAtomic State; +            TAdaptiveLock StateLock; + +            TCallbackList<T> Callbacks;              mutable THolder<TSystemEvent> ReadyEvent; -            std::exception_ptr Exception;  - -            union {  -                char NullValue;  -                T Value;  -            };  - -            void AccessValue(TDuration timeout, int acquireState) const {  -                int state = AtomicGet(State);  -                if (Y_UNLIKELY(state == NotReady)) {  -                    if (timeout == TDuration::Zero()) {  -                        ythrow TFutureException() << "value not set";  -                    }  - -                    if (!Wait(timeout)) {  -                        ythrow TFutureException() << "wait timeout";  -                    }  -  -                    state = AtomicGet(State);  -                }  -  +            std::exception_ptr Exception; + +            union { +                char NullValue; +                T Value; +            }; + +            void AccessValue(TDuration timeout, int acquireState) const { +                int state = AtomicGet(State); +                if (Y_UNLIKELY(state == NotReady)) { +                    if (timeout == TDuration::Zero()) { +                        ythrow TFutureException() << "value not set"; +                    } + +                    if (!Wait(timeout)) { +                        ythrow TFutureException() << "wait timeout"; +                    } + +                    state = AtomicGet(State); +                } +                  TryRethrowWithState(state); -  -                switch (AtomicGetAndCas(&State, acquireState, ValueSet)) {  -                    case ValueSet:  -                        break;  -                    case ValueRead:  -                        if (acquireState != ValueRead) {  -                            ythrow TFutureException() << "value being read";  -                        }  -                        break;  -                    case ValueMoved:  -                        ythrow TFutureException() << "value was moved";  -                    default:  -                        Y_ASSERT(state == ValueSet);  -                }  -            } - -        public:  -            TFutureState()  -                : State(NotReady)  -                , NullValue(0)  -            {  -            } - -            template <typename TT>  -            TFutureState(TT&& value)  -                : State(ValueSet)  -                , Value(std::forward<TT>(value))  -            {  -            }  + +                switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { +                    case ValueSet: +                        break; +                    case ValueRead: +                        if (acquireState != ValueRead) { +                            ythrow TFutureException() << "value being read"; +                        } +                        break; +                    case ValueMoved: +                        ythrow TFutureException() << "value was moved"; +                    default: +                        Y_ASSERT(state == ValueSet); +                } +            } + +        public: +            TFutureState() +                : State(NotReady) +                , NullValue(0) +            { +            } + +            template <typename TT> +            TFutureState(TT&& value) +                : State(ValueSet) +                , Value(std::forward<TT>(value)) +            { +            }              TFutureState(std::exception_ptr exception, TError)                  : State(ExceptionSet) @@ -96,14 +96,14 @@ namespace NThreading {              {              } -            ~TFutureState() {  -                if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead  -                    Value.~T();  -                }  -            }  +            ~TFutureState() { +                if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead +                    Value.~T(); +                } +            } -            bool HasValue() const {  -                return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead  +            bool HasValue() const { +                return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead              }              void TryRethrow() const { @@ -111,22 +111,22 @@ namespace NThreading {                  TryRethrowWithState(state);              } -            bool HasException() const {  -                return AtomicGet(State) == ExceptionSet;  -            }  +            bool HasException() const { +                return AtomicGet(State) == ExceptionSet; +            } -            const T& GetValue(TDuration timeout = TDuration::Zero()) const {  -                AccessValue(timeout, ValueRead);  -                return Value;  -            }  +            const T& GetValue(TDuration timeout = TDuration::Zero()) const { +                AccessValue(timeout, ValueRead); +                return Value; +            } -            T ExtractValue(TDuration timeout = TDuration::Zero()) {  -                AccessValue(timeout, ValueMoved);  -                return std::move(Value);  -            }  +            T ExtractValue(TDuration timeout = TDuration::Zero()) { +                AccessValue(timeout, ValueMoved); +                return std::move(Value); +            } -            template <typename TT>  -            void SetValue(TT&& value) {  +            template <typename TT> +            void SetValue(TT&& value) {                  bool success = TrySetValue(std::forward<TT>(value));                  if (Y_UNLIKELY(!success)) {                      ythrow TFutureException() << "value already set"; @@ -136,37 +136,37 @@ namespace NThreading {              template <typename TT>              bool TrySetValue(TT&& value) {                  TSystemEvent* readyEvent = nullptr; -                TCallbackList<T> callbacks;  +                TCallbackList<T> callbacks; -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (Y_UNLIKELY(state != NotReady)) {  +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (Y_UNLIKELY(state != NotReady)) {                          return false; -                    }  +                    } + +                    new (&Value) T(std::forward<TT>(value)); -                    new (&Value) T(std::forward<TT>(value));  +                    readyEvent = ReadyEvent.Get(); +                    callbacks = std::move(Callbacks); -                    readyEvent = ReadyEvent.Get();  -                    callbacks = std::move(Callbacks);  +                    AtomicSet(State, ValueSet); +                } -                    AtomicSet(State, ValueSet);  -                }  +                if (readyEvent) { +                    readyEvent->Signal(); +                } -                if (readyEvent) {  -                    readyEvent->Signal();  -                }  -  -                if (callbacks) {  -                    TFuture<T> temp(this);  -                    for (auto& callback : callbacks) {  -                        callback(temp);  -                    }  -                }  +                if (callbacks) { +                    TFuture<T> temp(this); +                    for (auto& callback : callbacks) { +                        callback(temp); +                    } +                }                  return true;              } -            void SetException(std::exception_ptr e) {  +            void SetException(std::exception_ptr e) {                  bool success = TrySetException(std::move(e));                  if (Y_UNLIKELY(!success)) {                      ythrow TFutureException() << "value already set"; @@ -175,73 +175,73 @@ namespace NThreading {              bool TrySetException(std::exception_ptr e) {                  TSystemEvent* readyEvent; -                TCallbackList<T> callbacks;  +                TCallbackList<T> callbacks; -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (Y_UNLIKELY(state != NotReady)) {  +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (Y_UNLIKELY(state != NotReady)) {                          return false; -                    }  - -                    Exception = std::move(e);  - -                    readyEvent = ReadyEvent.Get();  -                    callbacks = std::move(Callbacks);  - -                    AtomicSet(State, ExceptionSet);  -                }  -  -                if (readyEvent) {  -                    readyEvent->Signal();  -                }  -  -                if (callbacks) {  -                    TFuture<T> temp(this);  -                    for (auto& callback : callbacks) {  -                        callback(temp);  -                    }  -                }  +                    } + +                    Exception = std::move(e); + +                    readyEvent = ReadyEvent.Get(); +                    callbacks = std::move(Callbacks); + +                    AtomicSet(State, ExceptionSet); +                } + +                if (readyEvent) { +                    readyEvent->Signal(); +                } + +                if (callbacks) { +                    TFuture<T> temp(this); +                    for (auto& callback : callbacks) { +                        callback(temp); +                    } +                }                  return true;              } -            template <typename F>  -            bool Subscribe(F&& func) {  -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (state == NotReady) {  -                        Callbacks.emplace_back(std::forward<F>(func));  -                        return true;  -                    }  -                }  -                return false;  -            }  +            template <typename F> +            bool Subscribe(F&& func) { +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (state == NotReady) { +                        Callbacks.emplace_back(std::forward<F>(func)); +                        return true; +                    } +                } +                return false; +            } -            void Wait() const {  -                Wait(TInstant::Max());  +            void Wait() const { +                Wait(TInstant::Max());              } -            bool Wait(TDuration timeout) const {  -                return Wait(timeout.ToDeadLine());  -            }  +            bool Wait(TDuration timeout) const { +                return Wait(timeout.ToDeadLine()); +            } -            bool Wait(TInstant deadline) const {  +            bool Wait(TInstant deadline) const {                  TSystemEvent* readyEvent = nullptr; -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (state != NotReady) {  -                        return true;  -                    }  +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (state != NotReady) { +                        return true; +                    } -                    if (!ReadyEvent) {  +                    if (!ReadyEvent) {                          ReadyEvent.Reset(new TSystemEvent()); -                    }  -                    readyEvent = ReadyEvent.Get();  -                }  +                    } +                    readyEvent = ReadyEvent.Get(); +                } -                Y_ASSERT(readyEvent);  -                return readyEvent->WaitD(deadline);  +                Y_ASSERT(readyEvent); +                return readyEvent->WaitD(deadline);              }              void TryRethrowWithState(int state) const { @@ -250,31 +250,31 @@ namespace NThreading {                      std::rethrow_exception(Exception);                  }              } -        };  +        }; -        ////////////////////////////////////////////////////////////////////////////////  +        //////////////////////////////////////////////////////////////////////////////// -        template <>  -        class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> {  -            enum {  -                NotReady,  -                ValueSet,  -                ExceptionSet,  -            };  +        template <> +        class TFutureState<void>: public TAtomicRefCount<TFutureState<void>> { +            enum { +                NotReady, +                ValueSet, +                ExceptionSet, +            }; -        private:  -            TAtomic State;  -            TAdaptiveLock StateLock;  +        private: +            TAtomic State; +            TAdaptiveLock StateLock; -            TCallbackList<void> Callbacks;  +            TCallbackList<void> Callbacks;              mutable THolder<TSystemEvent> ReadyEvent; -            std::exception_ptr Exception;  -  -        public:  -            TFutureState(bool valueSet = false)  -                : State(valueSet ? ValueSet : NotReady)  -            {  +            std::exception_ptr Exception; + +        public: +            TFutureState(bool valueSet = false) +                : State(valueSet ? ValueSet : NotReady) +            {              }              TFutureState(std::exception_ptr exception, TError) @@ -283,8 +283,8 @@ namespace NThreading {              {              } -            bool HasValue() const {  -                return AtomicGet(State) == ValueSet;  +            bool HasValue() const { +                return AtomicGet(State) == ValueSet;              }              void TryRethrow() const { @@ -292,30 +292,30 @@ namespace NThreading {                  TryRethrowWithState(state);              } -            bool HasException() const {  -                return AtomicGet(State) == ExceptionSet;  -            }  +            bool HasException() const { +                return AtomicGet(State) == ExceptionSet; +            } -            void GetValue(TDuration timeout = TDuration::Zero()) const {  -                int state = AtomicGet(State);  -                if (Y_UNLIKELY(state == NotReady)) {  -                    if (timeout == TDuration::Zero()) {  -                        ythrow TFutureException() << "value not set";  -                    }  +            void GetValue(TDuration timeout = TDuration::Zero()) const { +                int state = AtomicGet(State); +                if (Y_UNLIKELY(state == NotReady)) { +                    if (timeout == TDuration::Zero()) { +                        ythrow TFutureException() << "value not set"; +                    } -                    if (!Wait(timeout)) {  -                        ythrow TFutureException() << "wait timeout";  -                    }  +                    if (!Wait(timeout)) { +                        ythrow TFutureException() << "wait timeout"; +                    } -                    state = AtomicGet(State);  -                }  +                    state = AtomicGet(State); +                }                  TryRethrowWithState(state); -                Y_ASSERT(state == ValueSet);  -            }  +                Y_ASSERT(state == ValueSet); +            } -            void SetValue() {  +            void SetValue() {                  bool success = TrySetValue();                  if (Y_UNLIKELY(!success)) {                      ythrow TFutureException() << "value already set"; @@ -324,35 +324,35 @@ namespace NThreading {              bool TrySetValue() {                  TSystemEvent* readyEvent = nullptr; -                TCallbackList<void> callbacks;  +                TCallbackList<void> callbacks; -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (Y_UNLIKELY(state != NotReady)) {  +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (Y_UNLIKELY(state != NotReady)) {                          return false; -                    }  +                    } -                    readyEvent = ReadyEvent.Get();  -                    callbacks = std::move(Callbacks);  +                    readyEvent = ReadyEvent.Get(); +                    callbacks = std::move(Callbacks); -                    AtomicSet(State, ValueSet);  -                }  +                    AtomicSet(State, ValueSet); +                } -                if (readyEvent) {  -                    readyEvent->Signal();  -                }  -  -                if (callbacks) {  -                    TFuture<void> temp(this);  -                    for (auto& callback : callbacks) {  -                        callback(temp);  -                    }  -                }  +                if (readyEvent) { +                    readyEvent->Signal(); +                } + +                if (callbacks) { +                    TFuture<void> temp(this); +                    for (auto& callback : callbacks) { +                        callback(temp); +                    } +                }                  return true;              } -            void SetException(std::exception_ptr e) {  +            void SetException(std::exception_ptr e) {                  bool success = TrySetException(std::move(e));                  if (Y_UNLIKELY(!success)) {                      ythrow TFutureException() << "value already set"; @@ -361,73 +361,73 @@ namespace NThreading {              bool TrySetException(std::exception_ptr e) {                  TSystemEvent* readyEvent = nullptr; -                TCallbackList<void> callbacks;  +                TCallbackList<void> callbacks; -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (Y_UNLIKELY(state != NotReady)) {  +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (Y_UNLIKELY(state != NotReady)) {                          return false; -                    }  +                    } -                    Exception = std::move(e);  +                    Exception = std::move(e); -                    readyEvent = ReadyEvent.Get();  -                    callbacks = std::move(Callbacks);  +                    readyEvent = ReadyEvent.Get(); +                    callbacks = std::move(Callbacks); -                    AtomicSet(State, ExceptionSet);  -                }  +                    AtomicSet(State, ExceptionSet); +                } -                if (readyEvent) {  -                    readyEvent->Signal();  -                }  +                if (readyEvent) { +                    readyEvent->Signal(); +                } -                if (callbacks) {  -                    TFuture<void> temp(this);  -                    for (auto& callback : callbacks) {  -                        callback(temp);  -                    }  -                }  +                if (callbacks) { +                    TFuture<void> temp(this); +                    for (auto& callback : callbacks) { +                        callback(temp); +                    } +                }                  return true; -            }  +            } -            template <typename F>  -            bool Subscribe(F&& func) {  -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (state == NotReady) {  -                        Callbacks.emplace_back(std::forward<F>(func));  -                        return true;  -                    }  -                }  -                return false;  -            }  +            template <typename F> +            bool Subscribe(F&& func) { +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (state == NotReady) { +                        Callbacks.emplace_back(std::forward<F>(func)); +                        return true; +                    } +                } +                return false; +            } -            void Wait() const {  -                Wait(TInstant::Max());  +            void Wait() const { +                Wait(TInstant::Max());              } -            bool Wait(TDuration timeout) const {  -                return Wait(timeout.ToDeadLine());  -            }  +            bool Wait(TDuration timeout) const { +                return Wait(timeout.ToDeadLine()); +            } -            bool Wait(TInstant deadline) const {  +            bool Wait(TInstant deadline) const {                  TSystemEvent* readyEvent = nullptr; -  -                with_lock (StateLock) {  -                    int state = AtomicGet(State);  -                    if (state != NotReady) {  -                        return true;  -                    }  -  -                    if (!ReadyEvent) {  + +                with_lock (StateLock) { +                    int state = AtomicGet(State); +                    if (state != NotReady) { +                        return true; +                    } + +                    if (!ReadyEvent) {                          ReadyEvent.Reset(new TSystemEvent()); -                    }  -                    readyEvent = ReadyEvent.Get();  -                }  -  -                Y_ASSERT(readyEvent);  -                return readyEvent->WaitD(deadline);  +                    } +                    readyEvent = ReadyEvent.Get(); +                } + +                Y_ASSERT(readyEvent); +                return readyEvent->WaitD(deadline);              }              void TryRethrowWithState(int state) const { @@ -436,53 +436,53 @@ namespace NThreading {                      std::rethrow_exception(Exception);                  }              } -        };  +        }; -        ////////////////////////////////////////////////////////////////////////////////  +        //////////////////////////////////////////////////////////////////////////////// -        template <typename T>  -        inline void SetValueImpl(TPromise<T>& promise, const T& value) {  -            promise.SetValue(value);  -        }  +        template <typename T> +        inline void SetValueImpl(TPromise<T>& promise, const T& value) { +            promise.SetValue(value); +        } -        template <typename T>  -        inline void SetValueImpl(TPromise<T>& promise, T&& value) {  -            promise.SetValue(std::move(value));  +        template <typename T> +        inline void SetValueImpl(TPromise<T>& promise, T&& value) { +            promise.SetValue(std::move(value));          } -        template <typename T>  +        template <typename T>          inline void SetValueImpl(TPromise<T>& promise, const TFuture<T>& future,                                   std::enable_if_t<!std::is_void<T>::value, bool> = false) { -            future.Subscribe([=](const TFuture<T>& f) mutable {  +            future.Subscribe([=](const TFuture<T>& f) mutable {                  T const* value; -                try {  +                try {                      value = &f.GetValue(); -                } catch (...) {  -                    promise.SetException(std::current_exception());  +                } catch (...) { +                    promise.SetException(std::current_exception());                      return; -                }  +                }                  promise.SetValue(*value); -            });  +            });          }          template <typename T>          inline void SetValueImpl(TPromise<void>& promise, const TFuture<T>& future) {              future.Subscribe([=](const TFuture<T>& f) mutable { -                try {  +                try {                      f.TryRethrow(); -                } catch (...) {  -                    promise.SetException(std::current_exception());  +                } catch (...) { +                    promise.SetException(std::current_exception());                      return; -                }  +                }                  promise.SetValue(); -            });  -        }  -  -        template <typename T, typename F>  -        inline void SetValue(TPromise<T>& promise, F&& func) {  -            try {  -                SetValueImpl(promise, func());  -            } catch (...) {  +            }); +        } + +        template <typename T, typename F> +        inline void SetValue(TPromise<T>& promise, F&& func) { +            try { +                SetValueImpl(promise, func()); +            } catch (...) {                  const bool success = promise.TrySetException(std::current_exception());                  if (Y_UNLIKELY(!success)) {                      throw; @@ -490,21 +490,21 @@ namespace NThreading {              }          } -        template <typename F>  -        inline void SetValue(TPromise<void>& promise, F&& func,  -                             std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) {  -            try {  -                func();  -            } catch (...) {  -                promise.SetException(std::current_exception());  +        template <typename F> +        inline void SetValue(TPromise<void>& promise, F&& func, +                             std::enable_if_t<std::is_void<TFunctionResult<F>>::value, bool> = false) { +            try { +                func(); +            } catch (...) { +                promise.SetException(std::current_exception());                  return;              }              promise.SetValue();          } -    }  +    } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      class TFutureStateId {      private: @@ -532,45 +532,45 @@ namespace NThreading {      //////////////////////////////////////////////////////////////////////////////// -    template <typename T>  +    template <typename T>      inline TFuture<T>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept -        : State(state)  +        : State(state)      {      } -    template <typename T>  -    inline void TFuture<T>::Swap(TFuture<T>& other) {  -        State.Swap(other.State);  -    }  +    template <typename T> +    inline void TFuture<T>::Swap(TFuture<T>& other) { +        State.Swap(other.State); +    } -    template <typename T>  -    inline bool TFuture<T>::HasValue() const {  -        return State && State->HasValue();  -    }  +    template <typename T> +    inline bool TFuture<T>::HasValue() const { +        return State && State->HasValue(); +    } -    template <typename T>  -    inline const T& TFuture<T>::GetValue(TDuration timeout) const {  -        EnsureInitialized();  -        return State->GetValue(timeout);  +    template <typename T> +    inline const T& TFuture<T>::GetValue(TDuration timeout) const { +        EnsureInitialized(); +        return State->GetValue(timeout);      } -    template <typename T>  -    inline T TFuture<T>::ExtractValue(TDuration timeout) {  -        EnsureInitialized();  -        return State->ExtractValue(timeout);  -    }  +    template <typename T> +    inline T TFuture<T>::ExtractValue(TDuration timeout) { +        EnsureInitialized(); +        return State->ExtractValue(timeout); +    } -    template <typename T>  -    inline const T& TFuture<T>::GetValueSync() const {  -        return GetValue(TDuration::Max());  -    }  +    template <typename T> +    inline const T& TFuture<T>::GetValueSync() const { +        return GetValue(TDuration::Max()); +    } -    template <typename T>  -    inline T TFuture<T>::ExtractValueSync() {  -        return ExtractValue(TDuration::Max());  -    }  +    template <typename T> +    inline T TFuture<T>::ExtractValueSync() { +        return ExtractValue(TDuration::Max()); +    } -    template <typename T>  +    template <typename T>      inline void TFuture<T>::TryRethrow() const {          if (State) {              State->TryRethrow(); @@ -578,40 +578,40 @@ namespace NThreading {      }      template <typename T> -    inline bool TFuture<T>::HasException() const {  -        return State && State->HasException();  -    }  - -    template <typename T>  -    inline void TFuture<T>::Wait() const {  -        EnsureInitialized();  -        return State->Wait();  -    }  - -    template <typename T>  -    inline bool TFuture<T>::Wait(TDuration timeout) const {  -        EnsureInitialized();  -        return State->Wait(timeout);  -    }  - -    template <typename T>  -    inline bool TFuture<T>::Wait(TInstant deadline) const {  -        EnsureInitialized();  -        return State->Wait(deadline);  -    }  - -    template <typename T>  -    template <typename F>  -    inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const {  -        EnsureInitialized();  -        if (!State->Subscribe(std::forward<F>(func))) {  -            func(*this);  -        }  -        return *this;  -    }  - -    template <typename T>  -    template <typename F>  +    inline bool TFuture<T>::HasException() const { +        return State && State->HasException(); +    } + +    template <typename T> +    inline void TFuture<T>::Wait() const { +        EnsureInitialized(); +        return State->Wait(); +    } + +    template <typename T> +    inline bool TFuture<T>::Wait(TDuration timeout) const { +        EnsureInitialized(); +        return State->Wait(timeout); +    } + +    template <typename T> +    inline bool TFuture<T>::Wait(TInstant deadline) const { +        EnsureInitialized(); +        return State->Wait(deadline); +    } + +    template <typename T> +    template <typename F> +    inline const TFuture<T>& TFuture<T>::Subscribe(F&& func) const { +        EnsureInitialized(); +        if (!State->Subscribe(std::forward<F>(func))) { +            func(*this); +        } +        return *this; +    } + +    template <typename T> +    template <typename F>      inline const TFuture<T>& TFuture<T>::NoexceptSubscribe(F&& func) const noexcept {          return Subscribe(std::forward<F>(func));      } @@ -623,59 +623,59 @@ namespace NThreading {          auto promise = NewPromise<TFutureType<TFutureCallResult<F, T>>>();          Subscribe([promise, func = std::forward<F>(func)](const TFuture<T>& future) mutable {              NImpl::SetValue(promise, [&]() { return func(future); }); -        });  -        return promise;  -    }  - -    template <typename T>  -    inline TFuture<void> TFuture<T>::IgnoreResult() const {  -        auto promise = NewPromise();  -        Subscribe([=](const TFuture<T>& future) mutable {  +        }); +        return promise; +    } + +    template <typename T> +    inline TFuture<void> TFuture<T>::IgnoreResult() const { +        auto promise = NewPromise(); +        Subscribe([=](const TFuture<T>& future) mutable {              NImpl::SetValueImpl(promise, future); -        });  -        return promise;  -    }  +        }); +        return promise; +    } -    template <typename T>  -    inline bool TFuture<T>::Initialized() const {  -        return bool(State);  +    template <typename T> +    inline bool TFuture<T>::Initialized() const { +        return bool(State);      } -    template <typename T>  +    template <typename T>      inline TMaybe<TFutureStateId> TFuture<T>::StateId() const noexcept {          return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();      }      template <typename T> -    inline void TFuture<T>::EnsureInitialized() const {  -        if (!State) {  -            ythrow TFutureException() << "state not initialized";  +    inline void TFuture<T>::EnsureInitialized() const { +        if (!State) { +            ythrow TFutureException() << "state not initialized";          } -    }  +    } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      inline TFuture<void>::TFuture(const TIntrusivePtr<TFutureState>& state) noexcept -        : State(state)  -    {  -    }  +        : State(state) +    { +    } -    inline void TFuture<void>::Swap(TFuture<void>& other) {  -        State.Swap(other.State);  -    }  +    inline void TFuture<void>::Swap(TFuture<void>& other) { +        State.Swap(other.State); +    } -    inline bool TFuture<void>::HasValue() const {  -        return State && State->HasValue();  -    }  +    inline bool TFuture<void>::HasValue() const { +        return State && State->HasValue(); +    } -    inline void TFuture<void>::GetValue(TDuration timeout) const {  -        EnsureInitialized();  -        State->GetValue(timeout);  -    }  +    inline void TFuture<void>::GetValue(TDuration timeout) const { +        EnsureInitialized(); +        State->GetValue(timeout); +    } -    inline void TFuture<void>::GetValueSync() const {  -        GetValue(TDuration::Max());  -    }  +    inline void TFuture<void>::GetValueSync() const { +        GetValue(TDuration::Max()); +    }      inline void TFuture<void>::TryRethrow() const {          if (State) { @@ -683,35 +683,35 @@ namespace NThreading {          }      } -    inline bool TFuture<void>::HasException() const {  -        return State && State->HasException();  -    }  +    inline bool TFuture<void>::HasException() const { +        return State && State->HasException(); +    } -    inline void TFuture<void>::Wait() const {  -        EnsureInitialized();  -        return State->Wait();  -    }  +    inline void TFuture<void>::Wait() const { +        EnsureInitialized(); +        return State->Wait(); +    } -    inline bool TFuture<void>::Wait(TDuration timeout) const {  -        EnsureInitialized();  -        return State->Wait(timeout);  -    }  +    inline bool TFuture<void>::Wait(TDuration timeout) const { +        EnsureInitialized(); +        return State->Wait(timeout); +    } -    inline bool TFuture<void>::Wait(TInstant deadline) const {  -        EnsureInitialized();  -        return State->Wait(deadline);  -    }  +    inline bool TFuture<void>::Wait(TInstant deadline) const { +        EnsureInitialized(); +        return State->Wait(deadline); +    } -    template <typename F>  -    inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const {  -        EnsureInitialized();  -        if (!State->Subscribe(std::forward<F>(func))) {  -            func(*this);  -        }  -        return *this;  -    }  +    template <typename F> +    inline const TFuture<void>& TFuture<void>::Subscribe(F&& func) const { +        EnsureInitialized(); +        if (!State->Subscribe(std::forward<F>(func))) { +            func(*this); +        } +        return *this; +    } -    template <typename F>  +    template <typename F>      inline const TFuture<void>& TFuture<void>::NoexceptSubscribe(F&& func) const noexcept {          return Subscribe(std::forward<F>(func));      } @@ -722,82 +722,82 @@ namespace NThreading {          auto promise = NewPromise<TFutureType<TFutureCallResult<F, void>>>();          Subscribe([promise, func = std::forward<F>(func)](const TFuture<void>& future) mutable {              NImpl::SetValue(promise, [&]() { return func(future); }); -        });  -        return promise;  -    }  - -    template <typename R>  -    inline TFuture<R> TFuture<void>::Return(const R& value) const {  -        auto promise = NewPromise<R>();  -        Subscribe([=](const TFuture<void>& future) mutable {  -            try {  +        }); +        return promise; +    } + +    template <typename R> +    inline TFuture<R> TFuture<void>::Return(const R& value) const { +        auto promise = NewPromise<R>(); +        Subscribe([=](const TFuture<void>& future) mutable { +            try {                  future.TryRethrow(); -            } catch (...) {  -                promise.SetException(std::current_exception());  +            } catch (...) { +                promise.SetException(std::current_exception());                  return; -            }  +            }              promise.SetValue(value); -        });  -        return promise;  +        }); +        return promise;      } -    inline bool TFuture<void>::Initialized() const {  -        return bool(State);  -    }  +    inline bool TFuture<void>::Initialized() const { +        return bool(State); +    }      inline TMaybe<TFutureStateId> TFuture<void>::StateId() const noexcept {          return State != nullptr ? MakeMaybe<TFutureStateId>(*State) : Nothing();      } -    inline void TFuture<void>::EnsureInitialized() const {  -        if (!State) {  -            ythrow TFutureException() << "state not initialized";  +    inline void TFuture<void>::EnsureInitialized() const { +        if (!State) { +            ythrow TFutureException() << "state not initialized";          } -    }  +    } -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    template <typename T>  +    template <typename T>      inline TPromise<T>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept -        : State(state)  -    {  -    }  - -    template <typename T>  -    inline void TPromise<T>::Swap(TPromise<T>& other) {  -        State.Swap(other.State);  -    }  - -    template <typename T>  -    inline const T& TPromise<T>::GetValue() const {  -        EnsureInitialized();  -        return State->GetValue();  -    }  - -    template <typename T>  -    inline T TPromise<T>::ExtractValue() {  -        EnsureInitialized();  -        return State->ExtractValue();  -    }  - -    template <typename T>  -    inline bool TPromise<T>::HasValue() const {  -        return State && State->HasValue();  -    }  - -    template <typename T>  -    inline void TPromise<T>::SetValue(const T& value) {  -        EnsureInitialized();  -        State->SetValue(value);  -    }  - -    template <typename T>  -    inline void TPromise<T>::SetValue(T&& value) {  -        EnsureInitialized();  -        State->SetValue(std::move(value));  -    }  - -    template <typename T>  +        : State(state) +    { +    } + +    template <typename T> +    inline void TPromise<T>::Swap(TPromise<T>& other) { +        State.Swap(other.State); +    } + +    template <typename T> +    inline const T& TPromise<T>::GetValue() const { +        EnsureInitialized(); +        return State->GetValue(); +    } + +    template <typename T> +    inline T TPromise<T>::ExtractValue() { +        EnsureInitialized(); +        return State->ExtractValue(); +    } + +    template <typename T> +    inline bool TPromise<T>::HasValue() const { +        return State && State->HasValue(); +    } + +    template <typename T> +    inline void TPromise<T>::SetValue(const T& value) { +        EnsureInitialized(); +        State->SetValue(value); +    } + +    template <typename T> +    inline void TPromise<T>::SetValue(T&& value) { +        EnsureInitialized(); +        State->SetValue(std::move(value)); +    } + +    template <typename T>      inline bool TPromise<T>::TrySetValue(const T& value) {          EnsureInitialized();          return State->TrySetValue(value); @@ -817,75 +817,75 @@ namespace NThreading {      }      template <typename T> -    inline bool TPromise<T>::HasException() const {  -        return State && State->HasException();  -    }  +    inline bool TPromise<T>::HasException() const { +        return State && State->HasException(); +    } -    template <typename T>  -    inline void TPromise<T>::SetException(const TString& e) {  -        EnsureInitialized();  -        State->SetException(std::make_exception_ptr(yexception() << e));  -    }  +    template <typename T> +    inline void TPromise<T>::SetException(const TString& e) { +        EnsureInitialized(); +        State->SetException(std::make_exception_ptr(yexception() << e)); +    } -    template <typename T>  -    inline void TPromise<T>::SetException(std::exception_ptr e) {  -        EnsureInitialized();  -        State->SetException(std::move(e));  -    }  +    template <typename T> +    inline void TPromise<T>::SetException(std::exception_ptr e) { +        EnsureInitialized(); +        State->SetException(std::move(e)); +    } -    template <typename T>  +    template <typename T>      inline bool TPromise<T>::TrySetException(std::exception_ptr e) {          EnsureInitialized();          return State->TrySetException(std::move(e));      }      template <typename T> -    inline TFuture<T> TPromise<T>::GetFuture() const {  -        EnsureInitialized();  -        return TFuture<T>(State);  -    }  +    inline TFuture<T> TPromise<T>::GetFuture() const { +        EnsureInitialized(); +        return TFuture<T>(State); +    } -    template <typename T>  -    inline TPromise<T>::operator TFuture<T>() const {  -        return GetFuture();  -    }  +    template <typename T> +    inline TPromise<T>::operator TFuture<T>() const { +        return GetFuture(); +    } -    template <typename T>  -    inline bool TPromise<T>::Initialized() const {  -        return bool(State);  -    }  +    template <typename T> +    inline bool TPromise<T>::Initialized() const { +        return bool(State); +    } -    template <typename T>  -    inline void TPromise<T>::EnsureInitialized() const {  -        if (!State) {  -            ythrow TFutureException() << "state not initialized";  -        }  -    }  +    template <typename T> +    inline void TPromise<T>::EnsureInitialized() const { +        if (!State) { +            ythrow TFutureException() << "state not initialized"; +        } +    } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      inline TPromise<void>::TPromise(const TIntrusivePtr<TFutureState>& state) noexcept -        : State(state)  -    {  -    }  +        : State(state) +    { +    } -    inline void TPromise<void>::Swap(TPromise<void>& other) {  -        State.Swap(other.State);  -    }  +    inline void TPromise<void>::Swap(TPromise<void>& other) { +        State.Swap(other.State); +    } -    inline void TPromise<void>::GetValue() const {  -        EnsureInitialized();  -        State->GetValue();  -    }  +    inline void TPromise<void>::GetValue() const { +        EnsureInitialized(); +        State->GetValue(); +    } -    inline bool TPromise<void>::HasValue() const {  -        return State && State->HasValue();  -    }  +    inline bool TPromise<void>::HasValue() const { +        return State && State->HasValue(); +    } -    inline void TPromise<void>::SetValue() {  -        EnsureInitialized();  -        State->SetValue();  -    }  +    inline void TPromise<void>::SetValue() { +        EnsureInitialized(); +        State->SetValue(); +    }      inline bool TPromise<void>::TrySetValue() {          EnsureInitialized(); @@ -898,78 +898,78 @@ namespace NThreading {          }      } -    inline bool TPromise<void>::HasException() const {  -        return State && State->HasException();  -    }  +    inline bool TPromise<void>::HasException() const { +        return State && State->HasException(); +    } -    inline void TPromise<void>::SetException(const TString& e) {  -        EnsureInitialized();  -        State->SetException(std::make_exception_ptr(yexception() << e));  -    }  +    inline void TPromise<void>::SetException(const TString& e) { +        EnsureInitialized(); +        State->SetException(std::make_exception_ptr(yexception() << e)); +    } -    inline void TPromise<void>::SetException(std::exception_ptr e) {  -        EnsureInitialized();  -        State->SetException(std::move(e));  -    }  +    inline void TPromise<void>::SetException(std::exception_ptr e) { +        EnsureInitialized(); +        State->SetException(std::move(e)); +    }      inline bool TPromise<void>::TrySetException(std::exception_ptr e) {          EnsureInitialized();          return State->TrySetException(std::move(e));      } -    inline TFuture<void> TPromise<void>::GetFuture() const {  -        EnsureInitialized();  -        return TFuture<void>(State);  -    }  +    inline TFuture<void> TPromise<void>::GetFuture() const { +        EnsureInitialized(); +        return TFuture<void>(State); +    } -    inline TPromise<void>::operator TFuture<void>() const {  -        return GetFuture();  -    }  +    inline TPromise<void>::operator TFuture<void>() const { +        return GetFuture(); +    } -    inline bool TPromise<void>::Initialized() const {  -        return bool(State);  -    }  +    inline bool TPromise<void>::Initialized() const { +        return bool(State); +    } -    inline void TPromise<void>::EnsureInitialized() const {  -        if (!State) {  -            ythrow TFutureException() << "state not initialized";  -        }  -    }  +    inline void TPromise<void>::EnsureInitialized() const { +        if (!State) { +            ythrow TFutureException() << "state not initialized"; +        } +    } -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    template <typename T>  -    inline TPromise<T> NewPromise() {  -        return {new NImpl::TFutureState<T>()};  +    template <typename T> +    inline TPromise<T> NewPromise() { +        return {new NImpl::TFutureState<T>()};      } -    inline TPromise<void> NewPromise() {  -        return {new NImpl::TFutureState<void>()};  -    }  +    inline TPromise<void> NewPromise() { +        return {new NImpl::TFutureState<void>()}; +    } -    template <typename T>  -    inline TFuture<T> MakeFuture(const T& value) {  -        return {new NImpl::TFutureState<T>(value)};  -    }  +    template <typename T> +    inline TFuture<T> MakeFuture(const T& value) { +        return {new NImpl::TFutureState<T>(value)}; +    } -    template <typename T>  -    inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) {  -        return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))};  -    }  +    template <typename T> +    inline TFuture<std::remove_reference_t<T>> MakeFuture(T&& value) { +        return {new NImpl::TFutureState<std::remove_reference_t<T>>(std::forward<T>(value))}; +    } -    template <typename T>  -    inline TFuture<T> MakeFuture() {  -        struct TCache {  -            TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())};  +    template <typename T> +    inline TFuture<T> MakeFuture() { +        struct TCache { +            TFuture<T> Instance{new NImpl::TFutureState<T>(Default<T>())};              TCache() {                  // Immediately advance state from ValueSet to ValueRead.                  // This should prevent corrupting shared value with an ExtractValue() call.                  Y_UNUSED(Instance.GetValue());              } -        };  -        return Singleton<TCache>()->Instance;  -    }  +        }; +        return Singleton<TCache>()->Instance; +    }      template <typename T>      inline TFuture<T> MakeErrorFuture(std::exception_ptr exception) @@ -977,10 +977,10 @@ namespace NThreading {          return {new NImpl::TFutureState<T>(std::move(exception), NImpl::TError::Error)};      } -    inline TFuture<void> MakeFuture() {  -        struct TCache {  -            TFuture<void> Instance{new NImpl::TFutureState<void>(true)};  -        };  -        return Singleton<TCache>()->Instance;  -    }  +    inline TFuture<void> MakeFuture() { +        struct TCache { +            TFuture<void> Instance{new NImpl::TFutureState<void>(true)}; +        }; +        return Singleton<TCache>()->Instance; +    }  } diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 12623389cac..2e82bb953eb 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -12,51 +12,51 @@  #include <util/system/spinlock.h>  namespace NThreading { -    ////////////////////////////////////////////////////////////////////////////////  - -    struct TFutureException: public yexception {};  - -    // creates unset promise  -    template <typename T>  -    TPromise<T> NewPromise();  -    TPromise<void> NewPromise();  - -    // creates preset future  -    template <typename T>  -    TFuture<T> MakeFuture(const T& value);  -    template <typename T>  -    TFuture<std::remove_reference_t<T>> MakeFuture(T&& value);  -    template <typename T>  -    TFuture<T> MakeFuture();  +    //////////////////////////////////////////////////////////////////////////////// + +    struct TFutureException: public yexception {}; + +    // creates unset promise +    template <typename T> +    TPromise<T> NewPromise(); +    TPromise<void> NewPromise(); + +    // creates preset future +    template <typename T> +    TFuture<T> MakeFuture(const T& value); +    template <typename T> +    TFuture<std::remove_reference_t<T>> MakeFuture(T&& value); +    template <typename T> +    TFuture<T> MakeFuture();      template <typename T>      TFuture<T> MakeErrorFuture(std::exception_ptr exception); -    TFuture<void> MakeFuture();  +    TFuture<void> MakeFuture(); -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    namespace NImpl {  -        template <typename T>  -        class TFutureState;  +    namespace NImpl { +        template <typename T> +        class TFutureState; -        template <typename T>  -        struct TFutureType {  -            using TType = T;  -        };  +        template <typename T> +        struct TFutureType { +            using TType = T; +        }; -        template <typename T>  -        struct TFutureType<TFuture<T>> {  -            using TType = typename TFutureType<T>::TType;  -        };  +        template <typename T> +        struct TFutureType<TFuture<T>> { +            using TType = typename TFutureType<T>::TType; +        };          template <typename F, typename T>          struct TFutureCallResult {              // NOTE: separate class for msvc compatibility              using TType = decltype(std::declval<F&>()(std::declval<const TFuture<T>&>()));          }; -    }  +    } -    template <typename F>  -    using TFutureType = typename NImpl::TFutureType<F>::TType;  +    template <typename F> +    using TFutureType = typename NImpl::TFutureType<F>::TType;      template <typename F, typename T>      using TFutureCallResult = typename NImpl::TFutureCallResult<F, T>::TType; @@ -64,16 +64,16 @@ namespace NThreading {      //! Type of the future/promise state identifier      class TFutureStateId; -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    template <typename T>  -    class TFuture {  -        using TFutureState = NImpl::TFutureState<T>;  +    template <typename T> +    class TFuture { +        using TFutureState = NImpl::TFutureState<T>; -    private:  -        TIntrusivePtr<TFutureState> State;  +    private: +        TIntrusivePtr<TFutureState> State; -    public:  +    public:          using value_type = T;          TFuture() noexcept = default; @@ -83,54 +83,54 @@ namespace NThreading {          TFuture<T>& operator=(const TFuture<T>& other) noexcept = default;          TFuture<T>& operator=(TFuture<T>&& other) noexcept = default; -        void Swap(TFuture<T>& other);  +        void Swap(TFuture<T>& other); -        bool Initialized() const;  +        bool Initialized() const; -        bool HasValue() const;  -        const T& GetValue(TDuration timeout = TDuration::Zero()) const;  -        const T& GetValueSync() const;  -        T ExtractValue(TDuration timeout = TDuration::Zero());  -        T ExtractValueSync();  +        bool HasValue() const; +        const T& GetValue(TDuration timeout = TDuration::Zero()) const; +        const T& GetValueSync() const; +        T ExtractValue(TDuration timeout = TDuration::Zero()); +        T ExtractValueSync();          void TryRethrow() const; -        bool HasException() const;  +        bool HasException() const; -        void Wait() const;  -        bool Wait(TDuration timeout) const;  -        bool Wait(TInstant deadline) const;  +        void Wait() const; +        bool Wait(TDuration timeout) const; +        bool Wait(TInstant deadline) const; -        template <typename F>  -        const TFuture<T>& Subscribe(F&& callback) const;  +        template <typename F> +        const TFuture<T>& Subscribe(F&& callback) const;          // precondition: EnsureInitialized() passes          // postcondition: std::terminate is highly unlikely -        template <typename F>  +        template <typename F>          const TFuture<T>& NoexceptSubscribe(F&& callback) const noexcept;          template <typename F>          TFuture<TFutureType<TFutureCallResult<F, T>>> Apply(F&& func) const; -        TFuture<void> IgnoreResult() const;  +        TFuture<void> IgnoreResult() const;          //! If the future is initialized returns the future state identifier. Otherwise returns an empty optional          /** The state identifier is guaranteed to be unique during the future state lifetime and could be reused after its death          **/          TMaybe<TFutureStateId> StateId() const noexcept; -        void EnsureInitialized() const;  -    };  +        void EnsureInitialized() const; +    }; -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    template <>  -    class TFuture<void> {  -        using TFutureState = NImpl::TFutureState<void>;  +    template <> +    class TFuture<void> { +        using TFutureState = NImpl::TFutureState<void>; -    private:  +    private:          TIntrusivePtr<TFutureState> State = nullptr; -    public:  +    public:          using value_type = void;          TFuture() noexcept = default; @@ -140,34 +140,34 @@ namespace NThreading {          TFuture<void>& operator=(const TFuture<void>& other) noexcept = default;          TFuture<void>& operator=(TFuture<void>&& other) noexcept = default; -        void Swap(TFuture<void>& other);  +        void Swap(TFuture<void>& other); -        bool Initialized() const;  +        bool Initialized() const; -        bool HasValue() const;  -        void GetValue(TDuration timeout = TDuration::Zero()) const;  -        void GetValueSync() const;  +        bool HasValue() const; +        void GetValue(TDuration timeout = TDuration::Zero()) const; +        void GetValueSync() const;          void TryRethrow() const; -        bool HasException() const;  +        bool HasException() const; -        void Wait() const;  -        bool Wait(TDuration timeout) const;  -        bool Wait(TInstant deadline) const;  +        void Wait() const; +        bool Wait(TDuration timeout) const; +        bool Wait(TInstant deadline) const; -        template <typename F>  -        const TFuture<void>& Subscribe(F&& callback) const;  +        template <typename F> +        const TFuture<void>& Subscribe(F&& callback) const;          // precondition: EnsureInitialized() passes          // postcondition: std::terminate is highly unlikely -        template <typename F>  +        template <typename F>          const TFuture<void>& NoexceptSubscribe(F&& callback) const noexcept;          template <typename F>          TFuture<TFutureType<TFutureCallResult<F, void>>> Apply(F&& func) const; -        template <typename R>  -        TFuture<R> Return(const R& value) const;  +        template <typename R> +        TFuture<R> Return(const R& value) const;          TFuture<void> IgnoreResult() const {              return *this; @@ -178,19 +178,19 @@ namespace NThreading {          **/          TMaybe<TFutureStateId> StateId() const noexcept; -        void EnsureInitialized() const;  -    };  +        void EnsureInitialized() const; +    }; -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    template <typename T>  -    class TPromise {  -        using TFutureState = NImpl::TFutureState<T>;  +    template <typename T> +    class TPromise { +        using TFutureState = NImpl::TFutureState<T>; -    private:  +    private:          TIntrusivePtr<TFutureState> State = nullptr; -    public:  +    public:          TPromise() noexcept = default;          TPromise(const TPromise<T>& other) noexcept = default;          TPromise(TPromise<T>&& other) noexcept = default; @@ -198,43 +198,43 @@ namespace NThreading {          TPromise<T>& operator=(const TPromise<T>& other) noexcept = default;          TPromise<T>& operator=(TPromise<T>&& other) noexcept = default; -        void Swap(TPromise<T>& other);  +        void Swap(TPromise<T>& other); -        bool Initialized() const;  +        bool Initialized() const; -        bool HasValue() const;  -        const T& GetValue() const;  -        T ExtractValue();  +        bool HasValue() const; +        const T& GetValue() const; +        T ExtractValue(); -        void SetValue(const T& value);  -        void SetValue(T&& value);  +        void SetValue(const T& value); +        void SetValue(T&& value);          bool TrySetValue(const T& value);          bool TrySetValue(T&& value);          void TryRethrow() const; -        bool HasException() const;  -        void SetException(const TString& e);  -        void SetException(std::exception_ptr e);  +        bool HasException() const; +        void SetException(const TString& e); +        void SetException(std::exception_ptr e);          bool TrySetException(std::exception_ptr e); -        TFuture<T> GetFuture() const;  -        operator TFuture<T>() const;  +        TFuture<T> GetFuture() const; +        operator TFuture<T>() const; -    private:  -        void EnsureInitialized() const;  -    };  +    private: +        void EnsureInitialized() const; +    }; -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    template <>  -    class TPromise<void> {  -        using TFutureState = NImpl::TFutureState<void>;  +    template <> +    class TPromise<void> { +        using TFutureState = NImpl::TFutureState<void>; -    private:  -        TIntrusivePtr<TFutureState> State;  +    private: +        TIntrusivePtr<TFutureState> State; -    public:  +    public:          TPromise() noexcept = default;          TPromise(const TPromise<void>& other) noexcept = default;          TPromise(TPromise<void>&& other) noexcept = default; @@ -242,30 +242,30 @@ namespace NThreading {          TPromise<void>& operator=(const TPromise<void>& other) noexcept = default;          TPromise<void>& operator=(TPromise<void>&& other) noexcept = default; -        void Swap(TPromise<void>& other);  +        void Swap(TPromise<void>& other); -        bool Initialized() const;  +        bool Initialized() const; -        bool HasValue() const;  -        void GetValue() const;  +        bool HasValue() const; +        void GetValue() const; -        void SetValue();  +        void SetValue();          bool TrySetValue();          void TryRethrow() const; -        bool HasException() const;  -        void SetException(const TString& e);  -        void SetException(std::exception_ptr e);  +        bool HasException() const; +        void SetException(const TString& e); +        void SetException(std::exception_ptr e);          bool TrySetException(std::exception_ptr e); -        TFuture<void> GetFuture() const;  -        operator TFuture<void>() const;  +        TFuture<void> GetFuture() const; +        operator TFuture<void>() const; -    private:  -        void EnsureInitialized() const;  -    };  +    private: +        void EnsureInitialized() const; +    }; -}  +}  #define INCLUDE_FUTURE_INL_H  #include "future-inl.h" diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp index 636b113f2fa..05950a568d4 100644 --- a/library/cpp/threading/future/future_ut.cpp +++ b/library/cpp/threading/future/future_ut.cpp @@ -62,180 +62,180 @@ namespace {  } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      Y_UNIT_TEST_SUITE(TFutureTest) {          Y_UNIT_TEST(ShouldInitiallyHasNoValue) { -            TPromise<int> promise;  -            UNIT_ASSERT(!promise.HasValue());  +            TPromise<int> promise; +            UNIT_ASSERT(!promise.HasValue()); -            promise = NewPromise<int>();  -            UNIT_ASSERT(!promise.HasValue());  +            promise = NewPromise<int>(); +            UNIT_ASSERT(!promise.HasValue()); -            TFuture<int> future;  -            UNIT_ASSERT(!future.HasValue());  +            TFuture<int> future; +            UNIT_ASSERT(!future.HasValue()); -            future = promise.GetFuture();  -            UNIT_ASSERT(!future.HasValue());  -        }  +            future = promise.GetFuture(); +            UNIT_ASSERT(!future.HasValue()); +        }          Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) { -            TPromise<void> promise;  -            UNIT_ASSERT(!promise.HasValue());  +            TPromise<void> promise; +            UNIT_ASSERT(!promise.HasValue()); -            promise = NewPromise();  -            UNIT_ASSERT(!promise.HasValue());  +            promise = NewPromise(); +            UNIT_ASSERT(!promise.HasValue()); -            TFuture<void> future;  -            UNIT_ASSERT(!future.HasValue());  +            TFuture<void> future; +            UNIT_ASSERT(!future.HasValue()); -            future = promise.GetFuture();  -            UNIT_ASSERT(!future.HasValue());  -        }  +            future = promise.GetFuture(); +            UNIT_ASSERT(!future.HasValue()); +        }          Y_UNIT_TEST(ShouldStoreValue) { -            TPromise<int> promise = NewPromise<int>();  -            promise.SetValue(123);  -            UNIT_ASSERT(promise.HasValue());  -            UNIT_ASSERT_EQUAL(promise.GetValue(), 123);  +            TPromise<int> promise = NewPromise<int>(); +            promise.SetValue(123); +            UNIT_ASSERT(promise.HasValue()); +            UNIT_ASSERT_EQUAL(promise.GetValue(), 123); -            TFuture<int> future = promise.GetFuture();  -            UNIT_ASSERT(future.HasValue());  -            UNIT_ASSERT_EQUAL(future.GetValue(), 123);  +            TFuture<int> future = promise.GetFuture(); +            UNIT_ASSERT(future.HasValue()); +            UNIT_ASSERT_EQUAL(future.GetValue(), 123); -            future = MakeFuture(345);  -            UNIT_ASSERT(future.HasValue());  -            UNIT_ASSERT_EQUAL(future.GetValue(), 345);  -        }  +            future = MakeFuture(345); +            UNIT_ASSERT(future.HasValue()); +            UNIT_ASSERT_EQUAL(future.GetValue(), 345); +        }          Y_UNIT_TEST(ShouldStoreValueVoid) { -            TPromise<void> promise = NewPromise();  -            promise.SetValue();  -            UNIT_ASSERT(promise.HasValue());  +            TPromise<void> promise = NewPromise(); +            promise.SetValue(); +            UNIT_ASSERT(promise.HasValue()); -            TFuture<void> future = promise.GetFuture();  -            UNIT_ASSERT(future.HasValue());  +            TFuture<void> future = promise.GetFuture(); +            UNIT_ASSERT(future.HasValue()); -            future = MakeFuture();  -            UNIT_ASSERT(future.HasValue());  -        }  +            future = MakeFuture(); +            UNIT_ASSERT(future.HasValue()); +        } -        struct TTestCallback {  -            int Value;  +        struct TTestCallback { +            int Value; -            TTestCallback(int value)  -                : Value(value)  -            {  -            }  +            TTestCallback(int value) +                : Value(value) +            { +            } -            void Callback(const TFuture<int>& future) {  -                Value += future.GetValue();  -            }  +            void Callback(const TFuture<int>& future) { +                Value += future.GetValue(); +            } -            int Func(const TFuture<int>& future) {  -                return (Value += future.GetValue());  -            }  +            int Func(const TFuture<int>& future) { +                return (Value += future.GetValue()); +            } -            void VoidFunc(const TFuture<int>& future) {  -                future.GetValue();  -            }  +            void VoidFunc(const TFuture<int>& future) { +                future.GetValue(); +            } -            TFuture<int> FutureFunc(const TFuture<int>& future) {  -                return MakeFuture(Value += future.GetValue());  -            }  +            TFuture<int> FutureFunc(const TFuture<int>& future) { +                return MakeFuture(Value += future.GetValue()); +            } -            TPromise<void> Signal = NewPromise();  -            TFuture<void> FutureVoidFunc(const TFuture<int>& future) {  -                future.GetValue();  -                return Signal;  -            }  -        };  +            TPromise<void> Signal = NewPromise(); +            TFuture<void> FutureVoidFunc(const TFuture<int>& future) { +                future.GetValue(); +                return Signal; +            } +        };          Y_UNIT_TEST(ShouldInvokeCallback) { -            TPromise<int> promise = NewPromise<int>();  +            TPromise<int> promise = NewPromise<int>(); -            TTestCallback callback(123);  -            TFuture<int> future = promise.GetFuture()  -                                      .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); });  +            TTestCallback callback(123); +            TFuture<int> future = promise.GetFuture() +                                      .Subscribe([&](const TFuture<int>& theFuture) { return callback.Callback(theFuture); }); -            promise.SetValue(456);  -            UNIT_ASSERT_EQUAL(future.GetValue(), 456);  -            UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);  -        }  +            promise.SetValue(456); +            UNIT_ASSERT_EQUAL(future.GetValue(), 456); +            UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); +        }          Y_UNIT_TEST(ShouldApplyFunc) { -            TPromise<int> promise = NewPromise<int>();  +            TPromise<int> promise = NewPromise<int>(); -            TTestCallback callback(123);  -            TFuture<int> future = promise.GetFuture()  +            TTestCallback callback(123); +            TFuture<int> future = promise.GetFuture()                                        .Apply([&](const auto& theFuture) { return callback.Func(theFuture); }); -            promise.SetValue(456);  -            UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);  -            UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);  -        }  +            promise.SetValue(456); +            UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); +            UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); +        }          Y_UNIT_TEST(ShouldApplyVoidFunc) { -            TPromise<int> promise = NewPromise<int>();  +            TPromise<int> promise = NewPromise<int>(); -            TTestCallback callback(123);  -            TFuture<void> future = promise.GetFuture()  +            TTestCallback callback(123); +            TFuture<void> future = promise.GetFuture()                                         .Apply([&](const auto& theFuture) { return callback.VoidFunc(theFuture); }); -            promise.SetValue(456);  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise.SetValue(456); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldApplyFutureFunc) { -            TPromise<int> promise = NewPromise<int>();  +            TPromise<int> promise = NewPromise<int>(); -            TTestCallback callback(123);  -            TFuture<int> future = promise.GetFuture()  +            TTestCallback callback(123); +            TFuture<int> future = promise.GetFuture()                                        .Apply([&](const auto& theFuture) { return callback.FutureFunc(theFuture); }); -            promise.SetValue(456);  -            UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456);  -            UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);  -        }  +            promise.SetValue(456); +            UNIT_ASSERT_EQUAL(future.GetValue(), 123 + 456); +            UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); +        }          Y_UNIT_TEST(ShouldApplyFutureVoidFunc) { -            TPromise<int> promise = NewPromise<int>();  +            TPromise<int> promise = NewPromise<int>(); -            TTestCallback callback(123);  -            TFuture<void> future = promise.GetFuture()  +            TTestCallback callback(123); +            TFuture<void> future = promise.GetFuture()                                         .Apply([&](const auto& theFuture) { return callback.FutureVoidFunc(theFuture); }); -            promise.SetValue(456);  -            UNIT_ASSERT(!future.HasValue());  +            promise.SetValue(456); +            UNIT_ASSERT(!future.HasValue()); -            callback.Signal.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            callback.Signal.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldIgnoreResultIfAsked) { -            TPromise<int> promise = NewPromise<int>();  +            TPromise<int> promise = NewPromise<int>(); -            TTestCallback callback(123);  -            TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42);  +            TTestCallback callback(123); +            TFuture<int> future = promise.GetFuture().IgnoreResult().Return(42); -            promise.SetValue(456);  -            UNIT_ASSERT_EQUAL(future.GetValue(), 42);  -        }  +            promise.SetValue(456); +            UNIT_ASSERT_EQUAL(future.GetValue(), 42); +        } -        class TCustomException: public yexception {  -        };  +        class TCustomException: public yexception { +        };          Y_UNIT_TEST(ShouldRethrowException) { -            TPromise<int> promise = NewPromise<int>();  -            try {  -                ythrow TCustomException();  -            } catch (...) {  -                promise.SetException(std::current_exception());  -            }  - -            UNIT_ASSERT(!promise.HasValue());  -            UNIT_ASSERT(promise.HasException());  -            UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException);  +            TPromise<int> promise = NewPromise<int>(); +            try { +                ythrow TCustomException(); +            } catch (...) { +                promise.SetException(std::current_exception()); +            } + +            UNIT_ASSERT(!promise.HasValue()); +            UNIT_ASSERT(promise.HasException()); +            UNIT_ASSERT_EXCEPTION(promise.GetValue(), TCustomException);              UNIT_ASSERT_EXCEPTION(promise.TryRethrow(), TCustomException);          } @@ -261,36 +261,36 @@ namespace {          Y_UNIT_TEST(ShouldWaitExceptionOrAll) { -            TPromise<void> promise1 = NewPromise();  -            TPromise<void> promise2 = NewPromise();  +            TPromise<void> promise1 = NewPromise(); +            TPromise<void> promise2 = NewPromise();              TFuture<void> future = WaitExceptionOrAll(promise1, promise2); -            UNIT_ASSERT(!future.HasValue());  +            UNIT_ASSERT(!future.HasValue()); -            promise1.SetValue();  -            UNIT_ASSERT(!future.HasValue());  +            promise1.SetValue(); +            UNIT_ASSERT(!future.HasValue()); -            promise2.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise2.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitExceptionOrAllVector) { -            TPromise<void> promise1 = NewPromise();  -            TPromise<void> promise2 = NewPromise();  +            TPromise<void> promise1 = NewPromise(); +            TPromise<void> promise2 = NewPromise(); -            TVector<TFuture<void>> promises;  -            promises.push_back(promise1);  -            promises.push_back(promise2);  +            TVector<TFuture<void>> promises; +            promises.push_back(promise1); +            promises.push_back(promise2);              TFuture<void> future = WaitExceptionOrAll(promises); -            UNIT_ASSERT(!future.HasValue());  +            UNIT_ASSERT(!future.HasValue()); -            promise1.SetValue();  -            UNIT_ASSERT(!future.HasValue());  +            promise1.SetValue(); +            UNIT_ASSERT(!future.HasValue()); -            promise2.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise2.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorWithValueType) {              TPromise<int> promise1 = NewPromise<int>(); @@ -311,47 +311,47 @@ namespace {          }          Y_UNIT_TEST(ShouldWaitExceptionOrAllList) { -            TPromise<void> promise1 = NewPromise();  -            TPromise<void> promise2 = NewPromise();  +            TPromise<void> promise1 = NewPromise(); +            TPromise<void> promise2 = NewPromise(); -            std::list<TFuture<void>> promises;  -            promises.push_back(promise1);  -            promises.push_back(promise2);  +            std::list<TFuture<void>> promises; +            promises.push_back(promise1); +            promises.push_back(promise2);              TFuture<void> future = WaitExceptionOrAll(promises); -            UNIT_ASSERT(!future.HasValue());  +            UNIT_ASSERT(!future.HasValue()); -            promise1.SetValue();  -            UNIT_ASSERT(!future.HasValue());  +            promise1.SetValue(); +            UNIT_ASSERT(!future.HasValue()); -            promise2.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise2.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitExceptionOrAllVectorEmpty) { -            TVector<TFuture<void>> promises;  +            TVector<TFuture<void>> promises;              TFuture<void> future = WaitExceptionOrAll(promises); -            UNIT_ASSERT(future.HasValue());  -        }  +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitAnyVector) { -            TPromise<void> promise1 = NewPromise();  -            TPromise<void> promise2 = NewPromise();  +            TPromise<void> promise1 = NewPromise(); +            TPromise<void> promise2 = NewPromise(); -            TVector<TFuture<void>> promises;  -            promises.push_back(promise1);  -            promises.push_back(promise2);  +            TVector<TFuture<void>> promises; +            promises.push_back(promise1); +            promises.push_back(promise2); -            TFuture<void> future = WaitAny(promises);  -            UNIT_ASSERT(!future.HasValue());  +            TFuture<void> future = WaitAny(promises); +            UNIT_ASSERT(!future.HasValue()); -            promise1.SetValue();  -            UNIT_ASSERT(future.HasValue());  +            promise1.SetValue(); +            UNIT_ASSERT(future.HasValue()); -            promise2.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise2.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitAnyVectorWithValueType) { @@ -373,112 +373,112 @@ namespace {          }          Y_UNIT_TEST(ShouldWaitAnyList) { -            TPromise<void> promise1 = NewPromise();  -            TPromise<void> promise2 = NewPromise();  +            TPromise<void> promise1 = NewPromise(); +            TPromise<void> promise2 = NewPromise(); -            std::list<TFuture<void>> promises;  -            promises.push_back(promise1);  -            promises.push_back(promise2);  +            std::list<TFuture<void>> promises; +            promises.push_back(promise1); +            promises.push_back(promise2); -            TFuture<void> future = WaitAny(promises);  -            UNIT_ASSERT(!future.HasValue());  +            TFuture<void> future = WaitAny(promises); +            UNIT_ASSERT(!future.HasValue()); -            promise1.SetValue();  -            UNIT_ASSERT(future.HasValue());  +            promise1.SetValue(); +            UNIT_ASSERT(future.HasValue()); -            promise2.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise2.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) { -            TVector<TFuture<void>> promises;  +            TVector<TFuture<void>> promises; -            TFuture<void> future = WaitAny(promises);  -            UNIT_ASSERT(future.HasValue());  -        }  +            TFuture<void> future = WaitAny(promises); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldWaitAny) { -            TPromise<void> promise1 = NewPromise();  -            TPromise<void> promise2 = NewPromise();  +            TPromise<void> promise1 = NewPromise(); +            TPromise<void> promise2 = NewPromise(); -            TFuture<void> future = WaitAny(promise1, promise2);  -            UNIT_ASSERT(!future.HasValue());  +            TFuture<void> future = WaitAny(promise1, promise2); +            UNIT_ASSERT(!future.HasValue()); -            promise1.SetValue();  -            UNIT_ASSERT(future.HasValue());  +            promise1.SetValue(); +            UNIT_ASSERT(future.HasValue()); -            promise2.SetValue();  -            UNIT_ASSERT(future.HasValue());  -        }  +            promise2.SetValue(); +            UNIT_ASSERT(future.HasValue()); +        }          Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) { -            // compileability test  -            struct TRec {  -                explicit TRec(int) {  -                }  -            };  +            // compileability test +            struct TRec { +                explicit TRec(int) { +                } +            }; -            auto promise = NewPromise<TRec>();  -            promise.SetValue(TRec(1));  +            auto promise = NewPromise<TRec>(); +            promise.SetValue(TRec(1)); -            auto future = MakeFuture(TRec(1));  -            const auto& rec = future.GetValue();  -            Y_UNUSED(rec);  -        }  +            auto future = MakeFuture(TRec(1)); +            const auto& rec = future.GetValue(); +            Y_UNUSED(rec); +        }          Y_UNIT_TEST(ShouldStoreMovableTypes) { -            // compileability test  -            struct TRec : TMoveOnly {  -                explicit TRec(int) {  -                }  -            };  +            // compileability test +            struct TRec : TMoveOnly { +                explicit TRec(int) { +                } +            }; -            auto promise = NewPromise<TRec>();  -            promise.SetValue(TRec(1));  +            auto promise = NewPromise<TRec>(); +            promise.SetValue(TRec(1)); -            auto future = MakeFuture(TRec(1));  -            const auto& rec = future.GetValue();  -            Y_UNUSED(rec);  -        }  +            auto future = MakeFuture(TRec(1)); +            const auto& rec = future.GetValue(); +            Y_UNUSED(rec); +        }          Y_UNIT_TEST(ShouldMoveMovableTypes) { -            // compileability test  -            struct TRec : TMoveOnly {  -                explicit TRec(int) {  -                }  -            };  +            // compileability test +            struct TRec : TMoveOnly { +                explicit TRec(int) { +                } +            }; -            auto promise = NewPromise<TRec>();  -            promise.SetValue(TRec(1));  +            auto promise = NewPromise<TRec>(); +            promise.SetValue(TRec(1)); -            auto future = MakeFuture(TRec(1));  -            auto rec = future.ExtractValue();  -            Y_UNUSED(rec);  -        }  +            auto future = MakeFuture(TRec(1)); +            auto rec = future.ExtractValue(); +            Y_UNUSED(rec); +        }          Y_UNIT_TEST(ShouldNotExtractAfterGet) { -            TPromise<int> promise = NewPromise<int>();  -            promise.SetValue(123);  -            UNIT_ASSERT(promise.HasValue());  -            UNIT_ASSERT_EQUAL(promise.GetValue(), 123);  -            UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);  -        }  +            TPromise<int> promise = NewPromise<int>(); +            promise.SetValue(123); +            UNIT_ASSERT(promise.HasValue()); +            UNIT_ASSERT_EQUAL(promise.GetValue(), 123); +            UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); +        }          Y_UNIT_TEST(ShouldNotGetAfterExtract) { -            TPromise<int> promise = NewPromise<int>();  -            promise.SetValue(123);  -            UNIT_ASSERT(promise.HasValue());  -            UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);  -            UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException);  -        }  +            TPromise<int> promise = NewPromise<int>(); +            promise.SetValue(123); +            UNIT_ASSERT(promise.HasValue()); +            UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123); +            UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException); +        }          Y_UNIT_TEST(ShouldNotExtractAfterExtract) { -            TPromise<int> promise = NewPromise<int>();  -            promise.SetValue(123);  -            UNIT_ASSERT(promise.HasValue());  -            UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123);  -            UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);  -        }  +            TPromise<int> promise = NewPromise<int>(); +            promise.SetValue(123); +            UNIT_ASSERT(promise.HasValue()); +            UNIT_ASSERT_EQUAL(promise.ExtractValue(), 123); +            UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); +        }          Y_UNIT_TEST(ShouldNotExtractFromSharedDefault) {              UNIT_CHECK_GENERATED_EXCEPTION(MakeFuture<int>().ExtractValue(), TFutureException); diff --git a/library/cpp/threading/future/legacy_future.h b/library/cpp/threading/future/legacy_future.h index c699aadf5cd..6f1eabad73b 100644 --- a/library/cpp/threading/future/legacy_future.h +++ b/library/cpp/threading/future/legacy_future.h @@ -4,79 +4,79 @@  #include "future.h"  #include <util/thread/factory.h> -  +  #include <functional> -  +  namespace NThreading {      template <typename TR, bool IgnoreException>      class TLegacyFuture: public IThreadFactory::IThreadAble, TNonCopyable { -    public:  -        typedef TR(TFunctionSignature)();  -        using TFunctionObjectType = std::function<TFunctionSignature>;  -        using TResult = typename TFunctionObjectType::result_type;  +    public: +        typedef TR(TFunctionSignature)(); +        using TFunctionObjectType = std::function<TFunctionSignature>; +        using TResult = typename TFunctionObjectType::result_type; -    private:  -        TFunctionObjectType Func_;  -        TPromise<TResult> Result_;  +    private: +        TFunctionObjectType Func_; +        TPromise<TResult> Result_;          THolder<IThreadFactory::IThread> Thread_; -    public:  +    public:          inline TLegacyFuture(const TFunctionObjectType func, IThreadFactory* pool = SystemThreadFactory()) -            : Func_(func)  -            , Result_(NewPromise<TResult>())  -            , Thread_(pool->Run(this))  -        {  -        }  +            : Func_(func) +            , Result_(NewPromise<TResult>()) +            , Thread_(pool->Run(this)) +        { +        } -        inline ~TLegacyFuture() override {  -            this->Join();  -        }  +        inline ~TLegacyFuture() override { +            this->Join(); +        } -        inline TResult Get() {  -            this->Join();  -            return Result_.GetValue();  -        }  +        inline TResult Get() { +            this->Join(); +            return Result_.GetValue(); +        } -    private:  -        inline void Join() {  -            if (Thread_) {  -                Thread_->Join();  -                Thread_.Destroy();  -            }  +    private: +        inline void Join() { +            if (Thread_) { +                Thread_->Join(); +                Thread_.Destroy(); +            }          } -        template <typename Result, bool IgnoreException_>  -        struct TExecutor {  -            static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) {  -                if (IgnoreException_) {  -                    try {  -                        promise.SetValue(func());  -                    } catch (...) {  -                    }  -                } else {  +        template <typename Result, bool IgnoreException_> +        struct TExecutor { +            static void SetPromise(TPromise<Result>& promise, const TFunctionObjectType& func) { +                if (IgnoreException_) { +                    try { +                        promise.SetValue(func()); +                    } catch (...) { +                    } +                } else {                      promise.SetValue(func());                  }              } -        };  +        }; -        template <bool IgnoreException_>  -        struct TExecutor<void, IgnoreException_> {  -            static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) {  -                if (IgnoreException_) {  -                    try {  -                        func();  -                        promise.SetValue();  -                    } catch (...) {  -                    }  -                } else {  +        template <bool IgnoreException_> +        struct TExecutor<void, IgnoreException_> { +            static void SetPromise(TPromise<void>& promise, const TFunctionObjectType& func) { +                if (IgnoreException_) { +                    try { +                        func(); +                        promise.SetValue(); +                    } catch (...) { +                    } +                } else {                      func();                      promise.SetValue();                  }              } -        };  -  -        void DoExecute() override {  -            TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_);  +        }; + +        void DoExecute() override { +            TExecutor<TResult, IgnoreException>::SetPromise(Result_, Func_);          }      }; diff --git a/library/cpp/threading/future/legacy_future_ut.cpp b/library/cpp/threading/future/legacy_future_ut.cpp index 96b46ccebf8..ff63db17250 100644 --- a/library/cpp/threading/future/legacy_future_ut.cpp +++ b/library/cpp/threading/future/legacy_future_ut.cpp @@ -4,69 +4,69 @@  namespace NThreading {      Y_UNIT_TEST_SUITE(TLegacyFutureTest) { -        int intf() {  -            return 17;  -        }  +        int intf() { +            return 17; +        }          Y_UNIT_TEST(TestIntFunction) { -            TLegacyFuture<int> f((&intf));  -            UNIT_ASSERT_VALUES_EQUAL(17, f.Get());  -        }  +            TLegacyFuture<int> f((&intf)); +            UNIT_ASSERT_VALUES_EQUAL(17, f.Get()); +        } -        static int r;  +        static int r; -        void voidf() {  -            r = 18;  -        }  +        void voidf() { +            r = 18; +        }          Y_UNIT_TEST(TestVoidFunction) { -            r = 0;  -            TLegacyFuture<> f((&voidf));  -            f.Get();  -            UNIT_ASSERT_VALUES_EQUAL(18, r);  -        }  +            r = 0; +            TLegacyFuture<> f((&voidf)); +            f.Get(); +            UNIT_ASSERT_VALUES_EQUAL(18, r); +        } -        struct TSampleClass {  -            int mValue;  +        struct TSampleClass { +            int mValue; -            TSampleClass(int value)  -                : mValue(value)  -            {  -            }  +            TSampleClass(int value) +                : mValue(value) +            { +            } -            int Calc() {  -                return mValue + 1;  -            }  -        };  +            int Calc() { +                return mValue + 1; +            } +        };          Y_UNIT_TEST(TestMethod) { -            TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3)));  -            UNIT_ASSERT_VALUES_EQUAL(4, f11.Get());  +            TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3))); +            UNIT_ASSERT_VALUES_EQUAL(4, f11.Get());              TLegacyFuture<int> f12(std::bind(&TSampleClass::Calc, TSampleClass(3)), SystemThreadFactory()); -            UNIT_ASSERT_VALUES_EQUAL(4, f12.Get());  +            UNIT_ASSERT_VALUES_EQUAL(4, f12.Get()); -            TSampleClass c(5);  +            TSampleClass c(5); -            TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c)));  -            UNIT_ASSERT_VALUES_EQUAL(6, f21.Get());  +            TLegacyFuture<int> f21(std::bind(&TSampleClass::Calc, std::ref(c))); +            UNIT_ASSERT_VALUES_EQUAL(6, f21.Get());              TLegacyFuture<int> f22(std::bind(&TSampleClass::Calc, std::ref(c)), SystemThreadFactory()); -            UNIT_ASSERT_VALUES_EQUAL(6, f22.Get());  -        }  +            UNIT_ASSERT_VALUES_EQUAL(6, f22.Get()); +        }          struct TSomeThreadPool: public IThreadFactory {};          Y_UNIT_TEST(TestFunction) { -            std::function<int()> f((&intf));  +            std::function<int()> f((&intf)); -            UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get());  +            UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get());              UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f, SystemThreadFactory()).Get()); -            if (false) {  -                TSomeThreadPool* q = nullptr;  -                TLegacyFuture<int>(f, q); // just check compiles, do not start  -            }  +            if (false) { +                TSomeThreadPool* q = nullptr; +                TLegacyFuture<int>(f, q); // just check compiles, do not start +            }          }      } diff --git a/library/cpp/threading/future/perf/main.cpp b/library/cpp/threading/future/perf/main.cpp index c7da5a51f30..5a0690af473 100644 --- a/library/cpp/threading/future/perf/main.cpp +++ b/library/cpp/threading/future/perf/main.cpp @@ -7,44 +7,44 @@  using namespace NThreading;  template <typename T> -void TestAllocPromise(const NBench::NCpu::TParams& iface) {  -    for (const auto it : xrange(iface.Iterations())) {  +void TestAllocPromise(const NBench::NCpu::TParams& iface) { +    for (const auto it : xrange(iface.Iterations())) {          Y_UNUSED(it);          Y_DO_NOT_OPTIMIZE_AWAY(NewPromise<T>());      }  }  template <typename T> -TPromise<T> SetPromise(T value) {  +TPromise<T> SetPromise(T value) {      auto promise = NewPromise<T>();      promise.SetValue(value);      return promise;  }  template <typename T> -void TestSetPromise(const NBench::NCpu::TParams& iface, T value) {  -    for (const auto it : xrange(iface.Iterations())) {  +void TestSetPromise(const NBench::NCpu::TParams& iface, T value) { +    for (const auto it : xrange(iface.Iterations())) {          Y_UNUSED(it);          Y_DO_NOT_OPTIMIZE_AWAY(SetPromise(value));      }  } -Y_CPU_BENCHMARK(AllocPromiseVoid, iface) {  +Y_CPU_BENCHMARK(AllocPromiseVoid, iface) {      TestAllocPromise<void>(iface);  } -Y_CPU_BENCHMARK(AllocPromiseUI64, iface) {  +Y_CPU_BENCHMARK(AllocPromiseUI64, iface) {      TestAllocPromise<ui64>(iface);  } -Y_CPU_BENCHMARK(AllocPromiseStroka, iface) {  +Y_CPU_BENCHMARK(AllocPromiseStroka, iface) {      TestAllocPromise<TString>(iface);  } -Y_CPU_BENCHMARK(SetPromiseUI64, iface) {  +Y_CPU_BENCHMARK(SetPromiseUI64, iface) {      TestSetPromise<ui64>(iface, 1234567890ull);  } -Y_CPU_BENCHMARK(SetPromiseStroka, iface) {  +Y_CPU_BENCHMARK(SetPromiseStroka, iface) {      TestSetPromise<TString>(iface, "test test test");  } diff --git a/library/cpp/threading/future/wait/wait-inl.h b/library/cpp/threading/future/wait/wait-inl.h index 9d056ff7771..2753d5446cc 100644 --- a/library/cpp/threading/future/wait/wait-inl.h +++ b/library/cpp/threading/future/wait/wait-inl.h @@ -2,10 +2,10 @@  #if !defined(INCLUDE_FUTURE_INL_H)  #error "you should never include wait-inl.h directly" -#endif // INCLUDE_FUTURE_INL_H  +#endif // INCLUDE_FUTURE_INL_H  namespace NThreading { -    namespace NImpl {  +    namespace NImpl {          template <typename TContainer>          TVector<TFuture<void>> ToVoidFutures(const TContainer& futures) {              TVector<TFuture<void>> voidFutures; @@ -17,19 +17,19 @@ namespace NThreading {              return voidFutures;          } -    }  +    }      template <typename TContainer>      [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAll(const TContainer& futures) {          return WaitAll(NImpl::ToVoidFutures(futures));      } -    template <typename TContainer>  +    template <typename TContainer>      [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures) {          return WaitExceptionOrAll(NImpl::ToVoidFutures(futures));      } -    template <typename TContainer>  +    template <typename TContainer>      [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures) {          return WaitAny(NImpl::ToVoidFutures(futures));      } diff --git a/library/cpp/threading/future/wait/wait.cpp b/library/cpp/threading/future/wait/wait.cpp index e0a1c3bbd35..a173833a7f9 100644 --- a/library/cpp/threading/future/wait/wait.cpp +++ b/library/cpp/threading/future/wait/wait.cpp @@ -31,13 +31,13 @@ namespace NThreading {              TWaitGroup<WaitPolicy> wg;              for (const auto& fut : futures) {                  wg.Add(fut); -            }  +            }              return std::move(wg).Finish();          } -    }  +    } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      TFuture<void> WaitAll(const TFuture<void>& f1) {          return WaitGeneric<TWaitPolicy::TAll>(f1); @@ -56,25 +56,25 @@ namespace NThreading {      TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1) {          return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1); -    }  +    }      TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2) {          return WaitGeneric<TWaitPolicy::TExceptionOrAll>(f1, f2); -    }  +    }      TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures) {          return WaitGeneric<TWaitPolicy::TExceptionOrAll>(futures);      } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      TFuture<void> WaitAny(const TFuture<void>& f1) {          return WaitGeneric<TWaitPolicy::TAny>(f1); -    }  +    }      TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2) {          return WaitGeneric<TWaitPolicy::TAny>(f1, f2); -    }  +    }      TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures) {          return WaitGeneric<TWaitPolicy::TAny>(futures); diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index 60ec5b6a63e..6ff7d57baac 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -25,16 +25,16 @@ namespace NThreading {      [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1);      [[nodiscard]] TFuture<void> WaitExceptionOrAll(const TFuture<void>& f1, const TFuture<void>& f2);      [[nodiscard]] TFuture<void> WaitExceptionOrAll(TArrayRef<const TFuture<void>> futures); -    template <typename TContainer>  +    template <typename TContainer>      [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitExceptionOrAll(const TContainer& futures); -    // waits for any future  +    // waits for any future      [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1);      [[nodiscard]] TFuture<void> WaitAny(const TFuture<void>& f1, const TFuture<void>& f2);      [[nodiscard]] TFuture<void> WaitAny(TArrayRef<const TFuture<void>> futures); -    template <typename TContainer>  +    template <typename TContainer>      [[nodiscard]] NImpl::EnableGenericWait<TContainer> WaitAny(const TContainer& futures); -}  +}  #define INCLUDE_FUTURE_INL_H  #include "wait-inl.h" diff --git a/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp b/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp index 4faa53fc2a4..c3027ea5449 100644 --- a/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp +++ b/library/cpp/threading/light_rw_lock/bench/lightrwlock_test.cpp @@ -28,7 +28,7 @@ static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) {  }  #elif defined(__GNUC__)  #else -#error unsupported platform  +#error unsupported platform  #endif  class TPosixRWLock { diff --git a/library/cpp/threading/light_rw_lock/lightrwlock.cpp b/library/cpp/threading/light_rw_lock/lightrwlock.cpp index 107c8ec3bfb..fbb63fd47f7 100644 --- a/library/cpp/threading/light_rw_lock/lightrwlock.cpp +++ b/library/cpp/threading/light_rw_lock/lightrwlock.cpp @@ -79,7 +79,7 @@ void TLightRWLock::WaitForUntrappedAndAcquireRead() {          }      skip_lock_try: -        if (AtomicLoad(UnshareFutex_) && (AtomicLoad(Counter_) & 0x7FFFFFFF) == 0) {  +        if (AtomicLoad(UnshareFutex_) && (AtomicLoad(Counter_) & 0x7FFFFFFF) == 0) {              SequenceStore(UnshareFutex_, 0);              FutexWake(UnshareFutex_, 1);          } diff --git a/library/cpp/threading/light_rw_lock/lightrwlock.h b/library/cpp/threading/light_rw_lock/lightrwlock.h index 4c648fc2a31..931a1817bce 100644 --- a/library/cpp/threading/light_rw_lock/lightrwlock.h +++ b/library/cpp/threading/light_rw_lock/lightrwlock.h @@ -1,5 +1,5 @@  #pragma once -  +  #include <util/system/rwlock.h>  #include <util/system/sanitizers.h> @@ -34,55 +34,55 @@  #include <errno.h>  namespace NS_LightRWLock { -    static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) {  -        return __atomic_fetch_add(&item, value, __ATOMIC_SEQ_CST);  -    }  +    static int Y_FORCE_INLINE AtomicFetchAdd(volatile int& item, int value) { +        return __atomic_fetch_add(&item, value, __ATOMIC_SEQ_CST); +    }  #if defined(_x86_64_) || defined(_i386_) -    static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) {  -        char ret;  -        __asm__ __volatile__(  -            "lock bts %2,%0\n"  -            "setc %1\n"  -            : "+m"(item), "=rm"(ret)  -            : "r"(bit)  -            : "cc");  +    static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) { +        char ret; +        __asm__ __volatile__( +            "lock bts %2,%0\n" +            "setc %1\n" +            : "+m"(item), "=rm"(ret) +            : "r"(bit) +            : "cc");          // msan doesn't treat ret as initialized          NSan::Unpoison(&ret, sizeof(ret)); -        return ret;  -    }  +        return ret; +    } -    static char Y_FORCE_INLINE AtomicClearBit(volatile int& item, unsigned bit) {  -        char ret;  -        __asm__ __volatile__(  -            "lock btc %2,%0\n"  -            "setc %1\n"  -            : "+m"(item), "=rm"(ret)  -            : "r"(bit)  -            : "cc");  +    static char Y_FORCE_INLINE AtomicClearBit(volatile int& item, unsigned bit) { +        char ret; +        __asm__ __volatile__( +            "lock btc %2,%0\n" +            "setc %1\n" +            : "+m"(item), "=rm"(ret) +            : "r"(bit) +            : "cc");          // msan doesn't treat ret as initialized          NSan::Unpoison(&ret, sizeof(ret)); -        return ret;  -    }  +        return ret; +    }  #else -    static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) {  -        int prev = __atomic_fetch_or(&item, 1 << bit, __ATOMIC_SEQ_CST);  -        return (prev & (1 << bit)) != 0 ? 1 : 0;  -    }  +    static char Y_FORCE_INLINE AtomicSetBit(volatile int& item, unsigned bit) { +        int prev = __atomic_fetch_or(&item, 1 << bit, __ATOMIC_SEQ_CST); +        return (prev & (1 << bit)) != 0 ? 1 : 0; +    } -    static char Y_FORCE_INLINE  -    AtomicClearBit(volatile int& item, unsigned bit) {  -        int prev = __atomic_fetch_and(&item, ~(1 << bit), __ATOMIC_SEQ_CST);  -        return (prev & (1 << bit)) != 0 ? 1 : 0;  -    }  +    static char Y_FORCE_INLINE +    AtomicClearBit(volatile int& item, unsigned bit) { +        int prev = __atomic_fetch_and(&item, ~(1 << bit), __ATOMIC_SEQ_CST); +        return (prev & (1 << bit)) != 0 ? 1 : 0; +    }  #endif  #if defined(_x86_64_) || defined(_i386_) || defined (__aarch64__) || defined (__powerpc64__) @@ -100,42 +100,42 @@ namespace NS_LightRWLock {  #endif -    template <typename TInt>  -    static void Y_FORCE_INLINE AtomicStore(volatile TInt& var, TInt value) {  -        __atomic_store_n(&var, value, __ATOMIC_RELEASE);  -    }  - -    template <typename TInt>  -    static void Y_FORCE_INLINE SequenceStore(volatile TInt& var, TInt value) {  -        __atomic_store_n(&var, value, __ATOMIC_SEQ_CST);  -    }  - -    template <typename TInt>  -    static TInt Y_FORCE_INLINE AtomicLoad(const volatile TInt& var) {  -        return __atomic_load_n(&var, __ATOMIC_ACQUIRE);  -    }  - -    static void Y_FORCE_INLINE FutexWait(volatile int& fvar, int value) {  -        for (;;) {  -            int result =  -                syscall(SYS_futex, &fvar, FUTEX_WAIT_PRIVATE, value, NULL, NULL, 0);  -            if (Y_UNLIKELY(result == -1)) {  -                if (errno == EWOULDBLOCK)  -                    return;  -                if (errno == EINTR)  -                    continue;  -                Y_FAIL("futex error");  -            }  +    template <typename TInt> +    static void Y_FORCE_INLINE AtomicStore(volatile TInt& var, TInt value) { +        __atomic_store_n(&var, value, __ATOMIC_RELEASE); +    } + +    template <typename TInt> +    static void Y_FORCE_INLINE SequenceStore(volatile TInt& var, TInt value) { +        __atomic_store_n(&var, value, __ATOMIC_SEQ_CST); +    } + +    template <typename TInt> +    static TInt Y_FORCE_INLINE AtomicLoad(const volatile TInt& var) { +        return __atomic_load_n(&var, __ATOMIC_ACQUIRE); +    } + +    static void Y_FORCE_INLINE FutexWait(volatile int& fvar, int value) { +        for (;;) { +            int result = +                syscall(SYS_futex, &fvar, FUTEX_WAIT_PRIVATE, value, NULL, NULL, 0); +            if (Y_UNLIKELY(result == -1)) { +                if (errno == EWOULDBLOCK) +                    return; +                if (errno == EINTR) +                    continue; +                Y_FAIL("futex error"); +            }          }      } -    static void Y_FORCE_INLINE FutexWake(volatile int& fvar, int amount) {  -        const int result =  -            syscall(SYS_futex, &fvar, FUTEX_WAKE_PRIVATE, amount, NULL, NULL, 0);  -        if (Y_UNLIKELY(result == -1))  -            Y_FAIL("futex error");  -    }  -  +    static void Y_FORCE_INLINE FutexWake(volatile int& fvar, int amount) { +        const int result = +            syscall(SYS_futex, &fvar, FUTEX_WAKE_PRIVATE, amount, NULL, NULL, 0); +        if (Y_UNLIKELY(result == -1)) +            Y_FAIL("futex error"); +    } +  }  class alignas(64) TLightRWLock { @@ -145,8 +145,8 @@ public:          , TrappedFutex_(0)          , UnshareFutex_(0)          , SpinCount_(spinCount) -    {  -    }  +    { +    }      TLightRWLock(const TLightRWLock&) = delete;      void operator=(const TLightRWLock&) = delete; @@ -208,10 +208,10 @@ private:  class TLightRWLock: public TRWMutex {  public: -    TLightRWLock() {  -    }  -    TLightRWLock(ui32) {  -    }  +    TLightRWLock() { +    } +    TLightRWLock(ui32) { +    }  };  #endif diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index f2b825681f1..1d3fbb4bf44 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -40,7 +40,7 @@ namespace {          NPar::TLocallyExecutableFunction Exec;          int FirstId, LastId;          TVector<NThreading::TPromise<void>> Promises; -  +      public:          TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)              : Exec(std::move(exec)) diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index 33476722b88..c1c824f67cb 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -5,7 +5,7 @@  #include <util/generic/cast.h>  #include <util/generic/fwd.h>  #include <util/generic/noncopyable.h> -#include <util/generic/ptr.h>  +#include <util/generic/ptr.h>  #include <util/generic/singleton.h>  #include <util/generic/ymath.h> @@ -135,9 +135,9 @@ namespace NPar {          //          TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); -        template <typename TBody>  +        template <typename TBody>          static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { -            return [=](int blockId) {  +            return [=](int blockId) {                  const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();                  const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());                  for (int i = blockFirstId; i < blockLastId; ++i) { diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp index ccc833c1b94..ac5737717cd 100644 --- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -15,315 +15,315 @@ static const int DefaultThreadsCount = 41;  static const int DefaultRangeSize = 999;  Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ -    bool AllOf(const TVector<int>& vec, int value){  +    bool AllOf(const TVector<int>& vec, int value){          return AllOf(vec, [value](int element) { return value == element; }); -}  - -void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {  -    TLocalExecutor localExecutor;  -    localExecutor.RunAdditionalThreads(threads);  -    TAtomic signal = 0;  -    TVector<int> data(rangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {  -        UNIT_ASSERT(data[i] == 0);  -        while (AtomicGet(signal) == 0)  -            ;  -        data[i] += 1;  -    },  -                                                                                    0, rangeSize, TLocalExecutor::HIGH_PRIORITY);  -    UNIT_ASSERT(AllOf(data, 0));  -    for (auto& future : futures)  -        UNIT_ASSERT(!future.HasValue());  -    AtomicSet(signal, 1);  -    for (auto& future : futures) {  -        future.GetValueSync();  +} + +void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { +    TLocalExecutor localExecutor; +    localExecutor.RunAdditionalThreads(threads); +    TAtomic signal = 0; +    TVector<int> data(rangeSize, 0); +    TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { +        UNIT_ASSERT(data[i] == 0); +        while (AtomicGet(signal) == 0) +            ; +        data[i] += 1; +    }, +                                                                                    0, rangeSize, TLocalExecutor::HIGH_PRIORITY); +    UNIT_ASSERT(AllOf(data, 0)); +    for (auto& future : futures) +        UNIT_ASSERT(!future.HasValue()); +    AtomicSet(signal, 1); +    for (auto& future : futures) { +        future.GetValueSync();      } -    UNIT_ASSERT(AllOf(data, 1));  -}  +    UNIT_ASSERT(AllOf(data, 1)); +}  Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { -    AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);  -}  +    AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); +}  Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { -    AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);  -}  +    AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); +}  Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { -    AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);  -}  +    AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); +}  Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { -    AsyncRunAndWaitFuturesReady(1, 1);  -}  +    AsyncRunAndWaitFuturesReady(1, 1); +}  Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { -    TLocalExecutor localExecutor;  -    localExecutor.RunAdditionalThreads(DefaultThreadsCount);  -    TAtomic signal = 0;  -    TVector<int> data1(DefaultRangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {  -        UNIT_ASSERT(data1[i] == 0);  -        while (AtomicGet(signal) == 0)  -            ;  -        data1[i] += 1;  -    },  -                                                                                     0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);  -    TVector<int> data2(DefaultRangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {  -        UNIT_ASSERT(data2[i] == 0);  -        while (AtomicGet(signal) == 0)  -            ;  -        data2[i] += 2;  -    },  -                                                                                     0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);  -    UNIT_ASSERT(AllOf(data1, 0));  -    UNIT_ASSERT(AllOf(data2, 0));  -    AtomicSet(signal, 1);  -    for (int i = 0; i < DefaultRangeSize; ++i) {  -        futures1[i].GetValueSync();  -        futures2[i].GetValueSync();  +    TLocalExecutor localExecutor; +    localExecutor.RunAdditionalThreads(DefaultThreadsCount); +    TAtomic signal = 0; +    TVector<int> data1(DefaultRangeSize, 0); +    TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { +        UNIT_ASSERT(data1[i] == 0); +        while (AtomicGet(signal) == 0) +            ; +        data1[i] += 1; +    }, +                                                                                     0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); +    TVector<int> data2(DefaultRangeSize, 0); +    TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { +        UNIT_ASSERT(data2[i] == 0); +        while (AtomicGet(signal) == 0) +            ; +        data2[i] += 2; +    }, +                                                                                     0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); +    UNIT_ASSERT(AllOf(data1, 0)); +    UNIT_ASSERT(AllOf(data2, 0)); +    AtomicSet(signal, 1); +    for (int i = 0; i < DefaultRangeSize; ++i) { +        futures1[i].GetValueSync(); +        futures2[i].GetValueSync();      } -    UNIT_ASSERT(AllOf(data1, 1));  -    UNIT_ASSERT(AllOf(data2, 2));  -}  - -void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {  -    TLocalExecutor localExecutor;  -    localExecutor.RunAdditionalThreads(threadsCount);  -    TAtomic signal = 0;  -    TVector<int> data(rangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {  -        UNIT_ASSERT(data[i] == 0);  -        while (AtomicGet(signal) == 0)  -            ;  -        data[i] += 1;  -        throw 10000 + i;  -    },  -                                                                                    0, rangeSize, TLocalExecutor::HIGH_PRIORITY);  -    UNIT_ASSERT(AllOf(data, 0));  -    UNIT_ASSERT(futures.ysize() == rangeSize);  -    AtomicSet(signal, 1);  -    int exceptionsCaught = 0;  -    for (int i = 0; i < rangeSize; ++i) {  -        try {  -            futures[i].GetValueSync();  -        } catch (int& e) {  -            if (e == 10000 + i) {  -                ++exceptionsCaught;  +    UNIT_ASSERT(AllOf(data1, 1)); +    UNIT_ASSERT(AllOf(data2, 2)); +} + +void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { +    TLocalExecutor localExecutor; +    localExecutor.RunAdditionalThreads(threadsCount); +    TAtomic signal = 0; +    TVector<int> data(rangeSize, 0); +    TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { +        UNIT_ASSERT(data[i] == 0); +        while (AtomicGet(signal) == 0) +            ; +        data[i] += 1; +        throw 10000 + i; +    }, +                                                                                    0, rangeSize, TLocalExecutor::HIGH_PRIORITY); +    UNIT_ASSERT(AllOf(data, 0)); +    UNIT_ASSERT(futures.ysize() == rangeSize); +    AtomicSet(signal, 1); +    int exceptionsCaught = 0; +    for (int i = 0; i < rangeSize; ++i) { +        try { +            futures[i].GetValueSync(); +        } catch (int& e) { +            if (e == 10000 + i) { +                ++exceptionsCaught;              }          }      } -    UNIT_ASSERT(exceptionsCaught == rangeSize);  -    UNIT_ASSERT(AllOf(data, 1));  -}  +    UNIT_ASSERT(exceptionsCaught == rangeSize); +    UNIT_ASSERT(AllOf(data, 1)); +}  Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { -    AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);  -}  +    AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); +}  Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { -    AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);  -}  +    AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); +}  Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { -    AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);  -}  +    AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); +}  Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { -    AsyncRunRangeAndWaitExceptions(1, 1);  -}  +    AsyncRunRangeAndWaitExceptions(1, 1); +}  Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { -    TLocalExecutor localExecutor;  -    localExecutor.RunAdditionalThreads(DefaultThreadsCount);  -    TAtomic signal = 0;  -    TVector<int> data1(DefaultRangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {  -        UNIT_ASSERT(data1[i] == 0);  -        while (AtomicGet(signal) == 0)  -            ;  -        data1[i] += 1;  -        throw 15000 + i;  -    },  -                                                                                     0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY);  -    TVector<int> data2(DefaultRangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {  -        UNIT_ASSERT(data2[i] == 0);  -        while (AtomicGet(signal) == 0)  -            ;  -        data2[i] += 2;  -        throw 16000 + i;  -    },  -                                                                                     0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);  - -    UNIT_ASSERT(AllOf(data1, 0));  -    UNIT_ASSERT(AllOf(data2, 0));  -    UNIT_ASSERT(futures1.size() == DefaultRangeSize);  -    UNIT_ASSERT(futures2.size() == DefaultRangeSize);  -    AtomicSet(signal, 1);  -    int exceptionsCaught = 0;  -    for (int i = 0; i < DefaultRangeSize; ++i) {  -        try {  -            futures1[i].GetValueSync();  -        } catch (int& e) {  -            if (e == 15000 + i) {  -                ++exceptionsCaught;  +    TLocalExecutor localExecutor; +    localExecutor.RunAdditionalThreads(DefaultThreadsCount); +    TAtomic signal = 0; +    TVector<int> data1(DefaultRangeSize, 0); +    TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { +        UNIT_ASSERT(data1[i] == 0); +        while (AtomicGet(signal) == 0) +            ; +        data1[i] += 1; +        throw 15000 + i; +    }, +                                                                                     0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY); +    TVector<int> data2(DefaultRangeSize, 0); +    TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { +        UNIT_ASSERT(data2[i] == 0); +        while (AtomicGet(signal) == 0) +            ; +        data2[i] += 2; +        throw 16000 + i; +    }, +                                                                                     0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + +    UNIT_ASSERT(AllOf(data1, 0)); +    UNIT_ASSERT(AllOf(data2, 0)); +    UNIT_ASSERT(futures1.size() == DefaultRangeSize); +    UNIT_ASSERT(futures2.size() == DefaultRangeSize); +    AtomicSet(signal, 1); +    int exceptionsCaught = 0; +    for (int i = 0; i < DefaultRangeSize; ++i) { +        try { +            futures1[i].GetValueSync(); +        } catch (int& e) { +            if (e == 15000 + i) { +                ++exceptionsCaught;              } -        }  -        try {  -            futures2[i].GetValueSync();  -        } catch (int& e) {  -            if (e == 16000 + i) {  -                ++exceptionsCaught;  +        } +        try { +            futures2[i].GetValueSync(); +        } catch (int& e) { +            if (e == 16000 + i) { +                ++exceptionsCaught;              }          }      } -    UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize);  -    UNIT_ASSERT(AllOf(data1, 1));  -    UNIT_ASSERT(AllOf(data2, 2));  -}  - -void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) {  -    TLocalExecutor localExecutor;  -    localExecutor.RunAdditionalThreads(threadsCount);  -    TVector<int> data(rangeSize, 0);  -    TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) {  -        UNIT_ASSERT(data[i] == 0);  -        data[i] += 1;  -        throw 30000 + i;  -    },  -                                                                                    0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);  -    UNIT_ASSERT(AllOf(data, 1));  -    int exceptionsCaught = 0;  -    for (int i = 0; i < rangeSize; ++i) {  -        try {  -            futures[i].GetValueSync();  -        } catch (int& e) {  -            if (e == 30000 + i) {  -                ++exceptionsCaught;  +    UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize); +    UNIT_ASSERT(AllOf(data1, 1)); +    UNIT_ASSERT(AllOf(data2, 2)); +} + +void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) { +    TLocalExecutor localExecutor; +    localExecutor.RunAdditionalThreads(threadsCount); +    TVector<int> data(rangeSize, 0); +    TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) { +        UNIT_ASSERT(data[i] == 0); +        data[i] += 1; +        throw 30000 + i; +    }, +                                                                                    0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); +    UNIT_ASSERT(AllOf(data, 1)); +    int exceptionsCaught = 0; +    for (int i = 0; i < rangeSize; ++i) { +        try { +            futures[i].GetValueSync(); +        } catch (int& e) { +            if (e == 30000 + i) { +                ++exceptionsCaught;              }          }      } -    UNIT_ASSERT(exceptionsCaught == rangeSize);  -    UNIT_ASSERT(AllOf(data, 1));  -}  +    UNIT_ASSERT(exceptionsCaught == rangeSize); +    UNIT_ASSERT(AllOf(data, 1)); +}  Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { -    RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);  -}  +    RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); +}  Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { -    RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);  -}  +    RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); +}  Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { -    RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);  -}  +    RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); +}  Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { -    RunRangeAndCheckExceptionsWithWaitComplete(1, 1);  -}  +    RunRangeAndCheckExceptionsWithWaitComplete(1, 1); +}  Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { -    RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);  -}  +    RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); +}  Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { -    RunRangeAndCheckExceptionsWithWaitComplete(1, 0);  -}  -}  -;  +    RunRangeAndCheckExceptionsWithWaitComplete(1, 0); +} +} +;  Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ -    void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){  -        AtomicSet(processed, 0);  -TLocalExecutor localExecutor;  -localExecutor.RunAdditionalThreads(threadsCount);  -localExecutor.ExecRangeWithThrow([&processed](int) {  -    AtomicAdd(processed, 1);  -    throw TTestException();  -},  -                                 rangeStart, rangeStart + rangeSize, flags);  -}  +    void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ +        AtomicSet(processed, 0); +TLocalExecutor localExecutor; +localExecutor.RunAdditionalThreads(threadsCount); +localExecutor.ExecRangeWithThrow([&processed](int) { +    AtomicAdd(processed, 1); +    throw TTestException(); +}, +                                 rangeStart, rangeStart + rangeSize, flags); +}  Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { -    TAtomic processed = 0;  -    UNIT_ASSERT_EXCEPTION(  -        RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,  -                                             TLocalExecutor::EFlags::WAIT_COMPLETE, processed),  -        TTestException);  -    UNIT_ASSERT(AtomicGet(processed) == 40);  -}  - -void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {  -    TAtomic processed = 0;  -    UNIT_ASSERT_EXCEPTION(  -        RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed),  -        TTestException);  -    UNIT_ASSERT(AtomicGet(processed) == rangeSize);  -}  +    TAtomic processed = 0; +    UNIT_ASSERT_EXCEPTION( +        RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount, +                                             TLocalExecutor::EFlags::WAIT_COMPLETE, processed), +        TTestException); +    UNIT_ASSERT(AtomicGet(processed) == 40); +} + +void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { +    TAtomic processed = 0; +    UNIT_ASSERT_EXCEPTION( +        RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed), +        TTestException); +    UNIT_ASSERT(AtomicGet(processed) == rangeSize); +}  Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { -    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,  -                                TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);  -}  +    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, +                                TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); +}  Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { -    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,  -                                TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);  -}  +    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, +                                TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); +}  Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { -    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,  -                                TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);  -}  +    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, +                                TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); +}  Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { -    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,  -                                TLocalExecutor::EFlags::WAIT_COMPLETE);  -}  +    ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, +                                TLocalExecutor::EFlags::WAIT_COMPLETE); +}  Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { -    ThrowAndCatchTTestException(DefaultRangeSize, 0,  -                                TLocalExecutor::EFlags::WAIT_COMPLETE);  -}  +    ThrowAndCatchTTestException(DefaultRangeSize, 0, +                                TLocalExecutor::EFlags::WAIT_COMPLETE); +}  Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { -    ThrowAndCatchTTestException(DefaultRangeSize, 1,  -                                TLocalExecutor::EFlags::WAIT_COMPLETE);  -}  - -void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) {  -    localExecutor.ExecRangeWithThrow([](int) {  -        throw TTestException();  -    },  -                                     0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE);  -}  - -void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {  -    TLocalExecutor localExecutor;  -    localExecutor.RunAdditionalThreads(DefaultThreadsCount);  -    localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) {  -        AtomicAdd(processed1, 1);  -        UNIT_ASSERT_EXCEPTION(  -            ThrowsTTestExceptionFromNested(localExecutor),  -            TTestException);  -        AtomicAdd(processed2, 1);  -    },  -                                     0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);  -}  +    ThrowAndCatchTTestException(DefaultRangeSize, 1, +                                TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) { +    localExecutor.ExecRangeWithThrow([](int) { +        throw TTestException(); +    }, +                                     0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { +    TLocalExecutor localExecutor; +    localExecutor.RunAdditionalThreads(DefaultThreadsCount); +    localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) { +        AtomicAdd(processed1, 1); +        UNIT_ASSERT_EXCEPTION( +            ThrowsTTestExceptionFromNested(localExecutor), +            TTestException); +        AtomicAdd(processed2, 1); +    }, +                                     0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); +}  Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { -    TAtomic processed1 = 0;  -    TAtomic processed2 = 0;  -    UNIT_ASSERT_NO_EXCEPTION(  -        CatchTTestExceptionFromNested(processed1, processed2));  -    UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize);  -    UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize);  -}  -}  -;  +    TAtomic processed1 = 0; +    TAtomic processed2 = 0; +    UNIT_ASSERT_NO_EXCEPTION( +        CatchTTestExceptionFromNested(processed1, processed2)); +    UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize); +    UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize); +} +} +;  Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){ diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.cpp b/library/cpp/threading/poor_man_openmp/thread_helper.cpp index b4ec5c78795..34cb6507b9f 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper.cpp @@ -1,7 +1,7 @@  #include "thread_helper.h" -  -#include <util/generic/singleton.h>  -  -TMtpQueueHelper& TMtpQueueHelper::Instance() {  -    return *Singleton<TMtpQueueHelper>();  -}  + +#include <util/generic/singleton.h> + +TMtpQueueHelper& TMtpQueueHelper::Instance() { +    return *Singleton<TMtpQueueHelper>(); +} diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.h b/library/cpp/threading/poor_man_openmp/thread_helper.h index 1536c186cb7..0ecee0590b5 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.h +++ b/library/cpp/threading/poor_man_openmp/thread_helper.h @@ -2,17 +2,17 @@  #include <util/thread/pool.h>  #include <util/generic/utility.h> -#include <util/generic/yexception.h>  +#include <util/generic/yexception.h>  #include <util/system/info.h>  #include <util/system/atomic.h>  #include <util/system/condvar.h>  #include <util/system/mutex.h> -#include <util/stream/output.h>  +#include <util/stream/output.h>  #include <functional> -#include <cstdlib>  +#include <cstdlib> -class TMtpQueueHelper {  +class TMtpQueueHelper {  public:      TMtpQueueHelper() {          SetThreadCount(NSystemInfo::CachedNumberOfCpus()); @@ -27,79 +27,79 @@ public:          ThreadCount = threads;          q = CreateThreadPool(ThreadCount);      } -  -    static TMtpQueueHelper& Instance();  -  + +    static TMtpQueueHelper& Instance(); +  private:      size_t ThreadCount;      TAutoPtr<IThreadPool> q;  }; -namespace NYmp {  +namespace NYmp {      inline void SetThreadCount(size_t threads) { -        TMtpQueueHelper::Instance().SetThreadCount(threads);  +        TMtpQueueHelper::Instance().SetThreadCount(threads);      }      inline size_t GetThreadCount() { -        return TMtpQueueHelper::Instance().GetThreadCount();  +        return TMtpQueueHelper::Instance().GetThreadCount();      } -    template <typename T>  +    template <typename T>      inline void ParallelForStaticChunk(T begin, T end, size_t chunkSize, std::function<void(T)> func) { -        chunkSize = Max<size_t>(chunkSize, 1);  -  -        size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();  +        chunkSize = Max<size_t>(chunkSize, 1); + +        size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();          IThreadPool* queue = TMtpQueueHelper::Instance().Get();          TCondVar cv;          TMutex mutex;          TAtomic counter = threadCount; -        std::exception_ptr err;  -  -        for (size_t i = 0; i < threadCount; ++i) {  -            queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() {  -                try {  -                    T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize);  -  -                    while (currentChunkStart < end) {  -                        T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize);  -  -                        for (T val = currentChunkStart; val < currentChunkEnd; ++val) {  -                            func(val);  -                        }  -  -                        currentChunkStart += chunkSize * threadCount;  +        std::exception_ptr err; + +        for (size_t i = 0; i < threadCount; ++i) { +            queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { +                try { +                    T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); + +                    while (currentChunkStart < end) { +                        T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); + +                        for (T val = currentChunkStart; val < currentChunkEnd; ++val) { +                            func(val); +                        } + +                        currentChunkStart += chunkSize * threadCount; +                    } +                } catch (...) { +                    with_lock (mutex) { +                        err = std::current_exception(); +                    } +                } + +                with_lock (mutex) { +                    if (AtomicDecrement(counter) == 0) { +                        //last one +                        cv.Signal();                      } -                } catch (...) {  -                    with_lock (mutex) {  -                        err = std::current_exception();  -                    }                   } -  -                with_lock (mutex) {  -                    if (AtomicDecrement(counter) == 0) {  -                        //last one  -                        cv.Signal();  -                    }  -                }               });          } -  -        with_lock (mutex) {  -            while (AtomicGet(counter) > 0) {  -                cv.WaitI(mutex);  -            }  + +        with_lock (mutex) { +            while (AtomicGet(counter) > 0) { +                cv.WaitI(mutex); +            } +        } + +        if (err) { +            std::rethrow_exception(err);          } -  -        if (err) {  -            std::rethrow_exception(err);  -        }       } -    template <typename T>  +    template <typename T>      inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) { -        const size_t taskSize = end - begin;  -        const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();  -  +        const size_t taskSize = end - begin; +        const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); +          ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func);      } -}  +} diff --git a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp index 79c7a14b5e4..74176368643 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp @@ -1,26 +1,26 @@ -#include "thread_helper.h"  -  +#include "thread_helper.h" +  #include <library/cpp/testing/unittest/registar.h> -  +  #include <util/generic/string.h> -#include <util/generic/yexception.h>  -  +#include <util/generic/yexception.h> +  Y_UNIT_TEST_SUITE(TestMP) {      Y_UNIT_TEST(TestErr) { -        std::function<void(int)> f = [](int x) {  -            if (x == 5) {  -                ythrow yexception() << "oops";  -            }  -        };  -  +        std::function<void(int)> f = [](int x) { +            if (x == 5) { +                ythrow yexception() << "oops"; +            } +        }; +          TString s; -  -        try {  -            NYmp::ParallelForStaticAutoChunk(0, 10, f);  -        } catch (...) {  -            s = CurrentExceptionMessage();  -        }  -  -        UNIT_ASSERT(s.find("oops") > 0);  -    }  -}  + +        try { +            NYmp::ParallelForStaticAutoChunk(0, 10, f); +        } catch (...) { +            s = CurrentExceptionMessage(); +        } + +        UNIT_ASSERT(s.find("oops") > 0); +    } +} diff --git a/library/cpp/threading/poor_man_openmp/ut/ya.make b/library/cpp/threading/poor_man_openmp/ut/ya.make index 7305d14b997..6d7aa123edc 100644 --- a/library/cpp/threading/poor_man_openmp/ut/ya.make +++ b/library/cpp/threading/poor_man_openmp/ut/ya.make @@ -1,12 +1,12 @@  UNITTEST_FOR(library/cpp/threading/poor_man_openmp) -  +  OWNER(      pg      agorodilov  ) -  -SRCS(  -    thread_helper_ut.cpp  -)  -  -END()  + +SRCS( +    thread_helper_ut.cpp +) + +END() diff --git a/library/cpp/threading/queue/mpsc_htswap.h b/library/cpp/threading/queue/mpsc_htswap.h index 420b1e88292..c42caa7ac02 100644 --- a/library/cpp/threading/queue/mpsc_htswap.h +++ b/library/cpp/threading/queue/mpsc_htswap.h @@ -28,8 +28,8 @@ namespace NThreading {      namespace NHTSwapPrivate {          template <typename T, typename TTuneup>          struct TNode -           : public TTuneup::TNodeBase,  -              public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {  +           : public TTuneup::TNodeBase, +              public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {              TNode(const T& item) {                  this->Next = nullptr;                  this->Item = item; @@ -60,7 +60,7 @@ namespace NThreading {          template <typename T, typename TTuneup>          class THTSwapQueueImpl -           : protected  TTuneup::template TQueueLayout<TNode<T, TTuneup>> {  +           : protected  TTuneup::template TQueueLayout<TNode<T, TTuneup>> {          protected:              using TTunedNode = TNode<T, TTuneup>; @@ -124,9 +124,9 @@ namespace NThreading {      DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);      DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout); -    template <typename T = void*, typename... TParams>  +    template <typename T = void*, typename... TParams>      class THTSwapQueue -       : public NHTSwapPrivate::THTSwapQueueImpl<T,  -                                                  TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {  +       : public NHTSwapPrivate::THTSwapQueueImpl<T, +                                                  TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {      };  } diff --git a/library/cpp/threading/queue/mpsc_intrusive_unordered.h b/library/cpp/threading/queue/mpsc_intrusive_unordered.h index 97e6131dd47..6ac7537ae9a 100644 --- a/library/cpp/threading/queue/mpsc_intrusive_unordered.h +++ b/library/cpp/threading/queue/mpsc_intrusive_unordered.h @@ -25,7 +25,7 @@ namespace NThreading {          void Push(void* node) noexcept {              Push(reinterpret_cast<TIntrusiveNode*>(node));          } -  +      private:          TIntrusiveNode* HeadForCaS = nullptr;          TIntrusiveNode* HeadForSwap = nullptr; diff --git a/library/cpp/threading/queue/mpsc_read_as_filled.h b/library/cpp/threading/queue/mpsc_read_as_filled.h index 621517328e3..be33ba5a584 100644 --- a/library/cpp/threading/queue/mpsc_read_as_filled.h +++ b/library/cpp/threading/queue/mpsc_read_as_filled.h @@ -132,7 +132,7 @@ namespace NThreading {              TMsgBunch* volatile NextToken;              /* this push can return PUSH_RESULT_BLOCKED */ -            inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {  +            inline TPushResult Push(TMsgLink msg, ui64 slot, TAux auxiliary) {                  if (Y_UNLIKELY(slot < FirstSlot)) {                      return PUSH_RESULT_BACKWARD;                  } @@ -194,7 +194,7 @@ namespace NThreading {              // the object could be destroyed after this method              inline void SetNextToken(TMsgBunch* next) {                  AtomicSet(NextToken, next); -                if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) {  +                if (Y_UNLIKELY(AtomicAdd(Token, RELEASE_SIZE) == RELEASE_SIZE)) {                      Release(this);                      next->DecrementToken();                  } @@ -317,8 +317,8 @@ namespace NThreading {              }          }; -        template <typename TWBucket = TWriteBucket<>,  -                  template <typename, typename...> class TContainer = TDeque>  +        template <typename TWBucket = TWriteBucket<>, +                  template <typename, typename...> class TContainer = TDeque>          class TReadBucket {          public:              using TAux = typename TWBucket::TUsingAux; @@ -543,7 +543,7 @@ namespace NThreading {              static constexpr ui32 BUNCH_SIZE = DEFAULT_BUNCH_SIZE;              using TBunchBase = TEmpty; -            template <typename TElem, typename... TRest>  +            template <typename TElem, typename... TRest>              using TContainer = TDeque<TElem, TRest...>;              static constexpr bool DeleteItems = true; @@ -556,7 +556,7 @@ namespace NThreading {      DeclareTuneContainer(TRaFQueueSkipContainer, TContainer);      DeclareTuneValueParam(TRaFQueueDeleteItems, bool, DeleteItems); -    template <typename TItem = void, typename... TParams>  +    template <typename TItem = void, typename... TParams>      class TReadAsFilledQueue {      private:          using TTuned = TTune<NReadAsFilledPrivate::TDefaultParams, TParams...>; @@ -565,7 +565,7 @@ namespace NThreading {          using TBunchBase = typename TTuned::TBunchBase; -        template <typename TElem, typename... TRest>  +        template <typename TElem, typename... TRest>          using TContainer =              typename TTuned::template TContainer<TElem, TRest...>; diff --git a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h index 4c85bef6ecd..5f91f1b5a84 100644 --- a/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h +++ b/library/cpp/threading/queue/mpsc_vinfarr_obstructive.h @@ -469,7 +469,7 @@ namespace NThreading {      DeclareTuneTypeParam(TObstructiveQueueBunchBase, TBunchBase);      DeclareTuneTypeParam(TObstructiveQueueAux, TAux); -    template <typename TItem = void, typename... TParams>  +    template <typename TItem = void, typename... TParams>      class TObstructiveConsumerAuxQueue {      private:          using TTuned = @@ -522,7 +522,7 @@ namespace NThreading {      template <typename TItem = void, bool DeleteItems = true>      class TObstructiveConsumerQueue -       : public TObstructiveConsumerAuxQueue<TItem,  -                                              TObstructiveQueueDeleteItems<DeleteItems>> {  +       : public TObstructiveConsumerAuxQueue<TItem, +                                              TObstructiveQueueDeleteItems<DeleteItems>> {      }; -}  +} diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index a55f952cbcb..80eca147da9 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -12,7 +12,7 @@ private:      UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)      UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)      UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) -    /*  +    /*      UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)      UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)      UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) diff --git a/library/cpp/threading/queue/tune.h b/library/cpp/threading/queue/tune.h index 1072342620b..50fc3dc17cd 100644 --- a/library/cpp/threading/queue/tune.h +++ b/library/cpp/threading/queue/tune.h @@ -96,14 +96,14 @@          };                                                          \      } -#define DeclareTuneContainer(TParamName, InternalName)              \  -    template <template <typename, typename...> class TNewContainer> \  -    struct TParamName {                                             \  -        template <typename TBase>                                   \  -        struct TApply: public TBase {                               \  -            template <typename TElem, typename... TRest>            \  -            using InternalName = TNewContainer<TElem, TRest...>;    \  -        };                                                          \  +#define DeclareTuneContainer(TParamName, InternalName)              \ +    template <template <typename, typename...> class TNewContainer> \ +    struct TParamName {                                             \ +        template <typename TBase>                                   \ +        struct TApply: public TBase {                               \ +            template <typename TElem, typename... TRest>            \ +            using InternalName = TNewContainer<TElem, TRest...>;    \ +        };                                                          \      }  namespace NTunePrivate { diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp index 49ebd4a1cfc..a43b7f520e5 100644 --- a/library/cpp/threading/queue/unordered_ut.cpp +++ b/library/cpp/threading/queue/unordered_ut.cpp @@ -59,9 +59,9 @@ public:          class TWorker: public ISimpleThread {          public:              TWorker( -                TQueueType* queues_,  -                ui16 mine,  -                TAtomic* pushDone)  +                TQueueType* queues_, +                ui16 mine, +                TAtomic* pushDone)                  : Queues(queues_)                  , MineQueue(mine)                  , PushDone(pushDone) @@ -132,7 +132,7 @@ public:          for (ui32 i = 0; i < COUNT; ++i) {              workers[i]->Join();              all.insert(all.begin(), -                       workers[i]->Received.begin(), workers[i]->Received.end());  +                       workers[i]->Received.begin(), workers[i]->Received.end());          }          std::sort(all.begin(), all.end()); diff --git a/library/cpp/threading/queue/ut_helpers.h b/library/cpp/threading/queue/ut_helpers.h index b0177637942..2756b52601e 100644 --- a/library/cpp/threading/queue/ut_helpers.h +++ b/library/cpp/threading/queue/ut_helpers.h @@ -13,11 +13,11 @@ struct TBasicReadAsFilled: public NThreading::TReadAsFilledQueue<> {  };  struct TBasicObstructiveConsumer -   : public NThreading::TObstructiveConsumerQueue<> {  +   : public NThreading::TObstructiveConsumerQueue<> {  };  struct TBasicMPSCIntrusiveUnordered -   : public NThreading::TMPSCIntrusiveUnordered {  +   : public NThreading::TMPSCIntrusiveUnordered {  };  struct TIntrusiveLink: public NThreading::TIntrusiveNode { @@ -30,11 +30,11 @@ struct TMPMCUnorderedRing: public NThreading::TMPMCUnorderedRing {      }  }; -#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate)         \  -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>);       \  -    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \  +#define REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TestTemplate)         \ +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicHTSwap>);       \ +    UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicReadAsFilled>); \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicObstructiveConsumer>) -#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate)                 \  +#define REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TestTemplate)                 \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TBasicMPSCIntrusiveUnordered>); \      UNIT_TEST_SUITE_REGISTRATION(TestTemplate<TMPMCUnorderedRing>); diff --git a/library/cpp/threading/skip_list/compare.h b/library/cpp/threading/skip_list/compare.h index 336582a1b89..ac98b3e1ced 100644 --- a/library/cpp/threading/skip_list/compare.h +++ b/library/cpp/threading/skip_list/compare.h @@ -4,74 +4,74 @@  #include <util/str_stl.h>  namespace NThreading { -    namespace NImpl {  -        Y_HAS_MEMBER(compare);  -        Y_HAS_MEMBER(Compare);  +    namespace NImpl { +        Y_HAS_MEMBER(compare); +        Y_HAS_MEMBER(Compare);          template <typename T> -        inline int CompareImpl(const T& l, const T& r) {  -            if (l < r) {  -                return -1;  -            } else if (r < l) {  -                return +1;  -            } else {  -                return 0;  -            }  +        inline int CompareImpl(const T& l, const T& r) { +            if (l < r) { +                return -1; +            } else if (r < l) { +                return +1; +            } else { +                return 0; +            }          } -        template <bool val>  -        struct TSmallCompareSelector {  -            template <typename T>  -            static inline int Compare(const T& l, const T& r) {  -                return CompareImpl(l, r);  -            }  -        };  +        template <bool val> +        struct TSmallCompareSelector { +            template <typename T> +            static inline int Compare(const T& l, const T& r) { +                return CompareImpl(l, r); +            } +        }; -        template <>  -        struct TSmallCompareSelector<true> {  -            template <typename T>  -            static inline int Compare(const T& l, const T& r) {  -                return l.compare(r);  -            }  -        };  +        template <> +        struct TSmallCompareSelector<true> { +            template <typename T> +            static inline int Compare(const T& l, const T& r) { +                return l.compare(r); +            } +        }; -        template <bool val>  -        struct TBigCompareSelector {  -            template <typename T>  -            static inline int Compare(const T& l, const T& r) {  +        template <bool val> +        struct TBigCompareSelector { +            template <typename T> +            static inline int Compare(const T& l, const T& r) {                  return TSmallCompareSelector<THascompare<T>::value>::Compare(l, r); -            }  -        };  -  -        template <>  -        struct TBigCompareSelector<true> {  -            template <typename T>  -            static inline int Compare(const T& l, const T& r) {  -                return l.Compare(r);  -            }  -        };  -  +            } +        }; + +        template <> +        struct TBigCompareSelector<true> { +            template <typename T> +            static inline int Compare(const T& l, const T& r) { +                return l.Compare(r); +            } +        }; +          template <typename T>          struct TCompareSelector: public TBigCompareSelector<THasCompare<T>::value> { -        };  -    }  +        }; +    } + +    //////////////////////////////////////////////////////////////////////////////// +    // Generic compare function -    ////////////////////////////////////////////////////////////////////////////////  -    // Generic compare function  -       template <typename T> -    inline int Compare(const T& l, const T& r) {  -        return NImpl::TCompareSelector<T>::Compare(l, r);  -    }  -  -    ////////////////////////////////////////////////////////////////////////////////  -    // Generic compare functor  -  -    template <typename T>  -    struct TCompare {  -        inline int operator()(const T& l, const T& r) const {  -            return Compare(l, r);  -        }  +    inline int Compare(const T& l, const T& r) { +        return NImpl::TCompareSelector<T>::Compare(l, r); +    } + +    //////////////////////////////////////////////////////////////////////////////// +    // Generic compare functor + +    template <typename T> +    struct TCompare { +        inline int operator()(const T& l, const T& r) const { +            return Compare(l, r); +        }      };  } diff --git a/library/cpp/threading/skip_list/perf/main.cpp b/library/cpp/threading/skip_list/perf/main.cpp index d722d43436c..4ad52049e73 100644 --- a/library/cpp/threading/skip_list/perf/main.cpp +++ b/library/cpp/threading/skip_list/perf/main.cpp @@ -14,345 +14,345 @@  #include <util/system/thread.h>  namespace { -    using namespace NThreading;  +    using namespace NThreading; -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      IOutputStream& LogInfo() { -        return Cerr << TInstant::Now() << " INFO: ";  -    }  +        return Cerr << TInstant::Now() << " INFO: "; +    }      IOutputStream& LogError() { -        return Cerr << TInstant::Now() << " ERROR: ";  -    }  - -    ////////////////////////////////////////////////////////////////////////////////  - -    struct TListItem {  -        TStringBuf Key;  -        TStringBuf Value;  - -        TListItem(const TStringBuf& key, const TStringBuf& value)  -            : Key(key)  -            , Value(value)  -        {  -        }  - -        int Compare(const TListItem& other) const {  -            return Key.compare(other.Key);  -        }  -    };  - -    using TListType = TSkipList<TListItem>;  - -    ////////////////////////////////////////////////////////////////////////////////  - -    class TRandomData {  -    private:  -        TVector<char> Buffer;  - -    public:  -        TRandomData()  -            : Buffer(1024 * 1024)  -        {  -            for (size_t i = 0; i < Buffer.size(); ++i) {  -                Buffer[i] = RandomNumber<char>();  -            }  -        }  - -        TStringBuf GetString(size_t len) const {  -            size_t start = RandomNumber(Buffer.size() - len);  -            return TStringBuf(&Buffer[start], len);  +        return Cerr << TInstant::Now() << " ERROR: "; +    } + +    //////////////////////////////////////////////////////////////////////////////// + +    struct TListItem { +        TStringBuf Key; +        TStringBuf Value; + +        TListItem(const TStringBuf& key, const TStringBuf& value) +            : Key(key) +            , Value(value) +        { +        } + +        int Compare(const TListItem& other) const { +            return Key.compare(other.Key); +        } +    }; + +    using TListType = TSkipList<TListItem>; + +    //////////////////////////////////////////////////////////////////////////////// + +    class TRandomData { +    private: +        TVector<char> Buffer; + +    public: +        TRandomData() +            : Buffer(1024 * 1024) +        { +            for (size_t i = 0; i < Buffer.size(); ++i) { +                Buffer[i] = RandomNumber<char>(); +            } +        } + +        TStringBuf GetString(size_t len) const { +            size_t start = RandomNumber(Buffer.size() - len); +            return TStringBuf(&Buffer[start], len); +        } + +        TStringBuf GetString(size_t min, size_t max) const { +            return GetString(min + RandomNumber(max - min));          } +    }; -        TStringBuf GetString(size_t min, size_t max) const {  -            return GetString(min + RandomNumber(max - min));  -        }  -    };  - -    ////////////////////////////////////////////////////////////////////////////////  - -    class TWorkerThread: public ISimpleThread {  -    private:  -        std::function<void()> Func;  -        TDuration Time;  - -    public:  -        TWorkerThread(std::function<void()> func)  -            : Func(func)  -        {  -        }  - -        TDuration GetTime() const {  -            return Time;  -        }  - -    private:  -        void* ThreadProc() noexcept override {  -            TInstant started = TInstant::Now();  -            Func();  -            Time = TInstant::Now() - started;  -            return nullptr;  -        }  -    };  - -    inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) {  -        TAutoPtr<TWorkerThread> thread = new TWorkerThread(func);  -        thread->Start();  -        return thread;  +    //////////////////////////////////////////////////////////////////////////////// + +    class TWorkerThread: public ISimpleThread { +    private: +        std::function<void()> Func; +        TDuration Time; + +    public: +        TWorkerThread(std::function<void()> func) +            : Func(func) +        { +        } + +        TDuration GetTime() const { +            return Time; +        } + +    private: +        void* ThreadProc() noexcept override { +            TInstant started = TInstant::Now(); +            Func(); +            Time = TInstant::Now() - started; +            return nullptr; +        } +    }; + +    inline TAutoPtr<TWorkerThread> StartThread(std::function<void()> func) { +        TAutoPtr<TWorkerThread> thread = new TWorkerThread(func); +        thread->Start(); +        return thread;      } -    ////////////////////////////////////////////////////////////////////////////////  - -    typedef std::function<void()> TTestFunc;  - -    struct TTest {  -        TString Name;  -        TTestFunc Func;  - -        TTest() {  -        }  - -        TTest(const TString& name, const TTestFunc& func)  -            : Name(name)  -            , Func(func)  -        {  -        }  -    };  - -    ////////////////////////////////////////////////////////////////////////////////  - -    class TTestSuite {  -    private:  -        size_t Iterations = 1000000;  -        size_t KeyLen = 10;  -        size_t ValueLen = 100;  -        size_t NumReaders = 4;  -        size_t NumWriters = 1;  -        size_t BatchSize = 20;  - -        TMemoryPool MemoryPool;  -        TListType List;  -        TMutex Mutex;  -        TRandomData Random;  - -        TMap<TCiString, TTest> AllTests;  -        TVector<TTest> Tests;  - -    public:  -        TTestSuite()  -            : MemoryPool(64 * 1024)  -            , List(MemoryPool)  -        {  -        }  - -        bool Init(int argc, const char* argv[]) {  -            TVector<TString> tests;  -            try {  -                NLastGetopt::TOpts opts;  -                opts.AddHelpOption();  - -#define OPTION(opt, x)             \  -    opts.AddLongOption(opt, #x)    \  -        .Optional()                \  -        .DefaultValue(ToString(x)) \  -        .StoreResult(&x) // end of OPTION  - -                OPTION('i', Iterations);  -                OPTION('k', KeyLen);  -                OPTION('v', ValueLen);  -                OPTION('r', NumReaders);  -                OPTION('w', NumWriters);  -                OPTION('b', BatchSize);  +    //////////////////////////////////////////////////////////////////////////////// + +    typedef std::function<void()> TTestFunc; + +    struct TTest { +        TString Name; +        TTestFunc Func; + +        TTest() { +        } + +        TTest(const TString& name, const TTestFunc& func) +            : Name(name) +            , Func(func) +        { +        } +    }; + +    //////////////////////////////////////////////////////////////////////////////// + +    class TTestSuite { +    private: +        size_t Iterations = 1000000; +        size_t KeyLen = 10; +        size_t ValueLen = 100; +        size_t NumReaders = 4; +        size_t NumWriters = 1; +        size_t BatchSize = 20; + +        TMemoryPool MemoryPool; +        TListType List; +        TMutex Mutex; +        TRandomData Random; + +        TMap<TCiString, TTest> AllTests; +        TVector<TTest> Tests; + +    public: +        TTestSuite() +            : MemoryPool(64 * 1024) +            , List(MemoryPool) +        { +        } + +        bool Init(int argc, const char* argv[]) { +            TVector<TString> tests; +            try { +                NLastGetopt::TOpts opts; +                opts.AddHelpOption(); + +#define OPTION(opt, x)             \ +    opts.AddLongOption(opt, #x)    \ +        .Optional()                \ +        .DefaultValue(ToString(x)) \ +        .StoreResult(&x) // end of OPTION + +                OPTION('i', Iterations); +                OPTION('k', KeyLen); +                OPTION('v', ValueLen); +                OPTION('r', NumReaders); +                OPTION('w', NumWriters); +                OPTION('b', BatchSize);  #undef OPTION -                NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv);  -                for (const auto& opt : opts.Opts_) {  -                    const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true);  -                    if (r) {  -                        LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl;  -                    }  +                NLastGetopt::TOptsParseResultException optsRes(&opts, argc, argv); +                for (const auto& opt : opts.Opts_) { +                    const NLastGetopt::TOptParseResult* r = optsRes.FindOptParseResult(opt.Get(), true); +                    if (r) { +                        LogInfo() << "[-" << opt->GetChar() << "] " << opt->GetName() << ": " << r->Back() << Endl; +                    }                  } -                tests = optsRes.GetFreeArgs();  -            } catch (...) {  -                LogError() << CurrentExceptionMessage() << Endl;  -                return false;  +                tests = optsRes.GetFreeArgs(); +            } catch (...) { +                LogError() << CurrentExceptionMessage() << Endl; +                return false;              }  #define TEST(type) \ -    AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST  +    AddTest(#type, std::bind(&TTestSuite::Y_CAT(TEST_, type), this)) // end of TEST -            TEST(Clear);  -            TEST(InsertRandom);  -            TEST(InsertSequential);  -            TEST(InsertSequentialSimple);  -            TEST(LookupRandom);  -            TEST(Concurrent);  +            TEST(Clear); +            TEST(InsertRandom); +            TEST(InsertSequential); +            TEST(InsertSequentialSimple); +            TEST(LookupRandom); +            TEST(Concurrent);  #undef TEST -            if (tests.empty()) {  -                LogError() << "no tests specified, choose from: " << PrintTests() << Endl;  +            if (tests.empty()) { +                LogError() << "no tests specified, choose from: " << PrintTests() << Endl;                  return false;              } -  -            for (size_t i = 0; i < tests.size(); ++i) {  + +            for (size_t i = 0; i < tests.size(); ++i) {                  if (!AllTests.contains(tests[i])) { -                    LogError() << "unknown test name: " << tests[i] << Endl;  -                    return false;  -                }  -                Tests.push_back(AllTests[tests[i]]);  -            }  -  -            return true;  +                    LogError() << "unknown test name: " << tests[i] << Endl; +                    return false; +                } +                Tests.push_back(AllTests[tests[i]]); +            } + +            return true;          } -        void Run() {  +        void Run() {  #if !defined(NDEBUG) -            LogInfo() << "*** DEBUG build! ***" << Endl;  +            LogInfo() << "*** DEBUG build! ***" << Endl;  #endif -            for (const TTest& test : Tests) {  -                LogInfo() << "Starting test " << test.Name << Endl;  - -                TInstant started = TInstant::Now();  -                try {  -                    test.Func();  -                } catch (...) {  -                    LogError() << "test " << test.Name  -                               << " failed: " << CurrentExceptionMessage()  -                               << Endl;  -                }  - -                LogInfo() << "List size = " << List.GetSize() << Endl;  - -                TDuration duration = TInstant::Now() - started;  -                LogInfo() << "test " << test.Name  -                          << " duration: " << duration  -                          << " (" << (double)duration.MicroSeconds() / (Iterations * NumWriters) << "us per iteration)"  -                          << Endl;  -                LogInfo() << "Finished test " << test.Name << Endl;  -            }  +            for (const TTest& test : Tests) { +                LogInfo() << "Starting test " << test.Name << Endl; + +                TInstant started = TInstant::Now(); +                try { +                    test.Func(); +                } catch (...) { +                    LogError() << "test " << test.Name +                               << " failed: " << CurrentExceptionMessage() +                               << Endl; +                } + +                LogInfo() << "List size = " << List.GetSize() << Endl; + +                TDuration duration = TInstant::Now() - started; +                LogInfo() << "test " << test.Name +                          << " duration: " << duration +                          << " (" << (double)duration.MicroSeconds() / (Iterations * NumWriters) << "us per iteration)" +                          << Endl; +                LogInfo() << "Finished test " << test.Name << Endl; +            } +        } + +    private: +        void AddTest(const char* name, TTestFunc func) { +            AllTests[name] = TTest(name, func);          } -    private:  -        void AddTest(const char* name, TTestFunc func) {  -            AllTests[name] = TTest(name, func);  -        }  - -        TString PrintTests() const {  -            TVector<TString> names;  -            for (const auto& it : AllTests) {  -                names.push_back(it.first);  -            }  -            return JoinSeq(", ", names);  +        TString PrintTests() const { +            TVector<TString> names; +            for (const auto& it : AllTests) { +                names.push_back(it.first); +            } +            return JoinSeq(", ", names);          } -        void TEST_Clear() {  -            List.Clear();  -        }  +        void TEST_Clear() { +            List.Clear(); +        } -        void TEST_InsertRandom() {  -            for (size_t i = 0; i < Iterations; ++i) {  -                List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));  -            }  +        void TEST_InsertRandom() { +            for (size_t i = 0; i < Iterations; ++i) { +                List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); +            }          } -        void TEST_InsertSequential() {  -            TString key;  -            for (size_t i = 0; i < Iterations;) {  -                key.assign(Random.GetString(KeyLen));  -                size_t batch = BatchSize / 2 + RandomNumber(BatchSize);  -                for (size_t j = 0; j < batch; ++j, ++i) {  -                    key.resize(KeyLen - 1);  -                    key.append((char)j);  -                    List.Insert(TListItem(key, Random.GetString(ValueLen)));  -                }  +        void TEST_InsertSequential() { +            TString key; +            for (size_t i = 0; i < Iterations;) { +                key.assign(Random.GetString(KeyLen)); +                size_t batch = BatchSize / 2 + RandomNumber(BatchSize); +                for (size_t j = 0; j < batch; ++j, ++i) { +                    key.resize(KeyLen - 1); +                    key.append((char)j); +                    List.Insert(TListItem(key, Random.GetString(ValueLen))); +                }              }          } -        void TEST_InsertSequentialSimple() {  -            for (size_t i = 0; i < Iterations; ++i) {  -                List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));  -            }  +        void TEST_InsertSequentialSimple() { +            for (size_t i = 0; i < Iterations; ++i) { +                List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); +            }          } -        void TEST_LookupRandom() {  -            for (size_t i = 0; i < Iterations; ++i) {  -                List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));  -            }  +        void TEST_LookupRandom() { +            for (size_t i = 0; i < Iterations; ++i) { +                List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf())); +            }          } -        void TEST_Concurrent() {  -            LogInfo() << "starting producers..." << Endl;  - -            TVector<TAutoPtr<TWorkerThread>> producers(NumWriters);  -            for (size_t i1 = 0; i1 < producers.size(); ++i1) {  -                producers[i1] = StartThread([&] {  -                    TInstant started = TInstant::Now();  -                    for (size_t i2 = 0; i2 < Iterations; ++i2) {  -                        {  -                            TGuard<TMutex> guard(Mutex);  -                            List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen)));  -                        }  +        void TEST_Concurrent() { +            LogInfo() << "starting producers..." << Endl; + +            TVector<TAutoPtr<TWorkerThread>> producers(NumWriters); +            for (size_t i1 = 0; i1 < producers.size(); ++i1) { +                producers[i1] = StartThread([&] { +                    TInstant started = TInstant::Now(); +                    for (size_t i2 = 0; i2 < Iterations; ++i2) { +                        { +                            TGuard<TMutex> guard(Mutex); +                            List.Insert(TListItem(Random.GetString(KeyLen), Random.GetString(ValueLen))); +                        } +                    } +                    TDuration duration = TInstant::Now() - started; +                    LogInfo() +                        << "Average time for producer = " +                        << (double)duration.MicroSeconds() / Iterations << "us per iteration" +                        << Endl; +                }); +            } + +            LogInfo() << "starting consumers..." << Endl; + +            TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders); +            for (size_t i1 = 0; i1 < consumers.size(); ++i1) { +                consumers[i1] = StartThread([&] { +                    TInstant started = TInstant::Now(); +                    for (size_t i2 = 0; i2 < Iterations; ++i2) { +                        List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));                      } -                    TDuration duration = TInstant::Now() - started;  -                    LogInfo()  -                        << "Average time for producer = "  -                        << (double)duration.MicroSeconds() / Iterations << "us per iteration"  -                        << Endl;  -                });  -            }  - -            LogInfo() << "starting consumers..." << Endl;  - -            TVector<TAutoPtr<TWorkerThread>> consumers(NumReaders);  -            for (size_t i1 = 0; i1 < consumers.size(); ++i1) {  -                consumers[i1] = StartThread([&] {  -                    TInstant started = TInstant::Now();  -                    for (size_t i2 = 0; i2 < Iterations; ++i2) {  -                        List.SeekTo(TListItem(Random.GetString(KeyLen), TStringBuf()));  -                    }  -                    TDuration duration = TInstant::Now() - started;  -                    LogInfo()  -                        << "Average time for consumer = "  -                        << (double)duration.MicroSeconds() / Iterations << "us per iteration"  -                        << Endl;  -                });  -            }  - -            LogInfo() << "wait for producers..." << Endl;  - -            TDuration producerTime;  -            for (size_t i = 0; i < producers.size(); ++i) {  -                producers[i]->Join();  -                producerTime += producers[i]->GetTime();  -            }  - -            LogInfo() << "wait for consumers..." << Endl;  - -            TDuration consumerTime;  -            for (size_t i = 0; i < consumers.size(); ++i) {  -                consumers[i]->Join();  -                consumerTime += consumers[i]->GetTime();  -            }  - -            LogInfo() << "average producer time: "  -                      << producerTime.SecondsFloat() / producers.size() << " seconds"  -                      << Endl;  - -            LogInfo() << "average consumer time: "  -                      << consumerTime.SecondsFloat() / consumers.size() << " seconds"  -                      << Endl;  -        }  -    };  - -}  +                    TDuration duration = TInstant::Now() - started; +                    LogInfo() +                        << "Average time for consumer = " +                        << (double)duration.MicroSeconds() / Iterations << "us per iteration" +                        << Endl; +                }); +            } + +            LogInfo() << "wait for producers..." << Endl; + +            TDuration producerTime; +            for (size_t i = 0; i < producers.size(); ++i) { +                producers[i]->Join(); +                producerTime += producers[i]->GetTime(); +            } + +            LogInfo() << "wait for consumers..." << Endl; + +            TDuration consumerTime; +            for (size_t i = 0; i < consumers.size(); ++i) { +                consumers[i]->Join(); +                consumerTime += consumers[i]->GetTime(); +            } + +            LogInfo() << "average producer time: " +                      << producerTime.SecondsFloat() / producers.size() << " seconds" +                      << Endl; + +            LogInfo() << "average consumer time: " +                      << consumerTime.SecondsFloat() / consumers.size() << " seconds" +                      << Endl; +        } +    }; + +}  //////////////////////////////////////////////////////////////////////////////// -int main(int argc, const char* argv[]) {  +int main(int argc, const char* argv[]) {      TTestSuite suite;      if (!suite.Init(argc, argv)) {          return -1; diff --git a/library/cpp/threading/skip_list/skiplist.h b/library/cpp/threading/skip_list/skiplist.h index c1ed46c4aa4..914a7c6ee76 100644 --- a/library/cpp/threading/skip_list/skiplist.h +++ b/library/cpp/threading/skip_list/skiplist.h @@ -10,399 +10,399 @@  #include <util/system/atomic.h>  namespace NThreading { -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    class TNopCounter {  -    protected:  -        template <typename T>  -        void OnInsert(const T&) {  -        }  +    class TNopCounter { +    protected: +        template <typename T> +        void OnInsert(const T&) { +        } -        template <typename T>  -        void OnUpdate(const T&) {  -        }  +        template <typename T> +        void OnUpdate(const T&) { +        } -        void Reset() {  -        }  -    };  +        void Reset() { +        } +    }; -    ////////////////////////////////////////////////////////////////////////////////  +    //////////////////////////////////////////////////////////////////////////////// -    class TSizeCounter {  +    class TSizeCounter {      private: -        size_t Size;  +        size_t Size;      public: -        TSizeCounter()  -            : Size(0)  +        TSizeCounter() +            : Size(0)          {          } -        size_t GetSize() const {  -            return Size;  +        size_t GetSize() const { +            return Size;          } -    protected:  -        template <typename T>  -        void OnInsert(const T&) {  -            ++Size;  +    protected: +        template <typename T> +        void OnInsert(const T&) { +            ++Size;          } -        template <typename T>  -        void OnUpdate(const T&) {  +        template <typename T> +        void OnUpdate(const T&) {          } -        void Reset() {  -            Size = 0;  +        void Reset() { +            Size = 0;          }      }; -    ////////////////////////////////////////////////////////////////////////////////  -    // Append-only concurrent skip-list  -    //  -    // Readers do not require any synchronization.  -    // Writers should be externally synchronized.  -    // Nodes will be allocated using TMemoryPool instance.  - -    template <  -        typename T,  -        typename TComparer = TCompare<T>,  -        typename TAllocator = TMemoryPool,  -        typename TCounter = TSizeCounter,  -        int MaxHeight = 12,  -        int Branching = 4>  -    class TSkipList: public TCounter, private TNonCopyable {  -        class TNode {  -        private:  -            T Value;       // should be immutable after insert  -            TNode* Next[]; // variable-size array maximum of MaxHeight values  -  -        public:  +    //////////////////////////////////////////////////////////////////////////////// +    // Append-only concurrent skip-list +    // +    // Readers do not require any synchronization. +    // Writers should be externally synchronized. +    // Nodes will be allocated using TMemoryPool instance. + +    template < +        typename T, +        typename TComparer = TCompare<T>, +        typename TAllocator = TMemoryPool, +        typename TCounter = TSizeCounter, +        int MaxHeight = 12, +        int Branching = 4> +    class TSkipList: public TCounter, private TNonCopyable { +        class TNode { +        private: +            T Value;       // should be immutable after insert +            TNode* Next[]; // variable-size array maximum of MaxHeight values + +        public:              TNode(T&& value)                  : Value(std::move(value)) -            {  -                Y_UNUSED(Next);  -            }  -  -            const T& GetValue() const {  -                return Value;  -            }  -  -            T& GetValue() {  -                return Value;  -            }  -  -            TNode* GetNext(int height) const {  -                return AtomicGet(Next[height]);  -            }  -  -            void Link(int height, TNode** prev) {  -                for (int i = 0; i < height; ++i) {  -                    Next[i] = prev[i]->Next[i];  -                    AtomicSet(prev[i]->Next[i], this);  -                }  -            }  -        };  -  +            { +                Y_UNUSED(Next); +            } + +            const T& GetValue() const { +                return Value; +            } + +            T& GetValue() { +                return Value; +            } + +            TNode* GetNext(int height) const { +                return AtomicGet(Next[height]); +            } + +            void Link(int height, TNode** prev) { +                for (int i = 0; i < height; ++i) { +                    Next[i] = prev[i]->Next[i]; +                    AtomicSet(prev[i]->Next[i], this); +                } +            } +        }; +      public: -        class TIterator {  -        private:  -            const TSkipList* List;  -            const TNode* Node;  - -        public:  -            TIterator()  -                : List(nullptr)  -                , Node(nullptr)  -            {  -            }  - -            TIterator(const TSkipList* list, const TNode* node)  -                : List(list)  -                , Node(node)  -            {  -            }  - -            TIterator(const TIterator& other)  -                : List(other.List)  -                , Node(other.Node)  -            {  -            }  - -            TIterator& operator=(const TIterator& other) {  -                List = other.List;  -                Node = other.Node;  -                return *this;  -            }  - -            void Next() {  -                Node = Node ? Node->GetNext(0) : nullptr;  +        class TIterator { +        private: +            const TSkipList* List; +            const TNode* Node; + +        public: +            TIterator() +                : List(nullptr) +                , Node(nullptr) +            { +            } + +            TIterator(const TSkipList* list, const TNode* node) +                : List(list) +                , Node(node) +            { +            } + +            TIterator(const TIterator& other) +                : List(other.List) +                , Node(other.Node) +            {              } -            // much less efficient than Next as our list is single-linked  -            void Prev() {  -                if (Node) {  -                    TNode* node = List->FindLessThan(Node->GetValue(), nullptr);  -                    Node = (node != List->Head ? node : nullptr);  -                }  -            }  +            TIterator& operator=(const TIterator& other) { +                List = other.List; +                Node = other.Node; +                return *this; +            } -            void Reset() {  -                Node = nullptr;  -            }  +            void Next() { +                Node = Node ? Node->GetNext(0) : nullptr; +            } -            bool IsValid() const {  -                return Node != nullptr;  -            }  +            // much less efficient than Next as our list is single-linked +            void Prev() { +                if (Node) { +                    TNode* node = List->FindLessThan(Node->GetValue(), nullptr); +                    Node = (node != List->Head ? node : nullptr); +                } +            } + +            void Reset() { +                Node = nullptr; +            } + +            bool IsValid() const { +                return Node != nullptr; +            } -            const T& GetValue() const {  -                Y_ASSERT(IsValid());  -                return Node->GetValue();  -            }  -        };  +            const T& GetValue() const { +                Y_ASSERT(IsValid()); +                return Node->GetValue(); +            } +        }; -    private:  -        TAllocator& Allocator;  -        TComparer Comparer;  +    private: +        TAllocator& Allocator; +        TComparer Comparer; -        TNode* Head;  -        TAtomic Height;  -        TCounter Counter;  +        TNode* Head; +        TAtomic Height; +        TCounter Counter; -        TNode* Prev[MaxHeight];  +        TNode* Prev[MaxHeight]; -        template <typename TValue>  +        template <typename TValue>          using TComparerReturnType = std::invoke_result_t<TComparer, const T&, const TValue&>; -    public:  -        TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer())  -            : Allocator(allocator)  -            , Comparer(comparer)  -        {  -            Init();  -        }  - -        ~TSkipList() {  -            CallDtors();  -        }  - -        void Clear() {  -            CallDtors();  -            Allocator.ClearKeepFirstChunk();  -            Init();  +    public: +        TSkipList(TAllocator& allocator, const TComparer& comparer = TComparer()) +            : Allocator(allocator) +            , Comparer(comparer) +        { +            Init(); +        } + +        ~TSkipList() { +            CallDtors(); +        } + +        void Clear() { +            CallDtors(); +            Allocator.ClearKeepFirstChunk(); +            Init();          }          bool Insert(T value) { -            TNode* node = PrepareInsert(value);  -            if (Y_UNLIKELY(node && Compare(node, value) == 0)) {  -                // we do not allow duplicates  -                return false;  +            TNode* node = PrepareInsert(value); +            if (Y_UNLIKELY(node && Compare(node, value) == 0)) { +                // we do not allow duplicates +                return false;              }              node = DoInsert(std::move(value)); -            TCounter::OnInsert(node->GetValue());  -            return true;  +            TCounter::OnInsert(node->GetValue()); +            return true;          } -        template <typename TInsertAction, typename TUpdateAction>  -        bool Insert(const T& value, TInsertAction insert, TUpdateAction update) {  -            TNode* node = PrepareInsert(value);  -            if (Y_UNLIKELY(node && Compare(node, value) == 0)) {  -                if (update(node->GetValue())) {  -                    TCounter::OnUpdate(node->GetValue());  -                    return true;  -                }  -                // we do not allow duplicates  -                return false;  -            }  -            node = DoInsert(insert(value));  -            TCounter::OnInsert(node->GetValue());  -            return true;  -        }  - -        template <typename TValue>  -        bool Contains(const TValue& value) const {  -            TNode* node = FindGreaterThanOrEqual(value);  -            return node && Compare(node, value) == 0;  -        }  - -        TIterator SeekToFirst() const {  -            return TIterator(this, FindFirst());  -        }  - -        TIterator SeekToLast() const {  -            TNode* last = FindLast();  -            return TIterator(this, last != Head ? last : nullptr);  -        }  - -        template <typename TValue>  -        TIterator SeekTo(const TValue& value) const {  -            return TIterator(this, FindGreaterThanOrEqual(value));  +        template <typename TInsertAction, typename TUpdateAction> +        bool Insert(const T& value, TInsertAction insert, TUpdateAction update) { +            TNode* node = PrepareInsert(value); +            if (Y_UNLIKELY(node && Compare(node, value) == 0)) { +                if (update(node->GetValue())) { +                    TCounter::OnUpdate(node->GetValue()); +                    return true; +                } +                // we do not allow duplicates +                return false; +            } +            node = DoInsert(insert(value)); +            TCounter::OnInsert(node->GetValue()); +            return true; +        } + +        template <typename TValue> +        bool Contains(const TValue& value) const { +            TNode* node = FindGreaterThanOrEqual(value); +            return node && Compare(node, value) == 0; +        } + +        TIterator SeekToFirst() const { +            return TIterator(this, FindFirst()); +        } + +        TIterator SeekToLast() const { +            TNode* last = FindLast(); +            return TIterator(this, last != Head ? last : nullptr); +        } + +        template <typename TValue> +        TIterator SeekTo(const TValue& value) const { +            return TIterator(this, FindGreaterThanOrEqual(value));          } -    private:  -        static int RandomHeight() {  -            int height = 1;  -            while (height < MaxHeight && (RandomNumber<unsigned int>() % Branching) == 0) {  -                ++height;  -            }  -            return height;  -        }  - -        void Init() {  -            Head = AllocateRootNode();  -            Height = 1;  -            TCounter::Reset();  -  -            for (int i = 0; i < MaxHeight; ++i) {  -                Prev[i] = Head;  -            }  +    private: +        static int RandomHeight() { +            int height = 1; +            while (height < MaxHeight && (RandomNumber<unsigned int>() % Branching) == 0) { +                ++height; +            } +            return height;          } -        void CallDtors() {  -            if (!TTypeTraits<T>::IsPod) {  -                // we should explicitly call destructors for our nodes  -                TNode* node = Head->GetNext(0);  -                while (node) {  -                    TNode* next = node->GetNext(0);  -                    node->~TNode();  -                    node = next;  -                }  +        void Init() { +            Head = AllocateRootNode(); +            Height = 1; +            TCounter::Reset(); + +            for (int i = 0; i < MaxHeight; ++i) { +                Prev[i] = Head;              }          } -        TNode* AllocateRootNode() {  -            size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight;  -            void* buffer = Allocator.Allocate(size);  -            memset(buffer, 0, size);  -            return static_cast<TNode*>(buffer);  -        }  +        void CallDtors() { +            if (!TTypeTraits<T>::IsPod) { +                // we should explicitly call destructors for our nodes +                TNode* node = Head->GetNext(0); +                while (node) { +                    TNode* next = node->GetNext(0); +                    node->~TNode(); +                    node = next; +                } +            } +        } + +        TNode* AllocateRootNode() { +            size_t size = sizeof(TNode) + sizeof(TNode*) * MaxHeight; +            void* buffer = Allocator.Allocate(size); +            memset(buffer, 0, size); +            return static_cast<TNode*>(buffer); +        }          TNode* AllocateNode(T&& value, int height) { -            size_t size = sizeof(TNode) + sizeof(TNode*) * height;  -            void* buffer = Allocator.Allocate(size);  -            memset(buffer, 0, size);  +            size_t size = sizeof(TNode) + sizeof(TNode*) * height; +            void* buffer = Allocator.Allocate(size); +            memset(buffer, 0, size);              return new (buffer) TNode(std::move(value)); -        }  - -        TNode* FindFirst() const {  -            return Head->GetNext(0);  -        }  - -        TNode* FindLast() const {  -            TNode* node = Head;  -            int height = AtomicGet(Height) - 1;  - -            while (true) {  -                TNode* next = node->GetNext(height);  -                if (next) {  -                    node = next;  -                    continue;  -                }  - -                if (height) {  -                    --height;  -                } else {  -                    return node;  -                }  +        } + +        TNode* FindFirst() const { +            return Head->GetNext(0); +        } + +        TNode* FindLast() const { +            TNode* node = Head; +            int height = AtomicGet(Height) - 1; + +            while (true) { +                TNode* next = node->GetNext(height); +                if (next) { +                    node = next; +                    continue; +                } + +                if (height) { +                    --height; +                } else { +                    return node; +                }              }          } -        template <typename TValue>  -        TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const {  -            return Comparer(node->GetValue(), value);  -        }  - -        template <typename TValue>  -        TNode* FindLessThan(const TValue& value, TNode** links) const {  -            TNode* node = Head;  -            int height = AtomicGet(Height) - 1;  - -            TNode* prev = nullptr;  -            while (true) {  -                TNode* next = node->GetNext(height);  -                if (next && next != prev) {  -                    TComparerReturnType<TValue> cmp = Compare(next, value);  -                    if (cmp < 0) {  -                        node = next;  -                        continue;  -                    }  +        template <typename TValue> +        TComparerReturnType<TValue> Compare(const TNode* node, const TValue& value) const { +            return Comparer(node->GetValue(), value); +        } + +        template <typename TValue> +        TNode* FindLessThan(const TValue& value, TNode** links) const { +            TNode* node = Head; +            int height = AtomicGet(Height) - 1; + +            TNode* prev = nullptr; +            while (true) { +                TNode* next = node->GetNext(height); +                if (next && next != prev) { +                    TComparerReturnType<TValue> cmp = Compare(next, value); +                    if (cmp < 0) { +                        node = next; +                        continue; +                    }                  } -                if (links) {  -                    // collect links from upper levels  -                    links[height] = node;  -                }  - -                if (height) {  -                    prev = next;  -                    --height;  -                } else {  -                    return node;  -                }  +                if (links) { +                    // collect links from upper levels +                    links[height] = node; +                } + +                if (height) { +                    prev = next; +                    --height; +                } else { +                    return node; +                }              }          } -        template <typename TValue>  -        TNode* FindGreaterThanOrEqual(const TValue& value) const {  -            TNode* node = Head;  -            int height = AtomicGet(Height) - 1;  - -            TNode* prev = nullptr;  -            while (true) {  -                TNode* next = node->GetNext(height);  -                if (next && next != prev) {  -                    TComparerReturnType<TValue> cmp = Compare(next, value);  -                    if (cmp < 0) {  -                        node = next;  -                        continue;  -                    }  -                    if (cmp == 0) {  -                        return next;  -                    }  +        template <typename TValue> +        TNode* FindGreaterThanOrEqual(const TValue& value) const { +            TNode* node = Head; +            int height = AtomicGet(Height) - 1; + +            TNode* prev = nullptr; +            while (true) { +                TNode* next = node->GetNext(height); +                if (next && next != prev) { +                    TComparerReturnType<TValue> cmp = Compare(next, value); +                    if (cmp < 0) { +                        node = next; +                        continue; +                    } +                    if (cmp == 0) { +                        return next; +                    }                  } -  -                if (height) {  -                    prev = next;  -                    --height;  -                } else {  + +                if (height) { +                    prev = next; +                    --height; +                } else {                      return next;                  }              } -        }  +        } -        TNode* PrepareInsert(const T& value) {  -            TNode* prev = Prev[0];  -            TNode* next = prev->GetNext(0);  -            if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) {  -                // avoid seek in case of sequential insert  +        TNode* PrepareInsert(const T& value) { +            TNode* prev = Prev[0]; +            TNode* next = prev->GetNext(0); +            if ((prev == Head || Compare(prev, value) < 0) && (next == nullptr || Compare(next, value) >= 0)) { +                // avoid seek in case of sequential insert              } else { -                prev = FindLessThan(value, Prev);  -                next = prev->GetNext(0);  +                prev = FindLessThan(value, Prev); +                next = prev->GetNext(0);              } -            return next;  +            return next;          }          TNode* DoInsert(T&& value) { -            // choose level to place new node  -            int currentHeight = AtomicGet(Height);  -            int height = RandomHeight();  -            if (height > currentHeight) {  -                for (int i = currentHeight; i < height; ++i) {  -                    // head should link to all levels  -                    Prev[i] = Head;  -                }  -                AtomicSet(Height, height);  +            // choose level to place new node +            int currentHeight = AtomicGet(Height); +            int height = RandomHeight(); +            if (height > currentHeight) { +                for (int i = currentHeight; i < height; ++i) { +                    // head should link to all levels +                    Prev[i] = Head; +                } +                AtomicSet(Height, height);              }              TNode* node = AllocateNode(std::move(value), height); -            node->Link(height, Prev);  +            node->Link(height, Prev); -            // keep last inserted node to optimize sequential inserts  -            for (int i = 0; i < height; i++) {  -                Prev[i] = node;  -            }  -            return node;  +            // keep last inserted node to optimize sequential inserts +            for (int i = 0; i < height; i++) { +                Prev[i] = node; +            } +            return node;          } -    };  +    }; -}  +} diff --git a/library/cpp/threading/skip_list/skiplist_ut.cpp b/library/cpp/threading/skip_list/skiplist_ut.cpp index fdc831dffd2..52fcffda661 100644 --- a/library/cpp/threading/skip_list/skiplist_ut.cpp +++ b/library/cpp/threading/skip_list/skiplist_ut.cpp @@ -3,41 +3,41 @@  #include <library/cpp/testing/unittest/registar.h>  namespace NThreading { -    namespace {  -        struct TTestObject {  -            static size_t Count;  -            int Tag;  - -            TTestObject(int tag)  -                : Tag(tag)  -            {  -                ++Count;  -            }  - -            TTestObject(const TTestObject& other)  -                : Tag(other.Tag)  -            {  -                ++Count;  -            }  - -            ~TTestObject() {  -                --Count;  -            }  - -            bool operator<(const TTestObject& other) const {  -                return Tag < other.Tag;  -            }  -        };  - -        size_t TTestObject::Count = 0;  +    namespace { +        struct TTestObject { +            static size_t Count; +            int Tag; + +            TTestObject(int tag) +                : Tag(tag) +            { +                ++Count; +            } + +            TTestObject(const TTestObject& other) +                : Tag(other.Tag) +            { +                ++Count; +            } + +            ~TTestObject() { +                --Count; +            } + +            bool operator<(const TTestObject& other) const { +                return Tag < other.Tag; +            } +        }; + +        size_t TTestObject::Count = 0;      } -    ////////////////////////////////////////////////////////////////////////////////  +    ////////////////////////////////////////////////////////////////////////////////      Y_UNIT_TEST_SUITE(TSkipListTest) {          Y_UNIT_TEST(ShouldBeEmptyAfterCreation) { -            TMemoryPool pool(1024);  +            TMemoryPool pool(1024);              TSkipList<int> list(pool);              UNIT_ASSERT_EQUAL(list.GetSize(), 0); @@ -182,4 +182,4 @@ namespace NThreading {          }      } -}  +} diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp index 2223dce6507..3b5203194a3 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp @@ -14,9 +14,9 @@ class TTaskSchedulerTest: public TTestBase {          class TCheckTask: public TTaskScheduler::IRepeatedTask {              public: -                TCheckTask(const TDuration& delay)  -                    : Start_(Now())  -                    , Delay_(delay)  +                TCheckTask(const TDuration& delay) +                    : Start_(Now()) +                    , Delay_(delay)                  {                      AtomicIncrement(ScheduledTaskCounter_);                  } @@ -25,15 +25,15 @@ class TTaskSchedulerTest: public TTestBase {                  }                  bool Process() override { -                    const TDuration delay = Now() - Start_;  +                    const TDuration delay = Now() - Start_; -                    if (delay < Delay_) {  +                    if (delay < Delay_) {                          AtomicIncrement(BadTimeoutCounter_);                      }                      AtomicIncrement(ExecutedTaskCounter_); -  -                    return false;  + +                    return false;                  }                  static bool AllTaskExecuted() { @@ -45,8 +45,8 @@ class TTaskSchedulerTest: public TTestBase {                  }              private: -                TInstant Start_;  -                TDuration Delay_;  +                TInstant Start_; +                TDuration Delay_;                  static TAtomic BadTimeoutCounter_;                  static TAtomic ScheduledTaskCounter_;                  static TAtomic ExecutedTaskCounter_; @@ -60,7 +60,7 @@ class TTaskSchedulerTest: public TTestBase {              ScheduleCheckTask(10000);              ScheduleCheckTask(5000); -            Scheduler_.Start();  +            Scheduler_.Start();              usleep(1000000); @@ -70,8 +70,8 @@ class TTaskSchedulerTest: public TTestBase {      private:          void ScheduleCheckTask(size_t delay) { -            TDuration d = TDuration::MicroSeconds(delay);  -  +            TDuration d = TDuration::MicroSeconds(delay); +              Scheduler_.Add(new TCheckTask(d), d);          }  | 
