from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
"""Modified version of BioPython.bgzf module. Includes LRU buffer dictionary.
Copyright (c) 2018, Marcus D. Sherman
This code is part of the bamnostic distribution and governed by its
license. Please see the LICENSE file that should have been included
as part of this package.
Some methods are modified versions of their counterparts within the BioPython.bgzf
module. BioPython BGZF is now under a 3-clause BSD license. The same license
found in this package. Refer to LICENSE for information.
Copyright (c) 2010-2015 by Peter Cock.
Description: Read and write BGZF compressed files (the GZIP variant used in BAM).
Significant changes were made to the original BGZF module, produced by
Peter Cock. Aside from adding an LRU dictionary, the new BGZF module can read
BAM files directly, decompressing and unpacking the byte-encoded data structure
outlined in the BAM_ format.
.. _BAM: https://samtools.github.io/hts-specs/SAMv1.pdf
@author: "Marcus D. Sherman"
@copyright: "Copyright 2018, University of Michigan, Mills Lab
@email: "mdsherman<at>betteridiot<dot>tech"
"""
import sys
import zlib
import struct
import io
import os
import warnings
import array
import re
import bamnostic
from bamnostic import bgzf, bai, csi
from bamnostic.utils import *
_PY_VERSION = sys.version_info
if _PY_VERSION[0] == 2:
from io import open
def _format_warnings(message, category, filename, lineno, file=None, line=None):
""" Warning formatter
Args:
message: warning message
category (str): level of warning
filename (str): path for warning output
lineno (int): Where the warning originates
Returns:
Formatted warning for logging purposes
"""
return ' {}:{}:{}: {}\n'.format(category.__name__, filename, lineno, message)
warnings.formatwarning = _format_warnings
[docs]class BamReader(bgzf.BgzfReader):
""" The BAM reader. Heavily modified from Peter Cock's BgzfReader.
Attributes:
header: representation of header data (if present)
lengths (:py:obj:`list` of :py:obj:`int`): lengths of references listed in header
nocoordinate (int): number of reads that have no coordinates
nreferences (int): number of references in header
ref2tid (:py:obj:`dict` of :py:obj:`str`, :py:obj:`int`): refernce names and refID dictionary
references (:py:obj:`list` of :py:obj:`str`): names of references listed in header
text (str): SAM header (if present)
unmapped (int): number of unmapped reads
Note:
This implementation is likely to change. While the API was meant to
mirror `pysam`, it makes sense to include the `pysam`-like API in an extension
that will wrap the core reader. This would be a major refactor, and therefore
will not happen any time soon (30 May 2018).
"""
def __init__(self, filepath_or_object, mode="rb", max_cache=128, index_filename=None,
filename=None, check_header=False, check_sq=True, reference_filename=None,
filepath_index=None, require_index=False, duplicate_filehandle=None,
ignore_truncation=False):
"""Initialize the class.
Args:
filepath_or_object (str | :py:obj:`file`): the path or file object of the BAM file
mode (str): Mode for reading. BAM files are binary by nature (default: 'rb').
max_cache (int): number of desired LRU cache size, preferably a multiple of 2 (default: 128).
index_filename (str): path to index file (BAI) if it is named differently than the BAM file (default: None).
filename (str | :py:obj:`file`): synonym for `filepath_or_object`
check_header (bool): Obsolete method maintained for backwards compatibility (default: False)
check_sq (bool): Inspect BAM file for `@SQ` entries within the header
reference_filename (str): Not implemented. Maintained for backwards compatibility
filepath_index (str): synonym for `index_filename`
require_index (bool): require the presence of an index file or raise (default: False)
duplicate_filehandle (bool): Not implemented. Raises warning if True.
ignore_truncation (bool): Whether or not to allow trucated file processing (default: False).
"""
# # Connect to the BAM file
# self._handle = handle
super_args = {'filepath_or_object': locals()['filepath_or_object'],
'mode': locals()['mode'], 'max_cache': locals()['max_cache'],
'filename': locals()['filename'], 'ignore_truncation': locals()['ignore_truncation'],
'duplicate_filehandle': locals()['duplicate_filehandle']}
super(BamReader, self).__init__(**super_args)
self._ignore_truncation = ignore_truncation
self._truncated = self._check_truncation()
# Check BAM file integrity
if not self._ignore_truncation:
if self._truncated:
raise Exception('BAM file may be truncated. Turn off ignore_truncation if you wish to continue')
# Connect and process the Index file (if present)
self._index_ext = None
self._index = None
if filepath_index and index_filename and index_filename != filepath_index:
raise IOError('Use index_filename or filepath_or_object. Not both')
self._check_idx = self.check_index(index_filename if index_filename else filepath_index, require_index)
self._init_index()
# Load in the BAM header as an instance attribute
self._load_header(check_sq)
# Helper dictionary for changing reference names to refID/TID
self.ref2tid = {v[0]: k for k, v in self._header.refs.items()}
# Final exception handling
if check_header:
warnings.warn('Obsolete method', UserWarning)
if duplicate_filehandle:
warnings.warn('duplicate_filehandle not necessary as the C API for samtools is not used', UserWarning)
if reference_filename:
raise NotImplementedError('CRAM file support not yet implemented')
[docs] def check_index(self, index_filename=None, req_idx=False):
""" Checks to make sure index file is available. If not, it disables random access.
Args:
index_filename (str): path to index file (BAI) if it does not fit naming convention (default: None).
req_idx (bool): Raise error if index file is not present (default: False).
Returns:
(bool): True if index is present, else False
Raises:
IOError: If the index file is closed or index could not be opened
Warns:
UserWarning: If index could not be loaded. Random access is disabled.
Examples:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam)
>>> bam.check_index(bamnostic.example_bam + '.bai')
True
>>> bam.check_index('not_a_file.bai')
False
"""
if index_filename is None:
for ext in ('csi', 'bai'):
possible_index_path = r'{}.{}'.format(os.path.abspath(self._handle.name), ext)
if os.path.isfile(possible_index_path):
self._index_path = possible_index_path
self._random_access = True
self._index_ext = ext
return True
else:
if req_idx:
raise IOError('htsfile is closed or index could not be opened')
warnings.warn("No supplied index file was not found. Random access disabled", UserWarning)
self._random_access = False
return False
else:
if os.path.isfile(index_filename):
self._index_path = index_filename
self._random_access = True
self._index_ext = index_filename.split('.')[-1]
return True
else:
if req_idx:
raise IOError('htsfile is closed or index could not be opened')
warnings.warn("Index file '{}' was not found. Random access disabled".format(index_filename), UserWarning)
self._random_access = False
return False
def _init_index(self):
"""Initialize the index file (BAI)"""
if self._check_idx:
# self._index = bamnostic.bai.Bai(self._index_path)
if self._index_ext == "csi":
self._index = csi.Csi(self._index_path)
elif self._index_ext == "bai":
self._index = bai.Bai(self._index_path)
self.__nocoordinate = self._index.n_no_coor
self.__mapped = sum(
self._index.unmapped[mapped].n_mapped
for mapped in self._index.unmapped
) + (self.__nocoordinate if self.__nocoordinate is not None else 0)
self.__unmapped = sum(
self._index.unmapped[unmapped].n_unmapped
for unmapped in self._index.unmapped
) + (self.__nocoordinate if self.__nocoordinate is not None else 0)
@property
def nocoordinate(self):
"""Get the number of reads without coordiantes according to the statistics recorded in the index.
Returns:
(int): sum of reads without coordinates
"""
return self.__nocoordinate
@property
def mapped(self):
"""Get the number of mapped reads according to the statistics recorded in the index.
Returns:
(int): sum of mapped reads
"""
return self.__mapped
@property
def unmapped(self):
"""Get the number of unmapped reads without coordiantes **and**
without coordinate.
Returns:
(int): sum of unmapped reads and reads without coordinates
"""
return self.__unmapped + self.__nocoordinate
def _check_sq(self):
""" Inspect BAM file for @SQ entries within the header
The implementation of this check is for BAM files specifically. I inspects
the SAM header (if present) for the `@SQ` entires. However, if the SAM header
is not present, will inspect the BAM header for reference sequence entries. If this
test ever returns `FALSE`, the BAM file is not operational.
Returns:
(bool): True if present, else false
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam._check_sq()
True
"""
if self._header._header_length == 0:
if not self._header.refs:
return False
else:
if 'SQ' not in self._header.SAMheader:
return False
return True
def _load_header(self, check_sq=True):
""" Loads the header into the reader object
Args:
check_sq (bool): whether to check for file header or not (default: True).
Raises:
KeyError: If 'SQ' entry is not present in BAM header
"""
self._header = BAMheader(self)
self.header = self._header.SAMheader if self._header.SAMheader else self._header
self.text = self._header._SAMheader_raw
# make compatible with pysam attributes, even though the data exists elsewhere
self.__references = []
self.__lengths = []
for n in range(self._header.n_refs):
self.__references.append(self._header.refs[n][0])
self.__lengths.append(self._header.refs[n][1])
self.__nreferences = self._header.n_refs
if check_sq:
if not self._check_sq():
raise KeyError('No SQ entries in header')
@property
def references(self):
""" Get all references used in alignment.
Returns:
(:py:obj:`tuple` of :py:obj:`str`): reference names
"""
return tuple(self.__references)
@property
def nreferences(self):
""" Get the number of references listed in alignment
Returns:
(int): count of references
"""
return self.__nreferences
@property
def lengths(self):
""" Get all reference lengths used in alignment.
Returns:
(:py:obj:`tuple` of :py:obj:`int`): reference lengths
"""
return tuple(self.__lengths)
def _check_truncation(self):
""" Confusing function to check for file truncation.
Every BAM file should contain an EOF signature within the last
28 bytes of the file. This function checks for that signature.
Returns:
(bool): True if truncated, else False
Warns:
BytesWarning: if no EOF signature found.
"""
temp_pos = self._handle.tell()
self._handle.seek(-28, 2)
eof = self._handle.read()
self._handle.seek(temp_pos)
if eof == bgzf._bgzf_eof:
return False
else:
warnings.warn('No EOF character found. File may be truncated', BytesWarning)
return True
def pileup(self):
raise NotImplementedError('Pileup is not implemented. Consider using `fetch` instead')
[docs] def has_index(self):
"""Checks if file has index and it is open
Returns:
bool: True if present and opened, else False
"""
if self._check_idx and self._index:
return self._check_idx
[docs] def fetch(self, contig=None, start=None, stop=None, region=None,
tid=None, until_eof=False, multiple_iterators=False,
reference=None, end=None):
"""Creates a generator that returns all reads within the given region. (inclusive, exclusive)
Args:
contig (str): name of reference/contig
start (int): start position of region of interest (0-based)
stop (int): stop position of region of interest (0-based)
region (str): SAM region formatted string. Accepts tab-delimited values as well
tid (int): the refID or target id of a reference/contig
until_eof (bool): iterate until end of file
mutiple_iterators (bool): allow multiple iterators over region. Not Implemented. \
Notice: each iterator will open up a new view into the BAM file, so overhead will apply.
reference (str): synonym for `contig`
end (str): synonym for `stop`
Yields:
reads over the region of interest if any
Raises:
ValueError: if the genomic coordinates are out of range or invalid
KeyError: Reference is not found in header
Note:
SAM region formatted strings take on the following form:
'chr1:100000-200000'
Usage:
.. code-block:: python
AlignmentFile.fetch(contig='chr1', start=1, stop= 1000)
AlignmentFile.fetch('chr1', 1, 1000)
AlignmentFile.fetch('chr1:1-1000')
AlignmentFile.fetch('chr1', 1)
AlignmentFile.fetch('chr1')
Examples:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> next(bam.fetch('chr1', 1, 100)) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
EAS56_57:6:190:289:82 ... MF:C:192
>>> next(bam.fetch('chr10', 1, 10))
Traceback (most recent call last):
...
KeyError: 'chr10 was not found in the file header'
>>> next(bam.fetch('chr1', 1700, 1701))
Traceback (most recent call last):
...
ValueError: Genomic region out of bounds.
>>> next(bam.fetch('chr1', 100, 10))
Traceback (most recent call last):
...
AssertionError: Malformed region: start should be <= stop, you entered 100, 10
"""
# Inclusive, exclusive. This means if start and stop are
# the same, then the user is *essentially* looking at nothing
# e.g. a = "abc"; print(a[1:1]) -> ''
if start is not None and start == stop:
return
if not self._random_access:
raise ValueError('Random access not available due to lack of index file')
if multiple_iterators:
raise NotImplementedError('multiple_iterators not yet implemented')
signature = locals()
for key in ['self', 'multiple_iterators']:
signature.pop(key)
query = parse_region(**signature)
if query.tid is not None and query.contig is None:
query.contig = self.get_reference_name(tid)
elif query.tid is not None and query.contig is not None:
if self.ref2tid[query.contig] != tid:
raise ValueError('tid and contig name do not match')
elif query.contig is not None and query.tid is None:
try:
query.tid = self.ref2tid[query.contig]
except KeyError:
raise KeyError('{} was not found in the file header'.format(query.contig))
try:
if query.start is None:
query.start = 0
if query.start > self._header.refs[query.tid][1]:
raise ValueError('Genomic region out of bounds.')
if query.stop is None:
# set end to length of chromosome
query.stop = self._header.refs[query.tid][1]
assert query.start <= query.stop, 'Malformed region: start should be <= stop, you entered {}, {}'.format(query.start, query.stop)
except KeyError:
raise KeyError('{} was not found in the file header'.format(query.contig))
# from the index, get the virtual offset of the chunk that
# begins the overlapping region of interest
first_read_block = self._index.query(query.tid, query.start, query.stop)
if first_read_block is None:
return
# move to that virtual offset...should load the block into the cache
# if it hasn't been visited before
self.seek(first_read_block)
for next_read in self:
if not until_eof:
# check to see if the read is out of bounds of the region
# On the wrong contig -> not the right place
if next_read.reference_name != query.contig:
return
# Read is too far left -> keep going
elif (next_read.pos + len(next_read.seq)) < query.start:
continue
# Originates outside, but overlaps
elif next_read.pos < query.start <= (next_read.pos + len(next_read.seq)):
yield next_read
# Read wholly inside region
elif query.start <= next_read.pos < query.stop:
yield next_read
# Read too far right -> gotta stop
elif query.stop <= next_read.pos:
return
# check for stop iteration
elif next_read:
yield next_read
# Empty read
else:
return
# Read until the end of file
else:
try:
yield next_read
except:
return
else:
return
[docs] def count(self, contig=None, start=None, stop=None, region=None,
until_eof=False, tid=None, read_callback='nofilter',
reference=None, end=None):
r"""Count the number of reads in the given region
Note: this counts the number of reads that **overlap** the given region.
Can potentially make use of a filter for the reads (or custom function
that returns `True` or `False` for each read).
Args:
contig (str): the reference name (Default: None)
reference (str): synonym for `contig` (Default: None)
start (int): 0-based inclusive start position (Default: None)
stop (int): 0-based exclusive start position (Default: None)
end (int): Synonym for `stop` (Default: None)
region (str): SAM-style region format. \
Example: 'chr1:10000-50000' (Default: None)
until_eof (bool): count number of reads from start to end of file \
Note, this can potentially be an expensive operation. \
(Default: False)
read_callback (str|function): select (or create) a filter of which \
reads to count. Built-in filters:
* `all`: skips reads that contain the following flags:
* 0x4 (4): read unmapped
* 0x100 (256): not primary alignment
* 0x200 (512): QC Fail
* 0x400 (1024): PCR or optical duplicate
* `nofilter`: uses all reads (Default)
* The user can also supply a custom function that \
returns boolean objects for each read
Returns:
(int): count of reads in the given region that meet parameters
Raises:
ValueError: if genomic coordinates are out of range or invalid or random access is disabled
RuntimeError: if `read_callback` is not properly set
KeyError: Reference is not found in header
AssertionError: if genomic region is malformed
Notes:
SAM region formatted strings take on the following form:
'chr1:100000-200000'
Usage:
AlignmentFile.count(contig='chr1', start=1, stop= 1000)
AlignmentFile.count('chr1', 1, 1000)
AlignmentFile.count('chr1:1-1000')
AlignmentFile.count('chr1', 1)
AlignmentFile.count('chr1')
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.count('chr1', 1, 100)
2
>>> bam.count('chr1', 1, 100, read_callback='all')
1
>>> bam.count('chr10', 1, 10)
Traceback (most recent call last):
...
KeyError: 'chr10 was not found in the file header'
>>> bam.count('chr1', 1700, 1701)
Traceback (most recent call last):
...
ValueError: Genomic region out of bounds.
>>> bam.count('chr1', 100, 10)
Traceback (most recent call last):
...
AssertionError: Malformed region: start should be <= stop, you entered 100, 10
"""
# pass the signature to fetch
signature = locals()
signature.pop('read_callback')
signature.pop('self')
roi_reads = self.fetch(**signature)
# make `nofilter` the default filter unless told otherwise
# read_callback = kwargs.get('read_callback', 'nofilter')
# go through all the reads over a given region and count them
count = 0
for read in roi_reads:
if filter_read(read, read_callback):
count += 1
return count
[docs] def count_coverage(self, contig=None, start=None, stop=None, region=None,
quality_threshold=15, read_callback='all',
reference=None, end=None, base_quality_threshold=0):
""" Counts the coverage of each base supported by a read the given interval.
Given an interval (inclusive, exclusive), this method pulls each read that overlaps
with the region. To ensure that the read truly overlaps with the region, the CIGAR string
is required. These reads can further be filtered out by their flags, MAPQ qualities, or
custom filtering function. Using the CIGAR string, the aligned portion of the read
is traversed and the presence of each nucleotide base is tallied into respective arrays.
Additionally, the user can choose to filter the counted bases based on its base quality
score that is stored in the quality string.
Args:
contig (str): the reference name (Default: None)
reference (str): synonym for `contig` (Default: None)
start (int): 0-based inclusive start position (Default: None)
stop (int): 0-based exclusive start position (Default: None)
end (int): Synonym for `stop` (Default: None)
region (str): SAM-style region format. \
Example: 'chr1:10000-50000' (Default: None)
quality_threshold (int): MAPQ quality threshold (Default: 15)
base_quality_threshold (int): base quality score threshold (Default: 0)
read_callback (str|function): select (or create) a filter of which \
reads to count. Built-in filters:
* `all`: skips reads that contain the following flags:
* 0x4 (4): read unmapped
* 0x100 (256): not primary alignment
* 0x200 (512): QC Fail
* 0x400 (1024): PCR or optical duplicate
* `nofilter`: uses all reads (Default)
* The user can also supply a custom function that \
returns boolean objects for each read
Returns:
(:py:obj:`array.array`): Four arrays in the order of **A**, **C**, **G**, **T**
Raises:
ValueError: if genomic coordinates are out of range or invalid, random access is disabled, or nucleotide base is unrecognized
RuntimeError: if `read_callback` is not properly set
KeyError: Reference is not found in header
AssertionError: if genomic region is malformed
Notes:
SAM region formatted strings take on the following form:
'chr1:100-200'
Usage:
AlignmentFile.count_coverage(contig='chr1', start=1, stop= 1000)
AlignmentFile.count_coverage('chr1', 1, 1000)
AlignmentFile.count_coverage('chr1:1-1000')
AlignmentFile.count_coverage('chr1', 1)
AlignmentFile.count_coverage('chr1')
Examples:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> for arr in bam.count_coverage('chr1', 100, 150): # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
... print("array('{}', {})".format(arr.typecode, list(map(int, arr.tolist()))))
array('L', [0, 0, 0, 0, ..., 0, 0, 0, 0, 0])
array('L', [0, 0, 0, 0, ..., 15, 0, 14, 0, 0])
array('L', [1, 1, 2, 2, ..., 0, 0, 0, 0, 14])
array('L', [0, 0, 0, 0, ..., 0, 15, 0, 14, 0])
>>> for arr in bam.count_coverage('chr1', 100, 150, quality_threshold=20, base_quality_threshold=25): # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
... print("array('{}', {})".format(arr.typecode, list(map(int, arr.tolist()))))
array('L', [0, 0, 0, 0, ..., 0, 0, 0, 0, 0])
array('L', [0, 0, 0, 0, ..., 13, 0, 14, 0, 0])
array('L', [1, 1, 2, 2, ..., 0, 0, 0, 0, 13])
array('L', [0, 0, 0, 0, ..., 0, 14, 0, 13, 0])
"""
signature = locals()
for key in ['self', 'quality_threshold', 'read_callback', 'base_quality_threshold']:
signature.pop(key)
adenine = array.array('L', [0] * (stop - start))
cytosine = adenine[:]
guanine = adenine[:]
thymine = adenine[:]
for read in self.fetch(**signature):
if read.cigarstring is not None and read.mapq >= quality_threshold:
if filter_read(read, read_callback):
for base, index in cigar_alignment(seq=read.seq, cigar=read.cigarstring,
start_pos=read.pos, qualities=read.query_qualities,
base_qual_thresh=base_quality_threshold):
if start <= index < stop:
if base == 'A':
adenine[index - start if index-start > 0 else 0] += 1
elif base == 'G':
guanine[index - start if index-start > 0 else 0] += 1
elif base == 'C':
cytosine[index - start if index-start > 0 else 0] += 1
elif base == 'T':
thymine[index - start if index-start > 0 else 0] += 1
else:
raise ValueError('Read base was {}, not A, T, C, or G'.format(base))
return adenine, cytosine, guanine, thymine
[docs] def get_index_stats(self):
""" Inspects the index file (BAI) for alignment statistics.
Every BAM index file contains metrics regarding the alignment
process for the given BAM file. The stored data are the number
of mapped and unmapped reads for a given reference. Unmapped reads
are paired end reads where only one part is mapped. Additionally,
index files also contain the number of unplaced unmapped reads. This
is stored within the `nocoordinate` instance attribute (if present).
Returns:
idx_stats (:py:obj:`list` of :py:obj:`tuple`): list of tuples for each reference in the order seen in the header. Each tuple contains the number of mapped reads, unmapped reads, and the sum of both.
Raises:
AssertionError: if the index file is not available
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.get_index_stats()
[(1446, 18, 1464), (1789, 17, 1806)]
>>> bam_no_bai = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb', index_filename='not_a_file.bai')
>>> bam_no_bai.get_index_stats()
Traceback (most recent call last):
...
AssertionError: No index available
"""
assert self._check_idx, 'No index available'
idx_stats = []
for ref in range(self._header.n_refs):
try:
mapped = self._index.unmapped[ref].n_mapped
unmapped = self._index.unmapped[ref].n_unmapped
idx_stats.append((mapped, unmapped, mapped + unmapped))
except KeyError:
idx_stats.append((0, 0, 0))
return idx_stats
[docs] def is_valid_tid(self, tid):
""" Return `True` if TID/RefID is valid.
Returns:
`True` if TID/refID is valid, else `False`
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.is_valid_tid(0)
True
>>> bam.is_valid_tid(10) # because there are only 2 in this file
False
"""
return True if tid in self._header.refs else False
[docs] def get_reference_name(self, tid):
""" Convert TID/refID to reference name.
The TID/refID is the position a reference sequence is seen
within the header file of the BAM file. The references are
sorted by ASCII order. Therefore, for a **Homo sapien** aligned
to GRCh38, 'chr10' comes before 'chr1' in the header. Therefore,
'chr10' would have the TID/refID of 0, not 'chr1'.
Args:
tid (int): TID/refID of desired reference/contig
Returns:
String representation of chromosome if valid, else None
Raises:
KeyError: if TID/refID is not valid
Examples:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.get_reference_name(0) # doctest: +ALLOW_UNICODE
\'chr1'
>>> bam.get_reference_name(10)
Traceback (most recent call last):
...
KeyError: '10 is not a valid TID/refID for this file.'
"""
if self.is_valid_tid(tid):
return self._header.refs[tid][0]
else:
raise KeyError('{} is not a valid TID/refID for this file.'.format(tid))
[docs] def get_tid(self, reference):
""" Convert reference/contig name to refID/TID.
The TID/refID is the position a reference sequence is seen
within the header file of the BAM file. The references are
sorted by ASCII order. Therefore, for a **Homo sapien** aligned
to GRCh38, 'chr10' comes before 'chr1' in the header. Therefore,
'chr10' would have the TID/refID of 0, not 'chr1'.
Args:
reference (str): reference/contig name
Returns:
(int): the TID/refID of desired reference/contig
Raises:
KeyError: if reference name not found file header
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.get_tid('chr1')
0
>>> bam.get_tid('chr10')
Traceback (most recent call last):
...
KeyError: 'chr10 was not found in the file header'
"""
tid = self.ref2tid.get(reference, -1)
if tid == -1:
raise KeyError('{} was not found in the file header'.format(reference))
return tid
[docs] def mate(self, AlignedSegment):
""" Gets the mate to a given AlignedSegment.
Note:
Slow, when compared to the C-API. Not meant for high-throughput analysis.
Does not advance current iterator position.
Args:
AlignedSegment (:py:class:`bamnostic.AlignedSegment`): a bamnostic AlignedSegment read with a mate
Returns:
(:py:class:`bamnostic.AlignedSegment`): if read has a valid mate, else None
Raises:
ValueError: if AlignedSegment is unpaired
"""
with bamnostic.AlignmentFile(self._handle.name, index_filename=self._index_path) as mate_head:
# Don't look if there isn't a pair
if not AlignedSegment.is_paired:
raise ValueError('Read is unpaired')
# Based on standard convention
read_name_base = read_name_pat.search(AlignedSegment.read_name).groupdict()['read_name']
rnext = AlignedSegment.next_reference_id
# Look for available mate information
if rnext < 0:
return None # Information is unavailable
pnext = AlignedSegment.next_reference_start
if pnext < 0:
return None # no information available on read
mate_gen = mate_head.fetch(tid=rnext, start=pnext, stop=pnext + 1)
for read in mate_gen:
if read.read_name.split('/')[0].split('#')[0] == read_name_base:
if AlignedSegment.is_read1 is not read.is_read1:
return read
[docs] def head(self, n=5, multiple_iterators=False):
""" List out the first **n** reads of the file.
This method is primarily used when doing an initial exploration
of the data. Whether or not `multiple_iterators` is used, cursor
position within the file will not change.
Note:
Using `multiple_interators` opens a new file object of the
same file currently in use and, thus, impacts the memory
footprint of your analysis.
Args:
n (int): number of aligned reads to print (default: 5)
mutliple_iterators (bool): Whether to use current file object or create a new one (default: False).
Returns:
head_reads (:py:obj:`list` of :py:obj:`AlignedSegment`): list of **n** reads from the front of the BAM file
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.head(n=5, multiple_iterators = False)[0] # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
EAS56_57:6:190:289:82 ... MF:C:192
>>> bam.head(n = 5)[1] # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
EAS56_57:6:190:289:82 ... UQ:C:0
"""
if multiple_iterators:
head_iter = bamnostic.AlignmentFile(self._handle.name, index_filename=self._index_path)
else:
curr_pos = self.tell()
# BAMheader uses byte specific positions (and not BGZF virtual offsets)
# self._handle.seek(self._header._BAMheader_end)
self._load_block(self._header._BAMheader_end)
head_iter = self
head_reads = [next(head_iter) for read in range(n)]
if multiple_iterators:
# close the independent file object
head_iter.close()
else:
# otherwise, just go back to old position
self.seek(curr_pos)
assert self.tell() == curr_pos
return head_reads
def __next__(self):
"""Return the next line (Py2 Compatibility)."""
read = bamnostic.AlignedSegment(self)
if not read:
raise StopIteration
return read
[docs] def next(self):
"""Return the next line."""
read = bamnostic.AlignedSegment(self)
if not read:
return
return read
[docs] def seekable(self):
"""Return True indicating the BGZF supports random access.
Note:
Modified from original Bio.BgzfReader: checks to see if BAM
file has associated index file (BAI)
"""
return self._check_idx
def _sam_header_to_ref_list(header):
"""Modified code from Peter Cock's [BamWriter](https://github.com/peterjc/biopython/blob/SamBam2015/Bio/Sequencing/SamBam/__init__.py#L1714)
Removes blank lines, ensures trailing newlines, spots some errors.
"""
references = []
for line in header.split("\n"):
if line.startswith("@SQ\t"):
r = None
l = None
for part in line[3:].rstrip().split("\t"):
if part.startswith("SN:"):
r = part[3:]
elif part.startswith("LN:"):
l = int(part[3:])
if r is None or l is None:
raise ValueError("Malformed @SQ header (SN and LN required):\n%r"
% line)
references.append((r, l))
return references
def _ref_list_to_sam_header(references):
"""Modified code from Peter Cock's [BamWriter](https://github.com/peterjc/biopython/blob/SamBam2015/Bio/Sequencing/SamBam/__init__.py#L1714)
Removes blank lines, ensures trailing newlines, spots some errors.
"""
return "".join(["@SQ\tSN:%s\tLN:%i\n" % (name, length) for name, length in references])
def _check_header_text(text):
"""Modified code from Peter Cock's [BamWriter](https://github.com/peterjc/biopython/blob/SamBam2015/Bio/Sequencing/SamBam/__init__.py#L1714)
Removes blank lines, ensures trailing newlines, spots some errors.
"""
lines = []
for line in text.split("\n"):
if not line:
continue
# raise ValueError("Blank line in header")
elif line[0] != "@":
raise ValueError("SAM header lines must start with @, not %s" % line)
lines.append(line + "\n")
return "".join(lines)
def _cross_check_header_refs(reads, header="", referencenames = None, referencelengths = None):
"""Modified code from Peter Cock's [BamWriter](https://github.com/peterjc/biopython/blob/SamBam2015/Bio/Sequencing/SamBam/__init__.py#L1714)
"""
if not header:
# If the reads argument is a SamIterator or BamIterator this works:
try:
header = reads.text
except AttributeError:
pass
if referencenames is not None or referencelengths is not None:
assert len(referencenames) == len(referencelengths)
references = zip(referencenames, referencelengths)
alt_refs = _sam_header_to_ref_list(header)
if not alt_refs:
# Append minimal @SQ lines to the header
header += _ref_list_to_sam_header(references)
elif alt_refs != references:
raise ValueError("Reference names and lengths inconsistent with header @SQ lines")
else:
references = _sam_header_to_ref_list(header)
return header, references
[docs]class BamWriter(bgzf.BgzfWriter):
def __init__(self, filepath_or_object, mode = 'xb', compresslevel = 6, ignore_overwrite = False,
copy_header = None, header = b'', reference_names = None, reference_lengths = None):
"""Useful class for writing BAM files.
Note:
As of right now, it is tuned for working within a pipeline where reads
are being filtered or modified from another BAM file.
Args:
filepath_or_object (str | :py:obj:`file`): the path or file object of the BAM file
mode (str): Mode for writing. BAM files are binary by nature (default: 'xb')
compresslevel (int): desired level of Gzip compression (default: 6)
ignore_overwrite (bool): whether or not to ignore overwrite detection
copy_header (filepath_or_object): copy the header from another BAM file
header (bytes): SAM-style header
reference_names (:py:obj:`list` of :py:obj:`str`): list of all reference names used in file
reference_lengths (:py:obj:`list` of :py:obj:`int`): list of all reference lengths used in file
"""
if 'b' not in mode:
raise IOError("BAM files must be written in binary mode, try '{}b'".format(mode))
if 'r' in mode and not '+' in mode:
raise IOError("bamnostic.bam.BamWriter cannot operate in reading (r) mode")
if type(filepath_or_object) is str:
# Make sure the user knows they will be overwriting the data
if 'w' in mode and not ignore_overwrite and os.path.isfile(filepath_or_object):
if yes_no('{} already exists. Going further will overwrite the file'.format(filepath_or_object)):
super(BamWriter, self).__init__(filepath_or_object, mode=mode, compresslevel = compresslevel)
else:
raise FileExistsError('User declined overwrite')
# If they know beforehand, let them overwrite
elif 'w' in mode and ignore_overwrite:
super(BamWriter, self).__init__(filepath_or_object, mode=mode, compresslevel = compresslevel)
# Just a new file
elif 'w' in mode:
super(BamWriter, self).__init__(filepath_or_object, mode=mode, compresslevel = compresslevel)
# Pluck off the EOF block for appended files and make sure to not re-write header
elif 'a' in mode:
with open(filepath_or_object, 'r+') as appended:
appended.seek(-28, os.SEEK_END)
appended.truncate()
super(BamWriter, self).__init__(filepath_or_object, mode=mode, compresslevel = compresslevel)
return None
# All other options
elif 'x' in mode or '+' in mode:
super(BamWriter, self).__init__(filepath_or_object, mode=mode, compresslevel = compresslevel)
# Make sure to not re-write header
if 'r' in mode:
return None
# If they already opened the file, let them do whatever they want
elif isinstance(filepath_or_object, io.IOBase):
super(BamWriter, self).__init__(self, filepath_or_object, mode=mode, compresslevel=compresslevel)
self.write_header(copy_header = copy_header, header = header,
reference_names = reference_names,
reference_lengths = reference_lengths)
[docs] def write(self, data):
if len(data) + len(self._buffer) > 65536 and len(self._buffer) != 0:
self.flush()
super(BamWriter, self).write(data)