# -*- encoding: utf-8 -*-
"""Manage the marshaling and unmarshaling of AMQP frames
unmarshal will turn a raw AMQP byte stream into the appropriate AMQP objects
from the specification file.
marshal will take an object created from the specification file and turn it
into a raw byte stream.
"""
import logging
import struct
import typing
from pamqp import (base, body, commands, common, constants, decode, exceptions,
header, heartbeat)
LOGGER = logging.getLogger(__name__)
UNMARSHAL_FAILURE = 0, 0, None
FrameTypes = typing.Union[base.Frame, body.ContentBody, header.ContentHeader,
header.ProtocolHeader, heartbeat.Heartbeat]
[docs]def marshal(frame_value: FrameTypes, channel_id: int) -> bytes:
"""Marshal a frame to be sent over the wire.
:raises: ValueError
"""
if isinstance(frame_value, header.ProtocolHeader):
return frame_value.marshal()
elif isinstance(frame_value, base.Frame):
return _marshal_method_frame(frame_value, channel_id)
elif isinstance(frame_value, header.ContentHeader):
return _marshal_content_header_frame(frame_value, channel_id)
elif isinstance(frame_value, body.ContentBody):
return _marshal_content_body_frame(frame_value, channel_id)
elif isinstance(frame_value, heartbeat.Heartbeat):
return frame_value.marshal()
raise ValueError('Could not determine frame type: {}'.format(frame_value))
[docs]def unmarshal(data_in: bytes) -> typing.Tuple[int, int, FrameTypes]:
"""Takes in binary data and maps builds the appropriate frame type,
returning a frame object.
:returns: tuple of bytes consumed, channel, and a frame object
:raises: exceptions.UnmarshalingException
"""
try: # Look to see if it's a protocol header frame
value = _unmarshal_protocol_header_frame(data_in)
except ValueError as error:
raise exceptions.UnmarshalingException(header.ProtocolHeader, error)
else:
if value:
return 8, 0, value
frame_type, channel_id, frame_size = frame_parts(data_in)
# Heartbeats do not have frame length indicators
if frame_type == constants.FRAME_HEARTBEAT and frame_size == 0:
return 8, channel_id, heartbeat.Heartbeat()
if not frame_size:
raise exceptions.UnmarshalingException('Unknown', 'No frame size')
byte_count = constants.FRAME_HEADER_SIZE + frame_size + 1
if byte_count > len(data_in):
raise exceptions.UnmarshalingException('Unknown',
'Not all data received')
if data_in[byte_count - 1] != constants.FRAME_END:
raise exceptions.UnmarshalingException('Unknown', 'Last byte error')
frame_data = data_in[constants.FRAME_HEADER_SIZE:byte_count - 1]
if frame_type == constants.FRAME_METHOD:
return byte_count, channel_id, _unmarshal_method_frame(frame_data)
elif frame_type == constants.FRAME_HEADER:
return byte_count, channel_id, _unmarshal_header_frame(frame_data)
elif frame_type == constants.FRAME_BODY:
return byte_count, channel_id, _unmarshal_body_frame(frame_data)
raise exceptions.UnmarshalingException(
'Unknown', 'Unknown frame type: {}'.format(frame_type))
[docs]def frame_parts(data: bytes) -> typing.Tuple[int, int, typing.Optional[int]]:
"""Attempt to decode a low-level frame, returning frame parts"""
try: # Get the Frame Type, Channel Number and Frame Size
return struct.unpack('>BHI', data[0:constants.FRAME_HEADER_SIZE])
except struct.error: # Did not receive a full frame
return UNMARSHAL_FAILURE
def _marshal(frame_type: int, channel_id: int, payload: bytes) -> bytes:
"""Marshal the low-level AMQ frame"""
return b''.join([
struct.pack('>BHI', frame_type, channel_id, len(payload)), payload,
constants.FRAME_END_CHAR
])
def _marshal_content_body_frame(value: body.ContentBody,
channel_id: int) -> bytes:
"""Marshal as many content body frames as needed to transmit the content"""
return _marshal(constants.FRAME_BODY, channel_id, value.marshal())
def _marshal_content_header_frame(value: header.ContentHeader,
channel_id: int) -> bytes:
"""Marshal a content header frame"""
return _marshal(constants.FRAME_HEADER, channel_id, value.marshal())
def _marshal_method_frame(value: base.Frame, channel_id: int) -> bytes:
"""Marshal a method frame"""
return _marshal(constants.FRAME_METHOD, channel_id,
common.Struct.integer.pack(value.index) + value.marshal())
def _unmarshal_protocol_header_frame(data_in: bytes) \
-> typing.Optional[header.ProtocolHeader]:
"""Attempt to unmarshal a protocol header frame
The ProtocolHeader is abbreviated in size and functionality compared to
the rest of the frame types, so return UNMARSHAL_ERROR doesn't apply
as cleanly since we don't have all of the attributes to return even
regardless of success or failure.
:raises: ValueError
"""
if data_in[0:4] == constants.AMQP: # Do the first four bytes match?
frame = header.ProtocolHeader()
frame.unmarshal(data_in)
return frame
return None
def _unmarshal_method_frame(frame_data: bytes) -> base.Frame:
"""Attempt to unmarshal a method frame
:raises: pamqp.exceptions.UnmarshalingException
"""
bytes_used, method_index = decode.long_int(frame_data[0:4])
try:
method = commands.INDEX_MAPPING[method_index]()
except KeyError:
raise exceptions.UnmarshalingException(
'Unknown', 'Unknown method index: {}'.format(str(method_index)))
try:
method.unmarshal(frame_data[bytes_used:])
except struct.error as error:
raise exceptions.UnmarshalingException(method, error)
return method
def _unmarshal_header_frame(frame_data: bytes) -> header.ContentHeader:
"""Attempt to unmarshal a header frame
:raises: pamqp.exceptions.UnmarshalingException
"""
content_header = header.ContentHeader()
try:
content_header.unmarshal(frame_data)
except struct.error as error:
raise exceptions.UnmarshalingException('ContentHeader', error)
return content_header
def _unmarshal_body_frame(frame_data: bytes) -> body.ContentBody:
"""Attempt to unmarshal a body frame"""
content_body = body.ContentBody(b'')
content_body.unmarshal(frame_data)
return content_body