Source code for bamnostic.bgzf

import sys
import zlib
import struct
import io
import os
import warnings
import array
import re

import bamnostic
from bamnostic.utils import *

_PY_VERSION = sys.version_info

if _PY_VERSION[0] == 2:
    from io import open


def _format_warnings(message, category, filename, lineno, file=None, line=None):
    """ Warning formatter

    Args:
        message: warning message
        category (str): level of warning
        filename (str): path for warning output
        lineno (int): Where the warning originates

    Returns:
        Formatted warning for logging purposes

    """
    return ' {}:{}:{}: {}\n'.format(category.__name__, filename, lineno, message)


warnings.formatwarning = _format_warnings

# Constants used in BGZF format
_bgzf_magic = b"\x1f\x8b\x08\x04"  # First 4 bytes of BAM file
_bgzf_header = b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00"  # Ideal GZIP header
_bgzf_eof = b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00BC\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00"  # 28 null byte signature at the end of a non-truncated BAM file
_bytes_BC = b"BC"  # "Payload" or Subfield Identifiers 1 & 2 of GZIP header


def _as_bytes(s):
    """ Used to ensure string is treated as bytes

    The output

    Args:
        s (str): string to convert to bytes

    Returns:
        byte-encoded string

    Example:
        >>> str(_as_bytes('Hello, World').decode()) # Duck typing to check for byte-type object
        'Hello, World'

    """
    if isinstance(s, bytes):
        return s
    return bytes(s, encoding='latin_1')


# Helper compiled structures
unpack_gzip_header = struct.Struct('<4BI2BH').unpack
_gzip_header_size = struct.calcsize('<4BI2BH')

unpack_subfields = struct.Struct('<2s2H').unpack
_subfield_size = struct.calcsize('<2s2H')

unpack_gzip_integrity = struct.Struct('<2I').unpack
_integrity_size = struct.calcsize('<2I')

unpack_bgzf_metaheader = struct.Struct('<4BI2BH2BH').unpack
_metaheader_size = struct.calcsize('<4BI2BH2BH')


def _bgzf_metaheader(handle):
    """ Pull out the metadata header for a BGZF block

    BAM files essentially concatenated GZIP blocks put together into a cohesive file
    format. The caveat to this is that the GZIP blocks are specially formatted to contain
    metadata that indicates them as being part of a larger BAM file. Due to these specifications,
    these blocks are identified as BGZF blocks. Listed below are those specifications. These
    specifications can be found `here <https://samtools.github.io/hts-specs/SAMv1.pdf>`_.

    The following GZIP fields are to have these expected values:

    ==================== ===========  ===========
    Field:               Label:       Exp.Value:
    ==================== ===========  ===========
    Identifier1          **ID1**      31
    Identifier2          **ID2**      139
    Compression Method   **CM**       8
    Flags                **FLG**      4
    Subfield Identifier1 **SI1**      66
    Subfield Identifier2 **SI2**      67
    Subfield Length      **SLEN**     2
    ==================== ===========  ===========

    Args:
        handle (:py:obj:`file`): Open BAM file object

    Returns:
        :py:obj:`tuple` of (:py:obj:`tuple`, :py:obj:`bytes`): the unpacked metadata and its raw bytestring

    Raises:
        ValueError: if the header does not match expected values

    .. _here: https://samtools.github.io/hts-specs/SAMv1.pdf

    """
    meta_raw = handle.read(_metaheader_size)
    meta = unpack_bgzf_metaheader(meta_raw)
    ID1, ID2, CM, FLG, MTIME, XFL, OS, XLEN, SI1, SI2, SLEN = meta

    # check the header integrity
    checks = [
        ID1 == 31,
        ID2 == 139,
        CM == 8,
        FLG == 4 ,
        SI1 == 66,
        SI2 == 67,
        SLEN == 2
        ]

    if not all(checks):
        raise ValueError('Malformed BGZF block')

    return meta, meta_raw


