import sys
import zlib
import struct
import io
import os
import warnings
import array
import re
import bamnostic
from bamnostic.utils import *
_PY_VERSION = sys.version_info
if _PY_VERSION[0] == 2:
from io import open
def _format_warnings(message, category, filename, lineno, file=None, line=None):
""" Warning formatter
Args:
message: warning message
category (str): level of warning
filename (str): path for warning output
lineno (int): Where the warning originates
Returns:
Formatted warning for logging purposes
"""
return ' {}:{}:{}: {}\n'.format(category.__name__, filename, lineno, message)
warnings.formatwarning = _format_warnings
# Constants used in BGZF format
_bgzf_magic = b"\x1f\x8b\x08\x04" # First 4 bytes of BAM file
_bgzf_header = b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00" # Ideal GZIP header
_bgzf_eof = b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00BC\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00" # 28 null byte signature at the end of a non-truncated BAM file
_bytes_BC = b"BC" # "Payload" or Subfield Identifiers 1 & 2 of GZIP header
def _as_bytes(s):
""" Used to ensure string is treated as bytes
The output
Args:
s (str): string to convert to bytes
Returns:
byte-encoded string
Example:
>>> str(_as_bytes('Hello, World').decode()) # Duck typing to check for byte-type object
'Hello, World'
"""
if isinstance(s, bytes):
return s
return bytes(s, encoding='latin_1')
# Helper compiled structures
unpack_gzip_header = struct.Struct('<4BI2BH').unpack
_gzip_header_size = struct.calcsize('<4BI2BH')
unpack_subfields = struct.Struct('<2s2H').unpack
_subfield_size = struct.calcsize('<2s2H')
unpack_gzip_integrity = struct.Struct('<2I').unpack
_integrity_size = struct.calcsize('<2I')
unpack_bgzf_metaheader = struct.Struct('<4BI2BH2BH').unpack
_metaheader_size = struct.calcsize('<4BI2BH2BH')
def _bgzf_metaheader(handle):
""" Pull out the metadata header for a BGZF block
BAM files essentially concatenated GZIP blocks put together into a cohesive file
format. The caveat to this is that the GZIP blocks are specially formatted to contain
metadata that indicates them as being part of a larger BAM file. Due to these specifications,
these blocks are identified as BGZF blocks. Listed below are those specifications. These
specifications can be found `here <https://samtools.github.io/hts-specs/SAMv1.pdf>`_.
The following GZIP fields are to have these expected values:
==================== =========== ===========
Field: Label: Exp.Value:
==================== =========== ===========
Identifier1 **ID1** 31
Identifier2 **ID2** 139
Compression Method **CM** 8
Flags **FLG** 4
Subfield Identifier1 **SI1** 66
Subfield Identifier2 **SI2** 67
Subfield Length **SLEN** 2
==================== =========== ===========
Args:
handle (:py:obj:`file`): Open BAM file object
Returns:
:py:obj:`tuple` of (:py:obj:`tuple`, :py:obj:`bytes`): the unpacked metadata and its raw bytestring
Raises:
ValueError: if the header does not match expected values
.. _here: https://samtools.github.io/hts-specs/SAMv1.pdf
"""
meta_raw = handle.read(_metaheader_size)
meta = unpack_bgzf_metaheader(meta_raw)
ID1, ID2, CM, FLG, MTIME, XFL, OS, XLEN, SI1, SI2, SLEN = meta
# check the header integrity
checks = [
ID1 == 31,
ID2 == 139,
CM == 8,
FLG == 4 ,
SI1 == 66,
SI2 == 67,
SLEN == 2
]
if not all(checks):
raise ValueError('Malformed BGZF block')
return meta, meta_raw
[docs]def get_block(handle, offset=0):
r""" Pulls out entire GZIP block
Used primarily for copying the header block of a BAM file. However,
it can be used to copy any BGZF block within a BAM file that starts at
the given offset.
Note:
Does not progress file cursor position.
Args:
handle (:py:obj:`file`): open BAM file
offset (int): offset of BGZF block (default: 0)
Returns:
Complete BGZF block
Raises:
ValueError: if the BGZF block header is malformed
Example:
>>> bam_header = get_block(bamnostic.example_bam)
>>> try:
... bam_header.startswith(b'\x1f\x8b\x08\x04')
... except SyntaxError:
... bam_header.startswith('\x1f\x8b\x08\x04')
True
"""
if isinstance(handle, bamnostic.core.AlignmentFile):
handle = handle._handle.name # get the raw file object, not wrapper
elif type(handle) == str:
handle = handle
with open(handle, 'rb') as header_handle:
header_handle.seek(offset) # get to the start of the BGZF block
# Capture raw bytes of metadata header
_, meta_raw = _bgzf_metaheader(header_handle)
BSIZE_raw = header_handle.read(2)
BSIZE = struct.unpack('<H', BSIZE_raw)[0]
# capture the CRC32 and ISIZE fields in addition to compressed data
# 6 = XLEN, 19 = spec offset, 8 = CRC32 & ISIZE -> -5
block_tail = header_handle.read(BSIZE - 5)
return meta_raw + BSIZE_raw + block_tail
def _load_bgzf_block(handle):
r"""Load the next BGZF block of compressed data (PRIVATE).
BAM files essentially concatenated GZIP blocks put together into a cohesive file
format. The caveat to this is that the GZIP blocks are specially formatted to contain
metadata that indicates them as being part of a larger BAM file. Due to these specifications,
these blocks are identified as BGZF blocks.
Args:
handle (:py:obj:`file`): open BAM file
Returns:
deflated GZIP data
Raises:
ValueError: if CRC32 or ISIZE do not match deflated data
Example:
>>> with open(bamnostic.example_bam,'rb') as bam:
... block = _load_bgzf_block(bam)
... try:
... block[0] == 53 and block[1].startswith(b'BAM\x01')
... except TypeError:
... block[0] == 53 and block[1].startswith('BAM\x01')
True
"""
# Pull in the BGZF block header information
header, _ = _bgzf_metaheader(handle)
XLEN = header[-4]
BSIZE = unpack('<H', handle)
# Expose the compressed data
d_size = BSIZE - XLEN - 19
d_obj = zlib.decompressobj(-15)
data = d_obj.decompress(handle.read(d_size)) + d_obj.flush()
# Checking data integrity
CRC32, ISIZE = unpack_gzip_integrity(handle.read(_integrity_size))
deflated_crc = zlib.crc32(data)
if deflated_crc < 0:
deflated_crc = deflated_crc % (1 << 32)
if CRC32 != deflated_crc:
raise ValueError('CRCs are not equal: is {}, not {}'.format(CRC32, deflated_crc))
if ISIZE != len(data):
raise ValueError('unequal uncompressed data size')
return BSIZE + 1, data
[docs]class BgzfReader(object):
""" The BAM reader. Heavily modified from Peter Cock's BgzfReader.
"""
def __init__(self, filepath_or_object, mode="rb", max_cache=None,
filename=None, duplicate_filehandle=None, ignore_truncation=False):
"""Initialize the class.
Args:
filepath_or_object (str | :py:obj:`file`): the path or file object of the BAM file
mode (str): Mode for reading. BAM files are binary by nature (default: 'rb').
max_cache (int): number of desired LRU cache size, preferably a multiple of 2 (default: 128).
filename (str | :py:obj:`file`): synonym for `filepath_or_object`
duplicate_filehandle (bool): Not implemented. Raises warning if True.
ignore_truncation (bool): Whether or not to allow trucated file processing (default: False).
"""
# Set up the LRU buffer dictionary
if max_cache is None:
pass
elif max_cache < 1:
raise ValueError("Use max_cache with a minimum of 1")
self._buffers = LruDict(max_cache=max_cache)
# handle contradictory arguments caused by synonyms
if filepath_or_object and filename and filename != filepath_or_object:
raise ValueError('filepath_or_object and filename parameters do not match. Try using only one')
elif filepath_or_object:
pass
else:
if filename:
filepath_or_object = filename
else:
raise ValueError('either filepath_or_object or filename must be set')
# Check to see if file object or path was passed
if isinstance(filepath_or_object, io.IOBase):
handle = filepath_or_object
else:
handle = open(filepath_or_object, "rb")
self._text = "b" not in mode.lower()
if 'b' not in mode.lower():
raise IOError('BGZF file format requires binary mode ("rb")')
# Connect to the BAM file
self._handle = handle
# Load the first block into the buffer and initialize cursor attributes
self._block_start_offset = None
self._block_raw_length = None
self._load_block(handle.tell())
def _load_block(self, start_offset=None):
"""(PRIVATE) Used to load next BGZF block into the buffer, and orients the cursor position.
Args:
start_offset (int): byte offset of BGZF block (default: None)
"""
if start_offset is None:
# If the file is being read sequentially, then _handle.tell()
# should be pointing at the start of the next block.
# However, if seek has been used, we can't assume that.
start_offset = self._block_start_offset + self._block_raw_length
if start_offset == self._block_start_offset:
self._within_block_offset = 0
return
elif start_offset in self._buffers:
# Already in cache
try:
self._buffer, self._block_raw_length = self._buffers.get(start_offset)
except TypeError:
pass
self._within_block_offset = 0
self._block_start_offset = start_offset
return
# Now load the block
handle = self._handle
if start_offset is not None:
handle.seek(start_offset)
self._block_start_offset = handle.tell()
try:
block_size, self._buffer = _load_bgzf_block(handle)
except StopIteration:
# EOF
block_size = 0
if self._text:
self._buffer = ""
else:
self._buffer = b''
self._within_block_offset = 0
self._block_raw_length = block_size
# Finally save the block in our cache,
self._buffers[self._block_start_offset] = self._buffer, block_size
[docs] def tell(self):
"""Return a 64-bit unsigned BGZF virtual offset."""
return make_virtual_offset(self._block_start_offset, self._within_block_offset)
[docs] def seek(self, virtual_offset):
"""Seek to a 64-bit unsigned BGZF virtual offset.
A virtual offset is a composite number made up of the compressed
offset (`coffset`) position of the start position of the BGZF block that
the position originates within, and the uncompressed offset (`uoffset`)
within the deflated BGZF block where the position starts. The virtual offset
is defined as
`virtual_offset = coffset << 16 | uoffset`
Args:
virtual_offset (int): 64-bit unsigned composite byte offset
Returns:
virtual_offset (int): an echo of the new position
Raises:
ValueError: if within block offset is more than block size
AssertionError: if the start position is not the block start position
Example:
>>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb')
>>> bam.seek(10)
10
>>> bam.seek(bamnostic.utils.make_virtual_offset(0, 42))
Traceback (most recent call last):
...
ValueError: Within offset 42 but block size only 38
"""
# Do this inline to avoid a function call,
# start_offset, within_block = split_virtual_offset(virtual_offset)
start_offset = virtual_offset >> 16
within_block = virtual_offset ^ (start_offset << 16)
if start_offset != self._block_start_offset:
# Don't need to load the block if already there
# (this avoids a function call since _load_block would do nothing)
self._load_block(start_offset)
assert start_offset == self._block_start_offset
if within_block > len(self._buffer):
if not (within_block == 0 and len(self._buffer) == 0):
raise ValueError("Within offset {} but block size only {}".format(within_block, len(self._buffer)))
self._within_block_offset = within_block
return virtual_offset
[docs] def read(self, size=-1):
"""Read method for the BGZF module.
Args:
size (int): the number of bytes to read from file. Advances the cursor.
Returns:
data (:py:obj:`bytes`): byte string of length `size`
Raises:
NotImplementedError: if the user tries to read the whole file
AssertionError: if read does not return any data
"""
if size < 0:
raise NotImplementedError("Don't be greedy, that could be massive!")
elif size == 0:
if self._text:
return ""
else:
return b""
# If size is a real positive integer
else:
if self._text:
data = ""
else:
data = b""
# Avoids recursion, but will still effectively get all the data
# for both long and short reads. It doesn't matter if it overflows
# to multiple blocks, or fully contained within one.
while size:
if self._within_block_offset + size <= len(self._buffer):
# This may leave us right at the end of a block
# (lazy loading, don't load the next block unless we have too)
data += self._buffer[self._within_block_offset:self._within_block_offset + size]
self._within_block_offset += size
assert data # Must be at least 1 byte
return data
else:
# if read data overflows to next block
# pull in rest of data in current block
sub_data = self._buffer[self._within_block_offset:]
data += sub_data
# decrement size so that we only pull the rest of the data
# from next block
size -= len(sub_data)
self._load_block() # will reset offsets
if not self._buffer:
return data # EOF
else:
# Only needed the end of the last block
return data
[docs] def readline(self):
"""Read a single line for the BGZF file.
Binary operations do not support `readline()`. Code is commented
out for posterity sake
"""
raise NotImplementedError("Readline does not work on byte data")
def __iter__(self):
"""Iterate over the lines in the BGZF file."""
return self
[docs] def close(self):
"""Close BGZF file."""
self._handle.close()
self._buffer = None
self._block_start_offset = None
self._buffers = None
[docs] def seekable(self):
"""Return True indicating the BGZF supports random access.
Note:
Modified from original Bio.BgzfReader: checks to see if BAM
file has associated index file (BAI)
"""
return self._check_idx
[docs] def isatty(self):
"""Return True if connected to a TTY device."""
return False
[docs] def fileno(self):
"""Return integer file descriptor."""
return self._handle.fileno()
def __enter__(self):
"""Open a file operable with WITH statement."""
return self
def __exit__(self, type, value, traceback):
"""Close a file with WITH statement."""
self.close()
[docs]class BgzfWriter(object):
"""Define a BGZFWriter object."""
def __init__(self, filepath_or_object, mode="wb", compresslevel=6):
"""Initialize the class."""
if isinstance(filepath_or_object, io.IOBase):
handle = filepath_or_object
else:
handle = open(filepath_or_object, mode=mode)
"""
if fileobj:
assert filename is None
handle = fileobj
else:
if "w" not in mode.lower() and "a" not in mode.lower():
raise ValueError("Must use write or append mode, not %r" % mode)
if "a" in mode.lower():
handle = open(filename, "ab")
else:
handle = open(filename, "wb")
"""
self._text = "b" not in mode.lower()
self._handle = handle
self._buffer = b""
self.compresslevel = compresslevel
def _write_block(self, block):
"""Write provided data to file as a single BGZF compressed block (PRIVATE)."""
# print("Saving %i bytes" % len(block))
start_offset = self._handle.tell()
assert len(block) <= 65536
# Giving a negative window bits means no gzip/zlib headers,
# -15 used in samtools
c = zlib.compressobj(self.compresslevel,
zlib.DEFLATED,
-15,
zlib.DEF_MEM_LEVEL,
0)
compressed = c.compress(block) + c.flush()
del c
assert len(compressed) < 65536, \
"TODO - Didn't compress enough, try less data in this block"
crc = zlib.crc32(block)
# Should cope with a mix of Python platforms...
if crc < 0:
crc = struct.pack("<i", crc)
else:
crc = struct.pack("<I", crc)
bsize = struct.pack("<H", len(compressed) + 25) # includes -1
crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff)
uncompressed_length = struct.pack("<I", len(block))
# Fixed 16 bytes,
# gzip magic bytes (4) mod time (4),
# gzip flag (1), os (1), extra length which is six (2),
# sub field which is BC (2), sub field length of two (2),
# Variable data,
# 2 bytes: block length as BC sub field (2)
# X bytes: the data
# 8 bytes: crc (4), uncompressed data length (4)
data = _bgzf_header + bsize + compressed + crc + uncompressed_length
self._handle.write(data)
[docs] def write(self, data):
"""Write method for the class."""
# TODO - Check bytes vs unicode
data = _as_bytes(data)
# block_size = 2**16 = 65536
data_len = len(data)
if len(self._buffer) + data_len < 65536:
# print("Cached %r" % data)
self._buffer += data
return
else:
# print("Got %r, writing out some data..." % data)
self._buffer += data
while len(self._buffer) >= 65536:
self._write_block(self._buffer[:65536])
self._buffer = self._buffer[65536:]
[docs] def flush(self):
"""Flush data explicitly."""
while len(self._buffer) >= 65536:
self._write_block(self._buffer[:65535])
self._buffer = self._buffer[65535:]
self._write_block(self._buffer)
self._buffer = b""
self._handle.flush()
[docs] def close(self):
"""Flush data, write 28 bytes BGZF EOF marker, and close BGZF file.
samtools will look for a magic EOF marker, just a 28 byte empty BGZF
block, and if it is missing warns the BAM file may be truncated. In
addition to samtools writing this block, so too does bgzip - so this
implementation does too.
"""
if self._buffer:
self.flush()
self._handle.write(_bgzf_eof)
self._handle.flush()
self._handle.close()
[docs] def tell(self):
"""Return a BGZF 64-bit virtual offset."""
return make_virtual_offset(self._handle.tell(), len(self._buffer))
[docs] def seekable(self):
"""Return True indicating the BGZF supports random access."""
# Not seekable, but we do support tell...
return False
[docs] def isatty(self):
"""Return True if connected to a TTY device."""
return False
[docs] def fileno(self):
"""Return integer file descriptor."""
return self._handle.fileno()
def __enter__(self):
"""Open a file operable with WITH statement."""
return self
def __exit__(self, type, value, traceback):
"""Close a file with WITH statement."""
self.close()