diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 4a21326..943b2ef 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -20,7 +20,7 @@ jobs: submodules: recursive - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: 'cpp' @@ -37,6 +37,6 @@ jobs: # Perform Analysis - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 with: category: "/language:cpp" diff --git a/CMakeLists.txt b/CMakeLists.txt index 5dac8f4..8650d57 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ endif() set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) if (UNIX AND (CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL "x86_64")) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native -mno-avx512f") # disable AVX512 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mbmi2 -msse4.2 -mpopcnt") endif() diff --git a/include/build_util.hpp b/include/build_util.hpp index dfc9a07..f1804a4 100644 --- a/include/build_util.hpp +++ b/include/build_util.hpp @@ -5,7 +5,7 @@ namespace fulgor { -void build_reference_sketches(index_type const& index, +void build_reference_sketches(hfur_index_t const& index, uint64_t p, // use 2^p bytes per HLL sketch uint64_t num_threads, // num. threads for construction std::string output_filename // where the sketches will be serialized diff --git a/include/builders/differential_builder.hpp b/include/builders/differential_builder.hpp index 99e7c47..3ca4baf 100644 --- a/include/builders/differential_builder.hpp +++ b/include/builders/differential_builder.hpp @@ -162,7 +162,7 @@ struct differential_permuter { essentials::timer timer; timer.start(); if (num_points == 0) { - std::cout << "Found empty partition" << endl; + std::cout << "Found empty partition" << std::endl; clustering_data.num_clusters = 0; clustering_data.clusters = {}; return 0; @@ -206,7 +206,7 @@ struct index::differential_builder { const uint32_t num_threads = m_build_config.num_threads; - index_type index; + hfur_index_t index; essentials::logger("step 1. loading index to be differentiated..."); essentials::load(index, m_build_config.index_filename_to_partition.c_str()); essentials::logger("DONE"); diff --git a/include/builders/meta_builder.hpp b/include/builders/meta_builder.hpp index 51f73cc..3455190 100644 --- a/include/builders/meta_builder.hpp +++ b/include/builders/meta_builder.hpp @@ -15,7 +15,7 @@ struct permuter { permuter(build_configuration const& build_config) : m_build_config(build_config), m_num_partitions(0), m_max_partition_size(0) {} - void permute(index_type const& index) { + void permute(hfur_index_t const& index) { essentials::timer timer; { @@ -132,7 +132,7 @@ struct index::meta_builder { void build(index& idx) { if (idx.m_k2u.size() != 0) throw std::runtime_error("index already built"); - index_type index; + hfur_index_t index; essentials::logger("step 1. loading index to be partitioned..."); essentials::load(index, m_build_config.index_filename_to_partition.c_str()); essentials::logger("DONE"); @@ -156,7 +156,7 @@ struct index::meta_builder { essentials::logger("step 4. building partial/meta color sets"); timer.start(); - atomic_uint64_t num_integers_in_metacolor_sets = 0; + std::atomic_uint64_t num_integers_in_metacolor_sets = 0; uint64_t num_partial_color_sets = 0; typename ColorSets::builder color_sets_builder; @@ -186,7 +186,7 @@ struct index::meta_builder { thread_slices[num_threads] = index.num_color_sets(); auto exe = [&](uint64_t thread_id) { - string tmp_filename = metacolor_set_file_name(thread_id); + std::string tmp_filename = metacolor_set_file_name(thread_id); uint64_t partition_id = 0; uint32_t meta_color_set_size = 0; std::vector partial_color_set; @@ -320,7 +320,7 @@ struct index::meta_builder { std::remove(metacolor_set_file_name(thread_id).c_str()); thread_id++; - string tmp_filename = metacolor_set_file_name(thread_id); + std::string tmp_filename = metacolor_set_file_name(thread_id); metacolor_set_in = std::ifstream(tmp_filename, std::ios::binary); if (!metacolor_set_in.is_open()) throw std::runtime_error("error in opening file: " + tmp_filename); diff --git a/include/builders/meta_differential_builder.hpp b/include/builders/meta_differential_builder.hpp index 0f5cfcc..5558c95 100644 --- a/include/builders/meta_differential_builder.hpp +++ b/include/builders/meta_differential_builder.hpp @@ -15,7 +15,7 @@ struct index::meta_differential_builder { void build(index& idx) { if (idx.m_k2u.size() != 0) throw std::runtime_error("index already built"); - meta_index_type meta_index; + mfur_index_t meta_index; essentials::logger("step 1. loading index to be partitioned"); essentials::load(meta_index, m_build_config.index_filename_to_partition.c_str()); essentials::logger("DONE"); @@ -85,7 +85,7 @@ struct index::meta_differential_builder { num_partition_colors); std::vector threads(thread_slices.size()); - auto encode_color_sets = [&](uint64_t thread_id) { + auto encode_color_sets = [&thread_builders, &thread_slices, &permutation, &meta_partition, num_partition_colors](uint64_t thread_id) { auto& color_sets_builder = thread_builders[thread_id]; auto& [begin, end] = thread_slices[thread_id]; color_sets_builder.reserve_num_bits(16 * essentials::GB * 8); @@ -357,7 +357,7 @@ struct index::meta_differential_builder { auto exe = [&](uint64_t thread_id) { uint64_t l = slice_size * thread_id; - uint64_t r = min(slice_size * (thread_id + 1), idx.m_k2u.num_contigs()); + uint64_t r = std::min(slice_size * (thread_id + 1), idx.m_k2u.num_contigs()); for (uint64_t unitig_id = l; unitig_id < r; ++unitig_id) { auto it = idx.get_k2u().at_contig_id(unitig_id); diff --git a/include/color_sets/differential.hpp b/include/color_sets/differential.hpp index b633f7f..1901276 100644 --- a/include/color_sets/differential.hpp +++ b/include/color_sets/differential.hpp @@ -99,11 +99,11 @@ struct differential { prev_val = val; } } - } - void append(differential::builder const& db) { - if (db.m_color_set_offsets.size() - 1 == 0) return; + void append(builder const& db) { + // I don't know why the following line was here, but removing it seems to solve bugs + // if (db.m_color_set_offsets.size() - 1 == 0) return; uint64_t delta = m_bvb.num_bits(); m_bvb.append(db.m_bvb); m_num_total_integers += db.m_num_total_integers; @@ -289,7 +289,7 @@ struct differential { next_differential_val(); next_representative_val(); } - m_curr_val = min(m_curr_differential_val, m_curr_representative_val); + m_curr_val = std::min(m_curr_differential_val, m_curr_representative_val); } }; diff --git a/include/color_sets/meta_differential.hpp b/include/color_sets/meta_differential.hpp index 103793b..98ab2dc 100644 --- a/include/color_sets/meta_differential.hpp +++ b/include/color_sets/meta_differential.hpp @@ -32,7 +32,7 @@ struct meta_differential { m_partition_sets_offsets.reserve(num_sets); } - void process_meta_color_partition_set(vector& partition_set) { + void process_meta_color_partition_set(std::vector& partition_set) { m_curr_partition_set = partition_set; uint64_t size = partition_set.size(); uint32_t prev_val = partition_set[0]; @@ -58,7 +58,7 @@ struct meta_differential { m_prev_docs += d.num_colors(); } - void process_metacolor_set(vector& relative_colors) { + void process_metacolor_set(std::vector& relative_colors) { assert(m_curr_partition_set.size() == relative_colors.size()); m_partition_sets_partitions.push_back(false); diff --git a/include/index.hpp b/include/index.hpp index 237540b..b4d30d5 100644 --- a/include/index.hpp +++ b/include/index.hpp @@ -36,9 +36,10 @@ struct index { /* from unitig_id to color_set_id */ uint64_t u2c(uint64_t unitig_id) const { return m_u2c_rank1_index.rank1(m_u2c, unitig_id); } - void pseudoalign_full_intersection(std::string const& sequence, // - std::vector& results) const; // - + void fetch_color_set_ids(std::string const& sequence, + std::vector& color_set_ids) const; + void pseudoalign_full_intersection(std::vector& color_set_ids, // + std::vector& results, std::vector& tmp) const; // void pseudoalign_threshold_union(std::string const& sequence, // std::vector& results, // const double threshold) const; // diff --git a/include/index_types.hpp b/include/index_types.hpp index 8e63edf..e1d8806 100644 --- a/include/index_types.hpp +++ b/include/index_types.hpp @@ -7,7 +7,7 @@ namespace fulgor { typedef index hybrid_colors_index_type; -typedef hybrid_colors_index_type index_type; // in use +typedef hybrid_colors_index_type hfur_index_t; // in use } // namespace fulgor #include "builders/meta_builder.hpp" @@ -15,7 +15,7 @@ typedef hybrid_colors_index_type index_type; // in use namespace fulgor { typedef index> meta_hybrid_colors_index_type; -typedef meta_hybrid_colors_index_type meta_index_type; // in use +typedef meta_hybrid_colors_index_type mfur_index_t; // in use } // namespace fulgor #include "builders/differential_builder.hpp" @@ -23,7 +23,7 @@ typedef meta_hybrid_colors_index_type meta_index_type; // in use namespace fulgor { typedef index differential_colors_index_type; -typedef differential_colors_index_type differential_index_type; // in use +typedef differential_colors_index_type dfur_index_t; // in use } // namespace fulgor #include "color_sets/meta_differential.hpp" @@ -31,5 +31,5 @@ typedef differential_colors_index_type differential_index_type; // in use namespace fulgor { typedef index meta_differential_colors_index_type; -typedef meta_differential_colors_index_type meta_differential_index_type; // in use +typedef meta_differential_colors_index_type mdfur_index_t; // in use } // namespace fulgor diff --git a/include/util.hpp b/include/util.hpp index e035261..85923ab 100644 --- a/include/util.hpp +++ b/include/util.hpp @@ -11,6 +11,8 @@ #include "external/smhasher/src/City.h" #include "external/smhasher/src/City.cpp" +#include "external/FQFeeder/include/blockingconcurrentqueue.h" + namespace fulgor { enum index_t { HYBRID, DIFF, META, META_DIFF }; @@ -21,10 +23,10 @@ namespace constants { constexpr double invalid_threshold = -1.0; constexpr uint64_t default_ram_limit_in_GiB = 8; static const std::string default_tmp_dirname("."); -static const std::string fulgor_filename_extension("fur"); -static const std::string meta_colored_fulgor_filename_extension("mfur"); -static const std::string diff_colored_fulgor_filename_extension("dfur"); -static const std::string meta_diff_colored_fulgor_filename_extension("mdfur"); +static const std::string hfur_filename_extension("fur"); +static const std::string mfur_filename_extension("mfur"); +static const std::string dfur_filename_extension("dfur"); +static const std::string mdfur_filename_extension("mdfur"); namespace current_version_number { constexpr uint8_t x = 4; @@ -215,5 +217,50 @@ struct hasher_uint128_t { uint64_t operator()(const __uint128_t x) const { return static_cast(x) ^ (x >> 64); } }; +inline int num_digits(const uint32_t n) { + if (n >= 10000) { + if (n >= 10000000) { + if (n >= 100000000) { + if (n >= 1000000000) + return 10; + return 9; + } + return 8; + } + if (n >= 100000) { + if (n >= 1000000) + return 7; + return 6; + } + return 5; + } + if (n >= 100) { + if (n >= 1000) + return 4; + return 3; + } + if (n >= 10) + return 2; + return 1; +} + +inline void vec_to_tsv(std::vector const& vec, std::string& s) { + s.clear(); + s.reserve(vec.size() * 12); + char buffer[32]; + buffer[31] = '\t'; + uint32_t tmp; + for (uint32_t x : vec) { + int len = 0; + do { + tmp = x / 10; + buffer[30 - len++] = '0' + (x - tmp * 10); + x = tmp; + } while (x > 0); + s.append(buffer + 31 - len, len + 1); + } + s.pop_back(); +} + } // namespace util } // namespace fulgor diff --git a/src/ps_full_intersection.cpp b/src/ps_full_intersection.cpp index 8d9ad4b..a0dac51 100644 --- a/src/ps_full_intersection.cpp +++ b/src/ps_full_intersection.cpp @@ -247,12 +247,6 @@ void meta_intersect(std::vector& iterators, std::vector& col if (iterators.empty()) return; - for (auto it : iterators) { - while (it.partition_id() != it.num_partitions()) it.next_partition_id(); - it.init(); - it.change_partition(); - } - std::sort(iterators.begin(), iterators.end(), [](auto const& x, auto const& y) { return x.meta_color_set_size() < y.meta_color_set_size(); }); @@ -338,10 +332,9 @@ void meta_intersect(std::vector& iterators, std::vector& col } template -void index::pseudoalign_full_intersection(std::string const& sequence, - std::vector& colors) const { +void index::fetch_color_set_ids(std::string const& sequence, + std::vector& color_set_ids) const { if (sequence.length() < m_k2u.k()) return; - colors.clear(); std::vector unitig_ids; { /* stream through */ @@ -362,30 +355,36 @@ void index::pseudoalign_full_intersection(std::string const& sequence /* here we use it to hold the color set ids; in meta_intersect we use it to hold the partition ids */ - std::vector tmp; - std::vector iterators; + color_set_ids.clear(); /* deduplicate unitig_ids */ std::sort(unitig_ids.begin(), unitig_ids.end()); auto end_unitigs = std::unique(unitig_ids.begin(), unitig_ids.end()); - tmp.reserve(end_unitigs - unitig_ids.begin()); + color_set_ids.reserve(end_unitigs - unitig_ids.begin()); for (auto it = unitig_ids.begin(); it != end_unitigs; ++it) { uint32_t unitig_id = *it; uint32_t color_set_id = u2c(unitig_id); - tmp.push_back(color_set_id); + color_set_ids.push_back(color_set_id); } /* deduplicate color set ids */ - std::sort(tmp.begin(), tmp.end()); - auto end_tmp = std::unique(tmp.begin(), tmp.end()); - iterators.reserve(end_tmp - tmp.begin()); - for (auto it = tmp.begin(); it != end_tmp; ++it) { - uint64_t color_set_id = *it; + std::sort(color_set_ids.begin(), color_set_ids.end()); + auto end_tmp = std::unique(color_set_ids.begin(), color_set_ids.end()); + color_set_ids.erase(end_tmp, color_set_ids.end()); +} + +template +void index::pseudoalign_full_intersection(std::vector& color_set_ids, + std::vector& colors, std::vector& tmp) const { + std::vector iterators; + iterators.reserve(color_set_ids.size()); + for (auto color_set_id : color_set_ids) { auto fwd_it = m_color_sets.color_set(color_set_id); iterators.push_back(fwd_it); } - tmp.clear(); // don't need color set ids anymore + colors.clear(); + tmp.clear(); if constexpr (ColorSets::type == index_t::META) { meta_intersect(iterators, colors, tmp); } else if constexpr (ColorSets::type == index_t::META_DIFF) { diff --git a/src/ps_utils.cpp b/src/ps_utils.cpp new file mode 100644 index 0000000..e2a91c7 --- /dev/null +++ b/src/ps_utils.cpp @@ -0,0 +1,479 @@ +#include +#include +#include +#include +#include + +#include "include/index.hpp" +#include "external/FQFeeder/include/FastxParser.hpp" + +namespace fulgor { + +enum class pseudoalignment_algorithm : uint8_t { FULL_INTERSECTION, THRESHOLD_UNION }; + +std::string to_string(const pseudoalignment_algorithm algo, const double threshold) { + std::string o; + switch (algo) { + case pseudoalignment_algorithm::FULL_INTERSECTION: + o = "full-intersection"; + break; + case pseudoalignment_algorithm::THRESHOLD_UNION: + o = "threshold-union (threshold = " + std::to_string(threshold) + ")"; + break; + } + return o; +} + +template +struct formatter_buffer { + explicit formatter_buffer(Formatter* ptr) : m_formatter(ptr), m_num_bytes(0) {} + + void write(const uint32_t query_id, std::vector const& vec) { + m_num_bytes += m_formatter->format(m_buffer, query_id, vec); + + if (m_num_bytes > (1 << 14)) { + m_formatter->flush(m_buffer, m_num_bytes); + m_num_bytes = 0; + } + } + + ~formatter_buffer() { + m_formatter->flush(m_buffer, m_num_bytes); + } + +private: + Formatter* m_formatter; + typename Formatter::buffer_t m_buffer; + uint32_t m_num_bytes; +}; + +struct psa_ascii_formatter { + typedef std::stringstream buffer_t; + + explicit psa_ascii_formatter(const std::string &output_filename) + : m_file(output_filename){} + + formatter_buffer buffer() { + return formatter_buffer(this); + } + + uint32_t format(buffer_t& ss, uint32_t query_id, std::vector const& colors) { + uint32_t num_bytes = 0; + ss << query_id << "\t" << colors.size(); + num_bytes += util::num_digits(query_id) + 1 + util::num_digits(colors.size()); // query_id + tab + size + + if (!colors.empty()) { + std::string tmpstr; + util::vec_to_tsv(colors, tmpstr); + ss << "\t" << tmpstr; + num_bytes += 1 + tmpstr.size(); // tab + size of string + } + + ss << "\n"; + num_bytes += 1; // newline + return num_bytes; + } + + void flush(buffer_t& buffer, const uint32_t num_bytes) { + m_mut.lock(); + m_file.write(buffer.str().data(), num_bytes); + m_mut.unlock(); + buffer.str(""); + } + +protected: + std::ofstream m_file; + std::mutex m_mut; +}; + +struct psa_slow_formatter { + typedef std::stringstream buffer_t; + + explicit psa_slow_formatter(const std::string &output_filename) + : m_file(output_filename){} + + formatter_buffer buffer() { + return formatter_buffer(this); + } + + uint32_t format(buffer_t& ss, uint32_t query_id, std::vector const& colors) { + uint32_t num_bytes = 0; + ss << query_id << "\t" << colors.size(); + num_bytes += util::num_digits(query_id) + 1 + util::num_digits(colors.size()); // query_id + tab + size + + for(auto c : colors) { + ss << '\t' << c; + num_bytes += 1 + util::num_digits(c); // tab + color + } + ss << "\n"; + num_bytes += 1; // newline + + return num_bytes; + } + + void flush(buffer_t& buffer, const uint32_t num_bytes) { + m_mut.lock(); + m_file.write(buffer.str().data(), num_bytes); + m_mut.unlock(); + buffer.str(""); + } + +protected: + std::ofstream m_file; + std::mutex m_mut; +}; + +struct psa_binary_formatter { + typedef std::stringstream buffer_t; + + explicit psa_binary_formatter(const std::string &output_filename) + : m_file(output_filename){} + + formatter_buffer buffer() { + return formatter_buffer(this); + } + + uint32_t format(buffer_t& ss, uint32_t query_id, std::vector const& colors) { + uint32_t colors_size = colors.size(); + ss.write(reinterpret_cast(&query_id), sizeof(uint32_t)); + ss.write(reinterpret_cast(&colors_size), sizeof(uint32_t)); + if (!colors.empty()) + ss.write(reinterpret_cast(colors.data()), colors.size() * sizeof(uint32_t)); + return (2 + colors_size) * sizeof(uint32_t); // (query id + size + colors) * 4 Bytes + } + + void flush(buffer_t& buffer, const uint32_t num_bytes) { + m_mut.lock(); + m_file.write(buffer.str().data(), num_bytes); + m_mut.unlock(); + buffer.str(""); + } + +protected: + std::ofstream m_file; + std::mutex m_mut; +}; + +struct psa_compressed_formatter { + typedef bits::bit_vector::builder buffer_t; + + explicit psa_compressed_formatter(const std::string &output_filename) + : m_file(output_filename), m_num_colors(0), m_sparse_set_threshold_size(0), m_very_dense_set_threshold_size(0) {} + + void set_num_colors(uint32_t num_colors) { + assert(m_num_colors == 0); + m_num_colors = num_colors; + m_file.write(reinterpret_cast(&m_num_colors), sizeof(uint64_t)); + m_sparse_set_threshold_size = 0.25 * m_num_colors; + m_very_dense_set_threshold_size = 0.75 * m_num_colors; + } + + formatter_buffer buffer() { + return formatter_buffer(this); + } + + uint32_t format(buffer_t& bvb, uint32_t query_id, std::vector const& colors) { + assert(m_num_colors != 0); + const uint32_t num_bytes_start = bvb.data().size() * sizeof(uint64_t); + const uint32_t size = colors.size(); + bits::util::write_delta(bvb, query_id); + bits::util::write_delta(bvb, size); /* encode size */ + if (size == 0) { + + } else if (size < m_sparse_set_threshold_size) { + uint32_t prev_val = colors[0]; + bits::util::write_delta(bvb, prev_val); + for (uint64_t i = 1; i != size; ++i) { + uint32_t val = colors[i]; + assert(val >= prev_val + 1); + bits::util::write_delta(bvb, val - (prev_val + 1)); + prev_val = val; + } + } else if (size < m_very_dense_set_threshold_size) { + bits::bit_vector::builder tmp; + tmp.resize(m_num_colors); + for (uint64_t i = 0; i != size; ++i) tmp.set(colors[i]); + bvb.append(tmp); + } else { + bool first = true; + uint32_t val = 0; + uint32_t prev_val = -1; + uint32_t written = 0; + for (uint64_t i = 0; i != size; ++i) { + uint32_t x = colors[i]; + while (val < x) { + if (first) { + bits::util::write_delta(bvb, val); + first = false; + ++written; + } else { + assert(val >= prev_val + 1); + bits::util::write_delta(bvb, val - (prev_val + 1)); + ++written; + } + prev_val = val; + ++val; + } + assert(val == x); + val++; // skip x + } + while (val < m_num_colors) { + assert(val >= prev_val + 1); + bits::util::write_delta(bvb, val - (prev_val + 1)); + prev_val = val; + ++val; + ++written; + } + assert(val == m_num_colors); + /* complementary_set_size = m_num_colors - size */ + assert(m_num_colors - size <= m_num_colors); + assert(written == m_num_colors - size); + } + // ss.write(reinterpret_cast(bvb.data().data()), bvb.data().size() * sizeof(uint64_t)); + return bvb.data().size() * sizeof(uint64_t) - num_bytes_start; + } + + void flush(buffer_t& buffer, const uint32_t num_bytes) { + m_mut.lock(); + const uint64_t num_bits = buffer.num_bits(); + m_file.write(reinterpret_cast(&num_bits), sizeof(uint64_t)); + m_file.write(reinterpret_cast(buffer.data().data()), num_bytes); + m_mut.unlock(); + buffer.clear(); + } + +protected: + std::ofstream m_file; + std::mutex m_mut; + uint32_t m_num_colors, m_sparse_set_threshold_size, m_very_dense_set_threshold_size; +}; + +template +struct fastq_query_reader { + fastq_query_reader(std::string& query_filename, uint64_t num_threads, FulgorIndex& index) + : rparser({query_filename}, num_threads, num_threads-1) + , index(index){ + rparser.start(); + } + + struct query_t { + query_t() : id(-1) {} + query_t(uint32_t id, uint32_t size) : + id(id) { + cids.resize(size); + } + + query_t(uint32_t id, std::vector&& cids_) : + id(id), cids(std::move(cids_)) {} + + uint32_t id; + std::vector cids; + std::string seq; + }; + + struct query_group { + explicit query_group(fastq_query_reader* qb_) + : qb(qb_), rg(qb->rparser.getReadGroup()), curr_read_id(0) {} + + bool has_next() { + return curr_record != rg.end(); + } + + void next() { + ++curr_record; + ++curr_read_id; + } + void operator++() { next(); } + + void value(query_t& query) { + query.id = curr_read_id; + query.cids.clear(); + qb->index.fetch_color_set_ids(curr_record->seq, query.cids); + query.seq = curr_record->seq; + } + + bool refill() { + const bool result = qb->rparser.refill(rg); + if (result) { + curr_record = rg.begin(); + curr_read_id = rg.chunk_frag_offset().frag_idx; + } + return result; + } + + private: + fastq_query_reader* qb; + fastx_parser::ReadGroup rg; + std::vector::iterator curr_record; + uint32_t curr_read_id; + }; + + query_group get_query_group() { + return query_group(this); + } + + ~fastq_query_reader() { + rparser.stop(); + } + +private: + fastx_parser::FastxParser rparser; + FulgorIndex& index; +}; + +struct preprocessed_query_reader { + struct query_t { + query_t(): ids(0), cids(0) {} + query_t(std::vector&& ids_, std::vector&& cids_) : + ids(std::move(ids_)), cids(std::move(cids_)) {} + + void clear() { + ids.clear(); + cids.clear(); + } + + std::vector ids; + std::vector cids; + std::string seq; + }; + + preprocessed_query_reader(std::string const& query_filename, uint64_t num_threads) + : m_query_file(query_filename), m_queue(3*num_threads) + { + constexpr uint64_t batch_thresh = 10000; + + m_producer = std::thread([this] () -> void { + auto* batch = new std::vector(); + batch->reserve(batch_thresh); + uint32_t list_len = 0, query_id; + + query_t query; + this->m_query_file.read(reinterpret_cast(&list_len), sizeof(list_len)); + this->m_query_file.read(reinterpret_cast(&query_id), sizeof(query_id)); + query.ids.push_back(query_id); + query.cids.resize(list_len-1); + this->m_query_file.read(reinterpret_cast(query.cids.data()), + (list_len - 1) * sizeof(list_len)); + + while (this->m_query_file.read(reinterpret_cast(&list_len), sizeof(list_len))) { + assert(list_len > 0); + this->m_query_file.read(reinterpret_cast(&query_id), sizeof(query_id)); + + if (list_len == 1) { + query.ids.push_back(query_id); + continue; + } + batch->push_back(query); + + if (batch->size() >= 10000) { + while (!this->m_queue.try_enqueue(batch)) {} + batch = new std::vector(); + this->in_flight += 1; + } + + query.clear(); + query.ids.push_back(query_id); + query.cids.resize(list_len-1); + this->m_query_file.read(reinterpret_cast(query.cids.data()), + (list_len - 1) * sizeof(uint32_t)); + } + batch->push_back(query); + + + if (!batch->empty()) { + this->m_queue.enqueue(batch); + this->in_flight += 1; + } + this->done = true; + }); + } + + struct query_group { + explicit query_group(preprocessed_query_reader* qb_) + : reader(qb_), sbatch(nullptr) {} + + bool has_next() const { + return sbatch != nullptr && curr_query != sbatch->end(); + } + + void next() { + ++curr_query; + } + void operator++() { next(); } + + void value(query_t& query) const { + query.ids = std::move(curr_query->ids); + query.cids = std::move(curr_query->cids); + } + + bool refill() { + std::lock_guard lock(reader->mut); + while (!reader->done or (reader->in_flight > 0)) { + if (reader->m_queue.try_dequeue(sbatch)) break; + } + if (sbatch != nullptr) { + curr_query = sbatch->begin(); + } + if (reader->in_flight > 0) { + reader->in_flight -= 1; + return true; + } + return !reader->done; + } + + private: + preprocessed_query_reader* reader; + std::vector* sbatch; + std::vector::iterator curr_query; + }; + + query_group get_query_group() { + return query_group(this); + } + + ~preprocessed_query_reader() { + m_producer.join(); + } + +private: + std::ifstream m_query_file; + moodycamel::BlockingConcurrentQueue*> m_queue; + std::thread m_producer; + std::atomic done{false}; + std::atomic in_flight{0}; + std::mutex mut; +}; + +struct ps_options { + explicit ps_options(const pseudoalignment_algorithm algo, const bool verbose, const uint64_t num_threads) + : verbose(verbose) + , algo(algo) + , num_threads(num_threads) + , num_reads(0) + , num_mapped_reads(0) {} + + void increment_processed_reads(const int val = 1) { + uint64_t prev = num_reads.fetch_add(val); + if (verbose && prev >> batch_size != (prev + val) >> batch_size) { + io_mut.lock(); + std::cout << "processed " << num_reads << " reads" << std::endl; + io_mut.unlock(); + } + } + + void increment_mapped_reads(const int val = 1) { + num_mapped_reads += val; + } + + const bool verbose; + const pseudoalignment_algorithm algo; + const uint64_t num_threads; + std::mutex io_mut; + std::atomic num_reads; + std::atomic num_mapped_reads; + +private: + const uint64_t batch_size = 20; +}; + +} \ No newline at end of file diff --git a/tools/build.cpp b/tools/build.cpp index cf54f84..310691b 100644 --- a/tools/build.cpp +++ b/tools/build.cpp @@ -4,8 +4,8 @@ void meta_color(build_configuration const& build_config, const bool force) // { std::string output_filename = build_config.index_filename_to_partition.substr( 0, build_config.index_filename_to_partition.length() - - constants::fulgor_filename_extension.length() - 1) + - "." + constants::meta_colored_fulgor_filename_extension; + constants::hfur_filename_extension.length() - 1) + + "." + constants::mfur_filename_extension; if (std::filesystem::exists(output_filename)) { std::cerr << "An index with the name '" << output_filename << "' alreay exists." @@ -20,8 +20,8 @@ void meta_color(build_configuration const& build_config, const bool force) // essentials::timer timer; timer.start(); - meta_index_type index; - typename meta_index_type::meta_builder builder(build_config); + mfur_index_t index; + typename mfur_index_t::meta_builder builder(build_config); builder.build(index); index.print_stats(); timer.stop(); @@ -38,8 +38,8 @@ void diff_color(build_configuration const& build_config, const bool force) // { std::string output_filename = build_config.index_filename_to_partition.substr( 0, build_config.index_filename_to_partition.length() - - constants::fulgor_filename_extension.length() - 1) + - "." + constants::diff_colored_fulgor_filename_extension; + constants::hfur_filename_extension.length() - 1) + + "." + constants::dfur_filename_extension; if (std::filesystem::exists(output_filename)) { std::cerr << "An index with the name '" << output_filename << "' alreay exists." @@ -54,8 +54,8 @@ void diff_color(build_configuration const& build_config, const bool force) // essentials::timer timer; timer.start(); - differential_index_type index; - typename differential_index_type::differential_builder builder(build_config); + dfur_index_t index; + typename dfur_index_t::differential_builder builder(build_config); builder.build(index); index.print_stats(); timer.stop(); @@ -72,8 +72,8 @@ void meta_diff_color(build_configuration const& build_config, const bool force) { std::string output_filename = build_config.index_filename_to_partition.substr( 0, build_config.index_filename_to_partition.length() - - constants::fulgor_filename_extension.length() - 1) + - "." + constants::meta_diff_colored_fulgor_filename_extension; + constants::hfur_filename_extension.length() - 1) + + "." + constants::mdfur_filename_extension; if (std::filesystem::exists(output_filename)) { std::cerr << "An index with the name '" << output_filename << "' alreay exists." @@ -88,8 +88,8 @@ void meta_diff_color(build_configuration const& build_config, const bool force) std::string meta_filename = build_config.index_filename_to_partition.substr( 0, build_config.index_filename_to_partition.length() - - constants::fulgor_filename_extension.length() - 1) + - "." + constants::meta_colored_fulgor_filename_extension; + constants::hfur_filename_extension.length() - 1) + + "." + constants::mfur_filename_extension; /* first build a meta-colored Fulgor index */ if (!std::filesystem::exists(meta_filename)) { @@ -104,10 +104,10 @@ void meta_diff_color(build_configuration const& build_config, const bool force) meta_diff_build_config.index_filename_to_partition = build_config.index_filename_to_partition.substr( 0, build_config.index_filename_to_partition.length() - - constants::fulgor_filename_extension.length() - 1) + - "." + constants::meta_colored_fulgor_filename_extension; - meta_differential_index_type index; - typename meta_differential_index_type::meta_differential_builder builder( + constants::hfur_filename_extension.length() - 1) + + "." + constants::mfur_filename_extension; + mdfur_index_t index; + typename mdfur_index_t::meta_differential_builder builder( meta_diff_build_config); builder.build(index); index.print_stats(); @@ -152,7 +152,7 @@ int build(int argc, char** argv) { build_configuration build_config; build_config.file_base_name = parser.get("file_base_name"); std::string output_filename = - build_config.file_base_name + "." + constants::fulgor_filename_extension; + build_config.file_base_name + "." + constants::hfur_filename_extension; build_config.index_filename_to_partition = output_filename; bool force = parser.get("force"); build_config.meta_colored = parser.get("meta"); @@ -205,8 +205,8 @@ int build(int argc, char** argv) { essentials::timer timer; timer.start(); - index_type index; - typename index_type::builder builder(build_config); + hfur_index_t index; + typename hfur_index_t::builder builder(build_config); builder.build(index); index.print_stats(); @@ -253,9 +253,9 @@ int color(int argc, char** argv) { build_configuration build_config; build_config.index_filename_to_partition = parser.get("index_filename"); if (!sshash::util::ends_with(build_config.index_filename_to_partition, - "." + constants::fulgor_filename_extension)) { + "." + constants::hfur_filename_extension)) { std::cerr << "Error: the file to partition must have extension \"." - << constants::fulgor_filename_extension + << constants::hfur_filename_extension << "\". Have you first built a Fulgor index with the tool \"build\"?" << std::endl; return 1; diff --git a/tools/kmer_conservation.cpp b/tools/kmer_conservation.cpp index 27e03f1..8bd57eb 100644 --- a/tools/kmer_conservation.cpp +++ b/tools/kmer_conservation.cpp @@ -168,19 +168,19 @@ int kmer_conservation(int argc, char** argv) { if (verbose) util::print_cmd(argc, argv); if (sshash::util::ends_with(index_filename, - constants::meta_diff_colored_fulgor_filename_extension)) { - return kmer_conservation( + constants::mdfur_filename_extension)) { + return kmer_conservation( index_filename, query_filename, output_filename, num_threads, verbose); } else if (sshash::util::ends_with(index_filename, - constants::meta_colored_fulgor_filename_extension)) { - return kmer_conservation(index_filename, query_filename, output_filename, + constants::mfur_filename_extension)) { + return kmer_conservation(index_filename, query_filename, output_filename, num_threads, verbose); } else if (sshash::util::ends_with(index_filename, - constants::diff_colored_fulgor_filename_extension)) { - return kmer_conservation(index_filename, query_filename, + constants::dfur_filename_extension)) { + return kmer_conservation(index_filename, query_filename, output_filename, num_threads, verbose); - } else if (sshash::util::ends_with(index_filename, constants::fulgor_filename_extension)) { - return kmer_conservation(index_filename, query_filename, output_filename, + } else if (sshash::util::ends_with(index_filename, constants::hfur_filename_extension)) { + return kmer_conservation(index_filename, query_filename, output_filename, num_threads, verbose); } diff --git a/tools/permute.cpp b/tools/permute.cpp index d502570..2d9a738 100644 --- a/tools/permute.cpp +++ b/tools/permute.cpp @@ -21,9 +21,9 @@ int permute(int argc, char** argv) { auto index_filename = parser.get("index_filename"); - if (!sshash::util::ends_with(index_filename, "." + constants::fulgor_filename_extension)) { + if (!sshash::util::ends_with(index_filename, "." + constants::hfur_filename_extension)) { std::cerr << "Error: the file to partition must have extension \"." - << constants::fulgor_filename_extension + << constants::hfur_filename_extension << "\". Have you first built a Fulgor index with the tool \"build\"?" << std::endl; return 1; @@ -32,7 +32,7 @@ int permute(int argc, char** argv) { essentials::timer timer; timer.start(); - index_type index; + hfur_index_t index; essentials::logger("step 1. loading index to be partitioned..."); essentials::load(index, index_filename.c_str()); essentials::logger("DONE"); diff --git a/tools/pseudoalign.cpp b/tools/pseudoalign.cpp index 796197f..012dba2 100644 --- a/tools/pseudoalign.cpp +++ b/tools/pseudoalign.cpp @@ -1,160 +1,239 @@ #include #include #include +#include #include "src/ps_full_intersection.cpp" #include "src/ps_threshold_union.cpp" +#include "src/ps_utils.cpp" using namespace fulgor; -enum class pseudoalignment_algorithm : uint8_t { FULL_INTERSECTION, THRESHOLD_UNION }; - -std::string to_string(pseudoalignment_algorithm algo, double threshold) { - std::string o; - switch (algo) { - case pseudoalignment_algorithm::FULL_INTERSECTION: - o = "full-intersection"; - break; - case pseudoalignment_algorithm::THRESHOLD_UNION: - o = "threshold-union (threshold = " + std::to_string(threshold) + ")"; - break; - } - return o; -} - -template -int pseudoalign(FulgorIndex const& index, fastx_parser::FastxParser& rparser, - std::atomic& num_reads, std::atomic& num_mapped_reads, - pseudoalignment_algorithm algo, const double threshold, std::ofstream& out_file, - std::mutex& iomut, std::mutex& ofile_mut, const bool verbose) // +template +int pseudoalign_worker(FulgorIndex const& index, QueryReader& query_reader, + Formatter& formatter, const double threshold, ps_options& options) // { - std::vector colors; // result of pseudoalignment + auto output_buffer = formatter.buffer(); + std::vector tmp, colors; // result of pseudoalignment + std::vector color_set_ids; std::stringstream ss; - uint64_t buff_size = 0; - constexpr uint64_t buff_thresh = 50; - auto rg = rparser.getReadGroup(); - while (rparser.refill(rg)) { - for (auto const& record : rg) { - switch (algo) { + auto qg = query_reader.get_query_group(); + while (qg.refill()) { + while (qg.has_next()){ + typename QueryReader::query_t query; + qg.value(query); + + switch (options.algo) { case pseudoalignment_algorithm::FULL_INTERSECTION: - index.pseudoalign_full_intersection(record.seq, colors); + index.pseudoalign_full_intersection(query.cids, colors, tmp); break; case pseudoalignment_algorithm::THRESHOLD_UNION: - index.pseudoalign_threshold_union(record.seq, colors, threshold); + index.pseudoalign_threshold_union(query.seq, colors, threshold); break; default: break; } - buff_size += 1; - if (!colors.empty()) { - num_mapped_reads += 1; - ss << record.name << '\t' << colors.size(); - for (auto c : colors) { ss << "\t" << c; } - ss << '\n'; + + if constexpr (std::is_same_v) { + options.increment_processed_reads(query.ids.size()); + for (auto qid: query.ids) { + output_buffer.write(qid, colors); + if (!colors.empty()) { + options.increment_mapped_reads(); + } + } } else { - ss << record.name << "\t0\n"; + options.increment_processed_reads(); + output_buffer.write(query.id, colors); + if (!colors.empty()) { + options.increment_mapped_reads(); + } } - num_reads += 1; + colors.clear(); - if (verbose and num_reads > 0 and num_reads % 1000000 == 0) { - iomut.lock(); - std::cout << "mapped " << num_reads << " reads" << std::endl; - iomut.unlock(); - } - if (buff_size > buff_thresh) { - std::string outs = ss.str(); - ss.str(""); - ofile_mut.lock(); - out_file.write(outs.data(), outs.size()); - ofile_mut.unlock(); - buff_size = 0; - } + qg.next(); } } - - // dump anything left in the buffer - if (buff_size > 0) { - std::string outs = ss.str(); - ss.str(""); - ofile_mut.lock(); - out_file.write(outs.data(), outs.size()); - ofile_mut.unlock(); - buff_size = 0; - } - return 0; } -template -int pseudoalign(std::string const& index_filename, std::string const& query_filename, - std::string const& output_filename, uint64_t num_threads, double threshold, - pseudoalignment_algorithm ps_alg, const bool verbose) { - FulgorIndex index; - if (verbose) essentials::logger("loading index from disk..."); - essentials::load(index, index_filename.c_str()); - if (verbose) essentials::logger("DONE"); - - std::cerr << "query mode : " << to_string(ps_alg, threshold) << "\n"; - - std::ifstream is(query_filename.c_str()); - if (!is.good()) { - std::cerr << "error in opening the file '" + query_filename + "'" << std::endl; - return 1; - } - - if (verbose) essentials::logger("performing queries from file '" + query_filename + "'..."); +template +int pseudoalign_orchestrator(FulgorIndex& index, QueryReader& query_reader, + Formatter& formatter, const double threshold, ps_options& options) { essentials::timer t; t.start(); - std::atomic num_mapped_reads{0}; - std::atomic num_reads{0}; - - auto query_filenames = std::vector({query_filename}); + uint64_t num_threads = options.num_threads; assert(num_threads >= 2); - fastx_parser::FastxParser rparser(query_filenames, num_threads, - num_threads - 1); - rparser.start(); + if (options.verbose) essentials::logger("*** START: pseudoalignment"); std::vector workers; workers.reserve(num_threads); - std::mutex iomut; - std::mutex ofile_mut; - - std::ofstream out_file; - out_file.open(output_filename, std::ios::out | std::ios::trunc); - if (!out_file) { - std::cerr << "could not open output file " + output_filename << std::endl; - return 1; - } - for (uint64_t i = 1; i != num_threads; ++i) { - workers.push_back(std::thread([&index, &rparser, &num_reads, &num_mapped_reads, ps_alg, - threshold, &out_file, &iomut, &ofile_mut, verbose]() { - pseudoalign(index, rparser, num_reads, num_mapped_reads, ps_alg, threshold, out_file, - iomut, ofile_mut, verbose); + workers.push_back(std::thread([&index, &query_reader, &formatter, threshold, &options]() { + pseudoalign_worker(index, query_reader, formatter, threshold, options); })); } for (auto& w : workers) w.join(); - rparser.stop(); t.stop(); - if (verbose) essentials::logger("DONE"); + if (options.verbose) essentials::logger("*** DONE: pseudoalignment"); - if (verbose) { - std::cout << "mapped " << num_reads << " reads" << std::endl; + if (options.verbose) { + std::cout << "processed " << options.num_reads << " reads" << std::endl; std::cout << "elapsed = " << t.elapsed() << " millisec / "; std::cout << t.elapsed() / 1000 << " sec / "; std::cout << t.elapsed() / 1000 / 60 << " min / "; - std::cout << (t.elapsed() * 1000) / num_reads << " musec/read" << std::endl; - std::cout << "num_mapped_reads " << num_mapped_reads << "/" << num_reads << " (" - << (num_mapped_reads * 100.0) / num_reads << "%)" << std::endl; + std::cout << (t.elapsed() * 1000) / options.num_reads << " musec/read" << std::endl; + std::cout << "num_mapped_reads " << options.num_mapped_reads << "/" << options.num_reads << " (" + << (options.num_mapped_reads * 100.0) / options.num_reads << "%)" << std::endl; } return 0; } +template +void fetch_and_deduplicate_sets(const std::string& query_filename, + Formatter& output_formatter, + std::string& tmp_filename, + FulgorIndex& index, + ps_options& options) { + auto output_buffer = output_formatter.buffer(); + if (options.verbose) essentials::logger("*** START: fetching color set ids"); + + std::ofstream tmp_file(tmp_filename, std::ios::binary); + auto query_filenames = std::vector({query_filename}); + fastx_parser::FastxParser rparser(query_filenames, options.num_threads, + options.num_threads - 1); + rparser.start(); + std::vector workers; + std::mutex outfile_mut, iomut; + + constexpr int32_t buff_thresh = 50; + std::atomic num_fetched_reads = 0; + auto fetch = [&rparser, &index, &tmp_file, &outfile_mut, &iomut, &num_fetched_reads, &options] () { + uint32_t buff_size = 0; + std::vector color_set_ids; + std::stringstream ss; + + auto rg = rparser.getReadGroup(); + while (rparser.refill(rg)) { + uint32_t read_id = rg.chunk_frag_offset().frag_idx; + + for (auto const& record: rg) { + index.fetch_color_set_ids(record.seq, color_set_ids); + + buff_size += 1; + + ss.write(reinterpret_cast(&read_id), sizeof(read_id)); + uint32_t num_color_sets = static_cast(color_set_ids.size()); + ss.write(reinterpret_cast(&num_color_sets), sizeof(num_color_sets)); + if (num_color_sets > 0) { + ss.write(reinterpret_cast(color_set_ids.data()), num_color_sets * sizeof(color_set_ids[0])); + } + + color_set_ids.clear(); + if (options.verbose && num_fetched_reads > 0 && ++num_fetched_reads % 1000000 == 0) { + iomut.lock(); + std::cout << "fetched " << num_fetched_reads << " reads" << std::endl; + iomut.unlock(); + } + if (buff_size > buff_thresh) { + std::string outs = ss.str(); + ss.str(""); + outfile_mut.lock(); + tmp_file.write(outs.data(), outs.size()); + outfile_mut.unlock(); + buff_size = 0; + } + ++read_id; + } + } + if (buff_size > 0) { + std::string outs = ss.str(); + ss.str(""); + outfile_mut.lock(); + tmp_file.write(outs.data(), outs.size()); + outfile_mut.unlock(); + buff_size = 0; + } + }; + + for (uint64_t i = 1; i < options.num_threads; ++i) { + workers.push_back(std::thread(fetch)); + } + for (auto& w : workers) w.join(); + rparser.stop(); + tmp_file.close(); + + if (options.verbose) essentials::logger("*** DONE: fetching color set ids"); + if (options.verbose) essentials::logger("*** START: deduplicating queries"); + + std::ifstream ifile(tmp_filename, std::ios::binary); + std::vector> queries; + queries.reserve(num_fetched_reads); + + std::vector tmp; + uint32_t read_num = 0; + while (ifile.read(reinterpret_cast(&read_num), sizeof(read_num))) { + uint32_t num_colors = 0; + ifile.read(reinterpret_cast(&num_colors), sizeof(num_colors)); + + tmp.resize(num_colors + 1); + tmp[0] = read_num; + ifile.read(reinterpret_cast(&tmp[1]), num_colors * sizeof(num_colors)); + if (tmp.size() > 1) { + queries.push_back(tmp); + tmp.clear(); + } else { + // just write out the unmapped reads here + output_buffer.write(read_num, {}); + } + } + + if (queries.empty()) { return; } + + std::sort(queries.begin(), queries.end(), + [](const std::vector& a, const std::vector& b) -> bool { + return std::lexicographical_compare(a.begin() + 1, a.end(), b.begin() + 1, + b.end()); + }); + + auto curr = queries.begin(); + auto next = curr++; + size_t identical_lists = 0; + uint64_t identical_sizes = 0; + while (next < queries.end()) { + if ((curr->size() == next->size()) and + std::equal(curr->begin() + 1, curr->end(), next->begin() + 1)) { + identical_sizes += next->size() - 1; + next->resize(1); // retain only the id. + ++identical_lists; + ++next; + } else { + curr = next; + ++next; + } + } + + std::cerr << "number of identical lists = " << identical_lists << " (skipping " + << identical_sizes << " set ids)\n"; + ifile.close(); + std::ofstream ofile(tmp_filename, std::ios::trunc | std::ios::binary); + for (auto& query : queries) { + uint32_t s = query.size(); + ofile.write(reinterpret_cast(&s), sizeof(s)); + if (s > 0) { + ofile.write(reinterpret_cast(query.data()), sizeof(query[0]) * s); + } + } + ofile.close(); + + if (options.verbose) essentials::logger("*** DONE: deduplicating queries"); +} + int pseudoalign(int argc, char** argv) { cmd_line_parser::parser parser(argc, argv); @@ -172,12 +251,20 @@ int pseudoalign(int argc, char** argv) { parser.add("threshold", "Threshold for threshold_union algorithm. It must be a float in (0.0,1.0].", "-r", false); + parser.add("deduplicate", "Removes duplicate queries before pseudoalignment (default is false)." + " Only works on Full-Intersection. Creates a temporary file in the executable's directory.", + "--deduplicate", false, true); + parser.add("format", "Format of the output file. Must either ascii, binary, compressed" + " (default is ascii).", "--format", false); if (!parser.parse()) return 1; auto index_filename = parser.get("index_filename"); auto query_filename = parser.get("query_filename"); auto output_filename = parser.get("output_filename"); + bool deduplicate = parser.get("deduplicate"); + auto output_format = parser.parsed("format") ? parser.get("format") : "ascii"; + uint64_t num_threads = 1; if (parser.parsed("num_threads")) num_threads = parser.get("num_threads"); if (num_threads == 1) { @@ -188,39 +275,92 @@ int pseudoalign(int argc, char** argv) { } double threshold = constants::invalid_threshold; - if (parser.parsed("threshold")) threshold = parser.get("threshold"); - if (threshold == 0.0 or threshold > 1.0) { - std::cerr << "threshold must be a float in (0.0,1.0]" << std::endl; - return 1; + if (parser.parsed("threshold")) { + threshold = parser.get("threshold"); + if (threshold <= 0.0 or threshold > 1.0) { + std::cerr << "threshold must be a float in (0.0,1.0]" << std::endl; + return 1; + } } auto ps_alg = pseudoalignment_algorithm::FULL_INTERSECTION; if (threshold != constants::invalid_threshold) { + if (deduplicate) { + cerr << "Deduplication not available for threshold < 1.0. Remove --deduplicate flag." << std::endl; + return 1; + } ps_alg = pseudoalignment_algorithm::THRESHOLD_UNION; } bool verbose = parser.get("verbose"); if (verbose) util::print_cmd(argc, argv); - if (sshash::util::ends_with(index_filename, - constants::meta_diff_colored_fulgor_filename_extension)) { - return pseudoalign(index_filename, query_filename, - output_filename, num_threads, threshold, - ps_alg, verbose); - } else if (sshash::util::ends_with(index_filename, - constants::meta_colored_fulgor_filename_extension)) { - return pseudoalign(index_filename, query_filename, output_filename, - num_threads, threshold, ps_alg, verbose); - } else if (sshash::util::ends_with(index_filename, - constants::diff_colored_fulgor_filename_extension)) { - return pseudoalign(index_filename, query_filename, output_filename, - num_threads, threshold, ps_alg, verbose); - } else if (sshash::util::ends_with(index_filename, constants::fulgor_filename_extension)) { - return pseudoalign(index_filename, query_filename, output_filename, num_threads, - threshold, ps_alg, verbose); + std::variant index; + if (is_meta_diff(index_filename)) { + index = mdfur_index_t(); + } else if (is_meta(index_filename)) { + index = mfur_index_t(); + } else if (is_diff(index_filename)) { + index = dfur_index_t(); + } else if (is_hybrid(index_filename)) { + index = hfur_index_t(); + } else { + std::cerr << "Wrong index filename supplied." << std::endl; + return 1; + } + + std::variant formatter; + if (output_format == "ascii") { + formatter.emplace(output_filename); + } else if (output_format == "binary") { + formatter.emplace(output_filename); + } else if (output_format == "compressed") { + formatter.emplace(output_filename); + } else { + std::cout << "Unknown output format. Supported formats: ascii, binary, compressed." << std::endl; + return 1; + } + + std::string tmp_filename = "queries.tmp"; + ps_options options(ps_alg, verbose, num_threads); + + if (verbose) { + std::cout << "\n---------------------------------" << std::endl; + std::cout << "[Index] " << index_filename << std::endl; + std::cout << "[Queries] " << query_filename << std::endl; + std::cout << "[Output] " << output_filename << std::endl; + std::cout << "[Algorithm] " << to_string(ps_alg, threshold) << (deduplicate ? "(dedup.)" : "") << std::endl; + std::cout << "---------------------------------\n" << std::endl; } - std::cerr << "Wrong index filename supplied." << std::endl; + std::visit([&index_filename, &query_filename, &output_filename, &tmp_filename, + deduplicate, num_threads, threshold, verbose, &options] + (auto&& index, auto&& formatter) { + if (verbose) essentials::logger("*** START: loading the index"); + essentials::load(index, index_filename.c_str()); + if (verbose) essentials::logger("*** DONE: loading the index"); + + if (verbose) essentials::logger("performing queries from file '" + query_filename + "'..."); - return 1; + if constexpr (std::is_same_v, psa_compressed_formatter>) { + formatter.set_num_colors(index.num_colors()); + } + if constexpr (!std::is_same_v, std::monostate>) { + std::ofstream out(output_filename); + + if (deduplicate) { + fetch_and_deduplicate_sets(query_filename, formatter, tmp_filename, index, options); + preprocessed_query_reader query_reader(tmp_filename, num_threads); + pseudoalign_orchestrator(index, query_reader, formatter, threshold, options); + } else { + fastq_query_reader query_reader(query_filename, num_threads, index); + pseudoalign_orchestrator(index, query_reader, formatter, threshold, options); + } + } + + }, index, formatter); + + std::remove(tmp_filename.c_str()); + + return 0; } diff --git a/tools/util.cpp b/tools/util.cpp index 2e80d03..595363b 100644 --- a/tools/util.cpp +++ b/tools/util.cpp @@ -2,21 +2,21 @@ using namespace fulgor; bool is_meta(std::string const& index_filename) { return sshash::util::ends_with(index_filename, - constants::meta_colored_fulgor_filename_extension); + constants::mfur_filename_extension); } bool is_meta_diff(std::string const& index_filename) { return sshash::util::ends_with(index_filename, - constants::meta_diff_colored_fulgor_filename_extension); + constants::mdfur_filename_extension); } bool is_diff(std::string const& index_filename) { return sshash::util::ends_with(index_filename, - constants::diff_colored_fulgor_filename_extension); + constants::dfur_filename_extension); } bool is_hybrid(std::string const& index_filename) { - return sshash::util::ends_with(index_filename, constants::fulgor_filename_extension); + return sshash::util::ends_with(index_filename, constants::hfur_filename_extension); } template @@ -68,13 +68,13 @@ int verify(int argc, char** argv) { util::print_cmd(argc, argv); auto index_filename = parser.get("index_filename"); if (is_meta(index_filename)) { - verify(index_filename); + verify(index_filename); } else if (is_meta_diff(index_filename)) { - verify(index_filename); + verify(index_filename); } else if (is_diff(index_filename)) { - verify(index_filename); + verify(index_filename); } else if (is_hybrid(index_filename)) { - verify(index_filename); + verify(index_filename); } else { std::cerr << "Wrong filename supplied." << std::endl; return 1; @@ -89,13 +89,13 @@ int stats(int argc, char** argv) { util::print_cmd(argc, argv); auto index_filename = parser.get("index_filename"); if (is_meta(index_filename)) { - print_stats(index_filename); + print_stats(index_filename); } else if (is_meta_diff(index_filename)) { - print_stats(index_filename); + print_stats(index_filename); } else if (is_diff(index_filename)) { - print_stats(index_filename); + print_stats(index_filename); } else if (is_hybrid(index_filename)) { - print_stats(index_filename); + print_stats(index_filename); } else { std::cerr << "Wrong filename supplied." << std::endl; return 1; @@ -110,13 +110,13 @@ int print_filenames(int argc, char** argv) { util::print_cmd(argc, argv); auto index_filename = parser.get("index_filename"); if (is_meta_diff(index_filename)) { - print_filenames(index_filename); + print_filenames(index_filename); } else if (is_meta(index_filename)) { - print_filenames(index_filename); + print_filenames(index_filename); } else if (is_diff(index_filename)) { - print_filenames(index_filename); + print_filenames(index_filename); } else if (is_hybrid(index_filename)) { - print_filenames(index_filename); + print_filenames(index_filename); } else { std::cerr << "Wrong filename supplied." << std::endl; return 1; @@ -145,27 +145,27 @@ int dump(int argc, char** argv) { if (is_meta_diff(index_filename)) { std::string basename{index_filename.data(), index_filename.length() - - constants::meta_diff_colored_fulgor_filename_extension.length() - + constants::mdfur_filename_extension.length() - 1}; - dump( + dump( index_filename, output_basename.length() == 0 ? basename : output_basename); } else if (is_meta(index_filename)) { std::string basename{index_filename.data(), index_filename.length() - - constants::meta_colored_fulgor_filename_extension.length() - 1}; - dump(index_filename, + constants::mfur_filename_extension.length() - 1}; + dump(index_filename, output_basename.length() == 0 ? basename : output_basename); } else if (is_diff(index_filename)) { std::string basename{index_filename.data(), index_filename.length() - - constants::diff_colored_fulgor_filename_extension.length() - 1}; - dump(index_filename, + constants::dfur_filename_extension.length() - 1}; + dump(index_filename, output_basename.length() == 0 ? basename : output_basename); } else if (is_hybrid(index_filename)) { std::string basename{ index_filename.data(), - index_filename.length() - constants::fulgor_filename_extension.length() - 1}; - dump(index_filename, + index_filename.length() - constants::hfur_filename_extension.length() - 1}; + dump(index_filename, output_basename.length() == 0 ? basename : output_basename); } else { std::cerr << "Wrong filename supplied." << std::endl;