aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/StorageReplicatedMergeTree.h
blob: 33acf5af836d6491b1bdf222ea5f61652ddc104d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
#pragma once

#include <base/UUID.h>
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedTableStatus.h>
#include <Storages/RenamingRestrictions.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Throttler.h>
#include <Common/EventNotifier.h>
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Parsers/SyncReplicaMode.h>


namespace DB
{

/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
  *
  * ZooKeeper is used for the following things:
  * - the structure of the table (/metadata, /columns)
  * - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
  * - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
  * - the leader replica election (/leader_election) - these are the replicas that assign merges, mutations
  *   and partition manipulations.
  *   (after ClickHouse version 20.5 we allow multiple leaders to act concurrently);
  * - a set of parts of data on each replica (/replicas/replica_name/parts);
  * - list of the last N blocks of data with checksum, for deduplication (/blocks);
  * - the list of incremental block numbers (/block_numbers) that we are about to insert,
  *   to ensure the linear order of data insertion and data merge only on the intervals in this sequence;
  * - coordinate writes with quorum (/quorum).
  * - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations).
  *   See comments in StorageReplicatedMergeTree::mutate() for details.
  */

/** The replicated tables have a common log (/log/log-...).
  * Log - a sequence of entries (LogEntry) about what to do.
  * Each entry is one of:
  * - normal data insertion (GET),
  * - data insertion with a possible attach from local data (ATTACH),
  * - merge (MERGE),
  * - delete the partition (DROP).
  *
  * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
  *  and then executes them (queueTask).
  * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
  * In addition, the records in the queue can be generated independently (not from the log), in the following cases:
  * - when creating a new replica, GET actions for fetching parts from other replicas are enqueued (createReplica);
  * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
  *   (at start - checkParts, while running - searchForMissingPart), GET actions for fetching it from other replicas are enqueued;
  *
  * The replica on which the INSERT was made will also have a GET entry for this data in its queue.
  * Such an entry is considered to be executed as soon as the queue handler sees it.
  *
  * The log entry has a creation time. This time is generated by the clock of server that created entry
  * - the one on which the corresponding INSERT or ALTER query came.
  *
  * For the entries in the queue that the replica made for itself,
  * the creation time of the corresponding part on any of the replicas is used as the entry time.
  */

class ZooKeeperWithFaultInjection;
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;

class StorageReplicatedMergeTree final : public MergeTreeData
{
public:
    /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
      */
    StorageReplicatedMergeTree(
        const String & zookeeper_path_,
        const String & replica_name_,
        bool attach,
        const StorageID & table_id_,
        const String & relative_data_path_,
        const StorageInMemoryMetadata & metadata_,
        ContextMutablePtr context_,
        const String & date_column_name,
        const MergingParams & merging_params_,
        std::unique_ptr<MergeTreeSettings> settings_,
        bool has_force_restore_data_flag,
        RenamingRestrictions renaming_restrictions_,
        bool need_check_structure);

    void startup() override;

    /// Too many shutdown methods...
    ///
    /// Partial shutdown is called if we lose the connection to ZooKeeper.
    /// The table can recover after a partial shutdown and continue
    /// to work. This method can be called regularly.
    void partialShutdown();

    /// These two methods are called during final table shutdown (DROP/DETACH/overall server shutdown).
    /// The shutdown process is split into two methods to make it softer and faster. The database's shutdown()
    /// looks like:
    /// for (table : tables)
    ///     table->flushAndPrepareForShutdown()
    ///
    /// for (table : tables)
    ///     table->shutdown()
    ///
    /// So we stop producing all the parts first for all tables (fast operation). And after we can wait in shutdown()
    /// for other replicas to download parts.
    ///
    /// In flushAndPrepareForShutdown we cancel all part-producing operations:
    /// merges, fetches, moves and so on. If it wasn't called before shutdown() -- shutdown() will
    /// call it (defensive programming).
    void flushAndPrepareForShutdown() override;
    /// In shutdown we completely terminate table -- remove
    /// is_active node and interserver handler. Also optionally
    /// wait until other replicas will download some parts from our replica.
    void shutdown() override;

    ~StorageReplicatedMergeTree() override;

    static String getDefaultZooKeeperPath(const Poco::Util::AbstractConfiguration & config);
    static String getDefaultReplicaName(const Poco::Util::AbstractConfiguration & config);

