from lib import BitStreamReader, BitStreamWriter, encode_bin, decode_bin | |
from core import * | |
from adapters import * | |
#=============================================================================== | |
# fields | |
#=============================================================================== | |
def Field(name, length):
    """creates a field of raw data
    * name - the name of the field
    * length - either an integer (yielding a StaticField), or a function
      that takes the context as an argument and returns the length
      (yielding a MetaField)
    """
    # a callable length is resolved per-context; anything else is fixed
    factory = MetaField if callable(length) else StaticField
    return factory(name, length)
def BitField(name, length, swapped = False, signed = False, bytesize = 8):
    """a field measured in bits; must be enclosed in a BitStruct
    * name - the name of the field
    * length - the length of the field in bits: an integer, or a function
      that takes the context as an argument and returns the length
    * swapped - whether the value is byte-swapped (little endian).
      default is False.
    * signed - whether the value is a signed integer. default is False.
    * bytesize - the number of bits in a byte (used for byte-swapping).
      default is 8.
    """
    raw_bits = Field(name, length)
    return BitIntegerAdapter(
        raw_bits,
        length,
        swapped = swapped,
        signed = signed,
        bytesize = bytesize
    )
def Padding(length, pattern = "\x00", strict = False):
    r"""a padding field (its value is discarded)
    * length - the length of the field: an integer, or a function that
      takes the context as an argument and returns the length
    * pattern - the padding pattern (character) to use. default is "\x00"
    * strict - whether or not to raise an exception if the actual padding
      pattern mismatches the desired pattern. default is False.
    """
    # padding is anonymous, hence the field name is None
    unnamed = Field(None, length)
    return PaddingAdapter(unnamed, pattern = pattern, strict = strict)
def Flag(name, truth = 1, falsehood = 0, default = False):
    """a boolean flag field (True or False), one unit long
    * name - the name of the field
    * truth - the numeric value of truth. default is 1.
    * falsehood - the numeric value of falsehood. default is 0.
    * default - the value to assume when the stored value is neither
      `truth` nor `falsehood`. default is False.
    """
    flag_field = Field(name, 1)
    # the numeric values are stored as single characters
    encoding = {True : chr(truth), False : chr(falsehood)}
    return SymmetricMapping(flag_field, encoding, default = default)
#=============================================================================== | |
# field shortcuts | |
#=============================================================================== | |
def Bit(name):
    """a BitField that is 1 bit long; must be enclosed in a BitStruct"""
    return BitField(name, length = 1)
def Nibble(name):
    """a BitField that is 4 bits long; must be enclosed in a BitStruct"""
    return BitField(name, length = 4)
def Octet(name):
    """a BitField that is 8 bits long; must be enclosed in a BitStruct"""
    return BitField(name, length = 8)
def UBInt8(name):
    """an 8-bit unsigned integer in big endian byte order"""
    endianity, code = ">", "B"
    return FormatField(name, endianity, code)
def UBInt16(name):
    """a 16-bit unsigned integer in big endian byte order"""
    endianity, code = ">", "H"
    return FormatField(name, endianity, code)
def UBInt32(name):
    """a 32-bit unsigned integer in big endian byte order"""
    endianity, code = ">", "L"
    return FormatField(name, endianity, code)
def UBInt64(name):
    """a 64-bit unsigned integer in big endian byte order"""
    endianity, code = ">", "Q"
    return FormatField(name, endianity, code)
def SBInt8(name):
    """an 8-bit signed integer in big endian byte order"""
    endianity, code = ">", "b"
    return FormatField(name, endianity, code)
def SBInt16(name):
    """a 16-bit signed integer in big endian byte order"""
    endianity, code = ">", "h"
    return FormatField(name, endianity, code)
def SBInt32(name):
    """a 32-bit signed integer in big endian byte order"""
    endianity, code = ">", "l"
    return FormatField(name, endianity, code)
