aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/MergeTree/MergeTreeData.h
blob: 22d7a070ad94f477fbba06e5e7f5cc8471415903 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
#pragma once

#include <mutex>
#include <base/defines.h>
#include <Common/SimpleIncrement.h>
#include <Common/SharedMutex.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/StoragePolicy.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/threadPoolCallbackRunner.h>


#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/range/iterator_range_core.hpp>

namespace DB
{

/// Number of streams is not number parts, but number or parts*files, hence 1000.
const size_t DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE = 1000;

class AlterCommands;
class MergeTreePartsMover;
class MergeTreeDataMergerMutator;
class MutationCommands;
class Context;
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
struct JobAndPool;
class MergeTreeTransaction;
struct ZeroCopyLock;

class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;

class MergeTreeTransaction;
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;

/// Auxiliary struct holding information about the future merged or mutated part.
struct EmergingPartInfo
{
    String disk_name;
    String partition_id;
    size_t estimate_bytes;
};

struct CurrentlySubmergingEmergingTagger;

struct SelectQueryOptions;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
class MergeTreeDeduplicationLog;

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

struct DataPartsLock
{
    std::optional<Stopwatch> wait_watch;
    std::unique_lock<std::mutex> lock;
    std::optional<Stopwatch> lock_watch;
    DataPartsLock() = default;
    explicit DataPartsLock(std::mutex & data_parts_mutex_);

    ~DataPartsLock();
};

/// Data structure for *MergeTree engines.
/// Merge tree is used for incremental sorting of data.
/// The table consists of several sorted parts.
/// During insertion new data is sorted according to the primary key and is written to the new part.
/// Parts are merged in the background according to a heuristic algorithm.
/// For each part the index file is created containing primary key values for every n-th row.
/// This allows efficient selection by primary key range predicate.
///
/// Additionally:
///
/// The date column is specified. For each part min and max dates are remembered.
/// Essentially it is an index too.
///
/// Data is partitioned by the value of the partitioning expression.
/// Parts belonging to different partitions are not merged - for the ease of administration (data sync and backup).
///
/// File structure of old-style month-partitioned tables (format_version = 0):
/// Part directory - / min-date _ max-date _ min-id _ max-id _ level /
/// Inside the part directory:
/// checksums.txt - contains the list of all files along with their sizes and checksums.
/// columns.txt - contains the list of all columns and their types.
/// primary.idx - contains the primary index.
/// [Column].bin - contains compressed column data.
/// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows.
///
/// File structure of tables with custom partitioning (format_version >= 1):
/// Part directory - / partition-id _ min-id _ max-id _ level /
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see IMergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
///
/// Several modes are implemented. Modes determine additional actions during merge:
/// - Ordinary - don't do anything special
/// - Collapsing - collapse pairs of rows with the opposite values of sign_columns for the same values
///   of primary key (cf. CollapsingSortedTransform.h)
/// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version
///   column is set, keep the latest row with the maximal version.
/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
/// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key.
/// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring).

/// The MergeTreeData class contains a list of parts and the data structure parameters.
/// To read and modify the data use other classes:
/// - MergeTreeDataSelectExecutor
/// - MergeTreeDataWriter
/// - MergeTreeDataMergerMutator

class MergeTreeData : public IStorage, public WithMutableContext
{
public:
    /// Function to call if the part is suspected to contain corrupt data.
    using BrokenPartCallback = std::function<void (const String &)>;
    using DataPart = IMergeTreeDataPart;

    using MutableDataPartPtr = std::shared_ptr<DataPart>;
    using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
    /// After the DataPart is added to the working set, it cannot be changed.
    using DataPartPtr = std::shared_ptr<const DataPart>;

    using DataPartState = MergeTreeDataPartState;
    using DataPartStates = std::initializer_list<DataPartState>;
    using DataPartStateVector = std::vector<DataPartState>;

    using PinnedPartUUIDsPtr = std::shared_ptr<const PinnedPartUUIDs>;

    constexpr static auto FORMAT_VERSION_FILE_NAME = "format_version.txt";
    constexpr static auto DETACHED_DIR_NAME = "detached";
    constexpr static auto MOVING_DIR_NAME = "moving";

    /// Auxiliary structure for index comparison. Keep in mind lifetime of MergeTreePartInfo.
    struct DataPartStateAndInfo
    {
        DataPartState state;
        const MergeTreePartInfo & info;
    };

    /// Auxiliary structure for index comparison
    struct DataPartStateAndPartitionID
    {
        DataPartState state;
        String partition_id;
    };

    STRONG_TYPEDEF(String, PartitionID)

    struct LessDataPart
    {
        using is_transparent = void;

        bool operator()(const DataPartPtr & lhs, const MergeTreePartInfo & rhs) const { return lhs->info < rhs; }
        bool operator()(const MergeTreePartInfo & lhs, const DataPartPtr & rhs) const { return lhs < rhs->info; }
        bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
        bool operator()(const MergeTreePartInfo & lhs, const PartitionID & rhs) const { return lhs.partition_id < rhs.toUnderType(); }
        bool operator()(const PartitionID & lhs, const MergeTreePartInfo & rhs) const { return lhs.toUnderType() < rhs.partition_id; }
    };

    struct LessStateDataPart
    {
        using is_transparent = void;

        bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
        {
            return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
                   < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
        }

        bool operator() (DataPartStateAndInfo info, const DataPartState & state) const
        {
            return static_cast<size_t>(info.state) < static_cast<size_t>(state);
        }

        bool operator() (const DataPartState & state, DataPartStateAndInfo info) const
        {
            return static_cast<size_t>(state) < static_cast<size_t>(info.state);
        }

        bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndPartitionID & rhs) const
        {
            return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.partition_id)
                   < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.partition_id);
        }

        bool operator() (const DataPartStateAndPartitionID & lhs, const DataPartStateAndInfo & rhs) const
        {
            return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.partition_id)
                   < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.partition_id);
        }
    };

    using DataParts = std::set<DataPartPtr, LessDataPart>;
    using MutableDataParts = std::set<MutableDataPartPtr, LessDataPart>;
    using DataPartsVector = std::vector<DataPartPtr>;

    DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }

    using OperationDataPartsLock = std::unique_lock<std::mutex>;
    OperationDataPartsLock lockOperationsWithParts() const { return OperationDataPartsLock(operation_with_data_parts_mutex); }

    MergeTreeDataPartFormat choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const;
    MergeTreeDataPartFormat choosePartFormatOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
    MergeTreeDataPartBuilder getDataPartBuilder(const String & name, const VolumePtr & volume, const String & part_dir) const;

    /// Auxiliary object to add a set of parts into the working set in two steps:
    /// * First, as PreActive parts (the parts are ready, but not yet in the active set).
    /// * Next, if commit() is called, the parts are added to the active set and the parts that are
    ///   covered by them are marked Outdated.
    /// If neither commit() nor rollback() was called, the destructor rollbacks the operation.
    class Transaction : private boost::noncopyable
    {
    public:
        Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_);

        DataPartsVector commit(DataPartsLock * acquired_parts_lock = nullptr);

        void addPart(MutableDataPartPtr & part);

        void rollback();

        /// Immediately remove parts from table's data_parts set and change part
        /// state to temporary. Useful for new parts which not present in table.
        void rollbackPartsToTemporaryState();

        size_t size() const { return precommitted_parts.size(); }
        bool isEmpty() const { return precommitted_parts.empty(); }

        ~Transaction()
        {
            try
            {
                rollback();
            }
            catch (...)
            {
                tryLogCurrentException("~MergeTreeData::Transaction");
            }
        }
        void clear();

        TransactionID getTID() const;

    private:
        friend class MergeTreeData;

        MergeTreeData & data;
        MergeTreeTransaction * txn;
        MutableDataParts precommitted_parts;
        MutableDataParts locked_parts;

    };

    using TransactionUniquePtr = std::unique_ptr<Transaction>;

    using PathWithDisk = std::pair<String, DiskPtr>;

    struct PartsTemporaryRename : private boost::noncopyable
    {
        PartsTemporaryRename(
            const MergeTreeData & storage_,
            const String & source_dir_)
            : storage(storage_)
            , source_dir(source_dir_)
        {
        }

        /// Adds part to rename. Both names are relative to relative_data_path.
        void addPart(const String & old_name, const String & new_name, const DiskPtr & disk);

        /// Renames part from old_name to new_name
        void tryRenameAll();

        /// Renames all added parts from new_name to old_name if old name is not empty
        ~PartsTemporaryRename();

        struct RenameInfo
        {
            String old_name;
            String new_name;
            /// Disk cannot be changed
            DiskPtr disk;
        };

        const MergeTreeData & storage;
        const String source_dir;
        std::vector<RenameInfo> old_and_new_names;
        bool renamed = false;
    };

    /// Parameters for various modes.
    struct MergingParams
    {
        /// Merging mode. See above.
        enum Mode
        {
            Ordinary            = 0,    /// Enum values are saved. Do not change them.
            Collapsing          = 1,
            Summing             = 2,
            Aggregating         = 3,
            Replacing           = 5,
            Graphite            = 6,
            VersionedCollapsing = 7,
        };

        Mode mode;

        /// For Collapsing and VersionedCollapsing mode.
        String sign_column;

        /// For Replacing mode. Can be empty for Replacing.
        String is_deleted_column;

        /// For Summing mode. If empty - columns_to_sum is determined automatically.
        Names columns_to_sum;

        /// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
        String version_column;

        /// For Graphite mode.
        Graphite::Params graphite_params;

        /// Check that needed columns are present and have correct types.
        void check(const StorageInMemoryMetadata & metadata) const;

        String getModeName() const;
    };

    /// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns.
    /// Correctness of names and paths is not checked.
    ///
    /// date_column_name - if not empty, the name of the Date column used for partitioning by month.
    ///     Otherwise, partition_by_ast is used for partitioning.
    ///
    /// order_by_ast - a single expression or a tuple. It is used as a sorting key
    ///     (an ASTExpressionList used for sorting data in parts);
    /// primary_key_ast - can be nullptr, an expression, or a tuple.
    ///     Used to determine an ASTExpressionList values of which are written in the primary.idx file
    ///     for one row in every `index_granularity` rows to speed up range queries.
    ///     Primary key must be a prefix of the sorting key;
    ///     If it is nullptr, then it will be determined from order_by_ast.
    ///
    /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
    /// attach - whether the existing table is attached or the new table is created.
    MergeTreeData(const StorageID & table_id_,
                  const StorageInMemoryMetadata & metadata_,
                  ContextMutablePtr context_,
                  const String & date_column_name,
                  const MergingParams & merging_params_,
                  std::unique_ptr<MergeTreeSettings> settings_,
                  bool require_part_metadata_,
                  bool attach,
                  BrokenPartCallback broken_part_callback_ = [](const String &){});

    /// Build a block of minmax and count values of a MergeTree table. These values are extracted
    /// from minmax_indices, the first expression of primary key, and part rows.
    ///
    /// has_filter - if query has no filter, bypass partition pruning completely
    ///
    /// query_info - used to filter unneeded parts
    ///
    /// parts - part set to filter
    ///
    /// normal_parts - collects parts that don't have all the needed values to form the block.
    /// Specifically, this is when a part doesn't contain a final mark and the related max value is
    /// required.
    Block getMinMaxCountProjectionBlock(
        const StorageMetadataPtr & metadata_snapshot,
        const Names & required_columns,
        bool has_filter,
        const SelectQueryInfo & query_info,
        const DataPartsVector & parts,
        DataPartsVector & normal_parts,
        const PartitionIdToMaxBlock * max_block_numbers_to_read,
        ContextPtr query_context) const;

    std::optional<ProjectionCandidate> getQueryProcessingStageWithAggregateProjection(
        ContextPtr query_context, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const;

    QueryProcessingStage::Enum getQueryProcessingStage(
        ContextPtr query_context,
        QueryProcessingStage::Enum to_stage,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & info) const override;

    ReservationPtr reserveSpace(UInt64 expected_size, VolumePtr & volume) const;
    static ReservationPtr tryReserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
    static ReservationPtr reserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);

    static bool partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right);

    StoragePolicyPtr getStoragePolicy() const override;

    bool isMergeTree() const override { return true; }

    bool supportsPrewhere() const override { return true; }

    bool supportsFinal() const override;

    bool supportsSubcolumns() const override { return true; }

    bool supportsTTL() const override { return true; }

    bool supportsDynamicSubcolumns() const override { return true; }

    bool supportsLightweightDelete() const override;

    bool areAsynchronousInsertsEnabled() const override { return getSettings()->async_insert; }

    bool supportsTrivialCountOptimization() const override { return !hasLightweightDeletedMask(); }

    NamesAndTypesList getVirtuals() const override;

    bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr, const StorageMetadataPtr & metadata_snapshot) const override;

    /// Snapshot for MergeTree contains the current set of data parts
    /// at the moment of the start of query.
    struct SnapshotData : public StorageSnapshot::Data
    {
        DataPartsVector parts;
        std::vector<AlterConversionsPtr> alter_conversions;
    };

    StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;

    /// The same as above but does not hold vector of data parts.
    StorageSnapshotPtr getStorageSnapshotWithoutData(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;

    /// Load the set of data parts from disk. Call once - immediately after the object is created.
    void loadDataParts(bool skip_sanity_checks);

    String getLogName() const { return *std::atomic_load(&log_name); }

    Int64 getMaxBlockNumber() const;

    struct ProjectionPartsVector
    {
        DataPartsVector projection_parts;
        DataPartsVector data_parts;
    };

    /// Returns a copy of the list so that the caller shouldn't worry about locks.
    DataParts getDataParts(const DataPartStates & affordable_states) const;

    DataPartsVector getDataPartsVectorForInternalUsage(
        const DataPartStates & affordable_states, const DataPartsLock & lock, DataPartStateVector * out_states = nullptr) const;

    /// Returns sorted list of the parts with specified states
    ///  out_states will contain snapshot of each part state
    DataPartsVector getDataPartsVectorForInternalUsage(
        const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
    /// Same as above but only returns projection parts
    ProjectionPartsVector getProjectionPartsVectorForInternalUsage(
        const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;


    /// Returns absolutely all parts (and snapshot of their states)
    DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;

    size_t getAllPartsCount() const;

    /// Return the number of marks in all parts
    size_t getTotalMarksCount() const;

    /// Same as above but only returns projection parts
    ProjectionPartsVector getAllProjectionPartsVector(MergeTreeData::DataPartStateVector * out_states = nullptr) const;

    /// Returns parts in Active state
    DataParts getDataPartsForInternalUsage() const;
    DataPartsVector getDataPartsVectorForInternalUsage() const;

    void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, CSN snapshot_version, TransactionID current_tid) const;

    /// Returns parts that visible with current snapshot
    DataPartsVector getVisibleDataPartsVector(ContextPtr local_context) const;
    DataPartsVector getVisibleDataPartsVectorUnlocked(ContextPtr local_context, const DataPartsLock & lock) const;
    DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const;
    DataPartsVector getVisibleDataPartsVector(CSN snapshot_version, TransactionID current_tid) const;

    /// Returns a part in Active state with the given name or a part containing it. If there is no such part, returns nullptr.
    DataPartPtr getActiveContainingPart(const String & part_name) const;
    DataPartPtr getActiveContainingPart(const String & part_name, DataPartsLock & lock) const;
    DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
    DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;

    /// Swap part with it's identical copy (possible with another path on another disk).
    /// If original part is not active or doesn't exist exception will be thrown.
    void swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock &);

    /// Returns all parts in specified partition
    DataPartsVector getVisibleDataPartsVectorInPartition(MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
    DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id, DataPartsLock & lock) const;
    DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const;
    DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const;

    DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartState & state, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
    DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartStates & affordable_states, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;

    /// Returns the part with the given name and state or nullptr if no such part.
    DataPartPtr getPartIfExistsUnlocked(const String & part_name, const DataPartStates & valid_states, DataPartsLock & acquired_lock) const;
    DataPartPtr getPartIfExistsUnlocked(const MergeTreePartInfo & part_info, const DataPartStates & valid_states, DataPartsLock & acquired_lock) const;
    DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states) const;
    DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states) const;

    /// Total size of active parts in bytes.
    size_t getTotalActiveSizeInBytes() const;

    size_t getTotalActiveSizeInRows() const;

    size_t getActivePartsCount() const;

    size_t getOutdatedPartsCount() const;

    size_t getNumberOfOutdatedPartsWithExpiredRemovalTime() const;

    /// Returns a pair with: max number of parts in partition across partitions; sum size of parts inside that partition.
    /// (if there are multiple partitions with max number of parts, the sum size of parts is returned for arbitrary of them)
    std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
    std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;

    size_t getMaxOutdatedPartsCountForPartition() const;

    /// Get min value of part->info.getDataVersion() for all active parts.
    /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
    std::optional<Int64> getMinPartDataVersion() const;

    /// Returns all detached parts
    DetachedPartsInfo getDetachedParts() const;

    static void validateDetachedPartName(const String & name);

    void dropDetached(const ASTPtr & partition, bool part, ContextPtr context);

    MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
                                                ContextPtr context, PartsTemporaryRename & renamed_parts);

    /// If the table contains too many active parts, sleep for a while to give them time to merge.
    /// If until is non-null, wake up from the sleep earlier if the event happened.
    /// The decision to delay or throw is made according to settings 'parts_to_delay_insert' and 'parts_to_throw_insert'.
    void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const;

    /// If the table contains too many unfinished mutations, sleep for a while to give them time to execute.
    /// If until is non-null, wake up from the sleep earlier if the event happened.
    /// The decision to delay or throw is made according to settings 'number_of_mutations_to_delay' and 'number_of_mutations_to_throw'.
    void delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;

    /// Returns number of unfinished mutations (is_done = 0).
    virtual size_t getNumberOfUnfinishedMutations() const = 0;

    /// Renames temporary part to a permanent part and adds it to the parts set.
    /// It is assumed that the part does not intersect with existing parts.
    /// Adds the part in the PreActive state (the part will be added to the active set later with out_transaction->commit()).
    /// Returns true if part was added. Returns false if part is covered by bigger part.
    bool renameTempPartAndAdd(
        MutableDataPartPtr & part,
        Transaction & transaction,
        DataPartsLock & lock);

    /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
    /// Returns all parts covered by the added part (in ascending order).
    DataPartsVector renameTempPartAndReplace(
        MutableDataPartPtr & part,
        Transaction & out_transaction);

    /// Unlocked version of previous one. Useful when added multiple parts with a single lock.
    bool renameTempPartAndReplaceUnlocked(
        MutableDataPartPtr & part,
        Transaction & out_transaction,
        DataPartsLock & lock,
        DataPartsVector * out_covered_parts = nullptr);

    /// Remove parts from working set immediately (without wait for background
    /// process). Transfer part state to temporary. Have very limited usage only
    /// for new parts which aren't already present in table.
    void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove);

    /// Removes parts from the working set parts.
    /// Parts in add must already be in data_parts with PreActive, Active, or Outdated states.
    /// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
    /// clearOldParts (ignoring old_parts_lifetime).
    void removePartsFromWorkingSet(MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr);
    void removePartsFromWorkingSet(MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock);

    /// Removes all parts covered by drop_range from the working set parts.
    /// Used in REPLACE PARTITION command.
    void removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);

    DataPartsVector grabActivePartsToRemoveForDropRange(
        MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
    /// This wrapper is required to restrict access to parts in Deleting state
    class PartToRemoveFromZooKeeper
    {
        DataPartPtr part;
        bool was_active;

    public:
        explicit PartToRemoveFromZooKeeper(DataPartPtr && part_, bool was_active_ = true)
         : part(std::move(part_)), was_active(was_active_)
        {
        }

        /// It's safe to get name of any part
        const String & getPartName() const { return part->name; }

        DataPartPtr getPartIfItWasActive() const
        {
            return was_active ? part : nullptr;
        }
    };

    using PartsToRemoveFromZooKeeper = std::vector<PartToRemoveFromZooKeeper>;

    /// Same as above, but also returns list of parts to remove from ZooKeeper.
    /// It includes parts that have been just removed by these method
    /// and Outdated parts covered by drop_range that were removed earlier for any reason.
    PartsToRemoveFromZooKeeper removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
        MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);

    /// Restores Outdated part and adds it to working set
    void restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock = nullptr);

    /// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
    //// so it will not be deleted in clearOldParts.
    /// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
    /// NOTE: This method is safe to use only for parts which nobody else holds (like on server start or for parts which was not committed).
    /// For active parts it's unsafe because this method modifies fields of part (rename) while some other thread can try to read it.
    void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);

    /// This method should not be here, but async loading of Outdated parts is implemented in MergeTreeData
    virtual void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & /*part_name*/) {}

    /// Outdate broken part, set remove time to zero (remove as fast as possible) and make clone in detached directory.
    void outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part);

    /// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
    bool tryRemovePartImmediately(DataPartPtr && part);

    /// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts but not from the disk.
    /// If 'force' - don't wait for old_parts_lifetime.
    DataPartsVector grabOldParts(bool force = false);

    /// Reverts the changes made by grabOldParts(), parts should be in Deleting state.
    void rollbackDeletingParts(const DataPartsVector & parts);

    /// Removes parts from data_parts, they should be in Deleting state
    void removePartsFinally(const DataPartsVector & parts);

    /// Delete irrelevant parts from memory and disk.
    /// If 'force' - don't wait for old_parts_lifetime.
    size_t clearOldPartsFromFilesystem(bool force = false);
    /// Try to clear parts from filesystem. Throw exception in case of errors.
    void clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error = true, NameSet * parts_failed_to_delete = nullptr);

    /// Delete WAL files containing parts, that all already stored on disk.
    size_t clearOldWriteAheadLogs();

    size_t clearOldBrokenPartsFromDetachedDirectory();

    /// Delete all directories which names begin with "tmp"
    /// Must be called with locked lockForShare() because it's using relative_data_path.
    size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
    size_t clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes);

    size_t clearEmptyParts();

    /// After the call to dropAllData() no method can be called.
    /// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
    void dropAllData();

    /// This flag is for hardening and assertions.
    bool all_data_dropped = false;

    /// Drop data directories if they are empty. It is safe to call this method if table creation was unsuccessful.
    void dropIfEmpty();

    /// Moves the entire data directory. Flushes the uncompressed blocks cache
    /// and the marks cache. Must be called with locked lockExclusively()
    /// because changes relative_data_path.
    void rename(const String & new_table_path, const StorageID & new_table_id) override;

    /// Also rename log names.
    void renameInMemory(const StorageID & new_table_id) override;

    /// Check if the ALTER can be performed:
    /// - all needed columns are present.
    /// - all type conversions can be done.
    /// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected.
    /// If something is wrong, throws an exception.
    void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

    /// Checks if the Mutation can be performed.
    /// (currently no additional checks: always ok)
    void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;

    /// Checks that partition name in all commands is valid
    void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override;

    /// Change MergeTreeSettings
    void changeSettings(
        const ASTPtr & new_settings,
        AlterLockHolder & table_lock_holder);

    /// Should be called if part data is suspected to be corrupted.
    /// Has the ability to check all other parts
    /// which reside on the same disk of the suspicious part.
    void reportBrokenPart(MergeTreeData::DataPartPtr data_part) const;

    /// TODO (alesap) Duplicate method required for compatibility.
    /// Must be removed.
    static ASTPtr extractKeyExpressionList(const ASTPtr & node)
    {
        return DB::extractKeyExpressionList(node);
    }

    /** Create local backup (snapshot) for parts with specified prefix.
      * Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
      *  or if 'with_name' is specified - backup is created in directory with specified name.
      */
    PartitionCommandsResultInfo freezePartition(
        const ASTPtr & partition,
        const StorageMetadataPtr & metadata_snapshot,
        const String & with_name,
        ContextPtr context,
        TableLockHolder & table_lock_holder);

    /// Freezes all parts.
    PartitionCommandsResultInfo freezeAll(
        const String & with_name,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        TableLockHolder & table_lock_holder);

    /// Unfreezes particular partition.
    PartitionCommandsResultInfo unfreezePartition(
        const ASTPtr & partition,
        const String & backup_name,
        ContextPtr context,
        TableLockHolder & table_lock_holder);

    /// Unfreezes all parts.
    PartitionCommandsResultInfo unfreezeAll(
        const String & backup_name,
        ContextPtr context,
        TableLockHolder & table_lock_holder);

    /// Extract data from the backup and put it to the storage.
    void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

    /// Returns true if the storage supports backup/restore for specific partitions.
    bool supportsBackupPartition() const override { return true; }

    /// Moves partition to specified Disk
    void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);

    /// Moves partition to specified Volume
    void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);

    /// Checks that Partition could be dropped right now
    /// Otherwise - throws an exception with detailed information.
    /// We do not use mutex because it is not very important that the size could change during the operation.
    void checkPartitionCanBeDropped(const ASTPtr & partition, ContextPtr local_context);

    void checkPartCanBeDropped(const String & part_name);

    Pipe alterPartition(
        const StorageMetadataPtr & metadata_snapshot,
        const PartitionCommands & commands,
        ContextPtr query_context) override;

    size_t getColumnCompressedSize(const std::string & name) const
    {
        auto lock = lockParts();
        const auto it = column_sizes.find(name);
        return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
    }

    ColumnSizeByName getColumnSizes() const override
    {
        auto lock = lockParts();
        return column_sizes;
    }

    /// Creates description of columns of data type Object from the range of data parts.
    static ColumnsDescription getConcreteObjectColumns(
        const DataPartsVector & parts, const ColumnsDescription & storage_columns);

    IndexSizeByName getSecondaryIndexSizes() const override
    {
        auto lock = lockParts();
        return secondary_index_sizes;
    }

    /// For ATTACH/DETACH/DROP PARTITION.
    String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context, DataPartsLock * acquired_lock = nullptr) const;
    std::unordered_set<String> getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const;
    std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, ContextPtr query_context) const;

    /// Returns set of partition_ids of all Active parts
    std::unordered_set<String> getAllPartitionIds() const;

    /// Extracts MergeTreeData of other *MergeTree* storage
    ///  and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
    /// Tables structure should be locked.
    MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
    MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;

    std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPartOnSameDisk(
        const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
        const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot,
        const IDataPartStorage::ClonePartParams & params);

    virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;

    /// Returns true if table can create new parts with adaptive granularity
    /// Has additional constraint in replicated version
    virtual bool canUseAdaptiveGranularity() const
    {
        const auto settings = getSettings();
        return settings->index_granularity_bytes != 0 &&
            (settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
    }

    /// Get constant pointer to storage settings.
    /// Copy this pointer into your scope and you will
    /// get consistent settings.
    MergeTreeSettingsPtr getSettings() const
    {
        return storage_settings.get();
    }

    String getRelativeDataPath() const { return relative_data_path; }

    /// Get table path on disk
    String getFullPathOnDisk(const DiskPtr & disk) const;

    /// Looks for detached part on all disks,
    /// returns pointer to the disk where part is found or nullptr (the second function throws an exception)
    DiskPtr tryGetDiskForDetachedPart(const String & part_name) const;
    DiskPtr getDiskForDetachedPart(const String & part_name) const;

    bool storesDataOnDisk() const override { return !isStaticStorage(); }
    Strings getDataPaths() const override;

    /// Reserves space at least 1MB.
    ReservationPtr reserveSpace(UInt64 expected_size) const;

    /// Reserves space at least 1MB on specific disk or volume.
    static ReservationPtr reserveSpace(UInt64 expected_size, SpacePtr space);
    static ReservationPtr tryReserveSpace(UInt64 expected_size, SpacePtr space);

    /// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
    ReservationPtr reserveSpacePreferringTTLRules(
        const StorageMetadataPtr & metadata_snapshot,
        UInt64 expected_size,
        const IMergeTreeDataPart::TTLInfos & ttl_infos,
        time_t time_of_move,
        size_t min_volume_index = 0,
        bool is_insert = false,
        DiskPtr selected_disk = nullptr) const;

    ReservationPtr tryReserveSpacePreferringTTLRules(
        const StorageMetadataPtr & metadata_snapshot,
        UInt64 expected_size,
        const IMergeTreeDataPart::TTLInfos & ttl_infos,
        time_t time_of_move,
        size_t min_volume_index = 0,
        bool is_insert = false,
        DiskPtr selected_disk = nullptr) const;

    /// Reserves space for the part based on the distribution of "big parts" in the same partition.
    /// Parts with estimated size larger than `min_bytes_to_rebalance_partition_over_jbod` are
    /// considered as big. The priority is lower than TTL. If reservation fails, return nullptr.
    ReservationPtr balancedReservation(
        const StorageMetadataPtr & metadata_snapshot,
        size_t part_size,
        size_t max_volume_index,
        const String & part_name,
        const MergeTreePartInfo & part_info,
        MergeTreeData::DataPartsVector covered_parts,
        std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
        const IMergeTreeDataPart::TTLInfos * ttl_infos,
        bool is_insert = false);

    /// Choose disk with max available free space
    /// Reserves 0 bytes
    ReservationPtr makeEmptyReservationOnLargestDisk() const { return getStoragePolicy()->makeEmptyReservationOnLargestDisk(); }

    Disks getDisks() const { return getStoragePolicy()->getDisks(); }

    /// Return alter conversions for part which must be applied on fly.
    AlterConversionsPtr getAlterConversionsForPart(MergeTreeDataPartPtr part) const;

    /// Returns destination disk or volume for the TTL rule according to current storage policy.
    SpacePtr getDestinationForMoveTTL(const TTLDescription & move_ttl) const;

    /// Whether INSERT of a data part which is already expired should move it immediately to a volume/disk declared in move rule.
    bool shouldPerformTTLMoveOnInsert(const SpacePtr & move_destination) const;

    /// Checks if given part already belongs destination disk or volume for the
    /// TTL rule.
    bool isPartInTTLDestination(const TTLDescription & ttl, const IMergeTreeDataPart & part) const;

    /// Get count of total merges with TTL in MergeList (system.merges) for all
    /// tables (not only current table).
    /// Method is cheap and doesn't require any locks.
    size_t getTotalMergesWithTTLInMergeList() const;

    using WriteAheadLogPtr = std::shared_ptr<MergeTreeWriteAheadLog>;
    WriteAheadLogPtr getWriteAheadLog();

    constexpr static auto EMPTY_PART_TMP_PREFIX = "tmp_empty_";
    std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> createEmptyPart(
        MergeTreePartInfo & new_part_info, const MergeTreePartition & partition,
        const String & new_part_name, const MergeTreeTransactionPtr & txn);

    MergeTreeDataFormatVersion format_version;

    /// Merging params - what additional actions to perform during merge.
    const MergingParams merging_params;

    bool is_custom_partitioned = false;

    /// Used only for old syntax tables. Never changes after init.
    Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
    Int64 minmax_idx_time_column_pos = -1; /// In other cases, minmax index often includes a dateTime column.

    /// Get partition key expression on required columns
    static ExpressionActionsPtr getMinMaxExpr(const KeyDescription & partition_key, const ExpressionActionsSettings & settings);
    /// Get column names required for partition key
    static Names getMinMaxColumnsNames(const KeyDescription & partition_key);
    /// Get column types required for partition key
    static DataTypes getMinMaxColumnsTypes(const KeyDescription & partition_key);

    ExpressionActionsPtr
    getPrimaryKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot, const MergeTreeIndices & indices) const;
    ExpressionActionsPtr
    getSortingKeyAndSkipIndicesExpression(const StorageMetadataPtr & metadata_snapshot, const MergeTreeIndices & indices) const;

    /// Get compression codec for part according to TTL rules and <compression>
    /// section from config.xml.
    CompressionCodecPtr getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const;

    std::shared_ptr<QueryIdHolder> getQueryIdHolder(const String & query_id, UInt64 max_concurrent_queries) const;

    /// Record current query id where querying the table. Throw if there are already `max_queries` queries accessing the same table.
    /// Returns false if the `query_id` already exists in the running set, otherwise return true.
    bool insertQueryIdOrThrow(const String & query_id, size_t max_queries) const;
    bool insertQueryIdOrThrowNoLock(const String & query_id, size_t max_queries) const TSA_REQUIRES(query_id_set_mutex);

    /// Remove current query id after query finished.
    void removeQueryId(const String & query_id) const;
    void removeQueryIdNoLock(const String & query_id) const TSA_REQUIRES(query_id_set_mutex);

    /// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty.
    DataTypePtr getPartitionValueType() const;

    /// Construct a sample block of virtual columns.
    Block getSampleBlockWithVirtualColumns() const;

    /// Construct a block consisting only of possible virtual columns for part pruning.
    /// If one_part is true, fill in at most one part.
    Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part, bool ignore_empty = false) const;

    /// In merge tree we do inserts with several steps. One of them:
    /// X. write part to temporary directory with some temp name
    /// Y. rename temporary directory to final name with correct block number value
    /// As temp name MergeTree use just ordinary in memory counter, but in some cases
    /// it can be useful to add additional part in temp name to avoid collisions on FS.
    /// FIXME: Currently unused.
    virtual std::string getPostfixForTempInsertName() const { return ""; }

    /// For generating names of temporary parts during insertion.
    SimpleIncrement insert_increment;

    bool has_non_adaptive_index_granularity_parts = false;

    /// True if at least one part contains lightweight delete.
    mutable std::atomic_bool has_lightweight_delete_parts = false;

    /// Parts that currently moving from disk/volume to another.
    /// This set have to be used with `currently_processing_in_background_mutex`.
    /// Moving may conflict with merges and mutations, but this is OK, because
    /// if we decide to move some part to another disk, than we
    /// assuredly will choose this disk for containing part, which will appear
    /// as result of merge or mutation.
    DataParts currently_moving_parts;

    /// Mutex for currently_moving_parts
    mutable std::mutex moving_parts_mutex;

    PinnedPartUUIDsPtr getPinnedPartUUIDs() const;

    /// Schedules background job to like merge/mutate/fetch an executor
    virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) = 0;
    /// Schedules job to move parts between disks/volumes and so on.
    bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee);
    bool areBackgroundMovesNeeded() const;


    /// Lock part in zookeeper for shared data in several nodes
    /// Overridden in StorageReplicatedMergeTree
    virtual void lockSharedData(const IMergeTreeDataPart &, bool = false, std::optional<HardlinkedFiles> = {}) const {} /// NOLINT

    /// Unlock shared data part in zookeeper
    /// Overridden in StorageReplicatedMergeTree
    virtual std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart &) const { return std::make_pair(true, NameSet{}); }

    /// Fetch part only if some replica has it on shared storage like S3
    /// Overridden in StorageReplicatedMergeTree
    virtual MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }

    /// Check shared data usage on other replicas for detached/freezed part
    /// Remove local files and remote files if needed
    virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name);

    virtual String getTableSharedID() const { return ""; }

    /// Store metadata for replicated tables
    /// Do nothing for non-replicated tables
    virtual void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const;

    /// Parts that currently submerging (merging to bigger parts) or emerging
    /// (to be appeared after merging finished). These two variables have to be used
    /// with `currently_submerging_emerging_mutex`.
    DataParts currently_submerging_big_parts;
    std::map<String, EmergingPartInfo> currently_emerging_big_parts;
    /// Mutex for currently_submerging_parts and currently_emerging_parts
    mutable std::mutex currently_submerging_emerging_mutex;

    /// Used for freezePartitionsByMatcher and unfreezePartitionsByMatcher
    using MatcherFn = std::function<bool(const String &)>;

    /// Returns an object that protects temporary directory from cleanup
    scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name) const;

    void waitForOutdatedPartsToBeLoaded() const;
    bool canUsePolymorphicParts() const;

    /// TODO: make enabled by default in the next release if no problems found.
    bool allowRemoveStaleMovingParts() const;

