aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Storages/StorageJoin.cpp
blob: 121d859a3f230f2456a9060cff53df7f52dc8db7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
#include <Storages/StorageJoin.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageSet.h>
#include <Storages/TableLockHolder.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTIdentifier_fwd.h>
#include <Core/ColumnNumbers.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/castColumn.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Interpreters/JoinUtils.h>

#include <Compression/CompressedWriteBuffer.h>
#include <Processors/ISource.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/String.h>
#include <filesystem>

namespace fs = std::filesystem;

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int DEADLOCK_AVOIDED;
    extern const int INCOMPATIBLE_TYPE_OF_JOIN;
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int UNSUPPORTED_JOIN_KEYS;
}

/// Creates a Join-engine table.
/// Every name in `key_names_` must be a physical column of the table, otherwise
/// NO_SUCH_COLUMN_IN_TABLE is thrown. Once the keys are validated, the TableJoin description
/// and the HashJoin that holds the data are built and restore() is called.
StorageJoin::StorageJoin(
    DiskPtr disk_,
    const String & relative_path_,
    const StorageID & table_id_,
    const Names & key_names_,
    bool use_nulls_,
    SizeLimits limits_,
    JoinKind kind_,
    JoinStrictness strictness_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    bool overwrite_,
    bool persistent_)
    : StorageSetOrJoinBase{disk_, relative_path_, table_id_, columns_, constraints_, comment, persistent_}
    , key_names(key_names_)
    , use_nulls(use_nulls_)
    , limits(limits_)
    , kind(kind_)
    , strictness(strictness_)
    , overwrite(overwrite_)
{
    /// Keep the snapshot alive while its column description is inspected.
    const auto snapshot = getInMemoryMetadataPtr();
    const auto & declared_columns = snapshot->getColumns();

    for (const auto & key_name : key_names)
    {
        if (declared_columns.hasPhysical(key_name))
            continue;

        throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "Key column ({}) does not exist in table declaration.", key_name);
    }

    table_join = std::make_shared<TableJoin>(limits, use_nulls, kind, strictness, key_names);
    join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
    restore();
}

/// Acquires `lock` in mode `type` through tryLockTimed().
/// With a non-null `context` the initial query id and the `lock_acquire_timeout` setting are used;
/// without one, RWLockImpl::NO_QUERY and DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC are used instead.
RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const
{
    const std::chrono::milliseconds timeout
        = context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
    const String owner_query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;

    return tryLockTimed(lock, type, owner_query_id, timeout);
}

/// Calls `lock->getLock()` directly with the query id and timeout derived from `context`
/// (or RWLockImpl::NO_QUERY and DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC when `context` is null),
/// passing `false` as the last argument. The result of getLock() is returned as is;
/// callers such as insertBlock() check it for emptiness.
RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context)
{
    const std::chrono::milliseconds timeout
        = context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
    const String owner_query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;

    return lock->getLock(type, owner_query_id, timeout, false);
}

/// Returns the sink created by StorageSetOrJoinBase::write().
/// `mutate_mutex` is held only while the sink is being created. The incoming async_insert flag
/// is ignored and `false` is always forwarded to the base implementation.
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, bool /*async_insert*/)
{
    std::lock_guard mutation_guard(mutate_mutex);

    auto sink = StorageSetOrJoinBase::write(query, metadata_snapshot, context, /*async_insert=*/false);
    return sink;
}

/// Removes all data of the table: the on-disk directory is wiped and recreated (including `tmp/`),
/// the file counter is reset to 0 and the in-memory HashJoin is replaced by an empty one.
/// Runs under `mutate_mutex` and the write side of `rwlock`.
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context, TableExclusiveLockHolder &)
{
    std::lock_guard mutation_guard(mutate_mutex);
    TableLockHolder write_holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);

    const bool directory_present = disk->exists(path);
    if (!directory_present)
    {
        LOG_INFO(&Poco::Logger::get("StorageJoin"), "Path {} is already removed from disk {}", path, disk->getName());
    }
    else
    {
        disk->removeRecursive(path);
    }

    /// Recreate the empty directory layout expected by the storage.
    const auto tmp_directory = fs::path(path) / "tmp/";
    disk->createDirectories(path);
    disk->createDirectories(tmp_directory);

    increment = 0;
    join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
}