    /// Builds the engine name by placing the merging mode name between "Replicated" and "MergeTree".
    std::string getName() const override
    {
        std::string engine_name = "Replicated";
        engine_name += merging_params.getModeName();
        engine_name += "MergeTree";
        return engine_name;
    }

    /// Capability flags of this engine; each one unconditionally reports `true`.
    bool supportsParallelInsert() const override
    {
        return true;
    }

    bool supportsReplication() const override
    {
        return true;
    }

    bool supportsDeduplication() const override
    {
        return true;
    }

    void read(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    std::optional<UInt64> totalRows(const Settings & settings) const override;
    std::optional<UInt64> totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, ContextPtr context) const override;
    std::optional<UInt64> totalBytes(const Settings & settings) const override;

    SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;

    std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;

    bool optimize(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        const ASTPtr & partition,
        bool final,
        bool deduplicate,
        const Names & deduplicate_by_columns,
        bool cleanup,
        ContextPtr query_context) override;

    void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;

    void mutate(const MutationCommands & commands, ContextPtr context) override;
    void waitMutation(const String & znode_name, size_t mutations_sync) const;
    std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
    CancellationCode killMutation(const String & mutation_id) override;

    bool hasLightweightDeletedMask() const override;

    /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
      */
    void drop() override;

    void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) override;

    void checkTableCanBeRenamed(const StorageID & new_name) const override;

    void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

    /// Unconditionally reports `true`.
    bool supportsIndexForIn() const override
    {
        return true;
    }

    void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override;

    ActionLock getActionLock(StorageActionBlockType action_type) override;

    void onActionLockRemove(StorageActionBlockType action_type) override;

    /// Wait till replication queue's current last entry is processed or till size becomes 0
    /// If timeout is exceeded returns false
    bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode);

    /// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
    void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);

    using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
    void getQueue(LogEntriesData & res, String & replica_name);

    std::vector<PartMovesBetweenShardsOrchestrator::Entry> getPartMovesBetweenShardsEntries();

    /// Get replica delay relative to current time.
    time_t getAbsoluteDelay() const;

    /// If the absolute delay is greater than min_relative_delay_to_measure,
    /// will also calculate the difference from the unprocessed time of the best replica.
    /// NOTE: Will communicate to ZooKeeper to calculate relative delay.
    void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);

    /// Add a part to the queue of parts whose data you want to check in the background thread.
    void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);

    CheckResults checkData(const ASTPtr & query, ContextPtr context) override;

    /// Checks ability to use granularity
    bool canUseAdaptiveGranularity() const override;

    /// Returns the default path to the table in ZooKeeper.
    /// It's used if not set in engine's arguments while creating a replicated table.
    static String getDefaultReplicaPath(const ContextPtr & context_);

    /// Returns the default replica name in ZooKeeper.
    /// It's used if not set in engine's arguments while creating a replicated table.
    static String getDefaultReplicaName(const ContextPtr & context_);

    /// Modify a CREATE TABLE query to make a variant which must be written to a backup.
    void adjustCreateQueryForBackup(ASTPtr & create_query) const override;

    /// Makes backup entries to backup the data of the storage.
    void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

    /// Extract data from the backup and put it to the storage.
    void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

    /** Remove a specific replica from zookeeper.
     */
    static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica,
                            Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);

    void dropReplica(const String & drop_zookeeper_path, const String & drop_replica, Poco::Logger * logger);

    /// Removes table from ZooKeeper after the last replica was dropped
    static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
                                              const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);

    /// Schedules job to execute in background pool (merge, mutate, drop range and so on)
    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;

    /// Checks that fetches are not disabled with action blocker and pool for fetches
    /// is not overloaded
    bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;

    /// Fetch part only when it is stored on shared storage like S3
    MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);

    /// Lock part in ZooKeeper so that its shared data can be used by several nodes
    void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
    void lockSharedData(
        const IMergeTreeDataPart & part,
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        bool replace_existing_lock,
        std::optional<HardlinkedFiles> hardlinked_files) const;

    void getLockSharedDataOps(
        const IMergeTreeDataPart & part,
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        bool replace_existing_lock,
        std::optional<HardlinkedFiles> hardlinked_files,
        Coordination::Requests & requests) const;

    zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;

    /// Unlock shared data part in zookeeper
    /// Return true if data unlocked
    /// Return false if data is still used by another node
    std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
    std::pair<bool, NameSet>
    unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const;

    /// Unlock shared data part in zookeeper by part id
    /// Return true if data unlocked
    /// Return false if data is still used by another node
    static std::pair<bool, NameSet> unlockSharedDataByID(
        String part_id,
        const String & table_uuid,
        const MergeTreePartInfo & part_info,
        const String & replica_name_,
        const std::string & disk_type,
        const ZooKeeperWithFaultInjectionPtr & zookeeper_,
        const MergeTreeSettings & settings,
        Poco::Logger * logger,
        const String & zookeeper_path_old,
        MergeTreeDataFormatVersion data_format_version);

    /// Fetch part only if some replica has it on shared storage like S3
    MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;

    /// Get the best replica that has this part on a remote disk of the same type
    String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;

    /// Read-only access to the `replica_name` member.
    const String & getReplicaName() const
    {
        return replica_name;
    }

    /// Restores table metadata if ZooKeeper lost it.
    /// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/
    /// folder and attached. Parts in all other states are just moved to detached/ folder.
    void restoreMetadataInZooKeeper();

    /// Throttler for replicated fetches (returns the `replicated_fetches_throttler` member by value).
    ThrottlerPtr getFetchesThrottler() const { return replicated_fetches_throttler; }

    /// Throttler for replicated sends (returns the `replicated_sends_throttler` member by value).
    ThrottlerPtr getSendsThrottler() const { return replicated_sends_throttler; }

    bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);

    /// ZooKeeper name used for this table: either the default one or a custom one.
    const String & getZooKeeperName() const
    {
        return zookeeper_name;
    }

    /// Read-only access to the `zookeeper_path` member.
    const String & getZooKeeperPath() const
    {
        return zookeeper_path;
    }

    // Return table id, common for different replicas
    String getTableSharedID() const override;

    size_t getNumberOfUnfinishedMutations() const override;

    /// Returns the same as getTableSharedID(), but extracts it from a create query.
    static std::optional<String> tryGetTableSharedIDFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);

    /// Read-only access to the static `default_zookeeper_name` constant.
    static const String & getDefaultZooKeeperName()
    {
        return default_zookeeper_name;
    }

    /// Check if there are new broken disks and enqueue part recovery tasks.
    void checkBrokenDisks();

    static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
        const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper);

    bool canUseZeroCopyReplication() const;

    bool isTableReadOnly () { return is_readonly; }

    /// Get a sequential consistent view of current parts.
    ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;

    void addLastSentPart(const MergeTreePartInfo & info);

    /// Wait required amount of milliseconds to give other replicas a chance to
    /// download unique parts from our replica
    using ShutdownDeadline = std::chrono::time_point<std::chrono::system_clock>;
    void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline);