[docs]def get_block(handle, offset=0): r""" Pulls out entire GZIP block Used primarily for copying the header block of a BAM file. However, it can be used to copy any BGZF block within a BAM file that starts at the given offset. Note: Does not progress file cursor position. Args: handle (:py:obj:`file`): open BAM file offset (int): offset of BGZF block (default: 0) Returns: Complete BGZF block Raises: ValueError: if the BGZF block header is malformed Example: >>> bam_header = get_block(bamnostic.example_bam) >>> try: ... bam_header.startswith(b'\x1f\x8b\x08\x04') ... except SyntaxError: ... bam_header.startswith('\x1f\x8b\x08\x04') True """ if isinstance(handle, bamnostic.core.AlignmentFile): handle = handle._handle.name # get the raw file object, not wrapper elif type(handle) == str: handle = handle with open(handle, 'rb') as header_handle: header_handle.seek(offset) # get to the start of the BGZF block # Capture raw bytes of metadata header _, meta_raw = _bgzf_metaheader(header_handle) BSIZE_raw = header_handle.read(2) BSIZE = struct.unpack('<H', BSIZE_raw)[0] # capture the CRC32 and ISIZE fields in addition to compressed data # 6 = XLEN, 19 = spec offset, 8 = CRC32 & ISIZE -> -5 block_tail = header_handle.read(BSIZE - 5) return meta_raw + BSIZE_raw + block_tail
def _load_bgzf_block(handle): r"""Load the next BGZF block of compressed data (PRIVATE). BAM files essentially concatenated GZIP blocks put together into a cohesive file format. The caveat to this is that the GZIP blocks are specially formatted to contain metadata that indicates them as being part of a larger BAM file. Due to these specifications, these blocks are identified as BGZF blocks. Args: handle (:py:obj:`file`): open BAM file Returns: deflated GZIP data Raises: ValueError: if CRC32 or ISIZE do not match deflated data Example: >>> with open(bamnostic.example_bam,'rb') as bam: ... block = _load_bgzf_block(bam) ... try: ... block[0] == 53 and block[1].startswith(b'BAM\x01') ... except TypeError: ... block[0] == 53 and block[1].startswith('BAM\x01') True """ # Pull in the BGZF block header information header, _ = _bgzf_metaheader(handle) XLEN = header[-4] BSIZE = unpack('<H', handle) # Expose the compressed data d_size = BSIZE - XLEN - 19 d_obj = zlib.decompressobj(-15) data = d_obj.decompress(handle.read(d_size)) + d_obj.flush() # Checking data integrity CRC32, ISIZE = unpack_gzip_integrity(handle.read(_integrity_size)) deflated_crc = zlib.crc32(data) if deflated_crc < 0: deflated_crc = deflated_crc % (1 << 32) if CRC32 != deflated_crc: raise ValueError('CRCs are not equal: is {}, not {}'.format(CRC32, deflated_crc)) if ISIZE != len(data): raise ValueError('unequal uncompressed data size') return BSIZE + 1, data
[docs]class BgzfReader(object): """ The BAM reader. Heavily modified from Peter Cock's BgzfReader. """ def __init__(self, filepath_or_object, mode="rb", max_cache=None, filename=None, duplicate_filehandle=None, ignore_truncation=False): """Initialize the class. Args: filepath_or_object (str | :py:obj:`file`): the path or file object of the BAM file mode (str): Mode for reading. BAM files are binary by nature (default: 'rb'). max_cache (int): number of desired LRU cache size, preferably a multiple of 2 (default: 128). filename (str | :py:obj:`file`): synonym for `filepath_or_object` duplicate_filehandle (bool): Not implemented. Raises warning if True. ignore_truncation (bool): Whether or not to allow trucated file processing (default: False). """ # Set up the LRU buffer dictionary if max_cache is None: pass elif max_cache < 1: raise ValueError("Use max_cache with a minimum of 1") self._buffers = LruDict(max_cache=max_cache) # handle contradictory arguments caused by synonyms if filepath_or_object and filename and filename != filepath_or_object: raise ValueError('filepath_or_object and filename parameters do not match. Try using only one') elif filepath_or_object: pass else: if filename: filepath_or_object = filename else: raise ValueError('either filepath_or_object or filename must be set') # Check to see if file object or path was passed if isinstance(filepath_or_object, io.IOBase): handle = filepath_or_object else: handle = open(filepath_or_object, "rb") self._text = "b" not in mode.lower() if 'b' not in mode.lower(): raise IOError('BGZF file format requires binary mode ("rb")') # Connect to the BAM file self._handle = handle # Load the first block into the buffer and initialize cursor attributes self._block_start_offset = None self._block_raw_length = None self._load_block(handle.tell()) def _load_block(self, start_offset=None): """(PRIVATE) Used to load next BGZF block into the buffer, and orients the cursor position. Args: start_offset (int): byte offset of BGZF block (default: None) """ if start_offset is None: # If the file is being read sequentially, then _handle.tell() # should be pointing at the start of the next block. # However, if seek has been used, we can't assume that. start_offset = self._block_start_offset + self._block_raw_length if start_offset == self._block_start_offset: self._within_block_offset = 0 return elif start_offset in self._buffers: # Already in cache try: self._buffer, self._block_raw_length = self._buffers.get(start_offset) except TypeError: pass self._within_block_offset = 0 self._block_start_offset = start_offset return # Now load the block handle = self._handle if start_offset is not None: handle.seek(start_offset) self._block_start_offset = handle.tell() try: block_size, self._buffer = _load_bgzf_block(handle) except StopIteration: # EOF block_size = 0 if self._text: self._buffer = "" else: self._buffer = b'' self._within_block_offset = 0 self._block_raw_length = block_size # Finally save the block in our cache, self._buffers[self._block_start_offset] = self._buffer, block_size
[docs] def tell(self): """Return a 64-bit unsigned BGZF virtual offset.""" return make_virtual_offset(self._block_start_offset, self._within_block_offset)
[docs] def seek(self, virtual_offset): """Seek to a 64-bit unsigned BGZF virtual offset. A virtual offset is a composite number made up of the compressed offset (`coffset`) position of the start position of the BGZF block that the position originates within, and the uncompressed offset (`uoffset`) within the deflated BGZF block where the position starts. The virtual offset is defined as `virtual_offset = coffset << 16 | uoffset` Args: virtual_offset (int): 64-bit unsigned composite byte offset Returns: virtual_offset (int): an echo of the new position Raises: ValueError: if within block offset is more than block size AssertionError: if the start position is not the block start position Example: >>> bam = bamnostic.AlignmentFile(bamnostic.example_bam, 'rb') >>> bam.seek(10) 10 >>> bam.seek(bamnostic.utils.make_virtual_offset(0, 42)) Traceback (most recent call last): ... ValueError: Within offset 42 but block size only 38 """ # Do this inline to avoid a function call, # start_offset, within_block = split_virtual_offset(virtual_offset) start_offset = virtual_offset >> 16 within_block = virtual_offset ^ (start_offset << 16) if start_offset != self._block_start_offset: # Don't need to load the block if already there # (this avoids a function call since _load_block would do nothing) self._load_block(start_offset) assert start_offset == self._block_start_offset if within_block > len(self._buffer): if not (within_block == 0 and len(self._buffer) == 0): raise ValueError("Within offset {} but block size only {}".format(within_block, len(self._buffer))) self._within_block_offset = within_block return virtual_offset
[docs] def read(self, size=-1): """Read method for the BGZF module. Args: size (int): the number of bytes to read from file. Advances the cursor. Returns: data (:py:obj:`bytes`): byte string of length `size` Raises: NotImplementedError: if the user tries to read the whole file AssertionError: if read does not return any data """ if size < 0: raise NotImplementedError("Don't be greedy, that could be massive!") elif size == 0: if self._text: return "" else: return b"" # If size is a real positive integer else: if self._text: data = "" else: data = b"" # Avoids recursion, but will still effectively get all the data # for both long and short reads. It doesn't matter if it overflows # to multiple blocks, or fully contained within one. while size: if self._within_block_offset + size <= len(self._buffer): # This may leave us right at the end of a block # (lazy loading, don't load the next block unless we have too) data += self._buffer[self._within_block_offset:self._within_block_offset + size] self._within_block_offset += size assert data # Must be at least 1 byte return data else: # if read data overflows to next block # pull in rest of data in current block sub_data = self._buffer[self._within_block_offset:] data += sub_data # decrement size so that we only pull the rest of the data # from next block size -= len(sub_data) self._load_block() # will reset offsets if not self._buffer: return data # EOF else: # Only needed the end of the last block return data
[docs] def readline(self): """Read a single line for the BGZF file. Binary operations do not support `readline()`. Code is commented out for posterity sake """ raise NotImplementedError("Readline does not work on byte data")
def __iter__(self): """Iterate over the lines in the BGZF file.""" return self
[docs] def close(self): """Close BGZF file.""" self._handle.close() self._buffer = None self._block_start_offset = None self._buffers = None
[docs] def seekable(self): """Return True indicating the BGZF supports random access. Note: Modified from original Bio.BgzfReader: checks to see if BAM file has associated index file (BAI) """ return self._check_idx
[docs] def isatty(self): """Return True if connected to a TTY device.""" return False
[docs] def fileno(self): """Return integer file descriptor.""" return self._handle.fileno()
def __enter__(self): """Open a file operable with WITH statement.""" return self def __exit__(self, type, value, traceback): """Close a file with WITH statement.""" self.close()
[docs]class BgzfWriter(object): """Define a BGZFWriter object.""" def __init__(self, filepath_or_object, mode="wb", compresslevel=6): """Initialize the class.""" if isinstance(filepath_or_object, io.IOBase): handle = filepath_or_object else: handle = open(filepath_or_object, mode=mode) """ if fileobj: assert filename is None handle = fileobj else: if "w" not in mode.lower() and "a" not in mode.lower(): raise ValueError("Must use write or append mode, not %r" % mode) if "a" in mode.lower(): handle = open(filename, "ab") else: handle = open(filename, "wb") """ self._text = "b" not in mode.lower() self._handle = handle self._buffer = b"" self.compresslevel = compresslevel def _write_block(self, block): """Write provided data to file as a single BGZF compressed block (PRIVATE).""" # print("Saving %i bytes" % len(block)) start_offset = self._handle.tell() assert len(block) <= 65536 # Giving a negative window bits means no gzip/zlib headers, # -15 used in samtools c = zlib.compressobj(self.compresslevel, zlib.DEFLATED, -15, zlib.DEF_MEM_LEVEL, 0) compressed = c.compress(block) + c.flush() del c assert len(compressed) < 65536, \ "TODO - Didn't compress enough, try less data in this block" crc = zlib.crc32(block) # Should cope with a mix of Python platforms... if crc < 0: crc = struct.pack("<i", crc) else: crc = struct.pack("<I", crc) bsize = struct.pack("<H", len(compressed) + 25) # includes -1 crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff) uncompressed_length = struct.pack("<I", len(block)) # Fixed 16 bytes, # gzip magic bytes (4) mod time (4), # gzip flag (1), os (1), extra length which is six (2), # sub field which is BC (2), sub field length of two (2), # Variable data, # 2 bytes: block length as BC sub field (2) # X bytes: the data # 8 bytes: crc (4), uncompressed data length (4) data = _bgzf_header + bsize + compressed + crc + uncompressed_length self._handle.write(data)
[docs] def write(self, data): """Write method for the class.""" # TODO - Check bytes vs unicode data = _as_bytes(data) # block_size = 2**16 = 65536 data_len = len(data) if len(self._buffer) + data_len < 65536: # print("Cached %r" % data) self._buffer += data return else: # print("Got %r, writing out some data..." % data) self._buffer += data while len(self._buffer) >= 65536: self._write_block(self._buffer[:65536]) self._buffer = self._buffer[65536:]
[docs] def flush(self): """Flush data explicitly.""" while len(self._buffer) >= 65536: self._write_block(self._buffer[:65535]) self._buffer = self._buffer[65535:] self._write_block(self._buffer) self._buffer = b"" self._handle.flush()
[docs] def close(self): """Flush data, write 28 bytes BGZF EOF marker, and close BGZF file. samtools will look for a magic EOF marker, just a 28 byte empty BGZF block, and if it is missing warns the BAM file may be truncated. In addition to samtools writing this block, so too does bgzip - so this implementation does too. """ if self._buffer: self.flush() self._handle.write(_bgzf_eof) self._handle.flush() self._handle.close()
[docs] def tell(self): """Return a BGZF 64-bit virtual offset.""" return make_virtual_offset(self._handle.tell(), len(self._buffer))
[docs] def seekable(self): """Return True indicating the BGZF supports random access.""" # Not seekable, but we do support tell... return False
[docs] def isatty(self): """Return True if connected to a TTY device.""" return False
[docs] def fileno(self): """Return integer file descriptor.""" return self._handle.fileno()
def __enter__(self): """Open a file operable with WITH statement.""" return self def __exit__(self, type, value, traceback): """Close a file with WITH statement.""" self.close()