/// Accepts a list of mutation commands only if every command is a DELETE;
/// throws NOT_IMPLEMENTED on the first command of any other type.
void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, const Settings & /* settings */) const
{
    for (const auto & mutation : commands)
    {
        if (mutation.type == MutationCommand::DELETE)
            continue;

        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine Join supports only DELETE mutations");
    }
}

/// Applies mutation commands by rebuilding the whole table content.
/// The result of the mutation pipeline is collected into a fresh HashJoin (and, for persistent
/// tables, into a temporary backup file); afterwards the write lock is taken, the in-memory
/// join is swapped and the on-disk `.bin` files are replaced by the single new file.
void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
{
    /// Firstly acquire lock for mutation, that locks changes of data.
    /// We cannot acquire rwlock here, because read lock is needed
    /// for execution of mutation interpreter.
    std::lock_guard mutate_lock(mutate_mutex);

    constexpr auto tmp_backup_file_name = "tmp/mut.bin";
    auto metadata_snapshot = getInMemoryMetadataPtr();

    /// Note: the temporary file is opened unconditionally, even when the table is not persistent.
    auto backup_buf = disk->writeFile(path + tmp_backup_file_name);
    auto compressed_backup_buf = CompressedWriteBuffer(*backup_buf);
    auto backup_stream = NativeWriter(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock());

    /// The mutated rows are accumulated here while the current `join` keeps serving readers.
    auto new_data = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);

    // New scope controls lifetime of pipeline.
    {
        auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context);
        MutationsInterpreter::Settings settings(true);
        auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, settings);
        auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
        PullingPipelineExecutor executor(pipeline);

        /// Every block produced by the pipeline goes into the new join and,
        /// for persistent tables, into the backup stream as well.
        Block block;
        while (executor.pull(block))
        {
            new_data->addBlockToJoin(block, true);
            if (persistent)
                backup_stream.write(block);
        }
    }

    /// Now acquire exclusive lock and modify storage.
    TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);

    join = std::move(new_data);
    /// The rebuilt data is stored as a single file, so the file counter restarts at 1.
    increment = 1;

    if (persistent)
    {
        /// Push everything through the writer -> compression -> file buffer chain.
        backup_stream.flush();
        compressed_backup_buf.next();
        backup_buf->next();
        backup_buf->finalize();

        /// Drop every existing `.bin` file directly under `path`
        /// (the backup itself lives under `tmp/` and is listed only as a directory entry, if at all).
        std::vector<std::string> files;
        disk->listFiles(path, files);
        for (const auto & file_name: files)
        {
            if (file_name.ends_with(".bin"))
                disk->removeFileIfExists(path + file_name);
        }

        /// Move the backup into place as "<increment>.bin", i.e. "1.bin".
        disk->replaceFile(path + tmp_backup_file_name, path + std::to_string(increment) + ".bin");
    }
}