protected:
    friend class IMergeTreeDataPart;
    friend class MergeTreeDataMergerMutator;
    friend struct ReplicatedMergeTreeTableMetadata;
    friend class StorageReplicatedMergeTree;
    friend class MergeTreeDataWriter;
    friend class MergeTask;
    friend class IPartMetadataManager;
    friend class IMergedBlockOutputStream; // for access to log

    bool require_part_metadata;

    /// Relative path data, changes during rename for ordinary databases use
    /// under lockForShare if rename is possible.
    String relative_data_path;

    /// Current column sizes in compressed and uncompressed form.
    ColumnSizeByName column_sizes;

    /// Current secondary index sizes in compressed and uncompressed form.
    IndexSizeByName secondary_index_sizes;

    /// Engine-specific methods
    BrokenPartCallback broken_part_callback;

    /// log_name will change during table RENAME. Use atomic_shared_ptr to allow concurrent RW.
    /// NOTE clang-14 doesn't have atomic_shared_ptr yet. Use std::atomic* operations for now.
    std::shared_ptr<String> log_name;
    std::atomic<Poco::Logger *> log;

    /// Storage settings.
    /// Use get and set to receive readonly versions.
    MultiVersion<MergeTreeSettings> storage_settings;

    /// Used to determine which UUIDs to send to root query executor for deduplication.
    mutable SharedMutex pinned_part_uuids_mutex;
    PinnedPartUUIDsPtr pinned_part_uuids;

    /// True if at least one part was created/removed with transaction.
    mutable std::atomic_bool transactions_enabled = false;

    std::atomic_bool data_parts_loading_finished = false;

    /// Work with data parts

    struct TagByInfo{};
    struct TagByStateAndInfo{};

    void initializeDirectoriesAndFormatVersion(const std::string & relative_data_path_, bool attach, const std::string & date_column_name, bool need_create_directories=true);

    static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
    {
        return part->info;
    }

    static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part)
    {
        return {part->getState(), part->info};
    }

    using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
        boost::multi_index::indexed_by<
            /// Index by Info
            boost::multi_index::ordered_unique<
                boost::multi_index::tag<TagByInfo>,
                boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
            >,
            /// Index by (State, Info), is used to obtain ordered slices of parts with the same state
            boost::multi_index::ordered_unique<
                boost::multi_index::tag<TagByStateAndInfo>,
                boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
                LessStateDataPart
            >
        >
    >;

    /// Current set of data parts.
    mutable std::mutex data_parts_mutex;
    DataPartsIndexes data_parts_indexes;
    DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
    DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;

    /// Mutex for critical sections which alter set of parts
    /// It is like truncate, drop/detach partition
    mutable std::mutex operation_with_data_parts_mutex;

    /// Current description of columns of data type Object.
    /// It changes only when set of parts is changed and is
    /// protected by @data_parts_mutex.
    ColumnsDescription object_columns;

    MergeTreePartsMover parts_mover;

    /// Executors are common for both ReplicatedMergeTree and plain MergeTree
    /// but they are being started and finished in derived classes, so let them be protected.
    ///
    /// Why there are two executors, not one? Or an executor for each kind of operation?
    /// It is historically formed.
    /// Another explanation is that moving operations are common for Replicated and Plain MergeTree classes.
    /// Task that schedules this operations is executed with its own timetable and triggered in a specific places in code.
    /// And for ReplicatedMergeTree we don't have LogEntry type for this operation.
    BackgroundJobsAssignee background_operations_assignee;
    BackgroundJobsAssignee background_moves_assignee;
    bool use_metadata_cache;

    /// Strongly connected with two fields above.
    /// Every task that is finished will ask to assign a new one into an executor.
    /// These callbacks will be passed to the constructor of each task.
    IExecutableTask::TaskResultCallback common_assignee_trigger;
    IExecutableTask::TaskResultCallback moves_assignee_trigger;

    using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
    using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;

    boost::iterator_range<DataPartIteratorByStateAndInfo> getDataPartsStateRange(DataPartState state) const
    {
        auto begin = data_parts_by_state_and_info.lower_bound(state, LessStateDataPart());
        auto end = data_parts_by_state_and_info.upper_bound(state, LessStateDataPart());
        return {begin, end};
    }

    boost::iterator_range<DataPartIteratorByInfo> getDataPartsPartitionRange(const String & partition_id) const
    {
        auto begin = data_parts_by_info.lower_bound(PartitionID(partition_id), LessDataPart());
        auto end = data_parts_by_info.upper_bound(PartitionID(partition_id), LessDataPart());
        return {begin, end};
    }

    /// Creates description of columns of data type Object from the range of data parts.
    static ColumnsDescription getConcreteObjectColumns(
        boost::iterator_range<DataPartIteratorByStateAndInfo> range, const ColumnsDescription & storage_columns);

    std::optional<UInt64> totalRowsByPartitionPredicateImpl(
        const SelectQueryInfo & query_info, ContextPtr context, const DataPartsVector & parts) const;

    static decltype(auto) getStateModifier(DataPartState state)
    {
        return [state] (const DataPartPtr & part) { part->setState(state); };
    }

    void modifyPartState(DataPartIteratorByStateAndInfo it, DataPartState state)
    {
        if (!data_parts_by_state_and_info.modify(it, getStateModifier(state)))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't modify {}", (*it)->getNameWithState());
    }

    void modifyPartState(DataPartIteratorByInfo it, DataPartState state)
    {
        if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't modify {}", (*it)->getNameWithState());
    }

    void modifyPartState(const DataPartPtr & part, DataPartState state)
    {
        auto it = data_parts_by_info.find(part->info);
        if (it == data_parts_by_info.end() || (*it).get() != part.get())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} doesn't exist (info: {})", part->name, part->info.getPartNameForLogs());

        if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't modify {}", (*it)->getNameWithState());
    }

    /// Used to serialize calls to grabOldParts.
    std::mutex grab_old_parts_mutex;
    /// The same for clearOldTemporaryDirectories.
    std::mutex clear_old_temporary_directories_mutex;

    void checkProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach, bool allow_empty_sorting_key, ContextPtr local_context) const;

    void setProperties(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata, bool attach = false, ContextPtr local_context = nullptr);

    void checkPartitionKeyAndInitMinMax(const KeyDescription & new_partition_key);

    void checkTTLExpressions(const StorageInMemoryMetadata & new_metadata, const StorageInMemoryMetadata & old_metadata) const;

    void checkStoragePolicy(const StoragePolicyPtr & new_storage_policy) const;

    /// Calculates column and secondary indexes sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
    void calculateColumnAndSecondaryIndexSizesImpl();

    /// Adds or subtracts the contribution of the part to compressed column and secondary indexes sizes.
    void addPartContributionToColumnAndSecondaryIndexSizes(const DataPartPtr & part);
    void removePartContributionToColumnAndSecondaryIndexSizes(const DataPartPtr & part);

    /// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
    DataPartPtr getAnyPartInPartition(const String & partition_id, DataPartsLock & data_parts_lock) const;

    /// Return parts in the Active set that are covered by the new_part_info or the part that covers it.
    /// Will check that the new part doesn't already exist and that it doesn't intersect existing part.
    DataPartsVector getActivePartsToReplace(
        const MergeTreePartInfo & new_part_info,
        const String & new_part_name,
        DataPartPtr & out_covering_part,
        DataPartsLock & data_parts_lock) const;

    DataPartsVector getCoveredOutdatedParts(
        const DataPartPtr & part,
        DataPartsLock & data_parts_lock) const;

    struct PartHierarchy
    {
        DataPartPtr duplicate_part;
        DataPartsVector covering_parts;
        DataPartsVector covered_parts;
        DataPartsVector intersected_parts;
    };

    PartHierarchy getPartHierarchy(
        const MergeTreePartInfo & part_info,
        DataPartState state,
        DataPartsLock & /* data_parts_lock */) const;

    /// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
    bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node, const StorageMetadataPtr & metadata_snapshot) const;

    /// Common part for |freezePartition()| and |freezeAll()|.
    PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, ContextPtr context);
    PartitionCommandsResultInfo unfreezePartitionsByMatcher(MatcherFn matcher, const String & backup_name, ContextPtr context);

    // Partition helpers
    bool canReplacePartition(const DataPartPtr & src_part) const;

    /// Tries to drop part in background without any waits or throwing exceptions in case of errors.
    virtual void dropPartNoWaitNoThrow(const String & part_name) = 0;

    virtual void dropPart(const String & part_name, bool detach, ContextPtr context) = 0;
    virtual void dropPartition(const ASTPtr & partition, bool detach, ContextPtr context) = 0;
    virtual PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr context) = 0;
    virtual void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) = 0;
    virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;

    virtual void fetchPartition(
        const ASTPtr & partition,
        const StorageMetadataPtr & metadata_snapshot,
        const String & from,
        bool fetch_part,
        ContextPtr query_context);

    virtual void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context);

    void writePartLog(
        PartLogElement::Type type,
        const ExecutionStatus & execution_status,
        UInt64 elapsed_ns,
        const String & new_part_name,
        const DataPartPtr & result_part,
        const DataPartsVector & source_parts,
        const MergeListEntry * merge_entry,
        std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);

    /// If part is assigned to merge or mutation (possibly replicated)
    /// Should be overridden by children, because they can have different
    /// mechanisms for parts locking
    virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;

    /// Return most recent mutations commands for part which weren't applied
    /// Used to receive AlterConversions for part and apply them on fly. This
    /// method has different implementations for replicated and non replicated
    /// MergeTree because they store mutations in different way.
    virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
    /// Moves part to specified space, used in ALTER ... MOVE ... queries
    MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

    struct PartBackupEntries
    {
        String part_name;
        UInt128 part_checksum; /// same as MinimalisticDataPartChecksums::hash_of_all_files
        BackupEntries backup_entries;
    };
    using PartsBackupEntries = std::vector<PartBackupEntries>;

    /// Makes backup entries to backup the parts of this table.
    PartsBackupEntries backupParts(const DataPartsVector & data_parts, const String & data_path_in_backup, const BackupSettings & backup_settings, const ReadSettings & read_settings, const ContextPtr & local_context);

    class RestoredPartsHolder;

    /// Restores the parts of this table from backup.
    void restorePartsFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions);
    void restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> restored_parts_holder, const MergeTreePartInfo & part_info, const String & part_path_in_backup) const;

    /// Attaches restored parts to the storage.
    virtual void attachRestoredParts(MutableDataPartsVector && parts) = 0;

    void resetObjectColumnsFromActiveParts(const DataPartsLock & lock);
    void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock);

    /** A structure that explicitly represents a "merge tree" of parts
     *  which is implicitly presented by min-max block numbers and levels of parts.
     *  The children of node are parts which are covered by parent part.
     *  This tree provides the order of loading of parts.
     *
     *  We start to traverse tree from the top level and load parts
     *  corresposponded to nodes. If part is loaded successfully then
     *  we stop traversal at this node. Otherwise part is broken and we
     *  traverse its children and try to load covered parts which will
     *  replace broken covering part. Unloaded nodes represent outdated parts
     *  nd they are pushed to background task and loaded asynchronoulsy.
     */
    class PartLoadingTree
    {
    public:
        struct Node
        {
            Node(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_)
                : info(info_), name(name_), disk(disk_)
            {
            }

            const MergeTreePartInfo info;
            const String name;
            const DiskPtr disk;

            bool is_loaded = false;
            std::map<MergeTreePartInfo, std::shared_ptr<Node>> children;
        };

        struct PartLoadingInfo
        {
            PartLoadingInfo(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_)
                : info(info_), name(name_), disk(disk_)
            {
            }

            /// Store name explicitly because it cannot be easily
            /// retrieved from info in tables with old syntax.
            MergeTreePartInfo info;
            String name;
            DiskPtr disk;
        };

        using NodePtr = std::shared_ptr<Node>;
        using PartLoadingInfos = std::vector<PartLoadingInfo>;

        /// Builds a tree from the list of part infos.
        static PartLoadingTree build(PartLoadingInfos nodes);

        /// Traverses a tree and call @func on each node.
        /// If recursive is false traverses only the top level.
        template <typename Func>
        void traverse(bool recursive, Func && func);

    private:
        /// NOTE: Parts should be added in descending order of their levels
        /// because rearranging tree to the new root is not supported.
        void add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk);
        std::unordered_map<String, NodePtr> root_by_partition;
    };

    using PartLoadingTreeNodes = std::vector<PartLoadingTree::NodePtr>;

    struct LoadPartResult
    {
        bool is_broken = false;
        std::optional<size_t> size_of_part;
        MutableDataPartPtr part;
    };

    mutable std::mutex outdated_data_parts_mutex;
    mutable std::condition_variable outdated_data_parts_cv;

    BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task;
    PartLoadingTreeNodes outdated_unloaded_data_parts TSA_GUARDED_BY(outdated_data_parts_mutex);
    bool outdated_data_parts_loading_canceled TSA_GUARDED_BY(outdated_data_parts_mutex) = false;

    /// This has to be "true" by default, because in case of empty table or absence of Outdated parts
    /// it is automatically finished.
    bool outdated_data_parts_loading_finished TSA_GUARDED_BY(outdated_data_parts_mutex) = true;

    void loadOutdatedDataParts(bool is_async);
    void startOutdatedDataPartsLoadingTask();
    void stopOutdatedDataPartsLoadingTask();

    static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
    static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);

    bool addTempPart(
        MutableDataPartPtr & part,
        Transaction & out_transaction,
        DataPartsLock & lock,
        DataPartsVector * out_covered_parts);