private:
    std::atomic_bool are_restoring_replica {false};

    /// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts
    size_t clearOldPartsAndRemoveFromZK();
    void clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts);

    template<bool async_insert>
    friend class ReplicatedMergeTreeSinkImpl;
    friend class ReplicatedMergeTreePartCheckThread;
    friend class ReplicatedMergeTreeCleanupThread;
    friend class AsyncBlockIDsCache<StorageReplicatedMergeTree>;
    friend class ReplicatedMergeTreeAlterThread;
    friend class ReplicatedMergeTreeRestartingThread;
    friend class ReplicatedMergeTreeAttachThread;
    friend class ReplicatedMergeTreeMergeStrategyPicker;
    friend struct ReplicatedMergeTreeLogEntry;
    friend class ScopedPartitionMergeLock;
    friend class ReplicatedMergeTreeQueue;
    friend class PartMovesBetweenShardsOrchestrator;
    friend class MergeTreeData;
    friend class MergeFromLogEntryTask;
    friend class MutateFromLogEntryTask;
    friend class ReplicatedMergeMutateTaskBase;

    using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
    using LogEntry = ReplicatedMergeTreeLogEntry;
    using LogEntryPtr = LogEntry::Ptr;

    using MergeTreeData::MutableDataPartPtr;

    zkutil::ZooKeeperPtr current_zookeeper;        /// Use only the methods below.
    mutable std::mutex current_zookeeper_mutex;    /// To recreate the session in the background thread.

    zkutil::ZooKeeperPtr tryGetZooKeeper() const;
    zkutil::ZooKeeperPtr getZooKeeper() const;
    /// Get connection from global context and reconnect if needed.
    /// NOTE: use it only when table is shut down, in all other cases
    /// use getZooKeeper() because it is managed by restarting thread
    /// which guarantees that we have only one connected object
    /// for table.
    zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
    zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
    void setZooKeeper();
    String getEndpointName() const;

    /// If true, the table is offline and cannot be written to.
    /// This flag is managed by RestartingThread.
    std::atomic_bool is_readonly {true};
    /// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
    /// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
    std::optional<bool> has_metadata_in_zookeeper;

    bool is_readonly_metric_set = false;

    static const String default_zookeeper_name;
    const String zookeeper_name;
    const String zookeeper_path;
    const String replica_name;
    const String replica_path;

    /** /replicas/me/is_active.
      */
    zkutil::EphemeralNodeHolderPtr replica_is_active_node;

    /** Is this replica "leading". The leader replica selects the parts to merge.
      * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
      */
    std::atomic<bool> is_leader {false};

    InterserverIOEndpointPtr data_parts_exchange_endpoint;

    MergeTreeDataSelectExecutor reader;
    MergeTreeDataWriter writer;
    MergeTreeDataMergerMutator merger_mutator;

    MergeStrategyPicker merge_strategy_picker;

    /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
     * In ZK the entries are in chronological order. Here that is not necessary.
     */
    ReplicatedMergeTreeQueue queue;
    std::atomic<time_t> last_queue_update_start_time{0};
    std::atomic<time_t> last_queue_update_finish_time{0};

    mutable std::mutex last_queue_update_exception_lock;
    String last_queue_update_exception;
    String getLastQueueUpdateException() const;

    DataPartsExchange::Fetcher fetcher;

    /// When activated, replica is initialized and startup() method could exit
    Poco::Event startup_event;

    /// Do I need to complete background threads (except restarting_thread)?
    std::atomic<bool> partial_shutdown_called {false};

    /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
    Poco::Event partial_shutdown_event {false};     /// Poco::Event::EVENT_MANUALRESET

    std::atomic<bool> shutdown_called {false};
    std::atomic<bool> shutdown_prepared_called {false};
    std::optional<ShutdownDeadline> shutdown_deadline;

    /// We call flushAndPrepareForShutdown before acquiring DDLGuard, so we can shutdown a table that is being created right now
    mutable std::mutex flush_and_shutdown_mutex;


    mutable std::mutex last_sent_parts_mutex;
    std::condition_variable last_sent_parts_cv;
    std::deque<MergeTreePartInfo> last_sent_parts;

    /// Threads.
    ///

    /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
    bool queue_update_in_progress = false;
    BackgroundSchedulePool::TaskHolder queue_updating_task;

    BackgroundSchedulePool::TaskHolder mutations_updating_task;
    Coordination::WatchCallbackPtr mutations_watch_callback;

    /// A task that selects parts to merge.
    BackgroundSchedulePool::TaskHolder merge_selecting_task;
    /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
    std::mutex merge_selecting_mutex;

    UInt64 merge_selecting_sleep_ms;

    /// A task that marks finished mutations as done.
    BackgroundSchedulePool::TaskHolder mutations_finalizing_task;

    /// A thread that removes old parts, log entries, and blocks.
    ReplicatedMergeTreeCleanupThread cleanup_thread;

    AsyncBlockIDsCache<StorageReplicatedMergeTree> async_block_ids_cache;

    /// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
    ReplicatedMergeTreePartCheckThread part_check_thread;

    /// A thread that processes reconnection to ZooKeeper when the session expires.
    ReplicatedMergeTreeRestartingThread restarting_thread;
    EventNotifier::HandlerPtr session_expired_callback_handler;

    /// A thread that attaches the table using ZooKeeper
    std::optional<ReplicatedMergeTreeAttachThread> attach_thread;

    PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;

    std::atomic<bool> initialization_done{false};

    /// True if replica was created for existing table with fixed granularity
    bool other_replicas_fixed_granularity = false;

    /// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
    const RenamingRestrictions renaming_restrictions;

    /// Throttlers used in DataPartsExchange to lower maximum fetch/sends
    /// speed.
    ThrottlerPtr replicated_fetches_throttler;
    ThrottlerPtr replicated_sends_throttler;

    /// Global ID, synced via ZooKeeper between replicas
    mutable std::mutex table_shared_id_mutex;
    mutable UUID table_shared_id;

    std::mutex last_broken_disks_mutex;
    std::set<String> last_broken_disks;

    std::mutex existing_zero_copy_locks_mutex;

    /// Value type of the `existing_zero_copy_locks` map.
    struct ZeroCopyLockDescription
    {
        /// NOTE(review): presumably the name of the replica that holds the lock - confirm against usage.
        std::string replica;
        /// Shared atomic flag. NOTE(review): presumably tells whether the lock still exists - confirm against usage.
        std::shared_ptr<std::atomic<bool>> exists;
    };

    std::unordered_map<String, ZeroCopyLockDescription> existing_zero_copy_locks;

    static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);

    void readLocalImpl(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams);

    void readLocalSequentialConsistencyImpl(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams);

    void readParallelReplicasImpl(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams);

    template <class Func>
    void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

    /** Creates the minimum set of nodes in ZooKeeper and creates the first replica.
      * Returns true if the table was created, false if it already exists.
      */
    bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);

    /**
     * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
     */
    void createReplica(const StorageMetadataPtr & metadata_snapshot);

    /** Create nodes in the ZK, which must always be, but which might not exist when older versions of the server are running.
      */
    void createNewZooKeeperNodes();

    bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check = true);

    /// A part of ALTER: apply metadata changes only (data parts are altered separately).
    /// Must be called under IStorage::lockForAlter() lock.
    void setTableStructure(
        const StorageID & table_id, const ContextPtr & local_context,
        ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff,
        int32_t new_metadata_version);

    /** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
      * If any parts described in ZK are not present locally, throw an exception.
      * If any local parts are not mentioned in ZK, remove them.
      *  But if there are too many, throw an exception just in case - it's probably a configuration error.
      */
    void checkParts(bool skip_sanity_checks);

    /// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
    /// to be used for deduplication.
    void syncPinnedPartUUIDs();

    /** Check that the part's checksum is the same as the checksum of the same part on some other replica.
      * If no other replica has such a part, nothing is checked.
      * Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
      * Adds actions to `ops` that add data about the part into ZooKeeper.
      * `absent_replicas_paths` is passed by non-const reference; presumably it collects ZK paths of replicas
      * that do not have the part - TODO confirm against the implementation.
      * NOTE(review): the meaning of the returned bool is not documented here - confirm before relying on it.
      * Call under lockForShare.
      */
    bool checkPartChecksumsAndAddCommitOps(
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        const DataPartPtr & part,
        Coordination::Requests & ops,
        String part_name,
        NameSet & absent_replicas_paths);

    String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;

    /// Accepts a PreActive part, atomically checks its checksums against the ones on other replicas and commits the part.
    DataPartsVector checkPartChecksumsAndCommit(
        Transaction & transaction,
        const MutableDataPartPtr & part,
        std::optional<HardlinkedFiles> hardlinked_files = {},
        bool replace_zero_copy_lock = false);

    bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;

    void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const;

    void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const std::vector<String> & block_id_paths) const;

    /// Adds actions to `ops` that remove a part from ZooKeeper.
    /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
    void getRemovePartFromZooKeeperOps(const String & part_name, Coordination::Requests & ops, bool has_children);

    /// Quickly removes big set of parts from ZooKeeper (using async multi queries)
    void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
                                  NameSet * parts_should_be_retried = nullptr);

    /// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
    void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
    void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);

    void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name) override;

    void paranoidCheckForCoveredPartsInZooKeeperOnStart(const Strings & parts_in_zk, const Strings & parts_to_fetch) const;

    /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
    void removePartAndEnqueueFetch(const String & part_name, bool storage_init);

    /// Running jobs from the queue.

    /** Execute the action from the queue. Throws an exception if something is wrong.
      * Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
      */
    bool executeLogEntry(LogEntry & entry);

    /// Looks up the part for the entry in the detached/ folder.
    /// Returns nullptr if the part is corrupt or missing.
    MutableDataPartPtr attachPartHelperFoundValidPart(const LogEntry & entry) const;

    void executeDropRange(const LogEntry & entry);

    /// Execute an alter of table metadata. Sets the replica/metadata and replica/columns
    /// nodes in ZooKeeper and also changes the in-memory metadata.
    /// The new metadata and columns values are stored in the entry.
    bool executeMetadataAlter(const LogEntry & entry);

    /// Fetch part from other replica (inserted or merged/mutated).
    /// NOTE: Attention! First of all, tries to find a covering part on another replica
    /// and sets it into entry.actual_new_part_name. After that, tries to fetch this new covering part.
    /// If the fetch was not successful, clears entry.actual_new_part_name.
    bool executeFetch(LogEntry & entry, bool need_to_check_missing_part = true);

    bool executeReplaceRange(const LogEntry & entry);
    void executeClonePartFromShard(const LogEntry & entry);

    /** Updates the queue.
      */
    void queueUpdatingTask();

    void mutationsUpdatingTask();

    /** Clone data from another replica.
      * If replica can not be cloned throw Exception.
      */
    void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);

    /// Repairs metadata of a stale replica. Called from cloneReplica(...)
    void cloneMetadataIfNeeded(const String & source_replica, const String & source_path, zkutil::ZooKeeperPtr & zookeeper);

    /// Clone replica if it is lost.
    void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);


    ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();


    MergeFromLogEntryTaskPtr getTaskToProcessMergeQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);

    bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);

    /// Start being leader (if not disabled by setting).
    /// Since multi-leaders are allowed, it just sets is_leader flag.
    void startBeingLeader();
    void stopBeingLeader();

    /** Selects the parts to merge and writes to the log.
      */
    void mergeSelectingTask();

    /// Checks if some mutations are done and marks them as done.
    void mutationsFinalizingTask();

    /// Outcome of an attempt to write a log entry; it is the return type of both
    /// createLogEntryToMergeParts and createLogEntryToMutatePart.
    enum class CreateMergeEntryResult { Ok, MissingPart, LogUpdated, Other };

    /** Write the selected parts to merge into the log.
      * Call when merge_selecting_mutex is locked.
      * Returns a CreateMergeEntryResult (not a bool); presumably MissingPart is returned
      * if any part is not in ZK - TODO confirm against the implementation.
      */
    CreateMergeEntryResult createLogEntryToMergeParts(
        zkutil::ZooKeeperPtr & zookeeper,
        const DataPartsVector & parts,
        const String & merged_name,
        const UUID & merged_part_uuid,
        const MergeTreeDataPartFormat & merged_part_format,
        bool deduplicate,
        const Names & deduplicate_by_columns,
        bool cleanup,
        ReplicatedMergeTreeLogEntryData * out_log_entry,
        int32_t log_version,
        MergeType merge_type);

    CreateMergeEntryResult createLogEntryToMutatePart(
        const IMergeTreeDataPart & part,
        const UUID & new_part_uuid,
        Int64 mutation_version,
        int32_t alter_version,
        int32_t log_version);

    /// Exchange parts.

    ConnectionTimeouts getFetchPartHTTPTimeouts(ContextPtr context);

    /** Returns an empty string if no replica has the part.
      */
    String findReplicaHavingPart(const String & part_name, bool active);
    static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);

    bool checkReplicaHavePart(const String & replica, const String & part_name);
    bool checkIfDetachedPartExists(const String & part_name);
    bool checkIfDetachedPartitionExists(const String & partition_name);

    /** Find a replica having the specified part or any part that covers it.
      * If active = true, consider only active replicas.
      * If found, returns the replica name and sets 'entry.actual_new_part_name'
      * (presumably 'found_part_name' in the overload that takes a part name) to the name of the largest covering part found.
      * If not found, returns an empty string.
      */
    String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
    String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
    static std::set<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_);

    /** Download the specified part from the specified replica.
      * If `to_detached`, the part is placed in the `detached` directory.
      * If quorum != 0, then the node for tracking the quorum is updated.
      * Returns false if the part is already being fetched right now.
      */
    bool fetchPart(
        const String & part_name,
        const StorageMetadataPtr & metadata_snapshot,
        const String & source_zookeeper_name,
        const String & source_replica_path,
        bool to_detached,
        size_t quorum,
        zkutil::ZooKeeper::Ptr zookeeper_ = nullptr,
        bool try_fetch_shared = true);

    /** Download the specified part from the specified replica.
      * Used to replace a local part with the same s3-shared part in hybrid storage.
      * Returns a MutableDataPartPtr; presumably a null pointer if the part is already
      * being fetched right now - TODO confirm against the implementation.
      */
    MutableDataPartPtr fetchExistsPart(
        const String & part_name,
        const StorageMetadataPtr & metadata_snapshot,
        const String & replica_path,
        DiskPtr replaced_disk,
        String replaced_part_path);

    /// Required only to avoid races between executeLogEntry and fetchPartition
    std::unordered_set<String> currently_fetching_parts;
    std::mutex currently_fetching_parts_mutex;

    /// With the quorum being tracked, add a replica to the quorum for the part.
    void updateQuorum(const String & part_name, bool is_parallel);

    /// Deletes info from quorum/last_part node for particular partition_id.
    void cleanLastPartNode(const String & partition_id);

    /// Part name is stored in quorum/last_part for corresponding partition_id.
    bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;

    /// Part currently inserting with quorum (node quorum/parallel/part_name exists)
    bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;

    /// Creates new block number if block with such block_id does not exist
    /// If zookeeper_path_prefix specified then allocate block number on this path
    /// (can be used if we want to allocate blocks on other replicas)
    std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
        const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
        const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;

    template<typename T>
    std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
        const String & partition_id,
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        const T & zookeeper_block_id_path,
        const String & zookeeper_path_prefix = "") const;

    /** Wait until all replicas, including this one, execute the specified action from the log.
      * If replicas are added at the same time, it might not wait for the added replica.
      *
      * Waits for inactive replicas no more than wait_for_inactive_timeout.
      * The `try` variant returns the list of inactive replicas that have not executed the entry;
      * the non-`try` variant returns nothing and presumably throws an exception instead.
      *
      * NOTE: This method must be called without table lock held.
      * Because it effectively waits for another thread that usually also has to acquire a lock to proceed, and this leads to a deadlock.
      */
    void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
                                             Int64 wait_for_inactive_timeout, const String & error_context = {});
    Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
                                                   Int64 wait_for_inactive_timeout);

    /** Wait until the specified replica executes the specified action from the log.
      * NOTE: See comment about locks above.
      */
    bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
                                            const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);

    /// Depending on settings, do nothing or wait for this replica or all replicas process log entry.
    void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});

    /// Throw an exception if the table is readonly.
    void assertNotReadonly() const;

    /// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
    /// Returns false if the partition doesn't exist yet.
    /// Caller must hold delimiting_block_lock until creation of drop/replace entry in log.
    /// Otherwise some replica may assign merge which intersects part_info.
    bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info,
                                                std::optional<EphemeralLockInZooKeeper> & delimiting_block_lock, bool for_replace_range = false);

    /// Check whether a node exists in ZK. If it does, remember this information, and then immediately answer true.
    mutable std::unordered_set<std::string> existing_nodes_cache;
    mutable std::mutex existing_nodes_cache_mutex;
    bool existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const;

    /// Cancels INSERTs in the block range by removing ephemeral block numbers
    void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

    void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

    void getClearBlocksInPartitionOpsImpl(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name);
    /// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
    void clearBlocksInPartition(
        zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

    /// Info about how other replicas can access this one.
    ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;

    bool addOpsToDropAllPartsInPartition(
        zkutil::ZooKeeper & zookeeper, const String & partition_id, bool detach,
        Coordination::Requests & ops, std::vector<LogEntryPtr> & entries,
        std::vector<EphemeralLockInZooKeeper> & delimiting_block_locks,
        std::vector<size_t> & log_entry_ops_idx);
    void dropAllPartsInPartitions(
        zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);

    LogEntryPtr dropAllPartsInPartition(
        zkutil::ZooKeeper & zookeeper, const String & partition_id, ContextPtr query_context, bool detach);


    void dropAllPartitionsImpl(const zkutil::ZooKeeperPtr & zookeeper, bool detach, ContextPtr query_context);

    void dropPartNoWaitNoThrow(const String & part_name) override;
    void dropPart(const String & part_name, bool detach, ContextPtr query_context) override;

    // Partition helpers
    void dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) override;
    PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
    void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
    void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
    void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override;
    CancellationCode killPartMoveToShard(const UUID & task_uuid) override;
    void fetchPartition(
        const ASTPtr & partition,
        const StorageMetadataPtr & metadata_snapshot,
        const String & from,
        bool fetch_part,
        ContextPtr query_context) override;

    /// NOTE: there are no guarantees for concurrent merges. Dropping part can
    /// be concurrently merged into some covering part and dropPart will do
    /// nothing. There are some fundamental problems with it. But this is OK
    /// because:
    ///
    /// dropPart used in the following cases:
    /// 1) Remove empty parts after TTL.
    /// 2) Remove parts after move between shards.
    /// 3) User queries: ALTER TABLE DROP PART 'part_name'.
    ///
    /// In the first case merge of empty part is even better than DROP. In the
    /// second case part UUIDs used to forbid merges for moving parts so there
    /// is no problem with concurrent merges. The third case is quite rare and
    /// we give very weak guarantee: there will be no active part with this
    /// name, but possibly it was merged to some other part.
    ///
    /// NOTE: don't rely on dropPart if you 100% need to remove non-empty part
    /// and don't use any explicit locking mechanism for merges.
    bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);

    /// Check the granularity of an already existing replicated table in ZooKeeper, if it exists.
    /// Returns true if the granularity is fixed.
    bool checkFixedGranularityInZookeeper();

    /// Wait until the mutation is finished on the specified replicas (this method takes no timeout argument).
    void waitMutationToFinishOnReplicas(
        const Strings & replicas, const String & mutation_id) const;

    std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;

    void startBackgroundMovesIfNeeded() override;

    /// Attaches restored parts to the storage.
    void attachRestoredParts(MutableDataPartsVector && parts) override;

    std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;

    PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
        const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;

    static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
        const String & part_name, const String & zookeeper_path_old);

    static void createZeroCopyLockNode(
        const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node,
        int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
        const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

    static void getZeroCopyLockNodeCreateOps(
        const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
        int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
        const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});


    bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;

    /// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
    void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;

    // Create table id if needed
    void createTableSharedID() const;

    bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
    void watchZeroCopyLock(const String & part_name, const DiskPtr & disk);

    std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);

    /// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
    /// If no connection to zookeeper, shutdown, readonly -- return std::nullopt.
    /// If somebody already holding the lock -- return unlocked ZeroCopyLock object (not std::nullopt).
    std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;

    /// Wait for the ephemeral lock to disappear. Returns true if the table is shut down or readonly, the timeout is exceeded, etc.,
    /// or if the node has actually disappeared.
    bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;

    void startupImpl(bool from_attach_thread);
};