/// Builds a HashJoin for a query that joins against this storage.
///
/// The query's join description (`analyzed_join`) must be compatible with the table: same
/// strictness and kind, a matching join_use_nulls behaviour, exactly one ON clause without
/// filter conditions, and exactly the table's key columns on the right side.
/// Otherwise INCOMPATIBLE_TYPE_OF_JOIN is thrown.
///
/// On success `analyzed_join` is modified in place (its right keys are set to the storage key
/// names and its left keys are reordered accordingly), and the returned HashJoin reuses the
/// storage's joined data while holding a read lock on the storage.
///
/// `required_columns_names` selects which columns of the right sample block the new HashJoin exposes.
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context, const Names & required_columns_names) const
{
    if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
        throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN, "Table '{}' has incompatible type of JOIN", getStorageID().getNameForLogs());

    if ((analyzed_join->forceNullableRight() && !use_nulls) ||
        (!analyzed_join->forceNullableRight() && isLeftOrFull(analyzed_join->kind()) && use_nulls))
        throw Exception(
            ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN,
            "Table {} needs the same join_use_nulls setting as present in LEFT or FULL JOIN",
            getStorageID().getNameForLogs());

    if (analyzed_join->getClauses().size() != 1)
        throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN, "JOIN keys should match to the Join engine keys [{}]",
                        fmt::join(getKeyNames(), ", "));

    const auto & join_on = analyzed_join->getOnlyClause();
    if (join_on.on_filter_condition_left || join_on.on_filter_condition_right)
        throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN, "ON section of JOIN with filter conditions is not implemented");

    const auto & key_names_right = join_on.key_names_right;
    const auto & key_names_left = join_on.key_names_left;
    if (key_names.size() != key_names_right.size() || key_names.size() != key_names_left.size())
        throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN,
            "Number of keys in JOIN ON section ({}) doesn't match number of keys in Join engine ({})",
            key_names_right.size(), key_names.size());

    /* Resort left keys according to right keys order in StorageJoin
     * We can't change the order of keys in StorageJoin
     * because the hash table was already built with tuples serialized in the order of key_names.
     * If we try to use the same hash table with different order of keys,
     * then calculated hashes and the result of the comparison will be wrong.
     *
     * Example:
     * ```
     * CREATE TABLE t_right (a UInt32, b UInt32) ENGINE = Join(ALL, INNER, a, b);
     * SELECT * FROM t_left JOIN t_right ON t_left.y = t_right.b AND t_left.x = t_right.a;
     * ```
     * In that case right keys should still be (a, b), need to change the order of the left keys to (x, y).
     */
    Names left_key_names_resorted;
    for (const auto & key_name : key_names)
    {
        const auto & renamed_key = analyzed_join->renamedRightColumnNameWithAlias(key_name);
        /// find position of renamed_key in key_names_right
        auto it = std::find(key_names_right.begin(), key_names_right.end(), renamed_key);
        if (it == key_names_right.end())
            throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN,
                "Key '{}' not found in JOIN ON section. Join engine key{} '{}' have to be used",
                key_name, key_names.size() > 1 ? "s" : "", fmt::join(key_names, ", "));
        const size_t key_position = std::distance(key_names_right.begin(), it);
        left_key_names_resorted.push_back(key_names_left[key_position]);
    }

    /// Set qualified identifiers to original names (table.column -> column).
    /// It's required because storage join stores non-qualified names.
    /// Qualifies will be added by join implementation (TableJoin contains a rename mapping).
    analyzed_join->setRightKeys(key_names);
    analyzed_join->setLeftKeys(left_key_names_resorted);

    /// Build the storage sample block once instead of once per requested column.
    const Block storage_right_sample_block = getRightSampleBlock();
    Block right_sample_block;
    for (const auto & name : required_columns_names)
        right_sample_block.insert(storage_right_sample_block.getByName(name));
    HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, right_sample_block);

    /// The clone keeps the read lock for as long as it lives and shares the storage's data.
    RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
    join_clone->setLock(holder);
    join_clone->reuseJoinedData(*join);

    return join_clone;
}

void StorageJoin::insertBlock(const Block & block, ContextPtr context)
{
    Block block_to_insert = block;
    convertRightBlock(block_to_insert);
    TableLockHolder holder = tryLockForCurrentQueryTimedWithContext(rwlock, RWLockImpl::Write, context);

    /// Protection from `INSERT INTO test_table_join SELECT * FROM test_table_join`
    if (!holder)
        throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage");

    join->addBlockToJoin(block_to_insert, true);
}