private:
    /// Checking that candidate part doesn't break invariants: correct partition
    void checkPartPartition(MutableDataPartPtr & part, DataPartsLock & lock) const;
    void checkPartDuplicate(MutableDataPartPtr & part, Transaction & transaction, DataPartsLock & lock) const;
    void checkPartDynamicColumns(MutableDataPartPtr & part, DataPartsLock & lock) const;

    /// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
    /// in precommitted state and to transaction
    void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename);

    /// Low-level method for preparing parts for commit (in-memory).
    /// FIXME Merge MergeTreeTransaction and Transaction
    bool renameTempPartAndReplaceImpl(
        MutableDataPartPtr & part,
        Transaction & out_transaction,
        DataPartsLock & lock,
        DataPartsVector * out_covered_parts);

    /// RAII Wrapper for atomic work with currently moving parts
    /// Acquire them in constructor and remove them in destructor
    /// Uses data.currently_moving_parts_mutex
    struct CurrentlyMovingPartsTagger
    {
        MergeTreeMovingParts parts_to_move;
        MergeTreeData & data;
        CurrentlyMovingPartsTagger(MergeTreeMovingParts && moving_parts_, MergeTreeData & data_);

        ~CurrentlyMovingPartsTagger();
    };

    using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;

    /// Move selected parts to corresponding disks
    MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy=false);

    /// Select parts for move and disks for them. Used in background moving processes.
    CurrentlyMovingPartsTaggerPtr selectPartsForMove();

    /// Check selected parts for movements. Used by ALTER ... MOVE queries.
    CurrentlyMovingPartsTaggerPtr checkPartsForMove(const DataPartsVector & parts, SpacePtr space);

    bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const;

    std::mutex write_ahead_log_mutex;
    WriteAheadLogPtr write_ahead_log;

    virtual void startBackgroundMovesIfNeeded() = 0;

    bool allow_nullable_key{};

    void addPartContributionToDataVolume(const DataPartPtr & part);
    void removePartContributionToDataVolume(const DataPartPtr & part);

    void increaseDataVolume(ssize_t bytes, ssize_t rows, ssize_t parts);
    void setDataVolume(size_t bytes, size_t rows, size_t parts);

    std::atomic<size_t> total_active_size_bytes = 0;
    std::atomic<size_t> total_active_size_rows = 0;
    std::atomic<size_t> total_active_size_parts = 0;

    mutable std::atomic<size_t> total_outdated_parts_count = 0;

    // Record all query ids which access the table. It's guarded by `query_id_set_mutex` and is always mutable.
    mutable std::set<String> query_id_set TSA_GUARDED_BY(query_id_set_mutex);
    mutable std::mutex query_id_set_mutex;

    // Get partition matcher for FREEZE / UNFREEZE queries.
    MatcherFn getPartitionMatcher(const ASTPtr & partition, ContextPtr context) const;

    /// Returns default settings for storage with possible changes from global config.
    virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;

    LoadPartResult loadDataPart(
        const MergeTreePartInfo & part_info,
        const String & part_name,
        const DiskPtr & part_disk_ptr,
        MergeTreeDataPartState to_state,
        std::mutex & part_loading_mutex);

    LoadPartResult loadDataPartWithRetries(
        const MergeTreePartInfo & part_info,
        const String & part_name,
        const DiskPtr & part_disk_ptr,
        MergeTreeDataPartState to_state,
        std::mutex & part_loading_mutex,
        size_t backoff_ms,
        size_t max_backoff_ms,
        size_t max_tries);

    std::vector<LoadPartResult> loadDataPartsFromDisk(PartLoadingTreeNodes & parts_to_load);

    void loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal);

    /// Create zero-copy exclusive lock for part and disk. Useful for coordination of
    /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
    virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
    virtual bool waitZeroCopyLockToDisappear(const ZeroCopyLock &, size_t) { return false; }

    /// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
    /// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.
    /// Otherwise, in non-parallel case will break and return.
    void clearPartsFromFilesystemImpl(const DataPartsVector & parts, NameSet * part_names_succeed);

    static MutableDataPartPtr asMutableDeletingPart(const DataPartPtr & part);

    mutable TemporaryParts temporary_parts;

    /// Estimate the number of marks to read to make a decision whether to enable parallel replicas (distributed processing) or not
    /// Note: it could be very rough.
    bool canUseParallelReplicasBasedOnPKAnalysis(
        ContextPtr query_context,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info) const;
};

/// RAII struct to record big parts that are submerging or emerging.
/// It's used to calculate the balanced statistics of JBOD array.
struct CurrentlySubmergingEmergingTagger
{
    MergeTreeData & storage;
    String emerging_part_name;
    MergeTreeData::DataPartsVector submerging_parts;
    Poco::Logger * log;

    CurrentlySubmergingEmergingTagger(
        MergeTreeData & storage_, const String & name_, MergeTreeData::DataPartsVector && parts_, Poco::Logger * log_)
        : storage(storage_), emerging_part_name(name_), submerging_parts(std::move(parts_)), log(log_)
    {
    }

    ~CurrentlySubmergingEmergingTagger();
};


/// TODO: move it somewhere
[[ maybe_unused ]] static bool needSyncPart(size_t input_rows, size_t input_bytes, const MergeTreeSettings & settings)
{
    return ((settings.min_rows_to_fsync_after_merge && input_rows >= settings.min_rows_to_fsync_after_merge)
        || (settings.min_compressed_bytes_to_fsync_after_merge && input_bytes >= settings.min_compressed_bytes_to_fsync_after_merge));
}

}