def SBInt64(name):
    """a 64-bit signed integer in big endian byte order"""
    endianity, code = ">", "q"
    return FormatField(name, endianity, code)
def ULInt8(name):
    """an 8-bit unsigned integer in little endian byte order"""
    endianity, code = "<", "B"
    return FormatField(name, endianity, code)
def ULInt16(name):
    """a 16-bit unsigned integer in little endian byte order"""
    endianity, code = "<", "H"
    return FormatField(name, endianity, code)
def ULInt32(name):
    """a 32-bit unsigned integer in little endian byte order"""
    endianity, code = "<", "L"
    return FormatField(name, endianity, code)
def ULInt64(name):
    """a 64-bit unsigned integer in little endian byte order"""
    endianity, code = "<", "Q"
    return FormatField(name, endianity, code)
def SLInt8(name):
    """an 8-bit signed integer in little endian byte order"""
    endianity, code = "<", "b"
    return FormatField(name, endianity, code)
def SLInt16(name):
    """a 16-bit signed integer in little endian byte order"""
    endianity, code = "<", "h"
    return FormatField(name, endianity, code)
def SLInt32(name):
    """a 32-bit signed integer in little endian byte order"""
    endianity, code = "<", "l"
    return FormatField(name, endianity, code)
def SLInt64(name):
    """a 64-bit signed integer in little endian byte order"""
    endianity, code = "<", "q"
    return FormatField(name, endianity, code)
def UNInt8(name):
    """an 8-bit unsigned integer in native endianity"""
    endianity, code = "=", "B"
    return FormatField(name, endianity, code)
def UNInt16(name):
    """a 16-bit unsigned integer in native endianity"""
    endianity, code = "=", "H"
    return FormatField(name, endianity, code)
def UNInt32(name):
    """a 32-bit unsigned integer in native endianity"""
    endianity, code = "=", "L"
    return FormatField(name, endianity, code)
def UNInt64(name):
    """a 64-bit unsigned integer in native endianity"""
    endianity, code = "=", "Q"
    return FormatField(name, endianity, code)
def SNInt8(name):
    """an 8-bit signed integer in native endianity"""
    endianity, code = "=", "b"
    return FormatField(name, endianity, code)
def SNInt16(name):
    """a 16-bit signed integer in native endianity"""
    endianity, code = "=", "h"
    return FormatField(name, endianity, code)
def SNInt32(name):
    """a 32-bit signed integer in native endianity"""
    endianity, code = "=", "l"
    return FormatField(name, endianity, code)
def SNInt64(name):
    """a 64-bit signed integer in native endianity"""
    endianity, code = "=", "q"
    return FormatField(name, endianity, code)
def BFloat32(name):
    """a 32-bit IEEE floating point number in big endian byte order"""
    endianity, code = ">", "f"
    return FormatField(name, endianity, code)
def LFloat32(name):
    """a 32-bit IEEE floating point number in little endian byte order"""
    endianity, code = "<", "f"
    return FormatField(name, endianity, code)
def NFloat32(name):
    """a 32-bit IEEE floating point number in native endianity"""
    endianity, code = "=", "f"
    return FormatField(name, endianity, code)
def BFloat64(name):
    """a 64-bit IEEE floating point number in big endian byte order"""
    endianity, code = ">", "d"
    return FormatField(name, endianity, code)
def LFloat64(name):
    """a 64-bit IEEE floating point number in little endian byte order"""
    endianity, code = "<", "d"
    return FormatField(name, endianity, code)
def NFloat64(name):
    """a 64-bit IEEE floating point number in native endianity"""
    endianity, code = "=", "d"
    return FormatField(name, endianity, code)
#=============================================================================== | |
# arrays | |
#=============================================================================== | |
def Array(count, subcon):
    """repeats subcon `count` times.
    * count - an integer, or a function taking the context as an argument
      and returning the count
    * subcon - the subcon to repeat
    """
    if callable(count):
        return MetaArray(count, subcon)
    # a constant count is wrapped in a function, and the resulting
    # construct has its FLAG_DYNAMIC flag cleared
    fixed = MetaArray(lambda ctx: count, subcon)
    fixed._clear_flag(fixed.FLAG_DYNAMIC)
    return fixed