/// Returns the total row count reported by the underlying HashJoin, read under a read lock.
size_t StorageJoin::getSize(ContextPtr context) const
{
    TableLockHolder read_holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);

    const size_t row_count = join->getTotalRowCount();
    return row_count;
}

/// Returns the total row count of the underlying HashJoin.
/// The read lock is taken without a query id, using `settings.lock_acquire_timeout`.
std::optional<UInt64> StorageJoin::totalRows(const Settings & settings) const
{
    TableLockHolder read_holder = tryLockTimed(rwlock, RWLockImpl::Read, RWLockImpl::NO_QUERY, settings.lock_acquire_timeout);

    std::optional<UInt64> row_count = join->getTotalRowCount();
    return row_count;
}

/// Returns the total byte count of the underlying HashJoin.
/// The read lock is taken without a query id, using `settings.lock_acquire_timeout`.
std::optional<UInt64> StorageJoin::totalBytes(const Settings & settings) const
{
    TableLockHolder read_holder = tryLockTimed(rwlock, RWLockImpl::Read, RWLockImpl::NO_QUERY, settings.lock_acquire_timeout);

    std::optional<UInt64> byte_count = join->getTotalByteCount();
    return byte_count;
}

/// Forwards to HashJoin::joinGetCheckAndGetReturnType() of the underlying join and returns its result.
/// No storage lock is taken here.
DataTypePtr StorageJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const
{
    DataTypePtr return_type = join->joinGetCheckAndGetReturnType(data_types, column_name, or_null);
    return return_type;
}

/// Forwards to HashJoin::joinGet() of the underlying join while holding a read lock on the storage.
ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & block_with_columns_to_add, ContextPtr context) const
{
    TableLockHolder read_holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);

    ColumnWithTypeAndName result = join->joinGet(block, block_with_columns_to_add);
    return result;
}

/// Makes every column of `block` nullable when the table was created with `use_nulls`
/// and its kind is LEFT or FULL; leaves the block untouched otherwise.
void StorageJoin::convertRightBlock(Block & block) const
{
    const bool conversion_required = use_nulls && isLeftOrFull(kind);
    if (conversion_required)
    {
        for (auto & column : block)
            JoinCommon::convertColumnToNullable(column);
    }
}

