from core import Adapter, AdaptationError, Pass | |
from lib import int_to_bin, bin_to_int, swap_bytes, StringIO | |
from lib import FlagsContainer, HexString | |
#=============================================================================== | |
# exceptions | |
#=============================================================================== | |
class BitIntegerError(AdaptationError): | |
__slots__ = [] | |
class MappingError(AdaptationError): | |
__slots__ = [] | |
class ConstError(AdaptationError): | |
__slots__ = [] | |
class ValidationError(AdaptationError): | |
__slots__ = [] | |
class PaddingError(AdaptationError): | |
__slots__ = [] | |
#=============================================================================== | |
# adapters | |
#=============================================================================== | |
class BitIntegerAdapter(Adapter): | |
""" | |
Adapter for bit-integers (converts bitstrings to integers, and vice versa). | |
See BitField. | |
Parameters: | |
* subcon - the subcon to adapt | |
* width - the size of the subcon, in bits | |
* swapped - whether to swap byte order (little endian/big endian). | |
default is False (big endian) | |
* signed - whether the value is signed (two's complement). the default | |
is False (unsigned) | |
* bytesize - number of bits per byte, used for byte-swapping (if swapped). | |
default is 8. | |
""" | |
__slots__ = ["width", "swapped", "signed", "bytesize"] | |
def __init__(self, subcon, width, swapped = False, signed = False, | |
bytesize = 8): | |
Adapter.__init__(self, subcon) | |
self.width = width | |
self.swapped = swapped | |
self.signed = signed | |
self.bytesize = bytesize | |
def _encode(self, obj, context): | |
if obj < 0 and not self.signed: | |
raise BitIntegerError("object is negative, but field is not signed", | |
obj) | |
obj2 = int_to_bin(obj, width = self.width) | |
if self.swapped: | |
obj2 = swap_bytes(obj2, bytesize = self.bytesize) | |
return obj2 | |
def _decode(self, obj, context): | |
if self.swapped: | |
obj = swap_bytes(obj, bytesize = self.bytesize) | |
return bin_to_int(obj, signed = self.signed) | |
class MappingAdapter(Adapter): | |
""" | |
Adapter that maps objects to other objects. | |
See SymmetricMapping and Enum. | |
Parameters: | |
* subcon - the subcon to map | |
* decoding - the decoding (parsing) mapping (a dict) | |
* encoding - the encoding (building) mapping (a dict) | |
* decdefault - the default return value when the object is not found | |
in the decoding mapping. if no object is given, an exception is raised. | |
if `Pass` is used, the unmapped object will be passed as-is | |
* encdefault - the default return value when the object is not found | |
in the encoding mapping. if no object is given, an exception is raised. | |
if `Pass` is used, the unmapped object will be passed as-is | |
""" | |
__slots__ = ["encoding", "decoding", "encdefault", "decdefault"] | |
def __init__(self, subcon, decoding, encoding, | |
decdefault = NotImplemented, encdefault = NotImplemented): | |
Adapter.__init__(self, subcon) | |
self.decoding = decoding | |
self.encoding = encoding | |
self.decdefault = decdefault | |
self.encdefault = encdefault | |
def _encode(self, obj, context): | |
try: | |
return self.encoding[obj] | |
except (KeyError, TypeError): | |
if self.encdefault is NotImplemented: | |
raise MappingError("no encoding mapping for %r" % (obj,)) | |
if self.encdefault is Pass: | |
return obj | |
return self.encdefault | |
def _decode(self, obj, context): | |
try: | |
return self.decoding[obj] | |
except (KeyError, TypeError): | |
if self.decdefault is NotImplemented: | |
raise MappingError("no decoding mapping for %r" % (obj,)) | |
if self.decdefault is Pass: | |
return obj | |
return self.decdefault | |
class FlagsAdapter(Adapter): | |
""" | |
Adapter for flag fields. Each flag is extracted from the number, resulting | |
in a FlagsContainer object. Not intended for direct usage. | |
See FlagsEnum. | |
Parameters | |
* subcon - the subcon to extract | |
* flags - a dictionary mapping flag-names to their value | |
""" | |
__slots__ = ["flags"] | |
def __init__(self, subcon, flags): | |
Adapter.__init__(self, subcon) | |
self.flags = flags | |
def _encode(self, obj, context): | |
flags = 0 | |
for name, value in self.flags.iteritems(): | |
if getattr(obj, name, False): | |
flags |= value | |
return flags | |
def _decode(self, obj, context): | |
obj2 = FlagsContainer() | |
for name, value in self.flags.iteritems(): | |
setattr(obj2, name, bool(obj & value)) | |
return obj2 | |
class StringAdapter(Adapter): | |
""" | |
Adapter for strings. Converts a sequence of characters into a python | |
string, and optionally handles character encoding. | |
See String. | |
Parameters: | |
* subcon - the subcon to convert | |
* encoding - the character encoding name (e.g., "utf8"), or None to | |
return raw bytes (usually 8-bit ASCII). | |
""" | |
__slots__ = ["encoding"] | |
def __init__(self, subcon, encoding = None): | |
Adapter.__init__(self, subcon) | |
self.encoding = encoding | |
def _encode(self, obj, context): | |
if self.encoding: | |
obj = obj.encode(self.encoding) | |
return obj | |
def _decode(self, obj, context): | |
obj = "".join(obj) | |
if self.encoding: | |
obj = obj.decode(self.encoding) | |
return obj | |
class PaddedStringAdapter(Adapter): | |
r""" | |
Adapter for padded strings. | |
See String. | |
Parameters: | |
* subcon - the subcon to adapt | |
* padchar - the padding character. default is "\x00". | |
* paddir - the direction where padding is placed ("right", "left", or | |
"center"). the default is "right". | |
* trimdir - the direction where trimming will take place ("right" or | |
"left"). the default is "right". trimming is only meaningful for | |
building, when the given string is too long. | |
""" | |
__slots__ = ["padchar", "paddir", "trimdir"] | |
def __init__(self, subcon, padchar = "\x00", paddir = "right", | |
trimdir = "right"): | |
if paddir not in ("right", "left", "center"): | |
raise ValueError("paddir must be 'right', 'left' or 'center'", | |
paddir) | |
if trimdir not in ("right", "left"): | |
raise ValueError("trimdir must be 'right' or 'left'", trimdir) | |
Adapter.__init__(self, subcon) | |
self.padchar = padchar | |
self.paddir = paddir | |
self.trimdir = trimdir | |
def _decode(self, obj, context): | |
if self.paddir == "right": | |
obj = obj.rstrip(self.padchar) | |
elif self.paddir == "left": | |
obj = obj.lstrip(self.padchar) | |
else: | |
obj = obj.strip(self.padchar) | |
return obj | |
def _encode(self, obj, context): | |
size = self._sizeof(context) | |
if self.paddir == "right": | |
obj = obj.ljust(size, self.padchar) | |
elif self.paddir == "left": | |
obj = obj.rjust(size, self.padchar) | |
else: | |
obj = obj.center(size, self.padchar) | |
if len(obj) > size: | |
if self.trimdir == "right": | |
obj = obj[:size] | |
else: | |
obj = obj[-size:] | |
return obj | |
class LengthValueAdapter(Adapter): | |
""" | |
Adapter for length-value pairs. It extracts only the value from the | |
pair, and calculates the length based on the value. | |
See PrefixedArray and PascalString. | |
Parameters: | |
* subcon - the subcon returning a length-value pair | |
""" | |
__slots__ = [] | |
def _encode(self, obj, context): | |
return (len(obj), obj) | |
def _decode(self, obj, context): | |
return obj[1] | |
class CStringAdapter(StringAdapter): | |
r""" | |
Adapter for C-style strings (strings terminated by a terminator char). | |
Parameters: | |
* subcon - the subcon to convert | |
* terminators - a sequence of terminator chars. default is "\x00". | |
* encoding - the character encoding to use (e.g., "utf8"), or None to | |
return raw-bytes. the terminator characters are not affected by the | |
encoding. | |
""" | |
__slots__ = ["terminators"] | |
def __init__(self, subcon, terminators = "\x00", encoding = None): | |
StringAdapter.__init__(self, subcon, encoding = encoding) | |
self.terminators = terminators | |
def _encode(self, obj, context): | |
return StringAdapter._encode(self, obj, context) + self.terminators[0] | |
def _decode(self, obj, context): | |
return StringAdapter._decode(self, obj[:-1], context) | |
class TunnelAdapter(Adapter): | |
""" | |
Adapter for tunneling (as in protocol tunneling). A tunnel is construct | |
nested upon another (layering). For parsing, the lower layer first parses | |
the data (note: it must return a string!), then the upper layer is called | |
to parse that data (bottom-up). For building it works in a top-down manner; | |
first the upper layer builds the data, then the lower layer takes it and | |
writes it to the stream. | |
Parameters: | |
* subcon - the lower layer subcon | |
* inner_subcon - the upper layer (tunneled/nested) subcon | |
Example: | |
# a pascal string containing compressed data (zlib encoding), so first | |
# the string is read, decompressed, and finally re-parsed as an array | |
# of UBInt16 | |
TunnelAdapter( | |
PascalString("data", encoding = "zlib"), | |
GreedyRange(UBInt16("elements")) | |
) | |
""" | |
__slots__ = ["inner_subcon"] | |
def __init__(self, subcon, inner_subcon): | |
Adapter.__init__(self, subcon) | |
self.inner_subcon = inner_subcon | |
def _decode(self, obj, context): | |
return self.inner_subcon._parse(StringIO(obj), context) | |
def _encode(self, obj, context): | |
stream = StringIO() | |
self.inner_subcon._build(obj, stream, context) | |
return stream.getvalue() | |
class ExprAdapter(Adapter): | |
""" | |
A generic adapter that accepts 'encoder' and 'decoder' as parameters. You | |
can use ExprAdapter instead of writing a full-blown class when only a | |
simple expression is needed. | |
Parameters: | |
* subcon - the subcon to adapt | |
* encoder - a function that takes (obj, context) and returns an encoded | |
version of obj | |
* decoder - a function that takes (obj, context) and returns an decoded | |
version of obj | |
Example: | |
ExprAdapter(UBInt8("foo"), | |
encoder = lambda obj, ctx: obj / 4, | |
decoder = lambda obj, ctx: obj * 4, | |
) | |
""" | |
__slots__ = ["_encode", "_decode"] | |
def __init__(self, subcon, encoder, decoder): | |
Adapter.__init__(self, subcon) | |
self._encode = encoder | |
self._decode = decoder | |
class HexDumpAdapter(Adapter): | |
""" | |
Adapter for hex-dumping strings. It returns a HexString, which is a string | |
""" | |
__slots__ = ["linesize"] | |
def __init__(self, subcon, linesize = 16): | |
Adapter.__init__(self, subcon) | |
self.linesize = linesize | |
def _encode(self, obj, context): | |
return obj | |
def _decode(self, obj, context): | |
return HexString(obj, linesize = self.linesize) | |
class ConstAdapter(Adapter): | |
""" | |
Adapter for enforcing a constant value ("magic numbers"). When decoding, | |
the return value is checked; when building, the value is substituted in. | |
Parameters: | |
* subcon - the subcon to validate | |
* value - the expected value | |
Example: | |
Const(Field("signature", 2), "MZ") | |
""" | |
__slots__ = ["value"] | |
def __init__(self, subcon, value): | |
Adapter.__init__(self, subcon) | |
self.value = value | |
def _encode(self, obj, context): | |
if obj is None or obj == self.value: | |
return self.value | |
else: | |
raise ConstError("expected %r, found %r" % (self.value, obj)) | |
def _decode(self, obj, context): | |
if obj != self.value: | |
raise ConstError("expected %r, found %r" % (self.value, obj)) | |
return obj | |
class SlicingAdapter(Adapter): | |
""" | |
Adapter for slicing a list (getting a slice from that list) | |
Parameters: | |
* subcon - the subcon to slice | |
* start - start index | |
* stop - stop index (or None for up-to-end) | |
* step - step (or None for every element) | |
""" | |
__slots__ = ["start", "stop", "step"] | |
def __init__(self, subcon, start, stop = None): | |
Adapter.__init__(self, subcon) | |
self.start = start | |
self.stop = stop | |
def _encode(self, obj, context): | |
if self.start is None: | |
return obj | |
return [None] * self.start + obj | |
def _decode(self, obj, context): | |
return obj[self.start:self.stop] | |
class IndexingAdapter(Adapter): | |
""" | |
Adapter for indexing a list (getting a single item from that list) | |
Parameters: | |
* subcon - the subcon to index | |
* index - the index of the list to get | |
""" | |
__slots__ = ["index"] | |
def __init__(self, subcon, index): | |
Adapter.__init__(self, subcon) | |
if type(index) is not int: | |
raise TypeError("index must be an integer", type(index)) | |
self.index = index | |
def _encode(self, obj, context): | |
return [None] * self.index + [obj] | |
def _decode(self, obj, context): | |
return obj[self.index] | |
class PaddingAdapter(Adapter): | |
r""" | |
Adapter for padding. | |
Parameters: | |
* subcon - the subcon to pad | |
* pattern - the padding pattern (character). default is "\x00") | |
* strict - whether or not to verify, during parsing, that the given | |
padding matches the padding pattern. default is False (unstrict) | |
""" | |
__slots__ = ["pattern", "strict"] | |
def __init__(self, subcon, pattern = "\x00", strict = False): | |
Adapter.__init__(self, subcon) | |
self.pattern = pattern | |
self.strict = strict | |
def _encode(self, obj, context): | |
return self._sizeof(context) * self.pattern | |
def _decode(self, obj, context): | |
if self.strict: | |
expected = self._sizeof(context) * self.pattern | |
if obj != expected: | |
raise PaddingError("expected %r, found %r" % (expected, obj)) | |
return obj | |
#=============================================================================== | |
# validators | |
#=============================================================================== | |
class Validator(Adapter): | |
""" | |
Abstract class: validates a condition on the encoded/decoded object. | |
Override _validate(obj, context) in deriving classes. | |
Parameters: | |
* subcon - the subcon to validate | |
""" | |
__slots__ = [] | |
def _decode(self, obj, context): | |
if not self._validate(obj, context): | |
raise ValidationError("invalid object", obj) | |
return obj | |
def _encode(self, obj, context): | |
return self._decode(obj, context) | |
def _validate(self, obj, context): | |
raise NotImplementedError() | |
class OneOf(Validator): | |
""" | |
Validates that the value is one of the listed values | |
Parameters: | |
* subcon - the subcon to validate | |
* valids - a set of valid values | |
""" | |
__slots__ = ["valids"] | |
def __init__(self, subcon, valids): | |
Validator.__init__(self, subcon) | |
self.valids = valids | |
def _validate(self, obj, context): | |
return obj in self.valids | |
class NoneOf(Validator): | |
""" | |
Validates that the value is none of the listed values | |
Parameters: | |
* subcon - the subcon to validate | |
* invalids - a set of invalid values | |
""" | |
__slots__ = ["invalids"] | |
def __init__(self, subcon, invalids): | |
Validator.__init__(self, subcon) | |
self.invalids = invalids | |
def _validate(self, obj, context): | |
return obj not in self.invalids | |