String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);


/** There are three places for each part, where it should be
  * 1. In the RAM, data_parts, all_data_parts.
  * 2. In the filesystem (FS), the directory with the data of the table.
  * 3. in ZooKeeper (ZK).
  *
  * When adding a part, it must be added immediately to these three places.
  * This is done like this
  * - [FS] first write the part into a temporary directory on the filesystem;
  * - [FS] rename the temporary part to the result on the filesystem;
  * - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one;
  * - [RAM] also set the `Transaction` object, which in case of an exception (in the next point),
  *   rolls back the changes in `data_parts` (from the previous point);
  * - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions);
  * - [FS, ZK] by the way, removing the covered (old) parts from filesystem, from ZooKeeper and from `all_data_parts`
  *   is delayed, after a few minutes.
  *
  * There is no atomicity here.
  * It could be possible to achieve atomicity using undo/redo logs and a flag in `DataPart` when it is completely ready.
  * But it would be inconvenient - I would have to write undo/redo logs for each `Part` in ZK, and this would increase already large number of interactions.
  *
  * Instead, we are forced to work in a situation where at any time
  *  (from another thread, or after server restart), there may be an unfinished transaction.
  *  (note - for this the part should be in RAM)
  * From these cases the most frequent one is when the part is already in the data_parts, but it's not yet in ZooKeeper.
  * This case must be distinguished from the case where such a situation is achieved due to some kind of damage to the state.
  *
  * Do this with the threshold for the time.
  * If the part is young enough, its lack in ZooKeeper will be perceived optimistically - as if it just did not have time to be added there
  *  - as if the transaction has not yet been executed, but will soon be executed.
  * And if the part is old, its absence in ZooKeeper will be perceived as an unfinished transaction that needs to be rolled back.
  *
  * PS. Perhaps it would be better to add a flag to the DataPart that a part is inserted into ZK.
  * But here it's too easy to get confused with the consistency of this flag.
  */
/// Age threshold for the "young enough" / "old" decision described above; the value is 5 * 60
/// (presumably seconds, i.e. 5 minutes - confirm at the use site).
/// NOLINTNEXTLINE
#define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60)

}