/// Registers the "Join" table engine in the storage factory.
///
/// Engine syntax: Join(strictness, kind, key1 [, key2, ...]) where
///   strictness is ANY | ALL | SEMI | ANTI and kind is LEFT | INNER | RIGHT | FULL
/// (both given as identifiers, case-insensitive), followed by at least one key column identifier.
///
/// Join-related settings are taken from the context and may be overridden in the SETTINGS clause
/// of the table definition; `disk` and `persistent` are accepted there as well.
/// Any other setting name results in BAD_ARGUMENTS.
void registerStorageJoin(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        /// Join(ANY, LEFT, k1, k2, ...)

        ASTs & engine_args = args.engine_args;

        const auto & settings = args.getContext()->getSettingsRef();

        /// Defaults come from the query context ...
        auto join_use_nulls = settings.join_use_nulls;
        auto max_rows_in_join = settings.max_rows_in_join;
        auto max_bytes_in_join = settings.max_bytes_in_join;
        auto join_overflow_mode = settings.join_overflow_mode;
        auto join_any_take_last_row = settings.join_any_take_last_row;
        auto old_any_join = settings.any_join_distinct_right_table_keys;
        bool persistent = true;
        String disk_name = "default";

        /// ... and are overridden by the SETTINGS clause of the CREATE query, if present.
        if (args.storage_def && args.storage_def->settings)
        {
            for (const auto & setting : args.storage_def->settings->changes)
            {
                if (setting.name == "join_use_nulls")
                    join_use_nulls = setting.value;
                else if (setting.name == "max_rows_in_join")
                    max_rows_in_join = setting.value;
                else if (setting.name == "max_bytes_in_join")
                    max_bytes_in_join = setting.value;
                else if (setting.name == "join_overflow_mode")
                    join_overflow_mode = setting.value;
                else if (setting.name == "join_any_take_last_row")
                    join_any_take_last_row = setting.value;
                else if (setting.name == "any_join_distinct_right_table_keys")
                    old_any_join = setting.value;
                else if (setting.name == "disk")
                    disk_name = setting.value.get<String>();
                else if (setting.name == "persistent")
                {
                    persistent = setting.value.get<bool>();
                }
                else
                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown setting {} for storage {}", setting.name, args.engine_name);
            }
        }

        DiskPtr disk = args.getContext()->getDisk(disk_name);

        /// FULL is an accepted kind (see the parsing below), so it is listed in the message too.
        if (engine_args.size() < 3)
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                            "Storage Join requires at least 3 parameters: "
                            "Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT|FULL, keys...).");

        /// `Unspecified` and `Comma` serve as "not recognized" markers for the two checks below.
        JoinStrictness strictness = JoinStrictness::Unspecified;
        JoinKind kind = JoinKind::Comma;

        if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0]))
        {
            const String strictness_str = Poco::toLower(*opt_strictness_id);

            if (strictness_str == "any")
            {
                /// With any_join_distinct_right_table_keys enabled, ANY maps to RightAny.
                if (old_any_join)
                    strictness = JoinStrictness::RightAny;
                else
                    strictness = JoinStrictness::Any;
            }
            else if (strictness_str == "all")
                strictness = JoinStrictness::All;
            else if (strictness_str == "semi")
                strictness = JoinStrictness::Semi;
            else if (strictness_str == "anti")
                strictness = JoinStrictness::Anti;
        }

        if (strictness == JoinStrictness::Unspecified)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "First parameter of storage Join must be ANY or ALL or SEMI or ANTI (without quotes).");

        if (auto opt_kind_id = tryGetIdentifierName(engine_args[1]))
        {
            const String kind_str = Poco::toLower(*opt_kind_id);

            if (kind_str == "left")
                kind = JoinKind::Left;
            else if (kind_str == "inner")
                kind = JoinKind::Inner;
            else if (kind_str == "right")
                kind = JoinKind::Right;
            else if (kind_str == "full")
            {
                /// ANY FULL is always stored with RightAny strictness.
                if (strictness == JoinStrictness::Any)
                    strictness = JoinStrictness::RightAny;
                kind = JoinKind::Full;
            }
        }

        if (kind == JoinKind::Comma)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).");

        /// Every remaining argument must be an identifier naming a key column.
        Names key_names;
        key_names.reserve(engine_args.size() - 2);
        for (size_t i = 2, size = engine_args.size(); i < size; ++i)
        {
            auto opt_key = tryGetIdentifierName(engine_args[i]);
            if (!opt_key)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter №{} of storage Join don't look like column name.", i + 1);

            key_names.push_back(*opt_key);
        }

        return std::make_shared<StorageJoin>(
            disk,
            args.relative_data_path,
            args.table_id,
            key_names,
            join_use_nulls,
            SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode},
            kind,
            strictness,
            args.columns,
            args.constraints,
            args.comment,
            join_any_take_last_row,
            persistent);
    };

    factory.registerStorage("Join", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}

/// Returns the address of `t` itself, viewed as a pointer to its raw bytes.
template <typename T>
static const char * rawData(T & t)
{
    const void * object_address = &t;
    return static_cast<const char *>(object_address);
}
/// Returns the size in bytes of the argument's static type.
template <typename T>
static size_t rawSize(T &)
{
    constexpr size_t object_size = sizeof(T);
    return object_size;
}
/// Specialization for StringRef: returns the pointer stored in the reference (`t.data`),
/// not the address of the StringRef object itself.
template <>
const char * rawData(const StringRef & t)
{
    return t.data;
}
/// Specialization for StringRef: returns the length stored in the reference (`t.size`),
/// not sizeof(StringRef).
template <>
size_t rawSize(const StringRef & t)
{
    return t.size;
}

