Source code for pamqp.frame

# -*- encoding: utf-8 -*-
"""Manage the marshaling and unmarshaling of AMQP frames

unmarshal will turn a raw AMQP byte stream into the appropriate AMQP objects
from the specification file.

marshal will take an object created from the specification file and turn it
into a raw byte stream.

"""
import logging
import struct

from pamqp import body
from pamqp import decode
from pamqp import exceptions
from pamqp import heartbeat
from pamqp import header
from pamqp import specification
from pamqp import PYTHON3

AMQP = b'AMQP'
FRAME_HEADER_SIZE = 7
FRAME_END_CHAR = chr(specification.FRAME_END)
DECODE_FRAME_END_CHAR = FRAME_END_CHAR
if PYTHON3:
    FRAME_END_CHAR = bytes((specification.FRAME_END,))
    DECODE_FRAME_END_CHAR = specification.FRAME_END
LOGGER = logging.getLogger(__name__)
UNMARSHAL_FAILURE = 0, 0, None


[docs]def unmarshal(data_in): """Takes in binary data and maps builds the appropriate frame type, returning a frame object. :param bytes data_in: Raw byte stream data :rtype: tuple of bytes consumed, channel, and a frame object :raises: specification.FrameError """ # Look to see if it's a protocol header frame try: frame_value = _unmarshal_protocol_header_frame(data_in) if frame_value: return 8, 0, frame_value except ValueError as error: raise exceptions.UnmarshalingException(header.ProtocolHeader, error) # Decode the low level frame and break it into parts frame_type, channel_id, frame_size = _frame_parts(data_in) # Heartbeats do not have frame length indicators if frame_type == specification.FRAME_HEARTBEAT and frame_size == 0: return 8, channel_id, heartbeat.Heartbeat() if not frame_size: raise exceptions.UnmarshalingException('Unknown', 'No frame size') byte_count = FRAME_HEADER_SIZE + frame_size + 1 if byte_count > len(data_in): raise exceptions.UnmarshalingException('Unknown', 'Not all data received') if data_in[byte_count - 1] != DECODE_FRAME_END_CHAR: raise exceptions.UnmarshalingException('Unknown', 'Last byte error') frame_data = data_in[FRAME_HEADER_SIZE:byte_count - 1] # Decode a method frame if frame_type == specification.FRAME_METHOD: return byte_count, channel_id, _unmarshal_method_frame(frame_data) # Decode a header frame elif frame_type == specification.FRAME_HEADER: return byte_count, channel_id, _unmarshal_header_frame(frame_data) # Decode a body frame elif frame_type == specification.FRAME_BODY: return byte_count, channel_id, _unmarshal_body_frame(frame_data) raise exceptions.UnmarshalingException( 'Unknown', 'Unknown frame type: {}'.format(frame_type))
[docs]def marshal(frame_value, channel_id): """Marshal a frame to be sent over the wire. :param frame_value: The frame object to marshal :type frame_value: pamqp.specification.Frame or pamqp.heartbeat.Heartbeat :param int channel_id: The channel number to send the frame on :rtype: str :raises: ValueError """ if isinstance(frame_value, header.ProtocolHeader): return frame_value.marshal() elif isinstance(frame_value, specification.Frame): return _marshal_method_frame(frame_value, channel_id) elif isinstance(frame_value, header.ContentHeader): return _marshal_content_header_frame(frame_value, channel_id) elif isinstance(frame_value, body.ContentBody): return _marshal_content_body_frame(frame_value, channel_id) elif isinstance(frame_value, heartbeat.Heartbeat): return frame_value.marshal() raise ValueError('Could not determine frame type: {}'.format(frame_value))
def _unmarshal_protocol_header_frame(data_in): """Attempt to unmarshal a protocol header frame The ProtocolHeader is abbreviated in size and functionality compared to the rest of the frame types, so return UNMARSHAL_ERROR doesn't apply as cleanly since we don't have all of the attributes to return even regardless of success or failure. :param bytes data_in: Raw byte stream data :rtype: header.ProtocolHeader :raises: ValueError """ # Do the first four bytes match? if data_in[0:4] == AMQP: frame = header.ProtocolHeader() frame.unmarshal(data_in) return frame def _unmarshal_method_frame(frame_data): """Attempt to unmarshal a method frame :param bytes frame_data: Raw frame data to assign to our method frame :return tuple: Amount of data consumed and the frame object """ # Get the Method Index from the class data bytes_used, method_index = decode.long_int(frame_data[0:4]) # Create an instance of the method object we're going to unmarshal try: method = specification.INDEX_MAPPING[method_index]() except KeyError: raise exceptions.UnmarshalingException( 'Unknown', 'Unknown method index: {}'.format(str(method_index))) # Unmarshal the data try: method.unmarshal(frame_data[bytes_used:]) except struct.error as error: raise exceptions.UnmarshalingException(method, error) # Unmarshal the data in the object and return it return method def _unmarshal_header_frame(frame_data): """Attempt to unmarshal a header frame :param bytes frame_data: Raw frame data to assign to our header frame :return tuple: Amount of data consumed and the frame object """ content_header = header.ContentHeader() try: content_header.unmarshal(frame_data) except struct.error as error: raise exceptions.UnmarshalingException('ContentHeader', error) return content_header def _unmarshal_body_frame(frame_data): """Attempt to unmarshal a body frame :param bytes frame_data: Raw frame data to assign to our body frame :return tuple: Amount of data consumed and the frame object """ content_body = body.ContentBody() content_body.unmarshal(frame_data) return content_body def _frame_parts(data_in): """Try and decode a low-level AMQP frame and return the parts of the frame. :param bytes data_in: Raw byte stream data :return tuple: frame type, channel number, and frame data to decode """ # Get the Frame Type, Channel Number and Frame Size try: return struct.unpack('>BHI', data_in[0:FRAME_HEADER_SIZE]) except struct.error: # We didn't get a full frame return UNMARSHAL_FAILURE def _marshal(frame_type, channel_id, payload): """Marshal the low-level AMQ frame. :param int frame_type: The frame type to marshal :param int channel_id: The channel it will be sent on :param bytes|bytes payload: The frame payload :rtype: str or bytes """ return b''.join([struct.pack('>BHI', frame_type, channel_id, len(payload)), payload, FRAME_END_CHAR]) def _marshal_content_body_frame(frame_value, channel_id): """Marshal as many content body frames as needed to transmit the content :param body.ContentBody frame_value: Frame object to marshal :param int channel_id: The channel number for the frame(s) :rtype: str """ return _marshal(specification.FRAME_BODY, channel_id, frame_value.marshal()) def _marshal_content_header_frame(frame_value, channel_id): """Marshal a content header frame :param header.ContentHeader frame_value: Frame object to marshal :param int channel_id: The channel number for the frame :rtype: str """ return _marshal(specification.FRAME_HEADER, channel_id, frame_value.marshal()) def _marshal_method_frame(frame_value, channel_id): """Marshal a method frame :param specification.Frame frame_value: Frame object to marshal :param int channel_id: The channel number for the frame :rtype: str """ return _marshal(specification.FRAME_METHOD, channel_id, struct.pack('>I', frame_value.index) + frame_value.marshal())