def PrefixedArray(subcon, length_field = UBInt8("length")):
    """an array whose element count is stored in a preceding length field.
    * subcon - the subcon to be repeated
    * length_field - an integer construct. default is UBInt8("length")
    """
    seq_name = subcon.name

    def element_count(ctx):
        # the count is whatever the length field stored in the context
        return ctx[length_field.name]

    elements = Array(element_count, subcon)
    prefixed = Sequence(seq_name, length_field, elements, nested = False)
    return LengthValueAdapter(prefixed)
def OpenRange(mincount, subcon):
    """an open range (`mincount` or more times) of repeated subcon.
    * mincount - the minimal number of repetitions
    * subcon - the subcon to repeat
    """
    import sys
    # sys.maxint exists only on Python 2; fall back to sys.maxsize on
    # interpreters that no longer provide it, instead of failing on import
    try:
        maxcount = sys.maxint
    except AttributeError:
        maxcount = sys.maxsize
    return Range(mincount, maxcount, subcon)
def GreedyRange(subcon):
    """repeats subcon one or more times (an open range).
    * subcon - the subcon to repeat
    """
    return OpenRange(mincount = 1, subcon = subcon)
def OptionalGreedyRange(subcon):
    """repeats subcon zero or more times (an open range).
    * subcon - the subcon to repeat
    """
    return OpenRange(mincount = 0, subcon = subcon)
#=============================================================================== | |
# subconstructs | |
#=============================================================================== | |
def Optional(subcon):
    """makes a construct optional. if parsing fails, returns None.
    * subcon - the subcon to optionally parse or build
    """
    # selects between subcon and Pass, in that order
    label = subcon.name
    return Select(label, subcon, Pass)
def Bitwise(subcon):
    """converts the stream to bits, and hands the bitstream to subcon
    * subcon - a bitwise construct (usually BitField)
    """
    # statically-sized subcons smaller than MAX_BUFFER are wrapped by
    # Buffered; everything else is wrapped by Restream
    MAX_BUFFER = 1024 * 8

    def bits_to_bytes(length):
        # sizes are given in bits; only whole bytes are acceptable
        if (length & 7) == 0:
            return length >> 3
        raise SizeofError("size must be a multiple of 8", length)

    is_static = not subcon._is_flag(subcon.FLAG_DYNAMIC)
    if is_static and subcon.sizeof() < MAX_BUFFER:
        return Buffered(subcon,
            encoder = decode_bin,
            decoder = encode_bin,
            resizer = bits_to_bytes
        )
    return Restream(subcon,
        stream_reader = BitStreamReader,
        stream_writer = BitStreamWriter,
        resizer = bits_to_bytes
    )
def Aligned(subcon, modulus = 4, pattern = "\x00"):
    r"""pads subcon up to a modulus boundary using the padding pattern
    * subcon - the subcon to align
    * modulus - the modulus boundary (default is 4); must be >= 2,
      otherwise ValueError is raised
    * pattern - the padding pattern (default is \x00)
    """
    if modulus < 2:
        raise ValueError("modulus must be >= 2", modulus)

    if modulus in (2, 4, 8, 16, 32, 64, 128, 256, 512, 1024):
        # listed powers of two: compute the remainder with a bit mask
        mask = modulus - 1
        def padlength(ctx):
            return (modulus - (subcon._sizeof(ctx) & mask)) & mask
    else:
        def padlength(ctx):
            return (modulus - (subcon._sizeof(ctx) % modulus)) % modulus

    padded = Sequence(subcon.name,
        subcon,
        Padding(padlength, pattern = pattern),
        nested = False,
    )
    # index 0 of the sequence is the subcon's value; the padding is dropped
    return IndexingAdapter(padded, 0)