/// Source that reads the content of a Join table back out of the hash map of its HashJoin.
/// Each generate() call continues the iteration where the previous one stopped and returns one chunk.
/// The lock holder passed to the constructor is kept as a member for the lifetime of the source.
class JoinSource : public ISource
{
public:
    /// Prepares the mapping between the requested columns (`sample_block_`) and the data stored in `join_`.
    /// Throws NOT_IMPLEMENTED when the join's TableJoin is not a single disjunct.
    JoinSource(HashJoinPtr join_, TableLockHolder lock_holder_, UInt64 max_block_size_, Block sample_block_)
        : ISource(sample_block_)
        , join(join_)
        , lock_holder(lock_holder_)
        , max_block_size(max_block_size_)
        , sample_block(std::move(sample_block_))
    {
        if (!join->getTableJoin().oneDisjunct())
            throw DB::Exception(ErrorCodes::NOT_IMPLEMENTED, "StorageJoin does not support OR for keys in JOIN ON section");

        column_indices.resize(sample_block.columns());

        auto & saved_block = join->getJoinedData()->sample_block;

        for (size_t i = 0; i < sample_block.columns(); ++i)
        {
            auto & [_, type, name] = sample_block.getByPosition(i);
            if (join->right_table_keys.has(name))
            {
                /// Key columns are restored from the hash map key, not from the saved blocks.
                /// NOTE(review): `key_pos` keeps only the last matching position — presumably a single
                /// key column is expected here (see UNSUPPORTED_JOIN_KEYS in createChunk); confirm.
                key_pos = i;
                const auto & column = join->right_table_keys.getByName(name);
                restored_block.insert(column);
            }
            else
            {
                /// Non-key columns: remember where the column lives inside the saved blocks.
                size_t pos = saved_block.getPositionByName(name);
                column_indices[i] = pos;

                const auto & column = saved_block.getByPosition(pos);
                restored_block.insert(column);
            }
        }
    }

    String getName() const override { return "Join"; }

protected:
    /// Returns the next chunk, or an empty chunk when the join holds no blocks
    /// or the map iteration produced no more rows.
    Chunk generate() override
    {
        if (join->data->blocks.empty())
            return {};

        Chunk chunk;
        /// Only the first entry of `maps` is read (the constructor rejects joins with several disjuncts).
        if (!joinDispatch(join->kind, join->strictness, join->data->maps.front(),
                [&](auto kind, auto strictness, auto & map) { chunk = createChunk<kind, strictness>(map); }))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown JOIN strictness");
        return chunk;
    }

private:
    HashJoinPtr join;
    TableLockHolder lock_holder;

    UInt64 max_block_size;
    Block sample_block;
    Block restored_block; /// sample_block with parent column types

    /// For every non-key column of sample_block: its position inside the blocks saved by the join.
    ColumnNumbers column_indices;
    /// Position of the key column inside sample_block, if a key column was requested.
    std::optional<size_t> key_pos;

    /// Iterator into the hash map, created lazily by fillColumns() and kept between generate() calls.
    std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure


    /// Fills one chunk from the map variant selected by `join->data->type`.
    /// Throws UNSUPPORTED_JOIN_KEYS for variants not covered by APPLY_FOR_JOIN_VARIANTS_LIMITED.
    template <JoinKind KIND, JoinStrictness STRICTNESS, typename Maps>
    Chunk createChunk(const Maps & maps)
    {
        MutableColumns mut_columns = restored_block.cloneEmpty().mutateColumns();

        size_t rows_added = 0;

        switch (join->data->type)
        {
        /// One `case` per supported hash map variant, each calling fillColumns() on that map.
#define M(TYPE)                                           \
    case HashJoin::Type::TYPE:                                \
        rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE, mut_columns); \
        break;
            APPLY_FOR_JOIN_VARIANTS_LIMITED(M)
#undef M

            default:
                throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys in StorageJoin. Type: {}",
                                static_cast<UInt32>(join->data->type));
        }

        if (!rows_added)
            return {};

        Columns columns;
        columns.reserve(mut_columns.size());
        for (auto & col : mut_columns)
            columns.emplace_back(std::move(col));

        /// Correct nullability and LowCardinality types
        for (size_t i = 0; i < columns.size(); ++i)
        {
            const auto & src = restored_block.getByPosition(i);
            const auto & dst = sample_block.getByPosition(i);

            /// Cast only when the stored type differs from the requested one.
            if (!src.type->equals(*dst.type))
            {
                auto arg = src;
                arg.column = std::move(columns[i]);
                columns[i] = castColumn(arg, dst.type);
            }
        }

        UInt64 num_rows = columns.at(0)->size();
        return Chunk(std::move(columns), num_rows);
    }

    /// Walks the map from the saved position and appends rows to `columns` until the map is exhausted
    /// or at least `max_block_size` rows were added. Returns the number of rows added.
    /// Kind/strictness combinations not listed below add nothing for a map entry.
    template <JoinKind KIND, JoinStrictness STRICTNESS, typename Map>
    size_t fillColumns(const Map & map, MutableColumns & columns)
    {
        size_t rows_added = 0;

        /// First call: allocate the iterator behind the type-erased pointer, with a matching deleter.
        if (!position)
            position = decltype(position)(
                static_cast<void *>(new typename Map::const_iterator(map.begin())),
                [](void * ptr) { delete reinterpret_cast<typename Map::const_iterator *>(ptr); });

        auto & it = *reinterpret_cast<typename Map::const_iterator *>(position.get());
        auto end = map.end();

        for (; it != end; ++it)
        {
            if constexpr (STRICTNESS == JoinStrictness::RightAny)
            {
                fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == JoinStrictness::All)
            {
                fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == JoinStrictness::Any)
            {
                if constexpr (KIND == JoinKind::Left || KIND == JoinKind::Inner)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == JoinKind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == JoinStrictness::Semi)
            {
                if constexpr (KIND == JoinKind::Left)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == JoinKind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == JoinStrictness::Anti)
            {
                if constexpr (KIND == JoinKind::Left)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == JoinKind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else
                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "This JOIN is not implemented yet");

            if (rows_added >= max_block_size)
            {
                /// `break` skips the loop increment, so step past the entry just emitted
                /// to make the next call start from the following one.
                ++it;
                break;
            }
        }

        return rows_added;
    }

    /// Appends exactly one row for the map entry at `it`: the key column is rebuilt from the raw bytes
    /// of the map key, all other columns are copied from the row referenced by the mapped value.
    template <typename Map>
    static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
                        const std::optional<size_t> & key_pos, size_t & rows_added)
    {
        for (size_t j = 0; j < columns.size(); ++j)
            if (j == key_pos)
                columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
            else
                columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num);
        ++rows_added;
    }

    /// Appends one row per row reference stored in the mapped value of the map entry at `it`;
    /// the key bytes are repeated for each of them.
    template <typename Map>
    static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
                        const std::optional<size_t> & key_pos, size_t & rows_added)
    {
        for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it)
        {
            for (size_t j = 0; j < columns.size(); ++j)
                if (j == key_pos)
                    columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
                else
                    columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num);
            ++rows_added;
        }
    }
};


// TODO: multiple stream read and index read
/// Reads the table content through a single JoinSource.
/// The requested columns are validated against the snapshot, then a read lock is taken
/// and handed over to the source together with the current join.
Pipe StorageJoin::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & /*query_info*/,
    ContextPtr context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    size_t /*num_streams*/)
{
    storage_snapshot->check(column_names);

    Block header = storage_snapshot->getSampleBlockForColumns(column_names);
    RWLockImpl::LockHolder read_holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);

    auto source = std::make_shared<JoinSource>(join, std::move(read_holder), max_block_size, header);
    return Pipe(std::move(source));
}

}