"""Manage the marshaling and unmarshaling of AMQP frames
unmarshal will turn a raw AMQP byte stream into the appropriate AMQP objects
from the specification file.
marshal will take an object created from the specification file and turn it
into a raw byte stream.
"""
import logging
import struct
from pamqp import body
from pamqp import codec
from pamqp import exceptions
from pamqp import heartbeat
from pamqp import header
from pamqp import specification
from pamqp import PYTHON3
AMQP = b'AMQP' if PYTHON3 else 'AMQP'
FRAME_HEADER_SIZE = 7
FRAME_END_CHAR = chr(specification.FRAME_END)
DECODE_FRAME_END_CHAR = FRAME_END_CHAR
if PYTHON3:
FRAME_END_CHAR = bytes(FRAME_END_CHAR, 'latin-1')
DECODE_FRAME_END_CHAR = specification.FRAME_END
LOGGER = logging.getLogger(__name__)
UNMARSHAL_FAILURE = 0, 0, None
[docs]def unmarshal(data_in):
"""Takes in binary data and maps builds the appropriate frame type,
returning a frame object.
:param str data_in: Raw byte stream data
:rtype: tuple of bytes consumed, channel, and a frame object
:raises: specification.FrameError
"""
# Look to see if it's a protocol header frame
try:
frame_value = _unmarshal_protocol_header_frame(data_in)
if frame_value:
return 8, 0, frame_value
except ValueError as error:
raise exceptions.UnmarshalingException(header.ProtocolHeader, error)
# Decode the low level frame and break it into parts
try:
frame_type, channel_id, frame_size = _frame_parts(data_in)
# Heartbeats do not have frame length indicators
if frame_type == specification.FRAME_HEARTBEAT and frame_size == 0:
return 8, channel_id, heartbeat.Heartbeat()
if not frame_size:
raise exceptions.UnmarshalingException('Unknown', 'No frame size')
byte_count = FRAME_HEADER_SIZE + frame_size + 1
if byte_count > len(data_in):
raise exceptions.UnmarshalingException('Unknown',
'Not all data received')
if data_in[byte_count - 1] != DECODE_FRAME_END_CHAR:
raise exceptions.UnmarshalingException('Unknown',
'Last byte error')
frame_data = data_in[FRAME_HEADER_SIZE:byte_count - 1]
except ValueError as error:
raise exceptions.UnmarshalingException('Unknown', error)
# Decode a method frame
if frame_type == specification.FRAME_METHOD:
return byte_count, channel_id, _unmarshal_method_frame(frame_data)
# Decode a header frame
elif frame_type == specification.FRAME_HEADER:
return byte_count, channel_id, _unmarshal_header_frame(frame_data)
# Decode a body frame
elif frame_type == specification.FRAME_BODY:
return byte_count, channel_id, _unmarshal_body_frame(frame_data)
raise exceptions.UnmarshalingException('Unknown',
'Unknown frame type: %i' %
frame_type)
[docs]def marshal(frame_value, channel_id):
"""Marshal a frame to be sent over the wire.
:param frame_value: The frame object to marshal
:type frame_value: pamqp.specification.Frame or pamqp.heartbeat.Heartbeat
:param int channel_id: The channel number to send the frame on
:rtype: str
:raises: ValueError
"""
if isinstance(frame_value, header.ProtocolHeader):
return frame_value.marshal()
elif isinstance(frame_value, specification.Frame):
return _marshal_method_frame(frame_value, channel_id)
elif isinstance(frame_value, header.ContentHeader):
return _marshal_content_header_frame(frame_value, channel_id)
elif isinstance(frame_value, body.ContentBody):
return _marshal_content_body_frame(frame_value, channel_id)
elif isinstance(frame_value, heartbeat.Heartbeat):
return frame_value.marshal()
raise ValueError('Could not determine frame type: %r' % frame_value)
def _unmarshal_protocol_header_frame(data_in):
"""Attempt to unmarshal a protocol header frame
The ProtocolHeader is abbreviated in size and functionality compared to
the rest of the frame types, so return UNMARSHAL_ERROR doesn't apply
as cleanly since we don't have all of the attributes to return even
regardless of success or failure.
:param str data_in: Raw byte stream data
:rtype: header.ProtocolHeader
:raises: ValueError
"""
# Do the first four bytes match?
if data_in[0:4] == AMQP:
try:
frame = header.ProtocolHeader()
frame.unmarshal(data_in)
return frame
except IndexError:
raise ValueError('Frame data did not meet minimum length')
def _unmarshal_method_frame(frame_data):
"""Attempt to unmarshal a method frame
:param str frame_data: Raw frame data to assign to our method frame
:return tuple: Amount of data consumed and the frame object
"""
# Get the Method Index from the class data
bytes_used, method_index = codec.decode.long_int(frame_data[0:4])
# Create an instance of the method object we're going to unmarshal
try:
method = specification.INDEX_MAPPING[method_index]()
except KeyError as error:
raise exceptions.UnmarshalingException('Unknown', error)
# Unmarshal the data
try:
method.unmarshal(frame_data[bytes_used:])
except struct.error as error:
raise exceptions.UnmarshalingException(method, error)
# Unmarshal the data in the object and return it
return method
def _unmarshal_header_frame(frame_data):
"""Attempt to unmarshal a header frame
:param str frame_data: Raw frame data to assign to our header frame
:return tuple: Amount of data consumed and the frame object
"""
content_header = header.ContentHeader()
try:
content_header.unmarshal(frame_data)
except struct.error as error:
raise exceptions.UnmarshalingException('ContentHeader', error)
return content_header
def _unmarshal_body_frame(frame_data):
"""Attempt to unmarshal a body frame
:param str frame_data: Raw frame data to assign to our body frame
:return tuple: Amount of data consumed and the frame object
"""
content_body = body.ContentBody()
content_body.unmarshal(frame_data)
return content_body
def _frame_parts(data_in):
"""Try and decode a low-level AMQP frame and return the parts of the frame.
:param str data_in: Raw byte stream data
:return tuple: frame type, channel number, and frame data to decode
"""
# Get the Frame Type, Channel Number and Frame Size
try:
return struct.unpack('>BHI', data_in[0:FRAME_HEADER_SIZE])
except struct.error:
# We didn't get a full frame
return UNMARSHAL_FAILURE
def _marshal(frame_type, channel_id, payload):
"""Marshal the low-level AMQ frame.
:param int frame_type: The frame type to marshal
:param int channel_id: The channel it will be sent on
:param str|bytes payload: The frame payload
:rtype: str or bytes
"""
header = struct.pack('>BHI', frame_type, channel_id, len(payload))
if PYTHON3:
return b''.join([header, payload, FRAME_END_CHAR])
return ''.join([header, payload, FRAME_END_CHAR])
def _marshal_content_body_frame(frame_value, channel_id):
"""Marshal as many content body frames as needed to transmit the content
:param body.ContentBody frame_value: Frame object to marshal
:param int channel_id: The channel number for the frame(s)
:rtype: str
"""
return _marshal(specification.FRAME_BODY, channel_id,
frame_value.marshal())
def _marshal_content_header_frame(frame_value, channel_id):
"""Marshal a content header frame
:param header.ContentHeader frame_value: Frame object to marshal
:param int channel_id: The channel number for the frame
:rtype: str
"""
return _marshal(specification.FRAME_HEADER, channel_id,
frame_value.marshal())
def _marshal_method_frame(frame_value, channel_id):
"""Marshal a method frame
:param specification.Frame frame_value: Frame object to marshal
:param int channel_id: The channel number for the frame
:rtype: str
"""
return _marshal(specification.FRAME_METHOD,
channel_id,
struct.pack('>I', frame_value.index) +
frame_value.marshal())