def Embedded(subcon):
    """embeds a struct into the struct that encloses it.
    * subcon - the struct to embed
    """
    # keeps the subcon's own name and adds its FLAG_EMBED flag
    embed_flag = subcon.FLAG_EMBED
    return Reconfig(subcon.name, subcon, embed_flag)
def Rename(newname, subcon):
    """gives an existing construct a new name
    * newname - the new name
    * subcon - the subcon to rename
    """
    renamed = Reconfig(newname, subcon)
    return renamed
def Alias(newname, oldname):
    """exposes an existing element of a struct under another name
    * newname - the new name
    * oldname - the name of an existing element
    """
    def lookup(ctx):
        # the alias reads the original element from the context
        return ctx[oldname]
    return Value(newname, lookup)
#=============================================================================== | |
# mapping | |
#=============================================================================== | |
def SymmetricMapping(subcon, mapping, default = NotImplemented):
    """defines a symmetrical mapping: a->b, b->a.
    * subcon - the subcon to map
    * mapping - the encoding mapping (a dict); the decoding mapping is
      achieved by reversing this mapping
    * default - the default value to use when no mapping is found. if no
      default value is given, an exception is raised. setting to Pass would
      return the value "as is" (unmapped)
    """
    # items() exists on every Python version, unlike iteritems(), which
    # is Python 2 only; the reversed dict built from it is the same
    reversed_mapping = dict((v, k) for k, v in mapping.items())
    # the same default is used for both encoding and decoding
    return MappingAdapter(subcon,
        encoding = mapping,
        decoding = reversed_mapping,
        encdefault = default,
        decdefault = default,
    )
def Enum(subcon, **kw):
    """maps a set of named values.
    * subcon - the subcon to map
    * kw - keyword arguments which serve as the encoding mapping
    * _default_ - an optional, keyword-only argument that specifies the
      default value to use when the mapping is undefined. if not given,
      an exception is raised when the mapping is undefined. use `Pass` to
      pass the unmapped value as-is
    """
    # _default_ is taken out of kw first, so it never becomes part of the
    # mapping itself
    fallback = kw.pop("_default_", NotImplemented)
    return SymmetricMapping(subcon, kw, fallback)
def FlagsEnum(subcon, **kw):
    """maps a set of flag values.
    * subcon - the subcon to map
    * kw - keyword arguments which serve as the encoding mapping
    """
    flags = kw
    return FlagsAdapter(subcon, flags)
#=============================================================================== | |
# structs | |
#=============================================================================== | |
def AlignedStruct(name, *subcons, **kw):
    """a struct in which every field is aligned
    * name - the name of the struct
    * subcons - the subcons that make up this structure
    * kw - keyword arguments to pass to Aligned: 'modulus' and 'pattern'
    """
    # each subcon is wrapped by Aligned individually, with the same options
    aligned_subcons = [Aligned(sc, **kw) for sc in subcons]
    return Struct(name, *aligned_subcons)
def BitStruct(name, *subcons):
    """a struct made of bitwise fields
    * name - the name of the struct
    * subcons - the subcons that make up this structure
    """
    inner = Struct(name, *subcons)
    return Bitwise(inner)
def EmbeddedBitStruct(*subcons):
    """a BitStruct that is embedded; no name is necessary.
    * subcons - the subcons that make up this structure
    """
    # the struct is anonymous (None) since it is embedded in its parent
    anonymous = Struct(None, *subcons)
    return Bitwise(Embedded(anonymous))
#=============================================================================== | |
# strings | |
#=============================================================================== | |
def String(name, length, encoding = None, padchar = None,
           paddir = "right", trimdir = "right"):
    r"""a string of characters with a fixed length, optionally padded
    * name - the name of the field
    * length - the length (integer)
    * encoding - the encoding to use (e.g., "utf8"), or None, for raw bytes.
      default is None
    * padchar - the padding character (commonly "\x00"), or None to
      disable padding. default is None
    * paddir - the direction where padding is placed ("right", "left", or
      "center"). default is "right". meaningless if padchar is None.
    * trimdir - the direction where trimming will take place ("right" or
      "left"). default is "right". trimming is only meaningful for
      building, when the given string is too long. meaningless if padchar
      is None.
    """
    plain = StringAdapter(Field(name, length), encoding = encoding)
    if padchar is None:
        # no padding requested: the plain string adapter is enough
        return plain
    return PaddedStringAdapter(plain,
        padchar = padchar,
        paddir = paddir,
        trimdir = trimdir
    )
def PascalString(name, length_field = UBInt8("length"), encoding = None):
    """a string prefixed with a length field. the data must directly follow
    the length field.
    * name - the name of the string
    * length_field - a numeric construct (i.e., UBInt8) that holds the
      length. default is an unsigned, 8-bit integer field. note that this
      argument must pass an instance of a construct, not a class
      (`UBInt8("length")` rather than `UBInt8`)
    * encoding - the encoding to use (e.g., "utf8"), or None, for raw bytes.
      default is None
    """
    def data_length(ctx):
        # the data is as long as the value stored by the length field
        return ctx[length_field.name]

    prefixed = Sequence(name, length_field, Field("data", data_length))
    return StringAdapter(LengthValueAdapter(prefixed), encoding = encoding)
def CString(name, terminators = "\x00", encoding = None,
            char_field = Field(None, 1)):
    r"""a c-style string (a string ended by a terminator char)
    * name - the name of the string
    * terminators - a sequence of terminator chars. default is "\x00".
    * encoding - the encoding to use (e.g., "utf8"), or None, for raw bytes.
      default is None
    * char_field - the construct that represents a single character. default
      is a one-byte character. note that this argument must be an instance
      of a construct, not a construct class (`Field("char", 1)` rather than
      `Field`)
    """
    def is_terminator(obj, ctx):
        return obj in terminators

    # characters are repeated until one of the terminators shows up
    characters = RepeatUntil(is_terminator, char_field)
    adapted = CStringAdapter(characters,
        terminators = terminators,
        encoding = encoding,
    )
    return Rename(name, adapted)
#=============================================================================== | |
# conditional | |
#=============================================================================== | |
def IfThenElse(name, predicate, then_subcon, else_subcon):
    """a conditional construct with two branches: `then_subcon` is used when
    the predicate indicates True; `else_subcon` is used otherwise
    * name - the name of the construct
    * predicate - a function taking the context as an argument and returning
      True or False
    * then_subcon - the subcon that will be used if the predicate returns True
    * else_subcon - the subcon that will be used if the predicate returns False
    """
    def selector(ctx):
        # coerced to bool so the result always matches a key below
        return bool(predicate(ctx))

    branches = {
        True : then_subcon,
        False : else_subcon,
    }
    return Switch(name, selector, branches)
def If(predicate, subcon, elsevalue = None):
    """a conditional construct with a single branch: subcon is used when the
    predicate indicates True; otherwise `elsevalue` is returned instead.
    * predicate - a function taking the context as an argument and returning
      True or False
    * subcon - the subcon that will be used if the predicate returns True
    * elsevalue - the value that will be used should the predicate return
      False. by default this value is None.
    """
    label = subcon.name
    # the else-branch is a Value construct that yields elsevalue
    fallback = Value("elsevalue", lambda ctx: elsevalue)
    return IfThenElse(label, predicate, subcon, fallback)
#=============================================================================== | |
# misc | |
#=============================================================================== | |
def OnDemandPointer(offsetfunc, subcon, force_build = True):
    """a pointer that is parsed on demand.
    * offsetfunc - a function taking the context as an argument and returning
      the absolute stream position
    * subcon - the subcon that will be parsed from the `offsetfunc()` stream
      position on demand
    * force_build - see OnDemand. by default True.
    """
    pointer = Pointer(offsetfunc, subcon)
    return OnDemand(pointer,
        advance_stream = False,
        force_build = force_build
    )